diff --git a/beacon_chain/consensus_object_pools/block_quarantine.nim b/beacon_chain/consensus_object_pools/block_quarantine.nim
index 18b5221c9..da33499be 100644
--- a/beacon_chain/consensus_object_pools/block_quarantine.nim
+++ b/beacon_chain/consensus_object_pools/block_quarantine.nim
@@ -24,6 +24,8 @@ const
     ## Enough for finalization in an alternative fork
   MaxBlobless = SLOTS_PER_EPOCH
     ## Arbitrary
+  MaxColumnless = SLOTS_PER_EPOCH
+    ## Arbitrary
   MaxUnviables = 16 * 1024
     ## About a day of blocks - most likely not needed but it's quite cheap..
@@ -58,6 +60,12 @@ type
       ## block as well. A blobless block inserted into this table must
       ## have a resolved parent (i.e., it is not an orphan).

+    columnless*: OrderedTable[Eth2Digest, ForkedSignedBeaconBlock]
+      ## Blocks that we don't have columns for. When we have received
+      ## all columns for this block, we can proceed to resolving the
+      ## block as well. A columnless block inserted into this table must
+      ## have a resolved parent (i.e., it is not an orphan)
+
     unviable*: OrderedTable[Eth2Digest, tuple[]]
       ## Unviable blocks are those that come from a history that does not
      ## include the finalized checkpoint we're currently following, and can
@@ -132,6 +140,10 @@ func removeBlobless*(
     quarantine: var Quarantine, signedBlock: ForkySignedBeaconBlock) =
   quarantine.blobless.del(signedBlock.root)

+func removeColumnless*(
+    quarantine: var Quarantine, signedBlock: ForkySignedBeaconBlock) =
+  quarantine.columnless.del(signedBlock.root)
+
 func isViable(
     finalizedSlot: Slot, slot: Slot): bool =
   # The orphan must be newer than the finalization point so that its parent
@@ -236,6 +248,18 @@ func cleanupBlobless(quarantine: var Quarantine, finalizedSlot: Slot) =
       quarantine.addUnviable k
     quarantine.blobless.del k

+func cleanupColumnless(quarantine: var Quarantine, finalizedSlot: Slot) =
+  var toDel: seq[Eth2Digest]
+
+  for k, v in quarantine.columnless:
+    withBlck(v):
+      if not isViable(finalizedSlot, forkyBlck.message.slot):
+        toDel.add k
+
+  for k in toDel:
+    quarantine.addUnviable k
+    quarantine.columnless.del k
+
 func clearAfterReorg*(quarantine: var Quarantine) =
   ## Clear missing and orphans to start with a fresh slate in case of a reorg
   ## Unviables remain unviable and are not cleared.
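Reviewer note: the new `columnless` table mirrors `blobless` — an insertion-ordered table keyed by block root, capped at `MaxColumnless`, with the oldest entry evicted first (the eviction happens in `addColumnless` further down). Below is a minimal, self-contained sketch of that bounded OrderedTable pattern using only the Nim standard library; `MaxEntries` and the `string` key/value are illustrative stand-ins, not names from the codebase.

```nim
import std/tables

const MaxEntries = 4  # stands in for MaxColumnless (SLOTS_PER_EPOCH)

var columnless = initOrderedTable[string, string]()

proc addEntry(root, blck: string) =
  if columnless.len >= MaxEntries:
    # OrderedTable iterates in insertion order, so the first key is the oldest
    var oldest: string
    for k in columnless.keys:
      oldest = k
      break
    columnless.del oldest
  columnless[root] = blck

when isMainModule:
  for i in 1 .. 6:
    addEntry("root" & $i, "block" & $i)
  assert columnless.len == MaxEntries
  # only the four most recently inserted roots remain (root3 .. root6)
```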
@@ -325,6 +349,29 @@ proc addBlobless*(
     quarantine.missing.del(signedBlock.root)
   true

+proc addColumnless*(
+    quarantine: var Quarantine, finalizedSlot: Slot,
+    signedBlock: fulu.SignedBeaconBlock): bool =
+
+  if not isViable(finalizedSlot, signedBlock.message.slot):
+    quarantine.addUnviable(signedBlock.root)
+    return false
+
+  quarantine.cleanupColumnless(finalizedSlot)
+
+  if quarantine.columnless.lenu64 >= MaxColumnless:
+    var oldest_columnless_key: Eth2Digest
+    for k in quarantine.columnless.keys:
+      oldest_columnless_key = k
+      break
+    quarantine.columnless.del oldest_columnless_key
+
+  debug "block quarantine: Adding columnless", blck = shortLog(signedBlock)
+  quarantine.columnless[signedBlock.root] =
+    ForkedSignedBeaconBlock.init(signedBlock)
+  quarantine.missing.del(signedBlock.root)
+  true
+
 func popBlobless*(
     quarantine: var Quarantine,
     root: Eth2Digest): Opt[ForkedSignedBeaconBlock] =
@@ -334,6 +381,19 @@ func popBlobless*(
   else:
     Opt.none(ForkedSignedBeaconBlock)

+func popColumnless*(
+    quarantine: var Quarantine,
+    root: Eth2Digest): Opt[ForkedSignedBeaconBlock] =
+  var blck: ForkedSignedBeaconBlock
+  if quarantine.columnless.pop(root, blck):
+    Opt.some(blck)
+  else:
+    Opt.none(ForkedSignedBeaconBlock)
+
 iterator peekBlobless*(quarantine: var Quarantine): ForkedSignedBeaconBlock =
   for k, v in quarantine.blobless.mpairs():
     yield v
+
+iterator peekColumnless*(quarantine: var Quarantine): ForkedSignedBeaconBlock =
+  for k, v in quarantine.columnless.mpairs():
+    yield v
diff --git a/beacon_chain/consensus_object_pools/data_column_quarantine.nim b/beacon_chain/consensus_object_pools/data_column_quarantine.nim
index e073f1074..8e156b24d 100644
--- a/beacon_chain/consensus_object_pools/data_column_quarantine.nim
+++ b/beacon_chain/consensus_object_pools/data_column_quarantine.nim
@@ -78,7 +78,7 @@ func hasDataColumn*(
   false

 func peekColumnIndices*(quarantine: DataColumnQuarantine,
-                        blck: electra.SignedBeaconBlock):
+                        blck: fulu.SignedBeaconBlock):
                         seq[ColumnIndex] =
   # Peeks into the currently received column indices
   # from quarantine, necessary data availability checks
@@ -110,7 +110,7 @@ func gatherDataColumns*(quarantine: DataColumnQuarantine,

 func popDataColumns*(
     quarantine: var DataColumnQuarantine, digest: Eth2Digest,
-    blck: electra.SignedBeaconBlock):
+    blck: fulu.SignedBeaconBlock):
     seq[ref DataColumnSidecar] =
   var r: DataColumnSidecars
   for idx in quarantine.custody_columns:
@@ -123,7 +123,7 @@ func popDataColumns*(
   r

 func hasMissingDataColumns*(quarantine: DataColumnQuarantine,
-                            blck: electra.SignedBeaconBlock): bool =
+                            blck: fulu.SignedBeaconBlock): bool =
   # `hasMissingDataColumns` consists of the data columns that,
   # have been missed over gossip, also in case of a supernode,
   # the method would return missing columns when the supernode
@@ -149,7 +149,7 @@ func hasMissingDataColumns*(quarantine: DataColumnQuarantine,
   return true

 func hasEnoughDataColumns*(quarantine: DataColumnQuarantine,
-                           blck: electra.SignedBeaconBlock): bool =
+                           blck: fulu.SignedBeaconBlock): bool =
   # `hasEnoughDataColumns` dictates whether there is `enough`
   # data columns for a block to be enqueued, ideally for a supernode
   # if it receives atleast 50%+ gossip and RPC
@@ -175,7 +175,7 @@ func hasEnoughDataColumns*(quarantine: DataColumnQuarantine,
   return true

 func dataColumnFetchRecord*(quarantine: DataColumnQuarantine,
-                            blck: electra.SignedBeaconBlock):
+                            blck: fulu.SignedBeaconBlock):
                             DataColumnFetchRecord =
   var indices: seq[ColumnIndex]
   for i in quarantine.custody_columns:
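Reviewer note: `popColumnless` (added above in block_quarantine.nim) follows the same pattern as `popBlobless` — `OrderedTable.pop` removes the entry and reports success, and the result is wrapped in an option. A simplified sketch of that pattern, with `std/options` standing in for stew's `Opt` and `string` for the block type:

```nim
import std/[tables, options]

var columnless = initOrderedTable[string, string]()
columnless["root1"] = "block1"

proc popColumnless(root: string): Option[string] =
  var blck: string
  if columnless.pop(root, blck):   # removes the entry if present
    some(blck)
  else:
    none(string)

when isMainModule:
  assert popColumnless("root1").isSome
  assert popColumnless("root1").isNone  # already removed by the first pop
```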
diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim
index 96003fdfb..7d89b66d8 100644
--- a/beacon_chain/networking/eth2_network.nim
+++ b/beacon_chain/networking/eth2_network.nim
@@ -1647,6 +1647,15 @@ proc runDiscoveryLoop(node: Eth2Node) {.async: (raises: [CancelledError]).} =
     # Also, give some time to dial the discovered nodes and update stats etc
     await sleepAsync(5.seconds)

+proc fetchNodeIdFromPeerId*(peer: Peer): NodeId =
+  # Convert peer id to node id by extracting the peer's public key
+  let nodeId =
+    block:
+      var key: PublicKey
+      discard peer.peerId.extractPublicKey(key)
+      keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId()
+  nodeId
+
 proc resolvePeer(peer: Peer) =
   # Resolve task which performs searching of peer's public key and recovery of
   # ENR using discovery5. We only resolve ENR for peers we know about to avoid
@@ -2418,6 +2427,33 @@ func announcedENR*(node: Eth2Node): enr.Record =
   doAssert node.discovery != nil, "The Eth2Node must be initialized"
   node.discovery.localNode.record

+proc lookupCscFromPeer*(peer: Peer): uint64 =
+  # Fetches the custody column count from a remote peer.
+  # If the peer advertises their custody column count via the `csc` ENR field,
+  # that value is returned. Otherwise, the default value `CUSTODY_REQUIREMENT`
+  # is assumed.
+
+  let metadata = peer.metadata
+  if metadata.isOk:
+    return metadata.get.custody_subnet_count
+
+  # Try getting the custody count from the ENR if the metadata fetch fails.
+  debug "Could not get csc from metadata, trying from ENR",
+    peer_id = peer.peerId
+  let enrOpt = peer.enr
+  if not enrOpt.isNone:
+    let enr = enrOpt.get
+    let enrFieldOpt = enr.get(enrCustodySubnetCountField, seq[byte])
+    if enrFieldOpt.isOk:
+      try:
+        let csc = SSZ.decode(enrFieldOpt.get, uint8)
+        return csc.uint64
+      except SszError, SerializationError:
+        discard # Ignore decoding errors and fall back to the default
+
+  # Return the default value if no valid custody subnet count is found.
+  return CUSTODY_REQUIREMENT.uint64
+
 func shortForm*(id: NetKeyPair): string =
   $PeerId.init(id.pubkey)

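Reviewer note: the lookup order implemented in `lookupCscFromPeer` above is: peer MetaData first, then the `csc` ENR field, and finally the spec default `CUSTODY_REQUIREMENT`. A small sketch of that fallback chain under simplified, hypothetical types — `SimplePeer` is not a real codebase type and the constant value is illustrative only:

```nim
import std/options

const CustodyRequirement = 4'u64   # illustrative stand-in for CUSTODY_REQUIREMENT

type SimplePeer = object
  metadataCsc: Option[uint64]  # custody_subnet_count from exchanged MetaData
  enrCsc: Option[uint64]       # decoded `csc` ENR field, if present and valid

proc lookupCsc(peer: SimplePeer): uint64 =
  if peer.metadataCsc.isSome:
    return peer.metadataCsc.get  # 1. prefer live MetaData
  if peer.enrCsc.isSome:
    return peer.enrCsc.get       # 2. fall back to the advertised ENR field
  CustodyRequirement             # 3. assume the spec minimum otherwise

when isMainModule:
  assert lookupCsc(SimplePeer(metadataCsc: some(128'u64))) == 128'u64
  assert lookupCsc(SimplePeer(enrCsc: some(8'u64))) == 8'u64
  assert lookupCsc(SimplePeer()) == CustodyRequirement
```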
@@ -2579,6 +2615,20 @@ proc updateStabilitySubnetMetadata*(node: Eth2Node, attnets: AttnetBits) =
   else:
     debug "Stability subnets changed; updated ENR attnets", attnets

+proc loadCscnetMetadataAndEnr*(node: Eth2Node, cscnets: CscCount) =
+  node.metadata.custody_subnet_count = cscnets.uint64
+  let res =
+    node.discovery.updateRecord({
+      enrCustodySubnetCountField: SSZ.encode(cscnets)
+    })
+
+  if res.isErr:
+    # This should not occur in this scenario as the private key would always
+    # be the correct one and the ENR will not increase in size
+    warn "Failed to update the ENR csc field", error = res.error
+  else:
+    debug "Updated ENR csc", cscnets
+
 proc updateSyncnetsMetadata*(node: Eth2Node, syncnets: SyncnetBits) =
   # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.9/specs/altair/validator.md#sync-committee-subnet-stability
   if node.metadata.syncnets == syncnets:
diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim
index 1a5337dd6..44d6d3998 100644
--- a/beacon_chain/nimbus_beacon_node.nim
+++ b/beacon_chain/nimbus_beacon_node.nim
@@ -422,6 +422,9 @@ proc initFullNode(
         DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
       else:
         CUSTODY_REQUIREMENT.uint64
+    custody_columns_set =
+      node.network.nodeId.get_custody_columns_set(max(SAMPLES_PER_SLOT.uint64,
+        localCustodySubnets))
     consensusManager = ConsensusManager.new(
       dag, attestationPool, quarantine, node.elManager,
       ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets),
@@ -478,6 +481,13 @@ proc initFullNode(
         Opt.some blob_sidecar
       else:
         Opt.none(ref BlobSidecar)
+    rmanDataColumnLoader = proc(
+        columnId: DataColumnIdentifier): Opt[ref DataColumnSidecar] =
+      var data_column_sidecar = DataColumnSidecar.new()
+      if dag.db.getDataColumnSidecar(columnId.block_root, columnId.index, data_column_sidecar[]):
+        Opt.some data_column_sidecar
+      else:
+        Opt.none(ref DataColumnSidecar)

     processor = Eth2Processor.new(
       config.doppelgangerDetection,
@@ -525,10 +535,10 @@ proc initFullNode(
       processor: processor,
       network: node.network)
     requestManager = RequestManager.init(
-      node.network, dag.cfg.DENEB_FORK_EPOCH, getBeaconTime,
-      (proc(): bool = syncManager.inProgress),
-      quarantine, blobQuarantine, rmanBlockVerifier,
-      rmanBlockLoader, rmanBlobLoader)
+      node.network, supernode, custody_columns_set, dag.cfg.DENEB_FORK_EPOCH,
+      getBeaconTime, (proc(): bool = syncManager.inProgress),
+      quarantine, blobQuarantine, dataColumnQuarantine, rmanBlockVerifier,
+      rmanBlockLoader, rmanBlobLoader, rmanDataColumnLoader)

   # As per EIP 7594, the BN is now categorised into a
   # `Fullnode` and a `Supernode`, the fullnodes custodies a
@@ -552,7 +562,13 @@ proc initFullNode(
   dataColumnQuarantine[].supernode = supernode
   dataColumnQuarantine[].custody_columns =
     node.network.nodeId.get_custody_columns(max(SAMPLES_PER_SLOT.uint64,
-      localCustodySubnets))
+      localCustodySubnets))
+
+  if node.config.subscribeAllSubnets:
+    node.network.loadCscnetMetadataAndEnr(DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint8)
+  else:
+    node.network.loadCscnetMetadataAndEnr(CUSTODY_REQUIREMENT.uint8)
+
   if node.config.lightClientDataServe:
     proc scheduleSendingLightClientUpdates(slot: Slot) =
       if node.lightClientPool[].broadcastGossipFut != nil:
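Reviewer note: the wiring above distinguishes a supernode (`subscribeAllSubnets`, custodies every data column subnet) from a regular full node (spec minimum), and the subnet count actually used when deriving custody columns is floored at `SAMPLES_PER_SLOT`. A compact sketch of that selection; the constant values are illustrative only — the real values come from the consensus spec presets:

```nim
const
  DataColumnSidecarSubnetCount = 128'u64  # illustrative
  CustodyRequirement = 4'u64              # illustrative
  SamplesPerSlot = 8'u64                  # illustrative

proc advertisedCsc(subscribeAllSubnets: bool): uint64 =
  # value written to the node's MetaData and `csc` ENR field
  if subscribeAllSubnets: DataColumnSidecarSubnetCount
  else: CustodyRequirement

proc effectiveCustodySubnets(subscribeAllSubnets: bool): uint64 =
  # count used for deriving custody columns, never below the sampling floor
  max(SamplesPerSlot, advertisedCsc(subscribeAllSubnets))

when isMainModule:
  assert effectiveCustodySubnets(false) == SamplesPerSlot   # floored to 8
  assert effectiveCustodySubnets(true) == DataColumnSidecarSubnetCount
```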
diff --git a/beacon_chain/spec/datatypes/fulu.nim b/beacon_chain/spec/datatypes/fulu.nim
index 8d34c7294..6ef03ac24 100644
--- a/beacon_chain/spec/datatypes/fulu.nim
+++ b/beacon_chain/spec/datatypes/fulu.nim
@@ -117,7 +117,7 @@ type
     seq_number*: uint64
     attnets*: AttnetBits
     syncnets*: SyncnetBits
-    custody_subnet_count*: CscCount
+    custody_subnet_count*: uint64

   # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/deneb/beacon-chain.md#executionpayload
   ExecutionPayload* = object
diff --git a/beacon_chain/spec/eip7594_helpers.nim b/beacon_chain/spec/eip7594_helpers.nim
index 0c489dfa4..94037505b 100644
--- a/beacon_chain/spec/eip7594_helpers.nim
+++ b/beacon_chain/spec/eip7594_helpers.nim
@@ -93,6 +93,20 @@ func get_custody_columns*(node_id: NodeId,

   sortedColumnIndices(ColumnIndex(columns_per_subnet), subnet_ids)

+func get_custody_columns_set*(node_id: NodeId,
+                              custody_subnet_count: uint64):
+                              HashSet[ColumnIndex] =
+  # This method returns a HashSet of column indices;
+  # it is specifically relevant for peer filtering.
+  let
+    subnet_ids =
+      get_custody_column_subnets(node_id, custody_subnet_count)
+  const
+    columns_per_subnet =
+      NUMBER_OF_COLUMNS div DATA_COLUMN_SIDECAR_SUBNET_COUNT
+
+  sortedColumnIndices(ColumnIndex(columns_per_subnet), subnet_ids).toHashSet()
+
 func get_custody_column_list*(node_id: NodeId,
                               custody_subnet_count: uint64):
                               List[ColumnIndex, NUMBER_OF_COLUMNS] =
diff --git a/beacon_chain/spec/network.nim b/beacon_chain/spec/network.nim
index 230602daa..6fdca9e97 100644
--- a/beacon_chain/spec/network.nim
+++ b/beacon_chain/spec/network.nim
@@ -47,6 +47,7 @@ const
   enrAttestationSubnetsField* = "attnets"
   enrSyncSubnetsField* = "syncnets"
+  enrCustodySubnetCountField* = "csc"
   enrForkIdField* = "eth2"

 template eth2Prefix(forkDigest: ForkDigest): string =
diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim
index bf5b64ab9..4f1bb56db 100644
--- a/beacon_chain/sync/request_manager.nim
+++ b/beacon_chain/sync/request_manager.nim
@@ -11,10 +11,11 @@ import std/[sequtils, strutils]
 import chronos, chronicles
 import
   ../spec/datatypes/[phase0, deneb, fulu],
-  ../spec/[forks, network],
+  ../spec/[forks, network, eip7594_helpers],
   ../networking/eth2_network,
   ../consensus_object_pools/block_quarantine,
   ../consensus_object_pools/blob_quarantine,
+  ../consensus_object_pools/data_column_quarantine,
   "."/sync_protocol, "."/sync_manager,
   ../gossip_processing/block_processor

@@ -32,8 +33,13 @@ const
   PARALLEL_REQUESTS* = 2
     ## Number of peers we using to resolve our request.

+  PARALLEL_REQUESTS_DATA_COLUMNS* = 32
+
   BLOB_GOSSIP_WAIT_TIME_NS* = 2 * 1_000_000_000
-    ## How long to wait for blobs to arrive over gossip before fetching.
+    ## How long to wait for blobs to arrive over gossip before fetching.
+
+  DATA_COLUMN_GOSSIP_WAIT_TIME_NS* = 2 * 1_000_000_000
+    ## How long to wait for data columns to arrive over gossip before fetching.

   POLL_INTERVAL = 1.seconds
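Reviewer note: `get_custody_columns_set` (added above in eip7594_helpers.nim) exists so that peer filtering can do O(1) membership checks — each custody subnet expands to `NUMBER_OF_COLUMNS div DATA_COLUMN_SIDECAR_SUBNET_COUNT` column indices, collected into a `HashSet`. A self-contained sketch of that expansion; the constants and the `subnet + i * SubnetCount` layout follow the spec's shape, but the values here are illustrative:

```nim
import std/sets

const
  NumberOfColumns = 128'u64   # illustrative
  SubnetCount = 32'u64        # illustrative
  ColumnsPerSubnet = NumberOfColumns div SubnetCount  # 4

proc custodyColumnsSet(subnets: openArray[uint64]): HashSet[uint64] =
  result = initHashSet[uint64]()
  for subnet in subnets:
    for i in 0'u64 ..< ColumnsPerSubnet:
      # columns belonging to a subnet are spaced SubnetCount apart
      result.incl(subnet + i * SubnetCount)

when isMainModule:
  let columns = custodyColumnsSet([0'u64, 5'u64])
  assert columns.len == 8
  assert 5'u64 in columns      # cheap membership test, as used in peer filtering
  assert 1'u64 notin columns
```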
@@ -50,19 +56,28 @@ type
   BlobLoaderFn* = proc(
     blobId: BlobIdentifier): Opt[ref BlobSidecar] {.gcsafe, raises: [].}

+  DataColumnLoaderFn* = proc(
+    columnId: DataColumnIdentifier):
+    Opt[ref DataColumnSidecar] {.gcsafe, raises: [].}
+
   InhibitFn* = proc: bool {.gcsafe, raises: [].}

   RequestManager* = object
     network*: Eth2Node
+    supernode*: bool
+    custody_columns_set: HashSet[ColumnIndex]
     getBeaconTime: GetBeaconTimeFn
     inhibit: InhibitFn
     quarantine: ref Quarantine
     blobQuarantine: ref BlobQuarantine
+    dataColumnQuarantine: ref DataColumnQuarantine
     blockVerifier: BlockVerifierFn
     blockLoader: BlockLoaderFn
     blobLoader: BlobLoaderFn
+    dataColumnLoader: DataColumnLoaderFn
     blockLoopFuture: Future[void].Raising([CancelledError])
     blobLoopFuture: Future[void].Raising([CancelledError])
+    dataColumnLoopFuture: Future[void].Raising([CancelledError])

 func shortLog*(x: seq[Eth2Digest]): string =
   "[" & x.mapIt(shortLog(it)).join(", ") & "]"
@@ -71,23 +86,31 @@ func shortLog*(x: seq[FetchRecord]): string =
   "[" & x.mapIt(shortLog(it.root)).join(", ") & "]"

 proc init*(T: type RequestManager, network: Eth2Node,
+           supernode: bool,
+           custody_columns_set: HashSet[ColumnIndex],
            denebEpoch: Epoch,
            getBeaconTime: GetBeaconTimeFn,
            inhibit: InhibitFn,
            quarantine: ref Quarantine,
            blobQuarantine: ref BlobQuarantine,
+           dataColumnQuarantine: ref DataColumnQuarantine,
            blockVerifier: BlockVerifierFn,
            blockLoader: BlockLoaderFn = nil,
-           blobLoader: BlobLoaderFn = nil): RequestManager =
+           blobLoader: BlobLoaderFn = nil,
+           dataColumnLoader: DataColumnLoaderFn = nil): RequestManager =
   RequestManager(
     network: network,
+    supernode: supernode,
+    custody_columns_set: custody_columns_set,
     getBeaconTime: getBeaconTime,
     inhibit: inhibit,
     quarantine: quarantine,
     blobQuarantine: blobQuarantine,
+    dataColumnQuarantine: dataColumnQuarantine,
     blockVerifier: blockVerifier,
     blockLoader: blockLoader,
-    blobLoader: blobLoader)
+    blobLoader: blobLoader,
+    dataColumnLoader: dataColumnLoader)

 proc checkResponse(roots: openArray[Eth2Digest],
                    blocks: openArray[ref ForkedSignedBeaconBlock]): bool =
@@ -131,6 +154,30 @@ proc checkResponse(idList: seq[BlobIdentifier],
     inc i
   true

+proc checkResponse(idList: seq[DataColumnIdentifier],
+                   columns: openArray[ref DataColumnSidecar]): bool =
+  if columns.len > idList.len:
+    return false
+  var i = 0
+  while i < columns.len:
+    let
+      block_root = hash_tree_root(columns[i].signed_block_header.message)
+      id = idList[i]
+
+    # Check that the column response is a subset of the request
+    if binarySearch(idList, columns[i], cmpSidecarIdentifier) == -1:
+      return false
+
+    # Verify block root and index match
+    if id.block_root != block_root or id.index != columns[i].index:
+      return false
+
+    # Verify inclusion proof
+    columns[i][].verify_data_column_sidecar_inclusion_proof().isOkOr:
+      return false
+    inc i
+  true
+
 proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} =
   var peer: Peer
   try:
@@ -202,7 +249,7 @@ proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async:
     if not(isNil(peer)):
       rman.network.peerPool.release(peer)

-func cmpBlobIndexes(x, y: ref BlobSidecar): int =
+func cmpSidecarIndexes(x, y: ref BlobSidecar | ref DataColumnSidecar): int =
   cmp(x.index, y.index)

 proc fetchBlobsFromNetwork(self: RequestManager,
@@ -219,7 +266,7 @@ proc fetchBlobsFromNetwork(self: RequestManager,

     if blobs.isOk:
       var ublobs = blobs.get().asSeq()
-      ublobs.sort(cmpBlobIndexes)
+      ublobs.sort(cmpSidecarIndexes)
       if not checkResponse(idList, ublobs):
         debug "Mismatched response to blobs by root",
           peer = peer, blobs = shortLog(idList), ublobs = len(ublobs)
@@ -248,6 +295,89 @@ proc fetchBlobsFromNetwork(self: RequestManager,
     if not(isNil(peer)):
       self.network.peerPool.release(peer)

+proc checkPeerCustody*(rman: RequestManager,
+                       peer: Peer):
+                       bool =
+  # Returns true if the peer custodies at least
+  # ONE of the common custody columns; returns
+  # true straight away if the peer is a supernode.
+  if rman.supernode:
+    # For a supernode, it is always best/optimistic
+    # to filter other supernodes, rather than filter
+    # too many full nodes that have a subset of the custody
+    # columns
+    if peer.lookupCscFromPeer() ==
+        DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64:
+      return true
+
+  else:
+    if peer.lookupCscFromPeer() ==
+        DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64:
+      return true
+
+    elif peer.lookupCscFromPeer() ==
+        CUSTODY_REQUIREMENT.uint64:
+
+      # Fetch the remote custody count
+      let remoteCustodySubnetCount =
+        peer.lookupCscFromPeer()
+
+      # Extract the remote peer's nodeID from its peerID and
+      # fetch the custody columns of the remote peer
+      let
+        remoteNodeId = fetchNodeIdFromPeerId(peer)
+        remoteCustodyColumns =
+          remoteNodeId.get_custody_columns_set(max(SAMPLES_PER_SLOT.uint64,
+            remoteCustodySubnetCount))
+
+      for local_column in rman.custody_columns_set:
+        if local_column notin remoteCustodyColumns:
+          return false
+
+      return true
+
+    else:
+      return false
+
+proc fetchDataColumnsFromNetwork(rman: RequestManager,
+                                 colIdList: seq[DataColumnIdentifier])
+                                 {.async: (raises: [CancelledError]).} =
+  var peer = await rman.network.peerPool.acquire()
+  try:
+    if rman.checkPeerCustody(peer):
+      debug "Requesting data columns by root", peer = peer, columns = shortLog(colIdList),
+        peer_score = peer.getScore()
+      let columns = await dataColumnSidecarsByRoot(peer, DataColumnIdentifierList colIdList)
+
+      if columns.isOk:
+        var ucolumns = columns.get().asSeq()
+        ucolumns.sort(cmpSidecarIndexes)
+        if not checkResponse(colIdList, ucolumns):
+          debug "Mismatched response to data columns by root",
+            peer = peer, columns = shortLog(colIdList), ucolumns = len(ucolumns)
+          peer.updateScore(PeerScoreBadResponse)
+          return
+
+        for col in ucolumns:
+          rman.dataColumnQuarantine[].put(col)
+        var curRoot: Eth2Digest
+        for col in ucolumns:
+          let block_root = hash_tree_root(col.signed_block_header.message)
+          if block_root != curRoot:
+            curRoot = block_root
+            if (let o = rman.quarantine[].popColumnless(curRoot); o.isSome):
+              let col = o.unsafeGet()
+              discard await rman.blockVerifier(col, false)
+      else:
+        debug "Data columns by root request not done, peer doesn't have custody column",
+          peer = peer, columns = shortLog(colIdList), err = columns.error()
+        peer.updateScore(PeerScoreNoValues)
+
+  finally:
+    if not(isNil(peer)):
+      rman.network.peerPool.release(peer)
+
 proc requestManagerBlockLoop(
     rman: RequestManager) {.async: (raises: [CancelledError]).} =
   while true:
@@ -416,10 +546,117 @@ proc requestManagerBlobLoop(
         blobs_count = len(blobIds),
         sync_speed = speed(start, finish)

+proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier] =
+  let
+    wallTime = rman.getBeaconTime()
+    wallSlot = wallTime.slotOrZero()
+    delay = wallTime - wallSlot.start_beacon_time()
+
+  const waitDur = TimeDiff(nanoseconds: DATA_COLUMN_GOSSIP_WAIT_TIME_NS)
+
+  var
+    fetches: HashSet[DataColumnIdentifier]
+    ready: seq[Eth2Digest]
+
+  for columnless in rman.quarantine[].peekColumnless():
+    withBlck(columnless):
+      when consensusFork >= ConsensusFork.Fulu:
+        # granting data columns a chance to arrive over gossip
+        if forkyBlck.message.slot == wallSlot and delay < waitDur:
+          debug "Not handling missing data columns early in slot"
+          continue
+
+        if not rman.dataColumnQuarantine[].hasMissingDataColumns(forkyBlck):
+          let missing = rman.dataColumnQuarantine[].dataColumnFetchRecord(forkyBlck)
+          if len(missing.indices) == 0:
+            warn "quarantine is missing data columns, but missing indices are empty",
+              blk = columnless.root,
+              commitments = len(forkyBlck.message.body.blob_kzg_commitments)
+          for idx in missing.indices:
+            let id = DataColumnIdentifier(block_root: columnless.root, index: idx)
+            if id.index in rman.custody_columns_set and id notin fetches and
+                len(forkyBlck.message.body.blob_kzg_commitments) != 0:
+              fetches.incl(id)
+        else:
+          # this is a programming error and it should not occur
+          warn "missing column handler found columnless block with all data columns",
+            blk = columnless.root,
+            commitments = len(forkyBlck.message.body.blob_kzg_commitments)
+          ready.add(columnless.root)
+
+  for root in ready:
+    let columnless = rman.quarantine[].popColumnless(root).valueOr:
+      continue
+    discard rman.blockVerifier(columnless, false)
+  fetches
+
+proc requestManagerDataColumnLoop(
+    rman: RequestManager) {.async: (raises: [CancelledError]).} =
+  while true:
+
+    await sleepAsync(POLL_INTERVAL)
+    if rman.inhibit():
+      continue
+
+    let missingColumnIds = rman.getMissingDataColumns()
+    if missingColumnIds.len == 0:
+      continue
+
+    var columnIds: seq[DataColumnIdentifier]
+    if rman.dataColumnLoader == nil:
+      for item in missingColumnIds:
+        columnIds.add item
+    else:
+      var
+        blockRoots: seq[Eth2Digest]
+        curRoot: Eth2Digest
+      for columnId in missingColumnIds:
+        if columnId.block_root != curRoot:
+          curRoot = columnId.block_root
+          blockRoots.add curRoot
+        let data_column_sidecar = rman.dataColumnLoader(columnId).valueOr:
+          columnIds.add columnId
+          if blockRoots.len > 0 and blockRoots[^1] == curRoot:
+            # A data column is missing, remove the block from the list of
+            # fully available blocks
+            discard blockRoots.pop()
+          continue
+        debug "Loaded orphaned data column from storage", columnId
+        rman.dataColumnQuarantine[].put(data_column_sidecar)
+      var verifiers = newSeqOfCap[
+        Future[Result[void, VerifierError]]
+        .Raising([CancelledError])](blockRoots.len)
+      for blockRoot in blockRoots:
+        let blck = rman.quarantine[].popColumnless(blockRoot).valueOr:
+          continue
+        verifiers.add rman.blockVerifier(blck, maybeFinalized = false)
+      try:
+        await allFutures(verifiers)
+      except CancelledError as exc:
+        var futs = newSeqOfCap[Future[void].Raising([])](verifiers.len)
+        for verifier in verifiers:
+          futs.add verifier.cancelAndWait()
+        await noCancel allFutures(futs)
+        raise exc
+    if columnIds.len > 0:
+      debug "Requesting detected missing data columns", columns = shortLog(columnIds)
+      let start = SyncMoment.now(0)
+      var workers:
+        array[PARALLEL_REQUESTS_DATA_COLUMNS, Future[void].Raising([CancelledError])]
+      for i in 0..