fix(statemachine): do not raise from state.run (#1115)

* fix(statemachine): do not raise from state.run

* fix rebase

* fix exception handling in SaleProvingSimulated.prove

- re-raise CancelledError
- don't return State on CatchableError
- expect the Proofs_InvalidProof custom error instead of checking a string

* asyncSpawn salesagent.onCancelled

This was swallowing a KeyError in one of the tests (fixed in the previous commit)

* remove error handling states in asyncstatemachine

* revert unneeded changes

* formatting

* PR feedback, logging updates
This commit is contained in:
Eric 2025-02-19 11:18:45 +11:00 committed by GitHub
parent 1052dad30c
commit 87590f43ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 564 additions and 446 deletions

View File

@ -1,25 +1,35 @@
import pkg/metrics
import ../../logutils
import ../../utils/exceptions
import ../statemachine
import ./errorhandling
import ./error
declareCounter(codex_purchases_cancelled, "codex purchases cancelled")
logScope:
topics = "marketplace purchases cancelled"
type PurchaseCancelled* = ref object of ErrorHandlingState
type PurchaseCancelled* = ref object of PurchaseState
method `$`*(state: PurchaseCancelled): string =
"cancelled"
method run*(state: PurchaseCancelled, machine: Machine): Future[?State] {.async.} =
method run*(
state: PurchaseCancelled, machine: Machine
): Future[?State] {.async: (raises: []).} =
codex_purchases_cancelled.inc()
let purchase = Purchase(machine)
warn "Request cancelled, withdrawing remaining funds", requestId = purchase.requestId
try:
warn "Request cancelled, withdrawing remaining funds",
requestId = purchase.requestId
await purchase.market.withdrawFunds(purchase.requestId)
let error = newException(Timeout, "Purchase cancelled due to timeout")
purchase.future.fail(error)
except CancelledError as e:
trace "PurchaseCancelled.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during PurchaseCancelled.run", error = e.msgDetail
return some State(PurchaseErrored(error: e))

View File

@ -14,7 +14,9 @@ type PurchaseErrored* = ref object of PurchaseState
method `$`*(state: PurchaseErrored): string =
"errored"
method run*(state: PurchaseErrored, machine: Machine): Future[?State] {.async.} =
method run*(
state: PurchaseErrored, machine: Machine
): Future[?State] {.async: (raises: []).} =
codex_purchases_error.inc()
let purchase = Purchase(machine)

View File

@ -1,8 +0,0 @@
import pkg/questionable
import ../statemachine
import ./error
type ErrorHandlingState* = ref object of PurchaseState
method onError*(state: ErrorHandlingState, error: ref CatchableError): ?State =
some State(PurchaseErrored(error: error))

View File

@ -1,6 +1,7 @@
import pkg/metrics
import ../statemachine
import ../../logutils
import ../../utils/exceptions
import ./error
declareCounter(codex_purchases_failed, "codex purchases failed")
@ -10,11 +11,20 @@ type PurchaseFailed* = ref object of PurchaseState
method `$`*(state: PurchaseFailed): string =
"failed"
method run*(state: PurchaseFailed, machine: Machine): Future[?State] {.async.} =
method run*(
state: PurchaseFailed, machine: Machine
): Future[?State] {.async: (raises: []).} =
codex_purchases_failed.inc()
let purchase = Purchase(machine)
try:
warn "Request failed, withdrawing remaining funds", requestId = purchase.requestId
await purchase.market.withdrawFunds(purchase.requestId)
except CancelledError as e:
trace "PurchaseFailed.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during PurchaseFailed.run", error = e.msgDetail
return some State(PurchaseErrored(error: e))
let error = newException(PurchaseError, "Purchase failed")
return some State(PurchaseErrored(error: error))

View File

@ -1,7 +1,9 @@
import pkg/metrics
import ../statemachine
import ../../utils/exceptions
import ../../logutils
import ./error
declareCounter(codex_purchases_finished, "codex purchases finished")
@ -13,10 +15,19 @@ type PurchaseFinished* = ref object of PurchaseState
method `$`*(state: PurchaseFinished): string =
"finished"
method run*(state: PurchaseFinished, machine: Machine): Future[?State] {.async.} =
method run*(
state: PurchaseFinished, machine: Machine
): Future[?State] {.async: (raises: []).} =
codex_purchases_finished.inc()
let purchase = Purchase(machine)
info "Purchase finished, withdrawing remaining funds", requestId = purchase.requestId
try:
info "Purchase finished, withdrawing remaining funds",
requestId = purchase.requestId
await purchase.market.withdrawFunds(purchase.requestId)
purchase.future.complete()
except CancelledError as e:
trace "PurchaseFinished.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during PurchaseFinished.run", error = e.msgDetail
return some State(PurchaseErrored(error: e))

View File

@ -1,18 +1,28 @@
import pkg/metrics
import ../../logutils
import ../../utils/exceptions
import ../statemachine
import ./errorhandling
import ./submitted
import ./error
declareCounter(codex_purchases_pending, "codex purchases pending")
type PurchasePending* = ref object of ErrorHandlingState
type PurchasePending* = ref object of PurchaseState
method `$`*(state: PurchasePending): string =
"pending"
method run*(state: PurchasePending, machine: Machine): Future[?State] {.async.} =
method run*(
state: PurchasePending, machine: Machine
): Future[?State] {.async: (raises: []).} =
codex_purchases_pending.inc()
let purchase = Purchase(machine)
try:
let request = !purchase.request
await purchase.market.requestStorage(request)
return some State(PurchaseSubmitted())
except CancelledError as e:
trace "PurchasePending.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during PurchasePending.run", error = e.msgDetail
return some State(PurchaseErrored(error: e))

View File

@ -1,22 +1,25 @@
import pkg/metrics
import ../../logutils
import ../../utils/exceptions
import ../statemachine
import ./errorhandling
import ./finished
import ./failed
import ./error
declareCounter(codex_purchases_started, "codex purchases started")
logScope:
topics = "marketplace purchases started"
type PurchaseStarted* = ref object of ErrorHandlingState
type PurchaseStarted* = ref object of PurchaseState
method `$`*(state: PurchaseStarted): string =
"started"
method run*(state: PurchaseStarted, machine: Machine): Future[?State] {.async.} =
method run*(
state: PurchaseStarted, machine: Machine
): Future[?State] {.async: (raises: []).} =
codex_purchases_started.inc()
let purchase = Purchase(machine)
@ -28,10 +31,12 @@ method run*(state: PurchaseStarted, machine: Machine): Future[?State] {.async.}
proc callback(_: RequestId) =
failed.complete()
var ended: Future[void]
try:
let subscription = await market.subscribeRequestFailed(purchase.requestId, callback)
# Ensure that we're past the request end by waiting an additional second
let ended = clock.waitUntil((await market.getRequestEnd(purchase.requestId)) + 1)
ended = clock.waitUntil((await market.getRequestEnd(purchase.requestId)) + 1)
let fut = await one(ended, failed)
await subscription.unsubscribe()
if fut.id == failed.id:
@ -40,3 +45,10 @@ method run*(state: PurchaseStarted, machine: Machine): Future[?State] {.async.}
else:
failed.cancelSoon()
return some State(PurchaseFinished())
except CancelledError as e:
ended.cancelSoon()
failed.cancelSoon()
trace "PurchaseStarted.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during PurchaseStarted.run", error = e.msgDetail
return some State(PurchaseErrored(error: e))

View File

@ -1,22 +1,25 @@
import pkg/metrics
import ../../logutils
import ../../utils/exceptions
import ../statemachine
import ./errorhandling
import ./started
import ./cancelled
import ./error
logScope:
topics = "marketplace purchases submitted"
declareCounter(codex_purchases_submitted, "codex purchases submitted")
type PurchaseSubmitted* = ref object of ErrorHandlingState
type PurchaseSubmitted* = ref object of PurchaseState
method `$`*(state: PurchaseSubmitted): string =
"submitted"
method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async.} =
method run*(
state: PurchaseSubmitted, machine: Machine
): Future[?State] {.async: (raises: []).} =
codex_purchases_submitted.inc()
let purchase = Purchase(machine)
let request = !purchase.request
@ -44,5 +47,10 @@ method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async.
await wait().withTimeout()
except Timeout:
return some State(PurchaseCancelled())
except CancelledError as e:
trace "PurchaseSubmitted.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during PurchaseSubmitted.run", error = e.msgDetail
return some State(PurchaseErrored(error: e))
return some State(PurchaseStarted())

