simplify data column reconstruct and broadcast logic

This commit is contained in:
Agnish Ghosh 2024-08-19 12:04:05 +05:30
parent 9ed7a193cd
commit 287bf72fbb

View File

@ -1475,7 +1475,7 @@ proc pruneDataColumns(node: BeaconNode, slot: Slot) =
proc tryReconstructingDataColumns* (self: BeaconNode, proc tryReconstructingDataColumns* (self: BeaconNode,
signed_block: deneb.TrustedSignedBeaconBlock | signed_block: deneb.TrustedSignedBeaconBlock |
electra.TrustedSignedBeaconBlock): electra.TrustedSignedBeaconBlock):
Result[void, string] = Result[seq[DataColumnSidecar], string] =
# Checks whether the data columns can be reconstructed # Checks whether the data columns can be reconstructed
# or not from the recovery matrix # or not from the recovery matrix
@ -1492,7 +1492,9 @@ proc tryReconstructingDataColumns* (self: BeaconNode,
self.network.nodeId, self.network.nodeId,
localCustodySubnetCount) localCustodySubnetCount)
var var
finalisedDataColumns: seq[DataColumnSidecar]
data_column_sidecars: seq[DataColumnSidecar] data_column_sidecars: seq[DataColumnSidecar]
columnsOk = true columnsOk = true
storedColumns: seq[ColumnIndex] storedColumns: seq[ColumnIndex]
@ -1512,9 +1514,7 @@ proc tryReconstructingDataColumns* (self: BeaconNode,
# storedColumn number is less than the NUMBER_OF_COLUMNS # storedColumn number is less than the NUMBER_OF_COLUMNS
# then reconstruction is not possible, and if all the data columns # then reconstruction is not possible, and if all the data columns
# are already stored then we do not need to reconstruct at all # are already stored then we do not need to reconstruct at all
if storedColumns.len < NUMBER_OF_COLUMNS div 2 or storedColumns.len == NUMBER_OF_COLUMNS: if not storedColumns.len < NUMBER_OF_COLUMNS div 2 or storedColumns.len != NUMBER_OF_COLUMNS:
return ok()
else:
# Recover blobs from saved data column sidecars # Recover blobs from saved data column sidecars
let recovered_cps = recover_cells_and_proofs(data_column_sidecars, storedColumns.len, signed_block) let recovered_cps = recover_cells_and_proofs(data_column_sidecars, storedColumns.len, signed_block)
@ -1528,61 +1528,40 @@ proc tryReconstructingDataColumns* (self: BeaconNode,
if data_column.index notin custodiedColumnIndices.get: if data_column.index notin custodiedColumnIndices.get:
continue continue
finalisedDataColumns.add(data_column)
db.putDataColumnSidecar(data_column) db.putDataColumnSidecar(data_column)
notice "Data Column Reconstructed and Saved Successfully" notice "Data Column Reconstructed and Saved Successfully"
ok() ok(finalisedDataColumns)
proc reconstructAndSendDataColumns*(node: BeaconNode) {.async.} = proc reconstructAndSendDataColumns*(node: BeaconNode) {.async.} =
let let
db = node.db db = node.db
root = node.dag.head.root root = node.dag.head.root
let localCustodySubnetCount =
if node.config.subscribeAllSubnets:
DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
else:
CUSTODY_REQUIREMENT
let blck = getForkedBlock(db, root).valueOr: return let blck = getForkedBlock(db, root).valueOr: return
withBlck(blck): withBlck(blck):
when typeof(forkyBlck).kind < ConsensusFork.Deneb: return when typeof(forkyBlck).kind < ConsensusFork.Deneb: return
else: else:
let res = node.tryReconstructingDataColumns(forkyBlck) let data_column_sidecars = node.tryReconstructingDataColumns(forkyBlck)
if not res.isOk(): if not data_column_sidecars.isOk():
return return
let dc = data_column_sidecars.get
let custody_columns = get_custody_columns( var das_workers = newSeq[Future[SendResult]](len(dc))
node.network.nodeId, for i in 0..<dc.lenu64:
localCustodySubnetCount) let subnet_id = compute_subnet_for_data_column_sidecar(i)
var das_workers[i] =
data_column_sidecars: DataColumnSidecars node.network.broadcastDataColumnSidecar(subnet_id, dc[i])
columnsOk = true let allres = await allFinished(das_workers)
for i in 0..<allres.len:
for custody_column in custody_columns.get: let res = allres[i]
let data_column = DataColumnSidecar.new() doAssert res.finished()
if not db.getDataColumnSidecar( if res.failed():
root, custody_column, data_column[]): notice "Reconstructed data columns not sent",
columnsOk = false data_column = shortLog(dc[i]), error = res.error[]
debug "Issue with loading reconstructed data columns" else:
break notice "Reconstructed data columns sent",
data_column_sidecars.add data_column data_column = shortLog(dc[i])
var das_workers = newSeq[Future[SendResult]](len(data_column_sidecars))
for i in 0..<data_column_sidecars.lenu64:
let subnet_id = compute_subnet_for_data_column_sidecar(i)
das_workers[i] =
node.network.broadcastDataColumnSidecar(subnet_id, data_column_sidecars[i][])
let allres = await allFinished(das_workers)
for i in 0..<allres.len:
let res = allres[i]
doAssert res.finished()
if res.failed():
notice "Reconstructed data columns not sent",
data_column = shortLog(data_column_sidecars[i][]), error = res.error[]
else:
notice "Reconstructed data columns sent",
data_column = shortLog(data_column_sidecars[i][])
proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} = proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
# Things we do when slot processing has ended and we're about to wait for the # Things we do when slot processing has ended and we're about to wait for the