From 8ac4cc915271d66f0d2680b8e39bf9b6a951b364 Mon Sep 17 00:00:00 2001 From: Agnish Ghosh Date: Mon, 1 Jul 2024 17:42:29 +0530 Subject: [PATCH] add: data column grouping conditions for range request --- beacon_chain/sync/sync_manager.nim | 68 ++++++++++++++++++++++++++++- beacon_chain/sync/sync_protocol.nim | 2 +- beacon_chain/sync/sync_queue.nim | 1 + 3 files changed, 69 insertions(+), 2 deletions(-) diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index d560079b4..d40f48ad9 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -12,7 +12,7 @@ import stew/[results, base10], chronos, chronicles import ../spec/datatypes/[phase0, altair], ../spec/eth2_apis/rest_types, - ../spec/[helpers, forks, network], + ../spec/[helpers, forks, network, eip7594_helpers], ../networking/[peer_pool, peer_scores, eth2_network], ../gossip_processing/block_processor, ../beacon_clock, @@ -183,6 +183,12 @@ proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool = (wallEpoch < man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS or e >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS) +proc shouldGetDataColumns[A, B](man: SyncManager[A, B], e: Epoch): bool = + let wallEpoch = man.getLocalWallSlot().epoch + e >= man.DENEB_FORK_EPOCH and + (wallEpoch < man.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUEST or + e >= wallEpoch - man.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS) + proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A, req: SyncRequest): Future[BlobSidecarsRes] {.async: (raises: [CancelledError], raw: true).} = @@ -258,6 +264,66 @@ func checkBlobs(blobs: seq[BlobSidecars]): Result[void, string] = ? blob_sidecar[].verify_blob_sidecar_inclusion_proof() ok() +proc getDataColumnSidecars[A, B](man: SyncManager[A, B], peer: A, + req: SyncRequest): Future[DataColumnSidecarsRes] + {.async: (raises: [CancelledError], raw: true).} = + mixin getScore, `==` + + logScope: + peer_score = peer.getScore() + peer_speed = peer.netKbps() + sync_indent = man.indent + direction = man.direction + topics = "syncman" + + doAssert(not(req.isEmpty()), "Request must not be empty!") + debug "Requesting data column sidecars from peer", request = req + dataColumnSidecarsByRange(peer, req.slot, req.count, req.columns) + +func groupDataColumns*[T](req: SyncRequest[T], + blocks: seq[ref ForkedSignedBeaconBlock], + data_columns: seq[ref DataColumnSidecar]): + Result[seq[DataColumnSidecars], string] = + var + grouped = newSeq[DataColumnSidecars](len(blocks)) + column_cursor = 0 + for column_idx, blck in blocks: + withBlck(blck[]): + when consensusFork >= consensusFork.Deneb: + template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments + if kzgs.len == 0: + continue + # Clients MUST include all blob sidecars of each block from which they include blob sidecars. + # The following blob sidecars, where they exist, MUST be sent in consecutive (slot, index) order. + # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.3/specs/_features/eip7594/p2p-interface.md + let header = forkyBlck.toSignedBeaconBlockHeader() + for column_idx, kzg_commitment in kzgs: + if column_cursor >= data_columns.len: + return err("DataColumnSidecar: response too short") + let data_column_sidecar = data_columns[column_cursor] + if data_column_sidecar.index == data_columns[column_cursor]: + return err("DataColumnSidecar: unexpected index") + if kzg_commitment notin data_column_sidecar.kzg_commitments: + return err("DataColumnSidecar: unexpected kzg_commitment") + if data_column_sidecar.signed_block_header != header: + return err("DataColumnSidecar: unexpected signed_block_header") + grouped[block_idx].add(data_column_sidecar) + inc column_cursor + + if column_cursor != len(data_columns): + # we reached end of blocks without consuming all data columns so either + # the peer we got too few blocks in the paired request, or the + # peer is sending us spurious data columns. + Result[seq[DataColumnSidecars], string].err "invalid block or data column sequence" + else: + Result[seq[DataColumnSidecars], string].ok grouped + +func checkDataColumns(data_columns: seq[DataColumnSidecars]): Result[void, string] = + for data_column_sidecars in data_columns: + for data_column_sidecar in data_column_sidecars: + ? data_column_sidecar[].verify_data_column_sidecar_inclusion_proof() + ok() + proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async: (raises: [CancelledError]).} = logScope: diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim index ad03139cd..e178b3c1c 100644 --- a/beacon_chain/sync/sync_protocol.nim +++ b/beacon_chain/sync/sync_protocol.nim @@ -436,7 +436,7 @@ p2pProtocol BeaconSync(version = 1, peer, roots = columnIds.len, count, found # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.2/specs/_features/eip7594/p2p-interface.md#datacolumnsidecarsbyrange-v1 - proc dataColumnSidecarByRange( + proc dataColumnSidecarsByRange( peer: Peer, startSlot: Slot, reqCount: uint64, diff --git a/beacon_chain/sync/sync_queue.nim b/beacon_chain/sync/sync_queue.nim index e7faad5e7..a70e49ca3 100644 --- a/beacon_chain/sync/sync_queue.nim +++ b/beacon_chain/sync/sync_queue.nim @@ -38,6 +38,7 @@ type index*: uint64 slot*: Slot count*: uint64 + columns*: List[ColumnIndex, NUMBER_OF_COLUMNS] item*: T SyncResult*[T] = object