WIP: many tests working, but some not testable

I know this is a big commit…

There are some tests that aren’t testable yet, such as testing that an active state transitions to cancelled/failed states. This is due to the inability to wait during some states like SaleFilling. The tests would likely need to be redesigned, but I don’t think it’s worth pursuing the fully declarative model further as it is too hard to debug.

The declarative state machine DOES work, however.

The main point for keeping this commit is for the learnings about the state machine to bring fwd into the branch used in 306.
This commit is contained in:
Eric Mastro 2023-02-26 23:05:39 +11:00
parent 24ad3fdea9
commit 8286fe5bee
No known key found for this signature in database
GPG Key ID: AD065ECE27A873B9
17 changed files with 716 additions and 663 deletions

View File

@ -1,4 +1,5 @@
import pkg/questionable
import pkg/questionable/results
import pkg/upraises
import pkg/stint
import pkg/nimcrypto
@ -11,7 +12,7 @@ import ./errors
import ./contracts/requests
import ./sales/salesagent
import ./sales/statemachine
import ./sales/states/[start, downloading, unknown]
import ./sales/states/unknown
## Sales holds a list of available storage that it may sell.
##
@ -72,26 +73,21 @@ proc findSlotIndex(numSlots: uint64,
proc handleRequest(sales: Sales,
requestId: RequestId,
ask: StorageAsk) =
ask: StorageAsk) {.raises:[CatchableError].} =
let availability = sales.findAvailability(ask)
# TODO: check if random slot is actually available (not already filled)
let slotIndex = randomSlotIndex(ask.slots)
without request =? await sales.market.getRequest(requestId):
raise newException(SalesError, "Failed to get request on chain")
let me = await sales.market.getSigner()
let agent = newSalesAgent(
sales,
slotIndex,
availability,
request,
me,
requestId,
none StorageRequest,
RequestState.New,
SlotState.Free
)
await agent.start(SaleUnknown.new())
waitFor agent.start()
sales.agents.add agent
proc load*(sales: Sales) {.async.} =
@ -100,7 +96,6 @@ proc load*(sales: Sales) {.async.} =
# TODO: restore availability from disk
let requestIds = await market.myRequests()
let slotIds = await market.mySlots()
let me = await market.getSigner()
for slotId in slotIds:
# TODO: this needs to be optimised
@ -109,28 +104,34 @@ proc load*(sales: Sales) {.async.} =
without slotIndex =? findSlotIndex(request.ask.slots,
request.id,
slotId):
raiseAssert "could not find slot index"
raise newException(SalesError, "could not find slot index")
# TODO: should be optimised (maybe get everything in one call: request, request state, slot state)
let requestState = await market.requestState(request.id)
without requestState =? await market.requestState(request.id):
raise newException(SalesError, "request state could not be determined")
let slotState = await market.slotState(slotId)
let agent = newSalesAgent(
sales,
slotIndex,
availability,
request,
me,
request.id,
some request,
requestState,
slotState)
await agent.start(SaleUnknown.new())
slotState,
restoredFromChain = true)
await agent.start()
sales.agents.add agent
proc start*(sales: Sales) {.async.} =
doAssert sales.subscription.isNone, "Sales already started"
proc onRequest(requestId: RequestId, ask: StorageAsk) {.gcsafe, upraises:[].} =
sales.handleRequest(requestId, ask)
try:
sales.handleRequest(requestId, ask)
except CatchableError as e:
error "Failed to handle storage request", error = e.msg
try:
sales.subscription = some await sales.market.subscribeRequests(onRequest)

View File