View File

@ -1,20 +1,25 @@
import pkg/metrics
import ../../utils/exceptions
import ../../logutils
import ../statemachine
import ./errorhandling
import ./submitted
import ./started
import ./cancelled
import ./finished
import ./failed
import ./error
declareCounter(codex_purchases_unknown, "codex purchases unknown")
type PurchaseUnknown* = ref object of ErrorHandlingState
type PurchaseUnknown* = ref object of PurchaseState
method `$`*(state: PurchaseUnknown): string =
"unknown"
method run*(state: PurchaseUnknown, machine: Machine): Future[?State] {.async.} =
method run*(
state: PurchaseUnknown, machine: Machine
): Future[?State] {.async: (raises: []).} =
try:
codex_purchases_unknown.inc()
let purchase = Purchase(machine)
if (request =? await purchase.market.getRequest(purchase.requestId)) and
@ -32,3 +37,8 @@ method run*(state: PurchaseUnknown, machine: Machine): Future[?State] {.async.}
return some State(PurchaseFinished())
of RequestState.Failed:
return some State(PurchaseFailed())
except CancelledError as e:
trace "PurchaseUnknown.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during PurchaseUnknown.run", error = e.msgDetail
return some State(PurchaseErrored(error: e))

View File

@ -6,6 +6,7 @@ import pkg/upraises
import ../contracts/requests
import ../errors
import ../logutils
import ../utils/exceptions
import ./statemachine
import ./salescontext
import ./salesdata
@ -68,10 +69,11 @@ proc subscribeCancellation(agent: SalesAgent) {.async.} =
let data = agent.data
let clock = agent.context.clock
proc onCancelled() {.async.} =
proc onCancelled() {.async: (raises: []).} =
without request =? data.request:
return
try:
let market = agent.context.market
let expiry = await market.requestExpiresAt(data.requestId)
@ -81,7 +83,7 @@ proc subscribeCancellation(agent: SalesAgent) {.async.} =
await clock.waitUntil(deadline)
without state =? await agent.retrieveRequestState():
error "Uknown request", requestId = data.requestId
error "Unknown request", requestId = data.requestId
return
case state
@ -95,14 +97,20 @@ proc subscribeCancellation(agent: SalesAgent) {.async.} =
debug "The request is not yet canceled, even though it should be. Waiting for some more time.",
currentState = state, now = clock.now
except CancelledError:
trace "Waiting for expiry to lapse was cancelled", requestId = data.requestId
except CatchableError as e:
error "Error while waiting for expiry to lapse", error = e.msgDetail
data.cancelled = onCancelled()
asyncSpawn data.cancelled
method onFulfilled*(
agent: SalesAgent, requestId: RequestId
) {.base, gcsafe, upraises: [].} =
if agent.data.requestId == requestId and not agent.data.cancelled.isNil:
agent.data.cancelled.cancelSoon()
let cancelled = agent.data.cancelled
if agent.data.requestId == requestId and not cancelled.isNil and not cancelled.finished:
cancelled.cancelSoon()
method onFailed*(
agent: SalesAgent, requestId: RequestId

View File

@ -1,17 +1,20 @@
import ../../logutils
import ../../utils/exceptions
import ../salesagent
import ../statemachine
import ./errorhandling
import ./errored
logScope:
topics = "marketplace sales cancelled"
type SaleCancelled* = ref object of ErrorHandlingState
type SaleCancelled* = ref object of SaleState
method `$`*(state: SaleCancelled): string =
"SaleCancelled"
method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} =
method run*(
state: SaleCancelled, machine: Machine
): Future[?State] {.async: (raises: []).} =
let agent = SalesAgent(machine)
let data = agent.data
let market = agent.context.market
@ -19,6 +22,7 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} =
without request =? data.request:
raiseAssert "no sale request"
try:
let slot = Slot(request: request, slotIndex: data.slotIndex)
debug "Collecting collateral and partial payout",
requestId = data.requestId, slotIndex = data.slotIndex
@ -37,3 +41,8 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} =
warn "Sale cancelled due to timeout",
requestId = data.requestId, slotIndex = data.slotIndex
except CancelledError as e:
trace "SaleCancelled.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during SaleCancelled.run", error = e.msgDetail
return some State(SaleErrored(error: e))

