Move to new job

And re-organise specs.

TOFIX: concurrency test now fails. why?

Use correct testing methods
This commit is contained in:
David Cook
2025-02-21 13:15:03 +11:00
committed by Maikel Linke
parent a8490a9b11
commit eff8fdb28b
5 changed files with 148 additions and 106 deletions

View File

@@ -0,0 +1,63 @@
# frozen_string_literal: true
# Run any pre-conditions and mark order cycle as open.
#
# Currently, an order cycle is considered open in the shopfront when orders_open_at >= now.
# But now there are some pre-conditions for opening an order cycle, so we would like to change that.
# Instead, the presence of opened_at (and absence of closed_at) should indicate it is open.
class OpenOrderCycleJob < ApplicationJob
def perform(order_cycle_id)
order_cycle = OrderCycle.find(order_cycle_id)
sync_remote_variants(order_cycle)
# Mark as opened
opened_at = Time.zone.now
order_cycle.update_columns(opened_at:, updated_at: opened_at)
# And notify any subscribers
OrderCycles::WebhookService.create_webhook_job(order_cycle, 'order_cycle.opened', opened_at)
end
private
def sync_remote_variants(order_cycle)
# Sync any remote variants for each supplier
order_cycle.suppliers.each do |supplier|
links = variant_links_for(order_cycle, supplier)
next if links.empty?
# Find authorised user to access remote products
dfc_user = supplier.owner # we assume the owner's account is the one used to import from dfc.
import_variants(links, dfc_user)
end
end
# Fetch all remote variants for this supplier in the order cycle
def variant_links_for(order_cycle, supplier)
variants = order_cycle.exchanges.incoming.from_enterprise(supplier)
.joins(:exchange_variants).select('exchange_variants.variant_id')
SemanticLink.where(subject_id: variants)
end
def import_variants(links, dfc_user)
# Find any catalogues associated with the variants
catalogs = links.group_by do |link|
FdcUrlBuilder.new(link.semantic_id).catalog_url
end
# Import selected variants from each catalog
catalogs.each do |catalog_url, catalog_links|
catalog_json = DfcRequest.new(dfc_user).call(catalog_url)
graph = DfcIo.import(catalog_json)
catalog = DfcCatalog.new(graph)
catalog.apply_wholesale_values!
catalog_links.each do |link|
catalog_item = catalog.item(link.semantic_id)
SuppliedProductImporter.update_product(catalog_item, link.subject) if catalog_item
end
end
end
end

View File

@@ -1,49 +1,11 @@
# frozen_string_literal: true
# Trigger jobs for any order cycles that recently opened
#
# Currently, an order cycle is considered open in the shopfront when orders_open_at >= now.
# But now there are some pre-conditions for opening an order cycle, so we would like to change that.
# When it changes, this would become OrderCycleOpeningJob, with the responsibility of opening each
# order cycle by setting opened_at = now.
class OrderCycleOpenedJob < ApplicationJob
def perform
ActiveRecord::Base.transaction do
recently_opened_order_cycles.find_each do |order_cycle|
# Process the order cycle. this might take a while, so this should be shifted to a separate job to allow concurrent processing.
order_cycle.suppliers.each do |supplier|
# Find authorised user to access remote products
dfc_user = supplier.owner # we assume the owner's account is the one used to import from dfc.
# Fetch all remote variants for this supplier in the order cycle
variants = order_cycle.exchanges.incoming.from_enterprise(supplier).joins(:exchange_variants).select('exchange_variants.variant_id')
links = SemanticLink.where(subject_id: variants)
# Find any catalogues associated with the variants
catalogs = links.group_by do |link|
FdcUrlBuilder.new(link.semantic_id).catalog_url
end
# Import selected variants from each catalog
catalogs.each do |catalog_url, catalog_links|
catalog_json = DfcRequest.new(dfc_user).call(catalog_url)
graph = DfcIo.import(catalog_json)
catalog = DfcCatalog.new(graph)
catalog.apply_wholesale_values!
catalog_links.each do |link|
catalog_item = catalog.item(link.semantic_id)
SuppliedProductImporter.update_product(catalog_item, link.subject) if catalog_item
end
end
end
opened_at = Time.zone.now
order_cycle.update_columns(opened_at:, updated_at: opened_at)
# And notify any subscribers
OrderCycles::WebhookService.create_webhook_job(order_cycle, 'order_cycle.opened', opened_at)
OpenOrderCycleJob.perform_later(order_cycle.id)
end
end
end
@@ -51,7 +13,7 @@ class OrderCycleOpenedJob < ApplicationJob
private
def recently_opened_order_cycles
@recently_opened_order_cycles ||= OrderCycle
OrderCycle
.where(opened_at: nil)
.where(orders_open_at: 1.hour.ago..Time.zone.now)
.lock.order(:id)

View File

