mirror of
https://github.com/openfoodfoundation/openfoodnetwork
synced 2026-02-27 01:43:22 +00:00
Merge pull request #13167 from dacook/sync-products-on-oc-opened-12986
[DFC Orders] Sync remote products when order cycle opens
This commit is contained in:
@@ -16,6 +16,10 @@ STRIPE_PUBLIC_TEST_API_KEY="bogus_stripe_publishable_key"
|
||||
|
||||
SITE_URL="test.host"
|
||||
|
||||
# OIDC Settings for DFC authentication
|
||||
# Find secrets in BitWarden.
|
||||
# To get a refresh token: log into the OIDC provider, connect your OFN user to it at /admin/oidc_settings, then copy the token from the database:
|
||||
# ./bin/rails runner 'puts "OPENID_REFRESH_TOKEN=\"#{OidcAccount.last.refresh_token}\""'
|
||||
OPENID_APP_ID="test-provider"
|
||||
OPENID_APP_SECRET="dummy-openid-app-secret-token"
|
||||
OPENID_REFRESH_TOKEN="dummy-refresh-token"
|
||||
|
||||
67
app/jobs/open_order_cycle_job.rb
Normal file
67
app/jobs/open_order_cycle_job.rb
Normal file
@@ -0,0 +1,67 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
# Run any pre-conditions and mark order cycle as open.
|
||||
#
|
||||
# Currently, an order cycle is considered open in the shopfront when orders_open_at >= now.
|
||||
# But now there are some pre-conditions for opening an order cycle, so we would like to change that.
|
||||
# Instead, the presence of opened_at (and absence of processed_at) should indicate it is open.
|
||||
class OpenOrderCycleJob < ApplicationJob
|
||||
sidekiq_options retry_for: 10.minutes
|
||||
|
||||
def perform(order_cycle_id)
|
||||
ActiveRecord::Base.transaction do
|
||||
# Fetch order cycle if it's still unopened, and lock DB row until finished
|
||||
order_cycle = OrderCycle.lock.find_by!(id: order_cycle_id, opened_at: nil)
|
||||
|
||||
sync_remote_variants(order_cycle)
|
||||
|
||||
# Mark as opened
|
||||
opened_at = Time.zone.now
|
||||
order_cycle.update_columns(opened_at:)
|
||||
|
||||
# And notify any subscribers
|
||||
OrderCycles::WebhookService.create_webhook_job(order_cycle, 'order_cycle.opened', opened_at)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def sync_remote_variants(order_cycle)
|
||||
# Sync any remote variants for each supplier
|
||||
order_cycle.suppliers.each do |supplier|
|
||||
links = variant_links_for(order_cycle, supplier)
|
||||
next if links.empty?
|
||||
|
||||
# Find authorised user to access remote products
|
||||
dfc_user = supplier.owner # we assume the owner's account is the one used to import from dfc.
|
||||
|
||||
import_variants(links, dfc_user)
|
||||
end
|
||||
end
|
||||
|
||||
# Fetch all remote variants for this supplier in the order cycle
|
||||
def variant_links_for(order_cycle, supplier)
|
||||
variants = order_cycle.exchanges.incoming.from_enterprise(supplier)
|
||||
.joins(:exchange_variants).select('exchange_variants.variant_id')
|
||||
SemanticLink.where(subject_id: variants)
|
||||
end
|
||||
|
||||
def import_variants(links, dfc_user)
|
||||
# Find any catalogues associated with the variants
|
||||
catalogs = links.group_by do |link|
|
||||
FdcUrlBuilder.new(link.semantic_id).catalog_url
|
||||
end
|
||||
|
||||
# Import selected variants from each catalog
|
||||
catalogs.each do |catalog_url, catalog_links|
|
||||
catalog = DfcCatalog.load(dfc_user, catalog_url)
|
||||
catalog.apply_wholesale_values!
|
||||
|
||||
catalog_links.each do |link|
|
||||
catalog_item = catalog.item(link.semantic_id)
|
||||
|
||||
SuppliedProductImporter.update_product(catalog_item, link.subject) if catalog_item
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -25,8 +25,7 @@ class OrderCycleClosingJob < ApplicationJob
|
||||
|
||||
def mark_as_processed
|
||||
OrderCycle.where(id: recently_closed_order_cycles).update_all(
|
||||
processed_at: Time.zone.now,
|
||||
updated_at: Time.zone.now
|
||||
processed_at: Time.zone.now
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
# Trigger jobs for any order cycles that recently opened
|
||||
class OrderCycleOpenedJob < ApplicationJob
|
||||
def perform
|
||||
ActiveRecord::Base.transaction do
|
||||
recently_opened_order_cycles.find_each do |order_cycle|
|
||||
OrderCycles::WebhookService.create_webhook_job(order_cycle, 'order_cycle.opened')
|
||||
end
|
||||
mark_as_opened(recently_opened_order_cycles)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def recently_opened_order_cycles
|
||||
@recently_opened_order_cycles ||= OrderCycle
|
||||
.where(opened_at: nil)
|
||||
.where(orders_open_at: 1.hour.ago..Time.zone.now)
|
||||
.lock.order(:id)
|
||||
end
|
||||
|
||||
def mark_as_opened(order_cycles)
|
||||
now = Time.zone.now
|
||||
order_cycles.update_all(opened_at: now, updated_at: now)
|
||||
end
|
||||
end
|
||||
18
app/jobs/trigger_order_cycles_to_open_job.rb
Normal file
18
app/jobs/trigger_order_cycles_to_open_job.rb
Normal file
@@ -0,0 +1,18 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
# Trigger jobs for any order cycles that recently opened
|
||||
class TriggerOrderCyclesToOpenJob < ApplicationJob
|
||||
def perform
|
||||
recently_opened_order_cycles.find_each do |order_cycle|
|
||||
OpenOrderCycleJob.perform_later(order_cycle.id)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def recently_opened_order_cycles
|
||||
OrderCycle
|
||||
.where(opened_at: nil)
|
||||
.where(orders_open_at: 1.hour.ago..Time.zone.now)
|
||||
end
|
||||
end
|
||||
@@ -13,11 +13,11 @@ class WebhookDeliveryJob < ApplicationJob
|
||||
|
||||
queue_as :default
|
||||
|
||||
def perform(url, event, payload)
|
||||
def perform(url, event, payload, at: Time.zone.now)
|
||||
body = {
|
||||
id: job_id,
|
||||
at: Time.zone.now.to_s,
|
||||
event:,
|
||||
at: at.to_s,
|
||||
data: payload,
|
||||
}
|
||||
|
||||
|
||||
@@ -5,9 +5,9 @@
|
||||
|
||||
module OrderCycles
|
||||
class WebhookService
|
||||
def self.create_webhook_job(order_cycle, event)
|
||||
def self.create_webhook_job(order_cycle, event, at)
|
||||
webhook_payload = order_cycle
|
||||
.slice(:id, :name, :orders_open_at, :orders_close_at, :coordinator_id)
|
||||
.slice(:id, :name, :orders_open_at, :opened_at, :orders_close_at, :coordinator_id)
|
||||
.merge(coordinator_name: order_cycle.coordinator.name)
|
||||
|
||||
# Endpoints for coordinator owner
|
||||
@@ -17,7 +17,7 @@ module OrderCycles
|
||||
webhook_endpoints |= order_cycle.distributors.map(&:owner).flat_map(&:webhook_endpoints)
|
||||
|
||||
webhook_endpoints.each do |endpoint|
|
||||
WebhookDeliveryJob.perform_later(endpoint.url, event, webhook_payload)
|
||||
WebhookDeliveryJob.perform_later(endpoint.url, event, webhook_payload, at:)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
every: "5m"
|
||||
SubscriptionConfirmJob:
|
||||
every: "5m"
|
||||
OrderCycleOpenedJob:
|
||||
TriggerOrderCyclesToOpenJob:
|
||||
every: "5m"
|
||||
OrderCycleClosingJob:
|
||||
every: "5m"
|
||||
|
||||
@@ -6,6 +6,7 @@ FactoryBot.define do
|
||||
uid { user&.email || generate(:random_email) }
|
||||
|
||||
# This is a live test account authenticated via Les Communes.
|
||||
# See .env.test for tips on connecting the account for recording VCR cassettes.
|
||||
factory :testdfc_account do
|
||||
uid { "testdfc@protonmail.com" }
|
||||
refresh_token { ENV.fetch("OPENID_REFRESH_TOKEN") }
|
||||
|
||||
File diff suppressed because one or more lines are too long
110
spec/jobs/open_order_cycle_job_spec.rb
Normal file
110
spec/jobs/open_order_cycle_job_spec.rb
Normal file
@@ -0,0 +1,110 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require 'spec_helper'
|
||||
|
||||
RSpec.describe OpenOrderCycleJob do
|
||||
let(:now){ Time.zone.now }
|
||||
let(:order_cycle) { create(:simple_order_cycle, orders_open_at: now) }
|
||||
subject { OpenOrderCycleJob.perform_now(order_cycle.id) }
|
||||
|
||||
around do |example|
|
||||
Timecop.freeze(now) { example.run }
|
||||
end
|
||||
|
||||
it "marks as open" do
|
||||
expect {
|
||||
subject
|
||||
order_cycle.reload
|
||||
}
|
||||
.to change { order_cycle.opened_at }
|
||||
|
||||
expect(order_cycle.opened_at).to be_within(1).of(now)
|
||||
end
|
||||
|
||||
it "enqueues webhook job" do
|
||||
expect(OrderCycles::WebhookService)
|
||||
.to receive(:create_webhook_job).with(order_cycle, 'order_cycle.opened', now).once
|
||||
|
||||
subject
|
||||
end
|
||||
|
||||
describe "syncing remote products" do
|
||||
let!(:user) { create(:testdfc_user, owned_enterprises: [enterprise]) }
|
||||
|
||||
let(:enterprise) { create(:supplier_enterprise) }
|
||||
let!(:variant) { create(:variant, name: "Sauce", supplier_id: enterprise.id) }
|
||||
let!(:order_cycle) {
|
||||
create(:simple_order_cycle, orders_open_at: now,
|
||||
suppliers: [enterprise], variants: [variant])
|
||||
}
|
||||
|
||||
it "synchronises products from a FDC catalog", vcr: true do
|
||||
user.update!(oidc_account: build(:testdfc_account))
|
||||
# One product is existing in OFN
|
||||
product_id =
|
||||
"https://env-0105831.jcloud-ver-jpe.ik-server.com/api/dfc/Enterprises/test-hodmedod/SuppliedProducts/44519466467635"
|
||||
variant.semantic_links << SemanticLink.new(semantic_id: product_id)
|
||||
|
||||
expect {
|
||||
subject
|
||||
variant.reload
|
||||
order_cycle.reload
|
||||
}.to change { order_cycle.opened_at }
|
||||
.and change { enterprise.supplied_products.count }.by(0) # It shouldn't add, only update
|
||||
.and change { variant.display_name }
|
||||
.and change { variant.unit_value }
|
||||
# 18.85 wholesale variant price divided by 12 cans in the slab.
|
||||
.and change { variant.price }.to(1.57)
|
||||
.and change { variant.on_demand }.to(true)
|
||||
.and change { variant.on_hand }.by(0)
|
||||
.and query_database 46
|
||||
end
|
||||
end
|
||||
|
||||
describe "concurrency", concurrency: true do
|
||||
let(:breakpoint) { Mutex.new }
|
||||
|
||||
it "doesn't open order cycle twice" do
|
||||
# Pause in the middle of the job to test if the second job is trying
|
||||
# to do the same thing at the same time.
|
||||
breakpoint.lock
|
||||
expect_any_instance_of(OpenOrderCycleJob).to(
|
||||
receive(:sync_remote_variants).and_wrap_original do |method, *args|
|
||||
breakpoint.synchronize { nil }
|
||||
method.call(*args)
|
||||
end
|
||||
)
|
||||
|
||||
expect(OrderCycles::WebhookService)
|
||||
.to receive(:create_webhook_job).with(order_cycle, 'order_cycle.opened', now).once
|
||||
|
||||
# Start two jobs in parallel:
|
||||
threads = [1, 2].map do
|
||||
Thread.new do
|
||||
# Disable printing expected error
|
||||
Thread.current.report_on_exception = false
|
||||
|
||||
OpenOrderCycleJob.perform_now(order_cycle.id)
|
||||
end
|
||||
end
|
||||
|
||||
# Wait for both to jobs to pause.
|
||||
# This can reveal a race condition.
|
||||
sleep 0.1
|
||||
|
||||
# Resume and complete both jobs:
|
||||
breakpoint.unlock
|
||||
|
||||
# Join the threads until an error is raised.
|
||||
# We expect one of them to raise an error but we don't know which one.
|
||||
expect {
|
||||
threads.pop.join
|
||||
threads.pop.join
|
||||
}.to raise_error ActiveRecord::RecordNotFound
|
||||
|
||||
# If the first `join` raised an error, we still need to wait for the
|
||||
# second thread to finish:
|
||||
threads.pop.join if threads.present?
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -1,62 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require 'spec_helper'
|
||||
|
||||
RSpec.describe OrderCycleOpenedJob do
|
||||
let(:oc_opened_before) {
|
||||
create(:order_cycle, orders_open_at: 1.hour.ago)
|
||||
}
|
||||
let(:oc_opened_now) {
|
||||
create(:order_cycle, orders_open_at: Time.zone.now)
|
||||
}
|
||||
let(:oc_opening_soon) {
|
||||
create(:order_cycle, orders_open_at: 1.minute.from_now)
|
||||
}
|
||||
|
||||
it "enqueues jobs for recently opened order cycles only" do
|
||||
expect(OrderCycles::WebhookService)
|
||||
.to receive(:create_webhook_job).with(oc_opened_now, 'order_cycle.opened')
|
||||
|
||||
expect(OrderCycles::WebhookService)
|
||||
.not_to receive(:create_webhook_job).with(oc_opened_before, 'order_cycle.opened')
|
||||
|
||||
expect(OrderCycles::WebhookService)
|
||||
.not_to receive(:create_webhook_job).with(oc_opening_soon, 'order_cycle.opened')
|
||||
|
||||
OrderCycleOpenedJob.perform_now
|
||||
end
|
||||
|
||||
describe "concurrency", concurrency: true do
|
||||
let(:breakpoint) { Mutex.new }
|
||||
|
||||
it "doesn't place duplicate job when run concurrently" do
|
||||
oc_opened_now
|
||||
|
||||
# Pause jobs when placing new job:
|
||||
breakpoint.lock
|
||||
allow(OrderCycleOpenedJob).to(
|
||||
receive(:new).and_wrap_original do |method, *args|
|
||||
breakpoint.synchronize { nil }
|
||||
method.call(*args)
|
||||
end
|
||||
)
|
||||
|
||||
expect(OrderCycles::WebhookService)
|
||||
.to receive(:create_webhook_job).with(oc_opened_now, 'order_cycle.opened').once
|
||||
|
||||
# Start two jobs in parallel:
|
||||
threads = [
|
||||
Thread.new { OrderCycleOpenedJob.perform_now },
|
||||
Thread.new { OrderCycleOpenedJob.perform_now },
|
||||
]
|
||||
|
||||
# Wait for both to jobs to pause.
|
||||
# This can reveal a race condition.
|
||||
sleep 0.1
|
||||
|
||||
# Resume and complete both jobs:
|
||||
breakpoint.unlock
|
||||
threads.each(&:join)
|
||||
end
|
||||
end
|
||||
end
|
||||
22
spec/jobs/trigger_order_cycles_to_open_job_spec.rb
Normal file
22
spec/jobs/trigger_order_cycles_to_open_job_spec.rb
Normal file
@@ -0,0 +1,22 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require 'spec_helper'
|
||||
|
||||
RSpec.describe TriggerOrderCyclesToOpenJob do
|
||||
let(:oc_opened_before) {
|
||||
create(:simple_order_cycle, orders_open_at: 1.hour.ago)
|
||||
}
|
||||
let(:oc_opened_now) {
|
||||
create(:simple_order_cycle, orders_open_at: Time.zone.now)
|
||||
}
|
||||
let(:oc_opening_soon) {
|
||||
create(:simple_order_cycle, orders_open_at: 1.minute.from_now)
|
||||
}
|
||||
|
||||
it "enqueues jobs for recently opened order cycles only" do
|
||||
expect{ TriggerOrderCyclesToOpenJob.perform_now }
|
||||
.to enqueue_job(OpenOrderCycleJob).with(oc_opened_now.id)
|
||||
.and enqueue_job(OpenOrderCycleJob).with(oc_opened_before.id).exactly(0).times
|
||||
.and enqueue_job(OpenOrderCycleJob).with(oc_opening_soon.id).exactly(0).times
|
||||
end
|
||||
end
|
||||
@@ -3,9 +3,10 @@
|
||||
require 'spec_helper'
|
||||
|
||||
RSpec.describe WebhookDeliveryJob do
|
||||
subject { WebhookDeliveryJob.new(url, event, data) }
|
||||
subject { WebhookDeliveryJob.new(url, event, data, at:) }
|
||||
let(:url) { 'https://test/endpoint' }
|
||||
let(:event) { 'order_cycle.opened' }
|
||||
let(:at) { 1.second.ago }
|
||||
let(:data) {
|
||||
{
|
||||
order_cycle_id: 123, name: "Order cycle 1", open_at: 1.minute.ago.to_s, tags: ["tag1", "tag2"]
|
||||
@@ -25,7 +26,7 @@ RSpec.describe WebhookDeliveryJob do
|
||||
Timecop.freeze do
|
||||
expected_body = {
|
||||
id: /.+/,
|
||||
at: Time.zone.now.to_s,
|
||||
at: at.to_s,
|
||||
event:,
|
||||
data:,
|
||||
}
|
||||
|
||||
@@ -8,11 +8,14 @@ RSpec.describe OrderCycles::WebhookService do
|
||||
:simple_order_cycle,
|
||||
name: "Order cycle 1",
|
||||
orders_open_at: Time.zone.parse("2022-09-19 09:00:00"),
|
||||
opened_at: Time.zone.parse("2022-09-19 09:00:01"),
|
||||
orders_close_at: Time.zone.parse("2022-09-19 17:00:00"),
|
||||
coordinator:,
|
||||
)
|
||||
}
|
||||
let(:coordinator) { create :distributor_enterprise, name: "Starship Enterprise" }
|
||||
let(:at) { Time.zone.parse("2022-09-19 09:00:02") }
|
||||
subject { OrderCycles::WebhookService.create_webhook_job(order_cycle, "order_cycle.opened", at) }
|
||||
|
||||
describe "creating payloads" do
|
||||
it "doesn't create webhook payload for enterprise users" do
|
||||
@@ -21,7 +24,7 @@ RSpec.describe OrderCycles::WebhookService do
|
||||
coordinator_user = create(:user, enterprises: [coordinator])
|
||||
coordinator_user.webhook_endpoints.create!(url: "http://coordinator_user_url")
|
||||
|
||||
expect{ OrderCycles::WebhookService.create_webhook_job(order_cycle, "order_cycle.opened") }
|
||||
expect{ subject }
|
||||
.not_to enqueue_job(WebhookDeliveryJob).with("http://coordinator_user_url", any_args)
|
||||
end
|
||||
|
||||
@@ -31,7 +34,7 @@ RSpec.describe OrderCycles::WebhookService do
|
||||
end
|
||||
|
||||
it "creates webhook payload for order cycle coordinator" do
|
||||
expect{ OrderCycles::WebhookService.create_webhook_job(order_cycle, "order_cycle.opened") }
|
||||
expect{ subject }
|
||||
.to enqueue_job(WebhookDeliveryJob).with("http://coordinator_owner_url", any_args)
|
||||
end
|
||||
|
||||
@@ -43,20 +46,21 @@ RSpec.describe OrderCycles::WebhookService do
|
||||
id: order_cycle.id,
|
||||
name: "Order cycle 1",
|
||||
orders_open_at: Time.zone.parse("2022-09-19 09:00:00"),
|
||||
opened_at: Time.zone.parse("2022-09-19 09:00:01"),
|
||||
orders_close_at: Time.zone.parse("2022-09-19 17:00:00"),
|
||||
coordinator_id: coordinator.id,
|
||||
coordinator_name: "Starship Enterprise",
|
||||
}
|
||||
|
||||
expect{ OrderCycles::WebhookService.create_webhook_job(order_cycle, "order_cycle.opened") }
|
||||
expect{ subject }
|
||||
.to enqueue_job(WebhookDeliveryJob).exactly(1).times
|
||||
.with("http://coordinator_owner_url", "order_cycle.opened", hash_including(data))
|
||||
.with("http://coordinator_owner_url", "order_cycle.opened", hash_including(data), at:)
|
||||
end
|
||||
end
|
||||
|
||||
context "coordinator owner doesn't have endpoint configured" do
|
||||
it "doesn't create webhook payload" do
|
||||
expect{ OrderCycles::WebhookService.create_webhook_job(order_cycle, "order_cycle.opened") }
|
||||
expect{ subject }
|
||||
.not_to enqueue_job(WebhookDeliveryJob)
|
||||
end
|
||||
end
|
||||
@@ -84,13 +88,13 @@ RSpec.describe OrderCycles::WebhookService do
|
||||
coordinator_name: "Starship Enterprise",
|
||||
}
|
||||
|
||||
expect{
|
||||
OrderCycles::WebhookService.create_webhook_job(order_cycle, "order_cycle.opened")
|
||||
}
|
||||
expect{ subject }
|
||||
.to enqueue_job(WebhookDeliveryJob).with("http://distributor1_owner_url",
|
||||
"order_cycle.opened", hash_including(data))
|
||||
"order_cycle.opened", hash_including(data),
|
||||
at:)
|
||||
.and enqueue_job(WebhookDeliveryJob).with("http://distributor2_owner_url",
|
||||
"order_cycle.opened", hash_including(data))
|
||||
"order_cycle.opened", hash_including(data),
|
||||
at:)
|
||||
end
|
||||
end
|
||||
|
||||
@@ -107,9 +111,7 @@ RSpec.describe OrderCycles::WebhookService do
|
||||
it "creates only one webhook payload for the user's endpoint" do
|
||||
user.webhook_endpoints.create! url: "http://coordinator_owner_url"
|
||||
|
||||
expect{
|
||||
OrderCycles::WebhookService.create_webhook_job(order_cycle, "order_cycle.opened")
|
||||
}
|
||||
expect{ subject }
|
||||
.to enqueue_job(WebhookDeliveryJob).with("http://coordinator_owner_url", any_args)
|
||||
end
|
||||
end
|
||||
@@ -131,9 +133,7 @@ RSpec.describe OrderCycles::WebhookService do
|
||||
}
|
||||
|
||||
it "doesn't create a webhook payload for supplier owner" do
|
||||
expect{
|
||||
OrderCycles::WebhookService.create_webhook_job(order_cycle, "order_cycle.opened")
|
||||
}
|
||||
expect{ subject }
|
||||
.not_to enqueue_job(WebhookDeliveryJob).with("http://supplier_owner_url", any_args)
|
||||
end
|
||||
end
|
||||
@@ -142,7 +142,7 @@ RSpec.describe OrderCycles::WebhookService do
|
||||
|
||||
context "without webhook subscribed to enterprise" do
|
||||
it "doesn't create webhook payload" do
|
||||
expect{ OrderCycles::WebhookService.create_webhook_job(order_cycle, "order_cycle.opened") }
|
||||
expect{ subject }
|
||||
.not_to enqueue_job(WebhookDeliveryJob)
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user