fix: out connections leak (#3077)

This commit is contained in:
gabrielmer 2024-10-03 12:37:22 +03:00 committed by Gabriel mermelstein
parent 73c1d60744
commit 45c6d890ef
No known key found for this signature in database
GPG Key ID: 82B8134785FEAE0D
2 changed files with 14 additions and 9 deletions

View File

@ -399,9 +399,11 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
asyncSpawn(pm.switch.disconnect(peerId)) asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId) pm.peerStore.delete(peerId)
proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) = proc connectedPeers*(
## Returns the peerIds of physical connections (in and out) pm: PeerManager, protocol: string = ""
## containing at least one stream with the given protocol. ): (seq[PeerId], seq[PeerId]) =
## Returns the peerIds of physical connections (in and out)
## If a protocol is specified, only returns peers with at least one stream of that protocol
var inPeers: seq[PeerId] var inPeers: seq[PeerId]
var outPeers: seq[PeerId] var outPeers: seq[PeerId]
@ -409,7 +411,7 @@ proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerI
for peerId, muxers in pm.switch.connManager.getConnections(): for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers: for peerConn in muxers:
let streams = peerConn.getStreams() let streams = peerConn.getStreams()
if streams.anyIt(it.protocol == protocol): if protocol.len == 0 or streams.anyIt(it.protocol == protocol):
if peerConn.connection.transportDir == Direction.In: if peerConn.connection.transportDir == Direction.In:
inPeers.add(peerId) inPeers.add(peerId)
elif peerConn.connection.transportDir == Direction.Out: elif peerConn.connection.transportDir == Direction.Out:

View File

@ -1239,16 +1239,19 @@ proc mountLibp2pPing*(node: WakuNode) {.async: (raises: []).} =
# TODO: Move this logic to PeerManager # TODO: Move this logic to PeerManager
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} = proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
while node.started: while node.started:
# Keep all connected peers alive while running # Keep connected peers alive while running
# Each node is responsible of keeping its outgoing connections alive
trace "Running keepalive" trace "Running keepalive"
# First get a list of connected peer infos # First get a list of connected peer infos
let peers = let outPeers = node.peerManager.connectedPeers()[1]
node.peerManager.peerStore.peers().filterIt(it.connectedness == Connected)
for peer in peers: for peerId in outPeers:
try: try:
let conn = await node.switch.dial(peer.peerId, peer.addrs, PingCodec) info "calling keepAlive dial", peerId = peerId
let conn = (await node.peerManager.dialPeer(peerId, PingCodec)).valueOr:
warn "Failed dialing peer for keep alive", peerId = peerId
continue
let pingDelay = await node.libp2pPing.ping(conn) let pingDelay = await node.libp2pPing.ping(conn)
await conn.close() await conn.close()
except CatchableError as exc: except CatchableError as exc: