diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index dc06da5b..bb54128a 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -92,6 +92,7 @@ const # Don't do more than one discovery request per `DiscoveryRateLimit` seconds. DiscoveryRateLimit = 1.seconds DefaultPeerActivityTimeout = 1.minutes + PresenceBatchSize = 1024 type TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} @@ -196,6 +197,9 @@ proc refreshBlockKnowledge( await self.network.request.sendWantList(peer.id, toAsk, full = true) proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledError]).} = + let runtimeQuota = 10.milliseconds + var lastIdle = Moment.now() + for peer in self.peers.peers.values.toSeq: # We refresh block knowledge if: # 1. the peer hasn't been refreshed in a while; @@ -217,6 +221,13 @@ proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledErr else: trace "Not refreshing: peer is up to date", peer = peer.id + if (Moment.now() - lastIdle) >= runtimeQuota: + try: + await idleAsync() + except CancelledError: + discard + lastIdle = Moment.now() + proc searchForNewPeers(self: BlockExcEngine, cid: Cid) = if self.lastDiscRequest + DiscoveryRateLimit < Moment.now(): trace "Searching for new peers for", cid = cid @@ -595,6 +606,9 @@ proc blocksDeliveryHandler*( var validatedBlocksDelivery: seq[BlockDelivery] let peerCtx = self.peers.get(peer) + let runtimeQuota = 10.milliseconds + var lastIdle = Moment.now() + for bd in blocksDelivery: logScope: peer = peer @@ -632,6 +646,15 @@ proc blocksDeliveryHandler*( validatedBlocksDelivery.add(bd) + if (Moment.now() - lastIdle) >= runtimeQuota: + try: + await idleAsync() + except CancelledError: + discard + except CatchableError: + discard + lastIdle = Moment.now() + codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64) if peerCtx != nil: @@ -657,6 +680,9 @@ proc wantListHandler*( presence: seq[BlockPresence] schedulePeer = false + let runtimeQuota = 10.milliseconds + var lastIdle = Moment.now() + try: for e in wantList.entries: logScope: @@ -717,8 +743,20 @@ proc wantListHandler*( if e.wantType == WantType.WantBlock: schedulePeer = true + if presence.len >= PresenceBatchSize or (Moment.now() - lastIdle) >= runtimeQuota: + if presence.len > 0: + trace "Sending presence batch to remote", items = presence.len + await self.network.request.sendPresence(peer, presence) + presence = @[] + try: + await idleAsync() + except CancelledError: + discard + lastIdle = Moment.now() + + # Send any remaining presence messages if presence.len > 0: - trace "Sending presence to remote", items = presence.mapIt($it).join(",") + trace "Sending final presence to remote", items = presence.len await self.network.request.sendPresence(peer, presence) if schedulePeer: diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 7eda9186..d047dc51 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -38,12 +38,19 @@ method getBlocks*( localAddresses: seq[BlockAddress] remoteAddresses: seq[BlockAddress] + let runtimeQuota = 10.milliseconds + var lastIdle = Moment.now() + for address in addresses: if not (await address in self.localStore): remoteAddresses.add(address) else: localAddresses.add(address) + if (Moment.now() - lastIdle) >= runtimeQuota: + await idleAsync() + lastIdle = Moment.now() + return chain( await self.localStore.getBlocks(localAddresses), self.engine.requestBlocks(remoteAddresses),