From 27d807a8412a3fa9c8af37582e4d18f5de0b6bd2 Mon Sep 17 00:00:00 2001 From: markspanbroek Date: Wed, 11 Jun 2025 13:27:09 +0200 Subject: [PATCH] fix(sales): fix marketplace block expiry (#1258) --- codex/node.nim | 8 +-- codex/sales.nim | 31 ++++-------- codex/sales/salesagent.nim | 9 +++- codex/sales/salescontext.nim | 6 ++- codex/sales/salesdata.nim | 2 + codex/sales/slotqueue.nim | 36 ++++++++++--- codex/sales/states/downloading.nim | 17 +++++-- tests/codex/helpers/mockmarket.nim | 33 ++++++++---- tests/codex/node/testcontracts.nim | 7 ++- tests/codex/sales/testsales.nim | 78 ++++++++++++++++++++--------- tests/codex/sales/testslotqueue.nim | 63 ++++++----------------- tests/codex/testpurchasing.nim | 6 +-- tests/codex/testvalidation.nim | 2 +- 13 files changed, 168 insertions(+), 130 deletions(-) diff --git a/codex/node.nim b/codex/node.nim index b742df2c..34d71774 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -623,6 +623,7 @@ proc requestStorage*( proc onStore( self: CodexNodeRef, request: StorageRequest, + expiry: SecondsSince1970, slotIdx: uint64, blocksCb: BlocksCb, isRepairing: bool = false, @@ -651,8 +652,6 @@ proc onStore( trace "Unable to create slots builder", err = err.msg return failure(err) - let expiry = request.expiry - if slotIdx > manifest.slotRoots.high.uint64: trace "Slot index not in manifest", slotIdx return failure(newException(CodexError, "Slot index not in manifest")) @@ -663,7 +662,7 @@ proc onStore( trace "Updating expiry for blocks", blocks = blocks.len let ensureExpiryFutures = - blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry.toSecondsSince1970)) + blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry)) let res = await allFinishedFailed[?!void](ensureExpiryFutures) if res.failure.len > 0: @@ -789,11 +788,12 @@ proc start*(self: CodexNodeRef) {.async.} = if hostContracts =? self.contracts.host: hostContracts.sales.onStore = proc( request: StorageRequest, + expiry: SecondsSince1970, slot: uint64, onBatch: BatchProc, isRepairing: bool = false, ): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} = - self.onStore(request, slot, onBatch, isRepairing) + self.onStore(request, expiry, slot, onBatch, isRepairing) hostContracts.sales.onExpiryUpdate = proc( rootCid: Cid, expiry: SecondsSince1970 diff --git a/codex/sales.nim b/codex/sales.nim index 6a00e53b..353b70cc 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -148,26 +148,12 @@ proc cleanUp( # Re-add items back into the queue to prevent small availabilities from # draining the queue. Seen items will be ordered last. - if data.slotIndex <= uint16.high.uint64 and reprocessSlot and request =? data.request: - let res = - await noCancel sales.context.market.slotCollateral(data.requestId, data.slotIndex) - if res.isErr: - error "Failed to re-add item back to the slot queue: unable to calculate collateral", - error = res.error.msg - else: - let collateral = res.get() - let queue = sales.context.slotQueue - var seenItem = SlotQueueItem.init( - data.requestId, - data.slotIndex.uint16, - data.ask, - request.expiry, - seen = true, - collateral = collateral, - ) - trace "pushing ignored item to queue, marked as seen" - if err =? queue.push(seenItem).errorOption: - error "failed to readd slot to queue", errorType = $(type err), error = err.msg + if reprocessSlot and request =? data.request and var item =? agent.data.slotQueueItem: + let queue = sales.context.slotQueue + item.seen = true + trace "pushing ignored item to queue, marked as seen" + if err =? queue.push(item).errorOption: + error "failed to readd slot to queue", errorType = $(type err), error = err.msg let fut = sales.remove(agent) sales.trackedFutures.track(fut) @@ -181,8 +167,9 @@ proc processSlot( ) {.async: (raises: [CancelledError]).} = debug "Processing slot from queue", requestId = item.requestId, slot = item.slotIndex - let agent = - newSalesAgent(sales.context, item.requestId, item.slotIndex, none StorageRequest) + let agent = newSalesAgent( + sales.context, item.requestId, item.slotIndex, none StorageRequest, some item + ) let completed = newAsyncEvent() diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 6b62d5e4..96137fe0 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -11,6 +11,7 @@ import ./statemachine import ./salescontext import ./salesdata import ./reservations +import ./slotqueue export reservations @@ -42,10 +43,16 @@ proc newSalesAgent*( requestId: RequestId, slotIndex: uint64, request: ?StorageRequest, + slotQueueItem = SlotQueueItem.none, ): SalesAgent = var agent = SalesAgent.new() agent.context = context - agent.data = SalesData(requestId: requestId, slotIndex: slotIndex, request: request) + agent.data = SalesData( + requestId: requestId, + slotIndex: slotIndex, + request: request, + slotQueueItem: slotQueueItem, + ) return agent proc retrieveRequest*(agent: SalesAgent) {.async.} = diff --git a/codex/sales/salescontext.nim b/codex/sales/salescontext.nim index 40eded7d..ac0908df 100644 --- a/codex/sales/salescontext.nim +++ b/codex/sales/salescontext.nim @@ -28,7 +28,11 @@ type gcsafe, async: (raises: [CancelledError]) .} OnStore* = proc( - request: StorageRequest, slot: uint64, blocksCb: BlocksCb, isRepairing: bool + request: StorageRequest, + expiry: SecondsSince1970, + slot: uint64, + blocksCb: BlocksCb, + isRepairing: bool, ): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).} OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {. gcsafe, async: (raises: [CancelledError]) diff --git a/codex/sales/salesdata.nim b/codex/sales/salesdata.nim index de8eccb5..ec16fef1 100644 --- a/codex/sales/salesdata.nim +++ b/codex/sales/salesdata.nim @@ -2,6 +2,7 @@ import pkg/chronos import ../contracts/requests import ../market import ./reservations +import ./slotqueue type SalesData* = ref object requestId*: RequestId @@ -10,3 +11,4 @@ type SalesData* = ref object slotIndex*: uint64 cancelled*: Future[void] reservation*: ?Reservation + slotQueueItem*: ?SlotQueueItem diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim index 890d912f..b6e77395 100644 --- a/codex/sales/slotqueue.nim +++ b/codex/sales/slotqueue.nim @@ -30,7 +30,7 @@ type duration: uint64 pricePerBytePerSecond: UInt256 collateral: UInt256 # Collateral computed - expiry: uint64 + expiry: ?uint64 seen: bool # don't need to -1 to prevent overflow when adding 1 (to always allow push) @@ -89,8 +89,9 @@ proc `<`*(a, b: SlotQueueItem): bool = scoreA.addIf(a.collateral < b.collateral, 2) scoreB.addIf(a.collateral > b.collateral, 2) - scoreA.addIf(a.expiry > b.expiry, 1) - scoreB.addIf(a.expiry < b.expiry, 1) + if expiryA =? a.expiry and expiryB =? b.expiry: + scoreA.addIf(expiryA > expiryB, 1) + scoreB.addIf(expiryA < expiryB, 1) return scoreA > scoreB @@ -124,7 +125,7 @@ proc init*( requestId: RequestId, slotIndex: uint16, ask: StorageAsk, - expiry: uint64, + expiry: ?uint64, collateral: UInt256, seen = false, ): SlotQueueItem = @@ -139,6 +140,17 @@ proc init*( seen: seen, ) +proc init*( + _: type SlotQueueItem, + requestId: RequestId, + slotIndex: uint16, + ask: StorageAsk, + expiry: uint64, + collateral: UInt256, + seen = false, +): SlotQueueItem = + SlotQueueItem.init(requestId, slotIndex, ask, some expiry, collateral, seen) + proc init*( _: type SlotQueueItem, request: StorageRequest, @@ -151,7 +163,7 @@ proc init*( _: type SlotQueueItem, requestId: RequestId, ask: StorageAsk, - expiry: uint64, + expiry: ?uint64, collateral: UInt256, ): seq[SlotQueueItem] {.raises: [SlotsOutOfRangeError].} = if not ask.slots.inRange: @@ -167,10 +179,19 @@ proc init*( Rng.instance.shuffle(items) return items +proc init*( + _: type SlotQueueItem, + requestId: RequestId, + ask: StorageAsk, + expiry: uint64, + collateral: UInt256, +): seq[SlotQueueItem] {.raises: [SlotsOutOfRangeError].} = + SlotQueueItem.init(requestId, ask, some expiry, collateral) + proc init*( _: type SlotQueueItem, request: StorageRequest, collateral: UInt256 ): seq[SlotQueueItem] = - return SlotQueueItem.init(request.id, request.ask, request.expiry, collateral) + return SlotQueueItem.init(request.id, request.ask, uint64.none, collateral) proc inRange*(val: SomeUnsignedInt): bool = val.uint16 in SlotQueueSize.low .. SlotQueueSize.high @@ -196,6 +217,9 @@ proc collateralPerByte*(self: SlotQueueItem): UInt256 = proc seen*(self: SlotQueueItem): bool = self.seen +proc `seen=`*(self: var SlotQueueItem, seen: bool) = + self.seen = seen + proc running*(self: SlotQueue): bool = self.running diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index 0c39b0a5..0d628962 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -38,6 +38,7 @@ method run*( let agent = SalesAgent(machine) let data = agent.data let context = agent.context + let market = context.market let reservations = context.reservations without onStore =? context.onStore: @@ -69,11 +70,21 @@ method run*( return await reservations.release(reservation.id, reservation.availabilityId, bytes) try: - let slotId = slotId(request.id, data.slotIndex) - let isRepairing = (await context.market.slotState(slotId)) == SlotState.Repair + let requestId = request.id + let slotId = slotId(requestId, data.slotIndex) + let requestState = await market.requestState(requestId) + let isRepairing = (await market.slotState(slotId)) == SlotState.Repair + + trace "Retrieving expiry" + var expiry: SecondsSince1970 + if state =? requestState and state == RequestState.Started: + expiry = await market.getRequestEnd(requestId) + else: + expiry = await market.requestExpiresAt(requestId) trace "Starting download" - if err =? (await onStore(request, data.slotIndex, onBlocks, isRepairing)).errorOption: + if err =? + (await onStore(request, expiry, data.slotIndex, onBlocks, isRepairing)).errorOption: return some State(SaleErrored(error: err, reprocessSlot: false)) trace "Download complete" diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index 9d2fbea3..4483a0d6 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -14,6 +14,7 @@ from pkg/ethers import BlockTag import codex/clock import ../examples +import ./mockclock export market export tables @@ -51,7 +52,7 @@ type errorOnFillSlot*: ?(ref MarketError) errorOnFreeSlot*: ?(ref MarketError) errorOnGetHost*: ?(ref MarketError) - clock: ?Clock + clock: Clock Fulfillment* = object requestId*: RequestId @@ -63,7 +64,7 @@ type host*: Address slotIndex*: uint64 proof*: Groth16Proof - timestamp: ?SecondsSince1970 + timestamp: SecondsSince1970 collateral*: UInt256 Subscriptions = object @@ -119,7 +120,7 @@ proc hash*(address: Address): Hash = proc hash*(requestId: RequestId): Hash = hash(requestId.toArray) -proc new*(_: type MockMarket, clock: ?Clock = Clock.none): MockMarket = +proc new*(_: type MockMarket, clock: Clock = MockClock.new()): MockMarket = ## Create a new mocked Market instance ## let config = MarketplaceConfig( @@ -181,10 +182,15 @@ method getPointer*(market: MockMarket, slotId: SlotId): Future[uint8] {.async.} method requestStorage*( market: MockMarket, request: StorageRequest ) {.async: (raises: [CancelledError, MarketError]).} = + let now = market.clock.now() + let requestExpiresAt = now + request.expiry.toSecondsSince1970 + let requestEndsAt = now + request.ask.duration.toSecondsSince1970 market.requested.add(request) + market.requestExpiry[request.id] = requestExpiresAt + market.requestEnds[request.id] = requestEndsAt var subscriptions = market.subscriptions.onRequest for subscription in subscriptions: - subscription.callback(request.id, request.ask, request.expiry) + subscription.callback(request.id, request.ask, requestExpiresAt.uint64) method myRequests*(market: MockMarket): Future[seq[RequestId]] {.async.} = return market.activeRequests[market.signer] @@ -308,7 +314,7 @@ proc fillSlot*( slotIndex: slotIndex, proof: proof, host: host, - timestamp: market.clock .? now, + timestamp: market.clock.now, collateral: collateral, ) market.filled.add(slot) @@ -541,7 +547,11 @@ method queryPastStorageRequestedEvents*( ): Future[seq[StorageRequested]] {.async.} = return market.requested.map( request => - StorageRequested(requestId: request.id, ask: request.ask, expiry: request.expiry) + StorageRequested( + requestId: request.id, + ask: request.ask, + expiry: market.requestExpiry[request.id].uint64, + ) ) method queryPastStorageRequestedEvents*( @@ -549,7 +559,11 @@ method queryPastStorageRequestedEvents*( ): Future[seq[StorageRequested]] {.async.} = return market.requested.map( request => - StorageRequested(requestId: request.id, ask: request.ask, expiry: request.expiry) + StorageRequested( + requestId: request.id, + ask: request.ask, + expiry: market.requestExpiry[request.id].uint64, + ) ) method queryPastSlotFilledEvents*( @@ -571,10 +585,7 @@ method queryPastSlotFilledEvents*( ): Future[seq[SlotFilled]] {.async.} = let filtered = market.filled.filter( proc(slot: MockSlot): bool = - if timestamp =? slot.timestamp: - return timestamp >= fromTime - else: - true + return slot.timestamp >= fromTime ) return filtered.map( slot => SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex) diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim index 4fe4b94f..8469d3e7 100644 --- a/tests/codex/node/testcontracts.nim +++ b/tests/codex/node/testcontracts.nim @@ -116,8 +116,7 @@ asyncchecksuite "Test Node - Host contracts": let onStore = !sales.onStore var request = StorageRequest.example request.content.cid = verifiableBlock.cid - request.expiry = - (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix.uint64 + let expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix var fetchedBytes: uint = 0 let onBlocks = proc( @@ -127,7 +126,7 @@ asyncchecksuite "Test Node - Host contracts": fetchedBytes += blk.data.len.uint return success() - (await onStore(request, 1.uint64, onBlocks, isRepairing = false)).tryGet() + (await onStore(request, expiry, 1.uint64, onBlocks, isRepairing = false)).tryGet() check fetchedBytes == 12 * DefaultBlockSize.uint let indexer = verifiable.protectedStrategy.init( @@ -141,4 +140,4 @@ asyncchecksuite "Test Node - Host contracts": bytes = (await localStoreMetaDs.get(key)).tryGet blkMd = BlockMetadata.decode(bytes).tryGet - check blkMd.expiry == request.expiry.toSecondsSince1970 + check blkMd.expiry == expiry diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index f7c687f4..3b0eec28 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -37,7 +37,6 @@ asyncchecksuite "Sales - start": var repo: RepoStore var queue: SlotQueue var itemsProcessed: seq[SlotQueueItem] - var expiry: SecondsSince1970 setup: request = StorageRequest( @@ -51,7 +50,7 @@ asyncchecksuite "Sales - start": content: StorageContent( cid: Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet ), - expiry: (getTime() + initDuration(hours = 1)).toUnix.uint64, + expiry: 60, ) market = MockMarket.new() @@ -63,7 +62,11 @@ asyncchecksuite "Sales - start": sales = Sales.new(market, clock, repo) reservations = sales.context.reservations sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false + request: StorageRequest, + expiry: SecondsSince1970, + slot: uint64, + onBatch: BatchProc, + isRepairing = false, ): Future[?!void] {.async: (raises: [CancelledError]).} = return success() @@ -78,8 +81,6 @@ asyncchecksuite "Sales - start": ): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} = return success(proof) itemsProcessed = @[] - expiry = (clock.now() + 42) - request.expiry = expiry.uint64 teardown: await sales.stop() @@ -100,7 +101,6 @@ asyncchecksuite "Sales - start": request.ask.slots = 2 market.requested = @[request] market.requestState[request.id] = RequestState.New - market.requestExpiry[request.id] = expiry let slot0 = MockSlot(requestId: request.id, slotIndex: 0, proof: proof, host: me) await fillSlot(slot0.slotIndex) @@ -167,14 +167,13 @@ asyncchecksuite "Sales": content: StorageContent( cid: Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet ), - expiry: (getTime() + initDuration(hours = 1)).toUnix.uint64, + expiry: 60, ) market = MockMarket.new() let me = await market.getSigner() market.activeSlots[me] = @[] - market.requestEnds[request.id] = request.expiry.toSecondsSince1970 clock = MockClock.new() let repoDs = repoTmp.newDb() @@ -184,7 +183,11 @@ asyncchecksuite "Sales": sales = Sales.new(market, clock, repo) reservations = sales.context.reservations sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false + request: StorageRequest, + expiry: SecondsSince1970, + slot: uint64, + onBatch: BatchProc, + isRepairing = false, ): Future[?!void] {.async: (raises: [CancelledError]).} = return success() @@ -361,7 +364,11 @@ asyncchecksuite "Sales": test "availability size is reduced by request slot size when fully downloaded": sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false + request: StorageRequest, + expiry: SecondsSince1970, + slot: uint64, + onBatch: BatchProc, + isRepairing = false, ): Future[?!void] {.async: (raises: [CancelledError]).} = let blk = bt.Block.new(@[1.byte]).get await onBatch(blk.repeat(request.ask.slotSize.int)) @@ -374,7 +381,11 @@ asyncchecksuite "Sales": test "bytes are returned to availability once finished": var slotIndex = 0.uint64 sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false + request: StorageRequest, + expiry: SecondsSince1970, + slot: uint64, + onBatch: BatchProc, + isRepairing = false, ): Future[?!void] {.async: (raises: [CancelledError]).} = slotIndex = slot let blk = bt.Block.new(@[1.byte]).get @@ -456,7 +467,11 @@ asyncchecksuite "Sales": var storingRequest: StorageRequest sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false + request: StorageRequest, + expiry: SecondsSince1970, + slot: uint64, + onBatch: BatchProc, + isRepairing = false, ): Future[?!void] {.async: (raises: [CancelledError]).} = storingRequest = request return success() @@ -469,7 +484,11 @@ asyncchecksuite "Sales": var storingRequest: StorageRequest var storingSlot: uint64 sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false + request: StorageRequest, + expiry: SecondsSince1970, + slot: uint64, + onBatch: BatchProc, + isRepairing = false, ): Future[?!void] {.async: (raises: [CancelledError]).} = storingRequest = request storingSlot = slot @@ -482,7 +501,11 @@ asyncchecksuite "Sales": test "makes storage available again when data retrieval fails": let error = newException(IOError, "data retrieval failed") sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false + request: StorageRequest, + expiry: SecondsSince1970, + slot: uint64, + onBatch: BatchProc, + isRepairing = false, ): Future[?!void] {.async: (raises: [CancelledError]).} = return failure(error) createAvailability() @@ -551,7 +574,11 @@ asyncchecksuite "Sales": test "makes storage available again when other host fills the slot": let otherHost = Address.example sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false + request: StorageRequest, + expiry: SecondsSince1970, + slot: uint64, + onBatch: BatchProc, + isRepairing = false, ): Future[?!void] {.async: (raises: [CancelledError]).} = await sleepAsync(chronos.hours(1)) return success() @@ -562,12 +589,13 @@ asyncchecksuite "Sales": check eventually (await reservations.all(Availability)).get == @[availability] test "makes storage available again when request expires": - let expiry = getTime().toUnix() + 10 - market.requestExpiry[request.id] = expiry - let origSize = availability.freeSize sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false + request: StorageRequest, + expiry: SecondsSince1970, + slot: uint64, + onBatch: BatchProc, + isRepairing = false, ): Future[?!void] {.async: (raises: [CancelledError]).} = await sleepAsync(chronos.hours(1)) return success() @@ -578,23 +606,25 @@ asyncchecksuite "Sales": # would otherwise not set the timeout early enough as it uses `clock.now` in the deadline calculation. await sleepAsync(chronos.milliseconds(100)) market.requestState[request.id] = RequestState.Cancelled - clock.set(expiry + 1) + clock.set(market.requestExpiry[request.id] + 1) check eventually (await reservations.all(Availability)).get == @[availability] check getAvailability().freeSize == origSize test "verifies that request is indeed expired from onchain before firing onCancelled": - let expiry = getTime().toUnix() + 10 # ensure only one slot, otherwise once bytes are returned to the # availability, the queue will be unpaused and availability will be consumed # by other slots request.ask.slots = 1 - market.requestExpiry[request.id] = expiry market.requestEnds[request.id] = getTime().toUnix() + cast[int64](request.ask.duration) let origSize = availability.freeSize sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false + request: StorageRequest, + expiry: SecondsSince1970, + slot: uint64, + onBatch: BatchProc, + isRepairing = false, ): Future[?!void] {.async: (raises: [CancelledError]).} = await sleepAsync(chronos.hours(1)) return success() @@ -606,7 +636,7 @@ asyncchecksuite "Sales": # If we would not await, then the `clock.set` would run "too fast" as the `subscribeCancellation()` # would otherwise not set the timeout early enough as it uses `clock.now` in the deadline calculation. await sleepAsync(chronos.milliseconds(100)) - clock.set(expiry + 1) + clock.set(market.requestExpiry[request.id] + 1) check getAvailability().freeSize == 0 market.requestState[request.id] = RequestState.Cancelled diff --git a/tests/codex/sales/testslotqueue.nim b/tests/codex/sales/testslotqueue.nim index 7dd46916..37435ba5 100644 --- a/tests/codex/sales/testslotqueue.nim +++ b/tests/codex/sales/testslotqueue.nim @@ -300,10 +300,7 @@ suite "Slot queue": let uint64Slots = uint64(maxUInt16) request.ask.slots = uint64Slots let items = SlotQueueItem.init( - request.id, - request.ask, - request.expiry, - collateral = request.ask.collateralPerSlot, + request.id, request.ask, 0, collateral = request.ask.collateralPerSlot ) check items.len.uint16 == maxUInt16 @@ -314,10 +311,7 @@ suite "Slot queue": request.ask.slots = uint64Slots expect SlotsOutOfRangeError: discard SlotQueueItem.init( - request.id, - request.ask, - request.expiry, - collateral = request.ask.collateralPerSlot, + request.id, request.ask, 0, collateral = request.ask.collateralPerSlot ) test "cannot push duplicate items": @@ -433,11 +427,12 @@ suite "Slot queue": test "sorts items by expiry descending (longer expiry = higher priority)": var request = StorageRequest.example - let item0 = - SlotQueueItem.init(request, 0, collateral = request.ask.collateralPerSlot) - request.expiry += 1 - let item1 = - SlotQueueItem.init(request, 1, collateral = request.ask.collateralPerSlot) + let item0 = SlotQueueItem.init( + request.id, 0, request.ask, expiry = 3, collateral = request.ask.collateralPerSlot + ) + let item1 = SlotQueueItem.init( + request.id, 1, request.ask, expiry = 7, collateral = request.ask.collateralPerSlot + ) check item1 < item0 test "sorts items by slot size descending (bigger dataset = higher profitability = higher priority)": @@ -545,12 +540,7 @@ suite "Slot queue": newSlotQueue(maxSize = 4, maxWorkers = 4) let request = StorageRequest.example let item0 = SlotQueueItem.init( - request.id, - 0'u16, - request.ask, - request.expiry, - request.ask.collateralPerSlot, - seen = true, + request.id, 0'u16, request.ask, 0, request.ask.collateralPerSlot, seen = true ) check queue.paused check queue.push(item0).isOk @@ -560,12 +550,7 @@ suite "Slot queue": newSlotQueue(maxSize = 4, maxWorkers = 4) let request = StorageRequest.example let item = SlotQueueItem.init( - request.id, - 1'u16, - request.ask, - request.expiry, - request.ask.collateralPerSlot, - seen = false, + request.id, 1'u16, request.ask, 0, request.ask.collateralPerSlot, seen = false ) check queue.paused # push causes unpause @@ -578,20 +563,10 @@ suite "Slot queue": newSlotQueue(maxSize = 4, maxWorkers = 4) let request = StorageRequest.example let unseen = SlotQueueItem.init( - request.id, - 0'u16, - request.ask, - request.expiry, - request.ask.collateralPerSlot, - seen = false, + request.id, 0'u16, request.ask, 0, request.ask.collateralPerSlot, seen = false ) let seen = SlotQueueItem.init( - request.id, - 1'u16, - request.ask, - request.expiry, - request.ask.collateralPerSlot, - seen = true, + request.id, 1'u16, request.ask, 0, request.ask.collateralPerSlot, seen = true ) # push causes unpause check queue.push(unseen).isSuccess @@ -606,20 +581,10 @@ suite "Slot queue": newSlotQueue(maxSize = 4, maxWorkers = 1) let request = StorageRequest.example let item0 = SlotQueueItem.init( - request.id, - 0'u16, - request.ask, - request.expiry, - request.ask.collateralPerSlot, - seen = true, + request.id, 0'u16, request.ask, 0, request.ask.collateralPerSlot, seen = true ) let item1 = SlotQueueItem.init( - request.id, - 1'u16, - request.ask, - request.expiry, - request.ask.collateralPerSlot, - seen = true, + request.id, 1'u16, request.ask, 0, request.ask.collateralPerSlot, seen = true ) check queue.push(item0).isOk check queue.push(item1).isOk diff --git a/tests/codex/testpurchasing.nim b/tests/codex/testpurchasing.nim index 1834ee03..339daa18 100644 --- a/tests/codex/testpurchasing.nim +++ b/tests/codex/testpurchasing.nim @@ -96,21 +96,19 @@ asyncchecksuite "Purchasing": check purchase.error.isNone test "fails when request times out": - let expiry = getTime().toUnix() + 10 - market.requestExpiry[populatedRequest.id] = expiry let purchase = await purchasing.purchase(populatedRequest) check eventually market.requested.len > 0 + let expiry = market.requestExpiry[populatedRequest.id] clock.set(expiry + 1) expect PurchaseTimeout: await purchase.wait() test "checks that funds were withdrawn when purchase times out": - let expiry = getTime().toUnix() + 10 - market.requestExpiry[populatedRequest.id] = expiry let purchase = await purchasing.purchase(populatedRequest) check eventually market.requested.len > 0 let request = market.requested[0] + let expiry = market.requestExpiry[populatedRequest.id] clock.set(expiry + 1) expect PurchaseTimeout: await purchase.wait() diff --git a/tests/codex/testvalidation.nim b/tests/codex/testvalidation.nim index 5c95cd76..edb3d0f2 100644 --- a/tests/codex/testvalidation.nim +++ b/tests/codex/testvalidation.nim @@ -50,7 +50,7 @@ asyncchecksuite "validation": setup: groupIndex = groupIndexForSlotId(slot.id, !validationGroups) clock = MockClock.new() - market = MockMarket.new(clock = Clock(clock).some) + market = MockMarket.new(clock) market.config.proofs.period = period market.config.proofs.timeout = timeout validation = newValidation(clock, market, maxSlots, validationGroups, groupIndex)