diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim
index 6268278e1..113a99eb9 100644
--- a/beacon_chain/conf.nim
+++ b/beacon_chain/conf.nim
@@ -230,10 +230,14 @@ type
       name: "udp-port" }: Port

    maxPeers* {.
-      desc: "The maximum number of peers to connect to"
+      desc: "The target number of peers to connect to"
      defaultValue: 160 # 5 (fanout) * 64 (subnets) / 2 (subs) for a heathy mesh
      name: "max-peers" }: int

+    hardMaxPeers* {.
+      desc: "The maximum number of peers to connect to. Defaults to maxPeers * 1.5"
+      name: "hard-max-peers" }: Option[int]
+
    nat* {.
      desc: "Specify method to use for determining public address. " &
            "Must be one of: any, none, upnp, pmp, extip:<IP>"
diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim
index f402790bb..b6030afac 100644
--- a/beacon_chain/networking/eth2_network.nim
+++ b/beacon_chain/networking/eth2_network.nim
@@ -23,7 +23,6 @@ import
   libp2p/protocols/pubsub/[
     pubsub, gossipsub, rpc/message, rpc/messages, peertable, pubsubpeer],
   libp2p/stream/connection,
-  libp2p/utils/semaphore,
   eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl,
   eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2],
   ".."/[version, conf, beacon_clock],
@@ -65,6 +64,7 @@ type
     discovery*: Eth2DiscoveryProtocol
     discoveryEnabled*: bool
     wantedPeers*: int
+    hardMaxPeers*: int
     peerPool*: PeerPool[Peer, PeerID]
     protocolStates*: seq[RootRef]
     metadata*: altair.MetaData
@@ -81,6 +81,7 @@ type
     peers*: Table[PeerID, Peer]
     validTopics: HashSet[string]
     peerPingerHeartbeatFut: Future[void]
+    peerTrimmerHeartbeatFut: Future[void]
     cfg: RuntimeConfig
     getBeaconTime: GetBeaconTimeFn
@@ -887,7 +888,11 @@ proc dialPeer*(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async.} =
   debug "Connecting to discovered peer"

   var deadline = sleepAsync(node.connectTimeout)
-  var workfut = node.switch.connect(peerAddr.peerId, peerAddr.addrs)
+  var workfut = node.switch.connect(
+    peerAddr.peerId,
+    peerAddr.addrs,
+    forceDial = true
+  )
   try:
     # `or` operation will only raise exception of `workfut`, because `deadline`
@@ -916,7 +921,8 @@ proc connectWorker(node: Eth2Node, index: int) {.async.} =
     # Previous worker dial might have hit the maximum peers.
     # TODO: could clear the whole connTable and connQueue here also, best
     # would be to have this event based coming from peer pool or libp2p.
-    if node.switch.connManager.outSema.count > 0:
+
+    if node.peerPool.len < node.hardMaxPeers:
       await node.dialPeer(remotePeerAddr, index)
     # Peer was added to `connTable` before adding it to `connQueue`, so we
     # excluding peer here after processing.
@@ -947,7 +953,8 @@ proc queryRandom*(
     d: Eth2DiscoveryProtocol,
     forkId: ENRForkID,
     wantedAttnets: AttnetBits,
-    wantedSyncnets: SyncnetBits): Future[seq[Node]] {.async.} =
+    wantedSyncnets: SyncnetBits,
+    minScore: int): Future[seq[Node]] {.async.} =
   ## Perform a discovery query for a random target
   ## (forkId) and matching at least one of the attestation subnets.
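For orientation, here is a minimal standalone sketch, not part of the patch, of how the two limits introduced above relate: `--max-peers` remains the target the node aims for, while `connectWorker` only refuses new dials once the hard limit is reached. Only the `maxPeers * 3 div 2` fallback is taken from the patch; the numbers in the asserts below are hypothetical.

import std/options

# Fallback used when --hard-max-peers is not given: 1.5x the target,
# computed with the same integer arithmetic as Eth2Node.new further down.
proc effectiveHardMax(maxPeers: int, hardMaxPeers = none(int)): int =
  hardMaxPeers.get(maxPeers * 3 div 2)

when isMainModule:
  doAssert effectiveHardMax(160) == 240            # default target of 160 allows up to 240 peers
  doAssert effectiveHardMax(160, some(200)) == 200 # an explicit hard limit wins

The dial path above then compares `node.peerPool.len` against this hard limit instead of relying on the libp2p outgoing-connection semaphore.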
@@ -999,25 +1006,31 @@ proc queryRandom*(
        if wantedSyncnets[i] and syncnetsNode[i]:
          score += 10 # connecting to the right syncnet is urgent

-    if score > 0:
+    if score >= minScore:
       filtered.add((score, n))

   d.rng[].shuffle(filtered)
   return filtered.sortedByIt(-it[0]).mapIt(it[1])

-proc trimConnections(node: Eth2Node, count: int) {.async.} =
+proc trimConnections(node: Eth2Node, count: int) =
   # Kill `count` peers, scoring them to remove the least useful ones

   var scores = initOrderedTable[PeerID, int]()

   # Take into account the stabilitySubnets
   # During sync, only this will be used to score peers
+  # since gossipsub is not running yet
   #
   # A peer subscribed to all stabilitySubnets will
   # have 640 points
+  var peersInGracePeriod = 0
   for peer in node.peers.values:
     if peer.connectionState != Connected: continue
-    if peer.metadata.isNone: continue
+
+    # Metadata pinger is used as grace period
+    if peer.metadata.isNone:
+      peersInGracePeriod.inc()
+      continue

     let
       stabilitySubnets = peer.metadata.get().attnets
@@ -1026,25 +1039,53 @@ proc trimConnections(node: Eth2Node, count: int) {.async.} =

     scores[peer.peerId] = thisPeersScore
+
+  # Safeguard: if we have too many peers in the grace
+  # period, don't kick anyone. Otherwise, they will be
+  # preferred over long-standing peers
+  if peersInGracePeriod > scores.len div 2:
+    return

   # Split a 1000 points for each topic's peers
-  # + 10 000 points for each subbed topic
+  # + 5 000 points for each subbed topic
   # This gives priority to peers in topics with few peers
   # For instance, a topic with `dHigh` peers will give 80 points to each peer
   # Whereas a topic with `dLow` peers will give 250 points to each peer
+  #
+  # Then, use the average of all topics per peer, to avoid giving too many
+  # points to big peers
+
+  var gossipScores = initTable[PeerID, tuple[sum: int, count: int]]()
   for topic, _ in node.pubsub.gossipsub:
     let
       peersInMesh = node.pubsub.mesh.peers(topic)
       peersSubbed = node.pubsub.gossipsub.peers(topic)
-      scorePerMeshPeer = 10_000 div max(peersInMesh, 1)
+      scorePerMeshPeer = 5_000 div max(peersInMesh, 1)
       scorePerSubbedPeer = 1_000 div max(peersSubbed, 1)

-    for peer in node.pubsub.mesh.getOrDefault(topic):
-      if peer.peerId notin scores: continue
-      scores[peer.peerId] = scores[peer.peerId] + scorePerSubbedPeer
-
     for peer in node.pubsub.gossipsub.getOrDefault(topic):
       if peer.peerId notin scores: continue
-      scores[peer.peerId] = scores[peer.peerId] + scorePerMeshPeer
+      let currentVal = gossipScores.getOrDefault(peer.peerId)
+      gossipScores[peer.peerId] = (
+        currentVal.sum + scorePerSubbedPeer,
+        currentVal.count + 1
+      )
+
+    # Avoid global topics (>75% of peers), which would greatly reduce
+    # the average score for small peers
+    if peersSubbed > scores.len div 4 * 3: continue
+
+    for peer in node.pubsub.mesh.getOrDefault(topic):
+      if peer.peerId notin scores: continue
+      let currentVal = gossipScores.getOrDefault(peer.peerId)
+      gossipScores[peer.peerId] = (
+        currentVal.sum + scorePerMeshPeer,
+        currentVal.count + 1
+      )
+
+  for peerId, gScore in gossipScores.pairs:
+    scores[peerId] =
+      scores.getOrDefault(peerId) + (gScore.sum div gScore.count)

   proc sortPerScore(a, b: (PeerID, int)): int =
     system.cmp(a[1], b[1])
@@ -1055,7 +1096,7 @@ proc trimConnections(node: Eth2Node, count: int) {.async.} =

   for peerId in scores.keys:
     debug "kicking peer", peerId, score=scores[peerId]
-    await node.switch.disconnect(peerId)
+    asyncSpawn node.getPeer(peerId).disconnect(PeerScoreLow)
     dec toKick
     inc(nbc_cycling_kicked_peers)
     if toKick <= 0: return
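To make the topic scoring in `trimConnections` concrete, a small standalone sketch follows (not part of the patch, with made-up peer counts); only the `5_000 div max(peersInMesh, 1)` and `1_000 div max(peersSubbed, 1)` splits come from the code above.

# Hypothetical topics: (name, peers in the mesh, peers subscribed)
let exampleTopics = [
  ("attnet_3", 4, 4),
  ("beacon_block", 12, 40)
]

for t in exampleTopics:
  let
    (topic, peersInMesh, peersSubbed) = t
    scorePerMeshPeer = 5_000 div max(peersInMesh, 1)    # mesh membership is worth more
    scorePerSubbedPeer = 1_000 div max(peersSubbed, 1)  # plain subscription is worth less
  echo topic, ": +", scorePerMeshPeer, " per mesh peer, +",
    scorePerSubbedPeer, " per subscriber"

A peer in the sparse 4-peer mesh earns 1250 points from that topic, while a peer in the crowded 12-peer mesh earns only 416, so peers serving rare subnets are the last to be trimmed; averaging over topics then keeps peers subscribed to many topics from dominating the score.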
@@ -1137,10 +1178,20 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
      (wantedAttnets, wantedSyncnets) = node.getLowSubnets(currentEpoch)
      wantedAttnetsCount = wantedAttnets.countOnes()
      wantedSyncnetsCount = wantedSyncnets.countOnes()
+      outgoingPeers = node.peerPool.lenCurrent({PeerType.Outgoing})
+      targetOutgoingPeers = max(node.wantedPeers div 10, 3)

-    if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0:
-      let discoveredNodes = await node.discovery.queryRandom(
-        node.discoveryForkId, wantedAttnets, wantedSyncnets)
+    if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0 or
+        outgoingPeers < targetOutgoingPeers:
+
+      let
+        minScore =
+          if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0:
+            1
+          else:
+            0
+        discoveredNodes = await node.discovery.queryRandom(
+          node.discoveryForkId, wantedAttnets, wantedSyncnets, minScore)

       let newPeers = block:
         var np = newSeq[PeerAddr]()
@@ -1157,20 +1208,12 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
             np.add(peerAddr)
         np

-      # We have to be careful to kick enough peers to make room for new ones
-      # (If we are here, we have an unhealthy mesh, so if we're full, we have bad peers)
-      # But no kick too many peers because with low max-peers, that can cause disruption
-      # Also keep in mind that a lot of dial fails, and that we can have incoming peers waiting
       let
-        roomRequired = 1 + newPeers.len()
-        roomCurrent = node.peerPool.lenSpace({PeerType.Outgoing})
-        roomDelta = roomRequired - roomCurrent
+        roomCurrent = node.hardMaxPeers - len(node.peerPool)
+        peersToKick = min(newPeers.len - roomCurrent, node.hardMaxPeers div 5)

-        maxPeersToKick = len(node.peerPool) div 5
-        peersToKick = min(roomDelta, maxPeersToKick)
-
-      if peersToKick > 0 and newPeers.len() > 0:
-        await node.trimConnections(peersToKick)
+      if peersToKick > 0 and newPeers.len > 0:
+        node.trimConnections(peersToKick)

       for peerAddr in newPeers:
         # We adding to pending connections table here, but going
@@ -1178,18 +1221,14 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
         node.connTable.incl(peerAddr.peerId)
         await node.connQueue.addLast(peerAddr)

-      debug "Discovery tick", wanted_peers = node.wantedPeers,
-        space = node.peerPool.shortLogSpace(),
-        acquired = node.peerPool.shortLogAcquired(),
-        available = node.peerPool.shortLogAvailable(),
-        current = node.peerPool.shortLogCurrent(),
-        length = len(node.peerPool),
+      debug "Discovery tick",
+        wanted_peers = node.wantedPeers,
+        current_peers = len(node.peerPool),
         discovered_nodes = len(discoveredNodes),
-        kicked_peers = max(0, peersToKick),
         new_peers = len(newPeers)

       if len(newPeers) == 0:
-        let currentPeers = node.peerPool.lenCurrent()
+        let currentPeers = len(node.peerPool)
         if currentPeers <= node.wantedPeers shr 2: # 25%
           warn "Peer count low, no new peers discovered",
             discovered_nodes = len(discoveredNodes), new_peers = newPeers,
@@ -1369,8 +1408,9 @@ proc new*(T: type Eth2Node, config: BeaconNodeConf, runtimeCfg: RuntimeConfig,
     switch: switch,
     pubsub: pubsub,
     wantedPeers: config.maxPeers,
+    hardMaxPeers: config.hardMaxPeers.get(config.maxPeers * 3 div 2), #*1.5
     cfg: runtimeCfg,
-    peerPool: newPeerPool[Peer, PeerID](maxPeers = config.maxPeers),
+    peerPool: newPeerPool[Peer, PeerID](),
     # Its important here to create AsyncQueue with limited size, otherwise
     # it could produce HIGH cpu usage.
     connQueue: newAsyncQueue[PeerAddr](ConcurrentConnections),
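The kick budget computed in `runDiscoveryLoop` above can be sketched in isolation (not part of the patch, hypothetical numbers): newly discovered candidates only displace existing peers when the pool is close to `hardMaxPeers`, and at most a fifth of the hard limit is trimmed per discovery tick.

# Mirrors `peersToKick = min(newPeers.len - roomCurrent, node.hardMaxPeers div 5)`
proc peersToKick(hardMaxPeers, currentPeers, discovered: int): int =
  let roomCurrent = hardMaxPeers - currentPeers
  min(discovered - roomCurrent, hardMaxPeers div 5)

when isMainModule:
  doAssert peersToKick(240, 240, 10) == 10  # pool full: make room for all 10 candidates
  doAssert peersToKick(240, 100, 10) < 0    # plenty of room: nothing gets trimmed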
@@ -1452,16 +1492,12 @@ proc startListening*(node: Eth2Node) {.async.} =
   await node.pubsub.start()

 proc peerPingerHeartbeat(node: Eth2Node): Future[void] {.gcsafe.}
+proc peerTrimmerHeartbeat(node: Eth2Node): Future[void] {.gcsafe.}

 proc start*(node: Eth2Node) {.async.} =

   proc onPeerCountChanged() =
-    trace "Number of peers has been changed",
-          space = node.peerPool.shortLogSpace(),
-          acquired = node.peerPool.shortLogAcquired(),
-          available = node.peerPool.shortLogAvailable(),
-          current = node.peerPool.shortLogCurrent(),
-          length = len(node.peerPool)
+    trace "Number of peers has been changed", length = len(node.peerPool)
     nbc_peers.set int64(len(node.peerPool))

   node.peerPool.setPeerCounter(onPeerCountChanged)
@@ -1482,15 +1518,22 @@ proc start*(node: Eth2Node) {.async.} =
     if pa.isOk():
       await node.connQueue.addLast(pa.get())

   node.peerPingerHeartbeatFut = node.peerPingerHeartbeat()
+  node.peerTrimmerHeartbeatFut = node.peerTrimmerHeartbeat()

 proc stop*(node: Eth2Node) {.async.} =
   # Ignore errors in futures, since we're shutting down (but log them on the
   # TRACE level, if a timeout is reached).
+  var waitedFutures =
+    @[
+      node.switch.stop(),
+      node.peerPingerHeartbeatFut.cancelAndWait(),
+      node.peerTrimmerHeartbeatFut.cancelAndWait(),
+    ]
+
+  if node.discoveryEnabled:
+    waitedFutures &= node.discovery.closeWait()
+
   let
-    waitedFutures = if node.discoveryEnabled:
-        @[node.discovery.closeWait(), node.switch.stop()]
-      else:
-        @[node.switch.stop()]
     timeout = 5.seconds
     completed = await withTimeout(allFutures(waitedFutures), timeout)
   if not completed:
@@ -1691,6 +1734,24 @@ proc peerPingerHeartbeat(node: Eth2Node) {.async.} =

     await sleepAsync(5.seconds)

+proc peerTrimmerHeartbeat(node: Eth2Node) {.async.} =
+  while true:
+    # Peer trimmer
+
+    # Only count Connected peers
+    # (to avoid counting Disconnecting ones)
+    var connectedPeers = 0
+    for peer in node.peers.values:
+      if peer.connectionState == Connected:
+        inc connectedPeers
+
+    let excessPeers = connectedPeers - node.wantedPeers
+    if excessPeers > 0:
+      # Let chronos take back control after every kick
+      node.trimConnections(1)
+
+    await sleepAsync(1.seconds div max(1, excessPeers))
+
 func asLibp2pKey*(key: keys.PublicKey): PublicKey =
   PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key))
diff --git a/tests/test_discovery.nim b/tests/test_discovery.nim
index f5a5f228c..d3bb2636e 100644
--- a/tests/test_discovery.nim
+++ b/tests/test_discovery.nim
@@ -58,7 +58,7 @@ procSuite "Eth2 specific discovery tests":
     attnetsSelected.setBit(34)

     let discovered = await node1.queryRandom(
-      enrForkId, attnetsSelected, noSyncnetsPreference)
+      enrForkId, attnetsSelected, noSyncnetsPreference, 1)
     check discovered.len == 1

     await node1.closeWait()
@@ -96,7 +96,7 @@ procSuite "Eth2 specific discovery tests":
     attnetsSelected.setBit(42)

     let discovered = await node1.queryRandom(
-      enrForkId, attnetsSelected, noSyncnetsPreference)
+      enrForkId, attnetsSelected, noSyncnetsPreference, 1)
     check discovered.len == 1

     await node1.closeWait()
@@ -124,7 +124,7 @@ procSuite "Eth2 specific discovery tests":

     block:
       let discovered = await node1.queryRandom(
-        enrForkId, attnetsSelected, noSyncnetsPreference)
+        enrForkId, attnetsSelected, noSyncnetsPreference, 1)
       check discovered.len == 0

     block:
@@ -139,7 +139,7 @@ procSuite "Eth2 specific discovery tests":
       discard node1.addNode(nodes[][0])

       let discovered = await node1.queryRandom(
-        enrForkId, attnetsSelected, noSyncnetsPreference)
+        enrForkId, attnetsSelected, noSyncnetsPreference, 1)
      check discovered.len == 1

    await node1.closeWait()
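Finally, the pacing of `peerTrimmerHeartbeat` can be read off its `sleepAsync` call: one peer is trimmed per iteration, and the sleep shrinks as the excess over `wantedPeers` grows, so chronos regains control between kicks while a large overshoot is still worked off quickly. A sketch, not part of the patch, with hypothetical peer counts:

# Mirrors `1.seconds div max(1, excessPeers)`, expressed in milliseconds
proc trimmerSleepMs(connectedPeers, wantedPeers: int): int =
  1_000 div max(1, connectedPeers - wantedPeers)

when isMainModule:
  doAssert trimmerSleepMs(161, 160) == 1_000  # 1 excess peer: one kick per second
  doAssert trimmerSleepMs(200, 160) == 25     # 40 excess peers: one kick every 25 ms
  doAssert trimmerSleepMs(150, 160) == 1_000  # no excess: idle poll once per second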