Peer dialing/kicking system overhaul (#3346)

* Force dial + excess peer trimmer
* Ensure we always have outgoing peers
* Add configurable hard-max-peers
Tanguy 2022-03-11 11:51:53 +01:00 committed by GitHub
parent 9601735522
commit f589bf2119
3 changed files with 119 additions and 54 deletions


@@ -230,10 +230,14 @@ type
name: "udp-port" }: Port
maxPeers* {.
desc: "The maximum number of peers to connect to"
desc: "The target number of peers to connect to"
defaultValue: 160 # 5 (fanout) * 64 (subnets) / 2 (subs) for a healthy mesh
name: "max-peers" }: int
hardMaxPeers* {.
desc: "The maximum number of peers to connect to. Defaults to maxPeers * 1.5"
name: "hard-max-peers" }: Option[int]
nat* {.
desc: "Specify method to use for determining public address. " &
"Must be one of: any, none, upnp, pmp, extip:<IP>"


@@ -23,7 +23,6 @@ import
libp2p/protocols/pubsub/[
pubsub, gossipsub, rpc/message, rpc/messages, peertable, pubsubpeer],
libp2p/stream/connection,
libp2p/utils/semaphore,
eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl,
eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2],
".."/[version, conf, beacon_clock],
@@ -65,6 +64,7 @@ type
discovery*: Eth2DiscoveryProtocol
discoveryEnabled*: bool
wantedPeers*: int
hardMaxPeers*: int
peerPool*: PeerPool[Peer, PeerID]
protocolStates*: seq[RootRef]
metadata*: altair.MetaData
@@ -81,6 +81,7 @@ type
peers*: Table[PeerID, Peer]
validTopics: HashSet[string]
peerPingerHeartbeatFut: Future[void]
peerTrimmerHeartbeatFut: Future[void]
cfg: RuntimeConfig
getBeaconTime: GetBeaconTimeFn
@@ -887,7 +888,11 @@ proc dialPeer*(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async.} =
debug "Connecting to discovered peer"
var deadline = sleepAsync(node.connectTimeout)
var workfut = node.switch.connect(peerAddr.peerId, peerAddr.addrs)
var workfut = node.switch.connect(
peerAddr.peerId,
peerAddr.addrs,
forceDial = true
)
try:
# `or` operation will only raise exception of `workfut`, because `deadline`
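forceDial asks libp2p to proceed with the dial even when its own connection limit is reached, which is why the hard-max check moves to the callers below. For context, a minimal sketch of the deadline-raced dial pattern used here (the helper name and cleanup details are illustrative, not the exact dialPeer body):

import chronos, libp2p

proc dialWithDeadline(switch: Switch, peerId: PeerID,
                      addrs: seq[MultiAddress], timeout: Duration) {.async.} =
  let
    deadline = sleepAsync(timeout)
    workfut = switch.connect(peerId, addrs, forceDial = true)
  try:
    # `or` completes as soon as either future does; only `workfut` raises here
    await workfut or deadline
    if not workfut.finished():
      # The deadline fired first: give up on this dial attempt
      await workfut.cancelAndWait()
  finally:
    if not deadline.finished():
      await deadline.cancelAndWait()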
@@ -916,7 +921,8 @@ proc connectWorker(node: Eth2Node, index: int) {.async.} =
# A previous worker dial might have hit the maximum peer count.
# TODO: could clear the whole connTable and connQueue here also; best
# would be to have this event-based, coming from the peer pool or libp2p.
if node.switch.connManager.outSema.count > 0:
if node.peerPool.len < node.hardMaxPeers:
await node.dialPeer(remotePeerAddr, index)
# Peer was added to `connTable` before adding it to `connQueue`, so we
# exclude the peer here after processing.
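For context, a sketch of the worker loop around this change; the parts outside the hunk are reconstructed from the visible comments and may differ from the actual file:

proc connectWorker(node: Eth2Node, index: int) {.async.} =
  while true:
    # Block until discovery queues a candidate address
    let remotePeerAddr = await node.connQueue.popFirst()
    # Re-check the limit: another worker may have filled the pool since this
    # address was queued, and forceDial no longer lets libp2p refuse the dial
    if node.peerPool.len < node.hardMaxPeers:
      await node.dialPeer(remotePeerAddr, index)
    # The peer was added to `connTable` before being queued, so remove it
    # here once processed
    node.connTable.excl(remotePeerAddr.peerId)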
@@ -947,7 +953,8 @@ proc queryRandom*(
d: Eth2DiscoveryProtocol,
forkId: ENRForkID,
wantedAttnets: AttnetBits,
wantedSyncnets: SyncnetBits): Future[seq[Node]] {.async.} =
wantedSyncnets: SyncnetBits,
minScore: int): Future[seq[Node]] {.async.} =
## Perform a discovery query for a random target matching the fork ID
## and at least one of the attestation subnets.
@@ -999,25 +1006,31 @@ proc queryRandom*(
if wantedSyncnets[i] and syncnetsNode[i]:
score += 10 # connecting to the right syncnet is urgent
if score > 0:
if score >= minScore:
filtered.add((score, n))
d.rng[].shuffle(filtered)
return filtered.sortedByIt(-it[0]).mapIt(it[1])
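A self-contained sketch of this selection step, with plain integers standing in for discovered nodes; the attnet part of the scoring sits above the shown context (assumed here to add one point per matching subnet), and std/random stands in for the discovery RNG:

import std/[algorithm, random, sequtils]

proc pickBest(scored: seq[(int, int)], minScore: int): seq[int] =
  # minScore = 1 keeps only nodes matching at least one wanted subnet
  # (the previous `score > 0` behaviour); minScore = 0 keeps every
  # fork-compatible node, for when we only need more outgoing peers
  var filtered = scored.filterIt(it[0] >= minScore)
  shuffle(filtered)                           # tie-break equal scores randomly
  filtered.sortedByIt(-it[0]).mapIt(it[1])    # best candidates first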
proc trimConnections(node: Eth2Node, count: int) {.async.} =
proc trimConnections(node: Eth2Node, count: int) =
# Kill `count` peers, scoring them to remove the least useful ones
var scores = initOrderedTable[PeerID, int]()
# Take into account the stabilitySubnets
# During sync, only this will be used to score peers
# since gossipsub is not running yet
#
# A peer subscribed to all stabilitySubnets will
# have 640 points
var peersInGracePeriod = 0
for peer in node.peers.values:
if peer.connectionState != Connected: continue
if peer.metadata.isNone: continue
# Metadata pinger is used as grace period
if peer.metadata.isNone:
peersInGracePeriod.inc()
continue
let
stabilitySubnets = peer.metadata.get().attnets
@@ -1026,25 +1039,53 @@ proc trimConnections(node: Eth2Node, count: int) {.async.} =
scores[peer.peerId] = thisPeersScore
# Safeguard: if we have too many peers in the grace
# period, don't kick anyone. Otherwise, they will be
# preferred over long-standing peers
if peersInGracePeriod > scores.len div 2:
return
# Split 1000 points among each topic's peers
# + 10 000 points for each subbed topic
# + 5 000 points for each subbed topic
# This gives priority to peers in topics with few peers
# For instance, a topic with `dHigh` peers will give 80 points to each peer
# Whereas a topic with `dLow` peers will give 250 points to each peer
#
# Then, use the average over all of a peer's topics, to avoid giving too many
# points to peers that are present in many topics
var gossipScores = initTable[PeerID, tuple[sum: int, count: int]]()
for topic, _ in node.pubsub.gossipsub:
let
peersInMesh = node.pubsub.mesh.peers(topic)
peersSubbed = node.pubsub.gossipsub.peers(topic)
scorePerMeshPeer = 10_000 div max(peersInMesh, 1)
scorePerMeshPeer = 5_000 div max(peersInMesh, 1)
scorePerSubbedPeer = 1_000 div max(peersSubbed, 1)
for peer in node.pubsub.mesh.getOrDefault(topic):
if peer.peerId notin scores: continue
scores[peer.peerId] = scores[peer.peerId] + scorePerSubbedPeer
for peer in node.pubsub.gossipsub.getOrDefault(topic):
if peer.peerId notin scores: continue
scores[peer.peerId] = scores[peer.peerId] + scorePerMeshPeer
let currentVal = gossipScores.getOrDefault(peer.peerId)
gossipScores[peer.peerId] = (
currentVal.sum + scorePerSubbedPeer,
currentVal.count + 1
)
# Avoid global topics (>75% of peers), which would greatly reduce
# the average score for small peers
if peersSubbed > scores.len div 4 * 3: continue
for peer in node.pubsub.mesh.getOrDefault(topic):
if peer.peerId notin scores: continue
let currentVal = gossipScores.getOrDefault(peer.peerId)
gossipScores[peer.peerId] = (
currentVal.sum + scorePerMeshPeer,
currentVal.count + 1
)
for peerId, gScore in gossipScores.pairs:
scores[peerId] =
scores.getOrDefault(peerId) + (gScore.sum div gScore.count)
proc sortPerScore(a, b: (PeerID, int)): int =
system.cmp(a[1], b[1])
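To make the comment's 80/250 example concrete with gossipsub's default mesh bounds (dLow = 4, dHigh = 12; these defaults are an assumption, not part of this diff), and on top of the 64 * 10 = 640 points a peer advertising every stability subnet already has:

let
  crowdedSubScore  = 1_000 div 12   # ~83 points per subscriber of a crowded topic
  sparseSubScore   = 1_000 div 4    # 250 points per subscriber of a sparse topic
  crowdedMeshScore = 5_000 div 12   # ~416 extra points per mesh member
  sparseMeshScore  = 5_000 div 4    # 1_250 extra points per mesh member
# Averaged per peer over its topics, a peer propping up a sparse topic scores
# far higher than yet another peer in a crowded one, so it is kicked last.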
@@ -1055,7 +1096,7 @@ proc trimConnections(node: Eth2Node, count: int) {.async.} =
for peerId in scores.keys:
debug "kicking peer", peerId, score=scores[peerId]
await node.switch.disconnect(peerId)
asyncSpawn node.getPeer(peerId).disconnect(PeerScoreLow)
dec toKick
inc(nbc_cycling_kicked_peers)
if toKick <= 0: return
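Since trimConnections is no longer async, the disconnect is scheduled with asyncSpawn instead of being awaited; a tiny illustration of that fire-and-forget pattern (the proc names are made up):

import chronos

proc sayGoodbye(peer: int) {.async.} =
  # Stands in for getPeer(peerId).disconnect(PeerScoreLow)
  await sleepAsync(10.milliseconds)

proc kickAll(peers: seq[int]) =
  for p in peers:
    # Schedule the disconnect and move on immediately rather than awaiting it
    asyncSpawn sayGoodbye(p)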
@@ -1137,10 +1178,20 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
(wantedAttnets, wantedSyncnets) = node.getLowSubnets(currentEpoch)
wantedAttnetsCount = wantedAttnets.countOnes()
wantedSyncnetsCount = wantedSyncnets.countOnes()
outgoingPeers = node.peerPool.lenCurrent({PeerType.Outgoing})
targetOutgoingPeers = max(node.wantedPeers div 10, 3)
if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0:
let discoveredNodes = await node.discovery.queryRandom(
node.discoveryForkId, wantedAttnets, wantedSyncnets)
if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0 or
outgoingPeers < targetOutgoingPeers:
let
minScore =
if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0:
1
else:
0
discoveredNodes = await node.discovery.queryRandom(
node.discoveryForkId, wantedAttnets, wantedSyncnets, minScore)
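With the defaults this keeps discovery running even when every subnet is already covered; a worked instance of the two new knobs (160 is simply the max-peers default from the first hunk, the rest is illustrative):

let
  wantedPeers = 160
  targetOutgoingPeers = max(wantedPeers div 10, 3)  # = 16 outgoing connections
# Subnets missing -> minScore = 1: only nodes matching a wanted subnet qualify
# Subnets covered -> minScore = 0: any fork-compatible node will do, since we
#                    only need to top up our outgoing connection count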
let newPeers = block:
var np = newSeq[PeerAddr]()
@@ -1157,20 +1208,12 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
np.add(peerAddr)
np
# We have to be careful to kick enough peers to make room for new ones
# (If we are here, we have an unhealthy mesh, so if we're full, we have bad peers)
# But not kick too many peers, because with a low max-peers that can cause disruption
# Also keep in mind that a lot of dials fail, and that we can have incoming peers waiting
let
roomRequired = 1 + newPeers.len()
roomCurrent = node.peerPool.lenSpace({PeerType.Outgoing})
roomDelta = roomRequired - roomCurrent
roomCurrent = node.hardMaxPeers - len(node.peerPool)
peersToKick = min(newPeers.len - roomCurrent, node.hardMaxPeers div 5)
maxPeersToKick = len(node.peerPool) div 5
peersToKick = min(roomDelta, maxPeersToKick)
if peersToKick > 0 and newPeers.len() > 0:
await node.trimConnections(peersToKick)
if peersToKick > 0 and newPeers.len > 0:
node.trimConnections(peersToKick)
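A worked instance of the kick budget above (the concrete numbers are illustrative only):

let
  hardMaxPeers = 240                              # the 160 * 1.5 default
  currentPeers = 238                              # len(node.peerPool)
  newPeerCount = 10                               # newPeers.len
  roomCurrent  = hardMaxPeers - currentPeers      # 2 free slots
  peersToKick  = min(newPeerCount - roomCurrent,  # 8 peers must go,
                     hardMaxPeers div 5)          # well under the cap of 48
# trimConnections(8) then runs synchronously before the new dials are queued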
for peerAddr in newPeers:
# We're adding to the pending connections table here, but going
@@ -1178,18 +1221,14 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
node.connTable.incl(peerAddr.peerId)
await node.connQueue.addLast(peerAddr)
debug "Discovery tick", wanted_peers = node.wantedPeers,
space = node.peerPool.shortLogSpace(),
acquired = node.peerPool.shortLogAcquired(),
available = node.peerPool.shortLogAvailable(),
current = node.peerPool.shortLogCurrent(),
length = len(node.peerPool),
debug "Discovery tick",
wanted_peers = node.wantedPeers,
current_peers = len(node.peerPool),
discovered_nodes = len(discoveredNodes),
kicked_peers = max(0, peersToKick),
new_peers = len(newPeers)
if len(newPeers) == 0:
let currentPeers = node.peerPool.lenCurrent()
let currentPeers = len(node.peerPool)
if currentPeers <= node.wantedPeers shr 2: # 25%
warn "Peer count low, no new peers discovered",
discovered_nodes = len(discoveredNodes), new_peers = newPeers,
@@ -1369,8 +1408,9 @@ proc new*(T: type Eth2Node, config: BeaconNodeConf, runtimeCfg: RuntimeConfig,
switch: switch,
pubsub: pubsub,
wantedPeers: config.maxPeers,
hardMaxPeers: config.hardMaxPeers.get(config.maxPeers * 3 div 2), #*1.5
cfg: runtimeCfg,
peerPool: newPeerPool[Peer, PeerID](maxPeers = config.maxPeers),
peerPool: newPeerPool[Peer, PeerID](),
# It's important here to create the AsyncQueue with a limited size, otherwise
# it could produce HIGH CPU usage.
connQueue: newAsyncQueue[PeerAddr](ConcurrentConnections),
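A minimal illustration of the bounded-queue point above: with a maximum size set, addLast suspends the producer once the queue is full instead of letting the backlog grow without limit (the size and proc here are arbitrary):

import chronos

proc demoBoundedQueue() {.async.} =
  # Bounded like `connQueue` above: at most 64 items may wait at once
  let queue = newAsyncQueue[int](64)
  await queue.addLast(1)          # suspends only while the queue is full
  discard await queue.popFirst()  # consumers drain it, freeing slots again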
@@ -1452,16 +1492,12 @@ proc startListening*(node: Eth2Node) {.async.} =
await node.pubsub.start()
proc peerPingerHeartbeat(node: Eth2Node): Future[void] {.gcsafe.}
proc peerTrimmerHeartbeat(node: Eth2Node): Future[void] {.gcsafe.}
proc start*(node: Eth2Node) {.async.} =
proc onPeerCountChanged() =
trace "Number of peers has been changed",
space = node.peerPool.shortLogSpace(),
acquired = node.peerPool.shortLogAcquired(),
available = node.peerPool.shortLogAvailable(),
current = node.peerPool.shortLogCurrent(),
length = len(node.peerPool)
trace "Number of peers has been changed", length = len(node.peerPool)
nbc_peers.set int64(len(node.peerPool))
node.peerPool.setPeerCounter(onPeerCountChanged)
@@ -1482,15 +1518,22 @@ proc start*(node: Eth2Node) {.async.} =
if pa.isOk():
await node.connQueue.addLast(pa.get())
node.peerPingerHeartbeatFut = node.peerPingerHeartbeat()
node.peerTrimmerHeartbeatFut = node.peerTrimmerHeartbeat()
proc stop*(node: Eth2Node) {.async.} =
# Ignore errors in futures, since we're shutting down (but log them on the
# TRACE level, if a timeout is reached).
var waitedFutures =
@[
node.switch.stop(),
node.peerPingerHeartbeat.cancelAndWait(),
node.peerTrimmerHeartbeatFut.cancelAndWait(),
]
if node.discoveryEnabled:
waitedFutures &= node.discovery.closeWait()
let
waitedFutures = if node.discoveryEnabled:
@[node.discovery.closeWait(), node.switch.stop()]
else:
@[node.switch.stop()]
timeout = 5.seconds
completed = await withTimeout(allFutures(waitedFutures), timeout)
if not completed:
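The two heartbeat futures are now cancelled alongside the switch (and discovery, when enabled), all bounded by the same timeout; a compact sketch of that shutdown pattern (the futures and log line are illustrative):

import chronos, chronicles

proc shutdownAll(futs: seq[Future[void]]) {.async.} =
  # Wait for every shutdown future, but never block exit longer than 5 seconds
  let completed = await allFutures(futs).withTimeout(5.seconds)
  if not completed:
    trace "Shutdown futures did not complete in time"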
@@ -1691,6 +1734,24 @@ proc peerPingerHeartbeat(node: Eth2Node) {.async.} =
await sleepAsync(5.seconds)
proc peerTrimmerHeartbeat(node: Eth2Node) {.async.} =
while true:
# Peer trimmer
# Only count Connected peers
# (to avoid counting Disconnecting ones)
var connectedPeers = 0
for peer in node.peers.values:
if peer.connectionState == Connected:
inc connectedPeers
let excessPeers = connectedPeers - node.wantedPeers
if excessPeers > 0:
# Let chronos take back control after every kick
node.trimConnections(1)
await sleepAsync(1.seconds div max(1, excessPeers))
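The sleep shrinks as the excess grows, so the trimmer converges quickly while still yielding to the event loop between kicks; the pacing in numbers (a small sketch of the line above):

import chronos

proc kickInterval(excessPeers: int): Duration =
  # 10 excess peers -> 100 ms between single-peer kicks,
  # 1 excess peer -> 1 s, no excess -> a plain 1 s idle loop
  1.seconds div max(1, excessPeers)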
func asLibp2pKey*(key: keys.PublicKey): PublicKey =
PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key))


@@ -58,7 +58,7 @@ procSuite "Eth2 specific discovery tests":
attnetsSelected.setBit(34)
let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
check discovered.len == 1
await node1.closeWait()
@@ -96,7 +96,7 @@ procSuite "Eth2 specific discovery tests":
attnetsSelected.setBit(42)
let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
check discovered.len == 1
await node1.closeWait()
@@ -124,7 +124,7 @@ procSuite "Eth2 specific discovery tests":
block:
let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
check discovered.len == 0
block:
@@ -139,7 +139,7 @@ procSuite "Eth2 specific discovery tests":
discard node1.addNode(nodes[][0])
let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
check discovered.len == 1
await node1.closeWait()