remove Bellatrix EL syncing support from light client (#6352)

Bellatrix light client data does not contain the EL block hash, so we
had to follow blocks gossip to learn the EL `block_hash` of such blocks.
Now that Bellatrix is obsolete, we can simplify EL syncing logic under
light client scenarios. Bellatrix light client data can still be used
to advance the light client sync itself, but will no longer result in
`engine_forkchoiceUpdated` calls until the sync reaches Capella. This
also frees up some memory as we no longer have to retain blocks.
This commit is contained in:
Etan Kissling 2024-06-14 03:23:17 +02:00 committed by GitHub
parent c5326619a4
commit 9fc870777c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 100 additions and 152 deletions

View File

@ -67,6 +67,8 @@ type
config*: BeaconNodeConf config*: BeaconNodeConf
attachedValidators*: ref ValidatorPool attachedValidators*: ref ValidatorPool
optimisticProcessor*: OptimisticProcessor optimisticProcessor*: OptimisticProcessor
optimisticFcuFut*: Future[(PayloadExecutionStatus, Opt[BlockHash])]
.Raising([CancelledError])
lightClient*: LightClient lightClient*: LightClient
dag*: ChainDAGRef dag*: ChainDAGRef
quarantine*: ref Quarantine quarantine*: ref Quarantine

View File

@ -38,61 +38,17 @@ proc initLightClient*(
# for broadcasting light client data as a server. # for broadcasting light client data as a server.
let let
optimisticHandler = proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock): optimisticHandler = proc(
Future[void] {.async: (raises: [CancelledError]).} = signedBlock: ForkedSignedBeaconBlock
debug "New LC optimistic block", ): Future[void] {.async: (raises: [CancelledError]).} =
opt = signedBlock.toBlockId(),
dag = node.dag.head.bid,
wallSlot = node.currentSlot
withBlck(signedBlock): withBlck(signedBlock):
when consensusFork >= ConsensusFork.Bellatrix: when consensusFork >= ConsensusFork.Bellatrix:
if forkyBlck.message.is_execution_block: if forkyBlck.message.is_execution_block:
template blckPayload(): auto = template payload(): auto = forkyBlck.message.body.execution_payload
forkyBlck.message.body.execution_payload if not payload.block_hash.isZero:
if not blckPayload.block_hash.isZero:
# engine_newPayloadV1
discard await node.elManager.newExecutionPayload( discard await node.elManager.newExecutionPayload(
forkyBlck.message) forkyBlck.message)
# Retain optimistic head for other `forkchoiceUpdated` callers.
# May temporarily block `forkchoiceUpdatedV1` calls, e.g., Geth:
# - Refuses `newPayload`: "Ignoring payload while snap syncing"
# - Refuses `fcU`: "Forkchoice requested unknown head"
# Once DAG sync catches up or as new optimistic heads are fetched
# the situation recovers
node.consensusManager[].setOptimisticHead(
forkyBlck.toBlockId(), blckPayload.block_hash)
# engine_forkchoiceUpdatedV1 or engine_forkchoiceUpdatedV2,
# depending on pre or post-Shapella
let beaconHead = node.attestationPool[].getBeaconHead(nil)
template callForkchoiceUpdated(attributes: untyped) =
discard await node.elManager.forkchoiceUpdated(
headBlockHash = blckPayload.block_hash,
safeBlockHash = beaconHead.safeExecutionBlockHash,
finalizedBlockHash = beaconHead.finalizedExecutionBlockHash,
payloadAttributes = Opt.none attributes)
case node.dag.cfg.consensusForkAtEpoch(
forkyBlck.message.slot.epoch)
of ConsensusFork.Deneb, ConsensusFork.Electra:
callForkchoiceUpdated(PayloadAttributesV3)
of ConsensusFork.Capella:
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/shanghai.md#specification-1
# Consensus layer client MUST call this method instead of
# `engine_forkchoiceUpdatedV1` under any of the following
# conditions:
# `headBlockHash` references a block which `timestamp` is
# greater or equal to the Shanghai timestamp
callForkchoiceUpdated(PayloadAttributesV2)
of ConsensusFork.Bellatrix:
callForkchoiceUpdated(PayloadAttributesV1)
of ConsensusFork.Phase0, ConsensusFork.Altair:
discard
else: discard else: discard
optimisticProcessor = initOptimisticProcessor( optimisticProcessor = initOptimisticProcessor(
getBeaconTime, optimisticHandler) getBeaconTime, optimisticHandler)
@ -104,9 +60,46 @@ proc initLightClient*(
proc onOptimisticHeader( proc onOptimisticHeader(
lightClient: LightClient, lightClient: LightClient,
optimisticHeader: ForkedLightClientHeader) = optimisticHeader: ForkedLightClientHeader) =
if node.optimisticFcuFut != nil:
return
withForkyHeader(optimisticHeader): withForkyHeader(optimisticHeader):
when lcDataFork > LightClientDataFork.None: when lcDataFork > LightClientDataFork.None:
optimisticProcessor.setOptimisticHeader(forkyHeader.beacon) let bid = forkyHeader.beacon.toBlockId()
logScope:
opt = bid
dag = node.dag.head.bid
wallSlot = node.currentSlot
when lcDataFork >= LightClientDataFork.Capella:
let
consensusFork = node.dag.cfg.consensusForkAtEpoch(bid.slot.epoch)
blockHash = forkyHeader.execution.block_hash
# Retain optimistic head for other `forkchoiceUpdated` callers.
# May temporarily block `forkchoiceUpdated` calls, e.g., Geth:
# - Refuses `newPayload`: "Ignoring payload while snap syncing"
# - Refuses `fcU`: "Forkchoice requested unknown head"
# Once DAG sync catches up or as new optimistic heads are fetched
# the situation recovers
debug "New LC optimistic header"
node.consensusManager[].setOptimisticHead(bid, blockHash)
if not node.consensusManager[]
.shouldSyncOptimistically(node.currentSlot):
return
# engine_forkchoiceUpdated
let beaconHead = node.attestationPool[].getBeaconHead(nil)
withConsensusFork(consensusFork):
when lcDataForkAtConsensusFork(consensusFork) == lcDataFork:
node.optimisticFcuFut = node.elManager.forkchoiceUpdated(
headBlockHash = blockHash,
safeBlockHash = beaconHead.safeExecutionBlockHash,
finalizedBlockHash = beaconHead.finalizedExecutionBlockHash,
payloadAttributes = Opt.none consensusFork.PayloadAttributes)
node.optimisticFcuFut.addCallback do (future: pointer):
node.optimisticFcuFut = nil
else:
# The execution block hash is only available from Capella onward
info "Ignoring new LC optimistic header until Capella"
lightClient.onOptimisticHeader = onOptimisticHeader lightClient.onOptimisticHeader = onOptimisticHeader
lightClient.trustedBlockRoot = config.trustedBlockRoot lightClient.trustedBlockRoot = config.trustedBlockRoot

View File

@ -20,29 +20,19 @@ export gossip_validation
logScope: logScope:
topics = "gossip_opt" topics = "gossip_opt"
const
# Maximum `blocks` to cache (not validated; deleted on new optimistic header)
maxBlocks = 16 # <= `GOSSIP_MAX_SIZE_BELLATRIX` (10 MB) each
# Minimum interval at which spam is logged
minLogInterval = chronos.seconds(5)
type type
MsgTrustedBlockProcessor* = OptimisticBlockVerifier* = proc(
proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock): Future[void] {. signedBlock: ForkedSignedBeaconBlock
async: (raises: [CancelledError]).} ): Future[void] {.async: (raises: [CancelledError]).}
OptimisticProcessor* = ref object OptimisticProcessor* = ref object
getBeaconTime: GetBeaconTimeFn getBeaconTime: GetBeaconTimeFn
optimisticVerifier: MsgTrustedBlockProcessor optimisticVerifier: OptimisticBlockVerifier
blocks: Table[Eth2Digest, ref ForkedSignedBeaconBlock]
latestOptimisticSlot: Slot
processFut: Future[void].Raising([CancelledError]) processFut: Future[void].Raising([CancelledError])
logMoment: Moment
proc initOptimisticProcessor*( proc initOptimisticProcessor*(
getBeaconTime: GetBeaconTimeFn, getBeaconTime: GetBeaconTimeFn,
optimisticVerifier: MsgTrustedBlockProcessor): OptimisticProcessor = optimisticVerifier: OptimisticBlockVerifier): OptimisticProcessor =
OptimisticProcessor( OptimisticProcessor(
getBeaconTime: getBeaconTime, getBeaconTime: getBeaconTime,
optimisticVerifier: optimisticVerifier) optimisticVerifier: optimisticVerifier)
@ -56,9 +46,6 @@ proc validateBeaconBlock(
(wallTime + MAXIMUM_GOSSIP_CLOCK_DISPARITY).slotOrZero): (wallTime + MAXIMUM_GOSSIP_CLOCK_DISPARITY).slotOrZero):
return errIgnore("BeaconBlock: slot too high") return errIgnore("BeaconBlock: slot too high")
if signed_beacon_block.message.slot <= self.latestOptimisticSlot:
return errIgnore("BeaconBlock: no significant progress")
if not signed_beacon_block.message.is_execution_block(): if not signed_beacon_block.message.is_execution_block():
return errIgnore("BeaconBlock: no execution block") return errIgnore("BeaconBlock: no execution block")
@ -93,32 +80,16 @@ proc processSignedBeaconBlock*(
debug "Dropping optimistic block", error = v.error debug "Dropping optimistic block", error = v.error
return err(v.error) return err(v.error)
# Note that validation of blocks is delayed by ~4/3 slots because we have to # Only process one block at a time (backpressure)
# wait for the sync committee to sign the correct block and for that signature trace "Optimistic block validated"
# to be included in the next block. Therefore, we skip block validation here if self.processFut == nil:
# and cache the block in memory. Because there is no validation, we have to self.processFut = self.optimisticVerifier(
# mitigate against bogus blocks, mostly by bounding the caches. Assuming that ForkedSignedBeaconBlock.init(signedBlock))
# any denial-of-service attacks eventually subside, care is taken to recover.
template logWithSpamProtection(body: untyped): untyped =
block:
let now = Moment.now()
if self.logMoment + minLogInterval <= now:
logScope: minLogInterval
body
self.logMoment = now
# Store block for later verification proc handleFinishedProcess(future: pointer) =
if not self.blocks.hasKey(signedBlock.root): self.processFut = nil
# If `blocks` is full, we got spammed with multiple blocks for a slot,
# of the optimistic header advancements have been all withheld from us. self.processFut.addCallback(handleFinishedProcess)
# Whenever the optimistic header advances, old blocks are cleared,
# so we can simply ignore additional spam blocks until that happens.
if self.blocks.len >= maxBlocks:
logWithSpamProtection:
error "`blocks` full - ignoring", maxBlocks
else:
self.blocks[signedBlock.root] =
newClone(ForkedSignedBeaconBlock.init(signedBlock))
# Block validation is delegated to the sync committee and is done with delay. # Block validation is delegated to the sync committee and is done with delay.
# If we forward invalid spam blocks, we may be disconnected + IP banned, # If we forward invalid spam blocks, we may be disconnected + IP banned,
@ -127,40 +98,4 @@ proc processSignedBeaconBlock*(
# However, we are actively contributing to other topics, so some of the # However, we are actively contributing to other topics, so some of the
# negative peer score may be offset through those different topics. # negative peer score may be offset through those different topics.
# The practical impact depends on the actually deployed scoring heuristics. # The practical impact depends on the actually deployed scoring heuristics.
trace "Optimistic block cached"
return errIgnore("Validation delegated to sync committee") return errIgnore("Validation delegated to sync committee")
proc setOptimisticHeader*(
self: OptimisticProcessor, optimisticHeader: BeaconBlockHeader) =
# If irrelevant, skip processing
if optimisticHeader.slot <= self.latestOptimisticSlot:
return
self.latestOptimisticSlot = optimisticHeader.slot
# Delete blocks that are no longer of interest
let blockRoot = optimisticHeader.hash_tree_root()
var
rootsToDelete: seq[Eth2Digest]
signedBlock: ref ForkedMsgTrustedSignedBeaconBlock
for root, blck in self.blocks:
if root == blockRoot:
signedBlock = blck.asMsgTrusted()
if blck[].slot <= optimisticHeader.slot:
rootsToDelete.add root
for root in rootsToDelete:
self.blocks.del root
# Block must be known
if signedBlock == nil:
return
# If a block is already being processed, skip (backpressure)
if self.processFut != nil:
return
self.processFut = self.optimisticVerifier(signedBlock[])
proc handleFinishedProcess(future: pointer) =
self.processFut = nil
self.processFut.addCallback(handleFinishedProcess)

View File

@ -1619,7 +1619,7 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string =
node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix
elif node.backfiller.inProgress: elif node.backfiller.inProgress:
"backfill: " & node.backfiller.syncStatus "backfill: " & node.backfiller.syncStatus
elif optimistic_head: elif optimisticHead:
"synced/opt" "synced/opt"
else: else:
"synced" "synced"
@ -1768,7 +1768,8 @@ proc installMessageValidators(node: BeaconNode) =
node.network.addAsyncValidator( node.network.addAsyncValidator(
getAttestationTopic(digest, subnet_id), proc ( getAttestationTopic(digest, subnet_id), proc (
attestation: electra.Attestation attestation: electra.Attestation
): Future[ValidationResult] {.async: (raises: [CancelledError]).} = ): Future[ValidationResult] {.
async: (raises: [CancelledError]).} =
return toValidationResult( return toValidationResult(
await node.processor.processAttestation( await node.processor.processAttestation(
MsgSource.gossip, attestation, subnet_id, MsgSource.gossip, attestation, subnet_id,
@ -1780,7 +1781,8 @@ proc installMessageValidators(node: BeaconNode) =
node.network.addAsyncValidator( node.network.addAsyncValidator(
getAttestationTopic(digest, subnet_id), proc ( getAttestationTopic(digest, subnet_id), proc (
attestation: phase0.Attestation attestation: phase0.Attestation
): Future[ValidationResult] {.async: (raises: [CancelledError]).} = ): Future[ValidationResult] {.
async: (raises: [CancelledError]).} =
return toValidationResult( return toValidationResult(
await node.processor.processAttestation( await node.processor.processAttestation(
MsgSource.gossip, attestation, subnet_id, MsgSource.gossip, attestation, subnet_id,

View File

@ -107,23 +107,15 @@ programMain:
else: else:
nil nil
optimisticHandler = proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock): optimisticHandler = proc(
Future[void] {.async: (raises: [CancelledError]).} = signedBlock: ForkedSignedBeaconBlock
notice "New LC optimistic block", ): Future[void] {.async: (raises: [CancelledError]).} =
opt = signedBlock.toBlockId(),
wallSlot = getBeaconTime().slotOrZero
withBlck(signedBlock): withBlck(signedBlock):
when consensusFork >= ConsensusFork.Bellatrix: when consensusFork >= ConsensusFork.Bellatrix:
if forkyBlck.message.is_execution_block: if forkyBlck.message.is_execution_block:
template payload(): auto = forkyBlck.message.body.execution_payload template payload(): auto = forkyBlck.message.body.execution_payload
if elManager != nil and not payload.block_hash.isZero: if elManager != nil and not payload.block_hash.isZero:
discard await elManager.newExecutionPayload(forkyBlck.message) discard await elManager.newExecutionPayload(forkyBlck.message)
discard await elManager.forkchoiceUpdated(
headBlockHash = payload.block_hash,
safeBlockHash = payload.block_hash, # stub value
finalizedBlockHash = ZERO_HASH,
payloadAttributes = Opt.none(consensusFork.PayloadAttributes))
else: discard else: discard
optimisticProcessor = initOptimisticProcessor( optimisticProcessor = initOptimisticProcessor(
getBeaconTime, optimisticHandler) getBeaconTime, optimisticHandler)
@ -153,26 +145,54 @@ programMain:
waitFor network.startListening() waitFor network.startListening()
waitFor network.start() waitFor network.start()
func isSynced(optimisticSlot: Slot, wallSlot: Slot): bool =
# Check whether light client has synced sufficiently close to wall slot
const maxAge = 2 * SLOTS_PER_EPOCH
optimisticSlot >= max(wallSlot, maxAge.Slot) - maxAge
proc onFinalizedHeader( proc onFinalizedHeader(
lightClient: LightClient, finalizedHeader: ForkedLightClientHeader) = lightClient: LightClient, finalizedHeader: ForkedLightClientHeader) =
withForkyHeader(finalizedHeader): withForkyHeader(finalizedHeader):
when lcDataFork > LightClientDataFork.None: when lcDataFork > LightClientDataFork.None:
info "New LC finalized header", info "New LC finalized header",
finalized_header = shortLog(forkyHeader) finalized_header = shortLog(forkyHeader)
let let
period = forkyHeader.beacon.slot.sync_committee_period period = forkyHeader.beacon.slot.sync_committee_period
syncCommittee = lightClient.finalizedSyncCommittee.expect("Init OK") syncCommittee = lightClient.finalizedSyncCommittee.expect("Init OK")
db.putSyncCommittee(period, syncCommittee) db.putSyncCommittee(period, syncCommittee)
db.putLatestFinalizedHeader(finalizedHeader) db.putLatestFinalizedHeader(finalizedHeader)
var optimisticFcuFut: Future[(PayloadExecutionStatus, Opt[BlockHash])]
.Raising([CancelledError])
proc onOptimisticHeader( proc onOptimisticHeader(
lightClient: LightClient, optimisticHeader: ForkedLightClientHeader) = lightClient: LightClient, optimisticHeader: ForkedLightClientHeader) =
if optimisticFcuFut != nil:
return
withForkyHeader(optimisticHeader): withForkyHeader(optimisticHeader):
when lcDataFork > LightClientDataFork.None: when lcDataFork > LightClientDataFork.None:
info "New LC optimistic header", logScope: optimistic_header = shortLog(forkyHeader)
optimistic_header = shortLog(forkyHeader) when lcDataFork >= LightClientDataFork.Capella:
optimisticProcessor.setOptimisticHeader(forkyHeader.beacon) let
bid = forkyHeader.beacon.toBlockId()
consensusFork = cfg.consensusForkAtEpoch(bid.slot.epoch)
blockHash = forkyHeader.execution.block_hash
info "New LC optimistic header"
if elManager == nil or blockHash.isZero or
not isSynced(bid.slot, getBeaconTime().slotOrZero()):
return
withConsensusFork(consensusFork):
when lcDataForkAtConsensusFork(consensusFork) == lcDataFork:
optimisticFcuFut = elManager.forkchoiceUpdated(
headBlockHash = blockHash,
safeBlockHash = blockHash, # stub value
finalizedBlockHash = ZERO_HASH,
payloadAttributes = Opt.none(consensusFork.PayloadAttributes))
optimisticFcuFut.addCallback do (future: pointer):
optimisticFcuFut = nil
else:
info "Ignoring new LC optimistic header until Capella"
lightClient.onFinalizedHeader = onFinalizedHeader lightClient.onFinalizedHeader = onFinalizedHeader
lightClient.onOptimisticHeader = onOptimisticHeader lightClient.onOptimisticHeader = onOptimisticHeader
@ -204,9 +224,7 @@ programMain:
let optimisticHeader = lightClient.optimisticHeader let optimisticHeader = lightClient.optimisticHeader
withForkyHeader(optimisticHeader): withForkyHeader(optimisticHeader):
when lcDataFork > LightClientDataFork.None: when lcDataFork > LightClientDataFork.None:
# Check whether light client has synced sufficiently close to wall slot isSynced(forkyHeader.beacon.slot, wallSlot)
const maxAge = 2 * SLOTS_PER_EPOCH
forkyHeader.beacon.slot >= max(wallSlot, maxAge.Slot) - maxAge
else: else:
false false

View File

@ -178,13 +178,11 @@ INF 2022-11-21 18:03:27.984+01:00 New LC optimistic header opt
WRN 2022-11-21 18:03:31.419+01:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=0 new_peers=@[] current_peers=7 wanted_peers=160 WRN 2022-11-21 18:03:31.419+01:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=0 new_peers=@[] current_peers=7 wanted_peers=160
INF 2022-11-21 18:03:36.001+01:00 Slot start slot=1109718 epoch=34678 sync=synced peers=7 head=c5464508:1109716 finalized=c092a1d1:1109216 delay=1ms98us INF 2022-11-21 18:03:36.001+01:00 Slot start slot=1109718 epoch=34678 sync=synced peers=7 head=c5464508:1109716 finalized=c092a1d1:1109216 delay=1ms98us
INF 2022-11-21 18:03:40.012+01:00 New LC optimistic header optimistic_header="(beacon: (slot: 1109717, proposer_index: 835, parent_root: \"c5464508\", state_root: \"13f823f8\"))" INF 2022-11-21 18:03:40.012+01:00 New LC optimistic header optimistic_header="(beacon: (slot: 1109717, proposer_index: 835, parent_root: \"c5464508\", state_root: \"13f823f8\"))"
NTC 2022-11-21 18:03:40.012+01:00 New LC optimistic block opt=99ab28aa:1109717 wallSlot=1109718
WRN 2022-11-21 18:03:40.422+01:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=1 new_peers=@[] current_peers=7 wanted_peers=160 WRN 2022-11-21 18:03:40.422+01:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=1 new_peers=@[] current_peers=7 wanted_peers=160
INF 2022-11-21 18:03:48.001+01:00 Slot start slot=1109719 epoch=34678 sync=synced peers=7 head=99ab28aa:1109717 finalized=c092a1d1:1109216 delay=1ms53us INF 2022-11-21 18:03:48.001+01:00 Slot start slot=1109719 epoch=34678 sync=synced peers=7 head=99ab28aa:1109717 finalized=c092a1d1:1109216 delay=1ms53us
WRN 2022-11-21 18:03:50.205+01:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=0 new_peers=@[] current_peers=7 wanted_peers=160 WRN 2022-11-21 18:03:50.205+01:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=0 new_peers=@[] current_peers=7 wanted_peers=160
INF 2022-11-21 18:04:00.001+01:00 Slot start slot=1109720 epoch=34678 sync=synced peers=7 head=99ab28aa:1109717 finalized=c092a1d1:1109216 delay=1ms145us INF 2022-11-21 18:04:00.001+01:00 Slot start slot=1109720 epoch=34678 sync=synced peers=7 head=99ab28aa:1109717 finalized=c092a1d1:1109216 delay=1ms145us
INF 2022-11-21 18:04:03.982+01:00 New LC optimistic header optimistic_header="(beacon: (slot: 1109718, proposer_index: 1202, parent_root: \"99ab28aa\", state_root: \"7f7f88d2\"))" INF 2022-11-21 18:04:03.982+01:00 New LC optimistic header optimistic_header="(beacon: (slot: 1109718, proposer_index: 1202, parent_root: \"99ab28aa\", state_root: \"7f7f88d2\"))"
NTC 2022-11-21 18:04:03.982+01:00 New LC optimistic block opt=ab007266:1109718 wallSlot=1109720
``` ```
!!! note !!! note