add light client (standalone) (#3653)

Introduces a new library for syncing via the libp2p-based light client
sync protocol, and adds a new `nimbus_light_client` executable that uses
this library for syncing. The new executable emits log messages when new
beacon block headers are received, and it is integrated into testing.
Etan Kissling 2022-05-31 12:45:37 +02:00 committed by GitHub
parent 85d0234524
commit 01efa93cf6
22 changed files with 1440 additions and 175 deletions

View File

@ -35,17 +35,17 @@ CPU_LIMIT := 0
BUILD_END_MSG := "\x1B[92mBuild completed successfully:\x1B[39m"
ifeq ($(CPU_LIMIT), 0)
CPU_LIMIT_CMD :=
else
CPU_LIMIT_CMD := cpulimit --limit=$(CPU_LIMIT) --foreground --
endif
# TODO: move this to nimbus-build-system
ifeq ($(shell uname), Darwin)
# Scary warnings in large volume: https://github.com/status-im/nimbus-eth2/issues/3076
SILENCE_WARNINGS := 2>&1 | grep -v "failed to insert symbol" | grep -v "could not find object file symbol for symbol" || true
else
SILENCE_WARNINGS :=
endif
# unconditionally built by the default Make target
@ -60,6 +60,7 @@ TOOLS := \
ncli_split_keystore \
wss_sim \
stack_sizes \
nimbus_light_client \
nimbus_validator_client \
nimbus_signing_node \
validator_db_aggregator
@ -267,12 +268,12 @@ clean_eth2_network_simulation_all:
rm -rf tests/simulation/{data,validators}
GOERLI_TESTNETS_PARAMS := \
--tcp-port=$$(( $(BASE_PORT) + $(NODE_ID) )) \
--udp-port=$$(( $(BASE_PORT) + $(NODE_ID) )) \
--metrics \
--metrics-port=$$(( $(BASE_METRICS_PORT) + $(NODE_ID) )) \
--rest \
--rest-port=$$(( $(BASE_REST_PORT) +$(NODE_ID) ))
eth2_network_simulation: | build deps clean_eth2_network_simulation_all
+ GIT_ROOT="$$PWD" NIMFLAGS="$(NIMFLAGS)" LOG_LEVEL="$(LOG_LEVEL)" tests/simulation/start-in-tmux.sh
@ -281,7 +282,7 @@ eth2_network_simulation: | build deps clean_eth2_network_simulation_all
#- https://www.gnu.org/software/make/manual/html_node/Multi_002dLine.html
#- macOS doesn't support "=" at the end of "define FOO": https://stackoverflow.com/questions/13260396/gnu-make-3-81-eval-function-not-working
define CONNECT_TO_NETWORK
scripts/makedir.sh build/data/shared_$(1)_$(NODE_ID)
scripts/make_prometheus_config.sh \
--nodes 1 \
@ -297,7 +298,7 @@ define CONNECT_TO_NETWORK
endef
define CONNECT_TO_NETWORK_IN_DEV_MODE
scripts/makedir.sh build/data/shared_$(1)_$(NODE_ID)
scripts/make_prometheus_config.sh \
--nodes 1 \
@ -340,6 +341,16 @@ define CONNECT_TO_NETWORK_WITH_VALIDATOR_CLIENT
--rest-port=$$(( $(BASE_REST_PORT) +$(NODE_ID) ))
endef
define CONNECT_TO_NETWORK_WITH_LIGHT_CLIENT
scripts/makedir.sh build/data/shared_$(1)_$(NODE_ID)
$(CPU_LIMIT_CMD) build/nimbus_light_client \
--network=$(1) \
--log-level="$(RUNTIME_LOG_LEVEL)" \
--log-file=build/data/shared_$(1)_$(NODE_ID)/nbc_lc_$$(date +"%Y%m%d%H%M%S").log \
--trusted-block-root="$(LC_TRUSTED_BLOCK_ROOT)"
endef
define MAKE_DEPOSIT_DATA
build/nimbus_beacon_node deposits createTestnetDeposits \
--network=$(1) \
@ -386,6 +397,9 @@ prater: | prater-build
prater-vc: | prater-build nimbus_validator_client
$(call CONNECT_TO_NETWORK_WITH_VALIDATOR_CLIENT,prater,nimbus_beacon_node,$(GOERLI_WEB3_URL))
prater-lc: | nimbus_light_client
$(call CONNECT_TO_NETWORK_WITH_LIGHT_CLIENT,prater)
ifneq ($(LOG_LEVEL), TRACE)
prater-dev:
+ "$(MAKE)" LOG_LEVEL=TRACE $@
@ -412,6 +426,9 @@ ropsten: | ropsten-build
ropsten-vc: | ropsten-build nimbus_validator_client
$(call CONNECT_TO_NETWORK_WITH_VALIDATOR_CLIENT,ropsten,nimbus_beacon_node,$(ROPSTEN_WEB3_URL))
ropsten-lc: | nimbus_light_client
$(call CONNECT_TO_NETWORK_WITH_LIGHT_CLIENT,ropsten)
ifneq ($(LOG_LEVEL), TRACE)
ropsten-dev:
+ "$(MAKE)" LOG_LEVEL=TRACE $@

View File

@ -56,6 +56,9 @@ proc now*(c: BeaconClock): BeaconTime =
## Current time, in slots - this may end up being less than GENESIS_SLOT(!)
toBeaconTime(c, getTime())
func getBeaconTimeFn*(c: BeaconClock): GetBeaconTimeFn =
return proc(): BeaconTime = c.now()
proc fromNow*(c: BeaconClock, t: BeaconTime): tuple[inFuture: bool, offset: Duration] =
let now = c.now()
if t > now:
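
A minimal sketch of how the new closure is consumed (assuming `clock` is an initialized `BeaconClock`); components can query wall time without holding a reference to the clock itself:

let getBeaconTime: GetBeaconTimeFn = clock.getBeaconTimeFn()
# later, in a component that only received the closure:
let wallSlot = getBeaconTime().slotOrZero()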

View File

@ -36,8 +36,6 @@ export
type
RpcServer* = RpcHttpServer
GossipState* = set[BeaconStateFork]
BeaconNode* = ref object
nickname*: string
graffitiBytes*: GraffitiBytes

View File

@ -1108,11 +1108,13 @@ proc readValue*(r: var TomlReader, a: var Address)
except CatchableError:
r.raiseUnexpectedValue("string expected")
proc loadEth2Network*(config: BeaconNodeConf): Eth2NetworkMetadata {.raises: [Defect, IOError].} =
network_name.set(2, labelValues = [config.eth2Network.get(otherwise = "mainnet")])
proc loadEth2Network*(
eth2Network: Option[string]
): Eth2NetworkMetadata {.raises: [Defect, IOError].} =
network_name.set(2, labelValues = [eth2Network.get(otherwise = "mainnet")])
when not defined(gnosisChainBinary):
if config.eth2Network.isSome:
getMetadataForNetwork(config.eth2Network.get)
if eth2Network.isSome:
getMetadataForNetwork(eth2Network.get)
else:
when const_preset == "mainnet":
mainnetMetadata
@ -1122,5 +1124,8 @@ proc loadEth2Network*(config: BeaconNodeConf): Eth2NetworkMetadata {.raises: [De
echo "Must specify network on non-mainnet node"
quit 1
else:
checkNetworkParameterUse config.eth2Network
checkNetworkParameterUse eth2Network
gnosisMetadata
template loadEth2Network*(config: BeaconNodeConf): Eth2NetworkMetadata =
loadEth2Network(config.eth2Network)
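
With the overload in place, both binaries resolve network metadata the same way; a short sketch (network name illustrative, `std/options` assumed imported):

let metadata = loadEth2Network(some "prater")  # Option[string] overload
# a beacon node still calls loadEth2Network(config) via the template above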

View File

@ -0,0 +1,131 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
# This implements the pre-release proposal of the libp2p based light client sync
# protocol. See https://github.com/ethereum/consensus-specs/pull/2802
import
json_serialization/std/net,
./conf
export net, conf
type LightClientConf* = object
# Config
configFile* {.
desc: "Loads the configuration from a TOML file"
name: "config-file" }: Option[InputFile]
# Logging
logLevel* {.
desc: "Sets the log level"
defaultValue: "INFO"
name: "log-level" }: string
logStdout* {.
hidden
desc: "Specifies what kind of logs should be written to stdout (auto, colors, nocolors, json)"
defaultValueDesc: "auto"
defaultValue: StdoutLogKind.Auto
name: "log-format" }: StdoutLogKind
logFile* {.
desc: "Specifies a path for the written Json log file (deprecated)"
name: "log-file" }: Option[OutFile]
# Network
eth2Network* {.
desc: "The Eth2 network to join"
defaultValueDesc: "mainnet"
name: "network" }: Option[string]
# Libp2p
bootstrapNodes* {.
desc: "Specifies one or more bootstrap nodes to use when connecting to the network"
abbr: "b"
name: "bootstrap-node" }: seq[string]
bootstrapNodesFile* {.
desc: "Specifies a line-delimited file of bootstrap Ethereum network addresses"
defaultValue: ""
name: "bootstrap-file" }: InputFile
listenAddress* {.
desc: "Listening address for the Ethereum LibP2P and Discovery v5 traffic"
defaultValue: defaultListenAddress
defaultValueDesc: "0.0.0.0"
name: "listen-address" }: ValidIpAddress
tcpPort* {.
desc: "Listening TCP port for Ethereum LibP2P traffic"
defaultValue: defaultEth2TcpPort
defaultValueDesc: "9000"
name: "tcp-port" }: Port
udpPort* {.
desc: "Listening UDP port for node discovery"
defaultValue: defaultEth2TcpPort
defaultValueDesc: "9000"
name: "udp-port" }: Port
maxPeers* {.
desc: "The target number of peers to connect to"
defaultValue: 160 # 5 (fanout) * 64 (subnets) / 2 (subs) for a healthy mesh
name: "max-peers" }: int
hardMaxPeers* {.
desc: "The maximum number of peers to connect to. Defaults to maxPeers * 1.5"
name: "hard-max-peers" }: Option[int]
nat* {.
desc: "Specify method to use for determining public address. " &
"Must be one of: any, none, upnp, pmp, extip:<IP>"
defaultValue: NatConfig(hasExtIp: false, nat: NatAny)
defaultValueDesc: "any"
name: "nat" .}: NatConfig
enrAutoUpdate* {.
desc: "Discovery can automatically update its ENR with the IP address " &
"and UDP port as seen by other nodes it communicates with. " &
"This option allows to enable/disable this functionality"
defaultValue: false
name: "enr-auto-update" .}: bool
agentString* {.
defaultValue: "nimbus",
desc: "Node agent string which is used as identifier in network"
name: "agent-string" }: string
discv5Enabled* {.
desc: "Enable Discovery v5"
defaultValue: true
name: "discv5" }: bool
directPeers* {.
desc: "The list of priviledged, secure and known peers to connect and maintain the connection to, this requires a not random netkey-file. In the complete multiaddress format like: /ip4/<address>/tcp/<port>/p2p/<peerId-public-key>. Peering agreements are established out of band and must be reciprocal."
name: "direct-peer" .}: seq[string]
# Light client
trustedBlockRoot* {.
desc: "Recent trusted finalized block root to initialize light client from"
name: "trusted-block-root" }: Eth2Digest
# Testing
stopAtEpoch* {.
hidden
desc: "The wall-time epoch at which to exit the program. (for testing purposes)"
defaultValue: 0
name: "stop-at-epoch" }: uint64
func parseCmdArg*(T: type Eth2Digest, input: string): T
{.raises: [ValueError, Defect].} =
Eth2Digest.fromHex(input)
func completeCmdArg*(T: type Eth2Digest, input: string): seq[string] =
return @[]
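
For illustration, this is the helper confutils invokes to decode `--trusted-block-root` (the all-zero root below is a hypothetical stand-in for a real 32-byte value; invalid hex raises ValueError):

let root = Eth2Digest.parseCmdArg("0x0000000000000000000000000000000000000000000000000000000000000000")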

View File

@ -944,6 +944,9 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
template genesis_validators_root*(dag: ChainDAGRef): Eth2Digest =
getStateField(dag.headState, genesis_validators_root)
proc genesisBlockRoot*(dag: ChainDAGRef): Eth2Digest =
dag.db.getGenesisBlock().expect("DB must be initialized with genesis block")
func getEpochRef*(
dag: ChainDAGRef, state: ForkedHashedBeaconState, cache: var StateCache): EpochRef =
## Get a cached `EpochRef` or construct one based on the given state - always
@ -1048,12 +1051,6 @@ func stateCheckpoint*(dag: ChainDAGRef, bsi: BlockSlotId): BlockSlotId =
template forkAtEpoch*(dag: ChainDAGRef, epoch: Epoch): Fork =
forkAtEpoch(dag.cfg, epoch)
func forkDigestAtEpoch*(dag: ChainDAGRef, epoch: Epoch): ForkDigest =
case dag.cfg.stateForkAtEpoch(epoch)
of BeaconStateFork.Bellatrix: dag.forkDigests.bellatrix
of BeaconStateFork.Altair: dag.forkDigests.altair
of BeaconStateFork.Phase0: dag.forkDigests.phase0
proc getBlockRange*(
dag: ChainDAGRef, startSlot: Slot, skipStep: uint64,
output: var openArray[BlockId]): Natural =

View File

@ -145,6 +145,9 @@ type
ValidationRes* = Result[void, ValidationError]
func toValidationResult*(res: ValidationRes): ValidationResult =
if res.isOk(): ValidationResult.Accept else: res.error()[0]
# Initialization
# ------------------------------------------------------------------------------

View File

@ -13,10 +13,10 @@ import
../spec/datatypes/altair,
../spec/light_client_sync,
../consensus_object_pools/block_pools_types,
".."/[beacon_clock],
../sszdump
".."/[beacon_clock, sszdump],
"."/[eth2_processor, gossip_validation]
export sszdump
export sszdump, eth2_processor, gossip_validation
# Light Client Processor
# ------------------------------------------------------------------------------
@ -26,7 +26,9 @@ declareHistogram light_client_store_object_duration_seconds,
"storeObject() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]
type
DidInitializeStoreCallback* =
GetTrustedBlockRootCallback* =
proc(): Option[Eth2Digest] {.gcsafe, raises: [Defect].}
VoidCallback* =
proc() {.gcsafe, raises: [Defect].}
LightClientProcessor* = object
@ -61,12 +63,12 @@ type
# Consumer
# ----------------------------------------------------------------
store: ref Option[LightClientStore]
getBeaconTime*: GetBeaconTimeFn
didInitializeStoreCallback: DidInitializeStoreCallback
getBeaconTime: GetBeaconTimeFn
getTrustedBlockRoot: GetTrustedBlockRootCallback
onStoreInitialized, onFinalizedHeader, onOptimisticHeader: VoidCallback
cfg: RuntimeConfig
genesis_validators_root: Eth2Digest
trustedBlockRoot: Eth2Digest
lastProgressTick: BeaconTime # Moment when last update made progress
lastDuplicateTick: BeaconTime # Moment when last duplicate update received
@ -87,10 +89,13 @@ proc new*(
dumpEnabled: bool,
dumpDirInvalid, dumpDirIncoming: string,
cfg: RuntimeConfig,
genesis_validators_root, trustedBlockRoot: Eth2Digest,
genesis_validators_root: Eth2Digest,
store: ref Option[LightClientStore],
getBeaconTime: GetBeaconTimeFn,
didInitializeStoreCallback: DidInitializeStoreCallback = nil
getTrustedBlockRoot: GetTrustedBlockRootCallback,
onStoreInitialized: VoidCallback = nil,
onFinalizedHeader: VoidCallback = nil,
onOptimisticHeader: VoidCallback = nil
): ref LightClientProcessor =
(ref LightClientProcessor)(
dumpEnabled: dumpEnabled,
@ -98,10 +103,12 @@ proc new*(
dumpDirIncoming: dumpDirIncoming,
store: store,
getBeaconTime: getBeaconTime,
didInitializeStoreCallback: didInitializeStoreCallback,
getTrustedBlockRoot: getTrustedBlockRoot,
onStoreInitialized: onStoreInitialized,
onFinalizedHeader: onFinalizedHeader,
onOptimisticHeader: onOptimisticHeader,
cfg: cfg,
genesis_validators_root: genesis_validators_root,
trustedBlockRoot: trustedBlockRoot
genesis_validators_root: genesis_validators_root
)
# Storage
@ -147,30 +154,29 @@ proc tryForceUpdate(
finalizedSlot = store[].get.finalized_header.slot,
optimisticSlot = store[].get.optimistic_header.slot
proc storeObject*(
proc processObject(
self: var LightClientProcessor,
src: MsgSource, wallTime: BeaconTime,
obj: SomeLightClientObject): Result[void, BlockError] =
## storeObject is the main entry point for unvalidated light client objects -
## all untrusted objects pass through here. When storing an object, we will
## update the `LightClientStore` accordingly
obj: SomeLightClientObject,
wallTime: BeaconTime): Result[void, BlockError] =
let
startTick = Moment.now()
wallSlot = wallTime.slotOrZero()
store = self.store
res =
when obj is altair.LightClientBootstrap:
if store[].isSome:
err(BlockError.Duplicate)
else:
let initRes = initialize_light_client_store(
self.trustedBlockRoot, obj)
if initRes.isErr:
err(initRes.error)
let trustedBlockRoot = self.getTrustedBlockRoot()
if trustedBlockRoot.isNone:
err(BlockError.MissingParent)
else:
store[] = some(initRes.get)
ok()
let initRes =
initialize_light_client_store(trustedBlockRoot.get, obj)
if initRes.isErr:
err(initRes.error)
else:
store[] = some(initRes.get)
ok()
elif obj is SomeLightClientUpdate:
if store[].isNone:
err(BlockError.MissingParent)
@ -209,6 +215,31 @@ proc storeObject*(
self.lastDuplicateTick = wallTime + duplicateCountDelay
self.numDuplicatesSinceProgress = 0
res
proc storeObject*(
self: var LightClientProcessor,
src: MsgSource, wallTime: BeaconTime,
obj: SomeLightClientObject): Result[bool, BlockError] =
## storeObject is the main entry point for unvalidated light client objects -
## all untrusted objects pass through here. When storing an object, we will
## update the `LightClientStore` accordingly
let
startTick = Moment.now()
store = self.store
previousFinalized =
if store[].isSome:
store[].get.finalized_header
else:
BeaconBlockHeader()
previousOptimistic =
if store[].isSome:
store[].get.optimistic_header
else:
BeaconBlockHeader()
? self.processObject(obj, wallTime)
let
storeObjectTick = Moment.now()
storeObjectDur = storeObjectTick - startTick
@ -223,7 +254,7 @@ proc storeObject*(
obj.finalized_header.slot
else:
obj.attested_header.slot
debug "Light client object processed",
debug "LC object processed",
finalizedSlot = store[].get.finalized_header.slot,
optimisticSlot = store[].get.optimistic_header.slot,
kind = typeof(obj).name,
@ -231,11 +262,24 @@ proc storeObject*(
storeObjectDur
when obj is altair.LightClientBootstrap:
if self.didInitializeStoreCallback != nil:
self.didInitializeStoreCallback()
self.didInitializeStoreCallback = nil
if self.onStoreInitialized != nil:
self.onStoreInitialized()
self.onStoreInitialized = nil
res
var didProgress = false
if store[].get.optimistic_header != previousOptimistic:
when obj isnot SomeLightClientUpdateWithFinality:
didProgress = true
if self.onOptimisticHeader != nil:
self.onOptimisticHeader()
if store[].get.finalized_header != previousFinalized:
didProgress = true
if self.onFinalizedHeader != nil:
self.onFinalizedHeader()
ok didProgress
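
Design note: the `bool` in the new return type is the `didProgress` flag computed above — whether the object advanced the finalized or optimistic header. The gossip validators below translate `ok(false)` into an IGNORE ("no significant progress") and `ok(true)` into an accept, so peers only re-forward updates that actually move the local `LightClientStore`.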
# Enqueue
# ------------------------------------------------------------------------------
@ -264,10 +308,106 @@ proc addObject*(
(afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:
error "Processing light client object before genesis, clock turned back?"
error "Processing LC object before genesis, clock turned back?"
quit 1
let res = self.storeObject(src, wallTime, obj)
if resFut != nil:
resFut.complete(res)
if resfut != nil:
if res.isOk:
resfut.complete(Result[void, BlockError].ok())
else:
resfut.complete(Result[void, BlockError].err(res.error))
# Message validators
# ------------------------------------------------------------------------------
declareCounter lc_light_client_finality_updates_received,
"Number of valid LC finality updates processed by this LC"
declareCounter lc_light_client_finality_updates_dropped,
"Number of invalid LC finality updates dropped by this LC", labels = ["reason"]
declareCounter lc_light_client_optimistic_updates_received,
"Number of valid LC optimistic updates processed by this LC"
declareCounter lc_light_client_optimistic_updates_dropped,
"Number of invalid LC optimistic updates dropped by this LC", labels = ["reason"]
func toValidationError(
v: Result[bool, BlockError],
wallTime: BeaconTime): Result[void, ValidationError] =
if v.isOk:
let didProgress = v.get
if didProgress:
when v is SomeLightClientUpdate:
let
signature_slot = v.signature_slot
currentTime = wallTime + MAXIMUM_GOSSIP_CLOCK_DISPARITY
forwardTime = signature_slot.light_client_optimistic_update_time
if currentTime < forwardTime:
# [IGNORE] The `finality_update` / `optimistic_update` is received
# after the block at `signature_slot` was given enough time to
# propagate through the network.
return errIgnore(typeof(v).name & ": received too early")
ok()
else:
# [IGNORE] The `finality_update` / `optimistic_update`
# advances the `finalized_header` / `optimistic_header`
# of the local `LightClientStore`.
errIgnore(typeof(v).name & ": no significant progress")
else:
case v.error
of BlockError.Invalid:
# [REJECT] The `finality_update` / `optimistic_update` is valid.
errReject($v.error)
of BlockError.MissingParent, BlockError.UnviableFork, BlockError.Duplicate:
# [IGNORE] No other `finality_update` with a lower or equal
# `finalized_header.slot` / `attested_header.slot` was already
# forwarded on the network.
errIgnore($v.error)
# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#light_client_finality_update
proc lightClientFinalityUpdateValidator*(
self: var LightClientProcessor, src: MsgSource,
finality_update: altair.LightClientFinalityUpdate
): Result[void, ValidationError] =
logScope:
finality_update
debug "LC finality update received"
let
wallTime = self.getBeaconTime()
r = self.storeObject(src, wallTime, finality_update)
v = r.toValidationError(wallTime)
if v.isOk():
trace "LC finality update validated"
lc_light_client_finality_updates_received.inc()
else:
debug "Dropping LC finality update", error = v.error
lc_light_client_finality_updates_dropped.inc(1, [$v.error[0]])
v
# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#light_client_optimistic_update
proc lightClientOptimisticUpdateValidator*(
self: var LightClientProcessor, src: MsgSource,
optimistic_update: altair.LightClientOptimisticUpdate
): Result[void, ValidationError] =
logScope:
optimistic_update
debug "LC optimistic update received"
let
wallTime = self.getBeaconTime()
r = self.storeObject(src, wallTime, optimistic_update)
v = r.toValidationError(wallTime)
if v.isOk():
trace "LC optimistic update validated"
lc_light_client_optimistic_updates_received.inc()
else:
debug "Dropping LC optimistic update", error = v.error
lc_light_client_optimistic_updates_dropped.inc(1, [$v.error[0]])
v

View File

@ -0,0 +1,256 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
# This implements the pre-release proposal of the libp2p based light client sync
# protocol. See https://github.com/ethereum/consensus-specs/pull/2802
import
chronicles,
eth/keys,
./gossip_processing/light_client_processor,
./networking/eth2_network,
./spec/datatypes/altair,
./spec/helpers,
./sync/light_client_manager,
"."/[beacon_clock, conf_light_client]
export eth2_network, conf_light_client
logScope: topics = "lightcl"
type
LightClientCallback* =
proc(lightClient: LightClient) {.gcsafe, raises: [Defect].}
LightClient* = ref object
network: Eth2Node
cfg: RuntimeConfig
forkDigests: ref ForkDigests
getBeaconTime: GetBeaconTimeFn
store: ref Option[LightClientStore]
processor: ref LightClientProcessor
manager: LightClientManager
gossipState: GossipState
onFinalizedHeader*, onOptimisticHeader*: LightClientCallback
trustedBlockRoot*: Option[Eth2Digest]
func finalizedHeader*(lightClient: LightClient): Opt[BeaconBlockHeader] =
if lightClient.store[].isSome:
ok lightClient.store[].get.finalized_header
else:
err()
func optimisticHeader*(lightClient: LightClient): Opt[BeaconBlockHeader] =
if lightClient.store[].isSome:
ok lightClient.store[].get.optimistic_header
else:
err()
proc createLightClient(
network: Eth2Node,
rng: ref BrHmacDrbgContext,
dumpEnabled: bool,
dumpDirInvalid, dumpDirIncoming: string,
cfg: RuntimeConfig,
forkDigests: ref ForkDigests,
getBeaconTime: GetBeaconTimeFn,
genesis_validators_root: Eth2Digest
): LightClient =
let lightClient = LightClient(
network: network,
cfg: cfg,
forkDigests: forkDigests,
getBeaconTime: getBeaconTime,
store: (ref Option[LightClientStore])())
func getTrustedBlockRoot(): Option[Eth2Digest] =
lightClient.trustedBlockRoot
proc onStoreInitialized() =
discard
proc onFinalizedHeader() =
if lightClient.onFinalizedHeader != nil:
lightClient.onFinalizedHeader(lightClient)
proc onOptimisticHeader() =
if lightClient.onOptimisticHeader != nil:
lightClient.onOptimisticHeader(lightClient)
lightClient.processor = LightClientProcessor.new(
dumpEnabled, dumpDirInvalid, dumpDirIncoming,
cfg, genesis_validators_root,
lightClient.store, getBeaconTime, getTrustedBlockRoot,
onStoreInitialized, onFinalizedHeader, onOptimisticHeader)
proc lightClientVerifier(obj: SomeLightClientObject):
Future[Result[void, BlockError]] =
let resfut = newFuture[Result[void, BlockError]]("lightClientVerifier")
lightClient.processor[].addObject(MsgSource.gossip, obj, resfut)
resfut
proc bootstrapVerifier(obj: altair.LightClientBootstrap): auto =
lightClientVerifier(obj)
proc updateVerifier(obj: altair.LightClientUpdate): auto =
lightClientVerifier(obj)
proc finalityVerifier(obj: altair.LightClientFinalityUpdate): auto =
lightClientVerifier(obj)
proc optimisticVerifier(obj: altair.LightClientOptimisticUpdate): auto =
lightClientVerifier(obj)
func isLightClientStoreInitialized(): bool =
lightClient.store[].isSome
func isNextSyncCommitteeKnown(): bool =
if lightClient.store[].isSome:
lightClient.store[].get.is_next_sync_committee_known
else:
false
func getFinalizedPeriod(): SyncCommitteePeriod =
if lightClient.store[].isSome:
lightClient.store[].get.finalized_header.slot.sync_committee_period
else:
GENESIS_SLOT.sync_committee_period
func getOptimisticPeriod(): SyncCommitteePeriod =
if lightClient.store[].isSome:
lightClient.store[].get.optimistic_header.slot.sync_committee_period
else:
GENESIS_SLOT.sync_committee_period
lightClient.manager = LightClientManager.init(
lightClient.network, rng, getTrustedBlockRoot,
bootstrapVerifier, updateVerifier, finalityVerifier, optimisticVerifier,
isLightClientStoreInitialized, isNextSyncCommitteeKnown,
getFinalizedPeriod, getOptimisticPeriod, getBeaconTime)
lightClient.gossipState = {}
lightClient
proc createLightClient*(
network: Eth2Node,
rng: ref BrHmacDrbgContext,
config: BeaconNodeConf,
cfg: RuntimeConfig,
forkDigests: ref ForkDigests,
getBeaconTime: GetBeaconTimeFn,
genesis_validators_root: Eth2Digest
): LightClient =
createLightClient(
network, rng,
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
cfg, forkDigests, getBeaconTime, genesis_validators_root)
proc createLightClient*(
network: Eth2Node,
rng: ref BrHmacDrbgContext,
config: LightClientConf,
cfg: RuntimeConfig,
forkDigests: ref ForkDigests,
getBeaconTime: GetBeaconTimeFn,
genesis_validators_root: Eth2Digest
): LightClient =
createLightClient(
network, rng,
dumpEnabled = false, dumpDirInvalid = ".", dumpDirIncoming = ".",
cfg, forkDigests, getBeaconTime, genesis_validators_root)
proc start*(lightClient: LightClient) =
notice "Starting light client",
trusted_block_root = lightClient.trustedBlockRoot
lightClient.manager.start()
from
libp2p/protocols/pubsub/gossipsub
import
TopicParams, validateParameters, init
proc installMessageValidators*(lightClient: LightClient) =
template getLocalWallPeriod(): auto =
lightClient.getBeaconTime().slotOrZero().sync_committee_period
let forkDigests = lightClient.forkDigests
for digest in [forkDigests.altair, forkDigests.bellatrix]:
lightClient.network.addValidator(
getLightClientFinalityUpdateTopic(digest),
proc(msg: altair.LightClientFinalityUpdate): ValidationResult =
if lightClient.manager.isGossipSupported(getLocalWallPeriod()):
toValidationResult(
lightClient.processor[].lightClientFinalityUpdateValidator(
MsgSource.gossip, msg))
else:
ValidationResult.Ignore)
lightClient.network.addValidator(
getLightClientOptimisticUpdateTopic(digest),
proc(msg: altair.LightClientOptimisticUpdate): ValidationResult =
if lightClient.manager.isGossipSupported(getLocalWallPeriod()):
toValidationResult(
lightClient.processor[].lightClientOptimisticUpdateValidator(
MsgSource.gossip, msg))
else:
ValidationResult.Ignore)
const lightClientTopicParams = TopicParams.init()
static: lightClientTopicParams.validateParameters().tryGet()
proc updateGossipStatus*(
lightClient: LightClient, slot: Slot, dagIsBehind = default(Option[bool])) =
let
isBehind =
if lightClient.manager.isGossipSupported(slot.sync_committee_period):
false
elif dagIsBehind.isSome:
# While separate message validators can be installed for both
# full node and light client (both are called unless one rejects msg),
# only a single subscription can be installed per topic for now.
# The full node subscription is also handled here, even though it
# does not directly relate to the client side of the LC sync protocol
dagIsBehind.get
else:
true # Force `targetGossipState` to `{}`
targetGossipState =
getTargetGossipState(
slot.epoch,
lightClient.cfg.ALTAIR_FORK_EPOCH,
lightClient.cfg.BELLATRIX_FORK_EPOCH,
isBehind)
template currentGossipState(): auto = lightClient.gossipState
if currentGossipState == targetGossipState:
return
if currentGossipState.card == 0 and targetGossipState.card > 0:
debug "Enabling LC topic subscriptions",
wallSlot = slot, targetGossipState
elif currentGossipState.card > 0 and targetGossipState.card == 0:
debug "Disabling LC topic subscriptions",
wallSlot = slot
else:
# Individual forks added / removed
discard
let
newGossipForks = targetGossipState - currentGossipState
oldGossipForks = currentGossipState - targetGossipState
for gossipFork in oldGossipForks:
if gossipFork >= BeaconStateFork.Altair:
let forkDigest = lightClient.forkDigests[].atStateFork(gossipFork)
lightClient.network.unsubscribe(
getLightClientFinalityUpdateTopic(forkDigest))
lightClient.network.unsubscribe(
getLightClientOptimisticUpdateTopic(forkDigest))
for gossipFork in newGossipForks:
if gossipFork >= BeaconStateFork.Altair:
let forkDigest = lightClient.forkDigests[].atStateFork(gossipFork)
lightClient.network.subscribe(
getLightClientFinalityUpdateTopic(forkDigest),
lightClientTopicParams)
lightClient.network.subscribe(
getLightClientOptimisticUpdateTopic(forkDigest),
lightClientTopicParams)
lightClient.gossipState = targetGossipState

View File

@ -11,7 +11,7 @@ import
std/[os, strutils],
chronicles, stew/shims/net, stew/results, bearssl,
eth/keys, eth/p2p/discoveryv5/[enr, protocol, node],
../conf
".."/[conf, conf_light_client]
export protocol, keys
@ -77,7 +77,7 @@ proc loadBootstrapFile*(bootstrapFile: string,
quit 1
proc new*(T: type Eth2DiscoveryProtocol,
config: BeaconNodeConf,
config: BeaconNodeConf | LightClientConf,
enrIp: Option[ValidIpAddress], enrTcpPort, enrUdpPort: Option[Port],
pk: PrivateKey,
enrFields: openArray[(string, seq[byte])], rng: ref BrHmacDrbgContext):
@ -91,9 +91,10 @@ proc new*(T: type Eth2DiscoveryProtocol,
addBootstrapNode(node, bootstrapEnrs)
loadBootstrapFile(string config.bootstrapNodesFile, bootstrapEnrs)
let persistentBootstrapFile = config.dataDir / "bootstrap_nodes.txt"
if fileExists(persistentBootstrapFile):
loadBootstrapFile(persistentBootstrapFile, bootstrapEnrs)
when config is BeaconNodeConf:
let persistentBootstrapFile = config.dataDir / "bootstrap_nodes.txt"
if fileExists(persistentBootstrapFile):
loadBootstrapFile(persistentBootstrapFile, bootstrapEnrs)
newProtocol(pk, enrIp, enrTcpPort, enrUdpPort, enrFields, bootstrapEnrs,
bindPort = config.udpPort, bindIp = config.listenAddress,

View File

@ -25,7 +25,7 @@ import
libp2p/stream/connection,
eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl,
eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2],
".."/[version, conf, beacon_clock],
".."/[version, conf, beacon_clock, conf_light_client],
../spec/datatypes/[phase0, altair, bellatrix],
../spec/[eth2_ssz_serialization, network, helpers, forks],
../validators/keystore_management,
@ -35,7 +35,7 @@ when chronicles.enabledLogLevel == LogLevel.TRACE:
import std/sequtils
export
tables, version, multiaddress, peerinfo, p2pProtocol, connection,
tables, chronos, version, multiaddress, peerinfo, p2pProtocol, connection,
libp2p_json_serialization, eth2_ssz_serialization, results, eth2_discovery,
peer_pool, peer_scores
@ -140,7 +140,7 @@ type
# Private fields:
libp2pCodecName: string
protocolMounter*: MounterProc
isLightClientRequest: bool
isRequired, isLightClientRequest: bool
ProtocolInfoObj* = object
name*: string
@ -311,6 +311,7 @@ const
NetworkInsecureKeyPassword = "INSECUREPASSWORD"
template libp2pProtocol*(name: string, version: int,
isRequired = false,
isLightClientRequest = false) {.pragma.}
func shortLog*(peer: Peer): string = shortLog(peer.peerId)
@ -498,6 +499,38 @@ proc getRequestProtoName(fn: NimNode): NimNode =
return newLit("")
proc isRequiredProto(fn: NimNode): NimNode =
# `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
# (TODO: file as an issue)
let pragmas = fn.pragma
if pragmas.kind == nnkPragma and pragmas.len > 0:
for pragma in pragmas:
try:
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
if pragma.len <= 3:
return newLit(false)
for i in 3 ..< pragma.len:
let param = pragma[i]
case param.kind
of nnkExprEqExpr:
if $param[0] == "isRequired":
if $param[1] == "true":
return newLit(true)
if $param[1] == "false":
return newLit(false)
raiseAssert "Unexpected value: " & $param
if $param[0] != "isLightClientRequest":
raiseAssert "Unexpected param: " & $param
of nnkIdent:
if i == 3:
return newLit(param.boolVal)
else: raiseAssert "Unexpected kind: " & param.kind.repr
return newLit(false)
except Exception as exc: raiseAssert exc.msg # TODO https://github.com/nim-lang/Nim/issues/17454
return newLit(false)
proc isLightClientRequestProto(fn: NimNode): NimNode =
# `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
# (TODO: file as an issue)
@ -509,19 +542,23 @@ proc isLightClientRequestProto(fn: NimNode): NimNode =
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
if pragma.len <= 3:
return newLit(false)
let param = pragma[3]
case param.kind
of nnkExprEqExpr:
if $param[0] != "isLightClientRequest":
raiseAssert "Unexpected param: " & $param
if $param[1] == "true":
return newLit(true)
if $param[1] == "false":
return newLit(false)
raiseAssert "Unexpected value: " & $param
of nnkIdent:
return newLit(param.boolVal)
else: raiseAssert "Unexpected kind: " & param.kind.repr
for i in 3 ..< pragma.len:
let param = pragma[i]
case param.kind
of nnkExprEqExpr:
if $param[0] == "isLightClientRequest":
if $param[1] == "true":
return newLit(true)
if $param[1] == "false":
return newLit(false)
raiseAssert "Unexpected value: " & $param
if $param[0] != "isRequired":
raiseAssert "Unexpected param: " & $param
of nnkIdent:
if i == 4:
return newLit(param.boolVal)
else: raiseAssert "Unexpected kind: " & param.kind.repr
return newLit(false)
except Exception as exc: raiseAssert exc.msg # TODO https://github.com/nim-lang/Nim/issues/17454
return newLit(false)
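
Both parsers accept their flag either by name or positionally (pragma position 3 for `isRequired`, 4 for `isLightClientRequest`), so a request proc can carry an annotation of the form `{.libp2pProtocol("light_client_bootstrap", 1, isRequired = true, isLightClientRequest = true).}` — shown here for illustration only; the actual annotations live in `sync_protocol.nim`.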
@ -1614,20 +1651,22 @@ proc onConnEvent(node: Eth2Node, peerId: PeerId, event: ConnEvent) {.async.} =
peer = peerId, peer_state = peer.connectionState
peer.connectionState = Disconnected
proc new*(T: type Eth2Node, config: BeaconNodeConf, runtimeCfg: RuntimeConfig,
enrForkId: ENRForkID, discoveryForkId: ENRForkID, forkDigests: ref ForkDigests,
getBeaconTime: GetBeaconTimeFn, switch: Switch,
pubsub: GossipSub, ip: Option[ValidIpAddress], tcpPort,
udpPort: Option[Port], privKey: keys.PrivateKey, discovery: bool,
proc new*(T: type Eth2Node,
config: BeaconNodeConf | LightClientConf, runtimeCfg: RuntimeConfig,
enrForkId: ENRForkID, discoveryForkId: ENRForkID,
forkDigests: ref ForkDigests, getBeaconTime: GetBeaconTimeFn,
switch: Switch, pubsub: GossipSub,
ip: Option[ValidIpAddress], tcpPort, udpPort: Option[Port],
privKey: keys.PrivateKey, discovery: bool,
rng: ref BrHmacDrbgContext): T {.raises: [Defect, CatchableError].} =
when not defined(local_testnet):
let
connectTimeout = 1.minutes
seenThreshold = 5.minutes
connectTimeout = chronos.minutes(1)
seenThreshold = chronos.minutes(5)
else:
let
connectTimeout = 10.seconds
seenThreshold = 10.seconds
connectTimeout = chronos.seconds(10)
seenThreshold = chronos.seconds(10)
type MetaData = altair.MetaData # Weird bug without this..
# Versions up to v22.3.0 would write an empty `MetaData` to
@ -1669,12 +1708,15 @@ proc new*(T: type Eth2Node, config: BeaconNodeConf, runtimeCfg: RuntimeConfig,
node.protocolStates[proto.index] = proto.networkStateInitializer(node)
for msg in proto.messages:
if msg.isLightClientRequest and not config.serveLightClientData.get:
continue
when config is BeaconNodeConf:
if msg.isLightClientRequest and not config.serveLightClientData.get:
continue
elif config is LightClientConf:
if not msg.isRequired:
continue
if msg.protocolMounter != nil:
msg.protocolMounter node
proc peerHook(peerId: PeerId, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(node, peerId, event)
@ -1790,10 +1832,11 @@ proc registerMsg(protocol: ProtocolInfo,
name: string,
mounter: MounterProc,
libp2pCodecName: string,
isLightClientRequest: bool) =
isRequired, isLightClientRequest: bool) =
protocol.messages.add MessageInfo(name: name,
protocolMounter: mounter,
libp2pCodecName: libp2pCodecName,
isRequired: isRequired,
isLightClientRequest: isLightClientRequest)
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
@ -1833,6 +1876,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
MsgRecName = msg.recName
MsgStrongRecName = msg.strongRecName
codecNameLit = getRequestProtoName(msg.procDef)
isRequiredLit = isRequiredProto(msg.procDef)
isLightClientRequestLit = isLightClientRequestProto(msg.procDef)
protocolMounterName = ident(msgName & "Mounter")
@ -1910,6 +1954,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
msgNameLit,
protocolMounterName,
codecNameLit,
isRequiredLit,
isLightClientRequestLit))
result.implementProtocolInit = proc (p: P2PProtocol): NimNode =
@ -1917,6 +1962,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
#Must import here because of cyclicity
import ../sync/sync_protocol
export sync_protocol
proc updatePeerMetadata(node: Eth2Node, peerId: PeerId) {.async.} =
trace "updating peer metadata", peerId
@ -2004,10 +2050,21 @@ proc initAddress*(T: type MultiAddress, str: string): T =
template tcpEndPoint(address, port): auto =
MultiAddress.init(address, tcpProtocol, port)
proc optimisticgetRandomNetKeys*(rng: var BrHmacDrbgContext): NetKeyPair =
let res = PrivateKey.random(Secp256k1, rng)
if res.isErr():
fatal "Could not generate random network key file"
quit QuitFailure
let
privKey = res.get()
pubKey = privKey.getPublicKey().expect("working public key from random")
NetKeyPair(seckey: privKey, pubkey: pubKey)
proc getPersistentNetKeys*(rng: var BrHmacDrbgContext,
config: BeaconNodeConf): NetKeyPair =
case config.cmd
of noCommand, record:
of BNStartUpCmd.noCommand, BNStartUpCmd.record:
if config.netKeyFile == "random":
let res = PrivateKey.random(Secp256k1, rng)
if res.isErr():
@ -2078,7 +2135,7 @@ proc getPersistentNetKeys*(rng: var BrHmacDrbgContext,
network_public_key = pubKey
NetKeyPair(seckey: privKey, pubkey: pubKey)
of createTestnet:
of BNStartUpCmd.createTestnet:
if config.netKeyFile == "random":
fatal "Could not create testnet using `random` network key"
quit QuitFailure
@ -2115,15 +2172,7 @@ proc getPersistentNetKeys*(rng: var BrHmacDrbgContext,
NetKeyPair(seckey: privKey, pubkey: pubKey)
else:
let res = PrivateKey.random(Secp256k1, rng)
if res.isErr():
fatal "Could not generate random network key file"
quit QuitFailure
let
privKey = res.get()
pubKey = privKey.getPublicKey().expect("working public key from random")
NetKeyPair(seckey: privKey, pubkey: pubKey)
optimisticgetRandomNetKeys(rng)
func gossipId(
data: openArray[byte], altairPrefix, topic: string): seq[byte] =
@ -2143,8 +2192,8 @@ func gossipId(
messageDigest.data[0..19]
proc newBeaconSwitch*(config: BeaconNodeConf, seckey: PrivateKey,
address: MultiAddress,
proc newBeaconSwitch*(config: BeaconNodeConf | LightClientConf,
seckey: PrivateKey, address: MultiAddress,
rng: ref BrHmacDrbgContext): Switch {.raises: [Defect, CatchableError].} =
SwitchBuilder
.new()
@ -2152,7 +2201,7 @@ proc newBeaconSwitch*(config: BeaconNodeConf, seckey: PrivateKey,
.withAddress(address)
.withRng(rng)
.withNoise()
.withMplex(5.minutes, 5.minutes)
.withMplex(chronos.minutes(5), chronos.minutes(5))
.withMaxConnections(config.maxPeers)
.withAgentVersion(config.agentString)
.withTcpTransport({ServerFlags.ReuseAddr})
@ -2181,7 +2230,7 @@ template gossipMaxSize(T: untyped): uint32 =
maxSize.uint32
proc createEth2Node*(rng: ref BrHmacDrbgContext,
config: BeaconNodeConf,
config: BeaconNodeConf | LightClientConf,
netKeys: NetKeyPair,
cfg: RuntimeConfig,
forkDigests: ref ForkDigests,
@ -2230,8 +2279,8 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext,
let
params = GossipSubParams(
explicit: true,
pruneBackoff: 1.minutes,
unsubscribeBackoff: 10.seconds,
pruneBackoff: chronos.minutes(1),
unsubscribeBackoff: chronos.seconds(10),
floodPublish: true,
gossipFactor: 0.05,
d: 8,
@ -2240,18 +2289,18 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext,
dScore: 6,
dOut: 6 div 2, # less than dlow and no more than dlow/2
dLazy: 6,
heartbeatInterval: 700.milliseconds,
heartbeatInterval: chronos.milliseconds(700),
historyLength: 6,
historyGossip: 3,
fanoutTTL: 60.seconds,
seenTTL: 385.seconds,
fanoutTTL: chronos.seconds(60),
seenTTL: chronos.seconds(385),
gossipThreshold: -4000,
publishThreshold: -8000,
graylistThreshold: -16000, # also disconnect threshold
opportunisticGraftThreshold: 0,
decayInterval: 12.seconds,
decayInterval: chronos.seconds(12),
decayToZero: 0.01,
retainScore: 385.seconds,
retainScore: chronos.seconds(385),
appSpecificWeight: 0.0,
ipColocationFactorWeight: -53.75,
ipColocationFactorThreshold: 3.0,
@ -2488,10 +2537,7 @@ proc updateForkId*(node: Eth2Node, epoch: Epoch, genesis_validators_root: Eth2Di
node.discoveryForkId = getDiscoveryForkID(node.cfg, epoch, genesis_validators_root)
func forkDigestAtEpoch(node: Eth2Node, epoch: Epoch): ForkDigest =
case node.cfg.stateForkAtEpoch(epoch)
of BeaconStateFork.Bellatrix: node.forkDigests.bellatrix
of BeaconStateFork.Altair: node.forkDigests.altair
of BeaconStateFork.Phase0: node.forkDigests.phase0
node.forkDigests[].atEpoch(epoch, node.cfg)
proc getWallEpoch(node: Eth2Node): Epoch =
node.getBeaconTime().slotOrZero.epoch

View File

@ -543,7 +543,8 @@ proc acquire*[A, B](pool: PeerPool[A, B],
proc acquireNoWait*[A, B](pool: PeerPool[A, B],
filter = {PeerType.Incoming,
PeerType.Outgoing}): A =
PeerType.Outgoing}
): A {.raises: [PeerPoolError, Defect].} =
doAssert(filter != {}, "Filter must not be empty")
if pool.lenAvailable(filter) < 1:
raise newException(PeerPoolError, "Not enough peers in pool")

View File

@ -334,9 +334,6 @@ proc initFullNode(
const SlashingDbName = "slashing_protection"
# changing this requires physical file rename as well or history is lost.
func getBeaconTimeFn(clock: BeaconClock): GetBeaconTimeFn =
return proc(): BeaconTime = clock.now()
proc init*(T: type BeaconNode,
cfg: RuntimeConfig,
rng: ref BrHmacDrbgContext,
@ -1308,9 +1305,6 @@ proc installMessageValidators(node: BeaconNode) =
# subnets are subscribed to during any given epoch.
let forkDigests = node.dag.forkDigests
func toValidationResult(res: ValidationRes): ValidationResult =
if res.isOk(): ValidationResult.Accept else: res.error()[0]
node.network.addValidator(
getBeaconBlocksTopic(forkDigests.phase0),
proc (signedBlock: phase0.SignedBeaconBlock): ValidationResult =

View File

@ -0,0 +1,99 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
# This implements the pre-release proposal of the libp2p based light client sync
# protocol. See https://github.com/ethereum/consensus-specs/pull/2802
import
std/os,
chronicles, chronos,
eth/keys,
./spec/beaconstate,
"."/[light_client, nimbus_binary_common, version]
proc onFinalizedHeader(lightClient: LightClient) =
notice "New LC finalized header",
finalized_header = shortLog(lightClient.finalizedHeader.get)
proc onOptimisticHeader(lightClient: LightClient) =
notice "New LC optimistic header",
optimistic_header = shortLog(lightClient.optimisticHeader.get)
proc onSecond(
lightClient: LightClient,
config: LightClientConf,
getBeaconTime: GetBeaconTimeFn) =
## This procedure will be called once per second.
let wallSlot = getBeaconTime().slotOrZero()
checkIfShouldStopAtEpoch(wallSlot, config.stopAtEpoch)
lightClient.updateGossipStatus(wallSlot + 1)
proc runOnSecondLoop(
lightClient: LightClient,
config: LightClientConf,
getBeaconTime: GetBeaconTimeFn) {.async.} =
while true:
onSecond(lightClient, config, getBeaconTime)
await chronos.sleepAsync(chronos.seconds(1))
programMain:
var config = makeBannerAndConfig(
"Nimbus light client " & fullVersionStr, LightClientConf)
setupLogging(config.logLevel, config.logStdout, config.logFile)
notice "Launching light client",
version = fullVersionStr, cmdParams = commandLineParams(), config
let metadata = loadEth2Network(config.eth2Network)
for node in metadata.bootstrapNodes:
config.bootstrapNodes.add node
template cfg(): auto = metadata.cfg
let
genesisState =
try:
template genesisData(): auto = metadata.genesisData
newClone(readSszForkedHashedBeaconState(
cfg, genesisData.toOpenArrayByte(genesisData.low, genesisData.high)))
except CatchableError as err:
raiseAssert "Invalid baked-in state: " & err.msg
beaconClock = BeaconClock.init(
getStateField(genesisState[], genesis_time))
getBeaconTime = beaconClock.getBeaconTimeFn()
genesis_validators_root =
getStateField(genesisState[], genesis_validators_root)
forkDigests = newClone ForkDigests.init(cfg, genesis_validators_root)
genesisBlockRoot = get_initial_beacon_block(genesisState[]).root
rng = keys.newRng()
netKeys = optimisticgetRandomNetKeys(rng[])
network = createEth2Node(
rng, config, netKeys, cfg,
forkDigests, getBeaconTime, genesis_validators_root)
lightClient = createLightClient(
network, rng, config, cfg,
forkDigests, getBeaconTime, genesis_validators_root)
info "Listening to incoming network requests"
network.initBeaconSync(cfg, forkDigests, genesisBlockRoot, getBeaconTime)
lightClient.installMessageValidators()
waitFor network.startListening()
waitFor network.start()
lightClient.onFinalizedHeader = onFinalizedHeader
lightClient.onOptimisticHeader = onOptimisticHeader
lightClient.trustedBlockRoot = some config.trustedBlockRoot
lightClient.start()
asyncSpawn runOnSecondLoop(lightClient, config, getBeaconTime)
while true:
poll()
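
A minimal invocation, using the options from `conf_light_client.nim` (placeholder root to be replaced): `build/nimbus_light_client --network=prater --trusted-block-root=<recent-finalized-root>`. Once a peer serves the matching bootstrap, the `onFinalizedHeader` and `onOptimisticHeader` callbacks above emit the "New LC finalized header" / "New LC optimistic header" notices.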

View File

@ -0,0 +1,7 @@
-d:"chronicles_sinks=textlines[dynamic],json[dynamic]"
-d:"chronicles_runtime_filtering=on"
-d:"chronicles_disable_thread_id"
@if release:
-d:"chronicles_line_numbers:0"
@end

View File

@ -351,6 +351,20 @@ func stateForkForDigest*(
else:
err()
func atStateFork*(
forkDigests: ForkDigests, stateFork: BeaconStateFork): ForkDigest =
case stateFork
of BeaconStateFork.Bellatrix:
forkDigests.bellatrix
of BeaconStateFork.Altair:
forkDigests.altair
of BeaconStateFork.Phase0:
forkDigests.phase0
template atEpoch*(
forkDigests: ForkDigests, epoch: Epoch, cfg: RuntimeConfig): ForkDigest =
forkDigests.atStateFork(cfg.stateForkAtEpoch(epoch))
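
A small usage sketch of the new helpers (assumed context: a `ForkDigests` value, a `RuntimeConfig` and an `epoch` in scope); `atEpoch` simply composes `atStateFork` with `cfg.stateForkAtEpoch`:

let digest = forkDigests.atEpoch(epoch, cfg)
doAssert digest == forkDigests.atStateFork(cfg.stateForkAtEpoch(epoch))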
template asSigned*(x: ForkedTrustedSignedBeaconBlock): ForkedSignedBeaconBlock =
isomorphicCast[ForkedSignedBeaconBlock](x)

View File

@ -138,9 +138,10 @@ func getDiscoveryForkID*(cfg: RuntimeConfig,
next_fork_epoch: FAR_FUTURE_EPOCH)
# https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/altair/p2p-interface.md#transitioning-the-gossip
type GossipState* = set[BeaconStateFork]
func getTargetGossipState*(
epoch, ALTAIR_FORK_EPOCH, BELLATRIX_FORK_EPOCH: Epoch, isBehind: bool):
set[BeaconStateFork] =
GossipState =
if isBehind:
{}

View File

@ -0,0 +1,454 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
# This implements the pre-release proposal of the libp2p based light client sync
# protocol. See https://github.com/ethereum/consensus-specs/pull/2802
import chronos, chronicles, stew/base10
import
eth/p2p/discoveryv5/random2,
../spec/datatypes/[altair],
../networking/eth2_network,
../beacon_clock,
"."/sync_protocol, "."/sync_manager
export sync_manager
logScope:
topics = "lcman"
type
Nothing = object
ResponseError = object of CatchableError
Endpoint[K, V] =
(K, V) # https://github.com/nim-lang/Nim/issues/19531
Bootstrap =
Endpoint[Eth2Digest, altair.LightClientBootstrap]
UpdatesByRange =
Endpoint[Slice[SyncCommitteePeriod], altair.LightClientUpdate]
FinalityUpdate =
Endpoint[Nothing, altair.LightClientFinalityUpdate]
OptimisticUpdate =
Endpoint[Nothing, altair.LightClientOptimisticUpdate]
ValueVerifier[V] =
proc(v: V): Future[Result[void, BlockError]] {.gcsafe, raises: [Defect].}
BootstrapVerifier* =
ValueVerifier[altair.LightClientBootstrap]
UpdateVerifier* =
ValueVerifier[altair.LightClientUpdate]
FinalityUpdateVerifier* =
ValueVerifier[altair.LightClientFinalityUpdate]
OptimisticUpdateVerifier* =
ValueVerifier[altair.LightClientOptimisticUpdate]
GetTrustedBlockRootCallback* =
proc(): Option[Eth2Digest] {.gcsafe, raises: [Defect].}
GetBoolCallback* =
proc(): bool {.gcsafe, raises: [Defect].}
GetSyncCommitteePeriodCallback* =
proc(): SyncCommitteePeriod {.gcsafe, raises: [Defect].}
LightClientManager* = object
network: Eth2Node
rng: ref BrHmacDrbgContext
getTrustedBlockRoot: GetTrustedBlockRootCallback
bootstrapVerifier: BootstrapVerifier
updateVerifier: UpdateVerifier
finalityUpdateVerifier: FinalityUpdateVerifier
optimisticUpdateVerifier: OptimisticUpdateVerifier
isLightClientStoreInitialized: GetBoolCallback
isNextSyncCommitteeKnown: GetBoolCallback
getFinalizedPeriod: GetSyncCommitteePeriodCallback
getOptimisticPeriod: GetSyncCommitteePeriodCallback
getBeaconTime: GetBeaconTimeFn
loopFuture: Future[void]
func init*(
T: type LightClientManager,
network: Eth2Node,
rng: ref BrHmacDrbgContext,
getTrustedBlockRoot: GetTrustedBlockRootCallback,
bootstrapVerifier: BootstrapVerifier,
updateVerifier: UpdateVerifier,
finalityUpdateVerifier: FinalityUpdateVerifier,
optimisticUpdateVerifier: OptimisticUpdateVerifier,
isLightClientStoreInitialized: GetBoolCallback,
isNextSyncCommitteeKnown: GetBoolCallback,
getFinalizedPeriod: GetSyncCommitteePeriodCallback,
getOptimisticPeriod: GetSyncCommitteePeriodCallback,
getBeaconTime: GetBeaconTimeFn
): LightClientManager =
## Initialize light client manager.
LightClientManager(
network: network,
rng: rng,
getTrustedBlockRoot: getTrustedBlockRoot,
bootstrapVerifier: bootstrapVerifier,
updateVerifier: updateVerifier,
finalityUpdateVerifier: finalityUpdateVerifier,
optimisticUpdateVerifier: optimisticUpdateVerifier,
isLightClientStoreInitialized: isLightClientStoreInitialized,
isNextSyncCommitteeKnown: isNextSyncCommitteeKnown,
getFinalizedPeriod: getFinalizedPeriod,
getOptimisticPeriod: getOptimisticPeriod,
getBeaconTime: getBeaconTime
)
proc isGossipSupported*(
self: LightClientManager,
period: SyncCommitteePeriod
): bool =
## Indicate whether the light client is sufficiently synced to accept gossip.
if not self.isLightClientStoreInitialized():
return false
let
finalizedPeriod = self.getFinalizedPeriod()
isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()
if isNextSyncCommitteeKnown:
period <= finalizedPeriod + 1
else:
period <= finalizedPeriod
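
For example, with a finalized period of 100, gossip is accepted for periods up to 101 while the next sync committee is already known, but only up to 100 otherwise, since signatures by a sync committee the client does not yet know could not be validated.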
# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#getlightclientbootstrap
proc doRequest(
e: typedesc[Bootstrap],
peer: Peer,
blockRoot: Eth2Digest
): Future[NetRes[altair.LightClientBootstrap]] {.
raises: [Defect, IOError].} =
peer.lightClientBootstrap(blockRoot)
# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#lightclientupdatesbyrange
type LightClientUpdatesByRangeResponse = NetRes[seq[altair.LightClientUpdate]]
proc doRequest(
e: typedesc[UpdatesByRange],
peer: Peer,
periods: Slice[SyncCommitteePeriod]
): Future[LightClientUpdatesByRangeResponse] {.
async, raises: [Defect, IOError].} =
let
startPeriod = periods.a
lastPeriod = periods.b
reqCount = min(periods.len, MAX_REQUEST_LIGHT_CLIENT_UPDATES).uint64
let response = await peer.lightClientUpdatesByRange(startPeriod, reqCount)
if response.isOk:
if response.get.lenu64 > reqCount:
raise newException(ResponseError, "Too many values in response" &
" (" & Base10.toString(response.get.lenu64) &
" > " & Base10.toString(reqCount.uint) & ")")
var expectedPeriod = startPeriod
for update in response.get:
let
attestedPeriod = update.attested_header.slot.sync_committee_period
signaturePeriod = update.signature_slot.sync_committee_period
if attestedPeriod != update.signature_slot.sync_committee_period:
raise newException(ResponseError, "Conflicting sync committee periods" &
" (signature: " & Base10.toString(distinctBase(signaturePeriod)) &
" != " & Base10.toString(distinctBase(attestedPeriod)) & ")")
if attestedPeriod < expectedPeriod:
raise newException(ResponseError, "Unexpected sync committee period" &
" (" & Base10.toString(distinctBase(attestedPeriod)) &
" < " & Base10.toString(distinctBase(expectedPeriod)) & ")")
if attestedPeriod > expectedPeriod:
if attestedPeriod > lastPeriod:
raise newException(ResponseError, "Sync committee period too high" &
" (" & Base10.toString(distinctBase(attestedPeriod)) &
" > " & Base10.toString(distinctBase(lastPeriod)) & ")")
expectedPeriod = attestedPeriod
inc expectedPeriod
return response
# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#getlightclientfinalityupdate
proc doRequest(
e: typedesc[FinalityUpdate],
peer: Peer
): Future[NetRes[altair.LightClientFinalityUpdate]] {.
raises: [Defect, IOError].} =
peer.lightClientFinalityUpdate()
# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#getlightclientoptimisticupdate
proc doRequest(
e: typedesc[OptimisticUpdate],
peer: Peer
): Future[NetRes[altair.LightClientOptimisticUpdate]] {.
raises: [Defect, IOError].} =
peer.lightClientOptimisticUpdate()
template valueVerifier[E](
self: LightClientManager,
e: typedesc[E]
): ValueVerifier[E.V] =
when E.V is altair.LightClientBootstrap:
self.bootstrapVerifier
elif E.V is altair.LightClientUpdate:
self.updateVerifier
elif E.V is altair.LightClientFinalityUpdate:
self.finalityUpdateVerifier
elif E.V is altair.LightClientOptimisticUpdate:
self.optimisticUpdateVerifier
else: static: doAssert false
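
The `Endpoint` tuple alias exists only so that `E.K` and `E.V` can name each request's key and value types (the tuple form works around the Nim issue linked above); together with the `doRequest` overloads and this `valueVerifier` template, the `workerTask` below is written once, generically over all four endpoints.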
iterator values(v: auto): auto =
## Local helper for `workerTask` to share the same implementation for both
## scalar and aggregate values, by treating scalars as 1-length aggregates.
when v is seq:
for i in v:
yield i
else:
yield v
proc workerTask[E](
self: LightClientManager,
e: typedesc[E],
key: E.K
): Future[bool] {.async.} =
var
peer: Peer
didProgress = false
try:
peer = self.network.peerPool.acquireNoWait()
let value =
when E.K is Nothing:
await E.doRequest(peer)
else:
await E.doRequest(peer, key)
if value.isOk:
var applyReward = false
for val in value.get.values:
let res = await self.valueVerifier(E)(val)
if res.isErr:
case res.error
of BlockError.MissingParent:
# Stop, requires different request to progress
return didProgress
of BlockError.Duplicate:
# Ignore, a concurrent request may have already fulfilled this
when E.V is altair.LightClientBootstrap:
didProgress = true
else:
discard
of BlockError.UnviableFork:
# Descore, peer is on an incompatible fork version
notice "Received value from an unviable fork", value = val.shortLog,
endpoint = E.name, peer, peer_score = peer.getScore()
peer.updateScore(PeerScoreUnviableFork)
return didProgress
of BlockError.Invalid:
# Descore, received data is malformed
warn "Received invalid value", value = val.shortLog,
endpoint = E.name, peer, peer_score = peer.getScore()
peer.updateScore(PeerScoreBadBlocks)
return didProgress
else:
# Reward, peer returned something useful
applyReward = true
didProgress = true
if applyReward:
peer.updateScore(PeerScoreGoodBlocks)
else:
peer.updateScore(PeerScoreNoBlocks)
debug "Failed to receive value on request", value,
endpoint = E.name, peer, peer_score = peer.getScore()
except ResponseError as exc:
warn "Received invalid response", error = exc.msg,
endpoint = E.name, peer, peer_score = peer.getScore()
peer.updateScore(PeerScoreBadBlocks)
except CancelledError as exc:
raise exc
except PeerPoolError as exc:
debug "Failed to acquire peer", exc = exc.msg
except CatchableError as exc:
if peer != nil:
peer.updateScore(PeerScoreNoBlocks)
debug "Unexpected exception while receiving value", exc = exc.msg,
endpoint = E.name, peer, peer_score = peer.getScore()
raise exc
finally:
if peer != nil:
self.network.peerPool.release(peer)
return didProgress
proc query[E](
self: LightClientManager,
e: typedesc[E],
key: E.K
): Future[bool] {.async.} =
const PARALLEL_REQUESTS = 2
var workers: array[PARALLEL_REQUESTS, Future[bool]]
let
progressFut = newFuture[void]("lcmanProgress")
doneFut = newFuture[void]("lcmanDone")
var
numCompleted = 0
maxCompleted = workers.len
proc handleFinishedWorker(future: pointer) =
try:
let didProgress = cast[Future[bool]](future).read()
if didProgress and not progressFut.finished:
progressFut.complete()
except CancelledError as exc:
if not progressFut.finished:
progressFut.cancel()
except CatchableError as exc:
discard
finally:
inc numCompleted
if numCompleted == maxCompleted:
doneFut.complete()
try:
# Start concurrent workers
for i in 0 ..< workers.len:
try:
workers[i] = self.workerTask(e, key)
workers[i].addCallback(handleFinishedWorker)
except CancelledError as exc:
raise exc
except CatchableError as exc:
workers[i] = newFuture[bool]()
workers[i].complete(false)
# Wait for any worker to report progress, or for all workers to finish
discard await race(progressFut, doneFut)
finally:
for i in 0 ..< maxCompleted:
if workers[i] == nil:
maxCompleted = i
if numCompleted == maxCompleted:
doneFut.complete()
break
if not workers[i].finished:
workers[i].cancel()
while true:
try:
await allFutures(workers[0 ..< maxCompleted])
break
except CancelledError:
continue
while true:
try:
await doneFut
break
except CancelledError:
continue
if not progressFut.finished:
progressFut.cancel()
return progressFut.completed
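# Convenience overloads: a single sync-committee period is widened to a
# one-element range, and endpoints that take no request key pass `Nothing()`.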
template query(
self: LightClientManager,
e: typedesc[UpdatesByRange],
key: SyncCommitteePeriod
): Future[bool] =
self.query(e, key .. key)
template query[E](
self: LightClientManager,
e: typedesc[E]
): Future[bool] =
self.query(e, Nothing())
type SchedulingMode = enum
Soon,
CurrentPeriod,
NextPeriod
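# Soon: retry shortly; CurrentPeriod: defer towards the end of the current
# sync-committee period; NextPeriod: defer roughly one full period ahead.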
func fetchTime(
self: LightClientManager,
wallTime: BeaconTime,
schedulingMode: SchedulingMode
): BeaconTime =
let
remainingTime =
case schedulingMode:
of Soon:
chronos.seconds(0)
of CurrentPeriod:
let
wallPeriod = wallTime.slotOrZero().sync_committee_period
deadlineSlot = (wallPeriod + 1).start_slot - 1
deadline = deadlineSlot.start_beacon_time()
chronos.nanoseconds((deadline - wallTime).nanoseconds)
of NextPeriod:
chronos.seconds(
(SLOTS_PER_SYNC_COMMITTEE_PERIOD * SECONDS_PER_SLOT).int64)
minDelay = max(remainingTime div 8, chronos.seconds(30))
jitterSeconds = (minDelay * 2).seconds
jitterDelay = chronos.seconds(self.rng[].rand(jitterSeconds).int64)
return wallTime + minDelay + jitterDelay
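# Worked example (assuming mainnet presets; numbers are illustrative): for
# `NextPeriod`, remainingTime = 8192 slots * 12 s ≈ 27.3 h, so minDelay =
# remainingTime div 8 ≈ 3.4 h and the jitter is uniform over
# [0, 2 * minDelay], giving an expected wait of remainingTime div 4 ≈ 6.8 h.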
# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#light-client-sync-process
proc loop(self: LightClientManager) {.async.} =
var nextFetchTime = self.getBeaconTime()
while true:
# Periodically wake and check for changes
let wallTime = self.getBeaconTime()
if wallTime < nextFetchTime or
self.network.peerPool.lenAvailable < 1:
await sleepAsync(chronos.seconds(2))
continue
# Obtain bootstrap data once a trusted block root is supplied
if not self.isLightClientStoreInitialized():
let trustedBlockRoot = self.getTrustedBlockRoot()
if trustedBlockRoot.isNone:
await sleepAsync(chronos.seconds(2))
continue
let didProgress = await self.query(Bootstrap, trustedBlockRoot.get)
if not didProgress:
nextFetchTime = self.fetchTime(wallTime, Soon)
continue
# Fetch updates
var allowWaitNextPeriod = false
let
finalized = self.getFinalizedPeriod()
optimistic = self.getOptimisticPeriod()
current = wallTime.slotOrZero().sync_committee_period
isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()
didProgress =
if finalized == optimistic and not isNextSyncCommitteeKnown:
if finalized >= current:
await self.query(UpdatesByRange, finalized)
else:
await self.query(UpdatesByRange, finalized ..< current)
elif finalized + 1 < current:
await self.query(UpdatesByRange, finalized + 1 ..< current)
elif finalized != optimistic:
await self.query(FinalityUpdate)
else:
allowWaitNextPeriod = true
await self.query(OptimisticUpdate)
schedulingMode =
if not didProgress or not self.isGossipSupported(current):
Soon
elif not allowWaitNextPeriod:
CurrentPeriod
else:
NextPeriod
nextFetchTime = self.fetchTime(wallTime, schedulingMode)
proc start*(self: var LightClientManager) =
## Start light client manager's loop.
doAssert self.loopFuture == nil
self.loopFuture = self.loop()
proc stop*(self: var LightClientManager) {.async.} =
## Stop light client manager's loop.
if self.loopFuture != nil:
await self.loopFuture.cancelAndWait()
self.loopFuture = nil
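# Lifecycle sketch (illustrative; constructing the manager with its network,
# verifiers and callbacks happens outside this excerpt):
#
#   var manager: LightClientManager = ...  # wired up elsewhere
#   manager.start()         # spawns `loop` as a background future
#   # ... run until shutdown ...
#   waitFor manager.stop()  # cancels the loop and awaits its completion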

View File

@ -95,7 +95,8 @@ proc initQueue[A, B](man: SyncManager[A, B]) =
of SyncQueueKind.Forward:
man.queue = SyncQueue.init(A, man.direction, man.getFirstSlot(),
man.getLastSlot(), man.chunkSize,
man.getSafeSlot, man.blockVerifier, 1)
man.getSafeSlot, man.blockVerifier, 1,
man.ident)
of SyncQueueKind.Backward:
let
firstSlot = man.getFirstSlot()
@ -108,7 +109,8 @@ proc initQueue[A, B](man: SyncManager[A, B]) =
Slot(firstSlot - 1'u64)
man.queue = SyncQueue.init(A, man.direction, startSlot, lastSlot,
man.chunkSize, man.getSafeSlot,
man.blockVerifier, 1)
man.blockVerifier, 1,
man.ident)
proc newSyncManager*[A, B](pool: PeerPool[A, B],
direction: SyncQueueKind,

View File

@ -31,7 +31,7 @@ const
blockByRangeLookupCost = allowedOpsPerSecondCost(20)
# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#configuration
MAX_REQUEST_LIGHT_CLIENT_UPDATES = 128
MAX_REQUEST_LIGHT_CLIENT_UPDATES* = 128
lightClientEmptyResponseCost = allowedOpsPerSecondCost(50)
lightClientBootstrapLookupCost = allowedOpsPerSecondCost(5)
lightClientBootstrapResponseCost = allowedOpsPerSecondCost(100)
@ -59,9 +59,12 @@ type
else:
index: uint32
BeaconSyncNetworkState* = ref object
dag*: ChainDAGRef
getBeaconTime*: GetBeaconTimeFn
BeaconSyncNetworkState = ref object
dag: ChainDAGRef
cfg: RuntimeConfig
forkDigests: ref ForkDigests
genesisBlockRoot: Eth2Digest
getBeaconTime: GetBeaconTimeFn
BeaconSyncPeerState* = ref object
statusLastTime*: chronos.Moment
@ -151,17 +154,29 @@ func disconnectReasonName(reason: uint64): string =
elif reason == uint64(FaultOrError): "Fault or error"
else: "Disconnected (" & $reason & ")"
func forkDigestAtEpoch(state: BeaconSyncNetworkState,
epoch: Epoch): ForkDigest =
state.forkDigests[].atEpoch(epoch, state.cfg)
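# Resolving the digest from `forkDigests`/`cfg` (rather than via the DAG)
# lets the same code path serve both full nodes and DAG-less light clients.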
proc getCurrentStatus(state: BeaconSyncNetworkState): StatusMsg =
let
dag = state.dag
wallSlot = state.getBeaconTime().slotOrZero
StatusMsg(
forkDigest: dag.forkDigestAtEpoch(wallSlot.epoch),
finalizedRoot: dag.finalizedHead.blck.root,
finalizedEpoch: dag.finalizedHead.slot.epoch,
headRoot: dag.head.root,
headSlot: dag.head.slot)
if dag != nil:
StatusMsg(
forkDigest: state.forkDigestAtEpoch(wallSlot.epoch),
finalizedRoot: dag.finalizedHead.blck.root,
finalizedEpoch: dag.finalizedHead.slot.epoch,
headRoot: dag.head.root,
headSlot: dag.head.slot)
else:
StatusMsg(
forkDigest: state.forkDigestAtEpoch(wallSlot.epoch),
finalizedRoot: state.genesisBlockRoot,
finalizedEpoch: GENESIS_EPOCH,
headRoot: state.genesisBlockRoot,
headSlot: GENESIS_SLOT)
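# With no DAG attached (the standalone light client case), only genesis data
# is available, so the advertised status pins head and finality to genesis.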
proc checkStatusMsg(state: BeaconSyncNetworkState, status: StatusMsg):
Result[void, cstring] =
@ -176,15 +191,20 @@ proc checkStatusMsg(state: BeaconSyncNetworkState, status: StatusMsg):
if status.headSlot > wallSlot:
return err("head more recent than wall clock")
if dag.forkDigestAtEpoch(wallSlot.epoch) != status.forkDigest:
if state.forkDigestAtEpoch(wallSlot.epoch) != status.forkDigest:
return err("fork digests differ")
if status.finalizedEpoch <= dag.finalizedHead.slot.epoch:
let blockId = dag.getBlockIdAtSlot(status.finalizedEpoch.start_slot())
if blockId.isSome and
(not status.finalizedRoot.isZero) and
status.finalizedRoot != blockId.get().bid.root:
return err("peer following different finality")
if dag != nil:
if status.finalizedEpoch <= dag.finalizedHead.slot.epoch:
let blockId = dag.getBlockIdAtSlot(status.finalizedEpoch.start_slot())
if blockId.isSome and
(not status.finalizedRoot.isZero) and
status.finalizedRoot != blockId.get().bid.root:
return err("peer following different finality")
else:
if status.finalizedEpoch == GENESIS_EPOCH:
if status.finalizedRoot != state.genesisBlockRoot:
return err("peer following different finality")
ok()
@ -229,22 +249,22 @@ p2pProtocol BeaconSync(version = 1,
proc status(peer: Peer,
theirStatus: StatusMsg,
response: SingleChunkResponse[StatusMsg])
{.async, libp2pProtocol("status", 1).} =
{.async, libp2pProtocol("status", 1, isRequired = true).} =
let ourStatus = peer.networkState.getCurrentStatus()
trace "Sending status message", peer = peer, status = ourStatus
await response.send(ourStatus)
discard await peer.handleStatus(peer.networkState, theirStatus)
proc ping(peer: Peer, value: uint64): uint64
{.libp2pProtocol("ping", 1).} =
{.libp2pProtocol("ping", 1, isRequired = true).} =
return peer.network.metadata.seq_number
proc getMetaData(peer: Peer): phase0.MetaData
{.libp2pProtocol("metadata", 1).} =
{.libp2pProtocol("metadata", 1, isRequired = true).} =
return peer.network.phase0metadata
proc getMetadata_v2(peer: Peer): altair.MetaData
{.libp2pProtocol("metadata", 2).} =
{.libp2pProtocol("metadata", 2, isRequired = true).} =
return peer.network.metadata
proc beaconBlocksByRange(
@ -439,7 +459,7 @@ p2pProtocol BeaconSync(version = 1,
await response.writeBytesSZ(
uncompressedLen, bytes,
dag.forkDigestAtEpoch(blocks[i].slot.epoch).data)
peer.networkState.forkDigestAtEpoch(blocks[i].slot.epoch).data)
inc found
@ -495,7 +515,7 @@ p2pProtocol BeaconSync(version = 1,
await response.writeBytesSZ(
uncompressedLen, bytes,
dag.forkDigestAtEpoch(blockRef.slot.epoch).data)
peer.networkState.forkDigestAtEpoch(blockRef.slot.epoch).data)
inc found
@ -520,7 +540,7 @@ p2pProtocol BeaconSync(version = 1,
if bootstrap.isOk:
let
contextEpoch = bootstrap.get.header.slot.epoch
contextBytes = dag.forkDigestAtEpoch(contextEpoch).data
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
await response.send(bootstrap.get, contextBytes)
else:
peer.updateRequestQuota(lightClientEmptyResponseCost)
@ -565,7 +585,7 @@ p2pProtocol BeaconSync(version = 1,
if update.isSome:
let
contextEpoch = update.get.attested_header.slot.epoch
contextBytes = dag.forkDigestAtEpoch(contextEpoch).data
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
await response.write(update.get, contextBytes)
inc found
@ -589,7 +609,7 @@ p2pProtocol BeaconSync(version = 1,
if finality_update.isSome:
let
contextEpoch = finality_update.get.attested_header.slot.epoch
contextBytes = dag.forkDigestAtEpoch(contextEpoch).data
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
await response.send(finality_update.get, contextBytes)
else:
peer.updateRequestQuota(lightClientEmptyResponseCost)
@ -616,7 +636,7 @@ p2pProtocol BeaconSync(version = 1,
if optimistic_update.isSome:
let
contextEpoch = optimistic_update.get.attested_header.slot.epoch
contextBytes = dag.forkDigestAtEpoch(contextEpoch).data
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
await response.send(optimistic_update.get, contextBytes)
else:
peer.updateRequestQuota(lightClientEmptyResponseCost)
@ -629,7 +649,7 @@ p2pProtocol BeaconSync(version = 1,
proc goodbye(peer: Peer,
reason: uint64)
{.async, libp2pProtocol("goodbye", 1).} =
{.async, libp2pProtocol("goodbye", 1, isRequired = true).} =
debug "Received Goodbye message", reason = disconnectReasonName(reason), peer
proc useSyncV2*(state: BeaconSyncNetworkState): bool =
@ -689,4 +709,19 @@ proc initBeaconSync*(network: Eth2Node, dag: ChainDAGRef,
getBeaconTime: GetBeaconTimeFn) =
var networkState = network.protocolState(BeaconSync)
networkState.dag = dag
networkState.cfg = dag.cfg
networkState.forkDigests = dag.forkDigests
networkState.genesisBlockRoot = dag.genesisBlockRoot
networkState.getBeaconTime = getBeaconTime
proc initBeaconSync*(network: Eth2Node,
cfg: RuntimeConfig,
forkDigests: ref ForkDigests,
genesisBlockRoot: Eth2Digest,
getBeaconTime: GetBeaconTimeFn) =
var networkState = network.protocolState(BeaconSync)
networkState.dag = nil
networkState.cfg = cfg
networkState.forkDigests = forkDigests
networkState.genesisBlockRoot = genesisBlockRoot
networkState.getBeaconTime = getBeaconTime
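# This DAG-less overload is intended for callers without a ChainDAG, such as
# the new standalone light client: it wires the runtime config, fork digests
# and genesis block root directly into the protocol state.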

View File

@ -42,6 +42,9 @@ if [[ ${PIPESTATUS[0]} != 4 ]]; then
exit 1
fi
CURL_BINARY="$(command -v curl)" || { echo "Curl not installed. Aborting."; exit 1; }
JQ_BINARY="$(command -v jq)" || { echo "Jq not installed. Aborting."; exit 1; }
OPTS="ht:n:d:g"
LONGOPTS="help,preset:,nodes:,data-dir:,remote-validators-count:,threshold:,remote-signers:,with-ganache,stop-at-epoch:,disable-htop,disable-vc,enable-logtrace,log-level:,base-port:,base-rest-port:,base-metrics-port:,reuse-existing-data-dir,reuse-binaries,timeout:,kill-old-processes,eth2-docker-image:,lighthouse-vc-nodes:,light-clients:"
@ -68,6 +71,7 @@ ETH2_DOCKER_IMAGE=""
REMOTE_SIGNER_NODES=0
REMOTE_SIGNER_THRESHOLD=1
REMOTE_VALIDATORS_COUNT=0
LC_NODES=1
print_help() {
cat <<EOF
@ -102,6 +106,7 @@ CI run: $(basename "$0") --disable-htop -- --verify-finalization
--threshold used by a threshold secret sharing mechanism; determines how many shares are needed to
restore the signature of the original secret key
--remote-signers number of remote signing nodes
--light-clients number of light clients
EOF
}
@ -207,6 +212,10 @@ while true; do
LIGHTHOUSE_VC_NODES="$2"
shift 2
;;
--light-clients)
LC_NODES="$2"
shift 2
;;
--)
shift
break
@ -302,7 +311,7 @@ LH_BINARY="lighthouse-${LH_VERSION}"
if [[ "${USE_VC}" == "1" && "${LIGHTHOUSE_VC_NODES}" != "0" && ! -e "build/${LH_BINARY}" ]]; then
pushd "build" >/dev/null
curl -sSLO "${LH_URL}"
"${CURL_BINARY}" -sSLO "${LH_URL}"
tar -xzf "${LH_TARBALL}" # contains just one file named "lighthouse"
rm lighthouse-* # deletes both the tarball and old binary versions
mv lighthouse "${LH_BINARY}"
@ -320,6 +329,10 @@ if [[ "${USE_VC}" == "1" ]]; then
BINARIES="${BINARIES} nimbus_validator_client"
fi
if [ "$LC_NODES" -ge "1" ]; then
BINARIES="${BINARIES} nimbus_light_client"
fi
if [[ "$ENABLE_LOGTRACE" == "1" ]]; then
BINARIES="${BINARIES} logtrace"
fi
@ -355,11 +368,13 @@ cleanup() {
pkill -f -P $$ nimbus_beacon_node &>/dev/null || true
pkill -f -P $$ nimbus_validator_client &>/dev/null || true
pkill -f -P $$ nimbus_signing_node &>/dev/null || true
pkill -f -P $$ nimbus_light_client &>/dev/null || true
pkill -f -P $$ ${LH_BINARY} &>/dev/null || true
sleep 2
pkill -f -9 -P $$ nimbus_beacon_node &>/dev/null || true
pkill -f -9 -P $$ nimbus_validator_client &>/dev/null || true
pkill -f -9 -P $$ nimbus_signing_node &>/dev/null || true
pkill -f -9 -P $$ nimbus_light_client &>/dev/null || true
pkill -f -9 -P $$ ${LH_BINARY} &>/dev/null || true
# Delete all binaries we just built, because these are unusable outside this
@ -391,7 +406,7 @@ fi
REMOTE_URLS=""
for NUM_REMOTE in $(seq 0 $(( REMOTE_SIGNER_NODES - 1 ))); do
REMOTE_PORT=$(( BASE_REMOTE_SIGNER_PORT + NUM_REMOTE ))
REMOTE_PORT=$(( BASE_REMOTE_SIGNER_PORT + NUM_REMOTE ))
REMOTE_URLS="${REMOTE_URLS} --remote-signer=http://127.0.0.1:${REMOTE_PORT}"
done
@ -538,6 +553,10 @@ if [ "$REMOTE_SIGNER_NODES" -ge "0" ]; then
NUM_JOBS=$((NUM_JOBS + REMOTE_SIGNER_NODES ))
fi
if [ "$LC_NODES" -ge "1" ]; then
NUM_JOBS=$((NUM_JOBS + LC_NODES ))
fi
VALIDATORS_PER_VALIDATOR=$(( (SYSTEM_VALIDATORS / NODES_WITH_VALIDATORS) / 2 ))
VALIDATOR_OFFSET=$((SYSTEM_VALIDATORS / 2))
@ -650,7 +669,7 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
--config-file="${CLI_CONF_FILE}" \
--tcp-port=$(( BASE_PORT + NUM_NODE )) \
--udp-port=$(( BASE_PORT + NUM_NODE )) \
--max-peers=$(( NUM_NODES - 1 )) \
--max-peers=$(( NUM_NODES + LC_NODES - 1 )) \
--data-dir="${CONTAINER_NODE_DATA_DIR}" \
${BOOTSTRAP_ARG} \
${WEB3_ARG} \
@ -696,6 +715,46 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
fi
done
# light clients
if [ "$LC_NODES" -ge "1" ]; then
echo "Waiting for Altair finalization"
ALTAIR_FORK_EPOCH="$(
"${CURL_BINARY}" -s "http://localhost:${BASE_REST_PORT}/eth/v1/config/spec" | \
"${JQ_BINARY}" -r '.data.ALTAIR_FORK_EPOCH')"
while :; do
CURRENT_FORK_EPOCH="$(
"${CURL_BINARY}" -s "http://localhost:${BASE_REST_PORT}/eth/v1/beacon/states/finalized/fork" | \
"${JQ_BINARY}" -r '.data.epoch')"
if [ "${CURRENT_FORK_EPOCH}" -ge "${ALTAIR_FORK_EPOCH}" ]; then
break
fi
sleep 1
done
echo "Altair finalized, launching $LC_NODES light client(s)"
LC_BOOTSTRAP_NODE="$(
"${CURL_BINARY}" -s "http://localhost:${BASE_REST_PORT}/eth/v1/node/identity" | \
"${JQ_BINARY}" -r '.data.enr')"
LC_TRUSTED_BLOCK_ROOT="$(
"${CURL_BINARY}" -s "http://localhost:${BASE_REST_PORT}/eth/v1/beacon/headers/finalized" | \
"${JQ_BINARY}" -r '.data.root')"
for NUM_LC in $(seq 0 $(( LC_NODES - 1 ))); do
./build/nimbus_light_client \
--log-level="${LOG_LEVEL}" \
--log-format="json" \
--network="${CONTAINER_DATA_DIR}" \
--bootstrap-node="${LC_BOOTSTRAP_NODE}" \
--tcp-port=$(( BASE_PORT + NUM_NODES + NUM_LC )) \
--udp-port=$(( BASE_PORT + NUM_NODES + NUM_LC )) \
--max-peers=$(( NUM_NODES + LC_NODES - 1 )) \
--nat="extip:127.0.0.1" \
--trusted-block-root="${LC_TRUSTED_BLOCK_ROOT}" \
${STOP_AT_EPOCH_FLAG} \
&> "${DATA_DIR}/log_lc${NUM_LC}.txt" &
PIDS="${PIDS},$!"
done
fi
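# Example invocation exercising the new option (values are illustrative):
#   $(basename "$0") --nodes 2 --light-clients 2 --stop-at-epoch 8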
# give the regular nodes time to crash
sleep 5
BG_JOBS="$(jobs | wc -l | tr -d ' ')"
@ -703,7 +762,7 @@ if [[ "${TIMEOUT_DURATION}" != "0" ]]; then
BG_JOBS=$(( BG_JOBS - 1 )) # minus the timeout bg job
fi
if [[ "$BG_JOBS" != "$NUM_JOBS" ]]; then
echo "$(( NUM_JOBS - BG_JOBS )) nimbus_beacon_node/nimbus_validator_client instance(s) exited early. Aborting."
echo "$(( NUM_JOBS - BG_JOBS )) nimbus_beacon_node/nimbus_validator_client/nimbus_light_client instance(s) exited early. Aborting."
dump_logs
dump_logtrace
exit 1

View File

@ -58,6 +58,8 @@ suite "Light client processor" & preset():
let
genesis_validators_root = dag.genesis_validators_root
trustedBlockRoot = dag.head.root
proc getTrustedBlockRoot(): Option[Eth2Digest] =
some trustedBlockRoot
const
lowPeriod = 0.SyncCommitteePeriod
@ -83,15 +85,15 @@ suite "Light client processor" & preset():
func setTimeToSlot(slot: Slot) =
time = chronos.seconds((slot * SECONDS_PER_SLOT).int64)
var numDidInitializeStoreCalls = 0
proc didInitializeStore() = inc numDidInitializeStoreCalls
var numOnStoreInitializedCalls = 0
proc onStoreInitialized() = inc numOnStoreInitializedCalls
let store = (ref Option[LightClientStore])()
var
processor = LightClientProcessor.new(
false, "", "", cfg, genesis_validators_root, trustedBlockRoot,
store, getBeaconTime, didInitializeStore)
res: Result[void, BlockError]
false, "", "", cfg, genesis_validators_root,
store, getBeaconTime, getTrustedBlockRoot, onStoreInitialized)
res: Result[bool, BlockError]
test "Sync" & preset():
let bootstrap = dag.getLightClientBootstrap(trustedBlockRoot)
@ -101,7 +103,7 @@ suite "Light client processor" & preset():
MsgSource.gossip, getBeaconTime(), bootstrap.get)
check:
res.isOk
numDidInitializeStoreCalls == 1
numOnStoreInitializedCalls == 1
for period in lowPeriod .. lastPeriodWithSupermajority:
let update = dag.getLightClientUpdateForPeriod(period)
@ -177,7 +179,7 @@ suite "Light client processor" & preset():
store[].get.optimistic_header == finalityUpdate.get.attested_header
else:
check res.error == BlockError.Duplicate
check numDidInitializeStoreCalls == 1
check numOnStoreInitializedCalls == 1
test "Invalid bootstrap" & preset():
var bootstrap = dag.getLightClientBootstrap(trustedBlockRoot)
@ -189,7 +191,7 @@ suite "Light client processor" & preset():
check:
res.isErr
res.error == BlockError.Invalid
numDidInitializeStoreCalls == 0
numOnStoreInitializedCalls == 0
test "Duplicate bootstrap" & preset():
let bootstrap = dag.getLightClientBootstrap(trustedBlockRoot)
@ -199,13 +201,13 @@ suite "Light client processor" & preset():
MsgSource.gossip, getBeaconTime(), bootstrap.get)
check:
res.isOk
numDidInitializeStoreCalls == 1
numOnStoreInitializedCalls == 1
res = processor[].storeObject(
MsgSource.gossip, getBeaconTime(), bootstrap.get)
check:
res.isErr
res.error == BlockError.Duplicate
numDidInitializeStoreCalls == 1
numOnStoreInitializedCalls == 1
test "Missing bootstrap (update)" & preset():
let update = dag.getLightClientUpdateForPeriod(lowPeriod)
@ -216,7 +218,7 @@ suite "Light client processor" & preset():
check:
res.isErr
res.error == BlockError.MissingParent
numDidInitializeStoreCalls == 0
numOnStoreInitializedCalls == 0
test "Missing bootstrap (finality update)" & preset():
let finalityUpdate = dag.getLightClientFinalityUpdate()
@ -227,7 +229,7 @@ suite "Light client processor" & preset():
check:
res.isErr
res.error == BlockError.MissingParent
numDidInitializeStoreCalls == 0
numOnStoreInitializedCalls == 0
test "Missing bootstrap (optimistic update)" & preset():
let optimisticUpdate = dag.getLightClientOptimisticUpdate()
@ -238,4 +240,4 @@ suite "Light client processor" & preset():
check:
res.isErr
res.error == BlockError.MissingParent
numDidInitializeStoreCalls == 0
numOnStoreInitializedCalls == 0