From 01efa93cf6254ae42f17522aa4b997c539f9eac0 Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Tue, 31 May 2022 12:45:37 +0200 Subject: [PATCH] add light client (standalone) (#3653) Introduces a new library for syncing using libp2p based light client sync protocol, and adds a new `nimbus_light_client` executable that uses this library for syncing. The new executable emits log messages when new beacon block headers are received, and is integrated into testing. --- Makefile | 43 +- beacon_chain/beacon_clock.nim | 3 + beacon_chain/beacon_node.nim | 2 - beacon_chain/conf.nim | 15 +- beacon_chain/conf_light_client.nim | 131 +++++ .../consensus_object_pools/blockchain_dag.nim | 9 +- .../gossip_processing/eth2_processor.nim | 3 + .../light_client_processor.nim | 208 ++++++-- beacon_chain/light_client.nim | 256 ++++++++++ beacon_chain/networking/eth2_discovery.nim | 11 +- beacon_chain/networking/eth2_network.nim | 156 +++--- beacon_chain/networking/peer_pool.nim | 3 +- beacon_chain/nimbus_beacon_node.nim | 6 - beacon_chain/nimbus_light_client.nim | 99 ++++ beacon_chain/nimbus_light_client.nim.cfg | 7 + beacon_chain/spec/forks.nim | 14 + beacon_chain/spec/network.nim | 3 +- beacon_chain/sync/light_client_manager.nim | 454 ++++++++++++++++++ beacon_chain/sync/sync_manager.nim | 6 +- beacon_chain/sync/sync_protocol.nim | 91 ++-- scripts/launch_local_testnet.sh | 67 ++- tests/test_light_client_processor.nim | 28 +- 22 files changed, 1440 insertions(+), 175 deletions(-) create mode 100644 beacon_chain/conf_light_client.nim create mode 100644 beacon_chain/light_client.nim create mode 100644 beacon_chain/nimbus_light_client.nim create mode 100644 beacon_chain/nimbus_light_client.nim.cfg create mode 100644 beacon_chain/sync/light_client_manager.nim diff --git a/Makefile b/Makefile index b0c4730c8..4d620f2ab 100644 --- a/Makefile +++ b/Makefile @@ -35,17 +35,17 @@ CPU_LIMIT := 0 BUILD_END_MSG := "\\x1B[92mBuild completed successfully:\\x1B[39m" ifeq ($(CPU_LIMIT), 0) - CPU_LIMIT_CMD := + CPU_LIMIT_CMD := else - CPU_LIMIT_CMD := cpulimit --limit=$(CPU_LIMIT) --foreground -- + CPU_LIMIT_CMD := cpulimit --limit=$(CPU_LIMIT) --foreground -- endif # TODO: move this to nimbus-build-system ifeq ($(shell uname), Darwin) - # Scary warnings in large volume: https://github.com/status-im/nimbus-eth2/issues/3076 - SILENCE_WARNINGS := 2>&1 | grep -v "failed to insert symbol" | grep -v "could not find object file symbol for symbol" || true + # Scary warnings in large volume: https://github.com/status-im/nimbus-eth2/issues/3076 + SILENCE_WARNINGS := 2>&1 | grep -v "failed to insert symbol" | grep -v "could not find object file symbol for symbol" || true else - SILENCE_WARNINGS := + SILENCE_WARNINGS := endif # unconditionally built by the default Make target @@ -60,6 +60,7 @@ TOOLS := \ ncli_split_keystore \ wss_sim \ stack_sizes \ + nimbus_light_client \ nimbus_validator_client \ nimbus_signing_node \ validator_db_aggregator @@ -267,12 +268,12 @@ clean_eth2_network_simulation_all: rm -rf tests/simulation/{data,validators} GOERLI_TESTNETS_PARAMS := \ - --tcp-port=$$(( $(BASE_PORT) + $(NODE_ID) )) \ - --udp-port=$$(( $(BASE_PORT) + $(NODE_ID) )) \ - --metrics \ - --metrics-port=$$(( $(BASE_METRICS_PORT) + $(NODE_ID) )) \ - --rest \ - --rest-port=$$(( $(BASE_REST_PORT) +$(NODE_ID) )) + --tcp-port=$$(( $(BASE_PORT) + $(NODE_ID) )) \ + --udp-port=$$(( $(BASE_PORT) + $(NODE_ID) )) \ + --metrics \ + --metrics-port=$$(( $(BASE_METRICS_PORT) + $(NODE_ID) )) \ + --rest \ + --rest-port=$$(( $(BASE_REST_PORT) +$(NODE_ID) )) 
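For orientation, the following is a condensed sketch of how the pieces introduced below fit together; it abbreviates beacon_chain/nimbus_light_client.nim (added further down in this patch) and omits bootstrap-node handling, beacon-sync wiring, logging setup and the per-second gossip-status loop:

  import
    chronicles, chronos, eth/keys,
    ./spec/beaconstate,
    "."/[light_client, nimbus_binary_common, version]

  let
    config = makeBannerAndConfig(
      "Nimbus light client " & fullVersionStr, LightClientConf)
    metadata = loadEth2Network(config.eth2Network)
    genesisState = newClone(readSszForkedHashedBeaconState(
      metadata.cfg, metadata.genesisData.toOpenArrayByte(
        metadata.genesisData.low, metadata.genesisData.high)))
    beaconClock = BeaconClock.init(
      getStateField(genesisState[], genesis_time))
    getBeaconTime = beaconClock.getBeaconTimeFn()
    genesis_validators_root =
      getStateField(genesisState[], genesis_validators_root)
    forkDigests = newClone ForkDigests.init(
      metadata.cfg, genesis_validators_root)
    rng = keys.newRng()
    netKeys = optimisticgetRandomNetKeys(rng[])

    # The light client reuses the regular libp2p/discv5 networking stack.
    network = createEth2Node(
      rng, config, netKeys, metadata.cfg,
      forkDigests, getBeaconTime, genesis_validators_root)
    lightClient = createLightClient(
      network, rng, config, metadata.cfg,
      forkDigests, getBeaconTime, genesis_validators_root)

  # New headers are reported via callbacks; syncing starts from the
  # `--trusted-block-root` passed on the command line.
  lightClient.onFinalizedHeader = proc(lc: LightClient) =
    notice "New LC finalized header",
      finalized_header = shortLog(lc.finalizedHeader.get)
  lightClient.trustedBlockRoot = some config.trustedBlockRoot

  lightClient.installMessageValidators()
  waitFor network.startListening()
  waitFor network.start()
  lightClient.start()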
eth2_network_simulation: | build deps clean_eth2_network_simulation_all + GIT_ROOT="$$PWD" NIMFLAGS="$(NIMFLAGS)" LOG_LEVEL="$(LOG_LEVEL)" tests/simulation/start-in-tmux.sh @@ -281,7 +282,7 @@ eth2_network_simulation: | build deps clean_eth2_network_simulation_all #- https://www.gnu.org/software/make/manual/html_node/Multi_002dLine.html #- macOS doesn't support "=" at the end of "define FOO": https://stackoverflow.com/questions/13260396/gnu-make-3-81-eval-function-not-working define CONNECT_TO_NETWORK - scripts/makedir.sh build/data/shared_$(1)_$(NODE_ID) + scripts/makedir.sh build/data/shared_$(1)_$(NODE_ID) scripts/make_prometheus_config.sh \ --nodes 1 \ @@ -297,7 +298,7 @@ define CONNECT_TO_NETWORK endef define CONNECT_TO_NETWORK_IN_DEV_MODE - scripts/makedir.sh build/data/shared_$(1)_$(NODE_ID) + scripts/makedir.sh build/data/shared_$(1)_$(NODE_ID) scripts/make_prometheus_config.sh \ --nodes 1 \ @@ -340,6 +341,16 @@ define CONNECT_TO_NETWORK_WITH_VALIDATOR_CLIENT --rest-port=$$(( $(BASE_REST_PORT) +$(NODE_ID) )) endef +define CONNECT_TO_NETWORK_WITH_LIGHT_CLIENT + scripts/makedir.sh build/data/shared_$(1)_$(NODE_ID) + + $(CPU_LIMIT_CMD) build/nimbus_light_client \ + --network=$(1) \ + --log-level="$(RUNTIME_LOG_LEVEL)" \ + --log-file=build/data/shared_$(1)_$(NODE_ID)/nbc_lc_$$(date +"%Y%m%d%H%M%S").log \ + --trusted-block-root="$(LC_TRUSTED_BLOCK_ROOT)" +endef + define MAKE_DEPOSIT_DATA build/nimbus_beacon_node deposits createTestnetDeposits \ --network=$(1) \ @@ -386,6 +397,9 @@ prater: | prater-build prater-vc: | prater-build nimbus_validator_client $(call CONNECT_TO_NETWORK_WITH_VALIDATOR_CLIENT,prater,nimbus_beacon_node,$(GOERLI_WEB3_URL)) +prater-lc: | nimbus_light_client + $(call CONNECT_TO_NETWORK_WITH_LIGHT_CLIENT,prater) + ifneq ($(LOG_LEVEL), TRACE) prater-dev: + "$(MAKE)" LOG_LEVEL=TRACE $@ @@ -412,6 +426,9 @@ ropsten: | ropsten-build ropsten-vc: | ropsten-build nimbus_validator_client $(call CONNECT_TO_NETWORK_WITH_VALIDATOR_CLIENT,ropsten,nimbus_beacon_node,$(ROPSTEN_WEB3_URL)) +ropsten-lc: | nimbus_light_client + $(call CONNECT_TO_NETWORK_WITH_LIGHT_CLIENT,ropsten) + ifneq ($(LOG_LEVEL), TRACE) ropsten-dev: + "$(MAKE)" LOG_LEVEL=TRACE $@ diff --git a/beacon_chain/beacon_clock.nim b/beacon_chain/beacon_clock.nim index fbdfb832b..c847c89c9 100644 --- a/beacon_chain/beacon_clock.nim +++ b/beacon_chain/beacon_clock.nim @@ -56,6 +56,9 @@ proc now*(c: BeaconClock): BeaconTime = ## Current time, in slots - this may end up being less than GENESIS_SLOT(!) 
toBeaconTime(c, getTime()) +func getBeaconTimeFn*(c: BeaconClock): GetBeaconTimeFn = + return proc(): BeaconTime = c.now() + proc fromNow*(c: BeaconClock, t: BeaconTime): tuple[inFuture: bool, offset: Duration] = let now = c.now() if t > now: diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index fb65b7f83..7a2607f40 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -36,8 +36,6 @@ export type RpcServer* = RpcHttpServer - GossipState* = set[BeaconStateFork] - BeaconNode* = ref object nickname*: string graffitiBytes*: GraffitiBytes diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index 632bc4092..a2f07d4c3 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -1108,11 +1108,13 @@ proc readValue*(r: var TomlReader, a: var Address) except CatchableError: r.raiseUnexpectedValue("string expected") -proc loadEth2Network*(config: BeaconNodeConf): Eth2NetworkMetadata {.raises: [Defect, IOError].} = - network_name.set(2, labelValues = [config.eth2Network.get(otherwise = "mainnet")]) +proc loadEth2Network*( + eth2Network: Option[string] +): Eth2NetworkMetadata {.raises: [Defect, IOError].} = + network_name.set(2, labelValues = [eth2Network.get(otherwise = "mainnet")]) when not defined(gnosisChainBinary): - if config.eth2Network.isSome: - getMetadataForNetwork(config.eth2Network.get) + if eth2Network.isSome: + getMetadataForNetwork(eth2Network.get) else: when const_preset == "mainnet": mainnetMetadata @@ -1122,5 +1124,8 @@ proc loadEth2Network*(config: BeaconNodeConf): Eth2NetworkMetadata {.raises: [De echo "Must specify network on non-mainnet node" quit 1 else: - checkNetworkParameterUse config.eth2Network + checkNetworkParameterUse eth2Network gnosisMetadata + +template loadEth2Network*(config: BeaconNodeConf): Eth2NetworkMetadata = + loadEth2Network(config.eth2Network) diff --git a/beacon_chain/conf_light_client.nim b/beacon_chain/conf_light_client.nim new file mode 100644 index 000000000..7dfdac910 --- /dev/null +++ b/beacon_chain/conf_light_client.nim @@ -0,0 +1,131 @@ +# beacon_chain +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +# This implements the pre-release proposal of the libp2p based light client sync +# protocol. See https://github.com/ethereum/consensus-specs/pull/2802 + +import + json_serialization/std/net, + ./conf + +export net, conf + +type LightClientConf* = object + # Config + configFile* {. + desc: "Loads the configuration from a TOML file" + name: "config-file" }: Option[InputFile] + + # Logging + logLevel* {. + desc: "Sets the log level" + defaultValue: "INFO" + name: "log-level" }: string + + logStdout* {. + hidden + desc: "Specifies what kind of logs should be written to stdout (auto, colors, nocolors, json)" + defaultValueDesc: "auto" + defaultValue: StdoutLogKind.Auto + name: "log-format" }: StdoutLogKind + + logFile* {. + desc: "Specifies a path for the written Json log file (deprecated)" + name: "log-file" }: Option[OutFile] + + # Network + eth2Network* {. + desc: "The Eth2 network to join" + defaultValueDesc: "mainnet" + name: "network" }: Option[string] + + # Libp2p + bootstrapNodes* {. 
+ desc: "Specifies one or more bootstrap nodes to use when connecting to the network" + abbr: "b" + name: "bootstrap-node" }: seq[string] + + bootstrapNodesFile* {. + desc: "Specifies a line-delimited file of bootstrap Ethereum network addresses" + defaultValue: "" + name: "bootstrap-file" }: InputFile + + listenAddress* {. + desc: "Listening address for the Ethereum LibP2P and Discovery v5 traffic" + defaultValue: defaultListenAddress + defaultValueDesc: "0.0.0.0" + name: "listen-address" }: ValidIpAddress + + tcpPort* {. + desc: "Listening TCP port for Ethereum LibP2P traffic" + defaultValue: defaultEth2TcpPort + defaultValueDesc: "9000" + name: "tcp-port" }: Port + + udpPort* {. + desc: "Listening UDP port for node discovery" + defaultValue: defaultEth2TcpPort + defaultValueDesc: "9000" + name: "udp-port" }: Port + + maxPeers* {. + desc: "The target number of peers to connect to" + defaultValue: 160 # 5 (fanout) * 64 (subnets) / 2 (subs) for a heathy mesh + name: "max-peers" }: int + + hardMaxPeers* {. + desc: "The maximum number of peers to connect to. Defaults to maxPeers * 1.5" + name: "hard-max-peers" }: Option[int] + + nat* {. + desc: "Specify method to use for determining public address. " & + "Must be one of: any, none, upnp, pmp, extip:" + defaultValue: NatConfig(hasExtIp: false, nat: NatAny) + defaultValueDesc: "any" + name: "nat" .}: NatConfig + + enrAutoUpdate* {. + desc: "Discovery can automatically update its ENR with the IP address " & + "and UDP port as seen by other nodes it communicates with. " & + "This option allows to enable/disable this functionality" + defaultValue: false + name: "enr-auto-update" .}: bool + + agentString* {. + defaultValue: "nimbus", + desc: "Node agent string which is used as identifier in network" + name: "agent-string" }: string + + discv5Enabled* {. + desc: "Enable Discovery v5" + defaultValue: true + name: "discv5" }: bool + + directPeers* {. + desc: "The list of priviledged, secure and known peers to connect and maintain the connection to, this requires a not random netkey-file. In the complete multiaddress format like: /ip4/
/tcp//p2p/. Peering agreements are established out of band and must be reciprocal." + name: "direct-peer" .}: seq[string] + + # Light client + trustedBlockRoot* {. + desc: "Recent trusted finalized block root to initialize light client from" + name: "trusted-block-root" }: Eth2Digest + + # Testing + stopAtEpoch* {. + hidden + desc: "The wall-time epoch at which to exit the program. (for testing purposes)" + defaultValue: 0 + name: "stop-at-epoch" }: uint64 + +func parseCmdArg*(T: type Eth2Digest, input: string): T + {.raises: [ValueError, Defect].} = + Eth2Digest.fromHex(input) + +func completeCmdArg*(T: type Eth2Digest, input: string): seq[string] = + return @[] diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 057d7c35f..93cb47c4b 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -944,6 +944,9 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, template genesis_validators_root*(dag: ChainDAGRef): Eth2Digest = getStateField(dag.headState, genesis_validators_root) +proc genesisBlockRoot*(dag: ChainDAGRef): Eth2Digest = + dag.db.getGenesisBlock().expect("DB must be initialized with genesis block") + func getEpochRef*( dag: ChainDAGRef, state: ForkedHashedBeaconState, cache: var StateCache): EpochRef = ## Get a cached `EpochRef` or construct one based on the given state - always @@ -1048,12 +1051,6 @@ func stateCheckpoint*(dag: ChainDAGRef, bsi: BlockSlotId): BlockSlotId = template forkAtEpoch*(dag: ChainDAGRef, epoch: Epoch): Fork = forkAtEpoch(dag.cfg, epoch) -func forkDigestAtEpoch*(dag: ChainDAGRef, epoch: Epoch): ForkDigest = - case dag.cfg.stateForkAtEpoch(epoch) - of BeaconStateFork.Bellatrix: dag.forkDigests.bellatrix - of BeaconStateFork.Altair: dag.forkDigests.altair - of BeaconStateFork.Phase0: dag.forkDigests.phase0 - proc getBlockRange*( dag: ChainDAGRef, startSlot: Slot, skipStep: uint64, output: var openArray[BlockId]): Natural = diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 76daf27a1..1f74a7da6 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -145,6 +145,9 @@ type ValidationRes* = Result[void, ValidationError] +func toValidationResult*(res: ValidationRes): ValidationResult = + if res.isOk(): ValidationResult.Accept else: res.error()[0] + # Initialization # ------------------------------------------------------------------------------ diff --git a/beacon_chain/gossip_processing/light_client_processor.nim b/beacon_chain/gossip_processing/light_client_processor.nim index f2537913b..e2119c4bf 100644 --- a/beacon_chain/gossip_processing/light_client_processor.nim +++ b/beacon_chain/gossip_processing/light_client_processor.nim @@ -13,10 +13,10 @@ import ../spec/datatypes/altair, ../spec/light_client_sync, ../consensus_object_pools/block_pools_types, - ".."/[beacon_clock], - ../sszdump + ".."/[beacon_clock, sszdump], + "."/[eth2_processor, gossip_validation] -export sszdump +export sszdump, eth2_processor, gossip_validation # Light Client Processor # ------------------------------------------------------------------------------ @@ -26,7 +26,9 @@ declareHistogram light_client_store_object_duration_seconds, "storeObject() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf] type - DidInitializeStoreCallback* = + GetTrustedBlockRootCallback* = + proc(): 
Option[Eth2Digest] {.gcsafe, raises: [Defect].} + VoidCallback* = proc() {.gcsafe, raises: [Defect].} LightClientProcessor* = object @@ -61,12 +63,12 @@ type # Consumer # ---------------------------------------------------------------- store: ref Option[LightClientStore] - getBeaconTime*: GetBeaconTimeFn - didInitializeStoreCallback: DidInitializeStoreCallback + getBeaconTime: GetBeaconTimeFn + getTrustedBlockRoot: GetTrustedBlockRootCallback + onStoreInitialized, onFinalizedHeader, onOptimisticHeader: VoidCallback cfg: RuntimeConfig genesis_validators_root: Eth2Digest - trustedBlockRoot: Eth2Digest lastProgressTick: BeaconTime # Moment when last update made progress lastDuplicateTick: BeaconTime # Moment when last duplicate update received @@ -87,10 +89,13 @@ proc new*( dumpEnabled: bool, dumpDirInvalid, dumpDirIncoming: string, cfg: RuntimeConfig, - genesis_validators_root, trustedBlockRoot: Eth2Digest, + genesis_validators_root: Eth2Digest, store: ref Option[LightClientStore], getBeaconTime: GetBeaconTimeFn, - didInitializeStoreCallback: DidInitializeStoreCallback = nil + getTrustedBlockRoot: GetTrustedBlockRootCallback, + onStoreInitialized: VoidCallback = nil, + onFinalizedHeader: VoidCallback = nil, + onOptimisticHeader: VoidCallback = nil ): ref LightClientProcessor = (ref LightClientProcessor)( dumpEnabled: dumpEnabled, @@ -98,10 +103,12 @@ proc new*( dumpDirIncoming: dumpDirIncoming, store: store, getBeaconTime: getBeaconTime, - didInitializeStoreCallback: didInitializeStoreCallback, + getTrustedBlockRoot: getTrustedBlockRoot, + onStoreInitialized: onStoreInitialized, + onFinalizedHeader: onFinalizedHeader, + onOptimisticHeader: onOptimisticHeader, cfg: cfg, - genesis_validators_root: genesis_validators_root, - trustedBlockRoot: trustedBlockRoot + genesis_validators_root: genesis_validators_root ) # Storage @@ -147,30 +154,29 @@ proc tryForceUpdate( finalizedSlot = store[].get.finalized_header.slot, optimisticSlot = store[].get.optimistic_header.slot -proc storeObject*( +proc processObject( self: var LightClientProcessor, - src: MsgSource, wallTime: BeaconTime, - obj: SomeLightClientObject): Result[void, BlockError] = - ## storeObject is the main entry point for unvalidated light client objects - - ## all untrusted objects pass through here. 
When storing an object, we will - ## update the `LightClientStore` accordingly + obj: SomeLightClientObject, + wallTime: BeaconTime): Result[void, BlockError] = let - startTick = Moment.now() wallSlot = wallTime.slotOrZero() store = self.store - res = when obj is altair.LightClientBootstrap: if store[].isSome: err(BlockError.Duplicate) else: - let initRes = initialize_light_client_store( - self.trustedBlockRoot, obj) - if initRes.isErr: - err(initRes.error) + let trustedBlockRoot = self.getTrustedBlockRoot() + if trustedBlockRoot.isNone: + err(BlockError.MissingParent) else: - store[] = some(initRes.get) - ok() + let initRes = + initialize_light_client_store(trustedBlockRoot.get, obj) + if initRes.isErr: + err(initRes.error) + else: + store[] = some(initRes.get) + ok() elif obj is SomeLightClientUpdate: if store[].isNone: err(BlockError.MissingParent) @@ -209,6 +215,31 @@ proc storeObject*( self.lastDuplicateTick = wallTime + duplicateCountDelay self.numDuplicatesSinceProgress = 0 + res + +proc storeObject*( + self: var LightClientProcessor, + src: MsgSource, wallTime: BeaconTime, + obj: SomeLightClientObject): Result[bool, BlockError] = + ## storeObject is the main entry point for unvalidated light client objects - + ## all untrusted objects pass through here. When storing an object, we will + ## update the `LightClientStore` accordingly + let + startTick = Moment.now() + store = self.store + previousFinalized = + if store[].isSome: + store[].get.finalized_header + else: + BeaconBlockHeader() + previousOptimistic = + if store[].isSome: + store[].get.optimistic_header + else: + BeaconBlockHeader() + + ? self.processObject(obj, wallTime) + let storeObjectTick = Moment.now() storeObjectDur = storeObjectTick - startTick @@ -223,7 +254,7 @@ proc storeObject*( obj.finalized_header.slot else: obj.attested_header.slot - debug "Light client object processed", + debug "LC object processed", finalizedSlot = store[].get.finalized_header.slot, optimisticSlot = store[].get.optimistic_header.slot, kind = typeof(obj).name, @@ -231,11 +262,24 @@ proc storeObject*( storeObjectDur when obj is altair.LightClientBootstrap: - if self.didInitializeStoreCallback != nil: - self.didInitializeStoreCallback() - self.didInitializeStoreCallback = nil + if self.onStoreInitialized != nil: + self.onStoreInitialized() + self.onStoreInitialized = nil - res + var didProgress = false + + if store[].get.optimistic_header != previousOptimistic: + when obj isnot SomeLightClientUpdateWithFinality: + didProgress = true + if self.onOptimisticHeader != nil: + self.onOptimisticHeader() + + if store[].get.finalized_header != previousFinalized: + didProgress = true + if self.onFinalizedHeader != nil: + self.onFinalizedHeader() + + ok didProgress # Enqueue # ------------------------------------------------------------------------------ @@ -264,10 +308,106 @@ proc addObject*( (afterGenesis, wallSlot) = wallTime.toSlot() if not afterGenesis: - error "Processing light client object before genesis, clock turned back?" + error "Processing LC object before genesis, clock turned back?" 
quit 1 let res = self.storeObject(src, wallTime, obj) - if resFut != nil: - resFut.complete(res) + if resfut != nil: + if res.isOk: + resfut.complete(Result[void, BlockError].ok()) + else: + resfut.complete(Result[void, BlockError].err(res.error)) + +# Message validators +# ------------------------------------------------------------------------------ + +declareCounter lc_light_client_finality_updates_received, + "Number of valid LC finality updates processed by this LC" +declareCounter lc_light_client_finality_updates_dropped, + "Number of invalid LC finality updates dropped by this LC", labels = ["reason"] +declareCounter lc_light_client_optimistic_updates_received, + "Number of valid LC optimistic updates processed by this LC" +declareCounter lc_light_client_optimistic_updates_dropped, + "Number of invalid LC optimistic updates dropped by this LC", labels = ["reason"] + +func toValidationError( + v: Result[bool, BlockError], + wallTime: BeaconTime): Result[void, ValidationError] = + if v.isOk: + let didProgress = v.get + if didProgress: + when v is SomeLightClientUpdate: + let + signature_slot = v.signature_slot + currentTime = wallTime + MAXIMUM_GOSSIP_CLOCK_DISPARITY + forwardTime = signature_slot.light_client_optimistic_update_time + if currentTime < forwardTime: + # [IGNORE] The `finality_update` / `optimistic_update` is received + # after the block at `signature_slot` was given enough time to + # propagate through the network. + return errIgnore(typeof(v).name & ": received too early") + ok() + else: + # [IGNORE] The `finality_update` / `optimistic_update` + # advances the `finalized_header` / `optimistic_header` + # of the local `LightClientStore`. + errIgnore(typeof(v).name & ": no significant progress") + else: + case v.error + of BlockError.Invalid: + # [REJECT] The `finality_update` / `optimistic_update` is valid. + errReject($v.error) + of BlockError.MissingParent, BlockError.UnviableFork, BlockError.Duplicate: + # [IGNORE] No other `finality_update` with a lower or equal + # `finalized_header.slot` / `attested_header.slot` was already + # forwarded on the network. 
+ errIgnore($v.error) + +# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#light_client_finality_update +proc lightClientFinalityUpdateValidator*( + self: var LightClientProcessor, src: MsgSource, + finality_update: altair.LightClientFinalityUpdate +): Result[void, ValidationError] = + logScope: + finality_update + + debug "LC finality update received" + + let + wallTime = self.getBeaconTime() + r = self.storeObject(src, wallTime, finality_update) + v = r.toValidationError(wallTime) + if v.isOk(): + trace "LC finality update validated" + + lc_light_client_finality_updates_received.inc() + else: + debug "Dropping LC finality update", error = v.error + lc_light_client_finality_updates_dropped.inc(1, [$v.error[0]]) + + v + +# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#light_client_optimistic_update +proc lightClientOptimisticUpdateValidator*( + self: var LightClientProcessor, src: MsgSource, + optimistic_update: altair.LightClientOptimisticUpdate +): Result[void, ValidationError] = + logScope: + optimistic_update + + debug "LC optimistic update received" + + let + wallTime = self.getBeaconTime() + r = self.storeObject(src, wallTime, optimistic_update) + v = r.toValidationError(wallTime) + if v.isOk(): + trace "LC optimistic update validated" + + lc_light_client_optimistic_updates_received.inc() + else: + debug "Dropping LC optimistic update", error = v.error + lc_light_client_optimistic_updates_dropped.inc(1, [$v.error[0]]) + + v diff --git a/beacon_chain/light_client.nim b/beacon_chain/light_client.nim new file mode 100644 index 000000000..1d38fe328 --- /dev/null +++ b/beacon_chain/light_client.nim @@ -0,0 +1,256 @@ +# beacon_chain +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +# This implements the pre-release proposal of the libp2p based light client sync +# protocol. 
See https://github.com/ethereum/consensus-specs/pull/2802 + +import + chronicles, + eth/keys, + ./gossip_processing/light_client_processor, + ./networking/eth2_network, + ./spec/datatypes/altair, + ./spec/helpers, + ./sync/light_client_manager, + "."/[beacon_clock, conf_light_client] + +export eth2_network, conf_light_client + +logScope: topics = "lightcl" + +type + LightClientCallback* = + proc(lightClient: LightClient) {.gcsafe, raises: [Defect].} + + LightClient* = ref object + network: Eth2Node + cfg: RuntimeConfig + forkDigests: ref ForkDigests + getBeaconTime: GetBeaconTimeFn + store: ref Option[LightClientStore] + processor: ref LightClientProcessor + manager: LightClientManager + gossipState: GossipState + onFinalizedHeader*, onOptimisticHeader*: LightClientCallback + trustedBlockRoot*: Option[Eth2Digest] + +func finalizedHeader*(lightClient: LightClient): Opt[BeaconBlockHeader] = + if lightClient.store[].isSome: + ok lightClient.store[].get.finalized_header + else: + err() + +func optimisticHeader*(lightClient: LightClient): Opt[BeaconBlockHeader] = + if lightClient.store[].isSome: + ok lightClient.store[].get.optimistic_header + else: + err() + +proc createLightClient( + network: Eth2Node, + rng: ref BrHmacDrbgContext, + dumpEnabled: bool, + dumpDirInvalid, dumpDirIncoming: string, + cfg: RuntimeConfig, + forkDigests: ref ForkDigests, + getBeaconTime: GetBeaconTimeFn, + genesis_validators_root: Eth2Digest +): LightClient = + let lightClient = LightClient( + network: network, + cfg: cfg, + forkDigests: forkDigests, + getBeaconTime: getBeaconTime, + store: (ref Option[LightClientStore])()) + + func getTrustedBlockRoot(): Option[Eth2Digest] = + lightClient.trustedBlockRoot + + proc onStoreInitialized() = + discard + + proc onFinalizedHeader() = + if lightClient.onFinalizedHeader != nil: + lightClient.onFinalizedHeader(lightClient) + + proc onOptimisticHeader() = + if lightClient.onOptimisticHeader != nil: + lightClient.onOptimisticHeader(lightClient) + + lightClient.processor = LightClientProcessor.new( + dumpEnabled, dumpDirInvalid, dumpDirIncoming, + cfg, genesis_validators_root, + lightClient.store, getBeaconTime, getTrustedBlockRoot, + onStoreInitialized, onFinalizedHeader, onOptimisticHeader) + + proc lightClientVerifier(obj: SomeLightClientObject): + Future[Result[void, BlockError]] = + let resfut = newFuture[Result[void, BlockError]]("lightClientVerifier") + lightClient.processor[].addObject(MsgSource.gossip, obj, resfut) + resfut + proc bootstrapVerifier(obj: altair.LightClientBootstrap): auto = + lightClientVerifier(obj) + proc updateVerifier(obj: altair.LightClientUpdate): auto = + lightClientVerifier(obj) + proc finalityVerifier(obj: altair.LightClientFinalityUpdate): auto = + lightClientVerifier(obj) + proc optimisticVerifier(obj: altair.LightClientOptimisticUpdate): auto = + lightClientVerifier(obj) + + func isLightClientStoreInitialized(): bool = + lightClient.store[].isSome + + func isNextSyncCommitteeKnown(): bool = + if lightClient.store[].isSome: + lightClient.store[].get.is_next_sync_committee_known + else: + false + + func getFinalizedPeriod(): SyncCommitteePeriod = + if lightClient.store[].isSome: + lightClient.store[].get.finalized_header.slot.sync_committee_period + else: + GENESIS_SLOT.sync_committee_period + + func getOptimisticPeriod(): SyncCommitteePeriod = + if lightClient.store[].isSome: + lightClient.store[].get.optimistic_header.slot.sync_committee_period + else: + GENESIS_SLOT.sync_committee_period + + lightClient.manager = LightClientManager.init( + 
lightClient.network, rng, getTrustedBlockRoot, + bootstrapVerifier, updateVerifier, finalityVerifier, optimisticVerifier, + isLightClientStoreInitialized, isNextSyncCommitteeKnown, + getFinalizedPeriod, getOptimisticPeriod, getBeaconTime) + + lightClient.gossipState = {} + + lightClient + +proc createLightClient*( + network: Eth2Node, + rng: ref BrHmacDrbgContext, + config: BeaconNodeConf, + cfg: RuntimeConfig, + forkDigests: ref ForkDigests, + getBeaconTime: GetBeaconTimeFn, + genesis_validators_root: Eth2Digest +): LightClient = + createLightClient( + network, rng, + config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, + cfg, forkDigests, getBeaconTime, genesis_validators_root) + +proc createLightClient*( + network: Eth2Node, + rng: ref BrHmacDrbgContext, + config: LightClientConf, + cfg: RuntimeConfig, + forkDigests: ref ForkDigests, + getBeaconTime: GetBeaconTimeFn, + genesis_validators_root: Eth2Digest +): LightClient = + createLightClient( + network, rng, + dumpEnabled = false, dumpDirInvalid = ".", dumpDirIncoming = ".", + cfg, forkDigests, getBeaconTime, genesis_validators_root) + +proc start*(lightClient: LightClient) = + notice "Starting light client", + trusted_block_root = lightClient.trustedBlockRoot + lightClient.manager.start() + +from + libp2p/protocols/pubsub/gossipsub +import + TopicParams, validateParameters, init + +proc installMessageValidators*(lightClient: LightClient) = + template getLocalWallPeriod(): auto = + lightClient.getBeaconTime().slotOrZero().sync_committee_period + + let forkDigests = lightClient.forkDigests + for digest in [forkDigests.altair, forkDigests.bellatrix]: + lightClient.network.addValidator( + getLightClientFinalityUpdateTopic(digest), + proc(msg: altair.LightClientFinalityUpdate): ValidationResult = + if lightClient.manager.isGossipSupported(getLocalWallPeriod()): + toValidationResult( + lightClient.processor[].lightClientFinalityUpdateValidator( + MsgSource.gossip, msg)) + else: + ValidationResult.Ignore) + + lightClient.network.addValidator( + getLightClientOptimisticUpdateTopic(digest), + proc(msg: altair.LightClientOptimisticUpdate): ValidationResult = + if lightClient.manager.isGossipSupported(getLocalWallPeriod()): + toValidationResult( + lightClient.processor[].lightClientOptimisticUpdateValidator( + MsgSource.gossip, msg)) + else: + ValidationResult.Ignore) + +const lightClientTopicParams = TopicParams.init() +static: lightClientTopicParams.validateParameters().tryGet() + +proc updateGossipStatus*( + lightClient: LightClient, slot: Slot, dagIsBehind = default(Option[bool])) = + let + isBehind = + if lightClient.manager.isGossipSupported(slot.sync_committee_period): + false + elif dagIsBehind.isSome: + # While separate message validators can be installed for both + # full node and light client (both are called unless one rejects msg), + # only a single subscription can be installed per topic for now. 
+ # The full node subscription is also handled here, even though it + # does not directly relate to the client side of the LC sync protocol + dagIsBehind.get + else: + true # Force `targetGossipState` to `{}` + targetGossipState = + getTargetGossipState( + slot.epoch, + lightClient.cfg.ALTAIR_FORK_EPOCH, + lightClient.cfg.BELLATRIX_FORK_EPOCH, + isBehind) + template currentGossipState(): auto = lightClient.gossipState + if currentGossipState == targetGossipState: + return + + if currentGossipState.card == 0 and targetGossipState.card > 0: + debug "Enabling LC topic subscriptions", + wallSlot = slot, targetGossipState + elif currentGossipState.card > 0 and targetGossipState.card == 0: + debug "Disabling LC topic subscriptions", + wallSlot = slot + else: + # Individual forks added / removed + discard + + let + newGossipForks = targetGossipState - currentGossipState + oldGossipForks = currentGossipState - targetGossipState + + for gossipFork in oldGossipForks: + if gossipFork >= BeaconStateFork.Altair: + let forkDigest = lightClient.forkDigests[].atStateFork(gossipFork) + lightClient.network.unsubscribe( + getLightClientFinalityUpdateTopic(forkDigest)) + + for gossipFork in newGossipForks: + if gossipFork >= BeaconStateFork.Altair: + let forkDigest = lightClient.forkDigests[].atStateFork(gossipFork) + lightClient.network.subscribe( + getLightClientOptimisticUpdateTopic(forkDigest), + lightClientTopicParams) + + lightClient.gossipState = targetGossipState diff --git a/beacon_chain/networking/eth2_discovery.nim b/beacon_chain/networking/eth2_discovery.nim index 4f54302e8..e1357eaa4 100644 --- a/beacon_chain/networking/eth2_discovery.nim +++ b/beacon_chain/networking/eth2_discovery.nim @@ -11,7 +11,7 @@ import std/[os, strutils], chronicles, stew/shims/net, stew/results, bearssl, eth/keys, eth/p2p/discoveryv5/[enr, protocol, node], - ../conf + ".."/[conf, conf_light_client] export protocol, keys @@ -77,7 +77,7 @@ proc loadBootstrapFile*(bootstrapFile: string, quit 1 proc new*(T: type Eth2DiscoveryProtocol, - config: BeaconNodeConf, + config: BeaconNodeConf | LightClientConf, enrIp: Option[ValidIpAddress], enrTcpPort, enrUdpPort: Option[Port], pk: PrivateKey, enrFields: openArray[(string, seq[byte])], rng: ref BrHmacDrbgContext): @@ -91,9 +91,10 @@ proc new*(T: type Eth2DiscoveryProtocol, addBootstrapNode(node, bootstrapEnrs) loadBootstrapFile(string config.bootstrapNodesFile, bootstrapEnrs) - let persistentBootstrapFile = config.dataDir / "bootstrap_nodes.txt" - if fileExists(persistentBootstrapFile): - loadBootstrapFile(persistentBootstrapFile, bootstrapEnrs) + when config is BeaconNodeConf: + let persistentBootstrapFile = config.dataDir / "bootstrap_nodes.txt" + if fileExists(persistentBootstrapFile): + loadBootstrapFile(persistentBootstrapFile, bootstrapEnrs) newProtocol(pk, enrIp, enrTcpPort, enrUdpPort, enrFields, bootstrapEnrs, bindPort = config.udpPort, bindIp = config.listenAddress, diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 4eb2ebaa3..c0eb9452b 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -25,7 +25,7 @@ import libp2p/stream/connection, eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl, eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2], - ".."/[version, conf, beacon_clock], + ".."/[version, conf, beacon_clock, conf_light_client], ../spec/datatypes/[phase0, altair, bellatrix], ../spec/[eth2_ssz_serialization, network, helpers, forks], 
../validators/keystore_management, @@ -35,7 +35,7 @@ when chronicles.enabledLogLevel == LogLevel.TRACE: import std/sequtils export - tables, version, multiaddress, peerinfo, p2pProtocol, connection, + tables, chronos, version, multiaddress, peerinfo, p2pProtocol, connection, libp2p_json_serialization, eth2_ssz_serialization, results, eth2_discovery, peer_pool, peer_scores @@ -140,7 +140,7 @@ type # Private fields: libp2pCodecName: string protocolMounter*: MounterProc - isLightClientRequest: bool + isRequired, isLightClientRequest: bool ProtocolInfoObj* = object name*: string @@ -311,6 +311,7 @@ const NetworkInsecureKeyPassword = "INSECUREPASSWORD" template libp2pProtocol*(name: string, version: int, + isRequired = false, isLightClientRequest = false) {.pragma.} func shortLog*(peer: Peer): string = shortLog(peer.peerId) @@ -498,6 +499,38 @@ proc getRequestProtoName(fn: NimNode): NimNode = return newLit("") +proc isRequiredProto(fn: NimNode): NimNode = + # `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes + # (TODO: file as an issue) + + let pragmas = fn.pragma + if pragmas.kind == nnkPragma and pragmas.len > 0: + for pragma in pragmas: + try: + if pragma.len > 0 and $pragma[0] == "libp2pProtocol": + if pragma.len <= 3: + return newLit(false) + for i in 3 ..< pragma.len: + let param = pragma[i] + case param.kind + of nnkExprEqExpr: + if $param[0] == "isRequired": + if $param[1] == "true": + return newLit(true) + if $param[1] == "false": + return newLit(false) + raiseAssert "Unexpected value: " & $param + if $param[0] != "isLightClientRequest": + raiseAssert "Unexpected param: " & $param + of nnkIdent: + if i == 3: + return newLit(param.boolVal) + else: raiseAssert "Unexpected kind: " & param.kind.repr + return newLit(false) + except Exception as exc: raiseAssert exc.msg # TODO https://github.com/nim-lang/Nim/issues/17454 + + return newLit(false) + proc isLightClientRequestProto(fn: NimNode): NimNode = # `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes # (TODO: file as an issue) @@ -509,19 +542,23 @@ proc isLightClientRequestProto(fn: NimNode): NimNode = if pragma.len > 0 and $pragma[0] == "libp2pProtocol": if pragma.len <= 3: return newLit(false) - let param = pragma[3] - case param.kind - of nnkExprEqExpr: - if $param[0] != "isLightClientRequest": - raiseAssert "Unexpected param: " & $param - if $param[1] == "true": - return newLit(true) - if $param[1] == "false": - return newLit(false) - raiseAssert "Unexpected value: " & $param - of nnkIdent: - return newLit(param.boolVal) - else: raiseAssert "Unexpected kind: " & param.kind.repr + for i in 3 ..< pragma.len: + let param = pragma[i] + case param.kind + of nnkExprEqExpr: + if $param[0] == "isLightClientRequest": + if $param[1] == "true": + return newLit(true) + if $param[1] == "false": + return newLit(false) + raiseAssert "Unexpected value: " & $param + if $param[0] != "isRequired": + raiseAssert "Unexpected param: " & $param + of nnkIdent: + if i == 4: + return newLit(param.boolVal) + else: raiseAssert "Unexpected kind: " & param.kind.repr + return newLit(false) except Exception as exc: raiseAssert exc.msg # TODO https://github.com/nim-lang/Nim/issues/17454 return newLit(false) @@ -1614,20 +1651,22 @@ proc onConnEvent(node: Eth2Node, peerId: PeerId, event: ConnEvent) {.async.} = peer = peerId, peer_state = peer.connectionState peer.connectionState = Disconnected -proc new*(T: type Eth2Node, config: BeaconNodeConf, runtimeCfg: RuntimeConfig, - enrForkId: ENRForkID, discoveryForkId: ENRForkID, forkDigests: 
ref ForkDigests, - getBeaconTime: GetBeaconTimeFn, switch: Switch, - pubsub: GossipSub, ip: Option[ValidIpAddress], tcpPort, - udpPort: Option[Port], privKey: keys.PrivateKey, discovery: bool, +proc new*(T: type Eth2Node, + config: BeaconNodeConf | LightClientConf, runtimeCfg: RuntimeConfig, + enrForkId: ENRForkID, discoveryForkId: ENRForkID, + forkDigests: ref ForkDigests, getBeaconTime: GetBeaconTimeFn, + switch: Switch, pubsub: GossipSub, + ip: Option[ValidIpAddress], tcpPort, udpPort: Option[Port], + privKey: keys.PrivateKey, discovery: bool, rng: ref BrHmacDrbgContext): T {.raises: [Defect, CatchableError].} = when not defined(local_testnet): let - connectTimeout = 1.minutes - seenThreshold = 5.minutes + connectTimeout = chronos.minutes(1) + seenThreshold = chronos.minutes(5) else: let - connectTimeout = 10.seconds - seenThreshold = 10.seconds + connectTimeout = chronos.seconds(10) + seenThreshold = chronos.seconds(10) type MetaData = altair.MetaData # Weird bug without this.. # Versions up to v22.3.0 would write an empty `MetaData` to @@ -1669,12 +1708,15 @@ proc new*(T: type Eth2Node, config: BeaconNodeConf, runtimeCfg: RuntimeConfig, node.protocolStates[proto.index] = proto.networkStateInitializer(node) for msg in proto.messages: - if msg.isLightClientRequest and not config.serveLightClientData.get: - continue + when config is BeaconNodeConf: + if msg.isLightClientRequest and not config.serveLightClientData.get: + continue + elif config is LightClientConf: + if not msg.isRequired: + continue if msg.protocolMounter != nil: msg.protocolMounter node - proc peerHook(peerId: PeerId, event: ConnEvent): Future[void] {.gcsafe.} = onConnEvent(node, peerId, event) @@ -1790,10 +1832,11 @@ proc registerMsg(protocol: ProtocolInfo, name: string, mounter: MounterProc, libp2pCodecName: string, - isLightClientRequest: bool) = + isRequired, isLightClientRequest: bool) = protocol.messages.add MessageInfo(name: name, protocolMounter: mounter, libp2pCodecName: libp2pCodecName, + isRequired: isRequired, isLightClientRequest: isLightClientRequest) proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = @@ -1833,6 +1876,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = MsgRecName = msg.recName MsgStrongRecName = msg.strongRecName codecNameLit = getRequestProtoName(msg.procDef) + isRequiredLit = isRequiredProto(msg.procDef) isLightClientRequestLit = isLightClientRequestProto(msg.procDef) protocolMounterName = ident(msgName & "Mounter") @@ -1910,6 +1954,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = msgNameLit, protocolMounterName, codecNameLit, + isRequiredLit, isLightClientRequestLit)) result.implementProtocolInit = proc (p: P2PProtocol): NimNode = @@ -1917,6 +1962,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = #Must import here because of cyclicity import ../sync/sync_protocol +export sync_protocol proc updatePeerMetadata(node: Eth2Node, peerId: PeerId) {.async.} = trace "updating peer metadata", peerId @@ -2004,10 +2050,21 @@ proc initAddress*(T: type MultiAddress, str: string): T = template tcpEndPoint(address, port): auto = MultiAddress.init(address, tcpProtocol, port) +proc optimisticgetRandomNetKeys*(rng: var BrHmacDrbgContext): NetKeyPair = + let res = PrivateKey.random(Secp256k1, rng) + if res.isErr(): + fatal "Could not generate random network key file" + quit QuitFailure + + let + privKey = res.get() + pubKey = privKey.getPublicKey().expect("working public key from random") + NetKeyPair(seckey: privKey, pubkey: pubKey) + proc getPersistentNetKeys*(rng: var 
BrHmacDrbgContext, config: BeaconNodeConf): NetKeyPair = case config.cmd - of noCommand, record: + of BNStartUpCmd.noCommand, BNStartUpCmd.record: if config.netKeyFile == "random": let res = PrivateKey.random(Secp256k1, rng) if res.isErr(): @@ -2078,7 +2135,7 @@ proc getPersistentNetKeys*(rng: var BrHmacDrbgContext, network_public_key = pubKey NetKeyPair(seckey: privKey, pubkey: pubKey) - of createTestnet: + of BNStartUpCmd.createTestnet: if config.netKeyFile == "random": fatal "Could not create testnet using `random` network key" quit QuitFailure @@ -2115,15 +2172,7 @@ proc getPersistentNetKeys*(rng: var BrHmacDrbgContext, NetKeyPair(seckey: privKey, pubkey: pubKey) else: - let res = PrivateKey.random(Secp256k1, rng) - if res.isErr(): - fatal "Could not generate random network key file" - quit QuitFailure - - let - privKey = res.get() - pubKey = privKey.getPublicKey().expect("working public key from random") - NetKeyPair(seckey: privKey, pubkey: pubKey) + optimisticgetRandomNetKeys(rng) func gossipId( data: openArray[byte], altairPrefix, topic: string): seq[byte] = @@ -2143,8 +2192,8 @@ func gossipId( messageDigest.data[0..19] -proc newBeaconSwitch*(config: BeaconNodeConf, seckey: PrivateKey, - address: MultiAddress, +proc newBeaconSwitch*(config: BeaconNodeConf | LightClientConf, + seckey: PrivateKey, address: MultiAddress, rng: ref BrHmacDrbgContext): Switch {.raises: [Defect, CatchableError].} = SwitchBuilder .new() @@ -2152,7 +2201,7 @@ proc newBeaconSwitch*(config: BeaconNodeConf, seckey: PrivateKey, .withAddress(address) .withRng(rng) .withNoise() - .withMplex(5.minutes, 5.minutes) + .withMplex(chronos.minutes(5), chronos.minutes(5)) .withMaxConnections(config.maxPeers) .withAgentVersion(config.agentString) .withTcpTransport({ServerFlags.ReuseAddr}) @@ -2181,7 +2230,7 @@ template gossipMaxSize(T: untyped): uint32 = maxSize.uint32 proc createEth2Node*(rng: ref BrHmacDrbgContext, - config: BeaconNodeConf, + config: BeaconNodeConf | LightClientConf, netKeys: NetKeyPair, cfg: RuntimeConfig, forkDigests: ref ForkDigests, @@ -2230,8 +2279,8 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext, let params = GossipSubParams( explicit: true, - pruneBackoff: 1.minutes, - unsubscribeBackoff: 10.seconds, + pruneBackoff: chronos.minutes(1), + unsubscribeBackoff: chronos.seconds(10), floodPublish: true, gossipFactor: 0.05, d: 8, @@ -2240,18 +2289,18 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext, dScore: 6, dOut: 6 div 2, # less than dlow and no more than dlow/2 dLazy: 6, - heartbeatInterval: 700.milliseconds, + heartbeatInterval: chronos.milliseconds(700), historyLength: 6, historyGossip: 3, - fanoutTTL: 60.seconds, - seenTTL: 385.seconds, + fanoutTTL: chronos.seconds(60), + seenTTL: chronos.seconds(385), gossipThreshold: -4000, publishThreshold: -8000, graylistThreshold: -16000, # also disconnect threshold opportunisticGraftThreshold: 0, - decayInterval: 12.seconds, + decayInterval: chronos.seconds(12), decayToZero: 0.01, - retainScore: 385.seconds, + retainScore: chronos.seconds(385), appSpecificWeight: 0.0, ipColocationFactorWeight: -53.75, ipColocationFactorThreshold: 3.0, @@ -2488,10 +2537,7 @@ proc updateForkId*(node: Eth2Node, epoch: Epoch, genesis_validators_root: Eth2Di node.discoveryForkId = getDiscoveryForkID(node.cfg, epoch, genesis_validators_root) func forkDigestAtEpoch(node: Eth2Node, epoch: Epoch): ForkDigest = - case node.cfg.stateForkAtEpoch(epoch) - of BeaconStateFork.Bellatrix: node.forkDigests.bellatrix - of BeaconStateFork.Altair: node.forkDigests.altair - of 
BeaconStateFork.Phase0: node.forkDigests.phase0 + node.forkDigests[].atEpoch(epoch, node.cfg) proc getWallEpoch(node: Eth2Node): Epoch = node.getBeaconTime().slotOrZero.epoch diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index bb6097cd6..68983f2f6 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -543,7 +543,8 @@ proc acquire*[A, B](pool: PeerPool[A, B], proc acquireNoWait*[A, B](pool: PeerPool[A, B], filter = {PeerType.Incoming, - PeerType.Outgoing}): A = + PeerType.Outgoing} + ): A {.raises: [PeerPoolError, Defect].} = doAssert(filter != {}, "Filter must not be empty") if pool.lenAvailable(filter) < 1: raise newException(PeerPoolError, "Not enough peers in pool") diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 3d90bc492..7e8ea9ffd 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -334,9 +334,6 @@ proc initFullNode( const SlashingDbName = "slashing_protection" # changing this requires physical file rename as well or history is lost. -func getBeaconTimeFn(clock: BeaconClock): GetBeaconTimeFn = - return proc(): BeaconTime = clock.now() - proc init*(T: type BeaconNode, cfg: RuntimeConfig, rng: ref BrHmacDrbgContext, @@ -1308,9 +1305,6 @@ proc installMessageValidators(node: BeaconNode) = # subnets are subscribed to during any given epoch. let forkDigests = node.dag.forkDigests - func toValidationResult(res: ValidationRes): ValidationResult = - if res.isOk(): ValidationResult.Accept else: res.error()[0] - node.network.addValidator( getBeaconBlocksTopic(forkDigests.phase0), proc (signedBlock: phase0.SignedBeaconBlock): ValidationResult = diff --git a/beacon_chain/nimbus_light_client.nim b/beacon_chain/nimbus_light_client.nim new file mode 100644 index 000000000..564061d9d --- /dev/null +++ b/beacon_chain/nimbus_light_client.nim @@ -0,0 +1,99 @@ +# beacon_chain +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +# This implements the pre-release proposal of the libp2p based light client sync +# protocol. See https://github.com/ethereum/consensus-specs/pull/2802 + +import + std/os, + chronicles, chronos, + eth/keys, + ./spec/beaconstate, + "."/[light_client, nimbus_binary_common, version] + +proc onFinalizedHeader(lightClient: LightClient) = + notice "New LC finalized header", + finalized_header = shortLog(lightClient.finalizedHeader.get) + +proc onOptimisticHeader(lightClient: LightClient) = + notice "New LC optimistic header", + optimistic_header = shortLog(lightClient.optimisticHeader.get) + +proc onSecond( + lightClient: LightClient, + config: LightClientConf, + getBeaconTime: GetBeaconTimeFn) = + ## This procedure will be called once per second. 
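+  ## It enforces the `--stop-at-epoch` testing option and keeps the gossip
+  ## topic subscriptions one slot ahead of the wall clock.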
+ let wallSlot = getBeaconTime().slotOrZero() + checkIfShouldStopAtEpoch(wallSlot, config.stopAtEpoch) + + lightClient.updateGossipStatus(wallSlot + 1) + +proc runOnSecondLoop( + lightClient: LightClient, + config: LightClientConf, + getBeaconTime: GetBeaconTimeFn) {.async.} = + while true: + onSecond(lightClient, config, getBeaconTime) + await chronos.sleepAsync(chronos.seconds(1)) + +programMain: + var config = makeBannerAndConfig( + "Nimbus light client " & fullVersionStr, LightClientConf) + setupLogging(config.logLevel, config.logStdout, config.logFile) + + notice "Launching light client", + version = fullVersionStr, cmdParams = commandLineParams(), config + + let metadata = loadEth2Network(config.eth2Network) + for node in metadata.bootstrapNodes: + config.bootstrapNodes.add node + template cfg(): auto = metadata.cfg + + let + genesisState = + try: + template genesisData(): auto = metadata.genesisData + newClone(readSszForkedHashedBeaconState( + cfg, genesisData.toOpenArrayByte(genesisData.low, genesisData.high))) + except CatchableError as err: + raiseAssert "Invalid baked-in state: " & err.msg + + beaconClock = BeaconClock.init( + getStateField(genesisState[], genesis_time)) + getBeaconTime = beaconClock.getBeaconTimeFn() + + genesis_validators_root = + getStateField(genesisState[], genesis_validators_root) + forkDigests = newClone ForkDigests.init(cfg, genesis_validators_root) + + genesisBlockRoot = get_initial_beacon_block(genesisState[]).root + + rng = keys.newRng() + netKeys = optimisticgetRandomNetKeys(rng[]) + network = createEth2Node( + rng, config, netKeys, cfg, + forkDigests, getBeaconTime, genesis_validators_root) + + lightClient = createLightClient( + network, rng, config, cfg, + forkDigests, getBeaconTime, genesis_validators_root) + + info "Listening to incoming network requests" + network.initBeaconSync(cfg, forkDigests, genesisBlockRoot, getBeaconTime) + lightClient.installMessageValidators() + waitFor network.startListening() + waitFor network.start() + + lightClient.onFinalizedHeader = onFinalizedHeader + lightClient.onOptimisticHeader = onOptimisticHeader + lightClient.trustedBlockRoot = some config.trustedBlockRoot + lightClient.start() + + asyncSpawn runOnSecondLoop(lightClient, config, getBeaconTime) + while true: + poll() diff --git a/beacon_chain/nimbus_light_client.nim.cfg b/beacon_chain/nimbus_light_client.nim.cfg new file mode 100644 index 000000000..4c0d44247 --- /dev/null +++ b/beacon_chain/nimbus_light_client.nim.cfg @@ -0,0 +1,7 @@ +-d:"chronicles_sinks=textlines[dynamic],json[dynamic]" +-d:"chronicles_runtime_filtering=on" +-d:"chronicles_disable_thread_id" + +@if release: + -d:"chronicles_line_numbers:0" +@end diff --git a/beacon_chain/spec/forks.nim b/beacon_chain/spec/forks.nim index 3840bea8d..e9df5b4d3 100644 --- a/beacon_chain/spec/forks.nim +++ b/beacon_chain/spec/forks.nim @@ -351,6 +351,20 @@ func stateForkForDigest*( else: err() +func atStateFork*( + forkDigests: ForkDigests, stateFork: BeaconStateFork): ForkDigest = + case stateFork + of BeaconStateFork.Bellatrix: + forkDigests.bellatrix + of BeaconStateFork.Altair: + forkDigests.altair + of BeaconStateFork.Phase0: + forkDigests.phase0 + +template atEpoch*( + forkDigests: ForkDigests, epoch: Epoch, cfg: RuntimeConfig): ForkDigest = + forkDigests.atStateFork(cfg.stateForkAtEpoch(epoch)) + template asSigned*(x: ForkedTrustedSignedBeaconBlock): ForkedSignedBeaconBlock = isomorphicCast[ForkedSignedBeaconBlock](x) diff --git a/beacon_chain/spec/network.nim b/beacon_chain/spec/network.nim index 
3c566888e..e3d070bf5 100644 --- a/beacon_chain/spec/network.nim +++ b/beacon_chain/spec/network.nim @@ -138,9 +138,10 @@ func getDiscoveryForkID*(cfg: RuntimeConfig, next_fork_epoch: FAR_FUTURE_EPOCH) # https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/altair/p2p-interface.md#transitioning-the-gossip +type GossipState* = set[BeaconStateFork] func getTargetGossipState*( epoch, ALTAIR_FORK_EPOCH, BELLATRIX_FORK_EPOCH: Epoch, isBehind: bool): - set[BeaconStateFork] = + GossipState = if isBehind: {} diff --git a/beacon_chain/sync/light_client_manager.nim b/beacon_chain/sync/light_client_manager.nim new file mode 100644 index 000000000..011d380ce --- /dev/null +++ b/beacon_chain/sync/light_client_manager.nim @@ -0,0 +1,454 @@ +# beacon_chain +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +# This implements the pre-release proposal of the libp2p based light client sync +# protocol. See https://github.com/ethereum/consensus-specs/pull/2802 + +import chronos, chronicles, stew/base10 +import + eth/p2p/discoveryv5/random2, + ../spec/datatypes/[altair], + ../networking/eth2_network, + ../beacon_clock, + "."/sync_protocol, "."/sync_manager +export sync_manager + +logScope: + topics = "lcman" + +type + Nothing = object + ResponseError = object of CatchableError + Endpoint[K, V] = + (K, V) # https://github.com/nim-lang/Nim/issues/19531 + Bootstrap = + Endpoint[Eth2Digest, altair.LightClientBootstrap] + UpdatesByRange = + Endpoint[Slice[SyncCommitteePeriod], altair.LightClientUpdate] + FinalityUpdate = + Endpoint[Nothing, altair.LightClientFinalityUpdate] + OptimisticUpdate = + Endpoint[Nothing, altair.LightClientOptimisticUpdate] + + ValueVerifier[V] = + proc(v: V): Future[Result[void, BlockError]] {.gcsafe, raises: [Defect].} + BootstrapVerifier* = + ValueVerifier[altair.LightClientBootstrap] + UpdateVerifier* = + ValueVerifier[altair.LightClientUpdate] + FinalityUpdateVerifier* = + ValueVerifier[altair.LightClientFinalityUpdate] + OptimisticUpdateVerifier* = + ValueVerifier[altair.LightClientOptimisticUpdate] + + GetTrustedBlockRootCallback* = + proc(): Option[Eth2Digest] {.gcsafe, raises: [Defect].} + GetBoolCallback* = + proc(): bool {.gcsafe, raises: [Defect].} + GetSyncCommitteePeriodCallback* = + proc(): SyncCommitteePeriod {.gcsafe, raises: [Defect].} + + LightClientManager* = object + network: Eth2Node + rng: ref BrHmacDrbgContext + getTrustedBlockRoot: GetTrustedBlockRootCallback + bootstrapVerifier: BootstrapVerifier + updateVerifier: UpdateVerifier + finalityUpdateVerifier: FinalityUpdateVerifier + optimisticUpdateVerifier: OptimisticUpdateVerifier + isLightClientStoreInitialized: GetBoolCallback + isNextSyncCommitteeKnown: GetBoolCallback + getFinalizedPeriod: GetSyncCommitteePeriodCallback + getOptimisticPeriod: GetSyncCommitteePeriodCallback + getBeaconTime: GetBeaconTimeFn + loopFuture: Future[void] + +func init*( + T: type LightClientManager, + network: Eth2Node, + rng: ref BrHmacDrbgContext, + getTrustedBlockRoot: GetTrustedBlockRootCallback, + bootstrapVerifier: BootstrapVerifier, + updateVerifier: UpdateVerifier, + finalityUpdateVerifier: FinalityUpdateVerifier, + 
optimisticUpdateVerifier: OptimisticUpdateVerifier, + isLightClientStoreInitialized: GetBoolCallback, + isNextSyncCommitteeKnown: GetBoolCallback, + getFinalizedPeriod: GetSyncCommitteePeriodCallback, + getOptimisticPeriod: GetSyncCommitteePeriodCallback, + getBeaconTime: GetBeaconTimeFn +): LightClientManager = + ## Initialize light client manager. + LightClientManager( + network: network, + rng: rng, + getTrustedBlockRoot: getTrustedBlockRoot, + bootstrapVerifier: bootstrapVerifier, + updateVerifier: updateVerifier, + finalityUpdateVerifier: finalityUpdateVerifier, + optimisticUpdateVerifier: optimisticUpdateVerifier, + isLightClientStoreInitialized: isLightClientStoreInitialized, + isNextSyncCommitteeKnown: isNextSyncCommitteeKnown, + getFinalizedPeriod: getFinalizedPeriod, + getOptimisticPeriod: getOptimisticPeriod, + getBeaconTime: getBeaconTime + ) + +proc isGossipSupported*( + self: LightClientManager, + period: SyncCommitteePeriod +): bool = + ## Indicate whether the light client is sufficiently synced to accept gossip. + if not self.isLightClientStoreInitialized(): + return false + + let + finalizedPeriod = self.getFinalizedPeriod() + isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown() + if isNextSyncCommitteeKnown: + period <= finalizedPeriod + 1 + else: + period <= finalizedPeriod + +# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#getlightclientbootstrap +proc doRequest( + e: typedesc[Bootstrap], + peer: Peer, + blockRoot: Eth2Digest +): Future[NetRes[altair.LightClientBootstrap]] {. + raises: [Defect, IOError].} = + peer.lightClientBootstrap(blockRoot) + +# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#lightclientupdatesbyrange +type LightClientUpdatesByRangeResponse = NetRes[seq[altair.LightClientUpdate]] +proc doRequest( + e: typedesc[UpdatesByRange], + peer: Peer, + periods: Slice[SyncCommitteePeriod] +): Future[LightClientUpdatesByRangeResponse] {. 
+ async, raises: [Defect, IOError].} = + let + startPeriod = periods.a + lastPeriod = periods.b + reqCount = min(periods.len, MAX_REQUEST_LIGHT_CLIENT_UPDATES).uint64 + let response = await peer.lightClientUpdatesByRange(startPeriod, reqCount) + if response.isOk: + if response.get.lenu64 > reqCount: + raise newException(ResponseError, "Too many values in response" & + " (" & Base10.toString(response.get.lenu64) & + " > " & Base10.toString(reqCount.uint) & ")") + var expectedPeriod = startPeriod + for update in response.get: + let + attestedPeriod = update.attested_header.slot.sync_committee_period + signaturePeriod = update.signature_slot.sync_committee_period + if attestedPeriod != update.signature_slot.sync_committee_period: + raise newException(ResponseError, "Conflicting sync committee periods" & + " (signature: " & Base10.toString(distinctBase(signaturePeriod)) & + " != " & Base10.toString(distinctBase(attestedPeriod)) & ")") + if attestedPeriod < expectedPeriod: + raise newException(ResponseError, "Unexpected sync committee period" & + " (" & Base10.toString(distinctBase(attestedPeriod)) & + " < " & Base10.toString(distinctBase(expectedPeriod)) & ")") + if attestedPeriod > expectedPeriod: + if attestedPeriod > lastPeriod: + raise newException(ResponseError, "Sync committee period too high" & + " (" & Base10.toString(distinctBase(attestedPeriod)) & + " > " & Base10.toString(distinctBase(lastPeriod)) & ")") + expectedPeriod = attestedPeriod + inc expectedPeriod + return response + +# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#getlightclientfinalityupdate +proc doRequest( + e: typedesc[FinalityUpdate], + peer: Peer +): Future[NetRes[altair.LightClientFinalityUpdate]] {. + raises: [Defect, IOError].} = + peer.lightClientFinalityUpdate() + +# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#getlightclientoptimisticupdate +proc doRequest( + e: typedesc[OptimisticUpdate], + peer: Peer +): Future[NetRes[altair.LightClientOptimisticUpdate]] {. + raises: [Defect, IOError].} = + peer.lightClientOptimisticUpdate() + +template valueVerifier[E]( + self: LightClientManager, + e: typedesc[E] +): ValueVerifier[E.V] = + when E.V is altair.LightClientBootstrap: + self.bootstrapVerifier + elif E.V is altair.LightClientUpdate: + self.updateVerifier + elif E.V is altair.LightClientFinalityUpdate: + self.finalityUpdateVerifier + elif E.V is altair.LightClientOptimisticUpdate: + self.optimisticUpdateVerifier + else: static: doAssert false + +iterator values(v: auto): auto = + ## Local helper for `workerTask` to share the same implementation for both + ## scalar and aggregate values, by treating scalars as 1-length aggregates. 
+ when v is seq: + for i in v: + yield i + else: + yield v + +proc workerTask[E]( + self: LightClientManager, + e: typedesc[E], + key: E.K +): Future[bool] {.async.} = + var + peer: Peer + didProgress = false + try: + peer = self.network.peerPool.acquireNoWait() + let value = + when E.K is Nothing: + await E.doRequest(peer) + else: + await E.doRequest(peer, key) + if value.isOk: + var applyReward = false + for val in value.get.values: + let res = await self.valueVerifier(E)(val) + if res.isErr: + case res.error + of BlockError.MissingParent: + # Stop, requires different request to progress + return didProgress + of BlockError.Duplicate: + # Ignore, a concurrent request may have already fulfilled this + when E.V is altair.LightClientBootstrap: + didProgress = true + else: + discard + of BlockError.UnviableFork: + # Descore, peer is on an incompatible fork version + notice "Received value from an unviable fork", value = val.shortLog, + endpoint = E.name, peer, peer_score = peer.getScore() + peer.updateScore(PeerScoreUnviableFork) + return didProgress + of BlockError.Invalid: + # Descore, received data is malformed + warn "Received invalid value", value = val.shortLog, + endpoint = E.name, peer, peer_score = peer.getScore() + peer.updateScore(PeerScoreBadBlocks) + return didProgress + else: + # Reward, peer returned something useful + applyReward = true + didProgress = true + if applyReward: + peer.updateScore(PeerScoreGoodBlocks) + else: + peer.updateScore(PeerScoreNoBlocks) + debug "Failed to receive value on request", value, + endpoint = E.name, peer, peer_score = peer.getScore() + except ResponseError as exc: + warn "Received invalid response", error = exc.msg, + endpoint = E.name, peer, peer_score = peer.getScore() + peer.updateScore(PeerScoreBadBlocks) + except CancelledError as exc: + raise exc + except PeerPoolError as exc: + debug "Failed to acquire peer", exc = exc.msg + except CatchableError as exc: + if peer != nil: + peer.updateScore(PeerScoreNoBlocks) + debug "Unexpected exception while receiving value", exc = exc.msg, + endpoint = E.name, peer, peer_score = peer.getScore() + raise exc + finally: + if peer != nil: + self.network.peerPool.release(peer) + return didProgress + +proc query[E]( + self: LightClientManager, + e: typedesc[E], + key: E.K +): Future[bool] {.async.} = + const PARALLEL_REQUESTS = 2 + var workers: array[PARALLEL_REQUESTS, Future[bool]] + + let + progressFut = newFuture[void]("lcmanProgress") + doneFut = newFuture[void]("lcmanDone") + var + numCompleted = 0 + maxCompleted = workers.len + + proc handleFinishedWorker(future: pointer) = + try: + let didProgress = cast[Future[bool]](future).read() + if didProgress and not progressFut.finished: + progressFut.complete() + except CancelledError as exc: + if not progressFut.finished: + progressFut.cancel() + except CatchableError as exc: + discard + finally: + inc numCompleted + if numCompleted == maxCompleted: + doneFut.complete() + + try: + # Start concurrent workers + for i in 0 ..< workers.len: + try: + workers[i] = self.workerTask(e, key) + workers[i].addCallback(handleFinishedWorker) + except CancelledError as exc: + raise exc + except CatchableError as exc: + workers[i] = newFuture[bool]() + workers[i].complete(false) + + # Wait for any worker to report progress, or for all workers to finish + discard await race(progressFut, doneFut) + finally: + for i in 0 ..< maxCompleted: + if workers[i] == nil: + maxCompleted = i + if numCompleted == maxCompleted: + doneFut.complete() + break + if not workers[i].finished: 
+ workers[i].cancel() + while true: + try: + await allFutures(workers[0 ..< maxCompleted]) + break + except CancelledError as exc: + continue + while true: + try: + await doneFut + break + except CancelledError as exc: + continue + + if not progressFut.finished: + progressFut.cancel() + return progressFut.completed + +template query( + self: LightClientManager, + e: typedesc[UpdatesByRange], + key: SyncCommitteePeriod +): Future[bool] = + self.query(e, key .. key) + +template query[E]( + self: LightClientManager, + e: typedesc[E] +): Future[bool] = + self.query(e, Nothing()) + +type SchedulingMode = enum + Soon, + CurrentPeriod, + NextPeriod + +func fetchTime( + self: LightClientManager, + wallTime: BeaconTime, + schedulingMode: SchedulingMode +): BeaconTime = + let + remainingTime = + case schedulingMode: + of Soon: + chronos.seconds(0) + of CurrentPeriod: + let + wallPeriod = wallTime.slotOrZero().sync_committee_period + deadlineSlot = (wallPeriod + 1).start_slot - 1 + deadline = deadlineSlot.start_beacon_time() + chronos.nanoseconds((deadline - wallTime).nanoseconds) + of NextPeriod: + chronos.seconds( + (SLOTS_PER_SYNC_COMMITTEE_PERIOD * SECONDS_PER_SLOT).int64) + minDelay = max(remainingTime div 8, chronos.seconds(30)) + jitterSeconds = (minDelay * 2).seconds + jitterDelay = chronos.seconds(self.rng[].rand(jitterSeconds).int64) + return wallTime + minDelay + jitterDelay + +# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#light-client-sync-process +proc loop(self: LightClientManager) {.async.} = + var nextFetchTime = self.getBeaconTime() + while true: + # Periodically wake and check for changes + let wallTime = self.getBeaconTime() + if wallTime < nextFetchTime or + self.network.peerPool.lenAvailable < 1: + await sleepAsync(chronos.seconds(2)) + continue + + # Obtain bootstrap data once a trusted block root is supplied + if not self.isLightClientStoreInitialized(): + let trustedBlockRoot = self.getTrustedBlockRoot() + if trustedBlockRoot.isNone: + await sleepAsync(chronos.seconds(2)) + continue + + let didProgress = await self.query(Bootstrap, trustedBlockRoot.get) + if not didProgress: + nextFetchTime = self.fetchTime(wallTime, Soon) + continue + + # Fetch updates + var allowWaitNextPeriod = false + let + finalized = self.getFinalizedPeriod() + optimistic = self.getOptimisticPeriod() + current = wallTime.slotOrZero().sync_committee_period + isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown() + + didProgress = + if finalized == optimistic and not isNextSyncCommitteeKnown: + if finalized >= current: + await self.query(UpdatesByRange, finalized) + else: + await self.query(UpdatesByRange, finalized ..< current) + elif finalized + 1 < current: + await self.query(UpdatesByRange, finalized + 1 ..< current) + elif finalized != optimistic: + await self.query(FinalityUpdate) + else: + allowWaitNextPeriod = true + await self.query(OptimisticUpdate) + + schedulingMode = + if not didProgress or not self.isGossipSupported(current): + Soon + elif not allowWaitNextPeriod: + CurrentPeriod + else: + NextPeriod + + nextFetchTime = self.fetchTime(wallTime, schedulingMode) + +proc start*(self: var LightClientManager) = + ## Start light client manager's loop. + doAssert self.loopFuture == nil + self.loopFuture = self.loop() + +proc stop*(self: var LightClientManager) {.async.} = + ## Stop light client manager's loop. 
+ if self.loopFuture != nil: + await self.loopFuture.cancelAndWait() + self.loopFuture = nil diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index d51022e00..26d45823e 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -95,7 +95,8 @@ proc initQueue[A, B](man: SyncManager[A, B]) = of SyncQueueKind.Forward: man.queue = SyncQueue.init(A, man.direction, man.getFirstSlot(), man.getLastSlot(), man.chunkSize, - man.getSafeSlot, man.blockVerifier, 1) + man.getSafeSlot, man.blockVerifier, 1, + man.ident) of SyncQueueKind.Backward: let firstSlot = man.getFirstSlot() @@ -108,7 +109,8 @@ proc initQueue[A, B](man: SyncManager[A, B]) = Slot(firstSlot - 1'u64) man.queue = SyncQueue.init(A, man.direction, startSlot, lastSlot, man.chunkSize, man.getSafeSlot, - man.blockVerifier, 1) + man.blockVerifier, 1, + man.ident) proc newSyncManager*[A, B](pool: PeerPool[A, B], direction: SyncQueueKind, diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim index bd5cd94e1..f3157c532 100644 --- a/beacon_chain/sync/sync_protocol.nim +++ b/beacon_chain/sync/sync_protocol.nim @@ -31,7 +31,7 @@ const blockByRangeLookupCost = allowedOpsPerSecondCost(20) # https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#configuration - MAX_REQUEST_LIGHT_CLIENT_UPDATES = 128 + MAX_REQUEST_LIGHT_CLIENT_UPDATES* = 128 lightClientEmptyResponseCost = allowedOpsPerSecondCost(50) lightClientBootstrapLookupCost = allowedOpsPerSecondCost(5) lightClientBootstrapResponseCost = allowedOpsPerSecondCost(100) @@ -59,9 +59,12 @@ type else: index: uint32 - BeaconSyncNetworkState* = ref object - dag*: ChainDAGRef - getBeaconTime*: GetBeaconTimeFn + BeaconSyncNetworkState = ref object + dag: ChainDAGRef + cfg: RuntimeConfig + forkDigests: ref ForkDigests + genesisBlockRoot: Eth2Digest + getBeaconTime: GetBeaconTimeFn BeaconSyncPeerState* = ref object statusLastTime*: chronos.Moment @@ -151,17 +154,29 @@ func disconnectReasonName(reason: uint64): string = elif reason == uint64(FaultOrError): "Fault or error" else: "Disconnected (" & $reason & ")" +func forkDigestAtEpoch(state: BeaconSyncNetworkState, + epoch: Epoch): ForkDigest = + state.forkDigests[].atEpoch(epoch, state.cfg) + proc getCurrentStatus(state: BeaconSyncNetworkState): StatusMsg = let dag = state.dag wallSlot = state.getBeaconTime().slotOrZero - StatusMsg( - forkDigest: dag.forkDigestAtEpoch(wallSlot.epoch), - finalizedRoot: dag.finalizedHead.blck.root, - finalizedEpoch: dag.finalizedHead.slot.epoch, - headRoot: dag.head.root, - headSlot: dag.head.slot) + if dag != nil: + StatusMsg( + forkDigest: state.forkDigestAtEpoch(wallSlot.epoch), + finalizedRoot: dag.finalizedHead.blck.root, + finalizedEpoch: dag.finalizedHead.slot.epoch, + headRoot: dag.head.root, + headSlot: dag.head.slot) + else: + StatusMsg( + forkDigest: state.forkDigestAtEpoch(wallSlot.epoch), + finalizedRoot: state.genesisBlockRoot, + finalizedEpoch: GENESIS_EPOCH, + headRoot: state.genesisBlockRoot, + headSlot: GENESIS_SLOT) proc checkStatusMsg(state: BeaconSyncNetworkState, status: StatusMsg): Result[void, cstring] = @@ -176,15 +191,20 @@ proc checkStatusMsg(state: BeaconSyncNetworkState, status: StatusMsg): if status.headSlot > wallSlot: return err("head more recent than wall clock") - if dag.forkDigestAtEpoch(wallSlot.epoch) != status.forkDigest: + if state.forkDigestAtEpoch(wallSlot.epoch) != status.forkDigest: return err("fork digests differ") - if status.finalizedEpoch <= 
dag.finalizedHead.slot.epoch: - let blockId = dag.getBlockIdAtSlot(status.finalizedEpoch.start_slot()) - if blockId.isSome and - (not status.finalizedRoot.isZero) and - status.finalizedRoot != blockId.get().bid.root: - return err("peer following different finality") + if dag != nil: + if status.finalizedEpoch <= dag.finalizedHead.slot.epoch: + let blockId = dag.getBlockIdAtSlot(status.finalizedEpoch.start_slot()) + if blockId.isSome and + (not status.finalizedRoot.isZero) and + status.finalizedRoot != blockId.get().bid.root: + return err("peer following different finality") + else: + if status.finalizedEpoch == GENESIS_EPOCH: + if status.finalizedRoot != state.genesisBlockRoot: + return err("peer following different finality") ok() @@ -229,22 +249,22 @@ p2pProtocol BeaconSync(version = 1, proc status(peer: Peer, theirStatus: StatusMsg, response: SingleChunkResponse[StatusMsg]) - {.async, libp2pProtocol("status", 1).} = + {.async, libp2pProtocol("status", 1, isRequired = true).} = let ourStatus = peer.networkState.getCurrentStatus() trace "Sending status message", peer = peer, status = ourStatus await response.send(ourStatus) discard await peer.handleStatus(peer.networkState, theirStatus) proc ping(peer: Peer, value: uint64): uint64 - {.libp2pProtocol("ping", 1).} = + {.libp2pProtocol("ping", 1, isRequired = true).} = return peer.network.metadata.seq_number proc getMetaData(peer: Peer): phase0.MetaData - {.libp2pProtocol("metadata", 1).} = + {.libp2pProtocol("metadata", 1, isRequired = true).} = return peer.network.phase0metadata proc getMetadata_v2(peer: Peer): altair.MetaData - {.libp2pProtocol("metadata", 2).} = + {.libp2pProtocol("metadata", 2, isRequired = true).} = return peer.network.metadata proc beaconBlocksByRange( @@ -439,7 +459,7 @@ p2pProtocol BeaconSync(version = 1, await response.writeBytesSZ( uncompressedLen, bytes, - dag.forkDigestAtEpoch(blocks[i].slot.epoch).data) + peer.networkState.forkDigestAtEpoch(blocks[i].slot.epoch).data) inc found @@ -495,7 +515,7 @@ p2pProtocol BeaconSync(version = 1, await response.writeBytesSZ( uncompressedLen, bytes, - dag.forkDigestAtEpoch(blockRef.slot.epoch).data) + peer.networkState.forkDigestAtEpoch(blockRef.slot.epoch).data) inc found @@ -520,7 +540,7 @@ p2pProtocol BeaconSync(version = 1, if bootstrap.isOk: let contextEpoch = bootstrap.get.header.slot.epoch - contextBytes = dag.forkDigestAtEpoch(contextEpoch).data + contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data await response.send(bootstrap.get, contextBytes) else: peer.updateRequestQuota(lightClientEmptyResponseCost) @@ -565,7 +585,7 @@ p2pProtocol BeaconSync(version = 1, if update.isSome: let contextEpoch = update.get.attested_header.slot.epoch - contextBytes = dag.forkDigestAtEpoch(contextEpoch).data + contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data await response.write(update.get, contextBytes) inc found @@ -589,7 +609,7 @@ p2pProtocol BeaconSync(version = 1, if finality_update.isSome: let contextEpoch = finality_update.get.attested_header.slot.epoch - contextBytes = dag.forkDigestAtEpoch(contextEpoch).data + contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data await response.send(finality_update.get, contextBytes) else: peer.updateRequestQuota(lightClientEmptyResponseCost) @@ -616,7 +636,7 @@ p2pProtocol BeaconSync(version = 1, if optimistic_update.isSome: let contextEpoch = optimistic_update.get.attested_header.slot.epoch - contextBytes = dag.forkDigestAtEpoch(contextEpoch).data + contextBytes = 
peer.networkState.forkDigestAtEpoch(contextEpoch).data await response.send(optimistic_update.get, contextBytes) else: peer.updateRequestQuota(lightClientEmptyResponseCost) @@ -629,7 +649,7 @@ p2pProtocol BeaconSync(version = 1, proc goodbye(peer: Peer, reason: uint64) - {.async, libp2pProtocol("goodbye", 1).} = + {.async, libp2pProtocol("goodbye", 1, isRequired = true).} = debug "Received Goodbye message", reason = disconnectReasonName(reason), peer proc useSyncV2*(state: BeaconSyncNetworkState): bool = @@ -689,4 +709,19 @@ proc initBeaconSync*(network: Eth2Node, dag: ChainDAGRef, getBeaconTime: GetBeaconTimeFn) = var networkState = network.protocolState(BeaconSync) networkState.dag = dag + networkState.cfg = dag.cfg + networkState.forkDigests = dag.forkDigests + networkState.genesisBlockRoot = dag.genesisBlockRoot + networkState.getBeaconTime = getBeaconTime + +proc initBeaconSync*(network: Eth2Node, + cfg: RuntimeConfig, + forkDigests: ref ForkDigests, + genesisBlockRoot: Eth2Digest, + getBeaconTime: GetBeaconTimeFn) = + var networkState = network.protocolState(BeaconSync) + networkState.dag = nil + networkState.cfg = cfg + networkState.forkDigests = forkDigests + networkState.genesisBlockRoot = genesisBlockRoot networkState.getBeaconTime = getBeaconTime diff --git a/scripts/launch_local_testnet.sh b/scripts/launch_local_testnet.sh index 41e5c54d9..5fcda5589 100755 --- a/scripts/launch_local_testnet.sh +++ b/scripts/launch_local_testnet.sh @@ -42,6 +42,9 @@ if [[ ${PIPESTATUS[0]} != 4 ]]; then exit 1 fi +CURL_BINARY="$(command -v curl)" || { echo "Curl not installed. Aborting."; exit 1; } +JQ_BINARY="$(command -v jq)" || { echo "Jq not installed. Aborting."; exit 1; } + OPTS="ht:n:d:g" LONGOPTS="help,preset:,nodes:,data-dir:,remote-validators-count:,threshold:,remote-signers:,with-ganache,stop-at-epoch:,disable-htop,disable-vc,enable-logtrace,log-level:,base-port:,base-rest-port:,base-metrics-port:,reuse-existing-data-dir,reuse-binaries,timeout:,kill-old-processes,eth2-docker-image:,lighthouse-vc-nodes:" @@ -68,6 +71,7 @@ ETH2_DOCKER_IMAGE="" REMOTE_SIGNER_NODES=0 REMOTE_SIGNER_THRESHOLD=1 REMOTE_VALIDATORS_COUNT=0 +LC_NODES=1 print_help() { cat </dev/null - curl -sSLO "${LH_URL}" + "${CURL_BINARY}" -sSLO "${LH_URL}" tar -xzf "${LH_TARBALL}" # contains just one file named "lighthouse" rm lighthouse-* # deletes both the tarball and old binary versions mv lighthouse "${LH_BINARY}" @@ -320,6 +329,10 @@ if [[ "${USE_VC}" == "1" ]]; then BINARIES="${BINARIES} nimbus_validator_client" fi +if [ "$LC_NODES" -ge "1" ]; then + BINARIES="${BINARIES} nimbus_light_client" +fi + if [[ "$ENABLE_LOGTRACE" == "1" ]]; then BINARIES="${BINARIES} logtrace" fi @@ -355,11 +368,13 @@ cleanup() { pkill -f -P $$ nimbus_beacon_node &>/dev/null || true pkill -f -P $$ nimbus_validator_client &>/dev/null || true pkill -f -P $$ nimbus_signing_node &>/dev/null || true + pkill -f -P $$ nimbus_light_client &>/dev/null || true pkill -f -P $$ ${LH_BINARY} &>/dev/null || true sleep 2 pkill -f -9 -P $$ nimbus_beacon_node &>/dev/null || true pkill -f -9 -P $$ nimbus_validator_client &>/dev/null || true pkill -f -9 -P $$ nimbus_signing_node &>/dev/null || true + pkill -f -9 -P $$ nimbus_light_client &>/dev/null || true pkill -f -9 -P $$ ${LH_BINARY} &>/dev/null || true # Delete all binaries we just built, because these are unusable outside this @@ -391,7 +406,7 @@ fi REMOTE_URLS="" for NUM_REMOTE in $(seq 0 $(( REMOTE_SIGNER_NODES - 1 ))); do - REMOTE_PORT=$(( BASE_REMOTE_SIGNER_PORT + NUM_REMOTE )) + REMOTE_PORT=$(( 
BASE_REMOTE_SIGNER_PORT + NUM_REMOTE )) REMOTE_URLS="${REMOTE_URLS} --remote-signer=http://127.0.0.1:${REMOTE_PORT}" done @@ -538,6 +553,10 @@ if [ "$REMOTE_SIGNER_NODES" -ge "0" ]; then NUM_JOBS=$((NUM_JOBS + REMOTE_SIGNER_NODES )) fi +if [ "$LC_NODES" -ge "1" ]; then + NUM_JOBS=$((NUM_JOBS + LC_NODES )) +fi + VALIDATORS_PER_VALIDATOR=$(( (SYSTEM_VALIDATORS / NODES_WITH_VALIDATORS) / 2 )) VALIDATOR_OFFSET=$((SYSTEM_VALIDATORS / 2)) @@ -650,7 +669,7 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do --config-file="${CLI_CONF_FILE}" \ --tcp-port=$(( BASE_PORT + NUM_NODE )) \ --udp-port=$(( BASE_PORT + NUM_NODE )) \ - --max-peers=$(( NUM_NODES - 1 )) \ + --max-peers=$(( NUM_NODES + LC_NODES - 1 )) \ --data-dir="${CONTAINER_NODE_DATA_DIR}" \ ${BOOTSTRAP_ARG} \ ${WEB3_ARG} \ @@ -696,6 +715,46 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do fi done +# light clients +if [ "$LC_NODES" -ge "1" ]; then + echo "Waiting for Altair finalization" + ALTAIR_FORK_EPOCH="$( + "${CURL_BINARY}" -s "http://localhost:${BASE_REST_PORT}/eth/v1/config/spec" | \ + "${JQ_BINARY}" -r '.data.ALTAIR_FORK_EPOCH')" + while :; do + CURRENT_FORK_EPOCH="$( + "${CURL_BINARY}" -s "http://localhost:${BASE_REST_PORT}/eth/v1/beacon/states/finalized/fork" | \ + "${JQ_BINARY}" -r '.data.epoch')" + if [ "${CURRENT_FORK_EPOCH}" -ge "${ALTAIR_FORK_EPOCH}" ]; then + break + fi + sleep 1 + done + + echo "Altair finalized, launching $LC_NODES light client(s)" + LC_BOOTSTRAP_NODE="$( + "${CURL_BINARY}" -s "http://localhost:${BASE_REST_PORT}/eth/v1/node/identity" | \ + "${JQ_BINARY}" -r '.data.enr')" + LC_TRUSTED_BLOCK_ROOT="$( + "${CURL_BINARY}" -s "http://localhost:${BASE_REST_PORT}/eth/v1/beacon/headers/finalized" | \ + "${JQ_BINARY}" -r '.data.root')" + for NUM_LC in $(seq 0 $(( LC_NODES - 1 ))); do + ./build/nimbus_light_client \ + --log-level="${LOG_LEVEL}" \ + --log-format="json" \ + --network="${CONTAINER_DATA_DIR}" \ + --bootstrap-node="${LC_BOOTSTRAP_NODE}" \ + --tcp-port=$(( BASE_PORT + NUM_NODES + NUM_LC )) \ + --udp-port=$(( BASE_PORT + NUM_NODES + NUM_LC )) \ + --max-peers=$(( NUM_NODES + LC_NODES - 1 )) \ + --nat="extip:127.0.0.1" \ + --trusted-block-root="${LC_TRUSTED_BLOCK_ROOT}" \ + ${STOP_AT_EPOCH_FLAG} \ + &> "${DATA_DIR}/log_lc${NUM_LC}.txt" & + PIDS="${PIDS},$!" + done +fi + # give the regular nodes time to crash sleep 5 BG_JOBS="$(jobs | wc -l | tr -d ' ')" @@ -703,7 +762,7 @@ if [[ "${TIMEOUT_DURATION}" != "0" ]]; then BG_JOBS=$(( BG_JOBS - 1 )) # minus the timeout bg job fi if [[ "$BG_JOBS" != "$NUM_JOBS" ]]; then - echo "$(( NUM_JOBS - BG_JOBS )) nimbus_beacon_node/nimbus_validator_client instance(s) exited early. Aborting." + echo "$(( NUM_JOBS - BG_JOBS )) nimbus_beacon_node/nimbus_validator_client/nimbus_light_client instance(s) exited early. Aborting." 
dump_logs dump_logtrace exit 1 diff --git a/tests/test_light_client_processor.nim b/tests/test_light_client_processor.nim index 52c0e42d5..236351357 100644 --- a/tests/test_light_client_processor.nim +++ b/tests/test_light_client_processor.nim @@ -58,6 +58,8 @@ suite "Light client processor" & preset(): let genesis_validators_root = dag.genesis_validators_root trustedBlockRoot = dag.head.root + proc getTrustedBlockRoot(): Option[Eth2Digest] = + some trustedBlockRoot const lowPeriod = 0.SyncCommitteePeriod @@ -83,15 +85,15 @@ suite "Light client processor" & preset(): func setTimeToSlot(slot: Slot) = time = chronos.seconds((slot * SECONDS_PER_SLOT).int64) - var numDidInitializeStoreCalls = 0 - proc didInitializeStore() = inc numDidInitializeStoreCalls + var numOnStoreInitializedCalls = 0 + proc onStoreInitialized() = inc numOnStoreInitializedCalls let store = (ref Option[LightClientStore])() var processor = LightClientProcessor.new( - false, "", "", cfg, genesis_validators_root, trustedBlockRoot, - store, getBeaconTime, didInitializeStore) - res: Result[void, BlockError] + false, "", "", cfg, genesis_validators_root, + store, getBeaconTime, getTrustedBlockRoot, onStoreInitialized) + res: Result[bool, BlockError] test "Sync" & preset(): let bootstrap = dag.getLightClientBootstrap(trustedBlockRoot) @@ -101,7 +103,7 @@ suite "Light client processor" & preset(): MsgSource.gossip, getBeaconTime(), bootstrap.get) check: res.isOk - numDidInitializeStoreCalls == 1 + numOnStoreInitializedCalls == 1 for period in lowPeriod .. lastPeriodWithSupermajority: let update = dag.getLightClientUpdateForPeriod(period) @@ -177,7 +179,7 @@ suite "Light client processor" & preset(): store[].get.optimistic_header == finalityUpdate.get.attested_header else: check res.error == BlockError.Duplicate - check numDidInitializeStoreCalls == 1 + check numOnStoreInitializedCalls == 1 test "Invalid bootstrap" & preset(): var bootstrap = dag.getLightClientBootstrap(trustedBlockRoot) @@ -189,7 +191,7 @@ suite "Light client processor" & preset(): check: res.isErr res.error == BlockError.Invalid - numDidInitializeStoreCalls == 0 + numOnStoreInitializedCalls == 0 test "Duplicate bootstrap" & preset(): let bootstrap = dag.getLightClientBootstrap(trustedBlockRoot) @@ -199,13 +201,13 @@ suite "Light client processor" & preset(): MsgSource.gossip, getBeaconTime(), bootstrap.get) check: res.isOk - numDidInitializeStoreCalls == 1 + numOnStoreInitializedCalls == 1 res = processor[].storeObject( MsgSource.gossip, getBeaconTime(), bootstrap.get) check: res.isErr res.error == BlockError.Duplicate - numDidInitializeStoreCalls == 1 + numOnStoreInitializedCalls == 1 test "Missing bootstrap (update)" & preset(): let update = dag.getLightClientUpdateForPeriod(lowPeriod) @@ -216,7 +218,7 @@ suite "Light client processor" & preset(): check: res.isErr res.error == BlockError.MissingParent - numDidInitializeStoreCalls == 0 + numOnStoreInitializedCalls == 0 test "Missing bootstrap (finality update)" & preset(): let finalityUpdate = dag.getLightClientFinalityUpdate() @@ -227,7 +229,7 @@ suite "Light client processor" & preset(): check: res.isErr res.error == BlockError.MissingParent - numDidInitializeStoreCalls == 0 + numOnStoreInitializedCalls == 0 test "Missing bootstrap (optimistic update)" & preset(): let optimisticUpdate = dag.getLightClientOptimisticUpdate() @@ -238,4 +240,4 @@ suite "Light client processor" & preset(): check: res.isErr res.error == BlockError.MissingParent - numDidInitializeStoreCalls == 0 + numOnStoreInitializedCalls == 0
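For reference, below is a minimal sketch of how the new `LightClientManager` API introduced above could be driven by a consumer. It is an illustration only, not code from this patch: `network`, `rng` and `getBeaconTime` are assumed to be supplied by the caller, the placeholder verifiers accept every received object (a real client would hand each one to its light client processor and return a `BlockError` on failure), and the import list is limited to the modules that `light_client_manager.nim` itself references, assuming the sketch lives alongside it.

# Illustrative wiring only -- not part of this patch. Assumes this file sits
# next to light_client_manager.nim so the relative imports resolve, and that
# BlockError is visible through them (otherwise import its defining module).
import std/options
import chronos, stew/results
import
  ../spec/datatypes/altair,
  ../networking/eth2_network,
  ../beacon_clock,
  ./light_client_manager

proc runLightClientSync*(
    network: Eth2Node,
    rng: ref BrHmacDrbgContext,
    getBeaconTime: GetBeaconTimeFn,
    trustedBlockRoot: Eth2Digest) {.async.} =
  ## Drive the light client sync protocol against `trustedBlockRoot`.
  proc getTrustedBlockRoot(): Option[Eth2Digest] =
    some trustedBlockRoot

  # Placeholder verifiers: accept everything. A real client forwards each
  # object to its light client processor so that peers serving bad data are
  # descored via the returned BlockError.
  proc verifyBootstrap(obj: altair.LightClientBootstrap):
      Future[Result[void, BlockError]] {.async.} =
    return Result[void, BlockError].ok()
  proc verifyUpdate(obj: altair.LightClientUpdate):
      Future[Result[void, BlockError]] {.async.} =
    return Result[void, BlockError].ok()
  proc verifyFinalityUpdate(obj: altair.LightClientFinalityUpdate):
      Future[Result[void, BlockError]] {.async.} =
    return Result[void, BlockError].ok()
  proc verifyOptimisticUpdate(obj: altair.LightClientOptimisticUpdate):
      Future[Result[void, BlockError]] {.async.} =
    return Result[void, BlockError].ok()

  # Placeholder store queries: report an uninitialized store so the manager
  # keeps requesting bootstrap data. A real client answers these from its
  # LightClientStore.
  proc isStoreInitialized(): bool = false
  proc isNextSyncCommitteeKnown(): bool = false
  proc getPeriod(): SyncCommitteePeriod =
    getBeaconTime().slotOrZero().sync_committee_period

  var manager = LightClientManager.init(
    network, rng, getTrustedBlockRoot,
    verifyBootstrap, verifyUpdate, verifyFinalityUpdate, verifyOptimisticUpdate,
    isStoreInitialized, isNextSyncCommitteeKnown,
    getPeriod, getPeriod, getBeaconTime)
  manager.start()
  await sleepAsync(chronos.minutes(10))  # run for a while ...
  await manager.stop()                   # ... then cancel the sync loop

Note that each `query` internally races two parallel requests per endpoint and rewards or descores peers based on the verifier result, so the accept-everything placeholders above would reward every responding peer; real verifiers are what make the peer scoring meaningful.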