add gossip validation for dc, and data column quarantine strategy (#6581)

* add gossip validation for dc

* review 1

* rm callback

* review 2

* added custody columns as a global entity

* alpha 8

* few typos
Agnish Ghosh 2024-10-22 10:49:34 +05:30 committed by GitHub
parent 3cb7b9140a
commit 250a80eb0c
7 changed files with 382 additions and 7 deletions

View File

@@ -22,7 +22,7 @@ import
  ./el/el_manager,
  ./consensus_object_pools/[
    blockchain_dag, blob_quarantine, block_quarantine, consensus_manager,
-    attestation_pool, sync_committee_msg_pool, validator_change_pool],
+    data_column_quarantine, attestation_pool, sync_committee_msg_pool, validator_change_pool],
  ./spec/datatypes/[base, altair],
  ./spec/eth2_apis/dynamic_fee_recipients,
  ./sync/[sync_manager, request_manager],
@@ -73,6 +73,7 @@ type
    dag*: ChainDAGRef
    quarantine*: ref Quarantine
    blobQuarantine*: ref BlobQuarantine
+    dataColumnQuarantine*: ref DataColumnQuarantine
    attestationPool*: ref AttestationPool
    syncCommitteeMsgPool*: ref SyncCommitteeMsgPool
    lightClientPool*: ref LightClientPool

View File

@@ -0,0 +1,190 @@
# beacon_chain
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}

import
  std/tables,
  ../spec/datatypes/eip7594,
  ../spec/helpers

from std/sequtils import mapIt
from std/strutils import join

const
  MaxDataColumns = 3 * SLOTS_PER_EPOCH * NUMBER_OF_COLUMNS
    ## Same limit as `MaxOrphans` in `block_quarantine`;
    ## data columns may arrive before an orphan is tagged `columnless`

type
  DataColumnQuarantine* = object
    data_columns*:
      OrderedTable[DataColumnIdentifier, ref DataColumnSidecar]
    supernode*: bool
    custody_columns*: seq[ColumnIndex]
    onDataColumnSidecarCallback*: OnDataColumnSidecarCallback

  DataColumnFetchRecord* = object
    block_root*: Eth2Digest
    indices*: seq[ColumnIndex]

  OnDataColumnSidecarCallback = proc(data: DataColumnSidecar) {.gcsafe, raises: [].}

func init*(T: type DataColumnQuarantine): T =
  T()

func shortLog*(x: seq[DataColumnFetchRecord]): string =
  "[" & x.mapIt(shortLog(it.block_root) & shortLog(it.indices)).join(", ") & "]"

func put*(quarantine: var DataColumnQuarantine,
          dataColumnSidecar: ref DataColumnSidecar) =
  if quarantine.data_columns.len >= static(MaxDataColumns.int):
    # FIFO if full. For example, the sync manager and request manager can race
    # to put data columns in at the same time, so one can get data column
    # insert -> block resolve -> data column insert, which leaves
    # garbage data columns.
    #
    # This also therefore automatically garbage-collects otherwise valid
    # data columns that are correctly signed, point to a correct block
    # root which is never seen, and then for any reason are simply never used.
    var oldest_column_key: DataColumnIdentifier
    for k in quarantine.data_columns.keys:
      oldest_column_key = k
      break
    quarantine.data_columns.del(oldest_column_key)
  let block_root =
    hash_tree_root(dataColumnSidecar.signed_block_header.message)
  discard quarantine.data_columns.hasKeyOrPut(
    DataColumnIdentifier(block_root: block_root,
                         index: dataColumnSidecar.index),
    dataColumnSidecar)

func hasDataColumn*(
    quarantine: DataColumnQuarantine,
    slot: Slot,
    proposer_index: uint64,
    index: ColumnIndex): bool =
  for data_column_sidecar in quarantine.data_columns.values:
    template block_header: untyped =
      data_column_sidecar.signed_block_header.message
    if block_header.slot == slot and
        block_header.proposer_index == proposer_index and
        data_column_sidecar.index == index:
      return true
  false

func peekColumnIndices*(quarantine: DataColumnQuarantine,
                        blck: electra.SignedBeaconBlock):
                        seq[ColumnIndex] =
  # Peeks into the column indices currently held in the quarantine,
  # as needed for data availability checks
  var indices: seq[ColumnIndex]
  for col_idx in quarantine.custody_columns:
    if quarantine.data_columns.hasKey(
        DataColumnIdentifier(block_root: blck.root,
                             index: ColumnIndex col_idx)):
      indices.add(col_idx)
  indices

func gatherDataColumns*(quarantine: DataColumnQuarantine,
                        digest: Eth2Digest):
                        seq[ref DataColumnSidecar] =
  # Returns the data columns currently held in the quarantine,
  # keyed by the given block root
  var columns: seq[ref DataColumnSidecar]
  for i in quarantine.custody_columns:
    let dc_identifier =
      DataColumnIdentifier(
        block_root: digest,
        index: i)
    if quarantine.data_columns.hasKey(dc_identifier):
      let value =
        quarantine.data_columns.getOrDefault(dc_identifier,
          default(ref DataColumnSidecar))
      columns.add(value)
  columns

func popDataColumns*(
    quarantine: var DataColumnQuarantine, digest: Eth2Digest,
    blck: electra.SignedBeaconBlock):
    seq[ref DataColumnSidecar] =
  var r: DataColumnSidecars
  for idx in quarantine.custody_columns:
    var c: ref DataColumnSidecar
    if quarantine.data_columns.pop(
        DataColumnIdentifier(block_root: digest,
                             index: idx),
        c):
      r.add(c)
  r

func hasMissingDataColumns*(quarantine: DataColumnQuarantine,
                            blck: electra.SignedBeaconBlock): bool =
  # `hasMissingDataColumns` reports whether data columns for this
  # block have been missed over gossip. In the case of a supernode,
  # the method reports missing columns when the supernode has not
  # received data columns up to the requisite limit (i.e. 50%
  # of NUMBER_OF_COLUMNS).
  # This method is actively used by the `RequestManager` to
  # request columns by root over RPC.
  var col_counter = 0
  for idx in quarantine.custody_columns:
    let dc_identifier =
      DataColumnIdentifier(
        block_root: blck.root,
        index: idx)
    if dc_identifier notin quarantine.data_columns:
      inc col_counter
  if quarantine.supernode and col_counter != NUMBER_OF_COLUMNS:
    return false
  elif quarantine.supernode == false and
      col_counter != max(SAMPLES_PER_SLOT, CUSTODY_REQUIREMENT):
    return false
  else:
    return true

func hasEnoughDataColumns*(quarantine: DataColumnQuarantine,
                           blck: electra.SignedBeaconBlock): bool =
  # `hasEnoughDataColumns` dictates whether there are `enough`
  # data columns for a block to be enqueued; ideally, a supernode
  # should have received at least 50% of them via gossip and RPC.
  # Once 50%+ of the columns are available, this function can check
  # for that, and thereby check column reconstructability right from
  # gossip validation, consequently populating the quarantine with the
  # rest of the data columns.
  if quarantine.supernode:
    let
      collectedColumns = quarantine.gatherDataColumns(blck.root)
    if collectedColumns.len >= (quarantine.custody_columns.len div 2):
      return true
  else:
    for i in quarantine.custody_columns:
      let dc_identifier =
        DataColumnIdentifier(
          block_root: blck.root,
          index: i)
      if dc_identifier notin quarantine.data_columns:
        return false
      else:
        return true

func dataColumnFetchRecord*(quarantine: DataColumnQuarantine,
                            blck: electra.SignedBeaconBlock):
                            DataColumnFetchRecord =
  var indices: seq[ColumnIndex]
  for i in quarantine.custody_columns:
    let
      idx = ColumnIndex(i)
      dc_id = DataColumnIdentifier(
        block_root: blck.root,
        index: idx)
    if not quarantine.data_columns.hasKey(
        dc_id):
      indices.add(idx)
  DataColumnFetchRecord(block_root: blck.root, indices: indices)
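To make the intended flow concrete, here is a minimal, hedged usage sketch of the quarantine API above (not part of the commit): `dataColumnSidecar` (a `ref DataColumnSidecar` received off gossip) and `signedBlock` (an `electra.SignedBeaconBlock`) are assumed to exist in the caller's scope, and the custody columns would normally come from `get_custody_columns` rather than being hard-coded.

# Hedged sketch only; custody column values below are illustrative.
var quarantine = DataColumnQuarantine.init()
quarantine.supernode = false
quarantine.custody_columns = @[ColumnIndex(0), ColumnIndex(7), ColumnIndex(42)]

# Stash a gossiped sidecar; eviction is FIFO once MaxDataColumns is reached
quarantine.put(dataColumnSidecar)

# Ask the quarantine whether columns still need to be fetched for this block
if quarantine.hasMissingDataColumns(signedBlock):
  # The RequestManager can use this record to request the gaps by root
  let record = quarantine.dataColumnFetchRecord(signedBlock)
  debugEcho shortLog(@[record])
else:
  # Otherwise, pop whatever custody columns are held for this block root
  let columns = quarantine.popDataColumns(signedBlock.root, signedBlock)
  discard columns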

View File

@@ -15,10 +15,11 @@ import
  stew/byteutils,
  # Internals
  ../spec/[
-    beaconstate, state_transition_block, forks, helpers, network, signatures],
+    beaconstate, state_transition_block, forks,
+    helpers, network, signatures, eip7594_helpers],
  ../consensus_object_pools/[
    attestation_pool, blockchain_dag, blob_quarantine, block_quarantine,
-    spec_cache, light_client_pool, sync_committee_msg_pool,
+    data_column_quarantine, spec_cache, light_client_pool, sync_committee_msg_pool,
    validator_change_pool],
  ".."/[beacon_clock],
  ./batch_validation
@@ -209,6 +210,22 @@ func check_blob_sidecar_inclusion_proof(
  ok()

func check_data_column_sidecar_inclusion_proof(
    data_column_sidecar: DataColumnSidecar): Result[void, ValidationError] =
  let res = data_column_sidecar.verify_data_column_sidecar_inclusion_proof()
  if res.isErr:
    return errReject(res.error)

  ok()

proc check_data_column_sidecar_kzg_proofs(
    data_column_sidecar: DataColumnSidecar): Result[void, ValidationError] =
  let res = data_column_sidecar.verify_data_column_sidecar_kzg_proofs()
  if res.isErr:
    return errReject(res.error)

  ok()

# Gossip Validation
# ----------------------------------------------------------------
@@ -475,6 +492,134 @@ proc validateBlobSidecar*(
  ok()

# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/_features/eip7594/p2p-interface.md#data_column_sidecar_subnet_id
proc validateDataColumnSidecar*(
    dag: ChainDAGRef, quarantine: ref Quarantine,
    dataColumnQuarantine: ref DataColumnQuarantine,
    data_column_sidecar: DataColumnSidecar,
    wallTime: BeaconTime, subnet_id: uint64):
    Result[void, ValidationError] =
  template block_header: untyped = data_column_sidecar.signed_block_header.message

  # [REJECT] The sidecar's index is consistent with `NUMBER_OF_COLUMNS`
  # -- i.e. `data_column_sidecar.index < NUMBER_OF_COLUMNS`
  if not (data_column_sidecar.index < NUMBER_OF_COLUMNS):
    return dag.checkedReject("DataColumnSidecar: The sidecar's index should be consistent with NUMBER_OF_COLUMNS")

  # [REJECT] The sidecar is for the correct subnet
  # -- i.e. `compute_subnet_for_data_column_sidecar(data_column_sidecar.index) == subnet_id`.
  if not (compute_subnet_for_data_column_sidecar(data_column_sidecar.index) == subnet_id):
    return dag.checkedReject("DataColumnSidecar: The sidecar is not for the correct subnet")

  # [IGNORE] The sidecar is not from a future slot
  # (with a `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance) -- i.e. validate that
  # `block_header.slot <= current_slot` (a client MAY queue future sidecars for
  # processing at the appropriate slot).
  if not (block_header.slot <=
      (wallTime + MAXIMUM_GOSSIP_CLOCK_DISPARITY).slotOrZero):
    return errIgnore("DataColumnSidecar: slot too high")

  # [IGNORE] The sidecar is from a slot greater than the latest
  # finalized slot -- i.e. validate that `block_header.slot >
  # compute_start_slot_at_epoch(state.finalized_checkpoint.epoch)`
  if not (block_header.slot > dag.finalizedHead.slot):
    return errIgnore("DataColumnSidecar: slot already finalized")

  # [IGNORE] The sidecar is the first sidecar for the tuple
  # (block_header.slot, block_header.proposer_index, data_column_sidecar.index)
  # with valid header signature, sidecar inclusion proof, and kzg proof.
  let block_root = hash_tree_root(block_header)
  if dag.getBlockRef(block_root).isSome():
    return errIgnore("DataColumnSidecar: already have block")
  if dataColumnQuarantine[].hasDataColumn(
      block_header.slot, block_header.proposer_index, data_column_sidecar.index):
    return errIgnore("DataColumnSidecar: already have valid data column from same proposer")

  # [REJECT] The sidecar's `kzg_commitments` inclusion proof is valid as verified by
  # `verify_data_column_sidecar_inclusion_proof(sidecar)`.
  block:
    let v = check_data_column_sidecar_inclusion_proof(data_column_sidecar)
    if v.isErr:
      return dag.checkedReject(v.error)

  # [IGNORE] The sidecar's block's parent (defined by
  # `block_header.parent_root`) has been seen (via both gossip and
  # non-gossip sources) (a client MAY queue sidecars for processing
  # once the parent block is retrieved).
  #
  # [REJECT] The sidecar's block's parent (defined by
  # `block_header.parent_root`) passes validation.
  let parent = dag.getBlockRef(block_header.parent_root).valueOr:
    if block_header.parent_root in quarantine[].unviable:
      quarantine[].addUnviable(block_root)
      return dag.checkedReject("DataColumnSidecar: parent not validated")
    else:
      quarantine[].addMissing(block_header.parent_root)
      return errIgnore("DataColumnSidecar: parent not found")

  # [REJECT] The sidecar is from a higher slot than the sidecar's
  # block's parent (defined by `block_header.parent_root`).
  if not (block_header.slot > parent.bid.slot):
    return dag.checkedReject("DataColumnSidecar: slot lower than parents'")

  # [REJECT] The current finalized_checkpoint is an ancestor of the sidecar's
  # block -- i.e. `get_checkpoint_block(store, block_header.parent_root,
  # store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root`.
  let
    finalized_checkpoint = getStateField(dag.headState, finalized_checkpoint)
    ancestor = get_ancestor(parent, finalized_checkpoint.epoch.start_slot)

  if ancestor.isNil:
    # This shouldn't happen: we should always be able to trace the parent back
    # to the finalized checkpoint (else it wouldn't be in the DAG)
    return errIgnore("DataColumnSidecar: Can't find ancestor")

  if not (
      finalized_checkpoint.root == ancestor.root or
      finalized_checkpoint.root.isZero):
    quarantine[].addUnviable(block_root)
    return dag.checkedReject(
      "DataColumnSidecar: Finalized checkpoint not an ancestor")

  # [REJECT] The sidecar is proposed by the expected `proposer_index`
  # for the block's slot in the context of the current shuffling
  # (defined by `block_header.parent_root`/`block_header.slot`).
  # If the `proposer_index` cannot immediately be verified against the expected
  # shuffling, the sidecar MAY be queued for later processing while proposers
  # for the block's branch are calculated -- in such a case do not
  # REJECT, instead IGNORE this message.
  let proposer = getProposer(dag, parent, block_header.slot).valueOr:
    warn "cannot compute proposer for data column"
    return errIgnore("DataColumnSidecar: Cannot compute proposer") # internal issue

  if uint64(proposer) != block_header.proposer_index:
    return dag.checkedReject("DataColumnSidecar: Unexpected proposer")

  # [REJECT] The proposer signature of `data_column_sidecar.signed_block_header`
  # is valid with respect to the `block_header.proposer_index` pubkey.
  if not verify_block_signature(
      dag.forkAtEpoch(block_header.slot.epoch),
      getStateField(dag.headState, genesis_validators_root),
      block_header.slot,
      block_root,
      dag.validatorKey(proposer).get(),
      data_column_sidecar.signed_block_header.signature):
    return dag.checkedReject("DataColumnSidecar: Invalid proposer signature")

  # [REJECT] The sidecar's column data is valid as
  # verified by `verify_data_column_kzg_proofs(sidecar)`
  block:
    let r = check_data_column_sidecar_kzg_proofs(data_column_sidecar)
    if r.isErr:
      return dag.checkedReject(r.error)

  # Send notification about new data column sidecar via callback
  if not(isNil(dataColumnQuarantine.onDataColumnSidecarCallback)):
    dataColumnQuarantine.onDataColumnSidecarCallback(data_column_sidecar)

  ok()

# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/phase0/p2p-interface.md#beacon_block
# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/bellatrix/p2p-interface.md#beacon_block
proc validateBeaconBlock*(
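As a rough, hedged illustration (not from this commit) of how the new validator and the quarantine fit together, a gossip handler could chain them as below. `processDataColumnSidecar` and its `getCurrentBeaconTime` parameter are hypothetical names; `?` is the early-return operator from nim-results, and error routing to the gossip scoring layer is omitted.

# Hedged sketch only: wiring is illustrative, not the commit's actual handler.
proc processDataColumnSidecar(
    dag: ChainDAGRef, quarantine: ref Quarantine,
    dataColumnQuarantine: ref DataColumnQuarantine,
    sidecar: DataColumnSidecar, subnet_id: uint64,
    getCurrentBeaconTime: proc(): BeaconTime {.gcsafe, raises: [].}):
    Result[void, ValidationError] =
  # Run the gossip-validation conditions introduced above
  ? validateDataColumnSidecar(
    dag, quarantine, dataColumnQuarantine, sidecar,
    getCurrentBeaconTime(), subnet_id)
  # On success, stash the column so block processing can later pop it by root
  dataColumnQuarantine[].put(newClone(sidecar))
  ok()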

View File

@@ -13,12 +13,14 @@ import
  metrics, metrics/chronos_httpserver,
  stew/[byteutils, io2],
  eth/p2p/discoveryv5/[enr, random2],
-  ./consensus_object_pools/blob_quarantine,
+  ./consensus_object_pools/[blob_quarantine, data_column_quarantine],
  ./consensus_object_pools/vanity_logs/vanity_logs,
  ./networking/[topic_params, network_metadata_downloads],
  ./rpc/[rest_api, state_ttl_cache],
  ./spec/datatypes/[altair, bellatrix, phase0],
-  ./spec/[deposit_snapshots, engine_authentication, weak_subjectivity],
+  ./spec/[
+    deposit_snapshots, engine_authentication, weak_subjectivity,
+    eip7594_helpers],
  ./sync/[sync_protocol, light_client_protocol],
  ./validators/[keystore_management, beacon_validators],
  "."/[
@@ -400,6 +402,13 @@ proc initFullNode(
        onProposerSlashingAdded, onPhase0AttesterSlashingAdded,
        onElectraAttesterSlashingAdded))
    blobQuarantine = newClone(BlobQuarantine.init(onBlobSidecarAdded))
    dataColumnQuarantine = newClone(DataColumnQuarantine.init())
    supernode = node.config.subscribeAllSubnets
    localCustodySubnets =
      if supernode:
        DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
      else:
        CUSTODY_REQUIREMENT.uint64
    consensusManager = ConsensusManager.new(
      dag, attestationPool, quarantine, node.elManager,
      ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets),
@@ -487,6 +496,29 @@
      quarantine, blobQuarantine, rmanBlockVerifier,
      rmanBlockLoader, rmanBlobLoader)

  # As per EIP-7594, the BN is now categorised into a
  # `Fullnode` and a `Supernode`: a fullnode custodies a
  # given set of data columns, and hence ONLY subscribes to those
  # data column subnet topics, whereas a supernode subscribes
  # to all of the topics. This in turn makes our `data column quarantine`
  # quite variable. Whenever the BN is a supernode, the column quarantine
  # essentially covers all NUMBER_OF_COLUMNS, as mentioned in the
  # spec. For a fullnode, however, the quarantine depends on the
  # columns randomly assigned by `get_custody_columns`.
  # Hence, in order to keep the column quarantine accurate and error-proof,
  # the custody columns are computed once as the BN boots. Then the values
  # are used globally around the codebase.
  # `get_custody_columns` is not a very expensive function, but there
  # are multiple instances of computing custody columns, especially
  # during peer selection, sync with columns, and so on; hence the
  # rationale of populating it at boot and using it globally.
  dataColumnQuarantine[].supernode = supernode
  dataColumnQuarantine[].custody_columns =
    node.network.nodeId.get_custody_columns(max(SAMPLES_PER_SLOT.uint64,
      localCustodySubnets))

  if node.config.lightClientDataServe:
    proc scheduleSendingLightClientUpdates(slot: Slot) =
      if node.lightClientPool[].broadcastGossipFut != nil:
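For illustration only, the mode-dependent input to `get_custody_columns` can be factored out as below; this is a hedged sketch mirroring the `initFullNode` logic above, not code from the commit, and the numeric values of the constants depend on the spec preset.

# Hedged sketch: the custody *subnet* count fed into get_custody_columns per mode.
func custodySubnetCount(supernode: bool): uint64 =
  if supernode:
    DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64   # subscribe to every column subnet
  else:
    CUSTODY_REQUIREMENT.uint64                # spec-minimum custody subnets

# The quarantine's custody_columns then derive from the node id, e.g.:
#   nodeId.get_custody_columns(max(SAMPLES_PER_SLOT.uint64, custodySubnetCount(supernode)))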

View File

@@ -109,3 +109,6 @@ func shortLog*(v: seq[DataColumnSidecar]): auto =
func shortLog*(x: seq[DataColumnIdentifier]): string =
  "[" & x.mapIt(shortLog(it.block_root) & "/" & $it.index).join(", ") & "]"

func shortLog*(x: seq[ColumnIndex]): string =
  "<" & x.mapIt($it).join(", ") & ">"

View File

@@ -16,12 +16,12 @@ import
  "."/[
    block_id, eth2_merkleization, eth2_ssz_serialization,
    forks_light_client, presets],
-  ./datatypes/[phase0, altair, bellatrix, capella, deneb, electra],
+  ./datatypes/[phase0, altair, bellatrix, capella, deneb, electra, eip7594],
  ./mev/[bellatrix_mev, capella_mev, deneb_mev, electra_mev]

export
  extras, block_id, phase0, altair, bellatrix, capella, deneb, electra,
-  eth2_merkleization, eth2_ssz_serialization, forks_light_client,
+  eip7594, eth2_merkleization, eth2_ssz_serialization, forks_light_client,
  presets, deneb_mev, electra_mev

# This file contains helpers for dealing with forks - we have two ways we can

View File

@@ -109,6 +109,10 @@ func getBlobSidecarTopic*(forkDigest: ForkDigest,
func compute_subnet_for_blob_sidecar*(blob_index: BlobIndex): BlobId =
  BlobId(blob_index mod BLOB_SIDECAR_SUBNET_COUNT)

# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/_features/eip7594/p2p-interface.md#compute_subnet_for_data_column_sidecar
func compute_subnet_for_data_column_sidecar*(column_index: ColumnIndex): uint64 =
  uint64(column_index mod DATA_COLUMN_SIDECAR_SUBNET_COUNT)

# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#light_client_finality_update
func getLightClientFinalityUpdateTopic*(forkDigest: ForkDigest): string =
  ## For broadcasting or obtaining the latest `LightClientFinalityUpdate`.
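The subnet mapping is a plain modulo: columns whose indices are congruent modulo `DATA_COLUMN_SIDECAR_SUBNET_COUNT` land on the same gossip subnet. A hedged sketch of grouping a node's custody columns by subnet (not part of the commit; `custodyColumns` is assumed to come from `get_custody_columns` as above):

import std/tables

# Hedged sketch: bucket custody columns by their data column gossip subnet.
func groupBySubnet(custodyColumns: seq[ColumnIndex]): Table[uint64, seq[ColumnIndex]] =
  for col in custodyColumns:
    let subnet = compute_subnet_for_data_column_sidecar(col)
    result.mgetOrPut(subnet, @[]).add(col)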