From f12ceb8c75d016da1133bb617e8b17ba10657acd Mon Sep 17 00:00:00 2001 From: Agnish Ghosh <80243668+agnxsh@users.noreply.github.com> Date: Wed, 27 Nov 2024 11:04:19 +0700 Subject: [PATCH] Req/Resp domain for columns part 1 (#6723) * added column support for req resp domain * fix * update links --- beacon_chain/rpc/rest_constants.nim | 2 + beacon_chain/spec/network.nim | 4 + beacon_chain/sync/request_manager.nim | 2 +- beacon_chain/sync/sync_protocol.nim | 147 ++++++++++++++++++++++++++ 4 files changed, 154 insertions(+), 1 deletion(-) diff --git a/beacon_chain/rpc/rest_constants.nim b/beacon_chain/rpc/rest_constants.nim index ca1f8a510..bdd8bcc84 100644 --- a/beacon_chain/rpc/rest_constants.nim +++ b/beacon_chain/rpc/rest_constants.nim @@ -235,6 +235,8 @@ const "Validator inactive" BlobsOutOfRange* = "Requested slot is outside of blobs window" + DataColumnsOutOfRange* = + "Requested slot is outside of data columns window" InvalidBlsToExecutionChangeObjectError* = "Unable to decode BLS to execution change object(s)" BlsToExecutionChangeValidationError* = diff --git a/beacon_chain/spec/network.nim b/beacon_chain/spec/network.nim index d283bc15e..230602daa 100644 --- a/beacon_chain/spec/network.nim +++ b/beacon_chain/spec/network.nim @@ -34,6 +34,10 @@ const MAX_REQUEST_BLOB_SIDECARS*: uint64 = MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK + # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.9/specs/_features/eip7594/p2p-interface.md#configuration + MAX_REQUEST_DATA_COLUMN_SIDECARS*: uint64 = + MAX_REQUEST_BLOCKS_DENEB * NUMBER_OF_COLUMNS + defaultEth2TcpPort* = 9000 defaultEth2TcpPortDesc* = $defaultEth2TcpPort diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim index 79767e553..964e9fd79 100644 --- a/beacon_chain/sync/request_manager.nim +++ b/beacon_chain/sync/request_manager.nim @@ -10,7 +10,7 @@ import std/[sequtils, strutils] import chronos, chronicles import - ../spec/datatypes/[phase0, deneb], + ../spec/datatypes/[phase0, deneb, fulu], ../spec/[forks, network], ../networking/eth2_network, ../consensus_object_pools/block_quarantine, diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim index d79ecb10a..00aa946e7 100644 --- a/beacon_chain/sync/sync_protocol.nim +++ b/beacon_chain/sync/sync_protocol.nim @@ -24,6 +24,8 @@ const ## Allow syncing ~64 blocks/sec (minus request costs) blobResponseCost = allowedOpsPerSecondCost(1000) ## Multiple can exist per block, they are much smaller than blocks + dataColumnResponseCost = allowedOpsPerSecondCost(8000) + ## 8 data columns take the same memory as 1 blob approximately type BeaconSyncNetworkState* {.final.} = ref object of RootObj @@ -37,6 +39,7 @@ type BlockRootsList* = List[Eth2Digest, Limit MAX_REQUEST_BLOCKS] BlobIdentifierList* = List[BlobIdentifier, Limit (MAX_REQUEST_BLOB_SIDECARS)] + DataColumnIdentifierList* = List[DataColumnIdentifier, Limit (MAX_REQUEST_DATA_COLUMN_SIDECARS)] proc readChunkPayload*( conn: Connection, peer: Peer, MsgType: type (ref ForkedSignedBeaconBlock)): @@ -80,6 +83,28 @@ proc readChunkPayload*( else: return neterr InvalidContextBytes +proc readChunkPayload*( + conn: Connection, peer: Peer, MsgType: type (ref DataColumnSidecar)): + Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} = + var contextBytes: ForkDigest + try: + await conn.readExactly(addr contextBytes, sizeof contextBytes) + except CatchableError: + return neterr UnexpectedEOF + let contextFork = + peer.network.forkDigests[].consensusForkForDigest(contextBytes).valueOr: + return neterr InvalidContextBytes + + withConsensusFork(contextFork): + when consensusFork >= ConsensusFork.Fulu: + let res = await readChunkPayload(conn, peer, DataColumnSidecar) + if res.isOk: + return ok newClone(res.get) + else: + return err(res.error) + else: + return neterr InvalidContextBytes + {.pop.} # TODO fix p2p macro for raises p2pProtocol BeaconSync(version = 1, @@ -339,6 +364,128 @@ p2pProtocol BeaconSync(version = 1, debug "BlobSidecar range request done", peer, startSlot, count = reqCount, found + # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/_features/eip7594/p2p-interface.md#datacolumnsidecarsbyroot-v1 + proc dataColumnSidecarsByRoot( + peer: Peer, + colIds: DataColumnIdentifierList, + response: MultipleChunksResponse[ + ref DataColumnSidecar, Limit(MAX_REQUEST_DATA_COLUMN_SIDECARS)]) + {.async, libp2pProtocol("data_column_sidecars_by_root", 1).} = + + trace "got data column root request", peer, len = colIds.len + if colIds.len == 0: + raise newException(InvalidInputsError, "No data columns request for root") + + if colIds.lenu64 > MAX_REQUEST_DATA_COLUMN_SIDECARS: + raise newException(InvalidInputsError, "Exceeding data column request limit") + + let + dag = peer.networkState.dag + count = colIds.len + + var + found = 0 + bytes: seq[byte] + + for i in 0..= dag.head.slot.epoch: + GENESIS_EPOCH + else: + dag.head.slot.epoch - dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS + + if startSlot.epoch < epochBoundary: + raise newException(ResourceUnavailableError, DataColumnsOutOfRange) + + var blockIds: array[int(MAX_REQUEST_DATA_COLUMN_SIDECARS), BlockId] + let + count = int min(reqCount, blockIds.lenu64) + endIndex = count - 1 + startIndex = + dag.getBlockRange(startSlot, 1, blockIds.toOpenArray(0, endIndex)) + + var + found = 0 + bytes: seq[byte] + + for i in startIndex..endIndex: + for k in reqColumns: + if dag.db.getDataColumnSidecarSZ(blockIds[i].root, ColumnIndex k, bytes): + if blockIds[i].slot.epoch >= dag.cfg.DENEB_FORK_EPOCH and + not dag.head.executionValid: + continue + + let uncompressedLen = uncompressedLenFramed(bytes).valueOr: + warn "Cannot read data column sidecar size, database corrup?", + bytes = bytes.len, blck = shortLog(blockIds[i]) + continue + + peer.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_range/1") + peer.network.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_range/1") + + await response.writeBytesSZ( + uncompressedLen, bytes, + peer.network.forkDigestAtEpoch(blockIds[i].slot.epoch).data) + inc found + + var + respondedCols: seq[ColumnIndex] + respondedCols.add(k) + + # additional logging for devnets + debug "responded to data column sidecar range request", + peer, blck = shortLog(blockIds[i]), columns = respondedCols + + debug "Data column range request done", + peer, startSlot, count = reqCount, columns = reqColumns, found + proc init*(T: type BeaconSync.NetworkState, dag: ChainDAGRef): T = T( dag: dag,