From e8542f951fc2f8b7056cd0795d2fee140e7e60b5 Mon Sep 17 00:00:00 2001 From: Advaita Saha Date: Sun, 29 Sep 2024 22:18:11 +0530 Subject: [PATCH] External syncer (#2574) * inital external sync structure * add to makefile * feat: external syncer with healing process * fix: suggestions * network mapping * minor changes * forward sync * jwt auth * nrpc structure * nrpc engine-api loader * fix: suggestions * fix: suggestions * remove sync_db * fix: edge cases and forks * fix: rebase changes * revert nimbus config changes --- Makefile | 7 +- nimbus/utils/era_helpers.nim | 2 +- nrpc/config.nim | 208 +++++++++++++++++++++++++ nrpc/nim.cfg | 10 ++ nrpc/nrpc.nim | 285 +++++++++++++++++++++++++++++++++++ 5 files changed, 509 insertions(+), 3 deletions(-) create mode 100644 nrpc/config.nim create mode 100644 nrpc/nim.cfg create mode 100644 nrpc/nrpc.nim diff --git a/Makefile b/Makefile index c3fc76c2d..c505b2cb4 100644 --- a/Makefile +++ b/Makefile @@ -61,8 +61,10 @@ EXCLUDED_NIM_PACKAGES := \ TOOLS := \ test_tools_build \ persist \ - hunter + hunter \ + nrpc TOOLS_DIRS := \ + nrpc \ tests \ premix # comma-separated values for the "clean" target @@ -108,6 +110,7 @@ VERIF_PROXY_OUT_PATH ?= build/libverifproxy/ fluffy \ nimbus_verified_proxy \ libverifproxy \ + external_sync \ test \ test-reproducibility \ clean \ @@ -207,7 +210,7 @@ update-from-ci: | sanity-checks update-test $(TOOLS): | build deps rocksdb for D in $(TOOLS_DIRS); do [ -e "$${D}/$@.nim" ] && TOOL_DIR="$${D}" && break; done && \ echo -e $(BUILD_MSG) "build/$@" && \ - $(ENV_SCRIPT) nim c $(NIM_PARAMS) -o:build/$@ "$${TOOL_DIR}/$@.nim" + $(ENV_SCRIPT) nim c $(NIM_PARAMS) -d:chronicles_log_level=TRACE -o:build/$@ "$${TOOL_DIR}/$@.nim" # a phony target, because teaching `make` how to do conditional recompilation of Nim projects is too complicated nimbus: | build deps rocksdb diff --git a/nimbus/utils/era_helpers.nim b/nimbus/utils/era_helpers.nim index 122292a92..a84a428ec 100644 --- a/nimbus/utils/era_helpers.nim +++ b/nimbus/utils/era_helpers.nim @@ -117,7 +117,7 @@ proc getWithdrawals*(x: seq[capella.Withdrawal]): seq[common.Withdrawal] = ) return withdrawals -proc getEthBlock(blck: ForkyTrustedBeaconBlock): Opt[EthBlock] = +proc getEthBlock*(blck: ForkyTrustedBeaconBlock): Opt[EthBlock] = ## Convert a beacon block to an eth1 block. const consensusFork = typeof(blck).kind when consensusFork >= ConsensusFork.Bellatrix: diff --git a/nrpc/config.nim b/nrpc/config.nim new file mode 100644 index 000000000..2fc385360 --- /dev/null +++ b/nrpc/config.nim @@ -0,0 +1,208 @@ +# Copyright (c) 2018-2024 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +{.push raises: [].} + +import + std/[ + options, + strutils, + os, + net + ], + pkg/[ + chronicles, + confutils, + confutils/defs, + confutils/std/net + ], + eth/[common, net/utils, net/nat, p2p/enode, p2p/discoveryv5/enr], + "../nimbus"/[constants, compile_info], + ../nimbus/common/chain_config, + ../nimbus/db/opts + +export net, defs + +func defaultDataDir*(): string = + when defined(windows): + getHomeDir() / "AppData" / "Roaming" / "Nimbus" + elif defined(macosx): + getHomeDir() / "Library" / "Application Support" / "Nimbus" + else: + getHomeDir() / ".cache" / "nimbus" + +func getLogLevels(): string = + var logLevels: seq[string] + for level in LogLevel: + if level < enabledLogLevel: + continue + logLevels.add($level) + join(logLevels, ", ") + +const + logLevelDesc = getLogLevels() + +type + ChainDbMode* {.pure.} = enum + Aristo + AriPrune + + NRpcCmd* {.pure.} = enum + `external_sync` + + NRpcConf* = object of RootObj + ## Main NRpc configuration object + + beaconApi* {. + desc: "Beacon API url" + defaultValue: "" + name: "beacon-api" .}: string + + network {. + desc: "Name or id number of Ethereum network(mainnet(1), sepolia(11155111), holesky(17000), other=custom)" + longDesc: + "- mainnet: Ethereum main network\n" & + "- sepolia: Test network (pow+pos) with merge\n" & + "- holesky: The holesovice post-merge testnet" + defaultValue: "" # the default value is set in makeConfig + defaultValueDesc: "mainnet(1)" + abbr: "i" + name: "network" }: string + + customNetwork {. + desc: "Use custom genesis block for private Ethereum Network (as /path/to/genesis.json)" + defaultValueDesc: "" + abbr: "c" + name: "custom-network" }: Option[NetworkParams] + + networkId* {. + ignore # this field is not processed by confutils + defaultValue: MainNet # the defaultValue value is set by `makeConfig` + name: "network-id"}: NetworkId + + networkParams* {. + ignore # this field is not processed by confutils + defaultValue: NetworkParams() # the defaultValue value is set by `makeConfig` + name: "network-params"}: NetworkParams + + logLevel* {. + separator: "\pLOGGING AND DEBUGGING OPTIONS:" + desc: "Sets the log level for process and topics (" & logLevelDesc & ")" + defaultValue: LogLevel.INFO + defaultValueDesc: $LogLevel.INFO + name: "log-level" }: LogLevel + + logFile* {. + desc: "Specifies a path for the written Json log file" + name: "log-file" }: Option[OutFile] + + case cmd* {. + command + desc: "" }: NRpcCmd + + of `external_sync`: + + # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.4/src/engine/authentication.md#key-distribution + jwtSecret* {. + desc: "Path to a file containing a 32 byte hex-encoded shared secret" & + " needed for websocket authentication. By default, the secret key" & + " is auto-generated." + defaultValueDesc: "\"jwt.hex\" in the data directory (see --data-dir)" + name: "jwt-secret" .}: Option[InputFile] + + elEngineApi* {. + desc: "Eth1 Engine API url" + defaultValue: "" + name: "el-engine-api" .}: string + +func parseCmdArg(T: type NetworkId, p: string): T + {.gcsafe, raises: [ValueError].} = + parseInt(p).T + +func completeCmdArg(T: type NetworkId, val: string): seq[string] = + return @[] + +func parseCmdArg*(T: type enr.Record, p: string): T {.raises: [ValueError].} = + result = fromURI(enr.Record, p).valueOr: + raise newException(ValueError, "Invalid ENR") + +func completeCmdArg*(T: type enr.Record, val: string): seq[string] = + return @[] + +proc parseCmdArg(T: type NetworkParams, p: string): T + {.gcsafe, raises: [ValueError].} = + try: + if not loadNetworkParams(p, result): + raise newException(ValueError, "failed to load customNetwork") + except CatchableError: + raise newException(ValueError, "failed to load customNetwork") + +func completeCmdArg(T: type NetworkParams, val: string): seq[string] = + return @[] + + +proc getNetworkId(conf: NRpcConf): Opt[NetworkId] = + if conf.network.len == 0: + return Opt.none NetworkId + + let network = toLowerAscii(conf.network) + case network + of "mainnet": return Opt.some MainNet + of "sepolia": return Opt.some SepoliaNet + of "holesky": return Opt.some HoleskyNet + else: + try: + Opt.some parseInt(network).NetworkId + except CatchableError: + error "Failed to parse network name or id", network + quit QuitFailure + +# KLUDGE: The `load()` template does currently not work within any exception +# annotated environment. +{.pop.} + +proc makeConfig*(cmdLine = commandLineParams()): NRpcConf + {.raises: [CatchableError].} = + ## Note: this function is not gc-safe + + # The try/catch clause can go away when `load()` is clean + try: + {.push warning[ProveInit]: off.} + result = NRpcConf.load( + cmdLine + ) + {.pop.} + except CatchableError as e: + raise e + + var networkId = result.getNetworkId() + + if result.customNetwork.isSome: + result.networkParams = result.customNetwork.get() + if networkId.isNone: + # WARNING: networkId and chainId are two distinct things + # they usage should not be mixed in other places. + # We only set networkId to chainId if networkId not set in cli and + # --custom-network is set. + # If chainId is not defined in config file, it's ok because + # zero means CustomNet + networkId = Opt.some(NetworkId(result.networkParams.config.chainId)) + + if networkId.isNone: + # bootnodes is set via getBootNodes + networkId = Opt.some MainNet + + result.networkId = networkId.get() + + if result.customNetwork.isNone: + result.networkParams = networkParams(result.networkId) + + +when isMainModule: + # for testing purpose + discard makeConfig() diff --git a/nrpc/nim.cfg b/nrpc/nim.cfg new file mode 100644 index 000000000..39cfc86be --- /dev/null +++ b/nrpc/nim.cfg @@ -0,0 +1,10 @@ +-d:"chronicles_runtime_filtering=on" +-d:"chronicles_disable_thread_id" + +@if release: + -d:"chronicles_line_numbers:0" +@end + +-d:"chronicles_sinks=textlines[file]" +-d:"chronicles_runtime_filtering=on" +-d:nimDebugDlOpen diff --git a/nrpc/nrpc.nim b/nrpc/nrpc.nim new file mode 100644 index 000000000..2fb968e46 --- /dev/null +++ b/nrpc/nrpc.nim @@ -0,0 +1,285 @@ +# Nimbus +# Copyright (c) 2024 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +import + std/sequtils, + chronicles, + ../nimbus/constants, + ../nimbus/core/chain, + ./config, + ../nimbus/db/core_db/persistent, + ../nimbus/utils/era_helpers, + kzg4844/kzg_ex as kzg, + web3, + web3/[engine_api, primitives, conversions], + beacon_chain/spec/digest, + beacon_chain/el/el_conf, + beacon_chain/el/el_manager, + beacon_chain/spec/[forks, state_transition_block], + beacon_chain/spec/eth2_apis/[rest_types, rest_beacon_calls], + beacon_chain/networking/network_metadata, + eth/async_utils + +var running* {.volatile.} = true + +# Load the EL block, from CL ( either head or CL root ) +template getCLBlockFromBeaconChain( + client: RestClientRef, blockIdent: BlockIdent, clConfig: RuntimeConfig +): (ForkedSignedBeaconBlock, bool) = + let clBlock = + try: + awaitWithTimeout(client.getBlockV2(blockIdent, clConfig), 30.seconds): + error "Failed to get CL head" + quit(QuitFailure) + except CatchableError as exc: + error "Error getting CL head", error = exc.msg + quit(QuitFailure) + + # Constructing the EL block from the CL block + var blck: ForkedSignedBeaconBlock + if clBlock.isSome(): + let blck = clBlock.get()[] + + (blck, true) + else: + (blck, false) + +# Load the EL block, from CL ( either head or CL root ) +# Also returns the availability of the block as a boolean +template getELBlockFromBeaconChain( + client: RestClientRef, blockIdent: BlockIdent, clConfig: RuntimeConfig +): (EthBlock, bool) = + let (clBlock, isAvailable) = getCLBlockFromBeaconChain(client, blockIdent, clConfig) + + # Constructing the EL block from the CL block + var eth1block: EthBlock + if isAvailable: + withBlck(clBlock.asTrusted()): + eth1Block = getEthBlock(forkyBlck.message).valueOr: + error "Failed to get EL block from CL head" + quit(QuitFailure) + + (eth1Block, true) + else: + (eth1Block, false) + +# Load the network configuration based on the network id +template loadNetworkConfig(conf: NRpcConf): (RuntimeConfig, uint64, uint64) = + case conf.networkId + of MainNet: + (getMetadataForNetwork("mainnet").cfg, 15537393'u64, 4700013'u64) + of SepoliaNet: + (getMetadataForNetwork("sepolia").cfg, 1450408'u64, 115193'u64) + of HoleskyNet: + (getMetadataForNetwork("holesky").cfg, 0'u64, 0'u64) + else: + error "Unsupported network", network = conf.networkId + quit(QuitFailure) + +# Slot Finding Mechanism +# First it sets the initial lower bound to `firstSlotAfterMerge` + number of blocks after Era1 +# Then it iterates over the slots to find the current slot number, along with reducing the +# search space by calculating the difference between the `blockNumber` and the `block_number` from the executionPayload +# of the slot, then adding the difference to the importedSlot. This pushes the lower bound more, +# making the search way smaller +template findSlot( + client: RestClientRef, + currentBlockNumber: uint64, + lastEra1Block: uint64, + firstSlotAfterMerge: uint64, +): uint64 = + var importedSlot = (currentBlockNumber - lastEra1Block) + firstSlotAfterMerge + notice "Finding slot number corresponding to block", importedSlot + + var clNum = 0'u64 + while running and clNum < currentBlockNumber: + let (blk, stat) = + client.getELBlockFromBeaconChain(BlockIdent.init(Slot(importedSlot)), clConfig) + if not stat: + importedSlot += 1 + continue + + clNum = blk.header.number + # decreasing the lower bound with each iteration + importedSlot += currentBlockNumber - clNum + + notice "Found the slot to start with", slot = importedSlot + importedSlot + +# The main procedure to sync the EL with the help of CL +# Takes blocks from the CL and sends them to the EL via the engineAPI +proc syncToEngineApi(conf: NRpcConf) {.async.} = + let + # Load the network configuration, jwt secret and engine api url + (clConfig, lastEra1Block, firstSlotAfterMerge) = loadNetworkConfig(conf) + jwtSecret = + if conf.jwtSecret.isSome(): + loadJwtSecret(Opt.some(conf.jwtSecret.get())) + else: + Opt.none(seq[byte]) + engineUrl = EngineApiUrl.init(conf.elEngineApi, jwtSecret) + + # Create the client for the engine api + # And exchange the capabilities for a test communication + web3 = await engineUrl.newWeb3() + rpcClient = web3.provider + data = + try: + await rpcClient.exchangeCapabilities( + @[ + "engine_exchangeTransitionConfigurationV1", "engine_forkchoiceUpdatedV1", + "engine_getPayloadBodiesByHash", "engine_getPayloadBodiesByRangeV1", + "engine_getPayloadV1", "engine_newPayloadV1", + ] + ) + except CatchableError as exc: + error "Error Connecting to the EL Engine API", error = exc.msg + @[] + + notice "Communication with the EL Success", data = data + + # Get the latest block number from the EL rest api + template elBlockNumber(): uint64 = + try: + uint64(await rpcClient.eth_blockNumber()) + except CatchableError as exc: + error "Error getting block number", error = exc.msg + 0'u64 + + # Load the EL state detials and create the beaconAPI client + var + currentBlockNumber = elBlockNumber() + 1 + curBlck: ForkedSignedBeaconBlock + client = RestClientRef.new(conf.beaconApi).valueOr: + error "Cannot connect to Beacon Api", url = conf.beaconApi + quit(QuitFailure) + + notice "Current block number", number = currentBlockNumber + + # Load the latest state from the CL + var + (finalizedBlck, _) = client.getELBlockFromBeaconChain( + BlockIdent.init(BlockIdentType.Finalized), clConfig + ) + (headBlck, _) = + client.getELBlockFromBeaconChain(BlockIdent.init(BlockIdentType.Head), clConfig) + + # Check if the EL is already in sync or ahead of the CL + if headBlck.header.number <= currentBlockNumber: + notice "CL head is behind of EL head, or in sync", head = headBlck.header.number + quit(QuitSuccess) + + var + importedSlot = + findSlot(client, currentBlockNumber, lastEra1Block, firstSlotAfterMerge) + finalizedHash = Eth2Digest.fromHex("0x00") + headHash: Eth2Digest + + while running and currentBlockNumber < headBlck.header.number: + var isAvailable = false + (curBlck, isAvailable) = + client.getCLBlockFromBeaconChain(BlockIdent.init(Slot(importedSlot)), clConfig) + + if not isAvailable: + importedSlot += 1 + continue + + debug "Block loaded from the CL" + importedSlot += 1 + withBlck(curBlck): + # Don't include blocks before bellatrix, as it doesn't have payload + when consensusFork >= ConsensusFork.Bellatrix: + # Load the execution payload for all blocks after the bellatrix upgrade + let payload = forkyBlck.message.body.execution_payload.asEngineExecutionPayload + var payloadResponse: engine_api.PayloadStatusV1 + + # Make the newPayload call based on the consensus fork + # Before Deneb calls are made without versioned hashes + # Thus calls will be same for Bellatrix and Capella forks + # And for Deneb, we will pass the versioned hashes + when consensusFork <= ConsensusFork.Capella: + debug "Making new payload call for Bellatrix/Capella" + payloadResponse = await rpcClient.newPayload(payload) + elif consensusFork >= ConsensusFork.Deneb: + # Calculate the versioned hashes from the kzg commitments + debug "Generating the versioned hashes for Deneb" + let versioned_hashes = mapIt( + forkyBlck.message.body.blob_kzg_commitments, + engine_api.VersionedHash(kzg_commitment_to_versioned_hash(it)), + ) + debug "Making new payload call for Deneb" + payloadResponse = await rpcClient.newPayload( + payload, versioned_hashes, FixedBytes[32] forkyBlck.message.parent_root.data + ) + notice "Payload status", response = payloadResponse + + # Load the head hash from the execution payload, for forkchoice + headHash = forkyBlck.message.body.execution_payload.block_hash + + # Make the forkchoicestate based on the the last + # `new_payload` call and the state received from the EL rest api + # And generate the PayloadAttributes based on the consensus fork + let + state = ForkchoiceStateV1( + headBlockHash: headHash.asBlockHash, + safeBlockHash: finalizedHash.asBlockHash, + finalizedBlockHash: finalizedHash.asBlockHash, + ) + payloadAttributes = + when consensusFork == ConsensusFork.Bellatrix: + Opt.none(PayloadAttributesV1) + elif consensusFork == ConsensusFork.Capella: + Opt.none(PayloadAttributesV2) + else: + Opt.none(PayloadAttributesV3) # For Deneb + + # Make the forkchoiceUpdated call based, after loading attributes based on the consensus fork + let fcuResponse = await rpcClient.forkchoiceUpdated(state, payloadAttributes) + notice "Forkchoice Updated", state = state, response = fcuResponse + + # Update the finalized hash + # This is updated after the fcu call is made + # So that head - head mod 32 is maintained + # i.e finalized have to be mod slots per epoch == 0 + let blknum = forkyBlck.message.body.execution_payload.block_number + if blknum < finalizedBlck.header.number and blknum mod 32 == 0: + finalizedHash = headHash + elif blknum >= finalizedBlck.header.number: + # If the real finalized block is crossed, then upate the finalized hash to the real one + (finalizedBlck, _) = client.getELBlockFromBeaconChain( + BlockIdent.init(BlockIdentType.Finalized), clConfig + ) + finalizedHash = finalizedBlck.header.blockHash + + # Update the current block number from EL rest api + # Shows that the fcu call has succeeded + currentBlockNumber = elBlockNumber() + (headBlck, _) = + client.getELBlockFromBeaconChain(BlockIdent.init(BlockIdentType.Head), clConfig) + +when isMainModule: + ## Ctrl+C handling + proc controlCHandler() {.noconv.} = + when defined(windows): + # workaround for https://github.com/nim-lang/Nim/issues/4057 + setupForeignThreadGc() + running = false + + setControlCHook(controlCHandler) + + ## Show logs on stdout until we get the user's logging choice + discard defaultChroniclesStream.output.open(stdout) + + ## Processing command line arguments + let conf = makeConfig() + setLogLevel(conf.logLevel) + + case conf.cmd + of NRpcCmd.`external_sync`: + waitFor syncToEngineApi(conf)