Req/Resp domain for columns part 1 (#6723)

* added column support for req resp domain

* fix

* update links
This commit is contained in:
Agnish Ghosh 2024-11-27 11:04:19 +07:00 committed by GitHub
parent a461bb102f
commit f12ceb8c75
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 154 additions and 1 deletions

View File

@ -235,6 +235,8 @@ const
"Validator inactive"
BlobsOutOfRange* =
"Requested slot is outside of blobs window"
DataColumnsOutOfRange* =
"Requested slot is outside of data columns window"
InvalidBlsToExecutionChangeObjectError* =
"Unable to decode BLS to execution change object(s)"
BlsToExecutionChangeValidationError* =

View File

@ -34,6 +34,10 @@ const
MAX_REQUEST_BLOB_SIDECARS*: uint64 =
MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.9/specs/_features/eip7594/p2p-interface.md#configuration
MAX_REQUEST_DATA_COLUMN_SIDECARS*: uint64 =
MAX_REQUEST_BLOCKS_DENEB * NUMBER_OF_COLUMNS
defaultEth2TcpPort* = 9000
defaultEth2TcpPortDesc* = $defaultEth2TcpPort

View File

@ -10,7 +10,7 @@
import std/[sequtils, strutils]
import chronos, chronicles
import
../spec/datatypes/[phase0, deneb],
../spec/datatypes/[phase0, deneb, fulu],
../spec/[forks, network],
../networking/eth2_network,
../consensus_object_pools/block_quarantine,

View File

@ -24,6 +24,8 @@ const
## Allow syncing ~64 blocks/sec (minus request costs)
blobResponseCost = allowedOpsPerSecondCost(1000)
## Multiple can exist per block, they are much smaller than blocks
dataColumnResponseCost = allowedOpsPerSecondCost(8000)
## 8 data columns take the same memory as 1 blob approximately
type
BeaconSyncNetworkState* {.final.} = ref object of RootObj
@ -37,6 +39,7 @@ type
BlockRootsList* = List[Eth2Digest, Limit MAX_REQUEST_BLOCKS]
BlobIdentifierList* = List[BlobIdentifier, Limit (MAX_REQUEST_BLOB_SIDECARS)]
DataColumnIdentifierList* = List[DataColumnIdentifier, Limit (MAX_REQUEST_DATA_COLUMN_SIDECARS)]
proc readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type (ref ForkedSignedBeaconBlock)):
@ -80,6 +83,28 @@ proc readChunkPayload*(
else:
return neterr InvalidContextBytes
proc readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type (ref DataColumnSidecar)):
Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} =
var contextBytes: ForkDigest
try:
await conn.readExactly(addr contextBytes, sizeof contextBytes)
except CatchableError:
return neterr UnexpectedEOF
let contextFork =
peer.network.forkDigests[].consensusForkForDigest(contextBytes).valueOr:
return neterr InvalidContextBytes
withConsensusFork(contextFork):
when consensusFork >= ConsensusFork.Fulu:
let res = await readChunkPayload(conn, peer, DataColumnSidecar)
if res.isOk:
return ok newClone(res.get)
else:
return err(res.error)
else:
return neterr InvalidContextBytes
{.pop.} # TODO fix p2p macro for raises
p2pProtocol BeaconSync(version = 1,
@ -339,6 +364,128 @@ p2pProtocol BeaconSync(version = 1,
debug "BlobSidecar range request done",
peer, startSlot, count = reqCount, found
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/_features/eip7594/p2p-interface.md#datacolumnsidecarsbyroot-v1
proc dataColumnSidecarsByRoot(
peer: Peer,
colIds: DataColumnIdentifierList,
response: MultipleChunksResponse[
ref DataColumnSidecar, Limit(MAX_REQUEST_DATA_COLUMN_SIDECARS)])
{.async, libp2pProtocol("data_column_sidecars_by_root", 1).} =
trace "got data column root request", peer, len = colIds.len
if colIds.len == 0:
raise newException(InvalidInputsError, "No data columns request for root")
if colIds.lenu64 > MAX_REQUEST_DATA_COLUMN_SIDECARS:
raise newException(InvalidInputsError, "Exceeding data column request limit")
let
dag = peer.networkState.dag
count = colIds.len
var
found = 0
bytes: seq[byte]
for i in 0..<count:
let blockRef =
dag.getBlockRef(colIds[i].block_root).valueOr:
continue
let index =
colIds[i].index
if dag.db.getDataColumnSidecarSZ(blockRef.bid.root, index, bytes):
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read data column size, database corrupt?",
bytes = bytes.len, blck = shortLog(blockRef), columnIndex = index
continue
peer.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_root/1")
peer.network.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_root/1")
await response.writeBytesSZ(
uncompressedLen, bytes,
peer.network.forkDigestAtEpoch(blockRef.slot.epoch).data)
inc found
# additional logging for devnets
debug "responsded to data column sidecar by root request",
peer, blck = shortLog(blockRef), columnIndex = index
debug "Data column root request done",
peer, roots = colIds.len, count, found
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/_features/eip7594/p2p-interface.md#datacolumnsidecarsbyrange-v1
proc dataColumnSidecarsByRange(
peer: Peer,
startSlot: Slot,
reqCount: uint64,
reqColumns: List[ColumnIndex, NUMBER_OF_COLUMNS],
response: MultipleChunksResponse[
ref DataColumnSidecar, Limit(MAX_REQUEST_DATA_COLUMN_SIDECARS)])
{.async, libp2pProtocol("data_column_sidecars_by_range", 1).} =
trace "got data columns range request", peer, startSlot,
count = reqCount, columns = reqColumns
if reqCount == 0 or reqColumns.len == 0:
raise newException(InvalidInputsError, "Empty range requested")
let
dag = peer.networkState.dag
# Using MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS until
# MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS is released in
# Fulu. Effectively both the values are same
epochBoundary =
if dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS >= dag.head.slot.epoch:
GENESIS_EPOCH
else:
dag.head.slot.epoch - dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
if startSlot.epoch < epochBoundary:
raise newException(ResourceUnavailableError, DataColumnsOutOfRange)
var blockIds: array[int(MAX_REQUEST_DATA_COLUMN_SIDECARS), BlockId]
let
count = int min(reqCount, blockIds.lenu64)
endIndex = count - 1
startIndex =
dag.getBlockRange(startSlot, 1, blockIds.toOpenArray(0, endIndex))
var
found = 0
bytes: seq[byte]
for i in startIndex..endIndex:
for k in reqColumns:
if dag.db.getDataColumnSidecarSZ(blockIds[i].root, ColumnIndex k, bytes):
if blockIds[i].slot.epoch >= dag.cfg.DENEB_FORK_EPOCH and
not dag.head.executionValid:
continue
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read data column sidecar size, database corrup?",
bytes = bytes.len, blck = shortLog(blockIds[i])
continue
peer.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_range/1")
peer.network.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_range/1")
await response.writeBytesSZ(
uncompressedLen, bytes,
peer.network.forkDigestAtEpoch(blockIds[i].slot.epoch).data)
inc found
var
respondedCols: seq[ColumnIndex]
respondedCols.add(k)
# additional logging for devnets
debug "responded to data column sidecar range request",
peer, blck = shortLog(blockIds[i]), columns = respondedCols
debug "Data column range request done",
peer, startSlot, count = reqCount, columns = reqColumns, found
proc init*(T: type BeaconSync.NetworkState, dag: ChainDAGRef): T =
T(
dag: dag,