[marketplace] restart sales state machine if slot is filled by other host

When a slot is filled by other host, the state machine should not park in the `SaleErrored` state, and instead moves to the `SaleRestart` state, where it calls `onSaleRestart`. The sales module handles `onSaleRestart`, restarting the state machine, getting a new slot index while ignoring the previously attempting slot index.
This commit is contained in:
Eric Mastro 2023-05-09 14:24:31 +10:00
parent 92dc770fa2
commit 5da232f33a
No known key found for this signature in database
15 changed files with 116 additions and 32 deletions

View File

@ -3,6 +3,8 @@ import pkg/contractabi
import pkg/nimcrypto
import pkg/ethers/fields
import pkg/questionable/results
import pkg/json_serialization
import pkg/upraises
export contractabi
@ -193,3 +195,17 @@ func price*(request: StorageRequest): UInt256 =
func size*(ask: StorageAsk): UInt256 =
ask.slots.u256 * ask.slotSize
proc writeValue*(
writer: var JsonWriter,
value: SlotId | RequestId) {.upraises:[IOError].} =
mixin writeValue
writer.writeValue value.toArray
proc readValue*[T: SlotId | RequestId](
reader: var JsonReader,
value: var T) {.upraises: [SerializationError, IOError].} =
mixin readValue
value = T reader.readValue(T.distinctBase)

View File

@ -73,8 +73,6 @@ func new*(_: type Sales,
reservations: Reservations.new(repo)
))
proc handleRequest(sales: Sales,
requestId: RequestId,
ask: StorageAsk) =
@ -89,6 +87,11 @@ proc handleRequest(sales: Sales,
none UInt256,
none StorageRequest
)
agent.context.onStartOver =
proc(slotIndex: UInt256) {.gcsafe, upraises:[], async.} =
await agent.stop()
agent.start(SalePreparing(ignoreSlotIndex: some slotIndex))
agent.context.onIgnored = proc {.gcsafe, upraises:[].} =
sales.agents.keepItIf(it != agent)
agent.start(SalePreparing())

View File

