[statemachine] WIP switch to declarative machine

- restored and newly created sales agents now share the same setup: the SalesAgent’s state is set during instantiation, allowing it to react according to that state
- remove returning of next State from run (actual types yet to be changed)
- subscribe to on-chain events, and use TransitionProperties to react (TODO: do not react from AnyState; restrict these transitions to the states where they make sense)
- create setError method in asyncstatemachine that handles setting of error properties
This commit is contained in:
Eric Mastro 2023-02-20 21:52:48 +11:00
parent 1a89c11d2c
commit 8defc00bb0
No known key found for this signature in database
GPG Key ID: AD065ECE27A873B9
13 changed files with 230 additions and 140 deletions

View File

@ -7,6 +7,7 @@ import ./rng
import ./market
import ./clock
import ./proving
import ./errors
import ./contracts/requests
import ./sales/salesagent
import ./sales/statemachine
@ -34,6 +35,9 @@ export stint
export salesagent
export statemachine
type
SalesError = object of CodexError
func new*(_: type Sales,
market: Market,
clock: Clock,
@ -72,14 +76,22 @@ proc handleRequest(sales: Sales,
let availability = sales.findAvailability(ask)
# TODO: check if random slot is actually available (not already filled)
let slotIndex = randomSlotIndex(ask.slots)
without request =? await sales.market.getRequest(requestId):
raise newException(SalesError, "Failed to get request on chain")
let me = await sales.market.getSigner()
let agent = newSalesAgent(
sales,
requestId,
slotIndex,
availability,
none StorageRequest
request,
me,
RequestState.New,
SlotState.Free
)
agent.start(SaleStart(next: SaleDownloading()))
await agent.start(SaleUnknown.new())
sales.agents.add agent
proc load*(sales: Sales) {.async.} =
@ -88,6 +100,7 @@ proc load*(sales: Sales) {.async.} =
# TODO: restore availability from disk
let requestIds = await market.myRequests()
let slotIds = await market.mySlots()
let me = await market.getSigner()
for slotId in slotIds:
# TODO: this needs to be optimised
@ -98,13 +111,19 @@ proc load*(sales: Sales) {.async.} =
slotId):
raiseAssert "could not find slot index"
# TODO: should be optimised (maybe get everything in one call: request, request state, slot state)
let requestState = await market.requestState(request.id)
let slotState = await market.slotState(slotId)
let agent = newSalesAgent(
sales,
request.id,
slotIndex,
availability,
some request)
agent.start(SaleStart(next: SaleUnknown()))
request,
me,
requestState,
slotState)
await agent.start(SaleUnknown.new())
sales.agents.add agent
proc start*(sales: Sales) {.async.} =

View File

