diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dbe18e64..0d27077a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,7 +11,6 @@ env: cache_nonce: 0 # Allows for easily busting actions/cache caches nim_version: pinned - concurrency: group: ${{ github.workflow }}-${{ github.ref || github.run_id }} cancel-in-progress: true @@ -23,23 +22,23 @@ jobs: matrix: ${{ steps.matrix.outputs.matrix }} cache_nonce: ${{ env.cache_nonce }} steps: - - name: Compute matrix - id: matrix - uses: fabiocaccamo/create-matrix-action@v4 - with: - matrix: | - os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {macos}, cpu {amd64}, builder {macos-13}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {macos}, cpu {amd64}, builder {macos-13}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {macos}, cpu {amd64}, builder {macos-13}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {macos}, cpu {amd64}, builder {macos-13}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {windows}, cpu {amd64}, builder {windows-latest}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {msys2} - os {windows}, cpu {amd64}, builder {windows-latest}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {msys2} - os {windows}, cpu {amd64}, builder {windows-latest}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {msys2} - os {windows}, cpu {amd64}, builder {windows-latest}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {msys2} + - name: Compute matrix + id: matrix + uses: fabiocaccamo/create-matrix-action@v4 + with: + matrix: | + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {macos}, cpu {amd64}, builder {macos-13}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {macos}, cpu {amd64}, builder {macos-13}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {macos}, cpu {amd64}, builder {macos-13}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {macos}, cpu {amd64}, builder {macos-13}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {windows}, cpu {amd64}, builder {windows-latest}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {msys2} + os {windows}, cpu {amd64}, builder {windows-latest}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {msys2} + os {windows}, cpu {amd64}, builder {windows-latest}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {msys2} + os {windows}, cpu {amd64}, builder {windows-latest}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {msys2} build: needs: matrix diff --git a/.github/workflows/nim-matrix.yml b/.github/workflows/nim-matrix.yml index 2d822641..4cddc971 100644 --- a/.github/workflows/nim-matrix.yml +++ b/.github/workflows/nim-matrix.yml @@ -20,7 +20,10 @@ jobs: uses: fabiocaccamo/create-matrix-action@v4 with: matrix: | - os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {all}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} build: needs: matrix diff --git a/build.nims b/build.nims index 3d1a3cac..a9a0e553 100644 --- a/build.nims +++ b/build.nims @@ -41,6 +41,9 @@ task testContracts, "Build & run Codex Contract tests": task testIntegration, "Run integration tests": buildBinary "codex", params = "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE -d:codex_enable_proof_failures=true" test "testIntegration" + # use params to enable logging from the integration test executable + # test "testIntegration", params = "-d:chronicles_sinks=textlines[notimestamps,stdout],textlines[dynamic] " & + # "-d:chronicles_enabled_topics:integration:TRACE" task build, "build codex binary": codexTask() diff --git a/codex/blockexchange/engine/advertiser.nim b/codex/blockexchange/engine/advertiser.nim index e4a97db1..5ff82e48 100644 --- a/codex/blockexchange/engine/advertiser.nim +++ b/codex/blockexchange/engine/advertiser.nim @@ -18,6 +18,8 @@ import ../protobuf/presence import ../peers import ../../utils +import ../../utils/exceptions +import ../../utils/trackedfutures import ../../discovery import ../../stores/blockstore import ../../logutils @@ -42,7 +44,7 @@ type advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle advertiseQueue*: AsyncQueue[Cid] # Advertise queue - advertiseTasks*: seq[Future[void]] # Advertise tasks + trackedFutures*: TrackedFutures # Advertise tasks futures advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests @@ -70,20 +72,26 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async.} = await b.addCidToQueue(cid) await b.addCidToQueue(manifest.treeCid) -proc advertiseLocalStoreLoop(b: Advertiser) {.async.} = +proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} = while b.advertiserRunning: - if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest): - trace "Advertiser begins iterating blocks..." - for c in cids: - if cid =? await c: - await b.advertiseBlock(cid) - trace "Advertiser iterating blocks finished." + try: + if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest): + trace "Advertiser begins iterating blocks..." + for c in cids: + if cid =? await c: + await b.advertiseBlock(cid) + trace "Advertiser iterating blocks finished." - await sleepAsync(b.advertiseLocalStoreLoopSleep) + await sleepAsync(b.advertiseLocalStoreLoopSleep) + + except CancelledError: + break # do not propagate as advertiseLocalStoreLoop was asyncSpawned + except CatchableError as e: + error "failed to advertise blocks in local store", error = e.msgDetail info "Exiting advertise task loop" -proc processQueueLoop(b: Advertiser) {.async.} = +proc processQueueLoop(b: Advertiser) {.async: (raises: []).} = while b.advertiserRunning: try: let @@ -129,9 +137,11 @@ proc start*(b: Advertiser) {.async.} = b.advertiserRunning = true for i in 0.. epochTime: + high = mid - 1 + else: + return midBlockNumber + # NOTICE that by how the binary search is implemented, when it finishes + # low is always greater than high - this is why we use high, where + # intuitively we would use low: + await provider.binarySearchFindClosestBlock( + epochTime.truncate(int), low=high, high=low) + +proc blockNumberForEpoch*( + provider: Provider, + epochTime: SecondsSince1970): Future[UInt256] + {.async: (raises: [ProviderError]).} = + let epochTimeUInt256 = epochTime.u256 + let (latestBlockNumber, latestBlockTimestamp) = + await provider.blockNumberAndTimestamp(BlockTag.latest) + let (earliestBlockNumber, earliestBlockTimestamp) = + await provider.blockNumberAndTimestamp(BlockTag.earliest) + + # Initially we used the average block time to predict + # the number of blocks we need to look back in order to find + # the block number corresponding to the given epoch time. + # This estimation can be highly inaccurate if block time + # was changing in the past or is fluctuating and therefore + # we used that information initially only to find out + # if the available history is long enough to perform effective search. + # It turns out we do not have to do that. There is an easier way. + # + # First we check if the given epoch time equals the timestamp of either + # the earliest or the latest block. If it does, we just return the + # block number of that block. + # + # Otherwise, if the earliest available block is not the genesis block, + # we should check the timestamp of that earliest block and if it is greater + # than the epoch time, we should issue a warning and return + # that earliest block number. + # In all other cases, thus when the earliest block is not the genesis + # block but its timestamp is not greater than the requested epoch time, or + # if the earliest available block is the genesis block, + # (which means we have the whole history available), we should proceed with + # the binary search. + # + # Additional benefit of this method is that we do not have to rely + # on the average block time, which not only makes the whole thing + # more reliable, but also easier to test. + + # Are lucky today? + if earliestBlockTimestamp == epochTimeUInt256: + return earliestBlockNumber + if latestBlockTimestamp == epochTimeUInt256: + return latestBlockNumber + + if earliestBlockNumber > 0 and earliestBlockTimestamp > epochTimeUInt256: + let availableHistoryInDays = + (latestBlockTimestamp - earliestBlockTimestamp) div + 1.days.secs.u256 + warn "Short block history detected.", earliestBlockTimestamp = + earliestBlockTimestamp, days = availableHistoryInDays + return earliestBlockNumber + + return await provider.binarySearchBlockNumberForEpoch( + epochTimeUInt256, latestBlockNumber, earliestBlockNumber) + +proc pastBlockTag*(provider: Provider, + blocksAgo: int): + Future[BlockTag] {.async: (raises: [ProviderError]).} = + let head = await provider.getBlockNumber() + return BlockTag.init(head - blocksAgo.abs.u256) diff --git a/codex/market.nim b/codex/market.nim index cb86e0d7..38df9669 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -246,8 +246,27 @@ method subscribeProofSubmission*(market: Market, method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} = raiseAssert("not implemented") -method queryPastEvents*[T: MarketplaceEvent]( - market: Market, - _: type T, - blocksAgo: int): Future[seq[T]] {.base, async.} = +method queryPastSlotFilledEvents*( + market: Market, + fromBlock: BlockTag): Future[seq[SlotFilled]] {.base, async.} = + raiseAssert("not implemented") + +method queryPastSlotFilledEvents*( + market: Market, + blocksAgo: int): Future[seq[SlotFilled]] {.base, async.} = + raiseAssert("not implemented") + +method queryPastSlotFilledEvents*( + market: Market, + fromTime: SecondsSince1970): Future[seq[SlotFilled]] {.base, async.} = + raiseAssert("not implemented") + +method queryPastStorageRequestedEvents*( + market: Market, + fromBlock: BlockTag): Future[seq[StorageRequested]] {.base, async.} = + raiseAssert("not implemented") + +method queryPastStorageRequestedEvents*( + market: Market, + blocksAgo: int): Future[seq[StorageRequested]] {.base, async.} = raiseAssert("not implemented") diff --git a/codex/node.nim b/codex/node.nim index f180fd62..a43c9270 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -763,12 +763,12 @@ proc stop*(self: CodexNodeRef) {.async.} = if hostContracts =? self.contracts.host: await hostContracts.stop() - if not self.clock.isNil: - await self.clock.stop() - if validatorContracts =? self.contracts.validator: await validatorContracts.stop() + if not self.clock.isNil: + await self.clock.stop() + if not self.networkStore.isNil: await self.networkStore.close diff --git a/codex/sales.nim b/codex/sales.nim index 5882ec1f..6cfef4cf 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -16,8 +16,8 @@ import ./sales/statemachine import ./sales/slotqueue import ./sales/states/preparing import ./sales/states/unknown -import ./utils/then import ./utils/trackedfutures +import ./utils/exceptions ## Sales holds a list of available storage that it may sell. ## @@ -325,7 +325,7 @@ proc onSlotFreed(sales: Sales, trace "slot freed, adding to queue" - proc addSlotToQueue() {.async.} = + proc addSlotToQueue() {.async: (raises: []).} = let context = sales.context let market = context.market let queue = context.slotQueue @@ -336,25 +336,22 @@ proc onSlotFreed(sales: Sales, trace "no existing request metadata, getting request info from contract" # if there's no existing slot for that request, retrieve the request # from the contract. - without request =? await market.getRequest(requestId): - error "unknown request in contract" - return + try: + without request =? await market.getRequest(requestId): + error "unknown request in contract" + return - found = SlotQueueItem.init(request, slotIndex.truncate(uint16)) + found = SlotQueueItem.init(request, slotIndex.truncate(uint16)) + except CancelledError: + discard # do not propagate as addSlotToQueue was asyncSpawned + except CatchableError as e: + error "failed to get request from contract and add slots to queue", + error = e.msgDetail if err =? queue.push(found).errorOption: - raise err + error "failed to push slot items to queue", error = err.msgDetail - addSlotToQueue() - .track(sales) - .catch(proc(err: ref CatchableError) = - if err of SlotQueueItemExistsError: - error "Failed to push item to queue becaue it already exists" - elif err of QueueNotRunningError: - warn "Failed to push item to queue becaue queue is not running" - else: - warn "Error adding request to SlotQueue", error = err.msg - ) + asyncSpawn addSlotToQueue().track(sales) proc subscribeRequested(sales: Sales) {.async.} = let context = sales.context @@ -482,7 +479,7 @@ proc subscribeSlotReservationsFull(sales: Sales) {.async.} = except CatchableError as e: error "Unable to subscribe to slot filled events", msg = e.msg -proc startSlotQueue(sales: Sales) {.async.} = +proc startSlotQueue(sales: Sales) = let slotQueue = sales.context.slotQueue let reservations = sales.context.reservations @@ -491,7 +488,7 @@ proc startSlotQueue(sales: Sales) {.async.} = trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex sales.processSlot(item, done) - asyncSpawn slotQueue.start() + slotQueue.start() proc onAvailabilityAdded(availability: Availability) {.async.} = await sales.onAvailabilityAdded(availability) @@ -518,7 +515,7 @@ proc unsubscribe(sales: Sales) {.async.} = proc start*(sales: Sales) {.async.} = await sales.load() - await sales.startSlotQueue() + sales.startSlotQueue() await sales.subscribe() sales.running = true diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim index 198ef80f..f565d276 100644 --- a/codex/sales/slotqueue.nim +++ b/codex/sales/slotqueue.nim @@ -10,7 +10,6 @@ import ../rng import ../utils import ../contracts/requests import ../utils/asyncheapqueue -import ../utils/then import ../utils/trackedfutures logScope: @@ -333,7 +332,7 @@ proc addWorker(self: SlotQueue): ?!void = proc dispatch(self: SlotQueue, worker: SlotQueueWorker, - item: SlotQueueItem) {.async.} = + item: SlotQueueItem) {.async: (raises: []).} = logScope: requestId = item.requestId slotIndex = item.slotIndex @@ -380,22 +379,7 @@ proc clearSeenFlags*(self: SlotQueue) = trace "all 'seen' flags cleared" -proc start*(self: SlotQueue) {.async.} = - if self.running: - return - - trace "starting slot queue" - - self.running = true - - # must be called in `start` to avoid sideeffects in `new` - self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers) - - # Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its - # task, a new worker will be pushed to the queue - for i in 0.. 0: let subscription = validation.subscriptions.pop() await subscription.unsubscribe() diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index 07eeb856..358a5206 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -8,11 +8,18 @@ import pkg/codex/market import pkg/codex/contracts/requests import pkg/codex/contracts/proofs import pkg/codex/contracts/config + +from pkg/ethers import BlockTag +import codex/clock + import ../examples export market export tables +logScope: + topics = "mockMarket" + type MockMarket* = ref object of Market periodicity: Periodicity @@ -40,6 +47,7 @@ type config*: MarketplaceConfig canReserveSlot*: bool reserveSlotThrowError*: ?(ref MarketError) + clock: ?Clock Fulfillment* = object requestId*: RequestId proof*: Groth16Proof @@ -49,6 +57,7 @@ type host*: Address slotIndex*: UInt256 proof*: Groth16Proof + timestamp: ?SecondsSince1970 Subscriptions = object onRequest: seq[RequestSubscription] onFulfillment: seq[FulfillmentSubscription] @@ -94,7 +103,7 @@ proc hash*(address: Address): Hash = proc hash*(requestId: RequestId): Hash = hash(requestId.toArray) -proc new*(_: type MockMarket): MockMarket = +proc new*(_: type MockMarket, clock: ?Clock = Clock.none): MockMarket = ## Create a new mocked Market instance ## let config = MarketplaceConfig( @@ -111,7 +120,8 @@ proc new*(_: type MockMarket): MockMarket = downtimeProduct: 67.uint8 ) ) - MockMarket(signer: Address.example, config: config, canReserveSlot: true) + MockMarket(signer: Address.example, config: config, + canReserveSlot: true, clock: clock) method getSigner*(market: MockMarket): Future[Address] {.async.} = return market.signer @@ -145,7 +155,7 @@ method myRequests*(market: MockMarket): Future[seq[RequestId]] {.async.} = method mySlots*(market: MockMarket): Future[seq[SlotId]] {.async.} = return market.activeSlots[market.signer] -method getRequest(market: MockMarket, +method getRequest*(market: MockMarket, id: RequestId): Future[?StorageRequest] {.async.} = for request in market.requested: if request.id == id: @@ -248,7 +258,8 @@ proc fillSlot*(market: MockMarket, requestId: requestId, slotIndex: slotIndex, proof: proof, - host: host + host: host, + timestamp: market.clock.?now ) market.filled.add(slot) market.slotState[slotId(slot.requestId, slot.slotIndex)] = SlotState.Filled @@ -472,21 +483,51 @@ method subscribeProofSubmission*(mock: MockMarket, mock.subscriptions.onProofSubmitted.add(subscription) return subscription -method queryPastEvents*[T: MarketplaceEvent]( - market: MockMarket, - _: type T, - blocksAgo: int): Future[seq[T]] {.async.} = +method queryPastStorageRequestedEvents*( + market: MockMarket, + fromBlock: BlockTag): Future[seq[StorageRequested]] {.async.} = + return market.requested.map(request => + StorageRequested(requestId: request.id, + ask: request.ask, + expiry: request.expiry) + ) - if T of StorageRequested: - return market.requested.map(request => - StorageRequested(requestId: request.id, - ask: request.ask, - expiry: request.expiry) - ) - elif T of SlotFilled: - return market.filled.map(slot => - SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex) - ) +method queryPastStorageRequestedEvents*( + market: MockMarket, + blocksAgo: int): Future[seq[StorageRequested]] {.async.} = + return market.requested.map(request => + StorageRequested(requestId: request.id, + ask: request.ask, + expiry: request.expiry) + ) + +method queryPastSlotFilledEvents*( + market: MockMarket, + fromBlock: BlockTag): Future[seq[SlotFilled]] {.async.} = + return market.filled.map(slot => + SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex) + ) + +method queryPastSlotFilledEvents*( + market: MockMarket, + blocksAgo: int): Future[seq[SlotFilled]] {.async.} = + return market.filled.map(slot => + SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex) + ) + +method queryPastSlotFilledEvents*( + market: MockMarket, + fromTime: SecondsSince1970): Future[seq[SlotFilled]] {.async.} = + let filtered = market.filled.filter( + proc (slot: MockSlot): bool = + if timestamp =? slot.timestamp: + return timestamp >= fromTime + else: + true + ) + return filtered.map(slot => + SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex) + ) method unsubscribe*(subscription: RequestSubscription) {.async.} = subscription.market.subscriptions.onRequest.keepItIf(it != subscription) diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 518853ae..bf303744 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -566,6 +566,7 @@ asyncchecksuite "Sales": request.ask.slots = 2 market.requested = @[request] market.requestState[request.id] = RequestState.New + market.requestEnds[request.id] = request.expiry.toSecondsSince1970 proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} = let address = await market.getSigner() diff --git a/tests/codex/sales/testslotqueue.nim b/tests/codex/sales/testslotqueue.nim index 193751c8..e66a37d2 100644 --- a/tests/codex/sales/testslotqueue.nim +++ b/tests/codex/sales/testslotqueue.nim @@ -27,8 +27,8 @@ suite "Slot queue start/stop": check not queue.running test "can call start multiple times, and when already running": - asyncSpawn queue.start() - asyncSpawn queue.start() + queue.start() + queue.start() check queue.running test "can call stop when alrady stopped": @@ -36,12 +36,12 @@ suite "Slot queue start/stop": check not queue.running test "can call stop when running": - asyncSpawn queue.start() + queue.start() await queue.stop() check not queue.running test "can call stop multiple times": - asyncSpawn queue.start() + queue.start() await queue.stop() await queue.stop() check not queue.running @@ -62,8 +62,6 @@ suite "Slot queue workers": queue = SlotQueue.new(maxSize = 5, maxWorkers = 3) queue.onProcessSlot = onProcessSlot - proc startQueue = asyncSpawn queue.start() - teardown: await queue.stop() @@ -79,7 +77,7 @@ suite "Slot queue workers": discard SlotQueue.new(maxSize = 1, maxWorkers = 2) test "does not surpass max workers": - startQueue() + queue.start() let item1 = SlotQueueItem.example let item2 = SlotQueueItem.example let item3 = SlotQueueItem.example @@ -97,7 +95,7 @@ suite "Slot queue workers": queue.onProcessSlot = processSlot - startQueue() + queue.start() let item1 = SlotQueueItem.example let item2 = SlotQueueItem.example let item3 = SlotQueueItem.example @@ -122,7 +120,7 @@ suite "Slot queue": onProcessSlotCalled = true onProcessSlotCalledWith.add (item.requestId, item.slotIndex) done.complete() - asyncSpawn queue.start() + queue.start() setup: onProcessSlotCalled = false diff --git a/tests/codex/testvalidation.nim b/tests/codex/testvalidation.nim index 7988269b..2cfe2f06 100644 --- a/tests/codex/testvalidation.nim +++ b/tests/codex/testvalidation.nim @@ -1,9 +1,10 @@ import pkg/chronos import std/strformat -import std/random +import std/times import codex/validation import codex/periods +import codex/clock import ../asynctest import ./helpers/mockmarket @@ -11,6 +12,9 @@ import ./helpers/mockclock import ./examples import ./helpers +logScope: + topics = "testValidation" + asyncchecksuite "validation": let period = 10 let timeout = 5 @@ -20,10 +24,10 @@ asyncchecksuite "validation": let proof = Groth16Proof.example let collateral = slot.request.ask.collateral - var validation: Validation var market: MockMarket var clock: MockClock var groupIndex: uint16 + var validation: Validation proc initValidationConfig(maxSlots: MaxSlots, validationGroups: ?ValidationGroups, @@ -32,19 +36,27 @@ asyncchecksuite "validation": maxSlots, groups=validationGroups, groupIndex), error: raiseAssert fmt"Creating ValidationConfig failed! Error msg: {error.msg}" validationConfig + + proc newValidation(clock: Clock, + market: Market, + maxSlots: MaxSlots, + validationGroups: ?ValidationGroups, + groupIndex: uint16 = 0): Validation = + let validationConfig = initValidationConfig( + maxSlots, validationGroups, groupIndex) + Validation.new(clock, market, validationConfig) setup: groupIndex = groupIndexForSlotId(slot.id, !validationGroups) - market = MockMarket.new() clock = MockClock.new() - let validationConfig = initValidationConfig( - maxSlots, validationGroups, groupIndex) - validation = Validation.new(clock, market, validationConfig) + market = MockMarket.new(clock = Clock(clock).some) market.config.proofs.period = period.u256 market.config.proofs.timeout = timeout.u256 - await validation.start() + validation = newValidation( + clock, market, maxSlots, validationGroups, groupIndex) teardown: + # calling stop on validation that did not start is harmless await validation.stop() proc advanceToNextPeriod = @@ -79,6 +91,7 @@ asyncchecksuite "validation": test "initializing ValidationConfig fails when maxSlots is negative " & "(validationGroups set)": let maxSlots = -1 + let groupIndex = 0'u16 let validationConfig = ValidationConfig.init( maxSlots = maxSlots, groups = validationGroups, groupIndex) check validationConfig.isFailure == true @@ -86,45 +99,41 @@ asyncchecksuite "validation": fmt"be greater than or equal to 0! (got: {maxSlots})" test "slot is not observed if it is not in the validation group": - let validationConfig = initValidationConfig(maxSlots, validationGroups, - (groupIndex + 1) mod uint16(!validationGroups)) - let validation = Validation.new(clock, market, validationConfig) + validation = newValidation(clock, market, maxSlots, validationGroups, + (groupIndex + 1) mod uint16(!validationGroups)) await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) - await validation.stop() check validation.slots.len == 0 test "when a slot is filled on chain, it is added to the list": + await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) check validation.slots == @[slot.id] test "slot should be observed if maxSlots is set to 0": - let validationConfig = initValidationConfig( - maxSlots = 0, ValidationGroups.none) - let validation = Validation.new(clock, market, validationConfig) + validation = newValidation(clock, market, maxSlots = 0, ValidationGroups.none) await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) - await validation.stop() check validation.slots == @[slot.id] test "slot should be observed if validation group is not set (and " & "maxSlots is not 0)": - let validationConfig = initValidationConfig( - maxSlots, ValidationGroups.none) - let validation = Validation.new(clock, market, validationConfig) + validation = newValidation(clock, market, maxSlots, ValidationGroups.none) await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) - await validation.stop() check validation.slots == @[slot.id] for state in [SlotState.Finished, SlotState.Failed]: test fmt"when slot state changes to {state}, it is removed from the list": + validation = newValidation(clock, market, maxSlots, validationGroups) + await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) market.slotState[slot.id] = state advanceToNextPeriod() check eventually validation.slots.len == 0 test "when a proof is missed, it is marked as missing": + await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) market.setCanProofBeMarkedAsMissing(slot.id, true) advanceToNextPeriod() @@ -132,6 +141,7 @@ asyncchecksuite "validation": check market.markedAsMissingProofs.contains(slot.id) test "when a proof can not be marked as missing, it will not be marked": + await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) market.setCanProofBeMarkedAsMissing(slot.id, false) advanceToNextPeriod() @@ -139,13 +149,73 @@ asyncchecksuite "validation": check market.markedAsMissingProofs.len == 0 test "it does not monitor more than the maximum number of slots": - let validationGroups = ValidationGroups.none - let validationConfig = initValidationConfig( - maxSlots, validationGroups) - let validation = Validation.new(clock, market, validationConfig) + validation = newValidation(clock, market, maxSlots, ValidationGroups.none) await validation.start() for _ in 0.. 291 + # 1728436104 => 291 + # 1728436105 => 292 + # 1728436106 => 292 + # 1728436110 => 292 + proc generateExpectations( + blocks: seq[(UInt256, UInt256)]): seq[Expectations] = + var expectations: seq[Expectations] = @[] + for i in 0.. //_.log + .withLogTopics("purchases", "onchain") + .some, + + providers: + CodexConfigs.init(nodes=1) + .withSimulateProofFailures(idx=0, failEveryNProofs=1) + # .debug() # uncomment to enable console log output + # .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + # .withLogTopics("sales", "onchain") + .some, + + validators: + CodexConfigs.init(nodes=2) + .withValidationGroups(groups = 2) + .withValidationGroupIndex(idx = 0, groupIndex = 0) + .withValidationGroupIndex(idx = 1, groupIndex = 1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("validator") # each topic as a separate string argument + .some + ): + let client0 = clients()[0].client + let expiry = 5.periods + let duration = expiry + 10.periods + + # let mine a block to sync the blocktime with the current clock + discard await ethProvider.send("evm_mine") + + var currentTime = await ethProvider.currentTime() + let requestEndTime = currentTime.truncate(uint64) + duration + + let data = await RandomChunker.example(blocks=8) + + # TODO: better value for data.len below. This TODO is also present in + # testproofs.nim - we may want to address it or remove the comment. + createAvailabilities(data.len * 2, duration) + + let cid = client0.upload(data).get + let purchaseId = await client0.requestStorage( + cid, + expiry=expiry, + duration=duration, + nodes=nodes, + tolerance=tolerance, + proofProbability=proofProbability + ) + let requestId = client0.requestId(purchaseId).get + + debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId + + if not eventuallyS(client0.purchaseStateIs(purchaseId, "started"), + timeout = (expiry + 60).int, step = 5): + debug "validation suite: timed out waiting for the purchase to start" + fail() + return + + discard await ethProvider.send("evm_mine") + currentTime = await ethProvider.currentTime() + let secondsTillRequestEnd = (requestEndTime - currentTime.truncate(uint64)).int + + debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds + + check await marketplace.waitForRequestToFail( + requestId, + timeout = secondsTillRequestEnd + 60, + step = 5 + ) + + test "validator uses historical state to mark missing proofs", NodeConfigs( + # Uncomment to start Hardhat automatically, typically so logs can be inspected locally + hardhat: + HardhatConfig.none, + + clients: + CodexConfigs.init(nodes=1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("purchases", "onchain") + .some, + + providers: + CodexConfigs.init(nodes=1) + .withSimulateProofFailures(idx=0, failEveryNProofs=1) + # .debug() # uncomment to enable console log output + # .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + # .withLogTopics("sales", "onchain") + .some + ): + let client0 = clients()[0].client + let expiry = 5.periods + let duration = expiry + 10.periods + + # let mine a block to sync the blocktime with the current clock + discard await ethProvider.send("evm_mine") + + var currentTime = await ethProvider.currentTime() + let requestEndTime = currentTime.truncate(uint64) + duration + + let data = await RandomChunker.example(blocks=8) + + # TODO: better value for data.len below. This TODO is also present in + # testproofs.nim - we may want to address it or remove the comment. + createAvailabilities(data.len * 2, duration) + + let cid = client0.upload(data).get + let purchaseId = await client0.requestStorage( + cid, + expiry=expiry, + duration=duration, + nodes=nodes, + tolerance=tolerance, + proofProbability=proofProbability + ) + let requestId = client0.requestId(purchaseId).get + + debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId + + if not eventuallyS(client0.purchaseStateIs(purchaseId, "started"), + timeout = (expiry + 60).int, step = 5): + debug "validation suite: timed out waiting for the purchase to start" + fail() + return + + # extra block just to make sure we have one that separates us + # from the block containing the last (past) SlotFilled event + discard await ethProvider.send("evm_mine") + + var validators = CodexConfigs.init(nodes=2) + .withValidationGroups(groups = 2) + .withValidationGroupIndex(idx = 0, groupIndex = 0) + .withValidationGroupIndex(idx = 1, groupIndex = 1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to: # tests/integration/logs/ //_.log + .withLogTopics("validator") # each topic as a separate string argument + + failAndTeardownOnError "failed to start validator nodes": + for config in validators.configs.mitems: + let node = await startValidatorNode(config) + running.add RunningNode( + role: Role.Validator, + node: node + ) + + discard await ethProvider.send("evm_mine") + currentTime = await ethProvider.currentTime() + let secondsTillRequestEnd = (requestEndTime - currentTime.truncate(uint64)).int + + debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds + + check await marketplace.waitForRequestToFail( + requestId, + timeout = secondsTillRequestEnd + 60, + step = 5 + ) diff --git a/tests/testContracts.nim b/tests/testContracts.nim index 4283c10a..aff2c1d7 100644 --- a/tests/testContracts.nim +++ b/tests/testContracts.nim @@ -2,5 +2,6 @@ import ./contracts/testContracts import ./contracts/testMarket import ./contracts/testDeployment import ./contracts/testClock +import ./contracts/testProvider {.warning[UnusedImport]:off.} diff --git a/tests/testIntegration.nim b/tests/testIntegration.nim index b1f81ef4..f0a59ee4 100644 --- a/tests/testIntegration.nim +++ b/tests/testIntegration.nim @@ -6,6 +6,7 @@ import ./integration/testpurchasing import ./integration/testblockexpiration import ./integration/testmarketplace import ./integration/testproofs +import ./integration/testvalidator import ./integration/testecbug {.warning[UnusedImport]:off.}