This commit is contained in:
Dmitriy Ryajov 2025-03-04 16:48:42 -06:00
parent f734831f33
commit 5b4d762600
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
10 changed files with 371 additions and 216 deletions

View File

@ -114,7 +114,7 @@ else
NIM_PARAMS := $(NIM_PARAMS) -d:release
endif
deps: | deps-common nat-libs
deps: | deps-common nat-libs build-nph
ifneq ($(USE_LIBBACKTRACE), 0)
deps: | libbacktrace
endif

View File

@ -142,7 +142,7 @@ when isMainModule:
chronos.poll()
except Exception as exc:
error "Unhandled exception in async proc, aborting", msg = exc.msg
quit QuitFailure
raise exc
try:
# signal handlers guarantee that the shutdown Future will

View File

@ -67,19 +67,32 @@ declareCounter(
const
DefaultMaxPeersPerRequest* = 10
DefaultTaskQueueSize = 100
DefaultConcurrentTasks = 10
DefaultConcurrentTasks = 5
DefaultMaxSendBatch = 5
DefaultBatchSendInterval = 10.millis
type
TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}
TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.}
Pricing* = object
address*: EthAddress
price*: UInt256
BlockCtx = object
address: BlockAddress
wantType: WantType
priority: int
BlockExcEngine* = ref object of RootObj
localStore*: BlockStore # Local block store for this instance
network*: BlockExcNetwork # Petwork interface
peers*: PeerCtxStore # Peers we're currently actively exchanging with
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx]
uploadQueue*: AsyncHeapQueue[BlockExcPeerCtx]
downloadQueue*: AsyncHeapQueue[BlockAddress]
# Peers we're currently processing tasks for
concurrentTasks: int # Number of concurrent peers we're serving at any given time
uploadWorkers: int = DefaultConcurrentTasks # Number of uploads
downloadWorkers: int = DefaultConcurrentTasks # Number of concurrent downloads
trackedFutures: TrackedFutures # Tracks futures of blockexc tasks
blockexcRunning: bool # Indicates if the blockexc task is running
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
@ -87,19 +100,26 @@ type
pricing*: ?Pricing # Optional bandwidth pricing
discovery*: DiscoveryEngine
advertiser*: Advertiser
Pricing* = object
address*: EthAddress
price*: UInt256
maxSendBatch*: int = DefaultMaxSendBatch
batchSendInterval*: Duration = DefaultBatchSendInterval
# attach task scheduler to engine
proc scheduleTask(self: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, raises: [].} =
if self.taskQueue.pushOrUpdateNoWait(task).isOk():
trace "Task scheduled for peer", peer = task.id
proc scheduleUploadTask(
self: BlockExcEngine, task: BlockExcPeerCtx
) {.gcsafe, raises: [].} =
if self.uploadQueue.pushOrUpdateNoWait(task).isOk():
trace "Upload task scheduled for peer", peer = task.id
else:
warn "Unable to schedule task for peer", peer = task.id
warn "Unable to schedule upload task for peer", peer = task.id
proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).}
proc scheduleDownloadTask(
self: BlockExcEngine, blk: BlockAddress
) {.gcsafe, async: (raw: true).} =
trace "Scheduling download task for block", blk
self.downloadQueue.pushOrUpdate(blk)
proc blockexcUploadTaskRunner(self: BlockExcEngine) {.async: (raises: []).}
proc blockexcDownloadTaskRunner(self: BlockExcEngine) {.async: (raises: []).}
proc start*(self: BlockExcEngine) {.async: (raises: []).} =
## Start the blockexc task
@ -108,14 +128,19 @@ proc start*(self: BlockExcEngine) {.async: (raises: []).} =
await self.discovery.start()
await self.advertiser.start()
trace "Blockexc starting with concurrent tasks", tasks = self.concurrentTasks
trace "Blockexc starting with concurrent tasks",
uploadWorkers = self.uploadWorkers, downloadWorkers = self.downloadWorkers
if self.blockexcRunning:
warn "Starting blockexc twice"
return
self.blockexcRunning = true
for i in 0 ..< self.concurrentTasks:
let fut = self.blockexcTaskRunner()
for i in 0 ..< self.uploadWorkers:
let fut = self.blockexcUploadTaskRunner()
self.trackedFutures.track(fut)
for i in 0 ..< self.downloadWorkers:
let fut = self.blockexcDownloadTaskRunner()
self.trackedFutures.track(fut)
proc stop*(self: BlockExcEngine) {.async: (raises: []).} =
@ -157,66 +182,13 @@ proc sendWantBlock(
proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
Rng.instance.sample(peers)
proc downloadInternal(
self: BlockExcEngine, address: BlockAddress
) {.async: (raises: []).} =
logScope:
address = address
let handle = self.pendingBlocks.getWantHandle(address)
trace "Downloading block"
try:
while address in self.pendingBlocks:
logScope:
retries = self.pendingBlocks.retries(address)
interval = self.pendingBlocks.retryInterval
if self.pendingBlocks.retriesExhausted(address):
trace "Error retries exhausted"
handle.fail(newException(RetriesExhaustedError, "Error retries exhausted"))
break
trace "Running retry handle"
let peers = self.peers.getPeersForBlock(address)
logScope:
peersWith = peers.with.len
peersWithout = peers.without.len
trace "Peers for block"
if peers.with.len > 0:
self.pendingBlocks.setInFlight(address, true)
await self.sendWantBlock(@[address], peers.with.randomPeer)
else:
self.pendingBlocks.setInFlight(address, false)
if peers.without.len > 0:
await self.sendWantHave(@[address], peers.without)
self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
await (handle or sleepAsync(self.pendingBlocks.retryInterval))
self.pendingBlocks.decRetries(address)
if handle.finished:
trace "Handle for block finished", failed = handle.failed
break
except CancelledError as exc:
trace "Block download cancelled"
if not handle.finished:
await handle.cancelAndWait()
except CatchableError as exc:
warn "Error downloadloading block", exc = exc.msg
if not handle.finished:
handle.fail(exc)
finally:
self.pendingBlocks.setInFlight(address, false)
proc requestBlock*(
self: BlockExcEngine, address: BlockAddress
): Future[?!Block] {.async: (raises: [CancelledError]).} =
if address notin self.pendingBlocks:
self.trackedFutures.track(self.downloadInternal(address))
trace "Requestion block", address
try:
let handle = self.pendingBlocks.getWantHandle(address)
await self.scheduleDownloadTask(address)
success await handle
except CancelledError as err:
warn "Block request cancelled", address
@ -266,7 +238,7 @@ proc blockPresenceHandler*(
if err =? catch(await self.sendWantBlock(ourWantCids, peerCtx)).errorOption:
warn "Failed to send wantBlock to peer", peer, err = err.msg
proc scheduleTasks(
proc scheduleUploadTasks(
self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]
) {.async: (raises: [CancelledError]).} =
let cids = blocksDelivery.mapIt(it.blk.cid)
@ -278,9 +250,9 @@ proc scheduleTasks(
# and we have it in our local store
if c in p.peerWantsCids:
try:
# TODO: the try/except should go away once blockstore tracks exceptions
if await (c in self.localStore):
# TODO: the try/except should go away once blockstore tracks exceptions
self.scheduleTask(p)
self.scheduleUploadTask(p)
break
except CancelledError as exc:
warn "Checking local store canceled", cid = c, err = exc.msg
@ -331,8 +303,12 @@ proc cancelBlocks(
proc resolveBlocks*(
self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]
) {.async: (raises: [CancelledError]).} =
# drop blocks from the queue
blocksDelivery.mapIt(it.address).apply do(address: BlockAddress):
self.downloadQueue.delete(address)
self.pendingBlocks.resolve(blocksDelivery)
await self.scheduleTasks(blocksDelivery)
await self.scheduleUploadTasks(blocksDelivery)
await self.cancelBlocks(blocksDelivery.mapIt(it.address))
proc resolveBlocks*(
@ -422,18 +398,19 @@ proc blocksDeliveryHandler*(
validatedBlocksDelivery.add(bd)
codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)
if validatedBlocksDelivery.len > 0:
codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)
let peerCtx = self.peers.get(peer)
if peerCtx != nil:
if err =? catch(await self.payForBlocks(peerCtx, blocksDelivery)).errorOption:
warn "Error paying for blocks", err = err.msg
let peerCtx = self.peers.get(peer)
if peerCtx != nil:
if err =? catch(await self.payForBlocks(peerCtx, blocksDelivery)).errorOption:
warn "Error paying for blocks", err = err.msg
return
if err =? catch(await self.resolveBlocks(validatedBlocksDelivery)).errorOption:
warn "Error resolving blocks", err = err.msg
return
if err =? catch(await self.resolveBlocks(validatedBlocksDelivery)).errorOption:
warn "Error resolving blocks", err = err.msg
return
proc wantListHandler*(
self: BlockExcEngine, peer: PeerId, wantList: WantList
) {.async: (raises: []).} =
@ -516,8 +493,8 @@ proc wantListHandler*(
await self.network.request.sendPresence(peer, presence)
if schedulePeer:
self.scheduleTask(peerCtx)
except CancelledError as exc: #TODO: replace with CancelledError
self.scheduleUploadTask(peerCtx)
except CancelledError as exc:
warn "Error processing want list", error = exc.msg
proc accountHandler*(
@ -577,9 +554,7 @@ proc dropPeer*(self: BlockExcEngine, peer: PeerId) {.raises: [].} =
# drop the peer from the peers table
self.peers.remove(peer)
proc taskHandler*(
self: BlockExcEngine, task: BlockExcPeerCtx
) {.gcsafe, async: (raises: [CancelledError, RetriesExhaustedError]).} =
proc uploadTaskHandler*(self: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
# Send to the peer blocks he wants to get,
# if they present in our local store
@ -637,19 +612,124 @@ proc taskHandler*(
task.peerWants.keepItIf(it.address notin successAddresses)
proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
proc blockexcUploadTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
## process tasks
##
trace "Starting blockexc task runner"
try:
while self.blockexcRunning:
let peerCtx = await self.taskQueue.pop()
await self.taskHandler(peerCtx)
except CatchableError as exc:
error "error running block exchange task", error = exc.msg
trace "Starting upload blockexc task runner"
while self.blockexcRunning:
try:
let peerCtx = await self.uploadQueue.pop()
info "Exiting blockexc task runner"
await self.uploadTaskHandler(peerCtx)
except CancelledError:
break
except CatchableError as exc:
error "Error running block exchange upload task", error = exc.msg
info "Exiting blockexc upload task runner"
proc downloadBlocks(
self: BlockExcEngine, addresses: seq[BlockAddress]
) {.async: (raises: [CancelledError]).} =
let
requestBlocks = addresses.filterIt(
it in self.pendingBlocks and not self.pendingBlocks.isInFlight(it)
)
peers = self.peers.getPeersForBlocks(requestBlocks)
if requestBlocks.len <= 0:
warn "All blocks inflight, no blocks to download"
return
logScope:
peersWith = peers.with.len
peersWithout = peers.without.len
trace "Peers for block"
if peers.with.len > 0:
# set inflight flag for all blocks
requestBlocks.apply do(address: BlockAddress):
self.pendingBlocks.setInFlight(address, true)
await self.sendWantBlock(requestBlocks, peers.with.randomPeer)
else:
# reset inflight flag for all blocks
requestBlocks.apply do(address: BlockAddress):
self.pendingBlocks.setInFlight(address, false)
if peers.without.len > 0:
await self.sendWantHave(requestBlocks, peers.without)
# requeue blocks onto the download queue
# await allFutures(
# requestBlocks
# .filterIt(it in self.pendingBlocks and not self.pendingBlocks.isInFlight(it))
# .mapIt(self.scheduleDownloadTask(it))
# )
self.discovery.queueFindBlocksReq(requestBlocks.mapIt(it.cidOrTreeCid))
proc blockexcDownloadTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
## process tasks
##
trace "Starting download blockexc task runner"
while self.blockexcRunning:
try:
var
batch: HashSet[BlockAddress]
batchSendDeadline =
# TODO: batchSendInterval * downloadWorkers, experimenting with deadline length
# the rationale is that the deadline is roughly per worker, so this gives each
# worker time to build a batch
# Moment.now() + (self.batchSendInterval * self.downloadWorkers)
Moment.now() + self.batchSendInterval
logScope:
batchSendDeadline = $batchSendDeadline
batchSendInterval = $self.batchSendInterval
batch = batch.len
while (batch.len < self.maxSendBatch) and (batchSendDeadline > Moment.now()):
let
addressFut = self.downloadQueue.pop()
address =
if self.downloadQueue.empty() and batch.len == 0:
trace "Queue is empty, waiting for block address"
# echo "Pending Futures: ", dumpPendingFutures()
let address = await addressFut
trace "Got block address from queue after empty", address
address
else:
trace "Waiting for block address or deadline"
let timeoutFut = sleepAsync(self.batchSendInterval)
await (addressFut or timeoutFut)
if addressFut.finished:
timeoutFut.cancelSoon()
let address = await addressFut
trace "Got block address from queue", address
if address in batch:
trace "Skipping block download, already in batch or inflight", address
continue
else:
address
else:
addressFut.cancelSoon()
continue
batch.incl(address)
trace "Batching block download"
if batch.len > 0:
await self.downloadBlocks(toSeq(batch))
except CancelledError:
trace "Cancelled blockexc download task runner"
break
except CatchableError as exc:
error "Error running block exchange download task", error = exc.msg
info "Exiting blockexc download task runner"
proc new*(
T: type BlockExcEngine,
@ -660,7 +740,10 @@ proc new*(
advertiser: Advertiser,
peerStore: PeerCtxStore,
pendingBlocks: PendingBlocksManager,
concurrentTasks = DefaultConcurrentTasks,
uploadWorkers = DefaultConcurrentTasks,
downloadWorkers = DefaultConcurrentTasks,
uploadPendingTasks = DefaultTaskQueueSize,
downloadPendingTasks = DefaultTaskQueueSize,
): BlockExcEngine =
## Create new block exchange engine instance
##
@ -671,9 +754,9 @@ proc new*(
pendingBlocks: pendingBlocks,
network: network,
wallet: wallet,
concurrentTasks: concurrentTasks,
trackedFutures: TrackedFutures(),
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
uploadQueue: newAsyncHeapQueue[BlockExcPeerCtx](uploadPendingTasks),
downloadQueue: newAsyncHeapQueue[BlockAddress](downloadPendingTasks),
discovery: discovery,
advertiser: advertiser,
)

View File

@ -60,9 +60,15 @@ proc getWantHandle*(
## Add an event for a block
##
logScope:
address = address
trace "Getting want handle"
self.blocks.withValue(address, blk):
trace "Want handle already exists"
return blk[].handle
do:
trace "Creating new want handle"
let blk = BlockReq(
handle: newFuture[Block]("pendingBlocks.getWantHandle"),
inFlight: inFlight,

View File

@ -73,15 +73,21 @@ func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx]
func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it.address.cidOrTreeCid == cid))
proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
proc getPeersForBlocks*(
self: PeerCtxStore, addressess: openArray[BlockAddress]
): PeersForBlock =
var res: PeersForBlock = (@[], @[])
let addressess = @addressess
for peer in self:
if peer.peerHave.anyIt(it == address):
if peer.peerHave.anyIt(it in addressess):
res.with.add(peer)
else:
res.without.add(peer)
res
proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
self.getPeersForBlocks(@[address])
proc new*(T: type PeerCtxStore): PeerCtxStore =
## create new instance of a peer context store
PeerCtxStore(peers: initOrderedTable[PeerId, BlockExcPeerCtx]())

View File

@ -61,6 +61,13 @@ proc `==`*(a, b: BlockAddress): bool =
a.cid == b.cid
)
proc `<`*(a, b: BlockAddress): bool =
if a.leaf and b.leaf:
if a.treeCid == b.treeCid:
return a.index < b.index
else:
return false
proc `$`*(a: BlockAddress): string =
if a.leaf:
"treeCid: " & $a.treeCid & ", index: " & $a.index

View File

@ -107,7 +107,7 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
)
peerCtx1.peerWants.add(entry)
check nodeCmps2.engine.taskQueue.pushOrUpdateNoWait(peerCtx1).isOk
check nodeCmps2.engine.uploadQueue.pushOrUpdateNoWait(peerCtx1).isOk
check eventually (await nodeCmps1.localStore.hasBlock(blk.cid)).tryGet()
check eventually (await blkFut) == blk

View File

@ -171,7 +171,7 @@ asyncchecksuite "NetworkStore engine handlers":
# only `wantBlock` are stored in `peerWants`
proc handler() {.async.} =
let ctx = await engine.taskQueue.pop()
let ctx = await engine.uploadQueue.pop()
check ctx.id == peerId
# only `wantBlock` scheduled
check ctx.peerWants.mapIt(it.address.cidOrTreeCid) == blocks.mapIt(it.cid)
@ -614,7 +614,7 @@ asyncchecksuite "Task Handler":
)
)
await engine.taskHandler(peersCtx[0])
await engine.uploadTaskHandler(peersCtx[0])
test "Should set in-flight for outgoing blocks":
proc sendBlocksDelivery(
@ -636,7 +636,7 @@ asyncchecksuite "Task Handler":
inFlight: false,
)
)
await engine.taskHandler(peersCtx[0])
await engine.uploadTaskHandler(peersCtx[0])
test "Should clear in-flight when local lookup fails":
peersCtx[0].peerWants.add(
@ -649,7 +649,7 @@ asyncchecksuite "Task Handler":
inFlight: false,
)
)
await engine.taskHandler(peersCtx[0])
await engine.uploadTaskHandler(peersCtx[0])
check not peersCtx[0].peerWants[0].inFlight
@ -705,4 +705,4 @@ asyncchecksuite "Task Handler":
)
)
await engine.taskHandler(peersCtx[0])
await engine.uploadTaskHandler(peersCtx[0])

