add: data column reconstruction and broadcast (#6481)

* decouple data column reconstruction from broadcasting

* save progress

* add reconstruction event loop; clean up previous reconstruction-related code
Agnish Ghosh 2024-08-08 17:44:55 +05:30 committed by GitHub
parent b32205de7c
commit 9be615dff9
7 changed files with 176 additions and 164 deletions
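
The shape of the change: reconstruction moves out of the gossip message router and into the node's per-slot event loop, with broadcasting running afterwards on whatever was reconstructed. A minimal, self-contained Nim sketch of that control flow (names here are illustrative, not this commit's API):

import std/asyncdispatch

proc tryReconstruct(): Future[bool] {.async.} =
  # Placeholder for loading custody columns from the database and
  # recovering the missing ones; false means nothing was recovered.
  return true

proc broadcastColumns() {.async.} =
  # Placeholder for the per-subnet gossip sends.
  discard

proc onSlotEnd() {.async.} =
  # Reconstruction first, broadcasting second; the two steps are
  # decoupled, and broadcasting only runs if reconstruction succeeded.
  if await tryReconstruct():
    await broadcastColumns()

when isMainModule:
  waitFor onSlotEnd()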

View File

@@ -445,62 +445,6 @@ proc checkForPotentialDoppelganger(
attestation = shortLog(attestation)
quitDoppelganger()
# TODO: need to revamp `recover_blobs` and rewrite this
# proc processDataColumnReconstruction*(
# self: ref Eth2Processor,
# node: Eth2Node,
# signed_block: deneb.SignedBeaconBlock |
# electra.SignedBeaconBlock):
# Future[ValidationRes] {.async: (raises: [CancelledError]).} =
# let
# dag = self.dag
# root = signed_block.root
# custodiedColumnIndices = get_custody_columns(
# node.nodeId,
# CUSTODY_REQUIREMENT)
# var
# data_column_sidecars: seq[DataColumnSidecar]
# columnsOk = true
# storedColumns: seq[ColumnIndex]
# # Loading the data columns from the database
# for custody_column in custodiedColumnIndices.get:
# let data_column = DataColumnSidecar.new()
# if not dag.db.getDataColumnSidecar(root, custody_column, data_column[]):
# columnsOk = false
# break
# data_column_sidecars.add data_column[]
# storedColumns.add data_column.index
# if columnsOk:
# debug "Loaded data column for reconstruction"
# # storedColumn number is less than the NUMBER_OF_COLUMNS
# # then reconstruction is not possible, and if all the data columns
# # are already stored then we do not need to reconstruct at all
# if storedColumns.len < NUMBER_OF_COLUMNS or storedColumns.len == NUMBER_OF_COLUMNS:
# return ok()
# else:
# return errIgnore ("DataColumnSidecar: Reconstruction error!")
# # Recover blobs from saved data column sidecars
# let recovered_blobs = recover_blobs(data_column_sidecars, storedColumns.len, signed_block)
# if not recovered_blobs.isOk:
# return errIgnore ("Error recovering blobs from data columns")
# # Reconstruct data column sidecars from recovered blobs
# let reconstructedDataColumns = get_data_column_sidecars(signed_block, recovered_blobs.get)
# for data_column in data_column_sidecars:
# if data_column.index notin custodiedColumnIndices.get:
# continue
# dag.db.putDataColumnSidecar(data_column)
# ok()
proc processAttestation*(
self: ref Eth2Processor, src: MsgSource,
attestation: phase0.Attestation | electra.Attestation, subnet_id: SubnetId,

View File

@@ -19,7 +19,7 @@ import
./networking/[topic_params, network_metadata_downloads, eth2_network],
./rpc/[rest_api, state_ttl_cache],
./spec/datatypes/[altair, bellatrix, phase0],
./spec/[deposit_snapshots, engine_authentication, weak_subjectivity],
./spec/[deposit_snapshots, eip7594_helpers, engine_authentication, weak_subjectivity],
./sync/[sync_protocol, light_client_protocol],
./validators/[keystore_management, beacon_validators],
"."/[
@@ -30,6 +30,7 @@ when defined(posix):
import system/ansi_c
from ./spec/datatypes/deneb import SignedBeaconBlock
from ./spec/datatypes/electra import SignedBeaconBlock
from libp2p/protocols/pubsub/gossipsub
@@ -1471,10 +1472,119 @@ proc pruneDataColumns(node: BeaconNode, slot: Slot) =
count = count + 1
debug "pruned data columns", count, dataColumnPruneEpoch
proc tryReconstructingDataColumns*(self: BeaconNode,
signed_block: deneb.TrustedSignedBeaconBlock |
electra.TrustedSignedBeaconBlock):
Result[void, string] =
# Checks whether the data columns can be reconstructed from the
# recovery matrix and, if so, persists the recovered columns
let localCustodySubnetCount =
if self.config.subscribeAllSubnets:
DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
else:
CUSTODY_REQUIREMENT
let
db = self.db
root = signed_block.root
custodiedColumnIndices = get_custody_columns(
self.network.nodeId,
localCustodySubnetCount)
var
data_column_sidecars: seq[DataColumnSidecar]
columnsOk = true
storedColumns: seq[ColumnIndex]
# Loading the data columns from the database
for custody_column in custodiedColumnIndices.get:
let data_column = DataColumnSidecar.new()
if not db.getDataColumnSidecar(root, custody_column, data_column[]):
columnsOk = false
break
data_column_sidecars.add data_column[]
storedColumns.add data_column.index
if columnsOk:
debug "Loaded data columns for reconstruction"
# If fewer than half of the NUMBER_OF_COLUMNS are stored, reconstruction
# is not possible, and if all the data columns are already stored
# then we do not need to reconstruct at all
if storedColumns.len < NUMBER_OF_COLUMNS div 2:
return err("DataColumnSidecar: Not enough columns for reconstruction")
if storedColumns.len == NUMBER_OF_COLUMNS:
return ok()
# Recover cells and proofs from the stored data column sidecars
let recovered_cps = recover_cells_and_proofs(data_column_sidecars, storedColumns.len, signed_block)
if not recovered_cps.isOk:
return err("Error recovering cells and proofs from data columns")
# Reconstruct data column sidecars from the recovered cells and proofs
let reconstructedDataColumns = get_data_column_sidecars(signed_block, recovered_cps.get)
for data_column in reconstructedDataColumns.get:
if data_column.index notin custodiedColumnIndices.get:
continue
db.putDataColumnSidecar(data_column)
notice "Data Column Reconstructed and Saved Successfully"
ok()
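
For context on the early returns above: with 2x erasure coding, any half of the NUMBER_OF_COLUMNS columns suffices to recover the rest, and a node that already stores every column has nothing to reconstruct. A minimal sketch of the threshold check, assuming the spec value NUMBER_OF_COLUMNS = 128:

const NUMBER_OF_COLUMNS = 128

func canReconstruct(storedColumns: int): bool =
  # Enough columns to recover the remainder, but not already complete.
  storedColumns >= NUMBER_OF_COLUMNS div 2 and
    storedColumns < NUMBER_OF_COLUMNS

when isMainModule:
  doAssert not canReconstruct(63)                # below the recovery threshold
  doAssert canReconstruct(64)                    # exactly half suffices
  doAssert not canReconstruct(NUMBER_OF_COLUMNS) # already complete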
proc reconstructAndSendDataColumns*(node: BeaconNode) {.async.} =
let
db = node.db
root = node.dag.head.root
let blck = getForkedBlock(db, root).valueOr: return
withBlck(blck):
when typeof(forkyBlck).kind < ConsensusFork.Deneb: return
else:
let res = node.tryReconstructingDataColumns(forkyBlck)
if not res.isOk():
return
let custody_columns = get_custody_columns(
node.network.nodeId,
CUSTODY_REQUIREMENT)
var
data_column_sidecars: DataColumnSidecars
columnsOk = true
for custody_column in custody_columns.get:
let data_column = DataColumnSidecar.new()
if not db.getDataColumnSidecar(
root, custody_column, data_column[]):
columnsOk = false
debug "Issue with loading reconstructed data columns"
break
data_column_sidecars.add data_column
var das_workers = newSeq[Future[SendResult]](len(data_column_sidecars))
for i in 0..<data_column_sidecars.lenu64:
let subnet_id = compute_subnet_for_data_column_sidecar(i)
das_workers[i] =
node.network.broadcastDataColumnSidecar(subnet_id, data_column_sidecars[i][])
let allres = await allFinished(das_workers)
for i in 0..<allres.len:
let res = allres[i]
doAssert res.finished()
if res.failed():
notice "Reconstructed data columns not sent",
data_column = shortLog(data_column_sidecars[i][]), error = res.error[]
else:
notice "Reconstructed data columns sent",
data_column = shortLog(data_column_sidecars[i][])
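
The subnet selection above follows compute_subnet_for_data_column_sidecar from the EIP-7594 das-core spec, a plain modulo over the subnet count; note that the spec function takes a column index, while the loop above passes the sidecar's position in the sequence, and the two coincide only when all columns are present in order. A sketch, assuming the draft value DATA_COLUMN_SIDECAR_SUBNET_COUNT = 32:

const DATA_COLUMN_SIDECAR_SUBNET_COUNT = 32'u64

func computeSubnetForDataColumnSidecar(columnIndex: uint64): uint64 =
  # Each column maps to a fixed gossip subnet by simple modulo.
  columnIndex mod DATA_COLUMN_SIDECAR_SUBNET_COUNT

when isMainModule:
  doAssert computeSubnetForDataColumnSidecar(0) == 0'u64
  doAssert computeSubnetForDataColumnSidecar(33) == 1'u64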
proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
# Things we do when slot processing has ended and we're about to wait for the
# next slot
await node.reconstructAndSendDataColumns()
# By waiting until close before slot end, ensure that preparation for next
# slot does not interfere with propagation of messages and with VC duties.
const endOffset = aggregateSlotOffset + nanos(

View File

@@ -75,7 +75,7 @@ type
BlsCurveType* = ValidatorPrivKey | ValidatorPubKey | ValidatorSig
BlsResult*[T] = Result[T, cstring]
TrustedSig* = object
data* {.align: 16.}: array[RawSigSize, byte]
@@ -414,6 +414,9 @@ func toHex*(x: CookedPubKey): string =
func `$`*(x: CookedPubKey): string =
$(x.toPubKey())
func toValidatorSig*(x: TrustedSig): ValidatorSig =
ValidatorSig(blob: x.data)
func toValidatorSig*(x: CookedSig): ValidatorSig =
ValidatorSig(blob: blscurve.Signature(x).exportRaw())
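
The TrustedSig overload added above is a byte-for-byte repacking: trusted block signatures are stored as raw bytes, and sidecar construction needs them as a ValidatorSig. A self-contained sketch with stand-in types, assuming the 96-byte BLS signature size:

const RawSigSize = 96

type
  TrustedSig = object
    data: array[RawSigSize, byte]
  ValidatorSig = object
    blob: array[RawSigSize, byte]

func toValidatorSig(x: TrustedSig): ValidatorSig =
  # No cryptographic check happens here; only the representation changes.
  ValidatorSig(blob: x.data)

when isMainModule:
  var sig = TrustedSig()
  sig.data[0] = 0xc0
  doAssert toValidatorSig(sig).blob[0] == 0xc0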

View File

@@ -136,61 +136,72 @@ proc recover_matrix*(partial_matrix: seq[MatrixEntry],
ok(extended_matrix)
## THIS METHOD IS DEPRECATED, WILL BE REMOVED ONCE ALPHA 4 IS RELEASED
# proc recover_blobs*(
# data_columns: seq[DataColumnSidecar],
# columnCount: int,
# blck: deneb.SignedBeaconBlock |
# electra.SignedBeaconBlock |
# ForkySignedBeaconBlock):
# Result[seq[KzgBlob], cstring] =
# THIS METHOD IS DEPRECATED, WILL BE REMOVED ONCE ALPHA 4 IS RELEASED
proc recover_cells_and_proofs*(
data_columns: seq[DataColumnSidecar],
columnCount: int,
blck: deneb.TrustedSignedBeaconBlock |
electra.TrustedSignedBeaconBlock |
ForkedTrustedSignedBeaconBlock):
Result[seq[CellsAndProofs], cstring] =
# # This helper recovers blobs from the data column sidecars
# if not (data_columns.len != 0):
# return err("DataColumnSidecar: Length should not be 0")
# This helper recovers cells and proofs from the data column sidecars
if data_columns.len == 0:
return err("DataColumnSidecar: Length should not be 0")
# var blobCount = data_columns[0].column.len
# for data_column in data_columns:
# if not (blobCount == data_column.column.len):
# return err ("DataColumns do not have the same length")
let blobCount = data_columns[0].column.len
for data_column in data_columns:
if blobCount != data_column.column.len:
return err("DataColumns do not have the same length")
# var recovered_blobs = newSeqOfCap[KzgBlob](blobCount)
var recovered_cps = newSeqOfCap[CellsAndProofs](blobCount)
# for blobIdx in 0 ..< blobCount:
# var
# cell_ids = newSeqOfCap[CellID](columnCount)
# ckzgCells = newSeqOfCap[KzgCell](columnCount)
for blobIdx in 0 ..< blobCount:
var
cell_ids = newSeqOfCap[CellID](columnCount)
ckzgCells = newSeqOfCap[KzgCell](columnCount)
# for data_column in data_columns:
# cell_ids.add(data_column.index)
for data_column in data_columns:
cell_ids.add(data_column.index)
# let
# column = data_column.column
# cell = column[blobIdx]
let
column = data_column.column
cell = column[blobIdx]
# # Transform the cell as a ckzg cell
# var ckzgCell: Cell
# for i in 0 ..< int(FIELD_ELEMENTS_PER_CELL):
# var start = 32 * i
# for j in 0 ..< 32:
# ckzgCell[start + j] = cell[start+j]
# Transform the cell into a ckzg cell
var ckzgCell: array[BYTES_PER_CELL, byte]
let inter = cell.bytes
for i in 0 ..< int(FIELD_ELEMENTS_PER_CELL):
let start = 32 * i
for j in 0 ..< 32:
ckzgCell[start + j] = inter[start + j].byte
# ckzgCells.add(ckzgCell)
ckzgCells.add(KzgCell(bytes: ckzgCell))
# # Recovering the blob
# let recovered_cells = recoverAllCells(cell_ids, ckzgCells)
# if not recovered_cells.isOk:
# return err ("Recovering all cells for blob failed")
# Recovering the cells and proofs
let recovered_cells_and_proofs = recoverCellsAndKzgProofs(cell_ids, ckzgCells)
if not recovered_cells_and_proofs.isOk:
return err("Recovering cells and proofs from data columns failed")
# let recovered_blob_res = cellsToBlob(recovered_cells.get)
# if not recovered_blob_res.isOk:
# return err ("Cells to blob for blob failed")
recovered_cps.add(recovered_cells_and_proofs.get)
# recovered_blobs.add(recovered_blob_res.get)
ok(recovered_cps)
# ok(recovered_blobs)
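
The byte-copy above relies on the fixed KZG cell layout: FIELD_ELEMENTS_PER_CELL field elements of 32 bytes each, i.e. BYTES_PER_CELL = 64 * 32 = 2048 under the current spec values. A minimal check of that arithmetic:

const
  FIELD_ELEMENTS_PER_CELL = 64
  BYTES_PER_FIELD_ELEMENT = 32
  BYTES_PER_CELL = FIELD_ELEMENTS_PER_CELL * BYTES_PER_FIELD_ELEMENT

when isMainModule:
  doAssert BYTES_PER_CELL == 2048
  # The nested loop walks start = 32 * i, j in 0 ..< 32, which covers
  # exactly the cell's byte range:
  doAssert 32 * (FIELD_ELEMENTS_PER_CELL - 1) + 31 == BYTES_PER_CELL - 1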
proc compute_signed_block_header(signed_block: deneb.TrustedSignedBeaconBlock |
electra.TrustedSignedBeaconBlock):
SignedBeaconBlockHeader =
let blck = signed_block.message
let block_header = BeaconBlockHeader(
slot: blck.slot,
proposer_index: blck.proposer_index,
parent_root: blck.parent_root,
state_root: blck.state_root,
body_root: hash_tree_root(blck.body)
)
result = SignedBeaconBlockHeader(
message: block_header,
signature: signed_block.signature.toValidatorSig
)
proc compute_signed_block_header(signed_block: deneb.SignedBeaconBlock |
electra.SignedBeaconBlock):
SignedBeaconBlockHeader =
let blck = signed_block.message
@@ -207,9 +218,9 @@ proc compute_signed_block_header(signed_block: deneb.SignedBeaconBlock |
)
# https://github.com/ethereum/consensus-specs/blob/bb8f3caafc92590cdcf2d14974adb602db9b5ca3/specs/_features/eip7594/das-core.md#get_data_column_sidecars
proc get_data_column_sidecars*(signed_block: deneb.SignedBeaconBlock |
electra.SignedBeaconBlock,
cellsAndProofs: CellsAndProofs):
proc get_data_column_sidecars*(signed_block: deneb.TrustedSignedBeaconBlock |
electra.TrustedSignedBeaconBlock,
cellsAndProofs: seq[CellsAndProofs]):
Result[seq[DataColumnSidecar], string] =
# Given a signed block and the cells/proofs associated with each blob
# in the block, assemble the sidecars which can be distributed to peers.
@@ -221,18 +232,17 @@ proc get_data_column_sidecars*(signed_block: deneb.SignedBeaconBlock |
var sidecars = newSeq[DataColumnSidecar](CELLS_PER_EXT_BLOB)
if cellsAndProofs.cells.len == 0 or
cellsAndProofs.proof.len == 0:
if cellsAndProofs.len == 0:
return ok(sidecars)
for column_index in 0..<NUMBER_OF_COLUMNS:
var
column_cells: DataColumn
column_proofs: KzgProofs
for i in 0..<cellsAndProofs.cells.len:
let check1 = column_cells.add(cellsAndProofs.cells[column_index])
for i in 0..<cellsAndProofs.len:
let check1 = column_cells.add(cellsAndProofs[i].cells[column_index])
doAssert check1 == true, "Issue fetching cell from CellsAndProofs"
let check2 = column_proofs.add(cellsAndProofs[i].proofs[column_index])
let check2 = column_proofs.add(cellsAndProofs[column_index].proofs)
doAssert check2 == true, "Issue fetching proof from CellsAndProofs"
var sidecar = DataColumnSidecar(
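
To make the indexing in get_data_column_sidecars concrete: cellsAndProofs holds one entry per blob, and column k of the matrix collects cell k (and proof k) from every blob. A small sketch with stand-in types:

type
  Cell = int # stand-in for KzgCell
  CellsAndProofs = object
    cells: seq[Cell] # one cell per column, for a single blob

func gatherColumn(cps: seq[CellsAndProofs], columnIndex: int): seq[Cell] =
  # The outer index walks the blobs; columnIndex picks each blob's cell
  # for this column.
  for blob in cps:
    result.add blob.cells[columnIndex]

when isMainModule:
  let cps = @[
    CellsAndProofs(cells: @[10, 11]), # blob 0
    CellsAndProofs(cells: @[20, 21])] # blob 1
  doAssert gatherColumn(cps, 1) == @[11, 21]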

View File

@@ -232,11 +232,6 @@ iterator blobSidecarTopics*(forkDigest: ForkDigest): string =
yield getBlobSidecarTopic(forkDigest, subnet_id)
const
KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH* = 32
MAX_REQUEST_DATA_COLUMN_SIDECARS* = MAX_REQUEST_BLOCKS_DENEB * NUMBER_OF_COLUMNS
MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS* = 4096
func getDataColumnSidecarTopic*(forkDigest: ForkDigest,
subnet_id: uint64): string =
eth2Prefix(forkDigest) & "data_column_sidecar_" & $subnet_id & "/ssz_snappy"
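
Two grounding notes for this file: the removed MAX_REQUEST_DATA_COLUMN_SIDECARS works out to 128 * 128 = 16384 with the Deneb and EIP-7594 spec values, and getDataColumnSidecarTopic yields the usual gossip topic shape. A minimal sketch (the fork prefix string is illustrative):

const
  MAX_REQUEST_BLOCKS_DENEB = 128
  NUMBER_OF_COLUMNS = 128
  MAX_REQUEST_DATA_COLUMN_SIDECARS = MAX_REQUEST_BLOCKS_DENEB * NUMBER_OF_COLUMNS

func dataColumnTopic(prefix: string, subnetId: uint64): string =
  # Mirrors getDataColumnSidecarTopic, with the fork prefix as a plain string.
  prefix & "data_column_sidecar_" & $subnetId & "/ssz_snappy"

when isMainModule:
  doAssert MAX_REQUEST_DATA_COLUMN_SIDECARS == 16384
  doAssert dataColumnTopic("/eth2/abcd1234/", 3) ==
    "/eth2/abcd1234/data_column_sidecar_3/ssz_snappy"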

View File

@@ -442,7 +442,7 @@ p2pProtocol BeaconSync(version = 1,
reqCount: uint64,
reqColumns: List[ColumnIndex, NUMBER_OF_COLUMNS],
response: MultipleChunksResponse[
ref DataColumnSidecar, Limit(MAX_REQUEST_DATA_COLUMN_SIDECARS)])
ref DataColumnSidecar, Limit(MAX_REQUEST_DATA_COLUMNS)])
{.async, libp2pProtocol("data_column_sidecars_by_range", 1).} =
trace "got data columns range request", peer, startSlot,

View File

@@ -220,56 +220,6 @@ proc routeSignedBeaconBlock*(
signature = shortLog(blck.signature)
ok(blockRef)
proc routeReconstructedDataColumns*(
router: ref MessageRouter,
blck: ForkySignedBeaconBlock):
Future[SendResult] {.async: (raises: [CancelledError]).} =
## Process reconstructing the data columns and broadcast once done
block:
when typeof(blck).kind >= ConsensusFork.Deneb:
let res = await router[].processor.processDataColumnReconstruction(
router[].network, blck)
if not res.isGoodForSending:
warn "Issue sending reconstructed data columns"
return err(res.error()[1])
let custody_columns = get_custody_columns(
router.network.nodeId,
CUSTODY_REQUIREMENT)
var
data_column_sidecars: DataColumnSidecars
columnsOk = true
for custody_column in custody_columns.get:
let data_column = DataColumnSidecar.new()
if not router[].processor.dag.db.getDataColumnSidecar(
blck.root, custody_column, data_column[]):
columnsOk = false
debug "Issue with loading reconstructed data columns"
break
data_column_sidecars.add data_column
var das_workers = newSeq[Future[SendResult]](len(data_column_sidecars))
for i in 0..<data_column_sidecars.lenu64:
let subnet_id = compute_subnet_for_data_column_sidecar(i)
das_workers[i] =
router[].network.broadcastDataColumnSidecar(subnet_id, data_column_sidecars[i][])
let allres = await allFinished(das_workers)
for i in 0..<allres.len:
let res = allres[i]
doAssert res.finished()
if res.failed():
notice "Reconstructed data columns not sent",
data_column = shortLog(data_column_sidecars[i][]), error = res.error[]
else:
notice "Reconstructed data columns sent",
data_column = shortLog(data_column_sidecars[i][])
return ok()
proc routeAttestation*(
router: ref MessageRouter,
attestation: phase0.Attestation | electra.Attestation,