diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index c24c7519..c1d32cf5 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -27,27 +27,37 @@ const DefaultBlockTimeout* = 10.minutes type + BlockReq* = object + handle*: Future[Block] + inFlight*: bool + PendingBlocksManager* = ref object of RootObj - blocks*: Table[Cid, Future[Block]] # pending Block requests + blocks*: Table[Cid, BlockReq] # pending Block requests proc getWantHandle*( p: PendingBlocksManager, cid: Cid, - timeout = DefaultBlockTimeout): Future[Block] {.async.} = + timeout = DefaultBlockTimeout, + inFlight = false): Future[Block] {.async.} = ## Add an event for a block ## - if cid notin p.blocks: - p.blocks[cid] = newFuture[Block]().wait(timeout) - trace "Adding pending future for block", cid - try: - return await p.blocks[cid] + if cid notin p.blocks: + p.blocks[cid] = BlockReq( + handle: newFuture[Block]("pendingBlocks.getWantHandle"), + inFlight: inFlight) + + trace "Adding pending future for block", cid + + return await p.blocks[cid].handle.wait(timeout) except CancelledError as exc: trace "Blocks cancelled", exc = exc.msg, cid raise exc except CatchableError as exc: trace "Pending WANT failed or expired", exc = exc.msg + # no need to cancel, it is already cancelled by wait() + raise exc finally: p.blocks.del(cid) @@ -59,12 +69,22 @@ proc resolve*( for blk in blocks: # resolve any pending blocks - if blk.cid in p.blocks: - p.blocks.withValue(blk.cid, pending): - if not pending[].finished: - trace "Resolving block", cid = $blk.cid - pending[].complete(blk) - p.blocks.del(blk.cid) + p.blocks.withValue(blk.cid, pending): + if not pending[].handle.completed: + trace "Resolving block", cid = blk.cid + pending[].handle.complete(blk) + +proc setInFlight*( + p: PendingBlocksManager, + cid: Cid) = + p.blocks.withValue(cid, pending): + pending[].inFlight = true + +proc isInFlight*( + p: PendingBlocksManager, + cid: Cid): bool = + p.blocks.withValue(cid, pending): + result = pending[].inFlight proc pending*( p: PendingBlocksManager, @@ -80,11 +100,10 @@ iterator wantList*(p: PendingBlocksManager): Cid = iterator wantHandles*(p: PendingBlocksManager): Future[Block] = for v in p.blocks.values: - yield v + yield v.handle func len*(p: PendingBlocksManager): int = p.blocks.len func new*(T: type PendingBlocksManager): T = - T( - blocks: initTable[Cid, Future[Block]]()) + T() diff --git a/tests/codex/blockexchange/testpendingblocks.nim b/tests/codex/blockexchange/testpendingblocks.nim new file mode 100644 index 00000000..d713cdee --- /dev/null +++ b/tests/codex/blockexchange/testpendingblocks.nim @@ -0,0 +1,80 @@ +import std/sequtils +import std/algorithm + +import pkg/chronos +import pkg/asynctest +import pkg/libp2p +import pkg/stew/byteutils + +import pkg/codex/blocktype as bt +import pkg/codex/blockexchange + +import ../examples + +suite "Pending Blocks": + test "Should add want handle": + let + pendingBlocks = PendingBlocksManager.new() + blk = bt.Block.new("Hello".toBytes).tryGet + handle = pendingBlocks.getWantHandle(blk.cid) + + check pendingBlocks.pending(blk.cid) + + test "Should resolve want handle": + let + pendingBlocks = PendingBlocksManager.new() + blk = bt.Block.new("Hello".toBytes).tryGet + handle = pendingBlocks.getWantHandle(blk.cid) + + check blk.cid in pendingBlocks + pendingBlocks.resolve(@[blk]) + check (await handle) == blk + check blk.cid notin pendingBlocks + + test "Should cancel want handle": + let + pendingBlocks = PendingBlocksManager.new() + blk = bt.Block.new("Hello".toBytes).tryGet + handle = pendingBlocks.getWantHandle(blk.cid) + + check blk.cid in pendingBlocks + await handle.cancelAndWait() + check blk.cid notin pendingBlocks + + test "Should expire want handle": + let + pendingBlocks = PendingBlocksManager.new() + blk = bt.Block.new("Hello".toBytes).tryGet + handle = pendingBlocks.getWantHandle(blk.cid, 1.millis) + + check blk.cid in pendingBlocks + + await sleepAsync(10.millis) + expect AsyncTimeoutError: + discard await handle + + check blk.cid notin pendingBlocks + + test "Should get wants list": + let + pendingBlocks = PendingBlocksManager.new() + blks = (0..9).mapIt( bt.Block.new(("Hello " & $it).toBytes).tryGet ) + handles = blks.mapIt( pendingBlocks.getWantHandle( it.cid ) ) + + check: + blks.mapIt( $it.cid ).sorted(cmp[string]) == + toSeq(pendingBlocks.wantList).mapIt( $it ).sorted(cmp[string]) + + test "Should get want handles list": + let + pendingBlocks = PendingBlocksManager.new() + blks = (0..9).mapIt( bt.Block.new(("Hello " & $it).toBytes).tryGet ) + handles = blks.mapIt( pendingBlocks.getWantHandle( it.cid ) ) + wantHandles = toSeq(pendingBlocks.wantHandles) + + check wantHandles.len == handles.len + pendingBlocks.resolve(blks) + + check: + (await allFinished(wantHandles)).mapIt( $it.read.cid ).sorted(cmp[string]) == + (await allFinished(handles)).mapIt( $it.read.cid ).sorted(cmp[string]) diff --git a/tests/codex/testblockexchange.nim b/tests/codex/testblockexchange.nim index bb7ace3d..a966a452 100644 --- a/tests/codex/testblockexchange.nim +++ b/tests/codex/testblockexchange.nim @@ -3,5 +3,6 @@ import ./blockexchange/testnetwork import ./blockexchange/testpeerctxstore import ./blockexchange/testdiscovery import ./blockexchange/testprotobuf +import ./blockexchange/testpendingblocks {.warning[UnusedImport]: off.}