View File

@ -23,142 +23,195 @@ proc toSortedSeq[T](h: AsyncHeapQueue[T], queueType = QueueType.Min): seq[T] =
result.add(popNoWait(tmp).tryGet())
suite "Synchronous tests":
test "Test pushNoWait - Min":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
test "Test pushNoWait - Min":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
check heap[0] == 0
check heap.toSortedSeq == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
check heap[0] == 0
check heap.toSortedSeq == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
test "Test pushNoWait - Max":
var heap = newAsyncHeapQueue[int](queueType = QueueType.Max)
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
test "Test pushNoWait - Max":
var heap = newAsyncHeapQueue[int](queueType = QueueType.Max)
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
check heap[0] == 9
check heap.toSortedSeq(QueueType.Max) == @[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
check heap[0] == 9
check heap.toSortedSeq(QueueType.Max) == @[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
test "Test popNoWait":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
test "Test popNoWait":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
var res: seq[int]
while heap.len > 0:
let r = heap.popNoWait()
if r.isOk:
res.add(r.get)
var res: seq[int]
while heap.len > 0:
let r = heap.popNoWait()
if r.isOk:
res.add(r.get)
check res == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
check res == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
test "Test popNoWait - Max":
var heap = newAsyncHeapQueue[int](queueType = QueueType.Max)
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
test "Test popNoWait - Max":
var heap = newAsyncHeapQueue[int](queueType = QueueType.Max)
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
var res: seq[int]
while heap.len > 0:
let r = heap.popNoWait()
if r.isOk:
res.add(r.get)
var res: seq[int]
while heap.len > 0:
let r = heap.popNoWait()
if r.isOk:
res.add(r.get)
check res == @[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
check res == @[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
test "Test del":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
test "Test del":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
heap.del(0)
doAssert(heap[0] == 1)
heap.del(0)
doAssert(heap[0] == 1)
heap.del(heap.find(7))
check heap.toSortedSeq == @[1, 2, 3, 4, 5, 6, 8, 9]
heap.del(heap.find(7))
check heap.toSortedSeq == @[1, 2, 3, 4, 5, 6, 8, 9]
heap.del(heap.find(5))
check heap.toSortedSeq == @[1, 2, 3, 4, 6, 8, 9]
heap.del(heap.find(5))
check heap.toSortedSeq == @[1, 2, 3, 4, 6, 8, 9]
heap.del(heap.find(6))
check heap.toSortedSeq == @[1, 2, 3, 4, 8, 9]
heap.del(heap.find(6))
check heap.toSortedSeq == @[1, 2, 3, 4, 8, 9]
heap.del(heap.find(2))
check heap.toSortedSeq == @[1, 3, 4, 8, 9]
heap.del(heap.find(2))
check heap.toSortedSeq == @[1, 3, 4, 8, 9]
test "Test del last":
var heap = newAsyncHeapQueue[int]()
let data = [1, 2, 3]
for item in data:
check heap.pushNoWait(item).isOk
test "Test del last":
var heap = newAsyncHeapQueue[int]()
let data = [1, 2, 3]
for item in data:
check heap.pushNoWait(item).isOk
heap.del(2)
check heap.toSortedSeq == @[1, 2]
heap.del(2)
check heap.toSortedSeq == @[1, 2]
heap.del(1)
check heap.toSortedSeq == @[1]
heap.del(1)
check heap.toSortedSeq == @[1]
heap.del(0)
check heap.toSortedSeq == newSeq[int]() # empty seq has no type
heap.del(0)
check heap.toSortedSeq == newSeq[int]() # empty seq has no type
test "Should throw popping from an empty queue":
var heap = newAsyncHeapQueue[int]()
let err = heap.popNoWait()
check err.isErr
check err.error == AsyncHQErrors.Empty
test "Should throw popping from an empty queue":
var heap = newAsyncHeapQueue[int]()
let err = heap.popNoWait()
check err.isErr
check err.error == AsyncHQErrors.Empty
test "Should throw pushing to an full queue":
var heap = newAsyncHeapQueue[int](1)
check heap.pushNoWait(1).isOk
let err = heap.pushNoWait(2)
check err.isErr
check err.error == AsyncHQErrors.Full
test "Should throw pushing to an full queue":
var heap = newAsyncHeapQueue[int](1)
check heap.pushNoWait(1).isOk
let err = heap.pushNoWait(2)
check err.isErr
check err.error == AsyncHQErrors.Full
test "Test clear":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
test "Test clear":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
check heap.len == 10
heap.clear()
check heap.len == 0
check heap.len == 10
heap.clear()
check heap.len == 0
asyncchecksuite "Asynchronous Tests":
test "Test push":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
await push(heap, item)
check heap[0] == 0
check heap.toSortedSeq == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
test "Test push and pop with maxSize":
var heap = newAsyncHeapQueue[int](5)
let data = [1, 9, 5, 3, 7, 4, 2]
proc pushTask() {.async.} =
test "Test push":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
await push(heap, item)
check heap[0] == 0
check heap.toSortedSeq == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncSpawn pushTask()
test "Test push and pop with maxSize":
var heap = newAsyncHeapQueue[int](5)
let data = [1, 9, 5, 3, 7, 4, 2]
check heap.len == 5
check heap[0] == 1 # because we haven't pushed 0 yet
proc pushTask() {.async.} =
for item in data:
await push(heap, item)
check (await heap.pop) == 1
check (await heap.pop) == 3
check (await heap.pop) == 5
check (await heap.pop) == 7
check (await heap.pop) == 9
let pushFut = pushTask()
await sleepAsync(1.milliseconds) # allow poll to run once more
check (await heap.pop) == 2
check (await heap.pop) == 4
check heap.len == 5
check heap[0] == 1 # because we haven't pushed 0 yet
check (await heap.pop) == 1
check (await heap.pop) == 3
check (await heap.pop) == 5
check (await heap.pop) == 7
check (await heap.pop) == 9
await sleepAsync(1.milliseconds) # allow poll to run once more
check (await heap.pop) == 2
check (await heap.pop) == 4
await pushFut
test "Test single push when empty":
let heap = newAsyncHeapQueue[int](1)
check heap.empty()
let popFut = heap.pop()
proc pushTask() {.async.} =
await heap.push(1)
let pushFut = pushTask()
await allFutures(@[FutureBase(pushFut), FutureBase(popFut)])
check (await popFut) == 1
await pushFut
test "Test multiple pushes when empty - single producer, multiple consumers":
let heap = newAsyncHeapQueue[int](1)
proc pushTask() {.async.} =
var pushes = 2
for i in 0 .. 100:
echo "Pushing ", i
await heap.push(i)
pushes -= 1
if pushes == 0:
echo "Sleeping"
await sleepAsync(10.millis) # allow poll to run once more
pushes = 2
let pushFut = pushTask()
var popTasks = 0
proc popTask() {.async.} =
echo "starting pop task: ", popTasks
let thisTask = popTasks
popTasks += 1
await sleepAsync(100.millis)
var i = 0
while not pushFut.finished:
let fut = heap.pop()
if heap.empty():
await sleepAsync(1.millis) # allow poll to run once more
let popRes = await fut
echo "Task # ", thisTask, " popRes: ", popRes
check popRes == i
i += 1
var popFuts = newSeq[Future[void]]()
for i in 0 ..< 10:
popFuts.add(popTask())
await allFutures(popFuts.mapIt(FutureBase(it)) & @[FutureBase(pushFut)])
test "Test update":
var heap = newAsyncHeapQueue[Task](5)

@ -1 +1 @@
Subproject commit d3fb903945c3895f28a2e50685745e0a9762ece5
Subproject commit 88197fe6ec929559b37e1443785ece650d2e9255