mirror of https://github.com/waku-org/nwaku.git
fix: out connections leak (#3077)
This commit is contained in:
parent
3ad613cad4
commit
eb2bbae665
|
@ -404,9 +404,11 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
|
||||||
asyncSpawn(pm.switch.disconnect(peerId))
|
asyncSpawn(pm.switch.disconnect(peerId))
|
||||||
pm.wakuPeerStore.delete(peerId)
|
pm.wakuPeerStore.delete(peerId)
|
||||||
|
|
||||||
proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
|
proc connectedPeers*(
|
||||||
## Returns the peerIds of physical connections (in and out)
|
pm: PeerManager, protocol: string = ""
|
||||||
## containing at least one stream with the given protocol.
|
): (seq[PeerId], seq[PeerId]) =
|
||||||
|
## Returns the peerIds of physical connections (in and out)
|
||||||
|
## If a protocol is specified, only returns peers with at least one stream of that protocol
|
||||||
|
|
||||||
var inPeers: seq[PeerId]
|
var inPeers: seq[PeerId]
|
||||||
var outPeers: seq[PeerId]
|
var outPeers: seq[PeerId]
|
||||||
|
@ -414,7 +416,7 @@ proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerI
|
||||||
for peerId, muxers in pm.switch.connManager.getConnections():
|
for peerId, muxers in pm.switch.connManager.getConnections():
|
||||||
for peerConn in muxers:
|
for peerConn in muxers:
|
||||||
let streams = peerConn.getStreams()
|
let streams = peerConn.getStreams()
|
||||||
if streams.anyIt(it.protocol == protocol):
|
if protocol.len == 0 or streams.anyIt(it.protocol == protocol):
|
||||||
if peerConn.connection.transportDir == Direction.In:
|
if peerConn.connection.transportDir == Direction.In:
|
||||||
inPeers.add(peerId)
|
inPeers.add(peerId)
|
||||||
elif peerConn.connection.transportDir == Direction.Out:
|
elif peerConn.connection.transportDir == Direction.Out:
|
||||||
|
|
|
@ -1240,16 +1240,19 @@ proc mountLibp2pPing*(node: WakuNode) {.async: (raises: []).} =
|
||||||
# TODO: Move this logic to PeerManager
|
# TODO: Move this logic to PeerManager
|
||||||
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
|
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
|
||||||
while node.started:
|
while node.started:
|
||||||
# Keep all connected peers alive while running
|
# Keep connected peers alive while running
|
||||||
|
# Each node is responsible of keeping its outgoing connections alive
|
||||||
trace "Running keepalive"
|
trace "Running keepalive"
|
||||||
|
|
||||||
# First get a list of connected peer infos
|
# First get a list of connected peer infos
|
||||||
let peers =
|
let outPeers = node.peerManager.connectedPeers()[1]
|
||||||
node.peerManager.wakuPeerStore.peers().filterIt(it.connectedness == Connected)
|
|
||||||
|
|
||||||
for peer in peers:
|
for peerId in outPeers:
|
||||||
try:
|
try:
|
||||||
let conn = await node.switch.dial(peer.peerId, peer.addrs, PingCodec)
|
info "calling keepAlive dial", peerId = peerId
|
||||||
|
let conn = (await node.peerManager.dialPeer(peerId, PingCodec)).valueOr:
|
||||||
|
warn "Failed dialing peer for keep alive", peerId = peerId
|
||||||
|
continue
|
||||||
let pingDelay = await node.libp2pPing.ping(conn)
|
let pingDelay = await node.libp2pPing.ping(conn)
|
||||||
await conn.close()
|
await conn.close()
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
|
|
Loading…
Reference in New Issue