feat: add peer count limits to discovery engine

This prevents unbounded peer accumulation while maintaining redundancy.
When peer count exceeds the maximum, the least active peers are removed from
tracking to free resources.
This commit is contained in:
Chrysostomos Nanakos 2025-09-16 12:01:17 +03:00
parent 3a36e38b1f
commit a7a384a66f
No known key found for this signature in database

View File

@ -8,6 +8,7 @@
## those terms.
import std/sequtils
import std/algorithm
import pkg/chronos
import pkg/libp2p/cid
@ -38,6 +39,7 @@ const
DefaultConcurrentDiscRequests = 10
DefaultDiscoveryTimeout = 1.minutes
DefaultMinPeersPerBlock = 3
DefaultMaxPeersPerBlock = 8
DefaultDiscoveryLoopSleep = 3.seconds
type DiscoveryEngine* = ref object of RootObj
@ -51,11 +53,32 @@ type DiscoveryEngine* = ref object of RootObj
discoveryLoop*: Future[void].Raising([]) # Discovery loop task handle
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
trackedFutures*: TrackedFutures # Tracked Discovery tasks futures
minPeersPerBlock*: int # Max number of peers with block
minPeersPerBlock*: int # Min number of peers with block
maxPeersPerBlock*: int # Max number of peers with block
discoveryLoopSleep: Duration # Discovery loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]]
# Inflight discovery requests
proc cleanupExcessPeers(b: DiscoveryEngine, cid: Cid) {.gcsafe, raises: [].} =
var haves = b.peers.peersHave(cid)
let count = haves.len - b.maxPeersPerBlock
if count <= 0:
return
haves.sort(
proc(a, b: BlockExcPeerCtx): int =
cmp(a.lastExchange, b.lastExchange)
)
let toRemove = haves[0 ..< count]
for peer in toRemove:
try:
peer.cleanPresence(BlockAddress.init(cid))
trace "Removed block presence from peer", cid, peer = peer.id
except CatchableError as exc:
error "Failed to clean presence for peer",
cid, peer = peer.id, error = exc.msg, name = exc.name
proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} =
try:
while b.discEngineRunning:
@ -82,6 +105,12 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
let haves = b.peers.peersHave(cid)
if haves.len > b.maxPeersPerBlock:
trace "Cleaning up excess peers",
cid, peers = haves.len, max = b.maxPeersPerBlock
b.cleanupExcessPeers(cid)
continue
if haves.len < b.minPeersPerBlock:
let request = b.discovery.find(cid)
b.inFlightDiscReqs[cid] = request
@ -158,6 +187,7 @@ proc new*(
concurrentDiscReqs = DefaultConcurrentDiscRequests,
discoveryLoopSleep = DefaultDiscoveryLoopSleep,
minPeersPerBlock = DefaultMinPeersPerBlock,
maxPeersPerBlock = DefaultMaxPeersPerBlock,
): DiscoveryEngine =
## Create a discovery engine instance for advertising services
##
@ -173,4 +203,5 @@ proc new*(
inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](),
discoveryLoopSleep: discoveryLoopSleep,
minPeersPerBlock: minPeersPerBlock,
maxPeersPerBlock: maxPeersPerBlock,
)