From 5bf1e021a7c54974c23a17c7711d70a9bd18c10c Mon Sep 17 00:00:00 2001 From: Agnish Ghosh Date: Fri, 28 Jun 2024 14:53:08 +0530 Subject: [PATCH] initiate data column quarantine --- .../data_column_quarantine.nim | 3 +- .../gossip_processing/block_processor.nim | 2 + .../gossip_processing/eth2_processor.nim | 21 ++++- .../gossip_processing/gossip_validation.nim | 94 +++++++++---------- beacon_chain/nimbus_beacon_node.nim | 48 ++++++++-- beacon_chain/sync/request_manager.nim | 8 +- 6 files changed, 113 insertions(+), 63 deletions(-) diff --git a/beacon_chain/consensus_object_pools/data_column_quarantine.nim b/beacon_chain/consensus_object_pools/data_column_quarantine.nim index 5cc9f3a9d..32051f514 100644 --- a/beacon_chain/consensus_object_pools/data_column_quarantine.nim +++ b/beacon_chain/consensus_object_pools/data_column_quarantine.nim @@ -98,6 +98,5 @@ func dataColumnFetchRecord*(quarantine: DataColumnQuarantine, DataColumnFetchRecord(block_root: blck.root, indices: indices) func init*( - T: type DataColumnQuarantine, onDataColumnSidecarCallback: OnDataColumnSidecarCallback): T = - T(onDataColumnSidecarCallback: onDataColumnSidecarCallback) + T: type DataColumnQuarantine): T = T() diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index cf6b0bf6e..08028b13e 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -134,6 +134,7 @@ proc new*(T: type BlockProcessor, consensusManager: ref ConsensusManager, validatorMonitor: ref ValidatorMonitor, blobQuarantine: ref BlobQuarantine, + dataColumnQuarantine: ref DataColumnQuarantine, getBeaconTime: GetBeaconTimeFn): ref BlockProcessor = (ref BlockProcessor)( dumpEnabled: dumpEnabled, @@ -143,6 +144,7 @@ proc new*(T: type BlockProcessor, consensusManager: consensusManager, validatorMonitor: validatorMonitor, blobQuarantine: blobQuarantine, + dataColumnQuarantine: dataCOlumnQuarantine, getBeaconTime: getBeaconTime, verifier: BatchVerifier.init(rng, taskpool) ) diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 355a87701..3b37e208d 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -181,6 +181,7 @@ proc new*(T: type Eth2Processor, lightClientPool: ref LightClientPool, quarantine: ref Quarantine, blobQuarantine: ref BlobQuarantine, + dataColumnQuarantine: ref DataColumnQuarantine, rng: ref HmacDrbgContext, getBeaconTime: GetBeaconTimeFn, taskpool: TaskPoolPtr @@ -199,6 +200,7 @@ proc new*(T: type Eth2Processor, lightClientPool: lightClientPool, quarantine: quarantine, blobQuarantine: blobQuarantine, + dataColumnQuarantine: dataColumnQuarantine, getCurrentBeaconTime: getBeaconTime, batchCrypto: BatchCrypto.new( rng = rng, @@ -376,9 +378,24 @@ proc processDataColumnSidecar*( data_column_sidecars_dropped.inc(1, [$v.error[0]]) return v - debug "Data column validated" + debug "Data column validated, putting data column in quarantine" + self.dataColumnQuarantine[].put(newClone(dataColumnSidecar)) - # TODO do something with it! + let block_root = hash_tree_root(block_header) + if (let o = self.quarantine[].popColumnless(block_root); o.isSome): + let columnless = o.unsafeGet() + withBlck(columnless): + when consensusFork >= ConsensusFork.Deneb: + if self.dataColumnQuarantine[].hasDataColumns(forkyBlck): + self.blockProcessor[].enqueueBlock( + MsgSource.gossip, columnless, + Opt.none(BlobSidecars), + Opt.some(self.dataColumnQuarantine[].popDataColumns(block_root, forkyBlck))) + else: + discard self.quarantine[].addColumnless( + self.dag.finalizedHead.slot, forkyBlck) + else: + raiseAssert "Could not have been added as columnless" data_column_sidecars_received.inc() data_column_sidecar_delay.observe(delay.toFloatSeconds()) diff --git a/beacon_chain/gossip_processing/gossip_validation.nim b/beacon_chain/gossip_processing/gossip_validation.nim index 49a807ad7..a0cbb519a 100644 --- a/beacon_chain/gossip_processing/gossip_validation.nim +++ b/beacon_chain/gossip_processing/gossip_validation.nim @@ -518,20 +518,6 @@ proc validateDataColumnSidecar*( if not (block_header.slot > dag.finalizedHead.slot): return errIgnore("DataColumnSidecar: slot already finalized") - # [REJECT] The sidecar's `kzg_commitments` inclusion proof is valid as verified by - # `verify_data_column_sidecar_inclusion_proof(sidecar)`. - block: - let v = check_data_column_sidecar_inclusion_proof(data_column_sidecar) - if v.isErr: - return dag.checkedReject(v.error) - - # # [REJECT] The sidecar's column data is valid as - # # verified by `verify_data_column_kzg_proofs(sidecar)` - block: - let r = check_data_column_sidecar_kzg_proofs(data_column_sidecar) - if r.isErr: - return dag.checkedReject(r.error) - # [IGNORE] The sidecar is the first sidecar for the tuple # (block_header.slot, block_header.proposer_index, blob_sidecar.index) # with valid header signature, sidecar inclusion proof, and kzg proof. @@ -549,13 +535,13 @@ proc validateDataColumnSidecar*( # # [REJECT] The sidecar's block's parent (defined by # `block_header.parent_root`) passes validation. - # let parent = dag.getBlockRef(block_header.parent_root).valueOr: - # if block_header.parent_root in quarantine[].unviable: - # quarantine[].addUnviable(block_root) - # return dag.checkedReject("DataColumnSidecar: parent not validated") - # else: - # quarantine[].addMissing(block_header.parent_root) - # return errIgnore("DataColumnSidecar: parent not found") + let parent = dag.getBlockRef(block_header.parent_root).valueOr: + if block_header.parent_root in quarantine[].unviable: + quarantine[].addUnviable(block_root) + return dag.checkedReject("DataColumnSidecar: parent not validated") + else: + quarantine[].addMissing(block_header.parent_root) + return errIgnore("DataColumnSidecar: parent not found") # [REJECT] The sidecar is proposed by the expected `proposer_index` # for the block's slot in the context of the current shuffling @@ -564,23 +550,37 @@ proc validateDataColumnSidecar*( # shuffling, the sidecar MAY be queued for later processing while proposers # for the block's branch are calculated -- in such a case do not # REJECT, instead IGNORE this message. - # let proposer = getProposer(dag, parent, block_header.slot).valueOr: - # warn "cannot compute proposer for blob" - # return errIgnore("DataColumnSidecar: Cannot compute proposer") # internal issue + let proposer = getProposer(dag, parent, block_header.slot).valueOr: + warn "cannot compute proposer for blob" + return errIgnore("DataColumnSidecar: Cannot compute proposer") # internal issue - # if uint64(proposer) != block_header.proposer_index: - # return dag.checkedReject("DataColumnSidecar: Unexpected proposer") + if uint64(proposer) != block_header.proposer_index: + return dag.checkedReject("DataColumnSidecar: Unexpected proposer") # [REJECT] The proposer signature of `blob_sidecar.signed_block_header`, # is valid with respect to the `block_header.proposer_index` pubkey. - # if not verify_block_signature( - # dag.forkAtEpoch(block_header.slot.epoch), - # getStateField(dag.headState, genesis_validators_root), - # block_header.slot, - # block_root, - # dag.validatorKey(proposer).get(), - # data_column_sidecar.signed_block_header.signature): - # return dag.checkedReject("DataColumnSidecar: Invalid proposer signature") + if not verify_block_signature( + dag.forkAtEpoch(block_header.slot.epoch), + getStateField(dag.headState, genesis_validators_root), + block_header.slot, + block_root, + dag.validatorKey(proposer).get(), + data_column_sidecar.signed_block_header.signature): + return dag.checkedReject("DataColumnSidecar: Invalid proposer signature") + + # [REJECT] The sidecar's `kzg_commitments` inclusion proof is valid as verified by + # `verify_data_column_sidecar_inclusion_proof(sidecar)`. + block: + let v = check_data_column_sidecar_inclusion_proof(data_column_sidecar) + if v.isErr: + return dag.checkedReject(v.error) + + # # [REJECT] The sidecar's column data is valid as + # # verified by `verify_data_column_kzg_proofs(sidecar)` + block: + let r = check_data_column_sidecar_kzg_proofs(data_column_sidecar) + if r.isErr: + return dag.checkedReject(r.error) # # [REJECT] The sidecar is from a higher slot than the sidecar's # # block's parent (defined by `block_header.parent_root`). @@ -590,21 +590,21 @@ proc validateDataColumnSidecar*( # [REJECT] The current finalized_checkpoint is an ancestor of the sidecar's # block -- i.e. `get_checkpoint_block(store, block_header.parent_root, # store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root`. - # let - # finalized_checkpoint = getStateField(dag.headState, finalized_checkpoint) - # ancestor = get_ancestor(parent, finalized_checkpoint.epoch.start_slot) + let + finalized_checkpoint = getStateField(dag.headState, finalized_checkpoint) + ancestor = get_ancestor(parent, finalized_checkpoint.epoch.start_slot) - # if ancestor.isNil: - # # This shouldn't happen: we should always be able to trace the parent back - # # to the finalized checkpoint (else it wouldn't be in the DAG) - # return errIgnore("DataColumnSidecar: Can't find ancestor") + if ancestor.isNil: + # This shouldn't happen: we should always be able to trace the parent back + # to the finalized checkpoint (else it wouldn't be in the DAG) + return errIgnore("DataColumnSidecar: Can't find ancestor") - # if not ( - # finalized_checkpoint.root == ancestor.root or - # finalized_checkpoint.root.isZero): - # quarantine[].addUnviable(block_root) - # return dag.checkedReject( - # "DataColumnSidecar: Finalized checkpoint not an ancestor") + if not ( + finalized_checkpoint.root == ancestor.root or + finalized_checkpoint.root.isZero): + quarantine[].addUnviable(block_root) + return dag.checkedReject( + "DataColumnSidecar: Finalized checkpoint not an ancestor") ok() diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 07c480c74..8d90d1482 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -383,6 +383,7 @@ proc initFullNode( dag, attestationPool, onVoluntaryExitAdded, onBLSToExecutionChangeAdded, onProposerSlashingAdded, onAttesterSlashingAdded)) blobQuarantine = newClone(BlobQuarantine.init(onBlobSidecarAdded)) + dataColumnQuarantine = newClone(DataColumnQuarantine.init()) consensusManager = ConsensusManager.new( dag, attestationPool, quarantine, node.elManager, ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets), @@ -391,7 +392,7 @@ proc initFullNode( blockProcessor = BlockProcessor.new( config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, rng, taskpool, consensusManager, node.validatorMonitor, - blobQuarantine, getBeaconTime) + blobQuarantine, dataColumnQuarantine, getBeaconTime) blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], data_columns: Opt[DataColumnSidecars], maybeFinalized: bool): @@ -406,18 +407,36 @@ proc initFullNode( maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = withBlck(signedBlock): + # when consensusFork >= ConsensusFork.Deneb: + # if not blobQuarantine[].hasBlobs(forkyBlck): + # # We don't have all the blobs for this block, so we have + # # to put it in blobless quarantine. + # if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck): + # err(VerifierError.UnviableFork) + # else: + # err(VerifierError.MissingParent) + # else: + # let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck) + # await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + # Opt.some(blobs), Opt.none(DataColumnSidecars), + # maybeFinalized = maybeFinalized) + # else: + # await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + # Opt.none(BlobSidecars), Opt.none(DataColumnSidecars), + # maybeFinalized = maybeFinalized) + when consensusFork >= ConsensusFork.Deneb: - if not blobQuarantine[].hasBlobs(forkyBlck): - # We don't have all the blobs for this block, so we have - # to put it in blobless quarantine. - if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck): + if not dataColumnQuarantine[].hasDataColumns(forkyBlck): + # We don't have all the data columns for this block, so we have + # to put it in columnless quarantine. + if not quarantine[].addColumnless(dag.finalizedHead.slot, forkyBlck): err(VerifierError.UnviableFork) else: err(VerifierError.MissingParent) else: - let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck) + let data_columns = dataColumnQuarantine[].popDataColumns(forkyBlck.root, forkyBlck) await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, - Opt.some(blobs), Opt.none(DataColumnSidecars), + Opt.none(BlobSidecars), Opt.some(data_columns), maybeFinalized = maybeFinalized) else: await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, @@ -435,11 +454,20 @@ proc initFullNode( else: Opt.none(ref BlobSidecar) + rmanDataColumnLoader = proc( + columnId: DataColumnIdentifier): Opt[ref DataColumnSidecar] = + var data_column_sidecar = DataColumnSidecar.new() + if dag.db.getDataColumnSidecar(columnId.block_root, columnId.index, data_column_sidecar[]): + Opt.some data_column_sidecar + else: + Opt.none(ref DataColumnSidecar) + processor = Eth2Processor.new( config.doppelgangerDetection, blockProcessor, node.validatorMonitor, dag, attestationPool, validatorChangePool, node.attachedValidators, syncCommitteeMsgPool, - lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool) + lightClientPool, quarantine, blobQuarantine, dataColumnQuarantine, + rng, getBeaconTime, taskpool) syncManager = newSyncManager[Peer, PeerId]( node.network.peerPool, dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, @@ -459,8 +487,8 @@ proc initFullNode( requestManager = RequestManager.init( node.network, dag.cfg.DENEB_FORK_EPOCH, getBeaconTime, (proc(): bool = syncManager.inProgress), - quarantine, blobQuarantine, rmanBlockVerifier, - rmanBlockLoader, rmanBlobLoader) + quarantine, blobQuarantine, dataColumnQuarantine, rmanBlockVerifier, + rmanBlockLoader, rmanBlobLoader, rmanDataColumnLoader) if node.config.lightClientDataServe: proc scheduleSendingLightClientUpdates(slot: Slot) = diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim index dcf34acc8..3b6badb0b 100644 --- a/beacon_chain/sync/request_manager.nim +++ b/beacon_chain/sync/request_manager.nim @@ -85,18 +85,22 @@ proc init*(T: type RequestManager, network: Eth2Node, inhibit: InhibitFn, quarantine: ref Quarantine, blobQuarantine: ref BlobQuarantine, + dataColumnQuarantine: ref DataColumnQuarantine, blockVerifier: BlockVerifierFn, blockLoader: BlockLoaderFn = nil, - blobLoader: BlobLoaderFn = nil): RequestManager = + blobLoader: BlobLoaderFn = nil, + dataColumnLoader: DataColumnLoaderFn = nil): RequestManager = RequestManager( network: network, getBeaconTime: getBeaconTime, inhibit: inhibit, quarantine: quarantine, blobQuarantine: blobQuarantine, + dataColumnQuarantine: dataColumnQuarantine, blockVerifier: blockVerifier, blockLoader: blockLoader, - blobLoader: blobLoader) + blobLoader: blobLoader, + dataColumnLoader: dataColumnLoader) proc checkResponse(roots: openArray[Eth2Digest], blocks: openArray[ref ForkedSignedBeaconBlock]): bool =