From 45c6d890ef4e09534d5598e81a714a5934aae26f Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Thu, 3 Oct 2024 12:37:22 +0300 Subject: [PATCH] fix: out connections leak (#3077) --- waku/node/peer_manager/peer_manager.nim | 10 ++++++---- waku/node/waku_node.nim | 13 ++++++++----- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 6e5a9c697..6d4e23d40 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -399,9 +399,11 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = asyncSpawn(pm.switch.disconnect(peerId)) pm.peerStore.delete(peerId) -proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) = - ## Returns the peerIds of physical connections (in and out) - ## containing at least one stream with the given protocol. +proc connectedPeers*( + pm: PeerManager, protocol: string = "" +): (seq[PeerId], seq[PeerId]) = + ## Returns the peerIds of physical connections (in and out) + ## If a protocol is specified, only returns peers with at least one stream of that protocol var inPeers: seq[PeerId] var outPeers: seq[PeerId] @@ -409,7 +411,7 @@ proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerI for peerId, muxers in pm.switch.connManager.getConnections(): for peerConn in muxers: let streams = peerConn.getStreams() - if streams.anyIt(it.protocol == protocol): + if protocol.len == 0 or streams.anyIt(it.protocol == protocol): if peerConn.connection.transportDir == Direction.In: inPeers.add(peerId) elif peerConn.connection.transportDir == Direction.Out: diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index e137f3ed0..a28862508 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1239,16 +1239,19 @@ proc mountLibp2pPing*(node: WakuNode) {.async: (raises: []).} = # TODO: Move this logic to PeerManager proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} = while node.started: - # Keep all connected peers alive while running + # Keep connected peers alive while running + # Each node is responsible of keeping its outgoing connections alive trace "Running keepalive" # First get a list of connected peer infos - let peers = - node.peerManager.peerStore.peers().filterIt(it.connectedness == Connected) + let outPeers = node.peerManager.connectedPeers()[1] - for peer in peers: + for peerId in outPeers: try: - let conn = await node.switch.dial(peer.peerId, peer.addrs, PingCodec) + info "calling keepAlive dial", peerId = peerId + let conn = (await node.peerManager.dialPeer(peerId, PingCodec)).valueOr: + warn "Failed dialing peer for keep alive", peerId = peerId + continue let pingDelay = await node.libp2pPing.ping(conn) await conn.close() except CatchableError as exc: