From b33900bd35a80f8d253ec9c09da3cc80d7f76143 Mon Sep 17 00:00:00 2001 From: Agnish Ghosh Date: Mon, 15 Jul 2024 13:11:09 +0530 Subject: [PATCH] added enr struct --- beacon_chain/networking/eth2_discovery.nim | 17 +++++++++++- beacon_chain/networking/eth2_network.nim | 19 ++++++++------ beacon_chain/nimbus_beacon_node.nim | 30 ++++++++++++++++++++-- beacon_chain/spec/datatypes/eip7594.nim | 4 ++- beacon_chain/spec/eip7594_helpers.nim | 1 - beacon_chain/spec/network.nim | 1 + beacon_chain/sync/sync_manager.nim | 6 ++--- 7 files changed, 61 insertions(+), 17 deletions(-) diff --git a/beacon_chain/networking/eth2_discovery.nim b/beacon_chain/networking/eth2_discovery.nim index a8887aaf0..1a1b1c70b 100644 --- a/beacon_chain/networking/eth2_discovery.nim +++ b/beacon_chain/networking/eth2_discovery.nim @@ -11,7 +11,7 @@ import std/[algorithm, sequtils], chronos, chronicles, stew/results, eth/p2p/discoveryv5/[enr, protocol, node, random2], - ../spec/datatypes/altair, + ../spec/datatypes/[altair, eip7594], ../spec/eth2_ssz_serialization, ".."/[conf, conf_light_client] @@ -127,6 +127,7 @@ proc queryRandom*( forkId: ENRForkID, wantedAttnets: AttnetBits, wantedSyncnets: SyncnetBits, + wantedCscnets: CscBits, minScore: int): Future[seq[Node]] {.async.} = ## Perform a discovery query for a random target ## (forkId) and matching at least one of the attestation subnets. @@ -151,6 +152,20 @@ proc queryRandom*( if not forkId.isCompatibleForkId(peerForkId): continue + let cscnetsBytes = n.record.get(enrCustodySubnetCountField, seq[byte]) + if cscnetsBytes.isOk(): + let cscnetsNode = + try: + SSZ.decode(cscnetsBytes.get(), CscBits) + except SszError as e: + debug "Could not decode the csc count ENR bitfield of peer", + peer = n.record.toURI(), exception = e.name, msg = e.msg + continue + + if wantedCscnets[0] and cscnetsNode[0]: + debug "Connected to a peer with csc ENR field", + peer = n.record.toURI() + let attnetsBytes = n.record.get(enrAttestationSubnetsField, seq[byte]) if attnetsBytes.isOk(): let attnetsNode = diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 94dc52ee8..f0d204f25 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -26,7 +26,7 @@ import eth/[keys, async_utils], eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2], ".."/[version, conf, beacon_clock, conf_light_client], - ../spec/datatypes/[phase0, altair, bellatrix], + ../spec/datatypes/[phase0, altair, bellatrix, eip7594], ../spec/[eth2_ssz_serialization, network, helpers, forks], ../validators/keystore_management, "."/[eth2_discovery, eth2_protocol_dsl, libp2p_json_serialization, peer_pool, peer_scores] @@ -398,7 +398,7 @@ func nodeId*(node: Eth2Node): NodeId = func enrRecord*(node: Eth2Node): Record = node.discovery.localNode.record -proc getPeer(node: Eth2Node, peerId: PeerId): Peer = +proc getPeer*(node: Eth2Node, peerId: PeerId): Peer = node.peers.withValue(peerId, peer) do: return peer[] do: @@ -1482,7 +1482,7 @@ proc trimConnections(node: Eth2Node, count: int) = inc(nbc_cycling_kicked_peers) if toKick <= 0: return -proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits) = +proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits, CscBits) = # Returns the subnets required to have a healthy mesh # The subnets are computed, to, in order: # - Have 0 subnet with < `dLow` peers from topic subscription @@ -1547,7 +1547,8 @@ proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits) = if epoch + 1 >= node.cfg.ALTAIR_FORK_EPOCH: findLowSubnets(getSyncCommitteeTopic, SyncSubcommitteeIndex, SYNC_COMMITTEE_SUBNET_COUNT) else: - default(SyncnetBits) + default(SyncnetBits), + findLowSubnets(getDataColumnSidecarTopic, uint64, DATA_COLUMN_SIDECAR_SUBNET_COUNT.int) ) proc runDiscoveryLoop(node: Eth2Node) {.async.} = @@ -1556,23 +1557,25 @@ proc runDiscoveryLoop(node: Eth2Node) {.async.} = while true: let currentEpoch = node.getBeaconTime().slotOrZero.epoch - (wantedAttnets, wantedSyncnets) = node.getLowSubnets(currentEpoch) + (wantedAttnets, wantedSyncnets, wantedCscnets) = node.getLowSubnets(currentEpoch) wantedAttnetsCount = wantedAttnets.countOnes() wantedSyncnetsCount = wantedSyncnets.countOnes() + wantedCscnetsCount = wantedCscnets.countOnes() outgoingPeers = node.peerPool.lenCurrent({PeerType.Outgoing}) targetOutgoingPeers = max(node.wantedPeers div 10, 3) if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0 or - outgoingPeers < targetOutgoingPeers: + wantedCscnetsCount > 0 or outgoingPeers < targetOutgoingPeers: let minScore = - if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0: + if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0 or + wantedCscnetsCount > 0: 1 else: 0 discoveredNodes = await node.discovery.queryRandom( - node.discoveryForkId, wantedAttnets, wantedSyncnets, minScore) + node.discoveryForkId, wantedAttnets, wantedSyncnets, wantedCscnets, minScore) let newPeers = block: var np = newSeq[PeerAddr]() diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index a8559d6a0..3c36336a1 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -16,7 +16,7 @@ import ./consensus_object_pools/blob_quarantine, ./consensus_object_pools/data_column_quarantine, ./consensus_object_pools/vanity_logs/vanity_logs, - ./networking/[topic_params, network_metadata_downloads], + ./networking/[topic_params, network_metadata_downloads, eth2_network], ./rpc/[rest_api, state_ttl_cache], ./spec/datatypes/[altair, bellatrix, phase0], ./spec/[deposit_snapshots, engine_authentication, weak_subjectivity], @@ -1130,11 +1130,37 @@ proc addCapellaMessageHandlers( node.addAltairMessageHandlers(forkDigest, slot) node.network.subscribe(getBlsToExecutionChangeTopic(forkDigest), basicParams) -proc fetchCustodySubnetCount* (res: var uint64, node: BeaconNode) = +proc fetchCustodySubnetCount* (node: BeaconNode, res: var uint64)= res = CUSTODY_REQUIREMENT if node.config.subscribeAllSubnets: res = DATA_COLUMN_SIDECAR_SUBNET_COUNT +proc fetchCustodyColumnCountFromRemotePeer*(node: BeaconNode, pid: PeerId): uint64 = + # Fetches the custody column count from a remote peer + # if the peer advertises their custody column count + # via the `csc` ENR field. If the peer does NOT, then + # the default value is assume, i.e, CUSTODY_REQUIREMENT + let + eth2node = node.network + peer = eth2node.getPeer(pid) + + let enrOpt = peer.enr + if enrOpt.isNone: + debug "Could not get ENR from peer", + peer_id = pid + return 0 + + else: + let + enr = enrOpt.get + enrFieldOpt = enr.get(enrCustodySubnetCountField, uint64) + + if not enrFieldOpt.isOk: + debug "Issue with fetching `csc` field from ENR", + enr = enr + else: + return(enrFieldOpt.get) + proc addDenebMessageHandlers( node: BeaconNode, forkDigest: ForkDigest, slot: Slot) = node.addCapellaMessageHandlers(forkDigest, slot) diff --git a/beacon_chain/spec/datatypes/eip7594.nim b/beacon_chain/spec/datatypes/eip7594.nim index 26794eb93..5a4f21140 100644 --- a/beacon_chain/spec/datatypes/eip7594.nim +++ b/beacon_chain/spec/datatypes/eip7594.nim @@ -46,7 +46,7 @@ const DATA_COLUMN_SIDECAR_SUBNET_COUNT* = 32 SAMPLES_PER_SLOT* = 8 - CUSTODY_REQUIREMENT* = 16 + CUSTODY_REQUIREMENT* = 1 TARGET_NUMBER_OF_PEERS* = 70 type @@ -74,6 +74,8 @@ type column_index*: ColumnIndex row_index*: RowIndex + CscBits* = BitArray[DATA_COLUMN_SIDECAR_SUBNET_COUNT] + func shortLog*(v: DataColumnSidecar): auto = ( diff --git a/beacon_chain/spec/eip7594_helpers.nim b/beacon_chain/spec/eip7594_helpers.nim index b4e1d7ee6..38b6e5e63 100644 --- a/beacon_chain/spec/eip7594_helpers.nim +++ b/beacon_chain/spec/eip7594_helpers.nim @@ -22,7 +22,6 @@ import ./datatypes/[eip7594, deneb] - type CellBytes = array[eip7594.CELLS_PER_EXT_BLOB, Cell] ProofBytes = array[eip7594.CELLS_PER_EXT_BLOB, KzgProof] diff --git a/beacon_chain/spec/network.nim b/beacon_chain/spec/network.nim index 75f414160..9bc08c71e 100644 --- a/beacon_chain/spec/network.nim +++ b/beacon_chain/spec/network.nim @@ -47,6 +47,7 @@ const enrAttestationSubnetsField* = "attnets" enrSyncSubnetsField* = "syncnets" + enrCustodySubnetCountField* = "csc" enrForkIdField* = "eth2" template eth2Prefix(forkDigest: ForkDigest): string = diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index 46b40c571..9ca9bbc27 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -294,16 +294,14 @@ func groupDataColumns*[T](req: SyncRequest[T], template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments if kzgs.len == 0: continue - # Clients MUST include all blob sidecars of each block from which they include blob sidecars. - # The following blob sidecars, where they exist, MUST be sent in consecutive (slot, index) order. + # Clients MUST include all data column sidecars of each block from which they include data column sidecars. + # The following data column sidecars, where they exist, MUST be sent in consecutive (slot, index) order. # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.3/specs/_features/eip7594/p2p-interface.md let header = forkyBlck.toSignedBeaconBlockHeader() for column_idx, kzg_commitment in kzgs: if column_cursor >= data_columns.len: return err("DataColumnSidecar: response too short") let data_column_sidecar = data_columns[column_cursor] - if data_column_sidecar.index != ColumnIndex column_idx: - return err("DataColumnSidecar: unexpected index") if kzg_commitment notin data_column_sidecar.kzg_commitments: return err("DataColumnSidecar: unexpected kzg_commitment") if data_column_sidecar.signed_block_header != header: