From 73cbafa658295213298b06ef7b349683f0d1f858 Mon Sep 17 00:00:00 2001
From: Alvaro Revuelta
Date: Wed, 12 Apr 2023 13:05:34 +0200
Subject: [PATCH] chore(networking): get relay number of connections from protocol conns/streams (#1609)

---
 tests/v2/test_peer_manager.nim                | 77 +++++++++++++++++++
 waku/common/utils/sequence.nim                | 12 +++
 waku/v2/node/peer_manager/peer_manager.nim    | 59 +++++++++++---
 waku/v2/node/peer_manager/waku_peer_store.nim | 13 +++-
 waku/v2/utils/heartbeat.nim                   | 29 +++++++
 5 files changed, 176 insertions(+), 14 deletions(-)
 create mode 100644 waku/common/utils/sequence.nim
 create mode 100644 waku/v2/utils/heartbeat.nim

diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim
index 3cb141af6..4b60afa30 100644
--- a/tests/v2/test_peer_manager.nim
+++ b/tests/v2/test_peer_manager.nim
@@ -462,6 +462,83 @@ procSuite "Peer Manager":
       # but the relay peer is not
       node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false
 
+  asyncTest "getNumConnections() returns expected number of connections per protocol":
+    # Create 4 nodes
+    let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
+
+    # Start them with relay + filter
+    await allFutures(nodes.mapIt(it.start()))
+    await allFutures(nodes.mapIt(it.mountRelay()))
+    await allFutures(nodes.mapIt(it.mountFilter()))
+
+    let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
+
+    # create some connections/streams
+    require:
+      # some relay connections
+      (await nodes[0].peerManager.connectRelay(pInfos[1])) == true
+      (await nodes[0].peerManager.connectRelay(pInfos[2])) == true
+      (await nodes[1].peerManager.connectRelay(pInfos[2])) == true
+
+      (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
+      (await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterCodec)).isSome() == true
+
+      # isolated dial creates a relay conn under the hood (libp2p behaviour)
+      (await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterCodec)).isSome() == true
+
+
+    # assert physical connections
+    check:
+      nodes[0].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 0
+      nodes[0].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 2
+      nodes[0].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 0
+      nodes[0].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 2
+
+      nodes[1].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 1
+      nodes[1].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 1
+      nodes[1].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
+      nodes[1].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 0
+
+      nodes[2].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 2
+      nodes[2].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 1
+      nodes[2].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
+      nodes[2].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 1
+
+      nodes[3].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 1
+      nodes[3].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 0
+      nodes[3].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
+      nodes[3].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 0
+
+  asyncTest "getNumStreams() returns expected number of streams per protocol":
+    # Create 2 nodes
+    let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
+
+    # Start them with relay + filter
+    await allFutures(nodes.mapIt(it.start()))
+    await allFutures(nodes.mapIt(it.mountRelay()))
+    await allFutures(nodes.mapIt(it.mountFilter()))
+
+    let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
+
+    require:
+      # multiple streams are multiplexed over a single connection.
+      # note that a relay connection is created under the hood when dialing a peer (libp2p behaviour)
+      (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
+      (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
+      (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
+      (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
+
+    check:
+      nodes[0].peerManager.getNumStreams(Direction.In, WakuRelayCodec) == 1
+      nodes[0].peerManager.getNumStreams(Direction.Out, WakuRelayCodec) == 1
+      nodes[0].peerManager.getNumStreams(Direction.In, WakuFilterCodec) == 0
+      nodes[0].peerManager.getNumStreams(Direction.Out, WakuFilterCodec) == 4
+
+      nodes[1].peerManager.getNumStreams(Direction.In, WakuRelayCodec) == 1
+      nodes[1].peerManager.getNumStreams(Direction.Out, WakuRelayCodec) == 1
+      nodes[1].peerManager.getNumStreams(Direction.In, WakuFilterCodec) == 4
+      nodes[1].peerManager.getNumStreams(Direction.Out, WakuFilterCodec) == 0
+
   test "selectPeer() returns the correct peer":
     # Valid peer id missing the last digit
     let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"
diff --git a/waku/common/utils/sequence.nim b/waku/common/utils/sequence.nim
new file mode 100644
index 000000000..ca61b36e9
--- /dev/null
+++ b/waku/common/utils/sequence.nim
@@ -0,0 +1,12 @@
+when (NimMajor, NimMinor) < (1, 4):
+  {.push raises: [Defect].}
+else:
+  {.push raises: [].}
+
+import std/sequtils
+
+proc flatten*[T](a: seq[seq[T]]): seq[T] =
+  var aFlat = newSeq[T](0)
+  for subseq in a:
+    aFlat &= subseq
+  return aFlat
diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim
index 3dc52e4e4..36d25f740 100644
--- a/waku/v2/node/peer_manager/peer_manager.nim
+++ b/waku/v2/node/peer_manager/peer_manager.nim
@@ -9,10 +9,12 @@ import
   chronos,
   chronicles,
   metrics,
-  libp2p/multistream
+  libp2p/multistream,
+  libp2p/muxers/muxer
 import
   ../../protocol/waku_relay,
   ../../utils/peers,
+  ../../utils/heartbeat,
   ./peer_store/peer_storage,
   ./waku_peer_store
 
@@ -22,7 +24,8 @@ declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
 # TODO: Populate from PeerStore.Source when ready
 declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"]
 declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
-declarePublicGauge waku_connected_peers, "Number of connected peers per direction", ["direction"]
+declarePublicGauge waku_connected_peers, "Number of physical connections per direction and protocol", labels = ["direction", "protocol"]
+declarePublicGauge waku_streams_peers, "Number of streams per direction and protocol", labels = ["direction", "protocol"]
 declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store"
 declarePublicGauge waku_service_peers, "Service peer protocol and multiaddress ", labels = ["protocol", "peerId"]
 
@@ -51,6 +54,9 @@ const
   # How often the peer store is pruned
   PrunePeerStoreInterval = chronos.minutes(5)
 
+  # How often the connection/stream metrics are refreshed
+  UpdateMetricsInterval = chronos.seconds(15)
+
 type
   PeerManager* = ref object of RootObj
     switch*: Switch
@@ -252,7 +258,6 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
     let direction = if event.initiator: Outbound else: Inbound
     pm.peerStore[ConnectionBook][peerId] = Connected
     pm.peerStore[DirectionBook][peerId] = direction
-    waku_connected_peers.inc(1, labelValues=[$direction])
 
     if not pm.storage.isNil:
       pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
@@ -261,7 +266,6 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
   elif event.kind == PeerEventKind.Left:
     pm.peerStore[DirectionBook][peerId] = UnknownDirection
     pm.peerStore[ConnectionBook][peerId] = CanConnect
-    waku_connected_peers.dec(1, labelValues=[$pm.peerStore[DirectionBook][peerId]])
 
     if not pm.storage.isNil:
       pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
@@ -430,16 +434,35 @@ proc connectToNodes*(pm: PeerManager,
   # later.
   await sleepAsync(chronos.seconds(5))
 
+# Returns the amount of physical connections for a given direction
+# containing at least one stream with the given protocol.
+proc getNumConnections*(pm: PeerManager, dir: Direction, protocol: string): int =
+  var numConns = 0
+  for peerId, muxers in pm.switch.connManager.getConnections():
+    for peerConn in muxers:
+      let streams = peerConn.getStreams()
+      if peerConn.connection.transportDir == dir:
+        if streams.anyIt(it.protocol == protocol):
+          numConns += 1
+  return numConns
+
+proc getNumStreams*(pm: PeerManager, dir: Direction, protocol: string): int =
+  var numStreams = 0
+  for peerId, muxers in pm.switch.connManager.getConnections():
+    for peerConn in muxers:
+      for stream in peerConn.getStreams():
+        if stream.protocol == protocol and stream.dir == dir:
+          numStreams += 1
+  return numStreams
+
 proc connectToRelayPeers*(pm: PeerManager) {.async.} =
   let maxConnections = pm.switch.connManager.inSema.size
-  let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
-  let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len
-  let numConPeers = numInPeers + numOutPeers
-
-  # TODO: Enforce a given in/out peers ratio
+  let inRelayPeers = pm.getNumConnections(Direction.In, WakuRelayCodec)
+  let outRelayPeers = pm.getNumConnections(Direction.Out, WakuRelayCodec)
+  let totalRelayPeers = inRelayPeers + outRelayPeers
 
   # Leave some room for service peers
-  if numConPeers >= (maxConnections - 5):
+  if totalRelayPeers >= (maxConnections - 5):
     return
 
   # TODO: Track only relay connections (nwaku/issues/1566)
@@ -447,10 +470,12 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} =
   let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId,
                                                                                    pm.initialBackoffInSec,
                                                                                    pm.backoffFactor))
-  let numPeersToConnect = min(min(maxConnections - numConPeers, outsideBackoffPeers.len), MaxParalelDials)
+  let numPeersToConnect = min(min(maxConnections - totalRelayPeers, outsideBackoffPeers.len), MaxParalelDials)
 
   info "Relay peer connections",
-    connectedPeers = numConPeers,
+    inRelayConns = inRelayPeers,
+    outRelayConns = outRelayPeers,
+    totalRelayConns = totalRelayPeers,
     targetConnectedPeers = maxConnections,
     notConnectedPeers = notConnectedPeers.len,
     outsideBackoffPeers = outsideBackoffPeers.len
@@ -532,8 +557,18 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
     await pm.connectToRelayPeers()
     await sleepAsync(ConnectivityLoopInterval)
 
+proc updateMetrics(pm: PeerManager) {.async.} =
+  heartbeat "Scheduling updateMetrics run", UpdateMetricsInterval:
+    for dir in [Direction.In, Direction.Out]:
+      for proto in pm.peerStore.getWakuProtos():
+        let protoDirConns = pm.getNumConnections(dir, proto)
+        let protoDirStreams = pm.getNumStreams(dir, proto)
+        waku_connected_peers.set(protoDirConns.float64, labelValues = [$dir, proto])
+        waku_streams_peers.set(protoDirStreams.float64, labelValues = [$dir, proto])
+
 proc start*(pm: PeerManager) =
   pm.started = true
+  asyncSpawn pm.updateMetrics()
   asyncSpawn pm.relayConnectivityLoop()
   asyncSpawn pm.prunePeerStoreLoop()
 
diff --git a/waku/v2/node/peer_manager/waku_peer_store.nim b/waku/v2/node/peer_manager/waku_peer_store.nim
index d1f575f94..b5ee166e2 100644
--- a/waku/v2/node/peer_manager/waku_peer_store.nim
+++ b/waku/v2/node/peer_manager/waku_peer_store.nim
@@ -4,14 +4,15 @@ else:
   {.push raises: [].}
 
 import
-  std/[tables, sequtils, sets, options, times, math],
+  std/[tables, sequtils, sets, options, times, math, strutils],
   chronos,
   eth/p2p/discoveryv5/enr,
   libp2p/builders,
   libp2p/peerstore
 
 import
-  ../../utils/peers
+  ../../utils/peers,
+  ../../../common/utils/sequence
 
 export peerstore, builders
 
@@ -90,6 +91,14 @@ proc get*(peerStore: PeerStore,
     numberFailedConn: peerStore[NumberFailedConnBook][peerId]
   )
 
+proc getWakuProtos*(peerStore: PeerStore): seq[string] =
+  ## Get the waku protocols of all the stored peers (filtered before deduplicating).
+  let wakuProtocols = toSeq(peerStore[ProtoBook].book.values())
+                      .flatten()
+                      .filterIt(it.startsWith("/vac/waku"))
+                      .deduplicate()
+  return wakuProtocols
+
 # TODO: Rename peers() to getPeersByProtocol()
 proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] =
   ## Get all the stored information of every peer.
diff --git a/waku/v2/utils/heartbeat.nim b/waku/v2/utils/heartbeat.nim
new file mode 100644
index 000000000..d2af553d9
--- /dev/null
+++ b/waku/v2/utils/heartbeat.nim
@@ -0,0 +1,29 @@
+when (NimMajor, NimMinor) < (1, 4):
+  {.push raises: [Defect].}
+else:
+  {.push raises: [].}
+
+import sequtils
+import chronos, chronicles
+
+# Taken from: https://github.com/status-im/nim-libp2p/blob/master/libp2p/utils/heartbeat.nim
+
+template heartbeat*(name: string, interval: Duration, body: untyped): untyped =
+  var nextHeartbeat = Moment.now()
+  while true:
+    body
+
+    nextHeartbeat += interval
+    let now = Moment.now()
+    if nextHeartbeat < now:
+      let
+        delay = now - nextHeartbeat
+        itv = interval
+      if delay > itv:
+        info "Missed multiple heartbeats", heartbeat = name,
+          delay = delay, hinterval = itv
+      else:
+        debug "Missed heartbeat", heartbeat = name,
+          delay = delay, hinterval = itv
+      nextHeartbeat = now + itv
+    await sleepAsync(nextHeartbeat - now)