From 791d2fb0d15dd60c232bb41aa0e898e00c2c80c1 Mon Sep 17 00:00:00 2001
From: Agnish Ghosh
Date: Mon, 24 Jun 2024 17:32:06 +0530
Subject: [PATCH] add: forward and backward syncing for data columns,
 broadcasting data columns created from blobs, added dc support to sync_queue

---
 .../block_quarantine.nim                      | 36 +++++++++
 .../data_column_quarantine.nim                |  2 +-
 .../gossip_processing/block_processor.nim     | 80 +++++++++++++++----
 .../gossip_processing/eth2_processor.nim      | 17 +++-
 beacon_chain/networking/eth2_network.nim      |  8 ++
 beacon_chain/nimbus_beacon_node.nim           | 13 ++-
 beacon_chain/spec/datatypes/eip7594.nim       |  5 ++
 beacon_chain/spec/eip7594_helpers.nim         | 12 +--
 beacon_chain/sync/sync_queue.nim              | 55 ++++++++++++-
 beacon_chain/validators/message_router.nim    | 27 ++++++-
 10 files changed, 223 insertions(+), 32 deletions(-)

diff --git a/beacon_chain/consensus_object_pools/block_quarantine.nim b/beacon_chain/consensus_object_pools/block_quarantine.nim
index 49d84c383..868db2342 100644
--- a/beacon_chain/consensus_object_pools/block_quarantine.nim
+++ b/beacon_chain/consensus_object_pools/block_quarantine.nim
@@ -24,6 +24,7 @@ const
     ## Enough for finalization in an alternative fork
   MaxBlobless = SLOTS_PER_EPOCH
     ## Arbitrary
+  MaxColumnless = SLOTS_PER_EPOCH
   MaxUnviables = 16 * 1024
     ## About a day of blocks - most likely not needed but it's quite cheap..
 
@@ -238,6 +239,18 @@ func cleanupBlobless(quarantine: var Quarantine, finalizedSlot: Slot) =
       quarantine.addUnviable k
     quarantine.blobless.del k
 
+func cleanupColumnless(quarantine: var Quarantine, finalizedSlot: Slot) =
+  var toDel: seq[Eth2Digest]
+
+  for k,v in quarantine.columnless:
+    withBlck(v):
+      if not isViable(finalizedSlot, forkyBlck.message.slot):
+        toDel.add k
+
+  for k in toDel:
+    quarantine.addUnviable k
+    quarantine.columnless.del k
+
 func clearAfterReorg*(quarantine: var Quarantine) =
   ## Clear missing and orphans to start with a fresh slate in case of a reorg
   ## Unviables remain unviable and are not cleared.
@@ -339,6 +352,29 @@ iterator peekBlobless*(quarantine: var Quarantine): ForkedSignedBeaconBlock =
   for k, v in quarantine.blobless.mpairs():
     yield v
 
+proc addColumnless*(
+    quarantine: var Quarantine, finalizedSlot: Slot,
+    signedBlock: deneb.SignedBeaconBlock | electra.SignedBeaconBlock): bool =
+
+  if not isViable(finalizedSlot, signedBlock.message.slot):
+    quarantine.addUnviable(signedBlock.root)
+    return false
+
+  quarantine.cleanupColumnless(finalizedSlot)
+
+  if quarantine.columnless.lenu64 >= MaxColumnless:
+    var oldest_columnless_key: Eth2Digest
+    for k in quarantine.columnless.keys:
+      oldest_columnless_key = k
+      break
+    quarantine.columnless.del oldest_columnless_key
+
+  debug "block quarantine: Adding columnless", blck = shortLog(signedBlock)
+  quarantine.columnless[signedBlock.root] =
+    ForkedSignedBeaconBlock.init(signedBlock)
+  quarantine.missing.del(signedBlock.root)
+  true
+
 func popColumnless*(
     quarantine: var Quarantine, root: Eth2Digest): Opt[ForkedSignedBeaconBlock] =
