nimbus-eth2/beacon_chain/el/el_manager.nim
Agnish Ghosh b32205de7c
upstream peerdas alpha3 related spec changes + fix upstream related issues (#6468)
* reworked some of the das core specs, pr'd to check whether whether the conflicting type issue is centric to my machine or not

* bumped nim-blscurve to 9c6e80c6109133c0af3025654f5a8820282cff05, same as unstable

* bumped nim-eth2-scenarios, nim-nat-traversal at par with unstable, added more pathches, made peerdas devnet branch backward compatible, peerdas passing new ssz tests as per alpha3, disabled electra fixture tests, as branch hasn't been rebased for a while

* refactor test fixture files

* rm: serializeDataColumn

* refactor: took data columns extracted from blobs during block proposal to the heap

* disable blob broadcast in pd devnet

* fix addBlock in message router

* fix: data column iterator

* added debug checkpoints to check CI

* refactor if else conditions

* add: updated das core specs to alpha 3, and unit tests pass
2024-08-05 19:27:39 +05:30

2231 lines
85 KiB
Nim

# beacon_chain
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
std/[strformat, typetraits, json, sequtils],
# Nimble packages:
chronos, metrics, chronicles/timings,
json_rpc/[client, errors],
web3, web3/[engine_api, primitives, conversions],
eth/common/eth_types,
results,
stew/[assign2, byteutils, objects],
# Local modules:
../spec/[eth2_merkleization, forks],
../networking/network_metadata,
".."/beacon_node_status,
"."/[eth1_chain, el_conf]
from std/times import getTime, inSeconds, initTime, `-`
from ../spec/engine_authentication import getSignedIatToken
from ../spec/helpers import bytes_to_uint64
from ../spec/state_transition_block import kzg_commitment_to_versioned_hash
export
eth1_chain, el_conf, engine_api, base
logScope:
topics = "elman"
type
PubKeyBytes = DynamicBytes[48, 48]
WithdrawalCredentialsBytes = DynamicBytes[32, 32]
SignatureBytes = DynamicBytes[96, 96]
Int64LeBytes = DynamicBytes[8, 8]
WithoutTimeout* = distinct int
contract(DepositContract):
proc deposit(pubkey: PubKeyBytes,
withdrawalCredentials: WithdrawalCredentialsBytes,
signature: SignatureBytes,
deposit_data_root: FixedBytes[32])
proc get_deposit_root(): FixedBytes[32]
proc get_deposit_count(): Int64LeBytes
proc DepositEvent(pubkey: PubKeyBytes,
withdrawalCredentials: WithdrawalCredentialsBytes,
amount: Int64LeBytes,
signature: SignatureBytes,
index: Int64LeBytes) {.event.}
const
noTimeout = WithoutTimeout(0)
hasDepositRootChecks = defined(has_deposit_root_checks)
targetBlocksPerLogsRequest = 1000'u64
# TODO
#
# This is currently set to 1000, because this was the default maximum
# value in Besu circa our 22.3.0 release. Previously, we've used 5000,
# but this was effectively forcing the fallback logic in `syncBlockRange`
# to always execute multiple requests before getting a successful response.
#
# Besu have raised this default to 5000 in https://github.com/hyperledger/besu/pull/5209
# which is expected to ship in their next release.
#
# Full deposits sync time with various values for this parameter:
#
# Blocks per request | Geth running on the same host | Geth running on a more distant host
# ----------------------------------------------------------------------------------------
# 1000 | 11m 20s | 22m
# 5000 | 5m 20s | 15m 40s
# 100000 | 4m 10s | not tested
#
# The number of requests scales linearly with the parameter value as you would expect.
#
# These results suggest that it would be reasonable for us to get back to 5000 once the
# Besu release is well-spread within their userbase.
# Engine API timeouts
engineApiConnectionTimeout = 5.seconds # How much we wait before giving up connecting to the Engine API
web3RequestsTimeout* = 8.seconds # How much we wait for eth_* requests (e.g. eth_getBlockByHash)
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/paris.md#request-2
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/shanghai.md#request-2
GETPAYLOAD_TIMEOUT = 1.seconds
connectionStateChangeHysteresisThreshold = 15
## How many unsuccesful/successful requests we must see
## before declaring the connection as degraded/restored
type
NextExpectedPayloadParams* = object
headBlockHash*: Eth2Digest
safeBlockHash*: Eth2Digest
finalizedBlockHash*: Eth2Digest
payloadAttributes*: PayloadAttributesV3
ELManagerState* {.pure.} = enum
Running, Closing, Closed
ELManager* = ref object
eth1Network: Option[Eth1Network]
## If this value is supplied the EL manager will check whether
## all configured EL nodes are connected to the same network.
depositContractAddress*: Eth1Address
depositContractBlockNumber: uint64
depositContractBlockHash: BlockHash
blocksPerLogsRequest: uint64
## This value is used to dynamically adjust the number of
## blocks we are trying to download at once during deposit
## syncing. By default, the value is set to the constant
## `targetBlocksPerLogsRequest`, but if the EL is failing
## to serve this number of blocks per single `eth_getLogs`
## request, we temporarily lower the value until the request
## succeeds. The failures are generally expected only in
## periods in the history for very high deposit density.
elConnections: seq[ELConnection]
## All active EL connections
eth1Chain: Eth1Chain
## At larger distances, this chain consists of all blocks
## with deposits. Within the relevant voting period, it
## also includes blocks without deposits because we must
## vote for a block only if it's part of our known history.
syncTargetBlock: Option[Eth1BlockNumber]
chainSyncingLoopFut: Future[void]
exchangeTransitionConfigurationLoopFut: Future[void]
managerState: ELManagerState
nextExpectedPayloadParams*: Option[NextExpectedPayloadParams]
EtcStatus {.pure.} = enum
notExchangedYet
mismatch
match
DepositContractSyncStatus {.pure.} = enum
unknown
notSynced
synced
ELConnectionState {.pure.} = enum
NeverTested
Working
Degraded
ELConnection* = ref object
engineUrl: EngineApiUrl
web3: Opt[Web3]
## This will be `none` before connecting and while we are
## reconnecting after a lost connetion. You can wait on
## the future below for the moment the connection is active.
connectingFut: Future[Result[Web3, string]].Raising([CancelledError])
## This future will be replaced when the connection is lost.
etcStatus: EtcStatus
## The latest status of the `exchangeTransitionConfiguration`
## exchange.
state: ELConnectionState
hysteresisCounter: int
depositContractSyncStatus: DepositContractSyncStatus
## Are we sure that this EL has synced the deposit contract?
lastPayloadId: Option[PayloadID]
FullBlockId* = object
number: Eth1BlockNumber
hash: BlockHash
DataProviderFailure* = object of CatchableError
CorruptDataProvider* = object of DataProviderFailure
DataProviderTimeout* = object of DataProviderFailure
DataProviderConnectionFailure* = object of DataProviderFailure
DisconnectHandler* = proc () {.gcsafe, raises: [].}
DepositEventHandler* = proc (
pubkey: PubKeyBytes,
withdrawalCredentials: WithdrawalCredentialsBytes,
amount: Int64LeBytes,
signature: SignatureBytes,
merkleTreeIndex: Int64LeBytes,
j: JsonNode) {.gcsafe, raises: [].}
BellatrixExecutionPayloadWithValue* = object
executionPayload*: ExecutionPayloadV1
blockValue*: UInt256
SomeEnginePayloadWithValue =
BellatrixExecutionPayloadWithValue |
GetPayloadV2Response |
GetPayloadV3Response |
GetPayloadV4Response
declareCounter failed_web3_requests,
"Failed web3 requests"
declareGauge eth1_latest_head,
"The highest Eth1 block number observed on the network"
declareGauge eth1_synced_head,
"Block number of the highest synchronized block according to follow distance"
declareCounter engine_api_responses,
"Number of successful requests to the newPayload Engine API end-point",
labels = ["url", "request", "status"]
declareHistogram engine_api_request_duration_seconds,
"Time(s) used to generate signature usign remote signer",
buckets = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
labels = ["url", "request"]
declareCounter engine_api_timeouts,
"Number of timed-out requests to Engine API end-point",
labels = ["url", "request"]
declareCounter engine_api_last_minute_forkchoice_updates_sent,
"Number of last minute requests to the forkchoiceUpdated Engine API end-point just before block proposals",
labels = ["url"]
proc close(connection: ELConnection): Future[void] {.async: (raises: []).} =
if connection.web3.isSome:
try:
let web3 = connection.web3.get
await noCancel web3.close().wait(30.seconds)
except AsyncTimeoutError:
debug "Failed to close execution layer data provider in time",
timeout = 30.seconds
except CatchableError as exc:
# TODO (cheatfate): This handler should be removed when `nim-web3` will
# adopt `asyncraises`.
debug "Failed to close execution layer", error = $exc.name,
reason = $exc.msg
proc increaseCounterTowardsStateChange(connection: ELConnection): bool =
result = connection.hysteresisCounter >= connectionStateChangeHysteresisThreshold
if result:
connection.hysteresisCounter = 0
else:
inc connection.hysteresisCounter
proc decreaseCounterTowardsStateChange(connection: ELConnection) =
if connection.hysteresisCounter > 0:
# While we increase the counter by 1, we decreate it by 20% in order
# to require a steady and affirmative change instead of allowing
# the counter to drift very slowly in one direction when the ratio
# between success and failure is roughly 50:50%
connection.hysteresisCounter = connection.hysteresisCounter div 5
proc setDegradedState(
connection: ELConnection,
requestName: string,
statusCode: int,
errMsg: string
): Future[void] {.async: (raises: []).} =
debug "Failed EL Request", requestName, statusCode, err = errMsg
case connection.state
of ELConnectionState.NeverTested, ELConnectionState.Working:
if connection.increaseCounterTowardsStateChange():
warn "Connection to EL node degraded",
url = url(connection.engineUrl),
failedRequest = requestName,
statusCode, err = errMsg
connection.state = Degraded
await connection.close()
connection.web3 = Opt.none(Web3)
of ELConnectionState.Degraded:
connection.decreaseCounterTowardsStateChange()
proc setWorkingState(connection: ELConnection) =
case connection.state
of ELConnectionState.NeverTested:
connection.hysteresisCounter = 0
connection.state = Working
of ELConnectionState.Degraded:
if connection.increaseCounterTowardsStateChange():
info "Connection to EL node restored",
url = url(connection.engineUrl)
connection.state = Working
of ELConnectionState.Working:
connection.decreaseCounterTowardsStateChange()
proc engineApiRequest[T](
connection: ELConnection,
request: Future[T],
requestName: string,
startTime: Moment,
deadline: Future[void] | Duration | WithoutTimeout,
failureAllowed = false
): Future[T] {.async: (raises: [CatchableError]).} =
## This procedure raises `CancelledError` and `DataProviderTimeout`
## exceptions, and everything which `request` could raise.
try:
let res =
when deadline is WithoutTimeout:
await request
else:
await request.wait(deadline)
engine_api_request_duration_seconds.observe(
float(milliseconds(Moment.now - startTime)) / 1000.0,
[connection.engineUrl.url, requestName])
engine_api_responses.inc(
1, [connection.engineUrl.url, requestName, "200"])
connection.setWorkingState()
res
except AsyncTimeoutError:
engine_api_timeouts.inc(1, [connection.engineUrl.url, requestName])
if not(failureAllowed):
await connection.setDegradedState(requestName, 0, "Request timed out")
raise newException(DataProviderTimeout, "Request timed out")
except CancelledError as exc:
when deadline is WithoutTimeout:
# When `deadline` is set to `noTimeout`, we usually get cancelled on
# timeout which was handled by caller.
engine_api_timeouts.inc(1, [connection.engineUrl.url, requestName])
if not(failureAllowed):
await connection.setDegradedState(requestName, 0, "Request timed out")
else:
if not(failureAllowed):
await connection.setDegradedState(requestName, 0, "Request interrupted")
raise exc
except CatchableError as exc:
let statusCode =
if request.error of ErrorResponse:
((ref ErrorResponse) request.error).status
else:
0
engine_api_responses.inc(
1, [connection.engineUrl.url, requestName, $statusCode])
if not(failureAllowed):
await connection.setDegradedState(
requestName, statusCode, request.error.msg)
raise exc
func raiseIfNil(web3block: BlockObject): BlockObject {.raises: [ValueError].} =
if web3block == nil:
raise newException(ValueError, "EL returned 'null' result for block")
web3block
template cfg(m: ELManager): auto =
m.eth1Chain.cfg
func hasJwtSecret*(m: ELManager): bool =
for c in m.elConnections:
if c.engineUrl.jwtSecret.isSome:
return true
func isSynced*(m: ELManager): bool =
m.syncTargetBlock.isSome and
m.eth1Chain.blocks.len > 0 and
m.syncTargetBlock.get <= m.eth1Chain.blocks[^1].number
template eth1ChainBlocks*(m: ELManager): Deque[Eth1Block] =
m.eth1Chain.blocks
# TODO: Add cfg validation
# MIN_GENESIS_ACTIVE_VALIDATOR_COUNT should be larger than SLOTS_PER_EPOCH
# doAssert SECONDS_PER_ETH1_BLOCK * cfg.ETH1_FOLLOW_DISTANCE < GENESIS_DELAY,
# "Invalid configuration: GENESIS_DELAY is set too low"
func asConsensusWithdrawal(w: WithdrawalV1): capella.Withdrawal =
capella.Withdrawal(
index: w.index.uint64,
validator_index: w.validatorIndex.uint64,
address: ExecutionAddress(data: w.address.distinctBase),
amount: Gwei w.amount)
func asEngineWithdrawal(w: capella.Withdrawal): WithdrawalV1 =
WithdrawalV1(
index: Quantity(w.index),
validatorIndex: Quantity(w.validator_index),
address: Address(w.address.data),
amount: Quantity(w.amount))
func asConsensusType*(rpcExecutionPayload: ExecutionPayloadV1):
bellatrix.ExecutionPayload =
template getTransaction(tt: TypedTransaction): bellatrix.Transaction =
bellatrix.Transaction.init(tt.distinctBase)
bellatrix.ExecutionPayload(
parent_hash: rpcExecutionPayload.parentHash.asEth2Digest,
feeRecipient:
ExecutionAddress(data: rpcExecutionPayload.feeRecipient.distinctBase),
state_root: rpcExecutionPayload.stateRoot.asEth2Digest,
receipts_root: rpcExecutionPayload.receiptsRoot.asEth2Digest,
logs_bloom: BloomLogs(data: rpcExecutionPayload.logsBloom.distinctBase),
prev_randao: rpcExecutionPayload.prevRandao.asEth2Digest,
block_number: rpcExecutionPayload.blockNumber.uint64,
gas_limit: rpcExecutionPayload.gasLimit.uint64,
gas_used: rpcExecutionPayload.gasUsed.uint64,
timestamp: rpcExecutionPayload.timestamp.uint64,
extra_data: List[byte, MAX_EXTRA_DATA_BYTES].init(rpcExecutionPayload.extraData.bytes),
base_fee_per_gas: rpcExecutionPayload.baseFeePerGas,
block_hash: rpcExecutionPayload.blockHash.asEth2Digest,
transactions: List[bellatrix.Transaction, MAX_TRANSACTIONS_PER_PAYLOAD].init(
mapIt(rpcExecutionPayload.transactions, it.getTransaction)))
func asConsensusType*(payloadWithValue: BellatrixExecutionPayloadWithValue):
bellatrix.ExecutionPayloadForSigning =
bellatrix.ExecutionPayloadForSigning(
executionPayload: payloadWithValue.executionPayload.asConsensusType,
blockValue: payloadWithValue.blockValue)
template maybeDeref[T](o: Option[T]): T = o.get
template maybeDeref[V](v: V): V = v
func asConsensusType*(rpcExecutionPayload: ExecutionPayloadV1OrV2|ExecutionPayloadV2):
capella.ExecutionPayload =
template getTransaction(tt: TypedTransaction): bellatrix.Transaction =
bellatrix.Transaction.init(tt.distinctBase)
capella.ExecutionPayload(
parent_hash: rpcExecutionPayload.parentHash.asEth2Digest,
feeRecipient:
ExecutionAddress(data: rpcExecutionPayload.feeRecipient.distinctBase),
state_root: rpcExecutionPayload.stateRoot.asEth2Digest,
receipts_root: rpcExecutionPayload.receiptsRoot.asEth2Digest,
logs_bloom: BloomLogs(data: rpcExecutionPayload.logsBloom.distinctBase),
prev_randao: rpcExecutionPayload.prevRandao.asEth2Digest,
block_number: rpcExecutionPayload.blockNumber.uint64,
gas_limit: rpcExecutionPayload.gasLimit.uint64,
gas_used: rpcExecutionPayload.gasUsed.uint64,
timestamp: rpcExecutionPayload.timestamp.uint64,
extra_data: List[byte, MAX_EXTRA_DATA_BYTES].init(rpcExecutionPayload.extraData.bytes),
base_fee_per_gas: rpcExecutionPayload.baseFeePerGas,
block_hash: rpcExecutionPayload.blockHash.asEth2Digest,
transactions: List[bellatrix.Transaction, MAX_TRANSACTIONS_PER_PAYLOAD].init(
mapIt(rpcExecutionPayload.transactions, it.getTransaction)),
withdrawals: List[capella.Withdrawal, MAX_WITHDRAWALS_PER_PAYLOAD].init(
mapIt(maybeDeref rpcExecutionPayload.withdrawals, it.asConsensusWithdrawal)))
func asConsensusType*(payloadWithValue: engine_api.GetPayloadV2Response):
capella.ExecutionPayloadForSigning =
capella.ExecutionPayloadForSigning(
executionPayload: payloadWithValue.executionPayload.asConsensusType,
blockValue: payloadWithValue.blockValue)
func asConsensusType*(rpcExecutionPayload: ExecutionPayloadV3):
deneb.ExecutionPayload =
template getTransaction(tt: TypedTransaction): bellatrix.Transaction =
bellatrix.Transaction.init(tt.distinctBase)
deneb.ExecutionPayload(
parent_hash: rpcExecutionPayload.parentHash.asEth2Digest,
feeRecipient:
ExecutionAddress(data: rpcExecutionPayload.feeRecipient.distinctBase),
state_root: rpcExecutionPayload.stateRoot.asEth2Digest,
receipts_root: rpcExecutionPayload.receiptsRoot.asEth2Digest,
logs_bloom: BloomLogs(data: rpcExecutionPayload.logsBloom.distinctBase),
prev_randao: rpcExecutionPayload.prevRandao.asEth2Digest,
block_number: rpcExecutionPayload.blockNumber.uint64,
gas_limit: rpcExecutionPayload.gasLimit.uint64,
gas_used: rpcExecutionPayload.gasUsed.uint64,
timestamp: rpcExecutionPayload.timestamp.uint64,
extra_data: List[byte, MAX_EXTRA_DATA_BYTES].init(rpcExecutionPayload.extraData.bytes),
base_fee_per_gas: rpcExecutionPayload.baseFeePerGas,
block_hash: rpcExecutionPayload.blockHash.asEth2Digest,
transactions: List[bellatrix.Transaction, MAX_TRANSACTIONS_PER_PAYLOAD].init(
mapIt(rpcExecutionPayload.transactions, it.getTransaction)),
withdrawals: List[capella.Withdrawal, MAX_WITHDRAWALS_PER_PAYLOAD].init(
mapIt(rpcExecutionPayload.withdrawals, it.asConsensusWithdrawal)),
blob_gas_used: rpcExecutionPayload.blobGasUsed.uint64,
excess_blob_gas: rpcExecutionPayload.excessBlobGas.uint64)
func asConsensusType*(payload: engine_api.GetPayloadV3Response):
deneb.ExecutionPayloadForSigning =
deneb.ExecutionPayloadForSigning(
executionPayload: payload.executionPayload.asConsensusType,
blockValue: payload.blockValue,
# TODO
# The `mapIt` calls below are necessary only because we use different distinct
# types for KZG commitments and Blobs in the `web3` and the `deneb` spec types.
# Both are defined as `array[N, byte]` under the hood.
blobsBundle: deneb.BlobsBundle(
commitments: KzgCommitments.init(
payload.blobsBundle.commitments.mapIt(
kzg_abi.KzgCommitment(bytes: it.bytes))),
proofs: KzgProofs.init(
payload.blobsBundle.proofs.mapIt(
kzg_abi.KzgProof(bytes: it.bytes))),
blobs: Blobs.init(
payload.blobsBundle.blobs.mapIt(it.bytes))))
func asConsensusType*(rpcExecutionPayload: ExecutionPayloadV4):
electra.ExecutionPayload =
template getTransaction(tt: TypedTransaction): bellatrix.Transaction =
bellatrix.Transaction.init(tt.distinctBase)
template getDepositReceipt(dr: DepositReceiptV1): DepositReceipt =
DepositReceipt(
pubkey: ValidatorPubKey(blob: dr.pubkey.distinctBase),
withdrawal_credentials: dr.withdrawalCredentials.asEth2Digest,
amount: dr.amount.Gwei,
signature: ValidatorSig(blob: dr.signature.distinctBase),
index: dr.index.uint64)
template getExecutionLayerWithdrawalRequest(elwr: WithdrawalRequestV1):
ExecutionLayerWithdrawalRequest =
ExecutionLayerWithdrawalRequest(
source_address: ExecutionAddress(data: elwr.sourceAddress.distinctBase),
validator_pubkey: ValidatorPubKey(
blob: elwr.validatorPublicKey.distinctBase),
amount: elwr.amount.Gwei)
electra.ExecutionPayload(
parent_hash: rpcExecutionPayload.parentHash.asEth2Digest,
feeRecipient:
ExecutionAddress(data: rpcExecutionPayload.feeRecipient.distinctBase),
state_root: rpcExecutionPayload.stateRoot.asEth2Digest,
receipts_root: rpcExecutionPayload.receiptsRoot.asEth2Digest,
logs_bloom: BloomLogs(data: rpcExecutionPayload.logsBloom.distinctBase),
prev_randao: rpcExecutionPayload.prevRandao.asEth2Digest,
block_number: rpcExecutionPayload.blockNumber.uint64,
gas_limit: rpcExecutionPayload.gasLimit.uint64,
gas_used: rpcExecutionPayload.gasUsed.uint64,
timestamp: rpcExecutionPayload.timestamp.uint64,
extra_data: List[byte, MAX_EXTRA_DATA_BYTES].init(
rpcExecutionPayload.extraData.bytes),
base_fee_per_gas: rpcExecutionPayload.baseFeePerGas,
block_hash: rpcExecutionPayload.blockHash.asEth2Digest,
transactions: List[bellatrix.Transaction, MAX_TRANSACTIONS_PER_PAYLOAD].init(
mapIt(rpcExecutionPayload.transactions, it.getTransaction)),
withdrawals: List[capella.Withdrawal, MAX_WITHDRAWALS_PER_PAYLOAD].init(
mapIt(rpcExecutionPayload.withdrawals, it.asConsensusWithdrawal)),
blob_gas_used: rpcExecutionPayload.blobGasUsed.uint64,
excess_blob_gas: rpcExecutionPayload.excessBlobGas.uint64,
deposit_receipts:
List[electra.DepositReceipt, MAX_DEPOSIT_RECEIPTS_PER_PAYLOAD].init(
mapIt(rpcExecutionPayload.depositRequests, it.getDepositReceipt)),
withdrawal_requests:
List[electra.ExecutionLayerWithdrawalRequest,
MAX_WITHDRAWAL_REQUESTS_PER_PAYLOAD].init(
mapIt(rpcExecutionPayload.withdrawalRequests,
it.getExecutionLayerWithdrawalRequest)))
func asConsensusType*(payload: engine_api.GetPayloadV4Response):
electra.ExecutionPayloadForSigning =
electra.ExecutionPayloadForSigning(
executionPayload: payload.executionPayload.asConsensusType,
blockValue: payload.blockValue,
# TODO
# The `mapIt` calls below are necessary only because we use different distinct
# types for KZG commitments and Blobs in the `web3` and the `deneb` spec types.
# Both are defined as `array[N, byte]` under the hood.
blobsBundle: deneb.BlobsBundle(
commitments: KzgCommitments.init(
payload.blobsBundle.commitments.mapIt(
kzg_abi.KzgCommitment(bytes: it.bytes))),
proofs: KzgProofs.init(
payload.blobsBundle.proofs.mapIt(
kzg_abi.KzgProof(bytes: it.bytes))),
blobs: Blobs.init(
payload.blobsBundle.blobs.mapIt(it.bytes))))
func asEngineExecutionPayload*(executionPayload: bellatrix.ExecutionPayload):
ExecutionPayloadV1 =
template getTypedTransaction(tt: bellatrix.Transaction): TypedTransaction =
TypedTransaction(tt.distinctBase)
engine_api.ExecutionPayloadV1(
parentHash: executionPayload.parent_hash.asBlockHash,
feeRecipient: Address(executionPayload.fee_recipient.data),
stateRoot: executionPayload.state_root.asBlockHash,
receiptsRoot: executionPayload.receipts_root.asBlockHash,
logsBloom:
FixedBytes[BYTES_PER_LOGS_BLOOM](executionPayload.logs_bloom.data),
prevRandao: executionPayload.prev_randao.asBlockHash,
blockNumber: Quantity(executionPayload.block_number),
gasLimit: Quantity(executionPayload.gas_limit),
gasUsed: Quantity(executionPayload.gas_used),
timestamp: Quantity(executionPayload.timestamp),
extraData: DynamicBytes[0, MAX_EXTRA_DATA_BYTES](executionPayload.extra_data),
baseFeePerGas: executionPayload.base_fee_per_gas,
blockHash: executionPayload.block_hash.asBlockHash,
transactions: mapIt(executionPayload.transactions, it.getTypedTransaction))
template toEngineWithdrawal(w: capella.Withdrawal): WithdrawalV1 =
WithdrawalV1(
index: Quantity(w.index),
validatorIndex: Quantity(w.validator_index),
address: Address(w.address.data),
amount: Quantity(w.amount))
func asEngineExecutionPayload*(executionPayload: capella.ExecutionPayload):
ExecutionPayloadV2 =
template getTypedTransaction(tt: bellatrix.Transaction): TypedTransaction =
TypedTransaction(tt.distinctBase)
engine_api.ExecutionPayloadV2(
parentHash: executionPayload.parent_hash.asBlockHash,
feeRecipient: Address(executionPayload.fee_recipient.data),
stateRoot: executionPayload.state_root.asBlockHash,
receiptsRoot: executionPayload.receipts_root.asBlockHash,
logsBloom:
FixedBytes[BYTES_PER_LOGS_BLOOM](executionPayload.logs_bloom.data),
prevRandao: executionPayload.prev_randao.asBlockHash,
blockNumber: Quantity(executionPayload.block_number),
gasLimit: Quantity(executionPayload.gas_limit),
gasUsed: Quantity(executionPayload.gas_used),
timestamp: Quantity(executionPayload.timestamp),
extraData: DynamicBytes[0, MAX_EXTRA_DATA_BYTES](executionPayload.extra_data),
baseFeePerGas: executionPayload.base_fee_per_gas,
blockHash: executionPayload.block_hash.asBlockHash,
transactions: mapIt(executionPayload.transactions, it.getTypedTransaction),
withdrawals: mapIt(executionPayload.withdrawals, it.toEngineWithdrawal))
func asEngineExecutionPayload*(executionPayload: deneb.ExecutionPayload):
ExecutionPayloadV3 =
template getTypedTransaction(tt: bellatrix.Transaction): TypedTransaction =
TypedTransaction(tt.distinctBase)
engine_api.ExecutionPayloadV3(
parentHash: executionPayload.parent_hash.asBlockHash,
feeRecipient: Address(executionPayload.fee_recipient.data),
stateRoot: executionPayload.state_root.asBlockHash,
receiptsRoot: executionPayload.receipts_root.asBlockHash,
logsBloom:
FixedBytes[BYTES_PER_LOGS_BLOOM](executionPayload.logs_bloom.data),
prevRandao: executionPayload.prev_randao.asBlockHash,
blockNumber: Quantity(executionPayload.block_number),
gasLimit: Quantity(executionPayload.gas_limit),
gasUsed: Quantity(executionPayload.gas_used),
timestamp: Quantity(executionPayload.timestamp),
extraData: DynamicBytes[0, MAX_EXTRA_DATA_BYTES](executionPayload.extra_data),
baseFeePerGas: executionPayload.base_fee_per_gas,
blockHash: executionPayload.block_hash.asBlockHash,
transactions: mapIt(executionPayload.transactions, it.getTypedTransaction),
withdrawals: mapIt(executionPayload.withdrawals, it.asEngineWithdrawal),
blobGasUsed: Quantity(executionPayload.blob_gas_used),
excessBlobGas: Quantity(executionPayload.excess_blob_gas))
func asEngineExecutionPayload*(executionPayload: electra.ExecutionPayload):
ExecutionPayloadV4 =
template getTypedTransaction(tt: bellatrix.Transaction): TypedTransaction =
TypedTransaction(tt.distinctBase)
template getDepositReceipt(dr: DepositReceipt): DepositReceiptV1 =
DepositReceiptV1(
pubkey: FixedBytes[RawPubKeySize](dr.pubkey.blob),
withdrawalCredentials: FixedBytes[32](dr.withdrawal_credentials.data),
amount: dr.amount.Quantity,
signature: FixedBytes[RawSigSize](dr.signature.blob),
index: dr.index.Quantity)
template getExecutionLayerWithdrawalRequest(
elwr: ExecutionLayerWithdrawalRequest): WithdrawalRequestV1 =
WithdrawalRequestV1(
sourceAddress: Address(elwr.source_address.data),
validatorPublicKey: FixedBytes[RawPubKeySize](elwr.validator_pubkey.blob),
amount: elwr.amount.Quantity)
engine_api.ExecutionPayloadV4(
parentHash: executionPayload.parent_hash.asBlockHash,
feeRecipient: Address(executionPayload.fee_recipient.data),
stateRoot: executionPayload.state_root.asBlockHash,
receiptsRoot: executionPayload.receipts_root.asBlockHash,
logsBloom:
FixedBytes[BYTES_PER_LOGS_BLOOM](executionPayload.logs_bloom.data),
prevRandao: executionPayload.prev_randao.asBlockHash,
blockNumber: Quantity(executionPayload.block_number),
gasLimit: Quantity(executionPayload.gas_limit),
gasUsed: Quantity(executionPayload.gas_used),
timestamp: Quantity(executionPayload.timestamp),
extraData: DynamicBytes[0, MAX_EXTRA_DATA_BYTES](executionPayload.extra_data),
baseFeePerGas: executionPayload.base_fee_per_gas,
blockHash: executionPayload.block_hash.asBlockHash,
transactions: mapIt(executionPayload.transactions, it.getTypedTransaction),
withdrawals: mapIt(executionPayload.withdrawals, it.asEngineWithdrawal),
blobGasUsed: Quantity(executionPayload.blob_gas_used),
excessBlobGas: Quantity(executionPayload.excess_blob_gas),
depositRequests: mapIt(
executionPayload.deposit_receipts, it.getDepositReceipt),
withdrawalRequests:
mapIt(executionPayload.withdrawal_requests,
it.getExecutionLayerWithdrawalRequest))
func isConnected(connection: ELConnection): bool =
connection.web3.isSome
func getJsonRpcRequestHeaders(jwtSecret: Opt[seq[byte]]):
auto =
if jwtSecret.isSome:
let secret = jwtSecret.get
(proc(): seq[(string, string)] =
# https://www.rfc-editor.org/rfc/rfc6750#section-6.1.1
@[("Authorization", "Bearer " & getSignedIatToken(
secret, (getTime() - initTime(0, 0)).inSeconds))])
else:
(proc(): seq[(string, string)] = @[])
proc newWeb3*(engineUrl: EngineApiUrl): Future[Web3] =
newWeb3(engineUrl.url,
getJsonRpcRequestHeaders(engineUrl.jwtSecret), httpFlags = {})
proc establishEngineApiConnection(url: EngineApiUrl):
Future[Result[Web3, string]] {.
async: (raises: [CancelledError]).} =
try:
ok(await newWeb3(url).wait(engineApiConnectionTimeout))
except AsyncTimeoutError:
err "Engine API connection timed out"
except CancelledError as exc:
raise exc
except CatchableError as exc:
err exc.msg
proc tryConnecting(connection: ELConnection): Future[bool] {.
async: (raises: [CancelledError]).} =
if connection.isConnected:
return true
if connection.connectingFut == nil or
connection.connectingFut.finished: # The previous attempt was not successful
connection.connectingFut =
establishEngineApiConnection(connection.engineUrl)
let web3Res = await connection.connectingFut
if web3Res.isErr:
warn "Engine API connection failed", err = web3Res.error
false
else:
connection.web3 = Opt.some(web3Res.get)
true
proc connectedRpcClient(connection: ELConnection): Future[RpcClient] {.
async: (raises: [CancelledError]).} =
while not connection.isConnected:
if not(await connection.tryConnecting()):
await sleepAsync(chronos.seconds(10))
connection.web3.get.provider
proc getBlockByHash(
rpcClient: RpcClient,
hash: BlockHash
): Future[BlockObject] {.async: (raises: [CatchableError]).} =
await rpcClient.eth_getBlockByHash(hash, false)
proc getBlockByNumber*(
rpcClient: RpcClient,
number: Eth1BlockNumber
): Future[BlockObject] {.async: (raises: [CatchableError]).} =
let hexNumber = try:
let num = distinctBase(number)
&"0x{num:X}" # No leading 0's!
except ValueError as exc:
# Since the format above is valid, failing here should not be possible
raiseAssert exc.msg
await rpcClient.eth_getBlockByNumber(hexNumber, false)
func areSameAs(expectedParams: Option[NextExpectedPayloadParams],
latestHead, latestSafe, latestFinalized: Eth2Digest,
timestamp: uint64,
randomData: Eth2Digest,
feeRecipient: Eth1Address,
withdrawals: seq[WithdrawalV1]): bool =
expectedParams.isSome and
expectedParams.get.headBlockHash == latestHead and
expectedParams.get.safeBlockHash == latestSafe and
expectedParams.get.finalizedBlockHash == latestFinalized and
expectedParams.get.payloadAttributes.timestamp.uint64 == timestamp and
expectedParams.get.payloadAttributes.prevRandao.bytes == randomData.data and
expectedParams.get.payloadAttributes.suggestedFeeRecipient == feeRecipient and
expectedParams.get.payloadAttributes.withdrawals == withdrawals
proc forkchoiceUpdated(rpcClient: RpcClient,
state: ForkchoiceStateV1,
payloadAttributes: Option[PayloadAttributesV1] |
Option[PayloadAttributesV2] |
Option[PayloadAttributesV3]):
Future[ForkchoiceUpdatedResponse] =
when payloadAttributes is Option[PayloadAttributesV1]:
rpcClient.engine_forkchoiceUpdatedV1(state, payloadAttributes)
elif payloadAttributes is Option[PayloadAttributesV2]:
rpcClient.engine_forkchoiceUpdatedV2(state, payloadAttributes)
elif payloadAttributes is Option[PayloadAttributesV3]:
rpcClient.engine_forkchoiceUpdatedV3(state, payloadAttributes)
else:
static: doAssert false
proc getPayloadFromSingleEL(
connection: ELConnection,
GetPayloadResponseType: type,
isForkChoiceUpToDate: bool,
consensusHead: Eth2Digest,
headBlock, safeBlock, finalizedBlock: Eth2Digest,
timestamp: uint64,
randomData: Eth2Digest,
suggestedFeeRecipient: Eth1Address,
withdrawals: seq[WithdrawalV1]
): Future[GetPayloadResponseType] {.async: (raises: [CatchableError]).} =
let
rpcClient = await connection.connectedRpcClient()
payloadId = if isForkChoiceUpToDate and connection.lastPayloadId.isSome:
connection.lastPayloadId.get
elif not headBlock.isZero:
engine_api_last_minute_forkchoice_updates_sent.inc(1, [connection.engineUrl.url])
when GetPayloadResponseType is BellatrixExecutionPayloadWithValue:
let response = await rpcClient.forkchoiceUpdated(
ForkchoiceStateV1(
headBlockHash: headBlock.asBlockHash,
safeBlockHash: safeBlock.asBlockHash,
finalizedBlockHash: finalizedBlock.asBlockHash),
some PayloadAttributesV1(
timestamp: Quantity timestamp,
prevRandao: FixedBytes[32] randomData.data,
suggestedFeeRecipient: suggestedFeeRecipient))
elif GetPayloadResponseType is engine_api.GetPayloadV2Response:
let response = await rpcClient.forkchoiceUpdated(
ForkchoiceStateV1(
headBlockHash: headBlock.asBlockHash,
safeBlockHash: safeBlock.asBlockHash,
finalizedBlockHash: finalizedBlock.asBlockHash),
some PayloadAttributesV2(
timestamp: Quantity timestamp,
prevRandao: FixedBytes[32] randomData.data,
suggestedFeeRecipient: suggestedFeeRecipient,
withdrawals: withdrawals))
elif GetPayloadResponseType is engine_api.GetPayloadV3Response or
GetPayloadResponseType is engine_api.GetPayloadV4Response:
# https://github.com/ethereum/execution-apis/blob/90a46e9137c89d58e818e62fa33a0347bba50085/src/engine/prague.md
# does not define any new forkchoiceUpdated, so reuse V3 from Dencun
let response = await rpcClient.forkchoiceUpdated(
ForkchoiceStateV1(
headBlockHash: headBlock.asBlockHash,
safeBlockHash: safeBlock.asBlockHash,
finalizedBlockHash: finalizedBlock.asBlockHash),
some PayloadAttributesV3(
timestamp: Quantity timestamp,
prevRandao: FixedBytes[32] randomData.data,
suggestedFeeRecipient: suggestedFeeRecipient,
withdrawals: withdrawals,
parentBeaconBlockRoot: consensusHead.asBlockHash))
else:
static: doAssert false
if response.payloadStatus.status != PayloadExecutionStatus.valid or
response.payloadId.isNone:
raise newException(CatchableError, "Head block is not a valid payload")
# Give the EL some time to assemble the block
await sleepAsync(chronos.milliseconds 500)
response.payloadId.get
else:
raise newException(CatchableError, "No confirmed execution head yet")
when GetPayloadResponseType is BellatrixExecutionPayloadWithValue:
let payload =
await engine_api.getPayload(rpcClient, ExecutionPayloadV1, payloadId)
return BellatrixExecutionPayloadWithValue(
executionPayload: payload, blockValue: Wei.zero)
else:
return await engine_api.getPayload(
rpcClient, GetPayloadResponseType, payloadId)
func cmpGetPayloadResponses(lhs, rhs: SomeEnginePayloadWithValue): int =
cmp(distinctBase lhs.blockValue, distinctBase rhs.blockValue)
template EngineApiResponseType*(T: type bellatrix.ExecutionPayloadForSigning): type =
BellatrixExecutionPayloadWithValue
template EngineApiResponseType*(T: type capella.ExecutionPayloadForSigning): type =
engine_api.GetPayloadV2Response
template EngineApiResponseType*(T: type deneb.ExecutionPayloadForSigning): type =
engine_api.GetPayloadV3Response
template EngineApiResponseType*(T: type electra.ExecutionPayloadForSigning): type =
engine_api.GetPayloadV4Response
template toEngineWithdrawals*(withdrawals: seq[capella.Withdrawal]): seq[WithdrawalV1] =
mapIt(withdrawals, toEngineWithdrawal(it))
template kind(T: type ExecutionPayloadV1): ConsensusFork =
ConsensusFork.Bellatrix
template kind(T: typedesc[ExecutionPayloadV1OrV2|ExecutionPayloadV2]): ConsensusFork =
ConsensusFork.Capella
template kind(T: type ExecutionPayloadV3): ConsensusFork =
ConsensusFork.Deneb
proc getPayload*(
m: ELManager,
PayloadType: type ForkyExecutionPayloadForSigning,
consensusHead: Eth2Digest,
headBlock, safeBlock, finalizedBlock: Eth2Digest,
timestamp: uint64,
randomData: Eth2Digest,
suggestedFeeRecipient: Eth1Address,
withdrawals: seq[capella.Withdrawal]
): Future[Opt[PayloadType]] {.async: (raises: [CancelledError]).} =
if m.elConnections.len == 0:
return err()
let
engineApiWithdrawals = toEngineWithdrawals withdrawals
isFcUpToDate = m.nextExpectedPayloadParams.areSameAs(
headBlock, safeBlock, finalizedBlock, timestamp,
randomData, suggestedFeeRecipient, engineApiWithdrawals)
# `getPayloadFromSingleEL` may introduce additional latency
const extraProcessingOverhead = 500.milliseconds
let
timeout = GETPAYLOAD_TIMEOUT + extraProcessingOverhead
deadline = sleepAsync(timeout)
var bestPayloadIdx = Opt.none(int)
while true:
let requests =
m.elConnections.mapIt(
it.getPayloadFromSingleEL(EngineApiResponseType(PayloadType),
isFcUpToDate, consensusHead, headBlock, safeBlock, finalizedBlock,
timestamp, randomData, suggestedFeeRecipient, engineApiWithdrawals))
let timeoutExceeded =
try:
await allFutures(requests).wait(deadline)
false
except AsyncTimeoutError:
true
except CancelledError as exc:
let pending =
requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
raise exc
for idx, req in requests:
if not(req.finished()):
warn "Timeout while getting execution payload",
url = m.elConnections[idx].engineUrl.url
elif req.failed():
warn "Failed to get execution payload from EL",
url = m.elConnections[idx].engineUrl.url,
reason = req.error.msg
else:
const payloadFork = PayloadType.kind
when payloadFork >= ConsensusFork.Capella:
when payloadFork == ConsensusFork.Capella:
# TODO: The engine_api module may offer an alternative API where
# it is guaranteed to return the correct response type (i.e. the
# rule below will be enforced during deserialization).
if req.value().executionPayload.withdrawals.isNone:
warn "Execution client returned a block without a " &
"'withdrawals' field for a post-Shanghai block",
url = m.elConnections[idx].engineUrl.url
continue
if engineApiWithdrawals !=
req.value().executionPayload.withdrawals.maybeDeref:
# otherwise it formats as "@[(index: ..., validatorIndex: ...,
# address: ..., amount: ...), (index: ..., validatorIndex: ...,
# address: ..., amount: ...)]"
# TODO (cheatfate): should we have `continue` statement at the
# end of this branch. If no such payload could be choosen as
# best one.
warn "Execution client did not return correct withdrawals",
withdrawals_from_cl_len = engineApiWithdrawals.len,
withdrawals_from_el_len =
req.value().executionPayload.withdrawals.maybeDeref.len,
withdrawals_from_cl =
mapIt(engineApiWithdrawals, it.asConsensusWithdrawal),
withdrawals_from_el =
mapIt(
req.value().executionPayload.withdrawals.maybeDeref,
it.asConsensusWithdrawal),
url = m.elConnections[idx].engineUrl.url
# If we have more than one EL connection we consider this as
# a failure.
if len(requests) > 1:
continue
if req.value().executionPayload.extraData.len > MAX_EXTRA_DATA_BYTES:
warn "Execution client provided a block with invalid extraData " &
"(size exceeds limit)",
url = m.elConnections[idx].engineUrl.url,
size = req.value().executionPayload.extraData.len,
limit = MAX_EXTRA_DATA_BYTES
continue
if bestPayloadIdx.isNone:
bestPayloadIdx = Opt.some(idx)
else:
if cmpGetPayloadResponses(
req.value(), requests[bestPayloadIdx.get].value()) > 0:
bestPayloadIdx = Opt.some(idx)
let pending =
requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
if bestPayloadIdx.isSome():
return ok(requests[bestPayloadIdx.get()].value().asConsensusType)
if timeoutExceeded:
break
err()
proc waitELToSyncDeposits(
connection: ELConnection,
minimalRequiredBlock: BlockHash
) {.async: (raises: [CancelledError]).} =
var rpcClient: RpcClient = nil
if connection.depositContractSyncStatus == DepositContractSyncStatus.synced:
return
var attempt = 0
while true:
if isNil(rpcClient):
rpcClient = await connection.connectedRpcClient()
try:
discard raiseIfNil await connection.engineApiRequest(
rpcClient.getBlockByHash(minimalRequiredBlock),
"getBlockByHash", Moment.now(),
web3RequestsTimeout, failureAllowed = true)
connection.depositContractSyncStatus = DepositContractSyncStatus.synced
return
except CancelledError as exc:
trace "waitELToSyncDepositContract interrupted",
url = connection.engineUrl.url
raise exc
except CatchableError as exc:
connection.depositContractSyncStatus = DepositContractSyncStatus.notSynced
if attempt == 0:
warn "Failed to obtain the most recent known block from the " &
"execution layer node (the node is probably not synced)",
url = connection.engineUrl.url,
blk = minimalRequiredBlock,
reason = exc.msg
elif attempt mod 60 == 0:
# This warning will be produced every 30 minutes
warn "Still failing to obtain the most recent known block from the " &
"execution layer node (the node is probably still not synced)",
url = connection.engineUrl.url,
blk = minimalRequiredBlock,
reason = exc.msg
inc(attempt)
await sleepAsync(seconds(30))
rpcClient = nil
func networkHasDepositContract(m: ELManager): bool =
not m.cfg.DEPOSIT_CONTRACT_ADDRESS.isDefaultValue
func mostRecentKnownBlock(m: ELManager): BlockHash =
if m.eth1Chain.finalizedDepositsMerkleizer.getChunkCount() > 0:
m.eth1Chain.finalizedBlockHash.asBlockHash
else:
m.depositContractBlockHash
proc selectConnectionForChainSyncing(
m: ELManager
): Future[ELConnection] {.async: (raises: [CancelledError,
DataProviderConnectionFailure]).} =
doAssert m.elConnections.len > 0
let pendingConnections = m.elConnections.mapIt(
if m.networkHasDepositContract:
FutureBase waitELToSyncDeposits(it, m.mostRecentKnownBlock)
else:
FutureBase connectedRpcClient(it))
while true:
var pendingFutures = pendingConnections
try:
discard await race(pendingFutures)
except ValueError:
raiseAssert "pendingFutures should not be empty at this moment"
except CancelledError as exc:
let pending = pendingConnections.filterIt(not(it.finished())).
mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
raise exc
pendingFutures.reset()
for index, future in pendingConnections.pairs():
if future.completed():
let pending = pendingConnections.filterIt(not(it.finished())).
mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
return m.elConnections[index]
elif not(future.finished()):
pendingFutures.add(future)
if len(pendingFutures) == 0:
raise newException(DataProviderConnectionFailure,
"Unable to establish connection for chain syncing")
proc sendNewPayloadToSingleEL(
connection: ELConnection,
payload: engine_api.ExecutionPayloadV1
): Future[PayloadStatusV1] {.async: (raises: [CatchableError]).} =
let rpcClient = await connection.connectedRpcClient()
await rpcClient.engine_newPayloadV1(payload)
proc sendNewPayloadToSingleEL(
connection: ELConnection,
payload: engine_api.ExecutionPayloadV2
): Future[PayloadStatusV1] {.async: (raises: [CatchableError]).} =
let rpcClient = await connection.connectedRpcClient()
await rpcClient.engine_newPayloadV2(payload)
proc sendNewPayloadToSingleEL(
connection: ELConnection,
payload: engine_api.ExecutionPayloadV3,
versioned_hashes: seq[engine_api.VersionedHash],
parent_beacon_block_root: FixedBytes[32]
): Future[PayloadStatusV1] {.async: (raises: [CatchableError]).} =
let rpcClient = await connection.connectedRpcClient()
await rpcClient.engine_newPayloadV3(
payload, versioned_hashes, parent_beacon_block_root)
proc sendNewPayloadToSingleEL(
connection: ELConnection,
payload: engine_api.ExecutionPayloadV4,
versioned_hashes: seq[engine_api.VersionedHash],
parent_beacon_block_root: FixedBytes[32]
): Future[PayloadStatusV1] {.async: (raises: [CatchableError]).} =
let rpcClient = await connection.connectedRpcClient()
await rpcClient.engine_newPayloadV4(
payload, versioned_hashes, parent_beacon_block_root)
type
StatusRelation = enum
newStatusIsPreferable
oldStatusIsOk
disagreement
func compareStatuses(
newStatus, prevStatus: PayloadExecutionStatus
): StatusRelation =
case prevStatus
of PayloadExecutionStatus.syncing:
if newStatus == PayloadExecutionStatus.syncing:
oldStatusIsOk
else:
newStatusIsPreferable
of PayloadExecutionStatus.valid:
case newStatus
of PayloadExecutionStatus.syncing,
PayloadExecutionStatus.accepted,
PayloadExecutionStatus.valid:
oldStatusIsOk
of PayloadExecutionStatus.invalid_block_hash,
PayloadExecutionStatus.invalid:
disagreement
of PayloadExecutionStatus.invalid:
case newStatus
of PayloadExecutionStatus.syncing,
PayloadExecutionStatus.invalid:
oldStatusIsOk
of PayloadExecutionStatus.valid,
PayloadExecutionStatus.accepted,
PayloadExecutionStatus.invalid_block_hash:
disagreement
of PayloadExecutionStatus.accepted:
case newStatus
of PayloadExecutionStatus.accepted,
PayloadExecutionStatus.syncing:
oldStatusIsOk
of PayloadExecutionStatus.valid:
newStatusIsPreferable
of PayloadExecutionStatus.invalid_block_hash,
PayloadExecutionStatus.invalid:
disagreement
of PayloadExecutionStatus.invalid_block_hash:
if newStatus == PayloadExecutionStatus.invalid_block_hash:
oldStatusIsOk
else:
disagreement
type
ELConsensusViolationDetector = object
selectedResponse: Opt[int]
disagreementAlreadyDetected: bool
func init(T: type ELConsensusViolationDetector): T =
ELConsensusViolationDetector(
selectedResponse: Opt.none(int),
disagreementAlreadyDetected: false
)
proc processResponse(
d: var ELConsensusViolationDetector,
elResponseType: typedesc,
connections: openArray[ELConnection],
requests: auto,
idx: int) =
if not requests[idx].completed:
return
let status = requests[idx].value().status
if d.selectedResponse.isNone:
d.selectedResponse = Opt.some(idx)
elif not d.disagreementAlreadyDetected:
let prevStatus = requests[d.selectedResponse.get].value().status
case compareStatuses(status, prevStatus)
of newStatusIsPreferable:
d.selectedResponse = Opt.some(idx)
of oldStatusIsOk:
discard
of disagreement:
d.disagreementAlreadyDetected = true
error "Execution layer consensus violation detected",
responseType = name(elResponseType),
url1 = connections[d.selectedResponse.get].engineUrl.url,
status1 = prevStatus,
url2 = connections[idx].engineUrl.url,
status2 = status
proc lazyWait(futures: seq[FutureBase]) {.async: (raises: []).} =
block:
let pending = futures.filterIt(not(it.finished()))
if len(pending) > 0:
try:
await allFutures(pending).wait(30.seconds)
except CancelledError:
discard
except AsyncTimeoutError:
discard
block:
let pending = futures.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
if len(pending) > 0:
await noCancel allFutures(pending)
proc sendNewPayload*(
m: ELManager,
blck: SomeForkyBeaconBlock
): Future[PayloadExecutionStatus] {.async: (raises: [CancelledError]).} =
let
startTime = Moment.now()
deadline = sleepAsync(NEWPAYLOAD_TIMEOUT)
payload = blck.body.execution_payload.asEngineExecutionPayload
var
responseProcessor = ELConsensusViolationDetector.init()
while true:
block mainLoop:
let
requests = m.elConnections.mapIt:
let req =
when payload is engine_api.ExecutionPayloadV3 or
payload is engine_api.ExecutionPayloadV4:
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-alpha.1/specs/deneb/beacon-chain.md#process_execution_payload
# Verify the execution payload is valid
# [Modified in Deneb] Pass `versioned_hashes` to Execution Engine
let versioned_hashes = mapIt(
blck.body.blob_kzg_commitments,
engine_api.VersionedHash(kzg_commitment_to_versioned_hash(it)))
sendNewPayloadToSingleEL(
it, payload, versioned_hashes,
FixedBytes[32] blck.parent_root.data)
elif payload is engine_api.ExecutionPayloadV1 or
payload is engine_api.ExecutionPayloadV2:
sendNewPayloadToSingleEL(it, payload)
else:
static: doAssert false
engineApiRequest(it, req, "newPayload", startTime, noTimeout)
var pendingRequests = requests
while true:
let timeoutExceeded =
try:
discard await race(pendingRequests).wait(deadline)
false
except AsyncTimeoutError:
true
except ValueError:
raiseAssert "pendingRequests should not be empty!"
except CancelledError as exc:
let pending =
requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
raise exc
var stillPending: type(pendingRequests)
for request in pendingRequests:
if not(request.finished()):
stillPending.add(request)
elif request.completed():
let index = requests.find(request)
doAssert(index >= 0)
responseProcessor.processResponse(type(payload),
m.elConnections, requests, index)
pendingRequests = stillPending
if responseProcessor.disagreementAlreadyDetected:
let pending =
pendingRequests.filterIt(not(it.finished())).
mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
return PayloadExecutionStatus.invalid
elif responseProcessor.selectedResponse.isSome():
# We spawn task which will wait for all other responses which are
# still pending, after 30.seconds all pending requests will be
# cancelled.
asyncSpawn lazyWait(pendingRequests.mapIt(FutureBase(it)))
return requests[responseProcessor.selectedResponse.get].value().status
if timeoutExceeded:
# Timeout exceeded, cancelling all pending requests.
let pending =
pendingRequests.filterIt(not(it.finished())).
mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
return PayloadExecutionStatus.syncing
if len(pendingRequests) == 0:
# All requests failed, we will continue our attempts until deadline
# is not finished.
break mainLoop
proc forkchoiceUpdatedForSingleEL(
connection: ELConnection,
state: ref ForkchoiceStateV1,
payloadAttributes: Option[PayloadAttributesV1] |
Option[PayloadAttributesV2] |
Option[PayloadAttributesV3]
): Future[PayloadStatusV1] {.async: (raises: [CatchableError]).} =
let
rpcClient = await connection.connectedRpcClient()
response = await rpcClient.forkchoiceUpdated(state[], payloadAttributes)
if response.payloadStatus.status notin {syncing, valid, invalid}:
debug "Invalid fork-choice updated response from the EL",
payloadStatus = response.payloadStatus
return
if response.payloadStatus.status == PayloadExecutionStatus.valid and
response.payloadId.isSome:
connection.lastPayloadId = response.payloadId
return response.payloadStatus
proc forkchoiceUpdated*(
m: ELManager,
headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
payloadAttributes: Option[PayloadAttributesV1] |
Option[PayloadAttributesV2] |
Option[PayloadAttributesV3]
): Future[(PayloadExecutionStatus, Option[BlockHash])] {.
async: (raises: [CancelledError]).} =
doAssert not headBlockHash.isZero
# Allow finalizedBlockHash to be 0 to avoid sync deadlocks.
#
# https://github.com/ethereum/EIPs/blob/master/EIPS/eip-3675.md#pos-events
# has "Before the first finalized block occurs in the system the finalized
# block hash provided by this event is stubbed with
# `0x0000000000000000000000000000000000000000000000000000000000000000`."
# and
# https://github.com/ethereum/consensus-specs/blob/v1.4.0/specs/bellatrix/validator.md#executionpayload
# notes "`finalized_block_hash` is the hash of the latest finalized execution
# payload (`Hash32()` if none yet finalized)"
if m.elConnections.len == 0:
return (PayloadExecutionStatus.syncing, none BlockHash)
when payloadAttributes is Option[PayloadAttributesV3]:
template payloadAttributesV3(): auto =
if payloadAttributes.isSome:
payloadAttributes.get
else:
# As timestamp and prevRandao are both 0, won't false-positive match
(static(default(PayloadAttributesV3)))
elif payloadAttributes is Option[PayloadAttributesV2]:
template payloadAttributesV3(): auto =
if payloadAttributes.isSome:
PayloadAttributesV3(
timestamp: payloadAttributes.get.timestamp,
prevRandao: payloadAttributes.get.prevRandao,
suggestedFeeRecipient: payloadAttributes.get.suggestedFeeRecipient,
withdrawals: payloadAttributes.get.withdrawals,
parentBeaconBlockRoot: static(default(FixedBytes[32])))
else:
# As timestamp and prevRandao are both 0, won't false-positive match
(static(default(PayloadAttributesV3)))
elif payloadAttributes is Option[PayloadAttributesV1]:
template payloadAttributesV3(): auto =
if payloadAttributes.isSome:
PayloadAttributesV3(
timestamp: payloadAttributes.get.timestamp,
prevRandao: payloadAttributes.get.prevRandao,
suggestedFeeRecipient: payloadAttributes.get.suggestedFeeRecipient,
withdrawals: @[],
parentBeaconBlockRoot: static(default(FixedBytes[32])))
else:
# As timestamp and prevRandao are both 0, won't false-positive match
(static(default(PayloadAttributesV3)))
else:
static: doAssert false
let
state = newClone ForkchoiceStateV1(
headBlockHash: headBlockHash.asBlockHash,
safeBlockHash: safeBlockHash.asBlockHash,
finalizedBlockHash: finalizedBlockHash.asBlockHash)
startTime = Moment.now
deadline = sleepAsync(FORKCHOICEUPDATED_TIMEOUT)
var
responseProcessor = ELConsensusViolationDetector.init()
while true:
block mainLoop:
let requests =
m.elConnections.mapIt:
let req = it.forkchoiceUpdatedForSingleEL(state, payloadAttributes)
engineApiRequest(it, req, "forkchoiceUpdated", startTime, noTimeout)
var pendingRequests = requests
while true:
let timeoutExceeded =
try:
discard await race(pendingRequests).wait(deadline)
false
except ValueError:
raiseAssert "pendingRequests should not be empty!"
except AsyncTimeoutError:
true
except CancelledError as exc:
let pending =
pendingRequests.filterIt(not(it.finished())).
mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
raise exc
var stillPending: type(pendingRequests)
for request in pendingRequests:
if not(request.finished()):
stillPending.add(request)
elif request.completed():
let index = requests.find(request)
doAssert(index >= 0)
responseProcessor.processResponse(
PayloadStatusV1, m.elConnections, requests, index)
pendingRequests = stillPending
template assignNextExpectedPayloadParams() =
# Ensure that there's no race condition window where getPayload's
# check for whether it needs to trigger a new fcU payload, due to
# cache invalidation, falsely suggests that the expected payload
# matches, and similarly that if the fcU fails or times out for other
# reasons, the expected payload params remain synchronized with
# EL state.
assign(
m.nextExpectedPayloadParams,
some NextExpectedPayloadParams(
headBlockHash: headBlockHash,
safeBlockHash: safeBlockHash,
finalizedBlockHash: finalizedBlockHash,
payloadAttributes: payloadAttributesV3))
template getSelected: untyped =
let data = requests[responseProcessor.selectedResponse.get].value()
(data.status, data.latestValidHash)
if responseProcessor.disagreementAlreadyDetected:
let pending =
pendingRequests.filterIt(not(it.finished())).
mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
return (PayloadExecutionStatus.invalid, none BlockHash)
elif responseProcessor.selectedResponse.isSome:
# We spawn task which will wait for all other responses which are
# still pending, after 30.seconds all pending requests will be
# cancelled.
asyncSpawn lazyWait(pendingRequests.mapIt(FutureBase(it)))
assignNextExpectedPayloadParams()
return getSelected()
if timeoutExceeded:
# Timeout exceeded, cancelling all pending requests.
let pending =
pendingRequests.filterIt(not(it.finished())).
mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
return (PayloadExecutionStatus.syncing, none BlockHash)
if len(pendingRequests) == 0:
# All requests failed, we will continue our attempts until deadline
# is not finished.
break mainLoop
# TODO can't be defined within exchangeConfigWithSingleEL
func `==`(x, y: Quantity): bool {.borrow.}
proc exchangeConfigWithSingleEL(
m: ELManager,
connection: ELConnection
) {.async: (raises: [CancelledError]).} =
let rpcClient = await connection.connectedRpcClient()
if m.eth1Network.isSome and
connection.etcStatus == EtcStatus.notExchangedYet:
try:
let
providerChain = await connection.engineApiRequest(
rpcClient.eth_chainId(), "chainId", Moment.now(),
web3RequestsTimeout)
# https://chainid.network/
expectedChain = case m.eth1Network.get
of mainnet: 1.Quantity
of goerli: 5.Quantity
of sepolia: 11155111.Quantity
of holesky: 17000.Quantity
if expectedChain != providerChain:
warn "The specified EL client is connected to a different chain",
url = connection.engineUrl,
expectedChain = distinctBase(expectedChain),
actualChain = distinctBase(providerChain)
connection.etcStatus = EtcStatus.mismatch
return
except CancelledError as exc:
debug "Configuration exchange was interrupted"
raise exc
except CatchableError as exc:
# Typically because it's not synced through EIP-155, assuming this Web3
# endpoint has been otherwise working.
debug "Failed to obtain eth_chainId", reason = exc.msg
connection.etcStatus = EtcStatus.match
proc exchangeTransitionConfiguration*(
m: ELManager
) {.async: (raises: [CancelledError]).} =
if m.elConnections.len == 0:
return
let requests = m.elConnections.mapIt(m.exchangeConfigWithSingleEL(it))
try:
await allFutures(requests).wait(3.seconds)
except AsyncTimeoutError:
discard
except CancelledError as exc:
let pending = requests.filterIt(not(it.finished())).
mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
raise exc
let (pending, failed, finished) =
block:
var
failed = 0
done = 0
pending: seq[Future[void]]
for req in requests:
if not req.finished():
pending.add(req.cancelAndWait())
else:
if req.completed():
inc(done)
else:
inc(failed)
(pending, failed, done)
await noCancel allFutures(pending)
if (len(pending) > 0) or (failed != 0):
warn "Failed to exchange configuration with the configured EL end-points",
completed = finished, failed = failed, timed_out = len(pending)
template readJsonField(logEvent, field: untyped, ValueType: type): untyped =
if logEvent.field.isNone:
raise newException(CatchableError,
"Web3 provider didn't return needed logEvent field " & astToStr(field))
logEvent.field.get
template init[N: static int](T: type DynamicBytes[N, N]): T =
T newSeq[byte](N)
proc fetchTimestamp(
connection: ELConnection,
rpcClient: RpcClient,
blk: Eth1Block
) {.async: (raises: [CatchableError]).} =
debug "Fetching block timestamp", blockNum = blk.number
let web3block = raiseIfNil await connection.engineApiRequest(
rpcClient.getBlockByHash(blk.hash.asBlockHash),
"getBlockByHash", Moment.now(), web3RequestsTimeout)
blk.timestamp = Eth1BlockTimestamp(web3block.timestamp)
func depositEventsToBlocks(
depositsList: openArray[JsonString]
): seq[Eth1Block] {.raises: [CatchableError].} =
var lastEth1Block: Eth1Block
for logEventData in depositsList:
let
logEvent = JrpcConv.decode(logEventData.string, LogObject)
blockNumber = Eth1BlockNumber readJsonField(logEvent, blockNumber, Quantity)
blockHash = readJsonField(logEvent, blockHash, BlockHash)
if lastEth1Block == nil or lastEth1Block.number != blockNumber:
lastEth1Block = Eth1Block(
hash: blockHash.asEth2Digest,
number: blockNumber
# The `timestamp` is set in `syncBlockRange` immediately
# after calling this function, because we don't want to
# make this function `async`
)
result.add lastEth1Block
var
pubkey = init PubKeyBytes
withdrawalCredentials = init WithdrawalCredentialsBytes
amount = init Int64LeBytes
signature = init SignatureBytes
index = init Int64LeBytes
var offset = 0
offset += decode(logEvent.data, 0, offset, pubkey)
offset += decode(logEvent.data, 0, offset, withdrawalCredentials)
offset += decode(logEvent.data, 0, offset, amount)
offset += decode(logEvent.data, 0, offset, signature)
offset += decode(logEvent.data, 0, offset, index)
if pubkey.len != 48 or
withdrawalCredentials.len != 32 or
amount.len != 8 or
signature.len != 96 or
index.len != 8:
raise newException(CorruptDataProvider,
"Web3 provider supplied invalid deposit logs")
lastEth1Block.deposits.add DepositData(
pubkey: ValidatorPubKey.init(pubkey.toArray),
withdrawal_credentials: Eth2Digest(data: withdrawalCredentials.toArray),
amount: bytes_to_uint64(amount.toArray).Gwei,
signature: ValidatorSig.init(signature.toArray))
type
DepositContractDataStatus = enum
Fetched
VerifiedCorrect
DepositRootIncorrect
DepositRootUnavailable
DepositCountIncorrect
DepositCountUnavailable
when hasDepositRootChecks:
const
contractCallTimeout = 60.seconds
proc fetchDepositContractData(
connection: ELConnection,
rpcClient: RpcClient,
depositContract: Sender[DepositContract],
blk: Eth1Block
): Future[DepositContractDataStatus] {.async: (raises: [CancelledError]).} =
let
startTime = Moment.now()
deadline = sleepAsync(contractCallTimeout)
depositRootFut =
depositContract.get_deposit_root.call(blockNumber = blk.number)
rawCountFut =
depositContract.get_deposit_count.call(blockNumber = blk.number)
engineFut1 = connection.engineApiRequest(
depositRootFut, "get_deposit_root", startTime, deadline,
failureAllowed = true)
engineFut2 = connection.engineApiRequest(
rawCountFut, "get_deposit_count", startTime, deadline,
failureAllowed = true)
try:
await allFutures(engineFut1, engineFut2)
except CancelledError as exc:
var pending: seq[Future[void]]
if not(engineFut1.finished()):
pending.add(engineFut1.cancelAndWait())
if not(engineFut2.finished()):
pending.add(engineFut2.cancelAndWait())
await noCancel allFutures(pending)
raise exc
var res: DepositContractDataStatus
try:
# `engineFut1` could hold timeout exception `DataProviderTimeout`.
discard engineFut1.read()
let fetchedRoot = asEth2Digest(depositRootFut.read())
if blk.depositRoot.isZero:
blk.depositRoot = fetchedRoot
res = Fetched
elif blk.depositRoot == fetchedRoot:
res = VerifiedCorrect
else:
res = DepositRootIncorrect
except CatchableError as exc:
debug "Failed to fetch deposits root", block_number = blk.number,
reason = exc.msg
res = DepositRootUnavailable
try:
# `engineFut2` could hold timeout exception `DataProviderTimeout`.
discard engineFut2.read()
let fetchedCount = bytes_to_uint64(rawCountFut.read().toArray)
if blk.depositCount == 0:
blk.depositCount = fetchedCount
elif blk.depositCount != fetchedCount:
res = DepositCountIncorrect
except CatchableError as exc:
debug "Failed to fetch deposits count", block_number = blk.number,
reason = exc.msg
res = DepositCountUnavailable
res
template trackFinalizedState*(m: ELManager,
finalizedEth1Data: Eth1Data,
finalizedStateDepositIndex: uint64): bool =
trackFinalizedState(m.eth1Chain, finalizedEth1Data, finalizedStateDepositIndex)
template getBlockProposalData*(m: ELManager,
state: ForkedHashedBeaconState,
finalizedEth1Data: Eth1Data,
finalizedStateDepositIndex: uint64):
BlockProposalEth1Data =
getBlockProposalData(
m.eth1Chain, state, finalizedEth1Data, finalizedStateDepositIndex)
func new*(T: type ELConnection, engineUrl: EngineApiUrl): T =
ELConnection(
engineUrl: engineUrl,
depositContractSyncStatus: DepositContractSyncStatus.unknown)
proc new*(T: type ELManager,
cfg: RuntimeConfig,
depositContractBlockNumber: uint64,
depositContractBlockHash: Eth2Digest,
db: BeaconChainDB,
engineApiUrls: seq[EngineApiUrl],
eth1Network: Option[Eth1Network]): T =
let
eth1Chain = Eth1Chain.init(
cfg, db, depositContractBlockNumber, depositContractBlockHash)
debug "Initializing ELManager",
depositContractBlockNumber,
depositContractBlockHash
T(eth1Chain: eth1Chain,
depositContractAddress: cfg.DEPOSIT_CONTRACT_ADDRESS,
depositContractBlockNumber: depositContractBlockNumber,
depositContractBlockHash: depositContractBlockHash.asBlockHash,
elConnections: mapIt(engineApiUrls, ELConnection.new(it)),
eth1Network: eth1Network,
blocksPerLogsRequest: targetBlocksPerLogsRequest,
managerState: ELManagerState.Running)
proc stop(m: ELManager) {.async: (raises: []).} =
if m.managerState notin {ELManagerState.Closing, ELManagerState.Closed}:
m.managerState = ELManagerState.Closing
var pending: seq[Future[void].Raising([])]
if not(m.chainSyncingLoopFut.isNil()) and
not(m.chainSyncingLoopFut.finished()):
pending.add(m.chainSyncingLoopFut.cancelAndWait())
if not(m.exchangeTransitionConfigurationLoopFut.isNil()) and
not(m.exchangeTransitionConfigurationLoopFut.finished()):
pending.add(m.exchangeTransitionConfigurationLoopFut.cancelAndWait())
for connection in m.elConnections:
pending.add(connection.close())
await noCancel allFutures(pending)
m.managerState = ELManagerState.Closed
const
votedBlocksSafetyMargin = 50
func earliestBlockOfInterest(
m: ELManager,
latestEth1BlockNumber: Eth1BlockNumber): Eth1BlockNumber =
let blocksOfInterestRange =
SLOTS_PER_ETH1_VOTING_PERIOD +
(2 * m.cfg.ETH1_FOLLOW_DISTANCE) +
votedBlocksSafetyMargin
if latestEth1BlockNumber > blocksOfInterestRange.Eth1BlockNumber:
latestEth1BlockNumber - blocksOfInterestRange
else:
0.Eth1BlockNumber
proc syncBlockRange(
m: ELManager,
connection: ELConnection,
rpcClient: RpcClient,
depositContract: Sender[DepositContract],
fromBlock, toBlock,
fullSyncFromBlock: Eth1BlockNumber
) {.async: (raises: [CatchableError]).} =
doAssert m.eth1Chain.blocks.len > 0
var currentBlock = fromBlock
while currentBlock <= toBlock:
var
depositLogs: seq[JsonString]
maxBlockNumberRequested: Eth1BlockNumber
backoff = 100
while true:
maxBlockNumberRequested =
min(toBlock, currentBlock + m.blocksPerLogsRequest - 1)
debug "Obtaining deposit log events",
fromBlock = currentBlock,
toBlock = maxBlockNumberRequested,
backoff
debug.logTime "Deposit logs obtained":
# Reduce all request rate until we have a more general solution
# for dealing with Infura's rate limits
await sleepAsync(milliseconds(backoff))
depositLogs =
try:
await connection.engineApiRequest(
depositContract.getJsonLogs(
DepositEvent,
fromBlock = some blockId(currentBlock),
toBlock = some blockId(maxBlockNumberRequested)),
"getLogs", Moment.now(), 30.seconds)
except CancelledError as exc:
debug "Request for deposit logs was interrupted"
raise exc
except CatchableError as exc:
debug "Request for deposit logs failed", reason = exc.msg
inc failed_web3_requests
backoff = (backoff * 3) div 2
m.blocksPerLogsRequest = m.blocksPerLogsRequest div 2
if m.blocksPerLogsRequest == 0:
m.blocksPerLogsRequest = 1
raise exc
continue
m.blocksPerLogsRequest = min(
(m.blocksPerLogsRequest * 3 + 1) div 2,
targetBlocksPerLogsRequest)
currentBlock = maxBlockNumberRequested + 1
break
let blocksWithDeposits = depositEventsToBlocks(depositLogs)
for i in 0 ..< blocksWithDeposits.len:
let blk = blocksWithDeposits[i]
if blk.number > fullSyncFromBlock:
try:
await fetchTimestamp(connection, rpcClient, blk)
except CancelledError as exc:
debug "Request for block timestamp was interrupted",
block_number = blk.number
raise exc
except CatchableError as exc:
debug "Request for block timestamp failed",
block_number = blk.number, reason = exc.msg
let lastBlock = m.eth1Chain.blocks.peekLast
for n in max(lastBlock.number + 1, fullSyncFromBlock) ..< blk.number:
debug "Obtaining block without deposits", blockNum = n
let noDepositsBlock =
try:
raiseIfNil await connection.engineApiRequest(
rpcClient.getBlockByNumber(n),
"getBlockByNumber", Moment.now(), web3RequestsTimeout)
except CancelledError as exc:
debug "The process of obtaining the block was interrupted",
block_number = n
raise exc
except CatchableError as exc:
debug "Request for block failed", block_number = n,
reason = exc.msg
raise exc
m.eth1Chain.addBlock(
lastBlock.makeSuccessorWithoutDeposits(noDepositsBlock))
eth1_synced_head.set noDepositsBlock.number.toGaugeValue
m.eth1Chain.addBlock blk
eth1_synced_head.set blk.number.toGaugeValue
if blocksWithDeposits.len > 0:
let lastIdx = blocksWithDeposits.len - 1
template lastBlock: auto = blocksWithDeposits[lastIdx]
let status =
when hasDepositRootChecks:
await fetchDepositContractData(
connection, rpcClient, depositContract, lastBlock)
else:
DepositRootUnavailable
when hasDepositRootChecks:
debug "Deposit contract state verified",
status = $status,
ourCount = lastBlock.depositCount,
ourRoot = lastBlock.depositRoot
case status
of DepositRootIncorrect, DepositCountIncorrect:
raise newException(CorruptDataProvider,
"The deposit log events disagree with the deposit contract state")
else:
discard
info "Eth1 sync progress",
blockNumber = lastBlock.number,
depositsProcessed = lastBlock.depositCount
func hasConnection*(m: ELManager): bool =
m.elConnections.len > 0
func hasAnyWorkingConnection*(m: ELManager): bool =
m.elConnections.anyIt(it.state == Working or it.state == NeverTested)
func hasProperlyConfiguredConnection*(m: ELManager): bool =
for connection in m.elConnections:
if connection.etcStatus == EtcStatus.match:
return true
false
proc startExchangeTransitionConfigurationLoop(
m: ELManager
) {.async: (raises: [CancelledError]).} =
debug "Starting exchange transition configuration loop"
while true:
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/paris.md#specification-3
debug "Exchange transition configuration tick"
await m.exchangeTransitionConfiguration()
await sleepAsync(60.seconds)
proc syncEth1Chain(
m: ELManager,
connection: ELConnection
) {.async: (raises: [CatchableError]).} =
let rpcClient =
try:
await connection.connectedRpcClient().wait(1.seconds)
except AsyncTimeoutError:
raise newException(DataProviderTimeout, "Connection timed out")
let
# BEWARE
# `connectedRpcClient` guarantees that connection.web3 will not be
# `none` here, but it's not safe to initialize this later (e.g closer
# to where it's used) because `connection.web3` may be set to `none`
# at any time after a failed request. Luckily, the `contractSender`
# object is very cheap to create.
depositContract = connection.web3.get.contractSender(
DepositContract, m.depositContractAddress)
shouldProcessDeposits = not (
m.depositContractAddress.isZeroMemory or
m.eth1Chain.finalizedBlockHash.data.isZeroMemory)
trace "Starting syncEth1Chain", shouldProcessDeposits
logScope:
url = connection.engineUrl.url
# We might need to reset the chain if the new provider disagrees
# with the previous one regarding the history of the chain or if
# we have detected a conensus violation - our view disagreeing with
# the majority of the validators in the network.
#
# Consensus violations happen in practice because the web3 providers
# sometimes return incomplete or incorrect deposit log events even
# when they don't indicate any errors in the response. When this
# happens, we are usually able to download the data successfully
# on the second attempt.
#
# TODO
# Perhaps the above problem was manifesting only with the obsolete
# JSON-RPC data providers, which can no longer be used with Nimbus.
if m.eth1Chain.blocks.len > 0:
let needsReset = m.eth1Chain.hasConsensusViolation or (block:
let
lastKnownBlock = m.eth1Chain.blocks.peekLast
matchingBlockAtNewEl =
try:
raiseIfNil await connection.engineApiRequest(
rpcClient.getBlockByNumber(lastKnownBlock.number),
"getBlockByNumber", Moment.now(), web3RequestsTimeout)
except CancelledError as exc:
debug "getBlockByNumber request has been interrupted",
last_known_block_number = lastKnownBlock.number
raise exc
except CatchableError as exc:
debug "getBlockByNumber request failed",
last_known_block_number = lastKnownBlock.number,
reason = exc.msg
raise exc
lastKnownBlock.hash.asBlockHash != matchingBlockAtNewEl.hash)
if needsReset:
trace "Resetting the Eth1 chain",
hasConsensusViolation = m.eth1Chain.hasConsensusViolation
m.eth1Chain.clear()
var eth1SyncedTo: Eth1BlockNumber
if shouldProcessDeposits:
if m.eth1Chain.blocks.len == 0:
let finalizedBlockHash = m.eth1Chain.finalizedBlockHash.asBlockHash
let startBlock =
try:
raiseIfNil await connection.engineApiRequest(
rpcClient.getBlockByHash(finalizedBlockHash),
"getBlockByHash", Moment.now(), web3RequestsTimeout)
except CancelledError as exc:
debug "getBlockByHash() request has been interrupted",
finalized_block_hash = finalizedBlockHash
raise exc
except CatchableError as exc:
debug "getBlockByHash() request has failed",
finalized_block_hash = finalizedBlockHash,
reason = exc.msg
raise exc
m.eth1Chain.addBlock Eth1Block(
hash: m.eth1Chain.finalizedBlockHash,
number: Eth1BlockNumber startBlock.number,
timestamp: Eth1BlockTimestamp startBlock.timestamp)
eth1SyncedTo = m.eth1Chain.blocks[^1].number
eth1_synced_head.set eth1SyncedTo.toGaugeValue
eth1_finalized_head.set eth1SyncedTo.toGaugeValue
eth1_finalized_deposits.set(
m.eth1Chain.finalizedDepositsMerkleizer.getChunkCount.toGaugeValue)
debug "Starting Eth1 syncing", `from` = shortLog(m.eth1Chain.blocks[^1])
var latestBlockNumber: Eth1BlockNumber
while true:
debug "syncEth1Chain tick",
shouldProcessDeposits, latestBlockNumber, eth1SyncedTo
# TODO (cheatfate): This should be removed
if bnStatus == BeaconNodeStatus.Stopping:
await noCancel m.stop()
return
if m.eth1Chain.hasConsensusViolation:
raise newException(CorruptDataProvider,
"Eth1 chain contradicts Eth2 consensus")
let latestBlock =
try:
raiseIfNil await connection.engineApiRequest(
rpcClient.eth_getBlockByNumber(blockId("latest"), false),
"getBlockByNumber", Moment.now(), web3RequestsTimeout)
except CancelledError as exc:
debug "Latest block request has been interrupted"
raise exc
except CatchableError as exc:
warn "Failed to obtain the latest block from the EL", reason = exc.msg
raise exc
latestBlockNumber = latestBlock.number
m.syncTargetBlock = some(
if latestBlock.number > m.cfg.ETH1_FOLLOW_DISTANCE.Eth1BlockNumber:
latestBlock.number - m.cfg.ETH1_FOLLOW_DISTANCE
else:
0.Eth1BlockNumber)
if m.syncTargetBlock.get <= eth1SyncedTo:
# The chain reorged to a lower height.
# It's relatively safe to ignore that.
await sleepAsync(m.cfg.SECONDS_PER_ETH1_BLOCK.int.seconds)
continue
eth1_latest_head.set latestBlock.number.toGaugeValue
if shouldProcessDeposits and
latestBlock.number.uint64 > m.cfg.ETH1_FOLLOW_DISTANCE:
try:
await m.syncBlockRange(connection,
rpcClient,
depositContract,
eth1SyncedTo + 1,
m.syncTargetBlock.get,
m.earliestBlockOfInterest(latestBlock.number))
except CancelledError as exc:
debug "Syncing block range process has been interrupted"
raise exc
except CatchableError as exc:
debug "Syncing block range process has been failed", reason = exc.msg
raise exc
eth1SyncedTo = m.syncTargetBlock.get
eth1_synced_head.set eth1SyncedTo.toGaugeValue
proc startChainSyncingLoop(
m: ELManager
) {.async: (raises: []).} =
info "Starting execution layer deposit syncing",
contract = $m.depositContractAddress
var syncedConnectionFut = m.selectConnectionForChainSyncing()
info "Connection attempt started"
var runLoop = true
while runLoop:
try:
let connection = await syncedConnectionFut.wait(60.seconds)
await syncEth1Chain(m, connection)
except AsyncTimeoutError:
notice "No synced EL nodes available for deposit syncing"
try:
await sleepAsync(chronos.seconds(30))
except CancelledError:
runLoop = false
except CancelledError:
runLoop = false
except CatchableError:
try:
await sleepAsync(10.seconds)
except CancelledError:
runLoop = false
break
debug "Restarting the deposit syncing loop"
# A more detailed error is already logged by trackEngineApiRequest
# To be extra safe, we will make a fresh connection attempt
await syncedConnectionFut.cancelAndWait()
syncedConnectionFut = m.selectConnectionForChainSyncing()
debug "EL chain syncing process has been stopped"
proc start*(m: ELManager, syncChain = true) {.gcsafe.} =
if m.elConnections.len == 0:
return
## Calling `ELManager.start()` on an already started ELManager is a noop
if syncChain and m.chainSyncingLoopFut.isNil:
m.chainSyncingLoopFut =
m.startChainSyncingLoop()
if m.hasJwtSecret and m.exchangeTransitionConfigurationLoopFut.isNil:
m.exchangeTransitionConfigurationLoopFut =
m.startExchangeTransitionConfigurationLoop()
func `$`(x: Quantity): string =
$(x.uint64)
func `$`(x: BlockObject): string =
$(x.number) & " [" & $(x.hash) & "]"
proc testWeb3Provider*(
web3Url: Uri,
depositContractAddress: Eth1Address,
jwtSecret: Opt[seq[byte]]
) {.async: (raises: [CatchableError]).} =
stdout.write "Establishing web3 connection..."
let web3 =
try:
await newWeb3($web3Url,
getJsonRpcRequestHeaders(jwtSecret)).wait(5.seconds)
except CatchableError as exc:
stdout.write "\rEstablishing web3 connection: Failure(" & exc.msg & ")\n"
quit 1
stdout.write "\rEstablishing web3 connection: Connected\n"
template request(actionDesc: static string,
action: untyped): untyped =
stdout.write actionDesc & "..."
stdout.flushFile()
var res: typeof(read action)
try:
let fut = action
res = await fut.wait(web3RequestsTimeout)
when res is BlockObject:
res = raiseIfNil res
stdout.write "\r" & actionDesc & ": " & $res
except CatchableError as err:
stdout.write "\r" & actionDesc & ": Error(" & err.msg & ")"
stdout.write "\n"
res
discard request "Chain ID":
web3.provider.eth_chainId()
discard request "Sync status":
web3.provider.eth_syncing()
let
latestBlock = request "Latest block":
web3.provider.eth_getBlockByNumber(blockId("latest"), false)
ns = web3.contractSender(DepositContract, depositContractAddress)
discard request "Deposit root":
ns.get_deposit_root.call(blockNumber = latestBlock.number)