diff --git a/codex/sales.nim b/codex/sales.nim index 4758e1a1..3e399357 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -1,4 +1,3 @@ -import std/sequtils import pkg/questionable import pkg/upraises import pkg/stint @@ -9,6 +8,9 @@ import ./market import ./clock import ./proving import ./contracts/requests +import ./sales/salesagent +import ./sales/statemachine +import ./sales/states/[downloading, unknown] ## Sales holds a list of available storage that it may sell. ## @@ -29,45 +31,8 @@ import ./contracts/requests ## | | ---- storage proof ---> | export stint - -type - Sales* = ref object - market: Market - clock: Clock - subscription: ?market.Subscription - available*: seq[Availability] - onStore: ?OnStore - onProve: ?OnProve - onClear: ?OnClear - onSale: ?OnSale - proving: Proving - Availability* = object - id*: array[32, byte] - size*: UInt256 - duration*: UInt256 - minPrice*: UInt256 - SalesAgent = ref object - sales: Sales - requestId: RequestId - ask: StorageAsk - availability: Availability - request: ?StorageRequest - slotIndex: ?UInt256 - subscription: ?market.Subscription - running: ?Future[void] - waiting: ?Future[void] - finished: bool - OnStore = proc(request: StorageRequest, - slot: UInt256, - availability: Availability): Future[void] {.gcsafe, upraises: [].} - OnProve = proc(request: StorageRequest, - slot: UInt256): Future[seq[byte]] {.gcsafe, upraises: [].} - OnClear = proc(availability: Availability, - request: StorageRequest, - slotIndex: UInt256) {.gcsafe, upraises: [].} - OnSale = proc(availability: Availability, - request: StorageRequest, - slotIndex: UInt256) {.gcsafe, upraises: [].} +export salesagent +export statemachine func new*(_: type Sales, market: Market, @@ -87,146 +52,47 @@ proc init*(_: type Availability, doAssert randomBytes(id) == 32 Availability(id: id, size: size, duration: duration, minPrice: minPrice) -proc `onStore=`*(sales: Sales, onStore: OnStore) = - sales.onStore = some onStore - -proc `onProve=`*(sales: Sales, onProve: OnProve) = - sales.onProve = some onProve - -proc `onClear=`*(sales: Sales, onClear: OnClear) = - sales.onClear = some onClear - -proc `onSale=`*(sales: Sales, callback: OnSale) = - sales.onSale = some callback - -func add*(sales: Sales, availability: Availability) = - sales.available.add(availability) - -func remove*(sales: Sales, availability: Availability) = - sales.available.keepItIf(it != availability) - -func findAvailability(sales: Sales, ask: StorageAsk): ?Availability = - for availability in sales.available: - if ask.slotSize <= availability.size and - ask.duration <= availability.duration and - ask.pricePerSlot >= availability.minPrice: - return some availability - -proc finish(agent: SalesAgent, success: bool) = - if agent.finished: - return - - agent.finished = true - - if subscription =? agent.subscription: - asyncSpawn subscription.unsubscribe() - - if running =? agent.running: - running.cancel() - - if waiting =? agent.waiting: - waiting.cancel() - - if success: - if request =? agent.request and - slotIndex =? agent.slotIndex: - agent.sales.proving.add(request.slotId(slotIndex)) - - if onSale =? agent.sales.onSale: - onSale(agent.availability, request, slotIndex) - else: - if onClear =? agent.sales.onClear and - request =? agent.request and - slotIndex =? agent.slotIndex: - onClear(agent.availability, request, slotIndex) - agent.sales.add(agent.availability) - -proc selectSlot(agent: SalesAgent) = - let rng = Rng.instance - let slotIndex = rng.rand(agent.ask.slots - 1) - agent.slotIndex = some slotIndex.u256 - -proc onSlotFilled(agent: SalesAgent, - requestId: RequestId, - slotIndex: UInt256) {.async.} = - try: - let market = agent.sales.market - let host = await market.getHost(requestId, slotIndex) - let me = await market.getSigner() - agent.finish(success = (host == me.some)) - except CatchableError: - agent.finish(success = false) - -proc subscribeSlotFilled(agent: SalesAgent, slotIndex: UInt256) {.async.} = - proc onSlotFilled(requestId: RequestId, - slotIndex: UInt256) {.gcsafe, upraises:[].} = - asyncSpawn agent.onSlotFilled(requestId, slotIndex) - let market = agent.sales.market - let subscription = await market.subscribeSlotFilled(agent.requestId, - slotIndex, - onSlotFilled) - agent.subscription = some subscription - -proc waitForExpiry(agent: SalesAgent) {.async.} = - without request =? agent.request: - return - await agent.sales.clock.waitUntil(request.expiry.truncate(int64)) - agent.finish(success = false) - -proc start(agent: SalesAgent) {.async.} = - try: - let sales = agent.sales - let market = sales.market - let availability = agent.availability - - without onStore =? sales.onStore: - raiseAssert "onStore callback not set" - - without onProve =? sales.onProve: - raiseAssert "onProve callback not set" - - sales.remove(availability) - - agent.selectSlot() - without slotIndex =? agent.slotIndex: - raiseAssert "no slot selected" - - await agent.subscribeSlotFilled(slotIndex) - - agent.request = await market.getRequest(agent.requestId) - without request =? agent.request: - agent.finish(success = false) - return - - agent.waiting = some agent.waitForExpiry() - - await onStore(request, slotIndex, availability) - let proof = await onProve(request, slotIndex) - await market.fillSlot(request.id, slotIndex, proof) - except CancelledError: - raise - except CatchableError as e: - error "SalesAgent failed", msg = e.msg - agent.finish(success = false) - -proc handleRequest(sales: Sales, requestId: RequestId, ask: StorageAsk) = - without availability =? sales.findAvailability(ask): - return - - let agent = SalesAgent( - sales: sales, - requestId: requestId, - ask: ask, - availability: availability +proc handleRequest(sales: Sales, + requestId: RequestId, + ask: StorageAsk) {.async.} = + let availability = sales.findAvailability(ask) + let agent = newSalesAgent( + sales, + requestId, + availability, + none StorageRequest ) - agent.running = some agent.start() + await agent.init(ask.slots) + await agent.switchAsync(SaleDownloading()) + sales.agents.add agent + +proc load*(sales: Sales) {.async.} = + let market = sales.market + + # TODO: restore availability from disk + + let slotIds = await market.mySlots() + for slotId in slotIds: + # TODO: this needs to be optimised + if slot =? await market.getSlot(slotId): + if request =? await market.getRequest(slot.requestId): + let availability = sales.findAvailability(request.ask) + let agent = newSalesAgent( + sales, + slot.requestId, + availability, + some request) + + await agent.init(request.ask.slots) + await agent.switchAsync(SaleUnknown()) + sales.agents.add agent proc start*(sales: Sales) {.async.} = doAssert sales.subscription.isNone, "Sales already started" - proc onRequest(requestId: RequestId, ask: StorageAsk) {.gcsafe, upraises:[].} = - sales.handleRequest(requestId, ask) + proc onRequest(requestId: RequestId, ask: StorageAsk) {.gcsafe, upraises:[], async.} = + await sales.handleRequest(requestId, ask) try: sales.subscription = some await sales.market.subscribeRequests(onRequest) @@ -240,3 +106,7 @@ proc stop*(sales: Sales) {.async.} = await subscription.unsubscribe() except CatchableError as e: warn "Unsubscribe failed", msg = e.msg + + for agent in sales.agents: + await agent.deinit() + diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim new file mode 100644 index 00000000..face5493 --- /dev/null +++ b/codex/sales/salesagent.nim @@ -0,0 +1,85 @@ +import pkg/chronos +import pkg/upraises +import pkg/stint +import ./statemachine +import ./states/[downloading, unknown] +import ../contracts/requests +import ../rng + +proc newSalesAgent*(sales: Sales, + requestId: RequestId, + availability: ?Availability, + request: ?StorageRequest): SalesAgent = + SalesAgent( + sales: sales, + requestId: requestId, + availability: availability, + request: request) + +# fwd declarations +proc subscribeCancellation*(agent: SalesAgent): Future[void] {.gcsafe.} +proc subscribeFailure*(agent: SalesAgent): Future[void] {.gcsafe.} + +proc populateRequest(agent: SalesAgent) {.async.} = + if agent.request.isNone: + agent.request = await agent.sales.market.getRequest(agent.requestId) + +proc init*(agent: SalesAgent, numSlots: uint64) {.async.} = + let rng = Rng.instance + let slotIndex = rng.rand(numSlots - 1) + agent.slotIndex = some slotIndex.u256 + + # TODO: try not to block the thread waiting for the network + await agent.populateRequest() + await agent.subscribeCancellation() + await agent.subscribeFailure() + +proc deinit*(agent: SalesAgent) {.async.} = + try: + await agent.fulfilled.unsubscribe() + except CatchableError: + discard + try: + await agent.failed.unsubscribe() + except CatchableError: + discard + if not agent.cancelled.completed: + await agent.cancelled.cancelAndWait() + +proc subscribeCancellation*(agent: SalesAgent) {.async.} = + let market = agent.sales.market + + proc onCancelled() {.async.} = + let clock = agent.sales.clock + + without request =? agent.request: + return + + await clock.waitUntil(request.expiry.truncate(int64)) + await agent.fulfilled.unsubscribe() + without state =? (agent.state as SaleState): + return + await state.onCancelled(request) + + agent.cancelled = onCancelled() + + proc onFulfilled(_: RequestId) {.async.} = + agent.cancelled.cancel() + + agent.fulfilled = + await market.subscribeFulfillment(agent.requestId, onFulfilled) + +proc subscribeFailure*(agent: SalesAgent) {.async.} = + let market = agent.sales.market + + proc onFailed(_: RequestId) {.async.} = + without request =? agent.request: + return + + without state =? (agent.state as SaleState): + return + + await state.onFailed(request) + + agent.failed = + await market.subscribeRequestFailed(agent.requestId, onFailed) diff --git a/codex/sales/statemachine.nim b/codex/sales/statemachine.nim new file mode 100644 index 00000000..af180e51 --- /dev/null +++ b/codex/sales/statemachine.nim @@ -0,0 +1,104 @@ +import std/sequtils +import pkg/chronos +import pkg/questionable +import pkg/upraises +import ../errors +import ../utils/statemachine +import ../market +import ../clock +import ../proving +import ../contracts/requests + +export market +export clock +export statemachine +export proving + +type + Sales* = ref object + market*: Market + clock*: Clock + subscription*: ?market.Subscription + available: seq[Availability] + onStore: ?OnStore + onProve: ?OnProve + onClear: ?OnClear + onSale: ?OnSale + proving*: Proving + agents*: seq[SalesAgent] + SalesAgent* = ref object of StateMachineAsync + sales*: Sales + requestId*: RequestId + ask*: StorageAsk + availability*: ?Availability # TODO: when availability persistence is added, change this to not optional + request*: ?StorageRequest + slotIndex*: ?UInt256 + failed*: market.Subscription + fulfilled*: market.Subscription + cancelled*: Future[void] + SaleState* = ref object of AsyncState + SaleError* = ref object of CodexError + Availability* = object + id*: array[32, byte] + size*: UInt256 + duration*: UInt256 + minPrice*: UInt256 + AvailabilityChange* = proc(availability: Availability) {.gcsafe, upraises: [].} + # TODO: when availability changes introduced, make availability non-optional (if we need to keep it at all) + RequestEvent* = proc(state: SaleState, request: StorageRequest): Future[void] {.gcsafe, upraises: [].} + OnStore* = proc(request: StorageRequest, + slot: UInt256, + availability: ?Availability): Future[void] {.gcsafe, upraises: [].} + OnProve* = proc(request: StorageRequest, + slot: UInt256): Future[seq[byte]] {.gcsafe, upraises: [].} + OnClear* = proc(availability: ?Availability,# TODO: when availability changes introduced, make availability non-optional (if we need to keep it at all) + request: StorageRequest, + slotIndex: UInt256) {.gcsafe, upraises: [].} + OnSale* = proc(availability: ?Availability, # TODO: when availability changes introduced, make availability non-optional (if we need to keep it at all) + request: StorageRequest, + slotIndex: UInt256) {.gcsafe, upraises: [].} + +proc `onStore=`*(sales: Sales, onStore: OnStore) = + sales.onStore = some onStore + +proc `onProve=`*(sales: Sales, onProve: OnProve) = + sales.onProve = some onProve + +proc `onClear=`*(sales: Sales, onClear: OnClear) = + sales.onClear = some onClear + +proc `onSale=`*(sales: Sales, callback: OnSale) = + sales.onSale = some callback + +proc onStore*(sales: Sales): ?OnStore = sales.onStore + +proc onProve*(sales: Sales): ?OnProve = sales.onProve + +proc onClear*(sales: Sales): ?OnClear = sales.onClear + +proc onSale*(sales: Sales): ?OnSale = sales.onSale + +proc available*(sales: Sales): seq[Availability] = sales.available + +func add*(sales: Sales, availability: Availability) = + if not sales.available.contains(availability): + sales.available.add(availability) + # TODO: add to disk (persist), serialise to json. + +func remove*(sales: Sales, availability: Availability) = + sales.available.keepItIf(it != availability) + # TODO: remove from disk availability, mark as in use by assigning + # a slotId, so that it can be used for restoration (node restart) + +func findAvailability*(sales: Sales, ask: StorageAsk): ?Availability = + for availability in sales.available: + if ask.slotSize <= availability.size and + ask.duration <= availability.duration and + ask.pricePerSlot >= availability.minPrice: + return some availability + +method onCancelled*(state: SaleState, request: StorageRequest) {.base, async.} = + discard + +method onFailed*(state: SaleState, request: StorageRequest) {.base, async.} = + discard diff --git a/codex/sales/states/cancelled.nim b/codex/sales/states/cancelled.nim new file mode 100644 index 00000000..06e69652 --- /dev/null +++ b/codex/sales/states/cancelled.nim @@ -0,0 +1,13 @@ +import ../statemachine +import ./errored + +type SaleCancelled* = ref object of SaleState + +method `$`*(state: SaleCancelled): string = "SaleCancelled" + +method enterAsync*(state: SaleCancelled) {.async.} = + without agent =? (state.context as SalesAgent): + raiseAssert "invalid state" + + let error = newException(Timeout, "Sale cancelled due to timeout") + await state.switchAsync(SaleErrored(error: error)) diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim new file mode 100644 index 00000000..e1da6be7 --- /dev/null +++ b/codex/sales/states/downloading.nim @@ -0,0 +1,48 @@ +import std/sequtils +import ./cancelled +import ./failed +import ./proving +import ./errored +import ../salesagent +import ../statemachine +import ../../market + +type + SaleDownloading* = ref object of SaleState + failedSubscription: ?market.Subscription + hasCancelled: ?Future[void] + SaleDownloadingError* = object of SaleError + +method `$`*(state: SaleDownloading): string = "SaleDownloading" + +method onCancelled*(state: SaleDownloading, request: StorageRequest) {.async.} = + await state.switchAsync(SaleCancelled()) + +method onFailed*(state: SaleDownloading, request: StorageRequest) {.async.} = + await state.switchAsync(SaleFailed()) + +method enterAsync(state: SaleDownloading) {.async.} = + without agent =? (state.context as SalesAgent): + raiseAssert "invalid state" + + try: + without onStore =? agent.sales.onStore: + raiseAssert "onStore callback not set" + + without slotIndex =? agent.slotIndex: + raiseAssert "no slot selected" + + without request =? agent.request: + raiseAssert "no sale request" + + await onStore(request, slotIndex, agent.availability) + await state.switchAsync(SaleProving()) + + except CancelledError: + discard + + except CatchableError as e: + let error = newException(SaleDownloadingError, + "unknown sale downloading error", + e) + await state.switchAsync(SaleErrored(error: error)) diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim new file mode 100644 index 00000000..a9da7af0 --- /dev/null +++ b/codex/sales/states/errored.nim @@ -0,0 +1,25 @@ +import chronicles +import ../statemachine + +type SaleErrored* = ref object of SaleState + error*: ref CatchableError + +method `$`*(state: SaleErrored): string = "SaleErrored" + +method enterAsync*(state: SaleErrored) {.async.} = + without agent =? (state.context as SalesAgent): + raiseAssert "invalid state" + + if onClear =? agent.sales.onClear and + request =? agent.request and + slotIndex =? agent.slotIndex: + onClear(agent.availability, request, slotIndex) + + # TODO: when availability persistence is added, change this to not optional + # NOTE: with this in place, restoring state for a restarted node will + # never free up availability once finished. Persisting availability + # on disk is required for this. + if availability =? agent.availability: + agent.sales.add(availability) + + error "Sale error", error=state.error.msg diff --git a/codex/sales/states/failed.nim b/codex/sales/states/failed.nim new file mode 100644 index 00000000..497fb528 --- /dev/null +++ b/codex/sales/states/failed.nim @@ -0,0 +1,12 @@ +import ./errored +import ../statemachine + +type + SaleFailed* = ref object of SaleState + SaleFailedError* = object of SaleError + +method `$`*(state: SaleFailed): string = "SaleFailed" + +method enterAsync*(state: SaleFailed) {.async.} = + let error = newException(SaleFailedError, "Sale failed") + await state.switchAsync(SaleErrored(error: error)) diff --git a/codex/sales/states/filled.nim b/codex/sales/states/filled.nim new file mode 100644 index 00000000..f1e95339 --- /dev/null +++ b/codex/sales/states/filled.nim @@ -0,0 +1,46 @@ +import pkg/questionable +import ./errored +import ./finished +import ./cancelled +import ./failed +import ../statemachine + +type + SaleFilled* = ref object of SaleState + SaleFilledError* = object of CatchableError + +method onCancelled*(state: SaleFilled, request: StorageRequest) {.async.} = + await state.switchAsync(SaleCancelled()) + +method onFailed*(state: SaleFilled, request: StorageRequest) {.async.} = + await state.switchAsync(SaleFailed()) + +method `$`*(state: SaleFilled): string = "SaleFilled" + +method enterAsync(state: SaleFilled) {.async.} = + without agent =? (state.context as SalesAgent): + raiseAssert "invalid state" + + try: + let market = agent.sales.market + + without slotIndex =? agent.slotIndex: + raiseAssert "no slot selected" + + if availability =? agent.availability: + agent.sales.remove(availability) + + let host = await market.getHost(agent.requestId, slotIndex) + let me = await market.getSigner() + if host == me.some: + await state.switchAsync(SaleFinished()) + else: + let error = newException(SaleFilledError, "Sale host mismatch") + await state.switchAsync(SaleErrored(error: error)) + + except CancelledError: + discard + + except CatchableError as e: + let error = newException(SaleFilledError, "sale filled error", e) + await state.switchAsync(SaleErrored(error: error)) diff --git a/codex/sales/states/filling.nim b/codex/sales/states/filling.nim new file mode 100644 index 00000000..688bdbd3 --- /dev/null +++ b/codex/sales/states/filling.nim @@ -0,0 +1,50 @@ +import pkg/upraises +import ../../market +import ../statemachine +import ./filled +import ./errored +import ./cancelled +import ./failed + +type + SaleFilling* = ref object of SaleState + proof*: seq[byte] + SaleFillingError* = object of CatchableError + +method `$`*(state: SaleFilling): string = "SaleFilling" + +method onCancelled*(state: SaleFilling, request: StorageRequest) {.async.} = + await state.switchAsync(SaleCancelled()) + +method onFailed*(state: SaleFilling, request: StorageRequest) {.async.} = + await state.switchAsync(SaleFailed()) + +method enterAsync(state: SaleFilling) {.async.} = + without agent =? (state.context as SalesAgent): + raiseAssert "invalid state" + + var subscription: market.Subscription + + proc onSlotFilled(requestId: RequestId, + slotIndex: UInt256) {.async.} = + await subscription.unsubscribe() + await state.switchAsync(SaleFilled()) + + try: + let market = agent.sales.market + + without slotIndex =? agent.slotIndex: + raiseAssert "no slot selected" + + subscription = await market.subscribeSlotFilled(agent.requestId, + slotIndex, + onSlotFilled) + + await market.fillSlot(agent.requestId, slotIndex, state.proof) + + except CancelledError: + discard + + except CatchableError as e: + let error = newException(SaleFillingError, "unknown sale filling error", e) + await state.switchAsync(SaleErrored(error: error)) diff --git a/codex/sales/states/finished.nim b/codex/sales/states/finished.nim new file mode 100644 index 00000000..1199d7dc --- /dev/null +++ b/codex/sales/states/finished.nim @@ -0,0 +1,42 @@ +import pkg/chronos +import ./cancelled +import ./errored +import ./failed +import ../statemachine + +type + SaleFinished* = ref object of SaleState + SaleFinishedError* = object of CatchableError + +method `$`*(state: SaleFinished): string = "SaleFinished" + +method onCancelled*(state: SaleFinished, request: StorageRequest) {.async.} = + await state.switchAsync(SaleCancelled()) + +method onFailed*(state: SaleFinished, request: StorageRequest) {.async.} = + await state.switchAsync(SaleFailed()) + +method enterAsync*(state: SaleFinished) {.async.} = + without agent =? (state.context as SalesAgent): + raiseAssert "invalid state" + + try: + if request =? agent.request and + slotIndex =? agent.slotIndex: + agent.sales.proving.add(request.slotId(slotIndex)) + + if onSale =? agent.sales.onSale: + onSale(agent.availability, request, slotIndex) + + # TODO: Keep track of contract completion using local clock. When contract + # has finished, we need to add back availability to the sales module. + # This will change when the state machine is updated to include the entire + # sales process, as well as when availability is persisted, so leaving it + # as a TODO for now. + + except CancelledError: + discard + + except CatchableError as e: + let error = newException(SaleFinishedError, "sale finished error", e) + await state.switchAsync(SaleErrored(error: error)) diff --git a/codex/sales/states/proving.nim b/codex/sales/states/proving.nim new file mode 100644 index 00000000..27bb14ab --- /dev/null +++ b/codex/sales/states/proving.nim @@ -0,0 +1,41 @@ +import ../statemachine +import ./filling +import ./cancelled +import ./failed +import ./errored + +type + SaleProving* = ref object of SaleState + SaleProvingError* = object of CatchableError + +method `$`*(state: SaleProving): string = "SaleProving" + +method onCancelled*(state: SaleProving, request: StorageRequest) {.async.} = + await state.switchAsync(SaleCancelled()) + +method onFailed*(state: SaleProving, request: StorageRequest) {.async.} = + await state.switchAsync(SaleFailed()) + +method enterAsync(state: SaleProving) {.async.} = + without agent =? (state.context as SalesAgent): + raiseAssert "invalid state" + + try: + without request =? agent.request: + raiseAssert "no sale request" + + without slotIndex =? agent.slotIndex: + raiseAssert "no slot selected" + + without onProve =? agent.sales.onProve: + raiseAssert "onProve callback not set" + + let proof = await onProve(request, slotIndex) + await state.switchAsync(SaleFilling(proof: proof)) + + except CancelledError: + discard + + except CatchableError as e: + let error = newException(SaleProvingError, "unknown sale proving error", e) + await state.switchAsync(SaleErrored(error: error)) diff --git a/codex/sales/states/unknown.nim b/codex/sales/states/unknown.nim new file mode 100644 index 00000000..89c71529 --- /dev/null +++ b/codex/sales/states/unknown.nim @@ -0,0 +1,47 @@ +import ../statemachine +import ./filled +import ./finished +import ./failed +import ./errored +import ./cancelled + +type + SaleUnknown* = ref object of SaleState + SaleUnknownError* = object of CatchableError + +method `$`*(state: SaleUnknown): string = "SaleUnknown" + +method onCancelled*(state: SaleUnknown, request: StorageRequest) {.async.} = + await state.switchAsync(SaleCancelled()) + +method onFailed*(state: SaleUnknown, request: StorageRequest) {.async.} = + await state.switchAsync(SaleFailed()) + +method enterAsync(state: SaleUnknown) {.async.} = + without agent =? (state.context as SalesAgent): + raiseAssert "invalid state" + + let market = agent.sales.market + + try: + without requestState =? await market.getState(agent.requestId): + raiseAssert "state unknown" + + case requestState + of RequestState.New, RequestState.Started: + await state.switchAsync(SaleFilled()) + of RequestState.Finished: + await state.switchAsync(SaleFinished()) + of RequestState.Cancelled: + await state.switchAsync(SaleCancelled()) + of RequestState.Failed: + await state.switchAsync(SaleFailed()) + + except CancelledError: + discard + + except CatchableError as e: + let error = newException(SaleUnknownError, + "error in unknown state", + e) + await state.switchAsync(SaleErrored(error: error))