initiate data column quarantine

Agnish Ghosh 2024-06-28 14:53:08 +05:30
parent 3db92f8b26
commit 5bf1e021a7
No known key found for this signature in database
GPG Key ID: 7E927C221EBA4F6E
6 changed files with 113 additions and 63 deletions

View File

@ -98,6 +98,5 @@ func dataColumnFetchRecord*(quarantine: DataColumnQuarantine,
DataColumnFetchRecord(block_root: blck.root, indices: indices)
func init*(
T: type DataColumnQuarantine, onDataColumnSidecarCallback: OnDataColumnSidecarCallback): T =
T(onDataColumnSidecarCallback: onDataColumnSidecarCallback)
T: type DataColumnQuarantine): T = T()
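The hunk above drops the onDataColumnSidecarCallback parameter, so the quarantine is now built with a plain default constructor. A minimal standalone sketch of the resulting call shape (toy type, not the real data column quarantine module):

type
  ToyQuarantine = object
    sidecars: seq[string]

func init(T: type ToyQuarantine): T =
  # no callback parameter any more; just a default-initialized object,
  # mirroring `T: type DataColumnQuarantine): T = T()`
  T()

when isMainModule:
  let quarantine = ToyQuarantine.init()
  doAssert quarantine.sidecars.len == 0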

View File

@ -134,6 +134,7 @@ proc new*(T: type BlockProcessor,
consensusManager: ref ConsensusManager,
validatorMonitor: ref ValidatorMonitor,
blobQuarantine: ref BlobQuarantine,
dataColumnQuarantine: ref DataColumnQuarantine,
getBeaconTime: GetBeaconTimeFn): ref BlockProcessor =
(ref BlockProcessor)(
dumpEnabled: dumpEnabled,
@ -143,6 +144,7 @@ proc new*(T: type BlockProcessor,
consensusManager: consensusManager,
validatorMonitor: validatorMonitor,
blobQuarantine: blobQuarantine,
dataColumnQuarantine: dataColumnQuarantine,
getBeaconTime: getBeaconTime,
verifier: BatchVerifier.init(rng, taskpool)
)

View File

@ -181,6 +181,7 @@ proc new*(T: type Eth2Processor,
lightClientPool: ref LightClientPool,
quarantine: ref Quarantine,
blobQuarantine: ref BlobQuarantine,
dataColumnQuarantine: ref DataColumnQuarantine,
rng: ref HmacDrbgContext,
getBeaconTime: GetBeaconTimeFn,
taskpool: TaskPoolPtr
@ -199,6 +200,7 @@ proc new*(T: type Eth2Processor,
lightClientPool: lightClientPool,
quarantine: quarantine,
blobQuarantine: blobQuarantine,
dataColumnQuarantine: dataColumnQuarantine,
getCurrentBeaconTime: getBeaconTime,
batchCrypto: BatchCrypto.new(
rng = rng,
@ -376,9 +378,24 @@ proc processDataColumnSidecar*(
data_column_sidecars_dropped.inc(1, [$v.error[0]])
return v
debug "Data column validated"
debug "Data column validated, putting data column in quarantine"
self.dataColumnQuarantine[].put(newClone(dataColumnSidecar))
# TODO do something with it!
let block_root = hash_tree_root(block_header)
if (let o = self.quarantine[].popColumnless(block_root); o.isSome):
let columnless = o.unsafeGet()
withBlck(columnless):
when consensusFork >= ConsensusFork.Deneb:
if self.dataColumnQuarantine[].hasDataColumns(forkyBlck):
self.blockProcessor[].enqueueBlock(
MsgSource.gossip, columnless,
Opt.none(BlobSidecars),
Opt.some(self.dataColumnQuarantine[].popDataColumns(block_root, forkyBlck)))
else:
discard self.quarantine[].addColumnless(
self.dag.finalizedHead.slot, forkyBlck)
else:
raiseAssert "Could not have been added as columnless"
data_column_sidecars_received.inc()
data_column_sidecar_delay.observe(delay.toFloatSeconds())
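The added branch in processDataColumnSidecar follows a quarantine-until-complete pattern: store the incoming sidecar, pop any block that was parked as columnless, and either enqueue it once every expected column is present or park it again. A rough standalone sketch of that control flow, using only std/tables (toy types and counts, not the nimbus-eth2 DataColumnQuarantine API):

import std/tables

type
  ToyColumnStore = object
    # block root -> column indices received so far
    received: Table[string, seq[int]]

proc put(store: var ToyColumnStore, root: string, index: int) =
  store.received.mgetOrPut(root, newSeq[int]()).add(index)

proc hasAllColumns(store: ToyColumnStore, root: string, expected: int): bool =
  store.received.getOrDefault(root).len >= expected

proc popColumns(store: var ToyColumnStore, root: string): seq[int] =
  result = store.received.getOrDefault(root)
  store.received.del root

when isMainModule:
  var store: ToyColumnStore
  let root = "0xabc"
  for idx in 0 ..< 4:
    store.put(root, idx)
    if store.hasAllColumns(root, expected = 4):
      # analogous to enqueueBlock(..., Opt.some(popDataColumns(...)))
      echo "complete, enqueue with columns ", store.popColumns(root)
    else:
      # analogous to re-adding the block via addColumnless
      echo "still waiting after column ", idx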

View File

@ -518,20 +518,6 @@ proc validateDataColumnSidecar*(
if not (block_header.slot > dag.finalizedHead.slot):
return errIgnore("DataColumnSidecar: slot already finalized")
# [REJECT] The sidecar's `kzg_commitments` inclusion proof is valid as verified by
# `verify_data_column_sidecar_inclusion_proof(sidecar)`.
block:
let v = check_data_column_sidecar_inclusion_proof(data_column_sidecar)
if v.isErr:
return dag.checkedReject(v.error)
# # [REJECT] The sidecar's column data is valid as
# # verified by `verify_data_column_kzg_proofs(sidecar)`
block:
let r = check_data_column_sidecar_kzg_proofs(data_column_sidecar)
if r.isErr:
return dag.checkedReject(r.error)
# [IGNORE] The sidecar is the first sidecar for the tuple
# (block_header.slot, block_header.proposer_index, blob_sidecar.index)
# with valid header signature, sidecar inclusion proof, and kzg proof.
@ -549,13 +535,13 @@ proc validateDataColumnSidecar*(
#
# [REJECT] The sidecar's block's parent (defined by
# `block_header.parent_root`) passes validation.
# let parent = dag.getBlockRef(block_header.parent_root).valueOr:
# if block_header.parent_root in quarantine[].unviable:
# quarantine[].addUnviable(block_root)
# return dag.checkedReject("DataColumnSidecar: parent not validated")
# else:
# quarantine[].addMissing(block_header.parent_root)
# return errIgnore("DataColumnSidecar: parent not found")
let parent = dag.getBlockRef(block_header.parent_root).valueOr:
if block_header.parent_root in quarantine[].unviable:
quarantine[].addUnviable(block_root)
return dag.checkedReject("DataColumnSidecar: parent not validated")
else:
quarantine[].addMissing(block_header.parent_root)
return errIgnore("DataColumnSidecar: parent not found")
# [REJECT] The sidecar is proposed by the expected `proposer_index`
# for the block's slot in the context of the current shuffling
@ -564,23 +550,37 @@ proc validateDataColumnSidecar*(
# shuffling, the sidecar MAY be queued for later processing while proposers
# for the block's branch are calculated -- in such a case do not
# REJECT, instead IGNORE this message.
# let proposer = getProposer(dag, parent, block_header.slot).valueOr:
# warn "cannot compute proposer for blob"
# return errIgnore("DataColumnSidecar: Cannot compute proposer") # internal issue
let proposer = getProposer(dag, parent, block_header.slot).valueOr:
warn "cannot compute proposer for blob"
return errIgnore("DataColumnSidecar: Cannot compute proposer") # internal issue
# if uint64(proposer) != block_header.proposer_index:
# return dag.checkedReject("DataColumnSidecar: Unexpected proposer")
if uint64(proposer) != block_header.proposer_index:
return dag.checkedReject("DataColumnSidecar: Unexpected proposer")
# [REJECT] The proposer signature of `blob_sidecar.signed_block_header`,
# is valid with respect to the `block_header.proposer_index` pubkey.
# if not verify_block_signature(
# dag.forkAtEpoch(block_header.slot.epoch),
# getStateField(dag.headState, genesis_validators_root),
# block_header.slot,
# block_root,
# dag.validatorKey(proposer).get(),
# data_column_sidecar.signed_block_header.signature):
# return dag.checkedReject("DataColumnSidecar: Invalid proposer signature")
if not verify_block_signature(
dag.forkAtEpoch(block_header.slot.epoch),
getStateField(dag.headState, genesis_validators_root),
block_header.slot,
block_root,
dag.validatorKey(proposer).get(),
data_column_sidecar.signed_block_header.signature):
return dag.checkedReject("DataColumnSidecar: Invalid proposer signature")
# [REJECT] The sidecar's `kzg_commitments` inclusion proof is valid as verified by
# `verify_data_column_sidecar_inclusion_proof(sidecar)`.
block:
let v = check_data_column_sidecar_inclusion_proof(data_column_sidecar)
if v.isErr:
return dag.checkedReject(v.error)
# # [REJECT] The sidecar's column data is valid as
# # verified by `verify_data_column_kzg_proofs(sidecar)`
block:
let r = check_data_column_sidecar_kzg_proofs(data_column_sidecar)
if r.isErr:
return dag.checkedReject(r.error)
# # [REJECT] The sidecar is from a higher slot than the sidecar's
# # block's parent (defined by `block_header.parent_root`).
@ -590,21 +590,21 @@ proc validateDataColumnSidecar*(
# [REJECT] The current finalized_checkpoint is an ancestor of the sidecar's
# block -- i.e. `get_checkpoint_block(store, block_header.parent_root,
# store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root`.
# let
# finalized_checkpoint = getStateField(dag.headState, finalized_checkpoint)
# ancestor = get_ancestor(parent, finalized_checkpoint.epoch.start_slot)
let
finalized_checkpoint = getStateField(dag.headState, finalized_checkpoint)
ancestor = get_ancestor(parent, finalized_checkpoint.epoch.start_slot)
# if ancestor.isNil:
# # This shouldn't happen: we should always be able to trace the parent back
# # to the finalized checkpoint (else it wouldn't be in the DAG)
# return errIgnore("DataColumnSidecar: Can't find ancestor")
if ancestor.isNil:
# This shouldn't happen: we should always be able to trace the parent back
# to the finalized checkpoint (else it wouldn't be in the DAG)
return errIgnore("DataColumnSidecar: Can't find ancestor")
# if not (
# finalized_checkpoint.root == ancestor.root or
# finalized_checkpoint.root.isZero):
# quarantine[].addUnviable(block_root)
# return dag.checkedReject(
# "DataColumnSidecar: Finalized checkpoint not an ancestor")
if not (
finalized_checkpoint.root == ancestor.root or
finalized_checkpoint.root.isZero):
quarantine[].addUnviable(block_root)
return dag.checkedReject(
"DataColumnSidecar: Finalized checkpoint not an ancestor")
ok()
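The checks above follow the Result-based early-return style used for gossip validation: [IGNORE] conditions return an error the caller can drop silently, [REJECT] conditions return one that penalises the sender, and only a fully checked sidecar reaches ok(). A compressed standalone sketch of that shape, assuming the nim-results package that nimbus-eth2 already depends on; Verdict, errIgnore and checkedReject here are local stand-ins, not the real helpers:

import results   # nim-results: Result, ok, err, as used throughout nimbus-eth2

type
  Verdict = enum
    vIgnore   # drop quietly, may retry later
    vReject   # penalise the peer, never retry

proc errIgnore(msg: string): Result[void, (Verdict, string)] =
  err((vIgnore, msg))

proc checkedReject(msg: string): Result[void, (Verdict, string)] =
  err((vReject, msg))

proc validateToy(slot, finalizedSlot, proposer,
                 expectedProposer: uint64): Result[void, (Verdict, string)] =
  # [IGNORE] the slot must not already be finalized
  if not (slot > finalizedSlot):
    return errIgnore("slot already finalized")
  # [REJECT] the proposer must match the expected proposer for the slot
  if proposer != expectedProposer:
    return checkedReject("unexpected proposer")
  ok()

when isMainModule:
  doAssert validateToy(10, 5, 1, 1).isOk
  doAssert validateToy(10, 5, 2, 1).error[0] == vReject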

View File

@ -383,6 +383,7 @@ proc initFullNode(
dag, attestationPool, onVoluntaryExitAdded, onBLSToExecutionChangeAdded,
onProposerSlashingAdded, onAttesterSlashingAdded))
blobQuarantine = newClone(BlobQuarantine.init(onBlobSidecarAdded))
dataColumnQuarantine = newClone(DataColumnQuarantine.init())
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, node.elManager,
ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets),
@ -391,7 +392,7 @@ proc initFullNode(
blockProcessor = BlockProcessor.new(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
rng, taskpool, consensusManager, node.validatorMonitor,
blobQuarantine, getBeaconTime)
blobQuarantine, dataColumnQuarantine, getBeaconTime)
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], data_columns: Opt[DataColumnSidecars],
maybeFinalized: bool):
@ -406,18 +407,36 @@ proc initFullNode(
maybeFinalized: bool):
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
withBlck(signedBlock):
# when consensusFork >= ConsensusFork.Deneb:
# if not blobQuarantine[].hasBlobs(forkyBlck):
# # We don't have all the blobs for this block, so we have
# # to put it in blobless quarantine.
# if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
# err(VerifierError.UnviableFork)
# else:
# err(VerifierError.MissingParent)
# else:
# let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
# await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
# Opt.some(blobs), Opt.none(DataColumnSidecars),
# maybeFinalized = maybeFinalized)
# else:
# await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
# Opt.none(BlobSidecars), Opt.none(DataColumnSidecars),
# maybeFinalized = maybeFinalized)
when consensusFork >= ConsensusFork.Deneb:
if not blobQuarantine[].hasBlobs(forkyBlck):
# We don't have all the blobs for this block, so we have
# to put it in blobless quarantine.
if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
if not dataColumnQuarantine[].hasDataColumns(forkyBlck):
# We don't have all the data columns for this block, so we have
# to put it in columnless quarantine.
if not quarantine[].addColumnless(dag.finalizedHead.slot, forkyBlck):
err(VerifierError.UnviableFork)
else:
err(VerifierError.MissingParent)
else:
let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
let data_columns = dataColumnQuarantine[].popDataColumns(forkyBlck.root, forkyBlck)
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.some(blobs), Opt.none(DataColumnSidecars),
Opt.none(BlobSidecars), Opt.some(data_columns),
maybeFinalized = maybeFinalized)
else:
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
@ -435,11 +454,20 @@ proc initFullNode(
else:
Opt.none(ref BlobSidecar)
rmanDataColumnLoader = proc(
columnId: DataColumnIdentifier): Opt[ref DataColumnSidecar] =
var data_column_sidecar = DataColumnSidecar.new()
if dag.db.getDataColumnSidecar(columnId.block_root, columnId.index, data_column_sidecar[]):
Opt.some data_column_sidecar
else:
Opt.none(ref DataColumnSidecar)
processor = Eth2Processor.new(
config.doppelgangerDetection,
blockProcessor, node.validatorMonitor, dag, attestationPool,
validatorChangePool, node.attachedValidators, syncCommitteeMsgPool,
lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool)
lightClientPool, quarantine, blobQuarantine, dataColumnQuarantine,
rng, getBeaconTime, taskpool)
syncManager = newSyncManager[Peer, PeerId](
node.network.peerPool,
dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS,
@ -459,8 +487,8 @@ proc initFullNode(
requestManager = RequestManager.init(
node.network, dag.cfg.DENEB_FORK_EPOCH, getBeaconTime,
(proc(): bool = syncManager.inProgress),
quarantine, blobQuarantine, rmanBlockVerifier,
rmanBlockLoader, rmanBlobLoader)
quarantine, blobQuarantine, dataColumnQuarantine, rmanBlockVerifier,
rmanBlockLoader, rmanBlobLoader, rmanDataColumnLoader)
if node.config.lightClientDataServe:
proc scheduleSendingLightClientUpdates(slot: Slot) =
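rmanDataColumnLoader above wires the request manager to the on-disk store through a closure that returns either the found sidecar or none. A rough standalone sketch of that loader-closure pattern (std/options and a plain Table stand in for the real Opt type and database; names are illustrative only):

import std/[options, tables]

# toy "database": block root (hex string) -> serialized column bytes
type ColumnDb = Table[string, string]

proc makeColumnLoader(db: ColumnDb): proc(root: string): Option[string] =
  # the returned closure either finds the stored column or reports none,
  # mirroring the Opt.some / Opt.none split in rmanDataColumnLoader
  result = proc(root: string): Option[string] =
    if root in db:
      some(db[root])
    else:
      none(string)

when isMainModule:
  var db: ColumnDb
  db["0xabc"] = "column-bytes"
  let loader = makeColumnLoader(db)
  doAssert loader("0xabc").isSome
  doAssert loader("0xdef").isNone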

View File

@ -85,18 +85,22 @@ proc init*(T: type RequestManager, network: Eth2Node,
inhibit: InhibitFn,
quarantine: ref Quarantine,
blobQuarantine: ref BlobQuarantine,
dataColumnQuarantine: ref DataColumnQuarantine,
blockVerifier: BlockVerifierFn,
blockLoader: BlockLoaderFn = nil,
blobLoader: BlobLoaderFn = nil): RequestManager =
blobLoader: BlobLoaderFn = nil,
dataColumnLoader: DataColumnLoaderFn = nil): RequestManager =
RequestManager(
network: network,
getBeaconTime: getBeaconTime,
inhibit: inhibit,
quarantine: quarantine,
blobQuarantine: blobQuarantine,
dataColumnQuarantine: dataColumnQuarantine,
blockVerifier: blockVerifier,
blockLoader: blockLoader,
blobLoader: blobLoader)
blobLoader: blobLoader,
dataColumnLoader: dataColumnLoader)
proc checkResponse(roots: openArray[Eth2Digest],
blocks: openArray[ref ForkedSignedBeaconBlock]): bool =
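The extended init keeps the loader arguments optional (blockLoader, blobLoader and dataColumnLoader all default to nil), so a RequestManager can still be constructed without a backing store. A small standalone sketch of that optional-callback pattern (hypothetical names, not the real RequestManager fields):

type LoaderFn = proc(root: string): string

proc fetch(root: string, loader: LoaderFn = nil): string =
  # fall back when no loader was supplied, otherwise ask the store
  if loader.isNil:
    "fallback: request " & root & " from the network"
  else:
    loader(root)

when isMainModule:
  let fromDb = proc(root: string): string = "from-db:" & root
  doAssert fetch("0xabc") == "fallback: request 0xabc from the network"
  doAssert fetch("0xabc", fromDb) == "from-db:0xabc"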