From 7d3e0469bef487278ee9b09c80e5dac69850082e Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Fri, 1 May 2026 00:33:47 +0200 Subject: [PATCH] peer manager not disconnect abruptly ongoing service peers streams --- waku/node/peer_manager/peer_manager.nim | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index e3eb8d75b..f7e980e4b 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -404,6 +404,16 @@ proc disconnectNode*(pm: PeerManager, peer: RemotePeerInfo) {.async.} = let peerId = peer.peerId await pm.disconnectNode(peerId) +proc evictPeer(pm: PeerManager, peerId: PeerId) {.async.} = + ## Policy-based eviction (relay-peer limit, IP colocation, pruning). + ## Skips the disconnect when the peer has an active store-query stream so + ## that in-flight store requests are not aborted by unrelated limit checks. + let (storeInPeers, storeOutPeers) = pm.connectedPeers(WakuStoreCodec) + if storeInPeers.contains(peerId) or storeOutPeers.contains(peerId): + debug "skipping peer eviction: active store stream", peerId = peerId + return + await pm.switch.disconnect(peerId) + # Dialing should be used for just protocols that require a stream to write and read # This shall not be used to dial Relay protocols, since that would create # unneccesary unused streams. @@ -770,11 +780,11 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = let inRelayPeers = pm.connectedPeers(WakuRelayCodec)[0] if inRelayPeers.len > pm.inRelayPeersTarget and peerStore.hasPeer(peerId, WakuRelayCodec): - info "disconnecting relay peer because reached max num in-relay peers", + info "relay peer limit reached, evicting peer", peerId = peerId, inRelayPeers = inRelayPeers.len, inRelayPeersTarget = pm.inRelayPeersTarget - await pm.switch.disconnect(peerId) + await pm.evictPeer(peerId) ## Apply max ip colocation limit if (let ip = pm.getPeerIp(peerId); ip.isSome()): @@ -787,7 +797,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = if pm.colocationLimit != 0 and peersBehindIp.len > pm.colocationLimit: for peerId in peersBehindIp[0 ..< (peersBehindIp.len - pm.colocationLimit)]: info "Pruning connection due to ip colocation", peerId = peerId, ip = ip - asyncSpawn(pm.switch.disconnect(peerId)) + asyncSpawn(pm.evictPeer(peerId)) peerStore.delete(peerId) WakuPeerEvent.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventConnected) @@ -1100,7 +1110,7 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} = for p in inRelayPeers[0 ..< connsToPrune]: trace "Pruning Peer", Peer = $p - asyncSpawn(pm.switch.disconnect(p)) + asyncSpawn(pm.evictPeer(p)) proc addExtPeerEventHandler*( pm: PeerManager, eventHandler: PeerEventHandler, eventKind: PeerEventKind