View File

@ -4,16 +4,16 @@ import pkg/questionable/results
import ../../blocktype as bt
import ../../logutils
import ../../market
import ../../utils/exceptions
import ../salesagent
import ../statemachine
import ./errorhandling
import ./cancelled
import ./failed
import ./filled
import ./initialproving
import ./errored
type SaleDownloading* = ref object of ErrorHandlingState
type SaleDownloading* = ref object of SaleState
logScope:
topics = "marketplace sales downloading"
@ -32,7 +32,9 @@ method onSlotFilled*(
): ?State =
return some State(SaleFilled())
method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} =
method run*(
state: SaleDownloading, machine: Machine
): Future[?State] {.async: (raises: []).} =
let agent = SalesAgent(machine)
let data = agent.data
let context = agent.context
@ -64,9 +66,15 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
trace "Releasing batch of bytes written to disk", bytes
return await reservations.release(reservation.id, reservation.availabilityId, bytes)
try:
trace "Starting download"
if err =? (await onStore(request, data.slotIndex, onBlocks)).errorOption:
return some State(SaleErrored(error: err, reprocessSlot: false))
trace "Download complete"
return some State(SaleInitialProving())
except CancelledError as e:
trace "SaleDownloading.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during SaleDownloading.run", error = e.msgDetail
return some State(SaleErrored(error: e))

View File

@ -17,10 +17,9 @@ type SaleErrored* = ref object of SaleState
method `$`*(state: SaleErrored): string =
"SaleErrored"
method onError*(state: SaleState, err: ref CatchableError): ?State {.upraises: [].} =
error "error during SaleErrored run", error = err.msg
method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
method run*(
state: SaleErrored, machine: Machine
): Future[?State] {.async: (raises: []).} =
let agent = SalesAgent(machine)
let data = agent.data
let context = agent.context
@ -30,8 +29,13 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
requestId = data.requestId,
slotIndex = data.slotIndex
try:
if onClear =? context.onClear and request =? data.request:
onClear(request, data.slotIndex)
if onCleanUp =? agent.onCleanUp:
await onCleanUp(returnBytes = true, reprocessSlot = state.reprocessSlot)
except CancelledError as e:
trace "SaleErrored.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during SaleErrored.run", error = e.msgDetail

View File

@ -1,8 +0,0 @@
import pkg/questionable
import ../statemachine
import ./errored
type ErrorHandlingState* = ref object of SaleState
method onError*(state: ErrorHandlingState, error: ref CatchableError): ?State =
some State(SaleErrored(error: error))

View File

