add: forward and backward syncing for data columns, broadcasting data columns created from blobs, added dc support to sync_queue

This commit is contained in:
Agnish Ghosh 2024-06-24 17:32:06 +05:30
parent e2afc583cb
commit 791d2fb0d1
No known key found for this signature in database
GPG Key ID: 7BDDA05D1B25E9F8
10 changed files with 223 additions and 32 deletions

View File

@ -24,6 +24,7 @@ const
## Enough for finalization in an alternative fork ## Enough for finalization in an alternative fork
MaxBlobless = SLOTS_PER_EPOCH MaxBlobless = SLOTS_PER_EPOCH
## Arbitrary ## Arbitrary
MaxColumnless = SLOTS_PER_EPOCH
MaxUnviables = 16 * 1024 MaxUnviables = 16 * 1024
## About a day of blocks - most likely not needed but it's quite cheap.. ## About a day of blocks - most likely not needed but it's quite cheap..
@ -238,6 +239,18 @@ func cleanupBlobless(quarantine: var Quarantine, finalizedSlot: Slot) =
quarantine.addUnviable k quarantine.addUnviable k
quarantine.blobless.del k quarantine.blobless.del k
func cleanupColumnless(quarantine: var Quarantine, finalizedSlot: Slot) =
var toDel: seq[Eth2Digest]
for k,v in quarantine.columnless:
withBlck(v):
if not isViable(finalizedSlot, forkyBlck.message.slot):
toDel.add k
for k in toDel:
quarantine.addUnviable k
quarantine.columnless.del k
func clearAfterReorg*(quarantine: var Quarantine) = func clearAfterReorg*(quarantine: var Quarantine) =
## Clear missing and orphans to start with a fresh slate in case of a reorg ## Clear missing and orphans to start with a fresh slate in case of a reorg
## Unviables remain unviable and are not cleared. ## Unviables remain unviable and are not cleared.
@ -339,6 +352,29 @@ iterator peekBlobless*(quarantine: var Quarantine): ForkedSignedBeaconBlock =
for k, v in quarantine.blobless.mpairs(): for k, v in quarantine.blobless.mpairs():
yield v yield v
proc addColumnless*(
quarantine: var Quarantine, finalizedSlot: Slot,
signedBlock: deneb.SignedBeaconBlock | electra.SignedBeaconBlock): bool =
if not isViable(finalizedSlot, signedBlock.message.slot):
quarantine.addUnviable(signedBlock.root)
return false
quarantine.cleanupColumnless(finalizedSlot)
if quarantine.columnless.lenu64 >= MaxColumnless:
var oldest_columnless_key: Eth2Digest
for k in quarantine.columnless.keys:
oldest_columnless_key = k
break
quarantine.columnless.del oldest_columnless_key
debug "block quarantine: Adding columnless", blck = shortLog(signedBlock)
quarantine.columnless[signedBlock.root] =
ForkedSignedBeaconBlock.init(signedBlock)
quarantine.missing.del(signedBlock.root)
true
func popColumnless*( func popColumnless*(
quarantine: var Quarantine, quarantine: var Quarantine,
root: Eth2Digest): Opt[ForkedSignedBeaconBlock] = root: Eth2Digest): Opt[ForkedSignedBeaconBlock] =

View File

