From 87f694a8b6c666e6f8ab36fba2df4c58d56b903c Mon Sep 17 00:00:00 2001 From: Alvaro Revuelta Date: Thu, 18 May 2023 09:40:14 +0200 Subject: [PATCH] chore(networking): set and use target outbound connections + prune (#1739) --- tests/v2/test_peer_manager.nim | 30 ++++++--- waku/v2/node/peer_manager/peer_manager.nim | 72 +++++++++++++++------- 2 files changed, 71 insertions(+), 31 deletions(-) diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index 2e36d7e16..85371d458 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -456,7 +456,7 @@ procSuite "Peer Manager": # but the relay peer is not node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false - asyncTest "getNumConnections() returns expected number of connections per protocol": + asyncTest "connectedPeers() returns expected number of connections per protocol": # Create 4 nodes let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) @@ -483,17 +483,29 @@ procSuite "Peer Manager": # assert physical connections check: - nodes[0].peerManager.getNumConnections(WakuRelayCodec) == (0, 2) - nodes[0].peerManager.getNumConnections(WakuFilterCodec) == (0, 2) + nodes[0].peerManager.connectedPeers(WakuRelayCodec)[0].len == 0 + nodes[0].peerManager.connectedPeers(WakuRelayCodec)[1].len == 2 - nodes[1].peerManager.getNumConnections(WakuRelayCodec) == (1, 1) - nodes[1].peerManager.getNumConnections(WakuFilterCodec) == (1, 0) + nodes[0].peerManager.connectedPeers(WakuFilterCodec)[0].len == 0 + nodes[0].peerManager.connectedPeers(WakuFilterCodec)[1].len == 2 - nodes[2].peerManager.getNumConnections(WakuRelayCodec) == (2, 1) - nodes[2].peerManager.getNumConnections(WakuFilterCodec) == (1, 1) + nodes[1].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1 + nodes[1].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1 - nodes[3].peerManager.getNumConnections(WakuRelayCodec) == (1, 0) - nodes[3].peerManager.getNumConnections(WakuFilterCodec) == (1, 0) + nodes[1].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1 + nodes[1].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0 + + nodes[2].peerManager.connectedPeers(WakuRelayCodec)[0].len == 2 + nodes[2].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1 + + nodes[2].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1 + nodes[2].peerManager.connectedPeers(WakuFilterCodec)[1].len == 1 + + nodes[3].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1 + nodes[3].peerManager.connectedPeers(WakuRelayCodec)[1].len == 0 + + nodes[3].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1 + nodes[3].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0 asyncTest "getNumStreams() returns expected number of connections per protocol": # Create 2 nodes diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index 7b3ff4aec..06b8a2cf0 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -57,6 +57,9 @@ const # How often the peer store is updated with metrics UpdateMetricsInterval = chronos.seconds(15) + # How often to log peer manager metrics + LogSummaryInterval = chronos.seconds(60) + type PeerManager* = ref object of RootObj switch*: Switch @@ -66,6 +69,7 @@ type maxFailedAttempts*: int storage: PeerStorage serviceSlots*: Table[string, RemotePeerInfo] + outPeersTarget*: int started: bool proc protocolMatcher*(codec: string): Matcher = @@ -333,6 +337,7 @@ proc new*(T: type PeerManager, storage: storage, initialBackoffInSec: initialBackoffInSec, backoffFactor: backoffFactor, + outPeersTarget: max(maxConnections div 10, 10), maxFailedAttempts: maxFailedAttempts) proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} = @@ -477,22 +482,22 @@ proc connectToNodes*(pm: PeerManager, # later. await sleepAsync(chronos.seconds(5)) -# Returns the amount of physical connections (in and out) +# Returns the peerIds of physical connections (in and out) # containing at least one stream with the given protocol. -proc getNumConnections*(pm: PeerManager, protocol: string): (int, int) = - var - numConnsIn = 0 - numConnsOut = 0 +proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) = + var inPeers: seq[PeerId] + var outPeers: seq[PeerId] + for peerId, muxers in pm.switch.connManager.getConnections(): for peerConn in muxers: let streams = peerConn.getStreams() if streams.anyIt(it.protocol == protocol): if peerConn.connection.transportDir == Direction.In: - numConnsIn += 1 + inPeers.add(peerId) elif peerConn.connection.transportDir == Direction.Out: - numConnsOut += 1 + outPeers.add(peerId) - return (numConnsIn, numConnsOut) + return (inPeers, outPeers) proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) = var @@ -508,28 +513,33 @@ proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) = numStreamsOut += 1 return (numStreamsIn, numStreamsOut) +proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} = + let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) + let connsToPrune = min(amount, inRelayPeers.len) + + for p in inRelayPeers[0.. inPeersTarget: + await pm.pruneInRelayConns(inRelayPeers.len-inPeersTarget) + + if outRelayPeers.len >= pm.outPeersTarget: + return # Leave some room for service peers if totalRelayPeers >= (maxConnections - 5): return - # TODO: Track only relay connections (nwaku/issues/1566) let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs)) let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId)) let numPeersToConnect = min(min(maxConnections - totalRelayPeers, outsideBackoffPeers.len), MaxParalelDials) - info "Relay peer connections", - inRelayConns = inRelayPeers, - outRelayConns = outRelayPeers, - totalRelayConns = totalRelayPeers, - targetConnectedPeers = maxConnections, - notConnectedPeers = notConnectedPeers.len, - outsideBackoffPeers = outsideBackoffPeers.len - await pm.connectToNodes(outsideBackoffPeers[0..