diff --git a/codex/sales.nim b/codex/sales.nim index 8a715569..1a4eb4a6 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -1,4 +1,5 @@ import pkg/questionable +import pkg/questionable/results import pkg/upraises import pkg/stint import pkg/nimcrypto @@ -11,7 +12,7 @@ import ./errors import ./contracts/requests import ./sales/salesagent import ./sales/statemachine -import ./sales/states/[start, downloading, unknown] +import ./sales/states/unknown ## Sales holds a list of available storage that it may sell. ## @@ -72,26 +73,21 @@ proc findSlotIndex(numSlots: uint64, proc handleRequest(sales: Sales, requestId: RequestId, - ask: StorageAsk) = + ask: StorageAsk) {.raises:[CatchableError].} = let availability = sales.findAvailability(ask) # TODO: check if random slot is actually available (not already filled) let slotIndex = randomSlotIndex(ask.slots) - without request =? await sales.market.getRequest(requestId): - raise newException(SalesError, "Failed to get request on chain") - - let me = await sales.market.getSigner() - let agent = newSalesAgent( sales, slotIndex, availability, - request, - me, + requestId, + none StorageRequest, RequestState.New, SlotState.Free ) - await agent.start(SaleUnknown.new()) + waitFor agent.start() sales.agents.add agent proc load*(sales: Sales) {.async.} = @@ -100,7 +96,6 @@ proc load*(sales: Sales) {.async.} = # TODO: restore availability from disk let requestIds = await market.myRequests() let slotIds = await market.mySlots() - let me = await market.getSigner() for slotId in slotIds: # TODO: this needs to be optimised @@ -109,28 +104,34 @@ proc load*(sales: Sales) {.async.} = without slotIndex =? findSlotIndex(request.ask.slots, request.id, slotId): - raiseAssert "could not find slot index" + raise newException(SalesError, "could not find slot index") # TODO: should be optimised (maybe get everything in one call: request, request state, slot state) - let requestState = await market.requestState(request.id) + without requestState =? await market.requestState(request.id): + raise newException(SalesError, "request state could not be determined") + let slotState = await market.slotState(slotId) let agent = newSalesAgent( sales, slotIndex, availability, - request, - me, + request.id, + some request, requestState, - slotState) - await agent.start(SaleUnknown.new()) + slotState, + restoredFromChain = true) + await agent.start() sales.agents.add agent proc start*(sales: Sales) {.async.} = doAssert sales.subscription.isNone, "Sales already started" proc onRequest(requestId: RequestId, ask: StorageAsk) {.gcsafe, upraises:[].} = - sales.handleRequest(requestId, ask) + try: + sales.handleRequest(requestId, ask) + except CatchableError as e: + error "Failed to handle storage request", error = e.msg try: sales.subscription = some await sales.market.subscribeRequests(onRequest) diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 0ec09193..4066d316 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -1,19 +1,29 @@ import pkg/chronos -import pkg/upraises import pkg/stint import ./statemachine import ./states/[cancelled, downloading, errored, failed, finished, filled, filling, proving, unknown] +import ./subscriptions import ../contracts/requests +type SaleState* {.pure.} = enum + SaleUnknown, + SaleDownloading, + SaleProving, + SaleFilling, + SaleFilled, + SaleCancelled, + SaleFailed, + SaleErrored + proc newSalesAgent*(sales: Sales, slotIndex: UInt256, availability: ?Availability, - request: StorageRequest, - me: Address, + requestId: RequestId, + request: ?StorageRequest, requestState: RequestState, slotState: SlotState, - restoredFromChain: bool): SalesAgent = + restoredFromChain: bool = false): SalesAgent = let saleUnknown = SaleUnknown.new() let saleDownloading = SaleDownloading.new() @@ -25,11 +35,18 @@ proc newSalesAgent*(sales: Sales, let saleErrored = SaleErrored.new() let agent = SalesAgent.new(@[ + Transition.new( + AnyState.new(), + saleErrored, + proc(m: Machine, s: State): bool = + SalesAgent(m).errored.value + ), Transition.new( saleUnknown, saleDownloading, proc(m: Machine, s: State): bool = let agent = SalesAgent(m) + not agent.restoredFromChain and agent.requestState.value == RequestState.New and agent.slotState.value == SlotState.Free ), @@ -84,12 +101,7 @@ proc newSalesAgent*(sales: Sales, agent.slotState.value in @[SlotState.Finished, SlotState.Paid] or agent.requestState.value == RequestState.Finished ), - Transition.new( - AnyState.new(), - saleErrored, - proc(m: Machine, s: State): bool = - SalesAgent(m).errored.value - ), + Transition.new( saleDownloading, saleProving, @@ -106,133 +118,34 @@ proc newSalesAgent*(sales: Sales, saleFilled, SaleFinished.new(), proc(m: Machine, s: State): bool = - let agent = SalesAgent(m) - without host =? agent.slotHost.value: - return false - host == agent.me - ), - Transition.new( - saleFilled, - saleErrored, - proc(m: Machine, s: State): bool = - let agent = SalesAgent(m) - without host =? agent.slotHost.value: - return false - if host != agent.me: - agent.lastError = newException(HostMismatchError, - "Slot filled by other host") - return true - else: return false - ), - Transition.new( - saleUnknown, - saleErrored, - proc(m: Machine, s: State): bool = - let agent = SalesAgent(m) - if agent.restoredFromChain and agent.slotState.value == SlotState.Free: - agent.lastError = newException(SaleUnknownError, - "cannot retrieve slot state") - return true - else: return false + SalesAgent(m).slotHostIsMe.value ), ]) + agent.addState (SaleState.SaleUnknown.int, saleUnknown), + (SaleState.SaleDownloading.int, saleDownloading), + (SaleState.SaleProving.int, saleProving), + (SaleState.SaleFilling.int, saleFilling), + (SaleState.SaleFilled.int, saleFilled), + (SaleState.SaleCancelled.int, saleCancelled), + (SaleState.SaleFailed.int, saleFailed), + (SaleState.SaleErrored.int, saleErrored) agent.slotState = agent.newTransitionProperty(slotState) agent.requestState = agent.newTransitionProperty(requestState) agent.proof = agent.newTransitionProperty(newSeq[byte]()) - agent.slotHost = agent.newTransitionProperty(none Address) + agent.slotHostIsMe = agent.newTransitionProperty(false) agent.downloaded = agent.newTransitionProperty(false) agent.sales = sales agent.availability = availability agent.slotIndex = slotIndex + agent.requestId = requestId agent.request = request - agent.me = me + agent.restoredFromChain = restoredFromChain return agent -proc subscribeCancellation*(agent: SalesAgent): Future[void] {.gcsafe.} -proc subscribeFailure*(agent: SalesAgent): Future[void] {.gcsafe.} -proc subscribeSlotFill*(agent: SalesAgent): Future[void] {.gcsafe.} - -proc start*(agent: SalesAgent, initialState: State) {.async.} = - await agent.subscribeCancellation() - await agent.subscribeFailure() - await agent.subscribeSlotFill() +proc start*(agent: SalesAgent, + initialState: State = agent.getState(SaleState.SaleUnknown)) {.async.} = + await agent.subscribe() procCall Machine(agent).start(initialState) proc stop*(agent: SalesAgent) {.async.} = - try: - await agent.subscribeFulfilled.unsubscribe() - except CatchableError: - discard - try: - await agent.subscribeFailed.unsubscribe() - except CatchableError: - discard - try: - await agent.subscribeSlotFilled.unsubscribe() - except CatchableError: - discard - if not agent.waitForCancelled.completed: - await agent.waitForCancelled.cancelAndWait() - - procCall Machine(agent).stop() - -proc subscribeCancellation*(agent: SalesAgent) {.async.} = - let market = agent.sales.market - - proc onCancelled() {.async.} = - let clock = agent.sales.clock - - await clock.waitUntil(agent.request.expiry.truncate(int64)) - await agent.subscribeFulfilled.unsubscribe() - agent.requestState.setValue(RequestState.Cancelled) - - agent.waitForCancelled = onCancelled() - - proc onFulfilled(_: RequestId) = - agent.waitForCancelled.cancel() - - agent.subscribeFulfilled = - await market.subscribeFulfillment(agent.request.id, onFulfilled) - -# TODO: move elsewhere -proc asyncSpawn(future: Future[void], ignore: type CatchableError) = - proc ignoringError {.async.} = - try: - await future - except ignore: - discard - asyncSpawn ignoringError() - -proc subscribeFailure*(agent: SalesAgent) {.async.} = - let market = agent.sales.market - - proc onFailed(_: RequestId) {.upraises:[], gcsafe.} = - asyncSpawn agent.subscribeFailed.unsubscribe(), ignore = CatchableError - try: - agent.requestState.setValue(RequestState.Failed) - except AsyncQueueFullError as e: - raiseAssert "State machine critical failure: " & e.msg - - agent.subscribeFailed = - await market.subscribeRequestFailed(agent.request.id, onFailed) - -proc subscribeSlotFill*(agent: SalesAgent) {.async.} = - let market = agent.sales.market - - proc onSlotFilled( - requestId: RequestId, - slotIndex: UInt256) {.upraises:[], gcsafe.} = - - let market = agent.sales.market - - asyncSpawn agent.subscribeSlotFilled.unsubscribe(), ignore = CatchableError - try: - agent.slotState.setValue(SlotState.Filled) - except AsyncQueueFullError as e: - raiseAssert "State machine critical failure: " & e.msg - - agent.subscribeSlotFilled = - await market.subscribeSlotFilled(agent.request.id, - agent.slotIndex, - onSlotFilled) - + await agent.unsubscribe() diff --git a/codex/sales/statemachine.nim b/codex/sales/statemachine.nim index 64e383e2..fedc69ef 100644 --- a/codex/sales/statemachine.nim +++ b/codex/sales/statemachine.nim @@ -4,6 +4,7 @@ import pkg/questionable import pkg/upraises import ../errors import ../utils/asyncstatemachine +import ../utils/optionalcast import ../market import ../clock import ../proving @@ -13,6 +14,7 @@ export market export clock export asyncstatemachine export proving +export optionalcast type Sales* = ref object @@ -30,21 +32,20 @@ type sales*: Sales ask*: StorageAsk availability*: ?Availability # TODO: when availability persistence is added, change this to not optional - request*: StorageRequest + requestId*: RequestId + request*: ?StorageRequest slotIndex*: UInt256 subscribeFailed*: market.Subscription subscribeFulfilled*: market.Subscription subscribeSlotFilled*: market.Subscription waitForCancelled*: Future[void] - me*: Address restoredFromChain*: bool slotState*: TransitionProperty[SlotState] requestState*: TransitionProperty[RequestState] proof*: TransitionProperty[seq[byte]] - slotHost*: TransitionProperty[?Address] + slotHostIsMe*: TransitionProperty[bool] downloaded*: TransitionProperty[bool] - SaleState* = ref object of State - SaleError* = ref object of CodexError + SaleError* = object of CodexError Availability* = object id*: array[32, byte] size*: UInt256 diff --git a/codex/sales/states/cancelled.nim b/codex/sales/states/cancelled.nim index 60d17de8..4fdb4224 100644 --- a/codex/sales/states/cancelled.nim +++ b/codex/sales/states/cancelled.nim @@ -1,13 +1,13 @@ import ../statemachine -import ./errored type SaleCancelled* = ref object of State - SaleCancelledError* = object of CatchableError + SaleCancelledError* = object of SaleError SaleTimeoutError* = object of SaleCancelledError method `$`*(state: SaleCancelled): string = "SaleCancelled" method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let error = newException(SaleTimeoutError, "Sale cancelled due to timeout") machine.setError(error) diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index 710be34d..52442a71 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -17,16 +17,22 @@ type method `$`*(state: SaleDownloading): string = "SaleDownloading" method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let agent = SalesAgent(machine) try: without onStore =? agent.sales.onStore: raiseAssert "onStore callback not set" + without request =? agent.request: + let error = newException(SaleDownloadingError, "missing request") + agent.setError error + return + if availability =? agent.availability: agent.sales.remove(availability) - await onStore(agent.request, agent.slotIndex, agent.availability) + await onStore(request, agent.slotIndex, agent.availability) agent.downloaded.setValue(true) except CancelledError: diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim index 739f0da8..d684d244 100644 --- a/codex/sales/states/errored.nim +++ b/codex/sales/states/errored.nim @@ -6,12 +6,12 @@ type SaleErrored* = ref object of State method `$`*(state: SaleErrored): string = "SaleErrored" method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let agent = SalesAgent(machine) if onClear =? agent.sales.onClear and - request =? agent.request and - slotIndex =? agent.slotIndex: - onClear(agent.availability, request, slotIndex) + request =? agent.request: + onClear(agent.availability, request, agent.slotIndex) # TODO: when availability persistence is added, change this to not optional # NOTE: with this in place, restoring state for a restarted node will diff --git a/codex/sales/states/failed.nim b/codex/sales/states/failed.nim index 583187d6..b944d6bc 100644 --- a/codex/sales/states/failed.nim +++ b/codex/sales/states/failed.nim @@ -1,4 +1,3 @@ -import ./errored import ../statemachine type @@ -8,5 +7,6 @@ type method `$`*(state: SaleFailed): string = "SaleFailed" method run*(state: SaleFailed, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let error = newException(SaleFailedError, "Sale failed") machine.setError error diff --git a/codex/sales/states/filled.nim b/codex/sales/states/filled.nim index 4b5bfe6d..b84a1c31 100644 --- a/codex/sales/states/filled.nim +++ b/codex/sales/states/filled.nim @@ -1,25 +1,25 @@ import pkg/questionable -import ./errored -import ./finished -import ./cancelled -import ./failed import ../statemachine type SaleFilled* = ref object of State - SaleFilledError* = object of CatchableError + SaleFilledError* = object of SaleError HostMismatchError* = object of SaleFilledError method `$`*(state: SaleFilled): string = "SaleFilled" method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let agent = SalesAgent(machine) + let market = agent.sales.market - try: - let market = agent.sales.market + without slotHost =? await market.getHost(agent.requestId, agent.slotIndex): + let error = newException(SaleFilledError, "Filled slot has no host address") + agent.setError error + return - let slotHost = await market.getHost(agent.request.id, agent.slotIndex) - agent.slotHost.setValue(slotHost) - - except CancelledError: - raise + let me = await market.getSigner() + if slotHost == me: + agent.slotHostIsMe.setValue(true) + else: + agent.setError newException(HostMismatchError, "Slot filled by other host") diff --git a/codex/sales/states/filling.nim b/codex/sales/states/filling.nim index f1723861..b0c1d9e1 100644 --- a/codex/sales/states/filling.nim +++ b/codex/sales/states/filling.nim @@ -1,25 +1,15 @@ -import pkg/upraises import ../../market import ../statemachine -import ./filled -import ./errored -import ./cancelled -import ./failed type SaleFilling* = ref object of State - proof*: seq[byte] - SaleFillingError* = object of CatchableError + SaleFillingError* = object of SaleError method `$`*(state: SaleFilling): string = "SaleFilling" method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let agent = SalesAgent(machine) + let market = agent.sales.market - try: - let market = agent.sales.market - - await market.fillSlot(agent.request.id, agent.slotIndex, state.proof) - - except CancelledError: - raise + await market.fillSlot(agent.requestId, agent.slotIndex, agent.proof.value) diff --git a/codex/sales/states/finished.nim b/codex/sales/states/finished.nim index 4db59655..3fbf56ad 100644 --- a/codex/sales/states/finished.nim +++ b/codex/sales/states/finished.nim @@ -6,26 +6,27 @@ import ../statemachine type SaleFinished* = ref object of State - SaleFinishedError* = object of CatchableError + SaleFinishedError* = object of SaleError method `$`*(state: SaleFinished): string = "SaleFinished" method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let agent = SalesAgent(machine) + let slotIndex = agent.slotIndex - try: - if request =? agent.request and - slotIndex =? agent.slotIndex: - agent.sales.proving.add(request.slotId(slotIndex)) + without request =? agent.request: + let error = newException(SaleFinishedError, "missing request") + agent.setError error + return - if onSale =? agent.sales.onSale: - onSale(agent.availability, request, slotIndex) + agent.sales.proving.add(request.slotId(slotIndex)) - # TODO: Keep track of contract completion using local clock. When contract - # has finished, we need to add back availability to the sales module. - # This will change when the state machine is updated to include the entire - # sales process, as well as when availability is persisted, so leaving it - # as a TODO for now. + if onSale =? agent.sales.onSale: + onSale(agent.availability, request, slotIndex) - except CancelledError: - raise + # TODO: Keep track of contract completion using local clock. When contract + # has finished, we need to add back availability to the sales module. + # This will change when the state machine is updated to include the entire + # sales process, as well as when availability is persisted, so leaving it + # as a TODO for now. diff --git a/codex/sales/states/proving.nim b/codex/sales/states/proving.nim index 4fd31e9d..e5ee3265 100644 --- a/codex/sales/states/proving.nim +++ b/codex/sales/states/proving.nim @@ -1,25 +1,22 @@ import ../statemachine -import ./filling -import ./cancelled -import ./failed -import ./filled -import ./errored type SaleProving* = ref object of State - SaleProvingError* = object of CatchableError + SaleProvingError* = object of SaleError method `$`*(state: SaleProving): string = "SaleProving" method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} = + # echo "running ", state let agent = SalesAgent(machine) - try: - without onProve =? agent.sales.onProve: - raiseAssert "onProve callback not set" + without onProve =? agent.sales.onProve: + raiseAssert "onProve callback not set" - let proof = await onProve(agent.request, agent.slotIndex) - agent.proof.setValue(proof) + without request =? agent.request: + let error = newException(SaleProvingError, "missing request") + agent.setError error + return - except CancelledError: - raise + let proof = await onProve(request, agent.slotIndex) + agent.proof.setValue(proof) diff --git a/codex/sales/states/unknown.nim b/codex/sales/states/unknown.nim index 7bc15c43..aab262b6 100644 --- a/codex/sales/states/unknown.nim +++ b/codex/sales/states/unknown.nim @@ -1,14 +1,29 @@ import ../statemachine -import ./filled -import ./finished -import ./failed -import ./errored -import ./cancelled +import ../subscriptions type SaleUnknown* = ref object of State - SaleUnknownError* = object of CatchableError + SaleUnknownError* = object of SaleError UnexpectedSlotError* = object of SaleUnknownError method `$`*(state: SaleUnknown): string = "SaleUnknown" +method run*(state: SaleUnknown, machine: Machine): Future[?State] {.async.} = + # echo "running ", state + let agent = SalesAgent(machine) + let market = agent.sales.market + + if agent.request.isNone: + agent.request = await market.getRequest(agent.requestId) + if agent.request.isSome: + await agent.subscribe() + + if agent.request.isNone: + agent.setError newException(SaleUnknownError, "missing request") + return + + if agent.restoredFromChain and agent.slotState.value == SlotState.Free: + agent.setError newException(UnexpectedSlotError, + "slot state on chain should not be 'free'") + return + diff --git a/codex/sales/subscriptions.nim b/codex/sales/subscriptions.nim new file mode 100644 index 00000000..823df6bd --- /dev/null +++ b/codex/sales/subscriptions.nim @@ -0,0 +1,92 @@ +import pkg/chronos +import pkg/upraises +import pkg/stint +import ./statemachine +import ../contracts/requests + +# TODO: move elsewhere +proc asyncSpawn(future: Future[void], ignore: type CatchableError) = + proc ignoringError {.async.} = + try: + await future + except ignore: + discard + asyncSpawn ignoringError() + +proc subscribeCancellation*(agent: SalesAgent) {.async.} = + let market = agent.sales.market + + proc onCancelled() {.async.} = + let clock = agent.sales.clock + without request =? agent.request: + return + + await clock.waitUntil(request.expiry.truncate(int64)) + asyncSpawn agent.subscribeFulfilled.unsubscribe(), ignore = CatchableError + agent.requestState.setValue(RequestState.Cancelled) + + proc onFulfilled(_: RequestId) = + agent.waitForCancelled.cancel() + + agent.subscribeFulfilled = + await market.subscribeFulfillment(agent.requestId, onFulfilled) + + agent.waitForCancelled = onCancelled() + +proc subscribeFailure*(agent: SalesAgent) {.async.} = + let market = agent.sales.market + + proc onFailed(_: RequestId) {.upraises:[], gcsafe.} = + asyncSpawn agent.subscribeFailed.unsubscribe(), ignore = CatchableError + try: + agent.requestState.setValue(RequestState.Failed) + except AsyncQueueFullError as e: + raiseAssert "State machine critical failure: " & e.msg + + agent.subscribeFailed = + await market.subscribeRequestFailed(agent.requestId, onFailed) + +proc subscribeSlotFill*(agent: SalesAgent) {.async.} = + let market = agent.sales.market + + proc onSlotFilled( + requestId: RequestId, + slotIndex: UInt256) {.upraises:[], gcsafe.} = + + let market = agent.sales.market + + asyncSpawn agent.subscribeSlotFilled.unsubscribe(), ignore = CatchableError + try: + agent.slotState.setValue(SlotState.Filled) + except AsyncQueueFullError as e: + raiseAssert "State machine critical failure: " & e.msg + + agent.subscribeSlotFilled = + await market.subscribeSlotFilled(agent.requestId, + agent.slotIndex, + onSlotFilled) + +proc subscribe*(agent: SalesAgent) {.async.} = + # TODO: Check that the subscription handlers aren't already assigned before + # assigning. This will allow for agent.subscribe to be called multiple times + await agent.subscribeCancellation() + await agent.subscribeFailure() + await agent.subscribeSlotFill() + +proc unsubscribe*(agent: SalesAgent) {.async.} = + try: + await agent.subscribeFulfilled.unsubscribe() + except CatchableError: + discard + try: + await agent.subscribeFailed.unsubscribe() + except CatchableError: + discard + try: + await agent.subscribeSlotFilled.unsubscribe() + except CatchableError: + discard + if not agent.waitForCancelled.completed: + await agent.waitForCancelled.cancelAndWait() + + procCall Machine(agent).stop() diff --git a/codex/utils/asyncstatemachine.nim b/codex/utils/asyncstatemachine.nim index fd1b0a4f..1b2a3a56 100644 --- a/codex/utils/asyncstatemachine.nim +++ b/codex/utils/asyncstatemachine.nim @@ -1,4 +1,6 @@ +import std/typetraits # DELETE ME import std/sequtils +import std/tables import pkg/questionable import pkg/chronos import pkg/chronicles @@ -19,6 +21,8 @@ type transitions: seq[Transition] errored*: TransitionProperty[bool] lastError*: ref CatchableError + states: Table[int, State] + started: bool State* = ref object of RootObj AnyState* = ref object of State Event* = proc(state: State): ?State {.gcsafe, upraises:[].} @@ -50,19 +54,32 @@ proc transition*(_: type Event, previous, next: State): Event = if state == previous: return some next +method `$`*(state: State): string {.base.} = "Base state" + proc state*(machine: Machine): State = machine.state +template getState*(machine: Machine, id: untyped): State = + machine.states[id.int] + +proc addState*(machine: Machine, states: varargs[(int, State)]) = + machine.states = states.toTable + proc schedule*(machine: Machine, event: Event) = machine.scheduled.putNoWait(event) proc checkTransitions(machine: Machine) = + if not machine.started: + return + for transition in machine.transitions: if transition.trigger(machine, machine.state) and machine.state != transition.nextState and # avoid transitioning to self (machine.state == nil or machine.state in transition.prevStates or # state instance, multiple transition.prevStates.any(proc (s: State): bool = s of AnyState)): + # echo "scheduling transition from ", machine.state, " to ", transition.nextState machine.schedule(Event.transition(machine.state, transition.nextState)) + return proc setValue*[T](prop: TransitionProperty[T], value: T) = prop.value = value @@ -73,12 +90,12 @@ proc setError*(machine: Machine, error: ref CatchableError) = machine.errored.value = false # clears error without triggering transitions machine.lastError = error # stores error in state -method run*(state: State): Future[?State] {.base, upraises:[].} = +method run*(state: State, machine: Machine): Future[?State] {.base, upraises:[].} = discard proc run(machine: Machine, state: State) {.async.} = try: - if next =? await state.run(): + if next =? await state.run(machine): machine.schedule(Event.transition(state, next)) except CancelledError: discard @@ -86,7 +103,6 @@ proc run(machine: Machine, state: State) {.async.} = proc scheduler(machine: Machine) {.async.} = proc onRunComplete(udata: pointer) {.gcsafe, raises: [Defect].} = var fut = cast[FutureBase](udata) - # TODO: would CancelledError be swallowed here (by not checking fut.cancelled())? if fut.failed(): try: machine.setError(fut.error) @@ -98,21 +114,29 @@ proc scheduler(machine: Machine) {.async.} = let event = await machine.scheduled.get() if next =? event(machine.state): if not machine.running.isNil: + # echo "cancelling current state: ", machine.state await machine.running.cancelAndWait() + # echo "transitioning from ", $ machine.state, " to ", $ next + # echo "moving from ", if machine.state.isNil: "nil" else: $machine.state, " to ", next machine.state = next + # trace "running state", state = machine.state machine.running = machine.run(machine.state) machine.running.addCallback(onRunComplete) - machine.checkTransitions() + machine.checkTransitions() except CancelledError: discard proc start*(machine: Machine, initialState: State) = machine.scheduling = machine.scheduler() machine.schedule(Event.transition(machine.state, initialState)) + machine.started = true proc stop*(machine: Machine) = - machine.scheduling.cancel() - machine.running.cancel() + if not machine.running.isNil and not machine.running.finished: + machine.scheduling.cancel() + if not machine.running.isNil and not machine.running.finished: + machine.running.cancel() + machine.started = false proc new*(T: type Machine, transitions: seq[Transition]): T = let m = T( diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index 161efaa6..803911c0 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -169,6 +169,7 @@ proc fillSlot*(market: MockMarket, host: host ) market.filled.add(slot) + market.slotState[slotId(requestId, slotIndex)] = SlotState.Filled market.emitSlotFilled(requestId, slotIndex) method fillSlot*(market: MockMarket, diff --git a/tests/codex/testsales.nim b/tests/codex/testsales.nim index 79f5d427..2bb364c1 100644 --- a/tests/codex/testsales.nim +++ b/tests/codex/testsales.nim @@ -81,501 +81,511 @@ suite "Sales": test "makes storage unavailable when matching request comes in": sales.add(availability) await market.requestStorage(request) + await sleepAsync(5000.millis) check sales.available.len == 0 -# test "ignores request when no matching storage is available": -# sales.add(availability) -# var tooBig = request -# tooBig.ask.slotSize = request.ask.slotSize + 1 -# await market.requestStorage(tooBig) -# check sales.available == @[availability] + test "ignores request when no matching storage is available": + sales.add(availability) + var tooBig = request + tooBig.ask.slotSize = request.ask.slotSize + 1 + await market.requestStorage(tooBig) + check sales.available == @[availability] -# test "ignores request when reward is too low": -# sales.add(availability) -# var tooCheap = request -# tooCheap.ask.reward = request.ask.reward - 1 -# await market.requestStorage(tooCheap) -# check sales.available == @[availability] + test "ignores request when reward is too low": + sales.add(availability) + var tooCheap = request + tooCheap.ask.reward = request.ask.reward - 1 + await market.requestStorage(tooCheap) + check sales.available == @[availability] -# test "retrieves and stores data locally": -# var storingRequest: StorageRequest -# var storingSlot: UInt256 -# var storingAvailability: Availability -# sales.onStore = proc(request: StorageRequest, -# slot: UInt256, -# availability: ?Availability) {.async.} = -# storingRequest = request -# storingSlot = slot -# check availability.isSome -# storingAvailability = !availability -# sales.add(availability) -# await market.requestStorage(request) -# check storingRequest == request -# check storingSlot < request.ask.slots.u256 -# check storingAvailability == availability + test "retrieves and stores data locally": + var storingRequest: StorageRequest + var storingSlot: UInt256 + var storingAvailability: Availability + sales.onStore = proc(request: StorageRequest, + slot: UInt256, + availability: ?Availability) {.async.} = + storingRequest = request + storingSlot = slot + check availability.isSome + storingAvailability = !availability + sales.add(availability) + await market.requestStorage(request) + check eventually storingRequest == request + check storingSlot < request.ask.slots.u256 + check storingAvailability == availability -# test "makes storage available again when data retrieval fails": -# let error = newException(IOError, "data retrieval failed") -# sales.onStore = proc(request: StorageRequest, -# slot: UInt256, -# availability: ?Availability) {.async.} = -# raise error -# sales.add(availability) -# await market.requestStorage(request) -# check sales.available == @[availability] + test "makes storage available again when data retrieval fails": + let error = newException(IOError, "data retrieval failed") + sales.onStore = proc(request: StorageRequest, + slot: UInt256, + availability: ?Availability) {.async.} = + raise error + sales.add(availability) + await market.requestStorage(request) + check sales.available == @[availability] -# test "generates proof of storage": -# var provingRequest: StorageRequest -# var provingSlot: UInt256 -# sales.onProve = proc(request: StorageRequest, -# slot: UInt256): Future[seq[byte]] {.async.} = -# provingRequest = request -# provingSlot = slot -# sales.add(availability) -# await market.requestStorage(request) -# check provingRequest == request -# check provingSlot < request.ask.slots.u256 + test "generates proof of storage": + var provingRequest: StorageRequest + var provingSlot: UInt256 + sales.onProve = proc(request: StorageRequest, + slot: UInt256): Future[seq[byte]] {.async.} = + provingRequest = request + provingSlot = slot + sales.add(availability) + await market.requestStorage(request) + check eventually provingRequest == request + check provingSlot < request.ask.slots.u256 -# test "fills a slot": -# sales.add(availability) -# await market.requestStorage(request) -# check market.filled.len == 1 -# check market.filled[0].requestId == request.id -# check market.filled[0].slotIndex < request.ask.slots.u256 -# check market.filled[0].proof == proof -# check market.filled[0].host == await market.getSigner() + test "fills a slot": + sales.add(availability) + await market.requestStorage(request) + check eventually market.filled.len == 1 + check market.filled[0].requestId == request.id + check market.filled[0].slotIndex < request.ask.slots.u256 + check market.filled[0].proof == proof + check market.filled[0].host == await market.getSigner() -# test "calls onSale when slot is filled": -# var soldAvailability: Availability -# var soldRequest: StorageRequest -# var soldSlotIndex: UInt256 -# sales.onSale = proc(availability: ?Availability, -# request: StorageRequest, -# slotIndex: UInt256) = -# if a =? availability: -# soldAvailability = a -# soldRequest = request -# soldSlotIndex = slotIndex -# sales.add(availability) -# await market.requestStorage(request) -# check soldAvailability == availability -# check soldRequest == request -# check soldSlotIndex < request.ask.slots.u256 + test "calls onSale when slot is filled": + var soldAvailability: Availability + var soldRequest: StorageRequest + var soldSlotIndex: UInt256 + sales.onSale = proc(availability: ?Availability, + request: StorageRequest, + slotIndex: UInt256) = + if a =? availability: + soldAvailability = a + soldRequest = request + soldSlotIndex = slotIndex + sales.add(availability) + await market.requestStorage(request) + check eventually soldAvailability == availability + check soldRequest == request + check soldSlotIndex < request.ask.slots.u256 -# test "calls onClear when storage becomes available again": -# # fail the proof intentionally to trigger `agent.finish(success=false)`, -# # which then calls the onClear callback -# sales.onProve = proc(request: StorageRequest, -# slot: UInt256): Future[seq[byte]] {.async.} = -# raise newException(IOError, "proof failed") -# var clearedAvailability: Availability -# var clearedRequest: StorageRequest -# var clearedSlotIndex: UInt256 -# sales.onClear = proc(availability: ?Availability, -# request: StorageRequest, -# slotIndex: UInt256) = -# if a =? availability: -# clearedAvailability = a -# clearedRequest = request -# clearedSlotIndex = slotIndex -# sales.add(availability) -# await market.requestStorage(request) -# check clearedAvailability == availability -# check clearedRequest == request -# check clearedSlotIndex < request.ask.slots.u256 + test "calls onClear when storage becomes available again": + # fail the proof intentionally to trigger `agent.finish(success=false)`, + # which then calls the onClear callback + sales.onProve = proc(request: StorageRequest, + slot: UInt256): Future[seq[byte]] {.async.} = + raise newException(IOError, "proof failed") + var clearedAvailability: Availability + var clearedRequest: StorageRequest + var clearedSlotIndex: UInt256 + sales.onClear = proc(availability: ?Availability, + request: StorageRequest, + slotIndex: UInt256) = + if a =? availability: + clearedAvailability = a + clearedRequest = request + clearedSlotIndex = slotIndex + sales.add(availability) + await market.requestStorage(request) + check eventually clearedAvailability == availability + check clearedRequest == request + check clearedSlotIndex < request.ask.slots.u256 -# test "makes storage available again when other host fills the slot": -# let otherHost = Address.example -# sales.onStore = proc(request: StorageRequest, -# slot: UInt256, -# availability: ?Availability) {.async.} = -# await sleepAsync(chronos.hours(1)) -# sales.add(availability) -# await market.requestStorage(request) -# for slotIndex in 0..SaleFinished when slot state is Filled": -# # let agent = newSalesAgent() -# # await fillSlot() -# # await agent.switchAsync(SaleUnknown()) -# # check (agent.state as SaleFinished).isSome + let slotId = slotId(request.id, slotIdx) + let req = if restoredFromChain: some request else: none StorageRequest + let agent = sales.newSalesAgent(slotIdx, + some availability, + request.id, + req, + requestState, + slotState, + restoredFromChain) + return agent -# # test "moves to SaleFinished when slot state is Finished": -# # let agent = newSalesAgent() -# # await fillSlot() -# # market.slotState[slotId] = SlotState.Finished -# # agent.switch(SaleUnknown()) -# # check (agent.state as SaleFinished).isSome + proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} = + let address = await market.getSigner() + market.fillSlot(request.id, + slotIdx, + proof, + address) + # market.filled.add slot + # market.slotState[slotId(request.id, slotIdx)] = SlotState.Filled -# # test "moves to SaleFinished when slot state is Paid": -# # let agent = newSalesAgent() -# # market.slotState[slotId] = SlotState.Paid -# # agent.switch(SaleUnknown()) -# # check (agent.state as SaleFinished).isSome + test "moves to SaleErrored when request is missing": + let agent = newSalesAgent(restoredFromChain = true) + market.requested.keepItIf(it != request) + agent.request = none StorageRequest + await agent.start() + check eventually agent.state of SaleErrored + check agent.lastError of SaleUnknownError + check agent.lastError.msg == "missing request" -# # test "moves to SaleErrored when slot state is Failed": -# # let agent = newSalesAgent() -# # market.slotState[slotId] = SlotState.Failed -# # agent.switch(SaleUnknown()) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleFailedError -# # check state.error.msg == "Sale failed" + test "moves to SaleErrored when restoring state and slot state on chain is free": + let agent = newSalesAgent(restoredFromChain = true) + await agent.start() + check eventually agent.state of SaleErrored + check agent.lastError of UnexpectedSlotError + check agent.lastError.msg == "slot state on chain should not be 'free'" -# # test "moves to SaleErrored when Downloading and request expires": -# # sales.onStore = proc(request: StorageRequest, -# # slot: UInt256, -# # availability: ?Availability) {.async.} = -# # await sleepAsync(chronos.minutes(1)) # "far" in the future -# # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleDownloading()) -# # clock.set(request.expiry.truncate(int64)) -# # await sleepAsync chronos.seconds(2) + test "moves to SaleFilled>SaleErrored when slot not actually filled": + let agent = newSalesAgent(restoredFromChain = true, + slotState = SlotState.Filled) + await agent.start() + check eventually agent.state of SaleErrored + check agent.lastError of SaleFilledError + check agent.lastError.msg == "Filled slot has no host address" -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleTimeoutError -# # check state.error.msg == "Sale cancelled due to timeout" + test "moves to SaleFilled>SaleFinished when slot state is Filled": + await fillSlot() + let agent = newSalesAgent(restoredFromChain = true, + slotState = SlotState.Filled) + await agent.start() + check eventually agent.state of SaleFinished -# # test "moves to SaleErrored when Downloading and request fails": -# # sales.onStore = proc(request: StorageRequest, -# # slot: UInt256, -# # availability: ?Availability) {.async.} = -# # await sleepAsync(chronos.minutes(1)) # "far" in the future -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleDownloading()) -# # market.emitRequestFailed(request.id) -# # await sleepAsync chronos.seconds(2) + test "moves to SaleFinished when slot state is Paid": + let agent = newSalesAgent(restoredFromChain = true, slotState = SlotState.Paid) + await agent.start() + check eventually agent.state of SaleFinished -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleFailedError -# # check state.error.msg == "Sale failed" + test "moves to SaleErrored when slot state is Failed": + let agent = newSalesAgent(restoredFromChain = true, slotState = SlotState.Failed) + await agent.start() + check eventually agent.state of SaleErrored + check agent.lastError of SaleFailedError + check agent.lastError.msg == "Sale failed" -# # test "moves to SaleErrored when Filling and request expires": -# # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleFilling()) -# # clock.set(request.expiry.truncate(int64)) -# # await sleepAsync chronos.seconds(2) + test "moves to SaleErrored when Downloading and request expires": + sales.onStore = proc(request: StorageRequest, + slot: UInt256, + availability: ?Availability) {.async.} = + await sleepAsync(chronos.minutes(1)) # "far" in the future + request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 + let agent = newSalesAgent(requestState = RequestState.New) + await agent.start() + await sleepAsync 1.millis + clock.set(request.expiry.truncate(int64)) + # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleTimeoutError -# # check state.error.msg == "Sale cancelled due to timeout" + check eventually agent.state of SaleErrored + check agent.lastError of SaleTimeoutError + check agent.lastError.msg == "Sale cancelled due to timeout" -# # test "moves to SaleErrored when Filling and request fails": -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleFilling()) -# # market.emitRequestFailed(request.id) -# # await sleepAsync chronos.seconds(2) + test "moves to SaleErrored when Downloading and request fails": + let agent = newSalesAgent() + await agent.start() + # await sleepAsync 1.millis + market.emitRequestFailed(request.id) + # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleFailedError -# # check state.error.msg == "Sale failed" + check eventually agent.state of SaleErrored + check agent.lastError of SaleFailedError + check agent.lastError.msg == "Sale failed" -# # test "moves to SaleErrored when Finished and request expires": -# # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.FinishedswitchAsync -# # await agent.switchAsync(SaleFinished()) -# # clock.set(request.expiry.truncate(int64)) -# # await sleepAsync chronos.seconds(2) + # test "moves to SaleErrored when Filling and request expires": + # request.expiry = (getTime() + 2.seconds).toUnix.u256 + # let agent = newSalesAgent() + # # agent.request.setValue(some request) # allows skip of SaleUnknown state + # await agent.start(agent.getState(SaleState.SaleFilling)) + # # check eventually agent.state of SaleFilling + # # echo ">>> we're in SaleFilling!" + # # echo "setting clock expired" + # clock.set(request.expiry.truncate(int64)) + # # else: echo ">>> we never got to SaleFilling" + # # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleTimeoutError -# # check state.error.msg == "Sale cancelled due to timeout" + # check eventually agent.state of SaleErrored + # check agent.lastError of SaleTimeoutError + # check agent.lastError.msg == "Sale cancelled due to timeout" -# # test "moves to SaleErrored when Finished and request fails": -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.Finished -# # await agent.switchAsync(SaleFinished()) -# # market.emitRequestFailed(request.id) -# # await sleepAsync chronos.seconds(2) + # test "moves to SaleErrored when Filling and request fails": + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.New + # await agent.switchAsync(SaleFilling()) + # market.emitRequestFailed(request.id) + # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleFailedError -# # check state.error.msg == "Sale failed" + # without state =? (agent.state as SaleErrored): + # fail() + # check state.error of SaleFailedError + # check state.error.msg == "Sale failed" -# # test "moves to SaleErrored when Proving and request expires": -# # sales.onProve = proc(request: StorageRequest, -# # slot: UInt256): Future[seq[byte]] {.async.} = -# # await sleepAsync(chronos.minutes(1)) # "far" in the future -# # return @[] -# # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleProving()) -# # clock.set(request.expiry.truncate(int64)) -# # await sleepAsync chronos.seconds(2) + # test "moves to SaleErrored when Finished and request expires": + # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.FinishedswitchAsync + # await agent.switchAsync(SaleFinished()) + # clock.set(request.expiry.truncate(int64)) + # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleTimeoutError -# # check state.error.msg == "Sale cancelled due to timeout" + # without state =? (agent.state as SaleErrored): + # fail() + # check state.error of SaleTimeoutError + # check state.error.msg == "Sale cancelled due to timeout" -# # test "moves to SaleErrored when Proving and request fails": -# # sales.onProve = proc(request: StorageRequest, -# # slot: UInt256): Future[seq[byte]] {.async.} = -# # await sleepAsync(chronos.minutes(1)) # "far" in the future -# # return @[] -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleProving()) -# # market.emitRequestFailed(request.id) -# # await sleepAsync chronos.seconds(2) + # test "moves to SaleErrored when Finished and request fails": + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.Finished + # await agent.switchAsync(SaleFinished()) + # market.emitRequestFailed(request.id) + # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of SaleFailedError -# # check state.error.msg == "Sale failed" + # without state =? (agent.state as SaleErrored): + # fail() + # check state.error of SaleFailedError + # check state.error.msg == "Sale failed" -# # test "moves to SaleErrored when Downloading and slot is filled by another host": -# # sales.onStore = proc(request: StorageRequest, -# # slot: UInt256, -# # availability: ?Availability) {.async.} = -# # await sleepAsync(chronos.minutes(1)) # "far" in the future -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleDownloading()) -# # market.fillSlot(request.id, agent.slotIndex, proof, Address.example) -# # await sleepAsync chronos.seconds(2) + # test "moves to SaleErrored when Proving and request expires": + # sales.onProve = proc(request: StorageRequest, + # slot: UInt256): Future[seq[byte]] {.async.} = + # await sleepAsync(chronos.minutes(1)) # "far" in the future + # return @[] + # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.New + # await agent.switchAsync(SaleProving()) + # clock.set(request.expiry.truncate(int64)) + # await sleepAsync chronos.seconds(2) -# # let state = (agent.state as SaleErrored) -# # check state.isSome -# # check (!state).error.msg == "Slot filled by other host" + # without state =? (agent.state as SaleErrored): + # fail() + # check state.error of SaleTimeoutError + # check state.error.msg == "Sale cancelled due to timeout" -# # test "moves to SaleErrored when Proving and slot is filled by another host": -# # sales.onProve = proc(request: StorageRequest, -# # slot: UInt256): Future[seq[byte]] {.async.} = -# # await sleepAsync(chronos.minutes(1)) # "far" in the future -# # return @[] -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await agent.switchAsync(SaleProving()) -# # market.fillSlot(request.id, agent.slotIndex, proof, Address.example) -# # await sleepAsync chronos.seconds(2) + # test "moves to SaleErrored when Proving and request fails": + # sales.onProve = proc(request: StorageRequest, + # slot: UInt256): Future[seq[byte]] {.async.} = + # await sleepAsync(chronos.minutes(1)) # "far" in the future + # return @[] + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.New + # await agent.switchAsync(SaleProving()) + # market.emitRequestFailed(request.id) + # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of HostMismatchError -# # check state.error.msg == "Slot filled by other host" + # without state =? (agent.state as SaleErrored): + # fail() + # check state.error of SaleFailedError + # check state.error.msg == "Sale failed" -# # test "moves to SaleErrored when Filling and slot is filled by another host": -# # sales.onProve = proc(request: StorageRequest, -# # slot: UInt256): Future[seq[byte]] {.async.} = -# # await sleepAsync(chronos.minutes(1)) # "far" in the future -# # return @[] -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # market.fillSlot(request.id, agent.slotIndex, proof, Address.example) -# # await agent.switchAsync(SaleFilling()) -# # await sleepAsync chronos.seconds(2) + # test "moves to SaleErrored when Downloading and slot is filled by another host": + # sales.onStore = proc(request: StorageRequest, + # slot: UInt256, + # availability: ?Availability) {.async.} = + # await sleepAsync(chronos.minutes(1)) # "far" in the future + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.New + # await agent.switchAsync(SaleDownloading()) + # market.fillSlot(request.id, agent.slotIndex, proof, Address.example) + # await sleepAsync chronos.seconds(2) -# # without state =? (agent.state as SaleErrored): -# # fail() -# # check state.error of HostMismatchError -# # check state.error.msg == "Slot filled by other host" + # let state = (agent.state as SaleErrored) + # check state.isSome + # check (!state).error.msg == "Slot filled by other host" -# # test "moves from SaleDownloading to SaleFinished, calling necessary callbacks": -# # var onProveCalled, onStoreCalled, onClearCalled, onSaleCalled: bool -# # sales.onProve = proc(request: StorageRequest, -# # slot: UInt256): Future[seq[byte]] {.async.} = -# # onProveCalled = true -# # return @[] -# # sales.onStore = proc(request: StorageRequest, -# # slot: UInt256, -# # availability: ?Availability) {.async.} = -# # onStoreCalled = true -# # sales.onClear = proc(availability: ?Availability, -# # request: StorageRequest, -# # slotIndex: UInt256) = -# # onClearCalled = true -# # sales.onSale = proc(availability: ?Availability, -# # request: StorageRequest, -# # slotIndex: UInt256) = -# # onSaleCalled = true + # test "moves to SaleErrored when Proving and slot is filled by another host": + # sales.onProve = proc(request: StorageRequest, + # slot: UInt256): Future[seq[byte]] {.async.} = + # await sleepAsync(chronos.minutes(1)) # "far" in the future + # return @[] + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.New + # await agent.switchAsync(SaleProving()) + # market.fillSlot(request.id, agent.slotIndex, proof, Address.example) + # await sleepAsync chronos.seconds(2) -# # let agent = newSalesAgent() -# # await agent.start(request.ask.slots) -# # market.requested.add request -# # market.requestState[request.id] = RequestState.New -# # await fillSlot(agent.slotIndex) -# # await agent.switchAsync(SaleDownloading()) -# # market.emitRequestFulfilled(request.id) -# # await sleepAsync chronos.seconds(2) + # without state =? (agent.state as SaleErrored): + # fail() + # check state.error of HostMismatchError + # check state.error.msg == "Slot filled by other host" -# # without state =? (agent.state as SaleFinished): -# # fail() -# # check onProveCalled -# # check onStoreCalled -# # check not onClearCalled -# # check onSaleCalled + # test "moves to SaleErrored when Filling and slot is filled by another host": + # sales.onProve = proc(request: StorageRequest, + # slot: UInt256): Future[seq[byte]] {.async.} = + # await sleepAsync(chronos.minutes(1)) # "far" in the future + # return @[] + # let agent = newSalesAgent() + # await agent.start(request.ask.slots) + # market.requested.add request + # market.requestState[request.id] = RequestState.New + # market.fillSlot(request.id, agent.slotIndex, proof, Address.example) + # await agent.switchAsync(SaleFilling()) + # await sleepAsync chronos.seconds(2) -# test "loads active slots from market": -# let me = await market.getSigner() + # without state =? (agent.state as SaleErrored): + # fail() + # check state.error of HostMismatchError + # check state.error.msg == "Slot filled by other host" -# request.ask.slots = 2 -# market.requested = @[request] -# market.requestState[request.id] = RequestState.New + test "moves from SaleDownloading to SaleFinished, calling necessary callbacks": + var onProveCalled, onStoreCalled, onClearCalled, onSaleCalled: bool + sales.onProve = proc(request: StorageRequest, + slot: UInt256): Future[seq[byte]] {.async.} = + onProveCalled = true + return array[32, byte].example.toSeq + sales.onStore = proc(request: StorageRequest, + slot: UInt256, + availability: ?Availability) {.async.} = + onStoreCalled = true + sales.onClear = proc(availability: ?Availability, + request: StorageRequest, + slotIndex: UInt256) = + onClearCalled = true + sales.onSale = proc(availability: ?Availability, + request: StorageRequest, + slotIndex: UInt256) = + onSaleCalled = true -# let slot0 = MockSlot(requestId: request.id, -# slotIndex: 0.u256, -# proof: proof, -# host: me) -# await fillSlot(slot0.slotIndex) + let agent = newSalesAgent() + # market.requested.add request + # market.requestState[request.id] = RequestState.New + await fillSlot(agent.slotIndex) + await agent.start() + # await agent.switchAsync(SaleDownloading()) + market.emitRequestFulfilled(request.id) + await sleepAsync chronos.seconds(2) -# let slot1 = MockSlot(requestId: request.id, -# slotIndex: 1.u256, -# proof: proof, -# host: me) -# await fillSlot(slot1.slotIndex) -# market.activeSlots[me] = @[request.slotId(0.u256), request.slotId(1.u256)] -# market.requested = @[request] -# market.activeRequests[me] = @[request.id] + check eventually agent.state of SaleFinished + check onProveCalled + check onStoreCalled + check not onClearCalled + check onSaleCalled -# await sales.load() -# let expected = SalesAgent(sales: sales, -# requestId: request.id, -# availability: none Availability, -# request: some request) -# # because sales.load() calls agent.start, we won't know the slotIndex -# # randomly selected for the agent, and we also won't know the value of -# # `failed`/`fulfilled`/`cancelled` futures, so we need to compare -# # the properties we know -# # TODO: when calling sales.load(), slot index should be restored and not -# # randomly re-assigned, so this may no longer be needed -# proc `==` (agent0, agent1: SalesAgent): bool = -# return agent0.sales == agent1.sales and -# agent0.requestId == agent1.requestId and -# agent0.availability == agent1.availability and -# agent0.request == agent1.request + test "loads active slots from market": + let me = await market.getSigner() -# check sales.agents.all(agent => agent == expected) + request.ask.slots = 2 + market.requested = @[request] + market.requestState[request.id] = RequestState.New + + let slot0 = MockSlot(requestId: request.id, + slotIndex: 0.u256, + proof: proof, + host: me) + await fillSlot(slot0.slotIndex) + + let slot1 = MockSlot(requestId: request.id, + slotIndex: 1.u256, + proof: proof, + host: me) + await fillSlot(slot1.slotIndex) + market.activeSlots[me] = @[request.slotId(0.u256), request.slotId(1.u256)] + market.requested = @[request] + market.activeRequests[me] = @[request.id] + + await sales.load() + let expected = SalesAgent(sales: sales, + availability: none Availability, + request: some request) + # because sales.load() calls agent.start, we won't know the slotIndex + # randomly selected for the agent, and we also won't know the value of + # `failed`/`fulfilled`/`cancelled` futures, so we need to compare + # the properties we know + # TODO: when calling sales.load(), slot index should be restored and not + # randomly re-assigned, so this may no longer be needed + proc `==` (agent0, agent1: SalesAgent): bool = + return agent0.sales == agent1.sales and + agent0.availability == agent1.availability and + agent0.request == agent1.request + + check sales.agents.all(agent => agent == expected) diff --git a/tests/codex/utils/testasyncstatemachine.nim b/tests/codex/utils/testasyncstatemachine.nim index caaba7f8..97003eb5 100644 --- a/tests/codex/utils/testasyncstatemachine.nim +++ b/tests/codex/utils/testasyncstatemachine.nim @@ -1,3 +1,4 @@ +import std/typetraits import pkg/asynctest import pkg/questionable import pkg/chronos @@ -75,6 +76,7 @@ suite "async state machines": MyMachine(m).errored.value ) ]) + machine.addState state1, state2, state3, state4, state5, state6 machine.slotsFilled = machine.newTransitionProperty(0) machine.requestFinished = machine.newTransitionProperty(false)