From 8528bbc8291a330b219f4365bd391ee4474cdd32 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Thu, 7 May 2026 17:28:30 +0200 Subject: [PATCH] Evict peer instead of abrupt disconnect and avoid sending unnecessary store requests (#3857) * peer manager not disconnect abruptly ongoing service peers streams * fix: recv_service delivers store-recovered messages (#3805) * recv_service now delivers store-recovered messages via MessageReceivedEvent --- tests/test_peer_manager.nim | 38 +++++++++++++++++++++++++ waku/node/peer_manager/peer_manager.nim | 36 ++++++++++++++++++++--- waku/waku_store/client.nim | 2 ++ waku/waku_store/protocol.nim | 2 ++ 4 files changed, 74 insertions(+), 4 deletions(-) diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index f78c3831f..608889d32 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -54,6 +54,44 @@ procSuite "Peer Manager": nodes[0].peerManager.switch.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connectedness.Connected + asyncTest "Peer manager tracks active store request state": + let nodes = toSeq(0 ..< 2).mapIt( + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + ) + + await allFutures(nodes.mapIt(it.start())) + await allFutures(nodes.mapIt(it.mountRelay())) + + let peerId = nodes[1].peerInfo.peerId + require ( + await nodes[0].peerManager.connectPeer(nodes[1].peerInfo.toRemotePeerInfo()) + ) + await sleepAsync(chronos.milliseconds(500)) + + nodes[0].peerManager.addActiveStoreRequest(peerId) + check: + nodes[0].peerManager.hasActiveStoreRequest(peerId) + + await nodes[0].peerManager.evictPeer(peerId) + await sleepAsync(chronos.milliseconds(100)) + + check: + nodes[0].peerManager.switch.peerStore.connectedness(peerId) == + Connectedness.Connected + + nodes[0].peerManager.removeActiveStoreRequest(peerId) + check: + not nodes[0].peerManager.hasActiveStoreRequest(peerId) + + await nodes[0].peerManager.evictPeer(peerId) + await sleepAsync(chronos.milliseconds(100)) + + check: + nodes[0].peerManager.switch.peerStore.connectedness(peerId) != + Connectedness.Connected + + await allFutures(nodes.mapIt(it.stop())) + asyncTest "dialPeer() works": # Create 2 nodes let nodes = toSeq(0 ..< 2).mapIt( diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index e3eb8d75b..1d8b55bb5 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -107,6 +107,7 @@ type PeerManager* = ref object of RootObj online: bool ## state managed by online_monitor module getShards: GetShards maxConnections: int + activeStoreRequests*: Table[PeerId, int] #~~~~~~~~~~~~~~~~~~~# # Helper Functions # @@ -169,6 +170,23 @@ proc addPeer*( proc getPeer*(pm: PeerManager, peerId: PeerId): RemotePeerInfo = return pm.switch.peerStore.getPeer(peerId) +proc addActiveStoreRequest*(pm: PeerManager, peerId: PeerId) {.gcsafe.} = + pm.activeStoreRequests.mgetOrPut(peerId, 0).inc() + +proc removeActiveStoreRequest*(pm: PeerManager, peerId: PeerId) {.gcsafe.} = + let count = pm.activeStoreRequests.getOrDefault(peerId, 0) + if count == 0: + return + + let newCount = count - 1 + if newCount <= 0: + pm.activeStoreRequests.del(peerId) + else: + pm.activeStoreRequests[peerId] = newCount + +proc hasActiveStoreRequest*(pm: PeerManager, peerId: PeerId): bool {.gcsafe.} = + pm.activeStoreRequests.contains(peerId) + proc loadFromStorage(pm: PeerManager) {.gcsafe.} = ## Load peers from storage, if available @@ -519,6 +537,15 @@ proc connectedPeers*( return (inPeers, outPeers) +proc evictPeer*(pm: PeerManager, peerId: PeerId) {.async.} = + ## Policy-based eviction (relay-peer limit, IP colocation, pruning). + ## Skips the disconnect when the peer has an in-flight store request to + ## avoid aborting active store requests. + if pm.hasActiveStoreRequest(peerId): + trace "skipping peer eviction: active store request", peerId = peerId + return + await pm.switch.disconnect(peerId) + proc capablePeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) = ## Returns the PeerIds of peers with an active socket connection. ## If a protocol is specified, it returns peers that have identified @@ -770,11 +797,11 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = let inRelayPeers = pm.connectedPeers(WakuRelayCodec)[0] if inRelayPeers.len > pm.inRelayPeersTarget and peerStore.hasPeer(peerId, WakuRelayCodec): - info "disconnecting relay peer because reached max num in-relay peers", + info "relay peer limit reached, evicting peer", peerId = peerId, inRelayPeers = inRelayPeers.len, inRelayPeersTarget = pm.inRelayPeersTarget - await pm.switch.disconnect(peerId) + await pm.evictPeer(peerId) ## Apply max ip colocation limit if (let ip = pm.getPeerIp(peerId); ip.isSome()): @@ -787,7 +814,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = if pm.colocationLimit != 0 and peersBehindIp.len > pm.colocationLimit: for peerId in peersBehindIp[0 ..< (peersBehindIp.len - pm.colocationLimit)]: info "Pruning connection due to ip colocation", peerId = peerId, ip = ip - asyncSpawn(pm.switch.disconnect(peerId)) + asyncSpawn(pm.evictPeer(peerId)) peerStore.delete(peerId) WakuPeerEvent.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventConnected) @@ -1100,7 +1127,7 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} = for p in inRelayPeers[0 ..< connsToPrune]: trace "Pruning Peer", Peer = $p - asyncSpawn(pm.switch.disconnect(p)) + asyncSpawn(pm.evictPeer(p)) proc addExtPeerEventHandler*( pm: PeerManager, eventHandler: PeerEventHandler, eventKind: PeerEventKind @@ -1214,6 +1241,7 @@ proc new*( pm.serviceSlots = initTable[string, RemotePeerInfo]() pm.ipTable = initTable[string, seq[PeerId]]() + pm.activeStoreRequests = initTable[PeerId, int]() if not storage.isNil(): trace "found persistent peer storage" diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index 5b261af47..9b26d44a8 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -33,7 +33,9 @@ proc sendStoreRequest( ): Future[StoreQueryResult] {.async, gcsafe.} = var req = request + self.peerManager.addActiveStoreRequest(connection.peerId) defer: + self.peerManager.removeActiveStoreRequest(connection.peerId) await connection.closeWithEof() if req.requestId == "": diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim index 891c6a93c..17b7fb214 100644 --- a/waku/waku_store/protocol.nim +++ b/waku/waku_store/protocol.nim @@ -93,7 +93,9 @@ proc initProtocolHandler(self: WakuStore) = var resBuf: StoreResp var queryDuration: float + self.peerManager.addActiveStoreRequest(conn.peerId) defer: + self.peerManager.removeActiveStoreRequest(conn.peerId) await conn.closeWithEof() self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn):