integrate light client into beacon node (#3557)

Adds a `LightClient` instance to the beacon node as preparation to
accelerate syncing in the future (optimistic sync).

- `--light-client-enable` turns on the feature
- `--light-client-trusted-block-root` configures block to start from

If no block root is configured, light client tracks DAG `finalizedHead`.
This commit is contained in:
Etan Kissling 2022-06-07 19:01:11 +02:00 committed by GitHub
parent 3bd9622572
commit 72a46bd520
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 458 additions and 260 deletions

View File

@ -1,5 +1,5 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -14,7 +14,7 @@ import
chronos, json_rpc/servers/httpserver, presto,
# Local modules
"."/[beacon_clock, beacon_chain_db, conf],
"."/[beacon_clock, beacon_chain_db, conf, light_client],
./gossip_processing/[eth2_processor, block_processor, consensus_manager],
./networking/eth2_network,
./eth1/eth1_monitor,
@ -27,9 +27,10 @@ import
./rpc/state_ttl_cache
export
osproc, chronos, httpserver, presto, action_tracker, beacon_clock,
beacon_chain_db, conf, attestation_pool, sync_committee_msg_pool,
validator_pool, eth2_network, eth1_monitor, request_manager, sync_manager,
osproc, chronos, httpserver, presto, action_tracker,
beacon_clock, beacon_chain_db, conf, light_client,
attestation_pool, sync_committee_msg_pool, validator_pool,
eth2_network, eth1_monitor, request_manager, sync_manager,
eth2_processor, blockchain_dag, block_quarantine, base, exit_pool,
validator_monitor, consensus_manager
@ -44,6 +45,7 @@ type
db*: BeaconChainDB
config*: BeaconNodeConf
attachedValidators*: ref ValidatorPool
lightClient*: LightClient
dag*: ChainDAGRef
quarantine*: ref Quarantine
attestationPool*: ref AttestationPool

View File

@ -0,0 +1,103 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
# This implements the pre-release proposal of the libp2p based light client sync
# protocol. See https://github.com/ethereum/consensus-specs/pull/2802
import
chronicles,
./beacon_node
logScope: topics = "beacnde"
proc initLightClient*(
node: BeaconNode,
rng: ref BrHmacDrbgContext,
cfg: RuntimeConfig,
forkDigests: ref ForkDigests,
getBeaconTime: GetBeaconTimeFn,
genesis_validators_root: Eth2Digest) =
template config(): auto = node.config
# Creating a light client is not dependent on `lightClientEnable`
# because the light client module also handles gossip subscriptions
# for broadcasting light client data as a server.
let lightClient = createLightClient(
node.network, rng, config, cfg,
forkDigests, getBeaconTime, genesis_validators_root)
if config.lightClientEnable.get:
lightClient.trustedBlockRoot = config.lightClientTrustedBlockRoot
elif config.lightClientTrustedBlockRoot.isSome:
warn "Ignoring `lightClientTrustedBlockRoot`, light client not enabled",
lightClientEnable = config.lightClientEnable.get,
lightClientTrustedBlockRoot = config.lightClientTrustedBlockRoot
node.lightClient = lightClient
proc startLightClient*(node: BeaconNode) =
if not node.config.lightClientEnable.get:
return
node.lightClient.start()
proc installLightClientMessageValidators*(node: BeaconNode) =
let eth2Processor =
if node.config.serveLightClientData.get:
# Process gossip using both full node and light client
node.processor
elif node.config.lightClientEnable.get:
# Only process gossip using light client
nil
else:
# Light client topics will never be subscribed to, no validators needed
return
node.lightClient.installMessageValidators(eth2Processor)
proc updateLightClientGossipStatus*(
node: BeaconNode, slot: Slot, dagIsBehind: bool) =
let isBehind =
if node.config.serveLightClientData.get:
# Forward DAG's readiness to handle light client gossip
dagIsBehind
else:
# Full node is not interested in gossip
true
node.lightClient.updateGossipStatus(slot, some isBehind)
proc updateLightClientFromDag*(node: BeaconNode) =
if not node.config.lightClientEnable.get:
return
if node.config.lightClientTrustedBlockRoot.isSome:
return
let
dagHead = node.dag.finalizedHead
dagPeriod = dagHead.slot.sync_committee_period
if dagHead.slot < node.dag.cfg.ALTAIR_FORK_EPOCH.start_slot:
return
let lcHeader = node.lightClient.finalizedHeader
if lcHeader.isSome:
if dagPeriod <= lcHeader.get.slot.sync_committee_period:
return
let
bdata = node.dag.getForkedBlock(dagHead.blck.bid).valueOr:
return
header = bdata.toBeaconBlockHeader
current_sync_committee = block:
var tmpState = assignClone(node.dag.headState)
node.dag.currentSyncCommitteeForPeriod(tmpState[], dagPeriod).valueOr:
return
node.lightClient.resetToFinalizedHeader(header, current_sync_committee)

View File

@ -267,6 +267,16 @@ type
desc: "Weak subjectivity checkpoint in the format block_root:epoch_number"
name: "weak-subjectivity-checkpoint" }: Option[Checkpoint]
lightClientEnable* {.
hidden
desc: "BETA: Accelerate sync using light client."
name: "light-client-enable" .}: Option[bool]
lightClientTrustedBlockRoot* {.
hidden
desc: "BETA: Recent trusted finalized block root to initialize light client from."
name: "light-client-trusted-block-root" }: Option[Eth2Digest]
finalizedCheckpointState* {.
desc: "SSZ file specifying a recent finalized state"
name: "finalized-checkpoint-state" }: Option[InputFile]
@ -906,6 +916,13 @@ proc createDumpDirs*(config: BeaconNodeConf) =
warn "Could not create dump directory",
path = config.dumpDirOutgoing, err = ioErrorMsg(res.error)
func parseCmdArg*(T: type Eth2Digest, input: string): T
{.raises: [ValueError, Defect].} =
Eth2Digest.fromHex(input)
func completeCmdArg*(T: type Eth2Digest, input: string): seq[string] =
return @[]
func parseCmdArg*(T: type GraffitiBytes, input: string): T
{.raises: [ValueError, Defect].} =
GraffitiBytes.init(input)

