added reconstruction logic

This commit is contained in:
Agnish Ghosh 2024-07-03 21:50:52 +05:30
parent d292e94560
commit 152d276d78
No known key found for this signature in database
GPG Key ID: 7BDDA05D1B25E9F8
8 changed files with 212 additions and 151 deletions

View File

@ -190,27 +190,27 @@ proc storeBackfillBlock(
# Establish blob viability before calling addbackfillBlock to avoid
# writing the block in case of blob error.
var blobsOk = true
# var blobsOk = true
var columnsOk = true
when typeof(signedBlock).kind >= ConsensusFork.Deneb:
if blobsOpt.isSome:
let blobs = blobsOpt.get()
let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq
if blobs.len > 0 or kzgCommits.len > 0:
let r = validate_blobs(kzgCommits, blobs.mapIt(it.blob),
blobs.mapIt(it.kzg_proof))
if r.isErr():
debug "backfill blob validation failed",
blockRoot = shortLog(signedBlock.root),
blobs = shortLog(blobs),
blck = shortLog(signedBlock.message),
kzgCommits = mapIt(kzgCommits, shortLog(it)),
signature = shortLog(signedBlock.signature),
msg = r.error()
blobsOk = r.isOk()
# when typeof(signedBlock).kind >= ConsensusFork.Deneb:
# if blobsOpt.isSome:
# let blobs = blobsOpt.get()
# let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq
# if blobs.len > 0 or kzgCommits.len > 0:
# let r = validate_blobs(kzgCommits, blobs.mapIt(it.blob),
# blobs.mapIt(it.kzg_proof))
# if r.isErr():
# debug "backfill blob validation failed",
# blockRoot = shortLog(signedBlock.root),
# blobs = shortLog(blobs),
# blck = shortLog(signedBlock.message),
# kzgCommits = mapIt(kzgCommits, shortLog(it)),
# signature = shortLog(signedBlock.signature),
# msg = r.error()
# blobsOk = r.isOk()
if not blobsOk:
return err(VerifierError.Invalid)
# if not blobsOk:
# return err(VerifierError.Invalid)
when typeof(signedBlock).kind >= ConsensusFork.Deneb:
if dataColumnsOpt.isSome:
@ -433,8 +433,8 @@ proc enqueueBlock*(
try:
self.blockQueue.addLastNoWait(BlockEntry(
blck: blck,
blobs: blobs,
data_columns: Opt.none(DataColumnSidecars),
blobs: Opt.none(BlobSidecars),
data_columns: data_columns,
maybeFinalized: maybeFinalized,
resfut: resfut, queueTick: Moment.now(),
validationDur: validationDur,
@ -442,64 +442,6 @@ proc enqueueBlock*(
except AsyncQueueFullError:
raiseAssert "unbounded queue"
proc reconstructDataColumns(
self: ref BlockProcessor,
node: Eth2Node,
signed_block: deneb.SignedBeaconBlock |
electra.SignedBeaconBlock,
data_column: DataColumnSidecar):
Result[bool, cstring] =
let
dag = self.consensusManager.dag
root = signed_block.root
custodiedColumnIndices = get_custody_columns(
node.nodeId,
CUSTODY_REQUIREMENT)
var
data_column_sidecars: DataColumnSidecars
columnsOk = true
storedColumns = 0
# Loading the data columns from the database
for i in 0 ..< custodiedColumnIndices.len:
let data_column = DataColumnSidecar.new()
if not dag.db.getDataColumnSidecar(root, custodiedColumnIndices[i], data_column[]):
columnsOk = false
break
data_column_sidecars.add data_column
storedColumns.add data_column.index
if columnsOk:
debug "Loaded data column for reconstruction"
# storedColumn number is less than the NUMBER_OF_COLUMNS
# then reconstruction is not possible, and if all the data columns
# are already stored then we do not need to reconstruct at all
if storedColumns.len < NUMBER_OF_COLUMNS or storedColumns.len == NUMBER_OF_COLUMNS:
return ok(false)
else:
return err ("DataColumnSidecar: Reconstruction error!")
# Recover blobs from saved data column sidecars
let recovered_blobs = recover_blobs(data_column_sidecars, storedColumns.len, signed_block)
if not recovered_blobs.isOk:
return err ("Error recovering blobs from data columns")
# Reconstruct data column sidecars from recovered blobs
let reconstructedDataColumns = get_data_column_sidecars(signed_block, recovered_blobs.get)
if not reconstructedDataColumns.isOk:
return err ("Error reconstructing data columns from recovered blobs")
for data_column in data_column_sidecars:
if data_column.index notin custodiedColumnIndices:
continue
dag.db.putDataColumnSidecar(data_column[])
ok(true)
proc storeBlock(
self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
signedBlock: ForkySignedBeaconBlock,
@ -894,20 +836,20 @@ proc storeBlock(
else:
if len(forkyBlck.message.body.blob_kzg_commitments) == 0:
self[].enqueueBlock(
MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[]), Opt.some(DataColumnSidecars @[]))
MsgSource.gossip, quarantined, Opt.none(BlobSidecars), Opt.some(DataColumnSidecars @[]))
else:
if (let res = checkBloblessSignature(self[], forkyBlck); res.isErr):
warn "Failed to verify signature of unorphaned blobless block",
warn "Failed to verify signature of unorphaned blobless/columnless block",
blck = shortLog(forkyBlck),
error = res.error()
continue
if self.blobQuarantine[].hasBlobs(forkyBlck):
let blobs = self.blobQuarantine[].popBlobs(
forkyBlck.root, forkyBlck)
self[].enqueueBlock(MsgSource.gossip, quarantined, Opt.some(blobs), Opt.none(DataColumnSidecars))
else:
discard self.consensusManager.quarantine[].addBlobless(
dag.finalizedHead.slot, forkyBlck)
# if self.blobQuarantine[].hasBlobs(forkyBlck):
# let blobs = self.blobQuarantine[].popBlobs(
# forkyBlck.root, forkyBlck)
# self[].enqueueBlock(MsgSource.gossip, quarantined, Opt.some(blobs), Opt.none(DataColumnSidecars))
# else:
# discard self.consensusManager.quarantine[].addBlobless(
# dag.finalizedHead.slot, forkyBlck)
if self.dataColumnQuarantine[].hasDataColumns(forkyBlck):
let data_columns = self.dataColumnQuarantine[].popDataColumns(

View File

@ -11,7 +11,8 @@ import
std/tables,
stew/results,
chronicles, chronos, metrics, taskpools,
../spec/[helpers, forks],
../networking/eth2_network,
../spec/[helpers, forks, eip7594_helpers],
../spec/datatypes/[altair, phase0, deneb, eip7594],
../consensus_object_pools/[
blob_quarantine, block_clearance, block_quarantine, blockchain_dag,
@ -444,6 +445,63 @@ proc checkForPotentialDoppelganger(
attestation = shortLog(attestation)
quitDoppelganger()
proc processDataColumnReconstruction*(
self: ref Eth2Processor,
node: Eth2Node,
signed_block: deneb.SignedBeaconBlock |
electra.SignedBeaconBlock):
Future[ValidationRes] {.async: (raises: [CancelledError]).} =
let
dag = self.dag
root = signed_block.root
custodiedColumnIndices = get_custody_columns(
node.nodeId,
CUSTODY_REQUIREMENT)
var
data_column_sidecars: seq[DataColumnSidecar]
columnsOk = true
storedColumns: seq[ColumnIndex]
# Loading the data columns from the database
for custody_column in custodiedColumnIndices.get:
let data_column = DataColumnSidecar.new()
if not dag.db.getDataColumnSidecar(root, custody_column, data_column[]):
columnsOk = false
break
data_column_sidecars.add data_column[]
storedColumns.add data_column.index
if columnsOk:
debug "Loaded data column for reconstruction"
# storedColumn number is less than the NUMBER_OF_COLUMNS
# then reconstruction is not possible, and if all the data columns
# are already stored then we do not need to reconstruct at all
if storedColumns.len < NUMBER_OF_COLUMNS or storedColumns.len == NUMBER_OF_COLUMNS:
return ok()
else:
return errIgnore ("DataColumnSidecar: Reconstruction error!")
# Recover blobs from saved data column sidecars
let recovered_blobs = recover_blobs(data_column_sidecars, storedColumns.len, signed_block)
if not recovered_blobs.isOk:
return errIgnore ("Error recovering blobs from data columns")
# Reconstruct data column sidecars from recovered blobs
let reconstructedDataColumns = get_data_column_sidecars(signed_block, recovered_blobs.get)
if not reconstructedDataColumns.isOk:
return errIgnore ("Error reconstructing data columns from recovered blobs")
for data_column in data_column_sidecars:
if data_column.index notin custodiedColumnIndices.get:
continue
dag.db.putDataColumnSidecar(data_column)
ok()
proc processAttestation*(
self: ref Eth2Processor, src: MsgSource,
attestation: phase0.Attestation | electra.Attestation, subnet_id: SubnetId,

View File

@ -408,19 +408,19 @@ proc initFullNode(
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
withBlck(signedBlock):
when consensusFork >= ConsensusFork.Deneb:
if not blobQuarantine[].hasBlobs(forkyBlck):
# We don't have all the blobs for this block, so we have
# to put it in blobless quarantine.
if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
err(VerifierError.UnviableFork)
else:
err(VerifierError.MissingParent)
elif blobQuarantine[].hasBlobs(forkyBlck):
let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.some(blobs), Opt.none(DataColumnSidecars),
maybeFinalized = maybeFinalized)
elif not dataColumnQuarantine[].hasDataColumns(forkyBlck):
# if not blobQuarantine[].hasBlobs(forkyBlck):
# # We don't have all the blobs for this block, so we have
# # to put it in blobless quarantine.
# if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
# err(VerifierError.UnviableFork)
# else:
# err(VerifierError.MissingParent)
# elif blobQuarantine[].hasBlobs(forkyBlck):
# let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
# await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
# Opt.some(blobs), Opt.none(DataColumnSidecars),
# maybeFinalized = maybeFinalized)
if not dataColumnQuarantine[].hasDataColumns(forkyBlck):
# We don't have all the data columns for this block, so we have
# to put it in columnless quarantine.
if not quarantine[].addColumnless(dag.finalizedHead.slot, forkyBlck):

View File

@ -153,7 +153,7 @@ proc recover_blobs*(
if not (data_columns.len != 0):
return err("DataColumnSidecar: Length should not be 0")
var blobCount = data_columns[0].len
var blobCount = data_columns[0].column.len
for data_column in data_columns:
if not (blobCount == data_column.column.len):
return err ("DataColumns do not have the same length")
@ -175,27 +175,27 @@ proc recover_blobs*(
# Transform the cell as a ckzg cell
var ckzgCell: Cell
for i in 0 ..< int(FIELD_ELEMENTS_PER_CELL):
ckzgCell[i] = cell[32*i ..< 32*(i+1)].toArray()
var start = 32 * i
for j in 0 ..< 32:
ckzgCell[start + j] = cell[start+j]
ckzgCells.add(ckzgCell)
# Recovering the blob
let recovered_cells = recoverAllCells(cell_ids, ckzgCells)
if not recovered_cells.isOk:
return err (fmt"Recovering all cells for blob - {blobIdx} failed: {recovered_cells.error}")
return err ("Recovering all cells for blob failed")
let recovered_blob_res = cellsToBlob(recovered_cells.get)
if not recovered_blob_res.isOk:
return err (fmt"Cells to blob for blob - {blobIdx} failed: {recovered_blob_res.error}")
return err ("Cells to blob for blob failed")
recovered_blobs.add(recovered_blob_res.get)
ok(recovered_blobs)
# https://github.com/ethereum/consensus-specs/blob/5f48840f4d768bf0e0a8156a3ed06ec333589007/specs/_features/eip7594/das-core.md#get_data_column_sidecars
proc get_data_column_sidecars*(signed_block: deneb.SignedBeaconBlock | electra.SignedBeaconBlock | ForkySignedBeaconBlock, blobs: seq[KzgBlob]): Result[seq[DataColumnSidecar], cstring] =
proc get_data_column_sidecars*(signed_block: deneb.SignedBeaconBlock | electra.SignedBeaconBlock, blobs: seq[KzgBlob]): Result[seq[DataColumnSidecar], cstring] =
var
sidecar: DataColumnSidecar
signed_block_header: SignedBeaconBlockHeader

View File

@ -29,7 +29,7 @@ const
SYNC_MAX_REQUESTED_BLOCKS* = 32 # Spec allows up to MAX_REQUEST_BLOCKS.
## Maximum number of blocks which will be requested in each
## `beaconBlocksByRoot` invocation.
PARALLEL_REQUESTS* = 2
PARALLEL_REQUESTS* = 8
## Number of peers we using to resolve our request.
BLOB_GOSSIP_WAIT_TIME_NS* = 2 * 1_000_000_000

View File

@ -24,7 +24,7 @@ const
## Allow syncing ~64 blocks/sec (minus request costs)
blobResponseCost = allowedOpsPerSecondCost(1000)
## Multiple can exist per block, they are much smaller than blocks
dataColumnResponseCost = allowedOpsPerSecondCost(4000)
dataColumnResponseCost = allowedOpsPerSecondCost(250)
## 1 blob has an equivalent memory of 8 data columns
type

View File

@ -336,6 +336,14 @@ proc handleLightClientUpdates*(node: BeaconNode, slot: Slot)
warn "LC optimistic update failed to send",
error = sendResult.error()
proc sendReconstructedDataColumns(node: BeaconNode,
blck: ForkySignedBeaconBlock)
{.async: (raises: [CancelledError]).} =
let res = await node.router.routeReconstructedDataColumns(blck)
if not res.isOk:
warn "Unable to send reconstructed data columns"
return
proc createAndSendAttestation(node: BeaconNode,
fork: Fork,
genesis_validators_root: Eth2Digest,
@ -375,6 +383,8 @@ proc createAndSendAttestation(node: BeaconNode,
node.config.dumpDirOutgoing, registered.data,
registered.validator.pubkey)
proc getBlockProposalEth1Data*(node: BeaconNode,
state: ForkedHashedBeaconState):
BlockProposalEth1Data =

View File

@ -15,7 +15,9 @@ import
../spec/network,
../spec/eip7594_helpers,
../consensus_object_pools/spec_cache,
../gossip_processing/eth2_processor,
../gossip_processing/[
eth2_processor,
block_processor],
../networking/eth2_network,
./activity_metrics,
../spec/datatypes/deneb
@ -143,48 +145,47 @@ proc routeSignedBeaconBlock*(
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
signature = shortLog(blck.signature), error = res.error()
var blobRefs = Opt.none(BlobSidecars)
if blobsOpt.isSome():
let blobs = blobsOpt.get()
var workers = newSeq[Future[SendResult]](blobs.len)
for i in 0..<blobs.lenu64:
let subnet_id = compute_subnet_for_blob_sidecar(i)
workers[i] = router[].network.broadcastBlobSidecar(subnet_id, blobs[i])
let allres = await allFinished(workers)
for i in 0..<allres.len:
let res = allres[i]
doAssert res.finished()
if res.failed():
notice "Blob not sent",
blob = shortLog(blobs[i]), error = res.error[]
else:
notice "Blob sent", blob = shortLog(blobs[i])
blobRefs = Opt.some(blobs.mapIt(newClone(it)))
# var blobRefs = Opt.none(BlobSidecars)
# if blobsOpt.isSome():
# let blobs = blobsOpt.get()
# var workers = newSeq[Future[SendResult]](blobs.len)
# for i in 0..<blobs.lenu64:
# let subnet_id = compute_subnet_for_blob_sidecar(i)
# workers[i] = router[].network.broadcastBlobSidecar(subnet_id, blobs[i])
# let allres = await allFinished(workers)
# for i in 0..<allres.len:
# let res = allres[i]
# doAssert res.finished()
# if res.failed():
# notice "Blob not sent",
# blob = shortLog(blobs[i]), error = res.error[]
# else:
# notice "Blob sent", blob = shortLog(blobs[i])
# blobRefs = Opt.some(blobs.mapIt(newClone(it)))
# var dataColumnRefs = Opt.none(DataColumnSidecars)
# when typeof(blck).kind >= ConsensusFork.Deneb:
# if blobsOpt.isSome():
# let blobs = blobsOpt.get()
# let data_columns = get_data_column_sidecars(blck, blobs.mapIt(it.blob)).get()
# var das_workers = newSeq[Future[SendResult]](len(data_columns))
# for i in 0..<data_columns.lenu64:
# let subnet_id = compute_subnet_for_data_column_sidecar(i)
# das_workers[i] =
# router[].network.broadcastDataColumnSidecar(subnet_id, data_columns[i])
# let allres = await allFinished(das_workers)
# for i in 0..<allres.len:
# let res = allres[i]
# doAssert res.finished()
# if res.failed():
# notice "Data Columns not sent",
# data_column = shortLog(data_columns[i]), error = res.error[]
# else:
# notice "Data columns sent", data_column = shortLog(data_columns[i])
# blobRefs = Opt.some(blobs.mapIt(newClone(it)))
# dataColumnRefs = Opt.some(data_columns.mapIt(newClone(it)))
var dataColumnRefs = Opt.none(DataColumnSidecars)
when typeof(blck).kind >= ConsensusFork.Deneb:
if blobsOpt.isSome():
let blobs = blobsOpt.get()
let data_columns = get_data_column_sidecars(blck, blobs.mapIt(it.blob)).get()
var das_workers = newSeq[Future[SendResult]](len(data_columns))
for i in 0..<data_columns.lenu64:
let subnet_id = compute_subnet_for_data_column_sidecar(i)
das_workers[i] =
router[].network.broadcastDataColumnSidecar(subnet_id, data_columns[i])
let allres = await allFinished(das_workers)
for i in 0..<allres.len:
let res = allres[i]
doAssert res.finished()
if res.failed():
notice "Data Columns not sent",
data_column = shortLog(data_columns[i]), error = res.error[]
else:
notice "Data columns sent", data_column = shortLog(data_columns[i])
dataColumnRefs = Opt.some(data_columns.mapIt(newClone(it)))
let added = await router[].blockProcessor[].addBlock(
MsgSource.api, ForkedSignedBeaconBlock.init(blck), blobRefs, Opt.none(DataColumnSidecars))
MsgSource.api, ForkedSignedBeaconBlock.init(blck), Opt.none(BlobSidecars), dataColumnRefs)
# The boolean we return tells the caller whether the block was integrated
# into the chain
@ -213,6 +214,56 @@ proc routeSignedBeaconBlock*(
signature = shortLog(blck.signature)
ok(blockRef)
proc routeReconstructedDataColumns*(
router: ref MessageRouter,
blck: ForkySignedBeaconBlock):
Future[SendResult] {.async: (raises: [CancelledError]).} =
## Process reconstructing the data columns and broadcast once done
block:
when typeof(blck).kind >= ConsensusFork.Deneb:
let res = await router[].processor.processDataColumnReconstruction(
router[].network, blck)
if not res.isGoodForSending:
warn "Issue sending reconstructed data columns"
return err(res.error()[1])
let custody_columns = get_custody_columns(
router.network.nodeId,
CUSTODY_REQUIREMENT)
var
data_column_sidecars: DataColumnSidecars
columnsOk = true
for custody_column in custody_columns.get:
let data_column = DataColumnSidecar.new()
if not router[].processor.dag.db.getDataColumnSidecar(
blck.root, custody_column, data_column[]):
columnsOk = false
debug "Issue with loading reconstructed data columns"
break
data_column_sidecars.add data_column
var das_workers = newSeq[Future[SendResult]](len(data_column_sidecars))
for i in 0..<data_column_sidecars.lenu64:
let subnet_id = compute_subnet_for_data_column_sidecar(i)
das_workers[i] =
router[].network.broadcastDataColumnSidecar(subnet_id, data_column_sidecars[i][])
let allres = await allFinished(das_workers)
for i in 0..<allres.len:
let res = allres[i]
doAssert res.finished()
if res.failed():
notice "Reconstructed data columns not sent",
data_column = shortLog(data_column_sidecars[i][]), error = res.error[]
else:
notice "Reconstructed data columns sent",
data_column = shortLog(data_column_sidecars[i][])
return ok()
proc routeAttestation*(
router: ref MessageRouter,
attestation: phase0.Attestation | electra.Attestation,