add: data column support in sync_protocol, sync_manager, request_manager, fix: gossipValidation rules

This commit is contained in:
Agnish Ghosh 2024-06-18 19:01:56 +05:30
parent 8e49f88067
commit 46d07b140d
No known key found for this signature in database
GPG Key ID: 7BDDA05D1B25E9F8
7 changed files with 169 additions and 11 deletions

View File

@ -22,7 +22,7 @@ import
./el/el_manager, ./el/el_manager,
./consensus_object_pools/[ ./consensus_object_pools/[
blockchain_dag, blob_quarantine, block_quarantine, consensus_manager, blockchain_dag, blob_quarantine, block_quarantine, consensus_manager,
attestation_pool, sync_committee_msg_pool, validator_change_pool], data_column_quarantine, attestation_pool, sync_committee_msg_pool, validator_change_pool],
./spec/datatypes/[base, altair], ./spec/datatypes/[base, altair],
./spec/eth2_apis/dynamic_fee_recipients, ./spec/eth2_apis/dynamic_fee_recipients,
./sync/[sync_manager, request_manager], ./sync/[sync_manager, request_manager],
@ -71,6 +71,7 @@ type
dag*: ChainDAGRef dag*: ChainDAGRef
quarantine*: ref Quarantine quarantine*: ref Quarantine
blobQuarantine*: ref BlobQuarantine blobQuarantine*: ref BlobQuarantine
dataColumnQuarantine*: ref DataColumnQuarantine
attestationPool*: ref AttestationPool attestationPool*: ref AttestationPool
syncCommitteeMsgPool*: ref SyncCommitteeMsgPool syncCommitteeMsgPool*: ref SyncCommitteeMsgPool
lightClientPool*: ref LightClientPool lightClientPool*: ref LightClientPool

View File

