Blocks leak and inflight for pending blocks (#315)

* attach `wait` to handle fut

* fix crash when can't find a CID and timeout hits

the exception was not raised, thus a Nil was returned that was
than wrapped in an option, leading to crash.


Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>

* add inFlight flag

* adding pending blocks tests

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
Co-authored-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Dmitriy Ryajov 2022-11-15 00:12:05 -06:00 committed by GitHub
parent 978e2f09e4
commit 5f9507cfcd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 116 additions and 16 deletions

View File

@ -27,27 +27,37 @@ const
DefaultBlockTimeout* = 10.minutes DefaultBlockTimeout* = 10.minutes
type type
BlockReq* = object
handle*: Future[Block]
inFlight*: bool
PendingBlocksManager* = ref object of RootObj PendingBlocksManager* = ref object of RootObj
blocks*: Table[Cid, Future[Block]] # pending Block requests blocks*: Table[Cid, BlockReq] # pending Block requests
proc getWantHandle*( proc getWantHandle*(
p: PendingBlocksManager, p: PendingBlocksManager,
cid: Cid, cid: Cid,
timeout = DefaultBlockTimeout): Future[Block] {.async.} = timeout = DefaultBlockTimeout,
inFlight = false): Future[Block] {.async.} =
## Add an event for a block ## Add an event for a block
## ##
if cid notin p.blocks:
p.blocks[cid] = newFuture[Block]().wait(timeout)
trace "Adding pending future for block", cid
try: try:
return await p.blocks[cid] if cid notin p.blocks:
p.blocks[cid] = BlockReq(
handle: newFuture[Block]("pendingBlocks.getWantHandle"),
inFlight: inFlight)
trace "Adding pending future for block", cid
return await p.blocks[cid].handle.wait(timeout)
except CancelledError as exc: except CancelledError as exc:
trace "Blocks cancelled", exc = exc.msg, cid trace "Blocks cancelled", exc = exc.msg, cid
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
trace "Pending WANT failed or expired", exc = exc.msg trace "Pending WANT failed or expired", exc = exc.msg
# no need to cancel, it is already cancelled by wait()
raise exc
finally: finally:
p.blocks.del(cid) p.blocks.del(cid)
@ -59,12 +69,22 @@ proc resolve*(
for blk in blocks: for blk in blocks:
# resolve any pending blocks # resolve any pending blocks
if blk.cid in p.blocks: p.blocks.withValue(blk.cid, pending):
p.blocks.withValue(blk.cid, pending): if not pending[].handle.completed:
if not pending[].finished: trace "Resolving block", cid = blk.cid
trace "Resolving block", cid = $blk.cid pending[].handle.complete(blk)
pending[].complete(blk)
p.blocks.del(blk.cid) proc setInFlight*(
p: PendingBlocksManager,
cid: Cid) =
p.blocks.withValue(cid, pending):
pending[].inFlight = true
proc isInFlight*(
p: PendingBlocksManager,
cid: Cid): bool =
p.blocks.withValue(cid, pending):
result = pending[].inFlight
proc pending*( proc pending*(
p: PendingBlocksManager, p: PendingBlocksManager,
@ -80,11 +100,10 @@ iterator wantList*(p: PendingBlocksManager): Cid =
iterator wantHandles*(p: PendingBlocksManager): Future[Block] = iterator wantHandles*(p: PendingBlocksManager): Future[Block] =
for v in p.blocks.values: for v in p.blocks.values:
yield v yield v.handle
func len*(p: PendingBlocksManager): int = func len*(p: PendingBlocksManager): int =
p.blocks.len p.blocks.len
func new*(T: type PendingBlocksManager): T = func new*(T: type PendingBlocksManager): T =
T( T()
blocks: initTable[Cid, Future[Block]]())

View File

@ -0,0 +1,80 @@
import std/sequtils
import std/algorithm
import pkg/chronos
import pkg/asynctest
import pkg/libp2p
import pkg/stew/byteutils
import pkg/codex/blocktype as bt
import pkg/codex/blockexchange
import ../examples
suite "Pending Blocks":
test "Should add want handle":
let
pendingBlocks = PendingBlocksManager.new()
blk = bt.Block.new("Hello".toBytes).tryGet
handle = pendingBlocks.getWantHandle(blk.cid)
check pendingBlocks.pending(blk.cid)
test "Should resolve want handle":
let
pendingBlocks = PendingBlocksManager.new()
blk = bt.Block.new("Hello".toBytes).tryGet
handle = pendingBlocks.getWantHandle(blk.cid)
check blk.cid in pendingBlocks
pendingBlocks.resolve(@[blk])
check (await handle) == blk
check blk.cid notin pendingBlocks
test "Should cancel want handle":
let
pendingBlocks = PendingBlocksManager.new()
blk = bt.Block.new("Hello".toBytes).tryGet
handle = pendingBlocks.getWantHandle(blk.cid)
check blk.cid in pendingBlocks
await handle.cancelAndWait()
check blk.cid notin pendingBlocks
test "Should expire want handle":
let
pendingBlocks = PendingBlocksManager.new()
blk = bt.Block.new("Hello".toBytes).tryGet
handle = pendingBlocks.getWantHandle(blk.cid, 1.millis)
check blk.cid in pendingBlocks
await sleepAsync(10.millis)
expect AsyncTimeoutError:
discard await handle
check blk.cid notin pendingBlocks
test "Should get wants list":
let
pendingBlocks = PendingBlocksManager.new()
blks = (0..9).mapIt( bt.Block.new(("Hello " & $it).toBytes).tryGet )
handles = blks.mapIt( pendingBlocks.getWantHandle( it.cid ) )
check:
blks.mapIt( $it.cid ).sorted(cmp[string]) ==
toSeq(pendingBlocks.wantList).mapIt( $it ).sorted(cmp[string])
test "Should get want handles list":
let
pendingBlocks = PendingBlocksManager.new()
blks = (0..9).mapIt( bt.Block.new(("Hello " & $it).toBytes).tryGet )
handles = blks.mapIt( pendingBlocks.getWantHandle( it.cid ) )
wantHandles = toSeq(pendingBlocks.wantHandles)
check wantHandles.len == handles.len
pendingBlocks.resolve(blks)
check:
(await allFinished(wantHandles)).mapIt( $it.read.cid ).sorted(cmp[string]) ==
(await allFinished(handles)).mapIt( $it.read.cid ).sorted(cmp[string])

View File

@ -3,5 +3,6 @@ import ./blockexchange/testnetwork
import ./blockexchange/testpeerctxstore import ./blockexchange/testpeerctxstore
import ./blockexchange/testdiscovery import ./blockexchange/testdiscovery
import ./blockexchange/testprotobuf import ./blockexchange/testprotobuf
import ./blockexchange/testpendingblocks
{.warning[UnusedImport]: off.} {.warning[UnusedImport]: off.}