[marketplace] handle slot filled by other host

Handle the case when in the downloading, proving, or filling states, that another host fills the slot.
This commit is contained in:
Eric Mastro 2022-11-14 21:20:38 +11:00 committed by Mark Spanbroek
parent 0957c9adfa
commit 3db7b49537
No known key found for this signature in database
GPG Key ID: FBE3E9548D427C00
7 changed files with 97 additions and 15 deletions

View File

@ -2,9 +2,7 @@ import pkg/chronos
import pkg/upraises import pkg/upraises
import pkg/stint import pkg/stint
import ./statemachine import ./statemachine
import ./states/[downloading, unknown]
import ../contracts/requests import ../contracts/requests
import ../rng
proc newSalesAgent*(sales: Sales, proc newSalesAgent*(sales: Sales,
requestId: RequestId, requestId: RequestId,
@ -21,6 +19,7 @@ proc newSalesAgent*(sales: Sales,
# fwd declarations # fwd declarations
proc subscribeCancellation*(agent: SalesAgent): Future[void] {.gcsafe.} proc subscribeCancellation*(agent: SalesAgent): Future[void] {.gcsafe.}
proc subscribeFailure*(agent: SalesAgent): Future[void] {.gcsafe.} proc subscribeFailure*(agent: SalesAgent): Future[void] {.gcsafe.}
proc subscribeSlotFilled*(agent: SalesAgent): Future[void] {.gcsafe.}
proc retrieveRequest(agent: SalesAgent) {.async.} = proc retrieveRequest(agent: SalesAgent) {.async.} =
if agent.request.isNone: if agent.request.isNone:
@ -31,6 +30,7 @@ proc start*(agent: SalesAgent, numSlots: uint64) {.async.} =
await agent.retrieveRequest() await agent.retrieveRequest()
await agent.subscribeCancellation() await agent.subscribeCancellation()
await agent.subscribeFailure() await agent.subscribeFailure()
await agent.subscribeSlotFilled()
proc stop*(agent: SalesAgent) {.async.} = proc stop*(agent: SalesAgent) {.async.} =
try: try:
@ -41,6 +41,10 @@ proc stop*(agent: SalesAgent) {.async.} =
await agent.failed.unsubscribe() await agent.failed.unsubscribe()
except CatchableError: except CatchableError:
discard discard
try:
await agent.slotFilled.unsubscribe()
except CatchableError:
discard
if not agent.cancelled.completed: if not agent.cancelled.completed:
await agent.cancelled.cancelAndWait() await agent.cancelled.cancelAndWait()
@ -75,7 +79,25 @@ proc subscribeFailure*(agent: SalesAgent) {.async.} =
state =? (agent.state as SaleState): state =? (agent.state as SaleState):
return return
await agent.failed.unsubscribe()
await state.onFailed(request) await state.onFailed(request)
agent.failed = agent.failed =
await market.subscribeRequestFailed(agent.requestId, onFailed) await market.subscribeRequestFailed(agent.requestId, onFailed)
proc subscribeSlotFilled*(agent: SalesAgent) {.async.} =
let market = agent.sales.market
without slotIndex =? agent.slotIndex:
raiseAssert "no slot selected"
proc onSlotFilled(requestId: RequestId,
slotIndex: UInt256) {.async.} =
without state =? (agent.state as SaleState):
return
await agent.slotFilled.unsubscribe()
await state.onSlotFilled(requestId, slotIndex)
agent.slotFilled =
await market.subscribeSlotFilled(agent.requestId, slotIndex, onSlotFilled)

View File

@ -35,6 +35,7 @@ type
slotIndex*: ?UInt256 slotIndex*: ?UInt256
failed*: market.Subscription failed*: market.Subscription
fulfilled*: market.Subscription fulfilled*: market.Subscription
slotFilled*: market.Subscription
cancelled*: Future[void] cancelled*: Future[void]
SaleState* = ref object of AsyncState SaleState* = ref object of AsyncState
SaleError* = ref object of CodexError SaleError* = ref object of CodexError
@ -102,3 +103,7 @@ method onCancelled*(state: SaleState, request: StorageRequest) {.base, async.} =
method onFailed*(state: SaleState, request: StorageRequest) {.base, async.} = method onFailed*(state: SaleState, request: StorageRequest) {.base, async.} =
discard discard
method onSlotFilled*(state: SaleState, requestId: RequestId,
slotIndex: UInt256) {.base, async.} =
discard

View File

@ -1,6 +1,7 @@
import std/sequtils import std/sequtils
import ./cancelled import ./cancelled
import ./failed import ./failed
import ./filled
import ./proving import ./proving
import ./errored import ./errored
import ../salesagent import ../salesagent
@ -21,6 +22,10 @@ method onCancelled*(state: SaleDownloading, request: StorageRequest) {.async.} =
method onFailed*(state: SaleDownloading, request: StorageRequest) {.async.} = method onFailed*(state: SaleDownloading, request: StorageRequest) {.async.} =
await state.switchAsync(SaleFailed()) await state.switchAsync(SaleFailed())
method onSlotFilled*(state: SaleDownloading, requestId: RequestId,
slotIndex: UInt256) {.async.} =
await state.switchAsync(SaleFilled())
method enterAsync(state: SaleDownloading) {.async.} = method enterAsync(state: SaleDownloading) {.async.} =
without agent =? (state.context as SalesAgent): without agent =? (state.context as SalesAgent):
raiseAssert "invalid state" raiseAssert "invalid state"

View File

@ -32,7 +32,7 @@ method enterAsync(state: SaleFilled) {.async.} =
if host == me.some: if host == me.some:
await state.switchAsync(SaleFinished()) await state.switchAsync(SaleFinished())
else: else:
let error = newException(SaleFilledError, "Sale host mismatch") let error = newException(SaleFilledError, "Slot filled by other host")
await state.switchAsync(SaleErrored(error: error)) await state.switchAsync(SaleErrored(error: error))
except CancelledError: except CancelledError:

View File

@ -19,27 +19,20 @@ method onCancelled*(state: SaleFilling, request: StorageRequest) {.async.} =
method onFailed*(state: SaleFilling, request: StorageRequest) {.async.} = method onFailed*(state: SaleFilling, request: StorageRequest) {.async.} =
await state.switchAsync(SaleFailed()) await state.switchAsync(SaleFailed())
method onSlotFilled*(state: SaleFilling, requestId: RequestId,
slotIndex: UInt256) {.async.} =
await state.switchAsync(SaleFilled())
method enterAsync(state: SaleFilling) {.async.} = method enterAsync(state: SaleFilling) {.async.} =
without agent =? (state.context as SalesAgent): without agent =? (state.context as SalesAgent):
raiseAssert "invalid state" raiseAssert "invalid state"
var subscription: market.Subscription
proc onSlotFilled(requestId: RequestId,
slotIndex: UInt256) {.async.} =
await subscription.unsubscribe()
await state.switchAsync(SaleFilled())
try: try:
let market = agent.sales.market let market = agent.sales.market
without slotIndex =? agent.slotIndex: without slotIndex =? agent.slotIndex:
raiseAssert "no slot selected" raiseAssert "no slot selected"
subscription = await market.subscribeSlotFilled(agent.requestId,
slotIndex,
onSlotFilled)
await market.fillSlot(agent.requestId, slotIndex, state.proof) await market.fillSlot(agent.requestId, slotIndex, state.proof)
except CancelledError: except CancelledError:

View File

@ -2,6 +2,7 @@ import ../statemachine
import ./filling import ./filling
import ./cancelled import ./cancelled
import ./failed import ./failed
import ./filled
import ./errored import ./errored
type type
@ -16,6 +17,10 @@ method onCancelled*(state: SaleProving, request: StorageRequest) {.async.} =
method onFailed*(state: SaleProving, request: StorageRequest) {.async.} = method onFailed*(state: SaleProving, request: StorageRequest) {.async.} =
await state.switchAsync(SaleFailed()) await state.switchAsync(SaleFailed())
method onSlotFilled*(state: SaleProving, requestId: RequestId,
slotIndex: UInt256) {.async.} =
await state.switchAsync(SaleFilled())
method enterAsync(state: SaleProving) {.async.} = method enterAsync(state: SaleProving) {.async.} =
without agent =? (state.context as SalesAgent): without agent =? (state.context as SalesAgent):
raiseAssert "invalid state" raiseAssert "invalid state"

View File

@ -194,6 +194,7 @@ suite "Sales":
await market.requestStorage(request) await market.requestStorage(request)
for slotIndex in 0..<request.ask.slots: for slotIndex in 0..<request.ask.slots:
market.fillSlot(request.id, slotIndex.u256, proof, otherHost) market.fillSlot(request.id, slotIndex.u256, proof, otherHost)
await sleepAsync(chronos.seconds(2))
check sales.available == @[availability] check sales.available == @[availability]
test "makes storage available again when request expires": test "makes storage available again when request expires":
@ -282,7 +283,7 @@ suite "Sales state machine":
await agent.switchAsync(SaleUnknown()) await agent.switchAsync(SaleUnknown())
let state = (agent.state as SaleErrored) let state = (agent.state as SaleErrored)
check state.isSome check state.isSome
check (!state).error.msg == "Sale host mismatch" check (!state).error.msg == "Slot filled by other host"
test "moves to SaleFinished when request state is New": test "moves to SaleFinished when request state is New":
let agent = newSalesAgent() let agent = newSalesAgent()
@ -444,6 +445,57 @@ suite "Sales state machine":
check state.isSome check state.isSome
check (!state).error.msg == "Sale failed" check (!state).error.msg == "Sale failed"
test "moves to SaleErrored when Downloading and slot is filled by another host":
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
await sleepAsync(chronos.minutes(1)) # "far" in the future
let agent = newSalesAgent()
await agent.start(request.ask.slots)
market.requested.add request
market.state[request.id] = RequestState.New
await agent.switchAsync(SaleDownloading())
market.fillSlot(request.id, !agent.slotIndex, proof, Address.example)
await sleepAsync chronos.seconds(2)
let state = (agent.state as SaleErrored)
check state.isSome
check (!state).error.msg == "Slot filled by other host"
test "moves to SaleErrored when Proving and slot is filled by another host":
sales.onProve = proc(request: StorageRequest,
slot: UInt256): Future[seq[byte]] {.async.} =
await sleepAsync(chronos.minutes(1)) # "far" in the future
return @[]
let agent = newSalesAgent()
await agent.start(request.ask.slots)
market.requested.add request
market.state[request.id] = RequestState.New
await agent.switchAsync(SaleProving())
market.fillSlot(request.id, !agent.slotIndex, proof, Address.example)
await sleepAsync chronos.seconds(2)
let state = (agent.state as SaleErrored)
check state.isSome
check (!state).error.msg == "Slot filled by other host"
test "moves to SaleErrored when Filling and slot is filled by another host":
sales.onProve = proc(request: StorageRequest,
slot: UInt256): Future[seq[byte]] {.async.} =
await sleepAsync(chronos.minutes(1)) # "far" in the future
return @[]
let agent = newSalesAgent()
await agent.start(request.ask.slots)
market.requested.add request
market.state[request.id] = RequestState.New
market.fillSlot(request.id, !agent.slotIndex, proof, Address.example)
await agent.switchAsync(SaleFilling())
await sleepAsync chronos.seconds(2)
let state = (agent.state as SaleErrored)
check state.isSome
check (!state).error.msg == "Slot filled by other host"
test "moves from SaleDownloading to SaleFinished, calling necessary callbacks": test "moves from SaleDownloading to SaleFinished, calling necessary callbacks":
var onProveCalled, onStoreCalled, onClearCalled, onSaleCalled: bool var onProveCalled, onStoreCalled, onClearCalled, onSaleCalled: bool
sales.onProve = proc(request: StorageRequest, sales.onProve = proc(request: StorageRequest,