chore(networking): set and use target outbound connections + prune (#1739)

This commit is contained in:
Alvaro Revuelta 2023-05-18 09:40:14 +02:00 committed by GitHub
parent 611e9539a6
commit 87f694a8b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 71 additions and 31 deletions

View File

@ -456,7 +456,7 @@ procSuite "Peer Manager":
# but the relay peer is not # but the relay peer is not
node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false
asyncTest "getNumConnections() returns expected number of connections per protocol": asyncTest "connectedPeers() returns expected number of connections per protocol":
# Create 4 nodes # Create 4 nodes
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
@ -483,17 +483,29 @@ procSuite "Peer Manager":
# assert physical connections # assert physical connections
check: check:
nodes[0].peerManager.getNumConnections(WakuRelayCodec) == (0, 2) nodes[0].peerManager.connectedPeers(WakuRelayCodec)[0].len == 0
nodes[0].peerManager.getNumConnections(WakuFilterCodec) == (0, 2) nodes[0].peerManager.connectedPeers(WakuRelayCodec)[1].len == 2
nodes[1].peerManager.getNumConnections(WakuRelayCodec) == (1, 1) nodes[0].peerManager.connectedPeers(WakuFilterCodec)[0].len == 0
nodes[1].peerManager.getNumConnections(WakuFilterCodec) == (1, 0) nodes[0].peerManager.connectedPeers(WakuFilterCodec)[1].len == 2
nodes[2].peerManager.getNumConnections(WakuRelayCodec) == (2, 1) nodes[1].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[2].peerManager.getNumConnections(WakuFilterCodec) == (1, 1) nodes[1].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1
nodes[3].peerManager.getNumConnections(WakuRelayCodec) == (1, 0) nodes[1].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1
nodes[3].peerManager.getNumConnections(WakuFilterCodec) == (1, 0) nodes[1].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0
nodes[2].peerManager.connectedPeers(WakuRelayCodec)[0].len == 2
nodes[2].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1
nodes[2].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1
nodes[2].peerManager.connectedPeers(WakuFilterCodec)[1].len == 1
nodes[3].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuRelayCodec)[1].len == 0
nodes[3].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0
asyncTest "getNumStreams() returns expected number of connections per protocol": asyncTest "getNumStreams() returns expected number of connections per protocol":
# Create 2 nodes # Create 2 nodes

View File

