diff --git a/codex/purchasing/states/cancelled.nim b/codex/purchasing/states/cancelled.nim index 760dc81a..5aeeceac 100644 --- a/codex/purchasing/states/cancelled.nim +++ b/codex/purchasing/states/cancelled.nim @@ -1,25 +1,35 @@ import pkg/metrics import ../../logutils +import ../../utils/exceptions import ../statemachine -import ./errorhandling +import ./error declareCounter(codex_purchases_cancelled, "codex purchases cancelled") logScope: topics = "marketplace purchases cancelled" -type PurchaseCancelled* = ref object of ErrorHandlingState +type PurchaseCancelled* = ref object of PurchaseState method `$`*(state: PurchaseCancelled): string = "cancelled" -method run*(state: PurchaseCancelled, machine: Machine): Future[?State] {.async.} = +method run*( + state: PurchaseCancelled, machine: Machine +): Future[?State] {.async: (raises: []).} = codex_purchases_cancelled.inc() let purchase = Purchase(machine) - warn "Request cancelled, withdrawing remaining funds", requestId = purchase.requestId - await purchase.market.withdrawFunds(purchase.requestId) + try: + warn "Request cancelled, withdrawing remaining funds", + requestId = purchase.requestId + await purchase.market.withdrawFunds(purchase.requestId) - let error = newException(Timeout, "Purchase cancelled due to timeout") - purchase.future.fail(error) + let error = newException(Timeout, "Purchase cancelled due to timeout") + purchase.future.fail(error) + except CancelledError as e: + trace "PurchaseCancelled.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during PurchaseCancelled.run", error = e.msgDetail + return some State(PurchaseErrored(error: e)) diff --git a/codex/purchasing/states/error.nim b/codex/purchasing/states/error.nim index d7017b38..afa9f54f 100644 --- a/codex/purchasing/states/error.nim +++ b/codex/purchasing/states/error.nim @@ -14,7 +14,9 @@ type PurchaseErrored* = ref object of PurchaseState method `$`*(state: PurchaseErrored): string = "errored" -method run*(state: PurchaseErrored, machine: Machine): Future[?State] {.async.} = +method run*( + state: PurchaseErrored, machine: Machine +): Future[?State] {.async: (raises: []).} = codex_purchases_error.inc() let purchase = Purchase(machine) diff --git a/codex/purchasing/states/errorhandling.nim b/codex/purchasing/states/errorhandling.nim deleted file mode 100644 index 8ef91ba6..00000000 --- a/codex/purchasing/states/errorhandling.nim +++ /dev/null @@ -1,8 +0,0 @@ -import pkg/questionable -import ../statemachine -import ./error - -type ErrorHandlingState* = ref object of PurchaseState - -method onError*(state: ErrorHandlingState, error: ref CatchableError): ?State = - some State(PurchaseErrored(error: error)) diff --git a/codex/purchasing/states/failed.nim b/codex/purchasing/states/failed.nim index 5a126a73..1f6be74f 100644 --- a/codex/purchasing/states/failed.nim +++ b/codex/purchasing/states/failed.nim @@ -1,6 +1,7 @@ import pkg/metrics import ../statemachine import ../../logutils +import ../../utils/exceptions import ./error declareCounter(codex_purchases_failed, "codex purchases failed") @@ -10,11 +11,20 @@ type PurchaseFailed* = ref object of PurchaseState method `$`*(state: PurchaseFailed): string = "failed" -method run*(state: PurchaseFailed, machine: Machine): Future[?State] {.async.} = +method run*( + state: PurchaseFailed, machine: Machine +): Future[?State] {.async: (raises: []).} = codex_purchases_failed.inc() let purchase = Purchase(machine) - warn "Request failed, withdrawing remaining funds", requestId = purchase.requestId - await purchase.market.withdrawFunds(purchase.requestId) + + try: + warn "Request failed, withdrawing remaining funds", requestId = purchase.requestId + await purchase.market.withdrawFunds(purchase.requestId) + except CancelledError as e: + trace "PurchaseFailed.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during PurchaseFailed.run", error = e.msgDetail + return some State(PurchaseErrored(error: e)) let error = newException(PurchaseError, "Purchase failed") return some State(PurchaseErrored(error: error)) diff --git a/codex/purchasing/states/finished.nim b/codex/purchasing/states/finished.nim index 6cf5ffcc..bb7a726d 100644 --- a/codex/purchasing/states/finished.nim +++ b/codex/purchasing/states/finished.nim @@ -1,7 +1,9 @@ import pkg/metrics import ../statemachine +import ../../utils/exceptions import ../../logutils +import ./error declareCounter(codex_purchases_finished, "codex purchases finished") @@ -13,10 +15,19 @@ type PurchaseFinished* = ref object of PurchaseState method `$`*(state: PurchaseFinished): string = "finished" -method run*(state: PurchaseFinished, machine: Machine): Future[?State] {.async.} = +method run*( + state: PurchaseFinished, machine: Machine +): Future[?State] {.async: (raises: []).} = codex_purchases_finished.inc() let purchase = Purchase(machine) - info "Purchase finished, withdrawing remaining funds", requestId = purchase.requestId - await purchase.market.withdrawFunds(purchase.requestId) + try: + info "Purchase finished, withdrawing remaining funds", + requestId = purchase.requestId + await purchase.market.withdrawFunds(purchase.requestId) - purchase.future.complete() + purchase.future.complete() + except CancelledError as e: + trace "PurchaseFinished.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during PurchaseFinished.run", error = e.msgDetail + return some State(PurchaseErrored(error: e)) diff --git a/codex/purchasing/states/pending.nim b/codex/purchasing/states/pending.nim index 4852f266..1472a63e 100644 --- a/codex/purchasing/states/pending.nim +++ b/codex/purchasing/states/pending.nim @@ -1,18 +1,28 @@ import pkg/metrics +import ../../logutils +import ../../utils/exceptions import ../statemachine -import ./errorhandling import ./submitted +import ./error declareCounter(codex_purchases_pending, "codex purchases pending") -type PurchasePending* = ref object of ErrorHandlingState +type PurchasePending* = ref object of PurchaseState method `$`*(state: PurchasePending): string = "pending" -method run*(state: PurchasePending, machine: Machine): Future[?State] {.async.} = +method run*( + state: PurchasePending, machine: Machine +): Future[?State] {.async: (raises: []).} = codex_purchases_pending.inc() let purchase = Purchase(machine) - let request = !purchase.request - await purchase.market.requestStorage(request) - return some State(PurchaseSubmitted()) + try: + let request = !purchase.request + await purchase.market.requestStorage(request) + return some State(PurchaseSubmitted()) + except CancelledError as e: + trace "PurchasePending.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during PurchasePending.run", error = e.msgDetail + return some State(PurchaseErrored(error: e)) diff --git a/codex/purchasing/states/started.nim b/codex/purchasing/states/started.nim index 083e64c8..e93d7013 100644 --- a/codex/purchasing/states/started.nim +++ b/codex/purchasing/states/started.nim @@ -1,22 +1,25 @@ import pkg/metrics import ../../logutils +import ../../utils/exceptions import ../statemachine -import ./errorhandling import ./finished import ./failed +import ./error declareCounter(codex_purchases_started, "codex purchases started") logScope: topics = "marketplace purchases started" -type PurchaseStarted* = ref object of ErrorHandlingState +type PurchaseStarted* = ref object of PurchaseState method `$`*(state: PurchaseStarted): string = "started" -method run*(state: PurchaseStarted, machine: Machine): Future[?State] {.async.} = +method run*( + state: PurchaseStarted, machine: Machine +): Future[?State] {.async: (raises: []).} = codex_purchases_started.inc() let purchase = Purchase(machine) @@ -28,15 +31,24 @@ method run*(state: PurchaseStarted, machine: Machine): Future[?State] {.async.} proc callback(_: RequestId) = failed.complete() - let subscription = await market.subscribeRequestFailed(purchase.requestId, callback) + var ended: Future[void] + try: + let subscription = await market.subscribeRequestFailed(purchase.requestId, callback) - # Ensure that we're past the request end by waiting an additional second - let ended = clock.waitUntil((await market.getRequestEnd(purchase.requestId)) + 1) - let fut = await one(ended, failed) - await subscription.unsubscribe() - if fut.id == failed.id: + # Ensure that we're past the request end by waiting an additional second + ended = clock.waitUntil((await market.getRequestEnd(purchase.requestId)) + 1) + let fut = await one(ended, failed) + await subscription.unsubscribe() + if fut.id == failed.id: + ended.cancelSoon() + return some State(PurchaseFailed()) + else: + failed.cancelSoon() + return some State(PurchaseFinished()) + except CancelledError as e: ended.cancelSoon() - return some State(PurchaseFailed()) - else: failed.cancelSoon() - return some State(PurchaseFinished()) + trace "PurchaseStarted.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during PurchaseStarted.run", error = e.msgDetail + return some State(PurchaseErrored(error: e)) diff --git a/codex/purchasing/states/submitted.nim b/codex/purchasing/states/submitted.nim index 1cf65b1f..dd3669e4 100644 --- a/codex/purchasing/states/submitted.nim +++ b/codex/purchasing/states/submitted.nim @@ -1,22 +1,25 @@ import pkg/metrics import ../../logutils +import ../../utils/exceptions import ../statemachine -import ./errorhandling import ./started import ./cancelled +import ./error logScope: topics = "marketplace purchases submitted" declareCounter(codex_purchases_submitted, "codex purchases submitted") -type PurchaseSubmitted* = ref object of ErrorHandlingState +type PurchaseSubmitted* = ref object of PurchaseState method `$`*(state: PurchaseSubmitted): string = "submitted" -method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async.} = +method run*( + state: PurchaseSubmitted, machine: Machine +): Future[?State] {.async: (raises: []).} = codex_purchases_submitted.inc() let purchase = Purchase(machine) let request = !purchase.request @@ -44,5 +47,10 @@ method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async. await wait().withTimeout() except Timeout: return some State(PurchaseCancelled()) + except CancelledError as e: + trace "PurchaseSubmitted.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during PurchaseSubmitted.run", error = e.msgDetail + return some State(PurchaseErrored(error: e)) return some State(PurchaseStarted()) diff --git a/codex/purchasing/states/unknown.nim b/codex/purchasing/states/unknown.nim index 54e09942..8c2bff48 100644 --- a/codex/purchasing/states/unknown.nim +++ b/codex/purchasing/states/unknown.nim @@ -1,34 +1,44 @@ import pkg/metrics +import ../../utils/exceptions +import ../../logutils import ../statemachine -import ./errorhandling import ./submitted import ./started import ./cancelled import ./finished import ./failed +import ./error declareCounter(codex_purchases_unknown, "codex purchases unknown") -type PurchaseUnknown* = ref object of ErrorHandlingState +type PurchaseUnknown* = ref object of PurchaseState method `$`*(state: PurchaseUnknown): string = "unknown" -method run*(state: PurchaseUnknown, machine: Machine): Future[?State] {.async.} = - codex_purchases_unknown.inc() - let purchase = Purchase(machine) - if (request =? await purchase.market.getRequest(purchase.requestId)) and - (requestState =? await purchase.market.requestState(purchase.requestId)): - purchase.request = some request +method run*( + state: PurchaseUnknown, machine: Machine +): Future[?State] {.async: (raises: []).} = + try: + codex_purchases_unknown.inc() + let purchase = Purchase(machine) + if (request =? await purchase.market.getRequest(purchase.requestId)) and + (requestState =? await purchase.market.requestState(purchase.requestId)): + purchase.request = some request - case requestState - of RequestState.New: - return some State(PurchaseSubmitted()) - of RequestState.Started: - return some State(PurchaseStarted()) - of RequestState.Cancelled: - return some State(PurchaseCancelled()) - of RequestState.Finished: - return some State(PurchaseFinished()) - of RequestState.Failed: - return some State(PurchaseFailed()) + case requestState + of RequestState.New: + return some State(PurchaseSubmitted()) + of RequestState.Started: + return some State(PurchaseStarted()) + of RequestState.Cancelled: + return some State(PurchaseCancelled()) + of RequestState.Finished: + return some State(PurchaseFinished()) + of RequestState.Failed: + return some State(PurchaseFailed()) + except CancelledError as e: + trace "PurchaseUnknown.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during PurchaseUnknown.run", error = e.msgDetail + return some State(PurchaseErrored(error: e)) diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 8a8e5dc0..f04182aa 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -6,6 +6,7 @@ import pkg/upraises import ../contracts/requests import ../errors import ../logutils +import ../utils/exceptions import ./statemachine import ./salescontext import ./salesdata @@ -68,41 +69,48 @@ proc subscribeCancellation(agent: SalesAgent) {.async.} = let data = agent.data let clock = agent.context.clock - proc onCancelled() {.async.} = + proc onCancelled() {.async: (raises: []).} = without request =? data.request: return - let market = agent.context.market - let expiry = await market.requestExpiresAt(data.requestId) + try: + let market = agent.context.market + let expiry = await market.requestExpiresAt(data.requestId) - while true: - let deadline = max(clock.now, expiry) + 1 - trace "Waiting for request to be cancelled", now = clock.now, expiry = deadline - await clock.waitUntil(deadline) + while true: + let deadline = max(clock.now, expiry) + 1 + trace "Waiting for request to be cancelled", now = clock.now, expiry = deadline + await clock.waitUntil(deadline) - without state =? await agent.retrieveRequestState(): - error "Uknown request", requestId = data.requestId - return + without state =? await agent.retrieveRequestState(): + error "Unknown request", requestId = data.requestId + return - case state - of New: - discard - of RequestState.Cancelled: - agent.schedule(cancelledEvent(request)) - break - of RequestState.Started, RequestState.Finished, RequestState.Failed: - break + case state + of New: + discard + of RequestState.Cancelled: + agent.schedule(cancelledEvent(request)) + break + of RequestState.Started, RequestState.Finished, RequestState.Failed: + break - debug "The request is not yet canceled, even though it should be. Waiting for some more time.", - currentState = state, now = clock.now + debug "The request is not yet canceled, even though it should be. Waiting for some more time.", + currentState = state, now = clock.now + except CancelledError: + trace "Waiting for expiry to lapse was cancelled", requestId = data.requestId + except CatchableError as e: + error "Error while waiting for expiry to lapse", error = e.msgDetail data.cancelled = onCancelled() + asyncSpawn data.cancelled method onFulfilled*( agent: SalesAgent, requestId: RequestId ) {.base, gcsafe, upraises: [].} = - if agent.data.requestId == requestId and not agent.data.cancelled.isNil: - agent.data.cancelled.cancelSoon() + let cancelled = agent.data.cancelled + if agent.data.requestId == requestId and not cancelled.isNil and not cancelled.finished: + cancelled.cancelSoon() method onFailed*( agent: SalesAgent, requestId: RequestId diff --git a/codex/sales/states/cancelled.nim b/codex/sales/states/cancelled.nim index 3bb92a2c..3bdf8c2f 100644 --- a/codex/sales/states/cancelled.nim +++ b/codex/sales/states/cancelled.nim @@ -1,17 +1,20 @@ import ../../logutils +import ../../utils/exceptions import ../salesagent import ../statemachine -import ./errorhandling +import ./errored logScope: topics = "marketplace sales cancelled" -type SaleCancelled* = ref object of ErrorHandlingState +type SaleCancelled* = ref object of SaleState method `$`*(state: SaleCancelled): string = "SaleCancelled" -method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} = +method run*( + state: SaleCancelled, machine: Machine +): Future[?State] {.async: (raises: []).} = let agent = SalesAgent(machine) let data = agent.data let market = agent.context.market @@ -19,21 +22,27 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} = without request =? data.request: raiseAssert "no sale request" - let slot = Slot(request: request, slotIndex: data.slotIndex) - debug "Collecting collateral and partial payout", - requestId = data.requestId, slotIndex = data.slotIndex - let currentCollateral = await market.currentCollateral(slot.id) - await market.freeSlot(slot.id) + try: + let slot = Slot(request: request, slotIndex: data.slotIndex) + debug "Collecting collateral and partial payout", + requestId = data.requestId, slotIndex = data.slotIndex + let currentCollateral = await market.currentCollateral(slot.id) + await market.freeSlot(slot.id) - if onClear =? agent.context.onClear and request =? data.request: - onClear(request, data.slotIndex) + if onClear =? agent.context.onClear and request =? data.request: + onClear(request, data.slotIndex) - if onCleanUp =? agent.onCleanUp: - await onCleanUp( - returnBytes = true, - reprocessSlot = false, - returnedCollateral = some currentCollateral, - ) + if onCleanUp =? agent.onCleanUp: + await onCleanUp( + returnBytes = true, + reprocessSlot = false, + returnedCollateral = some currentCollateral, + ) - warn "Sale cancelled due to timeout", - requestId = data.requestId, slotIndex = data.slotIndex + warn "Sale cancelled due to timeout", + requestId = data.requestId, slotIndex = data.slotIndex + except CancelledError as e: + trace "SaleCancelled.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during SaleCancelled.run", error = e.msgDetail + return some State(SaleErrored(error: e)) diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index f6ced6be..cb991dc8 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -4,16 +4,16 @@ import pkg/questionable/results import ../../blocktype as bt import ../../logutils import ../../market +import ../../utils/exceptions import ../salesagent import ../statemachine -import ./errorhandling import ./cancelled import ./failed import ./filled import ./initialproving import ./errored -type SaleDownloading* = ref object of ErrorHandlingState +type SaleDownloading* = ref object of SaleState logScope: topics = "marketplace sales downloading" @@ -32,7 +32,9 @@ method onSlotFilled*( ): ?State = return some State(SaleFilled()) -method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} = +method run*( + state: SaleDownloading, machine: Machine +): Future[?State] {.async: (raises: []).} = let agent = SalesAgent(machine) let data = agent.data let context = agent.context @@ -64,9 +66,15 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} trace "Releasing batch of bytes written to disk", bytes return await reservations.release(reservation.id, reservation.availabilityId, bytes) - trace "Starting download" - if err =? (await onStore(request, data.slotIndex, onBlocks)).errorOption: - return some State(SaleErrored(error: err, reprocessSlot: false)) + try: + trace "Starting download" + if err =? (await onStore(request, data.slotIndex, onBlocks)).errorOption: + return some State(SaleErrored(error: err, reprocessSlot: false)) - trace "Download complete" - return some State(SaleInitialProving()) + trace "Download complete" + return some State(SaleInitialProving()) + except CancelledError as e: + trace "SaleDownloading.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during SaleDownloading.run", error = e.msgDetail + return some State(SaleErrored(error: e)) diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim index b85b7930..77bf08d3 100644 --- a/codex/sales/states/errored.nim +++ b/codex/sales/states/errored.nim @@ -17,10 +17,9 @@ type SaleErrored* = ref object of SaleState method `$`*(state: SaleErrored): string = "SaleErrored" -method onError*(state: SaleState, err: ref CatchableError): ?State {.upraises: [].} = - error "error during SaleErrored run", error = err.msg - -method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} = +method run*( + state: SaleErrored, machine: Machine +): Future[?State] {.async: (raises: []).} = let agent = SalesAgent(machine) let data = agent.data let context = agent.context @@ -30,8 +29,13 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} = requestId = data.requestId, slotIndex = data.slotIndex - if onClear =? context.onClear and request =? data.request: - onClear(request, data.slotIndex) + try: + if onClear =? context.onClear and request =? data.request: + onClear(request, data.slotIndex) - if onCleanUp =? agent.onCleanUp: - await onCleanUp(returnBytes = true, reprocessSlot = state.reprocessSlot) + if onCleanUp =? agent.onCleanUp: + await onCleanUp(returnBytes = true, reprocessSlot = state.reprocessSlot) + except CancelledError as e: + trace "SaleErrored.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during SaleErrored.run", error = e.msgDetail diff --git a/codex/sales/states/errorhandling.nim b/codex/sales/states/errorhandling.nim deleted file mode 100644 index 2ee399ef..00000000 --- a/codex/sales/states/errorhandling.nim +++ /dev/null @@ -1,8 +0,0 @@ -import pkg/questionable -import ../statemachine -import ./errored - -type ErrorHandlingState* = ref object of SaleState - -method onError*(state: ErrorHandlingState, error: ref CatchableError): ?State = - some State(SaleErrored(error: error)) diff --git a/codex/sales/states/failed.nim b/codex/sales/states/failed.nim index 6103765c..b0d6a7cd 100644 --- a/codex/sales/states/failed.nim +++ b/codex/sales/states/failed.nim @@ -1,30 +1,39 @@ import ../../logutils +import ../../utils/exceptions +import ../../utils/exceptions import ../salesagent import ../statemachine -import ./errorhandling import ./errored logScope: topics = "marketplace sales failed" type - SaleFailed* = ref object of ErrorHandlingState + SaleFailed* = ref object of SaleState SaleFailedError* = object of SaleError method `$`*(state: SaleFailed): string = "SaleFailed" -method run*(state: SaleFailed, machine: Machine): Future[?State] {.async.} = +method run*( + state: SaleFailed, machine: Machine +): Future[?State] {.async: (raises: []).} = let data = SalesAgent(machine).data let market = SalesAgent(machine).context.market without request =? data.request: raiseAssert "no sale request" - let slot = Slot(request: request, slotIndex: data.slotIndex) - debug "Removing slot from mySlots", - requestId = data.requestId, slotIndex = data.slotIndex - await market.freeSlot(slot.id) + try: + let slot = Slot(request: request, slotIndex: data.slotIndex) + debug "Removing slot from mySlots", + requestId = data.requestId, slotIndex = data.slotIndex + await market.freeSlot(slot.id) - let error = newException(SaleFailedError, "Sale failed") - return some State(SaleErrored(error: error)) + let error = newException(SaleFailedError, "Sale failed") + return some State(SaleErrored(error: error)) + except CancelledError as e: + trace "SaleFailed.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during SaleFailed.run", error = e.msgDetail + return some State(SaleErrored(error: e)) diff --git a/codex/sales/states/filled.nim b/codex/sales/states/filled.nim index 9e7d9906..b0fc65c9 100644 --- a/codex/sales/states/filled.nim +++ b/codex/sales/states/filled.nim @@ -3,9 +3,9 @@ import pkg/questionable/results import ../../conf import ../../logutils +import ../../utils/exceptions import ../statemachine import ../salesagent -import ./errorhandling import ./errored import ./cancelled import ./failed @@ -18,7 +18,7 @@ logScope: topics = "marketplace sales filled" type - SaleFilled* = ref object of ErrorHandlingState + SaleFilled* = ref object of SaleState HostMismatchError* = object of CatchableError method onCancelled*(state: SaleFilled, request: StorageRequest): ?State = @@ -30,40 +30,48 @@ method onFailed*(state: SaleFilled, request: StorageRequest): ?State = method `$`*(state: SaleFilled): string = "SaleFilled" -method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} = +method run*( + state: SaleFilled, machine: Machine +): Future[?State] {.async: (raises: []).} = let agent = SalesAgent(machine) let data = agent.data let context = agent.context - let market = context.market - let host = await market.getHost(data.requestId, data.slotIndex) - let me = await market.getSigner() - if host == me.some: - info "Slot succesfully filled", - requestId = data.requestId, slotIndex = data.slotIndex + try: + let host = await market.getHost(data.requestId, data.slotIndex) + let me = await market.getSigner() - without request =? data.request: - raiseAssert "no sale request" + if host == me.some: + info "Slot succesfully filled", + requestId = data.requestId, slotIndex = data.slotIndex - if onFilled =? agent.onFilled: - onFilled(request, data.slotIndex) + without request =? data.request: + raiseAssert "no sale request" - without onExpiryUpdate =? context.onExpiryUpdate: - raiseAssert "onExpiryUpdate callback not set" + if onFilled =? agent.onFilled: + onFilled(request, data.slotIndex) - let requestEnd = await market.getRequestEnd(data.requestId) - if err =? (await onExpiryUpdate(request.content.cid, requestEnd)).errorOption: - return some State(SaleErrored(error: err)) + without onExpiryUpdate =? context.onExpiryUpdate: + raiseAssert "onExpiryUpdate callback not set" - when codex_enable_proof_failures: - if context.simulateProofFailures > 0: - info "Proving with failure rate", rate = context.simulateProofFailures - return some State( - SaleProvingSimulated(failEveryNProofs: context.simulateProofFailures) - ) + let requestEnd = await market.getRequestEnd(data.requestId) + if err =? (await onExpiryUpdate(request.content.cid, requestEnd)).errorOption: + return some State(SaleErrored(error: err)) - return some State(SaleProving()) - else: - let error = newException(HostMismatchError, "Slot filled by other host") - return some State(SaleErrored(error: error)) + when codex_enable_proof_failures: + if context.simulateProofFailures > 0: + info "Proving with failure rate", rate = context.simulateProofFailures + return some State( + SaleProvingSimulated(failEveryNProofs: context.simulateProofFailures) + ) + + return some State(SaleProving()) + else: + let error = newException(HostMismatchError, "Slot filled by other host") + return some State(SaleErrored(error: error)) + except CancelledError as e: + trace "SaleFilled.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during SaleFilled.run", error = e.msgDetail + return some State(SaleErrored(error: e)) diff --git a/codex/sales/states/filling.nim b/codex/sales/states/filling.nim index 1934fc12..0c20a64e 100644 --- a/codex/sales/states/filling.nim +++ b/codex/sales/states/filling.nim @@ -1,9 +1,9 @@ import pkg/stint import ../../logutils import ../../market +import ../../utils/exceptions import ../statemachine import ../salesagent -import ./errorhandling import ./filled import ./cancelled import ./failed @@ -13,7 +13,7 @@ import ./errored logScope: topics = "marketplace sales filling" -type SaleFilling* = ref object of ErrorHandlingState +type SaleFilling* = ref object of SaleState proof*: Groth16Proof method `$`*(state: SaleFilling): string = @@ -25,7 +25,9 @@ method onCancelled*(state: SaleFilling, request: StorageRequest): ?State = method onFailed*(state: SaleFilling, request: StorageRequest): ?State = return some State(SaleFailed()) -method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} = +method run*( + state: SaleFilling, machine: Machine +): Future[?State] {.async: (raises: []).} = let data = SalesAgent(machine).data let market = SalesAgent(machine).context.market without (request =? data.request): @@ -35,28 +37,34 @@ method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} = requestId = data.requestId slotIndex = data.slotIndex - let slotState = await market.slotState(slotId(data.requestId, data.slotIndex)) - let requestedCollateral = request.ask.collateralPerSlot - var collateral: UInt256 - - if slotState == SlotState.Repair: - # When repairing the node gets "discount" on the collateral that it needs to - let repairRewardPercentage = (await market.repairRewardPercentage).u256 - collateral = - requestedCollateral - - ((requestedCollateral * repairRewardPercentage)).div(100.u256) - else: - collateral = requestedCollateral - - debug "Filling slot" try: - await market.fillSlot(data.requestId, data.slotIndex, state.proof, collateral) - except MarketError as e: - if e.msg.contains "Slot is not free": - debug "Slot is already filled, ignoring slot" - return some State(SaleIgnored(reprocessSlot: false, returnBytes: true)) - else: - return some State(SaleErrored(error: e)) - # other CatchableErrors are handled "automatically" by the ErrorHandlingState + let slotState = await market.slotState(slotId(data.requestId, data.slotIndex)) + let requestedCollateral = request.ask.collateralPerSlot + var collateral: UInt256 - return some State(SaleFilled()) + if slotState == SlotState.Repair: + # When repairing the node gets "discount" on the collateral that it needs to + let repairRewardPercentage = (await market.repairRewardPercentage).u256 + collateral = + requestedCollateral - + ((requestedCollateral * repairRewardPercentage)).div(100.u256) + else: + collateral = requestedCollateral + + debug "Filling slot" + try: + await market.fillSlot(data.requestId, data.slotIndex, state.proof, collateral) + except MarketError as e: + if e.msg.contains "Slot is not free": + debug "Slot is already filled, ignoring slot" + return some State(SaleIgnored(reprocessSlot: false, returnBytes: true)) + else: + return some State(SaleErrored(error: e)) + # other CatchableErrors are handled "automatically" by the SaleState + + return some State(SaleFilled()) + except CancelledError as e: + trace "SaleFilling.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during SaleFilling.run", error = e.msgDetail + return some State(SaleErrored(error: e)) diff --git a/codex/sales/states/finished.nim b/codex/sales/states/finished.nim index 151300d0..2aba69eb 100644 --- a/codex/sales/states/finished.nim +++ b/codex/sales/states/finished.nim @@ -1,16 +1,17 @@ import pkg/chronos import ../../logutils +import ../../utils/exceptions import ../statemachine import ../salesagent -import ./errorhandling import ./cancelled import ./failed +import ./errored logScope: topics = "marketplace sales finished" -type SaleFinished* = ref object of ErrorHandlingState +type SaleFinished* = ref object of SaleState returnedCollateral*: ?UInt256 method `$`*(state: SaleFinished): string = @@ -22,7 +23,9 @@ method onCancelled*(state: SaleFinished, request: StorageRequest): ?State = method onFailed*(state: SaleFinished, request: StorageRequest): ?State = return some State(SaleFailed()) -method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} = +method run*( + state: SaleFinished, machine: Machine +): Future[?State] {.async: (raises: []).} = let agent = SalesAgent(machine) let data = agent.data @@ -32,5 +35,11 @@ method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} = info "Slot finished and paid out", requestId = data.requestId, slotIndex = data.slotIndex - if onCleanUp =? agent.onCleanUp: - await onCleanUp(returnedCollateral = state.returnedCollateral) + try: + if onCleanUp =? agent.onCleanUp: + await onCleanUp(returnedCollateral = state.returnedCollateral) + except CancelledError as e: + trace "SaleFilled.run onCleanUp was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during SaleFilled.run in onCleanUp callback", error = e.msgDetail + return some State(SaleErrored(error: e)) diff --git a/codex/sales/states/ignored.nim b/codex/sales/states/ignored.nim index b915bff5..b07a201c 100644 --- a/codex/sales/states/ignored.nim +++ b/codex/sales/states/ignored.nim @@ -1,9 +1,10 @@ import pkg/chronos import ../../logutils +import ../../utils/exceptions import ../statemachine import ../salesagent -import ./errorhandling +import ./errored logScope: topics = "marketplace sales ignored" @@ -11,17 +12,25 @@ logScope: # Ignored slots could mean there was no availability or that the slot could # not be reserved. -type SaleIgnored* = ref object of ErrorHandlingState +type SaleIgnored* = ref object of SaleState reprocessSlot*: bool # readd slot to queue with `seen` flag returnBytes*: bool # return unreleased bytes from Reservation to Availability method `$`*(state: SaleIgnored): string = "SaleIgnored" -method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} = +method run*( + state: SaleIgnored, machine: Machine +): Future[?State] {.async: (raises: []).} = let agent = SalesAgent(machine) - if onCleanUp =? agent.onCleanUp: - await onCleanUp( - reprocessSlot = state.reprocessSlot, returnBytes = state.returnBytes - ) + try: + if onCleanUp =? agent.onCleanUp: + await onCleanUp( + reprocessSlot = state.reprocessSlot, returnBytes = state.returnBytes + ) + except CancelledError as e: + trace "SaleIgnored.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during SaleIgnored.run in onCleanUp", error = e.msgDetail + return some State(SaleErrored(error: e)) diff --git a/codex/sales/states/initialproving.nim b/codex/sales/states/initialproving.nim index bc9ce6b6..3b35ba90 100644 --- a/codex/sales/states/initialproving.nim +++ b/codex/sales/states/initialproving.nim @@ -1,9 +1,9 @@ import pkg/questionable/results import ../../clock import ../../logutils +import ../../utils/exceptions import ../statemachine import ../salesagent -import ./errorhandling import ./filling import ./cancelled import ./errored @@ -12,7 +12,7 @@ import ./failed logScope: topics = "marketplace sales initial-proving" -type SaleInitialProving* = ref object of ErrorHandlingState +type SaleInitialProving* = ref object of SaleState method `$`*(state: SaleInitialProving): string = "SaleInitialProving" @@ -36,7 +36,9 @@ proc waitForStableChallenge(market: Market, clock: Clock, slotId: SlotId) {.asyn while (await market.getPointer(slotId)) > (256 - downtime): await clock.waitUntilNextPeriod(periodicity) -method run*(state: SaleInitialProving, machine: Machine): Future[?State] {.async.} = +method run*( + state: SaleInitialProving, machine: Machine +): Future[?State] {.async: (raises: []).} = let data = SalesAgent(machine).data let context = SalesAgent(machine).context let market = context.market @@ -48,16 +50,22 @@ method run*(state: SaleInitialProving, machine: Machine): Future[?State] {.async without onProve =? context.onProve: raiseAssert "onProve callback not set" - debug "Waiting for a proof challenge that is valid for the entire period" - let slot = Slot(request: request, slotIndex: data.slotIndex) - await waitForStableChallenge(market, clock, slot.id) + try: + debug "Waiting for a proof challenge that is valid for the entire period" + let slot = Slot(request: request, slotIndex: data.slotIndex) + await waitForStableChallenge(market, clock, slot.id) - debug "Generating initial proof", requestId = data.requestId - let challenge = await context.market.getChallenge(slot.id) - without proof =? (await onProve(slot, challenge)), err: - error "Failed to generate initial proof", error = err.msg - return some State(SaleErrored(error: err)) + debug "Generating initial proof", requestId = data.requestId + let challenge = await context.market.getChallenge(slot.id) + without proof =? (await onProve(slot, challenge)), err: + error "Failed to generate initial proof", error = err.msg + return some State(SaleErrored(error: err)) - debug "Finished proof calculation", requestId = data.requestId + debug "Finished proof calculation", requestId = data.requestId - return some State(SaleFilling(proof: proof)) + return some State(SaleFilling(proof: proof)) + except CancelledError as e: + trace "SaleInitialProving.run onCleanUp was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during SaleInitialProving.run", error = e.msgDetail + return some State(SaleErrored(error: e)) diff --git a/codex/sales/states/payout.nim b/codex/sales/states/payout.nim index 9ce36613..e808307d 100644 --- a/codex/sales/states/payout.nim +++ b/codex/sales/states/payout.nim @@ -1,16 +1,17 @@ import ../../logutils import ../../market +import ../../utils/exceptions import ../statemachine import ../salesagent -import ./errorhandling import ./cancelled import ./failed import ./finished +import ./errored logScope: topics = "marketplace sales payout" -type SalePayout* = ref object of ErrorHandlingState +type SalePayout* = ref object of SaleState method `$`*(state: SalePayout): string = "SalePayout" @@ -21,17 +22,25 @@ method onCancelled*(state: SalePayout, request: StorageRequest): ?State = method onFailed*(state: SalePayout, request: StorageRequest): ?State = return some State(SaleFailed()) -method run*(state: SalePayout, machine: Machine): Future[?State] {.async.} = +method run*( + state: SalePayout, machine: Machine +): Future[?State] {.async: (raises: []).} = let data = SalesAgent(machine).data let market = SalesAgent(machine).context.market without request =? data.request: raiseAssert "no sale request" - let slot = Slot(request: request, slotIndex: data.slotIndex) - debug "Collecting finished slot's reward", - requestId = data.requestId, slotIndex = data.slotIndex - let currentCollateral = await market.currentCollateral(slot.id) - await market.freeSlot(slot.id) + try: + let slot = Slot(request: request, slotIndex: data.slotIndex) + debug "Collecting finished slot's reward", + requestId = data.requestId, slotIndex = data.slotIndex + let currentCollateral = await market.currentCollateral(slot.id) + await market.freeSlot(slot.id) - return some State(SaleFinished(returnedCollateral: some currentCollateral)) + return some State(SaleFinished(returnedCollateral: some currentCollateral)) + except CancelledError as e: + trace "SalePayout.run onCleanUp was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during SalePayout.run", error = e.msgDetail + return some State(SaleErrored(error: e)) diff --git a/codex/sales/states/preparing.nim b/codex/sales/states/preparing.nim index bdde1249..7509558c 100644 --- a/codex/sales/states/preparing.nim +++ b/codex/sales/states/preparing.nim @@ -4,9 +4,9 @@ import pkg/metrics import ../../logutils import ../../market +import ../../utils/exceptions import ../salesagent import ../statemachine -import ./errorhandling import ./cancelled import ./failed import ./filled @@ -18,7 +18,7 @@ declareCounter( codex_reservations_availability_mismatch, "codex reservations availability_mismatch" ) -type SalePreparing* = ref object of ErrorHandlingState +type SalePreparing* = ref object of SaleState logScope: topics = "marketplace sales preparing" @@ -37,62 +37,70 @@ method onSlotFilled*( ): ?State = return some State(SaleFilled()) -method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} = +method run*( + state: SalePreparing, machine: Machine +): Future[?State] {.async: (raises: []).} = let agent = SalesAgent(machine) let data = agent.data let context = agent.context let market = context.market let reservations = context.reservations - await agent.retrieveRequest() - await agent.subscribe() + try: + await agent.retrieveRequest() + await agent.subscribe() - without request =? data.request: - raiseAssert "no sale request" + without request =? data.request: + raiseAssert "no sale request" - let slotId = slotId(data.requestId, data.slotIndex) - let state = await market.slotState(slotId) - if state != SlotState.Free and state != SlotState.Repair: - return some State(SaleIgnored(reprocessSlot: false, returnBytes: false)) + let slotId = slotId(data.requestId, data.slotIndex) + let state = await market.slotState(slotId) + if state != SlotState.Free and state != SlotState.Repair: + return some State(SaleIgnored(reprocessSlot: false, returnBytes: false)) - # TODO: Once implemented, check to ensure the host is allowed to fill the slot, - # due to the [sliding window mechanism](https://github.com/codex-storage/codex-research/blob/master/design/marketplace.md#dispersal) + # TODO: Once implemented, check to ensure the host is allowed to fill the slot, + # due to the [sliding window mechanism](https://github.com/codex-storage/codex-research/blob/master/design/marketplace.md#dispersal) - logScope: - slotIndex = data.slotIndex - slotSize = request.ask.slotSize - duration = request.ask.duration - pricePerBytePerSecond = request.ask.pricePerBytePerSecond - collateralPerByte = request.ask.collateralPerByte + logScope: + slotIndex = data.slotIndex + slotSize = request.ask.slotSize + duration = request.ask.duration + pricePerBytePerSecond = request.ask.pricePerBytePerSecond + collateralPerByte = request.ask.collateralPerByte - without availability =? - await reservations.findAvailability( - request.ask.slotSize, request.ask.duration, request.ask.pricePerBytePerSecond, - request.ask.collateralPerByte, - ): - debug "No availability found for request, ignoring" + without availability =? + await reservations.findAvailability( + request.ask.slotSize, request.ask.duration, request.ask.pricePerBytePerSecond, + request.ask.collateralPerByte, + ): + debug "No availability found for request, ignoring" - return some State(SaleIgnored(reprocessSlot: true)) - - info "Availability found for request, creating reservation" - - without reservation =? - await reservations.createReservation( - availability.id, request.ask.slotSize, request.id, data.slotIndex, - request.ask.collateralPerByte, - ), error: - trace "Creation of reservation failed" - # Race condition: - # reservations.findAvailability (line 64) is no guarantee. You can never know for certain that the reservation can be created until after you have it. - # Should createReservation fail because there's no space, we proceed to SaleIgnored. - if error of BytesOutOfBoundsError: - # Lets monitor how often this happen and if it is often we can make it more inteligent to handle it - codex_reservations_availability_mismatch.inc() return some State(SaleIgnored(reprocessSlot: true)) - return some State(SaleErrored(error: error)) + info "Availability found for request, creating reservation" - trace "Reservation created succesfully" + without reservation =? + await reservations.createReservation( + availability.id, request.ask.slotSize, request.id, data.slotIndex, + request.ask.collateralPerByte, + ), error: + trace "Creation of reservation failed" + # Race condition: + # reservations.findAvailability (line 64) is no guarantee. You can never know for certain that the reservation can be created until after you have it. + # Should createReservation fail because there's no space, we proceed to SaleIgnored. + if error of BytesOutOfBoundsError: + # Lets monitor how often this happen and if it is often we can make it more inteligent to handle it + codex_reservations_availability_mismatch.inc() + return some State(SaleIgnored(reprocessSlot: true)) - data.reservation = some reservation - return some State(SaleSlotReserving()) + return some State(SaleErrored(error: error)) + + trace "Reservation created successfully" + + data.reservation = some reservation + return some State(SaleSlotReserving()) + except CancelledError as e: + trace "SalePreparing.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during SalePreparing.run", error = e.msgDetail + return some State(SaleErrored(error: e)) diff --git a/codex/sales/states/proving.nim b/codex/sales/states/proving.nim index 0ee2ed60..759cad0c 100644 --- a/codex/sales/states/proving.nim +++ b/codex/sales/states/proving.nim @@ -6,7 +6,6 @@ import ../../utils/exceptions import ../statemachine import ../salesagent import ../salescontext -import ./errorhandling import ./cancelled import ./failed import ./errored @@ -18,7 +17,7 @@ logScope: type SlotFreedError* = object of CatchableError SlotNotFilledError* = object of CatchableError - SaleProving* = ref object of ErrorHandlingState + SaleProving* = ref object of SaleState loop: Future[void] method prove*( @@ -113,7 +112,9 @@ method onFailed*(state: SaleProving, request: StorageRequest): ?State = # state change return some State(SaleFailed()) -method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} = +method run*( + state: SaleProving, machine: Machine +): Future[?State] {.async: (raises: []).} = let data = SalesAgent(machine).data let context = SalesAgent(machine).context @@ -129,27 +130,37 @@ method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} = without clock =? context.clock: raiseAssert("clock not set") - debug "Start proving", requestId = data.requestId, slotIndex = data.slotIndex try: - let loop = state.proveLoop(market, clock, request, data.slotIndex, onProve) - state.loop = loop - await loop - except CancelledError: - discard + debug "Start proving", requestId = data.requestId, slotIndex = data.slotIndex + try: + let loop = state.proveLoop(market, clock, request, data.slotIndex, onProve) + state.loop = loop + await loop + except CancelledError as e: + trace "proving loop cancelled" + discard + except CatchableError as e: + error "Proving failed", + msg = e.msg, typ = $(type e), stack = e.getStackTrace(), error = e.msgDetail + return some State(SaleErrored(error: e)) + finally: + # Cleanup of the proving loop + debug "Stopping proving.", requestId = data.requestId, slotIndex = data.slotIndex + + if not state.loop.isNil: + if not state.loop.finished: + try: + await state.loop.cancelAndWait() + except CancelledError: + discard + except CatchableError as e: + error "Error during cancellation of proving loop", msg = e.msg + + state.loop = nil + + return some State(SalePayout()) + except CancelledError as e: + trace "SaleProving.run onCleanUp was cancelled", error = e.msgDetail except CatchableError as e: - error "Proving failed", msg = e.msg + error "Error during SaleProving.run", error = e.msgDetail return some State(SaleErrored(error: e)) - finally: - # Cleanup of the proving loop - debug "Stopping proving.", requestId = data.requestId, slotIndex = data.slotIndex - - if not state.loop.isNil: - if not state.loop.finished: - try: - await state.loop.cancelAndWait() - except CatchableError as e: - error "Error during cancellation of proving loop", msg = e.msg - - state.loop = nil - - return some State(SalePayout()) diff --git a/codex/sales/states/provingsimulated.nim b/codex/sales/states/provingsimulated.nim index e60169bc..a797e113 100644 --- a/codex/sales/states/provingsimulated.nim +++ b/codex/sales/states/provingsimulated.nim @@ -4,12 +4,14 @@ when codex_enable_proof_failures: import pkg/stint import pkg/ethers + import ../../contracts/marketplace import ../../contracts/requests import ../../logutils import ../../market import ../../utils/exceptions import ../salescontext import ./proving + import ./errored logScope: topics = "marketplace sales simulated-proving" @@ -29,22 +31,27 @@ when codex_enable_proof_failures: market: Market, currentPeriod: Period, ) {.async.} = - trace "Processing proving in simulated mode" - state.proofCount += 1 - if state.failEveryNProofs > 0 and state.proofCount mod state.failEveryNProofs == 0: - state.proofCount = 0 + try: + trace "Processing proving in simulated mode" + state.proofCount += 1 + if state.failEveryNProofs > 0 and state.proofCount mod state.failEveryNProofs == 0: + state.proofCount = 0 - try: - warn "Submitting INVALID proof", period = currentPeriod, slotId = slot.id - await market.submitProof(slot.id, Groth16Proof.default) - except MarketError as e: - if not e.msg.contains("Invalid proof"): + try: + warn "Submitting INVALID proof", period = currentPeriod, slotId = slot.id + await market.submitProof(slot.id, Groth16Proof.default) + except Proofs_InvalidProof as e: + discard # expected + except CancelledError as error: + raise error + except CatchableError as e: onSubmitProofError(e, currentPeriod, slot.id) - except CancelledError as error: - raise error - except CatchableError as e: - onSubmitProofError(e, currentPeriod, slot.id) - else: - await procCall SaleProving(state).prove( - slot, challenge, onProve, market, currentPeriod - ) + else: + await procCall SaleProving(state).prove( + slot, challenge, onProve, market, currentPeriod + ) + except CancelledError as e: + trace "Submitting INVALID proof cancelled", error = e.msgDetail + raise e + except CatchableError as e: + error "Submitting INVALID proof failed", error = e.msgDetail diff --git a/codex/sales/states/slotreserving.nim b/codex/sales/states/slotreserving.nim index 38b7fa76..a67c51a0 100644 --- a/codex/sales/states/slotreserving.nim +++ b/codex/sales/states/slotreserving.nim @@ -3,16 +3,16 @@ import pkg/metrics import ../../logutils import ../../market +import ../../utils/exceptions import ../salesagent import ../statemachine -import ./errorhandling import ./cancelled import ./failed import ./ignored import ./downloading import ./errored -type SaleSlotReserving* = ref object of ErrorHandlingState +type SaleSlotReserving* = ref object of SaleState logScope: topics = "marketplace sales reserving" @@ -26,7 +26,9 @@ method onCancelled*(state: SaleSlotReserving, request: StorageRequest): ?State = method onFailed*(state: SaleSlotReserving, request: StorageRequest): ?State = return some State(SaleFailed()) -method run*(state: SaleSlotReserving, machine: Machine): Future[?State] {.async.} = +method run*( + state: SaleSlotReserving, machine: Machine +): Future[?State] {.async: (raises: []).} = let agent = SalesAgent(machine) let data = agent.data let context = agent.context @@ -36,23 +38,29 @@ method run*(state: SaleSlotReserving, machine: Machine): Future[?State] {.async. requestId = data.requestId slotIndex = data.slotIndex - let canReserve = await market.canReserveSlot(data.requestId, data.slotIndex) - if canReserve: - try: - trace "Reserving slot" - await market.reserveSlot(data.requestId, data.slotIndex) - except MarketError as e: - if e.msg.contains "SlotReservations_ReservationNotAllowed": - debug "Slot cannot be reserved, ignoring", error = e.msg - return some State(SaleIgnored(reprocessSlot: false, returnBytes: true)) - else: - return some State(SaleErrored(error: e)) - # other CatchableErrors are handled "automatically" by the ErrorHandlingState + try: + let canReserve = await market.canReserveSlot(data.requestId, data.slotIndex) + if canReserve: + try: + trace "Reserving slot" + await market.reserveSlot(data.requestId, data.slotIndex) + except MarketError as e: + if e.msg.contains "SlotReservations_ReservationNotAllowed": + debug "Slot cannot be reserved, ignoring", error = e.msg + return some State(SaleIgnored(reprocessSlot: false, returnBytes: true)) + else: + return some State(SaleErrored(error: e)) + # other CatchableErrors are handled "automatically" by the SaleState - trace "Slot successfully reserved" - return some State(SaleDownloading()) - else: - # do not re-add this slot to the queue, and return bytes from Reservation to - # the Availability - debug "Slot cannot be reserved, ignoring" - return some State(SaleIgnored(reprocessSlot: false, returnBytes: true)) + trace "Slot successfully reserved" + return some State(SaleDownloading()) + else: + # do not re-add this slot to the queue, and return bytes from Reservation to + # the Availability + debug "Slot cannot be reserved, ignoring" + return some State(SaleIgnored(reprocessSlot: false, returnBytes: true)) + except CancelledError as e: + trace "SaleSlotReserving.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during SaleSlotReserving.run", error = e.msgDetail + return some State(SaleErrored(error: e)) diff --git a/codex/sales/states/unknown.nim b/codex/sales/states/unknown.nim index 3034129a..d182d744 100644 --- a/codex/sales/states/unknown.nim +++ b/codex/sales/states/unknown.nim @@ -1,4 +1,5 @@ import ../../logutils +import ../../utils/exceptions import ../statemachine import ../salesagent import ./filled @@ -26,34 +27,42 @@ method onCancelled*(state: SaleUnknown, request: StorageRequest): ?State = method onFailed*(state: SaleUnknown, request: StorageRequest): ?State = return some State(SaleFailed()) -method run*(state: SaleUnknown, machine: Machine): Future[?State] {.async.} = +method run*( + state: SaleUnknown, machine: Machine +): Future[?State] {.async: (raises: []).} = let agent = SalesAgent(machine) let data = agent.data let market = agent.context.market - await agent.retrieveRequest() - await agent.subscribe() + try: + await agent.retrieveRequest() + await agent.subscribe() - let slotId = slotId(data.requestId, data.slotIndex) - let slotState = await market.slotState(slotId) + let slotId = slotId(data.requestId, data.slotIndex) + let slotState = await market.slotState(slotId) - case slotState - of SlotState.Free: - let error = - newException(UnexpectedSlotError, "Slot state on chain should not be 'free'") - return some State(SaleErrored(error: error)) - of SlotState.Filled: - return some State(SaleFilled()) - of SlotState.Finished: - return some State(SalePayout()) - of SlotState.Paid: - return some State(SaleFinished()) - of SlotState.Failed: - return some State(SaleFailed()) - of SlotState.Cancelled: - return some State(SaleCancelled()) - of SlotState.Repair: - let error = newException( - SlotFreedError, "Slot was forcible freed and host was removed from its hosting" - ) - return some State(SaleErrored(error: error)) + case slotState + of SlotState.Free: + let error = + newException(UnexpectedSlotError, "Slot state on chain should not be 'free'") + return some State(SaleErrored(error: error)) + of SlotState.Filled: + return some State(SaleFilled()) + of SlotState.Finished: + return some State(SalePayout()) + of SlotState.Paid: + return some State(SaleFinished()) + of SlotState.Failed: + return some State(SaleFailed()) + of SlotState.Cancelled: + return some State(SaleCancelled()) + of SlotState.Repair: + let error = newException( + SlotFreedError, "Slot was forcible freed and host was removed from its hosting" + ) + return some State(SaleErrored(error: error)) + except CancelledError as e: + trace "SaleUnknown.run was cancelled", error = e.msgDetail + except CatchableError as e: + error "Error during SaleUnknown.run", error = e.msgDetail + return some State(SaleErrored(error: e)) diff --git a/codex/utils/asyncstatemachine.nim b/codex/utils/asyncstatemachine.nim index 572ae246..2d87ebc1 100644 --- a/codex/utils/asyncstatemachine.nim +++ b/codex/utils/asyncstatemachine.nim @@ -2,6 +2,7 @@ import pkg/questionable import pkg/chronos import ../logutils import ./trackedfutures +import ./exceptions {.push raises: [].} @@ -46,24 +47,14 @@ proc schedule*(machine: Machine, event: Event) = except AsyncQueueFullError: raiseAssert "unlimited queue is full?!" -method run*(state: State, machine: Machine): Future[?State] {.base, async.} = +method run*( + state: State, machine: Machine +): Future[?State] {.base, async: (raises: []).} = discard -method onError*(state: State, error: ref CatchableError): ?State {.base.} = - raise (ref Defect)(msg: "error in state machine: " & error.msg, parent: error) - -proc onError(machine: Machine, error: ref CatchableError): Event = - return proc(state: State): ?State = - state.onError(error) - proc run(machine: Machine, state: State) {.async: (raises: []).} = - try: - if next =? await state.run(machine): - machine.schedule(Event.transition(state, next)) - except CancelledError: - discard # do not propagate - except CatchableError as e: - machine.schedule(machine.onError(e)) + if next =? await state.run(machine): + machine.schedule(Event.transition(state, next)) proc scheduler(machine: Machine) {.async: (raises: []).} = var running: Future[void].Raising([]) diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 05f31057..09a2ce49 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -36,6 +36,7 @@ asyncchecksuite "Sales - start": var repo: RepoStore var queue: SlotQueue var itemsProcessed: seq[SlotQueueItem] + var expiry: SecondsSince1970 setup: request = StorageRequest( @@ -76,7 +77,8 @@ asyncchecksuite "Sales - start": ): Future[?!Groth16Proof] {.async.} = return success(proof) itemsProcessed = @[] - request.expiry = (clock.now() + 42).u256 + expiry = (clock.now() + 42) + request.expiry = expiry.u256 teardown: await sales.stop() @@ -97,6 +99,7 @@ asyncchecksuite "Sales - start": request.ask.slots = 2 market.requested = @[request] market.requestState[request.id] = RequestState.New + market.requestExpiry[request.id] = expiry let slot0 = MockSlot(requestId: request.id, slotIndex: 0.u256, proof: proof, host: me) @@ -430,23 +433,6 @@ asyncchecksuite "Sales": check eventually storingRequest == request check storingSlot < request.ask.slots.u256 - test "handles errors during state run": - var saleFailed = false - sales.onProve = proc( - slot: Slot, challenge: ProofChallenge - ): Future[?!Groth16Proof] {.async.} = - # raise exception so machine.onError is called - raise newException(ValueError, "some error") - - # onClear is called in SaleErrored.run - sales.onClear = proc(request: StorageRequest, idx: UInt256) = - saleFailed = true - createAvailability() - await market.requestStorage(request) - await allowRequestToStart() - - check eventually saleFailed - test "makes storage available again when data retrieval fails": let error = newException(IOError, "data retrieval failed") sales.onStore = proc( diff --git a/tests/codex/sales/testsalesagent.nim b/tests/codex/sales/testsalesagent.nim index f17711d3..f1cb53a6 100644 --- a/tests/codex/sales/testsalesagent.nim +++ b/tests/codex/sales/testsalesagent.nim @@ -4,7 +4,6 @@ import pkg/codex/sales import pkg/codex/sales/salesagent import pkg/codex/sales/salescontext import pkg/codex/sales/statemachine -import pkg/codex/sales/states/errorhandling import ../../asynctest import ../helpers/mockmarket @@ -15,18 +14,12 @@ import ../examples var onCancelCalled = false var onFailedCalled = false var onSlotFilledCalled = false -var onErrorCalled = false -type - MockState = ref object of SaleState - MockErrorState = ref object of ErrorHandlingState +type MockState = ref object of SaleState method `$`*(state: MockState): string = "MockState" -method `$`*(state: MockErrorState): string = - "MockErrorState" - method onCancelled*(state: MockState, request: StorageRequest): ?State = onCancelCalled = true @@ -38,12 +31,6 @@ method onSlotFilled*( ): ?State = onSlotFilledCalled = true -method onError*(state: MockErrorState, err: ref CatchableError): ?State = - onErrorCalled = true - -method run*(state: MockErrorState, machine: Machine): Future[?State] {.async.} = - raise newException(ValueError, "failure") - asyncchecksuite "Sales agent": let request = StorageRequest.example var agent: SalesAgent @@ -123,7 +110,9 @@ asyncchecksuite "Sales agent": agent.start(MockState.new()) await agent.subscribe() agent.onFulfilled(request.id) - check eventually agent.data.cancelled.cancelled() + # Note: futures that are cancelled, and do not re-raise the CancelledError + # will have a state of completed, not cancelled. + check eventually agent.data.cancelled.completed() test "current state onFailed called when onFailed called": agent.start(MockState.new()) @@ -134,7 +123,3 @@ asyncchecksuite "Sales agent": agent.start(MockState.new()) agent.onSlotFilled(request.id, slotIndex) check eventually onSlotFilledCalled - - test "ErrorHandlingState.onError can be overridden at the state level": - agent.start(MockErrorState.new()) - check eventually onErrorCalled diff --git a/tests/codex/utils/testasyncstatemachine.nim b/tests/codex/utils/testasyncstatemachine.nim index 40a040c4..ed3ea747 100644 --- a/tests/codex/utils/testasyncstatemachine.nim +++ b/tests/codex/utils/testasyncstatemachine.nim @@ -10,9 +10,8 @@ type State1 = ref object of State State2 = ref object of State State3 = ref object of State - State4 = ref object of State -var runs, cancellations, errors = [0, 0, 0, 0] +var runs, cancellations = [0, 0, 0, 0] method `$`(state: State1): string = "State1" @@ -23,28 +22,20 @@ method `$`(state: State2): string = method `$`(state: State3): string = "State3" -method `$`(state: State4): string = - "State4" - -method run(state: State1, machine: Machine): Future[?State] {.async.} = +method run(state: State1, machine: Machine): Future[?State] {.async: (raises: []).} = inc runs[0] return some State(State2.new()) -method run(state: State2, machine: Machine): Future[?State] {.async.} = +method run(state: State2, machine: Machine): Future[?State] {.async: (raises: []).} = inc runs[1] try: await sleepAsync(1.hours) except CancelledError: inc cancellations[1] - raise -method run(state: State3, machine: Machine): Future[?State] {.async.} = +method run(state: State3, machine: Machine): Future[?State] {.async: (raises: []).} = inc runs[2] -method run(state: State4, machine: Machine): Future[?State] {.async.} = - inc runs[3] - raise newException(ValueError, "failed") - method onMoveToNextStateEvent*(state: State): ?State {.base, upraises: [].} = discard @@ -54,19 +45,6 @@ method onMoveToNextStateEvent(state: State2): ?State = method onMoveToNextStateEvent(state: State3): ?State = some State(State1.new()) -method onError(state: State1, error: ref CatchableError): ?State = - inc errors[0] - -method onError(state: State2, error: ref CatchableError): ?State = - inc errors[1] - -method onError(state: State3, error: ref CatchableError): ?State = - inc errors[2] - -method onError(state: State4, error: ref CatchableError): ?State = - inc errors[3] - some State(State2.new()) - asyncchecksuite "async state machines": var machine: Machine @@ -76,7 +54,6 @@ asyncchecksuite "async state machines": setup: runs = [0, 0, 0, 0] cancellations = [0, 0, 0, 0] - errors = [0, 0, 0, 0] machine = Machine.new() test "should call run on start state": @@ -112,16 +89,6 @@ asyncchecksuite "async state machines": check runs == [0, 1, 0, 0] check cancellations == [0, 1, 0, 0] - test "forwards errors to error handler": - machine.start(State4.new()) - check eventually errors == [0, 0, 0, 1] and runs == [0, 1, 0, 1] - - test "error handler ignores CancelledError": - machine.start(State2.new()) - machine.schedule(moveToNextStateEvent) - check eventually cancellations == [0, 1, 0, 0] - check errors == [0, 0, 0, 0] - test "queries properties of the current state": proc description(state: State): string = $state