From b9fe7dce582ef6a8532024cfedf9127d3a2141c3 Mon Sep 17 00:00:00 2001 From: Chrysostomos Nanakos Date: Tue, 16 Sep 2025 12:01:17 +0300 Subject: [PATCH] feat: add peer count limits to discovery engine This prevents unbounded peer accumulation while maintaining redundancy. When peer count exceeds the maximum, the least active peers are removed from tracking to free resources. Part of https://github.com/codex-storage/nim-codex/issues/974 --- codex/blockexchange/engine/discovery.nim | 33 +++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 6288ceae..883eea8e 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -8,6 +8,7 @@ ## those terms. import std/sequtils +import std/algorithm import pkg/chronos import pkg/libp2p/cid @@ -38,6 +39,7 @@ const DefaultConcurrentDiscRequests = 10 DefaultDiscoveryTimeout = 1.minutes DefaultMinPeersPerBlock = 3 + DefaultMaxPeersPerBlock = 8 DefaultDiscoveryLoopSleep = 3.seconds type DiscoveryEngine* = ref object of RootObj @@ -51,11 +53,32 @@ type DiscoveryEngine* = ref object of RootObj discoveryLoop*: Future[void].Raising([]) # Discovery loop task handle discoveryQueue*: AsyncQueue[Cid] # Discovery queue trackedFutures*: TrackedFutures # Tracked Discovery tasks futures - minPeersPerBlock*: int # Max number of peers with block + minPeersPerBlock*: int # Min number of peers with block + maxPeersPerBlock*: int # Max number of peers with block discoveryLoopSleep: Duration # Discovery loop sleep inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests +proc cleanupExcessPeers(b: DiscoveryEngine, cid: Cid) {.gcsafe, raises: [].} = + var haves = b.peers.peersHave(cid) + let count = haves.len - b.maxPeersPerBlock + if count <= 0: + return + + haves.sort( + proc(a, b: BlockExcPeerCtx): int = + cmp(a.lastExchange, b.lastExchange) + ) + + let toRemove = haves[0 ..< count] + for peer in toRemove: + try: + peer.cleanPresence(BlockAddress.init(cid)) + trace "Removed block presence from peer", cid, peer = peer.id + except CatchableError as exc: + error "Failed to clean presence for peer", + cid, peer = peer.id, error = exc.msg, name = exc.name + proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} = try: while b.discEngineRunning: @@ -82,6 +105,12 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} = let haves = b.peers.peersHave(cid) + if haves.len > b.maxPeersPerBlock: + trace "Cleaning up excess peers", + cid, peers = haves.len, max = b.maxPeersPerBlock + b.cleanupExcessPeers(cid) + continue + if haves.len < b.minPeersPerBlock: let request = b.discovery.find(cid) b.inFlightDiscReqs[cid] = request @@ -158,6 +187,7 @@ proc new*( concurrentDiscReqs = DefaultConcurrentDiscRequests, discoveryLoopSleep = DefaultDiscoveryLoopSleep, minPeersPerBlock = DefaultMinPeersPerBlock, + maxPeersPerBlock = DefaultMaxPeersPerBlock, ): DiscoveryEngine = ## Create a discovery engine instance for advertising services ## @@ -173,4 +203,5 @@ proc new*( inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](), discoveryLoopSleep: discoveryLoopSleep, minPeersPerBlock: minPeersPerBlock, + maxPeersPerBlock: maxPeersPerBlock, )