add: column support to ENR, Metadata and Request Manager (#6741)

* add csc to enr and metadata

* add column filtering into RequestManager

* nits

* add comment

* resolved reviews 1

* added local custody column set into RequestManager as a field

* faster lookups with hashsets

* fix regressions, fix other reviews, fix response checking for columns

* simpler fix for hashsets
Agnish Ghosh 2024-12-15 17:16:08 +05:30 committed by GitHub
parent 7647d17d9e
commit 2bf0df7c7f
8 changed files with 397 additions and 17 deletions


@@ -24,6 +24,8 @@ const
## Enough for finalization in an alternative fork
MaxBlobless = SLOTS_PER_EPOCH
## Arbitrary
MaxColumnless = SLOTS_PER_EPOCH
## Arbitrary
MaxUnviables = 16 * 1024
## About a day of blocks - most likely not needed but it's quite cheap..
@@ -58,6 +60,12 @@ type
## block as well. A blobless block inserted into this table must
## have a resolved parent (i.e., it is not an orphan).
columnless*: OrderedTable[Eth2Digest, ForkedSignedBeaconBlock]
## Blocks that we don't have columns for. When we have received
## all columns for this block, we can proceed to resolving the
## block as well. A columnless block inserted into this table must
## have a resolved parent (i.e., it is not an orphan)
unviable*: OrderedTable[Eth2Digest, tuple[]]
## Unviable blocks are those that come from a history that does not
## include the finalized checkpoint we're currently following, and can
@@ -132,6 +140,10 @@ func removeBlobless*(
quarantine: var Quarantine, signedBlock: ForkySignedBeaconBlock) =
quarantine.blobless.del(signedBlock.root)
func removeColumnless*(
quarantine: var Quarantine, signedBlock: ForkySignedBeaconBlock) =
quarantine.columnless.del(signedBlock.root)
func isViable(
finalizedSlot: Slot, slot: Slot): bool =
# The orphan must be newer than the finalization point so that its parent
@@ -236,6 +248,18 @@ func cleanupBlobless(quarantine: var Quarantine, finalizedSlot: Slot) =
quarantine.addUnviable k
quarantine.blobless.del k
func cleanupColumnless(quarantine: var Quarantine, finalizedSlot: Slot) =
var toDel: seq[Eth2Digest]
for k, v in quarantine.columnless:
withBlck(v):
if not isViable(finalizedSlot, forkyBlck.message.slot):
toDel.add k
for k in toDel:
quarantine.addUnviable k
quarantine.columnless.del k
func clearAfterReorg*(quarantine: var Quarantine) =
## Clear missing and orphans to start with a fresh slate in case of a reorg
## Unviables remain unviable and are not cleared.
@@ -325,6 +349,29 @@ proc addBlobless*(
quarantine.missing.del(signedBlock.root)
true
proc addColumnless*(
quarantine: var Quarantine, finalizedSlot: Slot,
signedBlock: fulu.SignedBeaconBlock): bool =
if not isViable(finalizedSlot, signedBlock.message.slot):
quarantine.addUnviable(signedBlock.root)
return false
quarantine.cleanupColumnless(finalizedSlot)
if quarantine.columnless.lenu64 >= MaxColumnless:
var oldest_columnless_key: Eth2Digest
for k in quarantine.columnless.keys:
oldest_columnless_key = k
break
quarantine.columnless.del oldest_columnless_key
debug "block quarantine: Adding columnless", blck = shortLog(signedBlock)
quarantine.columnless[signedBlock.root] =
ForkedSignedBeaconBlock.init(signedBlock)
quarantine.missing.del(signedBlock.root)
true
func popBlobless*(
quarantine: var Quarantine,
root: Eth2Digest): Opt[ForkedSignedBeaconBlock] =
@@ -334,6 +381,19 @@ func popBlobless*(
else:
Opt.none(ForkedSignedBeaconBlock)
func popColumnless*(
quarantine: var Quarantine,
root: Eth2Digest): Opt[ForkedSignedBeaconBlock] =
var blck: ForkedSignedBeaconBlock
if quarantine.columnless.pop(root, blck):
Opt.some(blck)
else:
Opt.none(ForkedSignedBeaconBlock)
iterator peekBlobless*(quarantine: var Quarantine): ForkedSignedBeaconBlock =
for k, v in quarantine.blobless.mpairs():
yield v
iterator peekColumnless*(quarantine: var Quarantine): ForkedSignedBeaconBlock =
for k, v in quarantine.columnless.mpairs():
yield v
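
For illustration, the intended lifecycle of the new columnless table mirrors the existing blobless one; a minimal sketch of the call pattern, in which the two wrapper procs are hypothetical call sites and only the quarantine API itself comes from this change (the usual nimbus-eth2 imports are assumed):

proc onBlockWithoutColumns(  # hypothetical call site
    quarantine: var Quarantine, finalizedSlot: Slot,
    signedBlock: fulu.SignedBeaconBlock) =
  # Park the block until its custody columns arrive over gossip or req/resp.
  discard quarantine.addColumnless(finalizedSlot, signedBlock)

proc onColumnsComplete(  # hypothetical call site
    quarantine: var Quarantine,
    root: Eth2Digest): Opt[ForkedSignedBeaconBlock] =
  # Once every custody column is present, pull the block back out and hand
  # it to the regular block verifier.
  quarantine.popColumnless(root)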


@@ -78,7 +78,7 @@ func hasDataColumn*(
false
func peekColumnIndices*(quarantine: DataColumnQuarantine,
blck: electra.SignedBeaconBlock):
blck: fulu.SignedBeaconBlock):
seq[ColumnIndex] =
# Peeks into the currently received column indices
# from quarantine, necessary data availability checks
@@ -110,7 +110,7 @@ func gatherDataColumns*(quarantine: DataColumnQuarantine,
func popDataColumns*(
quarantine: var DataColumnQuarantine, digest: Eth2Digest,
blck: electra.SignedBeaconBlock):
blck: fulu.SignedBeaconBlock):
seq[ref DataColumnSidecar] =
var r: DataColumnSidecars
for idx in quarantine.custody_columns:
@@ -123,7 +123,7 @@ func popDataColumns*(
r
func hasMissingDataColumns*(quarantine: DataColumnQuarantine,
blck: electra.SignedBeaconBlock): bool =
blck: fulu.SignedBeaconBlock): bool =
# `hasMissingDataColumns` checks for data columns that
# have been missed over gossip; in the case of a supernode,
# the method would return missing columns when the supernode
@@ -149,7 +149,7 @@ func hasMissingDataColumns*(quarantine: DataColumnQuarantine,
return true
func hasEnoughDataColumns*(quarantine: DataColumnQuarantine,
blck: electra.SignedBeaconBlock): bool =
blck: fulu.SignedBeaconBlock): bool =
# `hasEnoughDataColumns` dictates whether there are `enough`
# data columns for a block to be enqueued, ideally for a supernode
# if it receives at least 50%+ gossip and RPC
@@ -175,7 +175,7 @@ func hasEnoughDataColumns*(quarantine: DataColumnQuarantine,
return true
func dataColumnFetchRecord*(quarantine: DataColumnQuarantine,
blck: electra.SignedBeaconBlock):
blck: fulu.SignedBeaconBlock):
DataColumnFetchRecord =
var indices: seq[ColumnIndex]
for i in quarantine.custody_columns:
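
The availability checks above boil down to a simple threshold; a standalone sketch of the rule the comments describe (the helper is illustrative rather than the repo's implementation, and 128 is assumed for NUMBER_OF_COLUMNS):

func columnsSufficient(receivedCount, custodyCount: int,
                       supernode: bool, numberOfColumns = 128): bool =
  # A supernode can reconstruct the full column matrix once at least half of
  # all columns have arrived; a fullnode needs every one of its custody columns.
  if supernode:
    receivedCount * 2 >= numberOfColumns
  else:
    receivedCount >= custodyCount

doAssert columnsSufficient(64, 8, supernode = true)
doAssert not columnsSufficient(6, 8, supernode = false)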


@@ -1647,6 +1647,15 @@ proc runDiscoveryLoop(node: Eth2Node) {.async: (raises: [CancelledError]).} =
# Also, give some time to dial the discovered nodes and update stats etc
await sleepAsync(5.seconds)
proc fetchNodeIdFromPeerId*(peer: Peer): NodeId =
# Convert peer id to node id by extracting the peer's public key
let nodeId =
block:
var key: PublicKey
discard peer.peerId.extractPublicKey(key)
keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId()
nodeId
proc resolvePeer(peer: Peer) =
# Resolve task which performs searching of peer's public key and recovery of
# ENR using discovery5. We only resolve ENR for peers we know about to avoid
@@ -2418,6 +2427,33 @@ func announcedENR*(node: Eth2Node): enr.Record =
doAssert node.discovery != nil, "The Eth2Node must be initialized"
node.discovery.localNode.record
proc lookupCscFromPeer*(peer: Peer): uint64 =
# Fetches the custody column count from a remote peer.
# If the peer advertises their custody column count via the `csc` ENR field,
# that value is returned. Otherwise, the default value `CUSTODY_REQUIREMENT`
# is assumed.
let metadata = peer.metadata
if metadata.isOk:
return metadata.get.custody_subnet_count
# Try getting the custody count from ENR if metadata fetch fails.
debug "Could not get csc from metadata, trying from ENR",
peer_id = peer.peerId
let enrOpt = peer.enr
if not enrOpt.isNone:
let enr = enrOpt.get
let enrFieldOpt = enr.get(enrCustodySubnetCountField, seq[byte])
if enrFieldOpt.isOk:
try:
let csc = SSZ.decode(enrFieldOpt.get, uint8)
return csc.uint64
except SszError, SerializationError:
discard # Ignore decoding errors and fallback to default
# Return default value if no valid custody subnet count is found.
return CUSTODY_REQUIREMENT.uint64
func shortForm*(id: NetKeyPair): string =
$PeerId.init(id.pubkey)
@@ -2579,6 +2615,20 @@ proc updateStabilitySubnetMetadata*(node: Eth2Node, attnets: AttnetBits) =
else:
debug "Stability subnets changed; updated ENR attnets", attnets
proc loadCscnetMetadataAndEnr*(node: Eth2Node, cscnets: CscCount) =
node.metadata.custody_subnet_count = cscnets.uint64
let res =
node.discovery.updateRecord({
enrCustodySubnetCountField: SSZ.encode(cscnets)
})
if res.isErr:
# This should not occur in this scenario as the private key would always
# be the correct one and the ENR will not increase in size
warn "Failed to update the ENR csc field", error = res.error
else:
debug "Updated ENR csc", cscnets
proc updateSyncnetsMetadata*(node: Eth2Node, syncnets: SyncnetBits) =
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.9/specs/altair/validator.md#sync-committee-subnet-stability
if node.metadata.syncnets == syncnets:
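
Taken together, the two additions above give a simple advertise/lookup flow for the custody subnet count; a sketch, where `node`, `peer` and `subscribeAllSubnets` stand in for an initialized Eth2Node, a connected peer and the beacon node's configuration flag:

# Advertise side: a supernode custodies every column subnet, a fullnode
# only the spec minimum (the same choice made in the beacon-node wiring).
let cscnets =
  if subscribeAllSubnets: DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint8
  else: CUSTODY_REQUIREMENT.uint8
node.loadCscnetMetadataAndEnr(cscnets)

# Lookup side: MetaData wins, the ENR `csc` field is the fallback, and
# CUSTODY_REQUIREMENT is assumed when neither is available.
let remoteCsc = peer.lookupCscFromPeer()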


@@ -422,6 +422,9 @@ proc initFullNode(
DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
else:
CUSTODY_REQUIREMENT.uint64
custody_columns_set =
node.network.nodeId.get_custody_columns_set(max(SAMPLES_PER_SLOT.uint64,
localCustodySubnets))
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, node.elManager,
ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets),
@@ -478,6 +481,13 @@ proc initFullNode(
Opt.some blob_sidecar
else:
Opt.none(ref BlobSidecar)
rmanDataColumnLoader = proc(
columnId: DataColumnIdentifier): Opt[ref DataColumnSidecar] =
var data_column_sidecar = DataColumnSidecar.new()
if dag.db.getDataColumnSidecar(columnId.block_root, columnId.index, data_column_sidecar[]):
Opt.some data_column_sidecar
else:
Opt.none(ref DataColumnSidecar)
processor = Eth2Processor.new(
config.doppelgangerDetection,
@@ -525,10 +535,10 @@ proc initFullNode(
processor: processor,
network: node.network)
requestManager = RequestManager.init(
node.network, dag.cfg.DENEB_FORK_EPOCH, getBeaconTime,
(proc(): bool = syncManager.inProgress),
quarantine, blobQuarantine, rmanBlockVerifier,
rmanBlockLoader, rmanBlobLoader)
node.network, supernode, custody_columns_set, dag.cfg.DENEB_FORK_EPOCH,
getBeaconTime, (proc(): bool = syncManager.inProgress),
quarantine, blobQuarantine, dataColumnQuarantine, rmanBlockVerifier,
rmanBlockLoader, rmanBlobLoader, rmanDataColumnLoader)
# As per EIP 7594, the BN is now categorised into a
# `Fullnode` and a `Supernode`; a fullnode custodies a
@@ -552,7 +562,13 @@ proc initFullNode(
dataColumnQuarantine[].supernode = supernode
dataColumnQuarantine[].custody_columns =
node.network.nodeId.get_custody_columns(max(SAMPLES_PER_SLOT.uint64,
localCustodySubnets))
localCustodySubnets))
if node.config.subscribeAllSubnets:
node.network.loadCscnetMetadataAndEnr(DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint8)
else:
node.network.loadCscnetMetadataAndEnr(CUSTODY_REQUIREMENT.uint8)
if node.config.lightClientDataServe:
proc scheduleSendingLightClientUpdates(slot: Slot) =
if node.lightClientPool[].broadcastGossipFut != nil:


@@ -117,7 +117,7 @@ type
seq_number*: uint64
attnets*: AttnetBits
syncnets*: SyncnetBits
custody_subnet_count*: CscCount
custody_subnet_count*: uint64
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/deneb/beacon-chain.md#executionpayload
ExecutionPayload* = object


@@ -93,6 +93,20 @@ func get_custody_columns*(node_id: NodeId,
sortedColumnIndices(ColumnIndex(columns_per_subnet), subnet_ids)
func get_custody_columns_set*(node_id: NodeId,
custody_subnet_count: uint64):
HashSet[ColumnIndex] =
# This method returns a HashSet of column indices;
# it is primarily useful for fast lookups during peer filtering
let
subnet_ids =
get_custody_column_subnets(node_id, custody_subnet_count)
const
columns_per_subnet =
NUMBER_OF_COLUMNS div DATA_COLUMN_SIDECAR_SUBNET_COUNT
sortedColumnIndices(ColumnIndex(columns_per_subnet), subnet_ids).toHashSet()
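
The HashSet variant exists for the `faster lookups with hashsets` point in the commit message: the RequestManager repeatedly asks whether a given index is one of our custody columns, and membership tests against a HashSet are constant time; a standalone sketch with made-up indices:

import std/sets

let custody = [0'u64, 17, 42, 99].toHashSet()  # illustrative column indices
doAssert 42'u64 in custody                     # O(1) membership check
doAssert 7'u64 notin custody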
func get_custody_column_list*(node_id: NodeId,
custody_subnet_count: uint64):
List[ColumnIndex, NUMBER_OF_COLUMNS] =


@@ -47,6 +47,7 @@ const
enrAttestationSubnetsField* = "attnets"
enrSyncSubnetsField* = "syncnets"
enrCustodySubnetCountField* = "csc"
enrForkIdField* = "eth2"
template eth2Prefix(forkDigest: ForkDigest): string =


@@ -11,10 +11,11 @@ import std/[sequtils, strutils]
import chronos, chronicles
import
../spec/datatypes/[phase0, deneb, fulu],
../spec/[forks, network],
../spec/[forks, network, eip7594_helpers],
../networking/eth2_network,
../consensus_object_pools/block_quarantine,
../consensus_object_pools/blob_quarantine,
../consensus_object_pools/data_column_quarantine,
"."/sync_protocol, "."/sync_manager,
../gossip_processing/block_processor
@@ -32,8 +33,13 @@ const
PARALLEL_REQUESTS* = 2
## Number of peers we use to resolve our request.
PARALLEL_REQUESTS_DATA_COLUMNS* = 32
BLOB_GOSSIP_WAIT_TIME_NS* = 2 * 1_000_000_000
## How long to wait for blobs to arrive over gossip before fetching.
DATA_COLUMN_GOSSIP_WAIT_TIME_NS* = 2 * 1_000_000_000
## How long to wait for data columns to arrive over gossip before fetching.
POLL_INTERVAL = 1.seconds
@@ -50,19 +56,28 @@ type
BlobLoaderFn* = proc(
blobId: BlobIdentifier): Opt[ref BlobSidecar] {.gcsafe, raises: [].}
DataColumnLoaderFn* = proc(
columnId: DataColumnIdentifier):
Opt[ref DataColumnSidecar] {.gcsafe, raises: [].}
InhibitFn* = proc: bool {.gcsafe, raises: [].}
RequestManager* = object
network*: Eth2Node
supernode*: bool
custody_columns_set: HashSet[ColumnIndex]
getBeaconTime: GetBeaconTimeFn
inhibit: InhibitFn
quarantine: ref Quarantine
blobQuarantine: ref BlobQuarantine
dataColumnQuarantine: ref DataColumnQuarantine
blockVerifier: BlockVerifierFn
blockLoader: BlockLoaderFn
blobLoader: BlobLoaderFn
dataColumnLoader: DataColumnLoaderFn
blockLoopFuture: Future[void].Raising([CancelledError])
blobLoopFuture: Future[void].Raising([CancelledError])
dataColumnLoopFuture: Future[void].Raising([CancelledError])
func shortLog*(x: seq[Eth2Digest]): string =
"[" & x.mapIt(shortLog(it)).join(", ") & "]"
@@ -71,23 +86,31 @@ func shortLog*(x: seq[FetchRecord]): string =
"[" & x.mapIt(shortLog(it.root)).join(", ") & "]"
proc init*(T: type RequestManager, network: Eth2Node,
supernode: bool,
custody_columns_set: HashSet[ColumnIndex],
denebEpoch: Epoch,
getBeaconTime: GetBeaconTimeFn,
inhibit: InhibitFn,
quarantine: ref Quarantine,
blobQuarantine: ref BlobQuarantine,
dataColumnQuarantine: ref DataColumnQuarantine,
blockVerifier: BlockVerifierFn,
blockLoader: BlockLoaderFn = nil,
blobLoader: BlobLoaderFn = nil): RequestManager =
blobLoader: BlobLoaderFn = nil,
dataColumnLoader: DataColumnLoaderFn = nil): RequestManager =
RequestManager(
network: network,
supernode: supernode,
custody_columns_set: custody_columns_set,
getBeaconTime: getBeaconTime,
inhibit: inhibit,
quarantine: quarantine,
blobQuarantine: blobQuarantine,
dataColumnQuarantine: dataColumnQuarantine,
blockVerifier: blockVerifier,
blockLoader: blockLoader,
blobLoader: blobLoader)
blobLoader: blobLoader,
dataColumnLoader: dataColumnLoader)
proc checkResponse(roots: openArray[Eth2Digest],
blocks: openArray[ref ForkedSignedBeaconBlock]): bool =
@@ -131,6 +154,30 @@ proc checkResponse(idList: seq[BlobIdentifier],
inc i
true
proc checkResponse(idList: seq[DataColumnIdentifier],
columns: openArray[ref DataColumnSidecar]): bool =
if columns.len > idList.len:
return false
var i = 0
while i < columns.len:
let
block_root = hash_tree_root(columns[i].signed_block_header.message)
id = idList[i]
# Check that the column response is a subset of the request
if binarySearch(idList, columns[i], cmpSidecarIdentifier) == -1:
return false
# Verify block root and index match
if id.block_root != block_root or id.index != columns[i].index:
return false
# Verify inclusion proof
columns[i][].verify_data_column_sidecar_inclusion_proof().isOkOr:
return false
inc i
true
proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} =
var peer: Peer
try:
@@ -202,7 +249,7 @@ proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async:
if not(isNil(peer)):
rman.network.peerPool.release(peer)
func cmpBlobIndexes(x, y: ref BlobSidecar): int =
func cmpSidecarIndexes(x, y: ref BlobSidecar | ref DataColumnSidecar): int =
cmp(x.index, y.index)
proc fetchBlobsFromNetwork(self: RequestManager,
@@ -219,7 +266,7 @@ proc fetchBlobsFromNetwork(self: RequestManager,
if blobs.isOk:
var ublobs = blobs.get().asSeq()
ublobs.sort(cmpBlobIndexes)
ublobs.sort(cmpSidecarIndexes)
if not checkResponse(idList, ublobs):
debug "Mismatched response to blobs by root",
peer = peer, blobs = shortLog(idList), ublobs = len(ublobs)
@@ -248,6 +295,89 @@ proc fetchBlobsFromNetwork(self: RequestManager,
if not(isNil(peer)):
self.network.peerPool.release(peer)
proc checkPeerCustody*(rman: RequestManager,
peer: Peer):
bool =
# Returns true only if we can expect the peer to custody
# every column in our local custody set; a supernode peer
# is accepted straight away.
if rman.supernode:
# A supernode needs every column, so it is better to
# filter for other supernodes than to go through many
# full nodes that each custody only a subset of the
# columns
if peer.lookupCscFromPeer() ==
DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64:
return true
else:
if peer.lookupCscFromPeer() ==
DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64:
return true
elif peer.lookupCscFromPeer() ==
CUSTODY_REQUIREMENT.uint64:
# Fetch the remote custody count
let remoteCustodySubnetCount =
peer.lookupCscFromPeer()
# Extract remote peer's nodeID from peerID
# Fetch custody columns from remote peer
let
remoteNodeId = fetchNodeIdFromPeerId(peer)
remoteCustodyColumns =
remoteNodeId.get_custody_columns_set(max(SAMPLES_PER_SLOT.uint64,
remoteCustodySubnetCount))
for local_column in rman.custody_columns_set:
if local_column notin remoteCustodyColumns:
return false
return true
else:
return false
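
The heart of the filter above is a subset test between our local custody set and the set derived from the peer's advertised custody subnet count; a minimal standalone sketch with made-up indices:

import std/sets

let
  localColumns = [7, 21, 42, 99].toHashSet()      # columns we must fetch
  remoteColumns = [3, 7, 21, 42, 99].toHashSet()  # columns the peer custodies
# The peer is only useful when it can serve every column we need,
# i.e. when our custody set is a subset of the peer's.
doAssert localColumns <= remoteColumns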
proc fetchDataColumnsFromNetwork(rman: RequestManager,
colIdList: seq[DataColumnIdentifier])
{.async: (raises: [CancelledError]).} =
var peer = await rman.network.peerPool.acquire()
try:
if rman.checkPeerCustody(peer):
debug "Requesting data columns by root", peer = peer, columns = shortLog(colIdList),
peer_score = peer.getScore()
let columns = await dataColumnSidecarsByRoot(peer, DataColumnIdentifierList colIdList)
if columns.isOk:
var ucolumns = columns.get().asSeq()
ucolumns.sort(cmpSidecarIndexes)
if not checkResponse(colIdList, ucolumns):
debug "Mismatched response to data columns by root",
peer = peer, columns = shortLog(colIdList), ucolumns = len(ucolumns)
peer.updateScore(PeerScoreBadResponse)
return
for col in ucolumns:
rman.dataColumnQuarantine[].put(col)
var curRoot: Eth2Digest
for col in ucolumns:
let block_root = hash_tree_root(col.signed_block_header.message)
if block_root != curRoot:
curRoot = block_root
if (let o = rman.quarantine[].popColumnless(curRoot); o.isSome):
let col = o.unsafeGet()
discard await rman.blockVerifier(col, false)
else:
debug "Data columns by root request not done, peer doesn't have custody column",
peer = peer, columns = shortLog(colIdList), err = columns.error()
peer.updateScore(PeerScoreNoValues)
finally:
if not(isNil(peer)):
rman.network.peerPool.release(peer)
proc requestManagerBlockLoop(
rman: RequestManager) {.async: (raises: [CancelledError]).} =
while true:
@@ -416,10 +546,117 @@ proc requestManagerBlobLoop(
blobs_count = len(blobIds),
sync_speed = speed(start, finish)
proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier] =
let
wallTime = rman.getBeaconTime()
wallSlot = wallTime.slotOrZero()
delay = wallTime - wallSlot.start_beacon_time()
const waitDur = TimeDiff(nanoseconds: DATA_COLUMN_GOSSIP_WAIT_TIME_NS)
var
fetches: HashSet[DataColumnIdentifier]
ready: seq[Eth2Digest]
for columnless in rman.quarantine[].peekColumnless():
withBlck(columnless):
when consensusFork >= ConsensusFork.Fulu:
# granting data columns a chance to arrive over gossip
if forkyBlck.message.slot == wallSlot and delay < waitDur:
debug "Not handling missing data columns early in slot"
continue
if not rman.dataColumnQuarantine[].hasMissingDataColumns(forkyBlck):
let missing = rman.dataColumnQuarantine[].dataColumnFetchRecord(forkyBlck)
if len(missing.indices) == 0:
warn "quarantine is missing data columns, but missing indices are empty",
blk = columnless.root,
commitments = len(forkyBlck.message.body.blob_kzg_commitments)
for idx in missing.indices:
let id = DataColumnIdentifier(block_root: columnless.root, index: idx)
if id.index in rman.custody_columns_set and id notin fetches and
len(forkyBlck.message.body.blob_kzg_commitments) != 0:
fetches.incl(id)
else:
# this is a programming error and it should not occur
warn "missing column handler found columnless block with all data columns",
blk = columnless.root,
commitments = len(forkyBlck.message.body.blob_kzg_commitments)
ready.add(columnless.root)
for root in ready:
let columnless = rman.quarantine[].popColumnless(root).valueOr:
continue
discard rman.blockVerifier(columnless, false)
fetches
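
The early-slot guard above is a grace window measured from slot start; a self-contained check of the arithmetic, with mainnet's 12-second slot written out as a literal for illustration:

const
  columnWaitNs = 2'i64 * 1_000_000_000  # DATA_COLUMN_GOSSIP_WAIT_TIME_NS
  slotNs = 12'i64 * 1_000_000_000       # mainnet slot duration
# Columns for a block in the current wall slot are only scheduled for
# fetching once the node is more than 2 s into the slot, i.e. gossip gets
# roughly the first sixth of the slot to deliver them.
doAssert columnWaitNs * 6 == slotNs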
proc requestManagerDataColumnLoop(
rman: RequestManager) {.async: (raises: [CancelledError]).} =
while true:
await sleepAsync(POLL_INTERVAL)
if rman.inhibit():
continue
let missingColumnIds = rman.getMissingDataColumns()
if missingColumnIds.len == 0:
continue
var columnIds: seq[DataColumnIdentifier]
if rman.dataColumnLoader == nil:
for item in missingColumnIds:
columnIds.add item
else:
var
blockRoots: seq[Eth2Digest]
curRoot: Eth2Digest
for columnId in missingColumnIds:
if columnId.block_root != curRoot:
curRoot = columnId.block_root
blockRoots.add curRoot
let data_column_sidecar = rman.dataColumnLoader(columnId).valueOr:
columnIds.add columnId
if blockRoots.len > 0 and blockRoots[^1] == curRoot:
# A data column is missing, remove the block from the list of fully available blocks
discard blockRoots.pop()
continue
debug "Loaded orphaned data columns from storage", columnId
rman.dataColumnQuarantine[].put(data_column_sidecar)
var verifiers = newSeqOfCap[
Future[Result[void, VerifierError]]
.Raising([CancelledError])](blockRoots.len)
for blockRoot in blockRoots:
let blck = rman.quarantine[].popColumnless(blockRoot).valueOr:
continue
verifiers.add rman.blockVerifier(blck, maybeFinalized = false)
try:
await allFutures(verifiers)
except CancelledError as exc:
var futs = newSeqOfCap[Future[void].Raising([])](verifiers.len)
for verifier in verifiers:
futs.add verifier.cancelAndWait()
await noCancel allFutures(futs)
raise exc
if columnIds.len > 0:
debug "Requesting detected missing data columns", columns = shortLog(columnIds)
let start = SyncMoment.now(0)
var workers:
array[PARALLEL_REQUESTS_DATA_COLUMNS, Future[void].Raising([CancelledError])]
for i in 0..<PARALLEL_REQUESTS_DATA_COLUMNS:
workers[i] = rman.fetchDataColumnsFromNetwork(columnIds)
await allFutures(workers)
let finish = SyncMoment.now(uint64(len(columnIds)))
debug "Request manager data column tick",
data_columns_count = len(columnIds),
sync_speed = speed(start, finish)
proc start*(rman: var RequestManager) =
## Start Request Manager's loops.
rman.blockLoopFuture = rman.requestManagerBlockLoop()
rman.blobLoopFuture = rman.requestManagerBlobLoop()
rman.dataColumnLoopFuture = rman.requestManagerDataColumnLoop()
proc stop*(rman: RequestManager) =
## Stop Request Manager's loops.
@@ -427,3 +664,5 @@ proc stop*(rman: RequestManager) =
rman.blockLoopFuture.cancelSoon()
if not(isNil(rman.blobLoopFuture)):
rman.blobLoopFuture.cancelSoon()
if not(isNil(rman.dataColumnLoopFuture)):
rman.dataColumnLoopFuture.cancelSoon()