@ -57,6 +57,9 @@ const
# How often the peer store is updated with metrics # How often the peer store is updated with metrics
UpdateMetricsInterval = chronos.seconds(15) UpdateMetricsInterval = chronos.seconds(15)
# How often to log peer manager metrics
LogSummaryInterval = chronos.seconds(60)
type type
PeerManager* = ref object of RootObj PeerManager* = ref object of RootObj
switch*: Switch switch*: Switch
@ -66,6 +69,7 @@ type
maxFailedAttempts*: int maxFailedAttempts*: int
storage: PeerStorage storage: PeerStorage
serviceSlots*: Table[string, RemotePeerInfo] serviceSlots*: Table[string, RemotePeerInfo]
outPeersTarget*: int
started: bool started: bool
proc protocolMatcher*(codec: string): Matcher = proc protocolMatcher*(codec: string): Matcher =
@ -333,6 +337,7 @@ proc new*(T: type PeerManager,
storage: storage, storage: storage,
initialBackoffInSec: initialBackoffInSec, initialBackoffInSec: initialBackoffInSec,
backoffFactor: backoffFactor, backoffFactor: backoffFactor,
outPeersTarget: max(maxConnections div 10, 10),
maxFailedAttempts: maxFailedAttempts) maxFailedAttempts: maxFailedAttempts)
proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} = proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
@ -477,22 +482,22 @@ proc connectToNodes*(pm: PeerManager,
# later. # later.
await sleepAsync(chronos.seconds(5)) await sleepAsync(chronos.seconds(5))
# Returns the amount of physical connections (in and out) # Returns the peerIds of physical connections (in and out)
# containing at least one stream with the given protocol. # containing at least one stream with the given protocol.
proc getNumConnections*(pm: PeerManager, protocol: string): (int, int) = proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
var var inPeers: seq[PeerId]
numConnsIn = 0 var outPeers: seq[PeerId]
numConnsOut = 0
for peerId, muxers in pm.switch.connManager.getConnections(): for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers: for peerConn in muxers:
let streams = peerConn.getStreams() let streams = peerConn.getStreams()
if streams.anyIt(it.protocol == protocol): if streams.anyIt(it.protocol == protocol):
if peerConn.connection.transportDir == Direction.In: if peerConn.connection.transportDir == Direction.In:
numConnsIn += 1 inPeers.add(peerId)
elif peerConn.connection.transportDir == Direction.Out: elif peerConn.connection.transportDir == Direction.Out:
numConnsOut += 1 outPeers.add(peerId)
return (numConnsIn, numConnsOut) return (inPeers, outPeers)
proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) = proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
var var
@ -508,28 +513,33 @@ proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
numStreamsOut += 1 numStreamsOut += 1
return (numStreamsIn, numStreamsOut) return (numStreamsIn, numStreamsOut)
proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let connsToPrune = min(amount, inRelayPeers.len)
for p in inRelayPeers[0..<connsToPrune]:
await pm.switch.disconnect(p)
proc connectToRelayPeers*(pm: PeerManager) {.async.} = proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size let maxConnections = pm.switch.connManager.inSema.size
let (inRelayPeers, outRelayPeers) = pm.getNumConnections(WakuRelayCodec) let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
let totalRelayPeers = inRelayPeers + outRelayPeers let inPeersTarget = maxConnections - pm.outPeersTarget
if inRelayPeers.len > inPeersTarget:
await pm.pruneInRelayConns(inRelayPeers.len-inPeersTarget)
if outRelayPeers.len >= pm.outPeersTarget:
return
# Leave some room for service peers # Leave some room for service peers
if totalRelayPeers >= (maxConnections - 5): if totalRelayPeers >= (maxConnections - 5):
return return
# TODO: Track only relay connections (nwaku/issues/1566)
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs)) let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId)) let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
let numPeersToConnect = min(min(maxConnections - totalRelayPeers, outsideBackoffPeers.len), MaxParalelDials) let numPeersToConnect = min(min(maxConnections - totalRelayPeers, outsideBackoffPeers.len), MaxParalelDials)
info "Relay peer connections",
inRelayConns = inRelayPeers,
outRelayConns = outRelayPeers,
totalRelayConns = totalRelayPeers,
targetConnectedPeers = maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect]) await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect])
proc prunePeerStore*(pm: PeerManager) = proc prunePeerStore*(pm: PeerManager) =
@ -607,13 +617,30 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
await pm.connectToRelayPeers() await pm.connectToRelayPeers()
await sleepAsync(ConnectivityLoopInterval) await sleepAsync(ConnectivityLoopInterval)
proc logSummary*(pm: PeerManager) {.async.} =
heartbeat "Log peer manager summary", LogSummaryInterval:
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
let inPeersTarget = maxConnections - pm.outPeersTarget
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
info "Relay peer connections",
inRelayConns = $inRelayPeers.len & "/" & $inPeersTarget,
outRelayConns = $outRelayPeers.len & "/" & $pm.outPeersTarget,
totalRelayConns = totalRelayPeers,
maxConnections = maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
proc updateMetrics(pm: PeerManager) {.async.} = proc updateMetrics(pm: PeerManager) {.async.} =
heartbeat "Scheduling updateMetrics run", UpdateMetricsInterval: heartbeat "Scheduling updateMetrics run", UpdateMetricsInterval:
for proto in pm.peerStore.getWakuProtos(): for proto in pm.peerStore.getWakuProtos():
let (protoConnsIn, protoConnsOut) = pm.getNumConnections(proto) let (protoConnsIn, protoConnsOut) = pm.connectedPeers(proto)
let (protoStreamsIn, protoStreamsOut) = pm.getNumStreams(proto) let (protoStreamsIn, protoStreamsOut) = pm.getNumStreams(proto)
waku_connected_peers.set(protoConnsIn.float64, labelValues = [$Direction.In, proto]) waku_connected_peers.set(protoConnsIn.len.float64, labelValues = [$Direction.In, proto])
waku_connected_peers.set(protoConnsOut.float64, labelValues = [$Direction.Out, proto]) waku_connected_peers.set(protoConnsOut.len.float64, labelValues = [$Direction.Out, proto])
waku_streams_peers.set(protoStreamsIn.float64, labelValues = [$Direction.In, proto]) waku_streams_peers.set(protoStreamsIn.float64, labelValues = [$Direction.In, proto])
waku_streams_peers.set(protoStreamsOut.float64, labelValues = [$Direction.Out, proto]) waku_streams_peers.set(protoStreamsOut.float64, labelValues = [$Direction.Out, proto])
@ -622,6 +649,7 @@ proc start*(pm: PeerManager) =
asyncSpawn pm.updateMetrics() asyncSpawn pm.updateMetrics()
asyncSpawn pm.relayConnectivityLoop() asyncSpawn pm.relayConnectivityLoop()
asyncSpawn pm.prunePeerStoreLoop() asyncSpawn pm.prunePeerStoreLoop()
asyncSpawn pm.logSummary()
proc stop*(pm: PeerManager) = proc stop*(pm: PeerManager) =
pm.started = false pm.started = false