@ -58,6 +58,8 @@ type
## block as well. A blobless block inserted into this table must ## block as well. A blobless block inserted into this table must
## have a resolved parent (i.e., it is not an orphan). ## have a resolved parent (i.e., it is not an orphan).
columnless: OrderedTable[Eth2Digest, ForkedSignedBeaconBlock]
unviable*: OrderedTable[Eth2Digest, tuple[]] unviable*: OrderedTable[Eth2Digest, tuple[]]
## Unviable blocks are those that come from a history that does not ## Unviable blocks are those that come from a history that does not
## include the finalized checkpoint we're currently following, and can ## include the finalized checkpoint we're currently following, and can
@ -336,3 +338,16 @@ func popBlobless*(
iterator peekBlobless*(quarantine: var Quarantine): ForkedSignedBeaconBlock = iterator peekBlobless*(quarantine: var Quarantine): ForkedSignedBeaconBlock =
for k, v in quarantine.blobless.mpairs(): for k, v in quarantine.blobless.mpairs():
yield v yield v
func popColumnless*(
quarantine: var Quarantine,
root: Eth2Digest): Opt[ForkedSignedBeaconBlock] =
var blck: ForkedSignedBeaconBlock
if quarantine.columnless.pop(root, blck):
Opt.some(blck)
else:
Opt.none(ForkedSignedBeaconBlock)
iterator peekColumless*(quarantine: var Quarantine): ForkedSignedBeaconBlock =
for k,v in quarantine.columnless.mpairs():
yield v

View File

@ -0,0 +1,71 @@
# beacon_chain
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
std/tables,
../spec/[helpers, eip7594_helpers]
from std/sequtils import mapIt
from std/strutils import join
const
MaxDataColumns = 3 * SLOTS_PER_EPOCH * NUMBER_OF_COLUMNS
## Same limit as `MaxOrphans` in `block_quarantine`
## data columns may arrive before an orphan is tagged `columnless`
type
DataColumnQuarantine* = object
data_columns*:
OrderedTable[(Eth2Digest, ColumnIndex), ref DataColumnSidecar]
onDataColumnSidecarCallback*: OnDataColumnSidecarCallback
DataColumnFetchRecord* = object
block_root*: Eth2Digest
indices*: seq[ColumnIndex]
OnDataColumnSidecarCallback = proc(data: DataColumnSidecar) {.gcsafe, raises: [].}
func shortLog*(x: seq[ColumnIndex]): string =
"<" & x.mapIt($it).join(", ") & ">"
func shortLog*(x: seq[DataColumnFetchRecord]): string =
"[" & x.mapIt(shortLog(it.block_root) & shortLog(it.indices)).join(", ") & "]"
func put*(quarantine: var DataColumnQuarantine, dataColumnSidecar: ref DataColumnSidecar) =
if quarantine.data_columns.lenu64 >= MaxDataColumns:
# FIFO if full. For example, sync manager and request manager can race to
# put blobs in at the same time, so one gets blob insert -> block resolve
# -> blob insert sequence, which leaves garbage blobs.
#
# This also therefore automatically garbage-collects otherwise valid garbage
# blobs which are correctly signed, point to either correct block roots or a
# block root which isn't ever seen, and then are for any reason simply never
# used.
var oldest_column_key: (Eth2Digest, ColumnIndex)
for k in quarantine.data_columns.keys:
oldest_column_key = k
break
quarantine.data_columns.del(oldest_column_key)
let block_root = hash_tree_root(dataColumnSidecar.signed_block_header.message)
discard quarantine.data_columns.hasKeyOrPut(
(block_root, dataColumnSidecar.index), dataColumnSidecar)
func hasDataColumn*(
quarantine: DataColumnQuarantine,
slot: Slot,
proposer_index: uint64,
index: ColumnIndex): bool =
for data_column_sidecar in quarantine.data_columns.values:
template block_header: untyped = data_column_sidecar.signed_block_header.message
if block_header.slot == slot and
block_header.proposer_index == proposer_index and
data_column_sidecar.index == index:
return true
false

View File

@ -15,8 +15,8 @@ import
../spec/datatypes/[altair, phase0, deneb, eip7594], ../spec/datatypes/[altair, phase0, deneb, eip7594],
../consensus_object_pools/[ ../consensus_object_pools/[
blob_quarantine, block_clearance, block_quarantine, blockchain_dag, blob_quarantine, block_clearance, block_quarantine, blockchain_dag,
attestation_pool, light_client_pool, sync_committee_msg_pool, data_column_quarantine, attestation_pool, light_client_pool,
validator_change_pool], sync_committee_msg_pool, validator_change_pool],
../validators/validator_pool, ../validators/validator_pool,
../beacon_clock, ../beacon_clock,
"."/[gossip_validation, block_processor, batch_validation], "."/[gossip_validation, block_processor, batch_validation],
@ -156,6 +156,8 @@ type
blobQuarantine*: ref BlobQuarantine blobQuarantine*: ref BlobQuarantine
dataColumnQuarantine*: ref DataColumnQuarantine
# Application-provided current time provider (to facilitate testing) # Application-provided current time provider (to facilitate testing)
getCurrentBeaconTime*: GetBeaconTimeFn getCurrentBeaconTime*: GetBeaconTimeFn
@ -345,7 +347,7 @@ proc processDataColumnSidecar*(
debug "Data column received", delay debug "Data column received", delay
let v = let v =
self.dag.validateDataColumnSidecar(self.quarantine, self.blobQuarantine, self.dag.validateDataColumnSidecar(self.quarantine, self.dataColumnQuarantine,
dataColumnSidecar, wallTime, subnet_id) dataColumnSidecar, wallTime, subnet_id)
if v.isErr(): if v.isErr():

View File

@ -16,7 +16,7 @@ import
beaconstate, state_transition_block, forks, helpers, network, signatures, eip7594_helpers], beaconstate, state_transition_block, forks, helpers, network, signatures, eip7594_helpers],
../consensus_object_pools/[ ../consensus_object_pools/[
attestation_pool, blockchain_dag, blob_quarantine, block_quarantine, attestation_pool, blockchain_dag, blob_quarantine, block_quarantine,
spec_cache, light_client_pool, sync_committee_msg_pool, data_column_quarantine, spec_cache, light_client_pool, sync_committee_msg_pool,
validator_change_pool], validator_change_pool],
".."/[beacon_clock], ".."/[beacon_clock],
./batch_validation ./batch_validation
@ -490,7 +490,7 @@ proc validateBlobSidecar*(
# https://github.com/ethereum/consensus-specs/blob/5f48840f4d768bf0e0a8156a3ed06ec333589007/specs/_features/eip7594/p2p-interface.md#the-gossip-domain-gossipsub # https://github.com/ethereum/consensus-specs/blob/5f48840f4d768bf0e0a8156a3ed06ec333589007/specs/_features/eip7594/p2p-interface.md#the-gossip-domain-gossipsub
proc validateDataColumnSidecar*( proc validateDataColumnSidecar*(
dag: ChainDAGRef, quarantine: ref Quarantine, dag: ChainDAGRef, quarantine: ref Quarantine,
blobQuarantine: ref BlobQuarantine, data_column_sidecar: DataColumnSidecar, dataColumnQuarantine: ref DataColumnQuarantine, data_column_sidecar: DataColumnSidecar,
wallTime: BeaconTime, subnet_id: uint64): Result[void, ValidationError] = wallTime: BeaconTime, subnet_id: uint64): Result[void, ValidationError] =
template block_header: untyped = data_column_sidecar.signed_block_header.message template block_header: untyped = data_column_sidecar.signed_block_header.message
@ -538,7 +538,7 @@ proc validateDataColumnSidecar*(
let block_root = hash_tree_root(block_header) let block_root = hash_tree_root(block_header)
if dag.getBlockRef(block_root).isSome(): if dag.getBlockRef(block_root).isSome():
return errIgnore("DataColumnSidecar: already have block") return errIgnore("DataColumnSidecar: already have block")
if blobQuarantine[].hasBlob( if dataColumnQuarantine[].hasDataColumn(
block_header.slot, block_header.proposer_index, data_column_sidecar.index): block_header.slot, block_header.proposer_index, data_column_sidecar.index):
return errIgnore("DataColumnSidecar: already have valid data column from same proposer") return errIgnore("DataColumnSidecar: already have valid data column from same proposer")

View File

@ -9,6 +9,9 @@
import "."/[base, deneb], kzg4844 import "."/[base, deneb], kzg4844
from std/sequtils import mapIt
from std/strutils import join
export base export base
const const
@ -51,8 +54,8 @@ type
DataColumnSidecar* = object DataColumnSidecar* = object
index*: ColumnIndex # Index of column in extended matrix index*: ColumnIndex # Index of column in extended matrix
column*: DataColumn column*: DataColumn
kzg_commitments*: List[KzgCommitment, Limit(MAX_BLOB_COMMITMENTS_PER_BLOCK)] kzg_commitments*: KzgCommitments
kzg_proofs*: List[KzgProof, Limit(MAX_BLOB_COMMITMENTS_PER_BLOCK)] kzg_proofs*: KzgProofs
signed_block_header*: SignedBeaconBlockHeader signed_block_header*: SignedBeaconBlockHeader
kzg_commitments_inclusion_proof*: kzg_commitments_inclusion_proof*:
array[KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH, Eth2Digest] array[KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH, Eth2Digest]
@ -75,3 +78,6 @@ func shortLog*(v: DataColumnSidecar): auto =
kzg_proofs: v.kzg_proofs.len, kzg_proofs: v.kzg_proofs.len,
block_header: shortLog(v.signed_block_header.message), block_header: shortLog(v.signed_block_header.message),
) )
func shortLog*(x: seq[DataColumnIdentifier]): string =
"[" & x.mapIt(shortLog(it.block_root) & "/" & $it.index).join(", ") & "]"