@ -2,39 +2,146 @@ import pkg/chronos
import pkg/upraises
import pkg/stint
import ./statemachine
import ./states/[cancelled, downloading, errored, failed, finished, filled,
filling, proving, unknown]
import ../contracts/requests
proc newSalesAgent*(sales: Sales,
                    requestId: RequestId,
                    slotIndex: UInt256,
                    availability: ?Availability,
                    request: StorageRequest,
                    me: Address,
                    requestState: RequestState,
                    slotState: SlotState,
                    restoredFromChain: bool = false): SalesAgent =
  ## Creates a `SalesAgent` together with its declarative transition table
  ## and seeds its transition properties from the supplied state.
  ##
  ## * `requestId` is kept in the signature for existing callers; this proc
  ##   does not store it.
  ## * `requestState` / `slotState` become the initial values of the
  ##   corresponding transition properties.
  ## * `restoredFromChain` is stored on the agent because the
  ##   `SaleUnknown -> SaleErrored` predicate reads it. It defaults to
  ##   `false` so callers that omit it keep compiling.
  ##   NOTE(review): a caller restoring agents from chain presumably wants to
  ##   pass `true` here - confirm against `Sales.load`.
  let agent = SalesAgent.new(@[
    # A new request with a free slot: start downloading.
    # NOTE(review): for a restored agent with a free slot this predicate and
    # the SaleUnknown -> SaleErrored one below can both hold; which one wins
    # depends on how the machine orders transitions - confirm.
    Transition.new(
      SaleUnknown.new(),
      SaleDownloading.new(),
      proc(m: Machine, s: State): bool =
        let agent = SalesAgent(m)
        agent.requestState.value == RequestState.New and
          agent.slotState.value == SlotState.Free
    ),
    # Request cancelled.
    Transition.new(
      AnyState.new(),
      SaleCancelled.new(),
      proc(m: Machine, s: State): bool =
        SalesAgent(m).requestState.value == RequestState.Cancelled
    ),
    # Request or slot failed.
    Transition.new(
      AnyState.new(),
      SaleFailed.new(),
      proc(m: Machine, s: State): bool =
        let agent = SalesAgent(m)
        agent.requestState.value == RequestState.Failed or
          agent.slotState.value == SlotState.Failed
    ),
    # Slot filled.
    Transition.new(
      AnyState.new(),
      SaleFilled.new(),
      proc(m: Machine, s: State): bool =
        SalesAgent(m).slotState.value == SlotState.Filled
    ),
    # Slot finished/paid, or request finished.
    Transition.new(
      AnyState.new(),
      SaleFinished.new(),
      proc(m: Machine, s: State): bool =
        let agent = SalesAgent(m)
        let slot = agent.slotState.value
        # Plain comparisons: no seq is allocated per evaluation.
        slot == SlotState.Finished or slot == SlotState.Paid or
          agent.requestState.value == RequestState.Finished
    ),
    # The machine's `errored` property was set.
    Transition.new(
      AnyState.new(),
      SaleErrored.new(),
      proc(m: Machine, s: State): bool =
        SalesAgent(m).errored.value
    ),
    # Download done: start proving.
    Transition.new(
      SaleDownloading.new(),
      SaleProving.new(),
      proc(m: Machine, s: State): bool =
        SalesAgent(m).downloaded.value
    ),
    # A non-empty proof is available: fill the slot.
    Transition.new(
      SaleProving.new(),
      SaleFilling.new(),
      proc(m: Machine, s: State): bool =
        SalesAgent(m).proof.value.len > 0 # TODO: proof validity check?
    ),
    # Slot host is known and it is us: the sale is finished.
    Transition.new(
      SaleFilled.new(),
      SaleFinished.new(),
      proc(m: Machine, s: State): bool =
        let agent = SalesAgent(m)
        without host =? agent.slotHost.value:
          return false
        host == agent.me
    ),
    # Slot host is known and it is somebody else: record the error.
    Transition.new(
      SaleFilled.new(),
      SaleErrored.new(),
      proc(m: Machine, s: State): bool =
        let agent = SalesAgent(m)
        without host =? agent.slotHost.value:
          return false
        if host != agent.me:
          agent.lastError = newException(HostMismatchError,
                                         "Slot filled by other host")
          return true
        else: return false
    ),
    # A restored agent whose slot is still free: record the error.
    Transition.new(
      SaleUnknown.new(),
      SaleErrored.new(),
      proc(m: Machine, s: State): bool =
        let agent = SalesAgent(m)
        if agent.restoredFromChain and agent.slotState.value == SlotState.Free:
          agent.lastError = newException(SaleUnknownError,
                                         "cannot retrieve slot state")
          return true
        else: return false
    )
  ])

  agent.slotState = agent.newTransitionProperty(slotState)
  agent.requestState = agent.newTransitionProperty(requestState)
  agent.proof = agent.newTransitionProperty(newSeq[byte]())
  agent.slotHost = agent.newTransitionProperty(none Address)
  agent.downloaded = agent.newTransitionProperty(false)
  agent.sales = sales
  agent.availability = availability
  agent.slotIndex = slotIndex
  agent.request = request
  agent.me = me
  # Previously never assigned, so the restored-agent predicate above always
  # saw `false`.
  agent.restoredFromChain = restoredFromChain
  return agent
