From 75dbeb1be785df5e61c9ab0bcf8393349b9d0f5e Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Tue, 5 May 2026 23:05:20 +0200 Subject: [PATCH] allow in-flight store requests more explicitly Co-authored-by: Copilot --- tests/test_peer_manager.nim | 38 +++++++++++++++++++++++++ waku/node/peer_manager/peer_manager.nim | 25 +++++++++++++--- waku/waku_store/client.nim | 2 ++ waku/waku_store/protocol.nim | 2 ++ 4 files changed, 63 insertions(+), 4 deletions(-) diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index f78c3831f..608889d32 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -54,6 +54,44 @@ procSuite "Peer Manager": nodes[0].peerManager.switch.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connectedness.Connected + asyncTest "Peer manager tracks active store request state": + let nodes = toSeq(0 ..< 2).mapIt( + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + ) + + await allFutures(nodes.mapIt(it.start())) + await allFutures(nodes.mapIt(it.mountRelay())) + + let peerId = nodes[1].peerInfo.peerId + require ( + await nodes[0].peerManager.connectPeer(nodes[1].peerInfo.toRemotePeerInfo()) + ) + await sleepAsync(chronos.milliseconds(500)) + + nodes[0].peerManager.addActiveStoreRequest(peerId) + check: + nodes[0].peerManager.hasActiveStoreRequest(peerId) + + await nodes[0].peerManager.evictPeer(peerId) + await sleepAsync(chronos.milliseconds(100)) + + check: + nodes[0].peerManager.switch.peerStore.connectedness(peerId) == + Connectedness.Connected + + nodes[0].peerManager.removeActiveStoreRequest(peerId) + check: + not nodes[0].peerManager.hasActiveStoreRequest(peerId) + + await nodes[0].peerManager.evictPeer(peerId) + await sleepAsync(chronos.milliseconds(100)) + + check: + nodes[0].peerManager.switch.peerStore.connectedness(peerId) != + Connectedness.Connected + + await allFutures(nodes.mapIt(it.stop())) + asyncTest "dialPeer() works": # Create 2 nodes let nodes = toSeq(0 ..< 2).mapIt( diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index bed7f8c3c..7284d9dfd 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -107,6 +107,7 @@ type PeerManager* = ref object of RootObj online: bool ## state managed by online_monitor module getShards: GetShards maxConnections: int + activeStoreRequests*: Table[PeerId, int] #~~~~~~~~~~~~~~~~~~~# # Helper Functions # @@ -169,6 +170,22 @@ proc addPeer*( proc getPeer*(pm: PeerManager, peerId: PeerId): RemotePeerInfo = return pm.switch.peerStore.getPeer(peerId) +proc addActiveStoreRequest*(pm: PeerManager, peerId: PeerId) {.gcsafe.} = + pm.activeStoreRequests.mgetOrPut(peerId, 0).inc() + +proc removeActiveStoreRequest*(pm: PeerManager, peerId: PeerId) {.gcsafe.} = + if not pm.activeStoreRequests.contains(peerId): + return + + let count = pm.activeStoreRequests[peerId] - 1 + if count <= 0: + pm.activeStoreRequests.del(peerId) + else: + pm.activeStoreRequests[peerId] = count + +proc hasActiveStoreRequest*(pm: PeerManager, peerId: PeerId): bool {.gcsafe.} = + pm.activeStoreRequests.contains(peerId) + proc loadFromStorage(pm: PeerManager) {.gcsafe.} = ## Load peers from storage, if available @@ -521,11 +538,10 @@ proc connectedPeers*( proc evictPeer(pm: PeerManager, peerId: PeerId) {.async.} = ## Policy-based eviction (relay-peer limit, IP colocation, pruning). - ## Skips the disconnect when the peer has an in-flight store stream to + ## Skips the disconnect when the peer has an in-flight store request to ## avoid aborting active store requests. - let (storeInPeers, storeOutPeers) = pm.connectedPeers(WakuStoreCodec) - if storeInPeers.contains(peerId) or storeOutPeers.contains(peerId): - trace "skipping peer eviction: active store stream", peerId = peerId + if pm.hasActiveStoreRequest(peerId): + trace "skipping peer eviction: active store request", peerId = peerId return await pm.switch.disconnect(peerId) @@ -1224,6 +1240,7 @@ proc new*( pm.serviceSlots = initTable[string, RemotePeerInfo]() pm.ipTable = initTable[string, seq[PeerId]]() + pm.activeStoreRequests = initTable[PeerId, int]() if not storage.isNil(): trace "found persistent peer storage" diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index 5b261af47..9b26d44a8 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -33,7 +33,9 @@ proc sendStoreRequest( ): Future[StoreQueryResult] {.async, gcsafe.} = var req = request + self.peerManager.addActiveStoreRequest(connection.peerId) defer: + self.peerManager.removeActiveStoreRequest(connection.peerId) await connection.closeWithEof() if req.requestId == "": diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim index 891c6a93c..17b7fb214 100644 --- a/waku/waku_store/protocol.nim +++ b/waku/waku_store/protocol.nim @@ -93,7 +93,9 @@ proc initProtocolHandler(self: WakuStore) = var resBuf: StoreResp var queryDuration: float + self.peerManager.addActiveStoreRequest(conn.peerId) defer: + self.peerManager.removeActiveStoreRequest(conn.peerId) await conn.closeWithEof() self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn):