From 8286fe5beebbd01a6fedc3fdbbd53a155f3268b8 Mon Sep 17 00:00:00 2001 From: Eric Mastro Date: Sun, 26 Feb 2023 23:05:39 +1100 Subject: [PATCH] WIP: many tests working, but some not testable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit I know this is a big commit… There are some tests that aren’t testable yet, such as testing that an active state transitions to cancelled/failed states. This is due to the inability to wait during some states like SaleFilling. The tests would likely need to be redesigned, but I don’t think it’s worth pursuing the fully declarative model further as it is too hard to debug. The declarative state machine DOES work, however. The main point for keeping this commit is for the learnings about the state machine to bring fwd into the branch used in 306. --- codex/sales.nim | 37 +- codex/sales/salesagent.nim | 163 +--- codex/sales/statemachine.nim | 11 +- codex/sales/states/cancelled.nim | 4 +- codex/sales/states/downloading.nim | 8 +- codex/sales/states/errored.nim | 6 +- codex/sales/states/failed.nim | 2 +- codex/sales/states/filled.nim | 24 +- codex/sales/states/filling.nim | 18 +- codex/sales/states/finished.nim | 29 +- codex/sales/states/proving.nim | 23 +- codex/sales/states/unknown.nim | 27 +- codex/sales/subscriptions.nim | 92 ++ codex/utils/asyncstatemachine.nim | 36 +- tests/codex/helpers/mockmarket.nim | 1 + tests/codex/testsales.nim | 896 ++++++++++---------- tests/codex/utils/testasyncstatemachine.nim | 2 + 17 files changed, 716 insertions(+), 663 deletions(-) create mode 100644 codex/sales/subscriptions.nim diff --git a/codex/sales.nim b/codex/sales.nim index 8a715569..1a4eb4a6 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -1,4 +1,5 @@ import pkg/questionable +import pkg/questionable/results import pkg/upraises import pkg/stint import pkg/nimcrypto @@ -11,7 +12,7 @@ import ./errors import ./contracts/requests import ./sales/salesagent import ./sales/statemachine -import ./sales/states/[start, downloading, unknown] +import ./sales/states/unknown ## Sales holds a list of available storage that it may sell. ## @@ -72,26 +73,21 @@ proc findSlotIndex(numSlots: uint64, proc handleRequest(sales: Sales, requestId: RequestId, - ask: StorageAsk) = + ask: StorageAsk) {.raises:[CatchableError].} = let availability = sales.findAvailability(ask) # TODO: check if random slot is actually available (not already filled) let slotIndex = randomSlotIndex(ask.slots) - without request =? await sales.market.getRequest(requestId): - raise newException(SalesError, "Failed to get request on chain") - - let me = await sales.market.getSigner() - let agent = newSalesAgent( sales, slotIndex, availability, - request, - me, + requestId, + none StorageRequest, RequestState.New, SlotState.Free ) - await agent.start(SaleUnknown.new()) + waitFor agent.start() sales.agents.add agent proc load*(sales: Sales) {.async.} = @@ -100,7 +96,6 @@ proc load*(sales: Sales) {.async.} = # TODO: restore availability from disk let requestIds = await market.myRequests() let slotIds = await market.mySlots() - let me = await market.getSigner() for slotId in slotIds: # TODO: this needs to be optimised @@ -109,28 +104,34 @@ proc load*(sales: Sales) {.async.} = without slotIndex =? findSlotIndex(request.ask.slots, request.id, slotId): - raiseAssert "could not find slot index" + raise newException(SalesError, "could not find slot index") # TODO: should be optimised (maybe get everything in one call: request, request state, slot state) - let requestState = await market.requestState(request.id) + without requestState =? await market.requestState(request.id): + raise newException(SalesError, "request state could not be determined") + let slotState = await market.slotState(slotId) let agent = newSalesAgent( sales, slotIndex, availability, - request, - me, + request.id, + some request, requestState, - slotState) - await agent.start(SaleUnknown.new()) + slotState, + restoredFromChain = true) + await agent.start() sales.agents.add agent proc start*(sales: Sales) {.async.} = doAssert sales.subscription.isNone, "Sales already started" proc onRequest(requestId: RequestId, ask: StorageAsk) {.gcsafe, upraises:[].} = - sales.handleRequest(requestId, ask) + try: + sales.handleRequest(requestId, ask) + except CatchableError as e: + error "Failed to handle storage request", error = e.msg try: sales.subscription = some await sales.market.subscribeRequests(onRequest) diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 0ec09193..4066d316 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -1,19 +1,29 @@ import pkg/chronos -import pkg/upraises import pkg/stint import ./statemachine import ./states/[cancelled, downloading, errored, failed, finished, filled, filling, proving, unknown] +import ./subscriptions import ../contracts/requests +type SaleState* {.pure.} = enum + SaleUnknown, + SaleDownloading, + SaleProving, + SaleFilling, + SaleFilled, + SaleCancelled, + SaleFailed, + SaleErrored + proc newSalesAgent*(sales: Sales, slotIndex: UInt256, availability: ?Availability, - request: StorageRequest, - me: Address, + requestId: RequestId, + request: ?StorageRequest, requestState: RequestState, slotState: SlotState, - restoredFromChain: bool): SalesAgent = + restoredFromChain: bool = false): SalesAgent = let saleUnknown = SaleUnknown.new() let saleDownloading = SaleDownloading.new() @@ -25,11 +35,18 @@ proc newSalesAgent*(sales: Sales, let saleErrored = SaleErrored.new() let agent = SalesAgent.new(@[ + Transition.new( + AnyState.new(), + saleErrored, + proc(m: Machine, s: State): bool = + SalesAgent(m).errored.value + ), Transition.new( saleUnknown, saleDownloading, proc(m: Machine, s: State): bool = let agent = SalesAgent(m) + not agent.restoredFromChain and agent.requestState.value == RequestState.New and agent.slotState.value == SlotState.Free ), @@ -84,12 +101,7 @@ proc newSalesAgent*(sales: Sales, agent.slotState.value in @[SlotState.Finished, SlotState.Paid] or agent.requestState.value == RequestState.Finished ), - Transition.new( - AnyState.new(), - saleErrored, - proc(m: Machine, s: State): bool = - SalesAgent(m).errored.value - ), + Transition.new( saleDownloading, saleProving, @@ -106,133 +118,34 @@ proc newSalesAgent*(sales: Sales, saleFilled, SaleFinished.new(), proc(m: Machine, s: State): bool = - let agent = SalesAgent(m) - without host =? agent.slotHost.value: - return false - host == agent.me - ), - Transition.new( - saleFilled, - saleErrored, - proc(m: Machine, s: State): bool = - let agent = SalesAgent(m) - without host =? agent.slotHost.value: - return false - if host != agent.me: - agent.lastError = newException(HostMismatchError, - "Slot filled by other host") - return true - else: return false - ), - Transition.new( - saleUnknown, - saleErrored, - proc(m: Machine, s: State): bool = - let agent = SalesAgent(m) - if agent.restoredFromChain and agent.slotState.value == SlotState.Free: - agent.lastError = newException(SaleUnknownError, - "cannot retrieve slot state") - return true - else: return false + SalesAgent(m).slotHostIsMe.value ), ]) + agent.addState (SaleState.SaleUnknown.int, saleUnknown), + (SaleState.SaleDownloading.int, saleDownloading), + (SaleState.SaleProving.int, saleProving), + (SaleState.SaleFilling.int, saleFilling), + (SaleState.SaleFilled.int, saleFilled), + (SaleState.SaleCancelled.int, saleCancelled), + (SaleState.SaleFailed.int, saleFailed), + (SaleState.SaleErrored.int, saleErrored) agent.slotState = agent.newTransitionProperty(slotState) agent.requestState = agent.newTransitionProperty(requestState) agent.proof = agent.newTransitionProperty(newSeq[byte]()) - agent.slotHost = agent.newTransitionProperty(none Address) + agent.slotHostIsMe = agent.newTransitionProperty(false) agent.downloaded = agent.newTransitionProperty(false) agent.sales = sales agent.availability = availability agent.slotIndex = slotIndex + agent.requestId = requestId agent.request = request - agent.me = me + agent.restoredFromChain = restoredFromChain return agent -proc subscribeCancellation*(agent: SalesAgent): Future[void] {.gcsafe.} -proc subscribeFailure*(agent: SalesAgent): Future[void] {.gcsafe.} -proc subscribeSlotFill*(agent: SalesAgent): Future[void] {.gcsafe.} - -proc start*(agent: SalesAgent, initialState: State) {.async.} = - await agent.subscribeCancellation() - await agent.subscribeFailure() - await agent.subscribeSlotFill() +proc start*(agent: SalesAgent, + initialState: State = agent.getState(SaleState.SaleUnknown)) {.async.} = + await agent.subscribe() procCall Machine(agent).start(initialState) proc stop*(agent: SalesAgent) {.async.} = - try: - await agent.subscribeFulfilled.unsubscribe() - except CatchableError: - discard - try: - await agent.subscribeFailed.unsubscribe() - except CatchableError: - discard - try: - await agent.subscribeSlotFilled.unsubscribe() - except CatchableError: - discard - if not agent.waitForCancelled.completed: - await agent.waitForCancelled.cancelAndWait() - - procCall Machine(agent).stop() - -proc subscribeCancellation*(agent: SalesAgent) {.async.} = - let market = agent.sales.market - - proc onCancelled() {.async.} = - let clock = agent.sales.clock - - await clock.waitUntil(agent.request.expiry.truncate(int64)) - await agent.subscribeFulfilled.unsubscribe() - agent.requestState.setValue(RequestState.Cancelled) - - agent.waitForCancelled = onCancelled() - - proc onFulfilled(_: RequestId) = - agent.waitForCancelled.cancel() - - agent.subscribeFulfilled = - await market.subscribeFulfillment(agent.request.id, onFulfilled) - -# TODO: move elsewhere -proc asyncSpawn(future: Future[void], ignore: type CatchableError) = - proc ignoringError {.async.} = - try: - await future - except ignore: - discard - asyncSpawn ignoringError() - -proc subscribeFailure*(agent: SalesAgent) {.async.} = - let market = agent.sales.market - - proc onFailed(_: RequestId) {.upraises:[], gcsafe.} = - asyncSpawn agent.subscribeFailed.unsubscribe(), ignore = CatchableError - try: - agent.requestState.setValue(RequestState.Failed) - except AsyncQueueFullError as e: - raiseAssert "State machine critical failure: " & e.msg - - agent.subscribeFailed = - await market.subscribeRequestFailed(agent.request.id, onFailed) - -proc subscribeSlotFill*(agent: SalesAgent) {.async.} = - let market = agent.sales.market - - proc onSlotFilled( - requestId: RequestId, - slotIndex: UInt256) {.upraises:[], gcsafe.} = - - let market = agent.sales.market - - asyncSpawn agent.subscribeSlotFilled.unsubscribe(), ignore = CatchableError - try: - agent.slotState.setValue(SlotState.Filled) - except AsyncQueueFullError as e: - raiseAssert "State machine critical failure: " & e.msg - - agent.subscribeSlotFilled = - await market.subscribeSlotFilled(agent.request.id, - agent.slotIndex, - onSlotFilled) - + await agent.unsubscribe() diff --git a/codex/sales/statemachine.nim b/codex/sales/statemachine.nim index 64e383e2..fedc69ef 100644 --- a/codex/sales/statemachine.nim +++ b/codex/sales/statemachine.nim @@ -4,6 +4,7 @@ import pkg/questionable import pkg/upraises import ../errors import ../utils/asyncstatemachine +import ../utils/optionalcast import ../market import ../clock import ../proving @@ -13,6 +14,7 @@ export market export clock export asyncstatemachine export proving +export optionalcast type Sales* = ref object @@ -30,21 +32,20 @@ type sales*: Sales ask*: StorageAsk availability*: ?Availability # TODO: when availability persistence is added, change this to not optional - request*: StorageRequest + requestId*: RequestId + request*: ?StorageRequest slotIndex*: UInt256 subscribeFailed*: market.Subscription subscribeFulfilled*: market.Subscription subscribeSlotFilled*: market.Subscription waitForCancelled*: Future[void] - me*: Address restoredFromChain*: bool slotState*: TransitionProperty[SlotState] requestState*: TransitionProperty[RequestState] proof*: TransitionProperty[seq[byte]] - slotHost*: TransitionProperty[?Address] + slotHostIsMe*: TransitionProperty[bool] downloaded*: TransitionProperty[bool] - SaleState* = ref object of State - SaleError* = ref object of CodexError + SaleError* = object of CodexError Availability* = object id*: array[32, byte] size*: UInt256 diff --git a/codex/sales/states/cancelled.nim b/codex/sales/states/cancelled.nim index 60d17de8..4fdb4224 100644 --- a/codex/sales/states/cancelled.nim +++ b/codex/sales/states/cancelled.nim @@ -1,13 +1,13 @@ import ../statemachine -import ./errored type SaleCancelled* = ref object of State - SaleCancelledError* = object of CatchableError + SaleCancelledError* = object of SaleError SaleTimeoutError* = object of SaleCancelledError method `$`*(state: SaleCancelled): string = "SaleCancelled" method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let error = newException(SaleTimeoutError, "Sale cancelled due to timeout") machine.setError(error) diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index 710be34d..52442a71 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -17,16 +17,22 @@ type method `$`*(state: SaleDownloading): string = "SaleDownloading" method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let agent = SalesAgent(machine) try: without onStore =? agent.sales.onStore: raiseAssert "onStore callback not set" + without request =? agent.request: + let error = newException(SaleDownloadingError, "missing request") + agent.setError error + return + if availability =? agent.availability: agent.sales.remove(availability) - await onStore(agent.request, agent.slotIndex, agent.availability) + await onStore(request, agent.slotIndex, agent.availability) agent.downloaded.setValue(true) except CancelledError: diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim index 739f0da8..d684d244 100644 --- a/codex/sales/states/errored.nim +++ b/codex/sales/states/errored.nim @@ -6,12 +6,12 @@ type SaleErrored* = ref object of State method `$`*(state: SaleErrored): string = "SaleErrored" method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let agent = SalesAgent(machine) if onClear =? agent.sales.onClear and - request =? agent.request and - slotIndex =? agent.slotIndex: - onClear(agent.availability, request, slotIndex) + request =? agent.request: + onClear(agent.availability, request, agent.slotIndex) # TODO: when availability persistence is added, change this to not optional # NOTE: with this in place, restoring state for a restarted node will diff --git a/codex/sales/states/failed.nim b/codex/sales/states/failed.nim index 583187d6..b944d6bc 100644 --- a/codex/sales/states/failed.nim +++ b/codex/sales/states/failed.nim @@ -1,4 +1,3 @@ -import ./errored import ../statemachine type @@ -8,5 +7,6 @@ type method `$`*(state: SaleFailed): string = "SaleFailed" method run*(state: SaleFailed, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let error = newException(SaleFailedError, "Sale failed") machine.setError error diff --git a/codex/sales/states/filled.nim b/codex/sales/states/filled.nim index 4b5bfe6d..b84a1c31 100644 --- a/codex/sales/states/filled.nim +++ b/codex/sales/states/filled.nim @@ -1,25 +1,25 @@ import pkg/questionable -import ./errored -import ./finished -import ./cancelled -import ./failed import ../statemachine type SaleFilled* = ref object of State - SaleFilledError* = object of CatchableError + SaleFilledError* = object of SaleError HostMismatchError* = object of SaleFilledError method `$`*(state: SaleFilled): string = "SaleFilled" method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let agent = SalesAgent(machine) + let market = agent.sales.market - try: - let market = agent.sales.market + without slotHost =? await market.getHost(agent.requestId, agent.slotIndex): + let error = newException(SaleFilledError, "Filled slot has no host address") + agent.setError error + return - let slotHost = await market.getHost(agent.request.id, agent.slotIndex) - agent.slotHost.setValue(slotHost) - - except CancelledError: - raise + let me = await market.getSigner() + if slotHost == me: + agent.slotHostIsMe.setValue(true) + else: + agent.setError newException(HostMismatchError, "Slot filled by other host") diff --git a/codex/sales/states/filling.nim b/codex/sales/states/filling.nim index f1723861..b0c1d9e1 100644 --- a/codex/sales/states/filling.nim +++ b/codex/sales/states/filling.nim @@ -1,25 +1,15 @@ -import pkg/upraises import ../../market import ../statemachine -import ./filled -import ./errored -import ./cancelled -import ./failed type SaleFilling* = ref object of State - proof*: seq[byte] - SaleFillingError* = object of CatchableError + SaleFillingError* = object of SaleError method `$`*(state: SaleFilling): string = "SaleFilling" method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let agent = SalesAgent(machine) + let market = agent.sales.market - try: - let market = agent.sales.market - - await market.fillSlot(agent.request.id, agent.slotIndex, state.proof) - - except CancelledError: - raise + await market.fillSlot(agent.requestId, agent.slotIndex, agent.proof.value) diff --git a/codex/sales/states/finished.nim b/codex/sales/states/finished.nim index 4db59655..3fbf56ad 100644 --- a/codex/sales/states/finished.nim +++ b/codex/sales/states/finished.nim @@ -6,26 +6,27 @@ import ../statemachine type SaleFinished* = ref object of State - SaleFinishedError* = object of CatchableError + SaleFinishedError* = object of SaleError method `$`*(state: SaleFinished): string = "SaleFinished" method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let agent = SalesAgent(machine) + let slotIndex = agent.slotIndex - try: - if request =? agent.request and - slotIndex =? agent.slotIndex: - agent.sales.proving.add(request.slotId(slotIndex)) + without request =? agent.request: + let error = newException(SaleFinishedError, "missing request") + agent.setError error + return - if onSale =? agent.sales.onSale: - onSale(agent.availability, request, slotIndex) + agent.sales.proving.add(request.slotId(slotIndex)) - # TODO: Keep track of contract completion using local clock. When contract - # has finished, we need to add back availability to the sales module. - # This will change when the state machine is updated to include the entire - # sales process, as well as when availability is persisted, so leaving it - # as a TODO for now. + if onSale =? agent.sales.onSale: + onSale(agent.availability, request, slotIndex) - except CancelledError: - raise + # TODO: Keep track of contract completion using local clock. When contract + # has finished, we need to add back availability to the sales module. + # This will change when the state machine is updated to include the entire + # sales process, as well as when availability is persisted, so leaving it + # as a TODO for now. diff --git a/codex/sales/states/proving.nim b/codex/sales/states/proving.nim index 4fd31e9d..e5ee3265 100644 --- a/codex/sales/states/proving.nim +++ b/codex/sales/states/proving.nim @@ -1,25 +1,22 @@ import ../statemachine -import ./filling -import ./cancelled -import ./failed -import ./filled -import ./errored type SaleProving* = ref object of State - SaleProvingError* = object of CatchableError + SaleProvingError* = object of SaleError method `$`*(state: SaleProving): string = "SaleProving" method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let agent = SalesAgent(machine) - try: - without onProve =? agent.sales.onProve: - raiseAssert "onProve callback not set" + without onProve =? agent.sales.onProve: + raiseAssert "onProve callback not set" - let proof = await onProve(agent.request, agent.slotIndex) - agent.proof.setValue(proof) + without request =? agent.request: + let error = newException(SaleProvingError, "missing request") + agent.setError error + return - except CancelledError: - raise + let proof = await onProve(request, agent.slotIndex) + agent.proof.setValue(proof) diff --git a/codex/sales/states/unknown.nim b/codex/sales/states/unknown.nim index 7bc15c43..aab262b6 100644 --- a/codex/sales/states/unknown.nim +++ b/codex/sales/states/unknown.nim @@ -1,14 +1,29 @@ import ../statemachine -import ./filled -import ./finished -import ./failed -import ./errored -import ./cancelled +import ../subscriptions type SaleUnknown* = ref object of State - SaleUnknownError* = object of CatchableError + SaleUnknownError* = object of SaleError UnexpectedSlotError* = object of SaleUnknownError method `$`*(state: SaleUnknown): string = "SaleUnknown" +method run*(state: SaleUnknown, machine: Machine): Future[?State] {.async.} = + # echo "running ", state + let agent = SalesAgent(machine) + let market = agent.sales.market + + if agent.request.isNone: + agent.request = await market.getRequest(agent.requestId) + if agent.request.isSome: + await agent.subscribe() + + if agent.request.isNone: + agent.setError newException(SaleUnknownError, "missing request") + return + + if agent.restoredFromChain and agent.slotState.value == SlotState.Free: + agent.setError newException(UnexpectedSlotError, + "slot state on chain should not be 'free'") + return + diff --git a/codex/sales/subscriptions.nim b/codex/sales/subscriptions.nim new file mode 100644 index 00000000..823df6bd --- /dev/null +++ b/codex/sales/subscriptions.nim @@ -0,0 +1,92 @@ +import pkg/chronos +import pkg/upraises +import pkg/stint +import ./statemachine +import ../contracts/requests + +# TODO: move elsewhere +proc asyncSpawn(future: Future[void], ignore: type CatchableError) = + proc ignoringError {.async.} = + try: + await future + except ignore: + discard + asyncSpawn ignoringError() + +proc subscribeCancellation*(agent: SalesAgent) {.async.} = + let market = agent.sales.market + + proc onCancelled() {.async.} = + let clock = agent.sales.clock + without request =? agent.request: + return + + await clock.waitUntil(request.expiry.truncate(int64)) + asyncSpawn agent.subscribeFulfilled.unsubscribe(), ignore = CatchableError + agent.requestState.setValue(RequestState.Cancelled) + + proc onFulfilled(_: RequestId) = + agent.waitForCancelled.cancel() + + agent.subscribeFulfilled = + await market.subscribeFulfillment(agent.requestId, onFulfilled) + + agent.waitForCancelled = onCancelled() + +proc subscribeFailure*(agent: SalesAgent) {.async.} = + let market = agent.sales.market + + proc onFailed(_: RequestId) {.upraises:[], gcsafe.} = + asyncSpawn agent.subscribeFailed.unsubscribe(), ignore = CatchableError + try: + agent.requestState.setValue(RequestState.Failed) + except AsyncQueueFullError as e: + raiseAssert "State machine critical failure: " & e.msg + + agent.subscribeFailed = + await market.subscribeRequestFailed(agent.requestId, onFailed) + +proc subscribeSlotFill*(agent: SalesAgent) {.async.} = + let market = agent.sales.market + + proc onSlotFilled( + requestId: RequestId, + slotIndex: UInt256) {.upraises:[], gcsafe.} = + + let market = agent.sales.market + + asyncSpawn agent.subscribeSlotFilled.unsubscribe(), ignore = CatchableError + try: + agent.slotState.setValue(SlotState.Filled) + except AsyncQueueFullError as e: + raiseAssert "State machine critical failure: " & e.msg + + agent.subscribeSlotFilled = + await market.subscribeSlotFilled(agent.requestId, + agent.slotIndex, + onSlotFilled) + +proc subscribe*(agent: SalesAgent) {.async.} = + # TODO: Check that the subscription handlers aren't already assigned before + # assigning. This will allow for agent.subscribe to be called multiple times + await agent.subscribeCancellation() + await agent.subscribeFailure() + await agent.subscribeSlotFill() + +proc unsubscribe*(agent: SalesAgent) {.async.} = + try: + await agent.subscribeFulfilled.unsubscribe() + except CatchableError: + discard + try: + await agent.subscribeFailed.unsubscribe() + except CatchableError: + discard + try: + await agent.subscribeSlotFilled.unsubscribe() + except CatchableError: + discard + if not agent.waitForCancelled.completed: + await agent.waitForCancelled.cancelAndWait() + + procCall Machine(agent).stop() diff --git a/codex/utils/asyncstatemachine.nim b/codex/utils/asyncstatemachine.nim index fd1b0a4f..1b2a3a56 100644 --- a/codex/utils/asyncstatemachine.nim +++ b/codex/utils/asyncstatemachine.nim @@ -1,4 +1,6 @@ +import std/typetraits # DELETE ME import std/sequtils +import std/tables import pkg/questionable import pkg/chronos import pkg/chronicles @@ -19,6 +21,8 @@ type transitions: seq[Transition] errored*: TransitionProperty[bool] lastError*: ref CatchableError + states: Table[int, State] + started: bool State* = ref object of RootObj AnyState* = ref object of State Event* = proc(state: State): ?State {.gcsafe, upraises:[].} @@ -50,19 +54,32 @@ proc transition*(_: type Event, previous, next: State): Event = if state == previous: return some next +method `$`*(state: State): string {.base.} = "Base state" + proc state*(machine: Machine): State = machine.state +template getState*(machine: Machine, id: untyped): State = + machine.states[id.int] + +proc addState*(machine: Machine, states: varargs[(int, State)]) = + machine.states = states.toTable + proc schedule*(machine: Machine, event: Event) = machine.scheduled.putNoWait(event) proc checkTransitions(machine: Machine) = + if not machine.started: + return + for transition in machine.transitions: if transition.trigger(machine, machine.state) and machine.state != transition.nextState and # avoid transitioning to self (machine.state == nil or machine.state in transition.prevStates or # state instance, multiple transition.prevStates.any(proc (s: State): bool = s of AnyState)): + # echo "scheduling transition from ", machine.state, " to ", transition.nextState machine.schedule(Event.transition(machine.state, transition.nextState)) + return proc setValue*[T](prop: TransitionProperty[T], value: T) = prop.value = value @@ -73,12 +90,12 @@ proc setError*(machine: Machine, error: ref CatchableError) = machine.errored.value = false # clears error without triggering transitions machine.lastError = error # stores error in state -method run*(state: State): Future[?State] {.base, upraises:[].} = +method run*(state: State, machine: Machine): Future[?State] {.base, upraises:[].} = discard proc run(machine: Machine, state: State) {.async.} = try: - if next =? await state.run(): + if next =? await state.run(machine): machine.schedule(Event.transition(state, next)) except CancelledError: discard @@ -86,7 +103,6 @@ proc run(machine: Machine, state: State) {.async.} = proc scheduler(machine: Machine) {.async.} = proc onRunComplete(udata: pointer) {.gcsafe, raises: [Defect].} = var fut = cast[FutureBase](udata) - # TODO: would CancelledError be swallowed here (by not checking fut.cancelled())? if fut.failed(): try: machine.setError(fut.error) @@ -98,21 +114,29 @@ proc scheduler(machine: Machine) {.async.} = let event = await machine.scheduled.get() if next =? event(machine.state): if not machine.running.isNil: + # echo "cancelling current state: ", machine.state await machine.running.cancelAndWait() + # echo "transitioning from ", $ machine.state, " to ", $ next + # echo "moving from ", if machine.state.isNil: "nil" else: $machine.state, " to ", next machine.state = next + # trace "running state", state = machine.state machine.running = machine.run(machine.state) machine.running.addCallback(onRunComplete) - machine.checkTransitions() + machine.checkTransitions() except CancelledError: discard proc start*(machine: Machine, initialState: State) = machine.scheduling = machine.scheduler() machine.schedule(Event.transition(machine.state, initialState)) + machine.started = true proc stop*(machine: Machine) = - machine.scheduling.cancel() - machine.running.cancel() + if not machine.running.isNil and not machine.running.finished: + machine.scheduling.cancel() + if not machine.running.isNil and not machine.running.finished: + machine.running.cancel() + machine.started = false proc new*(T: type Machine, transitions: seq[Transition]): T = let m = T( diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index 161efaa6..803911c0 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -169,6 +169,7 @@ proc fillSlot*(market: MockMarket, host: host ) market.filled.add(slot) + market.slotState[slotId(requestId, slotIndex)] = SlotState.Filled market.emitSlotFilled(requestId, slotIndex) method fillSlot*(market: MockMarket, diff --git a/tests/codex/testsales.nim b/tests/codex/testsales.nim index 79f5d427..2bb364c1 100644 --- a/tests/codex/testsales.nim +++ b/tests/codex/testsales.nim @@ -81,501 +81,511 @@ suite "Sales": test "makes storage unavailable when matching request comes in": sales.add(availability) await market.requestStorage(request) + await sleepAsync(5000.millis) check sales.available.len == 0 -# test "ignores request when no matching storage is available": -# sales.add(availability) -# var tooBig = request -# tooBig.ask.slotSize = request.ask.slotSize + 1 -# await market.requestStorage(tooBig) -# check sales.available == @[availability] + test "ignores request when no matching storage is available": + sales.add(availability) + var tooBig = request + tooBig.ask.slotSize = request.ask.slotSize + 1 + await market.requestStorage(tooBig) + check sales.available == @[availability] -# test "ignores request when reward is too low": -# sales.add(availability) -# var tooCheap = request -# tooCheap.ask.reward = request.ask.reward - 1 -# await market.requestStorage(tooCheap) -# check sales.available == @[availability] + test "ignores request when reward is too low": + sales.add(availability) + var tooCheap = request + tooCheap.ask.reward = request.ask.reward - 1 + await market.requestStorage(tooCheap) + check sales.available == @[availability] -# test "retrieves and stores data locally": -# var storingRequest: StorageRequest -# var storingSlot: UInt256 -# var storingAvailability: Availability -# sales.onStore = proc(request: StorageRequest, -# slot: UInt256, -# availability: ?Availability) {.async.} = -# storingRequest = request -# storingSlot = slot -# check availability.isSome -# storingAvailability = !availability -# sales.add(availability) -# await market.requestStorage(request) -# check storingRequest == request -# check storingSlot < request.ask.slots.u256 -# check storingAvailability == availability + test "retrieves and stores data locally": + var storingRequest: StorageRequest + var storingSlot: UInt256 + var storingAvailability: Availability + sales.onStore = proc(request: StorageRequest, + slot: UInt256, + availability: ?Availability) {.async.} = + storingRequest = request + storingSlot = slot + check availability.isSome + storingAvailability = !availability + sales.add(availability) + await market.requestStorage(request) + check eventually storingRequest == request + check storingSlot < request.ask.slots.u256 + check storingAvailability == availability -# test "makes storage available again when data retrieval fails": -# let error = newException(IOError, "data retrieval failed") -# sales.onStore = proc(request: StorageRequest, -# slot: UInt256, -# availability: ?Availability) {.async.} = -# raise error -# sales.add(availability) -# await market.requestStorage(request) -# check sales.available == @[availability] + test "makes storage available again when data retrieval fails": + let error = newException(IOError, "data retrieval failed") + sales.onStore = proc(request: StorageRequest, + slot: UInt256, + availability: ?Availability) {.async.} = + raise error + sales.add(availability) + await market.requestStorage(request) + check sales.available == @[availability] -# test "generates proof of storage": -# var provingRequest: StorageRequest -# var provingSlot: UInt256 -# sales.onProve = proc(request: StorageRequest, -# slot: UInt256): Future[seq[byte]] {.async.} = -# provingRequest = request -# provingSlot = slot -# sales.add(availability) -# await market.requestStorage(request) -# check provingRequest == request -# check provingSlot < request.ask.slots.u256 + test "generates proof of storage": + var provingRequest: StorageRequest + var provingSlot: UInt256 + sales.onProve = proc(request: StorageRequest, + slot: UInt256): Future[seq[byte]] {.async.} = + provingRequest = request + provingSlot = slot + sales.add(availability) + await market.requestStorage(request) + check eventually provingRequest == request + check provingSlot < request.ask.slots.u256 -# test "fills a slot": -# sales.add(availability) -# await market.requestStorage(request) -# check market.filled.len == 1 -# check market.filled[0].requestId == request.id -# check market.filled[0].slotIndex < request.ask.slots.u256 -# check market.filled[0].proof == proof -# check market.filled[0].host == await market.getSigner() + test "fills a slot": + sales.add(availability) + await market.requestStorage(request) + check eventually market.filled.len == 1 + check market.filled[0].requestId == request.id + check market.filled[0].slotIndex < request.ask.slots.u256 + check market.filled[0].proof == proof + check market.filled[0].host == await market.getSigner() -# test "calls onSale when slot is filled": -# var soldAvailability: Availability -# var soldRequest: StorageRequest -# var soldSlotIndex: UInt256 -# sales.onSale = proc(availability: ?Availability, -# request: StorageRequest, -# slotIndex: UInt256) = -# if a =? availability: -# soldAvailability = a -# soldRequest = request -# soldSlotIndex = slotIndex -# sales.add(availability) -# await market.requestStorage(request) -# check soldAvailability == availability -# check soldRequest == request -# check soldSlotIndex < request.ask.slots.u256 + test "calls onSale when slot is filled": + var soldAvailability: Availability + var soldRequest: StorageRequest + var soldSlotIndex: UInt256 + sales.onSale = proc(availability: ?Availability, + request: StorageRequest, + slotIndex: UInt256) = + if a =? availability: + soldAvailability = a + soldRequest = request + soldSlotIndex = slotIndex + sales.add(availability) + await market.requestStorage(request) + check eventually soldAvailability == availability + check soldRequest == request + check soldSlotIndex < request.ask.slots.u256 -# test "calls onClear when storage becomes available again": -# # fail the proof intentionally to trigger `agent.finish(success=false)`, -# # which then calls the onClear callback -# sales.onProve = proc(request: StorageRequest, -# slot: UInt256): Future[seq[byte]] {.async.} = -# raise newException(IOError, "proof failed") -# var clearedAvailability: Availability -# var clearedRequest: StorageRequest -# var clearedSlotIndex: UInt256 -# sales.onClear = proc(availability: ?Availability, -# request: StorageRequest, -# slotIndex: UInt256) = -# if a =? availability: -# clearedAvailability = a -# clearedRequest = request -# clearedSlotIndex = slotIndex -# sales.add(availability) -# await market.requestStorage(request) -# check clearedAvailability == availability -# check clearedRequest == request -# check clearedSlotIndex < request.ask.slots.u256 + test "calls onClear when storage becomes available again": + # fail the proof intentionally to trigger `agent.finish(success=false)`, + # which then calls the onClear callback + sales.onProve = proc(request: StorageRequest, + slot: UInt256): Future[seq[byte]] {.async.} = + raise newException(IOError, "proof failed") + var clearedAvailability: Availability + var clearedRequest: StorageRequest + var clearedSlotIndex: UInt256 + sales.onClear = proc(availability: ?Availability, + request: StorageRequest, + slotIndex: UInt256) = + if a =? availability: + clearedAvailability = a + clearedRequest = request + clearedSlotIndex = slotIndex + sales.add(availability) + await market.requestStorage(request) + check eventually clearedAvailability == availability + check clearedRequest == request + check clearedSlotIndex < request.ask.slots.u256 -# test "makes storage available again when other host fills the slot": -# let otherHost = Address.example -# sales.onStore = proc(request: StorageRequest, -# slot: UInt256, -# availability: ?Availability) {.async.} = -# await sleepAsync(chronos.hours(1)) -# sales.add(availability) -# await market.requestStorage(request) -# for slotIndex in 0..SaleFinished when slot state is Filled": -# # let agent = newSalesAgent() -# # await fillSlot() -# # await agent.switchAsync(SaleUnknown()) -# # check (agent.state as SaleFinished).isSome + let slotId = slotId(request.id, slotIdx) + let req = if restoredFromChain: some request else: none StorageRequest + let agent = sales.newSalesAgent(slotIdx, + some availability, + request.id, + req, + requestState, + slotState, + restoredFromChain) + return agent -# # test "moves to SaleFinished when slot state is Finished": -# # let agent = newSalesAgent() -# # await fillSlot() -# # market.slotState[slotId] = SlotState.Finished -# # agent.switch(SaleUnknown()) -# # check (agent.state as SaleFinished).isSome + proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} = + let address = await market.getSigner() + market.fillSlot(request.id, + slotIdx, + proof, + address) + # market.filled.add slot + # market.slotState[slotId(request.id, slotIdx)] = SlotState.Filled -# # test "moves to SaleFinished when slot state is Paid": -# # let agent = newSalesAgent() -# # market.slotState[slotId] = SlotState.Paid -# # agent.switch(SaleUnknown()) -# # check (agent.state as SaleFinished).isSome + test "moves to SaleErrored when request is missing": + let agent = newSalesAgent(restoredFromChain = true) + market.requested.keepItIf(it != request) + agent.request = none StorageRequest + await agent.start() + check eventually agent.state of SaleErrored + check agent.lastError of SaleUnknownError + check agent.lastError.msg == "missing request" -# # test "moves to SaleErrored when slot state is Failed": -# # let agent = newSalesAgent() -# # market.slotState[slotId] = SlotState.Failed -# # agent.switch(SaleUnknown()) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleFailedError -# # check state.error.msg == "Sale failed" + test "moves to SaleErrored when restoring state and slot state on chain is free": + let agent = newSalesAgent(restoredFromChain = true) + await agent.start() + check eventually agent.state of SaleErrored + check agent.lastError of UnexpectedSlotError + check agent.lastError.msg == "slot state on chain should not be 'free'" -# # test "moves to SaleErrored when Downloading and request expires": -# # sales.onStore = proc(request: StorageRequest, -# # slot: UInt256, -# # availability: ?Availability) {.async.} = -# # await sleepAsync(chronos.minutes(1)) # "far" in the future -# # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleDownloading()) -# # clock.set(request.expiry.truncate(int64)) -# # await sleepAsync chronos.seconds(2) + test "moves to SaleFilled>SaleErrored when slot not actually filled": + let agent = newSalesAgent(restoredFromChain = true, + slotState = SlotState.Filled) + await agent.start() + check eventually agent.state of SaleErrored + check agent.lastError of SaleFilledError + check agent.lastError.msg == "Filled slot has no host address" -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleTimeoutError -# # check state.error.msg == "Sale cancelled due to timeout" + test "moves to SaleFilled>SaleFinished when slot state is Filled": + await fillSlot() + let agent = newSalesAgent(restoredFromChain = true, + slotState = SlotState.Filled) + await agent.start() + check eventually agent.state of SaleFinished -# # test "moves to SaleErrored when Downloading and request fails": -# # sales.onStore = proc(request: StorageRequest, -# # slot: UInt256, -# # availability: ?Availability) {.async.} = -# # await sleepAsync(chronos.minutes(1)) # "far" in the future -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleDownloading()) -# # market.emitRequestFailed(request.id) -# # await sleepAsync chronos.seconds(2) + test "moves to SaleFinished when slot state is Paid": + let agent = newSalesAgent(restoredFromChain = true, slotState = SlotState.Paid) + await agent.start() + check eventually agent.state of SaleFinished -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleFailedError -# # check state.error.msg == "Sale failed" + test "moves to SaleErrored when slot state is Failed": + let agent = newSalesAgent(restoredFromChain = true, slotState = SlotState.Failed) + await agent.start() + check eventually agent.state of SaleErrored + check agent.lastError of SaleFailedError + check agent.lastError.msg == "Sale failed" -# # test "moves to SaleErrored when Filling and request expires": -# # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleFilling()) -# # clock.set(request.expiry.truncate(int64)) -# # await sleepAsync chronos.seconds(2) + test "moves to SaleErrored when Downloading and request expires": + sales.onStore = proc(request: StorageRequest, + slot: UInt256, + availability: ?Availability) {.async.} = + await sleepAsync(chronos.minutes(1)) # "far" in the future + request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 + let agent = newSalesAgent(requestState = RequestState.New) + await agent.start() + await sleepAsync 1.millis + clock.set(request.expiry.truncate(int64)) + # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleTimeoutError -# # check state.error.msg == "Sale cancelled due to timeout" + check eventually agent.state of SaleErrored + check agent.lastError of SaleTimeoutError + check agent.lastError.msg == "Sale cancelled due to timeout" -# # test "moves to SaleErrored when Filling and request fails": -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleFilling()) -# # market.emitRequestFailed(request.id) -# # await sleepAsync chronos.seconds(2) + test "moves to SaleErrored when Downloading and request fails": + let agent = newSalesAgent() + await agent.start() + # await sleepAsync 1.millis + market.emitRequestFailed(request.id) + # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleFailedError -# # check state.error.msg == "Sale failed" + check eventually agent.state of SaleErrored + check agent.lastError of SaleFailedError + check agent.lastError.msg == "Sale failed" -# # test "moves to SaleErrored when Finished and request expires": -# # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.FinishedswitchAsync -# # await agent.switchAsync(SaleFinished()) -# # clock.set(request.expiry.truncate(int64)) -# # await sleepAsync chronos.seconds(2) + # test "moves to SaleErrored when Filling and request expires": + # request.expiry = (getTime() + 2.seconds).toUnix.u256 + # let agent = newSalesAgent() + # # agent.request.setValue(some request) # allows skip of SaleUnknown state + # await agent.start(agent.getState(SaleState.SaleFilling)) + # # check eventually agent.state of SaleFilling + # # echo ">>> we're in SaleFilling!" + # # echo "setting clock expired" + # clock.set(request.expiry.truncate(int64)) + # # else: echo ">>> we never got to SaleFilling" + # # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleTimeoutError -# # check state.error.msg == "Sale cancelled due to timeout" + # check eventually agent.state of SaleErrored + # check agent.lastError of SaleTimeoutError + # check agent.lastError.msg == "Sale cancelled due to timeout" -# # test "moves to SaleErrored when Finished and request fails": -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.Finished -# # await agent.switchAsync(SaleFinished()) -# # market.emitRequestFailed(request.id) -# # await sleepAsync chronos.seconds(2) + # test "moves to SaleErrored when Filling and request fails": + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.New + # await agent.switchAsync(SaleFilling()) + # market.emitRequestFailed(request.id) + # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleFailedError -# # check state.error.msg == "Sale failed" + # without state =? (agent.state as SaleErrored): + # fail() + # check state.error of SaleFailedError + # check state.error.msg == "Sale failed" -# # test "moves to SaleErrored when Proving and request expires": -# # sales.onProve = proc(request: StorageRequest, -# # slot: UInt256): Future[seq[byte]] {.async.} = -# # await sleepAsync(chronos.minutes(1)) # "far" in the future -# # return @[] -# # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleProving()) -# # clock.set(request.expiry.truncate(int64)) -# # await sleepAsync chronos.seconds(2) + # test "moves to SaleErrored when Finished and request expires": + # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.FinishedswitchAsync + # await agent.switchAsync(SaleFinished()) + # clock.set(request.expiry.truncate(int64)) + # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleTimeoutError -# # check state.error.msg == "Sale cancelled due to timeout" + # without state =? (agent.state as SaleErrored): + # fail() + # check state.error of SaleTimeoutError + # check state.error.msg == "Sale cancelled due to timeout" -# # test "moves to SaleErrored when Proving and request fails": -# # sales.onProve = proc(request: StorageRequest, -# # slot: UInt256): Future[seq[byte]] {.async.} = -# # await sleepAsync(chronos.minutes(1)) # "far" in the future -# # return @[] -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleProving()) -# # market.emitRequestFailed(request.id) -# # await sleepAsync chronos.seconds(2) + # test "moves to SaleErrored when Finished and request fails": + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.Finished + # await agent.switchAsync(SaleFinished()) + # market.emitRequestFailed(request.id) + # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleFailedError -# # check state.error.msg == "Sale failed" + # without state =? (agent.state as SaleErrored): + # fail() + # check state.error of SaleFailedError + # check state.error.msg == "Sale failed" -# # test "moves to SaleErrored when Downloading and slot is filled by another host": -# # sales.onStore = proc(request: StorageRequest, -# # slot: UInt256, -# # availability: ?Availability) {.async.} = -# # await sleepAsync(chronos.minutes(1)) # "far" in the future -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleDownloading()) -# # market.fillSlot(request.id, agent.slotIndex, proof, Address.example) -# # await sleepAsync chronos.seconds(2) + # test "moves to SaleErrored when Proving and request expires": + # sales.onProve = proc(request: StorageRequest, + # slot: UInt256): Future[seq[byte]] {.async.} = + # await sleepAsync(chronos.minutes(1)) # "far" in the future + # return @[] + # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.New + # await agent.switchAsync(SaleProving()) + # clock.set(request.expiry.truncate(int64)) + # await sleepAsync chronos.seconds(2) -# # let state = (agent.state as SaleErrored) -# # check state.isSome -# # check (!state).error.msg == "Slot filled by other host" + # without state =? (agent.state as SaleErrored): + # fail() + # check state.error of SaleTimeoutError + # check state.error.msg == "Sale cancelled due to timeout" -# # test "moves to SaleErrored when Proving and slot is filled by another host": -# # sales.onProve = proc(request: StorageRequest, -# # slot: UInt256): Future[seq[byte]] {.async.} = -# # await sleepAsync(chronos.minutes(1)) # "far" in the future -# # return @[] -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleProving()) -# # market.fillSlot(request.id, agent.slotIndex, proof, Address.example) -# # await sleepAsync chronos.seconds(2) + # test "moves to SaleErrored when Proving and request fails": + # sales.onProve = proc(request: StorageRequest, + # slot: UInt256): Future[seq[byte]] {.async.} = + # await sleepAsync(chronos.minutes(1)) # "far" in the future + # return @[] + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.New + # await agent.switchAsync(SaleProving()) + # market.emitRequestFailed(request.id) + # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of HostMismatchError -# # check state.error.msg == "Slot filled by other host" + # without state =? (agent.state as SaleErrored): + # fail() + # check state.error of SaleFailedError + # check state.error.msg == "Sale failed" -# # test "moves to SaleErrored when Filling and slot is filled by another host": -# # sales.onProve = proc(request: StorageRequest, -# # slot: UInt256): Future[seq[byte]] {.async.} = -# # await sleepAsync(chronos.minutes(1)) # "far" in the future -# # return @[] -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # market.fillSlot(request.id, agent.slotIndex, proof, Address.example) -# # await agent.switchAsync(SaleFilling()) -# # await sleepAsync chronos.seconds(2) + # test "moves to SaleErrored when Downloading and slot is filled by another host": + # sales.onStore = proc(request: StorageRequest, + # slot: UInt256, + # availability: ?Availability) {.async.} = + # await sleepAsync(chronos.minutes(1)) # "far" in the future + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.New + # await agent.switchAsync(SaleDownloading()) + # market.fillSlot(request.id, agent.slotIndex, proof, Address.example) + # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of HostMismatchError -# # check state.error.msg == "Slot filled by other host" + # let state = (agent.state as SaleErrored) + # check state.isSome + # check (!state).error.msg == "Slot filled by other host" -# # test "moves from SaleDownloading to SaleFinished, calling necessary callbacks": -# # var onProveCalled, onStoreCalled, onClearCalled, onSaleCalled: bool -# # sales.onProve = proc(request: StorageRequest, -# # slot: UInt256): Future[seq[byte]] {.async.} = -# # onProveCalled = true -# # return @[] -# # sales.onStore = proc(request: StorageRequest, -# # slot: UInt256, -# # availability: ?Availability) {.async.} = -# # onStoreCalled = true -# # sales.onClear = proc(availability: ?Availability, -# # request: StorageRequest, -# # slotIndex: UInt256) = -# # onClearCalled = true -# # sales.onSale = proc(availability: ?Availability, -# # request: StorageRequest, -# # slotIndex: UInt256) = -# # onSaleCalled = true + # test "moves to SaleErrored when Proving and slot is filled by another host": + # sales.onProve = proc(request: StorageRequest, + # slot: UInt256): Future[seq[byte]] {.async.} = + # await sleepAsync(chronos.minutes(1)) # "far" in the future + # return @[] + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.New + # await agent.switchAsync(SaleProving()) + # market.fillSlot(request.id, agent.slotIndex, proof, Address.example) + # await sleepAsync chronos.seconds(2) -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await fillSlot(agent.slotIndex) -# # await agent.switchAsync(SaleDownloading()) -# # market.emitRequestFulfilled(request.id) -# # await sleepAsync chronos.seconds(2) + # without state =? (agent.state as SaleErrored): + # fail() + # check state.error of HostMismatchError + # check state.error.msg == "Slot filled by other host" -# # without state =? (agent.state as SaleFinished): -# # fail() -# # check onProveCalled -# # check onStoreCalled -# # check not onClearCalled -# # check onSaleCalled + # test "moves to SaleErrored when Filling and slot is filled by another host": + # sales.onProve = proc(request: StorageRequest, + # slot: UInt256): Future[seq[byte]] {.async.} = + # await sleepAsync(chronos.minutes(1)) # "far" in the future + # return @[] + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.New + # market.fillSlot(request.id, agent.slotIndex, proof, Address.example) + # await agent.switchAsync(SaleFilling()) + # await sleepAsync chronos.seconds(2) -# test "loads active slots from market": -# let me = await market.getSigner() + # without state =? (agent.state as SaleErrored): + # fail() + # check state.error of HostMismatchError + # check state.error.msg == "Slot filled by other host" -# request.ask.slots = 2 -# market.requested = @[request] -# market.requestState[request.id] = RequestState.New + test "moves from SaleDownloading to SaleFinished, calling necessary callbacks": + var onProveCalled, onStoreCalled, onClearCalled, onSaleCalled: bool + sales.onProve = proc(request: StorageRequest, + slot: UInt256): Future[seq[byte]] {.async.} = + onProveCalled = true + return array[32, byte].example.toSeq + sales.onStore = proc(request: StorageRequest, + slot: UInt256, + availability: ?Availability) {.async.} = + onStoreCalled = true + sales.onClear = proc(availability: ?Availability, + request: StorageRequest, + slotIndex: UInt256) = + onClearCalled = true + sales.onSale = proc(availability: ?Availability, + request: StorageRequest, + slotIndex: UInt256) = + onSaleCalled = true -# let slot0 = MockSlot(requestId: request.id, -# slotIndex: 0.u256, -# proof: proof, -# host: me) -# await fillSlot(slot0.slotIndex) + let agent = newSalesAgent() + # market.requested.add request + # market.requestState[request.id] = RequestState.New + await fillSlot(agent.slotIndex) + await agent.start() + # await agent.switchAsync(SaleDownloading()) + market.emitRequestFulfilled(request.id) + await sleepAsync chronos.seconds(2) -# let slot1 = MockSlot(requestId: request.id, -# slotIndex: 1.u256, -# proof: proof, -# host: me) -# await fillSlot(slot1.slotIndex) -# market.activeSlots[me] = @[request.slotId(0.u256), request.slotId(1.u256)] -# market.requested = @[request] -# market.activeRequests[me] = @[request.id] + check eventually agent.state of SaleFinished + check onProveCalled + check onStoreCalled + check not onClearCalled + check onSaleCalled -# await sales.load() -# let expected = SalesAgent(sales: sales, -# requestId: request.id, -# availability: none Availability, -# request: some request) -# # because sales.load() calls agent.start, we won't know the slotIndex -# # randomly selected for the agent, and we also won't know the value of -# # `failed`/`fulfilled`/`cancelled` futures, so we need to compare -# # the properties we know -# # TODO: when calling sales.load(), slot index should be restored and not -# # randomly re-assigned, so this may no longer be needed -# proc `==` (agent0, agent1: SalesAgent): bool = -# return agent0.sales == agent1.sales and -# agent0.requestId == agent1.requestId and -# agent0.availability == agent1.availability and -# agent0.request == agent1.request + test "loads active slots from market": + let me = await market.getSigner() -# check sales.agents.all(agent => agent == expected) + request.ask.slots = 2 + market.requested = @[request] + market.requestState[request.id] = RequestState.New + + let slot0 = MockSlot(requestId: request.id, + slotIndex: 0.u256, + proof: proof, + host: me) + await fillSlot(slot0.slotIndex) + + let slot1 = MockSlot(requestId: request.id, + slotIndex: 1.u256, + proof: proof, + host: me) + await fillSlot(slot1.slotIndex) + market.activeSlots[me] = @[request.slotId(0.u256), request.slotId(1.u256)] + market.requested = @[request] + market.activeRequests[me] = @[request.id] + + await sales.load() + let expected = SalesAgent(sales: sales, + availability: none Availability, + request: some request) + # because sales.load() calls agent.start, we won't know the slotIndex + # randomly selected for the agent, and we also won't know the value of + # `failed`/`fulfilled`/`cancelled` futures, so we need to compare + # the properties we know + # TODO: when calling sales.load(), slot index should be restored and not + # randomly re-assigned, so this may no longer be needed + proc `==` (agent0, agent1: SalesAgent): bool = + return agent0.sales == agent1.sales and + agent0.availability == agent1.availability and + agent0.request == agent1.request + + check sales.agents.all(agent => agent == expected) diff --git a/tests/codex/utils/testasyncstatemachine.nim b/tests/codex/utils/testasyncstatemachine.nim index caaba7f8..97003eb5 100644 --- a/tests/codex/utils/testasyncstatemachine.nim +++ b/tests/codex/utils/testasyncstatemachine.nim @@ -1,3 +1,4 @@ +import std/typetraits import pkg/asynctest import pkg/questionable import pkg/chronos @@ -75,6 +76,7 @@ suite "async state machines": MyMachine(m).errored.value ) ]) + machine.addState state1, state2, state3, state4, state5, state6 machine.slotsFilled = machine.newTransitionProperty(0) machine.requestFinished = machine.newTransitionProperty(false)