diff --git a/beacon_chain/consensus_object_pools/data_column_quarantine.nim b/beacon_chain/consensus_object_pools/data_column_quarantine.nim
index 16d233a4a..5cc9f3a9d 100644
--- a/beacon_chain/consensus_object_pools/data_column_quarantine.nim
+++ b/beacon_chain/consensus_object_pools/data_column_quarantine.nim
@@ -78,7 +78,7 @@ func popDataColumns*(
     var c: ref DataColumnSidecar
     if quarantine.data_columns.pop((digest, ColumnIndex idx), c):
       r.add(c)
-  true
+  r
 
 func hasDataColumns*(quarantine: DataColumnQuarantine,
     blck: deneb.SignedBeaconBlock | electra.SignedBeaconBlock): bool =
diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim
index 082031211..94c4037f1 100644
--- a/beacon_chain/gossip_processing/block_processor.nim
+++ b/beacon_chain/gossip_processing/block_processor.nim
@@ -10,7 +10,7 @@ import
   stew/results, chronicles, chronos, metrics,
-  ../spec/[forks, signatures, signatures_batch],
+  ../spec/[forks, signatures, signatures_batch, eip7594_helpers],
   ../sszdump
 
 from std/deques import Deque, addLast, contains, initDeque, items, len, shrink
@@ -27,13 +27,16 @@ from ../consensus_object_pools/block_dag import BlockRef, root, shortLog, slot
 from ../consensus_object_pools/block_pools_types import EpochRef, VerifierError
 from ../consensus_object_pools/block_quarantine import
-  addBlobless, addOrphan, addUnviable, pop, removeOrphan
+  addBlobless, addOrphan, addUnviable, pop, removeOrphan, addColumnless
 from ../consensus_object_pools/blob_quarantine import
   BlobQuarantine, hasBlobs, popBlobs, put
+from ../consensus_object_pools/data_column_quarantine import
+  DataColumnQuarantine, hasDataColumns, popDataColumns, put
 from ../validators/validator_monitor import
   MsgSource, ValidatorMonitor, registerAttestationInBlock, registerBeaconBlock,
   registerSyncAggregateInBlock
-from ../beacon_chain_db import getBlobSidecar, putBlobSidecar
+from ../beacon_chain_db import getBlobSidecar, putBlobSidecar,
+  getDataColumnSidecar, putDataColumnSidecar
 from ../spec/state_transition_block import validate_blobs
 
 export sszdump, signatures_batch
@@ -58,6 +61,7 @@ type
   BlockEntry = object
     blck*: ForkedSignedBeaconBlock
     blobs*: Opt[BlobSidecars]
+    data_columns*: Opt[DataColumnSidecars]
     maybeFinalized*: bool
       ## The block source claims the block has been finalized already
     resfut*: Future[Result[void, VerifierError]].Raising([CancelledError])
@@ -102,6 +106,7 @@ type
     getBeaconTime: GetBeaconTimeFn
 
     blobQuarantine: ref BlobQuarantine
+    dataColumnQuarantine: ref DataColumnQuarantine
     verifier: BatchVerifier
     lastPayload: Slot
 
@@ -174,7 +179,8 @@ from ../consensus_object_pools/block_clearance import
 proc storeBackfillBlock(
     self: var BlockProcessor, signedBlock: ForkySignedBeaconBlock,
-    blobsOpt: Opt[BlobSidecars]): Result[void, VerifierError] =
+    blobsOpt: Opt[BlobSidecars],
+    dataColumnsOpt: Opt[DataColumnSidecars]): Result[void, VerifierError] =
   # The block is certainly not missing any more
   self.consensusManager.quarantine[].missing.del(signedBlock.root)
 
@@ -182,6 +188,7 @@ proc storeBackfillBlock(
   # Establish blob viability before calling addbackfillBlock to avoid
   # writing the block in case of blob error.
   var blobsOk = true
+  var columnsOk = true
   when typeof(signedBlock).kind >= ConsensusFork.Deneb:
     if blobsOpt.isSome:
       let blobs = blobsOpt.get()
 
@@ -201,6 +208,24 @@ proc storeBackfillBlock(
   if not blobsOk:
     return err(VerifierError.Invalid)
+
+  when typeof(signedBlock).kind >= ConsensusFork.Deneb:
+    if dataColumnsOpt.isSome:
+      let data_columns = dataColumnsOpt.get()
+      if data_columns.len > 0:
+        for i in 0..= ConsensusFork.Deneb:
+  var data_column_sidecars: DataColumnSidecars
+  for i in 0..= ConsensusFork.Deneb:
+    if self.dataColumnQuarantine[].hasDataColumns(signedBlock):
+      Opt.some(self.dataColumnQuarantine[].popDataColumns(signedBlock.root, signedBlock))
+    else:
+      discard self.quarantine[].addColumnless(self.dag.finalizedHead.slot,
+        signedBlock)
+      return v
+  else:
+    Opt.none(DataColumnSidecars)
+
   self.blockProcessor[].enqueueBlock(
     src, ForkedSignedBeaconBlock.init(signedBlock), blobs,
+    data_columns,
     maybeFinalized = maybeFinalized,
     validationDur = nanoseconds(
       (self.getCurrentBeaconTime() - wallTime).nanoseconds))
@@ -317,7 +329,8 @@ proc processBlobSidecar*(
     if self.blobQuarantine[].hasBlobs(forkyBlck):
       self.blockProcessor[].enqueueBlock(
         MsgSource.gossip, blobless,
-        Opt.some(self.blobQuarantine[].popBlobs(block_root, forkyBlck)))
+        Opt.some(self.blobQuarantine[].popBlobs(block_root, forkyBlck)),
+        Opt.none(DataColumnSidecars))
     else:
       discard self.quarantine[].addBlobless(
         self.dag.finalizedHead.slot, forkyBlck)
@@ -352,7 +365,7 @@ proc processDataColumnSidecar*(
   if v.isErr():
     debug "Dropping data column", error = v.error()
-    blob_sidecars_dropped.inc(1, [$v.error[0]])
+    data_column_sidecars_dropped.inc(1, [$v.error[0]])
     return v
 
   debug "Data column validated"
diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim
index 4779ca1be..94dc52ee8 100644
--- a/beacon_chain/networking/eth2_network.nim
+++ b/beacon_chain/networking/eth2_network.nim
@@ -2663,6 +2663,14 @@ proc broadcastBlobSidecar*(
     topic = getBlobSidecarTopic(forkPrefix, subnet_id)
   node.broadcast(topic, blob)
 
+proc broadcastDataColumnSidecar*(
+    node: Eth2Node, subnet_id: uint64, data_column: DataColumnSidecar):
+    Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
+  let
+    forkPrefix = node.forkDigestAtEpoch(node.getWallEpoch)
+    topic = getDataColumnSidecarTopic(forkPrefix, subnet_id)
+  node.broadcast(topic, data_column)
+
 proc broadcastSyncCommitteeMessage*(
     node: Eth2Node, msg: SyncCommitteeMessage,
     subcommitteeIdx: SyncSubcommitteeIndex):
diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim
index feb8f9e9d..c55ad9005 100644
--- a/beacon_chain/nimbus_beacon_node.nim
+++ b/beacon_chain/nimbus_beacon_node.nim
@@ -393,14 +393,15 @@ proc initFullNode(
       rng, taskpool, consensusManager, node.validatorMonitor, blobQuarantine, getBeaconTime)
     blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
-        blobs: Opt[BlobSidecars], maybeFinalized: bool):
+        blobs: Opt[BlobSidecars], data_columns: Opt[DataColumnSidecars],
+        maybeFinalized: bool):
         Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
       # The design with a callback for block verification is unusual compared
       # to the rest of the application, but fits with the general approach
       # taken in the sync/request managers - this is an architectural compromise
       # that should probably be reimagined more holistically in the future.
       blockProcessor[].addBlock(
-        MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
+        MsgSource.gossip, signedBlock, blobs, data_columns, maybeFinalized = maybeFinalized)
     rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
         maybeFinalized: bool):
         Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
@@ -416,12 +417,13 @@ proc initFullNode(
           else:
             let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
             await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
-              Opt.some(blobs),
+              Opt.some(blobs), Opt.none(DataColumnSidecars),
               maybeFinalized = maybeFinalized)
         else:
           await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
-            Opt.none(BlobSidecars),
+            Opt.none(BlobSidecars), Opt.none(DataColumnSidecars),
             maybeFinalized = maybeFinalized)
+
     rmanBlockLoader = proc(
         blockRoot: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] =
       dag.getForkedBlock(blockRoot)
@@ -1896,6 +1898,9 @@ proc installMessageValidators(node: BeaconNode) =
     #       toValidationResult(
     #         node.processor[].processBlobSidecar(
     #           MsgSource.gossip, blobSidecar, subnet_id)))
+
+  # data_column_sidecar_{subnet_id}
+  # for it in 0'u64..