From 5b4d762600a617ec6f26289723ebccee6bfbde75 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 4 Mar 2025 16:48:42 -0600 Subject: [PATCH] wip --- Makefile | 2 +- codex.nim | 2 +- codex/blockexchange/engine/engine.nim | 283 +++++++++++------- codex/blockexchange/engine/pendingblocks.nim | 6 + codex/blockexchange/peers/peerctxstore.nim | 10 +- codex/blocktype.nim | 7 + .../blockexchange/engine/testblockexc.nim | 2 +- .../codex/blockexchange/engine/testengine.nim | 10 +- tests/codex/testasyncheapqueue.nim | 263 +++++++++------- vendor/nim-circom-compat | 2 +- 10 files changed, 371 insertions(+), 216 deletions(-) diff --git a/Makefile b/Makefile index 29d6c11d..7a92d8b4 100644 --- a/Makefile +++ b/Makefile @@ -114,7 +114,7 @@ else NIM_PARAMS := $(NIM_PARAMS) -d:release endif -deps: | deps-common nat-libs +deps: | deps-common nat-libs build-nph ifneq ($(USE_LIBBACKTRACE), 0) deps: | libbacktrace endif diff --git a/codex.nim b/codex.nim index 7749bdee..c22bd4e9 100644 --- a/codex.nim +++ b/codex.nim @@ -142,7 +142,7 @@ when isMainModule: chronos.poll() except Exception as exc: error "Unhandled exception in async proc, aborting", msg = exc.msg - quit QuitFailure + raise exc try: # signal handlers guarantee that the shutdown Future will diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 35785cfe..6ce8c193 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -67,19 +67,32 @@ declareCounter( const DefaultMaxPeersPerRequest* = 10 DefaultTaskQueueSize = 100 - DefaultConcurrentTasks = 10 + DefaultConcurrentTasks = 5 + DefaultMaxSendBatch = 5 + DefaultBatchSendInterval = 10.millis type TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.} + Pricing* = object + address*: EthAddress + price*: UInt256 + + BlockCtx = object + address: BlockAddress + wantType: WantType + priority: int + BlockExcEngine* = ref object of RootObj localStore*: BlockStore # Local block store for this instance network*: BlockExcNetwork # Petwork interface peers*: PeerCtxStore # Peers we're currently actively exchanging with - taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] + uploadQueue*: AsyncHeapQueue[BlockExcPeerCtx] + downloadQueue*: AsyncHeapQueue[BlockAddress] # Peers we're currently processing tasks for - concurrentTasks: int # Number of concurrent peers we're serving at any given time + uploadWorkers: int = DefaultConcurrentTasks # Number of uploads + downloadWorkers: int = DefaultConcurrentTasks # Number of concurrent downloads trackedFutures: TrackedFutures # Tracks futures of blockexc tasks blockexcRunning: bool # Indicates if the blockexc task is running pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved @@ -87,19 +100,26 @@ type pricing*: ?Pricing # Optional bandwidth pricing discovery*: DiscoveryEngine advertiser*: Advertiser - - Pricing* = object - address*: EthAddress - price*: UInt256 + maxSendBatch*: int = DefaultMaxSendBatch + batchSendInterval*: Duration = DefaultBatchSendInterval # attach task scheduler to engine -proc scheduleTask(self: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, raises: [].} = - if self.taskQueue.pushOrUpdateNoWait(task).isOk(): - trace "Task scheduled for peer", peer = task.id +proc scheduleUploadTask( + self: BlockExcEngine, task: BlockExcPeerCtx +) {.gcsafe, raises: [].} = + if self.uploadQueue.pushOrUpdateNoWait(task).isOk(): + trace "Upload task scheduled for peer", peer = task.id else: - warn "Unable to schedule task for peer", peer = task.id + warn "Unable to schedule upload task for peer", peer = task.id -proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} +proc scheduleDownloadTask( + self: BlockExcEngine, blk: BlockAddress +) {.gcsafe, async: (raw: true).} = + trace "Scheduling download task for block", blk + self.downloadQueue.pushOrUpdate(blk) + +proc blockexcUploadTaskRunner(self: BlockExcEngine) {.async: (raises: []).} +proc blockexcDownloadTaskRunner(self: BlockExcEngine) {.async: (raises: []).} proc start*(self: BlockExcEngine) {.async: (raises: []).} = ## Start the blockexc task @@ -108,14 +128,19 @@ proc start*(self: BlockExcEngine) {.async: (raises: []).} = await self.discovery.start() await self.advertiser.start() - trace "Blockexc starting with concurrent tasks", tasks = self.concurrentTasks + trace "Blockexc starting with concurrent tasks", + uploadWorkers = self.uploadWorkers, downloadWorkers = self.downloadWorkers if self.blockexcRunning: warn "Starting blockexc twice" return self.blockexcRunning = true - for i in 0 ..< self.concurrentTasks: - let fut = self.blockexcTaskRunner() + for i in 0 ..< self.uploadWorkers: + let fut = self.blockexcUploadTaskRunner() + self.trackedFutures.track(fut) + + for i in 0 ..< self.downloadWorkers: + let fut = self.blockexcDownloadTaskRunner() self.trackedFutures.track(fut) proc stop*(self: BlockExcEngine) {.async: (raises: []).} = @@ -157,66 +182,13 @@ proc sendWantBlock( proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx = Rng.instance.sample(peers) -proc downloadInternal( - self: BlockExcEngine, address: BlockAddress -) {.async: (raises: []).} = - logScope: - address = address - - let handle = self.pendingBlocks.getWantHandle(address) - trace "Downloading block" - try: - while address in self.pendingBlocks: - logScope: - retries = self.pendingBlocks.retries(address) - interval = self.pendingBlocks.retryInterval - - if self.pendingBlocks.retriesExhausted(address): - trace "Error retries exhausted" - handle.fail(newException(RetriesExhaustedError, "Error retries exhausted")) - break - - trace "Running retry handle" - let peers = self.peers.getPeersForBlock(address) - logScope: - peersWith = peers.with.len - peersWithout = peers.without.len - - trace "Peers for block" - if peers.with.len > 0: - self.pendingBlocks.setInFlight(address, true) - await self.sendWantBlock(@[address], peers.with.randomPeer) - else: - self.pendingBlocks.setInFlight(address, false) - if peers.without.len > 0: - await self.sendWantHave(@[address], peers.without) - self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) - - await (handle or sleepAsync(self.pendingBlocks.retryInterval)) - self.pendingBlocks.decRetries(address) - - if handle.finished: - trace "Handle for block finished", failed = handle.failed - break - except CancelledError as exc: - trace "Block download cancelled" - if not handle.finished: - await handle.cancelAndWait() - except CatchableError as exc: - warn "Error downloadloading block", exc = exc.msg - if not handle.finished: - handle.fail(exc) - finally: - self.pendingBlocks.setInFlight(address, false) - proc requestBlock*( self: BlockExcEngine, address: BlockAddress ): Future[?!Block] {.async: (raises: [CancelledError]).} = - if address notin self.pendingBlocks: - self.trackedFutures.track(self.downloadInternal(address)) - + trace "Requestion block", address try: let handle = self.pendingBlocks.getWantHandle(address) + await self.scheduleDownloadTask(address) success await handle except CancelledError as err: warn "Block request cancelled", address @@ -266,7 +238,7 @@ proc blockPresenceHandler*( if err =? catch(await self.sendWantBlock(ourWantCids, peerCtx)).errorOption: warn "Failed to send wantBlock to peer", peer, err = err.msg -proc scheduleTasks( +proc scheduleUploadTasks( self: BlockExcEngine, blocksDelivery: seq[BlockDelivery] ) {.async: (raises: [CancelledError]).} = let cids = blocksDelivery.mapIt(it.blk.cid) @@ -278,9 +250,9 @@ proc scheduleTasks( # and we have it in our local store if c in p.peerWantsCids: try: + # TODO: the try/except should go away once blockstore tracks exceptions if await (c in self.localStore): - # TODO: the try/except should go away once blockstore tracks exceptions - self.scheduleTask(p) + self.scheduleUploadTask(p) break except CancelledError as exc: warn "Checking local store canceled", cid = c, err = exc.msg @@ -331,8 +303,12 @@ proc cancelBlocks( proc resolveBlocks*( self: BlockExcEngine, blocksDelivery: seq[BlockDelivery] ) {.async: (raises: [CancelledError]).} = + # drop blocks from the queue + blocksDelivery.mapIt(it.address).apply do(address: BlockAddress): + self.downloadQueue.delete(address) + self.pendingBlocks.resolve(blocksDelivery) - await self.scheduleTasks(blocksDelivery) + await self.scheduleUploadTasks(blocksDelivery) await self.cancelBlocks(blocksDelivery.mapIt(it.address)) proc resolveBlocks*( @@ -422,18 +398,19 @@ proc blocksDeliveryHandler*( validatedBlocksDelivery.add(bd) - codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64) + if validatedBlocksDelivery.len > 0: + codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64) - let peerCtx = self.peers.get(peer) - if peerCtx != nil: - if err =? catch(await self.payForBlocks(peerCtx, blocksDelivery)).errorOption: - warn "Error paying for blocks", err = err.msg + let peerCtx = self.peers.get(peer) + if peerCtx != nil: + if err =? catch(await self.payForBlocks(peerCtx, blocksDelivery)).errorOption: + warn "Error paying for blocks", err = err.msg + return + + if err =? catch(await self.resolveBlocks(validatedBlocksDelivery)).errorOption: + warn "Error resolving blocks", err = err.msg return - if err =? catch(await self.resolveBlocks(validatedBlocksDelivery)).errorOption: - warn "Error resolving blocks", err = err.msg - return - proc wantListHandler*( self: BlockExcEngine, peer: PeerId, wantList: WantList ) {.async: (raises: []).} = @@ -516,8 +493,8 @@ proc wantListHandler*( await self.network.request.sendPresence(peer, presence) if schedulePeer: - self.scheduleTask(peerCtx) - except CancelledError as exc: #TODO: replace with CancelledError + self.scheduleUploadTask(peerCtx) + except CancelledError as exc: warn "Error processing want list", error = exc.msg proc accountHandler*( @@ -577,9 +554,7 @@ proc dropPeer*(self: BlockExcEngine, peer: PeerId) {.raises: [].} = # drop the peer from the peers table self.peers.remove(peer) -proc taskHandler*( - self: BlockExcEngine, task: BlockExcPeerCtx -) {.gcsafe, async: (raises: [CancelledError, RetriesExhaustedError]).} = +proc uploadTaskHandler*(self: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = # Send to the peer blocks he wants to get, # if they present in our local store @@ -637,19 +612,124 @@ proc taskHandler*( task.peerWants.keepItIf(it.address notin successAddresses) -proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} = +proc blockexcUploadTaskRunner(self: BlockExcEngine) {.async: (raises: []).} = ## process tasks ## - trace "Starting blockexc task runner" - try: - while self.blockexcRunning: - let peerCtx = await self.taskQueue.pop() - await self.taskHandler(peerCtx) - except CatchableError as exc: - error "error running block exchange task", error = exc.msg + trace "Starting upload blockexc task runner" + while self.blockexcRunning: + try: + let peerCtx = await self.uploadQueue.pop() - info "Exiting blockexc task runner" + await self.uploadTaskHandler(peerCtx) + except CancelledError: + break + except CatchableError as exc: + error "Error running block exchange upload task", error = exc.msg + + info "Exiting blockexc upload task runner" + +proc downloadBlocks( + self: BlockExcEngine, addresses: seq[BlockAddress] +) {.async: (raises: [CancelledError]).} = + let + requestBlocks = addresses.filterIt( + it in self.pendingBlocks and not self.pendingBlocks.isInFlight(it) + ) + peers = self.peers.getPeersForBlocks(requestBlocks) + + if requestBlocks.len <= 0: + warn "All blocks inflight, no blocks to download" + return + + logScope: + peersWith = peers.with.len + peersWithout = peers.without.len + + trace "Peers for block" + if peers.with.len > 0: + # set inflight flag for all blocks + requestBlocks.apply do(address: BlockAddress): + self.pendingBlocks.setInFlight(address, true) + + await self.sendWantBlock(requestBlocks, peers.with.randomPeer) + else: + # reset inflight flag for all blocks + requestBlocks.apply do(address: BlockAddress): + self.pendingBlocks.setInFlight(address, false) + + if peers.without.len > 0: + await self.sendWantHave(requestBlocks, peers.without) + + # requeue blocks onto the download queue + # await allFutures( + # requestBlocks + # .filterIt(it in self.pendingBlocks and not self.pendingBlocks.isInFlight(it)) + # .mapIt(self.scheduleDownloadTask(it)) + # ) + + self.discovery.queueFindBlocksReq(requestBlocks.mapIt(it.cidOrTreeCid)) + +proc blockexcDownloadTaskRunner(self: BlockExcEngine) {.async: (raises: []).} = + ## process tasks + ## + + trace "Starting download blockexc task runner" + while self.blockexcRunning: + try: + var + batch: HashSet[BlockAddress] + batchSendDeadline = + # TODO: batchSendInterval * downloadWorkers, experimenting with deadline length + # the rationale is that the deadline is roughly per worker, so this gives each + # worker time to build a batch + # Moment.now() + (self.batchSendInterval * self.downloadWorkers) + Moment.now() + self.batchSendInterval + + logScope: + batchSendDeadline = $batchSendDeadline + batchSendInterval = $self.batchSendInterval + batch = batch.len + + while (batch.len < self.maxSendBatch) and (batchSendDeadline > Moment.now()): + let + addressFut = self.downloadQueue.pop() + address = + if self.downloadQueue.empty() and batch.len == 0: + trace "Queue is empty, waiting for block address" + # echo "Pending Futures: ", dumpPendingFutures() + let address = await addressFut + trace "Got block address from queue after empty", address + address + else: + trace "Waiting for block address or deadline" + let timeoutFut = sleepAsync(self.batchSendInterval) + await (addressFut or timeoutFut) + if addressFut.finished: + timeoutFut.cancelSoon() + let address = await addressFut + trace "Got block address from queue", address + if address in batch: + trace "Skipping block download, already in batch or inflight", address + continue + else: + address + else: + addressFut.cancelSoon() + continue + + batch.incl(address) + + trace "Batching block download" + if batch.len > 0: + await self.downloadBlocks(toSeq(batch)) + except CancelledError: + trace "Cancelled blockexc download task runner" + break + except CatchableError as exc: + error "Error running block exchange download task", error = exc.msg + + info "Exiting blockexc download task runner" proc new*( T: type BlockExcEngine, @@ -660,7 +740,10 @@ proc new*( advertiser: Advertiser, peerStore: PeerCtxStore, pendingBlocks: PendingBlocksManager, - concurrentTasks = DefaultConcurrentTasks, + uploadWorkers = DefaultConcurrentTasks, + downloadWorkers = DefaultConcurrentTasks, + uploadPendingTasks = DefaultTaskQueueSize, + downloadPendingTasks = DefaultTaskQueueSize, ): BlockExcEngine = ## Create new block exchange engine instance ## @@ -671,9 +754,9 @@ proc new*( pendingBlocks: pendingBlocks, network: network, wallet: wallet, - concurrentTasks: concurrentTasks, trackedFutures: TrackedFutures(), - taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), + uploadQueue: newAsyncHeapQueue[BlockExcPeerCtx](uploadPendingTasks), + downloadQueue: newAsyncHeapQueue[BlockAddress](downloadPendingTasks), discovery: discovery, advertiser: advertiser, ) diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index f169f744..b1c17988 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -60,9 +60,15 @@ proc getWantHandle*( ## Add an event for a block ## + logScope: + address = address + + trace "Getting want handle" self.blocks.withValue(address, blk): + trace "Want handle already exists" return blk[].handle do: + trace "Creating new want handle" let blk = BlockReq( handle: newFuture[Block]("pendingBlocks.getWantHandle"), inFlight: inFlight, diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index ce2506a8..11477e51 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -73,15 +73,21 @@ func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it.address.cidOrTreeCid == cid)) -proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock = +proc getPeersForBlocks*( + self: PeerCtxStore, addressess: openArray[BlockAddress] +): PeersForBlock = var res: PeersForBlock = (@[], @[]) + let addressess = @addressess for peer in self: - if peer.peerHave.anyIt(it == address): + if peer.peerHave.anyIt(it in addressess): res.with.add(peer) else: res.without.add(peer) res +proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock = + self.getPeersForBlocks(@[address]) + proc new*(T: type PeerCtxStore): PeerCtxStore = ## create new instance of a peer context store PeerCtxStore(peers: initOrderedTable[PeerId, BlockExcPeerCtx]()) diff --git a/codex/blocktype.nim b/codex/blocktype.nim index 7e13493d..7a3cd2b4 100644 --- a/codex/blocktype.nim +++ b/codex/blocktype.nim @@ -61,6 +61,13 @@ proc `==`*(a, b: BlockAddress): bool = a.cid == b.cid ) +proc `<`*(a, b: BlockAddress): bool = + if a.leaf and b.leaf: + if a.treeCid == b.treeCid: + return a.index < b.index + else: + return false + proc `$`*(a: BlockAddress): string = if a.leaf: "treeCid: " & $a.treeCid & ", index: " & $a.index diff --git a/tests/codex/blockexchange/engine/testblockexc.nim b/tests/codex/blockexchange/engine/testblockexc.nim index 0c250231..ee389bb0 100644 --- a/tests/codex/blockexchange/engine/testblockexc.nim +++ b/tests/codex/blockexchange/engine/testblockexc.nim @@ -107,7 +107,7 @@ asyncchecksuite "NetworkStore engine - 2 nodes": ) peerCtx1.peerWants.add(entry) - check nodeCmps2.engine.taskQueue.pushOrUpdateNoWait(peerCtx1).isOk + check nodeCmps2.engine.uploadQueue.pushOrUpdateNoWait(peerCtx1).isOk check eventually (await nodeCmps1.localStore.hasBlock(blk.cid)).tryGet() check eventually (await blkFut) == blk diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index 0541c119..5ad1a5bf 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -171,7 +171,7 @@ asyncchecksuite "NetworkStore engine handlers": # only `wantBlock` are stored in `peerWants` proc handler() {.async.} = - let ctx = await engine.taskQueue.pop() + let ctx = await engine.uploadQueue.pop() check ctx.id == peerId # only `wantBlock` scheduled check ctx.peerWants.mapIt(it.address.cidOrTreeCid) == blocks.mapIt(it.cid) @@ -614,7 +614,7 @@ asyncchecksuite "Task Handler": ) ) - await engine.taskHandler(peersCtx[0]) + await engine.uploadTaskHandler(peersCtx[0]) test "Should set in-flight for outgoing blocks": proc sendBlocksDelivery( @@ -636,7 +636,7 @@ asyncchecksuite "Task Handler": inFlight: false, ) ) - await engine.taskHandler(peersCtx[0]) + await engine.uploadTaskHandler(peersCtx[0]) test "Should clear in-flight when local lookup fails": peersCtx[0].peerWants.add( @@ -649,7 +649,7 @@ asyncchecksuite "Task Handler": inFlight: false, ) ) - await engine.taskHandler(peersCtx[0]) + await engine.uploadTaskHandler(peersCtx[0]) check not peersCtx[0].peerWants[0].inFlight @@ -705,4 +705,4 @@ asyncchecksuite "Task Handler": ) ) - await engine.taskHandler(peersCtx[0]) + await engine.uploadTaskHandler(peersCtx[0]) diff --git a/tests/codex/testasyncheapqueue.nim b/tests/codex/testasyncheapqueue.nim index 2d2cfb0c..3bdfa5ce 100644 --- a/tests/codex/testasyncheapqueue.nim +++ b/tests/codex/testasyncheapqueue.nim @@ -23,142 +23,195 @@ proc toSortedSeq[T](h: AsyncHeapQueue[T], queueType = QueueType.Min): seq[T] = result.add(popNoWait(tmp).tryGet()) suite "Synchronous tests": - test "Test pushNoWait - Min": - var heap = newAsyncHeapQueue[int]() - let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] - for item in data: - check heap.pushNoWait(item).isOk + test "Test pushNoWait - Min": + var heap = newAsyncHeapQueue[int]() + let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] + for item in data: + check heap.pushNoWait(item).isOk - check heap[0] == 0 - check heap.toSortedSeq == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + check heap[0] == 0 + check heap.toSortedSeq == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] - test "Test pushNoWait - Max": - var heap = newAsyncHeapQueue[int](queueType = QueueType.Max) - let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] - for item in data: - check heap.pushNoWait(item).isOk + test "Test pushNoWait - Max": + var heap = newAsyncHeapQueue[int](queueType = QueueType.Max) + let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] + for item in data: + check heap.pushNoWait(item).isOk - check heap[0] == 9 - check heap.toSortedSeq(QueueType.Max) == @[9, 8, 7, 6, 5, 4, 3, 2, 1, 0] + check heap[0] == 9 + check heap.toSortedSeq(QueueType.Max) == @[9, 8, 7, 6, 5, 4, 3, 2, 1, 0] - test "Test popNoWait": - var heap = newAsyncHeapQueue[int]() - let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] - for item in data: - check heap.pushNoWait(item).isOk + test "Test popNoWait": + var heap = newAsyncHeapQueue[int]() + let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] + for item in data: + check heap.pushNoWait(item).isOk - var res: seq[int] - while heap.len > 0: - let r = heap.popNoWait() - if r.isOk: - res.add(r.get) + var res: seq[int] + while heap.len > 0: + let r = heap.popNoWait() + if r.isOk: + res.add(r.get) - check res == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + check res == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] - test "Test popNoWait - Max": - var heap = newAsyncHeapQueue[int](queueType = QueueType.Max) - let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] - for item in data: - check heap.pushNoWait(item).isOk + test "Test popNoWait - Max": + var heap = newAsyncHeapQueue[int](queueType = QueueType.Max) + let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] + for item in data: + check heap.pushNoWait(item).isOk - var res: seq[int] - while heap.len > 0: - let r = heap.popNoWait() - if r.isOk: - res.add(r.get) + var res: seq[int] + while heap.len > 0: + let r = heap.popNoWait() + if r.isOk: + res.add(r.get) - check res == @[9, 8, 7, 6, 5, 4, 3, 2, 1, 0] + check res == @[9, 8, 7, 6, 5, 4, 3, 2, 1, 0] - test "Test del": - var heap = newAsyncHeapQueue[int]() - let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] - for item in data: - check heap.pushNoWait(item).isOk + test "Test del": + var heap = newAsyncHeapQueue[int]() + let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] + for item in data: + check heap.pushNoWait(item).isOk - heap.del(0) - doAssert(heap[0] == 1) + heap.del(0) + doAssert(heap[0] == 1) - heap.del(heap.find(7)) - check heap.toSortedSeq == @[1, 2, 3, 4, 5, 6, 8, 9] + heap.del(heap.find(7)) + check heap.toSortedSeq == @[1, 2, 3, 4, 5, 6, 8, 9] - heap.del(heap.find(5)) - check heap.toSortedSeq == @[1, 2, 3, 4, 6, 8, 9] + heap.del(heap.find(5)) + check heap.toSortedSeq == @[1, 2, 3, 4, 6, 8, 9] - heap.del(heap.find(6)) - check heap.toSortedSeq == @[1, 2, 3, 4, 8, 9] + heap.del(heap.find(6)) + check heap.toSortedSeq == @[1, 2, 3, 4, 8, 9] - heap.del(heap.find(2)) - check heap.toSortedSeq == @[1, 3, 4, 8, 9] + heap.del(heap.find(2)) + check heap.toSortedSeq == @[1, 3, 4, 8, 9] - test "Test del last": - var heap = newAsyncHeapQueue[int]() - let data = [1, 2, 3] - for item in data: - check heap.pushNoWait(item).isOk + test "Test del last": + var heap = newAsyncHeapQueue[int]() + let data = [1, 2, 3] + for item in data: + check heap.pushNoWait(item).isOk - heap.del(2) - check heap.toSortedSeq == @[1, 2] + heap.del(2) + check heap.toSortedSeq == @[1, 2] - heap.del(1) - check heap.toSortedSeq == @[1] + heap.del(1) + check heap.toSortedSeq == @[1] - heap.del(0) - check heap.toSortedSeq == newSeq[int]() # empty seq has no type + heap.del(0) + check heap.toSortedSeq == newSeq[int]() # empty seq has no type - test "Should throw popping from an empty queue": - var heap = newAsyncHeapQueue[int]() - let err = heap.popNoWait() - check err.isErr - check err.error == AsyncHQErrors.Empty + test "Should throw popping from an empty queue": + var heap = newAsyncHeapQueue[int]() + let err = heap.popNoWait() + check err.isErr + check err.error == AsyncHQErrors.Empty - test "Should throw pushing to an full queue": - var heap = newAsyncHeapQueue[int](1) - check heap.pushNoWait(1).isOk - let err = heap.pushNoWait(2) - check err.isErr - check err.error == AsyncHQErrors.Full + test "Should throw pushing to an full queue": + var heap = newAsyncHeapQueue[int](1) + check heap.pushNoWait(1).isOk + let err = heap.pushNoWait(2) + check err.isErr + check err.error == AsyncHQErrors.Full - test "Test clear": - var heap = newAsyncHeapQueue[int]() - let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] - for item in data: - check heap.pushNoWait(item).isOk + test "Test clear": + var heap = newAsyncHeapQueue[int]() + let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] + for item in data: + check heap.pushNoWait(item).isOk - check heap.len == 10 - heap.clear() - check heap.len == 0 + check heap.len == 10 + heap.clear() + check heap.len == 0 asyncchecksuite "Asynchronous Tests": - test "Test push": - var heap = newAsyncHeapQueue[int]() - let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] - for item in data: - await push(heap, item) - check heap[0] == 0 - check heap.toSortedSeq == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] - - test "Test push and pop with maxSize": - var heap = newAsyncHeapQueue[int](5) - let data = [1, 9, 5, 3, 7, 4, 2] - - proc pushTask() {.async.} = + test "Test push": + var heap = newAsyncHeapQueue[int]() + let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] for item in data: await push(heap, item) + check heap[0] == 0 + check heap.toSortedSeq == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] - asyncSpawn pushTask() + test "Test push and pop with maxSize": + var heap = newAsyncHeapQueue[int](5) + let data = [1, 9, 5, 3, 7, 4, 2] - check heap.len == 5 - check heap[0] == 1 # because we haven't pushed 0 yet + proc pushTask() {.async.} = + for item in data: + await push(heap, item) - check (await heap.pop) == 1 - check (await heap.pop) == 3 - check (await heap.pop) == 5 - check (await heap.pop) == 7 - check (await heap.pop) == 9 + let pushFut = pushTask() - await sleepAsync(1.milliseconds) # allow poll to run once more - check (await heap.pop) == 2 - check (await heap.pop) == 4 + check heap.len == 5 + check heap[0] == 1 # because we haven't pushed 0 yet + + check (await heap.pop) == 1 + check (await heap.pop) == 3 + check (await heap.pop) == 5 + check (await heap.pop) == 7 + check (await heap.pop) == 9 + + await sleepAsync(1.milliseconds) # allow poll to run once more + check (await heap.pop) == 2 + check (await heap.pop) == 4 + await pushFut + + test "Test single push when empty": + let heap = newAsyncHeapQueue[int](1) + + check heap.empty() + let popFut = heap.pop() + + proc pushTask() {.async.} = + await heap.push(1) + + let pushFut = pushTask() + await allFutures(@[FutureBase(pushFut), FutureBase(popFut)]) + check (await popFut) == 1 + await pushFut + + test "Test multiple pushes when empty - single producer, multiple consumers": + let heap = newAsyncHeapQueue[int](1) + + proc pushTask() {.async.} = + var pushes = 2 + for i in 0 .. 100: + echo "Pushing ", i + await heap.push(i) + pushes -= 1 + if pushes == 0: + echo "Sleeping" + await sleepAsync(10.millis) # allow poll to run once more + pushes = 2 + + let pushFut = pushTask() + + var popTasks = 0 + proc popTask() {.async.} = + echo "starting pop task: ", popTasks + let thisTask = popTasks + popTasks += 1 + await sleepAsync(100.millis) + var i = 0 + while not pushFut.finished: + let fut = heap.pop() + if heap.empty(): + await sleepAsync(1.millis) # allow poll to run once more + let popRes = await fut + echo "Task # ", thisTask, " popRes: ", popRes + check popRes == i + i += 1 + + var popFuts = newSeq[Future[void]]() + for i in 0 ..< 10: + popFuts.add(popTask()) + + await allFutures(popFuts.mapIt(FutureBase(it)) & @[FutureBase(pushFut)]) test "Test update": var heap = newAsyncHeapQueue[Task](5) diff --git a/vendor/nim-circom-compat b/vendor/nim-circom-compat index d3fb9039..88197fe6 160000 --- a/vendor/nim-circom-compat +++ b/vendor/nim-circom-compat @@ -1 +1 @@ -Subproject commit d3fb903945c3895f28a2e50685745e0a9762ece5 +Subproject commit 88197fe6ec929559b37e1443785ece650d2e9255