refactor:optimize getting number of connections and streams (#1673)

This commit is contained in:
Vaclav Pavlin 2023-04-26 10:47:46 +02:00 committed by GitHub
parent dab5120a97
commit a4d22fadc3
2 changed files with 43 additions and 44 deletions

View File

@ -483,25 +483,17 @@ procSuite "Peer Manager":
# assert physical connections # assert physical connections
check: check:
nodes[0].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 0 nodes[0].peerManager.getNumConnections(WakuRelayCodec) == (0, 2)
nodes[0].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 2 nodes[0].peerManager.getNumConnections(WakuFilterCodec) == (0, 2)
nodes[0].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 0
nodes[0].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 2
nodes[1].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 1 nodes[1].peerManager.getNumConnections(WakuRelayCodec) == (1, 1)
nodes[1].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 1 nodes[1].peerManager.getNumConnections(WakuFilterCodec) == (1, 0)
nodes[1].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
nodes[1].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 0
nodes[2].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 2 nodes[2].peerManager.getNumConnections(WakuRelayCodec) == (2, 1)
nodes[2].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 1 nodes[2].peerManager.getNumConnections(WakuFilterCodec) == (1, 1)
nodes[2].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
nodes[2].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 1
nodes[3].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 1 nodes[3].peerManager.getNumConnections(WakuRelayCodec) == (1, 0)
nodes[3].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 0 nodes[3].peerManager.getNumConnections(WakuFilterCodec) == (1, 0)
nodes[3].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
nodes[3].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 0
asyncTest "getNumStreams() returns expected number of connections per protocol": asyncTest "getNumStreams() returns expected number of connections per protocol":
# Create 2 nodes # Create 2 nodes
@ -523,15 +515,11 @@ procSuite "Peer Manager":
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
check: check:
nodes[0].peerManager.getNumStreams(Direction.In, WakuRelayCodec) == 1 nodes[0].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[0].peerManager.getNumStreams(Direction.Out, WakuRelayCodec) == 1 nodes[0].peerManager.getNumStreams(WakuFilterCodec) == (0, 4)
nodes[0].peerManager.getNumStreams(Direction.In, WakuFilterCodec) == 0
nodes[0].peerManager.getNumStreams(Direction.Out, WakuFilterCodec) == 4
nodes[1].peerManager.getNumStreams(Direction.In, WakuRelayCodec) == 1 nodes[1].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[1].peerManager.getNumStreams(Direction.Out, WakuRelayCodec) == 1 nodes[1].peerManager.getNumStreams(WakuFilterCodec) == (4, 0)
nodes[1].peerManager.getNumStreams(Direction.In, WakuFilterCodec) == 4
nodes[1].peerManager.getNumStreams(Direction.Out, WakuFilterCodec) == 0
test "selectPeer() returns the correct peer": test "selectPeer() returns the correct peer":
# Valid peer id missing the last digit # Valid peer id missing the last digit

View File

@ -477,31 +477,41 @@ proc connectToNodes*(pm: PeerManager,
# later. # later.
await sleepAsync(chronos.seconds(5)) await sleepAsync(chronos.seconds(5))
# Returns the amount of physical connections for a given direction # Returns the amount of physical connections (in and out)
# containing at least one stream with the given protocol. # containing at least one stream with the given protocol.
proc getNumConnections*(pm: PeerManager, dir: Direction, protocol: string): int = proc getNumConnections*(pm: PeerManager, protocol: string): (int, int) =
var numConns = 0 var
numConnsIn = 0
numConnsOut = 0
for peerId, muxers in pm.switch.connManager.getConnections(): for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers: for peerConn in muxers:
let streams = peerConn.getStreams() let streams = peerConn.getStreams()
if peerConn.connection.transportDir == dir: if streams.anyIt(it.protocol == protocol):
if streams.anyIt(it.protocol == protocol): if peerConn.connection.transportDir == Direction.In:
numConns += 1 numConnsIn += 1
return numConns elif peerConn.connection.transportDir == Direction.Out:
numConnsOut += 1
proc getNumStreams*(pm: PeerManager, dir: Direction, protocol: string): int = return (numConnsIn, numConnsOut)
var numConns = 0
proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
var
numStreamsIn = 0
numStreamsOut = 0
for peerId, muxers in pm.switch.connManager.getConnections(): for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers: for peerConn in muxers:
for stream in peerConn.getStreams(): for stream in peerConn.getStreams():
if stream.protocol == protocol and stream.dir == dir: if stream.protocol == protocol:
numConns += 1 if stream.dir == Direction.In:
return numConns numStreamsIn += 1
elif stream.dir == Direction.Out:
numStreamsOut += 1
return (numStreamsIn, numStreamsOut)
proc connectToRelayPeers*(pm: PeerManager) {.async.} = proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let maxConnections = pm.switch.connManager.inSema.size let maxConnections = pm.switch.connManager.inSema.size
let inRelayPeers = pm.getNumConnections(Direction.In, WakuRelayCodec) let (inRelayPeers, outRelayPeers) = pm.getNumStreams(WakuRelayCodec)
let outRelayPeers = pm.getNumConnections(Direction.Out, WakuRelayCodec) let (inRelayStreams, outRelayStreams) = pm.getNumConnections(WakuRelayCodec)
let totalRelayPeers = inRelayPeers + outRelayPeers let totalRelayPeers = inRelayPeers + outRelayPeers
# Leave some room for service peers # Leave some room for service peers
@ -600,12 +610,13 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
proc updateMetrics(pm: PeerManager) {.async.} = proc updateMetrics(pm: PeerManager) {.async.} =
heartbeat "Scheduling updateMetrics run", UpdateMetricsInterval: heartbeat "Scheduling updateMetrics run", UpdateMetricsInterval:
for dir in @[Direction.In, Direction.Out]: for proto in pm.peerStore.getWakuProtos():
for proto in pm.peerStore.getWakuProtos(): let (protoConnsIn, protoConnsOut) = pm.getNumConnections(proto)
let protoDirConns = pm.getNumConnections(dir, proto) let (protoStreamsIn, protoStreamsOut) = pm.getNumStreams(proto)
let protoDirStreams = pm.getNumStreams(dir, proto) waku_connected_peers.set(protoConnsIn.float64, labelValues = [$Direction.In, proto])
waku_connected_peers.set(protoDirConns.float64, labelValues = [$dir, proto]) waku_connected_peers.set(protoConnsOut.float64, labelValues = [$Direction.Out, proto])
waku_streams_peers.set(protoDirStreams.float64, labelValues = [$dir, proto]) waku_streams_peers.set(protoStreamsIn.float64, labelValues = [$Direction.In, proto])
waku_streams_peers.set(protoStreamsOut.float64, labelValues = [$Direction.Out, proto])
proc start*(pm: PeerManager) = proc start*(pm: PeerManager) =
pm.started = true pm.started = true