# proc subscribeCancellation*(agent: SalesAgent): Future[void] {.gcsafe.}
# proc subscribeFailure*(agent: SalesAgent): Future[void] {.gcsafe.}
# proc subscribeSlotFilled*(agent: SalesAgent): Future[void] {.gcsafe.}
proc subscribeCancellation*(agent: SalesAgent): Future[void] {.gcsafe.}
proc subscribeFailure*(agent: SalesAgent): Future[void] {.gcsafe.}
proc subscribeSlotFill*(agent: SalesAgent): Future[void] {.gcsafe.}
proc start*(agent: SalesAgent, initialState: State) {.async.} =
## Subscribes the agent to the cancellation, failure and slot-fill
## notifications (in that order) and then invokes the base `Machine.start`
## with `initialState`.
await agent.subscribeCancellation()
await agent.subscribeFailure()
await agent.subscribeSlotFill()
# Dispatch to the base implementation rather than to this overload.
# NOTE(review): the result of the base `start` is not awaited here; if it
# returns a Future this may be unintended - confirm against `Machine.start`.
procCall Machine(agent).start(initialState)
proc stop*(agent: SalesAgent) {.async.} =
  ## Tears the agent down: drops its market subscriptions, cancels the
  ## pending cancellation timer and then invokes the base `Machine.stop`.
  ##
  ## Unsubscribing is best-effort: a `CatchableError` raised by one
  ## subscription is ignored so the remaining cleanup still runs.
  ## Subscriptions and the timer are only touched when they were actually
  ## created, so `stop` is safe on an agent whose `start` never ran or
  ## failed part-way through.
  if not agent.subscribeFulfilled.isNil:
    try:
      await agent.subscribeFulfilled.unsubscribe()
    except CatchableError:
      discard

  if not agent.subscribeFailed.isNil:
    try:
      await agent.subscribeFailed.unsubscribe()
    except CatchableError:
      discard

  if not agent.subscribeSlotFilled.isNil:
    try:
      await agent.subscribeSlotFilled.unsubscribe()
    except CatchableError:
      discard

  # `finished` also covers futures that already failed or were cancelled
  # (e.g. by the fulfilment callback), which need no further cancelling.
  if not agent.waitForCancelled.isNil and
      not agent.waitForCancelled.finished:
    await agent.waitForCancelled.cancelAndWait()

  procCall Machine(agent).stop()
proc subscribeCancellation*(agent: SalesAgent) {.async.} =
let market = agent.sales.market
@ -42,20 +149,17 @@ proc subscribeCancellation*(agent: SalesAgent) {.async.} =
proc onCancelled() {.async.} =
let clock = agent.sales.clock
without request =? agent.request:
return
await clock.waitUntil(agent.request.expiry.truncate(int64))
await agent.subscribeFulfilled.unsubscribe()
agent.requestState.setValue(RequestState.Cancelled)
await clock.waitUntil(request.expiry.truncate(int64))
await agent.fulfilled.unsubscribe()
agent.schedule(cancelledEvent(request))
agent.cancelled = onCancelled()
agent.waitForCancelled = onCancelled()
proc onFulfilled(_: RequestId) =
agent.cancelled.cancel()
agent.waitForCancelled.cancel()
agent.fulfilled =
await market.subscribeFulfillment(agent.requestId, onFulfilled)
agent.subscribeFulfilled =
await market.subscribeFulfillment(agent.request.id, onFulfilled)
# TODO: move elsewhere
proc asyncSpawn(future: Future[void], ignore: type CatchableError) =
@ -69,23 +173,33 @@ proc asyncSpawn(future: Future[void], ignore: type CatchableError) =
proc subscribeFailure*(agent: SalesAgent) {.async.} =
  ## Registers a request-failed handler with the market for the agent's
  ## request and keeps the resulting subscription in `agent.subscribeFailed`.
  let market = agent.sales.market

  proc onFailed(_: RequestId) {.upraises:[], gcsafe.} =
    # Drop the subscription in the background, then publish the failure
    # through the `requestState` transition property.
    asyncSpawn agent.subscribeFailed.unsubscribe(), ignore = CatchableError
    try:
      agent.requestState.setValue(RequestState.Failed)
    except AsyncQueueFullError as queueFull:
      raiseAssert "State machine critical failure: " & queueFull.msg

  let subscription =
    await market.subscribeRequestFailed(agent.request.id, onFailed)
  agent.subscribeFailed = subscription
proc subscribeSlotFill*(agent: SalesAgent) {.async.} =
  ## Registers a slot-filled handler with the market for the agent's request
  ## and slot index, and keeps the resulting subscription in
  ## `agent.subscribeSlotFilled`.
  let market = agent.sales.market

  proc onSlotFilled(
    requestId: RequestId,
    slotIndex: UInt256) {.upraises:[], gcsafe.} =
    # Drop the subscription in the background, then publish the new slot
    # state through the `slotState` transition property.
    asyncSpawn agent.subscribeSlotFilled.unsubscribe(), ignore = CatchableError
    try:
      agent.slotState.setValue(SlotState.Filled)
    except AsyncQueueFullError as e:
      raiseAssert "State machine critical failure: " & e.msg

  agent.subscribeSlotFilled =
    await market.subscribeSlotFilled(agent.request.id,
                                     agent.slotIndex,
                                     onSlotFilled)

