From 3db7b49537cfbf168154bba2a4a68f2b0539c2f2 Mon Sep 17 00:00:00 2001 From: Eric Mastro Date: Mon, 14 Nov 2022 21:20:38 +1100 Subject: [PATCH] [marketplace] handle slot filled by other host Handle the case when in the downloading, proving, or filling states, that another host fills the slot. --- codex/sales/salesagent.nim | 26 ++++++++++++-- codex/sales/statemachine.nim | 5 +++ codex/sales/states/downloading.nim | 5 +++ codex/sales/states/filled.nim | 2 +- codex/sales/states/filling.nim | 15 +++------ codex/sales/states/proving.nim | 5 +++ tests/codex/testsales.nim | 54 +++++++++++++++++++++++++++++- 7 files changed, 97 insertions(+), 15 deletions(-) diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 058e8259..3cf1631e 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -2,9 +2,7 @@ import pkg/chronos import pkg/upraises import pkg/stint import ./statemachine -import ./states/[downloading, unknown] import ../contracts/requests -import ../rng proc newSalesAgent*(sales: Sales, requestId: RequestId, @@ -21,6 +19,7 @@ proc newSalesAgent*(sales: Sales, # fwd declarations proc subscribeCancellation*(agent: SalesAgent): Future[void] {.gcsafe.} proc subscribeFailure*(agent: SalesAgent): Future[void] {.gcsafe.} +proc subscribeSlotFilled*(agent: SalesAgent): Future[void] {.gcsafe.} proc retrieveRequest(agent: SalesAgent) {.async.} = if agent.request.isNone: @@ -31,6 +30,7 @@ proc start*(agent: SalesAgent, numSlots: uint64) {.async.} = await agent.retrieveRequest() await agent.subscribeCancellation() await agent.subscribeFailure() + await agent.subscribeSlotFilled() proc stop*(agent: SalesAgent) {.async.} = try: @@ -41,6 +41,10 @@ proc stop*(agent: SalesAgent) {.async.} = await agent.failed.unsubscribe() except CatchableError: discard + try: + await agent.slotFilled.unsubscribe() + except CatchableError: + discard if not agent.cancelled.completed: await agent.cancelled.cancelAndWait() @@ -75,7 +79,25 @@ proc subscribeFailure*(agent: SalesAgent) {.async.} = state =? (agent.state as SaleState): return + await agent.failed.unsubscribe() await state.onFailed(request) agent.failed = await market.subscribeRequestFailed(agent.requestId, onFailed) + +proc subscribeSlotFilled*(agent: SalesAgent) {.async.} = + let market = agent.sales.market + + without slotIndex =? agent.slotIndex: + raiseAssert "no slot selected" + + proc onSlotFilled(requestId: RequestId, + slotIndex: UInt256) {.async.} = + without state =? (agent.state as SaleState): + return + + await agent.slotFilled.unsubscribe() + await state.onSlotFilled(requestId, slotIndex) + + agent.slotFilled = + await market.subscribeSlotFilled(agent.requestId, slotIndex, onSlotFilled) diff --git a/codex/sales/statemachine.nim b/codex/sales/statemachine.nim index af180e51..610208e3 100644 --- a/codex/sales/statemachine.nim +++ b/codex/sales/statemachine.nim @@ -35,6 +35,7 @@ type slotIndex*: ?UInt256 failed*: market.Subscription fulfilled*: market.Subscription + slotFilled*: market.Subscription cancelled*: Future[void] SaleState* = ref object of AsyncState SaleError* = ref object of CodexError @@ -102,3 +103,7 @@ method onCancelled*(state: SaleState, request: StorageRequest) {.base, async.} = method onFailed*(state: SaleState, request: StorageRequest) {.base, async.} = discard + +method onSlotFilled*(state: SaleState, requestId: RequestId, + slotIndex: UInt256) {.base, async.} = + discard diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index b0278e76..4d2a9a5f 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -1,6 +1,7 @@ import std/sequtils import ./cancelled import ./failed +import ./filled import ./proving import ./errored import ../salesagent @@ -21,6 +22,10 @@ method onCancelled*(state: SaleDownloading, request: StorageRequest) {.async.} = method onFailed*(state: SaleDownloading, request: StorageRequest) {.async.} = await state.switchAsync(SaleFailed()) +method onSlotFilled*(state: SaleDownloading, requestId: RequestId, + slotIndex: UInt256) {.async.} = + await state.switchAsync(SaleFilled()) + method enterAsync(state: SaleDownloading) {.async.} = without agent =? (state.context as SalesAgent): raiseAssert "invalid state" diff --git a/codex/sales/states/filled.nim b/codex/sales/states/filled.nim index 8e2eb07c..19a17342 100644 --- a/codex/sales/states/filled.nim +++ b/codex/sales/states/filled.nim @@ -32,7 +32,7 @@ method enterAsync(state: SaleFilled) {.async.} = if host == me.some: await state.switchAsync(SaleFinished()) else: - let error = newException(SaleFilledError, "Sale host mismatch") + let error = newException(SaleFilledError, "Slot filled by other host") await state.switchAsync(SaleErrored(error: error)) except CancelledError: diff --git a/codex/sales/states/filling.nim b/codex/sales/states/filling.nim index 688bdbd3..ed5ca209 100644 --- a/codex/sales/states/filling.nim +++ b/codex/sales/states/filling.nim @@ -19,27 +19,20 @@ method onCancelled*(state: SaleFilling, request: StorageRequest) {.async.} = method onFailed*(state: SaleFilling, request: StorageRequest) {.async.} = await state.switchAsync(SaleFailed()) +method onSlotFilled*(state: SaleFilling, requestId: RequestId, + slotIndex: UInt256) {.async.} = + await state.switchAsync(SaleFilled()) + method enterAsync(state: SaleFilling) {.async.} = without agent =? (state.context as SalesAgent): raiseAssert "invalid state" - var subscription: market.Subscription - - proc onSlotFilled(requestId: RequestId, - slotIndex: UInt256) {.async.} = - await subscription.unsubscribe() - await state.switchAsync(SaleFilled()) - try: let market = agent.sales.market without slotIndex =? agent.slotIndex: raiseAssert "no slot selected" - subscription = await market.subscribeSlotFilled(agent.requestId, - slotIndex, - onSlotFilled) - await market.fillSlot(agent.requestId, slotIndex, state.proof) except CancelledError: diff --git a/codex/sales/states/proving.nim b/codex/sales/states/proving.nim index 27bb14ab..647a2adb 100644 --- a/codex/sales/states/proving.nim +++ b/codex/sales/states/proving.nim @@ -2,6 +2,7 @@ import ../statemachine import ./filling import ./cancelled import ./failed +import ./filled import ./errored type @@ -16,6 +17,10 @@ method onCancelled*(state: SaleProving, request: StorageRequest) {.async.} = method onFailed*(state: SaleProving, request: StorageRequest) {.async.} = await state.switchAsync(SaleFailed()) +method onSlotFilled*(state: SaleProving, requestId: RequestId, + slotIndex: UInt256) {.async.} = + await state.switchAsync(SaleFilled()) + method enterAsync(state: SaleProving) {.async.} = without agent =? (state.context as SalesAgent): raiseAssert "invalid state" diff --git a/tests/codex/testsales.nim b/tests/codex/testsales.nim index 98a5f768..1a5e0b2b 100644 --- a/tests/codex/testsales.nim +++ b/tests/codex/testsales.nim @@ -194,6 +194,7 @@ suite "Sales": await market.requestStorage(request) for slotIndex in 0..