diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 71c3ab6d4..c52f243a5 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -22,7 +22,7 @@ import ./el/el_manager, ./consensus_object_pools/[ blockchain_dag, blob_quarantine, block_quarantine, consensus_manager, - attestation_pool, sync_committee_msg_pool, validator_change_pool], + data_column_quarantine, attestation_pool, sync_committee_msg_pool, validator_change_pool], ./spec/datatypes/[base, altair], ./spec/eth2_apis/dynamic_fee_recipients, ./sync/[sync_manager, request_manager], @@ -71,6 +71,7 @@ type dag*: ChainDAGRef quarantine*: ref Quarantine blobQuarantine*: ref BlobQuarantine + dataColumnQuarantine*: ref DataColumnQuarantine attestationPool*: ref AttestationPool syncCommitteeMsgPool*: ref SyncCommitteeMsgPool lightClientPool*: ref LightClientPool diff --git a/beacon_chain/consensus_object_pools/block_quarantine.nim b/beacon_chain/consensus_object_pools/block_quarantine.nim index 22d309827..4b584979b 100644 --- a/beacon_chain/consensus_object_pools/block_quarantine.nim +++ b/beacon_chain/consensus_object_pools/block_quarantine.nim @@ -57,6 +57,8 @@ type ## all blobs for this block, we can proceed to resolving the ## block as well. A blobless block inserted into this table must ## have a resolved parent (i.e., it is not an orphan). + + columnless: OrderedTable[Eth2Digest, ForkedSignedBeaconBlock] unviable*: OrderedTable[Eth2Digest, tuple[]] ## Unviable blocks are those that come from a history that does not @@ -336,3 +338,16 @@ func popBlobless*( iterator peekBlobless*(quarantine: var Quarantine): ForkedSignedBeaconBlock = for k, v in quarantine.blobless.mpairs(): yield v + +func popColumnless*( + quarantine: var Quarantine, + root: Eth2Digest): Opt[ForkedSignedBeaconBlock] = + var blck: ForkedSignedBeaconBlock + if quarantine.columnless.pop(root, blck): + Opt.some(blck) + else: + Opt.none(ForkedSignedBeaconBlock) + +iterator peekColumless*(quarantine: var Quarantine): ForkedSignedBeaconBlock = + for k,v in quarantine.columnless.mpairs(): + yield v \ No newline at end of file diff --git a/beacon_chain/consensus_object_pools/data_column_quarantine.nim b/beacon_chain/consensus_object_pools/data_column_quarantine.nim new file mode 100644 index 000000000..b84a9444b --- /dev/null +++ b/beacon_chain/consensus_object_pools/data_column_quarantine.nim @@ -0,0 +1,71 @@ +# beacon_chain +# Copyright (c) 2018-2024 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import + std/tables, + ../spec/[helpers, eip7594_helpers] + +from std/sequtils import mapIt +from std/strutils import join + +const + MaxDataColumns = 3 * SLOTS_PER_EPOCH * NUMBER_OF_COLUMNS + ## Same limit as `MaxOrphans` in `block_quarantine` + ## data columns may arrive before an orphan is tagged `columnless` + +type + DataColumnQuarantine* = object + data_columns*: + OrderedTable[(Eth2Digest, ColumnIndex), ref DataColumnSidecar] + onDataColumnSidecarCallback*: OnDataColumnSidecarCallback + + DataColumnFetchRecord* = object + block_root*: Eth2Digest + indices*: seq[ColumnIndex] + + OnDataColumnSidecarCallback = proc(data: DataColumnSidecar) {.gcsafe, raises: [].} + +func shortLog*(x: seq[ColumnIndex]): string = + "<" & x.mapIt($it).join(", ") & ">" + +func shortLog*(x: seq[DataColumnFetchRecord]): string = + "[" & x.mapIt(shortLog(it.block_root) & shortLog(it.indices)).join(", ") & "]" + +func put*(quarantine: var DataColumnQuarantine, dataColumnSidecar: ref DataColumnSidecar) = + if quarantine.data_columns.lenu64 >= MaxDataColumns: + # FIFO if full. For example, sync manager and request manager can race to + # put blobs in at the same time, so one gets blob insert -> block resolve + # -> blob insert sequence, which leaves garbage blobs. + # + # This also therefore automatically garbage-collects otherwise valid garbage + # blobs which are correctly signed, point to either correct block roots or a + # block root which isn't ever seen, and then are for any reason simply never + # used. + var oldest_column_key: (Eth2Digest, ColumnIndex) + for k in quarantine.data_columns.keys: + oldest_column_key = k + break + quarantine.data_columns.del(oldest_column_key) + let block_root = hash_tree_root(dataColumnSidecar.signed_block_header.message) + discard quarantine.data_columns.hasKeyOrPut( + (block_root, dataColumnSidecar.index), dataColumnSidecar) + +func hasDataColumn*( + quarantine: DataColumnQuarantine, + slot: Slot, + proposer_index: uint64, + index: ColumnIndex): bool = + for data_column_sidecar in quarantine.data_columns.values: + template block_header: untyped = data_column_sidecar.signed_block_header.message + if block_header.slot == slot and + block_header.proposer_index == proposer_index and + data_column_sidecar.index == index: + return true + false + diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index d1aae1933..fe6a9b035 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -15,8 +15,8 @@ import ../spec/datatypes/[altair, phase0, deneb, eip7594], ../consensus_object_pools/[ blob_quarantine, block_clearance, block_quarantine, blockchain_dag, - attestation_pool, light_client_pool, sync_committee_msg_pool, - validator_change_pool], + data_column_quarantine, attestation_pool, light_client_pool, + sync_committee_msg_pool, validator_change_pool], ../validators/validator_pool, ../beacon_clock, "."/[gossip_validation, block_processor, batch_validation], @@ -156,6 +156,8 @@ type blobQuarantine*: ref BlobQuarantine + dataColumnQuarantine*: ref DataColumnQuarantine + # Application-provided current time provider (to facilitate testing) getCurrentBeaconTime*: GetBeaconTimeFn @@ -345,7 +347,7 @@ proc processDataColumnSidecar*( debug "Data column received", delay let v = - self.dag.validateDataColumnSidecar(self.quarantine, self.blobQuarantine, + self.dag.validateDataColumnSidecar(self.quarantine, self.dataColumnQuarantine, dataColumnSidecar, wallTime, subnet_id) if v.isErr(): diff --git a/beacon_chain/gossip_processing/gossip_validation.nim b/beacon_chain/gossip_processing/gossip_validation.nim index 958380e04..49a807ad7 100644 --- a/beacon_chain/gossip_processing/gossip_validation.nim +++ b/beacon_chain/gossip_processing/gossip_validation.nim @@ -16,7 +16,7 @@ import beaconstate, state_transition_block, forks, helpers, network, signatures, eip7594_helpers], ../consensus_object_pools/[ attestation_pool, blockchain_dag, blob_quarantine, block_quarantine, - spec_cache, light_client_pool, sync_committee_msg_pool, + data_column_quarantine, spec_cache, light_client_pool, sync_committee_msg_pool, validator_change_pool], ".."/[beacon_clock], ./batch_validation @@ -490,7 +490,7 @@ proc validateBlobSidecar*( # https://github.com/ethereum/consensus-specs/blob/5f48840f4d768bf0e0a8156a3ed06ec333589007/specs/_features/eip7594/p2p-interface.md#the-gossip-domain-gossipsub proc validateDataColumnSidecar*( dag: ChainDAGRef, quarantine: ref Quarantine, - blobQuarantine: ref BlobQuarantine, data_column_sidecar: DataColumnSidecar, + dataColumnQuarantine: ref DataColumnQuarantine, data_column_sidecar: DataColumnSidecar, wallTime: BeaconTime, subnet_id: uint64): Result[void, ValidationError] = template block_header: untyped = data_column_sidecar.signed_block_header.message @@ -538,7 +538,7 @@ proc validateDataColumnSidecar*( let block_root = hash_tree_root(block_header) if dag.getBlockRef(block_root).isSome(): return errIgnore("DataColumnSidecar: already have block") - if blobQuarantine[].hasBlob( + if dataColumnQuarantine[].hasDataColumn( block_header.slot, block_header.proposer_index, data_column_sidecar.index): return errIgnore("DataColumnSidecar: already have valid data column from same proposer") diff --git a/beacon_chain/spec/datatypes/eip7594.nim b/beacon_chain/spec/datatypes/eip7594.nim index 660d8f900..9b2e7adee 100644 --- a/beacon_chain/spec/datatypes/eip7594.nim +++ b/beacon_chain/spec/datatypes/eip7594.nim @@ -9,6 +9,9 @@ import "."/[base, deneb], kzg4844 +from std/sequtils import mapIt +from std/strutils import join + export base const @@ -51,8 +54,8 @@ type DataColumnSidecar* = object index*: ColumnIndex # Index of column in extended matrix column*: DataColumn - kzg_commitments*: List[KzgCommitment, Limit(MAX_BLOB_COMMITMENTS_PER_BLOCK)] - kzg_proofs*: List[KzgProof, Limit(MAX_BLOB_COMMITMENTS_PER_BLOCK)] + kzg_commitments*: KzgCommitments + kzg_proofs*: KzgProofs signed_block_header*: SignedBeaconBlockHeader kzg_commitments_inclusion_proof*: array[KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH, Eth2Digest] @@ -74,4 +77,7 @@ func shortLog*(v: DataColumnSidecar): auto = kzg_commitments: v.kzg_commitments.len, kzg_proofs: v.kzg_proofs.len, block_header: shortLog(v.signed_block_header.message), - ) \ No newline at end of file + ) + +func shortLog*(x: seq[DataColumnIdentifier]): string = + "[" & x.mapIt(shortLog(it.block_root) & "/" & $it.index).join(", ") & "]" \ No newline at end of file diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim index 79767e553..c5851eade 100644 --- a/beacon_chain/sync/request_manager.nim +++ b/beacon_chain/sync/request_manager.nim @@ -11,10 +11,11 @@ import std/[sequtils, strutils] import chronos, chronicles import ../spec/datatypes/[phase0, deneb], - ../spec/[forks, network], + ../spec/[forks, network, eip7594_helpers], ../networking/eth2_network, ../consensus_object_pools/block_quarantine, ../consensus_object_pools/blob_quarantine, + ../consensus_object_pools/data_column_quarantine, "."/sync_protocol, "."/sync_manager, ../gossip_processing/block_processor @@ -49,6 +50,10 @@ type BlobLoaderFn* = proc( blobId: BlobIdentifier): Opt[ref BlobSidecar] {.gcsafe, raises: [].} + DataColumnLoaderFn* = proc( + columnId: DataColumnIdentifier + ): Opt[ref DataColumnSidecar] {.gcsafe, raises: [].} + InhibitFn* = proc: bool {.gcsafe, raises: [].} RequestManager* = object @@ -57,6 +62,7 @@ type inhibit: InhibitFn quarantine: ref Quarantine blobQuarantine: ref BlobQuarantine + dataColumnQuarantine: ref DataColumnQuarantine blockVerifier: BlockVerifierFn blockLoader: BlockLoaderFn blobLoader: BlobLoaderFn @@ -119,6 +125,23 @@ proc checkResponse(idList: seq[BlobIdentifier], return false true +proc checkResponse(colIdList: seq[DataColumnIdentifier], + columns: openArray[ref DataColumnSidecar]): bool = + if len(columns) > len(colIdList): + return false + for column in columns: + let block_root = hash_tree_root(column.signed_block_header.message) + var found = false + for id in colIdList: + if id.block_root == block_root and id.index == column.index: + found = true + break + if not found: + return false + column[].verify_data_column_sidecar_inclusion_proof().isOkOr: + return false + true + proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} = var peer: Peer try: @@ -232,6 +255,46 @@ proc fetchBlobsFromNetwork(self: RequestManager, if not(isNil(peer)): self.network.peerPool.release(peer) +proc fetchDataColumnsFromNetwork(rman: RequestManager, + colIdList: seq[DataColumnIdentifier]) + {.async: (raises: [CancelledError]).} = + var peer: Peer + + try: + peer = await rman.network.peerPool.acquire() + debug "Requesting data columns by root", peer = peer, columns = shortLog(colIdList), + peer_score = peer.getScore() + + let columns = await dataColumnSidecarsByRoot(peer, DataColumnIdentifierList colIdList) + + if columns.isOk: + let ucolumns = columns.get() + if not checkResponse(colIdList, ucolumns.asSeq()): + debug "Mismatched response to data columns by root", + peer = peer, columns = shortLog(colIdList), ucolumns = len(ucolumns) + peer.updateScore(PeerScoreBadResponse) + return + + for col in ucolumns: + rman.dataColumnQuarantine[].put(col) + var curRoot: Eth2Digest + for col in ucolumns: + let block_root = hash_tree_root(col.signed_block_header.message) + if block_root != curRoot: + curRoot = block_root + if (let o = rman.quarantine[].popColumnless(curRoot); o.isSome): + let col = o.unsafeGet() + discard await rman.blockVerifier(col, false) + + else: + debug "Data Columns by root request failed", + peer = peer, columns = shortLog(colIdList), err = columns.error() + peer.updateScore(PeerScoreNoValues) + + finally: + if not(isNil(peer)): + rman.network.peerPool.release(peer) + proc requestManagerBlockLoop( rman: RequestManager) {.async: (raises: [CancelledError]).} = while true: