Add blob handling to message router (#5106)
* Add blob handling to message router * address review feedback * Fix typos
This commit is contained in:
parent
41b93ae57a
commit
1234900065
|
@ -2673,6 +2673,14 @@ proc broadcastBeaconBlock*(
|
||||||
let topic = getBeaconBlocksTopic(node.forkDigests.deneb)
|
let topic = getBeaconBlocksTopic(node.forkDigests.deneb)
|
||||||
node.broadcast(topic, blck)
|
node.broadcast(topic, blck)
|
||||||
|
|
||||||
|
proc broadcastBlobSidecar*(
|
||||||
|
node: Eth2Node, subnet_id: SubnetId, blob: deneb.SignedBlobSidecar):
|
||||||
|
Future[SendResult] =
|
||||||
|
let
|
||||||
|
forkPrefix = node.forkDigestAtEpoch(node.getWallEpoch)
|
||||||
|
topic = getBlobSidecarTopic(forkPrefix, subnet_id)
|
||||||
|
node.broadcast(topic, blob)
|
||||||
|
|
||||||
proc broadcastSyncCommitteeMessage*(
|
proc broadcastSyncCommitteeMessage*(
|
||||||
node: Eth2Node, msg: SyncCommitteeMessage,
|
node: Eth2Node, msg: SyncCommitteeMessage,
|
||||||
subcommitteeIdx: SyncSubcommitteeIndex): Future[SendResult] =
|
subcommitteeIdx: SyncSubcommitteeIndex): Future[SendResult] =
|
||||||
|
|
|
@ -1558,12 +1558,12 @@ proc installMessageValidators(node: BeaconNode) =
|
||||||
|
|
||||||
when consensusFork >= ConsensusFork.Deneb:
|
when consensusFork >= ConsensusFork.Deneb:
|
||||||
# blob_sidecar_{index}
|
# blob_sidecar_{index}
|
||||||
# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/deneb/p2p-interface.md#blob_sidecar_index
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.0/specs/deneb/p2p-interface.md#blob_sidecar_subnet_id
|
||||||
for i in 0 ..< MAX_BLOBS_PER_BLOCK:
|
for i in 0 ..< BLOB_SIDECAR_SUBNET_COUNT:
|
||||||
closureScope: # Needed for inner `proc`; don't lift it out of loop.
|
closureScope: # Needed for inner `proc`; don't lift it out of loop.
|
||||||
let idx = i
|
let idx = i
|
||||||
node.network.addValidator(
|
node.network.addValidator(
|
||||||
getBlobSidecarTopic(digest, idx), proc (
|
getBlobSidecarTopic(digest, SubnetId(idx)), proc (
|
||||||
signedBlobSidecar: SignedBlobSidecar
|
signedBlobSidecar: SignedBlobSidecar
|
||||||
): ValidationResult =
|
): ValidationResult =
|
||||||
toValidationResult(
|
toValidationResult(
|
||||||
|
|
|
@ -848,10 +848,32 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
|
||||||
doAssert strictVerification notin node.dag.updateFlags
|
doAssert strictVerification notin node.dag.updateFlags
|
||||||
return RestApiResponse.jsonError(Http400, InvalidBlockObjectError)
|
return RestApiResponse.jsonError(Http400, InvalidBlockObjectError)
|
||||||
|
|
||||||
withBlck(forked):
|
case restBlock.kind
|
||||||
|
of ConsensusFork.Phase0:
|
||||||
|
var blck = restBlock.phase0Data
|
||||||
blck.root = hash_tree_root(blck.message)
|
blck.root = hash_tree_root(blck.message)
|
||||||
# TODO: Fetch blobs from EE when blck is deneb.SignedBeaconBlock
|
await node.router.routeSignedBeaconBlock(blck,
|
||||||
await node.router.routeSignedBeaconBlock(blck)
|
Opt.none(SignedBlobSidecars))
|
||||||
|
of ConsensusFork.Altair:
|
||||||
|
var blck = restBlock.altairData
|
||||||
|
blck.root = hash_tree_root(blck.message)
|
||||||
|
await node.router.routeSignedBeaconBlock(blck,
|
||||||
|
Opt.none(SignedBlobSidecars))
|
||||||
|
of ConsensusFork.Bellatrix:
|
||||||
|
var blck = restBlock.bellatrixData
|
||||||
|
blck.root = hash_tree_root(blck.message)
|
||||||
|
await node.router.routeSignedBeaconBlock(blck,
|
||||||
|
Opt.none(SignedBlobSidecars))
|
||||||
|
of ConsensusFork.Capella:
|
||||||
|
var blck = restBlock.capellaData
|
||||||
|
blck.root = hash_tree_root(blck.message)
|
||||||
|
await node.router.routeSignedBeaconBlock(blck,
|
||||||
|
Opt.none(SignedBlobSidecars))
|
||||||
|
of ConsensusFork.Deneb:
|
||||||
|
var blck = restBlock.denebData.signed_block
|
||||||
|
blck.root = hash_tree_root(blck.message)
|
||||||
|
await node.router.routeSignedBeaconBlock(
|
||||||
|
blck, Opt.some(asSeq restBlock.denebData.signed_blob_sidecars))
|
||||||
|
|
||||||
if res.isErr():
|
if res.isErr():
|
||||||
return RestApiResponse.jsonError(
|
return RestApiResponse.jsonError(
|
||||||
|
@ -950,7 +972,8 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
|
||||||
|
|
||||||
let res = withBlck(forked):
|
let res = withBlck(forked):
|
||||||
blck.root = hash_tree_root(blck.message)
|
blck.root = hash_tree_root(blck.message)
|
||||||
await node.router.routeSignedBeaconBlock(blck)
|
await node.router.routeSignedBeaconBlock(blck,
|
||||||
|
Opt.none(SignedBlobSidecars))
|
||||||
|
|
||||||
if res.isErr():
|
if res.isErr():
|
||||||
return RestApiResponse.jsonError(
|
return RestApiResponse.jsonError(
|
||||||
|
|
|
@ -62,3 +62,6 @@ const
|
||||||
|
|
||||||
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.0/specs/phase0/fork-choice.md#configuration
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.0/specs/phase0/fork-choice.md#configuration
|
||||||
PROPOSER_SCORE_BOOST*: uint64 = 40
|
PROPOSER_SCORE_BOOST*: uint64 = 40
|
||||||
|
|
||||||
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-alpha.3/specs/deneb/p2p-interface.md#configuration
|
||||||
|
BLOB_SIDECAR_SUBNET_COUNT*: uint64 = 6
|
||||||
|
|
|
@ -107,8 +107,12 @@ func getSyncCommitteeContributionAndProofTopic*(forkDigest: ForkDigest): string
|
||||||
|
|
||||||
# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/deneb/p2p-interface.md#blob_sidecar_index
|
# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/deneb/p2p-interface.md#blob_sidecar_index
|
||||||
func getBlobSidecarTopic*(forkDigest: ForkDigest,
|
func getBlobSidecarTopic*(forkDigest: ForkDigest,
|
||||||
index: BlobIndex): string =
|
subnet_id: SubnetId): string =
|
||||||
eth2Prefix(forkDigest) & "blob_sidecar_" & $index & "/ssz_snappy"
|
eth2Prefix(forkDigest) & "blob_sidecar_" & $subnet_id & "/ssz_snappy"
|
||||||
|
|
||||||
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.0/specs/deneb/validator.md#sidecar
|
||||||
|
func compute_subnet_for_blob_sidecar*(blob_index: BlobIndex): SubnetId =
|
||||||
|
SubnetId(blob_index mod BLOB_SIDECAR_SUBNET_COUNT)
|
||||||
|
|
||||||
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.0/specs/altair/light-client/p2p-interface.md#light_client_finality_update
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.0/specs/altair/light-client/p2p-interface.md#light_client_finality_update
|
||||||
func getLightClientFinalityUpdateTopic*(forkDigest: ForkDigest): string =
|
func getLightClientFinalityUpdateTopic*(forkDigest: ForkDigest): string =
|
||||||
|
|
|
@ -18,6 +18,7 @@ import
|
||||||
../networking/eth2_network,
|
../networking/eth2_network,
|
||||||
./activity_metrics,
|
./activity_metrics,
|
||||||
../spec/datatypes/deneb
|
../spec/datatypes/deneb
|
||||||
|
from ../spec/state_transition_block import validate_blobs
|
||||||
|
|
||||||
export eth2_processor, eth2_network
|
export eth2_processor, eth2_network
|
||||||
|
|
||||||
|
@ -78,9 +79,14 @@ template blockProcessor(router: MessageRouter): ref BlockProcessor =
|
||||||
template getCurrentBeaconTime(router: MessageRouter): BeaconTime =
|
template getCurrentBeaconTime(router: MessageRouter): BeaconTime =
|
||||||
router.processor[].getCurrentBeaconTime()
|
router.processor[].getCurrentBeaconTime()
|
||||||
|
|
||||||
|
type SignedBlobSidecars* = seq[SignedBlobSidecar]
|
||||||
|
func shortLog*(v: SignedBlobSidecars): auto =
|
||||||
|
"[" & v.mapIt(shortLog(it)).join(", ") & "]"
|
||||||
|
|
||||||
type RouteBlockResult = Result[Opt[BlockRef], cstring]
|
type RouteBlockResult = Result[Opt[BlockRef], cstring]
|
||||||
proc routeSignedBeaconBlock*(
|
proc routeSignedBeaconBlock*(
|
||||||
router: ref MessageRouter, blck: ForkySignedBeaconBlock):
|
router: ref MessageRouter, blck: ForkySignedBeaconBlock,
|
||||||
|
blobsOpt: Opt[SignedBlobSidecars]):
|
||||||
Future[RouteBlockResult] {.async.} =
|
Future[RouteBlockResult] {.async.} =
|
||||||
## Validate and broadcast beacon block, then add it to the block database
|
## Validate and broadcast beacon block, then add it to the block database
|
||||||
## Returns the new Head when block is added successfully to dag, none when
|
## Returns the new Head when block is added successfully to dag, none when
|
||||||
|
@ -93,21 +99,35 @@ proc routeSignedBeaconBlock*(
|
||||||
let res = validateBeaconBlock(
|
let res = validateBeaconBlock(
|
||||||
router[].dag, router[].quarantine, blck, wallTime, {})
|
router[].dag, router[].quarantine, blck, wallTime, {})
|
||||||
|
|
||||||
# TODO blob gossip validation
|
|
||||||
debugRaiseAssert $denebImplementationMissing
|
|
||||||
|
|
||||||
if not res.isGoodForSending():
|
if not res.isGoodForSending():
|
||||||
warn "Block failed validation",
|
warn "Block failed validation",
|
||||||
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
|
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
|
||||||
signature = shortLog(blck.signature), error = res.error()
|
signature = shortLog(blck.signature), error = res.error()
|
||||||
return err(res.error()[1])
|
return err(res.error()[1])
|
||||||
|
|
||||||
|
when typeof(blck).toFork() >= ConsensusFork.Deneb:
|
||||||
|
if blobsOpt.isSome:
|
||||||
|
let blobs = blobsOpt.get()
|
||||||
|
let kzgCommits = blck.message.body.blob_kzg_commitments.asSeq
|
||||||
|
if blobs.len > 0 or kzgCommits.len > 0:
|
||||||
|
let res = validate_blobs(kzgCommits, blobs.mapIt(it.message.blob),
|
||||||
|
blobs.mapIt(it.message.kzg_proof))
|
||||||
|
if res.isErr():
|
||||||
|
warn "blobs failed validation",
|
||||||
|
blockRoot = shortLog(blck.root),
|
||||||
|
blobs = shortLog(blobs),
|
||||||
|
blck = shortLog(blck.message),
|
||||||
|
signature = shortLog(blck.signature),
|
||||||
|
msg = res.error()
|
||||||
|
return err(res.error())
|
||||||
|
|
||||||
let
|
let
|
||||||
sendTime = router[].getCurrentBeaconTime()
|
sendTime = router[].getCurrentBeaconTime()
|
||||||
delay = sendTime - blck.message.slot.block_deadline()
|
delay = sendTime - blck.message.slot.block_deadline()
|
||||||
# The block passed basic gossip validation - we can "safely" broadcast it
|
# The block (and blobs, if present) passed basic gossip validation
|
||||||
# now. In fact, per the spec, we should broadcast it even if it later fails
|
# - we can "safely" broadcast it now. In fact, per the spec, we
|
||||||
# to apply to our state.
|
# should broadcast it even if it later fails to apply to our
|
||||||
|
# state.
|
||||||
|
|
||||||
let res = await router[].network.broadcastBeaconBlock(blck)
|
let res = await router[].network.broadcastBeaconBlock(blck)
|
||||||
|
|
||||||
|
@ -123,8 +143,27 @@ proc routeSignedBeaconBlock*(
|
||||||
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
|
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
|
||||||
signature = shortLog(blck.signature), error = res.error()
|
signature = shortLog(blck.signature), error = res.error()
|
||||||
|
|
||||||
|
var blobs = Opt.none(seq[ref BlobSidecar])
|
||||||
|
if blobsOpt.isSome():
|
||||||
|
let signedBlobs = blobsOpt.get()
|
||||||
|
var workers = newSeq[Future[SendResult]](signedBlobs.len)
|
||||||
|
for i in 0..<signedBlobs.len:
|
||||||
|
let subnet_id = compute_subnet_for_blob_sidecar(BlobIndex(i))
|
||||||
|
workers[i] = router[].network.broadcastBlobsidecar(subnet_id, signedBlobs[i])
|
||||||
|
let allres = await allFinished(workers)
|
||||||
|
for i in 0..<allres.len:
|
||||||
|
let res = allres[i]
|
||||||
|
doAssert res.finished()
|
||||||
|
if res.failed():
|
||||||
|
notice "Blob not sent",
|
||||||
|
blob = shortLog(signedBlobs[i])
|
||||||
|
else:
|
||||||
|
notice "Blob sent", blob = shortLog(signedBlobs[i]), error = res.error[]
|
||||||
|
blobs = Opt.some(blobsOpt.get().mapIt(newClone(it.message)))
|
||||||
|
|
||||||
|
|
||||||
let newBlockRef = await router[].blockProcessor.storeBlock(
|
let newBlockRef = await router[].blockProcessor.storeBlock(
|
||||||
MsgSource.api, sendTime, blck, Opt.none(BlobSidecars))
|
MsgSource.api, sendTime, blck, blobs)
|
||||||
|
|
||||||
# The boolean we return tells the caller whether the block was integrated
|
# The boolean we return tells the caller whether the block was integrated
|
||||||
# into the chain
|
# into the chain
|
||||||
|
|
|
@ -93,7 +93,8 @@ proc unblindAndRouteBlockMEV*(
|
||||||
blck = shortLog(signedBlock)
|
blck = shortLog(signedBlock)
|
||||||
|
|
||||||
let newBlockRef =
|
let newBlockRef =
|
||||||
(await node.router.routeSignedBeaconBlock(signedBlock)).valueOr:
|
(await node.router.routeSignedBeaconBlock(
|
||||||
|
signedBlock, Opt.none(SignedBlobSidecars))).valueOr:
|
||||||
# submitBlindedBlock has run, so don't allow fallback to run
|
# submitBlindedBlock has run, so don't allow fallback to run
|
||||||
return err("routeSignedBeaconBlock error") # Errors logged in router
|
return err("routeSignedBeaconBlock error") # Errors logged in router
|
||||||
|
|
||||||
|
@ -168,7 +169,8 @@ proc unblindAndRouteBlockMEV*(
|
||||||
blck = shortLog(signedBlock)
|
blck = shortLog(signedBlock)
|
||||||
|
|
||||||
let newBlockRef =
|
let newBlockRef =
|
||||||
(await node.router.routeSignedBeaconBlock(signedBlock)).valueOr:
|
(await node.router.routeSignedBeaconBlock(
|
||||||
|
signedBlock, Opt.none(SignedBlobSidecars))).valueOr:
|
||||||
# submitBlindedBlock has run, so don't allow fallback to run
|
# submitBlindedBlock has run, so don't allow fallback to run
|
||||||
return err("routeSignedBeaconBlock error") # Errors logged in router
|
return err("routeSignedBeaconBlock error") # Errors logged in router
|
||||||
|
|
||||||
|
|
|
@ -979,7 +979,7 @@ proc proposeBlockAux(
|
||||||
else:
|
else:
|
||||||
static: doAssert "Unknown SignedBeaconBlock type"
|
static: doAssert "Unknown SignedBeaconBlock type"
|
||||||
newBlockRef =
|
newBlockRef =
|
||||||
(await node.router.routeSignedBeaconBlock(signedBlock)).valueOr:
|
(await node.router.routeSignedBeaconBlock(signedBlock, Opt.none(SignedBlobSidecars))).valueOr:
|
||||||
return head # Errors logged in router
|
return head # Errors logged in router
|
||||||
|
|
||||||
if newBlockRef.isNone():
|
if newBlockRef.isNone():
|
||||||
|
|
|
@ -78,7 +78,7 @@ suite "Honest validator":
|
||||||
"/eth2/00000000/sync_committee_1/ssz_snappy"
|
"/eth2/00000000/sync_committee_1/ssz_snappy"
|
||||||
getSyncCommitteeTopic(forkDigest, SyncSubcommitteeIndex(3)) ==
|
getSyncCommitteeTopic(forkDigest, SyncSubcommitteeIndex(3)) ==
|
||||||
"/eth2/00000000/sync_committee_3/ssz_snappy"
|
"/eth2/00000000/sync_committee_3/ssz_snappy"
|
||||||
getBlobSidecarTopic(forkDigest, BlobIndex(1)) ==
|
getBlobSidecarTopic(forkDigest, SubnetId(1)) ==
|
||||||
"/eth2/00000000/blob_sidecar_1/ssz_snappy"
|
"/eth2/00000000/blob_sidecar_1/ssz_snappy"
|
||||||
|
|
||||||
test "is_aggregator":
|
test "is_aggregator":
|
||||||
|
|
Loading…
Reference in New Issue