@ -1,19 +1,29 @@
import pkg/chronos
import pkg/upraises
import pkg/stint
import ./statemachine
import ./states/[cancelled, downloading, errored, failed, finished, filled,
filling, proving, unknown]
import ./subscriptions
import ../contracts/requests
type SaleState* {.pure.} = enum
SaleUnknown,
SaleDownloading,
SaleProving,
SaleFilling,
SaleFilled,
SaleCancelled,
SaleFailed,
SaleErrored
proc newSalesAgent*(sales: Sales,
slotIndex: UInt256,
availability: ?Availability,
request: StorageRequest,
me: Address,
requestId: RequestId,
request: ?StorageRequest,
requestState: RequestState,
slotState: SlotState,
restoredFromChain: bool): SalesAgent =
restoredFromChain: bool = false): SalesAgent =
let saleUnknown = SaleUnknown.new()
let saleDownloading = SaleDownloading.new()
@ -25,11 +35,18 @@ proc newSalesAgent*(sales: Sales,
let saleErrored = SaleErrored.new()
let agent = SalesAgent.new(@[
Transition.new(
AnyState.new(),
saleErrored,
proc(m: Machine, s: State): bool =
SalesAgent(m).errored.value
),
Transition.new(
saleUnknown,
saleDownloading,
proc(m: Machine, s: State): bool =
let agent = SalesAgent(m)
not agent.restoredFromChain and
agent.requestState.value == RequestState.New and
agent.slotState.value == SlotState.Free
),
@ -84,12 +101,7 @@ proc newSalesAgent*(sales: Sales,
agent.slotState.value in @[SlotState.Finished, SlotState.Paid] or
agent.requestState.value == RequestState.Finished
),
Transition.new(
AnyState.new(),
saleErrored,
proc(m: Machine, s: State): bool =
SalesAgent(m).errored.value
),
Transition.new(
saleDownloading,
saleProving,
@ -106,133 +118,34 @@ proc newSalesAgent*(sales: Sales,
saleFilled,
SaleFinished.new(),
proc(m: Machine, s: State): bool =
let agent = SalesAgent(m)
without host =? agent.slotHost.value:
return false
host == agent.me
),
Transition.new(
saleFilled,
saleErrored,
proc(m: Machine, s: State): bool =
let agent = SalesAgent(m)
without host =? agent.slotHost.value:
return false
if host != agent.me:
agent.lastError = newException(HostMismatchError,
"Slot filled by other host")
return true
else: return false
),
Transition.new(
saleUnknown,
saleErrored,
proc(m: Machine, s: State): bool =
let agent = SalesAgent(m)
if agent.restoredFromChain and agent.slotState.value == SlotState.Free:
agent.lastError = newException(SaleUnknownError,
"cannot retrieve slot state")
return true
else: return false
SalesAgent(m).slotHostIsMe.value
),
])
agent.addState (SaleState.SaleUnknown.int, saleUnknown),
(SaleState.SaleDownloading.int, saleDownloading),
(SaleState.SaleProving.int, saleProving),
(SaleState.SaleFilling.int, saleFilling),
(SaleState.SaleFilled.int, saleFilled),
(SaleState.SaleCancelled.int, saleCancelled),
(SaleState.SaleFailed.int, saleFailed),
(SaleState.SaleErrored.int, saleErrored)
agent.slotState = agent.newTransitionProperty(slotState)
agent.requestState = agent.newTransitionProperty(requestState)
agent.proof = agent.newTransitionProperty(newSeq[byte]())
agent.slotHost = agent.newTransitionProperty(none Address)
agent.slotHostIsMe = agent.newTransitionProperty(false)
agent.downloaded = agent.newTransitionProperty(false)
agent.sales = sales
agent.availability = availability
agent.slotIndex = slotIndex
agent.requestId = requestId
agent.request = request
agent.me = me
agent.restoredFromChain = restoredFromChain
return agent
proc subscribeCancellation*(agent: SalesAgent): Future[void] {.gcsafe.}
proc subscribeFailure*(agent: SalesAgent): Future[void] {.gcsafe.}
proc subscribeSlotFill*(agent: SalesAgent): Future[void] {.gcsafe.}
proc start*(agent: SalesAgent, initialState: State) {.async.} =
await agent.subscribeCancellation()
await agent.subscribeFailure()
await agent.subscribeSlotFill()
proc start*(agent: SalesAgent,
initialState: State = agent.getState(SaleState.SaleUnknown)) {.async.} =
await agent.subscribe()
procCall Machine(agent).start(initialState)
proc stop*(agent: SalesAgent) {.async.} =
try:
await agent.subscribeFulfilled.unsubscribe()
except CatchableError:
discard
try:
await agent.subscribeFailed.unsubscribe()
except CatchableError:
discard
try:
await agent.subscribeSlotFilled.unsubscribe()
except CatchableError:
discard
if not agent.waitForCancelled.completed:
await agent.waitForCancelled.cancelAndWait()
procCall Machine(agent).stop()
proc subscribeCancellation*(agent: SalesAgent) {.async.} =
let market = agent.sales.market
proc onCancelled() {.async.} =
let clock = agent.sales.clock
await clock.waitUntil(agent.request.expiry.truncate(int64))
await agent.subscribeFulfilled.unsubscribe()
agent.requestState.setValue(RequestState.Cancelled)
agent.waitForCancelled = onCancelled()
proc onFulfilled(_: RequestId) =
agent.waitForCancelled.cancel()
agent.subscribeFulfilled =
await market.subscribeFulfillment(agent.request.id, onFulfilled)
# TODO: move elsewhere
proc asyncSpawn(future: Future[void], ignore: type CatchableError) =
proc ignoringError {.async.} =
try:
await future
except ignore:
discard
asyncSpawn ignoringError()
proc subscribeFailure*(agent: SalesAgent) {.async.} =
let market = agent.sales.market
proc onFailed(_: RequestId) {.upraises:[], gcsafe.} =
asyncSpawn agent.subscribeFailed.unsubscribe(), ignore = CatchableError
try:
agent.requestState.setValue(RequestState.Failed)
except AsyncQueueFullError as e:
raiseAssert "State machine critical failure: " & e.msg
agent.subscribeFailed =
await market.subscribeRequestFailed(agent.request.id, onFailed)
proc subscribeSlotFill*(agent: SalesAgent) {.async.} =
let market = agent.sales.market
proc onSlotFilled(
requestId: RequestId,
slotIndex: UInt256) {.upraises:[], gcsafe.} =
let market = agent.sales.market
asyncSpawn agent.subscribeSlotFilled.unsubscribe(), ignore = CatchableError
try:
agent.slotState.setValue(SlotState.Filled)
except AsyncQueueFullError as e:
raiseAssert "State machine critical failure: " & e.msg
agent.subscribeSlotFilled =
await market.subscribeSlotFilled(agent.request.id,
agent.slotIndex,
onSlotFilled)
await agent.unsubscribe()

View File

@ -4,6 +4,7 @@ import pkg/questionable
import pkg/upraises
import ../errors
import ../utils/asyncstatemachine
import ../utils/optionalcast
import ../market
import ../clock
import ../proving
@ -13,6 +14,7 @@ export market
export clock
export asyncstatemachine
export proving
export optionalcast
type
Sales* = ref object
@ -30,21 +32,20 @@ type
sales*: Sales
ask*: StorageAsk
availability*: ?Availability # TODO: when availability persistence is added, change this to not optional
request*: StorageRequest
requestId*: RequestId
request*: ?StorageRequest
slotIndex*: UInt256
subscribeFailed*: market.Subscription
subscribeFulfilled*: market.Subscription
subscribeSlotFilled*: market.Subscription
waitForCancelled*: Future[void]
me*: Address
restoredFromChain*: bool
slotState*: TransitionProperty[SlotState]
requestState*: TransitionProperty[RequestState]
proof*: TransitionProperty[seq[byte]]
slotHost*: TransitionProperty[?Address]
slotHostIsMe*: TransitionProperty[bool]
downloaded*: TransitionProperty[bool]
SaleState* = ref object of State
SaleError* = ref object of CodexError
SaleError* = object of CodexError
Availability* = object
id*: array[32, byte]
size*: UInt256

View File

@ -1,13 +1,13 @@
import ../statemachine
import ./errored
type
SaleCancelled* = ref object of State
SaleCancelledError* = object of CatchableError
SaleCancelledError* = object of SaleError
SaleTimeoutError* = object of SaleCancelledError
method `$`*(state: SaleCancelled): string = "SaleCancelled"
method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} =
# echo "running ", state
let error = newException(SaleTimeoutError, "Sale cancelled due to timeout")
machine.setError(error)

View File

@ -17,16 +17,22 @@ type
method `$`*(state: SaleDownloading): string = "SaleDownloading"
method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} =
# echo "running ", state
let agent = SalesAgent(machine)
try:
without onStore =? agent.sales.onStore:
raiseAssert "onStore callback not set"
without request =? agent.request:
let error = newException(SaleDownloadingError, "missing request")
agent.setError error
return
if availability =? agent.availability:
agent.sales.remove(availability)
await onStore(agent.request, agent.slotIndex, agent.availability)
await onStore(request, agent.slotIndex, agent.availability)
agent.downloaded.setValue(true)
except CancelledError:

View File

@ -6,12 +6,12 @@ type SaleErrored* = ref object of State
method `$`*(state: SaleErrored): string = "SaleErrored"
method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
# echo "running ", state
let agent = SalesAgent(machine)
if onClear =? agent.sales.onClear and
request =? agent.request and
slotIndex =? agent.slotIndex:
onClear(agent.availability, request, slotIndex)
request =? agent.request:
onClear(agent.availability, request, agent.slotIndex)
# TODO: when availability persistence is added, change this to not optional
# NOTE: with this in place, restoring state for a restarted node will

View File

@ -1,4 +1,3 @@
import ./errored
import ../statemachine
type
@ -8,5 +7,6 @@ type
method `$`*(state: SaleFailed): string = "SaleFailed"
method run*(state: SaleFailed, machine: Machine): Future[?State] {.async.} =
# echo "running ", state
let error = newException(SaleFailedError, "Sale failed")
machine.setError error

View File

@ -1,25 +1,25 @@
import pkg/questionable
import ./errored
import ./finished
import ./cancelled
import ./failed
import ../statemachine
type
SaleFilled* = ref object of State
SaleFilledError* = object of CatchableError
SaleFilledError* = object of SaleError
HostMismatchError* = object of SaleFilledError
method `$`*(state: SaleFilled): string = "SaleFilled"
method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} =
# echo "running ", state
let agent = SalesAgent(machine)
let market = agent.sales.market
try:
let market = agent.sales.market
without slotHost =? await market.getHost(agent.requestId, agent.slotIndex):
let error = newException(SaleFilledError, "Filled slot has no host address")
agent.setError error
return
let slotHost = await market.getHost(agent.request.id, agent.slotIndex)
agent.slotHost.setValue(slotHost)
except CancelledError:
raise
let me = await market.getSigner()
if slotHost == me:
agent.slotHostIsMe.setValue(true)
else:
agent.setError newException(HostMismatchError, "Slot filled by other host")

View File