@ -1,26 +1,30 @@
import ../../logutils
import ../../utils/exceptions
import ../../utils/exceptions
import ../salesagent
import ../statemachine
import ./errorhandling
import ./errored
logScope:
topics = "marketplace sales failed"
type
SaleFailed* = ref object of ErrorHandlingState
SaleFailed* = ref object of SaleState
SaleFailedError* = object of SaleError
method `$`*(state: SaleFailed): string =
"SaleFailed"
method run*(state: SaleFailed, machine: Machine): Future[?State] {.async.} =
method run*(
state: SaleFailed, machine: Machine
): Future[?State] {.async: (raises: []).} =
let data = SalesAgent(machine).data
let market = SalesAgent(machine).context.market
without request =? data.request:
raiseAssert "no sale request"
try:
let slot = Slot(request: request, slotIndex: data.slotIndex)
debug "Removing slot from mySlots",
requestId = data.requestId, slotIndex = data.slotIndex
@ -28,3 +32,8 @@ method run*(state: SaleFailed, machine: Machine): Future[?State] {.async.} =
let error = newException(SaleFailedError, "Sale failed")
return some State(SaleErrored(error: error))
except CancelledError as e:
trace "SaleFailed.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during SaleFailed.run", error = e.msgDetail
return some State(SaleErrored(error: e))

View File

@ -3,9 +3,9 @@ import pkg/questionable/results
import ../../conf
import ../../logutils
import ../../utils/exceptions
import ../statemachine
import ../salesagent
import ./errorhandling
import ./errored
import ./cancelled
import ./failed
@ -18,7 +18,7 @@ logScope:
topics = "marketplace sales filled"
type
SaleFilled* = ref object of ErrorHandlingState
SaleFilled* = ref object of SaleState
HostMismatchError* = object of CatchableError
method onCancelled*(state: SaleFilled, request: StorageRequest): ?State =
@ -30,12 +30,15 @@ method onFailed*(state: SaleFilled, request: StorageRequest): ?State =
method `$`*(state: SaleFilled): string =
"SaleFilled"
method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} =
method run*(
state: SaleFilled, machine: Machine
): Future[?State] {.async: (raises: []).} =
let agent = SalesAgent(machine)
let data = agent.data
let context = agent.context
let market = context.market
try:
let host = await market.getHost(data.requestId, data.slotIndex)
let me = await market.getSigner()
@ -67,3 +70,8 @@ method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} =
else:
let error = newException(HostMismatchError, "Slot filled by other host")
return some State(SaleErrored(error: error))
except CancelledError as e:
trace "SaleFilled.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during SaleFilled.run", error = e.msgDetail
return some State(SaleErrored(error: e))

View File

@ -1,9 +1,9 @@
import pkg/stint
import ../../logutils
import ../../market
import ../../utils/exceptions
import ../statemachine
import ../salesagent
import ./errorhandling
import ./filled
import ./cancelled
import ./failed
@ -13,7 +13,7 @@ import ./errored
logScope:
topics = "marketplace sales filling"
type SaleFilling* = ref object of ErrorHandlingState
type SaleFilling* = ref object of SaleState
proof*: Groth16Proof
method `$`*(state: SaleFilling): string =
@ -25,7 +25,9 @@ method onCancelled*(state: SaleFilling, request: StorageRequest): ?State =
method onFailed*(state: SaleFilling, request: StorageRequest): ?State =
return some State(SaleFailed())
method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} =
method run*(
state: SaleFilling, machine: Machine
): Future[?State] {.async: (raises: []).} =
let data = SalesAgent(machine).data
let market = SalesAgent(machine).context.market
without (request =? data.request):
@ -35,6 +37,7 @@ method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} =
requestId = data.requestId
slotIndex = data.slotIndex
try:
let slotState = await market.slotState(slotId(data.requestId, data.slotIndex))
let requestedCollateral = request.ask.collateralPerSlot
var collateral: UInt256
@ -57,6 +60,11 @@ method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} =
return some State(SaleIgnored(reprocessSlot: false, returnBytes: true))
else:
return some State(SaleErrored(error: e))
# other CatchableErrors are handled "automatically" by the ErrorHandlingState
# other CatchableErrors are handled "automatically" by the SaleState
return some State(SaleFilled())
except CancelledError as e:
trace "SaleFilling.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during SaleFilling.run", error = e.msgDetail
return some State(SaleErrored(error: e))

View File

@ -1,16 +1,17 @@
import pkg/chronos
import ../../logutils
import ../../utils/exceptions
import ../statemachine
import ../salesagent
import ./errorhandling
import ./cancelled
import ./failed
import ./errored
logScope:
topics = "marketplace sales finished"
type SaleFinished* = ref object of ErrorHandlingState
type SaleFinished* = ref object of SaleState
returnedCollateral*: ?UInt256
method `$`*(state: SaleFinished): string =
@ -22,7 +23,9 @@ method onCancelled*(state: SaleFinished, request: StorageRequest): ?State =
method onFailed*(state: SaleFinished, request: StorageRequest): ?State =
return some State(SaleFailed())
method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} =
method run*(
state: SaleFinished, machine: Machine
): Future[?State] {.async: (raises: []).} =
let agent = SalesAgent(machine)
let data = agent.data
@ -32,5 +35,11 @@ method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} =
info "Slot finished and paid out",
requestId = data.requestId, slotIndex = data.slotIndex
try:
if onCleanUp =? agent.onCleanUp:
await onCleanUp(returnedCollateral = state.returnedCollateral)
except CancelledError as e:
trace "SaleFilled.run onCleanUp was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during SaleFilled.run in onCleanUp callback", error = e.msgDetail
return some State(SaleErrored(error: e))

