diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index dbdbfbd76..2e36d7e16 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -483,25 +483,17 @@ procSuite "Peer Manager": # assert physical connections check: - nodes[0].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 0 - nodes[0].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 2 - nodes[0].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 0 - nodes[0].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 2 + nodes[0].peerManager.getNumConnections(WakuRelayCodec) == (0, 2) + nodes[0].peerManager.getNumConnections(WakuFilterCodec) == (0, 2) - nodes[1].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 1 - nodes[1].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 1 - nodes[1].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1 - nodes[1].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 0 + nodes[1].peerManager.getNumConnections(WakuRelayCodec) == (1, 1) + nodes[1].peerManager.getNumConnections(WakuFilterCodec) == (1, 0) - nodes[2].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 2 - nodes[2].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 1 - nodes[2].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1 - nodes[2].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 1 + nodes[2].peerManager.getNumConnections(WakuRelayCodec) == (2, 1) + nodes[2].peerManager.getNumConnections(WakuFilterCodec) == (1, 1) - nodes[3].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 1 - nodes[3].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 0 - nodes[3].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1 - nodes[3].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 0 + nodes[3].peerManager.getNumConnections(WakuRelayCodec) == (1, 0) + nodes[3].peerManager.getNumConnections(WakuFilterCodec) == (1, 0) asyncTest "getNumStreams() returns expected number of connections per protocol": # Create 2 nodes @@ -523,15 +515,11 @@ procSuite "Peer Manager": (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true check: - nodes[0].peerManager.getNumStreams(Direction.In, WakuRelayCodec) == 1 - nodes[0].peerManager.getNumStreams(Direction.Out, WakuRelayCodec) == 1 - nodes[0].peerManager.getNumStreams(Direction.In, WakuFilterCodec) == 0 - nodes[0].peerManager.getNumStreams(Direction.Out, WakuFilterCodec) == 4 + nodes[0].peerManager.getNumStreams(WakuRelayCodec) == (1, 1) + nodes[0].peerManager.getNumStreams(WakuFilterCodec) == (0, 4) - nodes[1].peerManager.getNumStreams(Direction.In, WakuRelayCodec) == 1 - nodes[1].peerManager.getNumStreams(Direction.Out, WakuRelayCodec) == 1 - nodes[1].peerManager.getNumStreams(Direction.In, WakuFilterCodec) == 4 - nodes[1].peerManager.getNumStreams(Direction.Out, WakuFilterCodec) == 0 + nodes[1].peerManager.getNumStreams(WakuRelayCodec) == (1, 1) + nodes[1].peerManager.getNumStreams(WakuFilterCodec) == (4, 0) test "selectPeer() returns the correct peer": # Valid peer id missing the last digit diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index 8224df400..983bda61e 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -477,31 +477,41 @@ proc connectToNodes*(pm: PeerManager, # later. await sleepAsync(chronos.seconds(5)) -# Returns the amount of physical connections for a given direction +# Returns the amount of physical connections (in and out) # containing at least one stream with the given protocol. -proc getNumConnections*(pm: PeerManager, dir: Direction, protocol: string): int = - var numConns = 0 +proc getNumConnections*(pm: PeerManager, protocol: string): (int, int) = + var + numConnsIn = 0 + numConnsOut = 0 for peerId, muxers in pm.switch.connManager.getConnections(): for peerConn in muxers: let streams = peerConn.getStreams() - if peerConn.connection.transportDir == dir: - if streams.anyIt(it.protocol == protocol): - numConns += 1 - return numConns + if streams.anyIt(it.protocol == protocol): + if peerConn.connection.transportDir == Direction.In: + numConnsIn += 1 + elif peerConn.connection.transportDir == Direction.Out: + numConnsOut += 1 -proc getNumStreams*(pm: PeerManager, dir: Direction, protocol: string): int = - var numConns = 0 + return (numConnsIn, numConnsOut) + +proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) = + var + numStreamsIn = 0 + numStreamsOut = 0 for peerId, muxers in pm.switch.connManager.getConnections(): for peerConn in muxers: for stream in peerConn.getStreams(): - if stream.protocol == protocol and stream.dir == dir: - numConns += 1 - return numConns + if stream.protocol == protocol: + if stream.dir == Direction.In: + numStreamsIn += 1 + elif stream.dir == Direction.Out: + numStreamsOut += 1 + return (numStreamsIn, numStreamsOut) proc connectToRelayPeers*(pm: PeerManager) {.async.} = let maxConnections = pm.switch.connManager.inSema.size - let inRelayPeers = pm.getNumConnections(Direction.In, WakuRelayCodec) - let outRelayPeers = pm.getNumConnections(Direction.Out, WakuRelayCodec) + let (inRelayPeers, outRelayPeers) = pm.getNumStreams(WakuRelayCodec) + let (inRelayStreams, outRelayStreams) = pm.getNumConnections(WakuRelayCodec) let totalRelayPeers = inRelayPeers + outRelayPeers # Leave some room for service peers @@ -600,12 +610,13 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} = proc updateMetrics(pm: PeerManager) {.async.} = heartbeat "Scheduling updateMetrics run", UpdateMetricsInterval: - for dir in @[Direction.In, Direction.Out]: - for proto in pm.peerStore.getWakuProtos(): - let protoDirConns = pm.getNumConnections(dir, proto) - let protoDirStreams = pm.getNumStreams(dir, proto) - waku_connected_peers.set(protoDirConns.float64, labelValues = [$dir, proto]) - waku_streams_peers.set(protoDirStreams.float64, labelValues = [$dir, proto]) + for proto in pm.peerStore.getWakuProtos(): + let (protoConnsIn, protoConnsOut) = pm.getNumConnections(proto) + let (protoStreamsIn, protoStreamsOut) = pm.getNumStreams(proto) + waku_connected_peers.set(protoConnsIn.float64, labelValues = [$Direction.In, proto]) + waku_connected_peers.set(protoConnsOut.float64, labelValues = [$Direction.Out, proto]) + waku_streams_peers.set(protoStreamsIn.float64, labelValues = [$Direction.In, proto]) + waku_streams_peers.set(protoStreamsOut.float64, labelValues = [$Direction.Out, proto]) proc start*(pm: PeerManager) = pm.started = true