@ -1,25 +1,15 @@
import pkg/upraises
import ../../market
import ../statemachine
import ./filled
import ./errored
import ./cancelled
import ./failed
type
SaleFilling* = ref object of State
proof*: seq[byte]
SaleFillingError* = object of CatchableError
SaleFillingError* = object of SaleError
method `$`*(state: SaleFilling): string = "SaleFilling"
method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} =
# echo "running ", state
let agent = SalesAgent(machine)
let market = agent.sales.market
try:
let market = agent.sales.market
await market.fillSlot(agent.request.id, agent.slotIndex, state.proof)
except CancelledError:
raise
await market.fillSlot(agent.requestId, agent.slotIndex, agent.proof.value)

View File

@ -6,26 +6,27 @@ import ../statemachine
type
SaleFinished* = ref object of State
SaleFinishedError* = object of CatchableError
SaleFinishedError* = object of SaleError
method `$`*(state: SaleFinished): string = "SaleFinished"
method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} =
# echo "running ", state
let agent = SalesAgent(machine)
let slotIndex = agent.slotIndex
try:
if request =? agent.request and
slotIndex =? agent.slotIndex:
agent.sales.proving.add(request.slotId(slotIndex))
without request =? agent.request:
let error = newException(SaleFinishedError, "missing request")
agent.setError error
return
if onSale =? agent.sales.onSale:
onSale(agent.availability, request, slotIndex)
agent.sales.proving.add(request.slotId(slotIndex))
# TODO: Keep track of contract completion using local clock. When contract
# has finished, we need to add back availability to the sales module.
# This will change when the state machine is updated to include the entire
# sales process, as well as when availability is persisted, so leaving it
# as a TODO for now.
if onSale =? agent.sales.onSale:
onSale(agent.availability, request, slotIndex)
except CancelledError:
raise
# TODO: Keep track of contract completion using local clock. When contract
# has finished, we need to add back availability to the sales module.
# This will change when the state machine is updated to include the entire
# sales process, as well as when availability is persisted, so leaving it
# as a TODO for now.

View File

@ -1,25 +1,22 @@
import ../statemachine
import ./filling
import ./cancelled
import ./failed
import ./filled
import ./errored
type
SaleProving* = ref object of State
SaleProvingError* = object of CatchableError
SaleProvingError* = object of SaleError
method `$`*(state: SaleProving): string = "SaleProving"
method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} =
# echo "running ", state
let agent = SalesAgent(machine)
try:
without onProve =? agent.sales.onProve:
raiseAssert "onProve callback not set"
without onProve =? agent.sales.onProve:
raiseAssert "onProve callback not set"
let proof = await onProve(agent.request, agent.slotIndex)
agent.proof.setValue(proof)
without request =? agent.request:
let error = newException(SaleProvingError, "missing request")
agent.setError error
return
except CancelledError:
raise
let proof = await onProve(request, agent.slotIndex)
agent.proof.setValue(proof)

View File

@ -1,14 +1,29 @@
import ../statemachine
import ./filled
import ./finished
import ./failed
import ./errored
import ./cancelled
import ../subscriptions
type
SaleUnknown* = ref object of State
SaleUnknownError* = object of CatchableError
SaleUnknownError* = object of SaleError
UnexpectedSlotError* = object of SaleUnknownError
method `$`*(state: SaleUnknown): string = "SaleUnknown"
method run*(state: SaleUnknown, machine: Machine): Future[?State] {.async.} =
# echo "running ", state
let agent = SalesAgent(machine)
let market = agent.sales.market
if agent.request.isNone:
agent.request = await market.getRequest(agent.requestId)
if agent.request.isSome:
await agent.subscribe()
if agent.request.isNone:
agent.setError newException(SaleUnknownError, "missing request")
return
if agent.restoredFromChain and agent.slotState.value == SlotState.Free:
agent.setError newException(UnexpectedSlotError,
"slot state on chain should not be 'free'")
return

View File

@ -0,0 +1,92 @@
import pkg/chronos
import pkg/upraises
import pkg/stint
import ./statemachine
import ../contracts/requests
# TODO: move elsewhere
proc asyncSpawn(future: Future[void], ignore: type CatchableError) =
proc ignoringError {.async.} =
try:
await future
except ignore:
discard
asyncSpawn ignoringError()
proc subscribeCancellation*(agent: SalesAgent) {.async.} =
let market = agent.sales.market
proc onCancelled() {.async.} =
let clock = agent.sales.clock
without request =? agent.request:
return
await clock.waitUntil(request.expiry.truncate(int64))
asyncSpawn agent.subscribeFulfilled.unsubscribe(), ignore = CatchableError
agent.requestState.setValue(RequestState.Cancelled)
proc onFulfilled(_: RequestId) =
agent.waitForCancelled.cancel()
agent.subscribeFulfilled =
await market.subscribeFulfillment(agent.requestId, onFulfilled)
agent.waitForCancelled = onCancelled()
proc subscribeFailure*(agent: SalesAgent) {.async.} =
let market = agent.sales.market
proc onFailed(_: RequestId) {.upraises:[], gcsafe.} =
asyncSpawn agent.subscribeFailed.unsubscribe(), ignore = CatchableError
try:
agent.requestState.setValue(RequestState.Failed)
except AsyncQueueFullError as e:
raiseAssert "State machine critical failure: " & e.msg
agent.subscribeFailed =
await market.subscribeRequestFailed(agent.requestId, onFailed)
proc subscribeSlotFill*(agent: SalesAgent) {.async.} =
let market = agent.sales.market
proc onSlotFilled(
requestId: RequestId,
slotIndex: UInt256) {.upraises:[], gcsafe.} =
let market = agent.sales.market
asyncSpawn agent.subscribeSlotFilled.unsubscribe(), ignore = CatchableError
try:
agent.slotState.setValue(SlotState.Filled)
except AsyncQueueFullError as e:
raiseAssert "State machine critical failure: " & e.msg
agent.subscribeSlotFilled =
await market.subscribeSlotFilled(agent.requestId,
agent.slotIndex,
onSlotFilled)
proc subscribe*(agent: SalesAgent) {.async.} =
# TODO: Check that the subscription handlers aren't already assigned before
# assigning. This will allow for agent.subscribe to be called multiple times
await agent.subscribeCancellation()
await agent.subscribeFailure()
await agent.subscribeSlotFill()
proc unsubscribe*(agent: SalesAgent) {.async.} =
try:
await agent.subscribeFulfilled.unsubscribe()
except CatchableError:
discard
try:
await agent.subscribeFailed.unsubscribe()
except CatchableError:
discard
try:
await agent.subscribeSlotFilled.unsubscribe()
except CatchableError:
discard
if not agent.waitForCancelled.completed:
await agent.waitForCancelled.cancelAndWait()
procCall Machine(agent).stop()

View File

