fix: sales concurrency bug (#537)

This commit is contained in:
Adam Uhlíř 2023-09-05 16:47:29 +02:00 committed by GitHub
parent d3a22a7b7b
commit ae89db1eea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 20 additions and 10 deletions

View File

@ -101,11 +101,8 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =
if sales.running: if sales.running:
sales.agents.keepItIf(it != agent) sales.agents.keepItIf(it != agent)
proc cleanUp(sales: Sales, proc filled(sales: Sales,
agent: SalesAgent, processing: Future[void]) =
processing: Future[void]) {.async.} =
await sales.remove(agent)
# signal back to the slot queue to cycle a worker
if not processing.isNil and not processing.finished(): if not processing.isNil and not processing.finished():
processing.complete() processing.complete()
@ -121,7 +118,10 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
) )
agent.context.onCleanUp = proc {.async.} = agent.context.onCleanUp = proc {.async.} =
await sales.cleanUp(agent, done) await sales.remove(agent)
agent.context.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
sales.filled(done)
agent.start(SalePreparing()) agent.start(SalePreparing())
sales.agents.add agent sales.agents.add agent

View File

@ -14,6 +14,7 @@ type
onStore*: ?OnStore onStore*: ?OnStore
onClear*: ?OnClear onClear*: ?OnClear
onSale*: ?OnSale onSale*: ?OnSale
onFilled*: ?OnFilled
onCleanUp*: OnCleanUp onCleanUp*: OnCleanUp
onProve*: ?OnProve onProve*: ?OnProve
reservations*: Reservations reservations*: Reservations
@ -28,4 +29,11 @@ type
slotIndex: UInt256) {.gcsafe, upraises: [].} slotIndex: UInt256) {.gcsafe, upraises: [].}
OnSale* = proc(request: StorageRequest, OnSale* = proc(request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].} slotIndex: UInt256) {.gcsafe, upraises: [].}
# OnFilled has same function as OnSale, but is kept for internal purposes and should not be set by any external
# purposes as it is used for freeing Queue Workers after slot is filled. And the callbacks allows only
# one callback to be set, so if some other component would use it, it would override the Slot Queue freeing
# mechanism which would lead to blocking of the queue.
OnFilled* = proc(request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
OnCleanUp* = proc: Future[void] {.gcsafe, upraises: [].} OnCleanUp* = proc: Future[void] {.gcsafe, upraises: [].}

View File

@ -38,9 +38,11 @@ method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} =
if host == me.some: if host == me.some:
info "Slot succesfully filled", requestId = $data.requestId, slotIndex info "Slot succesfully filled", requestId = $data.requestId, slotIndex
if request =? data.request and slotIndex =? data.slotIndex and if request =? data.request and slotIndex =? data.slotIndex:
onSale =? context.onSale: if onSale =? context.onSale:
onSale(request, slotIndex) onSale(request, slotIndex)
if onFilled =? context.onFilled:
onFilled(request, slotIndex)
when codex_enable_proof_failures: when codex_enable_proof_failures:
if context.simulateProofFailures > 0: if context.simulateProofFailures > 0:

View File

@ -377,7 +377,7 @@ asyncchecksuite "Sales":
check market.filled[0].proof == proof check market.filled[0].proof == proof
check market.filled[0].host == await market.getSigner() check market.filled[0].host == await market.getSigner()
test "calls onSale when slot is filled": test "calls onFilled when slot is filled":
var soldAvailability: Availability var soldAvailability: Availability
var soldRequest: StorageRequest var soldRequest: StorageRequest
var soldSlotIndex: UInt256 var soldSlotIndex: UInt256