diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 6288ceae..883eea8e 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -8,6 +8,7 @@ ## those terms. import std/sequtils +import std/algorithm import pkg/chronos import pkg/libp2p/cid @@ -38,6 +39,7 @@ const DefaultConcurrentDiscRequests = 10 DefaultDiscoveryTimeout = 1.minutes DefaultMinPeersPerBlock = 3 + DefaultMaxPeersPerBlock = 8 DefaultDiscoveryLoopSleep = 3.seconds type DiscoveryEngine* = ref object of RootObj @@ -51,11 +53,32 @@ type DiscoveryEngine* = ref object of RootObj discoveryLoop*: Future[void].Raising([]) # Discovery loop task handle discoveryQueue*: AsyncQueue[Cid] # Discovery queue trackedFutures*: TrackedFutures # Tracked Discovery tasks futures - minPeersPerBlock*: int # Max number of peers with block + minPeersPerBlock*: int # Min number of peers with block + maxPeersPerBlock*: int # Max number of peers with block discoveryLoopSleep: Duration # Discovery loop sleep inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests +proc cleanupExcessPeers(b: DiscoveryEngine, cid: Cid) {.gcsafe, raises: [].} = + var haves = b.peers.peersHave(cid) + let count = haves.len - b.maxPeersPerBlock + if count <= 0: + return + + haves.sort( + proc(a, b: BlockExcPeerCtx): int = + cmp(a.lastExchange, b.lastExchange) + ) + + let toRemove = haves[0 ..< count] + for peer in toRemove: + try: + peer.cleanPresence(BlockAddress.init(cid)) + trace "Removed block presence from peer", cid, peer = peer.id + except CatchableError as exc: + error "Failed to clean presence for peer", + cid, peer = peer.id, error = exc.msg, name = exc.name + proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} = try: while b.discEngineRunning: @@ -82,6 +105,12 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} = let haves = b.peers.peersHave(cid) + if haves.len > b.maxPeersPerBlock: + trace "Cleaning up excess peers", + cid, peers = haves.len, max = b.maxPeersPerBlock + b.cleanupExcessPeers(cid) + continue + if haves.len < b.minPeersPerBlock: let request = b.discovery.find(cid) b.inFlightDiscReqs[cid] = request @@ -158,6 +187,7 @@ proc new*( concurrentDiscReqs = DefaultConcurrentDiscRequests, discoveryLoopSleep = DefaultDiscoveryLoopSleep, minPeersPerBlock = DefaultMinPeersPerBlock, + maxPeersPerBlock = DefaultMaxPeersPerBlock, ): DiscoveryEngine = ## Create a discovery engine instance for advertising services ## @@ -173,4 +203,5 @@ proc new*( inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](), discoveryLoopSleep: discoveryLoopSleep, minPeersPerBlock: minPeersPerBlock, + maxPeersPerBlock: maxPeersPerBlock, )