@ -1,4 +1,6 @@
import std/typetraits # DELETE ME
import std/sequtils
import std/tables
import pkg/questionable
import pkg/chronos
import pkg/chronicles
@ -19,6 +21,8 @@ type
transitions: seq[Transition]
errored*: TransitionProperty[bool]
lastError*: ref CatchableError
states: Table[int, State]
started: bool
State* = ref object of RootObj
AnyState* = ref object of State
Event* = proc(state: State): ?State {.gcsafe, upraises:[].}
@ -50,19 +54,32 @@ proc transition*(_: type Event, previous, next: State): Event =
if state == previous:
return some next
method `$`*(state: State): string {.base.} = "Base state"
proc state*(machine: Machine): State = machine.state
template getState*(machine: Machine, id: untyped): State =
machine.states[id.int]
proc addState*(machine: Machine, states: varargs[(int, State)]) =
machine.states = states.toTable
proc schedule*(machine: Machine, event: Event) =
machine.scheduled.putNoWait(event)
proc checkTransitions(machine: Machine) =
if not machine.started:
return
for transition in machine.transitions:
if transition.trigger(machine, machine.state) and
machine.state != transition.nextState and # avoid transitioning to self
(machine.state == nil or
machine.state in transition.prevStates or # state instance, multiple
transition.prevStates.any(proc (s: State): bool = s of AnyState)):
# echo "scheduling transition from ", machine.state, " to ", transition.nextState
machine.schedule(Event.transition(machine.state, transition.nextState))
return
proc setValue*[T](prop: TransitionProperty[T], value: T) =
prop.value = value
@ -73,12 +90,12 @@ proc setError*(machine: Machine, error: ref CatchableError) =
machine.errored.value = false # clears error without triggering transitions
machine.lastError = error # stores error in state
method run*(state: State): Future[?State] {.base, upraises:[].} =
method run*(state: State, machine: Machine): Future[?State] {.base, upraises:[].} =
discard
proc run(machine: Machine, state: State) {.async.} =
try:
if next =? await state.run():
if next =? await state.run(machine):
machine.schedule(Event.transition(state, next))
except CancelledError:
discard
@ -86,7 +103,6 @@ proc run(machine: Machine, state: State) {.async.} =
proc scheduler(machine: Machine) {.async.} =
proc onRunComplete(udata: pointer) {.gcsafe, raises: [Defect].} =
var fut = cast[FutureBase](udata)
# TODO: would CancelledError be swallowed here (by not checking fut.cancelled())?
if fut.failed():
try:
machine.setError(fut.error)
@ -98,21 +114,29 @@ proc scheduler(machine: Machine) {.async.} =
let event = await machine.scheduled.get()
if next =? event(machine.state):
if not machine.running.isNil:
# echo "cancelling current state: ", machine.state
await machine.running.cancelAndWait()
# echo "transitioning from ", $ machine.state, " to ", $ next
# echo "moving from ", if machine.state.isNil: "nil" else: $machine.state, " to ", next
machine.state = next
# trace "running state", state = machine.state
machine.running = machine.run(machine.state)
machine.running.addCallback(onRunComplete)
machine.checkTransitions()
machine.checkTransitions()
except CancelledError:
discard
proc start*(machine: Machine, initialState: State) =
machine.scheduling = machine.scheduler()
machine.schedule(Event.transition(machine.state, initialState))
machine.started = true
proc stop*(machine: Machine) =
machine.scheduling.cancel()
machine.running.cancel()
if not machine.running.isNil and not machine.running.finished:
machine.scheduling.cancel()
if not machine.running.isNil and not machine.running.finished:
machine.running.cancel()
machine.started = false
proc new*(T: type Machine, transitions: seq[Transition]): T =
let m = T(

View File

@ -169,6 +169,7 @@ proc fillSlot*(market: MockMarket,
host: host
)
market.filled.add(slot)
market.slotState[slotId(requestId, slotIndex)] = SlotState.Filled
market.emitSlotFilled(requestId, slotIndex)
method fillSlot*(market: MockMarket,

View File

@ -81,501 +81,511 @@ suite "Sales":
test "makes storage unavailable when matching request comes in":
sales.add(availability)
await market.requestStorage(request)
await sleepAsync(5000.millis)
check sales.available.len == 0
# test "ignores request when no matching storage is available":
# sales.add(availability)
# var tooBig = request
# tooBig.ask.slotSize = request.ask.slotSize + 1
# await market.requestStorage(tooBig)
# check sales.available == @[availability]
test "ignores request when no matching storage is available":
sales.add(availability)
var tooBig = request
tooBig.ask.slotSize = request.ask.slotSize + 1
await market.requestStorage(tooBig)
check sales.available == @[availability]
# test "ignores request when reward is too low":
# sales.add(availability)
# var tooCheap = request
# tooCheap.ask.reward = request.ask.reward - 1
# await market.requestStorage(tooCheap)
# check sales.available == @[availability]
test "ignores request when reward is too low":
sales.add(availability)
var tooCheap = request
tooCheap.ask.reward = request.ask.reward - 1
await market.requestStorage(tooCheap)
check sales.available == @[availability]
# test "retrieves and stores data locally":
# var storingRequest: StorageRequest
# var storingSlot: UInt256
# var storingAvailability: Availability
# sales.onStore = proc(request: StorageRequest,
# slot: UInt256,
# availability: ?Availability) {.async.} =
# storingRequest = request
# storingSlot = slot
# check availability.isSome
# storingAvailability = !availability
# sales.add(availability)
# await market.requestStorage(request)
# check storingRequest == request
# check storingSlot < request.ask.slots.u256
# check storingAvailability == availability
test "retrieves and stores data locally":
var storingRequest: StorageRequest
var storingSlot: UInt256
var storingAvailability: Availability
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
storingRequest = request
storingSlot = slot
check availability.isSome
storingAvailability = !availability
sales.add(availability)
await market.requestStorage(request)
check eventually storingRequest == request
check storingSlot < request.ask.slots.u256
check storingAvailability == availability
# test "makes storage available again when data retrieval fails":
# let error = newException(IOError, "data retrieval failed")
# sales.onStore = proc(request: StorageRequest,
# slot: UInt256,
# availability: ?Availability) {.async.} =
# raise error
# sales.add(availability)
# await market.requestStorage(request)
# check sales.available == @[availability]
test "makes storage available again when data retrieval fails":
let error = newException(IOError, "data retrieval failed")
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
raise error
sales.add(availability)
await market.requestStorage(request)
check sales.available == @[availability]
# test "generates proof of storage":
# var provingRequest: StorageRequest
# var provingSlot: UInt256
# sales.onProve = proc(request: StorageRequest,
# slot: UInt256): Future[seq[byte]] {.async.} =
# provingRequest = request
# provingSlot = slot
# sales.add(availability)
# await market.requestStorage(request)
# check provingRequest == request
# check provingSlot < request.ask.slots.u256
test "generates proof of storage":
var provingRequest: StorageRequest
var provingSlot: UInt256
sales.onProve = proc(request: StorageRequest,
slot: UInt256): Future[seq[byte]] {.async.} =
provingRequest = request
provingSlot = slot
sales.add(availability)
await market.requestStorage(request)
check eventually provingRequest == request
check provingSlot < request.ask.slots.u256
# test "fills a slot":
# sales.add(availability)
# await market.requestStorage(request)
# check market.filled.len == 1
# check market.filled[0].requestId == request.id
# check market.filled[0].slotIndex < request.ask.slots.u256
# check market.filled[0].proof == proof
# check market.filled[0].host == await market.getSigner()
test "fills a slot":
sales.add(availability)
await market.requestStorage(request)
check eventually market.filled.len == 1
check market.filled[0].requestId == request.id
check market.filled[0].slotIndex < request.ask.slots.u256
check market.filled[0].proof == proof
check market.filled[0].host == await market.getSigner()
# test "calls onSale when slot is filled":
# var soldAvailability: Availability
# var soldRequest: StorageRequest
# var soldSlotIndex: UInt256
# sales.onSale = proc(availability: ?Availability,
# request: StorageRequest,
# slotIndex: UInt256) =
# if a =? availability:
# soldAvailability = a
# soldRequest = request
# soldSlotIndex = slotIndex
# sales.add(availability)
# await market.requestStorage(request)
# check soldAvailability == availability
# check soldRequest == request
# check soldSlotIndex < request.ask.slots.u256
test "calls onSale when slot is filled":
var soldAvailability: Availability
var soldRequest: StorageRequest
var soldSlotIndex: UInt256
sales.onSale = proc(availability: ?Availability,
request: StorageRequest,
slotIndex: UInt256) =
if a =? availability:
soldAvailability = a
soldRequest = request
soldSlotIndex = slotIndex
sales.add(availability)
await market.requestStorage(request)
check eventually soldAvailability == availability
check soldRequest == request
check soldSlotIndex < request.ask.slots.u256
# test "calls onClear when storage becomes available again":
# # fail the proof intentionally to trigger `agent.finish(success=false)`,
# # which then calls the onClear callback
# sales.onProve = proc(request: StorageRequest,
# slot: UInt256): Future[seq[byte]] {.async.} =
# raise newException(IOError, "proof failed")
# var clearedAvailability: Availability
# var clearedRequest: StorageRequest
# var clearedSlotIndex: UInt256
# sales.onClear = proc(availability: ?Availability,
# request: StorageRequest,
# slotIndex: UInt256) =
# if a =? availability:
# clearedAvailability = a
# clearedRequest = request
# clearedSlotIndex = slotIndex
# sales.add(availability)
# await market.requestStorage(request)
# check clearedAvailability == availability
# check clearedRequest == request
# check clearedSlotIndex < request.ask.slots.u256
test "calls onClear when storage becomes available again":
# fail the proof intentionally to trigger `agent.finish(success=false)`,
# which then calls the onClear callback
sales.onProve = proc(request: StorageRequest,
slot: UInt256): Future[seq[byte]] {.async.} =
raise newException(IOError, "proof failed")
var clearedAvailability: Availability
var clearedRequest: StorageRequest
var clearedSlotIndex: UInt256
sales.onClear = proc(availability: ?Availability,
request: StorageRequest,
slotIndex: UInt256) =
if a =? availability:
clearedAvailability = a
clearedRequest = request
clearedSlotIndex = slotIndex
sales.add(availability)
await market.requestStorage(request)
check eventually clearedAvailability == availability
check clearedRequest == request
check clearedSlotIndex < request.ask.slots.u256
# test "makes storage available again when other host fills the slot":
# let otherHost = Address.example
# sales.onStore = proc(request: StorageRequest,
# slot: UInt256,
# availability: ?Availability) {.async.} =
# await sleepAsync(chronos.hours(1))
# sales.add(availability)
# await market.requestStorage(request)
# for slotIndex in 0..<request.ask.slots:
# market.fillSlot(request.id, slotIndex.u256, proof, otherHost)
# await sleepAsync(chronos.seconds(2))
# check sales.available == @[availability]
test "makes storage available again when other host fills the slot":
let otherHost = Address.example
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
await sleepAsync(chronos.hours(1))
sales.add(availability)
await market.requestStorage(request)
for slotIndex in 0..<request.ask.slots:
market.fillSlot(request.id, slotIndex.u256, proof, otherHost)
await sleepAsync(chronos.seconds(2))
check sales.available == @[availability]
# test "makes storage available again when request expires":
# sales.onStore = proc(request: StorageRequest,
# slot: UInt256,
# availability: ?Availability) {.async.} =
# await sleepAsync(chronos.hours(1))
# sales.add(availability)
# await market.requestStorage(request)
# clock.set(request.expiry.truncate(int64))
# check eventually (sales.available == @[availability])
test "makes storage available again when request expires":
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
await sleepAsync(chronos.hours(1))
sales.add(availability)
await market.requestStorage(request)
clock.set(request.expiry.truncate(int64))
check eventually (sales.available == @[availability])
# test "adds proving for slot when slot is filled":
# var soldSlotIndex: UInt256
# sales.onSale = proc(availability: ?Availability,
# request: StorageRequest,
# slotIndex: UInt256) =
# soldSlotIndex = slotIndex
# check proving.slots.len == 0
# sales.add(availability)
# await market.requestStorage(request)
# check proving.slots.len == 1
# check proving.slots.contains(request.slotId(soldSlotIndex))
test "adds proving for slot when slot is filled":
var soldSlotIndex: UInt256
sales.onSale = proc(availability: ?Availability,
request: StorageRequest,
slotIndex: UInt256) =
soldSlotIndex = slotIndex
check proving.slots.len == 0
sales.add(availability)
await market.requestStorage(request)
check eventually proving.slots.len == 1
check proving.slots.contains(request.slotId(soldSlotIndex))
# suite "Sales state machine":
suite "Sales state machine":
# let availability = Availability.init(
# size=100.u256,
# duration=60.u256,
# minPrice=600.u256
# )
# var request = StorageRequest(
# ask: StorageAsk(
# slots: 4,
# slotSize: 100.u256,
# duration: 60.u256,
# reward: 10.u256,
# ),
# content: StorageContent(
# cid: "some cid"
# )
# )
# let proof = exampleProof()
let availability = Availability.init(
size=100.u256,
duration=60.u256,
minPrice=600.u256
)
var request = StorageRequest(
ask: StorageAsk(
slots: 4,
slotSize: 100.u256,
duration: 60.u256,
reward: 10.u256,
),
content: StorageContent(
cid: "some cid"
)
)
let proof = exampleProof()
# var sales: Sales
# var market: MockMarket
# var clock: MockClock
# var proving: Proving
# var slotIdx: UInt256
# var slotId: SlotId
var sales: Sales
var market: MockMarket
var clock: MockClock
var proving: Proving
var slotIdx: UInt256
var slotId: SlotId
# setup:
# market = MockMarket.new()
# clock = MockClock.new()
# proving = Proving.new()
# sales = Sales.new(market, clock, proving)
# sales.onStore = proc(request: StorageRequest,
# slot: UInt256,
# availability: ?Availability) {.async.} =
# discard
# sales.onProve = proc(request: StorageRequest,
# slot: UInt256): Future[seq[byte]] {.async.} =
# return proof
# await sales.start()
# request.expiry = (clock.now() + 42).u256
# slotIdx = 0.u256
# slotId = slotId(request.id, slotIdx)
setup:
market = MockMarket.new()
clock = MockClock.new()
proving = Proving.new()
sales = Sales.new(market, clock, proving)
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
discard
sales.onProve = proc(request: StorageRequest,
slot: UInt256): Future[seq[byte]] {.async.} =
return proof
await sales.start()
request.expiry = (clock.now() + 42).u256
slotIdx = 0.u256
slotId = slotId(request.id, slotIdx)
# teardown:
# await sales.stop()
teardown:
await sales.stop()
# proc newSalesAgent(slotIdx: UInt256 = 0.u256): SalesAgent =
# let agent = sales.newSalesAgent(request.id,
# slotIdx,
# some availability,
# some request)
# return agent
proc newSalesAgent(
slotIdx: UInt256 = 0.u256,
slotState = SlotState.Free,
requestState = RequestState.New,
restoredFromChain = false): SalesAgent =
# proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} =
# let address = await market.getSigner()
# let slot = MockSlot(requestId: request.id,
# slotIndex: slotIdx,
# proof: proof,
# host: address)
# market.filled.add slot
# market.slotState[slotId(request.id, slotIdx)] = SlotState.Filled
market.requested.add(request)
# # test "moves to SaleErrored when SaleFilled errors":
# # let agent = newSalesAgent()
# # market.slotState[slotId] = SlotState.Free
# # await agent.switchAsync(SaleUnknown())
# # without state =? (agent.state as SaleErrored):
# # fail()
# # check state.error of UnexpectedSlotError
# # check state.error.msg == "slot state on chain should not be 'free'"
market.requestState[request.id] = requestState
market.slotState[slotId] = slotState
# # test "moves to SaleFilled>SaleFinished when slot state is Filled":
# # let agent = newSalesAgent()
# # await fillSlot()
# # await agent.switchAsync(SaleUnknown())
# # check (agent.state as SaleFinished).isSome
let slotId = slotId(request.id, slotIdx)
let req = if restoredFromChain: some request else: none StorageRequest
let agent = sales.newSalesAgent(slotIdx,
some availability,
request.id,
req,
requestState,
slotState,
restoredFromChain)
return agent
# # test "moves to SaleFinished when slot state is Finished":
# # let agent = newSalesAgent()
# # await fillSlot()
# # market.slotState[slotId] = SlotState.Finished
# # agent.switch(SaleUnknown())
# # check (agent.state as SaleFinished).isSome
proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} =
let address = await market.getSigner()
market.fillSlot(request.id,
slotIdx,
proof,
address)
# market.filled.add slot
# market.slotState[slotId(request.id, slotIdx)] = SlotState.Filled
# # test "moves to SaleFinished when slot state is Paid":
# # let agent = newSalesAgent()
# # market.slotState[slotId] = SlotState.Paid
# # agent.switch(SaleUnknown())
# # check (agent.state as SaleFinished).isSome
test "moves to SaleErrored when request is missing":
let agent = newSalesAgent(restoredFromChain = true)
market.requested.keepItIf(it != request)
agent.request = none StorageRequest
await agent.start()
check eventually agent.state of SaleErrored
check agent.lastError of SaleUnknownError
check agent.lastError.msg == "missing request"
# # test "moves to SaleErrored when slot state is Failed":
# # let agent = newSalesAgent()
# # market.slotState[slotId] = SlotState.Failed
# # agent.switch(SaleUnknown())
# # without state =? (agent.state as SaleErrored):
# # fail()
# # check state.error of SaleFailedError
# # check state.error.msg == "Sale failed"
test "moves to SaleErrored when restoring state and slot state on chain is free":
let agent = newSalesAgent(restoredFromChain = true)
await agent.start()
check eventually agent.state of SaleErrored
check agent.lastError of UnexpectedSlotError
check agent.lastError.msg == "slot state on chain should not be 'free'"
# # test "moves to SaleErrored when Downloading and request expires":
# # sales.onStore = proc(request: StorageRequest,
# # slot: UInt256,
# # availability: ?Availability) {.async.} =
# # await sleepAsync(chronos.minutes(1)) # "far" in the future
# # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
# # let agent = newSalesAgent()
# # await agent.start(request.ask.slots)
# # market.requested.add request
# # market.requestState[request.id] = RequestState.New
# # await agent.switchAsync(SaleDownloading())
# # clock.set(request.expiry.truncate(int64))
# # await sleepAsync chronos.seconds(2)
test "moves to SaleFilled>SaleErrored when slot not actually filled":
let agent = newSalesAgent(restoredFromChain = true,
slotState = SlotState.Filled)
await agent.start()
check eventually agent.state of SaleErrored
check agent.lastError of SaleFilledError
check agent.lastError.msg == "Filled slot has no host address"
# # without state =? (agent.state as SaleErrored):
# # fail()
# # check state.error of SaleTimeoutError
# # check state.error.msg == "Sale cancelled due to timeout"
test "moves to SaleFilled>SaleFinished when slot state is Filled":
await fillSlot()
let agent = newSalesAgent(restoredFromChain = true,
slotState = SlotState.Filled)
await agent.start()
check eventually agent.state of SaleFinished
# # test "moves to SaleErrored when Downloading and request fails":
# # sales.onStore = proc(request: StorageRequest,
# # slot: UInt256,
# # availability: ?Availability) {.async.} =
# # await sleepAsync(chronos.minutes(1)) # "far" in the future
# # let agent = newSalesAgent()
# # await agent.start(request.ask.slots)
# # market.requested.add request
# # market.requestState[request.id] = RequestState.New
# # await agent.switchAsync(SaleDownloading())
# # market.emitRequestFailed(request.id)
# # await sleepAsync chronos.seconds(2)
test "moves to SaleFinished when slot state is Paid":
let agent = newSalesAgent(restoredFromChain = true, slotState = SlotState.Paid)
await agent.start()
check eventually agent.state of SaleFinished
# # without state =? (agent.state as SaleErrored):
# # fail()
# # check state.error of SaleFailedError
# # check state.error.msg == "Sale failed"
test "moves to SaleErrored when slot state is Failed":
let agent = newSalesAgent(restoredFromChain = true, slotState = SlotState.Failed)
await agent.start()
check eventually agent.state of SaleErrored
check agent.lastError of SaleFailedError
check agent.lastError.msg == "Sale failed"
# # test "moves to SaleErrored when Filling and request expires":
# # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
# # let agent = newSalesAgent()
# # await agent.start(request.ask.slots)
# # market.requested.add request
# # market.requestState[request.id] = RequestState.New
# # await agent.switchAsync(SaleFilling())
# # clock.set(request.expiry.truncate(int64))
# # await sleepAsync chronos.seconds(2)
test "moves to SaleErrored when Downloading and request expires":
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
await sleepAsync(chronos.minutes(1)) # "far" in the future
request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
let agent = newSalesAgent(requestState = RequestState.New)
await agent.start()
await sleepAsync 1.millis
clock.set(request.expiry.truncate(int64))
# await sleepAsync chronos.seconds(2)
# # without state =? (agent.state as SaleErrored):
# # fail()
# # check state.error of SaleTimeoutError
# # check state.error.msg == "Sale cancelled due to timeout"
check eventually agent.state of SaleErrored
check agent.lastError of SaleTimeoutError
check agent.lastError.msg == "Sale cancelled due to timeout"
# # test "moves to SaleErrored when Filling and request fails":
# # let agent = newSalesAgent()
# # await agent.start(request.ask.slots)
# # market.requested.add request
# # market.requestState[request.id] = RequestState.New
# # await agent.switchAsync(SaleFilling())
# # market.emitRequestFailed(request.id)
# # await sleepAsync chronos.seconds(2)
test "moves to SaleErrored when Downloading and request fails":
let agent = newSalesAgent()
await agent.start()
# await sleepAsync 1.millis
market.emitRequestFailed(request.id)
# await sleepAsync chronos.seconds(2)
# # without state =? (agent.state as SaleErrored):
# # fail()
# # check state.error of SaleFailedError
# # check state.error.msg == "Sale failed"
check eventually agent.state of SaleErrored
check agent.lastError of SaleFailedError
check agent.lastError.msg == "Sale failed"
# # test "moves to SaleErrored when Finished and request expires":
# # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
# # let agent = newSalesAgent()
# # await agent.start(request.ask.slots)
# # market.requested.add request
# # market.requestState[request.id] = RequestState.FinishedswitchAsync
# # await agent.switchAsync(SaleFinished())
# # clock.set(request.expiry.truncate(int64))
# # await sleepAsync chronos.seconds(2)
# test "moves to SaleErrored when Filling and request expires":
# request.expiry = (getTime() + 2.seconds).toUnix.u256
# let agent = newSalesAgent()
# # agent.request.setValue(some request) # allows skip of SaleUnknown state
# await agent.start(agent.getState(SaleState.SaleFilling))
# # check eventually agent.state of SaleFilling
# # echo ">>> we're in SaleFilling!"
# # echo "setting clock expired"
# clock.set(request.expiry.truncate(int64))
# # else: echo ">>> we never got to SaleFilling"
# # await sleepAsync chronos.seconds(2)
# # without state =? (agent.state as SaleErrored):
# # fail()
# # check state.error of SaleTimeoutError
# # check state.error.msg == "Sale cancelled due to timeout"
# check eventually agent.state of SaleErrored
# check agent.lastError of SaleTimeoutError
# check agent.lastError.msg == "Sale cancelled due to timeout"
# # test "moves to SaleErrored when Finished and request fails":
# # let agent = newSalesAgent()
# # await agent.start(request.ask.slots)
# # market.requested.add request
# # market.requestState[request.id] = RequestState.Finished
# # await agent.switchAsync(SaleFinished())
# # market.emitRequestFailed(request.id)
# # await sleepAsync chronos.seconds(2)
# test "moves to SaleErrored when Filling and request fails":
# let agent = newSalesAgent()
# await agent.start(request.ask.slots)
# market.requested.add request
# market.requestState[request.id] = RequestState.New
# await agent.switchAsync(SaleFilling())
# market.emitRequestFailed(request.id)
# await sleepAsync chronos.seconds(2)
# # without state =? (agent.state as SaleErrored):
# # fail()
# # check state.error of SaleFailedError
# # check state.error.msg == "Sale failed"
# without state =? (agent.state as SaleErrored):
# fail()
# check state.error of SaleFailedError
# check state.error.msg == "Sale failed"
# # test "moves to SaleErrored when Proving and request expires":
# # sales.onProve = proc(request: StorageRequest,
# # slot: UInt256): Future[seq[byte]] {.async.} =
# # await sleepAsync(chronos.minutes(1)) # "far" in the future
# # return @[]
# # request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
# # let agent = newSalesAgent()
# # await agent.start(request.ask.slots)
# # market.requested.add request
# # market.requestState[request.id] = RequestState.New
# # await agent.switchAsync(SaleProving())
# # clock.set(request.expiry.truncate(int64))
# # await sleepAsync chronos.seconds(2)
# test "moves to SaleErrored when Finished and request expires":
# request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
# let agent = newSalesAgent()
# await agent.start(request.ask.slots)
# market.requested.add request
# market.requestState[request.id] = RequestState.FinishedswitchAsync
# await agent.switchAsync(SaleFinished())
# clock.set(request.expiry.truncate(int64))
# await sleepAsync chronos.seconds(2)
# # without state =? (agent.state as SaleErrored):
# # fail()
# # check state.error of SaleTimeoutError
# # check state.error.msg == "Sale cancelled due to timeout"
# without state =? (agent.state as SaleErrored):
# fail()
# check state.error of SaleTimeoutError
# check state.error.msg == "Sale cancelled due to timeout"
# # test "moves to SaleErrored when Proving and request fails":
# # sales.onProve = proc(request: StorageRequest,
# # slot: UInt256): Future[seq[byte]] {.async.} =
# # await sleepAsync(chronos.minutes(1)) # "far" in the future
# # return @[]
# # let agent = newSalesAgent()
# # await agent.start(request.ask.slots)
# # market.requested.add request
# # market.requestState[request.id] = RequestState.New
# # await agent.switchAsync(SaleProving())
# # market.emitRequestFailed(request.id)
# # await sleepAsync chronos.seconds(2)
# test "moves to SaleErrored when Finished and request fails":
# let agent = newSalesAgent()
# await agent.start(request.ask.slots)
# market.requested.add request
# market.requestState[request.id] = RequestState.Finished
# await agent.switchAsync(SaleFinished())
# market.emitRequestFailed(request.id)
# await sleepAsync chronos.seconds(2)
# # without state =? (agent.state as SaleErrored):
# # fail()
# # check state.error of SaleFailedError
# # check state.error.msg == "Sale failed"
# without state =? (agent.state as SaleErrored):
# fail()
# check state.error of SaleFailedError
# check state.error.msg == "Sale failed"
# # test "moves to SaleErrored when Downloading and slot is filled by another host":
# # sales.onStore = proc(request: StorageRequest,
# # slot: UInt256,
# # availability: ?Availability) {.async.} =
# # await sleepAsync(chronos.minutes(1)) # "far" in the future
# # let agent = newSalesAgent()
# # await agent.start(request.ask.slots)
# # market.requested.add request
# # market.requestState[request.id] = RequestState.New
# # await agent.switchAsync(SaleDownloading())
# # market.fillSlot(request.id, agent.slotIndex, proof, Address.example)
# # await sleepAsync chronos.seconds(2)
# test "moves to SaleErrored when Proving and request expires":
# sales.onProve = proc(request: StorageRequest,
# slot: UInt256): Future[seq[byte]] {.async.} =
# await sleepAsync(chronos.minutes(1)) # "far" in the future
# return @[]
# request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
# let agent = newSalesAgent()
# await agent.start(request.ask.slots)
# market.requested.add request
# market.requestState[request.id] = RequestState.New
# await agent.switchAsync(SaleProving())
# clock.set(request.expiry.truncate(int64))
# await sleepAsync chronos.seconds(2)
# # let state = (agent.state as SaleErrored)
# # check state.isSome
# # check (!state).error.msg == "Slot filled by other host"
# without state =? (agent.state as SaleErrored):
# fail()
# check state.error of SaleTimeoutError
# check state.error.msg == "Sale cancelled due to timeout"
# # test "moves to SaleErrored when Proving and slot is filled by another host":
# # sales.onProve = proc(request: StorageRequest,
# # slot: UInt256): Future[seq[byte]] {.async.} =
# # await sleepAsync(chronos.minutes(1)) # "far" in the future
# # return @[]
# # let agent = newSalesAgent()
# # await agent.start(request.ask.slots)
# # market.requested.add request
# # market.requestState[request.id] = RequestState.New
# # await agent.switchAsync(SaleProving())
# # market.fillSlot(request.id, agent.slotIndex, proof, Address.example)
# # await sleepAsync chronos.seconds(2)
# test "moves to SaleErrored when Proving and request fails":
# sales.onProve = proc(request: StorageRequest,
# slot: UInt256): Future[seq[byte]] {.async.} =
# await sleepAsync(chronos.minutes(1)) # "far" in the future
# return @[]
# let agent = newSalesAgent()
# await agent.start(request.ask.slots)
# market.requested.add request
# market.requestState[request.id] = RequestState.New
# await agent.switchAsync(SaleProving())
# market.emitRequestFailed(request.id)
# await sleepAsync chronos.seconds(2)
# # without state =? (agent.state as SaleErrored):
# # fail()
# # check state.error of HostMismatchError
# # check state.error.msg == "Slot filled by other host"
# without state =? (agent.state as SaleErrored):
# fail()
# check state.error of SaleFailedError
# check state.error.msg == "Sale failed"
# # test "moves to SaleErrored when Filling and slot is filled by another host":
# # sales.onProve = proc(request: StorageRequest,
# # slot: UInt256): Future[seq[byte]] {.async.} =
# # await sleepAsync(chronos.minutes(1)) # "far" in the future
# # return @[]
# # let agent = newSalesAgent()
# # await agent.start(request.ask.slots)
# # market.requested.add request
# # market.requestState[request.id] = RequestState.New
# # market.fillSlot(request.id, agent.slotIndex, proof, Address.example)
# # await agent.switchAsync(SaleFilling())
# # await sleepAsync chronos.seconds(2)
# test "moves to SaleErrored when Downloading and slot is filled by another host":
# sales.onStore = proc(request: StorageRequest,
# slot: UInt256,
# availability: ?Availability) {.async.} =
# await sleepAsync(chronos.minutes(1)) # "far" in the future
# let agent = newSalesAgent()
# await agent.start(request.ask.slots)
# market.requested.add request
# market.requestState[request.id] = RequestState.New
# await agent.switchAsync(SaleDownloading())
# market.fillSlot(request.id, agent.slotIndex, proof, Address.example)
# await sleepAsync chronos.seconds(2)
# # without state =? (agent.state as SaleErrored):
# # fail()
# # check state.error of HostMismatchError
# # check state.error.msg == "Slot filled by other host"
# let state = (agent.state as SaleErrored)
# check state.isSome
# check (!state).error.msg == "Slot filled by other host"
# # test "moves from SaleDownloading to SaleFinished, calling necessary callbacks":
# # var onProveCalled, onStoreCalled, onClearCalled, onSaleCalled: bool
# # sales.onProve = proc(request: StorageRequest,
# # slot: UInt256): Future[seq[byte]] {.async.} =
# # onProveCalled = true
# # return @[]
# # sales.onStore = proc(request: StorageRequest,
# # slot: UInt256,
# # availability: ?Availability) {.async.} =
# # onStoreCalled = true
# # sales.onClear = proc(availability: ?Availability,
# # request: StorageRequest,
# # slotIndex: UInt256) =
# # onClearCalled = true
# # sales.onSale = proc(availability: ?Availability,
# # request: StorageRequest,
# # slotIndex: UInt256) =
# # onSaleCalled = true
# test "moves to SaleErrored when Proving and slot is filled by another host":
# sales.onProve = proc(request: StorageRequest,
# slot: UInt256): Future[seq[byte]] {.async.} =
# await sleepAsync(chronos.minutes(1)) # "far" in the future
# return @[]
# let agent = newSalesAgent()
# await agent.start(request.ask.slots)
# market.requested.add request
# market.requestState[request.id] = RequestState.New
# await agent.switchAsync(SaleProving())
# market.fillSlot(request.id, agent.slotIndex, proof, Address.example)
# await sleepAsync chronos.seconds(2)
# # let agent = newSalesAgent()
# # await agent.start(request.ask.slots)
# # market.requested.add request
# # market.requestState[request.id] = RequestState.New
# # await fillSlot(agent.slotIndex)
# # await agent.switchAsync(SaleDownloading())
# # market.emitRequestFulfilled(request.id)
# # await sleepAsync chronos.seconds(2)
# without state =? (agent.state as SaleErrored):
# fail()
# check state.error of HostMismatchError
# check state.error.msg == "Slot filled by other host"
# # without state =? (agent.state as SaleFinished):
# # fail()
# # check onProveCalled
# # check onStoreCalled
# # check not onClearCalled
# # check onSaleCalled
# test "moves to SaleErrored when Filling and slot is filled by another host":
# sales.onProve = proc(request: StorageRequest,
# slot: UInt256): Future[seq[byte]] {.async.} =
# await sleepAsync(chronos.minutes(1)) # "far" in the future
# return @[]
# let agent = newSalesAgent()
# await agent.start(request.ask.slots)
# market.requested.add request
# market.requestState[request.id] = RequestState.New
# market.fillSlot(request.id, agent.slotIndex, proof, Address.example)
# await agent.switchAsync(SaleFilling())
# await sleepAsync chronos.seconds(2)
# test "loads active slots from market":
# let me = await market.getSigner()
# without state =? (agent.state as SaleErrored):
# fail()
# check state.error of HostMismatchError
# check state.error.msg == "Slot filled by other host"
# request.ask.slots = 2
# market.requested = @[request]
# market.requestState[request.id] = RequestState.New
test "moves from SaleDownloading to SaleFinished, calling necessary callbacks":
var onProveCalled, onStoreCalled, onClearCalled, onSaleCalled: bool
sales.onProve = proc(request: StorageRequest,
slot: UInt256): Future[seq[byte]] {.async.} =
onProveCalled = true
return array[32, byte].example.toSeq
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
onStoreCalled = true
sales.onClear = proc(availability: ?Availability,
request: StorageRequest,
slotIndex: UInt256) =
onClearCalled = true
sales.onSale = proc(availability: ?Availability,
request: StorageRequest,
slotIndex: UInt256) =
onSaleCalled = true
# let slot0 = MockSlot(requestId: request.id,
# slotIndex: 0.u256,
# proof: proof,
# host: me)
# await fillSlot(slot0.slotIndex)
let agent = newSalesAgent()
# market.requested.add request
# market.requestState[request.id] = RequestState.New
await fillSlot(agent.slotIndex)
await agent.start()
# await agent.switchAsync(SaleDownloading())
market.emitRequestFulfilled(request.id)
await sleepAsync chronos.seconds(2)
# let slot1 = MockSlot(requestId: request.id,
# slotIndex: 1.u256,
# proof: proof,
# host: me)
# await fillSlot(slot1.slotIndex)
# market.activeSlots[me] = @[request.slotId(0.u256), request.slotId(1.u256)]
# market.requested = @[request]
# market.activeRequests[me] = @[request.id]
check eventually agent.state of SaleFinished
check onProveCalled
check onStoreCalled
check not onClearCalled
check onSaleCalled
# await sales.load()
# let expected = SalesAgent(sales: sales,
# requestId: request.id,
# availability: none Availability,
# request: some request)
# # because sales.load() calls agent.start, we won't know the slotIndex
# # randomly selected for the agent, and we also won't know the value of
# # `failed`/`fulfilled`/`cancelled` futures, so we need to compare
# # the properties we know
# # TODO: when calling sales.load(), slot index should be restored and not
# # randomly re-assigned, so this may no longer be needed
# proc `==` (agent0, agent1: SalesAgent): bool =
# return agent0.sales == agent1.sales and
# agent0.requestId == agent1.requestId and
# agent0.availability == agent1.availability and
# agent0.request == agent1.request
test "loads active slots from market":
let me = await market.getSigner()
# check sales.agents.all(agent => agent == expected)
request.ask.slots = 2
market.requested = @[request]
market.requestState[request.id] = RequestState.New
let slot0 = MockSlot(requestId: request.id,
slotIndex: 0.u256,
proof: proof,
host: me)
await fillSlot(slot0.slotIndex)
let slot1 = MockSlot(requestId: request.id,
slotIndex: 1.u256,
proof: proof,
host: me)
await fillSlot(slot1.slotIndex)
market.activeSlots[me] = @[request.slotId(0.u256), request.slotId(1.u256)]
market.requested = @[request]
market.activeRequests[me] = @[request.id]
await sales.load()
let expected = SalesAgent(sales: sales,
availability: none Availability,
request: some request)
# because sales.load() calls agent.start, we won't know the slotIndex
# randomly selected for the agent, and we also won't know the value of
# `failed`/`fulfilled`/`cancelled` futures, so we need to compare
# the properties we know
# TODO: when calling sales.load(), slot index should be restored and not
# randomly re-assigned, so this may no longer be needed
proc `==` (agent0, agent1: SalesAgent): bool =
return agent0.sales == agent1.sales and
agent0.availability == agent1.availability and
agent0.request == agent1.request
check sales.agents.all(agent => agent == expected)

View File

@ -1,3 +1,4 @@
import std/typetraits
import pkg/asynctest
import pkg/questionable
import pkg/chronos
@ -75,6 +76,7 @@ suite "async state machines":
MyMachine(m).errored.value
)
])
machine.addState state1, state2, state3, state4, state5, state6
machine.slotsFilled = machine.newTransitionProperty(0)
machine.requestFinished = machine.newTransitionProperty(false)