add: data column grouping conditions for range request

This commit is contained in:
Agnish Ghosh 2024-07-01 17:42:29 +05:30
parent 26ac58716b
commit 8ac4cc9152
No known key found for this signature in database
GPG Key ID: 7BDDA05D1B25E9F8
3 changed files with 69 additions and 2 deletions

View File

@ -12,7 +12,7 @@ import stew/[results, base10], chronos, chronicles
import import
../spec/datatypes/[phase0, altair], ../spec/datatypes/[phase0, altair],
../spec/eth2_apis/rest_types, ../spec/eth2_apis/rest_types,
../spec/[helpers, forks, network], ../spec/[helpers, forks, network, eip7594_helpers],
../networking/[peer_pool, peer_scores, eth2_network], ../networking/[peer_pool, peer_scores, eth2_network],
../gossip_processing/block_processor, ../gossip_processing/block_processor,
../beacon_clock, ../beacon_clock,
@ -183,6 +183,12 @@ proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool =
(wallEpoch < man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS or (wallEpoch < man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS or
e >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS) e >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS)
proc shouldGetDataColumns[A, B](man: SyncManager[A, B], e: Epoch): bool =
let wallEpoch = man.getLocalWallSlot().epoch
e >= man.DENEB_FORK_EPOCH and
(wallEpoch < man.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUEST or
e >= wallEpoch - man.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS)
proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A, proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A,
req: SyncRequest): Future[BlobSidecarsRes] req: SyncRequest): Future[BlobSidecarsRes]
{.async: (raises: [CancelledError], raw: true).} = {.async: (raises: [CancelledError], raw: true).} =
@ -258,6 +264,66 @@ func checkBlobs(blobs: seq[BlobSidecars]): Result[void, string] =
? blob_sidecar[].verify_blob_sidecar_inclusion_proof() ? blob_sidecar[].verify_blob_sidecar_inclusion_proof()
ok() ok()
proc getDataColumnSidecars[A, B](man: SyncManager[A, B], peer: A,
req: SyncRequest): Future[DataColumnSidecarsRes]
{.async: (raises: [CancelledError], raw: true).} =
mixin getScore, `==`
logScope:
peer_score = peer.getScore()
peer_speed = peer.netKbps()
sync_indent = man.indent
direction = man.direction
topics = "syncman"
doAssert(not(req.isEmpty()), "Request must not be empty!")
debug "Requesting data column sidecars from peer", request = req
dataColumnSidecarsByRange(peer, req.slot, req.count, req.columns)
func groupDataColumns*[T](req: SyncRequest[T],
blocks: seq[ref ForkedSignedBeaconBlock],
data_columns: seq[ref DataColumnSidecar]):
Result[seq[DataColumnSidecars], string] =
var
grouped = newSeq[DataColumnSidecars](len(blocks))
column_cursor = 0
for column_idx, blck in blocks:
withBlck(blck[]):
when consensusFork >= consensusFork.Deneb:
template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments
if kzgs.len == 0:
continue
# Clients MUST include all blob sidecars of each block from which they include blob sidecars.
# The following blob sidecars, where they exist, MUST be sent in consecutive (slot, index) order.
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.3/specs/_features/eip7594/p2p-interface.md
let header = forkyBlck.toSignedBeaconBlockHeader()
for column_idx, kzg_commitment in kzgs:
if column_cursor >= data_columns.len:
return err("DataColumnSidecar: response too short")
let data_column_sidecar = data_columns[column_cursor]
if data_column_sidecar.index == data_columns[column_cursor]:
return err("DataColumnSidecar: unexpected index")
if kzg_commitment notin data_column_sidecar.kzg_commitments:
return err("DataColumnSidecar: unexpected kzg_commitment")
if data_column_sidecar.signed_block_header != header:
return err("DataColumnSidecar: unexpected signed_block_header")
grouped[block_idx].add(data_column_sidecar)
inc column_cursor
if column_cursor != len(data_columns):
# we reached end of blocks without consuming all data columns so either
# the peer we got too few blocks in the paired request, or the
# peer is sending us spurious data columns.
Result[seq[DataColumnSidecars], string].err "invalid block or data column sequence"
else:
Result[seq[DataColumnSidecars], string].ok grouped
func checkDataColumns(data_columns: seq[DataColumnSidecars]): Result[void, string] =
for data_column_sidecars in data_columns:
for data_column_sidecar in data_column_sidecars:
? data_column_sidecar[].verify_data_column_sidecar_inclusion_proof()
ok()
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
{.async: (raises: [CancelledError]).} = {.async: (raises: [CancelledError]).} =
logScope: logScope:

View File

@ -436,7 +436,7 @@ p2pProtocol BeaconSync(version = 1,
peer, roots = columnIds.len, count, found peer, roots = columnIds.len, count, found
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.2/specs/_features/eip7594/p2p-interface.md#datacolumnsidecarsbyrange-v1 # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.2/specs/_features/eip7594/p2p-interface.md#datacolumnsidecarsbyrange-v1
proc dataColumnSidecarByRange( proc dataColumnSidecarsByRange(
peer: Peer, peer: Peer,
startSlot: Slot, startSlot: Slot,
reqCount: uint64, reqCount: uint64,

View File

@ -38,6 +38,7 @@ type
index*: uint64 index*: uint64
slot*: Slot slot*: Slot
count*: uint64 count*: uint64
columns*: List[ColumnIndex, NUMBER_OF_COLUMNS]
item*: T item*: T
SyncResult*[T] = object SyncResult*[T] = object