# beacon_chain # Copyright (c) 2018-2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. {.push raises: [].} import std/[strformat, strutils, sequtils, typetraits, uri, json], # Nimble packages: chronos, metrics, chronicles/timings, json_rpc/[client, errors], web3, web3/[engine_api, primitives, conversions], eth/common/[eth_types, transaction], eth/async_utils, results, stew/[assign2, byteutils, objects], # Local modules: ../spec/[eth2_merkleization, forks, helpers], ../networking/network_metadata, ".."/[beacon_node_status, future_combinators], "."/[eth1_chain, el_conf] from std/times import getTime, inSeconds, initTime, `-` from ../spec/engine_authentication import getSignedIatToken from ../spec/state_transition_block import kzg_commitment_to_versioned_hash export eth1_chain, el_conf, engine_api, base logScope: topics = "elmon" type PubKeyBytes = DynamicBytes[48, 48] WithdrawalCredentialsBytes = DynamicBytes[32, 32] SignatureBytes = DynamicBytes[96, 96] Int64LeBytes = DynamicBytes[8, 8] contract(DepositContract): proc deposit(pubkey: PubKeyBytes, withdrawalCredentials: WithdrawalCredentialsBytes, signature: SignatureBytes, deposit_data_root: FixedBytes[32]) proc get_deposit_root(): FixedBytes[32] proc get_deposit_count(): Int64LeBytes proc DepositEvent(pubkey: PubKeyBytes, withdrawalCredentials: WithdrawalCredentialsBytes, amount: Int64LeBytes, signature: SignatureBytes, index: Int64LeBytes) {.event.} const hasDepositRootChecks = defined(has_deposit_root_checks) targetBlocksPerLogsRequest = 1000'u64 # TODO # # This is currently set to 1000, because this was the default maximum # value in Besu circa our 22.3.0 release. Previously, we've used 5000, # but this was effectively forcing the fallback logic in `syncBlockRange` # to always execute multiple requests before getting a successful response. # # Besu have raised this default to 5000 in https://github.com/hyperledger/besu/pull/5209 # which is expected to ship in their next release. # # Full deposits sync time with various values for this parameter: # # Blocks per request | Geth running on the same host | Geth running on a more distant host # ---------------------------------------------------------------------------------------- # 1000 | 11m 20s | 22m # 5000 | 5m 20s | 15m 40s # 100000 | 4m 10s | not tested # # The number of requests scales linearly with the parameter value as you would expect. # # These results suggest that it would be reasonable for us to get back to 5000 once the # Besu release is well-spread within their userbase. # Engine API timeouts engineApiConnectionTimeout = 5.seconds # How much we wait before giving up connecting to the Engine API web3RequestsTimeout* = 8.seconds # How much we wait for eth_* requests (e.g. eth_getBlockByHash) # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/paris.md#request-2 # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/shanghai.md#request-2 GETPAYLOAD_TIMEOUT = 1.seconds connectionStateChangeHysteresisThreshold = 15 ## How many unsuccesful/successful requests we must see ## before declaring the connection as degraded/restored type NextExpectedPayloadParams* = object headBlockHash*: Eth2Digest safeBlockHash*: Eth2Digest finalizedBlockHash*: Eth2Digest payloadAttributes*: PayloadAttributesV3 ELManager* = ref object eth1Network: Option[Eth1Network] ## If this value is supplied the EL manager will check whether ## all configured EL nodes are connected to the same network. depositContractAddress*: Eth1Address depositContractBlockNumber: uint64 depositContractBlockHash: BlockHash blocksPerLogsRequest: uint64 ## This value is used to dynamically adjust the number of ## blocks we are trying to download at once during deposit ## syncing. By default, the value is set to the constant ## `targetBlocksPerLogsRequest`, but if the EL is failing ## to serve this number of blocks per single `eth_getLogs` ## request, we temporarily lower the value until the request ## succeeds. The failures are generally expected only in ## periods in the history for very high deposit density. elConnections: seq[ELConnection] ## All active EL connections eth1Chain: Eth1Chain ## At larger distances, this chain consists of all blocks ## with deposits. Within the relevant voting period, it ## also includes blocks without deposits because we must ## vote for a block only if it's part of our known history. syncTargetBlock: Option[Eth1BlockNumber] chainSyncingLoopFut: Future[void] exchangeTransitionConfigurationLoopFut: Future[void] stopFut: Future[void] nextExpectedPayloadParams*: Option[NextExpectedPayloadParams] EtcStatus {.pure.} = enum notExchangedYet mismatch match DepositContractSyncStatus {.pure.} = enum unknown notSynced synced ConnectionState = enum NeverTested Working Degraded ELConnection* = ref object engineUrl: EngineApiUrl web3: Option[Web3] ## This will be `none` before connecting and while we are ## reconnecting after a lost connetion. You can wait on ## the future below for the moment the connection is active. connectingFut: Future[Result[Web3, string]] ## This future will be replaced when the connection is lost. etcStatus: EtcStatus ## The latest status of the `exchangeTransitionConfiguration` ## exchange. state: ConnectionState hysteresisCounter: int depositContractSyncStatus: DepositContractSyncStatus ## Are we sure that this EL has synced the deposit contract? lastPayloadId: Option[engine_api.PayloadID] FullBlockId* = object number: Eth1BlockNumber hash: BlockHash DataProviderFailure* = object of CatchableError CorruptDataProvider* = object of DataProviderFailure DataProviderTimeout* = object of DataProviderFailure DisconnectHandler* = proc () {.gcsafe, raises: [].} DepositEventHandler* = proc ( pubkey: PubKeyBytes, withdrawalCredentials: WithdrawalCredentialsBytes, amount: Int64LeBytes, signature: SignatureBytes, merkleTreeIndex: Int64LeBytes, j: JsonNode) {.gcsafe, raises: [].} BellatrixExecutionPayloadWithValue* = object executionPayload*: ExecutionPayloadV1 blockValue*: UInt256 SomeEnginePayloadWithValue = BellatrixExecutionPayloadWithValue | GetPayloadV2Response | GetPayloadV3Response declareCounter failed_web3_requests, "Failed web3 requests" declareGauge eth1_latest_head, "The highest Eth1 block number observed on the network" declareGauge eth1_synced_head, "Block number of the highest synchronized block according to follow distance" declareCounter engine_api_responses, "Number of successful requests to the newPayload Engine API end-point", labels = ["url", "request", "status"] declareHistogram engine_api_request_duration_seconds, "Time(s) used to generate signature usign remote signer", buckets = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0], labels = ["url", "request"] declareCounter engine_api_timeouts, "Number of timed-out requests to Engine API end-point", labels = ["url", "request"] declareCounter engine_api_last_minute_forkchoice_updates_sent, "Number of last minute requests to the forkchoiceUpdated Engine API end-point just before block proposals", labels = ["url"] proc close(connection: ELConnection): Future[void] {.async.} = if connection.web3.isSome: awaitWithTimeout(connection.web3.get.close(), 30.seconds): debug "Failed to close data provider in time" proc increaseCounterTowardsStateChange(connection: ELConnection): bool = result = connection.hysteresisCounter >= connectionStateChangeHysteresisThreshold if result: connection.hysteresisCounter = 0 else: inc connection.hysteresisCounter proc decreaseCounterTowardsStateChange(connection: ELConnection) = if connection.hysteresisCounter > 0: # While we increase the counter by 1, we decreate it by 20% in order # to require a steady and affirmative change instead of allowing # the counter to drift very slowly in one direction when the ratio # between success and failure is roughly 50:50% connection.hysteresisCounter = connection.hysteresisCounter div 5 proc setDegradedState(connection: ELConnection, requestName: string, statusCode: int, errMsg: string) = debug "Failed EL Request", requestName, statusCode, err = errMsg case connection.state of NeverTested, Working: if connection.increaseCounterTowardsStateChange(): warn "Connection to EL node degraded", url = url(connection.engineUrl), failedRequest = requestName, statusCode, err = errMsg connection.state = Degraded asyncSpawn connection.close() connection.web3 = none[Web3]() of Degraded: connection.decreaseCounterTowardsStateChange() proc setWorkingState(connection: ELConnection) = case connection.state of NeverTested: connection.hysteresisCounter = 0 connection.state = Working of Degraded: if connection.increaseCounterTowardsStateChange(): info "Connection to EL node restored", url = url(connection.engineUrl) connection.state = Working of Working: connection.decreaseCounterTowardsStateChange() proc trackEngineApiRequest(connection: ELConnection, request: FutureBase, requestName: string, startTime: Moment, deadline: Future[void], failureAllowed = false) = request.addCallback do (udata: pointer) {.gcsafe, raises: [].}: # TODO `udata` is nil here. How come? # This forces us to create a GC cycle between the Future and the closure if request.completed: engine_api_request_duration_seconds.observe( float(milliseconds(Moment.now - startTime)) / 1000.0, [connection.engineUrl.url, requestName]) connection.setWorkingState() deadline.addCallback do (udata: pointer) {.gcsafe, raises: [].}: if not request.finished: request.cancelSoon() engine_api_timeouts.inc(1, [connection.engineUrl.url, requestName]) if not failureAllowed: connection.setDegradedState(requestName, 0, "Request timed out") else: let statusCode = if not request.failed: 200 elif request.error of ErrorResponse: ((ref ErrorResponse) request.error).status else: 0 if request.failed and not failureAllowed: connection.setDegradedState(requestName, statusCode, request.error.msg) engine_api_responses.inc(1, [connection.engineUrl.url, requestName, $statusCode]) template awaitOrRaiseOnTimeout[T](fut: Future[T], timeout: Duration): T = awaitWithTimeout(fut, timeout): raise newException(DataProviderTimeout, "Timeout") template trackedRequestWithTimeout[T](connection: ELConnection, requestName: static string, lazyRequestExpression: Future[T], timeout: Duration, failureAllowed = false): T = let connectionParam = connection startTime = Moment.now deadline = sleepAsync(timeout) request = lazyRequestExpression connectionParam.trackEngineApiRequest( request, requestName, startTime, deadline, failureAllowed) awaitWithTimeout(request, deadline): raise newException(DataProviderTimeout, "Timeout") func raiseIfNil(web3block: BlockObject): BlockObject {.raises: [ValueError].} = if web3block == nil: raise newException(ValueError, "EL returned 'null' result for block") web3block template cfg(m: ELManager): auto = m.eth1Chain.cfg func hasJwtSecret*(m: ELManager): bool = for c in m.elConnections: if c.engineUrl.jwtSecret.isSome: return true func isSynced*(m: ELManager): bool = m.syncTargetBlock.isSome and m.eth1Chain.blocks.len > 0 and m.syncTargetBlock.get <= m.eth1Chain.blocks[^1].number template eth1ChainBlocks*(m: ELManager): Deque[Eth1Block] = m.eth1Chain.blocks template toGaugeValue(x: Quantity): int64 = toGaugeValue(distinctBase x) # TODO: Add cfg validation # MIN_GENESIS_ACTIVE_VALIDATOR_COUNT should be larger than SLOTS_PER_EPOCH # doAssert SECONDS_PER_ETH1_BLOCK * cfg.ETH1_FOLLOW_DISTANCE < GENESIS_DELAY, # "Invalid configuration: GENESIS_DELAY is set too low" func asConsensusWithdrawal(w: WithdrawalV1): capella.Withdrawal = capella.Withdrawal( index: w.index.uint64, validator_index: w.validatorIndex.uint64, address: ExecutionAddress(data: w.address.distinctBase), amount: Gwei w.amount) func asEngineWithdrawal(w: capella.Withdrawal): WithdrawalV1 = WithdrawalV1( index: Quantity(w.index), validatorIndex: Quantity(w.validator_index), address: Address(w.address.data), amount: Quantity(w.amount)) func asConsensusType*(rpcExecutionPayload: ExecutionPayloadV1): bellatrix.ExecutionPayload = template getTransaction(tt: TypedTransaction): bellatrix.Transaction = bellatrix.Transaction.init(tt.distinctBase) bellatrix.ExecutionPayload( parent_hash: rpcExecutionPayload.parentHash.asEth2Digest, feeRecipient: ExecutionAddress(data: rpcExecutionPayload.feeRecipient.distinctBase), state_root: rpcExecutionPayload.stateRoot.asEth2Digest, receipts_root: rpcExecutionPayload.receiptsRoot.asEth2Digest, logs_bloom: BloomLogs(data: rpcExecutionPayload.logsBloom.distinctBase), prev_randao: rpcExecutionPayload.prevRandao.asEth2Digest, block_number: rpcExecutionPayload.blockNumber.uint64, gas_limit: rpcExecutionPayload.gasLimit.uint64, gas_used: rpcExecutionPayload.gasUsed.uint64, timestamp: rpcExecutionPayload.timestamp.uint64, extra_data: List[byte, MAX_EXTRA_DATA_BYTES].init(rpcExecutionPayload.extraData.bytes), base_fee_per_gas: rpcExecutionPayload.baseFeePerGas, block_hash: rpcExecutionPayload.blockHash.asEth2Digest, transactions: List[bellatrix.Transaction, MAX_TRANSACTIONS_PER_PAYLOAD].init( mapIt(rpcExecutionPayload.transactions, it.getTransaction))) func asConsensusType*(payloadWithValue: BellatrixExecutionPayloadWithValue): bellatrix.ExecutionPayloadForSigning = bellatrix.ExecutionPayloadForSigning( executionPayload: payloadWithValue.executionPayload.asConsensusType, blockValue: payloadWithValue.blockValue) template maybeDeref[T](o: Option[T]): T = o.get template maybeDeref[V](v: V): V = v func asConsensusType*(rpcExecutionPayload: ExecutionPayloadV1OrV2|ExecutionPayloadV2): capella.ExecutionPayload = template getTransaction(tt: TypedTransaction): bellatrix.Transaction = bellatrix.Transaction.init(tt.distinctBase) capella.ExecutionPayload( parent_hash: rpcExecutionPayload.parentHash.asEth2Digest, feeRecipient: ExecutionAddress(data: rpcExecutionPayload.feeRecipient.distinctBase), state_root: rpcExecutionPayload.stateRoot.asEth2Digest, receipts_root: rpcExecutionPayload.receiptsRoot.asEth2Digest, logs_bloom: BloomLogs(data: rpcExecutionPayload.logsBloom.distinctBase), prev_randao: rpcExecutionPayload.prevRandao.asEth2Digest, block_number: rpcExecutionPayload.blockNumber.uint64, gas_limit: rpcExecutionPayload.gasLimit.uint64, gas_used: rpcExecutionPayload.gasUsed.uint64, timestamp: rpcExecutionPayload.timestamp.uint64, extra_data: List[byte, MAX_EXTRA_DATA_BYTES].init(rpcExecutionPayload.extraData.bytes), base_fee_per_gas: rpcExecutionPayload.baseFeePerGas, block_hash: rpcExecutionPayload.blockHash.asEth2Digest, transactions: List[bellatrix.Transaction, MAX_TRANSACTIONS_PER_PAYLOAD].init( mapIt(rpcExecutionPayload.transactions, it.getTransaction)), withdrawals: List[capella.Withdrawal, MAX_WITHDRAWALS_PER_PAYLOAD].init( mapIt(maybeDeref rpcExecutionPayload.withdrawals, it.asConsensusWithdrawal))) func asConsensusType*(payloadWithValue: engine_api.GetPayloadV2Response): capella.ExecutionPayloadForSigning = capella.ExecutionPayloadForSigning( executionPayload: payloadWithValue.executionPayload.asConsensusType, blockValue: payloadWithValue.blockValue) func asConsensusType*(rpcExecutionPayload: ExecutionPayloadV3): deneb.ExecutionPayload = template getTransaction(tt: TypedTransaction): bellatrix.Transaction = bellatrix.Transaction.init(tt.distinctBase) deneb.ExecutionPayload( parent_hash: rpcExecutionPayload.parentHash.asEth2Digest, feeRecipient: ExecutionAddress(data: rpcExecutionPayload.feeRecipient.distinctBase), state_root: rpcExecutionPayload.stateRoot.asEth2Digest, receipts_root: rpcExecutionPayload.receiptsRoot.asEth2Digest, logs_bloom: BloomLogs(data: rpcExecutionPayload.logsBloom.distinctBase), prev_randao: rpcExecutionPayload.prevRandao.asEth2Digest, block_number: rpcExecutionPayload.blockNumber.uint64, gas_limit: rpcExecutionPayload.gasLimit.uint64, gas_used: rpcExecutionPayload.gasUsed.uint64, timestamp: rpcExecutionPayload.timestamp.uint64, extra_data: List[byte, MAX_EXTRA_DATA_BYTES].init(rpcExecutionPayload.extraData.bytes), base_fee_per_gas: rpcExecutionPayload.baseFeePerGas, block_hash: rpcExecutionPayload.blockHash.asEth2Digest, transactions: List[bellatrix.Transaction, MAX_TRANSACTIONS_PER_PAYLOAD].init( mapIt(rpcExecutionPayload.transactions, it.getTransaction)), withdrawals: List[capella.Withdrawal, MAX_WITHDRAWALS_PER_PAYLOAD].init( mapIt(rpcExecutionPayload.withdrawals, it.asConsensusWithdrawal)), blob_gas_used: rpcExecutionPayload.blobGasUsed.uint64, excess_blob_gas: rpcExecutionPayload.excessBlobGas.uint64) func asConsensusType*(payload: engine_api.GetPayloadV3Response): deneb.ExecutionPayloadForSigning = deneb.ExecutionPayloadForSigning( executionPayload: payload.executionPayload.asConsensusType, blockValue: payload.blockValue, # TODO # The `mapIt` calls below are necessary only because we use different distinct # types for KZG commitments and Blobs in the `web3` and the `deneb` spec types. # Both are defined as `array[N, byte]` under the hood. blobsBundle: BlobsBundle( commitments: KzgCommitments.init( payload.blobsBundle.commitments.mapIt(it.bytes)), proofs: KzgProofs.init( payload.blobsBundle.proofs.mapIt(it.bytes)), blobs: Blobs.init( payload.blobsBundle.blobs.mapIt(it.bytes)))) func asEngineExecutionPayload*(executionPayload: bellatrix.ExecutionPayload): ExecutionPayloadV1 = template getTypedTransaction(tt: bellatrix.Transaction): TypedTransaction = TypedTransaction(tt.distinctBase) engine_api.ExecutionPayloadV1( parentHash: executionPayload.parent_hash.asBlockHash, feeRecipient: Address(executionPayload.fee_recipient.data), stateRoot: executionPayload.state_root.asBlockHash, receiptsRoot: executionPayload.receipts_root.asBlockHash, logsBloom: FixedBytes[BYTES_PER_LOGS_BLOOM](executionPayload.logs_bloom.data), prevRandao: executionPayload.prev_randao.asBlockHash, blockNumber: Quantity(executionPayload.block_number), gasLimit: Quantity(executionPayload.gas_limit), gasUsed: Quantity(executionPayload.gas_used), timestamp: Quantity(executionPayload.timestamp), extraData: DynamicBytes[0, MAX_EXTRA_DATA_BYTES](executionPayload.extra_data), baseFeePerGas: executionPayload.base_fee_per_gas, blockHash: executionPayload.block_hash.asBlockHash, transactions: mapIt(executionPayload.transactions, it.getTypedTransaction)) template toEngineWithdrawal(w: capella.Withdrawal): WithdrawalV1 = WithdrawalV1( index: Quantity(w.index), validatorIndex: Quantity(w.validator_index), address: Address(w.address.data), amount: Quantity(w.amount)) func asEngineExecutionPayload*(executionPayload: capella.ExecutionPayload): ExecutionPayloadV2 = template getTypedTransaction(tt: bellatrix.Transaction): TypedTransaction = TypedTransaction(tt.distinctBase) engine_api.ExecutionPayloadV2( parentHash: executionPayload.parent_hash.asBlockHash, feeRecipient: Address(executionPayload.fee_recipient.data), stateRoot: executionPayload.state_root.asBlockHash, receiptsRoot: executionPayload.receipts_root.asBlockHash, logsBloom: FixedBytes[BYTES_PER_LOGS_BLOOM](executionPayload.logs_bloom.data), prevRandao: executionPayload.prev_randao.asBlockHash, blockNumber: Quantity(executionPayload.block_number), gasLimit: Quantity(executionPayload.gas_limit), gasUsed: Quantity(executionPayload.gas_used), timestamp: Quantity(executionPayload.timestamp), extraData: DynamicBytes[0, MAX_EXTRA_DATA_BYTES](executionPayload.extra_data), baseFeePerGas: executionPayload.base_fee_per_gas, blockHash: executionPayload.block_hash.asBlockHash, transactions: mapIt(executionPayload.transactions, it.getTypedTransaction), withdrawals: mapIt(executionPayload.withdrawals, it.toEngineWithdrawal)) func asEngineExecutionPayload*(executionPayload: deneb.ExecutionPayload): ExecutionPayloadV3 = template getTypedTransaction(tt: bellatrix.Transaction): TypedTransaction = TypedTransaction(tt.distinctBase) engine_api.ExecutionPayloadV3( parentHash: executionPayload.parent_hash.asBlockHash, feeRecipient: Address(executionPayload.fee_recipient.data), stateRoot: executionPayload.state_root.asBlockHash, receiptsRoot: executionPayload.receipts_root.asBlockHash, logsBloom: FixedBytes[BYTES_PER_LOGS_BLOOM](executionPayload.logs_bloom.data), prevRandao: executionPayload.prev_randao.asBlockHash, blockNumber: Quantity(executionPayload.block_number), gasLimit: Quantity(executionPayload.gas_limit), gasUsed: Quantity(executionPayload.gas_used), timestamp: Quantity(executionPayload.timestamp), extraData: DynamicBytes[0, MAX_EXTRA_DATA_BYTES](executionPayload.extra_data), baseFeePerGas: executionPayload.base_fee_per_gas, blockHash: executionPayload.block_hash.asBlockHash, transactions: mapIt(executionPayload.transactions, it.getTypedTransaction), withdrawals: mapIt(executionPayload.withdrawals, it.asEngineWithdrawal), blobGasUsed: Quantity(executionPayload.blob_gas_used), excessBlobGas: Quantity(executionPayload.excess_blob_gas)) func isConnected(connection: ELConnection): bool = connection.web3.isSome func getJsonRpcRequestHeaders(jwtSecret: Opt[seq[byte]]): auto = if jwtSecret.isSome: let secret = jwtSecret.get (proc(): seq[(string, string)] = # https://www.rfc-editor.org/rfc/rfc6750#section-6.1.1 @[("Authorization", "Bearer " & getSignedIatToken( secret, (getTime() - initTime(0, 0)).inSeconds))]) else: (proc(): seq[(string, string)] = @[]) proc newWeb3*(engineUrl: EngineApiUrl): Future[Web3] = newWeb3(engineUrl.url, getJsonRpcRequestHeaders(engineUrl.jwtSecret), httpFlags = {HttpClientFlag.NewConnectionAlways}) proc establishEngineApiConnection*(url: EngineApiUrl): Future[Result[Web3, string]] {.async.} = try: ok(await newWeb3(url).wait(engineApiConnectionTimeout)) except AsyncTimeoutError: err "Engine API connection timed out" except CancelledError as exc: raise exc except CatchableError as exc: err "Engine API connection failed: " & exc.msg proc tryConnecting(connection: ELConnection): Future[bool] {.async.} = if connection.isConnected: return true if connection.connectingFut == nil or connection.connectingFut.finished: # The previous attempt was not successful connection.connectingFut = establishEngineApiConnection(connection.engineUrl) let web3Res = await connection.connectingFut if web3Res.isErr: return false else: connection.web3 = some web3Res.get return true proc connectedRpcClient(connection: ELConnection): Future[RpcClient] {.async.} = while not connection.isConnected: if not await connection.tryConnecting(): await sleepAsync(chronos.seconds(10)) return connection.web3.get.provider proc getBlockByHash(rpcClient: RpcClient, hash: BlockHash): Future[BlockObject] = rpcClient.eth_getBlockByHash(hash, false) proc getBlockByNumber*(rpcClient: RpcClient, number: Eth1BlockNumber): Future[BlockObject] = let hexNumber = try: &"0x{number:X}" # No leading 0's! except ValueError as exc: # Since the format above is valid, failing here should not be possible raiseAssert exc.msg rpcClient.eth_getBlockByNumber(hexNumber, false) func areSameAs(expectedParams: Option[NextExpectedPayloadParams], latestHead, latestSafe, latestFinalized: Eth2Digest, timestamp: uint64, randomData: Eth2Digest, feeRecipient: Eth1Address, withdrawals: seq[WithdrawalV1]): bool = expectedParams.isSome and expectedParams.get.headBlockHash == latestHead and expectedParams.get.safeBlockHash == latestSafe and expectedParams.get.finalizedBlockHash == latestFinalized and expectedParams.get.payloadAttributes.timestamp.uint64 == timestamp and expectedParams.get.payloadAttributes.prevRandao.bytes == randomData.data and expectedParams.get.payloadAttributes.suggestedFeeRecipient == feeRecipient and expectedParams.get.payloadAttributes.withdrawals == withdrawals proc forkchoiceUpdated(rpcClient: RpcClient, state: ForkchoiceStateV1, payloadAttributes: Option[PayloadAttributesV1] | Option[PayloadAttributesV2] | Option[PayloadAttributesV3]): Future[ForkchoiceUpdatedResponse] = when payloadAttributes is Option[PayloadAttributesV1]: rpcClient.engine_forkchoiceUpdatedV1(state, payloadAttributes) elif payloadAttributes is Option[PayloadAttributesV2]: rpcClient.engine_forkchoiceUpdatedV2(state, payloadAttributes) elif payloadAttributes is Option[PayloadAttributesV3]: rpcClient.engine_forkchoiceUpdatedV3(state, payloadAttributes) else: static: doAssert false func computeBlockValue(blk: ExecutionPayloadV1): UInt256 {.raises: [RlpError].} = for transactionBytes in blk.transactions: var rlp = rlpFromBytes distinctBase(transactionBytes) let transaction = rlp.read(eth_types.Transaction) result += distinctBase(effectiveGasTip(transaction, blk.baseFeePerGas)).u256 proc getPayloadFromSingleEL( connection: ELConnection, GetPayloadResponseType: type, isForkChoiceUpToDate: bool, consensusHead: Eth2Digest, headBlock, safeBlock, finalizedBlock: Eth2Digest, timestamp: uint64, randomData: Eth2Digest, suggestedFeeRecipient: Eth1Address, withdrawals: seq[WithdrawalV1]): Future[GetPayloadResponseType] {.async.} = let rpcClient = await connection.connectedRpcClient() payloadId = if isForkChoiceUpToDate and connection.lastPayloadId.isSome: connection.lastPayloadId.get elif not headBlock.isZero: engine_api_last_minute_forkchoice_updates_sent.inc(1, [connection.engineUrl.url]) when GetPayloadResponseType is BellatrixExecutionPayloadWithValue: let response = await rpcClient.forkchoiceUpdated( ForkchoiceStateV1( headBlockHash: headBlock.asBlockHash, safeBlockHash: safeBlock.asBlockHash, finalizedBlockHash: finalizedBlock.asBlockHash), some PayloadAttributesV1( timestamp: Quantity timestamp, prevRandao: FixedBytes[32] randomData.data, suggestedFeeRecipient: suggestedFeeRecipient)) elif GetPayloadResponseType is engine_api.GetPayloadV2Response: let response = await rpcClient.forkchoiceUpdated( ForkchoiceStateV1( headBlockHash: headBlock.asBlockHash, safeBlockHash: safeBlock.asBlockHash, finalizedBlockHash: finalizedBlock.asBlockHash), some PayloadAttributesV2( timestamp: Quantity timestamp, prevRandao: FixedBytes[32] randomData.data, suggestedFeeRecipient: suggestedFeeRecipient, withdrawals: withdrawals)) elif GetPayloadResponseType is engine_api.GetPayloadV3Response: let response = await rpcClient.forkchoiceUpdated( ForkchoiceStateV1( headBlockHash: headBlock.asBlockHash, safeBlockHash: safeBlock.asBlockHash, finalizedBlockHash: finalizedBlock.asBlockHash), some PayloadAttributesV3( timestamp: Quantity timestamp, prevRandao: FixedBytes[32] randomData.data, suggestedFeeRecipient: suggestedFeeRecipient, withdrawals: withdrawals, parentBeaconBlockRoot: consensusHead.asBlockHash)) else: static: doAssert false if response.payloadStatus.status != PayloadExecutionStatus.valid or response.payloadId.isNone: raise newException(CatchableError, "Head block is not a valid payload") # Give the EL some time to assemble the block await sleepAsync(chronos.milliseconds 500) response.payloadId.get else: raise newException(CatchableError, "No confirmed execution head yet") when GetPayloadResponseType is BellatrixExecutionPayloadWithValue: let payload = await engine_api.getPayload(rpcClient, ExecutionPayloadV1, payloadId) return BellatrixExecutionPayloadWithValue( executionPayload: payload, blockValue: computeBlockValue payload) else: return await engine_api.getPayload(rpcClient, GetPayloadResponseType, payloadId) func cmpGetPayloadResponses(lhs, rhs: SomeEnginePayloadWithValue): int = cmp(distinctBase lhs.blockValue, distinctBase rhs.blockValue) template EngineApiResponseType*(T: type bellatrix.ExecutionPayloadForSigning): type = BellatrixExecutionPayloadWithValue template EngineApiResponseType*(T: type capella.ExecutionPayloadForSigning): type = engine_api.GetPayloadV2Response template EngineApiResponseType*(T: type deneb.ExecutionPayloadForSigning): type = engine_api.GetPayloadV3Response template toEngineWithdrawals*(withdrawals: seq[capella.Withdrawal]): seq[WithdrawalV1] = mapIt(withdrawals, toEngineWithdrawal(it)) template kind(T: type ExecutionPayloadV1): ConsensusFork = ConsensusFork.Bellatrix template kind(T: typedesc[ExecutionPayloadV1OrV2|ExecutionPayloadV2]): ConsensusFork = ConsensusFork.Capella template kind(T: type ExecutionPayloadV3): ConsensusFork = ConsensusFork.Deneb proc getPayload*(m: ELManager, PayloadType: type ForkyExecutionPayloadForSigning, consensusHead: Eth2Digest, headBlock, safeBlock, finalizedBlock: Eth2Digest, timestamp: uint64, randomData: Eth2Digest, suggestedFeeRecipient: Eth1Address, withdrawals: seq[capella.Withdrawal]): Future[Opt[PayloadType]] {.async.} = if m.elConnections.len == 0: return err() let engineApiWithdrawals = toEngineWithdrawals withdrawals isFcUpToDate = m.nextExpectedPayloadParams.areSameAs( headBlock, safeBlock, finalizedBlock, timestamp, randomData, suggestedFeeRecipient, engineApiWithdrawals) # `getPayloadFromSingleEL` may introduce additional latency const extraProcessingOverhead = 500.milliseconds let timeout = GETPAYLOAD_TIMEOUT + extraProcessingOverhead deadline = sleepAsync(timeout) requests = m.elConnections.mapIt(it.getPayloadFromSingleEL( EngineApiResponseType(PayloadType), isFcUpToDate, consensusHead, headBlock, safeBlock, finalizedBlock, timestamp, randomData, suggestedFeeRecipient, engineApiWithdrawals )) requestsCompleted = allFutures(requests) await requestsCompleted or deadline var bestPayloadIdx = none int for idx, req in requests: if not req.finished: req.cancelSoon() elif req.failed: error "Failed to get execution payload from EL", url = m.elConnections[idx].engineUrl.url, err = req.error.msg else: const payloadFork = PayloadType.kind when payloadFork >= ConsensusFork.Capella: when payloadFork == ConsensusFork.Capella: # TODO: The engine_api module may offer an alternative API where it is guaranteed # to return the correct response type (i.e. the rule below will be enforced # during deserialization). if req.read.executionPayload.withdrawals.isNone: warn "Execution client returned a block without a 'withdrawals' field for a post-Shanghai block", url = m.elConnections[idx].engineUrl.url continue if engineApiWithdrawals != req.read.executionPayload.withdrawals.maybeDeref: # otherwise it formats as "@[(index: ..., validatorIndex: ..., # address: ..., amount: ...), (index: ..., validatorIndex: ..., # address: ..., amount: ...)]" warn "Execution client did not return correct withdrawals", withdrawals_from_cl_len = engineApiWithdrawals.len, withdrawals_from_el_len = req.read.executionPayload.withdrawals.maybeDeref.len, withdrawals_from_cl = mapIt(engineApiWithdrawals, it.asConsensusWithdrawal), withdrawals_from_el = mapIt( req.read.executionPayload.withdrawals.maybeDeref, it.asConsensusWithdrawal) if req.read.executionPayload.extraData.len > MAX_EXTRA_DATA_BYTES: warn "Execution client provided a block with invalid extraData (size exceeds limit)", size = req.read.executionPayload.extraData.len, limit = MAX_EXTRA_DATA_BYTES continue if bestPayloadIdx.isNone: bestPayloadIdx = some idx else: if cmpGetPayloadResponses(req.read, requests[bestPayloadIdx.get].read) > 0: bestPayloadIdx = some idx if bestPayloadIdx.isSome: return ok requests[bestPayloadIdx.get].read.asConsensusType else: return err() proc waitELToSyncDeposits(connection: ELConnection, minimalRequiredBlock: BlockHash) {.async.} = var rpcClient = await connection.connectedRpcClient() if connection.depositContractSyncStatus == DepositContractSyncStatus.synced: return var attempt = 0 while true: try: discard raiseIfNil connection.trackedRequestWithTimeout( "getBlockByHash", rpcClient.getBlockByHash(minimalRequiredBlock), web3RequestsTimeout, failureAllowed = true) connection.depositContractSyncStatus = DepositContractSyncStatus.synced return except CancelledError as err: trace "waitELToSyncDepositContract cancelled", url = connection.engineUrl.url raise err except CatchableError as err: connection.depositContractSyncStatus = DepositContractSyncStatus.notSynced if attempt == 0: warn "Failed to obtain the most recent known block from the execution " & "layer node (the node is probably not synced)", url = connection.engineUrl.url, blk = minimalRequiredBlock, err = err.msg elif attempt mod 60 == 0: # This warning will be produced every 30 minutes warn "Still failing to obtain the most recent known block from the " & "execution layer node (the node is probably still not synced)", url = connection.engineUrl.url, blk = minimalRequiredBlock, err = err.msg inc attempt await sleepAsync(seconds(30)) rpcClient = await connection.connectedRpcClient() func networkHasDepositContract(m: ELManager): bool = not m.cfg.DEPOSIT_CONTRACT_ADDRESS.isDefaultValue func mostRecentKnownBlock(m: ELManager): BlockHash = if m.eth1Chain.finalizedDepositsMerkleizer.getChunkCount() > 0: m.eth1Chain.finalizedBlockHash.asBlockHash else: m.depositContractBlockHash proc selectConnectionForChainSyncing(m: ELManager): Future[ELConnection] {.async.} = doAssert m.elConnections.len > 0 let connectionsFuts = mapIt( m.elConnections, if m.networkHasDepositContract: FutureBase waitELToSyncDeposits(it, m.mostRecentKnownBlock) else: FutureBase connectedRpcClient(it)) # TODO: Ideally, the cancellation will be handled automatically # by a helper like `firstCompletedFuture` let firstConnected = try: await firstCompletedFuture(connectionsFuts) except CancelledError as err: for future in connectionsFuts: future.cancelSoon() raise err for future in connectionsFuts: if future != firstConnected: future.cancelSoon() return m.elConnections[find(connectionsFuts, firstConnected)] proc sendNewPayloadToSingleEL(connection: ELConnection, payload: engine_api.ExecutionPayloadV1): Future[PayloadStatusV1] {.async.} = let rpcClient = await connection.connectedRpcClient() return await rpcClient.engine_newPayloadV1(payload) proc sendNewPayloadToSingleEL(connection: ELConnection, payload: engine_api.ExecutionPayloadV2): Future[PayloadStatusV1] {.async.} = let rpcClient = await connection.connectedRpcClient() return await rpcClient.engine_newPayloadV2(payload) proc sendNewPayloadToSingleEL(connection: ELConnection, payload: engine_api.ExecutionPayloadV3, versioned_hashes: seq[engine_api.VersionedHash], parent_beacon_block_root: FixedBytes[32]): Future[PayloadStatusV1] {.async.} = let rpcClient = await connection.connectedRpcClient() return await rpcClient.engine_newPayloadV3( payload, versioned_hashes, parent_beacon_block_root) type StatusRelation = enum newStatusIsPreferable oldStatusIsOk disagreement func compareStatuses(newStatus, prevStatus: PayloadExecutionStatus): StatusRelation = case prevStatus of PayloadExecutionStatus.syncing: if newStatus == PayloadExecutionStatus.syncing: oldStatusIsOk else: newStatusIsPreferable of PayloadExecutionStatus.valid: case newStatus of PayloadExecutionStatus.syncing, PayloadExecutionStatus.accepted, PayloadExecutionStatus.valid: oldStatusIsOk of PayloadExecutionStatus.invalid_block_hash, PayloadExecutionStatus.invalid: disagreement of PayloadExecutionStatus.invalid: case newStatus of PayloadExecutionStatus.syncing, PayloadExecutionStatus.invalid: oldStatusIsOk of PayloadExecutionStatus.valid, PayloadExecutionStatus.accepted, PayloadExecutionStatus.invalid_block_hash: disagreement of PayloadExecutionStatus.accepted: case newStatus of PayloadExecutionStatus.accepted, PayloadExecutionStatus.syncing: oldStatusIsOk of PayloadExecutionStatus.valid: newStatusIsPreferable of PayloadExecutionStatus.invalid_block_hash, PayloadExecutionStatus.invalid: disagreement of PayloadExecutionStatus.invalid_block_hash: if newStatus == PayloadExecutionStatus.invalid_block_hash: oldStatusIsOk else: disagreement type ELConsensusViolationDetector = object selectedResponse: Option[int] disagreementAlreadyDetected: bool func init(T: type ELConsensusViolationDetector): T = ELConsensusViolationDetector(selectedResponse: none int, disagreementAlreadyDetected: false) proc processResponse[ELResponseType]( d: var ELConsensusViolationDetector, connections: openArray[ELConnection], requests: openArray[Future[ELResponseType]], idx: int) = if not requests[idx].completed: return let status = try: requests[idx].read.status except CatchableError: raiseAssert "checked above" if d.selectedResponse.isNone: d.selectedResponse = some idx elif not d.disagreementAlreadyDetected: let prevStatus = try: requests[d.selectedResponse.get].read.status except CatchableError: raiseAssert "previously checked" case compareStatuses(status, prevStatus) of newStatusIsPreferable: d.selectedResponse = some idx of oldStatusIsOk: discard of disagreement: d.disagreementAlreadyDetected = true error "Execution layer consensus violation detected", responseType = name(ELResponseType), url1 = connections[d.selectedResponse.get].engineUrl.url, status1 = prevStatus, url2 = connections[idx].engineUrl.url, status2 = status proc sendNewPayload*(m: ELManager, blck: SomeForkyBeaconBlock): Future[PayloadExecutionStatus] {.async.} = let earlyDeadline = sleepAsync(chronos.seconds 1) startTime = Moment.now deadline = sleepAsync(NEWPAYLOAD_TIMEOUT) payload = blck.body.execution_payload.asEngineExecutionPayload requests = m.elConnections.mapIt: let req = when payload is engine_api.ExecutionPayloadV3: # https://github.com/ethereum/consensus-specs/blob/v1.4.0-alpha.1/specs/deneb/beacon-chain.md#process_execution_payload # Verify the execution payload is valid # [Modified in Deneb] Pass `versioned_hashes` to Execution Engine let versioned_hashes = mapIt( blck.body.blob_kzg_commitments, engine_api.VersionedHash(kzg_commitment_to_versioned_hash(it))) sendNewPayloadToSingleEL( it, payload, versioned_hashes, FixedBytes[32] blck.parent_root.data) elif payload is engine_api.ExecutionPayloadV1 or payload is engine_api.ExecutionPayloadV2: sendNewPayloadToSingleEL(it, payload) else: static: doAssert false trackEngineApiRequest(it, req, "newPayload", startTime, deadline) req requestsCompleted = allFutures(requests) await requestsCompleted or earlyDeadline var stillPending = newSeq[Future[PayloadStatusV1]]() responseProcessor = init ELConsensusViolationDetector for idx, req in requests: if not req.finished: stillPending.add req elif req.completed: responseProcessor.processResponse(m.elConnections, requests, idx) if responseProcessor.disagreementAlreadyDetected: return PayloadExecutionStatus.invalid elif responseProcessor.selectedResponse.isSome: return requests[responseProcessor.selectedResponse.get].read.status await requestsCompleted or deadline for idx, req in requests: if req.completed and req in stillPending: responseProcessor.processResponse(m.elConnections, requests, idx) return if responseProcessor.disagreementAlreadyDetected: PayloadExecutionStatus.invalid elif responseProcessor.selectedResponse.isSome: requests[responseProcessor.selectedResponse.get].read.status else: PayloadExecutionStatus.syncing proc forkchoiceUpdatedForSingleEL( connection: ELConnection, state: ref ForkchoiceStateV1, payloadAttributes: Option[PayloadAttributesV1] | Option[PayloadAttributesV2] | Option[PayloadAttributesV3]): Future[PayloadStatusV1] {.async.} = let rpcClient = await connection.connectedRpcClient() response = await rpcClient.forkchoiceUpdated(state[], payloadAttributes) if response.payloadStatus.status notin {syncing, valid, invalid}: debug "Invalid fork-choice updated response from the EL", payloadStatus = response.payloadStatus return if response.payloadStatus.status == PayloadExecutionStatus.valid and response.payloadId.isSome: connection.lastPayloadId = response.payloadId return response.payloadStatus proc forkchoiceUpdated*(m: ELManager, headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest, payloadAttributes: Option[PayloadAttributesV1] | Option[PayloadAttributesV2] | Option[PayloadAttributesV3]): Future[(PayloadExecutionStatus, Option[BlockHash])] {.async: (raises: [CancelledError]).} = doAssert not headBlockHash.isZero # Allow finalizedBlockHash to be 0 to avoid sync deadlocks. # # https://github.com/ethereum/EIPs/blob/master/EIPS/eip-3675.md#pos-events # has "Before the first finalized block occurs in the system the finalized # block hash provided by this event is stubbed with # `0x0000000000000000000000000000000000000000000000000000000000000000`." # and # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.6/specs/bellatrix/validator.md#executionpayload # notes "`finalized_block_hash` is the hash of the latest finalized execution # payload (`Hash32()` if none yet finalized)" if m.elConnections.len == 0: return (PayloadExecutionStatus.syncing, none BlockHash) when payloadAttributes is Option[PayloadAttributesV3]: template payloadAttributesV3(): auto = if payloadAttributes.isSome: payloadAttributes.get else: # As timestamp and prevRandao are both 0, won't false-positive match (static(default(PayloadAttributesV3))) elif payloadAttributes is Option[PayloadAttributesV2]: template payloadAttributesV3(): auto = if payloadAttributes.isSome: PayloadAttributesV3( timestamp: payloadAttributes.get.timestamp, prevRandao: payloadAttributes.get.prevRandao, suggestedFeeRecipient: payloadAttributes.get.suggestedFeeRecipient, withdrawals: payloadAttributes.get.withdrawals, parentBeaconBlockRoot: static(default(FixedBytes[32]))) else: # As timestamp and prevRandao are both 0, won't false-positive match (static(default(PayloadAttributesV3))) elif payloadAttributes is Option[PayloadAttributesV1]: template payloadAttributesV3(): auto = if payloadAttributes.isSome: PayloadAttributesV3( timestamp: payloadAttributes.get.timestamp, prevRandao: payloadAttributes.get.prevRandao, suggestedFeeRecipient: payloadAttributes.get.suggestedFeeRecipient, withdrawals: @[], parentBeaconBlockRoot: static(default(FixedBytes[32]))) else: # As timestamp and prevRandao are both 0, won't false-positive match (static(default(PayloadAttributesV3))) else: static: doAssert false let state = newClone ForkchoiceStateV1( headBlockHash: headBlockHash.asBlockHash, safeBlockHash: safeBlockHash.asBlockHash, finalizedBlockHash: finalizedBlockHash.asBlockHash) earlyDeadline = sleepAsync(chronos.seconds 1) startTime = Moment.now deadline = sleepAsync(FORKCHOICEUPDATED_TIMEOUT) requests = m.elConnections.mapIt: let req = it.forkchoiceUpdatedForSingleEL(state, payloadAttributes) trackEngineApiRequest(it, req, "forkchoiceUpdated", startTime, deadline) req requestsCompleted = allFutures(requests) await requestsCompleted or earlyDeadline var stillPending = newSeq[Future[PayloadStatusV1]]() responseProcessor = init ELConsensusViolationDetector for idx, req in requests: if not req.finished: stillPending.add req elif req.completed: responseProcessor.processResponse(m.elConnections, requests, idx) template assignNextExpectedPayloadParams() = # Ensure that there's no race condition window where getPayload's check for # whether it needs to trigger a new fcU payload, due to cache invalidation, # falsely suggests that the expected payload matches, and similarly that if # the fcU fails or times out for other reasons, the expected payload params # remain synchronized with EL state. assign( m.nextExpectedPayloadParams, some NextExpectedPayloadParams( headBlockHash: headBlockHash, safeBlockHash: safeBlockHash, finalizedBlockHash: finalizedBlockHash, payloadAttributes: payloadAttributesV3)) template getSelected: untyped = let data = try: requests[responseProcessor.selectedResponse.get].read except CatchableError: raiseAssert "Only completed requests get selected" (data.status, data.latestValidHash) if responseProcessor.disagreementAlreadyDetected: return (PayloadExecutionStatus.invalid, none BlockHash) elif responseProcessor.selectedResponse.isSome: assignNextExpectedPayloadParams() return getSelected() await requestsCompleted or deadline for idx, req in requests: if req.completed and req in stillPending: responseProcessor.processResponse(m.elConnections, requests, idx) return if responseProcessor.disagreementAlreadyDetected: (PayloadExecutionStatus.invalid, none BlockHash) elif responseProcessor.selectedResponse.isSome: assignNextExpectedPayloadParams() getSelected() else: (PayloadExecutionStatus.syncing, none BlockHash) # TODO can't be defined within exchangeConfigWithSingleEL func `==`(x, y: Quantity): bool {.borrow.} proc exchangeConfigWithSingleEL(m: ELManager, connection: ELConnection) {.async.} = let rpcClient = await connection.connectedRpcClient() if m.eth1Network.isSome and connection.etcStatus == EtcStatus.notExchangedYet: try: let providerChain = connection.trackedRequestWithTimeout( "chainId", rpcClient.eth_chainId(), web3RequestsTimeout) # https://chainid.network/ expectedChain = case m.eth1Network.get of mainnet: 1.Quantity of goerli: 5.Quantity of sepolia: 11155111.Quantity of holesky: 17000.Quantity if expectedChain != providerChain: warn "The specified EL client is connected to a different chain", url = connection.engineUrl, expectedChain = distinctBase(expectedChain), actualChain = distinctBase(providerChain) connection.etcStatus = EtcStatus.mismatch return except CatchableError as exc: # Typically because it's not synced through EIP-155, assuming this Web3 # endpoint has been otherwise working. debug "Failed to obtain eth_chainId", error = exc.msg connection.etcStatus = EtcStatus.match # https://github.com/ethereum/execution-apis/blob/c4089414bbbe975bbc4bf1ccf0a3d31f76feb3e1/src/engine/cancun.md#deprecate-engine_exchangetransitionconfigurationv1 # Consensus layer clients MUST NOT call this method. if m.eth1Chain.cfg.DENEB_FORK_EPOCH != FAR_FUTURE_EPOCH: return # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/paris.md#engine_exchangetransitionconfigurationv1 let ourConf = TransitionConfigurationV1( terminalTotalDifficulty: m.eth1Chain.cfg.TERMINAL_TOTAL_DIFFICULTY, terminalBlockHash: m.eth1Chain.cfg.TERMINAL_BLOCK_HASH, terminalBlockNumber: Quantity 0) try: discard connection.trackedRequestWithTimeout( "exchangeTransitionConfiguration", rpcClient.engine_exchangeTransitionConfigurationV1(ourConf), timeout = 1.seconds) except CatchableError as err: warn "Failed to exchange transition configuration", url = connection.engineUrl, err = err.msg proc exchangeTransitionConfiguration*(m: ELManager) {.async.} = if m.elConnections.len == 0: return let deadline = sleepAsync(3.seconds) requests = m.elConnections.mapIt(m.exchangeConfigWithSingleEL(it)) requestsCompleted = allFutures(requests) await requestsCompleted or deadline var cancelled = 0 for idx, req in requests: if not req.finished: req.cancelSoon() inc cancelled if cancelled == requests.len: warn "Failed to exchange configuration with the configured EL end-points" template readJsonField(logEvent, field: untyped, ValueType: type): untyped = if logEvent.field.isNone: raise newException(CatchableError, "Web3 provider didn't return needed logEvent field " & astToStr(field)) logEvent.field.get template init[N: static int](T: type DynamicBytes[N, N]): T = T newSeq[byte](N) proc fetchTimestamp(connection: ELConnection, rpcClient: RpcClient, blk: Eth1Block) {.async.} = debug "Fetching block timestamp", blockNum = blk.number let web3block = raiseIfNil connection.trackedRequestWithTimeout( "getBlockByHash", rpcClient.getBlockByHash(blk.hash.asBlockHash), web3RequestsTimeout) blk.timestamp = Eth1BlockTimestamp web3block.timestamp func depositEventsToBlocks(depositsList: openArray[JsonString]): seq[Eth1Block] {. raises: [CatchableError].} = var lastEth1Block: Eth1Block for logEventData in depositsList: let logEvent = JrpcConv.decode(logEventData.string, LogObject) blockNumber = Eth1BlockNumber readJsonField(logEvent, blockNumber, Quantity) blockHash = readJsonField(logEvent, blockHash, BlockHash) if lastEth1Block == nil or lastEth1Block.number != blockNumber: lastEth1Block = Eth1Block( hash: blockHash.asEth2Digest, number: blockNumber # The `timestamp` is set in `syncBlockRange` immediately # after calling this function, because we don't want to # make this function `async` ) result.add lastEth1Block var pubkey = init PubKeyBytes withdrawalCredentials = init WithdrawalCredentialsBytes amount = init Int64LeBytes signature = init SignatureBytes index = init Int64LeBytes var offset = 0 offset += decode(logEvent.data, 0, offset, pubkey) offset += decode(logEvent.data, 0, offset, withdrawalCredentials) offset += decode(logEvent.data, 0, offset, amount) offset += decode(logEvent.data, 0, offset, signature) offset += decode(logEvent.data, 0, offset, index) if pubkey.len != 48 or withdrawalCredentials.len != 32 or amount.len != 8 or signature.len != 96 or index.len != 8: raise newException(CorruptDataProvider, "Web3 provider supplied invalid deposit logs") lastEth1Block.deposits.add DepositData( pubkey: ValidatorPubKey.init(pubkey.toArray), withdrawal_credentials: Eth2Digest(data: withdrawalCredentials.toArray), amount: bytes_to_uint64(amount.toArray), signature: ValidatorSig.init(signature.toArray)) type DepositContractDataStatus = enum Fetched VerifiedCorrect DepositRootIncorrect DepositRootUnavailable DepositCountIncorrect DepositCountUnavailable when hasDepositRootChecks: const contractCallTimeout = 60.seconds proc fetchDepositContractData(connection: ELConnection, rpcClient: RpcClient, depositContract: Sender[DepositContract], blk: Eth1Block): Future[DepositContractDataStatus] {.async.} = let startTime = Moment.now deadline = sleepAsync(contractCallTimeout) depositRoot = depositContract.get_deposit_root.call(blockNumber = blk.number) rawCount = depositContract.get_deposit_count.call(blockNumber = blk.number) # We allow failures on these requests becaues the clients # are expected to prune the state data for historical blocks connection.trackEngineApiRequest( depositRoot, "get_deposit_root", startTime, deadline, failureAllowed = true) connection.trackEngineApiRequest( rawCount, "get_deposit_count", startTime, deadline, failureAllowed = true) try: let fetchedRoot = asEth2Digest(block: awaitWithTimeout(depositRoot, deadline): raise newException(DataProviderTimeout, "Request time out while obtaining deposits root")) if blk.depositRoot.isZero: blk.depositRoot = fetchedRoot result = Fetched elif blk.depositRoot == fetchedRoot: result = VerifiedCorrect else: result = DepositRootIncorrect except CatchableError as err: debug "Failed to fetch deposits root", blockNumber = blk.number, err = err.msg result = DepositRootUnavailable try: let fetchedCount = bytes_to_uint64((block: awaitWithTimeout(rawCount, deadline): raise newException(DataProviderTimeout, "Request time out while obtaining deposits count")).toArray) if blk.depositCount == 0: blk.depositCount = fetchedCount elif blk.depositCount != fetchedCount: result = DepositCountIncorrect except CatchableError as err: debug "Failed to fetch deposits count", blockNumber = blk.number, err = err.msg result = DepositCountUnavailable template trackFinalizedState*(m: ELManager, finalizedEth1Data: Eth1Data, finalizedStateDepositIndex: uint64): bool = trackFinalizedState(m.eth1Chain, finalizedEth1Data, finalizedStateDepositIndex) template getBlockProposalData*(m: ELManager, state: ForkedHashedBeaconState, finalizedEth1Data: Eth1Data, finalizedStateDepositIndex: uint64): BlockProposalEth1Data = getBlockProposalData( m.eth1Chain, state, finalizedEth1Data, finalizedStateDepositIndex) func new*(T: type ELConnection, engineUrl: EngineApiUrl): T = ELConnection( engineUrl: engineUrl, depositContractSyncStatus: DepositContractSyncStatus.unknown) proc new*(T: type ELManager, cfg: RuntimeConfig, depositContractBlockNumber: uint64, depositContractBlockHash: Eth2Digest, db: BeaconChainDB, engineApiUrls: seq[EngineApiUrl], eth1Network: Option[Eth1Network]): T = let eth1Chain = Eth1Chain.init( cfg, db, depositContractBlockNumber, depositContractBlockHash) debug "Initializing ELManager", depositContractBlockNumber, depositContractBlockHash T(eth1Chain: eth1Chain, depositContractAddress: cfg.DEPOSIT_CONTRACT_ADDRESS, depositContractBlockNumber: depositContractBlockNumber, depositContractBlockHash: depositContractBlockHash.asBlockHash, elConnections: mapIt(engineApiUrls, ELConnection.new(it)), eth1Network: eth1Network, blocksPerLogsRequest: targetBlocksPerLogsRequest) proc safeCancel(fut: var Future[void]) = if not fut.isNil and not fut.finished: fut.cancelSoon() fut = nil proc doStop(m: ELManager) {.async.} = safeCancel m.chainSyncingLoopFut safeCancel m.exchangeTransitionConfigurationLoopFut if m.elConnections.len > 0: let closeConnectionFutures = mapIt(m.elConnections, close(it)) await allFutures(closeConnectionFutures) proc stop(m: ELManager) {.async.} = if not m.stopFut.isNil: await m.stopFut else: m.stopFut = m.doStop() await m.stopFut m.stopFut = nil const votedBlocksSafetyMargin = 50 func earliestBlockOfInterest( m: ELManager, latestEth1BlockNumber: Eth1BlockNumber): Eth1BlockNumber = let blocksOfInterestRange = SLOTS_PER_ETH1_VOTING_PERIOD + (2 * m.cfg.ETH1_FOLLOW_DISTANCE) + votedBlocksSafetyMargin if latestEth1BlockNumber > blocksOfInterestRange: latestEth1BlockNumber - blocksOfInterestRange else: 0 proc syncBlockRange(m: ELManager, connection: ELConnection, rpcClient: RpcClient, depositContract: Sender[DepositContract], fromBlock, toBlock, fullSyncFromBlock: Eth1BlockNumber) {.gcsafe, async.} = doAssert m.eth1Chain.blocks.len > 0 var currentBlock = fromBlock while currentBlock <= toBlock: var depositLogs: seq[JsonString] maxBlockNumberRequested: Eth1BlockNumber backoff = 100 while true: maxBlockNumberRequested = min(toBlock, currentBlock + m.blocksPerLogsRequest - 1) debug "Obtaining deposit log events", fromBlock = currentBlock, toBlock = maxBlockNumberRequested, backoff debug.logTime "Deposit logs obtained": # Reduce all request rate until we have a more general solution # for dealing with Infura's rate limits await sleepAsync(milliseconds(backoff)) let startTime = Moment.now deadline = sleepAsync 30.seconds jsonLogsFut = depositContract.getJsonLogs( DepositEvent, fromBlock = some blockId(currentBlock), toBlock = some blockId(maxBlockNumberRequested)) connection.trackEngineApiRequest( jsonLogsFut, "getLogs", startTime, deadline) depositLogs = try: # Downloading large amounts of deposits may take several minutes awaitWithTimeout(jsonLogsFut, deadline): raise newException(DataProviderTimeout, "Request time out while obtaining json logs") except CatchableError as err: debug "Request for deposit logs failed", err = err.msg inc failed_web3_requests backoff = (backoff * 3) div 2 m.blocksPerLogsRequest = m.blocksPerLogsRequest div 2 if m.blocksPerLogsRequest == 0: m.blocksPerLogsRequest = 1 raise err continue m.blocksPerLogsRequest = min( (m.blocksPerLogsRequest * 3 + 1) div 2, targetBlocksPerLogsRequest) currentBlock = maxBlockNumberRequested + 1 break let blocksWithDeposits = depositEventsToBlocks(depositLogs) for i in 0 ..< blocksWithDeposits.len: let blk = blocksWithDeposits[i] if blk.number > fullSyncFromBlock: await fetchTimestamp(connection, rpcClient, blk) let lastBlock = m.eth1Chain.blocks.peekLast for n in max(lastBlock.number + 1, fullSyncFromBlock) ..< blk.number: debug "Obtaining block without deposits", blockNum = n let noDepositsBlock = raiseIfNil connection.trackedRequestWithTimeout( "getBlockByNumber", rpcClient.getBlockByNumber(n), web3RequestsTimeout) m.eth1Chain.addBlock( lastBlock.makeSuccessorWithoutDeposits(noDepositsBlock)) eth1_synced_head.set noDepositsBlock.number.toGaugeValue m.eth1Chain.addBlock blk eth1_synced_head.set blk.number.toGaugeValue if blocksWithDeposits.len > 0: let lastIdx = blocksWithDeposits.len - 1 template lastBlock: auto = blocksWithDeposits[lastIdx] let status = when hasDepositRootChecks: await fetchDepositContractData( connection, rpcClient, depositContract, lastBlock) else: DepositRootUnavailable when hasDepositRootChecks: debug "Deposit contract state verified", status = $status, ourCount = lastBlock.depositCount, ourRoot = lastBlock.depositRoot case status of DepositRootIncorrect, DepositCountIncorrect: raise newException(CorruptDataProvider, "The deposit log events disagree with the deposit contract state") else: discard info "Eth1 sync progress", blockNumber = lastBlock.number, depositsProcessed = lastBlock.depositCount func hasConnection*(m: ELManager): bool = m.elConnections.len > 0 func hasAnyWorkingConnection*(m: ELManager): bool = m.elConnections.anyIt(it.state == Working or it.state == NeverTested) func hasProperlyConfiguredConnection*(m: ELManager): bool = for connection in m.elConnections: if connection.etcStatus == EtcStatus.match: return true false proc startExchangeTransitionConfigurationLoop(m: ELManager) {.async.} = debug "Starting exchange transition configuration loop" while true: # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/paris.md#specification-3 debug "Exchange transition configuration tick" traceAsyncErrors m.exchangeTransitionConfiguration() await sleepAsync(60.seconds) proc syncEth1Chain(m: ELManager, connection: ELConnection) {.async.} = let rpcClient = awaitOrRaiseOnTimeout(connection.connectedRpcClient(), 1.seconds) let # BEWARE # `connectedRpcClient` guarantees that connection.web3 will not be # `none` here, but it's not safe to initialize this later (e.g closer # to where it's used) because `connection.web3` may be set to `none` # at any time after a failed request. Luckily, the `contractSender` # object is very cheap to create. depositContract = connection.web3.get.contractSender( DepositContract, m.depositContractAddress) shouldProcessDeposits = not ( m.depositContractAddress.isZeroMemory or m.eth1Chain.finalizedBlockHash.data.isZeroMemory) trace "Starting syncEth1Chain", shouldProcessDeposits logScope: url = connection.engineUrl.url # We might need to reset the chain if the new provider disagrees # with the previous one regarding the history of the chain or if # we have detected a conensus violation - our view disagreeing with # the majority of the validators in the network. # # Consensus violations happen in practice because the web3 providers # sometimes return incomplete or incorrect deposit log events even # when they don't indicate any errors in the response. When this # happens, we are usually able to download the data successfully # on the second attempt. # # TODO # Perhaps the above problem was manifesting only with the obsolete # JSON-RPC data providers, which can no longer be used with Nimbus. if m.eth1Chain.blocks.len > 0: let needsReset = m.eth1Chain.hasConsensusViolation or (block: let lastKnownBlock = m.eth1Chain.blocks.peekLast matchingBlockAtNewEl = raiseIfNil connection.trackedRequestWithTimeout( "getBlockByNumber", rpcClient.getBlockByNumber(lastKnownBlock.number), web3RequestsTimeout) lastKnownBlock.hash.asBlockHash != matchingBlockAtNewEl.hash) if needsReset: trace "Resetting the Eth1 chain", hasConsensusViolation = m.eth1Chain.hasConsensusViolation m.eth1Chain.clear() var eth1SyncedTo: Eth1BlockNumber if shouldProcessDeposits: if m.eth1Chain.blocks.len == 0: let finalizedBlockHash = m.eth1Chain.finalizedBlockHash.asBlockHash let startBlock = raiseIfNil connection.trackedRequestWithTimeout( "getBlockByHash", rpcClient.getBlockByHash(finalizedBlockHash), web3RequestsTimeout) m.eth1Chain.addBlock Eth1Block( hash: m.eth1Chain.finalizedBlockHash, number: Eth1BlockNumber startBlock.number, timestamp: Eth1BlockTimestamp startBlock.timestamp) eth1SyncedTo = m.eth1Chain.blocks[^1].number eth1_synced_head.set eth1SyncedTo.toGaugeValue eth1_finalized_head.set eth1SyncedTo.toGaugeValue eth1_finalized_deposits.set( m.eth1Chain.finalizedDepositsMerkleizer.getChunkCount.toGaugeValue) debug "Starting Eth1 syncing", `from` = shortLog(m.eth1Chain.blocks[^1]) while true: debug "syncEth1Chain tick" if bnStatus == BeaconNodeStatus.Stopping: await m.stop() return if m.eth1Chain.hasConsensusViolation: raise newException(CorruptDataProvider, "Eth1 chain contradicts Eth2 consensus") let latestBlock = try: raiseIfNil connection.trackedRequestWithTimeout( "getBlockByNumber", rpcClient.eth_getBlockByNumber(blockId("latest"), false), web3RequestsTimeout) except CatchableError as err: warn "Failed to obtain the latest block from the EL", err = err.msg raise err m.syncTargetBlock = some( if Eth1BlockNumber(latestBlock.number) > m.cfg.ETH1_FOLLOW_DISTANCE: Eth1BlockNumber(latestBlock.number) - m.cfg.ETH1_FOLLOW_DISTANCE else: Eth1BlockNumber(0)) if m.syncTargetBlock.get <= eth1SyncedTo: # The chain reorged to a lower height. # It's relatively safe to ignore that. await sleepAsync(m.cfg.SECONDS_PER_ETH1_BLOCK.int.seconds) continue eth1_latest_head.set latestBlock.number.toGaugeValue if shouldProcessDeposits and latestBlock.number.uint64 > m.cfg.ETH1_FOLLOW_DISTANCE: await m.syncBlockRange(connection, rpcClient, depositContract, eth1SyncedTo + 1, m.syncTargetBlock.get, m.earliestBlockOfInterest(Eth1BlockNumber latestBlock.number)) eth1SyncedTo = m.syncTargetBlock.get eth1_synced_head.set eth1SyncedTo.toGaugeValue proc startChainSyncingLoop(m: ELManager) {.async.} = info "Starting execution layer deposit syncing", contract = $m.depositContractAddress var syncedConnectionFut = m.selectConnectionForChainSyncing() info "Connection attempt started" while true: try: await syncedConnectionFut or sleepAsync(60.seconds) if not syncedConnectionFut.finished: notice "No synced execution layer available for deposit syncing" await sleepAsync(chronos.seconds(30)) continue await syncEth1Chain(m, syncedConnectionFut.read) except CatchableError: await sleepAsync(10.seconds) # A more detailed error is already logged by trackEngineApiRequest debug "Restarting the deposit syncing loop" # To be extra safe, we will make a fresh connection attempt await syncedConnectionFut.cancelAndWait() syncedConnectionFut = m.selectConnectionForChainSyncing() proc start*(m: ELManager, syncChain = true) {.gcsafe.} = if m.elConnections.len == 0: return ## Calling `ELManager.start()` on an already started ELManager is a noop if syncChain and m.chainSyncingLoopFut.isNil: m.chainSyncingLoopFut = m.startChainSyncingLoop() if m.hasJwtSecret and m.exchangeTransitionConfigurationLoopFut.isNil: m.exchangeTransitionConfigurationLoopFut = m.startExchangeTransitionConfigurationLoop() func `$`(x: Quantity): string = $(x.uint64) func `$`(x: BlockObject): string = $(x.number) & " [" & $(x.hash) & "]" proc testWeb3Provider*(web3Url: Uri, depositContractAddress: Eth1Address, jwtSecret: Opt[seq[byte]]) {.async.} = stdout.write "Establishing web3 connection..." var web3: Web3 try: web3 = awaitOrRaiseOnTimeout( newWeb3($web3Url, getJsonRpcRequestHeaders(jwtSecret)), 5.seconds) stdout.write "\rEstablishing web3 connection: Connected\n" except CatchableError as err: stdout.write "\rEstablishing web3 connection: Failure(" & err.msg & ")\n" quit 1 template request(actionDesc: static string, action: untyped): untyped = stdout.write actionDesc & "..." stdout.flushFile() var res: typeof(read action) try: res = awaitOrRaiseOnTimeout(action, web3RequestsTimeout) when res is BlockObject: res = raiseIfNil res stdout.write "\r" & actionDesc & ": " & $res except CatchableError as err: stdout.write "\r" & actionDesc & ": Error(" & err.msg & ")" stdout.write "\n" res discard request "Chain ID": web3.provider.eth_chainId() discard request "Sync status": web3.provider.eth_syncing() let latestBlock = request "Latest block": web3.provider.eth_getBlockByNumber(blockId("latest"), false) ns = web3.contractSender(DepositContract, depositContractAddress) discard request "Deposit root": ns.get_deposit_root.call(blockNumber = latestBlock.number.uint64)