add: getMissingDataColumns, requestManagerDataColumnLoop

This commit is contained in:
Agnish Ghosh 2024-06-19 03:46:03 +05:30
parent 46d07b140d
commit 51f189ef53
No known key found for this signature in database
GPG Key ID: 7BDDA05D1B25E9F8
4 changed files with 144 additions and 1 deletions

View File

@ -348,6 +348,6 @@ func popColumnless*(
else:
Opt.none(ForkedSignedBeaconBlock)
iterator peekColumless*(quarantine: var Quarantine): ForkedSignedBeaconBlock =
iterator peekColumnless*(quarantine: var Quarantine): ForkedSignedBeaconBlock =
for k,v in quarantine.columnless.mpairs():
yield v

View File

@ -69,3 +69,35 @@ func hasDataColumn*(
return true
false
func popDataColumns*(
quarantine: var DataColumnQuarantine, digest: Eth2Digest,
blck: deneb.SignedBeaconBlock | electra.SignedBeaconBlock):
seq[ref DataColumnSidecar] =
var r: seq[ref DataColumnSidecar]
for idx in 0..<len(blck.message.body.blob_kzg_commitments):
var c: ref DataColumnSidecar
if quarantine.data_columns.pop((digest, ColumnIndex idx), c):
r.add(c)
true
func hasDataColumns*(quarantine: DataColumnQuarantine,
blck: deneb.SignedBeaconBlock | electra.SignedBeaconBlock): bool =
for idx in 0..<len(blck.message.body.blob_kzg_commitments):
if (blck.root, ColumnIndex idx) notin quarantine.data_columns:
return false
true
func dataColumnFetchRecord*(quarantine: DataColumnQuarantine,
blck: deneb.SignedBeaconBlock | electra.SignedBeaconBlock): DataColumnFetchRecord =
var indices: seq[ColumnIndex]
for i in 0..<len(blck.message.body.blob_kzg_commitments):
let idx = ColumnIndex(i)
if not quarantine.data_columns.hasKey(
(blck.root, idx)):
indices.add(idx)
DataColumnFetchRecord(block_root: blck.root, indices: indices)
func init*(
T: type DataColumnQuarantine, onDataColumnSidecarCallback: OnDataColumnSidecarCallback): T =
T(onDataColumnSidecarCallback: onDataColumnSidecarCallback)

View File

@ -14,6 +14,7 @@ import
stew/[byteutils, io2],
eth/p2p/discoveryv5/[enr, random2],
./consensus_object_pools/blob_quarantine,
./consensus_object_pools/data_column_quarantine,
./consensus_object_pools/vanity_logs/vanity_logs,
./networking/[topic_params, network_metadata_downloads],
./rpc/[rest_api, state_ttl_cache],

View File

@ -35,6 +35,8 @@ const
BLOB_GOSSIP_WAIT_TIME_NS* = 2 * 1_000_000_000
## How long to wait for blobs to arrive over gossip before fetching.
DATA_COLUMN_GOSSIP_WAIT_TIME_NS* = 2 * 1_000_000_000
POLL_INTERVAL = 1.seconds
type
@ -66,8 +68,10 @@ type
blockVerifier: BlockVerifierFn
blockLoader: BlockLoaderFn
blobLoader: BlobLoaderFn
dataColumnLoader: DataColumnLoaderFn
blockLoopFuture: Future[void].Raising([CancelledError])
blobLoopFuture: Future[void].Raising([CancelledError])
dataColumnLoopFuture: Future[void].Raising([CancelledError])
func shortLog*(x: seq[Eth2Digest]): string =
"[" & x.mapIt(shortLog(it)).join(", ") & "]"
@ -463,10 +467,114 @@ proc requestManagerBlobLoop(
blobs_count = len(blobIds),
sync_speed = speed(start, finish)
proc getMissingDataColumns(rman: RequestManager): seq[DataColumnIdentifier] =
let
wallTime = rman.getBeaconTime()
wallSlot = wallTime.slotOrZero()
delay = wallTime - wallSlot.start_beacon_time()
waitDur = TimeDiff(nanoseconds: DATA_COLUMN_GOSSIP_WAIT_TIME_NS)
var
fetches: seq[DataColumnIdentifier]
ready: seq[Eth2Digest]
for columnless in rman.quarantine[].peekColumnless():
withBlck(columnless):
when consensusFork >= ConsensusFork.Deneb:
# granting data columns a chance to arrive over gossip
if forkyBlck.message.slot == wallSlot and delay < waitDur:
debug "Not handling missing data columns early in slot"
continue
if not rman.dataColumnQuarantine[].hasDataColumns(forkyBlck):
let missing = rman.dataColumnQuarantine[].dataColumnFetchRecord(forkyBlck)
if len(missing.indices) == 0:
warn "quarantine is missing data columns, but missing indices are empty",
blk = columnless.root,
commitments = len(forkyBlck.message.body.blob_kzg_commitments)
for idx in missing.indices:
let id = DataColumnIdentifier(block_root: columnless.root, index: idx)
if id notin fetches:
fetches.add(id)
else:
# this is a programming error and it should occur
warn "missing data column handler found columnless block with all data columns",
blk = columnless.root,
commitments=len(forkyBlck.message.body.blob_kzg_commitments)
ready.add(columnless.root)
for root in ready:
let columnless = rman.quarantine[].popColumnless(root).valueOr:
continue
discard rman.blockVerifier(columnless, false)
fetches
proc requestManagerDataColumnLoop(
rman: RequestManager) {.async: (raises: [CancelledError]).} =
while true:
await sleepAsync(POLL_INTERVAL)
if rman.inhibit():
continue
let missingColumnIds = rman.getMissingDataColumns()
if missingColumnIds.len == 0:
continue
var columnIds: seq[DataColumnIdentifier]
if rman.dataColumnLoader == nil:
columnIds = missingColumnIds
else:
var
blockRoots: seq[Eth2Digest]
curRoot: Eth2Digest
for columnId in missingColumnIds:
if columnId.block_root != curRoot:
curRoot = columnId.block_root
blockRoots.add curRoot
let data_column_sidecar = rman.dataColumnLoader(columnId).valueOr:
columnIds.add columnId
if blockRoots.len > 0 and blockRoots[^1] == curRoot:
# A data column is missing, remove from list of fully available data columns
discard blockRoots.pop()
continue
debug "Loaded orphaned data columns from storage", columnId
rman.dataColumnQuarantine[].put(data_column_sidecar)
var verifiers = newSeqOfCap[
Future[Result[void, VerifierError]]
.Raising([CancelledError])](blockRoots.len)
for blockRoot in blockRoots:
let blck = rman.quarantine[].popColumnless(blockRoot).valueOr:
continue
verifiers.add rman.blockVerifier(blck, maybeFinalized = false)
try:
await allFutures(verifiers)
except CancelledError as exc:
var futs = newSeqOfCap[Future[void].Raising([])](verifiers.len)
for verifier in verifiers:
futs.add verifier.cancelAndWait()
await noCancel allFutures(futs)
raise exc
if columnIds.len > 0:
debug "Requesting detected missing data columns", columns = shortLog(columnIds)
let start = SyncMoment.now(0)
var workers:
array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]
for i in 0..<PARALLEL_REQUESTS:
workers[i] = rman.fetchDataColumnsFromNetwork(columnIds)
await allFutures(workers)
let finish = SyncMoment.now(uint64(len(columnIds)))
debug "Request manager data column tick",
data_columns_count = len(columnIds),
sync_speed = speed(start, finish)
proc start*(rman: var RequestManager) =
## Start Request Manager's loops.
rman.blockLoopFuture = rman.requestManagerBlockLoop()
rman.blobLoopFuture = rman.requestManagerBlobLoop()
rman.dataColumnLoopFuture = rman.requestManagerDataColumnLoop()
proc stop*(rman: RequestManager) =
## Stop Request Manager's loop.
@ -474,3 +582,5 @@ proc stop*(rman: RequestManager) =
rman.blockLoopFuture.cancelSoon()
if not(isNil(rman.blobLoopFuture)):
rman.blobLoopFuture.cancelSoon()
if not(isNil(rman.dataColumnLoopFuture)):
rman.dataColumnLoopFuture.cancelSoon()