implement blob_sidecar Beacon API streaming (#5728)

This commit is contained in:
tersec 2024-01-13 09:52:13 +00:00 committed by GitHub
parent a45609c4a3
commit 69af8f943e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 58 additions and 7 deletions

View File

@ -47,6 +47,7 @@ type
blsToExecQueue*: AsyncEventQueue[SignedBLSToExecutionChange]
propSlashQueue*: AsyncEventQueue[ProposerSlashing]
attSlashQueue*: AsyncEventQueue[AttesterSlashing]
blobSidecarQueue*: AsyncEventQueue[BlobSidecarInfoObject]
finalQueue*: AsyncEventQueue[FinalizationInfoObject]
reorgQueue*: AsyncEventQueue[ReorgInfoObject]
contribQueue*: AsyncEventQueue[SignedContributionAndProof]

View File

@ -7,7 +7,10 @@
{.push raises: [].}
import ../spec/helpers
import
std/tables,
../spec/helpers
from std/sequtils import mapIt
from std/strutils import join
@ -18,10 +21,14 @@ type
BlobQuarantine* = object
blobs*:
OrderedTable[(Eth2Digest, BlobIndex, KzgCommitment), ref BlobSidecar]
onBlobSidecarCallback*: OnBlobSidecarCallback
BlobFetchRecord* = object
block_root*: Eth2Digest
indices*: seq[BlobIndex]
OnBlobSidecarCallback = proc(data: BlobSidecar) {.gcsafe, raises: [].}
func shortLog*(x: seq[BlobIndex]): string =
"<" & x.mapIt($it).join(", ") & ">"
@ -86,3 +93,7 @@ func blobFetchRecord*(quarantine: BlobQuarantine, blck: deneb.SignedBeaconBlock)
(blck.root, idx, blck.message.body.blob_kzg_commitments[i])):
indices.add(idx)
BlobFetchRecord(block_root: blck.root, indices: indices)
func init*(
T: type BlobQuarantine, onBlobSidecarCallback: OnBlobSidecarCallback): T =
T(onBlobSidecarCallback: onBlobSidecarCallback)

View File

