From 250a80eb0ce714d000e88dbb67e0ea61fb2c2073 Mon Sep 17 00:00:00 2001
From: Agnish Ghosh <80243668+agnxsh@users.noreply.github.com>
Date: Tue, 22 Oct 2024 10:49:34 +0530
Subject: [PATCH] add gossip validation for dc, and data column quarantine
 strategy (#6581)

* add gossip validation for dc

* review 1

* rm callback

* review 2

* added custody columns as a global entity

* alpha 8

* few typos
---
 beacon_chain/beacon_node.nim                  |   3 +-
 .../data_column_quarantine.nim                | 190 ++++++++++++++++++
 .../gossip_processing/gossip_validation.nim   | 149 +++++++++++++-
 beacon_chain/nimbus_beacon_node.nim           |  36 +++-
 beacon_chain/spec/datatypes/eip7594.nim       |   3 +
 beacon_chain/spec/forks.nim                   |   4 +-
 beacon_chain/spec/network.nim                 |   4 +
 7 files changed, 382 insertions(+), 7 deletions(-)
 create mode 100644 beacon_chain/consensus_object_pools/data_column_quarantine.nim

diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim
index 2ccca461a..94c743172 100644
--- a/beacon_chain/beacon_node.nim
+++ b/beacon_chain/beacon_node.nim
@@ -22,7 +22,7 @@ import
   ./el/el_manager,
   ./consensus_object_pools/[
     blockchain_dag, blob_quarantine, block_quarantine, consensus_manager,
-    attestation_pool, sync_committee_msg_pool, validator_change_pool],
+    data_column_quarantine, attestation_pool, sync_committee_msg_pool, validator_change_pool],
   ./spec/datatypes/[base, altair],
   ./spec/eth2_apis/dynamic_fee_recipients,
   ./sync/[sync_manager, request_manager],
@@ -73,6 +73,7 @@ type
     dag*: ChainDAGRef
     quarantine*: ref Quarantine
     blobQuarantine*: ref BlobQuarantine
+    dataColumnQuarantine*: ref DataColumnQuarantine
     attestationPool*: ref AttestationPool
     syncCommitteeMsgPool*: ref SyncCommitteeMsgPool
     lightClientPool*: ref LightClientPool
diff --git a/beacon_chain/consensus_object_pools/data_column_quarantine.nim b/beacon_chain/consensus_object_pools/data_column_quarantine.nim
new file mode 100644
index 000000000..23cbf995e
--- /dev/null
+++ b/beacon_chain/consensus_object_pools/data_column_quarantine.nim
@@ -0,0 +1,190 @@
+# beacon_chain
+# Copyright (c) 2018-2024 Status Research & Development GmbH
+# Licensed and distributed under either of
+#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
+#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
+# at your option. This file may not be copied, modified, or distributed except according to those terms.
+
+{.push raises: [].}
+
+import
+  std/tables,
+  ../spec/datatypes/eip7594,
+  ../spec/helpers
+
+from std/sequtils import mapIt
+from std/strutils import join
+
+const
+  MaxDataColumns = 3 * SLOTS_PER_EPOCH * NUMBER_OF_COLUMNS
+    ## Same limit as `MaxOrphans` in `block_quarantine`:
+    ## data columns may arrive before an orphan is tagged `columnless`
+
+type
+  DataColumnQuarantine* = object
+    data_columns*:
+      OrderedTable[DataColumnIdentifier, ref DataColumnSidecar]
+    supernode*: bool
+    custody_columns*: seq[ColumnIndex]
+    onDataColumnSidecarCallback*: OnDataColumnSidecarCallback
+
+  DataColumnFetchRecord* = object
+    block_root*: Eth2Digest
+    indices*: seq[ColumnIndex]
+
+  OnDataColumnSidecarCallback = proc(data: DataColumnSidecar) {.gcsafe, raises: [].}
+
+func init*(T: type DataColumnQuarantine): T =
+  T()
+
+func shortLog*(x: seq[DataColumnFetchRecord]): string =
+  "[" & x.mapIt(shortLog(it.block_root) & shortLog(it.indices)).join(", ") & "]"
+
+func put*(quarantine: var DataColumnQuarantine,
+          dataColumnSidecar: ref DataColumnSidecar) =
+  if quarantine.data_columns.len >= static(MaxDataColumns.int):
+    # FIFO if full. For example, the sync manager and the request manager can
+    # race to put data columns in at the same time, so one gets data column
+    # insert -> block resolve -> data column insert, which leaves garbage
+    # data columns.
+    #
+    # This also therefore automatically garbage-collects otherwise valid
+    # data columns that are correctly signed and point to a correct block
+    # root which is never seen, and for whatever reason simply never get
+    # used.
+    var oldest_column_key: DataColumnIdentifier
+    for k in quarantine.data_columns.keys:
+      oldest_column_key = k
+      break
+    quarantine.data_columns.del(oldest_column_key)
+  let block_root =
+    hash_tree_root(dataColumnSidecar.signed_block_header.message)
+  discard quarantine.data_columns.hasKeyOrPut(
+    DataColumnIdentifier(block_root: block_root,
+                         index: dataColumnSidecar.index),
+    dataColumnSidecar)
+
+func hasDataColumn*(
+    quarantine: DataColumnQuarantine,
+    slot: Slot,
+    proposer_index: uint64,
+    index: ColumnIndex): bool =
+  for data_column_sidecar in quarantine.data_columns.values:
+    template block_header: untyped =
+      data_column_sidecar.signed_block_header.message
+    if block_header.slot == slot and
+        block_header.proposer_index == proposer_index and
+        data_column_sidecar.index == index:
+      return true
+  false
+
+func peekColumnIndices*(quarantine: DataColumnQuarantine,
+                        blck: electra.SignedBeaconBlock):
+                        seq[ColumnIndex] =
+  # Peeks into the column indices currently received into the quarantine,
+  # as needed for data availability checks
+  var indices: seq[ColumnIndex]
+  for col_idx in quarantine.custody_columns:
+    if quarantine.data_columns.hasKey(
+        DataColumnIdentifier(block_root: blck.root,
+                             index: ColumnIndex col_idx)):
+      indices.add(col_idx)
+  indices
+
+func gatherDataColumns*(quarantine: DataColumnQuarantine,
+                        digest: Eth2Digest):
+                        seq[ref DataColumnSidecar] =
+  # Returns the data columns currently in the quarantine that are
+  # queried by the given block root
+  var columns: seq[ref DataColumnSidecar]
+  for i in quarantine.custody_columns:
+    let dc_identifier =
+      DataColumnIdentifier(
+        block_root: digest,
+        index: i)
+    if quarantine.data_columns.hasKey(dc_identifier):
+      let value =
+        quarantine.data_columns.getOrDefault(dc_identifier,
+          default(ref DataColumnSidecar))
+      columns.add(value)
+  columns
+
+func popDataColumns*(
+    quarantine: var DataColumnQuarantine, digest: Eth2Digest,
+    blck: electra.SignedBeaconBlock):
+    seq[ref DataColumnSidecar] =
+  var r: DataColumnSidecars
+  for idx in quarantine.custody_columns:
+    var c: ref DataColumnSidecar
+    if quarantine.data_columns.pop(
+        DataColumnIdentifier(block_root: digest,
+                             index: idx),
+        c):
+      r.add(c)
+  r
+
+func hasMissingDataColumns*(quarantine: DataColumnQuarantine,
+                            blck: electra.SignedBeaconBlock): bool =
+  # `hasMissingDataColumns` reports whether any of the custody data columns
+  # for `blck` have been missed over gossip. For a supernode this means the
+  # node has not yet received data columns up to the requisite limit
+  # (i.e. 50% of NUMBER_OF_COLUMNS).
+  #
+  # This method is used by the `RequestManager` to request the missing
+  # columns by root over RPC.
+  var col_counter = 0
+  for idx in quarantine.custody_columns:
+    let dc_identifier =
+      DataColumnIdentifier(
+        block_root: blck.root,
+        index: idx)
+    if dc_identifier notin quarantine.data_columns:
+      inc col_counter
+  if quarantine.supernode and col_counter != NUMBER_OF_COLUMNS:
+    return false
+  elif quarantine.supernode == false and
+      col_counter != max(SAMPLES_PER_SLOT, CUSTODY_REQUIREMENT):
+    return false
+  else:
+    return true
+
+func hasEnoughDataColumns*(quarantine: DataColumnQuarantine,
+                           blck: electra.SignedBeaconBlock): bool =
+  # `hasEnoughDataColumns` dictates whether there are enough data columns
+  # for a block to be enqueued; for a supernode this means having received
+  # at least 50% of the columns via gossip and RPC combined.
+  #
+  # Once 50%+ of the columns are available, this function can be used to
+  # check column reconstructability right from gossip validation, and the
+  # quarantine can consequently be populated with the rest of the columns.
+  if quarantine.supernode:
+    let
+      collectedColumns = quarantine.gatherDataColumns(blck.root)
+    if collectedColumns.len >= (quarantine.custody_columns.len div 2):
+      return true
+  else:
+    for i in quarantine.custody_columns:
+      let dc_identifier =
+        DataColumnIdentifier(
+          block_root: blck.root,
+          index: i)
+      if dc_identifier notin quarantine.data_columns:
+        return false
+      else:
+        return true
+
+func dataColumnFetchRecord*(quarantine: DataColumnQuarantine,
+                            blck: electra.SignedBeaconBlock):
+                            DataColumnFetchRecord =
+  var indices: seq[ColumnIndex]
+  for i in quarantine.custody_columns:
+    let
+      idx = ColumnIndex(i)
+      dc_id = DataColumnIdentifier(
+        block_root: blck.root,
+        index: idx)
+    if not quarantine.data_columns.hasKey(
+        dc_id):
+      indices.add(idx)
+  DataColumnFetchRecord(block_root: blck.root, indices: indices)
\ No newline at end of file
diff --git a/beacon_chain/gossip_processing/gossip_validation.nim b/beacon_chain/gossip_processing/gossip_validation.nim
index 121736a77..492b99ae9 100644
--- a/beacon_chain/gossip_processing/gossip_validation.nim
+++ b/beacon_chain/gossip_processing/gossip_validation.nim
@@ -15,10 +15,11 @@ import
   stew/byteutils,
   # Internals
   ../spec/[
-    beaconstate, state_transition_block, forks, helpers, network, signatures],
+    beaconstate, state_transition_block, forks,
+    helpers, network, signatures, eip7594_helpers],
   ../consensus_object_pools/[
     attestation_pool, blockchain_dag, blob_quarantine, block_quarantine,
-    spec_cache, light_client_pool, sync_committee_msg_pool,
+    data_column_quarantine, spec_cache, light_client_pool, sync_committee_msg_pool,
     validator_change_pool],
   ".."/[beacon_clock],
   ./batch_validation
@@ -209,6 +210,22 @@ func check_blob_sidecar_inclusion_proof(
 
   ok()
 
+func check_data_column_sidecar_inclusion_proof(
+    data_column_sidecar: DataColumnSidecar): Result[void, ValidationError] =
+  let res = data_column_sidecar.verify_data_column_sidecar_inclusion_proof()
+  if res.isErr:
+    return errReject(res.error)
+
+  ok()
+
+proc check_data_column_sidecar_kzg_proofs(
+    data_column_sidecar: DataColumnSidecar): Result[void, ValidationError] =
+  let res = data_column_sidecar.verify_data_column_sidecar_kzg_proofs()
+  if res.isErr:
+    return errReject(res.error)
+
+  ok()
+
 # Gossip Validation
 # ----------------------------------------------------------------
@@ -475,6 +492,134 @@ proc validateBlobSidecar*(
 
   ok()
 
+# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/_features/eip7594/p2p-interface.md#data_column_sidecar_subnet_id
+proc validateDataColumnSidecar*(
+    dag: ChainDAGRef, quarantine: ref Quarantine,
+    dataColumnQuarantine: ref DataColumnQuarantine,
+    data_column_sidecar: DataColumnSidecar,
+    wallTime: BeaconTime, subnet_id: uint64):
+    Result[void, ValidationError] =
+
+  template block_header: untyped = data_column_sidecar.signed_block_header.message
+
+  # [REJECT] The sidecar's index is consistent with `NUMBER_OF_COLUMNS`
+  # -- i.e. `data_column_sidecar.index < NUMBER_OF_COLUMNS`
+  if not (data_column_sidecar.index < NUMBER_OF_COLUMNS):
+    return dag.checkedReject("DataColumnSidecar: The sidecar's index should be consistent with NUMBER_OF_COLUMNS")
+
+  # [REJECT] The sidecar is for the correct subnet
+  # -- i.e. `compute_subnet_for_data_column_sidecar(data_column_sidecar.index) == subnet_id`.
+  if not (compute_subnet_for_data_column_sidecar(data_column_sidecar.index) == subnet_id):
+    return dag.checkedReject("DataColumnSidecar: The sidecar is not for the correct subnet")
+
+  # [IGNORE] The sidecar is not from a future slot
+  # (with a `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance) -- i.e. validate that
+  # `block_header.slot <= current_slot` (a client MAY queue future sidecars for
+  # processing at the appropriate slot).
+  if not (block_header.slot <=
+      (wallTime + MAXIMUM_GOSSIP_CLOCK_DISPARITY).slotOrZero):
+    return errIgnore("DataColumnSidecar: slot too high")
+
+  # [IGNORE] The sidecar is from a slot greater than the latest
+  # finalized slot -- i.e. validate that `block_header.slot >
+  # compute_start_slot_at_epoch(state.finalized_checkpoint.epoch)`
+  if not (block_header.slot > dag.finalizedHead.slot):
+    return errIgnore("DataColumnSidecar: slot already finalized")
+
+  # [IGNORE] The sidecar is the first sidecar for the tuple
+  # (block_header.slot, block_header.proposer_index, data_column_sidecar.index)
+  # with valid header signature, sidecar inclusion proof, and kzg proof.
+  let block_root = hash_tree_root(block_header)
+  if dag.getBlockRef(block_root).isSome():
+    return errIgnore("DataColumnSidecar: already have block")
+  if dataColumnQuarantine[].hasDataColumn(
+      block_header.slot, block_header.proposer_index, data_column_sidecar.index):
+    return errIgnore("DataColumnSidecar: already have valid data column from same proposer")
+
+  # [REJECT] The sidecar's `kzg_commitments` inclusion proof is valid as verified by
+  # `verify_data_column_sidecar_inclusion_proof(sidecar)`.
+  block:
+    let v = check_data_column_sidecar_inclusion_proof(data_column_sidecar)
+    if v.isErr:
+      return dag.checkedReject(v.error)
+
+  # [IGNORE] The sidecar's block's parent (defined by
+  # `block_header.parent_root`) has been seen (via both gossip and
+  # non-gossip sources) (a client MAY queue sidecars for processing
+  # once the parent block is retrieved).
+  #
+  # [REJECT] The sidecar's block's parent (defined by
+  # `block_header.parent_root`) passes validation.
+  let parent = dag.getBlockRef(block_header.parent_root).valueOr:
+    if block_header.parent_root in quarantine[].unviable:
+      quarantine[].addUnviable(block_root)
+      return dag.checkedReject("DataColumnSidecar: parent not validated")
+    else:
+      quarantine[].addMissing(block_header.parent_root)
+      return errIgnore("DataColumnSidecar: parent not found")
+
+  # [REJECT] The sidecar is from a higher slot than the sidecar's
+  # block's parent (defined by `block_header.parent_root`).
+  if not (block_header.slot > parent.bid.slot):
+    return dag.checkedReject("DataColumnSidecar: slot lower than parent's")
+
+  # [REJECT] The current finalized_checkpoint is an ancestor of the sidecar's
+  # block -- i.e. `get_checkpoint_block(store, block_header.parent_root,
+  # store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root`.
+  let
+    finalized_checkpoint = getStateField(dag.headState, finalized_checkpoint)
+    ancestor = get_ancestor(parent, finalized_checkpoint.epoch.start_slot)
+
+  if ancestor.isNil:
+    # This shouldn't happen: we should always be able to trace the parent back
+    # to the finalized checkpoint (else it wouldn't be in the DAG)
+    return errIgnore("DataColumnSidecar: Can't find ancestor")
+
+  if not (
+      finalized_checkpoint.root == ancestor.root or
+      finalized_checkpoint.root.isZero):
+    quarantine[].addUnviable(block_root)
+    return dag.checkedReject(
+      "DataColumnSidecar: Finalized checkpoint not an ancestor")
+
+  # [REJECT] The sidecar is proposed by the expected `proposer_index`
+  # for the block's slot in the context of the current shuffling
+  # (defined by `block_header.parent_root`/`block_header.slot`).
+  # If the proposer_index cannot immediately be verified against the expected
+  # shuffling, the sidecar MAY be queued for later processing while proposers
+  # for the block's branch are calculated -- in such a case do not
+  # REJECT, instead IGNORE this message.
+  let proposer = getProposer(dag, parent, block_header.slot).valueOr:
+    warn "cannot compute proposer for data column"
+    return errIgnore("DataColumnSidecar: Cannot compute proposer") # internal issue
+
+  if uint64(proposer) != block_header.proposer_index:
+    return dag.checkedReject("DataColumnSidecar: Unexpected proposer")
+
+  # [REJECT] The proposer signature of `data_column_sidecar.signed_block_header`
+  # is valid with respect to the `block_header.proposer_index` pubkey.
+  if not verify_block_signature(
+      dag.forkAtEpoch(block_header.slot.epoch),
+      getStateField(dag.headState, genesis_validators_root),
+      block_header.slot,
+      block_root,
+      dag.validatorKey(proposer).get(),
+      data_column_sidecar.signed_block_header.signature):
+    return dag.checkedReject("DataColumnSidecar: Invalid proposer signature")
+
+  # [REJECT] The sidecar's column data is valid as
+  # verified by `verify_data_column_kzg_proofs(sidecar)`
+  block:
+    let r = check_data_column_sidecar_kzg_proofs(data_column_sidecar)
+    if r.isErr:
+      return dag.checkedReject(r.error)
+
+  # Send notification about the new data column sidecar via callback
+  if not(isNil(dataColumnQuarantine.onDataColumnSidecarCallback)):
+    dataColumnQuarantine.onDataColumnSidecarCallback(data_column_sidecar)
+
+  ok()
+
 # https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/phase0/p2p-interface.md#beacon_block
 # https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/bellatrix/p2p-interface.md#beacon_block
 proc validateBeaconBlock*(
diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim
index 01ae467b6..bd13490a3 100644
--- a/beacon_chain/nimbus_beacon_node.nim
+++ b/beacon_chain/nimbus_beacon_node.nim
@@ -13,12 +13,14 @@ import
   metrics, metrics/chronos_httpserver,
   stew/[byteutils, io2],
   eth/p2p/discoveryv5/[enr, random2],
-  ./consensus_object_pools/blob_quarantine,
+  ./consensus_object_pools/[blob_quarantine, data_column_quarantine],
   ./consensus_object_pools/vanity_logs/vanity_logs,
   ./networking/[topic_params, network_metadata_downloads],
   ./rpc/[rest_api, state_ttl_cache],
   ./spec/datatypes/[altair, bellatrix, phase0],
-  ./spec/[deposit_snapshots, engine_authentication, weak_subjectivity],
+  ./spec/[
+    deposit_snapshots, engine_authentication, weak_subjectivity,
+    eip7594_helpers],
   ./sync/[sync_protocol, light_client_protocol],
   ./validators/[keystore_management, beacon_validators],
   "."/[
@@ -400,6 +402,13 @@ proc initFullNode(
         onProposerSlashingAdded, onPhase0AttesterSlashingAdded,
         onElectraAttesterSlashingAdded))
     blobQuarantine = newClone(BlobQuarantine.init(onBlobSidecarAdded))
+    dataColumnQuarantine = newClone(DataColumnQuarantine.init())
+    supernode = node.config.subscribeAllSubnets
+    localCustodySubnets =
+      if supernode:
+        DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
+      else:
+        CUSTODY_REQUIREMENT.uint64
     consensusManager = ConsensusManager.new(
       dag, attestationPool, quarantine, node.elManager,
       ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets),
@@ -486,7 +495,30 @@ proc initFullNode(
       (proc(): bool = syncManager.inProgress),
       quarantine, blobQuarantine, rmanBlockVerifier,
      rmanBlockLoader, rmanBlobLoader)
+
+  # As per EIP-7594, the BN is now categorised into a `Fullnode`
+  # and a `Supernode`: a fullnode custodies a given set of data
+  # columns, and hence ONLY subscribes to those data column subnet
+  # topics, whereas a supernode subscribes to all of the topics.
+  # This makes the size of our `data column quarantine` variable:
+  # whenever the BN is a supernode, the column quarantine effectively
+  # spans all NUMBER_OF_COLUMNS, as mentioned in the spec, while for
+  # a fullnode the quarantine depends on the columns randomly
+  # assigned by `get_custody_columns`.
+  # Hence, in order to keep the column quarantine accurate and
+  # error-proof, the custody columns are computed once as the BN
+  # boots, and the values are then used globally around the codebase.
+
+  # `get_custody_columns` is not a very expensive function, but custody
+  # columns would otherwise be recomputed in multiple places, especially
+  # during peer selection, column-based sync, and so on. That is the
+  # rationale for populating them at boot and using them globally.
+
+  dataColumnQuarantine[].supernode = supernode
+  dataColumnQuarantine[].custody_columns =
+    node.network.nodeId.get_custody_columns(max(SAMPLES_PER_SLOT.uint64,
+                                                localCustodySubnets))
 
   if node.config.lightClientDataServe:
     proc scheduleSendingLightClientUpdates(slot: Slot) =
       if node.lightClientPool[].broadcastGossipFut != nil:
diff --git a/beacon_chain/spec/datatypes/eip7594.nim b/beacon_chain/spec/datatypes/eip7594.nim
index 1388b845b..7d664e766 100644
--- a/beacon_chain/spec/datatypes/eip7594.nim
+++ b/beacon_chain/spec/datatypes/eip7594.nim
@@ -109,3 +109,6 @@ func shortLog*(v: seq[DataColumnSidecar]): auto =
 
 func shortLog*(x: seq[DataColumnIdentifier]): string =
   "[" & x.mapIt(shortLog(it.block_root) & "/" & $it.index).join(", ") & "]"
+
+func shortLog*(x: seq[ColumnIndex]): string =
+  "<" & x.mapIt($it).join(", ") & ">"
\ No newline at end of file
diff --git a/beacon_chain/spec/forks.nim b/beacon_chain/spec/forks.nim
index 221e66f73..edffecff8 100644
--- a/beacon_chain/spec/forks.nim
+++ b/beacon_chain/spec/forks.nim
@@ -16,12 +16,12 @@ import
   "."/[
     block_id, eth2_merkleization, eth2_ssz_serialization,
     forks_light_client, presets],
-  ./datatypes/[phase0, altair, bellatrix, capella, deneb, electra],
+  ./datatypes/[phase0, altair, bellatrix, capella, deneb, electra, eip7594],
   ./mev/[bellatrix_mev, capella_mev, deneb_mev, electra_mev]
 
 export
   extras, block_id, phase0, altair, bellatrix, capella, deneb, electra,
-  eth2_merkleization, eth2_ssz_serialization, forks_light_client,
+  eip7594, eth2_merkleization, eth2_ssz_serialization, forks_light_client,
   presets, deneb_mev, electra_mev
 
 # This file contains helpers for dealing with forks - we have two ways we can
diff --git a/beacon_chain/spec/network.nim b/beacon_chain/spec/network.nim
index 0b59948e6..e5795ec30 100644
--- a/beacon_chain/spec/network.nim
+++ b/beacon_chain/spec/network.nim
@@ -109,6 +109,10 @@ func getBlobSidecarTopic*(forkDigest: ForkDigest,
 func compute_subnet_for_blob_sidecar*(blob_index: BlobIndex): BlobId =
   BlobId(blob_index mod BLOB_SIDECAR_SUBNET_COUNT)
 
+# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/_features/eip7594/p2p-interface.md#compute_subnet_for_data_column_sidecar
+func compute_subnet_for_data_column_sidecar*(column_index: ColumnIndex): uint64 =
+  uint64(column_index mod DATA_COLUMN_SIDECAR_SUBNET_COUNT)
+
 # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#light_client_finality_update
 func getLightClientFinalityUpdateTopic*(forkDigest: ForkDigest): string =
   ## For broadcasting or obtaining the latest `LightClientFinalityUpdate`.
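
Review note (not part of the patch): a minimal Nim usage sketch of how the pieces above are meant to fit together, following the `initFullNode` wiring and the quarantine comment: custody columns are computed once at boot from the node ID, every gossip-validated sidecar is stored in the quarantine, and the missing-column helpers then drive by-root fetches over RPC. The import paths, the `NodeId` source module, and the proc names `wireDataColumnQuarantine`/`onValidatedColumn` are assumptions for illustration only; only the APIs introduced by this patch are used.

import
  eth/p2p/discoveryv5/node,  # NodeId, as used via node.network.nodeId (path assumed for this sketch)
  ./consensus_object_pools/data_column_quarantine,
  ./spec/[eip7594_helpers, forks]

proc wireDataColumnQuarantine(nodeId: NodeId, supernode: bool): DataColumnQuarantine =
  # Mirrors initFullNode: the custody columns are computed once at boot and
  # then reused globally for every quarantine query.
  var quarantine = DataColumnQuarantine.init()
  quarantine.supernode = supernode
  let localCustodySubnets =
    if supernode: DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
    else: CUSTODY_REQUIREMENT.uint64
  quarantine.custody_columns =
    nodeId.get_custody_columns(
      max(SAMPLES_PER_SLOT.uint64, localCustodySubnets))
  quarantine

proc onValidatedColumn(quarantine: var DataColumnQuarantine,
                       sidecar: ref DataColumnSidecar,
                       blck: electra.SignedBeaconBlock) =
  # A sidecar that passed validateDataColumnSidecar on gossip is stashed;
  # the RequestManager can then ask which custody columns are still missing
  # for the block and request them by root over RPC.
  quarantine.put(sidecar)
  if quarantine.hasMissingDataColumns(blck):
    let record = quarantine.dataColumnFetchRecord(blck)
    debugEcho "still missing columns ", shortLog(record.indices)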