@ -97,12 +97,12 @@ proc toErr[E1: ref CatchableError, E2: AvailabilityError](
proc writeValue*(
writer: var JsonWriter,
value: SlotId | AvailabilityId) {.upraises:[IOError].} =
value: AvailabilityId) {.upraises:[IOError].} =
mixin writeValue
writer.writeValue value.toArray
proc readValue*[T: SlotId | AvailabilityId](
proc readValue*[T: AvailabilityId](
reader: var JsonReader,
value: var T) {.upraises: [SerializationError, IOError].} =

View File

@ -51,8 +51,11 @@ proc nextRandom(sample: openArray[uint64]): uint64 =
let rng = Rng.instance
return rng.sample(sample)
proc assignRandomSlotIndex*(agent: SalesAgent,
numSlots: uint64): Future[?!void] {.async.} =
proc assignRandomSlotIndex*(
agent: SalesAgent,
numSlots: uint64,
ignoreSlotIndex: ?UInt256 = none UInt256): Future[?!void] {.async.} =
let market = agent.context.market
let data = agent.data
@ -63,6 +66,8 @@ proc assignRandomSlotIndex*(agent: SalesAgent,
var idx: UInt256
var sample = toSeq(0'u64..<numSlots)
if ignored =? ignoreSlotIndex:
sample.keepItIf(it != ignored.truncate(uint64))
while true:
if sample.len == 0:
@ -121,7 +126,8 @@ proc subscribeFailure(agent: SalesAgent) {.async.} =
proc subscribeSlotFilled(agent: SalesAgent) {.async.} =
let data = agent.data
let market = agent.context.market
let context = agent.context
let market = context.market
without slotIndex =? data.slotIndex:
raiseAssert("no slot index assigned")

View File

@ -15,6 +15,7 @@ type
onClear*: ?OnClear
onSale*: ?OnSale
onIgnored*: OnIgnored
onStartOver*: OnStartOver
proving*: Proving
reservations*: Reservations
@ -28,3 +29,4 @@ type
OnSale* = proc(request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
OnIgnored* = proc() {.gcsafe, upraises: [].}
OnStartOver* = proc(slotIndex: UInt256): Future[void] {.gcsafe, upraises: [].}

View File

@ -8,9 +8,9 @@ import ../statemachine
import ./errorhandling
import ./cancelled
import ./failed
import ./filled
import ./proving
import ./errored
import ./restart
type
SaleDownloading* = ref object of ErrorHandlingState
@ -29,7 +29,8 @@ method onFailed*(state: SaleDownloading, request: StorageRequest): ?State =
method onSlotFilled*(state: SaleDownloading, requestId: RequestId,
slotIndex: UInt256): ?State =
return some State(SaleFilled())
notice "Slot filled by other host, starting over", requestId, slotIndex
return some State(SaleRestart())
method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} =
let agent = SalesAgent(machine)

View File

@ -1,16 +1,20 @@
import pkg/chronicles
import pkg/questionable
import ../statemachine
import ../salesagent
import ./errorhandling
import ./errored
import ./finished
import ./cancelled
import ./failed
import ./restart
type
SaleFilled* = ref object of ErrorHandlingState
HostMismatchError* = object of CatchableError
logScope:
topics = "sales filled"
method onCancelled*(state: SaleFilled, request: StorageRequest): ?State =
return some State(SaleCancelled())
@ -20,8 +24,10 @@ method onFailed*(state: SaleFilled, request: StorageRequest): ?State =
method `$`*(state: SaleFilled): string = "SaleFilled"
method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} =
let data = SalesAgent(machine).data
let market = SalesAgent(machine).context.market
let agent = SalesAgent(machine)
let context = agent.context
let data = agent.data
let market = context.market
without slotIndex =? data.slotIndex:
raiseAssert("no slot index assigned")
@ -31,5 +37,4 @@ method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} =
if host == me.some:
return some State(SaleFinished())
else:
let error = newException(HostMismatchError, "Slot filled by other host")
return some State(SaleErrored(error: error))
return some State(SaleRestart())

View File

@ -7,16 +7,17 @@ import ../statemachine
import ./errorhandling
import ./cancelled
import ./failed
import ./filled
import ./ignored
import ./downloading
import ./errored
import ./restart
type
SalePreparing* = ref object of ErrorHandlingState
ignoreSlotIndex*: ?UInt256
logScope:
topics = "sales preparing"
topics = "sales preparing"
method `$`*(state: SalePreparing): string = "SaleDownloading"
@ -28,7 +29,8 @@ method onFailed*(state: SalePreparing, request: StorageRequest): ?State =
method onSlotFilled*(state: SalePreparing, requestId: RequestId,
slotIndex: UInt256): ?State =
return some State(SaleFilled())
notice "Slot filled by other host, starting over", requestId, slotIndex
return some State(SaleRestart())
method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
let agent = SalesAgent(machine)
@ -41,7 +43,8 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
without slots =? agent.data.request.?ask.?slots:
raiseAssert "missing request slots"
if err =? (await agent.assignRandomSlotIndex(slots)).errorOption:
let fut = agent.assignRandomSlotIndex(slots, state.ignoreSlotIndex)
if err =? (await fut).errorOption:
if err of AllSlotsFilledError:
return some State(SaleIgnored())
return some State(SaleErrored(error: err))

View File

@ -1,14 +1,18 @@
import pkg/chronicles
import ../statemachine
import ../salesagent
import ./errorhandling
import ./filling
import ./cancelled
import ./failed
import ./filled
import ./restart
type
SaleProving* = ref object of ErrorHandlingState
logScope:
topics = "sales proving"
method `$`*(state: SaleProving): string = "SaleProving"
method onCancelled*(state: SaleProving, request: StorageRequest): ?State =
@ -19,7 +23,8 @@ method onFailed*(state: SaleProving, request: StorageRequest): ?State =
method onSlotFilled*(state: SaleProving, requestId: RequestId,
slotIndex: UInt256): ?State =
return some State(SaleFilled())
notice "Slot filled by other host, starting over", requestId, slotIndex
return some State(SaleRestart())
method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} =
let data = SalesAgent(machine).data

View File

@ -0,0 +1,25 @@
import pkg/chronicles
import ../statemachine
import ../salesagent
import ./errorhandling
type
SaleRestart* = ref object of ErrorHandlingState
logScope:
topics = "sales restart"
method `$`*(state: SaleRestart): string = "SaleRestart"
method run*(state: SaleRestart, machine: Machine): Future[?State] {.async.} =
let agent = SalesAgent(machine)
let data = agent.data
let context = agent.context
without slotIndex =? data.slotIndex:
raiseAssert("no slot index assigned")
if onStartOver =? context.onStartOver:
notice "Slot filled by other host, starting over",
requestId = data.requestId, slotIndex
await onStartOver(slotIndex)

View File

@ -4,7 +4,7 @@ import pkg/codex/contracts/requests
import pkg/codex/sales/states/downloading
import pkg/codex/sales/states/cancelled
import pkg/codex/sales/states/failed
import pkg/codex/sales/states/filled
import pkg/codex/sales/states/restart
import ../../examples
suite "sales state 'downloading'":
@ -24,6 +24,6 @@ suite "sales state 'downloading'":
let next = state.onFailed(request)
check !next of SaleFailed
test "switches to filled state when slot is filled":
test "switches to restart state when slot is filled":
let next = state.onSlotFilled(request.id, slotIndex)
check !next of SaleFilled
check !next of SaleRestart

View File

@ -4,7 +4,7 @@ import pkg/codex/sales
import pkg/codex/sales/salesagent
import pkg/codex/sales/salescontext
import pkg/codex/sales/states/filled
import pkg/codex/sales/states/errored
import pkg/codex/sales/states/restart
import pkg/codex/sales/states/finished
import ../../helpers/mockmarket
import ../../examples
@ -38,8 +38,8 @@ suite "sales state 'filled'":
let next = await state.run(agent)
check !next of SaleFinished
test "switches to error state when slot is filled by another host":
test "switches to restart state when slot is filled by another host":
slot.host = Address.example
market.filled = @[slot]
let next = await state.run(agent)
check !next of SaleErrored
check !next of SaleRestart

View File

@ -4,7 +4,7 @@ import pkg/codex/contracts/requests
import pkg/codex/sales/states/downloading
import pkg/codex/sales/states/cancelled
import pkg/codex/sales/states/failed
import pkg/codex/sales/states/filled
import pkg/codex/sales/states/restart
import ../../examples
suite "sales state 'preparing'":
@ -24,6 +24,6 @@ suite "sales state 'preparing'":
let next = state.onFailed(request)
check !next of SaleFailed
test "switches to filled state when slot is filled":
test "switches to restart state when slot is filled":
let next = state.onSlotFilled(request.id, slotIndex)
check !next of SaleFilled
check !next of SaleRestart

View File

@ -4,7 +4,7 @@ import pkg/codex/contracts/requests
import pkg/codex/sales/states/proving
import pkg/codex/sales/states/cancelled
import pkg/codex/sales/states/failed
import pkg/codex/sales/states/filled
import pkg/codex/sales/states/restart
import ../../examples
suite "sales state 'proving'":
@ -24,7 +24,7 @@ suite "sales state 'proving'":
let next = state.onFailed(request)
check !next of SaleFailed
test "switches to filled state when slot is filled":
test "switches to restart state when slot is filled":
let next = state.onSlotFilled(request.id, slotIndex)
check !next of SaleFilled
check !next of SaleRestart

View File

@ -176,3 +176,21 @@ suite "Sales agent":
check r.isErr
check r.error of ValueError
check agent.data.slotIndex == none UInt256
test "assigns non-ignored random slot index":
let slotId0 = slotId(request.id, 0.u256)
let slotId1 = slotId(request.id, 1.u256)
market.slotState[slotId0] = SlotState.Free
market.slotState[slotId1] = SlotState.Free
check isOk await agent.assignRandomSlotIndex(2, ignoreSlotIndex = some 0.u256)
check agent.data.slotIndex == some 1.u256
test "fails to assign random slot index when all non-ignored slots are filled":
let slotId0 = slotId(request.id, 0.u256)
let slotId1 = slotId(request.id, 1.u256)
market.slotState[slotId0] = SlotState.Free
market.slotState[slotId1] = SlotState.Filled
let r = await agent.assignRandomSlotIndex(2, ignoreSlotIndex = some 0.u256)
check r.isErr
check r.error of AllSlotsFilledError
check agent.data.slotIndex == none UInt256