@@ -0,0 +1,60 @@
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe OpenOrderCycleJob do
let(:order_cycle) { create(:simple_order_cycle, orders_open_at: Time.zone.now) }
subject { OpenOrderCycleJob.perform_now(order_cycle.id) }
it "marks as open" do
Timecop.freeze do
expect {
subject
order_cycle.reload
}
.to change { order_cycle.opened_at }.to(Time.zone.now)
end
end
it "enqueues webhook job" do
Timecop.freeze do
expect(OrderCycles::WebhookService)
.to receive(:create_webhook_job).with(order_cycle, 'order_cycle.opened', Time.zone.now).once
subject
end
end
describe "syncing remote products" do
let!(:user) { create(:testdfc_user, owned_enterprises: [enterprise]) }
let(:enterprise) { create(:supplier_enterprise) }
let!(:variant) { create(:variant, name: "Sauce", supplier_id: enterprise.id) }
let!(:order_cycle) {
create(:simple_order_cycle, orders_open_at: Time.zone.now,
suppliers: [enterprise], variants: [variant])
}
it "synchronises products from a FDC catalog", vcr: true do
user.update!(oidc_account: build(:testdfc_account))
# One product is existing in OFN
product_id =
"https://env-0105831.jcloud-ver-jpe.ik-server.com/api/dfc/Enterprises/test-hodmedod/SuppliedProducts/44519466467635"
variant.semantic_links << SemanticLink.new(semantic_id: product_id)
expect {
subject
variant.reload
order_cycle.reload
}.to change { order_cycle.opened_at }
.and change { enterprise.supplied_products.count }.by(0) # It shouldn't add, only update
.and change { variant.display_name }
.and change { variant.unit_value }
# 18.85 wholesale variant price divided by 12 cans in the slab.
.and change { variant.price }.to(1.57)
.and change { variant.on_demand }.to(true)
.and change { variant.on_hand }.by(0)
.and query_database 50
end
end
end

View File

@@ -6,35 +6,28 @@ require_relative '../../engines/dfc_provider/spec/support/authorization_helper'
RSpec.describe OrderCycleOpenedJob do
include AuthorizationHelper
#todo: I don't think we need order cycles with exchanges. we don't need the factory at all here.
# also, maybe rearrange the spec. test selection with opened_at. then webhooks can have it's own define block.
let(:oc_opened_before) {
create(:order_cycle, orders_open_at: 1.hour.ago)
create(:simple_order_cycle, orders_open_at: 1.hour.ago)
}
let(:oc_opened_now) {
create(:order_cycle, orders_open_at: Time.zone.now)
create(:simple_order_cycle, orders_open_at: Time.zone.now)
}
let(:oc_opening_soon) {
create(:order_cycle, orders_open_at: 1.minute.from_now)
create(:simple_order_cycle, orders_open_at: 1.minute.from_now)
}
it "enqueues jobs for recently opened order cycles only" do
expect(OrderCycles::WebhookService)
.to receive(:create_webhook_job).with(oc_opened_now, 'order_cycle.opened')
expect(OrderCycles::WebhookService)
.not_to receive(:create_webhook_job).with(oc_opened_before, 'order_cycle.opened')
expect(OrderCycles::WebhookService)
.not_to receive(:create_webhook_job).with(oc_opening_soon, 'order_cycle.opened')
OrderCycleOpenedJob.perform_now
expect{ OrderCycleOpenedJob.perform_now }
.to enqueue_job(OpenOrderCycleJob).with(oc_opened_now.id)
.and enqueue_job(OpenOrderCycleJob).with(oc_opened_before.id).exactly(0).times
.and enqueue_job(OpenOrderCycleJob).with(oc_opening_soon.id).exactly(0).times
end
describe "concurrency", concurrency: true do
let(:breakpoint) { Mutex.new }
it "doesn't place duplicate job when run concurrently" do
pending "dunno why this doesn't work" # but then maybe this can be better handled in the sub-job.
oc_opened_now
# Pause jobs when placing new job:
@@ -46,58 +39,22 @@ RSpec.describe OrderCycleOpenedJob do
end
)
expect(OrderCycles::WebhookService)
.to receive(:create_webhook_job).with(oc_opened_now, 'order_cycle.opened').once
# Start two jobs in parallel:
threads = [
Thread.new { OrderCycleOpenedJob.perform_now },
Thread.new { OrderCycleOpenedJob.perform_now },
]
# Wait for both to jobs to pause.
# This can reveal a race condition.
sleep 0.1
# Resume and complete both jobs:
breakpoint.unlock
threads.each(&:join)
end
end
describe "syncing remote products" do
let!(:user) { create(:oidc_user, owned_enterprises: [enterprise]) }
let(:enterprise) { create(:supplier_enterprise) }
let!(:variant) { create(:variant, name: "Sauce", supplier_id: enterprise.id) }
let!(:order_cycle) { create(:simple_order_cycle, orders_open_at: Time.zone.now,
suppliers: [enterprise], variants: [variant]) }
before do
user.oidc_account.update!(token: allow_token_for(email: user.email))
end
# should we move any parts of importing to a separate class, and test it separately?
it "synchronises products from a FDC catalog", vcr: true do
user.update!(oidc_account: build(:testdfc_account))
# One product is existing in OFN
product_id =
"https://env-0105831.jcloud-ver-jpe.ik-server.com/api/dfc/Enterprises/test-hodmedod/SuppliedProducts/44519466467635"
variant.semantic_links << SemanticLink.new(semantic_id: product_id)
expect {
OrderCycleOpenedJob.perform_now
variant.reload
order_cycle.reload
}.to change { order_cycle.opened_at }
.and change { enterprise.supplied_products.count }.by(0) # It should not add products, only update existing
.and change { variant.display_name }
.and change { variant.unit_value }
# 18.85 wholesale variant price divided by 12 cans in the slab.
.and change { variant.price }.to(1.57)
.and change { variant.on_demand }.to(true)
.and change { variant.on_hand }.by(0)
.and query_database 45
# Start two jobs in parallel:
threads = [
Thread.new { OrderCycleOpenedJob.perform_now },
Thread.new { OrderCycleOpenedJob.perform_now },
]
# Wait for both to jobs to pause.
# This can reveal a race condition.
sleep 0.1
# Resume and complete both jobs:
breakpoint.unlock
threads.each(&:join)
}
.to enqueue_job(OpenOrderCycleJob).with(oc_opened_now.id).once
end
end
end