View File

@ -1,9 +1,10 @@
import pkg/chronos
import ../../logutils
import ../../utils/exceptions
import ../statemachine
import ../salesagent
import ./errorhandling
import ./errored
logScope:
topics = "marketplace sales ignored"
@ -11,17 +12,25 @@ logScope:
# Ignored slots could mean there was no availability or that the slot could
# not be reserved.
type SaleIgnored* = ref object of ErrorHandlingState
type SaleIgnored* = ref object of SaleState
reprocessSlot*: bool # readd slot to queue with `seen` flag
returnBytes*: bool # return unreleased bytes from Reservation to Availability
method `$`*(state: SaleIgnored): string =
"SaleIgnored"
method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} =
method run*(
state: SaleIgnored, machine: Machine
): Future[?State] {.async: (raises: []).} =
let agent = SalesAgent(machine)
try:
if onCleanUp =? agent.onCleanUp:
await onCleanUp(
reprocessSlot = state.reprocessSlot, returnBytes = state.returnBytes
)
except CancelledError as e:
trace "SaleIgnored.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during SaleIgnored.run in onCleanUp", error = e.msgDetail
return some State(SaleErrored(error: e))

View File

@ -1,9 +1,9 @@
import pkg/questionable/results
import ../../clock
import ../../logutils
import ../../utils/exceptions
import ../statemachine
import ../salesagent
import ./errorhandling
import ./filling
import ./cancelled
import ./errored
@ -12,7 +12,7 @@ import ./failed
logScope:
topics = "marketplace sales initial-proving"
type SaleInitialProving* = ref object of ErrorHandlingState
type SaleInitialProving* = ref object of SaleState
method `$`*(state: SaleInitialProving): string =
"SaleInitialProving"
@ -36,7 +36,9 @@ proc waitForStableChallenge(market: Market, clock: Clock, slotId: SlotId) {.asyn
while (await market.getPointer(slotId)) > (256 - downtime):
await clock.waitUntilNextPeriod(periodicity)
method run*(state: SaleInitialProving, machine: Machine): Future[?State] {.async.} =
method run*(
state: SaleInitialProving, machine: Machine
): Future[?State] {.async: (raises: []).} =
let data = SalesAgent(machine).data
let context = SalesAgent(machine).context
let market = context.market
@ -48,6 +50,7 @@ method run*(state: SaleInitialProving, machine: Machine): Future[?State] {.async
without onProve =? context.onProve:
raiseAssert "onProve callback not set"
try:
debug "Waiting for a proof challenge that is valid for the entire period"
let slot = Slot(request: request, slotIndex: data.slotIndex)
await waitForStableChallenge(market, clock, slot.id)
@ -61,3 +64,8 @@ method run*(state: SaleInitialProving, machine: Machine): Future[?State] {.async
debug "Finished proof calculation", requestId = data.requestId
return some State(SaleFilling(proof: proof))
except CancelledError as e:
trace "SaleInitialProving.run onCleanUp was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during SaleInitialProving.run", error = e.msgDetail
return some State(SaleErrored(error: e))

View File

@ -1,16 +1,17 @@
import ../../logutils
import ../../market
import ../../utils/exceptions
import ../statemachine
import ../salesagent
import ./errorhandling
import ./cancelled
import ./failed
import ./finished
import ./errored
logScope:
topics = "marketplace sales payout"
type SalePayout* = ref object of ErrorHandlingState
type SalePayout* = ref object of SaleState
method `$`*(state: SalePayout): string =
"SalePayout"
@ -21,13 +22,16 @@ method onCancelled*(state: SalePayout, request: StorageRequest): ?State =
method onFailed*(state: SalePayout, request: StorageRequest): ?State =
return some State(SaleFailed())
method run*(state: SalePayout, machine: Machine): Future[?State] {.async.} =
method run*(
state: SalePayout, machine: Machine
): Future[?State] {.async: (raises: []).} =
let data = SalesAgent(machine).data
let market = SalesAgent(machine).context.market
without request =? data.request:
raiseAssert "no sale request"
try:
let slot = Slot(request: request, slotIndex: data.slotIndex)
debug "Collecting finished slot's reward",
requestId = data.requestId, slotIndex = data.slotIndex
@ -35,3 +39,8 @@ method run*(state: SalePayout, machine: Machine): Future[?State] {.async.} =
await market.freeSlot(slot.id)
return some State(SaleFinished(returnedCollateral: some currentCollateral))
except CancelledError as e:
trace "SalePayout.run onCleanUp was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during SalePayout.run", error = e.msgDetail
return some State(SaleErrored(error: e))

View File

@ -4,9 +4,9 @@ import pkg/metrics
import ../../logutils
import ../../market
import ../../utils/exceptions
import ../salesagent
import ../statemachine
import ./errorhandling
import ./cancelled
import ./failed
import ./filled
@ -18,7 +18,7 @@ declareCounter(
codex_reservations_availability_mismatch, "codex reservations availability_mismatch"
)
type SalePreparing* = ref object of ErrorHandlingState
type SalePreparing* = ref object of SaleState
logScope:
topics = "marketplace sales preparing"
@ -37,13 +37,16 @@ method onSlotFilled*(
): ?State =
return some State(SaleFilled())
method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
method run*(
state: SalePreparing, machine: Machine
): Future[?State] {.async: (raises: []).} =
let agent = SalesAgent(machine)
let data = agent.data
let context = agent.context
let market = context.market
let reservations = context.reservations
try:
await agent.retrieveRequest()
await agent.subscribe()
@ -92,7 +95,12 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
return some State(SaleErrored(error: error))
trace "Reservation created succesfully"
trace "Reservation created successfully"
data.reservation = some reservation
return some State(SaleSlotReserving())
except CancelledError as e:
trace "SalePreparing.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during SalePreparing.run", error = e.msgDetail
return some State(SaleErrored(error: e))

View File

@ -6,7 +6,6 @@ import ../../utils/exceptions
import ../statemachine
import ../salesagent
import ../salescontext
import ./errorhandling
import ./cancelled
import ./failed
import ./errored
@ -18,7 +17,7 @@ logScope:
type
SlotFreedError* = object of CatchableError
SlotNotFilledError* = object of CatchableError
SaleProving* = ref object of ErrorHandlingState
SaleProving* = ref object of SaleState
loop: Future[void]
method prove*(
@ -113,7 +112,9 @@ method onFailed*(state: SaleProving, request: StorageRequest): ?State =
# state change
return some State(SaleFailed())
method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} =
method run*(
state: SaleProving, machine: Machine
): Future[?State] {.async: (raises: []).} =
let data = SalesAgent(machine).data
let context = SalesAgent(machine).context
@ -129,15 +130,18 @@ method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} =
without clock =? context.clock:
raiseAssert("clock not set")
try:
debug "Start proving", requestId = data.requestId, slotIndex = data.slotIndex
try:
let loop = state.proveLoop(market, clock, request, data.slotIndex, onProve)
state.loop = loop
await loop
except CancelledError:
except CancelledError as e:
trace "proving loop cancelled"
discard
except CatchableError as e:
error "Proving failed", msg = e.msg
error "Proving failed",
msg = e.msg, typ = $(type e), stack = e.getStackTrace(), error = e.msgDetail
return some State(SaleErrored(error: e))
finally:
# Cleanup of the proving loop
@ -147,9 +151,16 @@ method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} =
if not state.loop.finished:
try:
await state.loop.cancelAndWait()
except CancelledError:
discard
except CatchableError as e:
error "Error during cancellation of proving loop", msg = e.msg
state.loop = nil
return some State(SalePayout())
except CancelledError as e:
trace "SaleProving.run onCleanUp was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during SaleProving.run", error = e.msgDetail
return some State(SaleErrored(error: e))

View File

@ -4,12 +4,14 @@ when codex_enable_proof_failures:
import pkg/stint
import pkg/ethers
import ../../contracts/marketplace
import ../../contracts/requests
import ../../logutils
import ../../market
import ../../utils/exceptions
import ../salescontext
import ./proving
import ./errored
logScope:
topics = "marketplace sales simulated-proving"
@ -29,6 +31,7 @@ when codex_enable_proof_failures:
market: Market,
currentPeriod: Period,
) {.async.} =
try:
trace "Processing proving in simulated mode"
state.proofCount += 1
if state.failEveryNProofs > 0 and state.proofCount mod state.failEveryNProofs == 0:
@ -37,9 +40,8 @@ when codex_enable_proof_failures:
try:
warn "Submitting INVALID proof", period = currentPeriod, slotId = slot.id
await market.submitProof(slot.id, Groth16Proof.default)
except MarketError as e:
if not e.msg.contains("Invalid proof"):
onSubmitProofError(e, currentPeriod, slot.id)
except Proofs_InvalidProof as e:
discard # expected
except CancelledError as error:
raise error
except CatchableError as e:
@ -48,3 +50,8 @@ when codex_enable_proof_failures:
await procCall SaleProving(state).prove(
slot, challenge, onProve, market, currentPeriod
)
except CancelledError as e:
trace "Submitting INVALID proof cancelled", error = e.msgDetail
raise e
except CatchableError as e:
error "Submitting INVALID proof failed", error = e.msgDetail