View File

@ -122,10 +122,3 @@ type LightClientConf* = object
desc: "The wall-time epoch at which to exit the program. (for testing purposes)"
defaultValue: 0
name: "stop-at-epoch" }: uint64
func parseCmdArg*(T: type Eth2Digest, input: string): T
{.raises: [ValueError, Defect].} =
Eth2Digest.fromHex(input)
func completeCmdArg*(T: type Eth2Digest, input: string): seq[string] =
return @[]

View File

@ -520,6 +520,26 @@ proc getForkedBlock*(
# In case we didn't have a summary - should be rare, but ..
dag.db.getForkedBlock(root)
proc currentSyncCommitteeForPeriod*(
dag: ChainDAGRef,
tmpState: var ForkedHashedBeaconState,
period: SyncCommitteePeriod): Opt[SyncCommittee] =
## Fetch a `SyncCommittee` for a given sync committee period.
## For non-finalized periods, follow the chain as selected by fork choice.
let earliestSlot = max(dag.tail.slot, dag.cfg.ALTAIR_FORK_EPOCH.start_slot)
if period < earliestSlot.sync_committee_period:
return err()
let
periodStartSlot = period.start_slot
syncCommitteeSlot = max(periodStartSlot, earliestSlot)
bsi = ? dag.getBlockIdAtSlot(syncCommitteeSlot)
dag.withUpdatedState(tmpState, bsi) do:
withState(state):
when stateFork >= BeaconStateFork.Altair:
ok state.data.current_sync_committee
else: err()
do: err()
proc updateBeaconMetrics(
state: ForkedHashedBeaconState, bid: BlockId, cache: var StateCache) =
# https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#additional-metrics

View File

@ -113,24 +113,17 @@ proc getExistingForkedBlock*(
doAssert verifyFinalization notin dag.updateFlags
bdata
proc currentSyncCommitteeForPeriod(
proc existingCurrentSyncCommitteeForPeriod(
dag: ChainDAGRef,
tmpState: var ForkedHashedBeaconState,
period: SyncCommitteePeriod): Opt[SyncCommittee] =
## Fetch a `SyncCommittee` for a given sync committee period.
## For non-finalized periods, follow the chain as selected by fork choice.
let earliestSlot = dag.computeEarliestLightClientSlot
doAssert period >= earliestSlot.sync_committee_period
let
periodStartSlot = period.start_slot
syncCommitteeSlot = max(periodStartSlot, earliestSlot)
bsi = ? dag.getExistingBlockIdAtSlot(syncCommitteeSlot)
dag.withUpdatedExistingState(tmpState, bsi) do:
withState(state):
when stateFork >= BeaconStateFork.Altair:
ok state.data.current_sync_committee
else: raiseAssert "Unreachable"
do: err()
## Wrapper around `currentSyncCommitteeForPeriod` for states known to exist.
let syncCommittee = dag.currentSyncCommitteeForPeriod(tmpState, period)
if syncCommittee.isErr:
error "Current sync committee failed to load unexpectedly",
period, tail = dag.tail.slot
doAssert verifyFinalization notin dag.updateFlags
syncCommittee
template syncCommitteeRoot(
state: HashedBeaconStateWithSyncCommittee): Eth2Digest =
@ -795,7 +788,7 @@ proc getLightClientBootstrap*(
bootstrap.header =
blck.toBeaconBlockHeader
bootstrap.current_sync_committee =
? dag.currentSyncCommitteeForPeriod(tmpState[], period)
? dag.existingCurrentSyncCommitteeForPeriod(tmpState[], period)
bootstrap.current_sync_committee_branch =
cachedBootstrap.current_sync_committee_branch
return ok bootstrap

View File

@ -29,6 +29,8 @@ export
light_client_pool, sync_committee_msg_pool, validator_pool, beacon_clock,
gossip_validation, block_processor, batch_validation, block_quarantine
logScope: topics = "gossip_eth2"
# Metrics for tracking attestation and beacon block loss
declareCounter beacon_attestations_received,
"Number of valid unaggregated attestations processed by this node"
@ -62,14 +64,6 @@ declareCounter beacon_sync_committee_contributions_received,
"Number of valid sync committee contributions processed by this node"
declareCounter beacon_sync_committee_contributions_dropped,
"Number of invalid sync committee contributions dropped by this node", labels = ["reason"]
declareCounter beacon_light_client_finality_updates_received,
"Number of valid LC finality updates processed by this node"
declareCounter beacon_light_client_finality_updates_dropped,
"Number of invalid LC finality updates dropped by this node", labels = ["reason"]
declareCounter beacon_light_client_optimistic_updates_received,
"Number of valid LC optimistic updates processed by this node"
declareCounter beacon_light_client_optimistic_updates_dropped,
"Number of invalid LC optimistic updates dropped by this node", labels = ["reason"]
const delayBuckets = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, Inf]
@ -553,23 +547,10 @@ proc lightClientFinalityUpdateValidator*(
self: var Eth2Processor, src: MsgSource,
finality_update: altair.LightClientFinalityUpdate
): Result[void, ValidationError] =
logScope:
finality_update
debug "LC finality update received"
let
wallTime = self.getCurrentBeaconTime()
v = validateLightClientFinalityUpdate(
self.lightClientPool[], self.dag, finality_update, wallTime)
if v.isOk():
trace "LC finality update validated"
beacon_light_client_finality_updates_received.inc()
else:
debug "Dropping LC finality update", error = v.error
beacon_light_client_finality_updates_dropped.inc(1, [$v.error[0]])
v
# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#light_client_optimistic_update
@ -577,21 +558,8 @@ proc lightClientOptimisticUpdateValidator*(
self: var Eth2Processor, src: MsgSource,
optimistic_update: altair.LightClientOptimisticUpdate
): Result[void, ValidationError] =
logScope:
optimistic_update
debug "LC optimistic update received"
let
wallTime = self.getCurrentBeaconTime()
v = validateLightClientOptimisticUpdate(
self.lightClientPool[], self.dag, optimistic_update, wallTime)
if v.isOk():
trace "LC optimistic update validated"
beacon_light_client_optimistic_updates_received.inc()
else:
debug "Dropping LC optimistic update", error = v.error
beacon_light_client_optimistic_updates_dropped.inc(1, [$v.error[0]])
v

View File

@ -18,6 +18,8 @@ import
export sszdump, eth2_processor, gossip_validation
logScope: topics = "gossip_lc"
# Light Client Processor
# ------------------------------------------------------------------------------
# The light client processor handles received light client objects
@ -74,6 +76,8 @@ type
lastDuplicateTick: BeaconTime # Moment when last duplicate update received
numDuplicatesSinceProgress: int # Number of duplicates since last progress
latestFinalityUpdate: altair.LightClientOptimisticUpdate
const
# These constants have been chosen empirically and are not backed by spec
duplicateRateLimit = chronos.seconds(5) # Rate limit for counting duplicates
@ -217,6 +221,47 @@ proc processObject(
res
template withReportedProgress(expectFinalityUpdate: bool, body: untyped): bool =
block:
let
previousWasInitialized = store[].isSome
previousFinalized =
if store[].isSome:
store[].get.finalized_header
else:
BeaconBlockHeader()
previousOptimistic =
if store[].isSome:
store[].get.optimistic_header
else:
BeaconBlockHeader()
body
var didProgress = false
if store[].isSome != previousWasInitialized:
didProgress = true
if self.onStoreInitialized != nil:
self.onStoreInitialized()
self.onStoreInitialized = nil
if store[].get.optimistic_header != previousOptimistic:
when not expectFinalityUpdate:
didProgress = true
if self.onOptimisticHeader != nil:
self.onOptimisticHeader()
if store[].get.finalized_header != previousFinalized:
didProgress = true
if self.onFinalizedHeader != nil:
self.onFinalizedHeader()
didProgress
template withReportedProgress(body: untyped): bool =
withReportedProgress(false, body)
proc storeObject*(
self: var LightClientProcessor,
src: MsgSource, wallTime: BeaconTime,
@ -227,60 +272,49 @@ proc storeObject*(
let
startTick = Moment.now()
store = self.store
previousFinalized =
if store[].isSome:
store[].get.finalized_header
else:
BeaconBlockHeader()
previousOptimistic =
if store[].isSome:
store[].get.optimistic_header
else:
BeaconBlockHeader()
? self.processObject(obj, wallTime)
didProgress =
withReportedProgress(obj is SomeLightClientUpdateWithFinality):
? self.processObject(obj, wallTime)
let
storeObjectTick = Moment.now()
storeObjectDur = storeObjectTick - startTick
let
storeObjectTick = Moment.now()
storeObjectDur = storeObjectTick - startTick
light_client_store_object_duration_seconds.observe(
storeObjectDur.toFloatSeconds())
let objSlot =
when obj is altair.LightClientBootstrap:
obj.header.slot
elif obj is SomeLightClientUpdateWithFinality:
obj.finalized_header.slot
else:
obj.attested_header.slot
debug "LC object processed",
finalizedSlot = store[].get.finalized_header.slot,
optimisticSlot = store[].get.optimistic_header.slot,
kind = typeof(obj).name,
objectSlot = objSlot,
storeObjectDur
when obj is altair.LightClientBootstrap:
if self.onStoreInitialized != nil:
self.onStoreInitialized()
self.onStoreInitialized = nil
var didProgress = false
if store[].get.optimistic_header != previousOptimistic:
when obj isnot SomeLightClientUpdateWithFinality:
didProgress = true
if self.onOptimisticHeader != nil:
self.onOptimisticHeader()
if store[].get.finalized_header != previousFinalized:
didProgress = true
if self.onFinalizedHeader != nil:
self.onFinalizedHeader()
light_client_store_object_duration_seconds.observe(
storeObjectDur.toFloatSeconds())
let objSlot =
when obj is altair.LightClientBootstrap:
obj.header.slot
elif obj is SomeLightClientUpdateWithFinality:
obj.finalized_header.slot
else:
obj.attested_header.slot
debug "LC object processed",
finalizedSlot = store[].get.finalized_header.slot,
optimisticSlot = store[].get.optimistic_header.slot,
kind = typeof(obj).name,
objectSlot = objSlot,
storeObjectDur
ok didProgress
proc resetToFinalizedHeader*(
self: var LightClientProcessor,
header: BeaconBlockHeader,
current_sync_committee: SyncCommittee) =
let store = self.store
discard withReportedProgress:
store[] = some LightClientStore(
finalized_header: header,
current_sync_committee: current_sync_committee,
optimistic_header: header)
debug "LC reset to finalized header",
finalizedSlot = store[].get.finalized_header.slot,
optimisticSlot = store[].get.optimistic_header.slot
# Enqueue
# ------------------------------------------------------------------------------
@ -322,70 +356,61 @@ proc addObject*(
# Message validators
# ------------------------------------------------------------------------------
declareCounter lc_light_client_finality_updates_received,
"Number of valid LC finality updates processed by this LC"
declareCounter lc_light_client_finality_updates_dropped,
"Number of invalid LC finality updates dropped by this LC", labels = ["reason"]
declareCounter lc_light_client_optimistic_updates_received,
"Number of valid LC optimistic updates processed by this LC"
declareCounter lc_light_client_optimistic_updates_dropped,
"Number of invalid LC optimistic updates dropped by this LC", labels = ["reason"]
func toValidationError(
v: Result[bool, BlockError],
wallTime: BeaconTime): Result[void, ValidationError] =
if v.isOk:
let didProgress = v.get
self: var LightClientProcessor,
r: Result[bool, BlockError],
wallTime: BeaconTime,
obj: SomeLightClientObject): Result[void, ValidationError] =
if r.isOk:
let didProgress = r.get
if didProgress:
when v is SomeLightClientUpdate:
let
signature_slot = v.signature_slot
currentTime = wallTime + MAXIMUM_GOSSIP_CLOCK_DISPARITY
forwardTime = signature_slot.light_client_optimistic_update_time
if currentTime < forwardTime:
# [IGNORE] The `finality_update` / `optimistic_update` is received
# after the block at `signature_slot` was given enough time to
# propagate through the network.
return errIgnore(typeof(v).name & ": received too early")
let
signature_slot = obj.signature_slot
currentTime = wallTime + MAXIMUM_GOSSIP_CLOCK_DISPARITY
forwardTime = signature_slot.light_client_finality_update_time
if currentTime < forwardTime:
# [IGNORE] The `finality_update` is received after the block
# at `signature_slot` was given enough time to propagate through
# the network.
# [IGNORE] The `optimistic_update` is received after the block
# at `signature_slot` was given enough time to propagate through
# the network.
return errIgnore(typeof(obj).name & ": received too early")
ok()
else:
# [IGNORE] The `finality_update` / `optimistic_update`
# advances the `finalized_header` / `optimistic_header`
# of the local `LightClientStore`.
errIgnore(typeof(v).name & ": no significant progress")
when obj is altair.LightClientOptimisticUpdate:
# [IGNORE] The `optimistic_update` either matches corresponding fields
# of the most recently forwarded `LightClientFinalityUpdate` (if any),
# or it advances the `optimistic_header` of the local `LightClientStore`
if obj == self.latestFinalityUpdate:
return ok()
# [IGNORE] The `finality_update` advances the `finalized_header` of the
# local `LightClientStore`.
errIgnore(typeof(obj).name & ": no significant progress")
else:
case v.error
case r.error
of BlockError.Invalid:
# [REJECT] The `finality_update` / `optimistic_update` is valid.
errReject($v.error)
# [REJECT] The `finality_update` is valid.
# [REJECT] The `optimistic_update` is valid.
errReject($r.error)
of BlockError.MissingParent, BlockError.UnviableFork, BlockError.Duplicate:
# [IGNORE] No other `finality_update` with a lower or equal
# `finalized_header.slot` / `attested_header.slot` was already
# forwarded on the network.
errIgnore($v.error)
# `finalized_header.slot` was already forwarded on the network.
# [IGNORE] No other `optimistic_update` with a lower or equal
# `attested_header.slot` was already forwarded on the network.
errIgnore($r.error)
# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#light_client_finality_update
proc lightClientFinalityUpdateValidator*(
self: var LightClientProcessor, src: MsgSource,
finality_update: altair.LightClientFinalityUpdate
): Result[void, ValidationError] =
logScope:
finality_update
debug "LC finality update received"
let
wallTime = self.getBeaconTime()
r = self.storeObject(src, wallTime, finality_update)
v = r.toValidationError(wallTime)
if v.isOk():
trace "LC finality update validated"
lc_light_client_finality_updates_received.inc()
else:
debug "Dropping LC finality update", error = v.error
lc_light_client_finality_updates_dropped.inc(1, [$v.error[0]])
v = self.toValidationError(r, wallTime, finality_update)
if v.isOk:
self.latestFinalityUpdate = finality_update.toOptimistic
v
# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#light_client_optimistic_update
@ -393,21 +418,12 @@ proc lightClientOptimisticUpdateValidator*(
self: var LightClientProcessor, src: MsgSource,
optimistic_update: altair.LightClientOptimisticUpdate
): Result[void, ValidationError] =
logScope:
optimistic_update
debug "LC optimistic update received"
let
wallTime = self.getBeaconTime()
r = self.storeObject(src, wallTime, optimistic_update)
v = r.toValidationError(wallTime)
if v.isOk():
trace "LC optimistic update validated"
lc_light_client_optimistic_updates_received.inc()
else:
debug "Dropping LC optimistic update", error = v.error
lc_light_client_optimistic_updates_dropped.inc(1, [$v.error[0]])
v = self.toValidationError(r, wallTime, optimistic_update)
if v.isOk:
let latestFinalitySlot = self.latestFinalityUpdate.attested_header.slot
if optimistic_update.attested_header.slot >= latestFinalitySlot:
self.latestFinalityUpdate.reset() # Only forward once
v

View File

@ -167,61 +167,167 @@ proc start*(lightClient: LightClient) =
trusted_block_root = lightClient.trustedBlockRoot
lightClient.manager.start()
proc resetToFinalizedHeader*(
lightClient: LightClient,
header: BeaconBlockHeader,
current_sync_committee: SyncCommittee) =
lightClient.processor[].resetToFinalizedHeader(header, current_sync_committee)
import metrics
from
libp2p/protocols/pubsub/gossipsub
import
TopicParams, validateParameters, init
proc installMessageValidators*(lightClient: LightClient) =
from
./gossip_processing/eth2_processor
import
lightClientFinalityUpdateValidator, lightClientOptimisticUpdateValidator
declareCounter beacon_light_client_finality_updates_received,
"Number of valid LC finality updates processed by this node"
declareCounter beacon_light_client_finality_updates_dropped,
"Number of invalid LC finality updates dropped by this node", labels = ["reason"]
declareCounter beacon_light_client_optimistic_updates_received,
"Number of valid LC optimistic updates processed by this node"
declareCounter beacon_light_client_optimistic_updates_dropped,
"Number of invalid LC optimistic updates dropped by this node", labels = ["reason"]
template logReceived(
msg: altair.LightClientFinalityUpdate) =
debug "LC finality update received", finality_update = msg
template logValidated(
msg: altair.LightClientFinalityUpdate) =
trace "LC finality update validated", finality_update = msg
beacon_light_client_finality_updates_received.inc()
proc logDropped(
msg: altair.LightClientFinalityUpdate, es: varargs[ValidationError]) =
for e in es:
debug "Dropping LC finality update", finality_update = msg, error = e
beacon_light_client_finality_updates_dropped.inc(1, [$es[0][0]])
template logReceived(
msg: altair.LightClientOptimisticUpdate) =
debug "LC optimistic update received", optimistic_update = msg
template logValidated(
msg: altair.LightClientOptimisticUpdate) =
trace "LC optimistic update validated", optimistic_update = msg
beacon_light_client_optimistic_updates_received.inc()
proc logDropped(
msg: altair.LightClientOptimisticUpdate, es: varargs[ValidationError]) =
for e in es:
debug "Dropping LC optimistic update", optimistic_update = msg, error = e
beacon_light_client_optimistic_updates_dropped.inc(1, [$es[0][0]])
proc installMessageValidators*(
lightClient: LightClient, eth2Processor: ref Eth2Processor = nil) =
# When registering multiple message validators, IGNORE results take precedence
# over ACCEPT results. However, because the opposite behaviour is needed here,
# we handle both full node and light client validation in this module
template getLocalWallPeriod(): auto =
lightClient.getBeaconTime().slotOrZero().sync_committee_period
template validate[T: SomeLightClientObject](
msg: T, validatorProcName: untyped): ValidationResult =
msg.logReceived()
var
ignoreErrors {.noinit.}: array[2, ValidationError]
numIgnoreErrors = 0
let res1 =
if eth2Processor != nil:
let
v = eth2Processor[].`validatorProcName`(MsgSource.gossip, msg)
res = v.toValidationResult()
if res == ValidationResult.Reject:
msg.logDropped(v.error)
return res
if res == ValidationResult.Ignore:
ignoreErrors[numIgnoreErrors] = v.error
inc numIgnoreErrors
res
else:
ValidationResult.Ignore
let res2 =
if lightClient.manager.isGossipSupported(getLocalWallPeriod()):
let
v = lightClient.processor[].`validatorProcName`(MsgSource.gossip, msg)
res = v.toValidationResult()
if res == ValidationResult.Reject:
msg.logDropped(v.error)
return res
if res == ValidationResult.Ignore:
ignoreErrors[numIgnoreErrors] = v.error
inc numIgnoreErrors
res
else:
ValidationResult.Ignore
if res1 == ValidationResult.Accept or res2 == ValidationResult.Accept:
msg.logValidated()
return ValidationResult.Accept
doAssert res1 == ValidationResult.Ignore and res2 == ValidationResult.Ignore
if numIgnoreErrors == 0:
ignoreErrors[numIgnoreErrors] = static:
(ValidationResult.Ignore, cstring T.name & ": irrelevant")
inc numIgnoreErrors
msg.logDropped(ignoreErrors.toOpenArray(0, numIgnoreErrors - 1))
ValidationResult.Ignore
let forkDigests = lightClient.forkDigests
for digest in [forkDigests.altair, forkDigests.bellatrix]:
lightClient.network.addValidator(
getLightClientFinalityUpdateTopic(digest),
proc(msg: altair.LightClientFinalityUpdate): ValidationResult =
if lightClient.manager.isGossipSupported(getLocalWallPeriod()):
toValidationResult(
lightClient.processor[].lightClientFinalityUpdateValidator(
MsgSource.gossip, msg))
else:
ValidationResult.Ignore)
validate(msg, lightClientFinalityUpdateValidator))
lightClient.network.addValidator(
getLightClientOptimisticUpdateTopic(digest),
proc(msg: altair.LightClientOptimisticUpdate): ValidationResult =
if lightClient.manager.isGossipSupported(getLocalWallPeriod()):
toValidationResult(
lightClient.processor[].lightClientOptimisticUpdateValidator(
MsgSource.gossip, msg))
else:
ValidationResult.Ignore)
validate(msg, lightClientOptimisticUpdateValidator))
const lightClientTopicParams = TopicParams.init()
static: lightClientTopicParams.validateParameters().tryGet()
proc updateGossipStatus*(
lightClient: LightClient, slot: Slot, dagIsBehind = default(Option[bool])) =
template cfg(): auto = lightClient.cfg
let
isBehind =
if lightClient.manager.isGossipSupported(slot.sync_committee_period):
false
elif dagIsBehind.isSome:
# While separate message validators can be installed for both
# full node and light client (both are called unless one rejects msg),
# only a single subscription can be installed per topic for now.
# The full node subscription is also handled here, even though it
# does not directly relate to the client side of the LC sync protocol
dagIsBehind.get
else:
true # Force `targetGossipState` to `{}`
epoch = slot.epoch
lcBehind =
not lightClient.manager.isGossipSupported(slot.sync_committee_period)
dagBehind =
# While separate message validators can be installed for both
# full node and light client (both are called unless one rejects msg),
# only a single subscription is supported per topic.
# The full node subscription is also handled in this module, even though
# it does not directly relate to the client side of the LC sync protocol
dagIsBehind.get(true)
isBehind = lcBehind and dagBehind
currentEpochTargetGossipState = getTargetGossipState(
epoch, cfg.ALTAIR_FORK_EPOCH, cfg.BELLATRIX_FORK_EPOCH, isBehind)
targetGossipState =
getTargetGossipState(
slot.epoch,
lightClient.cfg.ALTAIR_FORK_EPOCH,
lightClient.cfg.BELLATRIX_FORK_EPOCH,
isBehind)
if lcBehind or epoch < 1:
currentEpochTargetGossipState
else:
# The fork digest for light client topics depends on the attested slot,
# which is in the past relative to the signature slot (current slot).
# Therefore, LC topic subscriptions are kept for 1 extra epoch.
let previousEpochTargetGossipState = getTargetGossipState(
epoch - 1, cfg.ALTAIR_FORK_EPOCH, cfg.BELLATRIX_FORK_EPOCH, isBehind)
currentEpochTargetGossipState + previousEpochTargetGossipState
template currentGossipState(): auto = lightClient.gossipState
if currentGossipState == targetGossipState:
return
@ -245,10 +351,15 @@ proc updateGossipStatus*(
let forkDigest = lightClient.forkDigests[].atStateFork(gossipFork)
lightClient.network.unsubscribe(
getLightClientFinalityUpdateTopic(forkDigest))
lightClient.network.unsubscribe(
getLightClientOptimisticUpdateTopic(forkDigest))
for gossipFork in newGossipForks:
if gossipFork >= BeaconStateFork.Altair:
let forkDigest = lightClient.forkDigests[].atStateFork(gossipFork)
lightClient.network.subscribe(
getLightClientFinalityUpdateTopic(forkDigest),
lightClientTopicParams)
lightClient.network.subscribe(
getLightClientOptimisticUpdateTopic(forkDigest),
lightClientTopicParams)

View File

@ -43,6 +43,7 @@ type
Eth2NetworkConfigDefaults* = object
## Network specific config defaults
lightClientEnable*: bool
serveLightClientData*: bool
importLightClientData*: ImportLightClientData
@ -164,7 +165,7 @@ proc loadEth2NetworkMetadata*(path: string, eth1Network = none(Eth1Network)): Et
else:
""
enableLightClientData =
shouldSupportLightClient =
if genesisData.len >= 40:
# SSZ processing at compile time does not work well.
#
@ -191,10 +192,12 @@ proc loadEth2NetworkMetadata*(path: string, eth1Network = none(Eth1Network)): Et
configDefaults =
Eth2NetworkConfigDefaults(
lightClientEnable:
false, # Only produces debug logs so far
serveLightClientData:
enableLightClientData,
shouldSupportLightClient,
importLightClientData:
if enableLightClientData:
if shouldSupportLightClient:
ImportLightClientData.OnlyNew
else:
ImportLightClientData.None

View File

@ -19,8 +19,8 @@ import
./spec/[engine_authentication, weak_subjectivity],
./validators/[keystore_management, validator_duties],
"."/[
beacon_node, deposits, interop, nimbus_binary_common, statusbar,
trusted_node_sync, wallets]
beacon_node, beacon_node_light_client, deposits, interop,
nimbus_binary_common, statusbar, trusted_node_sync, wallets]
when defined(posix):
import system/ansi_c
@ -237,6 +237,7 @@ proc initFullNode(
discard trackFinalizedState(eth1Monitor,
finalizedEpochRef.eth1_data,
finalizedEpochRef.eth1_deposit_index)
node.updateLightClientFromDag()
eventBus.emit("finalization", data)
func getLocalHeadSlot(): Slot =
@ -664,7 +665,7 @@ proc init*(T: type BeaconNode,
else:
nil
var node = BeaconNode(
let node = BeaconNode(
nickname: nickname,
graffitiBytes: if config.graffiti.isSome: config.graffiti.get
else: defaultGraffitiBytes(),
@ -682,12 +683,15 @@ proc init*(T: type BeaconNode,
gossipState: {},
beaconClock: beaconClock,
validatorMonitor: validatorMonitor,
stateTtlCache: stateTtlCache
)
stateTtlCache: stateTtlCache)
node.initLightClient(
rng, cfg, dag.forkDigests, getBeaconTime, dag.genesis_validators_root)
node.initFullNode(
rng, dag, taskpool, getBeaconTime)
node.updateLightClientFromDag()
node
func verifyFinalization(node: BeaconNode, slot: Slot) =
@ -870,12 +874,6 @@ proc addAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest, slot: Sl
node.network.updateSyncnetsMetadata(currentSyncCommitteeSubnets)
if node.config.serveLightClientData.get:
node.network.subscribe(
getLightClientFinalityUpdateTopic(forkDigest), basicParams)
node.network.subscribe(
getLightClientOptimisticUpdateTopic(forkDigest), basicParams)
proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.removePhase0MessageHandlers(forkDigest)
@ -887,12 +885,6 @@ proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.network.unsubscribe(
getSyncCommitteeContributionAndProofTopic(forkDigest))
if node.config.serveLightClientData.get:
node.network.unsubscribe(
getLightClientFinalityUpdateTopic(forkDigest))
node.network.unsubscribe(
getLightClientOptimisticUpdateTopic(forkDigest))
proc trackCurrentSyncCommitteeTopics(node: BeaconNode, slot: Slot) =
# Unlike trackNextSyncCommitteeTopics, just snap to the currently correct
# set of subscriptions, and use current_sync_committee. Furthermore, this
@ -1009,12 +1001,14 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
headDistance =
if slot > head.slot: (slot - head.slot).uint64
else: 0'u64
isBehind =
headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER
targetGossipState =
getTargetGossipState(
slot.epoch,
node.dag.cfg.ALTAIR_FORK_EPOCH,
node.dag.cfg.BELLATRIX_FORK_EPOCH,
headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER)
isBehind)
doAssert targetGossipState.card <= 2
@ -1089,6 +1083,7 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
node.gossipState = targetGossipState
node.updateAttestationSubnetHandlers(slot)
node.updateLightClientGossipStatus(slot, isBehind)
proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
# Things we do when slot processing has ended and we're about to wait for the
@ -1393,29 +1388,7 @@ proc installMessageValidators(node: BeaconNode) =
installSyncCommitteeeValidators(forkDigests.altair)
installSyncCommitteeeValidators(forkDigests.bellatrix)
template installLightClientDataValidators(digest: auto) =
node.network.addValidator(
getLightClientFinalityUpdateTopic(digest),
proc(msg: altair.LightClientFinalityUpdate): ValidationResult =
if node.config.serveLightClientData.get:
toValidationResult(
node.processor[].lightClientFinalityUpdateValidator(
MsgSource.gossip, msg))
else:
ValidationResult.Ignore)
node.network.addValidator(
getLightClientOptimisticUpdateTopic(digest),
proc(msg: altair.LightClientOptimisticUpdate): ValidationResult =
if node.config.serveLightClientData.get:
toValidationResult(
node.processor[].lightClientOptimisticUpdateValidator(
MsgSource.gossip, msg))
else:
ValidationResult.Ignore)
installLightClientDataValidators(forkDigests.altair)
installLightClientDataValidators(forkDigests.bellatrix)
node.installLightClientMessageValidators()
proc stop(node: BeaconNode) =
bnStatus = BeaconNodeStatus.Stopping
@ -1462,6 +1435,7 @@ proc run(node: BeaconNode) {.raises: [Defect, CatchableError].} =
wallTime = node.beaconClock.now()
wallSlot = wallTime.slotOrZero()
node.startLightClient()
node.requestManager.start()
node.syncManager.start()
@ -1723,21 +1697,18 @@ proc doRunBeaconNode(config: var BeaconNodeConf, rng: ref BrHmacDrbgContext) {.r
# works
for node in metadata.bootstrapNodes:
config.bootstrapNodes.add node
if config.serveLightClientData.isNone:
if metadata.configDefaults.serveLightClientData:
info "Applying network config default",
serveLightClientData = metadata.configDefaults.serveLightClientData,
eth2Network = config.eth2Network
config.serveLightClientData =
some metadata.configDefaults.serveLightClientData
if config.importLightClientData.isNone:
if metadata.configDefaults.importLightClientData !=
ImportLightClientData.None:
info "Applying network config default",
importLightClientData = metadata.configDefaults.importLightClientData,
eth2Network = config.eth2Network
config.importLightClientData =
some metadata.configDefaults.importLightClientData
template applyConfigDefault(field: untyped): untyped =
if config.`field`.isNone:
if not metadata.configDefaults.`field`.isZeroMemory:
info "Applying network config default",
eth2Network = config.eth2Network,
`field` = metadata.configDefaults.`field`
config.`field` = some metadata.configDefaults.`field`
applyConfigDefault(lightClientEnable)
applyConfigDefault(serveLightClientData)
applyConfigDefault(importLightClientData)
let node = BeaconNode.init(
metadata.cfg,

View File

@ -159,6 +159,7 @@ proc startSingleNodeNetwork {.raises: [CatchableError, Defect].} =
"--keymanager-address=127.0.0.1",
"--keymanager-port=" & $keymanagerPort,
"--keymanager-token-file=" & tokenFilePath,
"--light-client-enable=off",
"--serve-light-client-data=off",
"--import-light-client-data=none",
"--doppelganger-detection=off"], it))