mirror of https://github.com/waku-org/nwaku.git
chore(networking): get relay number of connections from protocol conns/streams (#1609)
This commit is contained in:
parent
b2dcb07751
commit
73cbafa658
|
@ -462,6 +462,83 @@ procSuite "Peer Manager":
|
||||||
# but the relay peer is not
|
# but the relay peer is not
|
||||||
node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false
|
node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false
|
||||||
|
|
||||||
|
asyncTest "getNumConnections() returns expected number of connections per protocol":
|
||||||
|
# Create 4 nodes
|
||||||
|
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
||||||
|
|
||||||
|
# Start them with relay + filter
|
||||||
|
await allFutures(nodes.mapIt(it.start()))
|
||||||
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||||
|
await allFutures(nodes.mapIt(it.mountFilter()))
|
||||||
|
|
||||||
|
let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
|
# create some connections/streams
|
||||||
|
require:
|
||||||
|
# some relay connections
|
||||||
|
(await nodes[0].peerManager.connectRelay(pInfos[1])) == true
|
||||||
|
(await nodes[0].peerManager.connectRelay(pInfos[2])) == true
|
||||||
|
(await nodes[1].peerManager.connectRelay(pInfos[2])) == true
|
||||||
|
|
||||||
|
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
|
||||||
|
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterCodec)).isSome() == true
|
||||||
|
|
||||||
|
# isolated dial creates a relay conn under the hood (libp2p behaviour)
|
||||||
|
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterCodec)).isSome() == true
|
||||||
|
|
||||||
|
|
||||||
|
# assert physical connections
|
||||||
|
check:
|
||||||
|
nodes[0].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 0
|
||||||
|
nodes[0].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 2
|
||||||
|
nodes[0].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 0
|
||||||
|
nodes[0].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 2
|
||||||
|
|
||||||
|
nodes[1].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 1
|
||||||
|
nodes[1].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 1
|
||||||
|
nodes[1].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
|
||||||
|
nodes[1].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 0
|
||||||
|
|
||||||
|
nodes[2].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 2
|
||||||
|
nodes[2].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 1
|
||||||
|
nodes[2].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
|
||||||
|
nodes[2].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 1
|
||||||
|
|
||||||
|
nodes[3].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 1
|
||||||
|
nodes[3].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 0
|
||||||
|
nodes[3].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
|
||||||
|
nodes[3].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 0
|
||||||
|
|
||||||
|
asyncTest "getNumStreams() returns expected number of connections per protocol":
|
||||||
|
# Create 2 nodes
|
||||||
|
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
||||||
|
|
||||||
|
# Start them with relay + filter
|
||||||
|
await allFutures(nodes.mapIt(it.start()))
|
||||||
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||||
|
await allFutures(nodes.mapIt(it.mountFilter()))
|
||||||
|
|
||||||
|
let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
|
require:
|
||||||
|
# multiple streams are multiplexed over a single connection.
|
||||||
|
# note that a relay connection is created under the hood when dialing a peer (libp2p behaviour)
|
||||||
|
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
|
||||||
|
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
|
||||||
|
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
|
||||||
|
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
|
||||||
|
|
||||||
|
check:
|
||||||
|
nodes[0].peerManager.getNumStreams(Direction.In, WakuRelayCodec) == 1
|
||||||
|
nodes[0].peerManager.getNumStreams(Direction.Out, WakuRelayCodec) == 1
|
||||||
|
nodes[0].peerManager.getNumStreams(Direction.In, WakuFilterCodec) == 0
|
||||||
|
nodes[0].peerManager.getNumStreams(Direction.Out, WakuFilterCodec) == 4
|
||||||
|
|
||||||
|
nodes[1].peerManager.getNumStreams(Direction.In, WakuRelayCodec) == 1
|
||||||
|
nodes[1].peerManager.getNumStreams(Direction.Out, WakuRelayCodec) == 1
|
||||||
|
nodes[1].peerManager.getNumStreams(Direction.In, WakuFilterCodec) == 4
|
||||||
|
nodes[1].peerManager.getNumStreams(Direction.Out, WakuFilterCodec) == 0
|
||||||
|
|
||||||
test "selectPeer() returns the correct peer":
|
test "selectPeer() returns the correct peer":
|
||||||
# Valid peer id missing the last digit
|
# Valid peer id missing the last digit
|
||||||
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"
|
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import std/sequtils
|
||||||
|
|
||||||
|
proc flatten*[T](a: seq[seq[T]]): seq[T] =
|
||||||
|
var aFlat = newSeq[T](0)
|
||||||
|
for subseq in a:
|
||||||
|
aFlat &= subseq
|
||||||
|
return aFlat
|
|
@ -9,10 +9,12 @@ import
|
||||||
chronos,
|
chronos,
|
||||||
chronicles,
|
chronicles,
|
||||||
metrics,
|
metrics,
|
||||||
libp2p/multistream
|
libp2p/multistream,
|
||||||
|
libp2p/muxers/muxer
|
||||||
import
|
import
|
||||||
../../protocol/waku_relay,
|
../../protocol/waku_relay,
|
||||||
../../utils/peers,
|
../../utils/peers,
|
||||||
|
../../utils/heartbeat,
|
||||||
./peer_store/peer_storage,
|
./peer_store/peer_storage,
|
||||||
./waku_peer_store
|
./waku_peer_store
|
||||||
|
|
||||||
|
@ -22,7 +24,8 @@ declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
|
||||||
# TODO: Populate from PeerStore.Source when ready
|
# TODO: Populate from PeerStore.Source when ready
|
||||||
declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"]
|
declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"]
|
||||||
declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
|
declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
|
||||||
declarePublicGauge waku_connected_peers, "Number of connected peers per direction", ["direction"]
|
declarePublicGauge waku_connected_peers, "Number of physical connections per direction and protocol", labels = ["direction", "protocol"]
|
||||||
|
declarePublicGauge waku_streams_peers, "Number of streams per direction and protocol", labels = ["direction", "protocol"]
|
||||||
declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store"
|
declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store"
|
||||||
declarePublicGauge waku_service_peers, "Service peer protocol and multiaddress ", labels = ["protocol", "peerId"]
|
declarePublicGauge waku_service_peers, "Service peer protocol and multiaddress ", labels = ["protocol", "peerId"]
|
||||||
|
|
||||||
|
@ -51,6 +54,9 @@ const
|
||||||
# How often the peer store is pruned
|
# How often the peer store is pruned
|
||||||
PrunePeerStoreInterval = chronos.minutes(5)
|
PrunePeerStoreInterval = chronos.minutes(5)
|
||||||
|
|
||||||
|
# How often the peer store is updated with metrics
|
||||||
|
UpdateMetricsInterval = chronos.seconds(15)
|
||||||
|
|
||||||
type
|
type
|
||||||
PeerManager* = ref object of RootObj
|
PeerManager* = ref object of RootObj
|
||||||
switch*: Switch
|
switch*: Switch
|
||||||
|
@ -252,7 +258,6 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
|
||||||
let direction = if event.initiator: Outbound else: Inbound
|
let direction = if event.initiator: Outbound else: Inbound
|
||||||
pm.peerStore[ConnectionBook][peerId] = Connected
|
pm.peerStore[ConnectionBook][peerId] = Connected
|
||||||
pm.peerStore[DirectionBook][peerId] = direction
|
pm.peerStore[DirectionBook][peerId] = direction
|
||||||
waku_connected_peers.inc(1, labelValues=[$direction])
|
|
||||||
|
|
||||||
if not pm.storage.isNil:
|
if not pm.storage.isNil:
|
||||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
|
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
|
||||||
|
@ -261,7 +266,6 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
|
||||||
elif event.kind == PeerEventKind.Left:
|
elif event.kind == PeerEventKind.Left:
|
||||||
pm.peerStore[DirectionBook][peerId] = UnknownDirection
|
pm.peerStore[DirectionBook][peerId] = UnknownDirection
|
||||||
pm.peerStore[ConnectionBook][peerId] = CanConnect
|
pm.peerStore[ConnectionBook][peerId] = CanConnect
|
||||||
waku_connected_peers.dec(1, labelValues=[$pm.peerStore[DirectionBook][peerId]])
|
|
||||||
|
|
||||||
if not pm.storage.isNil:
|
if not pm.storage.isNil:
|
||||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
|
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
|
||||||
|
@ -430,16 +434,35 @@ proc connectToNodes*(pm: PeerManager,
|
||||||
# later.
|
# later.
|
||||||
await sleepAsync(chronos.seconds(5))
|
await sleepAsync(chronos.seconds(5))
|
||||||
|
|
||||||
|
# Returns the amount of physical connections for a given direction
|
||||||
|
# containing at least one stream with the given protocol.
|
||||||
|
proc getNumConnections*(pm: PeerManager, dir: Direction, protocol: string): int =
|
||||||
|
var numConns = 0
|
||||||
|
for peerId, muxers in pm.switch.connManager.getConnections():
|
||||||
|
for peerConn in muxers:
|
||||||
|
let streams = peerConn.getStreams()
|
||||||
|
if peerConn.connection.transportDir == dir:
|
||||||
|
if streams.anyIt(it.protocol == protocol):
|
||||||
|
numConns += 1
|
||||||
|
return numConns
|
||||||
|
|
||||||
|
proc getNumStreams*(pm: PeerManager, dir: Direction, protocol: string): int =
|
||||||
|
var numConns = 0
|
||||||
|
for peerId, muxers in pm.switch.connManager.getConnections():
|
||||||
|
for peerConn in muxers:
|
||||||
|
for stream in peerConn.getStreams():
|
||||||
|
if stream.protocol == protocol and stream.dir == dir:
|
||||||
|
numConns += 1
|
||||||
|
return numConns
|
||||||
|
|
||||||
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
|
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
|
||||||
let maxConnections = pm.switch.connManager.inSema.size
|
let maxConnections = pm.switch.connManager.inSema.size
|
||||||
let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
|
let inRelayPeers = pm.getNumConnections(Direction.In, WakuRelayCodec)
|
||||||
let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len
|
let outRelayPeers = pm.getNumConnections(Direction.Out, WakuRelayCodec)
|
||||||
let numConPeers = numInPeers + numOutPeers
|
let totalRelayPeers = inRelayPeers + outRelayPeers
|
||||||
|
|
||||||
# TODO: Enforce a given in/out peers ratio
|
|
||||||
|
|
||||||
# Leave some room for service peers
|
# Leave some room for service peers
|
||||||
if numConPeers >= (maxConnections - 5):
|
if totalRelayPeers >= (maxConnections - 5):
|
||||||
return
|
return
|
||||||
|
|
||||||
# TODO: Track only relay connections (nwaku/issues/1566)
|
# TODO: Track only relay connections (nwaku/issues/1566)
|
||||||
|
@ -447,10 +470,12 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} =
|
||||||
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId,
|
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId,
|
||||||
pm.initialBackoffInSec,
|
pm.initialBackoffInSec,
|
||||||
pm.backoffFactor))
|
pm.backoffFactor))
|
||||||
let numPeersToConnect = min(min(maxConnections - numConPeers, outsideBackoffPeers.len), MaxParalelDials)
|
let numPeersToConnect = min(min(maxConnections - totalRelayPeers, outsideBackoffPeers.len), MaxParalelDials)
|
||||||
|
|
||||||
info "Relay peer connections",
|
info "Relay peer connections",
|
||||||
connectedPeers = numConPeers,
|
inRelayConns = inRelayPeers,
|
||||||
|
outRelayConns = outRelayPeers,
|
||||||
|
totalRelayConns = totalRelayPeers,
|
||||||
targetConnectedPeers = maxConnections,
|
targetConnectedPeers = maxConnections,
|
||||||
notConnectedPeers = notConnectedPeers.len,
|
notConnectedPeers = notConnectedPeers.len,
|
||||||
outsideBackoffPeers = outsideBackoffPeers.len
|
outsideBackoffPeers = outsideBackoffPeers.len
|
||||||
|
@ -532,8 +557,18 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
|
||||||
await pm.connectToRelayPeers()
|
await pm.connectToRelayPeers()
|
||||||
await sleepAsync(ConnectivityLoopInterval)
|
await sleepAsync(ConnectivityLoopInterval)
|
||||||
|
|
||||||
|
proc updateMetrics(pm: PeerManager) {.async.} =
|
||||||
|
heartbeat "Scheduling updateMetrics run", UpdateMetricsInterval:
|
||||||
|
for dir in @[Direction.In, Direction.Out]:
|
||||||
|
for proto in pm.peerStore.getWakuProtos():
|
||||||
|
let protoDirConns = pm.getNumConnections(dir, proto)
|
||||||
|
let protoDirStreams = pm.getNumStreams(dir, proto)
|
||||||
|
waku_connected_peers.set(protoDirConns.float64, labelValues = [$dir, proto])
|
||||||
|
waku_streams_peers.set(protoDirStreams.float64, labelValues = [$dir, proto])
|
||||||
|
|
||||||
proc start*(pm: PeerManager) =
|
proc start*(pm: PeerManager) =
|
||||||
pm.started = true
|
pm.started = true
|
||||||
|
asyncSpawn pm.updateMetrics()
|
||||||
asyncSpawn pm.relayConnectivityLoop()
|
asyncSpawn pm.relayConnectivityLoop()
|
||||||
asyncSpawn pm.prunePeerStoreLoop()
|
asyncSpawn pm.prunePeerStoreLoop()
|
||||||
|
|
||||||
|
|
|
@ -4,14 +4,15 @@ else:
|
||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[tables, sequtils, sets, options, times, math],
|
std/[tables, sequtils, sets, options, times, math, strutils],
|
||||||
chronos,
|
chronos,
|
||||||
eth/p2p/discoveryv5/enr,
|
eth/p2p/discoveryv5/enr,
|
||||||
libp2p/builders,
|
libp2p/builders,
|
||||||
libp2p/peerstore
|
libp2p/peerstore
|
||||||
|
|
||||||
import
|
import
|
||||||
../../utils/peers
|
../../utils/peers,
|
||||||
|
../../../common/utils/sequence
|
||||||
|
|
||||||
export peerstore, builders
|
export peerstore, builders
|
||||||
|
|
||||||
|
@ -90,6 +91,14 @@ proc get*(peerStore: PeerStore,
|
||||||
numberFailedConn: peerStore[NumberFailedConnBook][peerId]
|
numberFailedConn: peerStore[NumberFailedConnBook][peerId]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
proc getWakuProtos*(peerStore: PeerStore): seq[string] =
|
||||||
|
## Get the waku protocols of all the stored peers.
|
||||||
|
let wakuProtocols = toSeq(peerStore[ProtoBook].book.values())
|
||||||
|
.flatten()
|
||||||
|
.deduplicate()
|
||||||
|
.filterIt(it.startsWith("/vac/waku"))
|
||||||
|
return wakuProtocols
|
||||||
|
|
||||||
# TODO: Rename peers() to getPeersByProtocol()
|
# TODO: Rename peers() to getPeersByProtocol()
|
||||||
proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] =
|
proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] =
|
||||||
## Get all the stored information of every peer.
|
## Get all the stored information of every peer.
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import sequtils
|
||||||
|
import chronos, chronicles
|
||||||
|
|
||||||
|
# Taken from: https://github.com/status-im/nim-libp2p/blob/master/libp2p/utils/heartbeat.nim
|
||||||
|
|
||||||
|
template heartbeat*(name: string, interval: Duration, body: untyped): untyped =
|
||||||
|
var nextHeartbeat = Moment.now()
|
||||||
|
while true:
|
||||||
|
body
|
||||||
|
|
||||||
|
nextHeartbeat += interval
|
||||||
|
let now = Moment.now()
|
||||||
|
if nextHeartbeat < now:
|
||||||
|
let
|
||||||
|
delay = now - nextHeartbeat
|
||||||
|
itv = interval
|
||||||
|
if delay > itv:
|
||||||
|
info "Missed multiple heartbeats", heartbeat = name,
|
||||||
|
delay = delay, hinterval = itv
|
||||||
|
else:
|
||||||
|
debug "Missed heartbeat", heartbeat = name,
|
||||||
|
delay = delay, hinterval = itv
|
||||||
|
nextHeartbeat = now + itv
|
||||||
|
await sleepAsync(nextHeartbeat - now)
|
Loading…
Reference in New Issue