View File

@ -28,15 +28,21 @@ type
agents*: seq[SalesAgent]
SalesAgent* = ref object of Machine
sales*: Sales
requestId*: RequestId
ask*: StorageAsk
availability*: ?Availability # TODO: when availability persistence is added, change this to not optional
request*: ?StorageRequest
request*: StorageRequest
slotIndex*: UInt256
failed*: market.Subscription
fulfilled*: market.Subscription
slotFilled*: market.Subscription
cancelled*: Future[void]
subscribeFailed*: market.Subscription
subscribeFulfilled*: market.Subscription
subscribeSlotFilled*: market.Subscription
waitForCancelled*: Future[void]
me*: Address
restoredFromChain*: bool
slotState*: TransitionProperty[SlotState]
requestState*: TransitionProperty[RequestState]
proof*: TransitionProperty[seq[byte]]
slotHost*: TransitionProperty[?Address]
downloaded*: TransitionProperty[bool]
SaleState* = ref object of State
SaleError* = ref object of CodexError
Availability* = object

View File

@ -10,4 +10,4 @@ method `$`*(state: SaleCancelled): string = "SaleCancelled"
method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} =
  ## Reports the cancellation to the machine as a `SaleTimeoutError`.
  ## No next state is returned explicitly.
  machine.setError(
    newException(SaleTimeoutError, "Sale cancelled due to timeout"))

View File

@ -33,14 +33,11 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
without onStore =? agent.sales.onStore:
raiseAssert "onStore callback not set"
without request =? agent.request:
raiseAssert "no sale request"
if availability =? agent.availability:
agent.sales.remove(availability)
await onStore(request, agent.slotIndex, agent.availability)
return some State(SaleProving())
await onStore(agent.request, agent.slotIndex, agent.availability)
agent.downloaded.setValue(true)
except CancelledError:
raise
@ -49,4 +46,4 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
let error = newException(SaleDownloadingError,
"unknown sale downloading error",
e)
return some State(SaleErrored(error: error))
machine.setError(error)

View File

@ -2,7 +2,6 @@ import chronicles
import ../statemachine
type SaleErrored* = ref object of SaleState
error*: ref CatchableError
method `$`*(state: SaleErrored): string = "SaleErrored"
@ -21,4 +20,4 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
if availability =? agent.availability:
agent.sales.add(availability)
error "Sale error", error=state.error.msg
error "Sale error", error=agent.lastError.msg

View File

@ -9,4 +9,4 @@ method `$`*(state: SaleFailed): string = "SaleFailed"
method run*(state: SaleFailed, machine: Machine): Future[?State] {.async.} =
  ## Reports the failed sale to the machine as a `SaleFailedError`.
  ## No next state is returned explicitly.
  machine.setError(newException(SaleFailedError, "Sale failed"))

View File

@ -24,17 +24,12 @@ method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} =
try:
let market = agent.sales.market
let host = await market.getHost(agent.requestId, agent.slotIndex)
let me = await market.getSigner()
if host == me.some:
return some State(SaleFinished())
else:
let error = newException(HostMismatchError, "Slot filled by other host")
return some State(SaleErrored(error: error))
let slotHost = await market.getHost(agent.request.id, agent.slotIndex)
agent.slotHost.setValue(slotHost)
except CancelledError:
raise
except CatchableError as e:
let error = newException(SaleFilledError, "sale filled error", e)
return some State(SaleErrored(error: error))
agent.setError error

View File

@ -29,7 +29,7 @@ method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} =
try:
let market = agent.sales.market
await market.fillSlot(agent.requestId, agent.slotIndex, state.proof)
await market.fillSlot(agent.request.id, agent.slotIndex, state.proof)
except CancelledError:
raise

View File

