[marketplace] move sale process to async state machine

This commit is contained in:
Eric Mastro 2022-11-10 16:45:33 +11:00
parent 7c87b72bce
commit fb96a9eb17
No known key found for this signature in database
GPG Key ID: 141E3048D95A4E63
12 changed files with 557 additions and 174 deletions

View File

@ -1,4 +1,3 @@
import std/sequtils
import pkg/questionable
import pkg/upraises
import pkg/stint
@ -9,6 +8,9 @@ import ./market
import ./clock
import ./proving
import ./contracts/requests
import ./sales/salesagent
import ./sales/statemachine
import ./sales/states/[downloading, unknown]
## Sales holds a list of available storage that it may sell.
##
@ -29,45 +31,8 @@ import ./contracts/requests
## | | ---- storage proof ---> |
export stint
type
Sales* = ref object
market: Market
clock: Clock
subscription: ?market.Subscription
available*: seq[Availability]
onStore: ?OnStore
onProve: ?OnProve
onClear: ?OnClear
onSale: ?OnSale
proving: Proving
Availability* = object
id*: array[32, byte]
size*: UInt256
duration*: UInt256
minPrice*: UInt256
SalesAgent = ref object
sales: Sales
requestId: RequestId
ask: StorageAsk
availability: Availability
request: ?StorageRequest
slotIndex: ?UInt256
subscription: ?market.Subscription
running: ?Future[void]
waiting: ?Future[void]
finished: bool
OnStore = proc(request: StorageRequest,
slot: UInt256,
availability: Availability): Future[void] {.gcsafe, upraises: [].}
OnProve = proc(request: StorageRequest,
slot: UInt256): Future[seq[byte]] {.gcsafe, upraises: [].}
OnClear = proc(availability: Availability,
request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
OnSale = proc(availability: Availability,
request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
export salesagent
export statemachine
func new*(_: type Sales,
market: Market,
@ -87,146 +52,47 @@ proc init*(_: type Availability,
doAssert randomBytes(id) == 32
Availability(id: id, size: size, duration: duration, minPrice: minPrice)
proc `onStore=`*(sales: Sales, onStore: OnStore) =
sales.onStore = some onStore
proc `onProve=`*(sales: Sales, onProve: OnProve) =
sales.onProve = some onProve
proc `onClear=`*(sales: Sales, onClear: OnClear) =
sales.onClear = some onClear
proc `onSale=`*(sales: Sales, callback: OnSale) =
sales.onSale = some callback
func add*(sales: Sales, availability: Availability) =
sales.available.add(availability)
func remove*(sales: Sales, availability: Availability) =
sales.available.keepItIf(it != availability)
func findAvailability(sales: Sales, ask: StorageAsk): ?Availability =
for availability in sales.available:
if ask.slotSize <= availability.size and
ask.duration <= availability.duration and
ask.pricePerSlot >= availability.minPrice:
return some availability
proc finish(agent: SalesAgent, success: bool) =
if agent.finished:
return
agent.finished = true
if subscription =? agent.subscription:
asyncSpawn subscription.unsubscribe()
if running =? agent.running:
running.cancel()
if waiting =? agent.waiting:
waiting.cancel()
if success:
if request =? agent.request and
slotIndex =? agent.slotIndex:
agent.sales.proving.add(request.slotId(slotIndex))
if onSale =? agent.sales.onSale:
onSale(agent.availability, request, slotIndex)
else:
if onClear =? agent.sales.onClear and
request =? agent.request and
slotIndex =? agent.slotIndex:
onClear(agent.availability, request, slotIndex)
agent.sales.add(agent.availability)
proc selectSlot(agent: SalesAgent) =
let rng = Rng.instance
let slotIndex = rng.rand(agent.ask.slots - 1)
agent.slotIndex = some slotIndex.u256
proc onSlotFilled(agent: SalesAgent,
requestId: RequestId,
slotIndex: UInt256) {.async.} =
try:
let market = agent.sales.market
let host = await market.getHost(requestId, slotIndex)
let me = await market.getSigner()
agent.finish(success = (host == me.some))
except CatchableError:
agent.finish(success = false)
proc subscribeSlotFilled(agent: SalesAgent, slotIndex: UInt256) {.async.} =
proc onSlotFilled(requestId: RequestId,
slotIndex: UInt256) {.gcsafe, upraises:[].} =
asyncSpawn agent.onSlotFilled(requestId, slotIndex)
let market = agent.sales.market
let subscription = await market.subscribeSlotFilled(agent.requestId,
slotIndex,
onSlotFilled)
agent.subscription = some subscription
proc waitForExpiry(agent: SalesAgent) {.async.} =
without request =? agent.request:
return
await agent.sales.clock.waitUntil(request.expiry.truncate(int64))
agent.finish(success = false)
proc start(agent: SalesAgent) {.async.} =
try:
let sales = agent.sales
let market = sales.market
let availability = agent.availability
without onStore =? sales.onStore:
raiseAssert "onStore callback not set"
without onProve =? sales.onProve:
raiseAssert "onProve callback not set"
sales.remove(availability)
agent.selectSlot()
without slotIndex =? agent.slotIndex:
raiseAssert "no slot selected"
await agent.subscribeSlotFilled(slotIndex)
agent.request = await market.getRequest(agent.requestId)
without request =? agent.request:
agent.finish(success = false)
return
agent.waiting = some agent.waitForExpiry()
await onStore(request, slotIndex, availability)
let proof = await onProve(request, slotIndex)
await market.fillSlot(request.id, slotIndex, proof)
except CancelledError:
raise
except CatchableError as e:
error "SalesAgent failed", msg = e.msg
agent.finish(success = false)
proc handleRequest(sales: Sales, requestId: RequestId, ask: StorageAsk) =
without availability =? sales.findAvailability(ask):
return
let agent = SalesAgent(
sales: sales,
requestId: requestId,
ask: ask,
availability: availability
proc handleRequest(sales: Sales,
requestId: RequestId,
ask: StorageAsk) {.async.} =
let availability = sales.findAvailability(ask)
let agent = newSalesAgent(
sales,
requestId,
availability,
none StorageRequest
)
agent.running = some agent.start()
await agent.init(ask.slots)
await agent.switchAsync(SaleDownloading())
sales.agents.add agent
proc load*(sales: Sales) {.async.} =
let market = sales.market
# TODO: restore availability from disk
let slotIds = await market.mySlots()
for slotId in slotIds:
# TODO: this needs to be optimised
if slot =? await market.getSlot(slotId):
if request =? await market.getRequest(slot.requestId):
let availability = sales.findAvailability(request.ask)
let agent = newSalesAgent(
sales,
slot.requestId,
availability,
some request)
await agent.init(request.ask.slots)
await agent.switchAsync(SaleUnknown())
sales.agents.add agent
proc start*(sales: Sales) {.async.} =
doAssert sales.subscription.isNone, "Sales already started"
proc onRequest(requestId: RequestId, ask: StorageAsk) {.gcsafe, upraises:[].} =
sales.handleRequest(requestId, ask)
proc onRequest(requestId: RequestId, ask: StorageAsk) {.gcsafe, upraises:[], async.} =
await sales.handleRequest(requestId, ask)
try:
sales.subscription = some await sales.market.subscribeRequests(onRequest)
@ -240,3 +106,7 @@ proc stop*(sales: Sales) {.async.} =
await subscription.unsubscribe()
except CatchableError as e:
warn "Unsubscribe failed", msg = e.msg
for agent in sales.agents:
await agent.deinit()

View File

@ -0,0 +1,85 @@
import pkg/chronos
import pkg/upraises
import pkg/stint
import ./statemachine
import ./states/[downloading, unknown]
import ../contracts/requests
import ../rng
proc newSalesAgent*(sales: Sales,
requestId: RequestId,
availability: ?Availability,
request: ?StorageRequest): SalesAgent =
SalesAgent(
sales: sales,
requestId: requestId,
availability: availability,
request: request)
# fwd declarations
proc subscribeCancellation*(agent: SalesAgent): Future[void] {.gcsafe.}
proc subscribeFailure*(agent: SalesAgent): Future[void] {.gcsafe.}
proc populateRequest(agent: SalesAgent) {.async.} =
if agent.request.isNone:
agent.request = await agent.sales.market.getRequest(agent.requestId)
proc init*(agent: SalesAgent, numSlots: uint64) {.async.} =
let rng = Rng.instance
let slotIndex = rng.rand(numSlots - 1)
agent.slotIndex = some slotIndex.u256
# TODO: try not to block the thread waiting for the network
await agent.populateRequest()
await agent.subscribeCancellation()
await agent.subscribeFailure()
proc deinit*(agent: SalesAgent) {.async.} =
try:
await agent.fulfilled.unsubscribe()
except CatchableError:
discard
try:
await agent.failed.unsubscribe()
except CatchableError:
discard
if not agent.cancelled.completed:
await agent.cancelled.cancelAndWait()
proc subscribeCancellation*(agent: SalesAgent) {.async.} =
let market = agent.sales.market
proc onCancelled() {.async.} =
let clock = agent.sales.clock
without request =? agent.request:
return
await clock.waitUntil(request.expiry.truncate(int64))
await agent.fulfilled.unsubscribe()
without state =? (agent.state as SaleState):
return
await state.onCancelled(request)
agent.cancelled = onCancelled()
proc onFulfilled(_: RequestId) {.async.} =
agent.cancelled.cancel()
agent.fulfilled =
await market.subscribeFulfillment(agent.requestId, onFulfilled)
proc subscribeFailure*(agent: SalesAgent) {.async.} =
let market = agent.sales.market
proc onFailed(_: RequestId) {.async.} =
without request =? agent.request:
return
without state =? (agent.state as SaleState):
return
await state.onFailed(request)
agent.failed =
await market.subscribeRequestFailed(agent.requestId, onFailed)

View File

@ -0,0 +1,104 @@
import std/sequtils
import pkg/chronos
import pkg/questionable
import pkg/upraises
import ../errors
import ../utils/statemachine
import ../market
import ../clock
import ../proving
import ../contracts/requests
export market
export clock
export statemachine
export proving
type
Sales* = ref object
market*: Market
clock*: Clock
subscription*: ?market.Subscription
available: seq[Availability]
onStore: ?OnStore
onProve: ?OnProve
onClear: ?OnClear
onSale: ?OnSale
proving*: Proving
agents*: seq[SalesAgent]
SalesAgent* = ref object of StateMachineAsync
sales*: Sales
requestId*: RequestId
ask*: StorageAsk
availability*: ?Availability # TODO: when availability persistence is added, change this to not optional
request*: ?StorageRequest
slotIndex*: ?UInt256
failed*: market.Subscription
fulfilled*: market.Subscription
cancelled*: Future[void]
SaleState* = ref object of AsyncState
SaleError* = ref object of CodexError
Availability* = object
id*: array[32, byte]
size*: UInt256
duration*: UInt256
minPrice*: UInt256
AvailabilityChange* = proc(availability: Availability) {.gcsafe, upraises: [].}
# TODO: when availability changes introduced, make availability non-optional (if we need to keep it at all)
RequestEvent* = proc(state: SaleState, request: StorageRequest): Future[void] {.gcsafe, upraises: [].}
OnStore* = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability): Future[void] {.gcsafe, upraises: [].}
OnProve* = proc(request: StorageRequest,
slot: UInt256): Future[seq[byte]] {.gcsafe, upraises: [].}
OnClear* = proc(availability: ?Availability,# TODO: when availability changes introduced, make availability non-optional (if we need to keep it at all)
request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
OnSale* = proc(availability: ?Availability, # TODO: when availability changes introduced, make availability non-optional (if we need to keep it at all)
request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
proc `onStore=`*(sales: Sales, onStore: OnStore) =
sales.onStore = some onStore
proc `onProve=`*(sales: Sales, onProve: OnProve) =
sales.onProve = some onProve
proc `onClear=`*(sales: Sales, onClear: OnClear) =
sales.onClear = some onClear
proc `onSale=`*(sales: Sales, callback: OnSale) =
sales.onSale = some callback
proc onStore*(sales: Sales): ?OnStore = sales.onStore
proc onProve*(sales: Sales): ?OnProve = sales.onProve
proc onClear*(sales: Sales): ?OnClear = sales.onClear
proc onSale*(sales: Sales): ?OnSale = sales.onSale
proc available*(sales: Sales): seq[Availability] = sales.available
func add*(sales: Sales, availability: Availability) =
if not sales.available.contains(availability):
sales.available.add(availability)
# TODO: add to disk (persist), serialise to json.
func remove*(sales: Sales, availability: Availability) =
sales.available.keepItIf(it != availability)
# TODO: remove from disk availability, mark as in use by assigning
# a slotId, so that it can be used for restoration (node restart)
func findAvailability*(sales: Sales, ask: StorageAsk): ?Availability =
for availability in sales.available:
if ask.slotSize <= availability.size and
ask.duration <= availability.duration and
ask.pricePerSlot >= availability.minPrice:
return some availability
method onCancelled*(state: SaleState, request: StorageRequest) {.base, async.} =
discard
method onFailed*(state: SaleState, request: StorageRequest) {.base, async.} =
discard

View File

@ -0,0 +1,13 @@
import ../statemachine
import ./errored
type SaleCancelled* = ref object of SaleState
method `$`*(state: SaleCancelled): string = "SaleCancelled"
method enterAsync*(state: SaleCancelled) {.async.} =
without agent =? (state.context as SalesAgent):
raiseAssert "invalid state"
let error = newException(Timeout, "Sale cancelled due to timeout")
await state.switchAsync(SaleErrored(error: error))

View File

@ -0,0 +1,48 @@
import std/sequtils
import ./cancelled
import ./failed
import ./proving
import ./errored
import ../salesagent
import ../statemachine
import ../../market
type
SaleDownloading* = ref object of SaleState
failedSubscription: ?market.Subscription
hasCancelled: ?Future[void]
SaleDownloadingError* = object of SaleError
method `$`*(state: SaleDownloading): string = "SaleDownloading"
method onCancelled*(state: SaleDownloading, request: StorageRequest) {.async.} =
await state.switchAsync(SaleCancelled())
method onFailed*(state: SaleDownloading, request: StorageRequest) {.async.} =
await state.switchAsync(SaleFailed())
method enterAsync(state: SaleDownloading) {.async.} =
without agent =? (state.context as SalesAgent):
raiseAssert "invalid state"
try:
without onStore =? agent.sales.onStore:
raiseAssert "onStore callback not set"
without slotIndex =? agent.slotIndex:
raiseAssert "no slot selected"
without request =? agent.request:
raiseAssert "no sale request"
await onStore(request, slotIndex, agent.availability)
await state.switchAsync(SaleProving())
except CancelledError:
discard
except CatchableError as e:
let error = newException(SaleDownloadingError,
"unknown sale downloading error",
e)
await state.switchAsync(SaleErrored(error: error))

View File

@ -0,0 +1,25 @@
import chronicles
import ../statemachine
type SaleErrored* = ref object of SaleState
error*: ref CatchableError
method `$`*(state: SaleErrored): string = "SaleErrored"
method enterAsync*(state: SaleErrored) {.async.} =
without agent =? (state.context as SalesAgent):
raiseAssert "invalid state"
if onClear =? agent.sales.onClear and
request =? agent.request and
slotIndex =? agent.slotIndex:
onClear(agent.availability, request, slotIndex)
# TODO: when availability persistence is added, change this to not optional
# NOTE: with this in place, restoring state for a restarted node will
# never free up availability once finished. Persisting availability
# on disk is required for this.
if availability =? agent.availability:
agent.sales.add(availability)
error "Sale error", error=state.error.msg

View File

@ -0,0 +1,12 @@
import ./errored
import ../statemachine
type
SaleFailed* = ref object of SaleState
SaleFailedError* = object of SaleError
method `$`*(state: SaleFailed): string = "SaleFailed"
method enterAsync*(state: SaleFailed) {.async.} =
let error = newException(SaleFailedError, "Sale failed")
await state.switchAsync(SaleErrored(error: error))

View File

@ -0,0 +1,46 @@
import pkg/questionable
import ./errored
import ./finished
import ./cancelled
import ./failed
import ../statemachine
type
SaleFilled* = ref object of SaleState
SaleFilledError* = object of CatchableError
method onCancelled*(state: SaleFilled, request: StorageRequest) {.async.} =
await state.switchAsync(SaleCancelled())
method onFailed*(state: SaleFilled, request: StorageRequest) {.async.} =
await state.switchAsync(SaleFailed())
method `$`*(state: SaleFilled): string = "SaleFilled"
method enterAsync(state: SaleFilled) {.async.} =
without agent =? (state.context as SalesAgent):
raiseAssert "invalid state"
try:
let market = agent.sales.market
without slotIndex =? agent.slotIndex:
raiseAssert "no slot selected"
if availability =? agent.availability:
agent.sales.remove(availability)
let host = await market.getHost(agent.requestId, slotIndex)
let me = await market.getSigner()
if host == me.some:
await state.switchAsync(SaleFinished())
else:
let error = newException(SaleFilledError, "Sale host mismatch")
await state.switchAsync(SaleErrored(error: error))
except CancelledError:
discard
except CatchableError as e:
let error = newException(SaleFilledError, "sale filled error", e)
await state.switchAsync(SaleErrored(error: error))

View File

@ -0,0 +1,50 @@
import pkg/upraises
import ../../market
import ../statemachine
import ./filled
import ./errored
import ./cancelled
import ./failed
type
SaleFilling* = ref object of SaleState
proof*: seq[byte]
SaleFillingError* = object of CatchableError
method `$`*(state: SaleFilling): string = "SaleFilling"
method onCancelled*(state: SaleFilling, request: StorageRequest) {.async.} =
await state.switchAsync(SaleCancelled())
method onFailed*(state: SaleFilling, request: StorageRequest) {.async.} =
await state.switchAsync(SaleFailed())
method enterAsync(state: SaleFilling) {.async.} =
without agent =? (state.context as SalesAgent):
raiseAssert "invalid state"
var subscription: market.Subscription
proc onSlotFilled(requestId: RequestId,
slotIndex: UInt256) {.async.} =
await subscription.unsubscribe()
await state.switchAsync(SaleFilled())
try:
let market = agent.sales.market
without slotIndex =? agent.slotIndex:
raiseAssert "no slot selected"
subscription = await market.subscribeSlotFilled(agent.requestId,
slotIndex,
onSlotFilled)
await market.fillSlot(agent.requestId, slotIndex, state.proof)
except CancelledError:
discard
except CatchableError as e:
let error = newException(SaleFillingError, "unknown sale filling error", e)
await state.switchAsync(SaleErrored(error: error))

View File

@ -0,0 +1,42 @@
import pkg/chronos
import ./cancelled
import ./errored
import ./failed
import ../statemachine
type
SaleFinished* = ref object of SaleState
SaleFinishedError* = object of CatchableError
method `$`*(state: SaleFinished): string = "SaleFinished"
method onCancelled*(state: SaleFinished, request: StorageRequest) {.async.} =
await state.switchAsync(SaleCancelled())
method onFailed*(state: SaleFinished, request: StorageRequest) {.async.} =
await state.switchAsync(SaleFailed())
method enterAsync*(state: SaleFinished) {.async.} =
without agent =? (state.context as SalesAgent):
raiseAssert "invalid state"
try:
if request =? agent.request and
slotIndex =? agent.slotIndex:
agent.sales.proving.add(request.slotId(slotIndex))
if onSale =? agent.sales.onSale:
onSale(agent.availability, request, slotIndex)
# TODO: Keep track of contract completion using local clock. When contract
# has finished, we need to add back availability to the sales module.
# This will change when the state machine is updated to include the entire
# sales process, as well as when availability is persisted, so leaving it
# as a TODO for now.
except CancelledError:
discard
except CatchableError as e:
let error = newException(SaleFinishedError, "sale finished error", e)
await state.switchAsync(SaleErrored(error: error))

View File

@ -0,0 +1,41 @@
import ../statemachine
import ./filling
import ./cancelled
import ./failed
import ./errored
type
SaleProving* = ref object of SaleState
SaleProvingError* = object of CatchableError
method `$`*(state: SaleProving): string = "SaleProving"
method onCancelled*(state: SaleProving, request: StorageRequest) {.async.} =
await state.switchAsync(SaleCancelled())
method onFailed*(state: SaleProving, request: StorageRequest) {.async.} =
await state.switchAsync(SaleFailed())
method enterAsync(state: SaleProving) {.async.} =
without agent =? (state.context as SalesAgent):
raiseAssert "invalid state"
try:
without request =? agent.request:
raiseAssert "no sale request"
without slotIndex =? agent.slotIndex:
raiseAssert "no slot selected"
without onProve =? agent.sales.onProve:
raiseAssert "onProve callback not set"
let proof = await onProve(request, slotIndex)
await state.switchAsync(SaleFilling(proof: proof))
except CancelledError:
discard
except CatchableError as e:
let error = newException(SaleProvingError, "unknown sale proving error", e)
await state.switchAsync(SaleErrored(error: error))

View File

@ -0,0 +1,47 @@
import ../statemachine
import ./filled
import ./finished
import ./failed
import ./errored
import ./cancelled
type
SaleUnknown* = ref object of SaleState
SaleUnknownError* = object of CatchableError
method `$`*(state: SaleUnknown): string = "SaleUnknown"
method onCancelled*(state: SaleUnknown, request: StorageRequest) {.async.} =
await state.switchAsync(SaleCancelled())
method onFailed*(state: SaleUnknown, request: StorageRequest) {.async.} =
await state.switchAsync(SaleFailed())
method enterAsync(state: SaleUnknown) {.async.} =
without agent =? (state.context as SalesAgent):
raiseAssert "invalid state"
let market = agent.sales.market
try:
without requestState =? await market.getState(agent.requestId):
raiseAssert "state unknown"
case requestState
of RequestState.New, RequestState.Started:
await state.switchAsync(SaleFilled())
of RequestState.Finished:
await state.switchAsync(SaleFinished())
of RequestState.Cancelled:
await state.switchAsync(SaleCancelled())
of RequestState.Failed:
await state.switchAsync(SaleFailed())
except CancelledError:
discard
except CatchableError as e:
let error = newException(SaleUnknownError,
"error in unknown state",
e)
await state.switchAsync(SaleErrored(error: error))