rework some of the reconstruction publishing logic

Agnish Ghosh 2024-09-21 15:47:58 +05:30
parent c1cb673d6c
commit 4d0c66657f
2 changed files with 37 additions and 40 deletions


@@ -1534,9 +1534,7 @@ proc tryReconstructingDataColumns* (self: BeaconNode,
   # storedColumn number is less than the NUMBER_OF_COLUMNS
   # then reconstruction is not possible, and if all the data columns
   # are already stored then we do not need to reconstruct at all
-  if storedColumns.len < NUMBER_OF_COLUMNS div 2 or storedColumns.len == NUMBER_OF_COLUMNS:
-    ok(finalisedDataColumns)
-  else:
+  if not (storedColumns.len < NUMBER_OF_COLUMNS div 2 or storedColumns.len == NUMBER_OF_COLUMNS):
     # Recover blobs from saved data column sidecars
     let recovered_cps = recover_cells_and_proofs(data_column_sidecars, storedColumns.len, signed_block)
     if not recovered_cps.isOk:
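
Note: the rewritten guard is just the De Morgan negation of the removed branch condition, inverted so the reconstruction path no longer needs an else. A minimal sketch of the intended gate, assuming NUMBER_OF_COLUMNS = 128 as in the PeerDAS presets (canReconstruct is a hypothetical helper, not part of the patch):

const NUMBER_OF_COLUMNS = 128  # PeerDAS preset; an assumption of this sketch

# Reconstruction needs at least half of the columns (the erasure-coding
# threshold), and is pointless when every column is already stored.
proc canReconstruct(storedColumns: int): bool =
  storedColumns >= NUMBER_OF_COLUMNS div 2 and storedColumns < NUMBER_OF_COLUMNS

when isMainModule:
  doAssert not canReconstruct(63)   # below the 50% threshold
  doAssert canReconstruct(64)       # exactly half is enough
  doAssert not canReconstruct(128)  # already complete, nothing to do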
@ -1551,7 +1549,7 @@ proc tryReconstructingDataColumns* (self: BeaconNode,
finalisedDataColumns.add(data_column)
db.putDataColumnSidecar(data_column)
ok(finalisedDataColumns)
ok(finalisedDataColumns)
proc reconstructAndSendDataColumns*(node: BeaconNode) {.async.} =
let
@@ -1570,9 +1568,9 @@ proc reconstructAndSendDataColumns*(node: BeaconNode) {.async.} =
     let dc = data_column_sidecars.get
     var
       worker_count = len(dc)
-      das_workers = newSeq[Future[SendResult]](worker_count)
+      das_workers = newSeq[Future[SendResult]](dc.len)
     for i in 0..<dc.lenu64:
-      let subnet_id = compute_subnet_for_data_column_sidecar(i)
+      let subnet_id = compute_subnet_for_data_column_sidecar(dc[i].index)
       das_workers[i] =
         node.network.broadcastDataColumnSidecar(subnet_id, dc[i])
     let allres = await allFinished(das_workers)
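
Note: the substantive fix in this hunk is keying the subnet on the sidecar's own column index (dc[i].index) rather than the loop counter i; the two diverge whenever a node stores a sparse subset of columns. A sketch of the mapping, assuming the spec's modulo rule and a subnet count of 32 (a config constant, not taken from this patch):

const DATA_COLUMN_SIDECAR_SUBNET_COUNT = 32'u64  # assumed config value

# Spec-style mapping from column index to gossip subnet.
proc compute_subnet_for_data_column_sidecar(column_index: uint64): uint64 =
  column_index mod DATA_COLUMN_SIDECAR_SUBNET_COUNT

when isMainModule:
  # A sparse custody set: with the old code, column 37 (at loop index 1)
  # would have been broadcast on subnet 1 instead of subnet 37 mod 32 = 5.
  let custody = @[5'u64, 37, 69, 101]
  for col in custody:
    echo "column ", col, " -> subnet ", compute_subnet_for_data_column_sidecar(col)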


@@ -167,41 +167,40 @@ proc routeSignedBeaconBlock*(
   # blobRefs = Opt.some(blobs.mapIt(newClone(it)))
   var dataColumnRefs = Opt.none(DataColumnSidecars)
   when typeof(blck).kind >= ConsensusFork.Deneb:
-    if blobsOpt.isSome():
-      let blobs = blobsOpt.get()
-      debugEcho blobs.len
-      debugEcho blobs.len
-      if blobs.len != 0:
-        let dataColumnsOpt =
-          newClone get_data_column_sidecars(blck, blobs.mapIt(KzgBlob(bytes: it.blob)))
-        if not dataColumnsOpt[].isOk:
-          debug "Issue with computing data column from blob bundle"
-        let data_columns = dataColumnsOpt[].get()
-        var das_workers = newSeq[Future[SendResult]](len(dataColumnsOpt[].get()))
-        debugEcho "das workers len"
-        debugEcho das_workers.len
-        for i in 0..<data_columns.lenu64:
-          let subnet_id = compute_subnet_for_data_column_sidecar(data_columns[i].index)
-          das_workers[i] =
-            router[].network.broadcastDataColumnSidecar(subnet_id, data_columns[i])
-        let allres = await allFinished(das_workers)
-        for i in 0..<allres.len:
-          let res = allres[i]
-          doAssert res.finished()
-          if res.failed():
-            notice "Data Columns not sent",
-              data_column = shortLog(data_columns[i]), error = res.error[]
-          else:
-            notice "Data columns sent", data_column = shortLog(dataColumnsOpt[].get()[i])
-        let
-          metadata = router[].network.metadata.custody_subnet_count.uint64
-          custody_columns = router[].network.nodeId.get_custody_columns(max(SAMPLES_PER_SLOT.uint64,
-            metadata))
-        for dc in data_columns:
-          if dc.index in custody_columns:
-            let dataColumnRefs = Opt.some(dataColumnsOpt[].get().mapIt(newClone(it)))
+    let blobs = blobsOpt.get()
+    debugEcho blobs.len
+    debugEcho blobs.len
+    if blobs.len != 0:
+      let dataColumnsOpt =
+        newClone get_data_column_sidecars(blck, blobs.mapIt(KzgBlob(bytes: it.blob)))
+      if not dataColumnsOpt[].isOk:
+        debug "Issue with computing data column from blob bundle"
+      let data_columns = dataColumnsOpt[].get()
+      var das_workers = newSeq[Future[SendResult]](len(dataColumnsOpt[].get()))
+      debugEcho "das workers len"
+      debugEcho das_workers.len
+      for i in 0..<data_columns.lenu64:
+        let subnet_id = compute_subnet_for_data_column_sidecar(data_columns[i].index)
+        das_workers[i] =
+          router[].network.broadcastDataColumnSidecar(subnet_id, data_columns[i])
+      let allres = await allFinished(das_workers)
+      for i in 0..<allres.len:
+        let res = allres[i]
+        doAssert res.finished()
+        if res.failed():
+          notice "Data Columns not sent",
+            data_column = shortLog(data_columns[i]), error = res.error[]
+        else:
+          notice "Data columns sent", data_column = shortLog(dataColumnsOpt[].get()[i])
+      let
+        metadata = router[].network.metadata.custody_subnet_count.uint64
+        custody_columns = router[].network.nodeId.get_custody_columns(max(SAMPLES_PER_SLOT.uint64,
+          metadata))
+      for dc in data_columns:
+        if dc.index in custody_columns:
+          dataColumnRefs = Opt.some(dataColumnsOpt[].get().mapIt(newClone(it)))

   let added = await router[].blockProcessor[].addBlock(
     MsgSource.api, ForkedSignedBeaconBlock.init(blck), blobRefs, dataColumnRefs)
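
Note: the load-bearing change in this file is the last one. The old code declared `let dataColumnRefs` inside the custody loop, which creates a fresh binding that shadows the outer `var dataColumnRefs`, so addBlock always received Opt.none. A minimal repro of that Nim scoping pitfall, with illustrative names (std/options standing in for the router's Opt type):

import std/options

var dataColumnRefs = none(string)

block buggy:
  # `let` declares a NEW binding that shadows the outer var inside this
  # block, so the outer value stays none:
  let dataColumnRefs = some("columns")
  discard dataColumnRefs
doAssert dataColumnRefs.isNone      # the outer binding never changed

block fixed:
  dataColumnRefs = some("columns")  # plain assignment targets the outer var
doAssert dataColumnRefs.get() == "columns"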