@ -78,7 +78,7 @@ func popDataColumns*(
var c: ref DataColumnSidecar var c: ref DataColumnSidecar
if quarantine.data_columns.pop((digest, ColumnIndex idx), c): if quarantine.data_columns.pop((digest, ColumnIndex idx), c):
r.add(c) r.add(c)
true r
func hasDataColumns*(quarantine: DataColumnQuarantine, func hasDataColumns*(quarantine: DataColumnQuarantine,
blck: deneb.SignedBeaconBlock | electra.SignedBeaconBlock): bool = blck: deneb.SignedBeaconBlock | electra.SignedBeaconBlock): bool =

View File

@ -10,7 +10,7 @@
import import
stew/results, stew/results,
chronicles, chronos, metrics, chronicles, chronos, metrics,
../spec/[forks, signatures, signatures_batch], ../spec/[forks, signatures, signatures_batch, eip7594_helpers],
../sszdump ../sszdump
from std/deques import Deque, addLast, contains, initDeque, items, len, shrink from std/deques import Deque, addLast, contains, initDeque, items, len, shrink
@ -27,13 +27,16 @@ from ../consensus_object_pools/block_dag import BlockRef, root, shortLog, slot
from ../consensus_object_pools/block_pools_types import from ../consensus_object_pools/block_pools_types import
EpochRef, VerifierError EpochRef, VerifierError
from ../consensus_object_pools/block_quarantine import from ../consensus_object_pools/block_quarantine import
addBlobless, addOrphan, addUnviable, pop, removeOrphan addBlobless, addOrphan, addUnviable, pop, removeOrphan, addColumnless
from ../consensus_object_pools/blob_quarantine import from ../consensus_object_pools/blob_quarantine import
BlobQuarantine, hasBlobs, popBlobs, put BlobQuarantine, hasBlobs, popBlobs, put
from ../consensus_object_pools/data_column_quarantine import
DataColumnQuarantine, hasDataColumns, popDataColumns, put
from ../validators/validator_monitor import from ../validators/validator_monitor import
MsgSource, ValidatorMonitor, registerAttestationInBlock, registerBeaconBlock, MsgSource, ValidatorMonitor, registerAttestationInBlock, registerBeaconBlock,
registerSyncAggregateInBlock registerSyncAggregateInBlock
from ../beacon_chain_db import getBlobSidecar, putBlobSidecar from ../beacon_chain_db import getBlobSidecar, putBlobSidecar,
getDataColumnSidecar, putDataColumnSidecar
from ../spec/state_transition_block import validate_blobs from ../spec/state_transition_block import validate_blobs
export sszdump, signatures_batch export sszdump, signatures_batch
@ -58,6 +61,7 @@ type
BlockEntry = object BlockEntry = object
blck*: ForkedSignedBeaconBlock blck*: ForkedSignedBeaconBlock
blobs*: Opt[BlobSidecars] blobs*: Opt[BlobSidecars]
data_columns*: Opt[DataColumnSidecars]
maybeFinalized*: bool maybeFinalized*: bool
## The block source claims the block has been finalized already ## The block source claims the block has been finalized already
resfut*: Future[Result[void, VerifierError]].Raising([CancelledError]) resfut*: Future[Result[void, VerifierError]].Raising([CancelledError])
@ -102,6 +106,7 @@ type
getBeaconTime: GetBeaconTimeFn getBeaconTime: GetBeaconTimeFn
blobQuarantine: ref BlobQuarantine blobQuarantine: ref BlobQuarantine
dataColumnQuarantine: ref DataColumnQuarantine
verifier: BatchVerifier verifier: BatchVerifier
lastPayload: Slot lastPayload: Slot
@ -174,7 +179,8 @@ from ../consensus_object_pools/block_clearance import
proc storeBackfillBlock( proc storeBackfillBlock(
self: var BlockProcessor, self: var BlockProcessor,
signedBlock: ForkySignedBeaconBlock, signedBlock: ForkySignedBeaconBlock,
blobsOpt: Opt[BlobSidecars]): Result[void, VerifierError] = blobsOpt: Opt[BlobSidecars],
dataColumnsOpt: Opt[DataColumnSidecars]): Result[void, VerifierError] =
# The block is certainly not missing any more # The block is certainly not missing any more
self.consensusManager.quarantine[].missing.del(signedBlock.root) self.consensusManager.quarantine[].missing.del(signedBlock.root)
@ -182,6 +188,7 @@ proc storeBackfillBlock(
# Establish blob viability before calling addbackfillBlock to avoid # Establish blob viability before calling addbackfillBlock to avoid
# writing the block in case of blob error. # writing the block in case of blob error.
var blobsOk = true var blobsOk = true
var columnsOk = true
when typeof(signedBlock).kind >= ConsensusFork.Deneb: when typeof(signedBlock).kind >= ConsensusFork.Deneb:
if blobsOpt.isSome: if blobsOpt.isSome:
let blobs = blobsOpt.get() let blobs = blobsOpt.get()
@ -202,6 +209,24 @@ proc storeBackfillBlock(
if not blobsOk: if not blobsOk:
return err(VerifierError.Invalid) return err(VerifierError.Invalid)
when typeof(signedBlock).kind >= ConsensusFork.Deneb:
if dataColumnsOpt.isSome:
let data_columns = dataColumnsOpt.get()
if data_columns.len > 0:
for i in 0..<data_columns.len:
let r = verify_data_column_sidecar_kzg_proofs(data_columns[i][])
if r.isErr():
debug "backfill datacolumn validation failed",
blockRoot = shortLog(signedBlock.root),
data_column = shortLog(data_columns[i][]),
blck = shortLog(signedBlock.message),
signature = shortLog(signedBlock.signature),
msg = r.error()
columnsOk = r.isOk()
if not columnsOk:
return err(VerifierError.Invalid)
let res = self.consensusManager.dag.addBackfillBlock(signedBlock) let res = self.consensusManager.dag.addBackfillBlock(signedBlock)
if res.isErr(): if res.isErr():
@ -225,6 +250,11 @@ proc storeBackfillBlock(
for b in blobs: for b in blobs:
self.consensusManager.dag.db.putBlobSidecar(b[]) self.consensusManager.dag.db.putBlobSidecar(b[])
# Only store data columns after successfully establishing block validity
let data_columns = dataColumnsOpt.valueOr: DataColumnSidecars @[]
for c in data_columns:
self.consensusManager.dag.db.putDataColumnSidecar(c[])
res res
from web3/engine_api_types import from web3/engine_api_types import
@ -385,6 +415,7 @@ proc checkBloblessSignature(
proc enqueueBlock*( proc enqueueBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], blobs: Opt[BlobSidecars],
data_columns: Opt[DataColumnSidecars],
resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil, resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil,
maybeFinalized = false, maybeFinalized = false,
validationDur = Duration()) = validationDur = Duration()) =
@ -392,7 +423,7 @@ proc enqueueBlock*(
if forkyBlck.message.slot <= self.consensusManager.dag.finalizedHead.slot: if forkyBlck.message.slot <= self.consensusManager.dag.finalizedHead.slot:
# let backfill blocks skip the queue - these are always "fast" to process # let backfill blocks skip the queue - these are always "fast" to process
# because there are no state rewinds to deal with # because there are no state rewinds to deal with
let res = self.storeBackfillBlock(forkyBlck, blobs) let res = self.storeBackfillBlock(forkyBlck, blobs, data_columns)
resfut.complete(res) resfut.complete(res)
return return
@ -400,6 +431,7 @@ proc enqueueBlock*(
self.blockQueue.addLastNoWait(BlockEntry( self.blockQueue.addLastNoWait(BlockEntry(
blck: blck, blck: blck,
blobs: blobs, blobs: blobs,
data_columns: data_columns,
maybeFinalized: maybeFinalized, maybeFinalized: maybeFinalized,
resfut: resfut, queueTick: Moment.now(), resfut: resfut, queueTick: Moment.now(),
validationDur: validationDur, validationDur: validationDur,
@ -411,6 +443,7 @@ proc storeBlock(
self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime, self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
signedBlock: ForkySignedBeaconBlock, signedBlock: ForkySignedBeaconBlock,
blobsOpt: Opt[BlobSidecars], blobsOpt: Opt[BlobSidecars],
dataColumnsOpt: Opt[DataColumnSidecars],
maybeFinalized = false, maybeFinalized = false,
queueTick: Moment = Moment.now(), validationDur = Duration()): queueTick: Moment = Moment.now(), validationDur = Duration()):
Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async: (raises: [CancelledError]).} = Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async: (raises: [CancelledError]).} =
@ -508,10 +541,25 @@ proc storeBlock(
Opt.some blob_sidecars Opt.some blob_sidecars
else: else:
Opt.none BlobSidecars Opt.none BlobSidecars
if blobsOk:
var columnsOk = true
let data_columns =
withBlck(parentBlck.get()):
when consensusFork >= ConsensusFork.Deneb:
var data_column_sidecars: DataColumnSidecars
for i in 0..<forkyBlck.message.body.blob_kzg_commitments.len:
let data_column = DataColumnSidecar.new()
if not dag.db.getDataColumnSidecar(parent_root, i.ColumnIndex, data_column[]):
columnsOk = false
break
data_column_sidecars.add data_column
Opt.some data_column_sidecars
else:
Opt.none DataColumnSidecars
if blobsOk and columnsOk:
debug "Loaded parent block from storage", parent_root debug "Loaded parent block from storage", parent_root
self[].enqueueBlock( self[].enqueueBlock(
MsgSource.gossip, parentBlck.unsafeGet().asSigned(), blobs) MsgSource.gossip, parentBlck.unsafeGet().asSigned(), blobs, data_columns)
return handleVerifierError(parent.error()) return handleVerifierError(parent.error())
@ -771,21 +819,23 @@ proc storeBlock(
withBlck(quarantined): withBlck(quarantined):
when typeof(forkyBlck).kind < ConsensusFork.Deneb: when typeof(forkyBlck).kind < ConsensusFork.Deneb:
self[].enqueueBlock( self[].enqueueBlock(
MsgSource.gossip, quarantined, Opt.none(BlobSidecars)) MsgSource.gossip, quarantined, Opt.none(BlobSidecars), Opt.none(DataColumnSidecars))
else: else:
if len(forkyBlck.message.body.blob_kzg_commitments) == 0: if len(forkyBlck.message.body.blob_kzg_commitments) == 0:
self[].enqueueBlock( self[].enqueueBlock(
MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[])) MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[]), Opt.some(DataColumnSidecars @[]))
else: else:
if (let res = checkBloblessSignature(self[], forkyBlck); res.isErr): if (let res = checkBloblessSignature(self[], forkyBlck); res.isErr):
warn "Failed to verify signature of unorphaned blobless block", warn "Failed to verify signature of unorphaned blobless block",
blck = shortLog(forkyBlck), blck = shortLog(forkyBlck),
error = res.error() error = res.error()
continue continue
if self.blobQuarantine[].hasBlobs(forkyBlck): if self.blobQuarantine[].hasBlobs(forkyBlck) and self.dataColumnQuarantine[].hasDataColumns(forkyBlck):
let blobs = self.blobQuarantine[].popBlobs( let blobs = self.blobQuarantine[].popBlobs(
forkyBlck.root, forkyBlck) forkyBlck.root, forkyBlck)
self[].enqueueBlock(MsgSource.gossip, quarantined, Opt.some(blobs)) let data_columns = self.dataColumnQuarantine[].popDataColumns(
forkyBlck.root, forkyBlck)
self[].enqueueBlock(MsgSource.gossip, quarantined, Opt.some(blobs), Opt.some(data_columns))
else: else:
discard self.consensusManager.quarantine[].addBlobless( discard self.consensusManager.quarantine[].addBlobless(
dag.finalizedHead.slot, forkyBlck) dag.finalizedHead.slot, forkyBlck)
@ -797,7 +847,7 @@ proc storeBlock(
proc addBlock*( proc addBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized = false, blobs: Opt[BlobSidecars], data_columns: Opt[DataColumnSidecars], maybeFinalized = false,
validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
## Enqueue a Gossip-validated block for consensus verification ## Enqueue a Gossip-validated block for consensus verification
# Backpressure: # Backpressure:
@ -809,7 +859,7 @@ proc addBlock*(
# - RequestManager (missing ancestor blocks) # - RequestManager (missing ancestor blocks)
# - API # - API
let resfut = newFuture[Result[void, VerifierError]]("BlockProcessor.addBlock") let resfut = newFuture[Result[void, VerifierError]]("BlockProcessor.addBlock")
enqueueBlock(self, src, blck, blobs, resfut, maybeFinalized, validationDur) enqueueBlock(self, src, blck, blobs, data_columns, resfut, maybeFinalized, validationDur)
resfut resfut
# Event Loop # Event Loop
@ -830,7 +880,7 @@ proc processBlock(
let res = withBlck(entry.blck): let res = withBlck(entry.blck):
await self.storeBlock( await self.storeBlock(
entry.src, wallTime, forkyBlck, entry.blobs, entry.maybeFinalized, entry.src, wallTime, forkyBlck, entry.blobs, entry.data_columns, entry.maybeFinalized,
entry.queueTick, entry.validationDur) entry.queueTick, entry.validationDur)
if res.isErr and res.error[1] == ProcessingStatus.notCompleted: if res.isErr and res.error[1] == ProcessingStatus.notCompleted:
@ -842,7 +892,7 @@ proc processBlock(
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.2/sync/optimistic.md#execution-engine-errors # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.2/sync/optimistic.md#execution-engine-errors
await sleepAsync(chronos.seconds(1)) await sleepAsync(chronos.seconds(1))
self[].enqueueBlock( self[].enqueueBlock(
entry.src, entry.blck, entry.blobs, entry.resfut, entry.maybeFinalized, entry.src, entry.blck, entry.blobs, entry.data_columns, entry.resfut, entry.maybeFinalized,
entry.validationDur) entry.validationDur)
# To ensure backpressure on the sync manager, do not complete these futures. # To ensure backpressure on the sync manager, do not complete these futures.
return return

View File

@ -261,9 +261,21 @@ proc processSignedBeaconBlock*(
else: else:
Opt.none(BlobSidecars) Opt.none(BlobSidecars)
let data_columns =
when typeof(signedBlock).kind >= ConsensusFork.Deneb:
if self.dataColumnQuarantine[].hasDataColumns(signedBlock):
Opt.some(self.dataColumnQuarantine[].popDataColumns(signedBlock.root, signedBlock))
else:
discard self.quarantine[].addColumnless(self.dag.finalizedHead.slot,
signedBlock)
return v
else:
Opt.none(DataColumnSidecars)
self.blockProcessor[].enqueueBlock( self.blockProcessor[].enqueueBlock(
src, ForkedSignedBeaconBlock.init(signedBlock), src, ForkedSignedBeaconBlock.init(signedBlock),
blobs, blobs,
data_columns,
maybeFinalized = maybeFinalized, maybeFinalized = maybeFinalized,
validationDur = nanoseconds( validationDur = nanoseconds(
(self.getCurrentBeaconTime() - wallTime).nanoseconds)) (self.getCurrentBeaconTime() - wallTime).nanoseconds))
@ -317,7 +329,8 @@ proc processBlobSidecar*(
if self.blobQuarantine[].hasBlobs(forkyBlck): if self.blobQuarantine[].hasBlobs(forkyBlck):
self.blockProcessor[].enqueueBlock( self.blockProcessor[].enqueueBlock(
MsgSource.gossip, blobless, MsgSource.gossip, blobless,
Opt.some(self.blobQuarantine[].popBlobs(block_root, forkyBlck))) Opt.some(self.blobQuarantine[].popBlobs(block_root, forkyBlck)),
Opt.none(DataColumnSidecars))
else: else:
discard self.quarantine[].addBlobless( discard self.quarantine[].addBlobless(
self.dag.finalizedHead.slot, forkyBlck) self.dag.finalizedHead.slot, forkyBlck)
@ -352,7 +365,7 @@ proc processDataColumnSidecar*(
if v.isErr(): if v.isErr():
debug "Dropping data column", error = v.error() debug "Dropping data column", error = v.error()
blob_sidecars_dropped.inc(1, [$v.error[0]]) data_column_sidecars_dropped.inc(1, [$v.error[0]])
return v return v
debug "Data column validated" debug "Data column validated"

View File

@ -2663,6 +2663,14 @@ proc broadcastBlobSidecar*(
topic = getBlobSidecarTopic(forkPrefix, subnet_id) topic = getBlobSidecarTopic(forkPrefix, subnet_id)
node.broadcast(topic, blob) node.broadcast(topic, blob)
proc broadcastDataColumnSidecar*(
node: Eth2Node, subnet_id: uint64, data_column: DataColumnSidecar):
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let
forkPrefix = node.forkDigestAtEpoch(node.getWallEpoch)
topic = getDataColumnSidecarTopic(forkPrefix, subnet_id)
node.broadcast(topic, data_column)
proc broadcastSyncCommitteeMessage*( proc broadcastSyncCommitteeMessage*(
node: Eth2Node, msg: SyncCommitteeMessage, node: Eth2Node, msg: SyncCommitteeMessage,
subcommitteeIdx: SyncSubcommitteeIndex): subcommitteeIdx: SyncSubcommitteeIndex):

View File

@ -393,14 +393,15 @@ proc initFullNode(
rng, taskpool, consensusManager, node.validatorMonitor, rng, taskpool, consensusManager, node.validatorMonitor,
blobQuarantine, getBeaconTime) blobQuarantine, getBeaconTime)
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized: bool): blobs: Opt[BlobSidecars], data_columns: Opt[DataColumnSidecars],
maybeFinalized: bool):
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
# The design with a callback for block verification is unusual compared # The design with a callback for block verification is unusual compared
# to the rest of the application, but fits with the general approach # to the rest of the application, but fits with the general approach
# taken in the sync/request managers - this is an architectural compromise # taken in the sync/request managers - this is an architectural compromise
# that should probably be reimagined more holistically in the future. # that should probably be reimagined more holistically in the future.
blockProcessor[].addBlock( blockProcessor[].addBlock(
MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized) MsgSource.gossip, signedBlock, blobs, data_columns, maybeFinalized = maybeFinalized)
rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
maybeFinalized: bool): maybeFinalized: bool):
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
@ -416,12 +417,13 @@ proc initFullNode(
else: else:
let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck) let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.some(blobs), Opt.some(blobs), Opt.none(DataColumnSidecars),
maybeFinalized = maybeFinalized) maybeFinalized = maybeFinalized)
else: else:
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.none(BlobSidecars), Opt.none(BlobSidecars), Opt.none(DataColumnSidecars),
maybeFinalized = maybeFinalized) maybeFinalized = maybeFinalized)
rmanBlockLoader = proc( rmanBlockLoader = proc(
blockRoot: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] = blockRoot: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] =
dag.getForkedBlock(blockRoot) dag.getForkedBlock(blockRoot)
@ -1896,6 +1898,9 @@ proc installMessageValidators(node: BeaconNode) =
# toValidationResult( # toValidationResult(
# node.processor[].processBlobSidecar( # node.processor[].processBlobSidecar(
# MsgSource.gossip, blobSidecar, subnet_id))) # MsgSource.gossip, blobSidecar, subnet_id)))
# data_column_sidecar_{subnet_id}
#
for it in 0'u64..<DATA_COLUMN_SIDECAR_SUBNET_COUNT: for it in 0'u64..<DATA_COLUMN_SIDECAR_SUBNET_COUNT:
closureScope: # Needed for inner `proc`; don't lift it out of loop. closureScope: # Needed for inner `proc`; don't lift it out of loop.
let subnet_id = it let subnet_id = it

View File

@ -60,6 +60,8 @@ type
kzg_commitments_inclusion_proof*: kzg_commitments_inclusion_proof*:
array[KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH, Eth2Digest] array[KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH, Eth2Digest]
DataColumnSidecars* = seq[ref DataColumnSidecar]
DataColumnIdentifier* = object DataColumnIdentifier* = object
block_root*: Eth2Digest block_root*: Eth2Digest
index*: ColumnIndex index*: ColumnIndex
@ -79,5 +81,8 @@ func shortLog*(v: DataColumnSidecar): auto =
block_header: shortLog(v.signed_block_header.message), block_header: shortLog(v.signed_block_header.message),
) )
func shortLog*(v: seq[DataColumnSidecar]): auto =
"[" & v.mapIt(shortLog(it)).join(", ") & "]"
func shortLog*(x: seq[DataColumnIdentifier]): string = func shortLog*(x: seq[DataColumnIdentifier]): string =
"[" & x.mapIt(shortLog(it.block_root) & "/" & $it.index).join(", ") & "]" "[" & x.mapIt(shortLog(it.block_root) & "/" & $it.index).join(", ") & "]"

View File

@ -132,12 +132,12 @@ proc recover_matrix*(partial_matrix: seq[MatrixEntry], blobCount: int): Result[s
proofs.add(e.kzg_proof) proofs.add(e.kzg_proof)
# https://github.com/ethereum/consensus-specs/blob/5f48840f4d768bf0e0a8156a3ed06ec333589007/specs/_features/eip7594/das-core.md#get_data_column_sidecars # https://github.com/ethereum/consensus-specs/blob/5f48840f4d768bf0e0a8156a3ed06ec333589007/specs/_features/eip7594/das-core.md#get_data_column_sidecars
proc get_data_column_sidecars*(signed_block: deneb.SignedBeaconBlock, blobs: seq[KzgBlob]): Result[seq[DataColumnSidecar], cstring] = proc get_data_column_sidecars*(signed_block: deneb.SignedBeaconBlock | electra.SignedBeaconBlock | ForkySignedBeaconBlock, blobs: seq[KzgBlob]): Result[seq[DataColumnSidecar], cstring] =
var sidecar: DataColumnSidecar var
var signed_block_header: deneb.SignedBeaconBlockHeader sidecar: DataColumnSidecar
var blck = signed_block.message signed_block_header: SignedBeaconBlockHeader
blck = signed_block.message
var cellsAndProofs: seq[KzgCellsAndKzgProofs] cellsAndProofs: seq[KzgCellsAndKzgProofs]
for blob in blobs: for blob in blobs:
let let

View File

@ -26,7 +26,8 @@ type
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [].} GetSlotCallback* = proc(): Slot {.gcsafe, raises: [].}
ProcessingCallback* = proc() {.gcsafe, raises: [].} ProcessingCallback* = proc() {.gcsafe, raises: [].}
BlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock, BlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized: bool): blobs: Opt[BlobSidecars], data_columns: Opt[DataColumnSidecars],
maybeFinalized: bool):
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
SyncQueueKind* {.pure.} = enum SyncQueueKind* {.pure.} = enum
@ -43,6 +44,7 @@ type
request*: SyncRequest[T] request*: SyncRequest[T]
data*: seq[ref ForkedSignedBeaconBlock] data*: seq[ref ForkedSignedBeaconBlock]
blobs*: Opt[seq[BlobSidecars]] blobs*: Opt[seq[BlobSidecars]]
data_columns*: Opt[seq[DataColumnSidecars]]
GapItem*[T] = object GapItem*[T] = object
start*: Slot start*: Slot
@ -546,6 +548,13 @@ func getOpt(blobs: Opt[seq[BlobSidecars]], i: int): Opt[BlobSidecars] =
else: else:
Opt.none(BlobSidecars) Opt.none(BlobSidecars)
func getOpt(data_columns: Opt[seq[DataColumnSidecars]], i: int):
Opt[DataColumnSidecars] =
if data_columns.isSome:
Opt.some(data_columns.get()[i])
else:
Opt.none(DataColumnSidecars)
iterator blocks[T](sq: SyncQueue[T], iterator blocks[T](sq: SyncQueue[T],
sr: SyncResult[T]): (ref ForkedSignedBeaconBlock, Opt[BlobSidecars]) = sr: SyncResult[T]): (ref ForkedSignedBeaconBlock, Opt[BlobSidecars]) =
case sq.kind case sq.kind
@ -556,6 +565,16 @@ iterator blocks[T](sq: SyncQueue[T],
for i in countdown(len(sr.data) - 1, 0): for i in countdown(len(sr.data) - 1, 0):
yield (sr.data[i], sr.blobs.getOpt(i)) yield (sr.data[i], sr.blobs.getOpt(i))
iterator das_blocks[T](sq: SyncQueue[T],
sr: SyncResult[T]): (ref ForkedSignedBeaconBlock, Opt[DataColumnSidecars]) =
case sq.kind
of SyncQueueKind.Forward:
for i in countup(0, len(sr.data) - 1):
yield (sr.data[i], sr.data_columns.getOpt(i))
of SyncQueueKind.Backward:
for i in countdown(len(sr.data) - 1, 0):
yield (sr.data[i], sr.data_columns.getOpt(i))
proc advanceOutput*[T](sq: SyncQueue[T], number: uint64) = proc advanceOutput*[T](sq: SyncQueue[T], number: uint64) =
case sq.kind case sq.kind
of SyncQueueKind.Forward: of SyncQueueKind.Forward:
@ -687,7 +706,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
var i=0 var i=0
for blk, blb in sq.blocks(item): for blk, blb in sq.blocks(item):
res = await sq.blockVerifier(blk[], blb, maybeFinalized) res = await sq.blockVerifier(blk[], blb, Opt.none(DataColumnSidecars), maybeFinalized)
inc(i) inc(i)
if res.isOk(): if res.isOk():
@ -717,6 +736,38 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
req.item.updateScore(PeerScoreBadValues) req.item.updateScore(PeerScoreBadValues)
break break
var counter = 0
for blk, dc in sq.das_blocks(item):
res = await sq.blockVerifier(blk[], Opt.none(BlobSidecars), dc, maybeFinalized)
inc(counter)
if res.isOk():
goodBlock = some(blk[].slot)
else:
case res.error()
of VerifierError.MissingParent:
missingParentSlot = some(blk[].slot)
break
of VerifierError.Duplicate:
# Keep going, happens naturally
discard
of VerifierError.UnviableFork:
# Keep going so as to register other unviable blocks with the
# qurantine
if unviableBlock.isNone:
# Remember the first unviable block, so we can log it
unviableBlock = some((blk[].root, blk[].slot))
of VerifierError.Invalid:
hasInvalidBlock = true
let req = item.request
notice "Received invalid sequence of blocks", request = req,
blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data)
req.item.updateScore(PeerScoreBadValues)
break
# When errors happen while processing blocks, we retry the same request # When errors happen while processing blocks, we retry the same request
# with, hopefully, a different peer # with, hopefully, a different peer
let retryRequest = let retryRequest =

View File

@ -13,6 +13,7 @@ import
chronicles, chronicles,
metrics, metrics,
../spec/network, ../spec/network,
../spec/eip7594_helpers,
../consensus_object_pools/spec_cache, ../consensus_object_pools/spec_cache,
../gossip_processing/eth2_processor, ../gossip_processing/eth2_processor,
../networking/eth2_network, ../networking/eth2_network,
@ -160,8 +161,30 @@ proc routeSignedBeaconBlock*(
notice "Blob sent", blob = shortLog(blobs[i]) notice "Blob sent", blob = shortLog(blobs[i])
blobRefs = Opt.some(blobs.mapIt(newClone(it))) blobRefs = Opt.some(blobs.mapIt(newClone(it)))
var dataColumnRefs = Opt.none(DataColumnSidecars)
when typeof(blck).kind >= ConsensusFork.Deneb:
if blobsOpt.isSome():
let blobs = blobsOpt.get()
let data_columns = get_data_column_sidecars(blck, blobs.mapIt(it.blob)).get()
var das_workers = newSeq[Future[SendResult]](len(data_columns))
for i in 0..<data_columns.lenu64:
let subnet_id = compute_subnet_for_data_column_sidecar(i)
das_workers[i] =
router[].network.broadcastDataColumnSidecar(subnet_id, data_columns[i])
let allres = await allFinished(das_workers)
for i in 0..<allres.len:
let res = allres[i]
doAssert res.finished()
if res.failed():
notice "Data Columns not sent",
data_column = shortLog(data_columns[i]), error = res.error[]
else:
notice "Data columns sent", data_column = shortLog(data_columns[i])
blobRefs = Opt.some(blobs.mapIt(newClone(it)))
dataColumnRefs = Opt.some(data_columns.mapIt(newClone(it)))
let added = await router[].blockProcessor[].addBlock( let added = await router[].blockProcessor[].addBlock(
MsgSource.api, ForkedSignedBeaconBlock.init(blck), blobRefs) MsgSource.api, ForkedSignedBeaconBlock.init(blck), blobRefs, dataColumnRefs)
# The boolean we return tells the caller whether the block was integrated # The boolean we return tells the caller whether the block was integrated
# into the chain # into the chain