import std/sequtils import std/sugar import std/times import pkg/chronos import pkg/datastore/typedds import pkg/questionable import pkg/questionable/results import pkg/codex/sales import pkg/codex/sales/salesdata import pkg/codex/sales/salescontext import pkg/codex/sales/reservations import pkg/codex/sales/slotqueue import pkg/codex/stores/repostore import pkg/codex/blocktype as bt import pkg/codex/node import pkg/codex/utils/asyncstatemachine import ../../asynctest import ../helpers import ../helpers/mockmarket import ../helpers/mockclock import ../helpers/always import ../examples import ./helpers/periods asyncchecksuite "Sales - start": let proof = Groth16Proof.example repoTmp = TempLevelDb.new() metaTmp = TempLevelDb.new() var request: StorageRequest var sales: Sales var market: MockMarket var clock: MockClock var reservations: Reservations var repo: RepoStore var queue: SlotQueue var itemsProcessed: seq[SlotQueueItem] setup: request = StorageRequest( ask: StorageAsk( slots: 4, slotSize: 100.u256, duration: 60.u256, reward: 10.u256, collateral: 200.u256, ), content: StorageContent( cid: "some cid" ), expiry: (getTime() + initDuration(hours=1)).toUnix.u256 ) market = MockMarket.new() clock = MockClock.new() let repoDs = repoTmp.newDb() let metaDs = metaTmp.newDb() repo = RepoStore.new(repoDs, metaDs) await repo.start() sales = Sales.new(market, clock, repo) reservations = sales.context.reservations sales.onStore = proc(request: StorageRequest, slot: UInt256, onBatch: BatchProc): Future[?!void] {.async.} = return success() sales.onExpiryUpdate = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.async.} = return success() queue = sales.context.slotQueue sales.onProve = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.async.} = return success(proof) itemsProcessed = @[] request.expiry = (clock.now() + 42).u256 teardown: await sales.stop() await repo.stop() await repoTmp.destroyDb() await metaTmp.destroyDb() proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} = let address = await market.getSigner() let slot = MockSlot(requestId: request.id, slotIndex: slotIdx, proof: proof, host: address) market.filled.add slot market.slotState[slotId(request.id, slotIdx)] = SlotState.Filled test "load slots when Sales module starts": let me = await market.getSigner() request.ask.slots = 2 market.requested = @[request] market.requestState[request.id] = RequestState.New let slot0 = MockSlot(requestId: request.id, slotIndex: 0.u256, proof: proof, host: me) await fillSlot(slot0.slotIndex) let slot1 = MockSlot(requestId: request.id, slotIndex: 1.u256, proof: proof, host: me) await fillSlot(slot1.slotIndex) market.activeSlots[me] = @[request.slotId(0.u256), request.slotId(1.u256)] market.requested = @[request] market.activeRequests[me] = @[request.id] await sales.start() check eventually sales.agents.len == 2 check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 0.u256) check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 1.u256) asyncchecksuite "Sales": let proof = Groth16Proof.example repoTmp = TempLevelDb.new() metaTmp = TempLevelDb.new() var availability: Availability var request: StorageRequest var sales: Sales var market: MockMarket var clock: MockClock var reservations: Reservations var repo: RepoStore var queue: SlotQueue var itemsProcessed: seq[SlotQueueItem] setup: availability = Availability( totalSize: 100.u256, freeSize: 100.u256, duration: 60.u256, minPrice: 600.u256, maxCollateral: 400.u256 ) request = StorageRequest( ask: StorageAsk( slots: 4, slotSize: 100.u256, duration: 60.u256, reward: 10.u256, collateral: 200.u256, ), content: StorageContent( cid: "some cid" ), expiry: (getTime() + initDuration(hours=1)).toUnix.u256 ) market = MockMarket.new() let me = await market.getSigner() market.activeSlots[me] = @[] market.requestEnds[request.id] = request.expiry.toSecondsSince1970 clock = MockClock.new() let repoDs = repoTmp.newDb() let metaDs = metaTmp.newDb() repo = RepoStore.new(repoDs, metaDs) await repo.start() sales = Sales.new(market, clock, repo) reservations = sales.context.reservations sales.onStore = proc(request: StorageRequest, slot: UInt256, onBatch: BatchProc): Future[?!void] {.async.} = return success() sales.onExpiryUpdate = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.async.} = return success() queue = sales.context.slotQueue sales.onProve = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.async.} = return success(proof) await sales.start() itemsProcessed = @[] teardown: await sales.stop() await repo.stop() await repoTmp.destroyDb() await metaTmp.destroyDb() proc isInState(idx: int, state: string): bool = proc description(state: State): string = $state if idx >= sales.agents.len: return false sales.agents[idx].query(description) == state.some proc allowRequestToStart {.async.} = check eventually isInState(0, "SaleInitialProving") # it won't start proving until the next period await clock.advanceToNextPeriod(market) proc getAvailability: Availability = let key = availability.id.key.get (waitFor reservations.get(key, Availability)).get proc createAvailability() = let a = waitFor reservations.createAvailability( availability.totalSize, availability.duration, availability.minPrice, availability.maxCollateral ) availability = a.get # update id proc notProcessed(itemsProcessed: seq[SlotQueueItem], request: StorageRequest): bool = let items = SlotQueueItem.init(request) for i in 0.. 0 # queue starts paused, allow items to be added to the queue check eventually queue.paused # The first processed item will be will have been re-pushed with `seen = # true`. Then, once this item is processed by the queue, its 'seen' flag # will be checked, at which point the queue will be paused. This test could # check item existence in the queue, but that would require inspecting # onProcessSlot to see which item was first, and overridding onProcessSlot # will prevent the queue working as expected in the Sales module. check eventually queue.len == 4 for item in items: check queue.contains(item) for i in 0.. 0 # queue starts paused, allow items to be added to the queue check eventually queue.paused # The first processed item/slot will be filled (eventually). Subsequent # items will be processed and eventually re-pushed with `seen = true`. Once # a "seen" item is processed by the queue, the queue is paused. In the # meantime, the other items that are process, marked as seen, and re-added # to the queue may be processed simultaneously as the queue pausing. # Therefore, there should eventually be 3 items remaining in the queue, all # seen. check eventually queue.len == 3 for i in 0.. 0 check market.filled[0].requestId == request.id check market.filled[0].slotIndex < request.ask.slots.u256 check market.filled[0].proof == proof check market.filled[0].host == await market.getSigner() test "calls onFilled when slot is filled": var soldRequest = StorageRequest.default var soldSlotIndex = UInt256.high sales.onSale = proc(request: StorageRequest, slotIndex: UInt256) = soldRequest = request soldSlotIndex = slotIndex createAvailability() await market.requestStorage(request) await allowRequestToStart() check eventually soldRequest == request check soldSlotIndex < request.ask.slots.u256 test "calls onClear when storage becomes available again": # fail the proof intentionally to trigger `agent.finish(success=false)`, # which then calls the onClear callback sales.onProve = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.async.} = raise newException(IOError, "proof failed") var clearedRequest: StorageRequest var clearedSlotIndex: UInt256 sales.onClear = proc(request: StorageRequest, slotIndex: UInt256) = clearedRequest = request clearedSlotIndex = slotIndex createAvailability() await market.requestStorage(request) await allowRequestToStart() check eventually clearedRequest == request check clearedSlotIndex < request.ask.slots.u256 test "makes storage available again when other host fills the slot": let otherHost = Address.example sales.onStore = proc(request: StorageRequest, slot: UInt256, onBatch: BatchProc): Future[?!void] {.async.} = await sleepAsync(chronos.hours(1)) return success() createAvailability() await market.requestStorage(request) for slotIndex in 0.. agent.data.requestId == request.id and agent.data.slotIndex == 0.u256) check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 1.u256) test "deletes inactive reservations on load": createAvailability() discard await reservations.createReservation( availability.id, 100.u256, RequestId.example, UInt256.example) check (await reservations.all(Reservation)).get.len == 1 await sales.load() check (await reservations.all(Reservation)).get.len == 0 check getAvailability().freeSize == availability.freeSize # was restored