From d33804f700f199d40561f1fee75fba9019cba708 Mon Sep 17 00:00:00 2001 From: Giuliano Mega Date: Fri, 15 Mar 2024 18:50:56 -0300 Subject: [PATCH] fixes double lookups when block does not exist (#739) * fixes double lookups when block does not exist * handle timeouts on requestBlock * fix indentation which was causing an integration test to fail --- codex/blockexchange/engine/engine.nim | 24 ++++--- codex/node.nim | 97 +++++++++++++++++---------- tests/codex/node/helpers.nim | 17 ++++- tests/codex/node/testnode.nim | 15 +++++ 4 files changed, 106 insertions(+), 47 deletions(-) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 60a7a689..007c1a5b 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -73,6 +73,7 @@ type peersPerRequest: int # Max number of peers to request from wallet*: WalletRef # Nitro wallet for micropayments pricing*: ?Pricing # Optional bandwidth pricing + blockFetchTimeout*: Duration # Timeout for fetching blocks over the network discovery*: DiscoveryEngine Pricing* = object @@ -173,12 +174,10 @@ proc monitorBlockHandle( proc requestBlock*( b: BlockExcEngine, address: BlockAddress, - timeout = DefaultBlockTimeout): Future[?!Block] {.async.} = - let blockFuture = b.pendingBlocks.getWantHandle(address, timeout) +): Future[?!Block] {.async.} = + let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout) - if b.pendingBlocks.isInFlight(address): - success await blockFuture - else: + if not b.pendingBlocks.isInFlight(address): let peers = b.peers.selectCheapest(address) if peers.len == 0: b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) @@ -199,12 +198,17 @@ proc requestBlock*( await b.sendWantHave(address, @[peer], toSeq(b.peers)) codex_block_exchange_want_have_lists_sent.inc() + # Don't let timeouts bubble up. We can't be too broad here or we break + # cancellations. + try: success await blockFuture + except AsyncTimeoutError as err: + failure err proc requestBlock*( b: BlockExcEngine, - cid: Cid, - timeout = DefaultBlockTimeout): Future[?!Block] = + cid: Cid +): Future[?!Block] = b.requestBlock(BlockAddress.init(cid)) proc blockPresenceHandler*( @@ -614,7 +618,8 @@ proc new*( peerStore: PeerCtxStore, pendingBlocks: PendingBlocksManager, concurrentTasks = DefaultConcurrentTasks, - peersPerRequest = DefaultMaxPeersPerRequest + peersPerRequest = DefaultMaxPeersPerRequest, + blockFetchTimeout = DefaultBlockTimeout, ): BlockExcEngine = ## Create new block exchange engine instance ## @@ -629,7 +634,8 @@ proc new*( wallet: wallet, concurrentTasks: concurrentTasks, taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), - discovery: discovery) + discovery: discovery, + blockFetchTimeout: blockFetchTimeout) proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = if event.kind == PeerEventKind.Joined: diff --git a/codex/node.nim b/codex/node.nim index b8466943..3eae6cb1 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -209,6 +209,62 @@ proc fetchBatched*( let iter = Iter.fromSlice(0..