Improve peer cycling (#2888)

* Improve peer cycling

* Change metadata timeout values

* fix bad commit

* Log score when kicking peer

* remove unhealthySubnetCount
This commit is contained in:
Tanguy 2021-09-28 09:58:03 +02:00 committed by GitHub
parent aec5cf2b1b
commit dc46559706
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -281,6 +281,9 @@ declareCounter nbc_successful_discoveries,
declareCounter nbc_failed_discoveries,
"Number of failed discoveries"
declareCounter nbc_cycling_kicked_peers,
"Number of peers kicked for peer cycling"
const delayBuckets = [1.0, 5.0, 10.0, 20.0, 40.0, 60.0]
declareHistogram nbc_resolve_time,
@ -1001,9 +1004,10 @@ proc trimConnections(node: Eth2Node, count: int) {.async.} =
for peerId in scores.keys:
#TODO kill a single connection instead of the whole peer
# Not possible with the current libp2p's conn management
debug "kicking peer", peerId
debug "kicking peer", peerId, score=scores[peerId]
await node.switch.disconnect(peerId)
dec toKick
inc(nbc_cycling_kicked_peers)
if toKick <= 0: return
proc getLowSubnets(node: Eth2Node):
@ -1065,33 +1069,41 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
let discoveredNodes = await node.discovery.queryRandom(
node.forkId, wantedAttnets, wantedSyncnets)
var newPeers = 0
for discNode in discoveredNodes:
let res = discNode.toPeerAddr()
if res.isOk():
let newPeers = block:
var np = newSeq[PeerAddr]()
for discNode in discoveredNodes:
let res = discNode.toPeerAddr()
if res.isErr():
debug "Failed to decode discovery's node address",
node = discnode, errMsg = res.error
continue
let peerAddr = res.get()
# Check if peer present in SeenTable or PeerPool.
if node.checkPeer(peerAddr):
if peerAddr.peerId notin node.connTable:
# We adding to pending connections table here, but going
# to remove it only in `connectWorker`.
if node.checkPeer(peerAddr) and
peerAddr.peerId notin node.connTable:
np.add(peerAddr)
np
# If we are full, try to kick a peer (3 max)
for _ in 0..<3:
if node.peerPool.lenSpace({PeerType.Outgoing}) == 0 and newPeers == 0:
await node.trimConnections(1)
else: break
# We have to be careful to kick enough peers to make room for new ones
# (If we are here, we have an unhealthy mesh, so if we're full, we have bad peers)
# But no kick too many peers because with low max-peers, that can cause disruption
# Also keep in mind that a lot of dial fails, and that we can have incoming peers waiting
let
roomRequired = 1 + newPeers.len()
roomCurrent = node.peerPool.lenSpace({PeerType.Outgoing})
roomDelta = roomRequired - roomCurrent
if node.peerPool.lenSpace({PeerType.Outgoing}) == 0:
# No room anymore
break
maxPeersToKick = len(node.peerPool) div 5
peersToKick = min(roomDelta, maxPeersToKick)
node.connTable.incl(peerAddr.peerId)
await node.connQueue.addLast(peerAddr)
inc(newPeers)
else:
debug "Failed to decode discovery's node address",
node = discnode, errMsg = res.error
if peersToKick > 0 and newPeers.len() > 0:
await node.trimConnections(peersToKick)
for peerAddr in newPeers:
# We adding to pending connections table here, but going
# to remove it only in `connectWorker`.
node.connTable.incl(peerAddr.peerId)
await node.connQueue.addLast(peerAddr)
debug "Discovery tick", wanted_peers = node.wantedPeers,
space = node.peerPool.shortLogSpace(),
@ -1100,9 +1112,10 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
current = node.peerPool.shortLogCurrent(),
length = len(node.peerPool),
discovered_nodes = len(discoveredNodes),
new_peers = newPeers
kicked_peers = max(0, peersToKick),
new_peers = len(newPeers)
if newPeers == 0:
if len(newPeers) == 0:
let currentPeers = node.peerPool.lenCurrent()
if currentPeers <= node.wantedPeers shr 2: # 25%
warn "Peer count low, no new peers discovered",
@ -1557,6 +1570,14 @@ proc updatePeerMetadata(node: Eth2Node, peerId: PeerID) {.async.} =
peer.metadata = some(newMetadata)
peer.lastMetadataTime = Moment.now()
const
# For Phase0, metadata change every +27 hours
MetadataRequestFrequency = 30.minutes
# Metadata request has 10 seconds timeout, and the loop sleeps for 5 seconds
# 50 seconds = 3 attempts
MetadataRequestMaxFailureTime = 50.seconds
proc peerPingerHeartbeat(node: Eth2Node) {.async.} =
while true:
let heartbeatStart_m = Moment.now()
@ -1566,7 +1587,7 @@ proc peerPingerHeartbeat(node: Eth2Node) {.async.} =
if peer.connectionState != Connected: continue
if peer.metadata.isNone or
heartbeatStart_m - peer.lastMetadataTime > 30.minutes:
heartbeatStart_m - peer.lastMetadataTime > MetadataRequestFrequency:
updateFutures.add(node.updatePeerMetadata(peer.info.peerId))
discard await allFinished(updateFutures)
@ -1577,13 +1598,13 @@ proc peerPingerHeartbeat(node: Eth2Node) {.async.} =
if peer.metadata.isNone:
peer.lastMetadataTime
else:
peer.lastMetadataTime + 30.minutes
peer.lastMetadataTime + MetadataRequestFrequency
if heartbeatStart_m - lastMetadata > 30.seconds:
debug "no metadata for 30 seconds, kicking peer", peer
if heartbeatStart_m - lastMetadata > MetadataRequestMaxFailureTime:
debug "no metadata from peer, kicking it", peer
asyncSpawn peer.disconnect(PeerScoreLow)
await sleepAsync(8.seconds)
await sleepAsync(5.seconds)
func asLibp2pKey*(key: keys.PublicKey): PublicKey =
PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key))