diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index e15efadb5..3a5b9b31e 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -13,7 +13,7 @@ import std/options as stdOptions, # Status libs - stew/[leb128, base58, endians2, results, byteutils, io2], bearssl, + stew/[leb128, base58, endians2, results, byteutils, io2, bitops2], bearssl, stew/shims/net as stewNet, stew/shims/[macros, tables], faststreams/[inputs, outputs, buffers], snappy, snappy/framing, @@ -26,7 +26,7 @@ import libp2p/muxers/muxer, libp2p/muxers/mplex/mplex, libp2p/transports/[transport, tcptransport], libp2p/protocols/secure/[secure, noise], - libp2p/protocols/pubsub/[pubsub, gossipsub, rpc/message, rpc/messages], + libp2p/protocols/pubsub/[pubsub, gossipsub, rpc/message, rpc/messages, peertable, pubsubpeer], libp2p/transports/tcptransport, libp2p/stream/connection, libp2p/utils/semaphore, @@ -85,6 +85,7 @@ type rng*: ref BrHmacDrbgContext peers*: Table[PeerID, Peer] validTopics: HashSet[string] + peerPingerHeartbeatFut: Future[void] cfg: RuntimeConfig getBeaconTime: GetBeaconTimeFn @@ -106,6 +107,8 @@ type lastReqTime*: Moment connections*: int enr*: Option[enr.Record] + metadata*: Option[phase0.MetaData] + lastMetadataTime*: Moment direction*: PeerType disconnectedFut: Future[void] @@ -591,7 +594,10 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, deadline): return neterr StreamOpenTimeout try: # Send the request - await stream.writeChunk(none ResponseCode, requestBytes) + # Some clients don't want a length sent for empty requests + # So don't send anything on empty requests + if requestBytes.len > 0: + await stream.writeChunk(none ResponseCode, requestBytes) # Half-close the stream to mark the end of the request - if this is not # done, the other peer might never send us the response. await stream.close() @@ -901,59 +907,165 @@ proc toPeerAddr(node: Node): Result[PeerAddr, cstring] = proc queryRandom*(d: Eth2DiscoveryProtocol, forkId: ENRForkID, attnets: BitArray[ATTESTATION_SUBNET_COUNT]): - Future[seq[PeerAddr]] {.async.} = - ## Perform a discovery query for a random target matching the eth2 field + Future[seq[Node]] {.async.} = + ## Perform a discovery query for a random target ## (forkId) and matching at least one of the attestation subnets. - let nodes = await d.queryRandom() - let sszForkId = SSZ.encode(forkId) - var filtered: seq[PeerAddr] + let eth2Field = SSZ.encode(forkId) + let nodes = await d.queryRandom((enrForkIdField, eth2Field)) + + var filtered: seq[(int, Node)] for n in nodes: - if n.record.contains((enrForkIdField, sszForkId)): - let res = n.record.tryGet(enrAttestationSubnetsField, seq[byte]) + let res = n.record.tryGet(enrAttestationSubnetsField, seq[byte]) - if res.isSome(): - let attnetsNode = - try: - SSZ.decode(res.get(), BitArray[ATTESTATION_SUBNET_COUNT]) - except SszError as e: - debug "Could not decode attestation subnet bitfield of peer", - peer = n.record.toURI(), exception = e.name, msg = e.msg - continue + if res.isSome(): + let attnetsNode = + try: + SSZ.decode(res.get(), BitArray[ATTESTATION_SUBNET_COUNT]) + except SszError as e: + debug "Could not decode attestation subnet bitfield of peer", + peer = n.record.toURI(), exception = e.name, msg = e.msg + continue - for i in 0.. 0: - # we have at least one subnet match - let peerAddr = n.toPeerAddr() - if peerAddr.isOk(): - filtered.add(peerAddr.get()) - break + var score: int = 0 + for i in 0.. 0: + filtered.add((score, n)) + + d.rng[].shuffle(filtered) + return filtered.sortedByIt(it[0]).mapIt(it[1]) + +proc trimConnections(node: Eth2Node, count: int) {.async.} = + # Kill `count` peers, scoring them to remove the least useful ones + + var scores = initOrderedTable[PeerID, int]() + + # Take into account the stabilitySubnets + # During sync, only this will be used to score peers + # + # A peer subscribed to all stabilitySubnets will + # have 640 points + for peer in node.peers.values: + if peer.connectionState != Connected: continue + if peer.metadata.isNone: continue + + let + stabilitySubnets = peer.metadata.get().attnets + stabilitySubnetsCount = stabilitySubnets.countOnes() + thisPeersScore = 10 * stabilitySubnetsCount + + scores[peer.info.peerId] = thisPeersScore + + # Split a 1000 points for each topic's peers + # This gives priority to peers in topics with few peers + # For instance, a topic with `dHigh` peers will give 80 points to each peer + # Whereas a topic with `dLow` peers will give 250 points to each peer + for topic, _ in node.pubsub.topics: + let + peerCount = node.pubsub.mesh.peers(topic) + scorePerPeer = 1_000 div max(peerCount, 1) + + if peerCount == 0: continue + + for peer in node.pubsub.mesh[topic]: + if peer.peerId notin scores: continue + + # Divide by the number of connections + # A peer using multiple connections is wasteful + let + connCount = node.switch.connmanager.connCount(peer.peerId) + thisPeersScore = scorePerPeer div max(1, connCount) + + scores[peer.peerId] = scores[peer.peerId] + thisPeersScore + + proc sortPerScore(a, b: (PeerID, int)): int = + system.cmp(a[1], b[1]) + + scores.sort(sortPerScore) + + var toKick = count + + for peerId in scores.keys: + #TODO kill a single connection instead of the whole peer + # Not possible with the current libp2p's conn management + debug "kicking peer", peerId + await node.switch.disconnect(peerId) + dec toKick + if toKick <= 0: return + +proc getLowAttnets(node: Eth2Node): BitArray[ATTESTATION_SUBNET_COUNT] = + # Returns the subnets required to have a healthy mesh + # The subnets are computed, to, in order: + # - Have 0 subscribed subnet below `dLow` + # - Have 0 subnet with < `d` peers from topic subscription + # - Have 0 subscribed subnet below `dOut` outgoing peers + + var + lowOutgoingSubnets: BitArray[ATTESTATION_SUBNET_COUNT] + belowDLowSubnets: BitArray[ATTESTATION_SUBNET_COUNT] + belowDOutSubnets: BitArray[ATTESTATION_SUBNET_COUNT] + + for subNetId in 0.. 0: + return belowDLowSubnets + + if lowOutgoingSubnets.countOnes() > 0: + return lowOutgoingSubnets + + return belowDOutSubnets - return filtered proc runDiscoveryLoop*(node: Eth2Node) {.async.} = debug "Starting discovery loop" while true: - if node.switch.connManager.outSema.count > 0: - let forkId = (enrForkIdField, SSZ.encode(node.forkId)) - var discoveredNodes = await node.discovery.queryRandom(forkId) + let + wantedAttnets = node.getLowAttnets() + wantedAttnetsCount = wantedAttnets.countOnes() + + if wantedAttnetsCount > 0: + let discoveredNodes = await node.discovery.queryRandom(node.forkId, wantedAttnets) + var newPeers = 0 for discNode in discoveredNodes: let res = discNode.toPeerAddr() if res.isOk(): let peerAddr = res.get() - # Waiting for an empty space in PeerPool. - while true: - if node.peerPool.lenSpace({PeerType.Outgoing}) == 0: - await node.peerPool.waitForEmptySpace(PeerType.Outgoing) - else: - break # Check if peer present in SeenTable or PeerPool. if node.checkPeer(peerAddr): if peerAddr.peerId notin node.connTable: # We adding to pending connections table here, but going # to remove it only in `connectWorker`. + + # If we are full, try to kick a peer (3 max) + for _ in 0..<3: + if node.peerPool.lenSpace({PeerType.Outgoing}) == 0 and newPeers == 0: + await node.trimConnections(1) + else: break + + if node.peerPool.lenSpace({PeerType.Outgoing}) == 0: + # No room anymore + break + node.connTable.incl(peerAddr.peerId) await node.connQueue.addLast(peerAddr) inc(newPeers) @@ -979,7 +1091,9 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} = # Discovery `queryRandom` can have a synchronous fast path for example # when no peers are in the routing table. Don't run it in continuous loop. - await sleepAsync(1.seconds) + # + # Also, give some time to dial the discovered nodes and update stats etc + await sleepAsync(5.seconds) proc getPersistentNetMetadata*(config: BeaconNodeConf): altair.MetaData {.raises: [Defect, IOError, SerializationError].} = @@ -1170,7 +1284,7 @@ proc new*(T: type Eth2Node, config: BeaconNodeConf, runtimeCfg: RuntimeConfig, discoveryEnabled: discovery, rng: rng, connectTimeout: connectTimeout, - seenThreshold: seenThreshold, + seenThreshold: seenThreshold ) newSeq node.protocolStates, allProtocols.len @@ -1212,6 +1326,8 @@ proc startListening*(node: Eth2Node) {.async.} = await node.pubsub.start() +proc peerPingerHeartbeat(node: Eth2Node): Future[void] {.gcsafe.} + proc start*(node: Eth2Node) {.async.} = proc onPeerCountChanged() = @@ -1240,6 +1356,7 @@ proc start*(node: Eth2Node) {.async.} = let pa = tr.get().toPeerAddr(tcpProtocol) if pa.isOk(): await node.connQueue.addLast(pa.get()) + node.peerPingerHeartbeatFut = node.peerPingerHeartbeat() proc stop*(node: Eth2Node) {.async.} = # Ignore errors in futures, since we're shutting down (but log them on the @@ -1261,6 +1378,7 @@ proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer = network: network, connectionState: ConnectionState.None, lastReqTime: now(chronos.Moment), + lastMetadataTime: now(chronos.Moment), protocolStates: newSeq[RootRef](len(allProtocols)) ) for i in 0 ..< len(allProtocols): @@ -1394,6 +1512,51 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = result.implementProtocolInit = proc (p: P2PProtocol): NimNode = return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit) +#Must import here because of cyclicity +import ../sync/sync_protocol + +proc updatePeerMetadata(node: Eth2Node, peerId: PeerID) {.async.} = + trace "updating peer metadata", peerId + + var peer = node.getPeer(peerId) + let response = await peer.getMetaData() + + if response.isErr: + debug "Failed to retrieve metadata from peer!", peerId + return + + let newMetadata = response.get() + peer.metadata = some(newMetadata) + peer.lastMetadataTime = Moment.now() + +proc peerPingerHeartbeat(node: Eth2Node) {.async.} = + while true: + let heartbeatStart_m = Moment.now() + var updateFutures: seq[Future[void]] + + for peer in node.peers.values: + if peer.connectionState != Connected: continue + + if peer.metadata.isNone or + heartbeatStart_m - peer.lastMetadataTime > 30.minutes: + updateFutures.add(node.updatePeerMetadata(peer.info.peerId)) + + discard await allFinished(updateFutures) + + for peer in node.peers.values: + if peer.connectionState != Connected: continue + let lastMetadata = + if peer.metadata.isNone: + peer.lastMetadataTime + else: + peer.lastMetadataTime + 30.minutes + + if heartbeatStart_m - lastMetadata > 30.seconds: + debug "no metadata for 30 seconds, kicking peer", peer + asyncSpawn peer.disconnect(PeerScoreLow) + + await sleepAsync(8.seconds) + func asLibp2pKey*(key: keys.PublicKey): PublicKey = PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key))