View File

@ -11,10 +11,11 @@ import std/[sequtils, strutils]
import chronos, chronicles import chronos, chronicles
import import
../spec/datatypes/[phase0, deneb], ../spec/datatypes/[phase0, deneb],
../spec/[forks, network], ../spec/[forks, network, eip7594_helpers],
../networking/eth2_network, ../networking/eth2_network,
../consensus_object_pools/block_quarantine, ../consensus_object_pools/block_quarantine,
../consensus_object_pools/blob_quarantine, ../consensus_object_pools/blob_quarantine,
../consensus_object_pools/data_column_quarantine,
"."/sync_protocol, "."/sync_manager, "."/sync_protocol, "."/sync_manager,
../gossip_processing/block_processor ../gossip_processing/block_processor
@ -49,6 +50,10 @@ type
BlobLoaderFn* = proc( BlobLoaderFn* = proc(
blobId: BlobIdentifier): Opt[ref BlobSidecar] {.gcsafe, raises: [].} blobId: BlobIdentifier): Opt[ref BlobSidecar] {.gcsafe, raises: [].}
DataColumnLoaderFn* = proc(
columnId: DataColumnIdentifier
): Opt[ref DataColumnSidecar] {.gcsafe, raises: [].}
InhibitFn* = proc: bool {.gcsafe, raises: [].} InhibitFn* = proc: bool {.gcsafe, raises: [].}
RequestManager* = object RequestManager* = object
@ -57,6 +62,7 @@ type
inhibit: InhibitFn inhibit: InhibitFn
quarantine: ref Quarantine quarantine: ref Quarantine
blobQuarantine: ref BlobQuarantine blobQuarantine: ref BlobQuarantine
dataColumnQuarantine: ref DataColumnQuarantine
blockVerifier: BlockVerifierFn blockVerifier: BlockVerifierFn
blockLoader: BlockLoaderFn blockLoader: BlockLoaderFn
blobLoader: BlobLoaderFn blobLoader: BlobLoaderFn
@ -119,6 +125,23 @@ proc checkResponse(idList: seq[BlobIdentifier],
return false return false
true true
proc checkResponse(colIdList: seq[DataColumnIdentifier],
columns: openArray[ref DataColumnSidecar]): bool =
if len(columns) > len(colIdList):
return false
for column in columns:
let block_root = hash_tree_root(column.signed_block_header.message)
var found = false
for id in colIdList:
if id.block_root == block_root and id.index == column.index:
found = true
break
if not found:
return false
column[].verify_data_column_sidecar_inclusion_proof().isOkOr:
return false
true
proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} = proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} =
var peer: Peer var peer: Peer
try: try:
@ -232,6 +255,46 @@ proc fetchBlobsFromNetwork(self: RequestManager,
if not(isNil(peer)): if not(isNil(peer)):
self.network.peerPool.release(peer) self.network.peerPool.release(peer)
proc fetchDataColumnsFromNetwork(rman: RequestManager,
colIdList: seq[DataColumnIdentifier])
{.async: (raises: [CancelledError]).} =
var peer: Peer
try:
peer = await rman.network.peerPool.acquire()
debug "Requesting data columns by root", peer = peer, columns = shortLog(colIdList),
peer_score = peer.getScore()
let columns = await dataColumnSidecarsByRoot(peer, DataColumnIdentifierList colIdList)
if columns.isOk:
let ucolumns = columns.get()
if not checkResponse(colIdList, ucolumns.asSeq()):
debug "Mismatched response to data columns by root",
peer = peer, columns = shortLog(colIdList), ucolumns = len(ucolumns)
peer.updateScore(PeerScoreBadResponse)
return
for col in ucolumns:
rman.dataColumnQuarantine[].put(col)
var curRoot: Eth2Digest
for col in ucolumns:
let block_root = hash_tree_root(col.signed_block_header.message)
if block_root != curRoot:
curRoot = block_root
if (let o = rman.quarantine[].popColumnless(curRoot); o.isSome):
let col = o.unsafeGet()
discard await rman.blockVerifier(col, false)
else:
debug "Data Columns by root request failed",
peer = peer, columns = shortLog(colIdList), err = columns.error()
peer.updateScore(PeerScoreNoValues)
finally:
if not(isNil(peer)):
rman.network.peerPool.release(peer)
proc requestManagerBlockLoop( proc requestManagerBlockLoop(
rman: RequestManager) {.async: (raises: [CancelledError]).} = rman: RequestManager) {.async: (raises: [CancelledError]).} =
while true: while true: