diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index d50bfd84..40ba33a2 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -82,7 +82,7 @@ proc handleWantList( if isNil(b.handlers.onWantList): return - trace "Handling want list for peer", peer = peer.id + trace "Handling want list for peer", peer = peer.id, items = list.entries.len b.handlers.onWantList(peer.id, list) # TODO: make into a template @@ -119,7 +119,7 @@ proc broadcastWantList*( if id notin b.peers: return - trace "Sending want list to peer", peer = id, `type` = $wantType, len = cids.len + trace "Sending want list to peer", peer = id, `type` = $wantType, items = cids.len let wantList = makeWantList( @@ -142,7 +142,7 @@ proc handleBlocks( if isNil(b.handlers.onBlocks): return - trace "Handling blocks for peer", peer = peer.id + trace "Handling blocks for peer", peer = peer.id, items = blocks.len var blks: seq[bt.Block] for blob in blocks: @@ -178,7 +178,7 @@ proc broadcastBlocks*( return b.peers.withValue(id, peer): - trace "Sending blocks to peer", peer = id, len = blocks.len + trace "Sending blocks to peer", peer = id, items = blocks.len peer[].broadcast(pb.Message(payload: makeBlocks(blocks))) proc handleBlockPresence( @@ -191,7 +191,7 @@ proc handleBlockPresence( if isNil(b.handlers.onPresence): return - trace "Handling block presence for peer", peer = peer.id + trace "Handling block presence for peer", peer = peer.id, items = presence.len b.handlers.onPresence(peer.id, presence) proc broadcastBlockPresence*( @@ -204,7 +204,7 @@ proc broadcastBlockPresence*( if id notin b.peers: return - trace "Sending presence to peer", peer = id + trace "Sending presence to peer", peer = id, items = presence.len b.peers.withValue(id, peer): peer[].broadcast(Message(blockPresences: @presence)) diff --git a/codex/blockexchange/network/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim index 964a8586..1a213383 100644 --- a/codex/blockexchange/network/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -38,7 +38,7 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = return try: - while not conn.atEof: + while not conn.atEof or not conn.closed: let data = await conn.readLp(MaxMessageSize) msg: Message = Protobuf.decode(data, Message) diff --git a/codex/node.nim b/codex/node.nim index 82e73763..8ae3a1c9 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -34,6 +34,9 @@ import ./contracts logScope: topics = "codex node" +const + PrefetchBatch = 100 + type CodexError = object of CatchableError @@ -128,8 +131,12 @@ proc retrieve*( ## Initiates requests to all blocks in the manifest ## try: - discard await allFinished( - manifest.mapIt( node.blockStore.getBlock( it ) )) + let + batch = manifest.blocks.len div PrefetchBatch + trace "Prefetching in batches of", batch + for blks in manifest.blocks.distribute(batch, true): + discard await allFinished( + blks.mapIt( node.blockStore.getBlock( it ) )) except CatchableError as exc: trace "Exception prefetching blocks", exc = exc.msg diff --git a/codex/stores/fsstore.nim b/codex/stores/fsstore.nim index ac98d42d..de846e50 100644 --- a/codex/stores/fsstore.nim +++ b/codex/stores/fsstore.nim @@ -150,10 +150,6 @@ method listBlocks*(self: FSStore, onBlock: OnBlock) {.async.} = except CatchableError as exc: trace "Couldn't get block", cid = $(cid.get()) - # TODO: this should run on a thread which - # wouldn't need the sleep - await sleepAsync(100.millis) # avoid blocking - proc new*( T: type FSStore, repoDir: string,