@ -25,18 +25,15 @@ method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} =
let agent = SalesAgent(machine)
try:
without request =? agent.request:
raiseAssert "no sale request"
without onProve =? agent.sales.onProve:
raiseAssert "onProve callback not set"
let proof = await onProve(request, agent.slotIndex)
return some State(SaleFilling(proof: proof))
let proof = await onProve(agent.request, agent.slotIndex)
agent.proof.setValue(proof)
except CancelledError:
raise
except CatchableError as e:
let error = newException(SaleProvingError, "unknown sale proving error", e)
return some State(SaleErrored(error: error))
machine.setError error

View File

@ -1,35 +0,0 @@
import ./downloading
import ./cancelled
import ./failed
import ./filled
import ../statemachine
type
SaleStart* = ref object of SaleState
next*: SaleState
method `$`*(state: SaleStart): string = "SaleStart"
method onCancelled*(state: SaleStart, request: StorageRequest): ?State =
return some State(SaleCancelled())
method onFailed*(state: SaleStart, request: StorageRequest): ?State =
return some State(SaleFailed())
method onSlotFilled*(state: SaleStart, requestId: RequestId,
slotIndex: UInt256): ?State =
return some State(SaleFilled())
proc retrieveRequest(agent: SalesAgent) {.async.} =
if agent.request.isNone:
agent.request = await agent.sales.market.getRequest(agent.requestId)
# TODO: remove machine from this method, pass salesagent via constructor instead
method run*(state: SaleStart, machine: Machine): Future[?State] {.async.} =
let agent = SalesAgent(machine)
await agent.retrieveRequest()
# TODO: re-enable and fix this:
# await agent.subscribeCancellation()
# await agent.subscribeFailure()
# await agent.subscribeSlotFilled()
return some State(state.next)

View File

@ -22,30 +22,25 @@ method run*(state: SaleUnknown, machine: Machine): Future[?State] {.async.} =
let agent = SalesAgent(machine)
let market = agent.sales.market
try:
let slotId = slotId(agent.requestId, agent.slotIndex)
# try:
# let slotId = slotId(agent.request.id, agent.slotIndex)
without slotState =? await market.slotState(slotId):
let error = newException(SaleUnknownError, "cannot retrieve slot state")
return some State(SaleErrored(error: error))
# without slotState =? await market.slotState(slotId):
# let error = newException(SaleUnknownError, "cannot retrieve slot state")
# agent.setError error
case slotState
of SlotState.Free:
let error = newException(UnexpectedSlotError,
"slot state on chain should not be 'free'")
return some State(SaleErrored(error: error))
of SlotState.Filled:
return some State(SaleFilled())
of SlotState.Finished, SlotState.Paid:
return some State(SaleFinished())
of SlotState.Failed:
return some State(SaleFailed())
# if slotState == SlotState.Free:
# let error = newException(UnexpectedSlotError,
# "slot state on chain should not be 'free'")
# agent.setError error
except CancelledError:
raise
# agent.slotState.setValue(slotState)
except CatchableError as e:
let error = newException(SaleUnknownError,
"error in unknown state",
e)
return some State(SaleErrored(error: error))
# except CancelledError:
# raise
# except CatchableError as e:
# let error = newException(SaleUnknownError,
# "error in unknown state",
# e)
# agent.setError error

View File

@ -60,6 +60,11 @@ proc setValue*[T](prop: TransitionProperty[T], value: T) =
prop.value = value
prop.machine.checkTransitions()
proc setError*(machine: Machine, error: ref CatchableError) =
  ## Records `error` on the machine and pulses the `errored` transition
  ## property so that transitions depending on it are checked.
  ##
  ## The error is stored before the property is set, so `lastError` already
  ## holds it while transitions are being checked. The flag is cleared in a
  ## `finally`, so it does not stay `true` when `setValue` raises; the
  ## exception itself still propagates to the caller.
  machine.lastError = error           # stores error in state
  try:
    machine.errored.setValue(true)    # triggers transitions
  finally:
    machine.errored.value = false     # clears error without triggering transitions
method run*(state: State): Future[?State] {.base, upraises:[].} =
discard
@ -75,11 +80,9 @@ proc scheduler(machine: Machine) {.async.} =
var fut = cast[FutureBase](udata)
if fut.failed():
try:
machine.errored.setValue(true) # triggers transitions
machine.errored.value = false # clears error without triggering transitions
machine.lastError = fut.error # stores error in state
machine.setError(fut.error)
except AsyncQueueFullError as e:
error "Cannot set transition value because queue is full", error = e
error "Cannot set transition value because queue is full", error = e.msg
try:
while true: