From 1f49f86131fbc9721fa1d136705a1aec4601572e Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Fri, 13 Dec 2024 13:42:05 +0700 Subject: [PATCH 1/7] fix(slotqueue): asyncSpawns futures correctly (#1034) - asyncSpawns `run` and worker `dispatch` in slotqueue. - removes usage of `then` from slotqueue. --- codex/sales.nim | 2 +- codex/sales/slotqueue.nim | 51 ++++++++++++++--------------- tests/codex/sales/testslotqueue.nim | 16 ++++----- 3 files changed, 33 insertions(+), 36 deletions(-) diff --git a/codex/sales.nim b/codex/sales.nim index 5882ec1f..f2cc366c 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -491,7 +491,7 @@ proc startSlotQueue(sales: Sales) {.async.} = trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex sales.processSlot(item, done) - asyncSpawn slotQueue.start() + slotQueue.start() proc onAvailabilityAdded(availability: Availability) {.async.} = await sales.onAvailabilityAdded(availability) diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim index 198ef80f..f565d276 100644 --- a/codex/sales/slotqueue.nim +++ b/codex/sales/slotqueue.nim @@ -10,7 +10,6 @@ import ../rng import ../utils import ../contracts/requests import ../utils/asyncheapqueue -import ../utils/then import ../utils/trackedfutures logScope: @@ -333,7 +332,7 @@ proc addWorker(self: SlotQueue): ?!void = proc dispatch(self: SlotQueue, worker: SlotQueueWorker, - item: SlotQueueItem) {.async.} = + item: SlotQueueItem) {.async: (raises: []).} = logScope: requestId = item.requestId slotIndex = item.slotIndex @@ -380,22 +379,7 @@ proc clearSeenFlags*(self: SlotQueue) = trace "all 'seen' flags cleared" -proc start*(self: SlotQueue) {.async.} = - if self.running: - return - - trace "starting slot queue" - - self.running = true - - # must be called in `start` to avoid sideeffects in `new` - self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers) - - # Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its - # task, a new worker will be pushed to the queue - for i in 0.. Date: Sat, 14 Dec 2024 06:07:55 +0100 Subject: [PATCH 2/7] Validator historical state restoration (#922) * adds a new overload of queryPastEvents allowing to query past events based on timestamp in the past * adds state restoration to validator * refactors a bit to get the tests back to work * replaces deprecated generic methods from Market with methods for specific event types * Refactors binary search * adds market tests for querying past SlotFilled events and binary search * Takes into account that <> block available is not necessarily the genesis block * Adds more logging and makes testing earliest block boundary more reliable * adds validation tests for historical state restoration * adds mockprovider to simplify and improve testing of the edge conditions * adds slot reservation to the new tests after rebasing * adds validation groups and group index in logs of validator * adds integration test with two validators * adds comment on how to enable logging in integration test executable itself * testIntegration: makes list is running nodes injected and available in the body of the test * validation: adds integration test for historical state * adds more logging to validator * integration test: validator only looks 30 days back for historical state * adds logging of the slotState when removing slots during validation * review and refactor validator integration tests * adds validation to the set of integration tests * Fixes mistyped name of the mock provider module in testMarket * Fixes a typo in the name of the validation suite in integration tests * Makes validation unit test a bit easier to follow * better use of logScopes to reduce duplication * improves timing and clarifies the test conditions * uses http as default RPC provider for nodes running in integration tests as a workaround for dropped subscriptions * simplifies the validation integration tests by waiting for failed request instead of tracking slots * adds config option allowing selectively to set different provider url * Brings back the default settings for RPC provider in integration tests * use http RPC provider for clients in validation integration tests * fine-tune the tests * Makes validator integration test more robust - adds extra tracking * brings tracking of marketplace event back to validator integration test * refactors integration tests * deletes tmp file * adds <> after forcing integration test to fail preliminarily * re-enables all integration tests and matrix * stops debug output in CI * allows to choose a different RPC provider for a given integration test suite * fixes signature of <> method in mockProvider * adds missing import which seem to be braking integration tests on windows * makes sure that clients, SPs, and validators use the same provider url * makes validator integration tests using http at 127.0.0.1:8545 * testvalidator: stop resubscribing as we are now using http polling as rpc provider * applying review comments * groups queryPastStorage overrides together (review comment) * groups the historical validation tests into a sub suite * removes the temporary extensions in marketplacesuite and multinodesuite allowing to specify provider url * simplifies validation integration tests * Removes debug logs when waiting for request to fail * Renaming waitForRequestFailed => waitForRequestToFail * renames blockNumberForBlocksAgo to pastBlockTag and makes it private * removes redundant debugging logs * refines logging in validation * removes dev logging from mockmarket * improves exception handling in provider helper procs and prepares for extraction to a separate module * Uses chronos instead of std/times for Duration * extracts provider and binary search helpers to a separate module * removes redundant log entry params from validator * unifies the notation to consistently use method call syntax * reuses ProviderError from nim-ethers in the provider extension * clarifies the comment in multinodesuite * uses == operator to check the predefined tags and raises exception when `BlockTag.pending` is requested. * when waiting for request to fail, we break on any request state that is not Started * removes tests that were moved to testProvider from testMarket * extracts tests that use MockProvider to a separate async suite * improves performance of the historical state restoration * removing redundant log messages in validator (groupIndex and groups) * adds testProvider to testContracts group * removes unused import in testMarket --- .github/workflows/ci.yml | 35 ++-- build.nims | 3 + codex/contracts.nim | 2 + codex/contracts/market.nim | 54 ++++-- codex/contracts/provider.nim | 126 +++++++++++++ codex/market.nim | 27 ++- codex/validation.nim | 42 ++++- tests/codex/helpers/mockmarket.nim | 75 ++++++-- tests/codex/testvalidation.nim | 118 +++++++++--- tests/contracts/helpers/mockprovider.nim | 85 +++++++++ tests/contracts/testMarket.nim | 64 ++++++- tests/contracts/testProvider.nim | 163 +++++++++++++++++ tests/integration/codexconfig.nim | 45 +++++ tests/integration/marketplacesuite.nim | 3 +- tests/integration/multinodes.nim | 32 +++- tests/integration/nodeprocess.nim | 3 +- tests/integration/testmarketplace.nim | 2 - tests/integration/testvalidator.nim | 220 +++++++++++++++++++++++ tests/testContracts.nim | 1 + tests/testIntegration.nim | 1 + 20 files changed, 1004 insertions(+), 97 deletions(-) create mode 100644 codex/contracts/provider.nim create mode 100644 tests/contracts/helpers/mockprovider.nim create mode 100644 tests/contracts/testProvider.nim create mode 100644 tests/integration/testvalidator.nim diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dbe18e64..0d27077a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,7 +11,6 @@ env: cache_nonce: 0 # Allows for easily busting actions/cache caches nim_version: pinned - concurrency: group: ${{ github.workflow }}-${{ github.ref || github.run_id }} cancel-in-progress: true @@ -23,23 +22,23 @@ jobs: matrix: ${{ steps.matrix.outputs.matrix }} cache_nonce: ${{ env.cache_nonce }} steps: - - name: Compute matrix - id: matrix - uses: fabiocaccamo/create-matrix-action@v4 - with: - matrix: | - os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {macos}, cpu {amd64}, builder {macos-13}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {macos}, cpu {amd64}, builder {macos-13}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {macos}, cpu {amd64}, builder {macos-13}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {macos}, cpu {amd64}, builder {macos-13}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {windows}, cpu {amd64}, builder {windows-latest}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {msys2} - os {windows}, cpu {amd64}, builder {windows-latest}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {msys2} - os {windows}, cpu {amd64}, builder {windows-latest}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {msys2} - os {windows}, cpu {amd64}, builder {windows-latest}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {msys2} + - name: Compute matrix + id: matrix + uses: fabiocaccamo/create-matrix-action@v4 + with: + matrix: | + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {macos}, cpu {amd64}, builder {macos-13}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {macos}, cpu {amd64}, builder {macos-13}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {macos}, cpu {amd64}, builder {macos-13}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {macos}, cpu {amd64}, builder {macos-13}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {windows}, cpu {amd64}, builder {windows-latest}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {msys2} + os {windows}, cpu {amd64}, builder {windows-latest}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {msys2} + os {windows}, cpu {amd64}, builder {windows-latest}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {msys2} + os {windows}, cpu {amd64}, builder {windows-latest}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {msys2} build: needs: matrix diff --git a/build.nims b/build.nims index 3d1a3cac..a9a0e553 100644 --- a/build.nims +++ b/build.nims @@ -41,6 +41,9 @@ task testContracts, "Build & run Codex Contract tests": task testIntegration, "Run integration tests": buildBinary "codex", params = "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE -d:codex_enable_proof_failures=true" test "testIntegration" + # use params to enable logging from the integration test executable + # test "testIntegration", params = "-d:chronicles_sinks=textlines[notimestamps,stdout],textlines[dynamic] " & + # "-d:chronicles_enabled_topics:integration:TRACE" task build, "build codex binary": codexTask() diff --git a/codex/contracts.nim b/codex/contracts.nim index ecf298f4..512571a8 100644 --- a/codex/contracts.nim +++ b/codex/contracts.nim @@ -2,8 +2,10 @@ import contracts/requests import contracts/marketplace import contracts/market import contracts/interactions +import contracts/provider export requests export marketplace export market export interactions +export provider diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index 049e38bb..a06a9486 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -1,6 +1,4 @@ -import std/sequtils import std/strutils -import std/sugar import pkg/ethers import pkg/upraises import pkg/questionable @@ -9,6 +7,7 @@ import ../logutils import ../market import ./marketplace import ./proofs +import ./provider export market @@ -467,18 +466,49 @@ method subscribeProofSubmission*(market: OnChainMarket, method unsubscribe*(subscription: OnChainMarketSubscription) {.async.} = await subscription.eventSubscription.unsubscribe() -method queryPastEvents*[T: MarketplaceEvent]( +method queryPastSlotFilledEvents*( market: OnChainMarket, - _: type T, - blocksAgo: int): Future[seq[T]] {.async.} = + fromBlock: BlockTag): Future[seq[SlotFilled]] {.async.} = convertEthersError: - let contract = market.contract - let provider = contract.provider + return await market.contract.queryFilter(SlotFilled, + fromBlock, + BlockTag.latest) - let head = await provider.getBlockNumber() - let fromBlock = BlockTag.init(head - blocksAgo.abs.u256) +method queryPastSlotFilledEvents*( + market: OnChainMarket, + blocksAgo: int): Future[seq[SlotFilled]] {.async.} = - return await contract.queryFilter(T, - fromBlock, - BlockTag.latest) + convertEthersError: + let fromBlock = + await market.contract.provider.pastBlockTag(blocksAgo) + + return await market.queryPastSlotFilledEvents(fromBlock) + +method queryPastSlotFilledEvents*( + market: OnChainMarket, + fromTime: SecondsSince1970): Future[seq[SlotFilled]] {.async.} = + + convertEthersError: + let fromBlock = + await market.contract.provider.blockNumberForEpoch(fromTime) + return await market.queryPastSlotFilledEvents(BlockTag.init(fromBlock)) + +method queryPastStorageRequestedEvents*( + market: OnChainMarket, + fromBlock: BlockTag): Future[seq[StorageRequested]] {.async.} = + + convertEthersError: + return await market.contract.queryFilter(StorageRequested, + fromBlock, + BlockTag.latest) + +method queryPastStorageRequestedEvents*( + market: OnChainMarket, + blocksAgo: int): Future[seq[StorageRequested]] {.async.} = + + convertEthersError: + let fromBlock = + await market.contract.provider.pastBlockTag(blocksAgo) + + return await market.queryPastStorageRequestedEvents(fromBlock) diff --git a/codex/contracts/provider.nim b/codex/contracts/provider.nim new file mode 100644 index 00000000..62098fb5 --- /dev/null +++ b/codex/contracts/provider.nim @@ -0,0 +1,126 @@ +import pkg/ethers/provider +import pkg/chronos +import pkg/questionable + +import ../logutils + +from ../clock import SecondsSince1970 + +logScope: + topics = "marketplace onchain provider" + +proc raiseProviderError(message: string) {.raises: [ProviderError].} = + raise newException(ProviderError, message) + +proc blockNumberAndTimestamp*(provider: Provider, blockTag: BlockTag): + Future[(UInt256, UInt256)] {.async: (raises: [ProviderError]).} = + without latestBlock =? await provider.getBlock(blockTag): + raiseProviderError("Could not get latest block") + + without latestBlockNumber =? latestBlock.number: + raiseProviderError("Could not get latest block number") + + return (latestBlockNumber, latestBlock.timestamp) + +proc binarySearchFindClosestBlock( + provider: Provider, + epochTime: int, + low: UInt256, + high: UInt256): Future[UInt256] {.async: (raises: [ProviderError]).} = + let (_, lowTimestamp) = + await provider.blockNumberAndTimestamp(BlockTag.init(low)) + let (_, highTimestamp) = + await provider.blockNumberAndTimestamp(BlockTag.init(high)) + if abs(lowTimestamp.truncate(int) - epochTime) < + abs(highTimestamp.truncate(int) - epochTime): + return low + else: + return high + +proc binarySearchBlockNumberForEpoch( + provider: Provider, + epochTime: UInt256, + latestBlockNumber: UInt256, + earliestBlockNumber: UInt256): Future[UInt256] + {.async: (raises: [ProviderError]).} = + var low = earliestBlockNumber + var high = latestBlockNumber + + while low <= high: + if low == 0 and high == 0: + return low + let mid = (low + high) div 2 + let (midBlockNumber, midBlockTimestamp) = + await provider.blockNumberAndTimestamp(BlockTag.init(mid)) + + if midBlockTimestamp < epochTime: + low = mid + 1 + elif midBlockTimestamp > epochTime: + high = mid - 1 + else: + return midBlockNumber + # NOTICE that by how the binary search is implemented, when it finishes + # low is always greater than high - this is why we use high, where + # intuitively we would use low: + await provider.binarySearchFindClosestBlock( + epochTime.truncate(int), low=high, high=low) + +proc blockNumberForEpoch*( + provider: Provider, + epochTime: SecondsSince1970): Future[UInt256] + {.async: (raises: [ProviderError]).} = + let epochTimeUInt256 = epochTime.u256 + let (latestBlockNumber, latestBlockTimestamp) = + await provider.blockNumberAndTimestamp(BlockTag.latest) + let (earliestBlockNumber, earliestBlockTimestamp) = + await provider.blockNumberAndTimestamp(BlockTag.earliest) + + # Initially we used the average block time to predict + # the number of blocks we need to look back in order to find + # the block number corresponding to the given epoch time. + # This estimation can be highly inaccurate if block time + # was changing in the past or is fluctuating and therefore + # we used that information initially only to find out + # if the available history is long enough to perform effective search. + # It turns out we do not have to do that. There is an easier way. + # + # First we check if the given epoch time equals the timestamp of either + # the earliest or the latest block. If it does, we just return the + # block number of that block. + # + # Otherwise, if the earliest available block is not the genesis block, + # we should check the timestamp of that earliest block and if it is greater + # than the epoch time, we should issue a warning and return + # that earliest block number. + # In all other cases, thus when the earliest block is not the genesis + # block but its timestamp is not greater than the requested epoch time, or + # if the earliest available block is the genesis block, + # (which means we have the whole history available), we should proceed with + # the binary search. + # + # Additional benefit of this method is that we do not have to rely + # on the average block time, which not only makes the whole thing + # more reliable, but also easier to test. + + # Are lucky today? + if earliestBlockTimestamp == epochTimeUInt256: + return earliestBlockNumber + if latestBlockTimestamp == epochTimeUInt256: + return latestBlockNumber + + if earliestBlockNumber > 0 and earliestBlockTimestamp > epochTimeUInt256: + let availableHistoryInDays = + (latestBlockTimestamp - earliestBlockTimestamp) div + 1.days.secs.u256 + warn "Short block history detected.", earliestBlockTimestamp = + earliestBlockTimestamp, days = availableHistoryInDays + return earliestBlockNumber + + return await provider.binarySearchBlockNumberForEpoch( + epochTimeUInt256, latestBlockNumber, earliestBlockNumber) + +proc pastBlockTag*(provider: Provider, + blocksAgo: int): + Future[BlockTag] {.async: (raises: [ProviderError]).} = + let head = await provider.getBlockNumber() + return BlockTag.init(head - blocksAgo.abs.u256) diff --git a/codex/market.nim b/codex/market.nim index cb86e0d7..38df9669 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -246,8 +246,27 @@ method subscribeProofSubmission*(market: Market, method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} = raiseAssert("not implemented") -method queryPastEvents*[T: MarketplaceEvent]( - market: Market, - _: type T, - blocksAgo: int): Future[seq[T]] {.base, async.} = +method queryPastSlotFilledEvents*( + market: Market, + fromBlock: BlockTag): Future[seq[SlotFilled]] {.base, async.} = + raiseAssert("not implemented") + +method queryPastSlotFilledEvents*( + market: Market, + blocksAgo: int): Future[seq[SlotFilled]] {.base, async.} = + raiseAssert("not implemented") + +method queryPastSlotFilledEvents*( + market: Market, + fromTime: SecondsSince1970): Future[seq[SlotFilled]] {.base, async.} = + raiseAssert("not implemented") + +method queryPastStorageRequestedEvents*( + market: Market, + fromBlock: BlockTag): Future[seq[StorageRequested]] {.base, async.} = + raiseAssert("not implemented") + +method queryPastStorageRequestedEvents*( + market: Market, + blocksAgo: int): Future[seq[StorageRequested]] {.base, async.} = raiseAssert("not implemented") diff --git a/codex/validation.nim b/codex/validation.nim index d00f5772..cc663c5f 100644 --- a/codex/validation.nim +++ b/codex/validation.nim @@ -23,6 +23,9 @@ type proofTimeout: UInt256 config: ValidationConfig +const + MaxStorageRequestDuration = 30.days + logScope: topics = "codex validator" @@ -56,15 +59,15 @@ func maxSlotsConstraintRespected(validation: Validation): bool = validation.slots.len < validation.config.maxSlots func shouldValidateSlot(validation: Validation, slotId: SlotId): bool = - if (validationGroups =? validation.config.groups): - (groupIndexForSlotId(slotId, validationGroups) == - validation.config.groupIndex) and - validation.maxSlotsConstraintRespected - else: - validation.maxSlotsConstraintRespected + without validationGroups =? validation.config.groups: + return true + groupIndexForSlotId(slotId, validationGroups) == + validation.config.groupIndex proc subscribeSlotFilled(validation: Validation) {.async.} = proc onSlotFilled(requestId: RequestId, slotIndex: UInt256) = + if not validation.maxSlotsConstraintRespected: + return let slotId = slotId(requestId, slotIndex) if validation.shouldValidateSlot(slotId): trace "Adding slot", slotId @@ -78,7 +81,7 @@ proc removeSlotsThatHaveEnded(validation: Validation) {.async.} = for slotId in slots: let state = await validation.market.slotState(slotId) if state != SlotState.Filled: - trace "Removing slot", slotId + trace "Removing slot", slotId, slotState = state ended.incl(slotId) validation.slots.excl(ended) @@ -119,14 +122,37 @@ proc run(validation: Validation) {.async.} = except CatchableError as e: error "Validation failed", msg = e.msg +proc epochForDurationBackFromNow(validation: Validation, + duration: Duration): SecondsSince1970 = + return validation.clock.now - duration.secs + +proc restoreHistoricalState(validation: Validation) {.async.} = + trace "Restoring historical state..." + let startTimeEpoch = validation.epochForDurationBackFromNow(MaxStorageRequestDuration) + let slotFilledEvents = await validation.market.queryPastSlotFilledEvents( + fromTime = startTimeEpoch) + for event in slotFilledEvents: + if not validation.maxSlotsConstraintRespected: + break + let slotId = slotId(event.requestId, event.slotIndex) + let slotState = await validation.market.slotState(slotId) + if slotState == SlotState.Filled and validation.shouldValidateSlot(slotId): + trace "Adding slot [historical]", slotId + validation.slots.incl(slotId) + trace "Historical state restored", numberOfSlots = validation.slots.len + proc start*(validation: Validation) {.async.} = + trace "Starting validator", groups = validation.config.groups, + groupIndex = validation.config.groupIndex validation.periodicity = await validation.market.periodicity() validation.proofTimeout = await validation.market.proofTimeout() await validation.subscribeSlotFilled() + await validation.restoreHistoricalState() validation.running = validation.run() proc stop*(validation: Validation) {.async.} = - await validation.running.cancelAndWait() + if not isNil(validation.running): + await validation.running.cancelAndWait() while validation.subscriptions.len > 0: let subscription = validation.subscriptions.pop() await subscription.unsubscribe() diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index 07eeb856..7a5c94b8 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -8,11 +8,18 @@ import pkg/codex/market import pkg/codex/contracts/requests import pkg/codex/contracts/proofs import pkg/codex/contracts/config + +from pkg/ethers import BlockTag +import codex/clock + import ../examples export market export tables +logScope: + topics = "mockMarket" + type MockMarket* = ref object of Market periodicity: Periodicity @@ -40,6 +47,7 @@ type config*: MarketplaceConfig canReserveSlot*: bool reserveSlotThrowError*: ?(ref MarketError) + clock: ?Clock Fulfillment* = object requestId*: RequestId proof*: Groth16Proof @@ -49,6 +57,7 @@ type host*: Address slotIndex*: UInt256 proof*: Groth16Proof + timestamp: ?SecondsSince1970 Subscriptions = object onRequest: seq[RequestSubscription] onFulfillment: seq[FulfillmentSubscription] @@ -94,7 +103,7 @@ proc hash*(address: Address): Hash = proc hash*(requestId: RequestId): Hash = hash(requestId.toArray) -proc new*(_: type MockMarket): MockMarket = +proc new*(_: type MockMarket, clock: ?Clock = Clock.none): MockMarket = ## Create a new mocked Market instance ## let config = MarketplaceConfig( @@ -111,7 +120,8 @@ proc new*(_: type MockMarket): MockMarket = downtimeProduct: 67.uint8 ) ) - MockMarket(signer: Address.example, config: config, canReserveSlot: true) + MockMarket(signer: Address.example, config: config, + canReserveSlot: true, clock: clock) method getSigner*(market: MockMarket): Future[Address] {.async.} = return market.signer @@ -248,7 +258,8 @@ proc fillSlot*(market: MockMarket, requestId: requestId, slotIndex: slotIndex, proof: proof, - host: host + host: host, + timestamp: market.clock.?now ) market.filled.add(slot) market.slotState[slotId(slot.requestId, slot.slotIndex)] = SlotState.Filled @@ -472,21 +483,51 @@ method subscribeProofSubmission*(mock: MockMarket, mock.subscriptions.onProofSubmitted.add(subscription) return subscription -method queryPastEvents*[T: MarketplaceEvent]( - market: MockMarket, - _: type T, - blocksAgo: int): Future[seq[T]] {.async.} = +method queryPastStorageRequestedEvents*( + market: MockMarket, + fromBlock: BlockTag): Future[seq[StorageRequested]] {.async.} = + return market.requested.map(request => + StorageRequested(requestId: request.id, + ask: request.ask, + expiry: request.expiry) + ) - if T of StorageRequested: - return market.requested.map(request => - StorageRequested(requestId: request.id, - ask: request.ask, - expiry: request.expiry) - ) - elif T of SlotFilled: - return market.filled.map(slot => - SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex) - ) +method queryPastStorageRequestedEvents*( + market: MockMarket, + blocksAgo: int): Future[seq[StorageRequested]] {.async.} = + return market.requested.map(request => + StorageRequested(requestId: request.id, + ask: request.ask, + expiry: request.expiry) + ) + +method queryPastSlotFilledEvents*( + market: MockMarket, + fromBlock: BlockTag): Future[seq[SlotFilled]] {.async.} = + return market.filled.map(slot => + SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex) + ) + +method queryPastSlotFilledEvents*( + market: MockMarket, + blocksAgo: int): Future[seq[SlotFilled]] {.async.} = + return market.filled.map(slot => + SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex) + ) + +method queryPastSlotFilledEvents*( + market: MockMarket, + fromTime: SecondsSince1970): Future[seq[SlotFilled]] {.async.} = + let filtered = market.filled.filter( + proc (slot: MockSlot): bool = + if timestamp =? slot.timestamp: + return timestamp >= fromTime + else: + true + ) + return filtered.map(slot => + SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex) + ) method unsubscribe*(subscription: RequestSubscription) {.async.} = subscription.market.subscriptions.onRequest.keepItIf(it != subscription) diff --git a/tests/codex/testvalidation.nim b/tests/codex/testvalidation.nim index 7988269b..2cfe2f06 100644 --- a/tests/codex/testvalidation.nim +++ b/tests/codex/testvalidation.nim @@ -1,9 +1,10 @@ import pkg/chronos import std/strformat -import std/random +import std/times import codex/validation import codex/periods +import codex/clock import ../asynctest import ./helpers/mockmarket @@ -11,6 +12,9 @@ import ./helpers/mockclock import ./examples import ./helpers +logScope: + topics = "testValidation" + asyncchecksuite "validation": let period = 10 let timeout = 5 @@ -20,10 +24,10 @@ asyncchecksuite "validation": let proof = Groth16Proof.example let collateral = slot.request.ask.collateral - var validation: Validation var market: MockMarket var clock: MockClock var groupIndex: uint16 + var validation: Validation proc initValidationConfig(maxSlots: MaxSlots, validationGroups: ?ValidationGroups, @@ -32,19 +36,27 @@ asyncchecksuite "validation": maxSlots, groups=validationGroups, groupIndex), error: raiseAssert fmt"Creating ValidationConfig failed! Error msg: {error.msg}" validationConfig + + proc newValidation(clock: Clock, + market: Market, + maxSlots: MaxSlots, + validationGroups: ?ValidationGroups, + groupIndex: uint16 = 0): Validation = + let validationConfig = initValidationConfig( + maxSlots, validationGroups, groupIndex) + Validation.new(clock, market, validationConfig) setup: groupIndex = groupIndexForSlotId(slot.id, !validationGroups) - market = MockMarket.new() clock = MockClock.new() - let validationConfig = initValidationConfig( - maxSlots, validationGroups, groupIndex) - validation = Validation.new(clock, market, validationConfig) + market = MockMarket.new(clock = Clock(clock).some) market.config.proofs.period = period.u256 market.config.proofs.timeout = timeout.u256 - await validation.start() + validation = newValidation( + clock, market, maxSlots, validationGroups, groupIndex) teardown: + # calling stop on validation that did not start is harmless await validation.stop() proc advanceToNextPeriod = @@ -79,6 +91,7 @@ asyncchecksuite "validation": test "initializing ValidationConfig fails when maxSlots is negative " & "(validationGroups set)": let maxSlots = -1 + let groupIndex = 0'u16 let validationConfig = ValidationConfig.init( maxSlots = maxSlots, groups = validationGroups, groupIndex) check validationConfig.isFailure == true @@ -86,45 +99,41 @@ asyncchecksuite "validation": fmt"be greater than or equal to 0! (got: {maxSlots})" test "slot is not observed if it is not in the validation group": - let validationConfig = initValidationConfig(maxSlots, validationGroups, - (groupIndex + 1) mod uint16(!validationGroups)) - let validation = Validation.new(clock, market, validationConfig) + validation = newValidation(clock, market, maxSlots, validationGroups, + (groupIndex + 1) mod uint16(!validationGroups)) await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) - await validation.stop() check validation.slots.len == 0 test "when a slot is filled on chain, it is added to the list": + await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) check validation.slots == @[slot.id] test "slot should be observed if maxSlots is set to 0": - let validationConfig = initValidationConfig( - maxSlots = 0, ValidationGroups.none) - let validation = Validation.new(clock, market, validationConfig) + validation = newValidation(clock, market, maxSlots = 0, ValidationGroups.none) await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) - await validation.stop() check validation.slots == @[slot.id] test "slot should be observed if validation group is not set (and " & "maxSlots is not 0)": - let validationConfig = initValidationConfig( - maxSlots, ValidationGroups.none) - let validation = Validation.new(clock, market, validationConfig) + validation = newValidation(clock, market, maxSlots, ValidationGroups.none) await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) - await validation.stop() check validation.slots == @[slot.id] for state in [SlotState.Finished, SlotState.Failed]: test fmt"when slot state changes to {state}, it is removed from the list": + validation = newValidation(clock, market, maxSlots, validationGroups) + await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) market.slotState[slot.id] = state advanceToNextPeriod() check eventually validation.slots.len == 0 test "when a proof is missed, it is marked as missing": + await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) market.setCanProofBeMarkedAsMissing(slot.id, true) advanceToNextPeriod() @@ -132,6 +141,7 @@ asyncchecksuite "validation": check market.markedAsMissingProofs.contains(slot.id) test "when a proof can not be marked as missing, it will not be marked": + await validation.start() await market.fillSlot(slot.request.id, slot.slotIndex, proof, collateral) market.setCanProofBeMarkedAsMissing(slot.id, false) advanceToNextPeriod() @@ -139,13 +149,73 @@ asyncchecksuite "validation": check market.markedAsMissingProofs.len == 0 test "it does not monitor more than the maximum number of slots": - let validationGroups = ValidationGroups.none - let validationConfig = initValidationConfig( - maxSlots, validationGroups) - let validation = Validation.new(clock, market, validationConfig) + validation = newValidation(clock, market, maxSlots, ValidationGroups.none) await validation.start() for _ in 0.. 291 + # 1728436104 => 291 + # 1728436105 => 292 + # 1728436106 => 292 + # 1728436110 => 292 + proc generateExpectations( + blocks: seq[(UInt256, UInt256)]): seq[Expectations] = + var expectations: seq[Expectations] = @[] + for i in 0.. //_.log + .withLogTopics("purchases", "onchain") + .some, + + providers: + CodexConfigs.init(nodes=1) + .withSimulateProofFailures(idx=0, failEveryNProofs=1) + # .debug() # uncomment to enable console log output + # .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + # .withLogTopics("sales", "onchain") + .some, + + validators: + CodexConfigs.init(nodes=2) + .withValidationGroups(groups = 2) + .withValidationGroupIndex(idx = 0, groupIndex = 0) + .withValidationGroupIndex(idx = 1, groupIndex = 1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("validator") # each topic as a separate string argument + .some + ): + let client0 = clients()[0].client + let expiry = 5.periods + let duration = expiry + 10.periods + + # let mine a block to sync the blocktime with the current clock + discard await ethProvider.send("evm_mine") + + var currentTime = await ethProvider.currentTime() + let requestEndTime = currentTime.truncate(uint64) + duration + + let data = await RandomChunker.example(blocks=8) + + # TODO: better value for data.len below. This TODO is also present in + # testproofs.nim - we may want to address it or remove the comment. + createAvailabilities(data.len * 2, duration) + + let cid = client0.upload(data).get + let purchaseId = await client0.requestStorage( + cid, + expiry=expiry, + duration=duration, + nodes=nodes, + tolerance=tolerance, + proofProbability=proofProbability + ) + let requestId = client0.requestId(purchaseId).get + + debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId + + if not eventuallyS(client0.purchaseStateIs(purchaseId, "started"), + timeout = (expiry + 60).int, step = 5): + debug "validation suite: timed out waiting for the purchase to start" + fail() + return + + discard await ethProvider.send("evm_mine") + currentTime = await ethProvider.currentTime() + let secondsTillRequestEnd = (requestEndTime - currentTime.truncate(uint64)).int + + debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds + + check await marketplace.waitForRequestToFail( + requestId, + timeout = secondsTillRequestEnd + 60, + step = 5 + ) + + test "validator uses historical state to mark missing proofs", NodeConfigs( + # Uncomment to start Hardhat automatically, typically so logs can be inspected locally + hardhat: + HardhatConfig.none, + + clients: + CodexConfigs.init(nodes=1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + .withLogTopics("purchases", "onchain") + .some, + + providers: + CodexConfigs.init(nodes=1) + .withSimulateProofFailures(idx=0, failEveryNProofs=1) + # .debug() # uncomment to enable console log output + # .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + # .withLogTopics("sales", "onchain") + .some + ): + let client0 = clients()[0].client + let expiry = 5.periods + let duration = expiry + 10.periods + + # let mine a block to sync the blocktime with the current clock + discard await ethProvider.send("evm_mine") + + var currentTime = await ethProvider.currentTime() + let requestEndTime = currentTime.truncate(uint64) + duration + + let data = await RandomChunker.example(blocks=8) + + # TODO: better value for data.len below. This TODO is also present in + # testproofs.nim - we may want to address it or remove the comment. + createAvailabilities(data.len * 2, duration) + + let cid = client0.upload(data).get + let purchaseId = await client0.requestStorage( + cid, + expiry=expiry, + duration=duration, + nodes=nodes, + tolerance=tolerance, + proofProbability=proofProbability + ) + let requestId = client0.requestId(purchaseId).get + + debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId + + if not eventuallyS(client0.purchaseStateIs(purchaseId, "started"), + timeout = (expiry + 60).int, step = 5): + debug "validation suite: timed out waiting for the purchase to start" + fail() + return + + # extra block just to make sure we have one that separates us + # from the block containing the last (past) SlotFilled event + discard await ethProvider.send("evm_mine") + + var validators = CodexConfigs.init(nodes=2) + .withValidationGroups(groups = 2) + .withValidationGroupIndex(idx = 0, groupIndex = 0) + .withValidationGroupIndex(idx = 1, groupIndex = 1) + # .debug() # uncomment to enable console log output + .withLogFile() # uncomment to output log file to: # tests/integration/logs/ //_.log + .withLogTopics("validator") # each topic as a separate string argument + + failAndTeardownOnError "failed to start validator nodes": + for config in validators.configs.mitems: + let node = await startValidatorNode(config) + running.add RunningNode( + role: Role.Validator, + node: node + ) + + discard await ethProvider.send("evm_mine") + currentTime = await ethProvider.currentTime() + let secondsTillRequestEnd = (requestEndTime - currentTime.truncate(uint64)).int + + debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds + + check await marketplace.waitForRequestToFail( + requestId, + timeout = secondsTillRequestEnd + 60, + step = 5 + ) diff --git a/tests/testContracts.nim b/tests/testContracts.nim index 4283c10a..aff2c1d7 100644 --- a/tests/testContracts.nim +++ b/tests/testContracts.nim @@ -2,5 +2,6 @@ import ./contracts/testContracts import ./contracts/testMarket import ./contracts/testDeployment import ./contracts/testClock +import ./contracts/testProvider {.warning[UnusedImport]:off.} diff --git a/tests/testIntegration.nim b/tests/testIntegration.nim index b1f81ef4..f0a59ee4 100644 --- a/tests/testIntegration.nim +++ b/tests/testIntegration.nim @@ -6,6 +6,7 @@ import ./integration/testpurchasing import ./integration/testblockexpiration import ./integration/testmarketplace import ./integration/testproofs +import ./integration/testvalidator import ./integration/testecbug {.warning[UnusedImport]:off.} From 01fb685bf69ad738f4766b7b14a892cb51c85d81 Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Mon, 16 Dec 2024 06:19:31 +0700 Subject: [PATCH 3/7] fix(sales): replaces `then` with `asyncSpawn` (#1036) - ensures `addSlotToQueue` does not raise exceptions as it is now asyncSpawned --- codex/contracts/market.nim | 2 +- codex/sales.nim | 35 ++++++++++++++---------------- tests/codex/helpers/mockmarket.nim | 2 +- tests/codex/sales/testsales.nim | 1 + 4 files changed, 19 insertions(+), 21 deletions(-) diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index a06a9486..06902868 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -115,7 +115,7 @@ method requestStorage(market: OnChainMarket, request: StorageRequest){.async.} = await market.approveFunds(request.price()) discard await market.contract.requestStorage(request).confirm(1) -method getRequest(market: OnChainMarket, +method getRequest*(market: OnChainMarket, id: RequestId): Future[?StorageRequest] {.async.} = convertEthersError: try: diff --git a/codex/sales.nim b/codex/sales.nim index f2cc366c..6cfef4cf 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -16,8 +16,8 @@ import ./sales/statemachine import ./sales/slotqueue import ./sales/states/preparing import ./sales/states/unknown -import ./utils/then import ./utils/trackedfutures +import ./utils/exceptions ## Sales holds a list of available storage that it may sell. ## @@ -325,7 +325,7 @@ proc onSlotFreed(sales: Sales, trace "slot freed, adding to queue" - proc addSlotToQueue() {.async.} = + proc addSlotToQueue() {.async: (raises: []).} = let context = sales.context let market = context.market let queue = context.slotQueue @@ -336,25 +336,22 @@ proc onSlotFreed(sales: Sales, trace "no existing request metadata, getting request info from contract" # if there's no existing slot for that request, retrieve the request # from the contract. - without request =? await market.getRequest(requestId): - error "unknown request in contract" - return + try: + without request =? await market.getRequest(requestId): + error "unknown request in contract" + return - found = SlotQueueItem.init(request, slotIndex.truncate(uint16)) + found = SlotQueueItem.init(request, slotIndex.truncate(uint16)) + except CancelledError: + discard # do not propagate as addSlotToQueue was asyncSpawned + except CatchableError as e: + error "failed to get request from contract and add slots to queue", + error = e.msgDetail if err =? queue.push(found).errorOption: - raise err + error "failed to push slot items to queue", error = err.msgDetail - addSlotToQueue() - .track(sales) - .catch(proc(err: ref CatchableError) = - if err of SlotQueueItemExistsError: - error "Failed to push item to queue becaue it already exists" - elif err of QueueNotRunningError: - warn "Failed to push item to queue becaue queue is not running" - else: - warn "Error adding request to SlotQueue", error = err.msg - ) + asyncSpawn addSlotToQueue().track(sales) proc subscribeRequested(sales: Sales) {.async.} = let context = sales.context @@ -482,7 +479,7 @@ proc subscribeSlotReservationsFull(sales: Sales) {.async.} = except CatchableError as e: error "Unable to subscribe to slot filled events", msg = e.msg -proc startSlotQueue(sales: Sales) {.async.} = +proc startSlotQueue(sales: Sales) = let slotQueue = sales.context.slotQueue let reservations = sales.context.reservations @@ -518,7 +515,7 @@ proc unsubscribe(sales: Sales) {.async.} = proc start*(sales: Sales) {.async.} = await sales.load() - await sales.startSlotQueue() + sales.startSlotQueue() await sales.subscribe() sales.running = true diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index 7a5c94b8..358a5206 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -155,7 +155,7 @@ method myRequests*(market: MockMarket): Future[seq[RequestId]] {.async.} = method mySlots*(market: MockMarket): Future[seq[SlotId]] {.async.} = return market.activeSlots[market.signer] -method getRequest(market: MockMarket, +method getRequest*(market: MockMarket, id: RequestId): Future[?StorageRequest] {.async.} = for request in market.requested: if request.id == id: diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 518853ae..bf303744 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -566,6 +566,7 @@ asyncchecksuite "Sales": request.ask.slots = 2 market.requested = @[request] market.requestState[request.id] = RequestState.New + market.requestEnds[request.id] = request.expiry.toSecondsSince1970 proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} = let address = await market.getSigner() From 6d415b0ace0a3cd7c3a49109845fd0fe9ad9cae2 Mon Sep 17 00:00:00 2001 From: Slava <20563034+veaceslavdoina@users.noreply.github.com> Date: Mon, 16 Dec 2024 01:31:55 +0200 Subject: [PATCH 4/7] ci: split nim-matrix workflow (#1041) --- .github/workflows/nim-matrix.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/nim-matrix.yml b/.github/workflows/nim-matrix.yml index 2d822641..4cddc971 100644 --- a/.github/workflows/nim-matrix.yml +++ b/.github/workflows/nim-matrix.yml @@ -20,7 +20,10 @@ jobs: uses: fabiocaccamo/create-matrix-action@v4 with: matrix: | - os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {all}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} build: needs: matrix From 5f2ba1428195fd4bb67cfb20b7b886fec9bab758 Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Mon, 16 Dec 2024 11:24:57 +0700 Subject: [PATCH 5/7] fix(codexnode): ensure timer loop is asyncSpawned (#1038) * fix(codexnode): stop clock after validator stops * fix(timer): ensure timer loop is asyncSpawned --- codex/node.nim | 6 +++--- codex/utils/timer.nim | 7 ++++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/codex/node.nim b/codex/node.nim index f180fd62..a43c9270 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -763,12 +763,12 @@ proc stop*(self: CodexNodeRef) {.async.} = if hostContracts =? self.contracts.host: await hostContracts.stop() - if not self.clock.isNil: - await self.clock.stop() - if validatorContracts =? self.contracts.validator: await validatorContracts.stop() + if not self.clock.isNil: + await self.clock.stop() + if not self.networkStore.isNil: await self.networkStore.close diff --git a/codex/utils/timer.nim b/codex/utils/timer.nim index 9361d07b..b01d95c6 100644 --- a/codex/utils/timer.nim +++ b/codex/utils/timer.nim @@ -30,13 +30,13 @@ proc new*(T: type Timer, timerName = "Unnamed Timer"): Timer = ## Create a new Timer intance with the given name Timer(name: timerName) -proc timerLoop(timer: Timer) {.async.} = +proc timerLoop(timer: Timer) {.async: (raises: []).} = try: while true: await timer.callback() await sleepAsync(timer.interval) except CancelledError: - raise + discard # do not propagate as timerLoop is asyncSpawned except CatchableError as exc: error "Timer caught unhandled exception: ", name=timer.name, msg=exc.msg @@ -47,9 +47,10 @@ method start*(timer: Timer, callback: TimerCallback, interval: Duration) {.base. timer.callback = callback timer.interval = interval timer.loopFuture = timerLoop(timer) + asyncSpawn timer.loopFuture method stop*(timer: Timer) {.async, base.} = - if timer.loopFuture != nil: + if timer.loopFuture != nil and not timer.loopFuture.finished: trace "Timer stopping: ", name=timer.name await timer.loopFuture.cancelAndWait() timer.loopFuture = nil From b0cc27f56380ffd8fd647e80ed92bf8d7efe95d4 Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Mon, 16 Dec 2024 13:01:49 +0700 Subject: [PATCH 6/7] fix(blockexchange): ensures futures are asyncSpawned (#1037) * fix(blockexchange): asyncSpawn advertising of local store blocks * fix(blockexchange): asyncSpawn discoveryQueueLoop - prevents silently swallowing async errors * fix(blockexchange): asyncSpawns block exchange tasks - prevents silently swallow future exceptions --- codex/blockexchange/engine/advertiser.nim | 51 ++++++++++++----------- codex/blockexchange/engine/discovery.nim | 44 ++++++++++--------- codex/blockexchange/engine/engine.nim | 29 +++++++------ 3 files changed, 67 insertions(+), 57 deletions(-) diff --git a/codex/blockexchange/engine/advertiser.nim b/codex/blockexchange/engine/advertiser.nim index e4a97db1..5ff82e48 100644 --- a/codex/blockexchange/engine/advertiser.nim +++ b/codex/blockexchange/engine/advertiser.nim @@ -18,6 +18,8 @@ import ../protobuf/presence import ../peers import ../../utils +import ../../utils/exceptions +import ../../utils/trackedfutures import ../../discovery import ../../stores/blockstore import ../../logutils @@ -42,7 +44,7 @@ type advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle advertiseQueue*: AsyncQueue[Cid] # Advertise queue - advertiseTasks*: seq[Future[void]] # Advertise tasks + trackedFutures*: TrackedFutures # Advertise tasks futures advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests @@ -70,20 +72,26 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async.} = await b.addCidToQueue(cid) await b.addCidToQueue(manifest.treeCid) -proc advertiseLocalStoreLoop(b: Advertiser) {.async.} = +proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} = while b.advertiserRunning: - if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest): - trace "Advertiser begins iterating blocks..." - for c in cids: - if cid =? await c: - await b.advertiseBlock(cid) - trace "Advertiser iterating blocks finished." + try: + if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest): + trace "Advertiser begins iterating blocks..." + for c in cids: + if cid =? await c: + await b.advertiseBlock(cid) + trace "Advertiser iterating blocks finished." - await sleepAsync(b.advertiseLocalStoreLoopSleep) + await sleepAsync(b.advertiseLocalStoreLoopSleep) + + except CancelledError: + break # do not propagate as advertiseLocalStoreLoop was asyncSpawned + except CatchableError as e: + error "failed to advertise blocks in local store", error = e.msgDetail info "Exiting advertise task loop" -proc processQueueLoop(b: Advertiser) {.async.} = +proc processQueueLoop(b: Advertiser) {.async: (raises: []).} = while b.advertiserRunning: try: let @@ -129,9 +137,11 @@ proc start*(b: Advertiser) {.async.} = b.advertiserRunning = true for i in 0.. Date: Mon, 16 Dec 2024 13:55:19 +0700 Subject: [PATCH 7/7] fix(validation): asyncSpawns run (#1039) - annotates run with raises: [] - asyncSpawns run, to ensure there are no escaping exceptions --- codex/validation.nim | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/codex/validation.nim b/codex/validation.nim index cc663c5f..3e9e63ff 100644 --- a/codex/validation.nim +++ b/codex/validation.nim @@ -109,7 +109,7 @@ proc markProofsAsMissing(validation: Validation) {.async.} = let previousPeriod = validation.getCurrentPeriod() - 1 await validation.markProofAsMissing(slotId, previousPeriod) -proc run(validation: Validation) {.async.} = +proc run(validation: Validation) {.async: (raises: []).} = trace "Validation started" try: while true: @@ -118,7 +118,7 @@ proc run(validation: Validation) {.async.} = await validation.markProofsAsMissing() except CancelledError: trace "Validation stopped" - discard + discard # do not propagate as run is asyncSpawned except CatchableError as e: error "Validation failed", msg = e.msg @@ -149,9 +149,10 @@ proc start*(validation: Validation) {.async.} = await validation.subscribeSlotFilled() await validation.restoreHistoricalState() validation.running = validation.run() + asyncSpawn validation.running proc stop*(validation: Validation) {.async.} = - if not isNil(validation.running): + if not validation.running.isNil and not validation.running.finished: await validation.running.cancelAndWait() while validation.subscriptions.len > 0: let subscription = validation.subscriptions.pop()