[marketplace] check randomly assigned slot index is not already filled
When a storage request is handled by the sales module, a slot index was randomly assigned and then the slot was filled. Now, the random slot index is checked that its state is `SlotState.Free` before continuing with the download process. An additional sales agent state was added, preparing, that handles this step of assigning a random slot index. All state machine setup, such as retrieving the request and subscribing to events that were previously in the `SaleDownloading` state have been moved to `SalePreparing`. If all indices of the request have been filled, the state is changed to `SaleIgnored`. # Conflicts: # codex/sales.nim # codex/sales/states/downloading.nim # codex/sales/states/filling.nim # tests/contracts/testInteractions.nim
This commit is contained in:
parent
4d028c6cb3
commit
92dc770fa2
|
@ -15,7 +15,7 @@ import pkg/libp2p/crypto/crypto
|
|||
import pkg/bearssl/rand
|
||||
|
||||
type
|
||||
RngSampleError = object of CatchableError
|
||||
RngSampleError* = object of CatchableError
|
||||
Rng* = ref HmacDrbgContext
|
||||
|
||||
var rng {.threadvar.}: Rng
|
||||
|
|
|
@ -4,7 +4,6 @@ import pkg/upraises
|
|||
import pkg/stint
|
||||
import pkg/chronicles
|
||||
import pkg/datastore
|
||||
import ./rng
|
||||
import ./market
|
||||
import ./clock
|
||||
import ./proving
|
||||
|
@ -13,7 +12,7 @@ import ./contracts/requests
|
|||
import ./sales/salescontext
|
||||
import ./sales/salesagent
|
||||
import ./sales/statemachine
|
||||
import ./sales/states/downloading
|
||||
import ./sales/states/preparing
|
||||
import ./sales/states/unknown
|
||||
|
||||
## Sales holds a list of available storage that it may sell.
|
||||
|
@ -74,10 +73,7 @@ func new*(_: type Sales,
|
|||
reservations: Reservations.new(repo)
|
||||
))
|
||||
|
||||
proc randomSlotIndex(numSlots: uint64): UInt256 =
|
||||
let rng = Rng.instance
|
||||
let slotIndex = rng.rand(numSlots - 1)
|
||||
return slotIndex.u256
|
||||
|
||||
|
||||
proc handleRequest(sales: Sales,
|
||||
requestId: RequestId,
|
||||
|
@ -87,17 +83,15 @@ proc handleRequest(sales: Sales,
|
|||
slots = ask.slots, slotSize = ask.slotSize, duration = ask.duration,
|
||||
reward = ask.reward, maxSlotLoss = ask.maxSlotLoss
|
||||
|
||||
# TODO: check if random slot is actually available (not already filled)
|
||||
let slotIndex = randomSlotIndex(ask.slots)
|
||||
let agent = newSalesAgent(
|
||||
sales.context,
|
||||
requestId,
|
||||
slotIndex,
|
||||
none UInt256,
|
||||
none StorageRequest
|
||||
)
|
||||
agent.context.onIgnored = proc {.gcsafe, upraises:[].} =
|
||||
sales.agents.keepItIf(it != agent)
|
||||
agent.start(SaleDownloading())
|
||||
agent.start(SalePreparing())
|
||||
sales.agents.add agent
|
||||
|
||||
proc load*(sales: Sales) {.async.} =
|
||||
|
@ -110,7 +104,7 @@ proc load*(sales: Sales) {.async.} =
|
|||
let agent = newSalesAgent(
|
||||
sales.context,
|
||||
slot.request.id,
|
||||
slot.slotIndex,
|
||||
some slot.slotIndex,
|
||||
some slot.request)
|
||||
agent.start(SaleUnknown())
|
||||
sales.agents.add agent
|
||||
|
|
|
@ -1,8 +1,13 @@
|
|||
import std/sequtils
|
||||
import pkg/chronos
|
||||
import pkg/chronicles
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/stint
|
||||
import ../contracts/requests
|
||||
import ../utils/asyncspawn
|
||||
import ../rng
|
||||
import ../errors
|
||||
import ./statemachine
|
||||
import ./salescontext
|
||||
import ./salesdata
|
||||
|
@ -13,10 +18,13 @@ export reservations
|
|||
logScope:
|
||||
topics = "sales statemachine"
|
||||
|
||||
type SalesAgent* = ref object of Machine
|
||||
context*: SalesContext
|
||||
data*: SalesData
|
||||
subscribed: bool
|
||||
type
|
||||
SalesAgent* = ref object of Machine
|
||||
context*: SalesContext
|
||||
data*: SalesData
|
||||
subscribed: bool
|
||||
SalesAgentError = object of CodexError
|
||||
AllSlotsFilledError* = object of SalesAgentError
|
||||
|
||||
func `==`*(a, b: SalesAgent): bool =
|
||||
a.data.requestId == b.data.requestId and
|
||||
|
@ -24,7 +32,7 @@ func `==`*(a, b: SalesAgent): bool =
|
|||
|
||||
proc newSalesAgent*(context: SalesContext,
|
||||
requestId: RequestId,
|
||||
slotIndex: UInt256,
|
||||
slotIndex: ?UInt256,
|
||||
request: ?StorageRequest): SalesAgent =
|
||||
SalesAgent(
|
||||
context: context,
|
||||
|
@ -39,6 +47,43 @@ proc retrieveRequest*(agent: SalesAgent) {.async.} =
|
|||
if data.request.isNone:
|
||||
data.request = await market.getRequest(data.requestId)
|
||||
|
||||
proc nextRandom(sample: openArray[uint64]): uint64 =
|
||||
let rng = Rng.instance
|
||||
return rng.sample(sample)
|
||||
|
||||
proc assignRandomSlotIndex*(agent: SalesAgent,
|
||||
numSlots: uint64): Future[?!void] {.async.} =
|
||||
let market = agent.context.market
|
||||
let data = agent.data
|
||||
|
||||
if numSlots == 0:
|
||||
agent.data.slotIndex = none UInt256
|
||||
let error = newException(ValueError, "numSlots must be greater than zero")
|
||||
return failure(error)
|
||||
|
||||
var idx: UInt256
|
||||
var sample = toSeq(0'u64..<numSlots)
|
||||
|
||||
while true:
|
||||
if sample.len == 0:
|
||||
agent.data.slotIndex = none UInt256
|
||||
let error = newException(AllSlotsFilledError, "all slots have been filled")
|
||||
return failure(error)
|
||||
|
||||
without rndIdx =? nextRandom(sample).catch, err:
|
||||
agent.data.slotIndex = none UInt256
|
||||
return failure(err)
|
||||
sample.keepItIf(it != rndIdx)
|
||||
|
||||
idx = rndIdx.u256
|
||||
let slotId = slotId(data.requestId, idx)
|
||||
let state = await market.slotState(slotId)
|
||||
if state == SlotState.Free:
|
||||
break
|
||||
|
||||
agent.data.slotIndex = some idx
|
||||
return success()
|
||||
|
||||
proc subscribeCancellation(agent: SalesAgent) {.async.} =
|
||||
let data = agent.data
|
||||
let market = agent.context.market
|
||||
|
@ -78,13 +123,16 @@ proc subscribeSlotFilled(agent: SalesAgent) {.async.} =
|
|||
let data = agent.data
|
||||
let market = agent.context.market
|
||||
|
||||
without slotIndex =? data.slotIndex:
|
||||
raiseAssert("no slot index assigned")
|
||||
|
||||
proc onSlotFilled(requestId: RequestId, slotIndex: UInt256) =
|
||||
asyncSpawn data.slotFilled.unsubscribe(), ignore = CatchableError
|
||||
agent.schedule(slotFilledEvent(requestId, data.slotIndex))
|
||||
agent.schedule(slotFilledEvent(requestId, slotIndex))
|
||||
|
||||
data.slotFilled =
|
||||
await market.subscribeSlotFilled(data.requestId,
|
||||
data.slotIndex,
|
||||
slotIndex,
|
||||
onSlotFilled)
|
||||
|
||||
proc subscribe*(agent: SalesAgent) {.async.} =
|
||||
|
|
|
@ -8,7 +8,7 @@ type
|
|||
requestId*: RequestId
|
||||
ask*: StorageAsk
|
||||
request*: ?StorageRequest
|
||||
slotIndex*: UInt256
|
||||
slotIndex*: ?UInt256
|
||||
failed*: market.Subscription
|
||||
fulfilled*: market.Subscription
|
||||
slotFilled*: market.Subscription
|
||||
|
|
|
@ -9,12 +9,12 @@ import ./errorhandling
|
|||
import ./cancelled
|
||||
import ./failed
|
||||
import ./filled
|
||||
import ./ignored
|
||||
import ./proving
|
||||
import ./errored
|
||||
|
||||
type
|
||||
SaleDownloading* = ref object of ErrorHandlingState
|
||||
availability*: Availability
|
||||
|
||||
logScope:
|
||||
topics = "sales downloading"
|
||||
|
@ -36,9 +36,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
|
|||
let data = agent.data
|
||||
let context = agent.context
|
||||
let reservations = context.reservations
|
||||
|
||||
await agent.retrieveRequest()
|
||||
await agent.subscribe()
|
||||
let availability = state.availability
|
||||
|
||||
without onStore =? context.onStore:
|
||||
raiseAssert "onStore callback not set"
|
||||
|
@ -46,18 +44,8 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
|
|||
without request =? data.request:
|
||||
raiseAssert "no sale request"
|
||||
|
||||
without availability =? await reservations.find(
|
||||
request.ask.slotSize,
|
||||
request.ask.duration,
|
||||
request.ask.pricePerSlot,
|
||||
request.ask.collateral,
|
||||
used = false):
|
||||
info "no availability found for request, ignoring",
|
||||
slotSize = request.ask.slotSize,
|
||||
duration = request.ask.duration,
|
||||
pricePerSlot = request.ask.pricePerSlot,
|
||||
used = false
|
||||
return some State(SaleIgnored())
|
||||
without slotIndex =? data.slotIndex:
|
||||
raiseAssert("no slot index assigned")
|
||||
|
||||
# mark availability as used so that it is not matched to other requests
|
||||
if markUsedErr =? (await reservations.markUsed(availability.id)).errorOption:
|
||||
|
@ -83,7 +71,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
|
|||
|
||||
trace "Starting download"
|
||||
if err =? (await onStore(request,
|
||||
data.slotIndex,
|
||||
slotIndex,
|
||||
onBatch)).errorOption:
|
||||
|
||||
markUnused(availability.id)
|
||||
|
|
|
@ -23,7 +23,10 @@ method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} =
|
|||
let data = SalesAgent(machine).data
|
||||
let market = SalesAgent(machine).context.market
|
||||
|
||||
let host = await market.getHost(data.requestId, data.slotIndex)
|
||||
without slotIndex =? data.slotIndex:
|
||||
raiseAssert("no slot index assigned")
|
||||
|
||||
let host = await market.getHost(data.requestId, slotIndex)
|
||||
let me = await market.getSigner()
|
||||
if host == me.some:
|
||||
return some State(SaleFinished())
|
||||
|
|
|
@ -28,4 +28,7 @@ method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} =
|
|||
without (collateral =? data.request.?ask.?collateral):
|
||||
raiseAssert "Request not set"
|
||||
|
||||
await market.fillSlot(data.requestId, data.slotIndex, state.proof, collateral)
|
||||
without slotIndex =? data.slotIndex:
|
||||
raiseAssert("no slot index assigned")
|
||||
|
||||
await market.fillSlot(data.requestId, slotIndex, state.proof, collateral)
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
import pkg/chronicles
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import ../../market
|
||||
import ../salesagent
|
||||
import ../statemachine
|
||||
import ./errorhandling
|
||||
import ./cancelled
|
||||
import ./failed
|
||||
import ./filled
|
||||
import ./ignored
|
||||
import ./downloading
|
||||
import ./errored
|
||||
|
||||
type
|
||||
SalePreparing* = ref object of ErrorHandlingState
|
||||
|
||||
logScope:
|
||||
topics = "sales preparing"
|
||||
|
||||
method `$`*(state: SalePreparing): string = "SaleDownloading"
|
||||
|
||||
method onCancelled*(state: SalePreparing, request: StorageRequest): ?State =
|
||||
return some State(SaleCancelled())
|
||||
|
||||
method onFailed*(state: SalePreparing, request: StorageRequest): ?State =
|
||||
return some State(SaleFailed())
|
||||
|
||||
method onSlotFilled*(state: SalePreparing, requestId: RequestId,
|
||||
slotIndex: UInt256): ?State =
|
||||
return some State(SaleFilled())
|
||||
|
||||
method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
|
||||
let agent = SalesAgent(machine)
|
||||
let data = agent.data
|
||||
let context = agent.context
|
||||
let reservations = context.reservations
|
||||
|
||||
await agent.retrieveRequest()
|
||||
|
||||
without slots =? agent.data.request.?ask.?slots:
|
||||
raiseAssert "missing request slots"
|
||||
|
||||
if err =? (await agent.assignRandomSlotIndex(slots)).errorOption:
|
||||
if err of AllSlotsFilledError:
|
||||
return some State(SaleIgnored())
|
||||
return some State(SaleErrored(error: err))
|
||||
|
||||
await agent.subscribe()
|
||||
|
||||
without request =? data.request:
|
||||
raiseAssert "no sale request"
|
||||
|
||||
without availability =? await reservations.find(
|
||||
request.ask.slotSize,
|
||||
request.ask.duration,
|
||||
request.ask.pricePerSlot,
|
||||
request.ask.collateral,
|
||||
used = false):
|
||||
info "no availability found for request, ignoring",
|
||||
slotSize = request.ask.slotSize,
|
||||
duration = request.ask.duration,
|
||||
pricePerSlot = request.ask.pricePerSlot,
|
||||
used = false
|
||||
return some State(SaleIgnored())
|
||||
|
||||
return some State(SaleDownloading(availability: availability))
|
|
@ -31,5 +31,8 @@ method run*(state: SaleProving, machine: Machine): Future[?State] {.async.} =
|
|||
without onProve =? context.proving.onProve:
|
||||
raiseAssert "onProve callback not set"
|
||||
|
||||
let proof = await onProve(Slot(request: request, slotIndex: data.slotIndex))
|
||||
without slotIndex =? data.slotIndex:
|
||||
raiseAssert("no slot index assigned")
|
||||
|
||||
let proof = await onProve(Slot(request: request, slotIndex: slotIndex))
|
||||
return some State(SaleFilling(proof: proof))
|
||||
|
|
|
@ -27,7 +27,10 @@ method run*(state: SaleUnknown, machine: Machine): Future[?State] {.async.} =
|
|||
await agent.retrieveRequest()
|
||||
await agent.subscribe()
|
||||
|
||||
let slotId = slotId(data.requestId, data.slotIndex)
|
||||
without slotIndex =? data.slotIndex:
|
||||
raiseAssert("no slot index assigned")
|
||||
|
||||
let slotId = slotId(data.requestId, slotIndex)
|
||||
|
||||
without slotState =? await market.slotState(slotId):
|
||||
let error = newException(SaleUnknownError, "cannot retrieve slot state")
|
||||
|
|
|
@ -28,7 +28,7 @@ suite "sales state 'filled'":
|
|||
let context = SalesContext(market: market)
|
||||
agent = newSalesAgent(context,
|
||||
request.id,
|
||||
slotIndex,
|
||||
some slotIndex,
|
||||
StorageRequest.none)
|
||||
state = SaleFilled.new()
|
||||
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
import std/unittest
|
||||
import pkg/questionable
|
||||
import pkg/codex/contracts/requests
|
||||
import pkg/codex/sales/states/downloading
|
||||
import pkg/codex/sales/states/cancelled
|
||||
import pkg/codex/sales/states/failed
|
||||
import pkg/codex/sales/states/filled
|
||||
import ../../examples
|
||||
|
||||
suite "sales state 'preparing'":
|
||||
|
||||
let request = StorageRequest.example
|
||||
let slotIndex = (request.ask.slots div 2).u256
|
||||
var state: SalePreparing
|
||||
|
||||
setup:
|
||||
state = SalePreparing.new()
|
||||
|
||||
test "switches to cancelled state when request expires":
|
||||
let next = state.onCancelled(request)
|
||||
check !next of SaleCancelled
|
||||
|
||||
test "switches to failed state when request fails":
|
||||
let next = state.onFailed(request)
|
||||
check !next of SaleFailed
|
||||
|
||||
test "switches to filled state when slot is filled":
|
||||
let next = state.onSlotFilled(request.id, slotIndex)
|
||||
check !next of SaleFilled
|
|
@ -26,7 +26,7 @@ suite "sales state 'unknown'":
|
|||
let context = SalesContext(market: market)
|
||||
agent = newSalesAgent(context,
|
||||
request.id,
|
||||
slotIndex,
|
||||
some slotIndex,
|
||||
StorageRequest.none)
|
||||
state = SaleUnknown.new()
|
||||
|
||||
|
|
|
@ -1,6 +1,3 @@
|
|||
import std/sets
|
||||
import std/sequtils
|
||||
import std/sugar
|
||||
import std/times
|
||||
import pkg/asynctest
|
||||
import pkg/chronos
|
||||
|
@ -25,6 +22,7 @@ type
|
|||
MockErrorState = ref object of ErrorHandlingState
|
||||
|
||||
method `$`*(state: MockState): string = "MockState"
|
||||
method `$`*(state: MockErrorState): string = "MockErrorState"
|
||||
|
||||
method onCancelled*(state: MockState, request: StorageRequest): ?State =
|
||||
onCancelCalled = true
|
||||
|
@ -73,7 +71,7 @@ suite "Sales agent":
|
|||
onSlotFilledCalled = false
|
||||
agent = newSalesAgent(context,
|
||||
request.id,
|
||||
slotIndex,
|
||||
some slotIndex,
|
||||
some request)
|
||||
request.expiry = (getTime() + initDuration(hours=1)).toUnix.u256
|
||||
|
||||
|
@ -83,7 +81,7 @@ suite "Sales agent":
|
|||
test "can retrieve request":
|
||||
agent = newSalesAgent(context,
|
||||
request.id,
|
||||
slotIndex,
|
||||
some slotIndex,
|
||||
none StorageRequest)
|
||||
market.requested = @[request]
|
||||
await agent.retrieveRequest()
|
||||
|
@ -156,3 +154,25 @@ suite "Sales agent":
|
|||
test "ErrorHandlingState.onError can be overridden at the state level":
|
||||
agent.start(MockErrorState.new())
|
||||
check eventually onErrorCalled
|
||||
|
||||
test "assigns random slot index for slot that is free":
|
||||
let slotId0 = slotId(request.id, 0.u256)
|
||||
market.slotState[slotId0] = SlotState.Filled
|
||||
check isOk await agent.assignRandomSlotIndex(2)
|
||||
check agent.data.slotIndex == some 1.u256
|
||||
|
||||
test "fails to assign random slot index when all slots filled":
|
||||
let slotId0 = slotId(request.id, 0.u256)
|
||||
let slotId1 = slotId(request.id, 1.u256)
|
||||
market.slotState[slotId0] = SlotState.Filled
|
||||
market.slotState[slotId1] = SlotState.Filled
|
||||
let r = await agent.assignRandomSlotIndex(2)
|
||||
check r.isErr
|
||||
check r.error of AllSlotsFilledError
|
||||
check agent.data.slotIndex == none UInt256
|
||||
|
||||
test "fails to assign random slot index when invalid number of slots provided":
|
||||
let r = await agent.assignRandomSlotIndex(0)
|
||||
check r.isErr
|
||||
check r.error of ValueError
|
||||
check agent.data.slotIndex == none UInt256
|
||||
|
|
Loading…
Reference in New Issue