@ -440,8 +440,11 @@ proc validateBlobSidecar*(
if not ok:
return dag.checkedReject("BlobSidecar: blob invalid")
ok()
# Send notification about new blob sidecar via callback
if not(isNil(blobQuarantine.onBlobSidecarCallback)):
blobQuarantine.onBlobSidecarCallback(blob_sidecar)
ok()
# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/phase0/p2p-interface.md#beacon_block
# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/bellatrix/p2p-interface.md#beacon_block

View File

@ -8,7 +8,7 @@
{.push raises: [].}
import
std/[os, random, sequtils, terminal, times],
std/[os, random, terminal, times],
chronos, chronicles,
metrics, metrics/chronos_httpserver,
stew/[byteutils, io2],
@ -273,6 +273,8 @@ proc checkWeakSubjectivityCheckpoint(
headStateSlot = getStateField(dag.headState, slot)
quit 1
from ./spec/state_transition_block import kzg_commitment_to_versioned_hash
proc initFullNode(
node: BeaconNode,
rng: ref HmacDrbgContext,
@ -293,6 +295,14 @@ proc initFullNode(
node.eventBus.propSlashQueue.emit(data)
proc onAttesterSlashingAdded(data: AttesterSlashing) =
node.eventBus.attSlashQueue.emit(data)
proc onBlobSidecarAdded(data: BlobSidecar) =
node.eventBus.blobSidecarQueue.emit(
BlobSidecarInfoObject(
block_root: hash_tree_root(data.signed_block_header.message),
index: data.index,
slot: data.signed_block_header.message.slot,
kzg_commitment: data.kzg_commitment,
versioned_hash: data.kzg_commitment.kzg_commitment_to_versioned_hash))
proc onBlockAdded(data: ForkedTrustedSignedBeaconBlock) =
let optimistic =
if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH:
@ -373,7 +383,7 @@ proc initFullNode(
validatorChangePool = newClone(ValidatorChangePool.init(
dag, attestationPool, onVoluntaryExitAdded, onBLSToExecutionChangeAdded,
onProposerSlashingAdded, onAttesterSlashingAdded))
blobQuarantine = newClone(BlobQuarantine())
blobQuarantine = newClone(BlobQuarantine.init(onBlobSidecarAdded))
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, node.elManager,
ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets),
@ -553,6 +563,7 @@ proc init*(T: type BeaconNode,
blsToExecQueue: newAsyncEventQueue[SignedBLSToExecutionChange](),
propSlashQueue: newAsyncEventQueue[ProposerSlashing](),
attSlashQueue: newAsyncEventQueue[AttesterSlashing](),
blobSidecarQueue: newAsyncEventQueue[BlobSidecarInfoObject](),
finalQueue: newAsyncEventQueue[FinalizationInfoObject](),
reorgQueue: newAsyncEventQueue[ReorgInfoObject](),
contribQueue: newAsyncEventQueue[SignedContributionAndProof](),
@ -869,6 +880,8 @@ func verifyFinalization(node: BeaconNode, slot: Slot) =
# finalization occurs every slot, to 4 slots vs scheduledSlot.
doAssert finalizedEpoch + 4 >= epoch
from std/sequtils import toSeq
func subnetLog(v: BitArray): string =
$toSeq(v.oneIndices())

View File

@ -145,6 +145,10 @@ proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) =
let handler = response.eventHandler(node.eventBus.attSlashQueue,
"attester_slashing")
res.add(handler)
if EventTopic.BlobSidecar in eventTopics:
let handler = response.eventHandler(node.eventBus.blobSidecarQueue,
"blob_sidecar")
res.add(handler)
if EventTopic.FinalizedCheckpoint in eventTopics:
let handler = response.eventHandler(node.eventBus.finalQueue,
"finalized_checkpoint")

View File

@ -60,6 +60,16 @@ type
kzg_commitment_inclusion_proof*:
array[KZG_COMMITMENT_INCLUSION_PROOF_DEPTH, Eth2Digest]
# https://github.com/ethereum/beacon-APIs/blob/4882aa0803b622b75bab286b285599d70b7a2429/apis/eventstream/index.yaml#L138-L142
# Spec object, not only internal, because it gets serialized out for the
# event stream Beacon API
BlobSidecarInfoObject* = object
block_root*: Eth2Digest
index*: BlobIndex
slot*: Slot
kzg_commitment*: KzgCommitment
versioned_hash*: VersionedHash
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/deneb/p2p-interface.md#blobidentifier
BlobIdentifier* = object
block_root*: Eth2Digest

View File

@ -51,6 +51,7 @@ RestJson.useDefaultSerializationFor(
BLSToExecutionChange,
BeaconBlockHeader,
BlobSidecar,
BlobSidecarInfoObject,
BlobsBundle,
Checkpoint,
ContributionAndProof,
@ -299,6 +300,7 @@ const
type
EncodeTypes* =
AttesterSlashing |
BlobSidecarInfoObject |
DeleteKeystoresBody |
EmptyBody |
ImportDistributedKeystoresBody |
@ -4145,6 +4147,8 @@ proc decodeString*(t: typedesc[EventTopic],
ok(EventTopic.ProposerSlashing)
of "attester_slashing":
ok(EventTopic.AttesterSlashing)
of "blob_sidecar":
ok(EventTopic.BlobSidecar)
of "finalized_checkpoint":
ok(EventTopic.FinalizedCheckpoint)
of "chain_reorg":
@ -4174,6 +4178,8 @@ proc encodeString*(value: set[EventTopic]): Result[string, cstring] =
res.add("proposer_slashing,")
if EventTopic.AttesterSlashing in value:
res.add("attester_slashing,")
if EventTopic.BlobSidecar in value:
res.add("blob_sidecar,")
if EventTopic.FinalizedCheckpoint in value:
res.add("finalized_checkpoint,")
if EventTopic.ChainReorg in value:

View File

@ -55,8 +55,9 @@ type
# https://github.com/ethereum/beacon-APIs/blob/v2.4.2/apis/eventstream/index.yaml
EventTopic* {.pure.} = enum
Head, Block, Attestation, VoluntaryExit, BLSToExecutionChange,
ProposerSlashing, AttesterSlashing, FinalizedCheckpoint, ChainReorg,
ContributionAndProof, LightClientFinalityUpdate, LightClientOptimisticUpdate
ProposerSlashing, AttesterSlashing, BlobSidecar, FinalizedCheckpoint,
ChainReorg, ContributionAndProof, LightClientFinalityUpdate,
LightClientOptimisticUpdate
EventTopics* = set[EventTopic]

View File

@ -6,7 +6,7 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
std/[parsecsv, streams],
std/parsecsv,
stew/[io2, byteutils], chronicles, confutils, snappy,
../beacon_chain/spec/datatypes/base,
./ncli_common
@ -200,6 +200,8 @@ proc advanceEpochs*(aggregator: var ValidatorDbAggregator, epoch: Epoch,
aggregator.epochsAggregated = 0
when isMainModule:
import std/streams
when defined(posix):
import system/ansi_c