From 152d276d78d4f92d707b74196a58a3b8ed7f0147 Mon Sep 17 00:00:00 2001
From: Agnish Ghosh
Date: Wed, 3 Jul 2024 21:50:52 +0530
Subject: [PATCH] added reconstruction logic

---
 .../gossip_processing/block_processor.nim     | 118 ++++------------
 .../gossip_processing/eth2_processor.nim      |  60 +++++++-
 beacon_chain/nimbus_beacon_node.nim           |  26 ++--
 beacon_chain/spec/eip7594_helpers.nim         |  14 +-
 beacon_chain/sync/request_manager.nim         |   2 +-
 beacon_chain/sync/sync_protocol.nim           |   2 +-
 beacon_chain/validators/beacon_validators.nim |  10 ++
 beacon_chain/validators/message_router.nim    | 131 ++++++++++++------
 8 files changed, 212 insertions(+), 151 deletions(-)

diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim
index 08028b13e..a03d8cc84 100644
--- a/beacon_chain/gossip_processing/block_processor.nim
+++ b/beacon_chain/gossip_processing/block_processor.nim
@@ -190,27 +190,27 @@ proc storeBackfillBlock(
 
   # Establish blob viability before calling addBackfillBlock to avoid
   # writing the block in case of blob error.
-  var blobsOk = true
+  # var blobsOk = true
   var columnsOk = true
-  when typeof(signedBlock).kind >= ConsensusFork.Deneb:
-    if blobsOpt.isSome:
-      let blobs = blobsOpt.get()
-      let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq
-      if blobs.len > 0 or kzgCommits.len > 0:
-        let r = validate_blobs(kzgCommits, blobs.mapIt(it.blob),
-                               blobs.mapIt(it.kzg_proof))
-        if r.isErr():
-          debug "backfill blob validation failed",
-            blockRoot = shortLog(signedBlock.root),
-            blobs = shortLog(blobs),
-            blck = shortLog(signedBlock.message),
-            kzgCommits = mapIt(kzgCommits, shortLog(it)),
-            signature = shortLog(signedBlock.signature),
-            msg = r.error()
-        blobsOk = r.isOk()
+  # when typeof(signedBlock).kind >= ConsensusFork.Deneb:
+  #   if blobsOpt.isSome:
+  #     let blobs = blobsOpt.get()
+  #     let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq
+  #     if blobs.len > 0 or kzgCommits.len > 0:
+  #       let r = validate_blobs(kzgCommits, blobs.mapIt(it.blob),
+  #                              blobs.mapIt(it.kzg_proof))
+  #       if r.isErr():
+  #         debug "backfill blob validation failed",
+  #           blockRoot = shortLog(signedBlock.root),
+  #           blobs = shortLog(blobs),
+  #           blck = shortLog(signedBlock.message),
+  #           kzgCommits = mapIt(kzgCommits, shortLog(it)),
+  #           signature = shortLog(signedBlock.signature),
+  #           msg = r.error()
+  #       blobsOk = r.isOk()
 
-  if not blobsOk:
-    return err(VerifierError.Invalid)
+  # if not blobsOk:
+  #   return err(VerifierError.Invalid)
 
   when typeof(signedBlock).kind >= ConsensusFork.Deneb:
     if dataColumnsOpt.isSome:
@@ -433,8 +433,8 @@ proc enqueueBlock*(
   try:
     self.blockQueue.addLastNoWait(BlockEntry(
       blck: blck,
-      blobs: blobs,
-      data_columns: Opt.none(DataColumnSidecars),
+      blobs: Opt.none(BlobSidecars),
+      data_columns: data_columns,
       maybeFinalized: maybeFinalized,
       resfut: resfut, queueTick: Moment.now(),
      validationDur: validationDur,
@@ -442,64 +442,6 @@
   except AsyncQueueFullError:
     raiseAssert "unbounded queue"
 
-proc reconstructDataColumns(
-    self: ref BlockProcessor,
-    node: Eth2Node,
-    signed_block: deneb.SignedBeaconBlock |
-                  electra.SignedBeaconBlock,
-    data_column: DataColumnSidecar):
-    Result[bool, cstring] =
-
-  let
-    dag = self.consensusManager.dag
-    root = signed_block.root
-    custodiedColumnIndices = get_custody_columns(
-      node.nodeId,
-      CUSTODY_REQUIREMENT)
-
-  var
-    data_column_sidecars: DataColumnSidecars
-    columnsOk = true
-    storedColumns = 0
-
-  # Loading the data columns from the database
-  for i in 0 ..< custodiedColumnIndices.len:
-    let data_column = DataColumnSidecar.new()
-    if not dag.db.getDataColumnSidecar(root, custodiedColumnIndices[i], data_column[]):
-      columnsOk = false
-      break
-    data_column_sidecars.add data_column
-    storedColumns.add data_column.index
-
-  if columnsOk:
-    debug "Loaded data column for reconstruction"
-
-  # storedColumn number is less than the NUMBER_OF_COLUMNS
-  # then reconstruction is not possible, and if all the data columns
-  # are already stored then we do not need to reconstruct at all
-  if storedColumns.len < NUMBER_OF_COLUMNS or storedColumns.len == NUMBER_OF_COLUMNS:
-    return ok(false)
-  else:
-    return err ("DataColumnSidecar: Reconstruction error!")
-
-  # Recover blobs from saved data column sidecars
-  let recovered_blobs = recover_blobs(data_column_sidecars, storedColumns.len, signed_block)
-  if not recovered_blobs.isOk:
-    return err ("Error recovering blobs from data columns")
-
-  # Reconstruct data column sidecars from recovered blobs
-  let reconstructedDataColumns = get_data_column_sidecars(signed_block, recovered_blobs.get)
-  if not reconstructedDataColumns.isOk:
-    return err ("Error reconstructing data columns from recovered blobs")
-
-  for data_column in data_column_sidecars:
-    if data_column.index notin custodiedColumnIndices:
-      continue
-
-    dag.db.putDataColumnSidecar(data_column[])
-
-  ok(true)
-
 proc storeBlock(
     self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
     signedBlock: ForkySignedBeaconBlock,
@@ -894,20 +836,20 @@ proc storeBlock(
       else:
         if len(forkyBlck.message.body.blob_kzg_commitments) == 0:
           self[].enqueueBlock(
-            MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[]), Opt.some(DataColumnSidecars @[]))
+            MsgSource.gossip, quarantined, Opt.none(BlobSidecars), Opt.some(DataColumnSidecars @[]))
         else:
           if (let res = checkBloblessSignature(self[], forkyBlck); res.isErr):
-            warn "Failed to verify signature of unorphaned blobless block",
+            warn "Failed to verify signature of unorphaned blobless/columnless block",
               blck = shortLog(forkyBlck),
               error = res.error()
             continue
-          if self.blobQuarantine[].hasBlobs(forkyBlck):
-            let blobs = self.blobQuarantine[].popBlobs(
-              forkyBlck.root, forkyBlck)
-            self[].enqueueBlock(MsgSource.gossip, quarantined, Opt.some(blobs), Opt.none(DataColumnSidecars))
-          else:
-            discard self.consensusManager.quarantine[].addBlobless(
-              dag.finalizedHead.slot, forkyBlck)
+          # if self.blobQuarantine[].hasBlobs(forkyBlck):
+          #   let blobs = self.blobQuarantine[].popBlobs(
+          #     forkyBlck.root, forkyBlck)
+          #   self[].enqueueBlock(MsgSource.gossip, quarantined, Opt.some(blobs), Opt.none(DataColumnSidecars))
+          # else:
+          #   discard self.consensusManager.quarantine[].addBlobless(
+          #     dag.finalizedHead.slot, forkyBlck)
           if self.dataColumnQuarantine[].hasDataColumns(forkyBlck):
            let data_columns = self.dataColumnQuarantine[].popDataColumns(
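The storeBlock hunk above treats a block with an empty blob_kzg_commitments list as trivially complete, and otherwise only enqueues it once the data-column quarantine can supply its sidecars. Reduced to a predicate, the decision looks like this standalone sketch (toy types and hypothetical names such as BlockSummary and readyForProcessing, not the nimbus-eth2 API; it assumes "complete" means holding every custodied column):

  import std/sets

  type
    ColumnIndex = uint64
    BlockSummary = object
      commitmentCount: int              # len(blob_kzg_commitments)
      haveColumns: HashSet[ColumnIndex] # column sidecars seen so far

  proc readyForProcessing(blk: BlockSummary,
                          custody: HashSet[ColumnIndex]): bool =
    ## A block without commitments needs no sidecars at all; otherwise
    ## it can be enqueued once every custodied column has arrived.
    if blk.commitmentCount == 0:
      return true
    custody <= blk.haveColumns

  when isMainModule:
    let custody = [0'u64, 7, 21, 42].toHashSet
    var blk = BlockSummary(commitmentCount: 3,
                           haveColumns: initHashSet[ColumnIndex]())
    doAssert not blk.readyForProcessing(custody)  # columnless: quarantine
    for c in custody: blk.haveColumns.incl c
    doAssert blk.readyForProcessing(custody)      # complete: enqueueBlock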
diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim
index 2af410d29..dee9a7db2 100644
--- a/beacon_chain/gossip_processing/eth2_processor.nim
+++ b/beacon_chain/gossip_processing/eth2_processor.nim
@@ -11,7 +11,8 @@ import
   std/tables,
   stew/results,
   chronicles, chronos, metrics, taskpools,
-  ../spec/[helpers, forks],
+  ../networking/eth2_network,
+  ../spec/[helpers, forks, eip7594_helpers],
   ../spec/datatypes/[altair, phase0, deneb, eip7594],
   ../consensus_object_pools/[
     blob_quarantine, block_clearance, block_quarantine, blockchain_dag,
@@ -444,6 +445,63 @@ proc checkForPotentialDoppelganger(
       attestation = shortLog(attestation)
     quitDoppelganger()
 
+proc processDataColumnReconstruction*(
+    self: ref Eth2Processor,
+    node: Eth2Node,
+    signed_block: deneb.SignedBeaconBlock |
+                  electra.SignedBeaconBlock):
+    Future[ValidationRes] {.async: (raises: [CancelledError]).} =
+
+  let
+    dag = self.dag
+    root = signed_block.root
+    custodiedColumnIndices = get_custody_columns(
+      node.nodeId,
+      CUSTODY_REQUIREMENT)
+
+  var
+    data_column_sidecars: seq[DataColumnSidecar]
+    columnsOk = true
+    storedColumns: seq[ColumnIndex]
+
+  # Loading the data columns from the database
+  for custody_column in custodiedColumnIndices.get:
+    let data_column = DataColumnSidecar.new()
+    if not dag.db.getDataColumnSidecar(root, custody_column, data_column[]):
+      columnsOk = false
+      break
+    data_column_sidecars.add data_column[]
+    storedColumns.add data_column.index
+
+  if columnsOk:
+    debug "Loaded data columns for reconstruction"
+
+  # If fewer than half of the columns are stored, reconstruction is not
+  # possible; if all of them are already stored, there is nothing left
+  # to reconstruct.
+  if storedColumns.len < NUMBER_OF_COLUMNS div 2 or
+      storedColumns.len == NUMBER_OF_COLUMNS:
+    return ok()
+
+  # Recover blobs from saved data column sidecars
+  let recovered_blobs = recover_blobs(data_column_sidecars, storedColumns.len, signed_block)
+  if not recovered_blobs.isOk:
+    return errIgnore ("Error recovering blobs from data columns")
+
+  # Reconstruct data column sidecars from recovered blobs
+  let reconstructedDataColumns = get_data_column_sidecars(signed_block, recovered_blobs.get)
+  if not reconstructedDataColumns.isOk:
+    return errIgnore ("Error reconstructing data columns from recovered blobs")
+
+  # Persist only the reconstructed columns this node custodies
+  for data_column in reconstructedDataColumns.get:
+    if data_column.index notin custodiedColumnIndices.get:
+      continue
+
+    dag.db.putDataColumnSidecar(data_column)
+
+  ok()
+
 proc processAttestation*(
     self: ref Eth2Processor, src: MsgSource,
     attestation: phase0.Attestation | electra.Attestation, subnet_id: SubnetId,
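The availability gate in processDataColumnReconstruction is the heart of the change: recoverAllCells can only rebuild the extended column set once at least half of NUMBER_OF_COLUMNS is stored, and a node that already holds all of them has nothing to do. A standalone restatement of the gate, with hypothetical names (ReconstructionAction, classify):

  const NUMBER_OF_COLUMNS = 128

  type ReconstructionAction = enum
    raSkipTooFew    # below 50%: erasure recovery is impossible
    raSkipComplete  # all columns stored: nothing to reconstruct
    raReconstruct   # at least 50% but incomplete: recover the rest

  proc classify(storedColumns: int): ReconstructionAction =
    if storedColumns < NUMBER_OF_COLUMNS div 2:
      raSkipTooFew
    elif storedColumns == NUMBER_OF_COLUMNS:
      raSkipComplete
    else:
      raReconstruct

  when isMainModule:
    doAssert classify(63) == raSkipTooFew    # one short of half
    doAssert classify(64) == raReconstruct   # exactly half suffices
    doAssert classify(128) == raSkipComplete

Exactly half suffices because the column set is a 2x erasure-coding extension of the underlying blob data.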
diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim
index bef1639aa..a8559d6a0 100644
--- a/beacon_chain/nimbus_beacon_node.nim
+++ b/beacon_chain/nimbus_beacon_node.nim
@@ -408,19 +408,19 @@ proc initFullNode(
         Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
       withBlck(signedBlock):
         when consensusFork >= ConsensusFork.Deneb:
-          if not blobQuarantine[].hasBlobs(forkyBlck):
-            # We don't have all the blobs for this block, so we have
-            # to put it in blobless quarantine.
-            if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
-              err(VerifierError.UnviableFork)
-            else:
-              err(VerifierError.MissingParent)
-          elif blobQuarantine[].hasBlobs(forkyBlck):
-            let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
-            await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
-              Opt.some(blobs), Opt.none(DataColumnSidecars),
-              maybeFinalized = maybeFinalized)
-          elif not dataColumnQuarantine[].hasDataColumns(forkyBlck):
+          # if not blobQuarantine[].hasBlobs(forkyBlck):
+          #   # We don't have all the blobs for this block, so we have
+          #   # to put it in blobless quarantine.
+          #   if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
+          #     err(VerifierError.UnviableFork)
+          #   else:
+          #     err(VerifierError.MissingParent)
+          # elif blobQuarantine[].hasBlobs(forkyBlck):
+          #   let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
+          #   await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
+          #     Opt.some(blobs), Opt.none(DataColumnSidecars),
+          #     maybeFinalized = maybeFinalized)
+          if not dataColumnQuarantine[].hasDataColumns(forkyBlck):
             # We don't have all the data columns for this block, so we have
             # to put it in columnless quarantine.
             if not quarantine[].addColumnless(dag.finalizedHead.slot, forkyBlck):
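The block verifier in initFullNode now parks a block in the columnless quarantine (addColumnless) when its columns have not all arrived, mirroring what the blobless path did before being commented out. A toy sketch of that bookkeeping, with hypothetical types; the real quarantine additionally bounds its capacity and prunes stale entries, which is omitted here:

  import std/tables

  type
    Root = array[32, byte]
    QuarantinedBlock = object
      root: Root
      slot: uint64

    ColumnlessQuarantine = object
      blocks: Table[Root, QuarantinedBlock]

  proc addColumnless(q: var ColumnlessQuarantine,
                     blk: QuarantinedBlock): bool =
    ## Returns false when the block cannot be admitted (here: duplicate).
    if blk.root in q.blocks:
      return false
    q.blocks[blk.root] = blk
    true

  proc popColumnless(q: var ColumnlessQuarantine, root: Root,
                     blk: var QuarantinedBlock): bool =
    ## Hands the block back once all custodied columns have arrived.
    q.blocks.pop(root, blk)

  when isMainModule:
    var q: ColumnlessQuarantine
    var blk = QuarantinedBlock(slot: 42)
    blk.root[0] = 0xaa
    doAssert q.addColumnless(blk)
    doAssert not q.addColumnless(blk)   # duplicates are rejected
    var popped: QuarantinedBlock
    doAssert q.popColumnless(blk.root, popped) and popped.slot == 42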
diff --git a/beacon_chain/spec/eip7594_helpers.nim b/beacon_chain/spec/eip7594_helpers.nim
index 1ae0b00cb..9d9f2eadc 100644
--- a/beacon_chain/spec/eip7594_helpers.nim
+++ b/beacon_chain/spec/eip7594_helpers.nim
@@ -153,7 +153,7 @@ proc recover_blobs*(
   if not (data_columns.len != 0):
     return err("DataColumnSidecar: Length should not be 0")
 
-  var blobCount = data_columns[0].len
+  var blobCount = data_columns[0].column.len
   for data_column in data_columns:
     if not (blobCount == data_column.column.len):
       return err ("DataColumns do not have the same length")
@@ -175,27 +175,27 @@ proc recover_blobs*(
       # Transform the cell into a ckzg cell
       var ckzgCell: Cell
       for i in 0 ..< int(FIELD_ELEMENTS_PER_CELL):
-        ckzgCell[i] = cell[32*i ..< 32*(i+1)].toArray()
+        let start = 32 * i
+        for j in 0 ..< 32:
+          ckzgCell[start + j] = cell[start + j]
 
       ckzgCells.add(ckzgCell)
 
     # Recovering the blob
     let recovered_cells = recoverAllCells(cell_ids, ckzgCells)
     if not recovered_cells.isOk:
-      return err (fmt"Recovering all cells for blob - {blobIdx} failed: {recovered_cells.error}")
+      return err ("Recovering all cells for blob failed")
 
     let recovered_blob_res = cellsToBlob(recovered_cells.get)
     if not recovered_blob_res.isOk:
-      return err (fmt"Cells to blob for blob - {blobIdx} failed: {recovered_blob_res.error}")
+      return err ("Cells to blob for blob failed")
 
     recovered_blobs.add(recovered_blob_res.get)
 
   ok(recovered_blobs)
 
-
 # https://github.com/ethereum/consensus-specs/blob/5f48840f4d768bf0e0a8156a3ed06ec333589007/specs/_features/eip7594/das-core.md#get_data_column_sidecars
-proc get_data_column_sidecars*(signed_block: deneb.SignedBeaconBlock | electra.SignedBeaconBlock | ForkySignedBeaconBlock, blobs: seq[KzgBlob]): Result[seq[DataColumnSidecar], cstring] =
+proc get_data_column_sidecars*(signed_block: deneb.SignedBeaconBlock | electra.SignedBeaconBlock, blobs: seq[KzgBlob]): Result[seq[DataColumnSidecar], cstring] =
   var
     sidecar: DataColumnSidecar
     signed_block_header: SignedBeaconBlockHeader
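The rewritten inner loop in recover_blobs copies one 32-byte field element per iteration out of a cell's flat byte buffer. The same copy in isolation, with the spec constants inlined and a plain array standing in for the ckzg Cell type (toCkzgCell is a hypothetical name):

  const
    FIELD_ELEMENTS_PER_CELL = 64
    BYTES_PER_FIELD_ELEMENT = 32
    BYTES_PER_CELL = FIELD_ELEMENTS_PER_CELL * BYTES_PER_FIELD_ELEMENT

  type Cell = array[BYTES_PER_CELL, byte]

  proc toCkzgCell(flat: openArray[byte]): Cell =
    doAssert flat.len == BYTES_PER_CELL
    for i in 0 ..< FIELD_ELEMENTS_PER_CELL:
      let start = BYTES_PER_FIELD_ELEMENT * i
      for j in 0 ..< BYTES_PER_FIELD_ELEMENT:
        result[start + j] = flat[start + j]

  when isMainModule:
    var flat = newSeq[byte](BYTES_PER_CELL)
    for i in 0 ..< flat.len:
      flat[i] = byte(i mod 256)
    doAssert toCkzgCell(flat)[33] == flat[33]

Byte for byte this matches a single copyMem over BYTES_PER_CELL; spelling the loop out simply avoids the slice-plus-toArray conversion that the replaced line relied on.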
diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim
index f002aab0c..29f8cb489 100644
--- a/beacon_chain/sync/request_manager.nim
+++ b/beacon_chain/sync/request_manager.nim
@@ -29,7 +29,7 @@ const
   SYNC_MAX_REQUESTED_BLOCKS* = 32 # Spec allows up to MAX_REQUEST_BLOCKS.
     ## Maximum number of blocks which will be requested in each
     ## `beaconBlocksByRoot` invocation.
-  PARALLEL_REQUESTS* = 2
+  PARALLEL_REQUESTS* = 8
     ## Number of peers we use to resolve our request.
 
   BLOB_GOSSIP_WAIT_TIME_NS* = 2 * 1_000_000_000
diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim
index 02702d336..dcbe7b343 100644
--- a/beacon_chain/sync/sync_protocol.nim
+++ b/beacon_chain/sync/sync_protocol.nim
@@ -24,7 +24,7 @@ const
     ## Allow syncing ~64 blocks/sec (minus request costs)
   blobResponseCost = allowedOpsPerSecondCost(1000)
     ## Multiple can exist per block, they are much smaller than blocks
-  dataColumnResponseCost = allowedOpsPerSecondCost(4000)
+  dataColumnResponseCost = allowedOpsPerSecondCost(250)
     ## 1 blob has the equivalent memory footprint of 8 data columns
 
 type
diff --git a/beacon_chain/validators/beacon_validators.nim b/beacon_chain/validators/beacon_validators.nim
index c4375f3ca..181f96975 100644
--- a/beacon_chain/validators/beacon_validators.nim
+++ b/beacon_chain/validators/beacon_validators.nim
@@ -336,6 +336,14 @@ proc handleLightClientUpdates*(node: BeaconNode, slot: Slot)
         warn "LC optimistic update failed to send",
           error = sendResult.error()
 
+proc sendReconstructedDataColumns(node: BeaconNode,
+                                  blck: ForkySignedBeaconBlock)
+                                 {.async: (raises: [CancelledError]).} =
+  let res = await node.router.routeReconstructedDataColumns(blck)
+  if not res.isOk:
+    warn "Unable to send reconstructed data columns",
+      error = res.error()
+
 proc createAndSendAttestation(node: BeaconNode,
                               fork: Fork,
                               genesis_validators_root: Eth2Digest,
@@ -375,6 +383,8 @@ proc createAndSendAttestation(node: BeaconNode,
       node.config.dumpDirOutgoing, registered.data,
       registered.validator.pubkey)
 
+
 proc getBlockProposalEth1Data*(node: BeaconNode,
                                state: ForkedHashedBeaconState):
                                BlockProposalEth1Data =
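Raising PARALLEL_REQUESTS from 2 to 8 fits the PeerDAS fetch pattern: resolving one block can involve many column sidecars, and those requests fan out naturally across peers. A sketch of that fan-out as round-robin assignment of wanted columns to request slots (assignColumns is a hypothetical helper; actual peer selection in the request manager is more involved):

  const PARALLEL_REQUESTS = 8

  proc assignColumns(wanted: seq[uint64],
                     slots: int = PARALLEL_REQUESTS): seq[seq[uint64]] =
    ## Distribute wanted column indices over `slots` concurrent requests.
    result = newSeq[seq[uint64]](slots)
    for i, col in wanted:
      result[i mod slots].add col

  when isMainModule:
    var wanted: seq[uint64]
    for c in 0'u64 ..< 16:
      wanted.add c
    let batches = assignColumns(wanted)
    doAssert batches.len == PARALLEL_REQUESTS
    doAssert batches[0] == @[0'u64, 8'u64]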
diff --git a/beacon_chain/validators/message_router.nim b/beacon_chain/validators/message_router.nim
index 479a91e9d..e529cd748 100644
--- a/beacon_chain/validators/message_router.nim
+++ b/beacon_chain/validators/message_router.nim
@@ -15,7 +15,9 @@ import
   ../spec/network,
   ../spec/eip7594_helpers,
   ../consensus_object_pools/spec_cache,
-  ../gossip_processing/eth2_processor,
+  ../gossip_processing/[
+    eth2_processor,
+    block_processor],
   ../networking/eth2_network,
   ./activity_metrics,
   ../spec/datatypes/deneb
@@ -143,48 +145,47 @@ proc routeSignedBeaconBlock*(
       blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
       signature = shortLog(blck.signature), error = res.error()
 
-    var blobRefs = Opt.none(BlobSidecars)
-    if blobsOpt.isSome():
-      let blobs = blobsOpt.get()
-      var workers = newSeq[Future[SendResult]](blobs.len)
-      for i in 0..<blobs.lenu64:
-        let subnet_id = compute_subnet_for_blob_sidecar(i)
-        workers[i] = router[].network.broadcastBlobSidecar(subnet_id, blobs[i])
-      for i in 0..<blobs.len:
-        let res = await workers[i]
-        if res.isErr():
-          warn "Blob not sent",
-            blob = shortLog(blobs[i]), error = res.error()
-        else:
-          notice "Blob sent", blob = shortLog(blobs[i])
-      blobRefs = Opt.some(blobs.mapIt(newClone(it)))
-
-    # when typeof(blck).kind >= ConsensusFork.Deneb:
-    #   if blobsOpt.isSome():
-    #     let blobs = blobsOpt.get()
-    #     let data_columns = get_data_column_sidecars(blck, blobs.mapIt(it.blob)).get()
-    #     var das_workers = newSeq[Future[SendResult]](len(data_columns))
-    #     for i in 0..<data_columns.lenu64:
-    #       let subnet_id = compute_subnet_for_data_column_sidecar(i)
-    #       das_workers[i] =
-    #         router[].network.broadcastDataColumnSidecar(subnet_id, data_columns[i])
 
+    var dataColumnRefs = Opt.none(DataColumnSidecars)
+    when typeof(blck).kind >= ConsensusFork.Deneb:
+      if blobsOpt.isSome():
+        let blobs = blobsOpt.get()
+        let data_columns = get_data_column_sidecars(blck, blobs.mapIt(it.blob)).get()
+        var das_workers = newSeq[Future[SendResult]](len(data_columns))
+        for i in 0..<data_columns.lenu64:
+          let subnet_id = compute_subnet_for_data_column_sidecar(i)
+          das_workers[i] =
+            router[].network.broadcastDataColumnSidecar(subnet_id, data_columns[i])
+        for i in 0..<data_columns.len:
+          let res = await das_workers[i]
+          if res.isErr():
+            warn "Data column not sent",
+              data_column = shortLog(data_columns[i]), error = res.error()
+          else:
+            notice "Data column sent", data_column = shortLog(data_columns[i])
+        dataColumnRefs = Opt.some(data_columns.mapIt(newClone(it)))
 
     notice "Block sent",
       blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
       signature = shortLog(blck.signature)
 
     ok(blockRef)
 
+proc routeReconstructedDataColumns*(
+    router: ref MessageRouter,
+    blck: ForkySignedBeaconBlock):
+    Future[SendResult] {.async: (raises: [CancelledError]).} =
+  when typeof(blck).kind >= ConsensusFork.Deneb:
+    let res = await router[].processor.processDataColumnReconstruction(
+      router[].network, blck)
+
+    if not res.isGoodForSending:
+      warn "Issue sending reconstructed data columns"
+      return err(res.error()[1])
+
+    let custody_columns = get_custody_columns(
+      router.network.nodeId,
+      CUSTODY_REQUIREMENT)
+
+    var
+      data_column_sidecars: DataColumnSidecars
+      columnsOk = true
+
+    for custody_column in custody_columns.get:
+      let data_column = DataColumnSidecar.new()
+      if not router[].processor.dag.db.getDataColumnSidecar(
+          blck.root, custody_column, data_column[]):
+        columnsOk = false
+        debug "Issue with loading reconstructed data columns"
+        break
+      data_column_sidecars.add data_column
+
+    var das_workers = newSeq[Future[SendResult]](len(data_column_sidecars))
+    for i in 0..<data_column_sidecars.lenu64:
+      let subnet_id = compute_subnet_for_data_column_sidecar(i)
+      das_workers[i] =
+        router[].network.broadcastDataColumnSidecar(subnet_id, data_column_sidecars[i][])
+    for i in 0..<data_column_sidecars.len:
+      let res = await das_workers[i]
+      if res.isErr():
+        warn "Reconstructed data column not sent",
+          data_column = shortLog(data_column_sidecars[i][]), error = res.error()
+      else:
+        notice "Reconstructed data column sent",
+          data_column = shortLog(data_column_sidecars[i][])
+
+  ok()
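Both broadcast paths in this file follow the same scatter/gather shape: start one broadcast future per sidecar, then await them all while logging per-item failures, so one slow send does not serialize the rest. A self-contained sketch of that shape, using stdlib asyncdispatch in place of chronos and broadcastOne as a stand-in for broadcastDataColumnSidecar:

  import std/asyncdispatch

  proc broadcastOne(idx: int): Future[bool] {.async.} =
    ## Stand-in for one broadcastDataColumnSidecar call.
    await sleepAsync(1)
    return true

  proc routeAll(count: int) {.async.} =
    # Start every broadcast before awaiting any of them, so the
    # sidecars go out concurrently rather than one at a time.
    var workers = newSeq[Future[bool]](count)
    for i in 0 ..< count:
      workers[i] = broadcastOne(i)
    for i in 0 ..< count:
      let sent = await workers[i]
      if not sent:
        echo "data column ", i, " not sent"

  when isMainModule:
    waitFor routeAll(8)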