1895 lines
73 KiB
Nim
1895 lines
73 KiB
Nim
# beacon_chain
|
|
# Copyright (c) 2018-2024 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
{.push raises: [].}
|
|
|
|
import
|
|
std/[strformat, strutils, sequtils, typetraits, uri, json],
|
|
# Nimble packages:
|
|
chronos, metrics, chronicles/timings,
|
|
json_rpc/[client, errors],
|
|
web3, web3/[engine_api, primitives, conversions],
|
|
eth/common/[eth_types, transaction],
|
|
eth/async_utils, results,
|
|
stew/[assign2, byteutils, objects],
|
|
# Local modules:
|
|
../spec/[eth2_merkleization, forks, helpers],
|
|
../networking/network_metadata,
|
|
".."/[beacon_node_status, future_combinators],
|
|
"."/[eth1_chain, el_conf]
|
|
|
|
from std/times import getTime, inSeconds, initTime, `-`
|
|
from ../spec/engine_authentication import getSignedIatToken
|
|
from ../spec/state_transition_block import kzg_commitment_to_versioned_hash
|
|
|
|
export
|
|
eth1_chain, el_conf, engine_api, base
|
|
|
|
logScope:
|
|
topics = "elman"
|
|
|
|
type
|
|
PubKeyBytes = DynamicBytes[48, 48]
|
|
WithdrawalCredentialsBytes = DynamicBytes[32, 32]
|
|
SignatureBytes = DynamicBytes[96, 96]
|
|
Int64LeBytes = DynamicBytes[8, 8]
|
|
|
|
contract(DepositContract):
|
|
proc deposit(pubkey: PubKeyBytes,
|
|
withdrawalCredentials: WithdrawalCredentialsBytes,
|
|
signature: SignatureBytes,
|
|
deposit_data_root: FixedBytes[32])
|
|
|
|
proc get_deposit_root(): FixedBytes[32]
|
|
proc get_deposit_count(): Int64LeBytes
|
|
|
|
proc DepositEvent(pubkey: PubKeyBytes,
|
|
withdrawalCredentials: WithdrawalCredentialsBytes,
|
|
amount: Int64LeBytes,
|
|
signature: SignatureBytes,
|
|
index: Int64LeBytes) {.event.}
|
|
|
|
const
|
|
hasDepositRootChecks = defined(has_deposit_root_checks)
|
|
|
|
targetBlocksPerLogsRequest = 1000'u64
|
|
# TODO
|
|
#
|
|
# This is currently set to 1000, because this was the default maximum
|
|
# value in Besu circa our 22.3.0 release. Previously, we've used 5000,
|
|
# but this was effectively forcing the fallback logic in `syncBlockRange`
|
|
# to always execute multiple requests before getting a successful response.
|
|
#
|
|
# Besu have raised this default to 5000 in https://github.com/hyperledger/besu/pull/5209
|
|
# which is expected to ship in their next release.
|
|
#
|
|
# Full deposits sync time with various values for this parameter:
|
|
#
|
|
# Blocks per request | Geth running on the same host | Geth running on a more distant host
|
|
# ----------------------------------------------------------------------------------------
|
|
# 1000 | 11m 20s | 22m
|
|
# 5000 | 5m 20s | 15m 40s
|
|
# 100000 | 4m 10s | not tested
|
|
#
|
|
# The number of requests scales linearly with the parameter value as you would expect.
|
|
#
|
|
# These results suggest that it would be reasonable for us to get back to 5000 once the
|
|
# Besu release is well-spread within their userbase.
|
|
|
|
# Engine API timeouts
|
|
engineApiConnectionTimeout = 5.seconds # How much we wait before giving up connecting to the Engine API
|
|
web3RequestsTimeout* = 8.seconds # How much we wait for eth_* requests (e.g. eth_getBlockByHash)
|
|
|
|
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/paris.md#request-2
|
|
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/shanghai.md#request-2
|
|
GETPAYLOAD_TIMEOUT = 1.seconds
|
|
|
|
connectionStateChangeHysteresisThreshold = 15
|
|
## How many unsuccesful/successful requests we must see
|
|
## before declaring the connection as degraded/restored
|
|
|
|
type
|
|
NextExpectedPayloadParams* = object
|
|
headBlockHash*: Eth2Digest
|
|
safeBlockHash*: Eth2Digest
|
|
finalizedBlockHash*: Eth2Digest
|
|
payloadAttributes*: PayloadAttributesV3
|
|
|
|
ELManager* = ref object
|
|
eth1Network: Option[Eth1Network]
|
|
## If this value is supplied the EL manager will check whether
|
|
## all configured EL nodes are connected to the same network.
|
|
|
|
depositContractAddress*: Eth1Address
|
|
depositContractBlockNumber: uint64
|
|
depositContractBlockHash: BlockHash
|
|
|
|
blocksPerLogsRequest: uint64
|
|
## This value is used to dynamically adjust the number of
|
|
## blocks we are trying to download at once during deposit
|
|
## syncing. By default, the value is set to the constant
|
|
## `targetBlocksPerLogsRequest`, but if the EL is failing
|
|
## to serve this number of blocks per single `eth_getLogs`
|
|
## request, we temporarily lower the value until the request
|
|
## succeeds. The failures are generally expected only in
|
|
## periods in the history for very high deposit density.
|
|
|
|
elConnections: seq[ELConnection]
|
|
## All active EL connections
|
|
|
|
eth1Chain: Eth1Chain
|
|
## At larger distances, this chain consists of all blocks
|
|
## with deposits. Within the relevant voting period, it
|
|
## also includes blocks without deposits because we must
|
|
## vote for a block only if it's part of our known history.
|
|
|
|
syncTargetBlock: Option[Eth1BlockNumber]
|
|
|
|
chainSyncingLoopFut: Future[void]
|
|
exchangeTransitionConfigurationLoopFut: Future[void]
|
|
stopFut: Future[void]
|
|
|
|
nextExpectedPayloadParams*: Option[NextExpectedPayloadParams]
|
|
|
|
EtcStatus {.pure.} = enum
|
|
notExchangedYet
|
|
mismatch
|
|
match
|
|
|
|
DepositContractSyncStatus {.pure.} = enum
|
|
unknown
|
|
notSynced
|
|
synced
|
|
|
|
ConnectionState = enum
|
|
NeverTested
|
|
Working
|
|
Degraded
|
|
|
|
ELConnection* = ref object
|
|
engineUrl: EngineApiUrl
|
|
|
|
web3: Option[Web3]
|
|
## This will be `none` before connecting and while we are
|
|
## reconnecting after a lost connetion. You can wait on
|
|
## the future below for the moment the connection is active.
|
|
|
|
connectingFut: Future[Result[Web3, string]].Raising([CancelledError])
|
|
## This future will be replaced when the connection is lost.
|
|
|
|
etcStatus: EtcStatus
|
|
## The latest status of the `exchangeTransitionConfiguration`
|
|
## exchange.
|
|
|
|
state: ConnectionState
|
|
hysteresisCounter: int
|
|
|
|
depositContractSyncStatus: DepositContractSyncStatus
|
|
## Are we sure that this EL has synced the deposit contract?
|
|
|
|
lastPayloadId: Option[engine_api.PayloadID]
|
|
|
|
FullBlockId* = object
|
|
number: Eth1BlockNumber
|
|
hash: BlockHash
|
|
|
|
DataProviderFailure* = object of CatchableError
|
|
CorruptDataProvider* = object of DataProviderFailure
|
|
DataProviderTimeout* = object of DataProviderFailure
|
|
|
|
DisconnectHandler* = proc () {.gcsafe, raises: [].}
|
|
|
|
DepositEventHandler* = proc (
|
|
pubkey: PubKeyBytes,
|
|
withdrawalCredentials: WithdrawalCredentialsBytes,
|
|
amount: Int64LeBytes,
|
|
signature: SignatureBytes,
|
|
merkleTreeIndex: Int64LeBytes,
|
|
j: JsonNode) {.gcsafe, raises: [].}
|
|
|
|
BellatrixExecutionPayloadWithValue* = object
|
|
executionPayload*: ExecutionPayloadV1
|
|
blockValue*: UInt256
|
|
|
|
SomeEnginePayloadWithValue =
|
|
BellatrixExecutionPayloadWithValue |
|
|
GetPayloadV2Response |
|
|
GetPayloadV3Response
|
|
|
|
declareCounter failed_web3_requests,
|
|
"Failed web3 requests"
|
|
|
|
declareGauge eth1_latest_head,
|
|
"The highest Eth1 block number observed on the network"
|
|
|
|
declareGauge eth1_synced_head,
|
|
"Block number of the highest synchronized block according to follow distance"
|
|
|
|
declareCounter engine_api_responses,
|
|
"Number of successful requests to the newPayload Engine API end-point",
|
|
labels = ["url", "request", "status"]
|
|
|
|
declareHistogram engine_api_request_duration_seconds,
|
|
"Time(s) used to generate signature usign remote signer",
|
|
buckets = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
|
|
labels = ["url", "request"]
|
|
|
|
declareCounter engine_api_timeouts,
|
|
"Number of timed-out requests to Engine API end-point",
|
|
labels = ["url", "request"]
|
|
|
|
declareCounter engine_api_last_minute_forkchoice_updates_sent,
|
|
"Number of last minute requests to the forkchoiceUpdated Engine API end-point just before block proposals",
|
|
labels = ["url"]
|
|
|
|
proc close(connection: ELConnection): Future[void] {.async.} =
|
|
if connection.web3.isSome:
|
|
awaitWithTimeout(connection.web3.get.close(), 30.seconds):
|
|
debug "Failed to close data provider in time"
|
|
|
|
proc increaseCounterTowardsStateChange(connection: ELConnection): bool =
|
|
result = connection.hysteresisCounter >= connectionStateChangeHysteresisThreshold
|
|
if result:
|
|
connection.hysteresisCounter = 0
|
|
else:
|
|
inc connection.hysteresisCounter
|
|
|
|
proc decreaseCounterTowardsStateChange(connection: ELConnection) =
|
|
if connection.hysteresisCounter > 0:
|
|
# While we increase the counter by 1, we decreate it by 20% in order
|
|
# to require a steady and affirmative change instead of allowing
|
|
# the counter to drift very slowly in one direction when the ratio
|
|
# between success and failure is roughly 50:50%
|
|
connection.hysteresisCounter = connection.hysteresisCounter div 5
|
|
|
|
proc setDegradedState(connection: ELConnection,
|
|
requestName: string,
|
|
statusCode: int, errMsg: string) =
|
|
debug "Failed EL Request", requestName, statusCode, err = errMsg
|
|
|
|
case connection.state
|
|
of NeverTested, Working:
|
|
if connection.increaseCounterTowardsStateChange():
|
|
warn "Connection to EL node degraded",
|
|
url = url(connection.engineUrl),
|
|
failedRequest = requestName,
|
|
statusCode, err = errMsg
|
|
|
|
connection.state = Degraded
|
|
|
|
asyncSpawn connection.close()
|
|
connection.web3 = none[Web3]()
|
|
of Degraded:
|
|
connection.decreaseCounterTowardsStateChange()
|
|
|
|
proc setWorkingState(connection: ELConnection) =
|
|
case connection.state
|
|
of NeverTested:
|
|
connection.hysteresisCounter = 0
|
|
connection.state = Working
|
|
of Degraded:
|
|
if connection.increaseCounterTowardsStateChange():
|
|
info "Connection to EL node restored",
|
|
url = url(connection.engineUrl)
|
|
|
|
connection.state = Working
|
|
of Working:
|
|
connection.decreaseCounterTowardsStateChange()
|
|
|
|
proc trackEngineApiRequest(connection: ELConnection,
|
|
request: FutureBase, requestName: string,
|
|
startTime: Moment, deadline: Future[void],
|
|
failureAllowed = false) =
|
|
request.addCallback do (udata: pointer) {.gcsafe, raises: [].}:
|
|
# TODO `udata` is nil here. How come?
|
|
# This forces us to create a GC cycle between the Future and the closure
|
|
if request.completed:
|
|
engine_api_request_duration_seconds.observe(
|
|
float(milliseconds(Moment.now - startTime)) / 1000.0,
|
|
[connection.engineUrl.url, requestName])
|
|
|
|
connection.setWorkingState()
|
|
|
|
deadline.addCallback do (udata: pointer) {.gcsafe, raises: [].}:
|
|
if not request.finished:
|
|
request.cancelSoon()
|
|
engine_api_timeouts.inc(1, [connection.engineUrl.url, requestName])
|
|
if not failureAllowed:
|
|
connection.setDegradedState(requestName, 0, "Request timed out")
|
|
else:
|
|
let statusCode = if not request.failed:
|
|
200
|
|
elif request.error of ErrorResponse:
|
|
((ref ErrorResponse) request.error).status
|
|
else:
|
|
0
|
|
|
|
if request.failed and not failureAllowed:
|
|
connection.setDegradedState(requestName, statusCode, request.error.msg)
|
|
|
|
engine_api_responses.inc(1, [connection.engineUrl.url, requestName, $statusCode])
|
|
|
|
template awaitOrRaiseOnTimeout[T](fut: Future[T],
|
|
timeout: Duration): T =
|
|
awaitWithTimeout(fut, timeout):
|
|
raise newException(DataProviderTimeout, "Timeout")
|
|
|
|
template trackedRequestWithTimeout[T](connection: ELConnection,
|
|
requestName: static string,
|
|
lazyRequestExpression: Future[T],
|
|
timeout: Duration,
|
|
failureAllowed = false): T =
|
|
let
|
|
connectionParam = connection
|
|
startTime = Moment.now
|
|
deadline = sleepAsync(timeout)
|
|
request = lazyRequestExpression
|
|
|
|
connectionParam.trackEngineApiRequest(
|
|
request, requestName, startTime, deadline, failureAllowed)
|
|
|
|
awaitWithTimeout(request, deadline):
|
|
raise newException(DataProviderTimeout, "Timeout")
|
|
|
|
func raiseIfNil(web3block: BlockObject): BlockObject {.raises: [ValueError].} =
|
|
if web3block == nil:
|
|
raise newException(ValueError, "EL returned 'null' result for block")
|
|
web3block
|
|
|
|
template cfg(m: ELManager): auto =
|
|
m.eth1Chain.cfg
|
|
|
|
func hasJwtSecret*(m: ELManager): bool =
|
|
for c in m.elConnections:
|
|
if c.engineUrl.jwtSecret.isSome:
|
|
return true
|
|
|
|
func isSynced*(m: ELManager): bool =
|
|
m.syncTargetBlock.isSome and
|
|
m.eth1Chain.blocks.len > 0 and
|
|
m.syncTargetBlock.get <= m.eth1Chain.blocks[^1].number
|
|
|
|
template eth1ChainBlocks*(m: ELManager): Deque[Eth1Block] =
|
|
m.eth1Chain.blocks
|
|
|
|
template toGaugeValue(x: Quantity): int64 =
|
|
toGaugeValue(distinctBase x)
|
|
|
|
# TODO: Add cfg validation
|
|
# MIN_GENESIS_ACTIVE_VALIDATOR_COUNT should be larger than SLOTS_PER_EPOCH
|
|
# doAssert SECONDS_PER_ETH1_BLOCK * cfg.ETH1_FOLLOW_DISTANCE < GENESIS_DELAY,
|
|
# "Invalid configuration: GENESIS_DELAY is set too low"
|
|
|
|
func asConsensusWithdrawal(w: WithdrawalV1): capella.Withdrawal =
|
|
capella.Withdrawal(
|
|
index: w.index.uint64,
|
|
validator_index: w.validatorIndex.uint64,
|
|
address: ExecutionAddress(data: w.address.distinctBase),
|
|
amount: Gwei w.amount)
|
|
|
|
func asEngineWithdrawal(w: capella.Withdrawal): WithdrawalV1 =
|
|
WithdrawalV1(
|
|
index: Quantity(w.index),
|
|
validatorIndex: Quantity(w.validator_index),
|
|
address: Address(w.address.data),
|
|
amount: Quantity(w.amount))
|
|
|
|
func asConsensusType*(rpcExecutionPayload: ExecutionPayloadV1):
|
|
bellatrix.ExecutionPayload =
|
|
template getTransaction(tt: TypedTransaction): bellatrix.Transaction =
|
|
bellatrix.Transaction.init(tt.distinctBase)
|
|
|
|
bellatrix.ExecutionPayload(
|
|
parent_hash: rpcExecutionPayload.parentHash.asEth2Digest,
|
|
feeRecipient:
|
|
ExecutionAddress(data: rpcExecutionPayload.feeRecipient.distinctBase),
|
|
state_root: rpcExecutionPayload.stateRoot.asEth2Digest,
|
|
receipts_root: rpcExecutionPayload.receiptsRoot.asEth2Digest,
|
|
logs_bloom: BloomLogs(data: rpcExecutionPayload.logsBloom.distinctBase),
|
|
prev_randao: rpcExecutionPayload.prevRandao.asEth2Digest,
|
|
block_number: rpcExecutionPayload.blockNumber.uint64,
|
|
gas_limit: rpcExecutionPayload.gasLimit.uint64,
|
|
gas_used: rpcExecutionPayload.gasUsed.uint64,
|
|
timestamp: rpcExecutionPayload.timestamp.uint64,
|
|
extra_data: List[byte, MAX_EXTRA_DATA_BYTES].init(rpcExecutionPayload.extraData.bytes),
|
|
base_fee_per_gas: rpcExecutionPayload.baseFeePerGas,
|
|
block_hash: rpcExecutionPayload.blockHash.asEth2Digest,
|
|
transactions: List[bellatrix.Transaction, MAX_TRANSACTIONS_PER_PAYLOAD].init(
|
|
mapIt(rpcExecutionPayload.transactions, it.getTransaction)))
|
|
|
|
func asConsensusType*(payloadWithValue: BellatrixExecutionPayloadWithValue):
|
|
bellatrix.ExecutionPayloadForSigning =
|
|
bellatrix.ExecutionPayloadForSigning(
|
|
executionPayload: payloadWithValue.executionPayload.asConsensusType,
|
|
blockValue: payloadWithValue.blockValue)
|
|
|
|
template maybeDeref[T](o: Option[T]): T = o.get
|
|
template maybeDeref[V](v: V): V = v
|
|
|
|
func asConsensusType*(rpcExecutionPayload: ExecutionPayloadV1OrV2|ExecutionPayloadV2):
|
|
capella.ExecutionPayload =
|
|
template getTransaction(tt: TypedTransaction): bellatrix.Transaction =
|
|
bellatrix.Transaction.init(tt.distinctBase)
|
|
|
|
capella.ExecutionPayload(
|
|
parent_hash: rpcExecutionPayload.parentHash.asEth2Digest,
|
|
feeRecipient:
|
|
ExecutionAddress(data: rpcExecutionPayload.feeRecipient.distinctBase),
|
|
state_root: rpcExecutionPayload.stateRoot.asEth2Digest,
|
|
receipts_root: rpcExecutionPayload.receiptsRoot.asEth2Digest,
|
|
logs_bloom: BloomLogs(data: rpcExecutionPayload.logsBloom.distinctBase),
|
|
prev_randao: rpcExecutionPayload.prevRandao.asEth2Digest,
|
|
block_number: rpcExecutionPayload.blockNumber.uint64,
|
|
gas_limit: rpcExecutionPayload.gasLimit.uint64,
|
|
gas_used: rpcExecutionPayload.gasUsed.uint64,
|
|
timestamp: rpcExecutionPayload.timestamp.uint64,
|
|
extra_data: List[byte, MAX_EXTRA_DATA_BYTES].init(rpcExecutionPayload.extraData.bytes),
|
|
base_fee_per_gas: rpcExecutionPayload.baseFeePerGas,
|
|
block_hash: rpcExecutionPayload.blockHash.asEth2Digest,
|
|
transactions: List[bellatrix.Transaction, MAX_TRANSACTIONS_PER_PAYLOAD].init(
|
|
mapIt(rpcExecutionPayload.transactions, it.getTransaction)),
|
|
withdrawals: List[capella.Withdrawal, MAX_WITHDRAWALS_PER_PAYLOAD].init(
|
|
mapIt(maybeDeref rpcExecutionPayload.withdrawals, it.asConsensusWithdrawal)))
|
|
|
|
func asConsensusType*(payloadWithValue: engine_api.GetPayloadV2Response):
|
|
capella.ExecutionPayloadForSigning =
|
|
capella.ExecutionPayloadForSigning(
|
|
executionPayload: payloadWithValue.executionPayload.asConsensusType,
|
|
blockValue: payloadWithValue.blockValue)
|
|
|
|
func asConsensusType*(rpcExecutionPayload: ExecutionPayloadV3):
|
|
deneb.ExecutionPayload =
|
|
template getTransaction(tt: TypedTransaction): bellatrix.Transaction =
|
|
bellatrix.Transaction.init(tt.distinctBase)
|
|
|
|
deneb.ExecutionPayload(
|
|
parent_hash: rpcExecutionPayload.parentHash.asEth2Digest,
|
|
feeRecipient:
|
|
ExecutionAddress(data: rpcExecutionPayload.feeRecipient.distinctBase),
|
|
state_root: rpcExecutionPayload.stateRoot.asEth2Digest,
|
|
receipts_root: rpcExecutionPayload.receiptsRoot.asEth2Digest,
|
|
logs_bloom: BloomLogs(data: rpcExecutionPayload.logsBloom.distinctBase),
|
|
prev_randao: rpcExecutionPayload.prevRandao.asEth2Digest,
|
|
block_number: rpcExecutionPayload.blockNumber.uint64,
|
|
gas_limit: rpcExecutionPayload.gasLimit.uint64,
|
|
gas_used: rpcExecutionPayload.gasUsed.uint64,
|
|
timestamp: rpcExecutionPayload.timestamp.uint64,
|
|
extra_data: List[byte, MAX_EXTRA_DATA_BYTES].init(rpcExecutionPayload.extraData.bytes),
|
|
base_fee_per_gas: rpcExecutionPayload.baseFeePerGas,
|
|
block_hash: rpcExecutionPayload.blockHash.asEth2Digest,
|
|
transactions: List[bellatrix.Transaction, MAX_TRANSACTIONS_PER_PAYLOAD].init(
|
|
mapIt(rpcExecutionPayload.transactions, it.getTransaction)),
|
|
withdrawals: List[capella.Withdrawal, MAX_WITHDRAWALS_PER_PAYLOAD].init(
|
|
mapIt(rpcExecutionPayload.withdrawals, it.asConsensusWithdrawal)),
|
|
blob_gas_used: rpcExecutionPayload.blobGasUsed.uint64,
|
|
excess_blob_gas: rpcExecutionPayload.excessBlobGas.uint64)
|
|
|
|
func asConsensusType*(payload: engine_api.GetPayloadV3Response):
|
|
deneb.ExecutionPayloadForSigning =
|
|
deneb.ExecutionPayloadForSigning(
|
|
executionPayload: payload.executionPayload.asConsensusType,
|
|
blockValue: payload.blockValue,
|
|
# TODO
|
|
# The `mapIt` calls below are necessary only because we use different distinct
|
|
# types for KZG commitments and Blobs in the `web3` and the `deneb` spec types.
|
|
# Both are defined as `array[N, byte]` under the hood.
|
|
blobsBundle: BlobsBundle(
|
|
commitments: KzgCommitments.init(
|
|
payload.blobsBundle.commitments.mapIt(it.bytes)),
|
|
proofs: KzgProofs.init(
|
|
payload.blobsBundle.proofs.mapIt(it.bytes)),
|
|
blobs: Blobs.init(
|
|
payload.blobsBundle.blobs.mapIt(it.bytes))))
|
|
|
|
func asEngineExecutionPayload*(executionPayload: bellatrix.ExecutionPayload):
|
|
ExecutionPayloadV1 =
|
|
template getTypedTransaction(tt: bellatrix.Transaction): TypedTransaction =
|
|
TypedTransaction(tt.distinctBase)
|
|
|
|
engine_api.ExecutionPayloadV1(
|
|
parentHash: executionPayload.parent_hash.asBlockHash,
|
|
feeRecipient: Address(executionPayload.fee_recipient.data),
|
|
stateRoot: executionPayload.state_root.asBlockHash,
|
|
receiptsRoot: executionPayload.receipts_root.asBlockHash,
|
|
logsBloom:
|
|
FixedBytes[BYTES_PER_LOGS_BLOOM](executionPayload.logs_bloom.data),
|
|
prevRandao: executionPayload.prev_randao.asBlockHash,
|
|
blockNumber: Quantity(executionPayload.block_number),
|
|
gasLimit: Quantity(executionPayload.gas_limit),
|
|
gasUsed: Quantity(executionPayload.gas_used),
|
|
timestamp: Quantity(executionPayload.timestamp),
|
|
extraData: DynamicBytes[0, MAX_EXTRA_DATA_BYTES](executionPayload.extra_data),
|
|
baseFeePerGas: executionPayload.base_fee_per_gas,
|
|
blockHash: executionPayload.block_hash.asBlockHash,
|
|
transactions: mapIt(executionPayload.transactions, it.getTypedTransaction))
|
|
|
|
template toEngineWithdrawal(w: capella.Withdrawal): WithdrawalV1 =
|
|
WithdrawalV1(
|
|
index: Quantity(w.index),
|
|
validatorIndex: Quantity(w.validator_index),
|
|
address: Address(w.address.data),
|
|
amount: Quantity(w.amount))
|
|
|
|
func asEngineExecutionPayload*(executionPayload: capella.ExecutionPayload):
|
|
ExecutionPayloadV2 =
|
|
template getTypedTransaction(tt: bellatrix.Transaction): TypedTransaction =
|
|
TypedTransaction(tt.distinctBase)
|
|
engine_api.ExecutionPayloadV2(
|
|
parentHash: executionPayload.parent_hash.asBlockHash,
|
|
feeRecipient: Address(executionPayload.fee_recipient.data),
|
|
stateRoot: executionPayload.state_root.asBlockHash,
|
|
receiptsRoot: executionPayload.receipts_root.asBlockHash,
|
|
logsBloom:
|
|
FixedBytes[BYTES_PER_LOGS_BLOOM](executionPayload.logs_bloom.data),
|
|
prevRandao: executionPayload.prev_randao.asBlockHash,
|
|
blockNumber: Quantity(executionPayload.block_number),
|
|
gasLimit: Quantity(executionPayload.gas_limit),
|
|
gasUsed: Quantity(executionPayload.gas_used),
|
|
timestamp: Quantity(executionPayload.timestamp),
|
|
extraData: DynamicBytes[0, MAX_EXTRA_DATA_BYTES](executionPayload.extra_data),
|
|
baseFeePerGas: executionPayload.base_fee_per_gas,
|
|
blockHash: executionPayload.block_hash.asBlockHash,
|
|
transactions: mapIt(executionPayload.transactions, it.getTypedTransaction),
|
|
withdrawals: mapIt(executionPayload.withdrawals, it.toEngineWithdrawal))
|
|
|
|
func asEngineExecutionPayload*(executionPayload: deneb.ExecutionPayload):
|
|
ExecutionPayloadV3 =
|
|
template getTypedTransaction(tt: bellatrix.Transaction): TypedTransaction =
|
|
TypedTransaction(tt.distinctBase)
|
|
|
|
engine_api.ExecutionPayloadV3(
|
|
parentHash: executionPayload.parent_hash.asBlockHash,
|
|
feeRecipient: Address(executionPayload.fee_recipient.data),
|
|
stateRoot: executionPayload.state_root.asBlockHash,
|
|
receiptsRoot: executionPayload.receipts_root.asBlockHash,
|
|
logsBloom:
|
|
FixedBytes[BYTES_PER_LOGS_BLOOM](executionPayload.logs_bloom.data),
|
|
prevRandao: executionPayload.prev_randao.asBlockHash,
|
|
blockNumber: Quantity(executionPayload.block_number),
|
|
gasLimit: Quantity(executionPayload.gas_limit),
|
|
gasUsed: Quantity(executionPayload.gas_used),
|
|
timestamp: Quantity(executionPayload.timestamp),
|
|
extraData: DynamicBytes[0, MAX_EXTRA_DATA_BYTES](executionPayload.extra_data),
|
|
baseFeePerGas: executionPayload.base_fee_per_gas,
|
|
blockHash: executionPayload.block_hash.asBlockHash,
|
|
transactions: mapIt(executionPayload.transactions, it.getTypedTransaction),
|
|
withdrawals: mapIt(executionPayload.withdrawals, it.asEngineWithdrawal),
|
|
blobGasUsed: Quantity(executionPayload.blob_gas_used),
|
|
excessBlobGas: Quantity(executionPayload.excess_blob_gas))
|
|
|
|
func asEngineExecutionPayload*(executionPayload: electra.ExecutionPayload):
|
|
ExecutionPayloadV3 =
|
|
debugRaiseAssert "asEngineExecutionPayload for electra.ExecutionPayload probably won't use ExecutionPayloadV3"
|
|
template getTypedTransaction(tt: bellatrix.Transaction): TypedTransaction =
|
|
TypedTransaction(tt.distinctBase)
|
|
|
|
engine_api.ExecutionPayloadV3(
|
|
parentHash: executionPayload.parent_hash.asBlockHash,
|
|
feeRecipient: Address(executionPayload.fee_recipient.data),
|
|
stateRoot: executionPayload.state_root.asBlockHash,
|
|
receiptsRoot: executionPayload.receipts_root.asBlockHash,
|
|
logsBloom:
|
|
FixedBytes[BYTES_PER_LOGS_BLOOM](executionPayload.logs_bloom.data),
|
|
prevRandao: executionPayload.prev_randao.asBlockHash,
|
|
blockNumber: Quantity(executionPayload.block_number),
|
|
gasLimit: Quantity(executionPayload.gas_limit),
|
|
gasUsed: Quantity(executionPayload.gas_used),
|
|
timestamp: Quantity(executionPayload.timestamp),
|
|
extraData: DynamicBytes[0, MAX_EXTRA_DATA_BYTES](executionPayload.extra_data),
|
|
baseFeePerGas: executionPayload.base_fee_per_gas,
|
|
blockHash: executionPayload.block_hash.asBlockHash,
|
|
transactions: mapIt(executionPayload.transactions, it.getTypedTransaction),
|
|
withdrawals: mapIt(executionPayload.withdrawals, it.asEngineWithdrawal),
|
|
blobGasUsed: Quantity(executionPayload.blob_gas_used),
|
|
excessBlobGas: Quantity(executionPayload.excess_blob_gas))
|
|
|
|
func isConnected(connection: ELConnection): bool =
|
|
connection.web3.isSome
|
|
|
|
func getJsonRpcRequestHeaders(jwtSecret: Opt[seq[byte]]):
|
|
auto =
|
|
if jwtSecret.isSome:
|
|
let secret = jwtSecret.get
|
|
(proc(): seq[(string, string)] =
|
|
# https://www.rfc-editor.org/rfc/rfc6750#section-6.1.1
|
|
@[("Authorization", "Bearer " & getSignedIatToken(
|
|
secret, (getTime() - initTime(0, 0)).inSeconds))])
|
|
else:
|
|
(proc(): seq[(string, string)] = @[])
|
|
|
|
proc newWeb3*(engineUrl: EngineApiUrl): Future[Web3] =
|
|
newWeb3(engineUrl.url,
|
|
getJsonRpcRequestHeaders(engineUrl.jwtSecret),
|
|
httpFlags = {HttpClientFlag.NewConnectionAlways})
|
|
|
|
proc establishEngineApiConnection(url: EngineApiUrl):
|
|
Future[Result[Web3, string]] {.
|
|
async: (raises: [CancelledError]).} =
|
|
try:
|
|
ok(await newWeb3(url).wait(engineApiConnectionTimeout))
|
|
except AsyncTimeoutError:
|
|
err "Engine API connection timed out"
|
|
except CancelledError as exc:
|
|
raise exc
|
|
except CatchableError as exc:
|
|
err exc.msg
|
|
|
|
proc tryConnecting(connection: ELConnection): Future[bool] {.
|
|
async: (raises: [CancelledError]).} =
|
|
if connection.isConnected:
|
|
return true
|
|
|
|
if connection.connectingFut == nil or
|
|
connection.connectingFut.finished: # The previous attempt was not successful
|
|
connection.connectingFut = establishEngineApiConnection(connection.engineUrl)
|
|
|
|
let web3Res = await connection.connectingFut
|
|
if web3Res.isErr:
|
|
warn "Engine API connection failed", err = web3Res.error
|
|
return false
|
|
else:
|
|
connection.web3 = some web3Res.get
|
|
return true
|
|
|
|
proc connectedRpcClient(connection: ELConnection): Future[RpcClient] {.
|
|
async: (raises: [CancelledError]).} =
|
|
while not connection.isConnected:
|
|
if not await connection.tryConnecting():
|
|
await sleepAsync(chronos.seconds(10))
|
|
|
|
return connection.web3.get.provider
|
|
|
|
proc getBlockByHash(rpcClient: RpcClient, hash: BlockHash): Future[BlockObject] =
|
|
rpcClient.eth_getBlockByHash(hash, false)
|
|
|
|
proc getBlockByNumber*(rpcClient: RpcClient,
|
|
number: Eth1BlockNumber): Future[BlockObject] =
|
|
let hexNumber = try:
|
|
&"0x{number:X}" # No leading 0's!
|
|
except ValueError as exc:
|
|
# Since the format above is valid, failing here should not be possible
|
|
raiseAssert exc.msg
|
|
|
|
rpcClient.eth_getBlockByNumber(hexNumber, false)
|
|
|
|
func areSameAs(expectedParams: Option[NextExpectedPayloadParams],
|
|
latestHead, latestSafe, latestFinalized: Eth2Digest,
|
|
timestamp: uint64,
|
|
randomData: Eth2Digest,
|
|
feeRecipient: Eth1Address,
|
|
withdrawals: seq[WithdrawalV1]): bool =
|
|
expectedParams.isSome and
|
|
expectedParams.get.headBlockHash == latestHead and
|
|
expectedParams.get.safeBlockHash == latestSafe and
|
|
expectedParams.get.finalizedBlockHash == latestFinalized and
|
|
expectedParams.get.payloadAttributes.timestamp.uint64 == timestamp and
|
|
expectedParams.get.payloadAttributes.prevRandao.bytes == randomData.data and
|
|
expectedParams.get.payloadAttributes.suggestedFeeRecipient == feeRecipient and
|
|
expectedParams.get.payloadAttributes.withdrawals == withdrawals
|
|
|
|
proc forkchoiceUpdated(rpcClient: RpcClient,
|
|
state: ForkchoiceStateV1,
|
|
payloadAttributes: Option[PayloadAttributesV1] |
|
|
Option[PayloadAttributesV2] |
|
|
Option[PayloadAttributesV3]):
|
|
Future[ForkchoiceUpdatedResponse] =
|
|
when payloadAttributes is Option[PayloadAttributesV1]:
|
|
rpcClient.engine_forkchoiceUpdatedV1(state, payloadAttributes)
|
|
elif payloadAttributes is Option[PayloadAttributesV2]:
|
|
rpcClient.engine_forkchoiceUpdatedV2(state, payloadAttributes)
|
|
elif payloadAttributes is Option[PayloadAttributesV3]:
|
|
rpcClient.engine_forkchoiceUpdatedV3(state, payloadAttributes)
|
|
else:
|
|
static: doAssert false
|
|
|
|
func computeBlockValue(blk: ExecutionPayloadV1): UInt256 {.raises: [RlpError].} =
|
|
for transactionBytes in blk.transactions:
|
|
var rlp = rlpFromBytes distinctBase(transactionBytes)
|
|
let transaction = rlp.read(eth_types.Transaction)
|
|
result += distinctBase(effectiveGasTip(transaction, blk.baseFeePerGas)).u256
|
|
|
|
proc getPayloadFromSingleEL(
|
|
connection: ELConnection,
|
|
GetPayloadResponseType: type,
|
|
isForkChoiceUpToDate: bool,
|
|
consensusHead: Eth2Digest,
|
|
headBlock, safeBlock, finalizedBlock: Eth2Digest,
|
|
timestamp: uint64,
|
|
randomData: Eth2Digest,
|
|
suggestedFeeRecipient: Eth1Address,
|
|
withdrawals: seq[WithdrawalV1]): Future[GetPayloadResponseType] {.async.} =
|
|
|
|
let
|
|
rpcClient = await connection.connectedRpcClient()
|
|
payloadId = if isForkChoiceUpToDate and connection.lastPayloadId.isSome:
|
|
connection.lastPayloadId.get
|
|
elif not headBlock.isZero:
|
|
engine_api_last_minute_forkchoice_updates_sent.inc(1, [connection.engineUrl.url])
|
|
|
|
when GetPayloadResponseType is BellatrixExecutionPayloadWithValue:
|
|
let response = await rpcClient.forkchoiceUpdated(
|
|
ForkchoiceStateV1(
|
|
headBlockHash: headBlock.asBlockHash,
|
|
safeBlockHash: safeBlock.asBlockHash,
|
|
finalizedBlockHash: finalizedBlock.asBlockHash),
|
|
some PayloadAttributesV1(
|
|
timestamp: Quantity timestamp,
|
|
prevRandao: FixedBytes[32] randomData.data,
|
|
suggestedFeeRecipient: suggestedFeeRecipient))
|
|
elif GetPayloadResponseType is engine_api.GetPayloadV2Response:
|
|
let response = await rpcClient.forkchoiceUpdated(
|
|
ForkchoiceStateV1(
|
|
headBlockHash: headBlock.asBlockHash,
|
|
safeBlockHash: safeBlock.asBlockHash,
|
|
finalizedBlockHash: finalizedBlock.asBlockHash),
|
|
some PayloadAttributesV2(
|
|
timestamp: Quantity timestamp,
|
|
prevRandao: FixedBytes[32] randomData.data,
|
|
suggestedFeeRecipient: suggestedFeeRecipient,
|
|
withdrawals: withdrawals))
|
|
elif GetPayloadResponseType is engine_api.GetPayloadV3Response:
|
|
let response = await rpcClient.forkchoiceUpdated(
|
|
ForkchoiceStateV1(
|
|
headBlockHash: headBlock.asBlockHash,
|
|
safeBlockHash: safeBlock.asBlockHash,
|
|
finalizedBlockHash: finalizedBlock.asBlockHash),
|
|
some PayloadAttributesV3(
|
|
timestamp: Quantity timestamp,
|
|
prevRandao: FixedBytes[32] randomData.data,
|
|
suggestedFeeRecipient: suggestedFeeRecipient,
|
|
withdrawals: withdrawals,
|
|
parentBeaconBlockRoot: consensusHead.asBlockHash))
|
|
else:
|
|
static: doAssert false
|
|
|
|
if response.payloadStatus.status != PayloadExecutionStatus.valid or
|
|
response.payloadId.isNone:
|
|
raise newException(CatchableError, "Head block is not a valid payload")
|
|
|
|
# Give the EL some time to assemble the block
|
|
await sleepAsync(chronos.milliseconds 500)
|
|
|
|
response.payloadId.get
|
|
else:
|
|
raise newException(CatchableError, "No confirmed execution head yet")
|
|
|
|
when GetPayloadResponseType is BellatrixExecutionPayloadWithValue:
|
|
let payload =
|
|
await engine_api.getPayload(rpcClient, ExecutionPayloadV1, payloadId)
|
|
return BellatrixExecutionPayloadWithValue(
|
|
executionPayload: payload,
|
|
blockValue: computeBlockValue payload)
|
|
else:
|
|
return await engine_api.getPayload(rpcClient, GetPayloadResponseType, payloadId)
|
|
|
|
func cmpGetPayloadResponses(lhs, rhs: SomeEnginePayloadWithValue): int =
|
|
cmp(distinctBase lhs.blockValue, distinctBase rhs.blockValue)
|
|
|
|
template EngineApiResponseType*(T: type bellatrix.ExecutionPayloadForSigning): type =
|
|
BellatrixExecutionPayloadWithValue
|
|
|
|
template EngineApiResponseType*(T: type capella.ExecutionPayloadForSigning): type =
|
|
engine_api.GetPayloadV2Response
|
|
|
|
template EngineApiResponseType*(T: type deneb.ExecutionPayloadForSigning): type =
|
|
engine_api.GetPayloadV3Response
|
|
|
|
template EngineApiResponseType*(T: type electra.ExecutionPayloadForSigning): type =
|
|
debugRaiseAssert "EngineApiResponseType electra.ExecutionPayloadForSigning; presumably will be a GetPayloadV4Response"
|
|
engine_api.GetPayloadV3Response
|
|
|
|
template toEngineWithdrawals*(withdrawals: seq[capella.Withdrawal]): seq[WithdrawalV1] =
|
|
mapIt(withdrawals, toEngineWithdrawal(it))
|
|
|
|
template kind(T: type ExecutionPayloadV1): ConsensusFork =
|
|
ConsensusFork.Bellatrix
|
|
|
|
template kind(T: typedesc[ExecutionPayloadV1OrV2|ExecutionPayloadV2]): ConsensusFork =
|
|
ConsensusFork.Capella
|
|
|
|
template kind(T: type ExecutionPayloadV3): ConsensusFork =
|
|
ConsensusFork.Deneb
|
|
|
|
proc getPayload*(m: ELManager,
|
|
PayloadType: type ForkyExecutionPayloadForSigning,
|
|
consensusHead: Eth2Digest,
|
|
headBlock, safeBlock, finalizedBlock: Eth2Digest,
|
|
timestamp: uint64,
|
|
randomData: Eth2Digest,
|
|
suggestedFeeRecipient: Eth1Address,
|
|
withdrawals: seq[capella.Withdrawal]):
|
|
Future[Opt[PayloadType]] {.async: (raises: [CancelledError]).} =
|
|
if m.elConnections.len == 0:
|
|
return err()
|
|
|
|
let
|
|
engineApiWithdrawals = toEngineWithdrawals withdrawals
|
|
isFcUpToDate = m.nextExpectedPayloadParams.areSameAs(
|
|
headBlock, safeBlock, finalizedBlock, timestamp,
|
|
randomData, suggestedFeeRecipient, engineApiWithdrawals)
|
|
|
|
# `getPayloadFromSingleEL` may introduce additional latency
|
|
const extraProcessingOverhead = 500.milliseconds
|
|
let
|
|
timeout = GETPAYLOAD_TIMEOUT + extraProcessingOverhead
|
|
deadline = sleepAsync(timeout)
|
|
requests = m.elConnections.mapIt(it.getPayloadFromSingleEL(
|
|
EngineApiResponseType(PayloadType),
|
|
isFcUpToDate, consensusHead, headBlock, safeBlock, finalizedBlock,
|
|
timestamp, randomData, suggestedFeeRecipient, engineApiWithdrawals
|
|
))
|
|
requestsCompleted = allFutures(requests)
|
|
|
|
# TODO cancel requests on cancellation
|
|
await requestsCompleted or deadline
|
|
|
|
var bestPayloadIdx = none int
|
|
for idx, req in requests:
|
|
if not req.finished:
|
|
warn "Timeout while getting execution payload",
|
|
url = m.elConnections[idx].engineUrl.url
|
|
req.cancelSoon()
|
|
elif req.failed:
|
|
warn "Failed to get execution payload from EL",
|
|
url = m.elConnections[idx].engineUrl.url,
|
|
err = req.error.msg
|
|
else:
|
|
const payloadFork = PayloadType.kind
|
|
when payloadFork >= ConsensusFork.Capella:
|
|
when payloadFork == ConsensusFork.Capella:
|
|
# TODO: The engine_api module may offer an alternative API where it is guaranteed
|
|
# to return the correct response type (i.e. the rule below will be enforced
|
|
# during deserialization).
|
|
if req.value().executionPayload.withdrawals.isNone:
|
|
warn "Execution client returned a block without a 'withdrawals' field for a post-Shanghai block",
|
|
url = m.elConnections[idx].engineUrl.url
|
|
continue
|
|
|
|
if engineApiWithdrawals != req.value().executionPayload.withdrawals.maybeDeref:
|
|
# otherwise it formats as "@[(index: ..., validatorIndex: ...,
|
|
# address: ..., amount: ...), (index: ..., validatorIndex: ...,
|
|
# address: ..., amount: ...)]"
|
|
warn "Execution client did not return correct withdrawals",
|
|
withdrawals_from_cl_len = engineApiWithdrawals.len,
|
|
withdrawals_from_el_len =
|
|
req.value().executionPayload.withdrawals.maybeDeref.len,
|
|
withdrawals_from_cl =
|
|
mapIt(engineApiWithdrawals, it.asConsensusWithdrawal),
|
|
withdrawals_from_el =
|
|
mapIt(
|
|
req.value().executionPayload.withdrawals.maybeDeref,
|
|
it.asConsensusWithdrawal),
|
|
url = m.elConnections[idx].engineUrl.url
|
|
|
|
if req.value().executionPayload.extraData.len > MAX_EXTRA_DATA_BYTES:
|
|
warn "Execution client provided a block with invalid extraData (size exceeds limit)",
|
|
url = m.elConnections[idx].engineUrl.url,
|
|
size = req.value().executionPayload.extraData.len,
|
|
limit = MAX_EXTRA_DATA_BYTES
|
|
continue
|
|
|
|
if bestPayloadIdx.isNone:
|
|
bestPayloadIdx = some idx
|
|
else:
|
|
if cmpGetPayloadResponses(req.value(), requests[bestPayloadIdx.get].value()) > 0:
|
|
bestPayloadIdx = some idx
|
|
|
|
deadline.cancelSoon()
|
|
|
|
when PayloadType.kind != ConsensusFork.Electra:
|
|
if bestPayloadIdx.isSome:
|
|
return ok requests[bestPayloadIdx.get].value().asConsensusType
|
|
else:
|
|
return err()
|
|
else:
|
|
# right now, asConsensusType is confused by Deneb and Electra sharing a
|
|
# Payload type. this will probably be resolved in time naturally.
|
|
debugRaiseAssert "getPayload ForkyExecutionPayloadForSigning"
|
|
return err()
|
|
|
|
proc waitELToSyncDeposits(connection: ELConnection,
|
|
minimalRequiredBlock: BlockHash) {.async.} =
|
|
var rpcClient = await connection.connectedRpcClient()
|
|
|
|
if connection.depositContractSyncStatus == DepositContractSyncStatus.synced:
|
|
return
|
|
|
|
var attempt = 0
|
|
|
|
while true:
|
|
try:
|
|
discard raiseIfNil connection.trackedRequestWithTimeout(
|
|
"getBlockByHash",
|
|
rpcClient.getBlockByHash(minimalRequiredBlock),
|
|
web3RequestsTimeout,
|
|
failureAllowed = true)
|
|
connection.depositContractSyncStatus = DepositContractSyncStatus.synced
|
|
return
|
|
except CancelledError as err:
|
|
trace "waitELToSyncDepositContract cancelled",
|
|
url = connection.engineUrl.url
|
|
raise err
|
|
except CatchableError as err:
|
|
connection.depositContractSyncStatus = DepositContractSyncStatus.notSynced
|
|
if attempt == 0:
|
|
warn "Failed to obtain the most recent known block from the execution " &
|
|
"layer node (the node is probably not synced)",
|
|
url = connection.engineUrl.url,
|
|
blk = minimalRequiredBlock,
|
|
err = err.msg
|
|
elif attempt mod 60 == 0:
|
|
# This warning will be produced every 30 minutes
|
|
warn "Still failing to obtain the most recent known block from the " &
|
|
"execution layer node (the node is probably still not synced)",
|
|
url = connection.engineUrl.url,
|
|
blk = minimalRequiredBlock,
|
|
err = err.msg
|
|
inc attempt
|
|
await sleepAsync(seconds(30))
|
|
rpcClient = await connection.connectedRpcClient()
|
|
|
|
func networkHasDepositContract(m: ELManager): bool =
|
|
not m.cfg.DEPOSIT_CONTRACT_ADDRESS.isDefaultValue
|
|
|
|
func mostRecentKnownBlock(m: ELManager): BlockHash =
|
|
if m.eth1Chain.finalizedDepositsMerkleizer.getChunkCount() > 0:
|
|
m.eth1Chain.finalizedBlockHash.asBlockHash
|
|
else:
|
|
m.depositContractBlockHash
|
|
|
|
proc selectConnectionForChainSyncing(m: ELManager): Future[ELConnection] {.async.} =
|
|
doAssert m.elConnections.len > 0
|
|
|
|
let connectionsFuts = mapIt(
|
|
m.elConnections,
|
|
if m.networkHasDepositContract:
|
|
FutureBase waitELToSyncDeposits(it, m.mostRecentKnownBlock)
|
|
else:
|
|
FutureBase connectedRpcClient(it))
|
|
|
|
# TODO: Ideally, the cancellation will be handled automatically
|
|
# by a helper like `firstCompletedFuture`
|
|
let firstConnected = try:
|
|
await firstCompletedFuture(connectionsFuts)
|
|
except CancelledError as err:
|
|
for future in connectionsFuts:
|
|
future.cancelSoon()
|
|
raise err
|
|
|
|
for future in connectionsFuts:
|
|
if future != firstConnected:
|
|
future.cancelSoon()
|
|
|
|
return m.elConnections[find(connectionsFuts, firstConnected)]
|
|
|
|
proc sendNewPayloadToSingleEL(connection: ELConnection,
|
|
payload: engine_api.ExecutionPayloadV1):
|
|
Future[PayloadStatusV1] {.async.} =
|
|
let rpcClient = await connection.connectedRpcClient()
|
|
return await rpcClient.engine_newPayloadV1(payload)
|
|
|
|
proc sendNewPayloadToSingleEL(connection: ELConnection,
|
|
payload: engine_api.ExecutionPayloadV2):
|
|
Future[PayloadStatusV1] {.async.} =
|
|
let rpcClient = await connection.connectedRpcClient()
|
|
return await rpcClient.engine_newPayloadV2(payload)
|
|
|
|
proc sendNewPayloadToSingleEL(connection: ELConnection,
|
|
payload: engine_api.ExecutionPayloadV3,
|
|
versioned_hashes: seq[engine_api.VersionedHash],
|
|
parent_beacon_block_root: FixedBytes[32]):
|
|
Future[PayloadStatusV1] {.async.} =
|
|
let rpcClient = await connection.connectedRpcClient()
|
|
return await rpcClient.engine_newPayloadV3(
|
|
payload, versioned_hashes, parent_beacon_block_root)
|
|
|
|
type
|
|
StatusRelation = enum
|
|
newStatusIsPreferable
|
|
oldStatusIsOk
|
|
disagreement
|
|
|
|
func compareStatuses(newStatus, prevStatus: PayloadExecutionStatus): StatusRelation =
|
|
case prevStatus
|
|
of PayloadExecutionStatus.syncing:
|
|
if newStatus == PayloadExecutionStatus.syncing:
|
|
oldStatusIsOk
|
|
else:
|
|
newStatusIsPreferable
|
|
|
|
of PayloadExecutionStatus.valid:
|
|
case newStatus
|
|
of PayloadExecutionStatus.syncing,
|
|
PayloadExecutionStatus.accepted,
|
|
PayloadExecutionStatus.valid:
|
|
oldStatusIsOk
|
|
of PayloadExecutionStatus.invalid_block_hash,
|
|
PayloadExecutionStatus.invalid:
|
|
disagreement
|
|
|
|
of PayloadExecutionStatus.invalid:
|
|
case newStatus
|
|
of PayloadExecutionStatus.syncing,
|
|
PayloadExecutionStatus.invalid:
|
|
oldStatusIsOk
|
|
of PayloadExecutionStatus.valid,
|
|
PayloadExecutionStatus.accepted,
|
|
PayloadExecutionStatus.invalid_block_hash:
|
|
disagreement
|
|
|
|
of PayloadExecutionStatus.accepted:
|
|
case newStatus
|
|
of PayloadExecutionStatus.accepted,
|
|
PayloadExecutionStatus.syncing:
|
|
oldStatusIsOk
|
|
of PayloadExecutionStatus.valid:
|
|
newStatusIsPreferable
|
|
of PayloadExecutionStatus.invalid_block_hash,
|
|
PayloadExecutionStatus.invalid:
|
|
disagreement
|
|
|
|
of PayloadExecutionStatus.invalid_block_hash:
|
|
if newStatus == PayloadExecutionStatus.invalid_block_hash:
|
|
oldStatusIsOk
|
|
else:
|
|
disagreement
|
|
|
|
type
|
|
ELConsensusViolationDetector = object
|
|
selectedResponse: Option[int]
|
|
disagreementAlreadyDetected: bool
|
|
|
|
func init(T: type ELConsensusViolationDetector): T =
|
|
ELConsensusViolationDetector(selectedResponse: none int,
|
|
disagreementAlreadyDetected: false)
|
|
|
|
proc processResponse[ELResponseType](
|
|
d: var ELConsensusViolationDetector,
|
|
connections: openArray[ELConnection],
|
|
requests: openArray[Future[ELResponseType]],
|
|
idx: int) =
|
|
|
|
if not requests[idx].completed:
|
|
return
|
|
|
|
let status = try: requests[idx].read.status
|
|
except CatchableError: raiseAssert "checked above"
|
|
if d.selectedResponse.isNone:
|
|
d.selectedResponse = some idx
|
|
elif not d.disagreementAlreadyDetected:
|
|
let prevStatus = try: requests[d.selectedResponse.get].read.status
|
|
except CatchableError: raiseAssert "previously checked"
|
|
case compareStatuses(status, prevStatus)
|
|
of newStatusIsPreferable:
|
|
d.selectedResponse = some idx
|
|
of oldStatusIsOk:
|
|
discard
|
|
of disagreement:
|
|
d.disagreementAlreadyDetected = true
|
|
error "Execution layer consensus violation detected",
|
|
responseType = name(ELResponseType),
|
|
url1 = connections[d.selectedResponse.get].engineUrl.url,
|
|
status1 = prevStatus,
|
|
url2 = connections[idx].engineUrl.url,
|
|
status2 = status
|
|
|
|
proc sendNewPayload*(m: ELManager, blck: SomeForkyBeaconBlock):
|
|
Future[PayloadExecutionStatus] {.async.} =
|
|
let
|
|
earlyDeadline = sleepAsync(chronos.seconds 1)
|
|
startTime = Moment.now
|
|
deadline = sleepAsync(NEWPAYLOAD_TIMEOUT)
|
|
payload = blck.body.execution_payload.asEngineExecutionPayload
|
|
requests = m.elConnections.mapIt:
|
|
let req =
|
|
when payload is engine_api.ExecutionPayloadV3:
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-alpha.1/specs/deneb/beacon-chain.md#process_execution_payload
|
|
# Verify the execution payload is valid
|
|
# [Modified in Deneb] Pass `versioned_hashes` to Execution Engine
|
|
let versioned_hashes = mapIt(
|
|
blck.body.blob_kzg_commitments,
|
|
engine_api.VersionedHash(kzg_commitment_to_versioned_hash(it)))
|
|
sendNewPayloadToSingleEL(
|
|
it, payload, versioned_hashes,
|
|
FixedBytes[32] blck.parent_root.data)
|
|
elif payload is engine_api.ExecutionPayloadV1 or
|
|
payload is engine_api.ExecutionPayloadV2:
|
|
sendNewPayloadToSingleEL(it, payload)
|
|
else:
|
|
static: doAssert false
|
|
trackEngineApiRequest(it, req, "newPayload", startTime, deadline)
|
|
req
|
|
|
|
requestsCompleted = allFutures(requests)
|
|
|
|
await requestsCompleted or earlyDeadline
|
|
|
|
var
|
|
stillPending = newSeq[Future[PayloadStatusV1]]()
|
|
responseProcessor = init ELConsensusViolationDetector
|
|
|
|
for idx, req in requests:
|
|
if not req.finished:
|
|
stillPending.add req
|
|
elif req.completed:
|
|
responseProcessor.processResponse(m.elConnections, requests, idx)
|
|
|
|
if responseProcessor.disagreementAlreadyDetected:
|
|
return PayloadExecutionStatus.invalid
|
|
elif responseProcessor.selectedResponse.isSome:
|
|
return requests[responseProcessor.selectedResponse.get].read.status
|
|
|
|
await requestsCompleted or deadline
|
|
|
|
for idx, req in requests:
|
|
if req.completed and req in stillPending:
|
|
responseProcessor.processResponse(m.elConnections, requests, idx)
|
|
|
|
return if responseProcessor.disagreementAlreadyDetected:
|
|
PayloadExecutionStatus.invalid
|
|
elif responseProcessor.selectedResponse.isSome:
|
|
requests[responseProcessor.selectedResponse.get].read.status
|
|
else:
|
|
PayloadExecutionStatus.syncing
|
|
|
|
proc forkchoiceUpdatedForSingleEL(
|
|
connection: ELConnection,
|
|
state: ref ForkchoiceStateV1,
|
|
payloadAttributes: Option[PayloadAttributesV1] |
|
|
Option[PayloadAttributesV2] |
|
|
Option[PayloadAttributesV3]):
|
|
Future[PayloadStatusV1] {.async.} =
|
|
let
|
|
rpcClient = await connection.connectedRpcClient()
|
|
response = await rpcClient.forkchoiceUpdated(state[], payloadAttributes)
|
|
|
|
if response.payloadStatus.status notin {syncing, valid, invalid}:
|
|
debug "Invalid fork-choice updated response from the EL",
|
|
payloadStatus = response.payloadStatus
|
|
return
|
|
|
|
if response.payloadStatus.status == PayloadExecutionStatus.valid and
|
|
response.payloadId.isSome:
|
|
connection.lastPayloadId = response.payloadId
|
|
|
|
return response.payloadStatus
|
|
|
|
proc forkchoiceUpdated*(m: ELManager,
|
|
headBlockHash, safeBlockHash,
|
|
finalizedBlockHash: Eth2Digest,
|
|
payloadAttributes: Option[PayloadAttributesV1] |
|
|
Option[PayloadAttributesV2] |
|
|
Option[PayloadAttributesV3]):
|
|
Future[(PayloadExecutionStatus, Option[BlockHash])] {.async: (raises: [CancelledError]).} =
|
|
doAssert not headBlockHash.isZero
|
|
|
|
# Allow finalizedBlockHash to be 0 to avoid sync deadlocks.
|
|
#
|
|
# https://github.com/ethereum/EIPs/blob/master/EIPS/eip-3675.md#pos-events
|
|
# has "Before the first finalized block occurs in the system the finalized
|
|
# block hash provided by this event is stubbed with
|
|
# `0x0000000000000000000000000000000000000000000000000000000000000000`."
|
|
# and
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.7/specs/bellatrix/validator.md#executionpayload
|
|
# notes "`finalized_block_hash` is the hash of the latest finalized execution
|
|
# payload (`Hash32()` if none yet finalized)"
|
|
|
|
if m.elConnections.len == 0:
|
|
return (PayloadExecutionStatus.syncing, none BlockHash)
|
|
|
|
when payloadAttributes is Option[PayloadAttributesV3]:
|
|
template payloadAttributesV3(): auto =
|
|
if payloadAttributes.isSome:
|
|
payloadAttributes.get
|
|
else:
|
|
# As timestamp and prevRandao are both 0, won't false-positive match
|
|
(static(default(PayloadAttributesV3)))
|
|
elif payloadAttributes is Option[PayloadAttributesV2]:
|
|
template payloadAttributesV3(): auto =
|
|
if payloadAttributes.isSome:
|
|
PayloadAttributesV3(
|
|
timestamp: payloadAttributes.get.timestamp,
|
|
prevRandao: payloadAttributes.get.prevRandao,
|
|
suggestedFeeRecipient: payloadAttributes.get.suggestedFeeRecipient,
|
|
withdrawals: payloadAttributes.get.withdrawals,
|
|
parentBeaconBlockRoot: static(default(FixedBytes[32])))
|
|
else:
|
|
# As timestamp and prevRandao are both 0, won't false-positive match
|
|
(static(default(PayloadAttributesV3)))
|
|
elif payloadAttributes is Option[PayloadAttributesV1]:
|
|
template payloadAttributesV3(): auto =
|
|
if payloadAttributes.isSome:
|
|
PayloadAttributesV3(
|
|
timestamp: payloadAttributes.get.timestamp,
|
|
prevRandao: payloadAttributes.get.prevRandao,
|
|
suggestedFeeRecipient: payloadAttributes.get.suggestedFeeRecipient,
|
|
withdrawals: @[],
|
|
parentBeaconBlockRoot: static(default(FixedBytes[32])))
|
|
else:
|
|
# As timestamp and prevRandao are both 0, won't false-positive match
|
|
(static(default(PayloadAttributesV3)))
|
|
else:
|
|
static: doAssert false
|
|
|
|
let
|
|
state = newClone ForkchoiceStateV1(
|
|
headBlockHash: headBlockHash.asBlockHash,
|
|
safeBlockHash: safeBlockHash.asBlockHash,
|
|
finalizedBlockHash: finalizedBlockHash.asBlockHash)
|
|
earlyDeadline = sleepAsync(chronos.seconds 1)
|
|
startTime = Moment.now
|
|
deadline = sleepAsync(FORKCHOICEUPDATED_TIMEOUT)
|
|
requests = m.elConnections.mapIt:
|
|
let req = it.forkchoiceUpdatedForSingleEL(state, payloadAttributes)
|
|
trackEngineApiRequest(it, req, "forkchoiceUpdated", startTime, deadline)
|
|
req
|
|
requestsCompleted = allFutures(requests)
|
|
|
|
await requestsCompleted or earlyDeadline
|
|
|
|
var
|
|
stillPending = newSeq[Future[PayloadStatusV1]]()
|
|
responseProcessor = init ELConsensusViolationDetector
|
|
|
|
for idx, req in requests:
|
|
if not req.finished:
|
|
stillPending.add req
|
|
elif req.completed:
|
|
responseProcessor.processResponse(m.elConnections, requests, idx)
|
|
|
|
template assignNextExpectedPayloadParams() =
|
|
# Ensure that there's no race condition window where getPayload's check for
|
|
# whether it needs to trigger a new fcU payload, due to cache invalidation,
|
|
# falsely suggests that the expected payload matches, and similarly that if
|
|
# the fcU fails or times out for other reasons, the expected payload params
|
|
# remain synchronized with EL state.
|
|
assign(
|
|
m.nextExpectedPayloadParams,
|
|
some NextExpectedPayloadParams(
|
|
headBlockHash: headBlockHash,
|
|
safeBlockHash: safeBlockHash,
|
|
finalizedBlockHash: finalizedBlockHash,
|
|
payloadAttributes: payloadAttributesV3))
|
|
|
|
template getSelected: untyped =
|
|
let
|
|
data =
|
|
try:
|
|
requests[responseProcessor.selectedResponse.get].read
|
|
except CatchableError:
|
|
raiseAssert "Only completed requests get selected"
|
|
(data.status, data.latestValidHash)
|
|
|
|
if responseProcessor.disagreementAlreadyDetected:
|
|
return (PayloadExecutionStatus.invalid, none BlockHash)
|
|
elif responseProcessor.selectedResponse.isSome:
|
|
assignNextExpectedPayloadParams()
|
|
return getSelected()
|
|
|
|
await requestsCompleted or deadline
|
|
|
|
for idx, req in requests:
|
|
if req.completed and req in stillPending:
|
|
responseProcessor.processResponse(m.elConnections, requests, idx)
|
|
|
|
return if responseProcessor.disagreementAlreadyDetected:
|
|
(PayloadExecutionStatus.invalid, none BlockHash)
|
|
elif responseProcessor.selectedResponse.isSome:
|
|
assignNextExpectedPayloadParams()
|
|
getSelected()
|
|
else:
|
|
(PayloadExecutionStatus.syncing, none BlockHash)
|
|
|
|
# TODO can't be defined within exchangeConfigWithSingleEL
|
|
func `==`(x, y: Quantity): bool {.borrow.}
|
|
|
|
proc exchangeConfigWithSingleEL(m: ELManager, connection: ELConnection) {.async.} =
|
|
let rpcClient = await connection.connectedRpcClient()
|
|
|
|
if m.eth1Network.isSome and
|
|
connection.etcStatus == EtcStatus.notExchangedYet:
|
|
try:
|
|
let
|
|
providerChain =
|
|
connection.trackedRequestWithTimeout(
|
|
"chainId",
|
|
rpcClient.eth_chainId(),
|
|
web3RequestsTimeout)
|
|
|
|
# https://chainid.network/
|
|
expectedChain = case m.eth1Network.get
|
|
of mainnet: 1.Quantity
|
|
of goerli: 5.Quantity
|
|
of sepolia: 11155111.Quantity
|
|
of holesky: 17000.Quantity
|
|
if expectedChain != providerChain:
|
|
warn "The specified EL client is connected to a different chain",
|
|
url = connection.engineUrl,
|
|
expectedChain = distinctBase(expectedChain),
|
|
actualChain = distinctBase(providerChain)
|
|
connection.etcStatus = EtcStatus.mismatch
|
|
return
|
|
except CatchableError as exc:
|
|
# Typically because it's not synced through EIP-155, assuming this Web3
|
|
# endpoint has been otherwise working.
|
|
debug "Failed to obtain eth_chainId",
|
|
error = exc.msg
|
|
|
|
connection.etcStatus = EtcStatus.match
|
|
|
|
proc exchangeTransitionConfiguration*(m: ELManager) {.async.} =
|
|
if m.elConnections.len == 0:
|
|
return
|
|
|
|
let
|
|
deadline = sleepAsync(3.seconds)
|
|
requests = m.elConnections.mapIt(m.exchangeConfigWithSingleEL(it))
|
|
requestsCompleted = allFutures(requests)
|
|
|
|
await requestsCompleted or deadline
|
|
|
|
var cancelled = 0
|
|
for idx, req in requests:
|
|
if not req.finished:
|
|
req.cancelSoon()
|
|
inc cancelled
|
|
|
|
if cancelled == requests.len:
|
|
warn "Failed to exchange configuration with the configured EL end-points"
|
|
|
|
template readJsonField(logEvent, field: untyped, ValueType: type): untyped =
|
|
if logEvent.field.isNone:
|
|
raise newException(CatchableError,
|
|
"Web3 provider didn't return needed logEvent field " & astToStr(field))
|
|
logEvent.field.get
|
|
|
|
template init[N: static int](T: type DynamicBytes[N, N]): T =
|
|
T newSeq[byte](N)
|
|
|
|
proc fetchTimestamp(connection: ELConnection,
|
|
rpcClient: RpcClient,
|
|
blk: Eth1Block) {.async.} =
|
|
debug "Fetching block timestamp", blockNum = blk.number
|
|
|
|
let web3block = raiseIfNil connection.trackedRequestWithTimeout(
|
|
"getBlockByHash",
|
|
rpcClient.getBlockByHash(blk.hash.asBlockHash),
|
|
web3RequestsTimeout)
|
|
|
|
blk.timestamp = Eth1BlockTimestamp web3block.timestamp
|
|
|
|
func depositEventsToBlocks(depositsList: openArray[JsonString]): seq[Eth1Block] {.
|
|
raises: [CatchableError].} =
|
|
var lastEth1Block: Eth1Block
|
|
|
|
for logEventData in depositsList:
|
|
let
|
|
logEvent = JrpcConv.decode(logEventData.string, LogObject)
|
|
blockNumber = Eth1BlockNumber readJsonField(logEvent, blockNumber, Quantity)
|
|
blockHash = readJsonField(logEvent, blockHash, BlockHash)
|
|
|
|
if lastEth1Block == nil or lastEth1Block.number != blockNumber:
|
|
lastEth1Block = Eth1Block(
|
|
hash: blockHash.asEth2Digest,
|
|
number: blockNumber
|
|
# The `timestamp` is set in `syncBlockRange` immediately
|
|
# after calling this function, because we don't want to
|
|
# make this function `async`
|
|
)
|
|
|
|
result.add lastEth1Block
|
|
|
|
var
|
|
pubkey = init PubKeyBytes
|
|
withdrawalCredentials = init WithdrawalCredentialsBytes
|
|
amount = init Int64LeBytes
|
|
signature = init SignatureBytes
|
|
index = init Int64LeBytes
|
|
|
|
var offset = 0
|
|
offset += decode(logEvent.data, 0, offset, pubkey)
|
|
offset += decode(logEvent.data, 0, offset, withdrawalCredentials)
|
|
offset += decode(logEvent.data, 0, offset, amount)
|
|
offset += decode(logEvent.data, 0, offset, signature)
|
|
offset += decode(logEvent.data, 0, offset, index)
|
|
|
|
if pubkey.len != 48 or
|
|
withdrawalCredentials.len != 32 or
|
|
amount.len != 8 or
|
|
signature.len != 96 or
|
|
index.len != 8:
|
|
raise newException(CorruptDataProvider, "Web3 provider supplied invalid deposit logs")
|
|
|
|
lastEth1Block.deposits.add DepositData(
|
|
pubkey: ValidatorPubKey.init(pubkey.toArray),
|
|
withdrawal_credentials: Eth2Digest(data: withdrawalCredentials.toArray),
|
|
amount: bytes_to_uint64(amount.toArray),
|
|
signature: ValidatorSig.init(signature.toArray))
|
|
|
|
type
|
|
DepositContractDataStatus = enum
|
|
Fetched
|
|
VerifiedCorrect
|
|
DepositRootIncorrect
|
|
DepositRootUnavailable
|
|
DepositCountIncorrect
|
|
DepositCountUnavailable
|
|
|
|
when hasDepositRootChecks:
|
|
const
|
|
contractCallTimeout = 60.seconds
|
|
|
|
proc fetchDepositContractData(connection: ELConnection,
|
|
rpcClient: RpcClient,
|
|
depositContract: Sender[DepositContract],
|
|
blk: Eth1Block): Future[DepositContractDataStatus] {.async.} =
|
|
let
|
|
startTime = Moment.now
|
|
deadline = sleepAsync(contractCallTimeout)
|
|
depositRoot = depositContract.get_deposit_root.call(blockNumber = blk.number)
|
|
rawCount = depositContract.get_deposit_count.call(blockNumber = blk.number)
|
|
|
|
# We allow failures on these requests becaues the clients
|
|
# are expected to prune the state data for historical blocks
|
|
connection.trackEngineApiRequest(
|
|
depositRoot, "get_deposit_root", startTime, deadline,
|
|
failureAllowed = true)
|
|
connection.trackEngineApiRequest(
|
|
rawCount, "get_deposit_count", startTime, deadline,
|
|
failureAllowed = true)
|
|
|
|
try:
|
|
let fetchedRoot = asEth2Digest(block:
|
|
awaitWithTimeout(depositRoot, deadline):
|
|
raise newException(DataProviderTimeout,
|
|
"Request time out while obtaining deposits root"))
|
|
if blk.depositRoot.isZero:
|
|
blk.depositRoot = fetchedRoot
|
|
result = Fetched
|
|
elif blk.depositRoot == fetchedRoot:
|
|
result = VerifiedCorrect
|
|
else:
|
|
result = DepositRootIncorrect
|
|
except CatchableError as err:
|
|
debug "Failed to fetch deposits root",
|
|
blockNumber = blk.number,
|
|
err = err.msg
|
|
result = DepositRootUnavailable
|
|
|
|
try:
|
|
let fetchedCount = bytes_to_uint64((block:
|
|
awaitWithTimeout(rawCount, deadline):
|
|
raise newException(DataProviderTimeout,
|
|
"Request time out while obtaining deposits count")).toArray)
|
|
if blk.depositCount == 0:
|
|
blk.depositCount = fetchedCount
|
|
elif blk.depositCount != fetchedCount:
|
|
result = DepositCountIncorrect
|
|
except CatchableError as err:
|
|
debug "Failed to fetch deposits count",
|
|
blockNumber = blk.number,
|
|
err = err.msg
|
|
result = DepositCountUnavailable
|
|
|
|
|
|
template trackFinalizedState*(m: ELManager,
|
|
finalizedEth1Data: Eth1Data,
|
|
finalizedStateDepositIndex: uint64): bool =
|
|
trackFinalizedState(m.eth1Chain, finalizedEth1Data, finalizedStateDepositIndex)
|
|
|
|
template getBlockProposalData*(m: ELManager,
|
|
state: ForkedHashedBeaconState,
|
|
finalizedEth1Data: Eth1Data,
|
|
finalizedStateDepositIndex: uint64):
|
|
BlockProposalEth1Data =
|
|
getBlockProposalData(
|
|
m.eth1Chain, state, finalizedEth1Data, finalizedStateDepositIndex)
|
|
|
|
func new*(T: type ELConnection,
|
|
engineUrl: EngineApiUrl): T =
|
|
ELConnection(
|
|
engineUrl: engineUrl,
|
|
depositContractSyncStatus: DepositContractSyncStatus.unknown)
|
|
|
|
proc new*(T: type ELManager,
|
|
cfg: RuntimeConfig,
|
|
depositContractBlockNumber: uint64,
|
|
depositContractBlockHash: Eth2Digest,
|
|
db: BeaconChainDB,
|
|
engineApiUrls: seq[EngineApiUrl],
|
|
eth1Network: Option[Eth1Network]): T =
|
|
let
|
|
eth1Chain = Eth1Chain.init(
|
|
cfg, db, depositContractBlockNumber, depositContractBlockHash)
|
|
|
|
debug "Initializing ELManager",
|
|
depositContractBlockNumber,
|
|
depositContractBlockHash
|
|
|
|
T(eth1Chain: eth1Chain,
|
|
depositContractAddress: cfg.DEPOSIT_CONTRACT_ADDRESS,
|
|
depositContractBlockNumber: depositContractBlockNumber,
|
|
depositContractBlockHash: depositContractBlockHash.asBlockHash,
|
|
elConnections: mapIt(engineApiUrls, ELConnection.new(it)),
|
|
eth1Network: eth1Network,
|
|
blocksPerLogsRequest: targetBlocksPerLogsRequest)
|
|
|
|
proc safeCancel(fut: var Future[void]) =
|
|
if not fut.isNil and not fut.finished:
|
|
fut.cancelSoon()
|
|
fut = nil
|
|
|
|
proc doStop(m: ELManager) {.async.} =
|
|
safeCancel m.chainSyncingLoopFut
|
|
safeCancel m.exchangeTransitionConfigurationLoopFut
|
|
|
|
if m.elConnections.len > 0:
|
|
let closeConnectionFutures = mapIt(m.elConnections, close(it))
|
|
await allFutures(closeConnectionFutures)
|
|
|
|
proc stop(m: ELManager) {.async.} =
|
|
if not m.stopFut.isNil:
|
|
await m.stopFut
|
|
else:
|
|
m.stopFut = m.doStop()
|
|
await m.stopFut
|
|
m.stopFut = nil
|
|
|
|
const
|
|
votedBlocksSafetyMargin = 50
|
|
|
|
func earliestBlockOfInterest(
|
|
m: ELManager,
|
|
latestEth1BlockNumber: Eth1BlockNumber): Eth1BlockNumber =
|
|
let blocksOfInterestRange =
|
|
SLOTS_PER_ETH1_VOTING_PERIOD +
|
|
(2 * m.cfg.ETH1_FOLLOW_DISTANCE) +
|
|
votedBlocksSafetyMargin
|
|
|
|
if latestEth1BlockNumber > blocksOfInterestRange:
|
|
latestEth1BlockNumber - blocksOfInterestRange
|
|
else:
|
|
0
|
|
|
|
proc syncBlockRange(m: ELManager,
|
|
connection: ELConnection,
|
|
rpcClient: RpcClient,
|
|
depositContract: Sender[DepositContract],
|
|
fromBlock, toBlock,
|
|
fullSyncFromBlock: Eth1BlockNumber) {.gcsafe, async.} =
|
|
doAssert m.eth1Chain.blocks.len > 0
|
|
|
|
var currentBlock = fromBlock
|
|
while currentBlock <= toBlock:
|
|
var
|
|
depositLogs: seq[JsonString]
|
|
maxBlockNumberRequested: Eth1BlockNumber
|
|
backoff = 100
|
|
|
|
while true:
|
|
maxBlockNumberRequested =
|
|
min(toBlock, currentBlock + m.blocksPerLogsRequest - 1)
|
|
|
|
debug "Obtaining deposit log events",
|
|
fromBlock = currentBlock,
|
|
toBlock = maxBlockNumberRequested,
|
|
backoff
|
|
|
|
debug.logTime "Deposit logs obtained":
|
|
# Reduce all request rate until we have a more general solution
|
|
# for dealing with Infura's rate limits
|
|
await sleepAsync(milliseconds(backoff))
|
|
let
|
|
startTime = Moment.now
|
|
deadline = sleepAsync 30.seconds
|
|
jsonLogsFut = depositContract.getJsonLogs(
|
|
DepositEvent,
|
|
fromBlock = some blockId(currentBlock),
|
|
toBlock = some blockId(maxBlockNumberRequested))
|
|
|
|
connection.trackEngineApiRequest(
|
|
jsonLogsFut, "getLogs", startTime, deadline)
|
|
|
|
depositLogs = try:
|
|
# Downloading large amounts of deposits may take several minutes
|
|
awaitWithTimeout(jsonLogsFut, deadline):
|
|
raise newException(DataProviderTimeout,
|
|
"Request time out while obtaining json logs")
|
|
except CatchableError as err:
|
|
debug "Request for deposit logs failed", err = err.msg
|
|
inc failed_web3_requests
|
|
backoff = (backoff * 3) div 2
|
|
m.blocksPerLogsRequest = m.blocksPerLogsRequest div 2
|
|
if m.blocksPerLogsRequest == 0:
|
|
m.blocksPerLogsRequest = 1
|
|
raise err
|
|
continue
|
|
m.blocksPerLogsRequest = min(
|
|
(m.blocksPerLogsRequest * 3 + 1) div 2,
|
|
targetBlocksPerLogsRequest)
|
|
|
|
currentBlock = maxBlockNumberRequested + 1
|
|
break
|
|
|
|
let blocksWithDeposits = depositEventsToBlocks(depositLogs)
|
|
|
|
for i in 0 ..< blocksWithDeposits.len:
|
|
let blk = blocksWithDeposits[i]
|
|
if blk.number > fullSyncFromBlock:
|
|
await fetchTimestamp(connection, rpcClient, blk)
|
|
let lastBlock = m.eth1Chain.blocks.peekLast
|
|
for n in max(lastBlock.number + 1, fullSyncFromBlock) ..< blk.number:
|
|
debug "Obtaining block without deposits", blockNum = n
|
|
let noDepositsBlock = raiseIfNil connection.trackedRequestWithTimeout(
|
|
"getBlockByNumber",
|
|
rpcClient.getBlockByNumber(n),
|
|
web3RequestsTimeout)
|
|
|
|
m.eth1Chain.addBlock(
|
|
lastBlock.makeSuccessorWithoutDeposits(noDepositsBlock))
|
|
eth1_synced_head.set noDepositsBlock.number.toGaugeValue
|
|
|
|
m.eth1Chain.addBlock blk
|
|
eth1_synced_head.set blk.number.toGaugeValue
|
|
|
|
if blocksWithDeposits.len > 0:
|
|
let lastIdx = blocksWithDeposits.len - 1
|
|
template lastBlock: auto = blocksWithDeposits[lastIdx]
|
|
|
|
let status = when hasDepositRootChecks:
|
|
await fetchDepositContractData(
|
|
connection, rpcClient, depositContract, lastBlock)
|
|
else:
|
|
DepositRootUnavailable
|
|
|
|
when hasDepositRootChecks:
|
|
debug "Deposit contract state verified",
|
|
status = $status,
|
|
ourCount = lastBlock.depositCount,
|
|
ourRoot = lastBlock.depositRoot
|
|
|
|
case status
|
|
of DepositRootIncorrect, DepositCountIncorrect:
|
|
raise newException(CorruptDataProvider,
|
|
"The deposit log events disagree with the deposit contract state")
|
|
else:
|
|
discard
|
|
|
|
info "Eth1 sync progress",
|
|
blockNumber = lastBlock.number,
|
|
depositsProcessed = lastBlock.depositCount
|
|
|
|
func hasConnection*(m: ELManager): bool =
|
|
m.elConnections.len > 0
|
|
|
|
func hasAnyWorkingConnection*(m: ELManager): bool =
|
|
m.elConnections.anyIt(it.state == Working or it.state == NeverTested)
|
|
|
|
func hasProperlyConfiguredConnection*(m: ELManager): bool =
|
|
for connection in m.elConnections:
|
|
if connection.etcStatus == EtcStatus.match:
|
|
return true
|
|
|
|
false
|
|
|
|
proc startExchangeTransitionConfigurationLoop(m: ELManager) {.async.} =
|
|
debug "Starting exchange transition configuration loop"
|
|
|
|
while true:
|
|
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/paris.md#specification-3
|
|
debug "Exchange transition configuration tick"
|
|
traceAsyncErrors m.exchangeTransitionConfiguration()
|
|
await sleepAsync(60.seconds)
|
|
|
|
proc syncEth1Chain(m: ELManager, connection: ELConnection) {.async.} =
|
|
let rpcClient = awaitOrRaiseOnTimeout(connection.connectedRpcClient(),
|
|
1.seconds)
|
|
let
|
|
# BEWARE
|
|
# `connectedRpcClient` guarantees that connection.web3 will not be
|
|
# `none` here, but it's not safe to initialize this later (e.g closer
|
|
# to where it's used) because `connection.web3` may be set to `none`
|
|
# at any time after a failed request. Luckily, the `contractSender`
|
|
# object is very cheap to create.
|
|
depositContract = connection.web3.get.contractSender(
|
|
DepositContract, m.depositContractAddress)
|
|
|
|
shouldProcessDeposits = not (
|
|
m.depositContractAddress.isZeroMemory or
|
|
m.eth1Chain.finalizedBlockHash.data.isZeroMemory)
|
|
|
|
trace "Starting syncEth1Chain", shouldProcessDeposits
|
|
|
|
logScope:
|
|
url = connection.engineUrl.url
|
|
|
|
# We might need to reset the chain if the new provider disagrees
|
|
# with the previous one regarding the history of the chain or if
|
|
# we have detected a conensus violation - our view disagreeing with
|
|
# the majority of the validators in the network.
|
|
#
|
|
# Consensus violations happen in practice because the web3 providers
|
|
# sometimes return incomplete or incorrect deposit log events even
|
|
# when they don't indicate any errors in the response. When this
|
|
# happens, we are usually able to download the data successfully
|
|
# on the second attempt.
|
|
#
|
|
# TODO
|
|
# Perhaps the above problem was manifesting only with the obsolete
|
|
# JSON-RPC data providers, which can no longer be used with Nimbus.
|
|
if m.eth1Chain.blocks.len > 0:
|
|
let needsReset = m.eth1Chain.hasConsensusViolation or (block:
|
|
let
|
|
lastKnownBlock = m.eth1Chain.blocks.peekLast
|
|
matchingBlockAtNewEl = raiseIfNil connection.trackedRequestWithTimeout(
|
|
"getBlockByNumber",
|
|
rpcClient.getBlockByNumber(lastKnownBlock.number),
|
|
web3RequestsTimeout)
|
|
|
|
lastKnownBlock.hash.asBlockHash != matchingBlockAtNewEl.hash)
|
|
|
|
if needsReset:
|
|
trace "Resetting the Eth1 chain",
|
|
hasConsensusViolation = m.eth1Chain.hasConsensusViolation
|
|
m.eth1Chain.clear()
|
|
|
|
var eth1SyncedTo: Eth1BlockNumber
|
|
if shouldProcessDeposits:
|
|
if m.eth1Chain.blocks.len == 0:
|
|
let finalizedBlockHash = m.eth1Chain.finalizedBlockHash.asBlockHash
|
|
let startBlock = raiseIfNil connection.trackedRequestWithTimeout(
|
|
"getBlockByHash",
|
|
rpcClient.getBlockByHash(finalizedBlockHash),
|
|
web3RequestsTimeout)
|
|
|
|
m.eth1Chain.addBlock Eth1Block(
|
|
hash: m.eth1Chain.finalizedBlockHash,
|
|
number: Eth1BlockNumber startBlock.number,
|
|
timestamp: Eth1BlockTimestamp startBlock.timestamp)
|
|
|
|
eth1SyncedTo = m.eth1Chain.blocks[^1].number
|
|
|
|
eth1_synced_head.set eth1SyncedTo.toGaugeValue
|
|
eth1_finalized_head.set eth1SyncedTo.toGaugeValue
|
|
eth1_finalized_deposits.set(
|
|
m.eth1Chain.finalizedDepositsMerkleizer.getChunkCount.toGaugeValue)
|
|
|
|
debug "Starting Eth1 syncing", `from` = shortLog(m.eth1Chain.blocks[^1])
|
|
|
|
while true:
|
|
debug "syncEth1Chain tick"
|
|
|
|
if bnStatus == BeaconNodeStatus.Stopping:
|
|
await m.stop()
|
|
return
|
|
|
|
if m.eth1Chain.hasConsensusViolation:
|
|
raise newException(CorruptDataProvider, "Eth1 chain contradicts Eth2 consensus")
|
|
|
|
let latestBlock = try:
|
|
raiseIfNil connection.trackedRequestWithTimeout(
|
|
"getBlockByNumber",
|
|
rpcClient.eth_getBlockByNumber(blockId("latest"), false),
|
|
web3RequestsTimeout)
|
|
except CatchableError as err:
|
|
warn "Failed to obtain the latest block from the EL", err = err.msg
|
|
raise err
|
|
|
|
m.syncTargetBlock = some(
|
|
if Eth1BlockNumber(latestBlock.number) > m.cfg.ETH1_FOLLOW_DISTANCE:
|
|
Eth1BlockNumber(latestBlock.number) - m.cfg.ETH1_FOLLOW_DISTANCE
|
|
else:
|
|
Eth1BlockNumber(0))
|
|
if m.syncTargetBlock.get <= eth1SyncedTo:
|
|
# The chain reorged to a lower height.
|
|
# It's relatively safe to ignore that.
|
|
await sleepAsync(m.cfg.SECONDS_PER_ETH1_BLOCK.int.seconds)
|
|
continue
|
|
|
|
eth1_latest_head.set latestBlock.number.toGaugeValue
|
|
|
|
if shouldProcessDeposits and
|
|
latestBlock.number.uint64 > m.cfg.ETH1_FOLLOW_DISTANCE:
|
|
await m.syncBlockRange(connection,
|
|
rpcClient,
|
|
depositContract,
|
|
eth1SyncedTo + 1,
|
|
m.syncTargetBlock.get,
|
|
m.earliestBlockOfInterest(Eth1BlockNumber latestBlock.number))
|
|
|
|
eth1SyncedTo = m.syncTargetBlock.get
|
|
eth1_synced_head.set eth1SyncedTo.toGaugeValue
|
|
|
|
proc startChainSyncingLoop(m: ELManager) {.async.} =
|
|
info "Starting execution layer deposit syncing",
|
|
contract = $m.depositContractAddress
|
|
|
|
var syncedConnectionFut = m.selectConnectionForChainSyncing()
|
|
info "Connection attempt started"
|
|
|
|
while true:
|
|
try:
|
|
await syncedConnectionFut or sleepAsync(60.seconds)
|
|
if not syncedConnectionFut.finished:
|
|
notice "No synced execution layer available for deposit syncing"
|
|
await sleepAsync(chronos.seconds(30))
|
|
continue
|
|
|
|
await syncEth1Chain(m, syncedConnectionFut.read)
|
|
except CatchableError:
|
|
await sleepAsync(10.seconds)
|
|
|
|
# A more detailed error is already logged by trackEngineApiRequest
|
|
debug "Restarting the deposit syncing loop"
|
|
|
|
# To be extra safe, we will make a fresh connection attempt
|
|
await syncedConnectionFut.cancelAndWait()
|
|
syncedConnectionFut = m.selectConnectionForChainSyncing()
|
|
|
|
proc start*(m: ELManager, syncChain = true) {.gcsafe.} =
|
|
if m.elConnections.len == 0:
|
|
return
|
|
|
|
## Calling `ELManager.start()` on an already started ELManager is a noop
|
|
if syncChain and m.chainSyncingLoopFut.isNil:
|
|
m.chainSyncingLoopFut =
|
|
m.startChainSyncingLoop()
|
|
|
|
if m.hasJwtSecret and m.exchangeTransitionConfigurationLoopFut.isNil:
|
|
m.exchangeTransitionConfigurationLoopFut =
|
|
m.startExchangeTransitionConfigurationLoop()
|
|
|
|
func `$`(x: Quantity): string =
|
|
$(x.uint64)
|
|
|
|
func `$`(x: BlockObject): string =
|
|
$(x.number) & " [" & $(x.hash) & "]"
|
|
|
|
proc testWeb3Provider*(web3Url: Uri,
|
|
depositContractAddress: Eth1Address,
|
|
jwtSecret: Opt[seq[byte]]) {.async.} =
|
|
stdout.write "Establishing web3 connection..."
|
|
var web3: Web3
|
|
try:
|
|
web3 = awaitOrRaiseOnTimeout(
|
|
newWeb3($web3Url, getJsonRpcRequestHeaders(jwtSecret)),
|
|
5.seconds)
|
|
stdout.write "\rEstablishing web3 connection: Connected\n"
|
|
except CatchableError as err:
|
|
stdout.write "\rEstablishing web3 connection: Failure(" & err.msg & ")\n"
|
|
quit 1
|
|
|
|
template request(actionDesc: static string,
|
|
action: untyped): untyped =
|
|
stdout.write actionDesc & "..."
|
|
stdout.flushFile()
|
|
var res: typeof(read action)
|
|
try:
|
|
res = awaitOrRaiseOnTimeout(action, web3RequestsTimeout)
|
|
when res is BlockObject:
|
|
res = raiseIfNil res
|
|
stdout.write "\r" & actionDesc & ": " & $res
|
|
except CatchableError as err:
|
|
stdout.write "\r" & actionDesc & ": Error(" & err.msg & ")"
|
|
stdout.write "\n"
|
|
res
|
|
|
|
discard request "Chain ID":
|
|
web3.provider.eth_chainId()
|
|
|
|
discard request "Sync status":
|
|
web3.provider.eth_syncing()
|
|
|
|
let
|
|
latestBlock = request "Latest block":
|
|
web3.provider.eth_getBlockByNumber(blockId("latest"), false)
|
|
|
|
ns = web3.contractSender(DepositContract, depositContractAddress)
|
|
|
|
discard request "Deposit root":
|
|
ns.get_deposit_root.call(blockNumber = latestBlock.number.uint64)
|