diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim
index f7e4b48e4..2e103e5c4 100644
--- a/beacon_chain/networking/eth2_network.nim
+++ b/beacon_chain/networking/eth2_network.nim
@@ -102,7 +102,8 @@ type
     lastReqTime*: Moment
     connections*: int
     enr*: Option[enr.Record]
-    metadata*: Option[phase0.MetaData]
+    metadata*: Option[altair.MetaData]
+    failedMetadataRequests: int
     lastMetadataTime*: Moment
     direction*: PeerType
     disconnectedFut: Future[void]
@@ -213,6 +214,11 @@ func phase0metadata*(node: Eth2Node): phase0.MetaData =
     seq_number: node.metadata.seq_number,
     attnets: node.metadata.attnets)
 
+func toAltairMetadata*(phase0: phase0.MetaData): altair.MetaData =
+  altair.MetaData(
+    seq_number: phase0.seq_number,
+    attnets: phase0.attnets)
+
 const
   clientId* = "Nimbus beacon node " & fullVersionStr
   nodeMetadataFilename = "node-metadata.json"
@@ -276,6 +282,15 @@ declareCounter nbc_failed_discoveries,
 declareCounter nbc_cycling_kicked_peers,
   "Number of peers kicked for peer cycling"
 
+declareGauge nbc_gossipsub_low_fanout,
+  "Number of topics with low fanout"
+
+declareGauge nbc_gossipsub_good_fanout,
+  "Number of topics with good fanout"
+
+declareGauge nbc_gossipsub_healthy_fanout,
+  "Number of topics with dHigh fanout"
+
 const delayBuckets = [1.0, 5.0, 10.0, 20.0, 40.0, 60.0]
 
 declareHistogram nbc_resolve_time,
@@ -986,26 +1001,24 @@ proc trimConnections(node: Eth2Node, count: int) {.async.} =
       scores[peer.peerId] = thisPeersScore
 
     # Split a 1000 points for each topic's peers
+    # + 10 000 points for each topic's mesh peers
     # This gives priority to peers in topics with few peers
     # For instance, a topic with `dHigh` peers will give 80 points to each peer
     # Whereas a topic with `dLow` peers will give 250 points to each peer
-    for topic, _ in node.pubsub.topics:
+    for topic, _ in node.pubsub.gossipsub:
       let
-        peerCount = node.pubsub.mesh.peers(topic)
-        scorePerPeer = 1_000 div max(peerCount, 1)
+        peersInMesh = node.pubsub.mesh.peers(topic)
+        peersSubbed = node.pubsub.gossipsub.peers(topic)
+        scorePerMeshPeer = 10_000 div max(peersInMesh, 1)
+        scorePerSubbedPeer = 1_000 div max(peersSubbed, 1)
 
-      if peerCount == 0: continue
-
-      for peer in node.pubsub.mesh[topic]:
+      for peer in node.pubsub.mesh.getOrDefault(topic):
         if peer.peerId notin scores: continue
+        scores[peer.peerId] = scores[peer.peerId] + scorePerMeshPeer
 
-        # Divide by the number of connections
-        # A peer using multiple connections is wasteful
-        let
-          connCount = node.switch.connmanager.connCount(peer.peerId)
-          thisPeersScore = scorePerPeer div max(1, connCount)
-
-        scores[peer.peerId] = scores[peer.peerId] + thisPeersScore
+      for peer in node.pubsub.gossipsub.getOrDefault(topic):
+        if peer.peerId notin scores: continue
+        scores[peer.peerId] = scores[peer.peerId] + scorePerSubbedPeer
 
   proc sortPerScore(a, b: (PeerID, int)): int =
     system.cmp(a[1], b[1])
@@ -1015,8 +1028,6 @@ proc trimConnections(node: Eth2Node, count: int) {.async.} =
   var toKick = count
 
   for peerId in scores.keys:
-    #TODO kill a single connection instead of the whole peer
-    # Not possible with the current libp2p's conn management
    debug "kicking peer", peerId, score=scores[peerId]
     await node.switch.disconnect(peerId)
     dec toKick
@@ -1031,6 +1042,10 @@ proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits) =
   # - Have 0 subscribed subnet below `dOut` outgoing peers
   # - Have 0 subnet with < `dHigh` peers from topic subscription
 
+  nbc_gossipsub_low_fanout.set(0)
+  nbc_gossipsub_good_fanout.set(0)
+  nbc_gossipsub_healthy_fanout.set(0)
+
   template findLowSubnets(topicNameGenerator: untyped,
                           SubnetIdType: type,
                           totalSubnets: static int): auto =
@@ -1060,6 +1075,14 @@ proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits) =
         if outPeers < node.pubsub.parameters.dOut:
           belowDOutSubnets.setBit(subNetId)
 
+    nbc_gossipsub_low_fanout.inc(int64(lowOutgoingSubnets.countOnes()))
+    nbc_gossipsub_good_fanout.inc(int64(
+      notHighOutgoingSubnets.countOnes() -
+      lowOutgoingSubnets.countOnes()
+    ))
+    nbc_gossipsub_healthy_fanout.inc(int64(
+      totalSubnets - notHighOutgoingSubnets.countOnes()))
+
     if lowOutgoingSubnets.countOnes() > 0:
       lowOutgoingSubnets
     elif belowDSubnets.countOnes() > 0:
@@ -1599,27 +1622,25 @@ proc updatePeerMetadata(node: Eth2Node, peerId: PeerID) {.async.} =
   #getMetaData can fail with an exception
   let newMetadata =
     try:
-      tryGet(await peer.getMetaData())
+      tryGet(await peer.getMetadata_v2())
     except CatchableError:
-      let metadataV2 =
-        try: tryGet(await peer.getMetadata_v2())
+      let metadataV1 =
+        try: tryGet(await peer.getMetaData())
         except CatchableError as exc:
           debug "Failed to retrieve metadata from peer!", peerId, msg=exc.msg
+          peer.failedMetadataRequests.inc()
           return
 
-      phase0.MetaData(seq_number: metadataV2.seq_number,
-                      attnets: metadataV2.attnets)
+      toAltairMetadata(metadataV1)
 
   peer.metadata = some(newMetadata)
+  peer.failedMetadataRequests = 0
   peer.lastMetadataTime = Moment.now()
 
 const
   # For Phase0, metadata change every +27 hours
   MetadataRequestFrequency = 30.minutes
-
-  # Metadata request has 10 seconds timeout, and the loop sleeps for 5 seconds
-  # 50 seconds = 3 attempts
-  MetadataRequestMaxFailureTime = 50.seconds
+  MetadataRequestMaxFailures = 3
 
 proc peerPingerHeartbeat(node: Eth2Node) {.async.} =
   while true:
@@ -1633,17 +1654,12 @@ proc peerPingerHeartbeat(node: Eth2Node) {.async.} =
         heartbeatStart_m - peer.lastMetadataTime > MetadataRequestFrequency:
         updateFutures.add(node.updatePeerMetadata(peer.peerId))
 
-    discard await allFinished(updateFutures)
+    await allFutures(updateFutures)
 
     for peer in node.peers.values:
       if peer.connectionState != Connected: continue
 
-      let lastMetadata =
-        if peer.metadata.isNone:
-          peer.lastMetadataTime
-        else:
-          peer.lastMetadataTime + MetadataRequestFrequency
-      if heartbeatStart_m - lastMetadata > MetadataRequestMaxFailureTime:
+      if peer.failedMetadataRequests > MetadataRequestMaxFailures:
         debug "no metadata from peer, kicking it", peer
         asyncSpawn peer.disconnect(PeerScoreLow)
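
---

Review notes, with illustrative sketches (not part of the patch):

The new `trimConnections` scoring splits 1,000 points between each topic's subscribed peers and a further 10,000 points between each topic's mesh peers, so peers propping up a sparsely-peered topic score highest and are kicked last. A minimal, runnable model of that arithmetic, using plain tables in place of the live pubsub state (`scorePeers`, `meshPeers` and `subbedPeers` are hypothetical stand-ins for `node.pubsub.mesh` / `node.pubsub.gossipsub`):

```nim
import std/[tables, sequtils, algorithm]

type PeerId = string # stand-in for libp2p's PeerId

proc scorePeers(meshPeers, subbedPeers: Table[string, seq[PeerId]]):
    seq[(PeerId, int)] =
  ## Returns peers sorted by score, lowest (cheapest to kick) first.
  var scores = initTable[PeerId, int]()
  for topic, subbed in subbedPeers:
    let
      inMesh = meshPeers.getOrDefault(topic).len
      scorePerMeshPeer = 10_000 div max(inMesh, 1)
      scorePerSubbedPeer = 1_000 div max(subbed.len, 1)
    # Each topic splits 1 000 points between its subscribed peers...
    for p in subbed:
      scores[p] = scores.getOrDefault(p) + scorePerSubbedPeer
    # ...and 10 000 points between its mesh peers, so members of a
    # sparse mesh are the most expensive peers to lose.
    for p in meshPeers.getOrDefault(topic):
      scores[p] = scores.getOrDefault(p) + scorePerMeshPeer
  result = toSeq(scores.pairs)
  result.sort(proc(a, b: (PeerId, int)): int = cmp(a[1], b[1]))

when isMainModule:
  let
    mesh = {"attnet_1": @["a", "b"]}.toTable
    subbed = {"attnet_1": @["a", "b", "c"]}.toTable
  # "c" is subscribed but not in the mesh: 1_000 div 3 = 333 points.
  # "a" and "b" additionally split 10_000: 333 + 5_000 = 5_333 each,
  # so "c" sorts first and is the first candidate for kicking.
  echo scorePeers(mesh, subbed)
```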
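The three new gauges partition the subnets into disjoint buckets per `findLowSubnets` invocation (they are reset at the top of `getLowSubnets` and then incremented for both attnets and syncnets). A small helper showing the arithmetic, where `lowOnes` and `notHighOnes` stand in for `lowOutgoingSubnets.countOnes()` and `notHighOutgoingSubnets.countOnes()`:

```nim
proc fanoutBuckets(totalSubnets, lowOnes, notHighOnes: int):
    tuple[low, good, healthy: int] =
  ## The buckets are disjoint and sum to totalSubnets, since every
  ## "low" subnet is also counted in notHighOnes.
  (low: lowOnes,                        # failing the outgoing-peer minimums
   good: notHighOnes - lowOnes,         # acceptable, but below dHigh
   healthy: totalSubnets - notHighOnes) # at least dHigh peers

assert fanoutBuckets(64, 3, 10) == (low: 3, good: 7, healthy: 54)
```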
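Finally, the wall-clock kick heuristic (50 seconds ≈ 3 attempts) is replaced by explicit failure counting, which stays correct even if request timeouts or the heartbeat interval change. A toy model of the life cycle (`MockPeer`, `onMetadataRound` and `shouldKick` are hypothetical names; only the counting logic mirrors the diff):

```nim
const MetadataRequestMaxFailures = 3

type MockPeer = ref object
  failedMetadataRequests: int

proc onMetadataRound(peer: MockPeer, succeeded: bool) =
  # Any successful response (v2 or the v1 fallback) resets the counter;
  # a round only counts as failed when both requests raised.
  if succeeded:
    peer.failedMetadataRequests = 0
  else:
    inc peer.failedMetadataRequests

func shouldKick(peer: MockPeer): bool =
  # Mirrors the heartbeat check: strictly more than
  # MetadataRequestMaxFailures consecutive failed rounds.
  peer.failedMetadataRequests > MetadataRequestMaxFailures
```

Because the comparison is strict (`>`), a peer is only disconnected on the heartbeat following its fourth consecutive failed round.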