Ben Bierens d47ce38894
Fixes race in testsales (#995)
* Adds isWaiting to mockClock to remove sleep in testsales

* Review comments by Eric. Also replaced two more sleeps with check-eventually
2024-11-26 10:48:52 +00:00

609 lines
22 KiB

import std/sequtils
import std/sugar
import std/times
import pkg/chronos
import pkg/datastore/typedds
import pkg/questionable
import pkg/questionable/results
import pkg/codex/sales
import pkg/codex/sales/salesdata
import pkg/codex/sales/salescontext
import pkg/codex/sales/reservations
import pkg/codex/sales/slotqueue
import pkg/codex/stores/repostore
import pkg/codex/blocktype as bt
import pkg/codex/node
import pkg/codex/utils/asyncstatemachine
import ../../asynctest
import ../helpers
import ../helpers/mockmarket
import ../helpers/mockclock
import ../helpers/always
import ../examples
import ./helpers/periods
asyncchecksuite "Sales - start":
# Shared fixtures for this suite: an example proof, two temporary stores and
# the mutable state (request, sales, market, clock, ...) used by the test.
# NOTE(review): several right-hand sides below are incomplete in this copy of
# the file (e.g. `repoTmp =`, `market =`, `repo =, metaDs)`) — restore them
# from the upstream file before relying on this code.
proof = Groth16Proof.example
repoTmp =
metaTmp =
var request: StorageRequest
var sales: Sales
var market: MockMarket
var clock: MockClock
var reservations: Reservations
var repo: RepoStore
var queue: SlotQueue
var itemsProcessed: seq[SlotQueueItem]
# Baseline request: 4 slots, expiring one hour from the current wall-clock time.
request = StorageRequest(
ask: StorageAsk(
slots: 4,
slotSize: 100.u256,
duration: 60.u256,
reward: 10.u256,
collateral: 200.u256,
content: StorageContent(
cid: "some cid"
expiry: (getTime() + initDuration(hours=1)).toUnix.u256
market =
clock =
let repoDs = repoTmp.newDb()
let metaDs = metaTmp.newDb()
repo =, metaDs)
await repo.start()
sales =, clock, repo)
reservations = sales.context.reservations
# Default callbacks: storing and expiry updates succeed immediately, and
# proving returns the fixture proof.
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
return success()
sales.onExpiryUpdate = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.async.} =
return success()
queue = sales.context.slotQueue
sales.onProve = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.async.} =
return success(proof)
# Note that `sales.start()` is not called here; the test in this suite starts
# the module itself.
itemsProcessed = @[]
request.expiry = ( + 42).u256
# Cleanup: stop sales and the repo, then destroy both temporary databases.
await sales.stop()
await repo.stop()
await repoTmp.destroyDb()
await metaTmp.destroyDb()
proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} =
## Records slot `slotIdx` in the mock market as filled by the market's own
## signer with the fixture proof, and sets that slot's state to `Filled`.
## NOTE(review): the request id arguments are missing in this copy.
let address = await market.getSigner()
let slot = MockSlot(requestId:,
slotIndex: slotIdx,
proof: proof,
host: address)
market.filled.add slot
market.slotState[slotId(, slotIdx)] = SlotState.Filled
test "load slots when Sales module starts":
# Slots 0 and 1 of a two-slot request are pre-filled by this host and listed
# as its active slots; starting Sales should eventually yield two agents.
let me = await market.getSigner()
request.ask.slots = 2
market.requested = @[request]
market.requestState[] = RequestState.New
# slot0/slot1 are only used for their `slotIndex` when calling `fillSlot`.
let slot0 = MockSlot(requestId:,
slotIndex: 0.u256,
proof: proof,
host: me)
await fillSlot(slot0.slotIndex)
let slot1 = MockSlot(requestId:,
slotIndex: 1.u256,
proof: proof,
host: me)
await fillSlot(slot1.slotIndex)
market.activeSlots[me] = @[request.slotId(0.u256), request.slotId(1.u256)]
# NOTE(review): `market.requested` was already assigned the same value above.
market.requested = @[request]
market.activeRequests[me] = @[]
await sales.start()
check eventually sales.agents.len == 2
check sales.agents.any(agent => == and == 0.u256)
check sales.agents.any(agent => == and == 1.u256)
asyncchecksuite "Sales":
# Shared fixtures for this suite: an example proof, two temporary stores, a
# baseline availability and request, and the mutable state used by the tests.
# NOTE(review): several right-hand sides below are incomplete in this copy of
# the file (e.g. `repoTmp =`, `market =`, `repo =, metaDs)`) — restore them
# from the upstream file before relying on this code.
proof = Groth16Proof.example
repoTmp =
metaTmp =
var availability: Availability
var request: StorageRequest
var sales: Sales
var market: MockMarket
var clock: MockClock
var reservations: Reservations
var repo: RepoStore
var queue: SlotQueue
var itemsProcessed: seq[SlotQueueItem]
# Baseline availability: its total/free size equals one slot of the baseline
# request below (both are 100).
availability = Availability(
totalSize: 100.u256,
freeSize: 100.u256,
duration: 60.u256,
minPrice: 600.u256,
maxCollateral: 400.u256
# Baseline request: 4 slots, expiring one hour from the current wall-clock time.
request = StorageRequest(
ask: StorageAsk(
slots: 4,
slotSize: 100.u256,
duration: 60.u256,
reward: 10.u256,
collateral: 200.u256,
content: StorageContent(
cid: "some cid"
expiry: (getTime() + initDuration(hours=1)).toUnix.u256
market =
let me = await market.getSigner()
# This host starts without any active slots.
market.activeSlots[me] = @[]
market.requestEnds[] = request.expiry.toSecondsSince1970
clock =
let repoDs = repoTmp.newDb()
let metaDs = metaTmp.newDb()
repo =, metaDs)
await repo.start()
sales =, clock, repo)
reservations = sales.context.reservations
# Default callbacks: storing and expiry updates succeed immediately, and
# proving returns the fixture proof. Individual tests override these.
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
return success()
sales.onExpiryUpdate = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.async.} =
return success()
queue = sales.context.slotQueue
sales.onProve = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.async.} =
return success(proof)
# Unlike the "Sales - start" suite, the module is started as part of setup.
await sales.start()
itemsProcessed = @[]
# Cleanup: stop sales and the repo, then destroy both temporary databases.
await sales.stop()
await repo.stop()
await repoTmp.destroyDb()
await metaTmp.destroyDb()
proc isInState(idx: int, state: string): Future[bool] {.async.} =
## Checks that more than `idx` agents eventually exist, then reports whether
## the description queried from agent `idx` equals `state`.
## NOTE(review): the body of the nested `description` proc is missing in this
## copy of the file.
proc description(state: State): string =
check eventually sales.agents.len > idx
sales.agents[idx].query(description) == state.some
proc allowRequestToStart {.async.} =
## Checks that the first agent eventually reaches the "SaleInitialProving"
## state, then advances the mock clock to the next period.
check eventually (await isInState(0, "SaleInitialProving"))
# it won't start proving until the next period
await clock.advanceToNextPeriod(market)
proc getAvailability: Availability =
## Blocks (via `waitFor`) on fetching an `Availability` from `reservations`
## by `key` and unwraps the result.
## NOTE(review): the expression that builds `key` is missing in this copy.
let key =
(waitFor reservations.get(key, Availability)).get
proc createAvailability() =
## Blocks (via `waitFor`) on creating an availability through `reservations`
## and stores the unwrapped result back into the shared `availability`
## fixture so that it carries the created id.
## NOTE(review): the call's argument list is cut off in this copy.
let a = waitFor reservations.createAvailability(
availability = a.get # update id
proc notProcessed(itemsProcessed: seq[SlotQueueItem],
                  request: StorageRequest): bool =
  ## Returns true when none of the slot queue items derived from `request`
  ## appear in `itemsProcessed`, and false as soon as one of them does.
  let expected = SlotQueueItem.init(request)
  not expected.anyIt(itemsProcessed.contains(it))
proc addRequestToSaturatedQueue(): Future[StorageRequest] {.async.} =
## Installs a slow process callback, fills the queue with example requests
## until only one free position is left, then submits and returns one more
## request whose collateral is one higher than the fixture request's.
# Each processed item is recorded after a 10 ms delay.
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
await sleepAsync(10.millis)
itemsProcessed.add item
var request1 = StorageRequest.example
request1.ask.collateral = request.ask.collateral + 1
# saturate queue
while queue.len < queue.size - 1:
await market.requestStorage(StorageRequest.example)
# send request
await market.requestStorage(request1)
await sleepAsync(5.millis) # wait for request slots to be added to queue
return request1
proc wasIgnored(): bool =
## Reports, using the `always` helper, whether the stored availability keeps
## the fixture's free size and no reservations exist.
## NOTE(review): the closing parenthesis of the `always (` expression is
## missing in this copy of the file.
let run = proc(): Future[bool] {.async.} =
always (
getAvailability().freeSize == availability.freeSize and
(waitFor reservations.all(Reservation)).get.len == 0
waitFor run()
test "processes all request's slots once StorageRequested emitted":
# Records every item passed to the queue's process callback, then expects all
# items derived from the request to have been recorded eventually.
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
await market.requestStorage(request)
let items = SlotQueueItem.init(request)
check eventually items.allIt(itemsProcessed.contains(it))
test "removes slots from slot queue once RequestCancelled emitted":
# NOTE(review): no event emission is visible in this test body — confirm that
# a RequestCancelled event for `request1` is triggered before the check.
let request1 = await addRequestToSaturatedQueue()
check always itemsProcessed.notProcessed(request1)
test "removes request from slot queue once RequestFailed emitted":
# NOTE(review): no event emission is visible in this test body — confirm that
# a RequestFailed event for `request1` is triggered before the check.
let request1 = await addRequestToSaturatedQueue()
check always itemsProcessed.notProcessed(request1)
test "removes request from slot queue once RequestFulfilled emitted":
# NOTE(review): no event emission is visible in this test body — confirm that
# a RequestFulfilled event for `request1` is triggered before the check.
let request1 = await addRequestToSaturatedQueue()
check always itemsProcessed.notProcessed(request1)
test "removes slot index from slot queue once SlotFilled emitted":
# After a SlotFilled event for slot index 1, the queue item for slot 1 of
# `request1` must never be processed.
let request1 = await addRequestToSaturatedQueue()
market.emitSlotFilled(, 1.u256)
let expected = SlotQueueItem.init(request1, 1'u16)
check always (not itemsProcessed.contains(expected))
test "removes slot index from slot queue once SlotReservationsFull emitted":
# After a SlotReservationsFull event for slot index 1, the queue item for
# slot 1 of `request1` must never be processed.
let request1 = await addRequestToSaturatedQueue()
market.emitSlotReservationsFull(, 1.u256)
let expected = SlotQueueItem.init(request1, 1'u16)
check always (not itemsProcessed.contains(expected))
test "adds slot index to slot queue once SlotFreed emitted":
# After a SlotFreed event for slot index 2, the matching queue item for the
# fixture request must eventually be processed.
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
market.requested.add request # "contract" must be able to return request
market.emitSlotFreed(, 2.u256)
let expected = SlotQueueItem.init(request, 2.uint16)
check eventually itemsProcessed.contains(expected)
test "items in queue are readded (and marked seen) once ignored":
# Expects all four items of the request to end up back in the queue, each
# flagged as seen, with the queue paused.
await market.requestStorage(request)
let items = SlotQueueItem.init(request)
check eventually queue.len > 0 # queue starts paused, allow items to be added to the queue
check eventually queue.paused
# The first processed item will have been re-pushed with `seen = true`. Then,
# once this item is processed by the queue, its 'seen' flag will be checked,
# at which point the queue will be paused. This test could check item
# existence in the queue, but that would require inspecting onProcessSlot to
# see which item was first, and overriding onProcessSlot will prevent the
# queue working as expected in the Sales module.
check eventually queue.len == 4
for item in items:
check queue.contains(item)
for i in 0..<queue.len:
check queue[i].seen
test "queue is paused once availability is insufficient to service slots in queue":
createAvailability() # enough to fill a single slot
await market.requestStorage(request)
let items = SlotQueueItem.init(request)
check eventually queue.len > 0 # queue starts paused, allow items to be added to the queue
check eventually queue.paused
# The first processed item/slot will be filled (eventually). Subsequent items
# will be processed and eventually re-pushed with `seen = true`. Once a "seen"
# item is processed by the queue, the queue is paused. In the meantime, the
# other items that are processed, marked as seen, and re-added to the queue
# may be processed at the same time as the queue is pausing. Therefore, there
# should eventually be 3 items remaining in the queue, all seen.
check eventually queue.len == 3
for i in 0..<queue.len:
check queue[i].seen
test "availability size is reduced by request slot size when fully downloaded":
# The store callback passes `slotSize` copies of `blk` to `onBatch`; the free
# size is then expected to shrink by exactly one slot size.
# NOTE(review): the expression that constructs `blk` is truncated in this copy.
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
let blk = @[1.byte] ).get
await onBatch( blk.repeat(request.ask.slotSize.truncate(int)) )
await market.requestStorage(request)
check eventually getAvailability().freeSize == availability.freeSize - request.ask.slotSize
test "non-downloaded bytes are returned to availability once finished":
# The store callback passes a single `blk` to `onBatch` and remembers which
# slot it was called for; once that slot is marked finished, the free size is
# expected to be the original size minus 1.
# NOTE(review): the `blk` construction is truncated and the `onSale` callback
# has no visible body (nothing visible completes `sold`) in this copy.
var slotIndex = 0.u256
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
slotIndex = slot
let blk = @[1.byte] ).get
await onBatch(@[ blk ])
let sold = newFuture[void]()
sales.onSale = proc(request: StorageRequest, slotIndex: UInt256) =
let origSize = availability.freeSize
await market.requestStorage(request)
await allowRequestToStart()
await sold
# complete request
market.slotState[request.slotId(slotIndex)] = SlotState.Finished
check eventually getAvailability().freeSize == origSize - 1
test "ignores download when duration not long enough":
# The availability's duration is set one below the requested duration.
availability.duration = request.ask.duration - 1
await market.requestStorage(request)
check wasIgnored()
test "ignores request when slot size is too small":
# The availability's total size is set one below the requested slot size.
availability.totalSize = request.ask.slotSize - 1
await market.requestStorage(request)
check wasIgnored()
test "ignores request when reward is too low":
# The availability's minimum price is set one above the request's price per slot.
availability.minPrice = request.ask.pricePerSlot + 1
await market.requestStorage(request)
check wasIgnored()
test "ignores request when asked collateral is too high":
# Submits a copy of the request whose collateral is one above the
# availability's maximum collateral.
var tooBigCollateral = request
tooBigCollateral.ask.collateral = availability.maxCollateral + 1
await market.requestStorage(tooBigCollateral)
check wasIgnored()
test "ignores request when slot state is not free":
  # Every slot of the request is reported as already filled, so the request is
  # expected to be ignored.
  await market.requestStorage(request)
  # Derive the slot indices from the request instead of hard-coding 0..3, so
  # the test keeps covering every slot if `request.ask.slots` changes.
  for slotIndex in 0..<request.ask.slots:
    market.slotState[request.slotId(slotIndex.u256)] = SlotState.Filled
  check wasIgnored()
test "retrieves and stores data locally":
# Captures the arguments of the store callback and checks that it was called
# for the fixture request with a slot index below the request's slot count.
var storingRequest: StorageRequest
var storingSlot: UInt256
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
storingRequest = request
storingSlot = slot
return success()
await market.requestStorage(request)
check eventually storingRequest == request
check storingSlot < request.ask.slots.u256
test "handles errors during state run":
# A proving callback that raises is expected to lead to `onClear` being
# invoked, which flips `saleFailed`.
var saleFailed = false
sales.onProve = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.async.} =
# raise exception so machine.onError is called
raise newException(ValueError, "some error")
# onClear is called in
# NOTE(review): the comment above is cut off in this copy of the file.
sales.onClear = proc(request: StorageRequest,
idx: UInt256) =
saleFailed = true
await market.requestStorage(request)
await allowRequestToStart()
check eventually saleFailed
test "makes storage available again when data retrieval fails":
# The store callback always fails; the stored availability is expected to
# still have the fixture's free size.
# NOTE(review): the final check runs once, without `eventually` — confirm it
# cannot pass before the failing store callback has run.
let error = newException(IOError, "data retrieval failed")
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
return failure(error)
await market.requestStorage(request)
check getAvailability().freeSize == availability.freeSize
test "generates proof of storage":
# Captures the slot passed to the proving callback and checks that it belongs
# to the fixture request, with a slot index below the request's slot count.
var provingRequest: StorageRequest
var provingSlot: UInt256
sales.onProve = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.async.} =
provingRequest = slot.request
provingSlot = slot.slotIndex
return success(Groth16Proof.example)
await market.requestStorage(request)
await allowRequestToStart()
check eventually provingRequest == request
check provingSlot < request.ask.slots.u256
test "fills a slot":
# Expects the mock market to eventually record a filled slot carrying the
# fixture proof and this host's signer address.
# NOTE(review): the right-hand side of the `requestId` check is missing in
# this copy of the file.
await market.requestStorage(request)
await allowRequestToStart()
check eventually market.filled.len > 0
check market.filled[0].requestId ==
check market.filled[0].slotIndex < request.ask.slots.u256
check market.filled[0].proof == proof
check market.filled[0].host == await market.getSigner()
test "calls onFilled when slot is filled":
# Captures the arguments of the `onSale` callback. The sentinels (default
# request, `UInt256.high`) cannot satisfy the checks unless the callback ran.
var soldRequest = StorageRequest.default
var soldSlotIndex = UInt256.high
sales.onSale = proc(request: StorageRequest,
slotIndex: UInt256) =
soldRequest = request
soldSlotIndex = slotIndex
await market.requestStorage(request)
await allowRequestToStart()
check eventually soldRequest == request
check soldSlotIndex < request.ask.slots.u256
test "calls onClear when storage becomes available again":
# fail the proof intentionally to trigger `agent.finish(success=false)`,
# which then calls the onClear callback
sales.onProve = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.async.} =
raise newException(IOError, "proof failed")
# Capture the arguments `onClear` is invoked with.
var clearedRequest: StorageRequest
var clearedSlotIndex: UInt256
sales.onClear = proc(request: StorageRequest,
slotIndex: UInt256) =
clearedRequest = request
clearedSlotIndex = slotIndex
await market.requestStorage(request)
await allowRequestToStart()
check eventually clearedRequest == request
check clearedSlotIndex < request.ask.slots.u256
test "makes storage available again when other host fills the slot":
# The store callback sleeps for an hour; meanwhile every slot of the request
# is filled by another host. The stored availabilities are then expected to
# equal the fixture availability again.
let otherHost = Address.example
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
await sleepAsync(chronos.hours(1))
return success()
await market.requestStorage(request)
for slotIndex in 0..<request.ask.slots:
market.fillSlot(, slotIndex.u256, proof, otherHost)
check eventually (await reservations.all(Availability)).get == @[availability]
test "makes storage available again when request expires":
# The store callback sleeps for an hour; the mock clock is then moved past the
# request's expiry, after which the availability is expected to be restored
# to its original free size.
let expiry = getTime().toUnix() + 10
market.requestExpiry[] = expiry
let origSize = availability.freeSize
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
await sleepAsync(chronos.hours(1))
return success()
await market.requestStorage(request)
# If we would not await, then the `clock.set` would run "too fast" as the `subscribeCancellation()`
# would otherwise not set the timeout early enough as it uses `` in the deadline calculation.
# NOTE(review): the identifier inside the empty backticks above was lost in
# this copy of the file — presumably the clock's current time; confirm.
await sleepAsync(chronos.milliseconds(100))
clock.set(expiry + 1)
check eventually (await reservations.all(Availability)).get == @[availability]
check getAvailability().freeSize == origSize
test "verifies that request is indeed expired from onchain before firing onCancelled":
# Moving the mock clock past expiry must not free the availability while the
# market still reports the request as `New`; only once the market reports
# `Cancelled` is the free size expected to return to its original value.
let expiry = getTime().toUnix() + 10
# ensure only one slot, otherwise once bytes are returned to the
# availability, the queue will be unpaused and availability will be consumed
# by other slots
request.ask.slots = 1.uint64
market.requestExpiry[] = expiry
let origSize = availability.freeSize
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
await sleepAsync(chronos.hours(1))
return success()
await market.requestStorage(request)
market.requestState[]=RequestState.New # "On-chain" is the request still ongoing even after local expiration
# If we would not await, then the `clock.set` would run "too fast" as the `subscribeCancellation()`
# would otherwise not set the timeout early enough as it uses `` in the deadline calculation.
# NOTE(review): the identifier inside the empty backticks above was lost in
# this copy of the file — presumably the clock's current time; confirm.
await sleepAsync(chronos.milliseconds(100))
clock.set(expiry + 1)
check getAvailability().freeSize == 0
market.requestState[]=RequestState.Cancelled # Now "on-chain" is also expired
check eventually getAvailability().freeSize == origSize
test "loads active slots from market":
# Slots 0 and 1 of a two-slot request are pre-filled by this host and listed
# as its active slots; `sales.load()` should eventually yield two agents.
let me = await market.getSigner()
request.ask.slots = 2
market.requested = @[request]
market.requestState[] = RequestState.New
# Local helper: records slot `slotIdx` as filled by the market's signer with
# the fixture proof and sets its slot state to `Filled`.
proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} =
let address = await market.getSigner()
let slot = MockSlot(requestId:,
slotIndex: slotIdx,
proof: proof,
host: address)
market.filled.add slot
market.slotState[slotId(, slotIdx)] = SlotState.Filled
# slot0/slot1 are only used for their `slotIndex` when calling `fillSlot`.
let slot0 = MockSlot(requestId:,
slotIndex: 0.u256,
proof: proof,
host: me)
await fillSlot(slot0.slotIndex)
let slot1 = MockSlot(requestId:,
slotIndex: 1.u256,
proof: proof,
host: me)
await fillSlot(slot1.slotIndex)
market.activeSlots[me] = @[request.slotId(0.u256), request.slotId(1.u256)]
# NOTE(review): `market.requested` was already assigned the same value above.
market.requested = @[request]
market.activeRequests[me] = @[]
await sales.load()
check eventually sales.agents.len == 2
check sales.agents.any(agent => == and == 0.u256)
check sales.agents.any(agent => == and == 1.u256)
test "deletes inactive reservations on load":
# Creates one reservation, then expects `sales.load()` to remove it and leave
# the availability with the fixture's free size.
# NOTE(review): the arguments of `createReservation` are cut off in this copy.
discard await reservations.createReservation(,
check (await reservations.all(Reservation)).get.len == 1
await sales.load()
check (await reservations.all(Reservation)).get.len == 0
check getAvailability().freeSize == availability.freeSize # was restored