[purchasing] Rework purchasing as a state machine
This commit is contained in:
parent
7841a9692c
commit
92eecb0702
|
@ -1,46 +1,31 @@
|
|||
import ../market
|
||||
import ../clock
|
||||
import ./statemachine
|
||||
import ./states/pending
|
||||
import ./purchaseid
|
||||
|
||||
export purchaseid
|
||||
# Purchase is implemented as a state machine:
|
||||
#
|
||||
# pending ----> submitted ----------> started
|
||||
# \ \ \
|
||||
# \ \ -----------> cancelled
|
||||
# \ \ \
|
||||
# --------------------------------------> error
|
||||
#
|
||||
|
||||
type
|
||||
Purchase* = ref object
|
||||
future: Future[void]
|
||||
market: Market
|
||||
clock: Clock
|
||||
request*: StorageRequest
|
||||
export Purchase
|
||||
export purchaseid
|
||||
|
||||
func newPurchase*(request: StorageRequest,
|
||||
market: Market,
|
||||
clock: Clock): Purchase =
|
||||
Purchase(request: request, market: market, clock: clock)
|
||||
|
||||
proc run(purchase: Purchase) {.async.} =
|
||||
let market = purchase.market
|
||||
let clock = purchase.clock
|
||||
|
||||
proc requestStorage {.async.} =
|
||||
purchase.request = await market.requestStorage(purchase.request)
|
||||
|
||||
proc waitUntilFulfilled {.async.} =
|
||||
let done = newFuture[void]()
|
||||
proc callback(_: RequestId) =
|
||||
done.complete()
|
||||
let request = purchase.request
|
||||
let subscription = await market.subscribeFulfillment(request.id, callback)
|
||||
await done
|
||||
await subscription.unsubscribe()
|
||||
|
||||
proc withTimeout(future: Future[void]) {.async.} =
|
||||
let expiry = purchase.request.expiry.truncate(int64)
|
||||
await future.withTimeout(clock, expiry)
|
||||
|
||||
await requestStorage()
|
||||
await waitUntilFulfilled().withTimeout()
|
||||
Purchase(
|
||||
future: Future[void].new(),
|
||||
request: request,
|
||||
market: market,
|
||||
clock: clock
|
||||
)
|
||||
|
||||
proc start*(purchase: Purchase) =
|
||||
purchase.future = purchase.run()
|
||||
purchase.switch(PurchasePending())
|
||||
|
||||
proc wait*(purchase: Purchase) {.async.} =
|
||||
await purchase.future
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
import ../utils/statemachine
|
||||
import ../market
|
||||
import ../clock
|
||||
|
||||
export market
|
||||
export clock
|
||||
export statemachine
|
||||
|
||||
type
|
||||
Purchase* = ref object of StateMachine
|
||||
future*: Future[void]
|
||||
market*: Market
|
||||
clock*: Clock
|
||||
request*: StorageRequest
|
||||
PurchaseState* = ref object of AsyncState
|
|
@ -0,0 +1,17 @@
|
|||
import ../statemachine
|
||||
import ./error
|
||||
|
||||
type PurchaseCancelled* = ref object of PurchaseState
|
||||
|
||||
method enterAsync*(state: PurchaseCancelled) {.async.} =
|
||||
without purchase =? (state.context as Purchase):
|
||||
raiseAssert "invalid state"
|
||||
|
||||
try:
|
||||
await purchase.market.withdrawFunds(purchase.request.id)
|
||||
except CatchableError as error:
|
||||
state.switch(PurchaseError(error: error))
|
||||
return
|
||||
|
||||
let error = newException(Timeout, "Purchase cancelled due to timeout")
|
||||
state.switch(PurchaseError(error: error))
|
|
@ -0,0 +1,10 @@
|
|||
import ../statemachine
|
||||
|
||||
type PurchaseError* = ref object of PurchaseState
|
||||
error*: ref CatchableError
|
||||
|
||||
method enter*(state: PurchaseError) =
|
||||
without purchase =? (state.context as Purchase):
|
||||
raiseAssert "invalid state"
|
||||
|
||||
purchase.future.fail(state.error)
|
|
@ -0,0 +1,17 @@
|
|||
import ../statemachine
|
||||
import ./submitted
|
||||
import ./error
|
||||
|
||||
type PurchasePending* = ref object of PurchaseState
|
||||
|
||||
method enterAsync(state: PurchasePending) {.async.} =
|
||||
without purchase =? (state.context as Purchase):
|
||||
raiseAssert "invalid state"
|
||||
|
||||
try:
|
||||
purchase.request = await purchase.market.requestStorage(purchase.request)
|
||||
except CatchableError as error:
|
||||
state.switch(PurchaseError(error: error))
|
||||
return
|
||||
|
||||
state.switch(PurchaseSubmitted())
|
|
@ -0,0 +1,9 @@
|
|||
import ../statemachine
|
||||
|
||||
type PurchaseStarted* = ref object of PurchaseState
|
||||
|
||||
method enter*(state: PurchaseStarted) =
|
||||
without purchase =? (state.context as Purchase):
|
||||
raiseAssert "invalid state"
|
||||
|
||||
purchase.future.complete()
|
|
@ -0,0 +1,37 @@
|
|||
import ../statemachine
|
||||
import ./error
|
||||
import ./started
|
||||
import ./cancelled
|
||||
|
||||
type PurchaseSubmitted* = ref object of PurchaseState
|
||||
|
||||
method enterAsync(state: PurchaseSubmitted) {.async.} =
|
||||
without purchase =? (state.context as Purchase):
|
||||
raiseAssert "invalid state"
|
||||
|
||||
let market = purchase.market
|
||||
let clock = purchase.clock
|
||||
|
||||
proc wait {.async.} =
|
||||
let done = newFuture[void]()
|
||||
proc callback(_: RequestId) =
|
||||
done.complete()
|
||||
let request = purchase.request
|
||||
let subscription = await market.subscribeFulfillment(request.id, callback)
|
||||
await done
|
||||
await subscription.unsubscribe()
|
||||
|
||||
proc withTimeout(future: Future[void]) {.async.} =
|
||||
let expiry = purchase.request.expiry.truncate(int64)
|
||||
await future.withTimeout(clock, expiry)
|
||||
|
||||
try:
|
||||
await wait().withTimeout()
|
||||
except Timeout:
|
||||
state.switch(PurchaseCancelled())
|
||||
return
|
||||
except CatchableError as error:
|
||||
state.switch(PurchaseError(error: error))
|
||||
return
|
||||
|
||||
state.switch(PurchaseStarted())
|
Loading…
Reference in New Issue