View File

@ -3,16 +3,16 @@ import pkg/metrics
import ../../logutils
import ../../market
import ../../utils/exceptions
import ../salesagent
import ../statemachine
import ./errorhandling
import ./cancelled
import ./failed
import ./ignored
import ./downloading
import ./errored
type SaleSlotReserving* = ref object of ErrorHandlingState
type SaleSlotReserving* = ref object of SaleState
logScope:
topics = "marketplace sales reserving"
@ -26,7 +26,9 @@ method onCancelled*(state: SaleSlotReserving, request: StorageRequest): ?State =
method onFailed*(state: SaleSlotReserving, request: StorageRequest): ?State =
return some State(SaleFailed())
method run*(state: SaleSlotReserving, machine: Machine): Future[?State] {.async.} =
method run*(
state: SaleSlotReserving, machine: Machine
): Future[?State] {.async: (raises: []).} =
let agent = SalesAgent(machine)
let data = agent.data
let context = agent.context
@ -36,6 +38,7 @@ method run*(state: SaleSlotReserving, machine: Machine): Future[?State] {.async.
requestId = data.requestId
slotIndex = data.slotIndex
try:
let canReserve = await market.canReserveSlot(data.requestId, data.slotIndex)
if canReserve:
try:
@ -47,7 +50,7 @@ method run*(state: SaleSlotReserving, machine: Machine): Future[?State] {.async.
return some State(SaleIgnored(reprocessSlot: false, returnBytes: true))
else:
return some State(SaleErrored(error: e))
# other CatchableErrors are handled "automatically" by the ErrorHandlingState
# other CatchableErrors are handled "automatically" by the SaleState
trace "Slot successfully reserved"
return some State(SaleDownloading())
@ -56,3 +59,8 @@ method run*(state: SaleSlotReserving, machine: Machine): Future[?State] {.async.
# the Availability
debug "Slot cannot be reserved, ignoring"
return some State(SaleIgnored(reprocessSlot: false, returnBytes: true))
except CancelledError as e:
trace "SaleSlotReserving.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during SaleSlotReserving.run", error = e.msgDetail
return some State(SaleErrored(error: e))

View File

@ -1,4 +1,5 @@
import ../../logutils
import ../../utils/exceptions
import ../statemachine
import ../salesagent
import ./filled
@ -26,11 +27,14 @@ method onCancelled*(state: SaleUnknown, request: StorageRequest): ?State =
method onFailed*(state: SaleUnknown, request: StorageRequest): ?State =
return some State(SaleFailed())
method run*(state: SaleUnknown, machine: Machine): Future[?State] {.async.} =
method run*(
state: SaleUnknown, machine: Machine
): Future[?State] {.async: (raises: []).} =
let agent = SalesAgent(machine)
let data = agent.data
let market = agent.context.market
try:
await agent.retrieveRequest()
await agent.subscribe()
@ -57,3 +61,8 @@ method run*(state: SaleUnknown, machine: Machine): Future[?State] {.async.} =
SlotFreedError, "Slot was forcible freed and host was removed from its hosting"
)
return some State(SaleErrored(error: error))
except CancelledError as e:
trace "SaleUnknown.run was cancelled", error = e.msgDetail
except CatchableError as e:
error "Error during SaleUnknown.run", error = e.msgDetail
return some State(SaleErrored(error: e))

View File

@ -2,6 +2,7 @@ import pkg/questionable
import pkg/chronos
import ../logutils
import ./trackedfutures
import ./exceptions
{.push raises: [].}
@ -46,24 +47,14 @@ proc schedule*(machine: Machine, event: Event) =
except AsyncQueueFullError:
raiseAssert "unlimited queue is full?!"
method run*(state: State, machine: Machine): Future[?State] {.base, async.} =
method run*(
state: State, machine: Machine
): Future[?State] {.base, async: (raises: []).} =
discard
method onError*(state: State, error: ref CatchableError): ?State {.base.} =
raise (ref Defect)(msg: "error in state machine: " & error.msg, parent: error)
proc onError(machine: Machine, error: ref CatchableError): Event =
return proc(state: State): ?State =
state.onError(error)
proc run(machine: Machine, state: State) {.async: (raises: []).} =
try:
if next =? await state.run(machine):
machine.schedule(Event.transition(state, next))
except CancelledError:
discard # do not propagate
except CatchableError as e:
machine.schedule(machine.onError(e))
proc scheduler(machine: Machine) {.async: (raises: []).} =
var running: Future[void].Raising([])

View File

@ -36,6 +36,7 @@ asyncchecksuite "Sales - start":
var repo: RepoStore
var queue: SlotQueue
var itemsProcessed: seq[SlotQueueItem]
var expiry: SecondsSince1970
setup:
request = StorageRequest(
@ -76,7 +77,8 @@ asyncchecksuite "Sales - start":
): Future[?!Groth16Proof] {.async.} =
return success(proof)
itemsProcessed = @[]
request.expiry = (clock.now() + 42).u256
expiry = (clock.now() + 42)
request.expiry = expiry.u256
teardown:
await sales.stop()
@ -97,6 +99,7 @@ asyncchecksuite "Sales - start":
request.ask.slots = 2
market.requested = @[request]
market.requestState[request.id] = RequestState.New
market.requestExpiry[request.id] = expiry
let slot0 =
MockSlot(requestId: request.id, slotIndex: 0.u256, proof: proof, host: me)
@ -430,23 +433,6 @@ asyncchecksuite "Sales":
check eventually storingRequest == request
check storingSlot < request.ask.slots.u256
test "handles errors during state run":
var saleFailed = false
sales.onProve = proc(
slot: Slot, challenge: ProofChallenge
): Future[?!Groth16Proof] {.async.} =
# raise exception so machine.onError is called
raise newException(ValueError, "some error")
# onClear is called in SaleErrored.run
sales.onClear = proc(request: StorageRequest, idx: UInt256) =
saleFailed = true
createAvailability()
await market.requestStorage(request)
await allowRequestToStart()
check eventually saleFailed
test "makes storage available again when data retrieval fails":
let error = newException(IOError, "data retrieval failed")
sales.onStore = proc(

View File

@ -4,7 +4,6 @@ import pkg/codex/sales
import pkg/codex/sales/salesagent
import pkg/codex/sales/salescontext
import pkg/codex/sales/statemachine
import pkg/codex/sales/states/errorhandling
import ../../asynctest
import ../helpers/mockmarket
@ -15,18 +14,12 @@ import ../examples
var onCancelCalled = false
var onFailedCalled = false
var onSlotFilledCalled = false
var onErrorCalled = false
type
MockState = ref object of SaleState
MockErrorState = ref object of ErrorHandlingState
type MockState = ref object of SaleState
method `$`*(state: MockState): string =
"MockState"
method `$`*(state: MockErrorState): string =
"MockErrorState"
method onCancelled*(state: MockState, request: StorageRequest): ?State =
onCancelCalled = true
@ -38,12 +31,6 @@ method onSlotFilled*(
): ?State =
onSlotFilledCalled = true
method onError*(state: MockErrorState, err: ref CatchableError): ?State =
onErrorCalled = true
method run*(state: MockErrorState, machine: Machine): Future[?State] {.async.} =
raise newException(ValueError, "failure")
asyncchecksuite "Sales agent":
let request = StorageRequest.example
var agent: SalesAgent
@ -123,7 +110,9 @@ asyncchecksuite "Sales agent":
agent.start(MockState.new())
await agent.subscribe()
agent.onFulfilled(request.id)
check eventually agent.data.cancelled.cancelled()
# Note: futures that are cancelled, and do not re-raise the CancelledError
# will have a state of completed, not cancelled.
check eventually agent.data.cancelled.completed()
test "current state onFailed called when onFailed called":
agent.start(MockState.new())
@ -134,7 +123,3 @@ asyncchecksuite "Sales agent":
agent.start(MockState.new())
agent.onSlotFilled(request.id, slotIndex)
check eventually onSlotFilledCalled
test "ErrorHandlingState.onError can be overridden at the state level":
agent.start(MockErrorState.new())
check eventually onErrorCalled

View File

@ -10,9 +10,8 @@ type
State1 = ref object of State
State2 = ref object of State
State3 = ref object of State
State4 = ref object of State
var runs, cancellations, errors = [0, 0, 0, 0]
var runs, cancellations = [0, 0, 0, 0]
method `$`(state: State1): string =
"State1"
@ -23,28 +22,20 @@ method `$`(state: State2): string =
method `$`(state: State3): string =
"State3"
method `$`(state: State4): string =
"State4"
method run(state: State1, machine: Machine): Future[?State] {.async.} =
method run(state: State1, machine: Machine): Future[?State] {.async: (raises: []).} =
inc runs[0]
return some State(State2.new())
method run(state: State2, machine: Machine): Future[?State] {.async.} =
method run(state: State2, machine: Machine): Future[?State] {.async: (raises: []).} =
inc runs[1]
try:
await sleepAsync(1.hours)
except CancelledError:
inc cancellations[1]
raise
method run(state: State3, machine: Machine): Future[?State] {.async.} =
method run(state: State3, machine: Machine): Future[?State] {.async: (raises: []).} =
inc runs[2]
method run(state: State4, machine: Machine): Future[?State] {.async.} =
inc runs[3]
raise newException(ValueError, "failed")
method onMoveToNextStateEvent*(state: State): ?State {.base, upraises: [].} =
discard
@ -54,19 +45,6 @@ method onMoveToNextStateEvent(state: State2): ?State =
method onMoveToNextStateEvent(state: State3): ?State =
some State(State1.new())
method onError(state: State1, error: ref CatchableError): ?State =
inc errors[0]
method onError(state: State2, error: ref CatchableError): ?State =
inc errors[1]
method onError(state: State3, error: ref CatchableError): ?State =
inc errors[2]
method onError(state: State4, error: ref CatchableError): ?State =
inc errors[3]
some State(State2.new())
asyncchecksuite "async state machines":
var machine: Machine
@ -76,7 +54,6 @@ asyncchecksuite "async state machines":
setup:
runs = [0, 0, 0, 0]
cancellations = [0, 0, 0, 0]
errors = [0, 0, 0, 0]
machine = Machine.new()
test "should call run on start state":
@ -112,16 +89,6 @@ asyncchecksuite "async state machines":
check runs == [0, 1, 0, 0]
check cancellations == [0, 1, 0, 0]
test "forwards errors to error handler":
machine.start(State4.new())
check eventually errors == [0, 0, 0, 1] and runs == [0, 1, 0, 1]
test "error handler ignores CancelledError":
machine.start(State2.new())
machine.schedule(moveToNextStateEvent)
check eventually cancellations == [0, 1, 0, 0]
check errors == [0, 0, 0, 0]
test "queries properties of the current state":
proc description(state: State): string =
$state