diff --git a/dagger/blockexchange/pendingblocks.nim b/dagger/blockexchange/pendingblocks.nim index 3b426bb8..3d504f9b 100644 --- a/dagger/blockexchange/pendingblocks.nim +++ b/dagger/blockexchange/pendingblocks.nim @@ -8,6 +8,11 @@ ## those terms. import std/tables +import std/sequtils + +import pkg/upraises + +push: {.upraises: [].} import pkg/questionable import pkg/chronicles @@ -19,18 +24,22 @@ import ../blocktype logScope: topics = "dagger blockexc pendingblocks" +const + DefaultBlockTimeout* = 10.minutes + type PendingBlocksManager* = ref object of RootObj blocks*: Table[Cid, Future[Block]] # pending Block requests -proc addOrAwait*( +proc getWantHandle*( p: PendingBlocksManager, - cid: Cid): Future[Block] {.async.} = + cid: Cid, + timeout = DefaultBlockTimeout): Future[Block] {.async.} = ## Add an event for a block ## if cid notin p.blocks: - p.blocks[cid] = newFuture[Block]() + p.blocks[cid] = newFuture[Block]().wait(timeout) trace "Adding pending future for block", cid try: @@ -52,11 +61,11 @@ proc resolve*( for blk in blocks: # resolve any pending blocks if blk.cid in p.blocks: - let pending = p.blocks[blk.cid] - if not pending.finished: - trace "Resolving block", cid = $blk.cid - pending.complete(blk) - p.blocks.del(blk.cid) + p.blocks.withValue(blk.cid, pending): + if not pending[].finished: + trace "Resolving block", cid = $blk.cid + pending[].complete(blk) + p.blocks.del(blk.cid) proc pending*( p: PendingBlocksManager, @@ -66,6 +75,17 @@ proc contains*( p: PendingBlocksManager, cid: Cid): bool = p.pending(cid) +iterator wantList*(p: PendingBlocksManager): Cid = + for k in p.blocks.keys: + yield k + +iterator wantHandles*(p: PendingBlocksManager): Future[Block] = + for v in p.blocks.values: + yield v + +func len*(p: PendingBlocksManager): int = + p.blocks.len + func new*(T: type PendingBlocksManager): T = T( blocks: initTable[Cid, Future[Block]]()