diff --git a/dagger/blockexchange/pendingblocks.nim b/dagger/blockexchange/pendingblocks.nim index 3b426bb8..6d254414 100644 --- a/dagger/blockexchange/pendingblocks.nim +++ b/dagger/blockexchange/pendingblocks.nim @@ -8,6 +8,7 @@ ## those terms. import std/tables +import std/sequtils import pkg/questionable import pkg/chronicles @@ -19,18 +20,22 @@ import ../blocktype logScope: topics = "dagger blockexc pendingblocks" +const + DefaultBlockTimeout* = 10.minutes + type PendingBlocksManager* = ref object of RootObj blocks*: Table[Cid, Future[Block]] # pending Block requests -proc addOrAwait*( +proc getWantHandle*( p: PendingBlocksManager, - cid: Cid): Future[Block] {.async.} = + cid: Cid, + timeout = DefaultBlockTimeout): Future[Block] {.async.} = ## Add an event for a block ## if cid notin p.blocks: - p.blocks[cid] = newFuture[Block]() + p.blocks[cid] = newFuture[Block]().wait(timeout) trace "Adding pending future for block", cid try: @@ -43,6 +48,12 @@ proc addOrAwait*( finally: p.blocks.del(cid) +proc addOrAwait*( + p: PendingBlocksManager, + cid: Cid, + timeout = DefaultBlockTimeout): Future[Block] {.deprecated: "Use getWantHandle".} = + p.getWantHandle(cid, timeout) + proc resolve*( p: PendingBlocksManager, blocks: seq[Block]) = @@ -66,6 +77,17 @@ proc contains*( p: PendingBlocksManager, cid: Cid): bool = p.pending(cid) +iterator wantList*(p: PendingBlocksManager): Cid = + for k in p.blocks.keys: + yield k + +iterator wantHandles*(p: PendingBlocksManager): Future[Block] = + for v in p.blocks.values: + yield v + +func len*(p: PendingBlocksManager): int = + p.blocks.len + func new*(T: type PendingBlocksManager): T = T( blocks: initTable[Cid, Future[Block]]()