From fa5b15436c36a7eef139a1386086909099c8f293 Mon Sep 17 00:00:00 2001 From: Agnish Ghosh Date: Fri, 19 Jul 2024 13:58:26 +0530 Subject: [PATCH] add: logic constructing valid set of peers --- beacon_chain/networking/eth2_network.nim | 103 ++++++++++++++++++++++- beacon_chain/nimbus_beacon_node.nim | 26 ------ beacon_chain/sync/sync_manager.nim | 2 +- 3 files changed, 101 insertions(+), 30 deletions(-) diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index f0d204f25..d3338ddba 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -27,7 +27,8 @@ import eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2], ".."/[version, conf, beacon_clock, conf_light_client], ../spec/datatypes/[phase0, altair, bellatrix, eip7594], - ../spec/[eth2_ssz_serialization, network, helpers, forks], + ../spec/[eth2_ssz_serialization, network, + helpers, eip7594_helpers, forks], ../validators/keystore_management, "."/[eth2_discovery, eth2_protocol_dsl, libp2p_json_serialization, peer_pool, peer_scores] @@ -1482,7 +1483,8 @@ proc trimConnections(node: Eth2Node, count: int) = inc(nbc_cycling_kicked_peers) if toKick <= 0: return -proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits, CscBits) = +proc getLowSubnets(node: Eth2Node, epoch: Epoch): + (AttnetBits, SyncnetBits, CscBits) = # Returns the subnets required to have a healthy mesh # The subnets are computed, to, in order: # - Have 0 subnet with < `dLow` peers from topic subscription @@ -1575,7 +1577,11 @@ proc runDiscoveryLoop(node: Eth2Node) {.async.} = else: 0 discoveredNodes = await node.discovery.queryRandom( - node.discoveryForkId, wantedAttnets, wantedSyncnets, wantedCscnets, minScore) + node.discoveryForkId, + wantedAttnets, + wantedSyncnets, + wantedCscnets, + minScore) let newPeers = block: var np = newSeq[PeerAddr]() @@ -1624,6 +1630,97 @@ proc runDiscoveryLoop(node: Eth2Node) {.async.} = # Also, give some time to dial the discovered nodes and update stats etc await sleepAsync(5.seconds) +proc getNodeIdFromPeer*(peer: Peer): NodeId= + # Convert peer id to node id by extracting the peer's public key + let nodeId = + block: + var key: PublicKey + discard peer.peerId.extractPublicKey(key) + keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId() + nodeId + +proc fetchCustodyColumnCountFromRemotePeer*(node: Eth2Node, + pid: PeerId): + uint64 = + # Fetches the custody column count from a remote peer + # if the peer advertises their custody column count + # via the `csc` ENR field. If the peer does NOT, then + # the default value is assume, i.e, CUSTODY_REQUIREMENT + let + eth2node = node + peer = eth2node.getPeer(pid) + + let enrOpt = peer.enr + if enrOpt.isNone: + debug "Could not get ENR from peer", + peer_id = pid + return 0 + + else: + let + enr = enrOpt.get + enrFieldOpt = + enr.get(enrCustodySubnetCountField, uint64) + + if not enrFieldOpt.isOk: + debug "Issue with fetching `csc` field from ENR", + enr = enr + else: + return(enrFieldOpt.get) + +proc constructValidCustodyPeers*(node: Eth2Node, + peers: openArray[Peer], + config: BeaconNodeConf): + seq[PeerId] = + # `constructValidaCustodyPeers` returns a list of peers that are an overall + # superset of the given local node's custody columns and the custody columns + # of the remote peers + + # Get the local custody subnet count + var localCustodySubnetCount: uint64 + if config.subscribeAllSubnets: + localCustodySubnetCount = DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64 + localCustodySubnetCount = CUSTODY_REQUIREMENT + + # Get the local custody columns + let + localNodeId = node.nodeId + localCustodyColumns = + localNodeId.get_custody_columns(localCustodySubnetCount).get + + var validPeerIds = newSeqOfCap[PeerId](peers.len) + + for peer in peers: + # Get the custody subnet counts of the remote peer + let remoteCustodySubnetCount = + node.fetchCustodyColumnCountFromRemotePeer(peer.peerId) + + # Extract remote peer's nodeID from peerID + # Fetch custody columns fromm remote peer + let + remoteNodeId = getNodeIdFromPeer(peer) + remoteCustodyColumns = + remoteNodeId.get_custody_columns(remoteCustodySubnetCount).get + + # If the remote peer custodies less columns than our local node + # We skip it + if remoteCustodyColumns.len < localCustodyColumns.len: + continue + + # If the remote peer custodies all the possible columns + if remoteCustodyColumns.len == NUMBER_OF_COLUMNS: + validPeerIds.add(peer.peerId) + + # Filtering out the invalid peers + for column in localCustodyColumns: + if column notin remoteCustodyColumns: + continue + + # Otherwise add the peer to the set + # of valid peerIds + validPeerIds.add(peer.peerId) + validPeerIds + proc resolvePeer(peer: Peer) = # Resolve task which performs searching of peer's public key and recovery of # ENR using discovery5. We only resolve ENR for peers we know about to avoid diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 8eae6b049..f5144ec68 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -1136,32 +1136,6 @@ proc fetchCustodySubnetCount* (node: BeaconNode): uint64= res = DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64 res -proc fetchCustodyColumnCountFromRemotePeer*(node: BeaconNode, pid: PeerId): uint64 = - # Fetches the custody column count from a remote peer - # if the peer advertises their custody column count - # via the `csc` ENR field. If the peer does NOT, then - # the default value is assume, i.e, CUSTODY_REQUIREMENT - let - eth2node = node.network - peer = eth2node.getPeer(pid) - - let enrOpt = peer.enr - if enrOpt.isNone: - debug "Could not get ENR from peer", - peer_id = pid - return 0 - - else: - let - enr = enrOpt.get - enrFieldOpt = enr.get(enrCustodySubnetCountField, uint64) - - if not enrFieldOpt.isOk: - debug "Issue with fetching `csc` field from ENR", - enr = enr - else: - return(enrFieldOpt.get) - proc addDenebMessageHandlers( node: BeaconNode, forkDigest: ForkDigest, slot: Slot) = node.addCapellaMessageHandlers(forkDigest, slot) diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index 9ca9bbc27..bef03faf9 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -548,7 +548,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) withBlck(blck[]): when consensusFork >= ConsensusFork.Deneb: if forkyBlck.message.body.blob_kzg_commitments.len > 0: - hasColumns = true + hasColumns = true break hasColumns