Tune peering (#3348)
- Request metadata_v2 (altair) by default instead of the v1
- Change the metadata pinger to a 3-failures-then-kick scheme, instead of being time based
- Update the kick scorer to take into account topics we're not subscribed to, to make sure we will still be able to publish correctly
- Add some metrics to give "fanout" health (in the same spirit as mesh health)
parent f5de887df7
commit bcd7b4598c
@@ -102,7 +102,8 @@ type
     lastReqTime*: Moment
     connections*: int
     enr*: Option[enr.Record]
-    metadata*: Option[phase0.MetaData]
+    metadata*: Option[altair.MetaData]
+    failedMetadataRequests: int
     lastMetadataTime*: Moment
     direction*: PeerType
     disconnectedFut: Future[void]
@@ -213,6 +214,11 @@ func phase0metadata*(node: Eth2Node): phase0.MetaData =
     seq_number: node.metadata.seq_number,
     attnets: node.metadata.attnets)
 
+func toAltairMetadata*(phase0: phase0.MetaData): altair.MetaData =
+  altair.MetaData(
+    seq_number: phase0.seq_number,
+    attnets: phase0.attnets)
+
 const
   clientId* = "Nimbus beacon node " & fullVersionStr
   nodeMetadataFilename = "node-metadata.json"
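Note: the new `toAltairMetadata` helper only copies the fields the two formats share. Below is a minimal, self-contained sketch of the consequence, using made-up placeholder types rather than the real SSZ bitvectors: altair metadata carries an extra `syncnets` field, which simply stays at its zero default when the value was obtained through the phase0 fallback.

# Illustrative sketch only: simplified stand-in types, not the real SSZ
# definitions. The altair shape adds `syncnets`, which an upgrade in the
# style of `toAltairMetadata` leaves at its zero default.
type
  Phase0MetaDataSketch = object
    seq_number: uint64
    attnets: array[8, bool]      # placeholder for the attnets bitvector
  AltairMetaDataSketch = object
    seq_number: uint64
    attnets: array[8, bool]
    syncnets: array[4, bool]     # new in altair; defaults to all false

func upgrade(m: Phase0MetaDataSketch): AltairMetaDataSketch =
  AltairMetaDataSketch(seq_number: m.seq_number, attnets: m.attnets)

when isMainModule:
  let oldMeta = Phase0MetaDataSketch(seq_number: 3)
  doAssert upgrade(oldMeta).syncnets == default(array[4, bool])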
@@ -276,6 +282,15 @@ declareCounter nbc_failed_discoveries,
 declareCounter nbc_cycling_kicked_peers,
   "Number of peers kicked for peer cycling"
 
+declareGauge nbc_gossipsub_low_fanout,
+  "numbers of topics with low fanout"
+
+declareGauge nbc_gossipsub_good_fanout,
+  "numbers of topics with good fanout"
+
+declareGauge nbc_gossipsub_healthy_fanout,
+  "numbers of topics with dHigh fanout"
+
 const delayBuckets = [1.0, 5.0, 10.0, 20.0, 40.0, 60.0]
 
 declareHistogram nbc_resolve_time,
@@ -986,26 +1001,24 @@ proc trimConnections(node: Eth2Node, count: int) {.async.} =
     scores[peer.peerId] = thisPeersScore
 
   # Split a 1000 points for each topic's peers
+  # + 10 000 points for each subbed topic
   # This gives priority to peers in topics with few peers
   # For instance, a topic with `dHigh` peers will give 80 points to each peer
   # Whereas a topic with `dLow` peers will give 250 points to each peer
-  for topic, _ in node.pubsub.topics:
+  for topic, _ in node.pubsub.gossipsub:
     let
-      peerCount = node.pubsub.mesh.peers(topic)
-      scorePerPeer = 1_000 div max(peerCount, 1)
-
-    if peerCount == 0: continue
-
-    for peer in node.pubsub.mesh[topic]:
+      peersInMesh = node.pubsub.mesh.peers(topic)
+      peersSubbed = node.pubsub.gossipsub.peers(topic)
+      scorePerMeshPeer = 10_000 div max(peersInMesh, 1)
+      scorePerSubbedPeer = 1_000 div max(peersSubbed, 1)
+
+    for peer in node.pubsub.mesh.getOrDefault(topic):
       if peer.peerId notin scores: continue
+      scores[peer.peerId] = scores[peer.peerId] + scorePerSubbedPeer
 
-      # Divide by the number of connections
-      # A peer using multiple connections is wasteful
-      let
-        connCount = node.switch.connmanager.connCount(peer.peerId)
-        thisPeersScore = scorePerPeer div max(1, connCount)
-
-      scores[peer.peerId] = scores[peer.peerId] + thisPeersScore
+    for peer in node.pubsub.gossipsub.getOrDefault(topic):
+      if peer.peerId notin scores: continue
+      scores[peer.peerId] = scores[peer.peerId] + scorePerMeshPeer
 
   proc sortPerScore(a, b: (PeerID, int)): int =
     system.cmp(a[1], b[1])
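To see how the new split behaves, here is a standalone sketch of the arithmetic above (plain Nim, no nimbus imports; the proc name is invented): 10 000 points are divided among a topic's mesh peers and 1 000 among all peers subscribed to it, so peers that serve sparsely populated topics become expensive to kick.

# Standalone sketch of the per-topic score split used by the kick scorer,
# using integer division exactly like the code above.
proc topicScore(peersInMesh, peersSubbed: int):
    tuple[perMeshPeer, perSubbedPeer: int] =
  (perMeshPeer: 10_000 div max(peersInMesh, 1),
   perSubbedPeer: 1_000 div max(peersSubbed, 1))

when isMainModule:
  # A well-populated topic barely protects its peers...
  echo topicScore(peersInMesh = 12, peersSubbed = 40)
  # -> (perMeshPeer: 833, perSubbedPeer: 25)
  # ...while the peers of a sparse topic become expensive to kick.
  echo topicScore(peersInMesh = 2, peersSubbed = 4)
  # -> (perMeshPeer: 5000, perSubbedPeer: 250)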
@@ -1015,8 +1028,6 @@ proc trimConnections(node: Eth2Node, count: int) {.async.} =
   var toKick = count
 
   for peerId in scores.keys:
-    #TODO kill a single connection instead of the whole peer
-    # Not possible with the current libp2p's conn management
     debug "kicking peer", peerId, score=scores[peerId]
     await node.switch.disconnect(peerId)
     dec toKick
@@ -1031,6 +1042,10 @@ proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits) =
   # - Have 0 subscribed subnet below `dOut` outgoing peers
   # - Have 0 subnet with < `dHigh` peers from topic subscription
 
+  nbc_gossipsub_low_fanout.set(0)
+  nbc_gossipsub_good_fanout.set(0)
+  nbc_gossipsub_healthy_fanout.set(0)
+
   template findLowSubnets(topicNameGenerator: untyped,
                           SubnetIdType: type,
                           totalSubnets: static int): auto =
@@ -1060,6 +1075,14 @@ proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits) =
       if outPeers < node.pubsub.parameters.dOut:
         belowDOutSubnets.setBit(subNetId)
 
+    nbc_gossipsub_low_fanout.inc(int64(lowOutgoingSubnets.countOnes()))
+    nbc_gossipsub_good_fanout.inc(int64(
+      notHighOutgoingSubnets.countOnes() -
+      lowOutgoingSubnets.countOnes()
+    ))
+    nbc_gossipsub_healthy_fanout.inc(int64(
+      totalSubnets - notHighOutgoingSubnets.countOnes()))
+
     if lowOutgoingSubnets.countOnes() > 0:
       lowOutgoingSubnets
     elif belowDSubnets.countOnes() > 0:
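The three gauges partition the subnets of each kind into disjoint buckets, assuming, as the arithmetic above implies, that the low-outgoing set is contained in the not-high-outgoing set. A small sketch of that bookkeeping, with an invented helper name:

# Sketch of the fanout-health bucketing: given the total number of subnets,
# how many are below dHigh ("not high") and how many are outright low,
# compute the amounts added to the three gauges.
proc fanoutBuckets(total, notHighOnes, lowOnes: int):
    tuple[low, good, healthy: int] =
  doAssert lowOnes <= notHighOnes and notHighOnes <= total
  (low: lowOnes,                  # nbc_gossipsub_low_fanout
   good: notHighOnes - lowOnes,   # nbc_gossipsub_good_fanout
   healthy: total - notHighOnes)  # nbc_gossipsub_healthy_fanout

when isMainModule:
  echo fanoutBuckets(total = 64, notHighOnes = 20, lowOnes = 5)
  # -> (low: 5, good: 15, healthy: 44)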
@@ -1599,27 +1622,25 @@ proc updatePeerMetadata(node: Eth2Node, peerId: PeerID) {.async.} =
   #getMetaData can fail with an exception
   let newMetadata =
     try:
-      tryGet(await peer.getMetaData())
+      tryGet(await peer.getMetadata_v2())
     except CatchableError:
-      let metadataV2 =
-        try: tryGet(await peer.getMetadata_v2())
+      let metadataV1 =
+        try: tryGet(await peer.getMetaData())
         except CatchableError as exc:
           debug "Failed to retrieve metadata from peer!", peerId, msg=exc.msg
+          peer.failedMetadataRequests.inc()
           return
 
-      phase0.MetaData(seq_number: metadataV2.seq_number,
-                      attnets: metadataV2.attnets)
+      toAltairMetadata(metadataV1)
 
   peer.metadata = some(newMetadata)
+  peer.failedMetadataRequests = 0
   peer.lastMetadataTime = Moment.now()
 
 const
   # For Phase0, metadata change every +27 hours
   MetadataRequestFrequency = 30.minutes
-  # Metadata request has 10 seconds timeout, and the loop sleeps for 5 seconds
-  # 50 seconds = 3 attempts
-  MetadataRequestMaxFailureTime = 50.seconds
+  MetadataRequestMaxFailures = 3
 
 proc peerPingerHeartbeat(node: Eth2Node) {.async.} =
   while true:
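The request order is now inverted: `metadata_v2` (altair) is attempted first, and the phase0 request is only a fallback whose result gets upgraded. A self-contained, synchronous sketch of that control flow; every name below is an invented stand-in for the real RPC helpers:

# Sketch of the "try v2 first, fall back to v1 and upgrade" flow.
type
  MetaV1 = object
    seq_number: uint64
  MetaV2 = object
    seq_number: uint64
    upgraded: bool

proc fetchV2(supported: bool): MetaV2 =
  # stand-in for the real metadata_v2 request
  if not supported:
    raise newException(CatchableError, "metadata_v2 not supported by peer")
  MetaV2(seq_number: 9)

proc fetchV1(): MetaV1 =
  # stand-in for the legacy phase0 request
  MetaV1(seq_number: 9)

proc upgrade(m: MetaV1): MetaV2 =
  MetaV2(seq_number: m.seq_number, upgraded: true)

proc fetchMetadata(peerSupportsV2: bool): MetaV2 =
  try:
    fetchV2(peerSupportsV2)
  except CatchableError:
    upgrade(fetchV1())

when isMainModule:
  doAssert not fetchMetadata(true).upgraded   # altair peer: v2 answer used directly
  doAssert fetchMetadata(false).upgraded      # old peer: v1 answer, upgraded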
@@ -1633,17 +1654,12 @@ proc peerPingerHeartbeat(node: Eth2Node) {.async.} =
           heartbeatStart_m - peer.lastMetadataTime > MetadataRequestFrequency:
         updateFutures.add(node.updatePeerMetadata(peer.peerId))
 
-    discard await allFinished(updateFutures)
+    await allFutures(updateFutures)
 
     for peer in node.peers.values:
       if peer.connectionState != Connected: continue
-      let lastMetadata =
-        if peer.metadata.isNone:
-          peer.lastMetadataTime
-        else:
-          peer.lastMetadataTime + MetadataRequestFrequency
 
-      if heartbeatStart_m - lastMetadata > MetadataRequestMaxFailureTime:
+      if peer.failedMetadataRequests > MetadataRequestMaxFailures:
         debug "no metadata from peer, kicking it", peer
         asyncSpawn peer.disconnect(PeerScoreLow)
 
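Taken together with the previous hunk, the pinger's kick policy is now purely counter based: each failed metadata request increments `failedMetadataRequests`, any success resets it, and the peer is dropped once the counter exceeds `MetadataRequestMaxFailures`. A tiny standalone sketch of that state machine (type and proc names invented for the example):

# Standalone sketch of the failure-counting kick policy.
const MetadataRequestMaxFailures = 3

type PeerSketch = object
  failedMetadataRequests: int

proc recordMetadataResult(p: var PeerSketch, ok: bool): bool =
  ## Returns true when the peer should be kicked.
  if ok:
    p.failedMetadataRequests = 0
  else:
    inc p.failedMetadataRequests
  p.failedMetadataRequests > MetadataRequestMaxFailures

when isMainModule:
  var p = PeerSketch()
  doAssert not recordMetadataResult(p, false)  # 1st consecutive failure
  doAssert not recordMetadataResult(p, false)  # 2nd
  doAssert not recordMetadataResult(p, false)  # 3rd: still within the budget
  doAssert recordMetadataResult(p, false)      # 4th exceeds the limit: kick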