refactored reconstruction
This commit is contained in:
parent
2d65f0e57e
commit
ed5cf4cd8d
|
@ -86,25 +86,16 @@ func accumulateDataColumns*(quarantine: DataColumnQuarantine,
|
|||
indices
|
||||
|
||||
func gatherDataColumns*(quarantine: DataColumnQuarantine,
|
||||
digest: Eth2Digest):
|
||||
seq[ref DataColumnSidecar] =
|
||||
var columns: seq[ref DataColumnSidecar]
|
||||
let
|
||||
localSubnetCount =
|
||||
if quarantine.supernode:
|
||||
DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
|
||||
else:
|
||||
CUSTODY_REQUIREMENT.uint64
|
||||
localCustodyColumns =
|
||||
get_custody_columns(quarantine.nodeid,
|
||||
max(SAMPLES_PER_SLOT.uint64,
|
||||
localSubnetCount))
|
||||
for i in localCustodyColumns:
|
||||
blck: deneb.SignedBeaconBlock |
|
||||
electra.SignedBeaconBlock):
|
||||
seq[DataColumnSidecar] =
|
||||
var columns: seq[DataColumnSidecar]
|
||||
for i in 0..<NUMBER_OF_COLUMNS:
|
||||
let idx = ColumnIndex(i)
|
||||
if quarantine.data_columns.hasKey(
|
||||
(digest, idx)):
|
||||
let value = quarantine.data_columns.getOrDefault((digest, idx), default(ref DataColumnSidecar))
|
||||
columns.add(value)
|
||||
(blck.root, idx)):
|
||||
let value = quarantine.data_columns.getOrDefault((blck.root, idx), default(ref DataColumnSidecar))
|
||||
columns.add(value[])
|
||||
columns
|
||||
|
||||
func popDataColumns*(
|
||||
|
@ -175,7 +166,7 @@ func hasEnoughDataColumns*(quarantine: DataColumnQuarantine,
|
|||
localSubnetCount))
|
||||
if quarantine.supernode:
|
||||
let
|
||||
collectedColumns = quarantine.gatherDataColumns(blck.root)
|
||||
collectedColumns = quarantine.gatherDataColumns(blck)
|
||||
if collectedColumns.len >= (localCustodyColumns.len div 2):
|
||||
return true
|
||||
else:
|
||||
|
|
|
@ -392,36 +392,37 @@ proc processDataColumnSidecar*(
|
|||
|
||||
debug "Data column validated, putting data column in quarantine"
|
||||
self.dataColumnQuarantine[].put(newClone(dataColumnSidecar))
|
||||
if self.dataColumnQuarantine[].supernode == false:
|
||||
self.dag.db.putDataColumnSidecar(dataColumnSidecar)
|
||||
debug "Validated column belongs to custody, attempting to persist",
|
||||
data_column = shortLog(dataColumnSidecar)
|
||||
debug "Validated column belongs to custody, attempting to persist",
|
||||
data_column = shortLog(dataColumnSidecar)
|
||||
self.dag.db.putDataColumnSidecar(dataColumnSidecar)
|
||||
|
||||
let block_root = hash_tree_root(block_header)
|
||||
if (let o = self.quarantine[].popColumnless(block_root); o.isSome):
|
||||
let columnless = o.unsafeGet()
|
||||
withBlck(columnless):
|
||||
when consensusFork >= ConsensusFork.Deneb:
|
||||
if self.dataColumnQuarantine[].supernode == false and
|
||||
if self.dataColumnQuarantine[].gatherDataColumns(forkyBlck).len ==
|
||||
max(SAMPLES_PER_SLOT, CUSTODY_REQUIREMENT) and
|
||||
self.dataColumnQuarantine[].hasMissingDataColumns(forkyBlck):
|
||||
let columns =
|
||||
self.dataColumnQuarantine[].gatherDataColumns(block_root).mapIt(it[])
|
||||
for gdc in columns:
|
||||
self.dataColumnQuarantine[].put(newClone(gdc))
|
||||
self.blockProcessor[].enqueueBlock(
|
||||
MsgSource.gossip, columnless,
|
||||
Opt.none(BlobSidecars),
|
||||
Opt.some(self.dataColumnQuarantine[].popDataColumns(block_root, forkyBlck)))
|
||||
if self.dataColumnQuarantine[].supernode == false:
|
||||
let columns =
|
||||
self.dataColumnQuarantine[].gatherDataColumns(forkyBlck)
|
||||
for gdc in columns:
|
||||
self.dataColumnQuarantine[].put(newClone(gdc))
|
||||
self.blockProcessor[].enqueueBlock(
|
||||
MsgSource.gossip, columnless,
|
||||
Opt.none(BlobSidecars),
|
||||
Opt.some(self.dataColumnQuarantine[].popDataColumns(block_root, forkyBlck)))
|
||||
elif self.dataColumnQuarantine[].hasEnoughDataColumns(forkyBlck):
|
||||
let
|
||||
columns = self.dataColumnQuarantine[].gatherDataColumns(block_root)
|
||||
columns = self.dataColumnQuarantine[].gatherDataColumns(forkyBlck)
|
||||
if columns.len >= (NUMBER_OF_COLUMNS div 2) and
|
||||
self.dataColumnQuarantine[].supernode:
|
||||
let
|
||||
reconstructed_columns =
|
||||
self.processReconstructionFromGossip(forkyBlck, columns.mapIt(it[]))
|
||||
self.processReconstructionFromGossip(forkyBlck, columns)
|
||||
for rc in reconstructed_columns.get:
|
||||
if rc notin self.dataColumnQuarantine[].gatherDataColumns(block_root).mapIt(it[]):
|
||||
if rc notin self.dataColumnQuarantine[].gatherDataColumns(forkyBlck):
|
||||
self.dataColumnQuarantine[].put(newClone(rc))
|
||||
self.blockProcessor[].enqueueBlock(
|
||||
MsgSource.gossip, columnless,
|
||||
|
|
|
@ -412,20 +412,50 @@ proc initFullNode(
|
|||
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
|
||||
withBlck(signedBlock):
|
||||
when consensusFork >= ConsensusFork.Deneb:
|
||||
if not dataColumnQuarantine[].checkForInitialDcSidecars(forkyBlck):
|
||||
let
|
||||
localSubnetCount =
|
||||
if supernode:
|
||||
DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
|
||||
else:
|
||||
CUSTODY_REQUIREMENT.uint64
|
||||
localCustodyColumns = get_custody_columns(node.network.nodeId,
|
||||
max(SAMPLES_PER_SLOT.uint64,
|
||||
localSubnetCount))
|
||||
accumulatedColumns = dataColumnQuarantine[].accumulateDataColumns(forkyBlck)
|
||||
if accumulatedColumns.len == 0:
|
||||
return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
||||
Opt.none(BlobSidecars), Opt.none(DataColumnSidecars),
|
||||
maybeFinalized = maybeFinalized)
|
||||
elif supernode == true and accumulatedColumns.len <= localCustodyColumns.len div 2 :
|
||||
# We don't have all the data columns for this block, so we have
|
||||
# to put it in columnless quarantine.
|
||||
if not quarantine[].addColumnless(dag.finalizedHead.slot, forkyBlck):
|
||||
err(VerifierError.UnviableFork)
|
||||
return err(VerifierError.UnviableFork)
|
||||
else:
|
||||
err(VerifierError.MissingParent)
|
||||
else:
|
||||
return err(VerifierError.MissingParent)
|
||||
elif supernode == true and accumulatedColumns.len == localCustodyColumns.len:
|
||||
let data_columns = dataColumnQuarantine[].popDataColumns(forkyBlck.root, forkyBlck)
|
||||
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
||||
return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
||||
Opt.none(BlobSidecars), Opt.some(data_columns),
|
||||
maybeFinalized = maybeFinalized)
|
||||
elif supernode == true and accumulatedColumns.len >= localCustodyColumns.len div 2:
|
||||
let data_columns = dataColumnQuarantine[].popDataColumns(forkyBlck.root, forkyBlck)
|
||||
return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
||||
Opt.none(BlobSidecars), Opt.some(data_columns),
|
||||
maybeFinalized = maybeFinalized)
|
||||
elif supernode == false and accumulatedColumns.len <= localCustodyColumns.len:
|
||||
# We don't have all the data columns for this block, so we have
|
||||
# to put it in columnless quarantine.
|
||||
if not quarantine[].addColumnless(dag.finalizedHead.slot, forkyBlck):
|
||||
return err(VerifierError.UnviableFork)
|
||||
else:
|
||||
return err(VerifierError.MissingParent)
|
||||
else:
|
||||
return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
||||
Opt.none(BlobSidecars), Opt.none(DataColumnSidecars),
|
||||
maybeFinalized = maybeFinalized)
|
||||
else:
|
||||
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
||||
return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
||||
Opt.none(BlobSidecars), Opt.none(DataColumnSidecars),
|
||||
maybeFinalized = maybeFinalized)
|
||||
|
||||
|
@ -1840,8 +1870,8 @@ proc onSlotStart(node: BeaconNode, wallTime: BeaconTime,
|
|||
verifyFinalization(node, wallSlot)
|
||||
|
||||
node.consensusManager[].updateHead(wallSlot)
|
||||
await node.handleValidatorDuties(lastSlot, wallSlot)
|
||||
await node.reconstructAndSendDataColumns()
|
||||
await node.handleValidatorDuties(lastSlot, wallSlot)
|
||||
await onSlotEnd(node, wallSlot)
|
||||
|
||||
# https://github.com/ethereum/builder-specs/blob/v0.4.0/specs/bellatrix/validator.md#registration-dissemination
|
||||
|
|
Loading…
Reference in New Issue