1618 lines
60 KiB
Nim
1618 lines
60 KiB
Nim
# beacon_chain
|
|
# Copyright (c) 2018-2023 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
{.push raises: [].}
|
|
|
|
import
|
|
std/[deques, options, strformat, strutils, sequtils, tables,
|
|
typetraits, uri, json],
|
|
# Nimble packages:
|
|
chronos, metrics, chronicles/timings, stint/endians2,
|
|
web3, web3/ethtypes as web3Types, web3/ethhexstrings, web3/engine_api,
|
|
eth/common/eth_types,
|
|
eth/async_utils, stew/[byteutils, objects, results, shims/hashes],
|
|
# Local modules:
|
|
../spec/[deposit_snapshots, eth2_merkleization, forks, helpers],
|
|
../spec/datatypes/[base, phase0, bellatrix, eip4844],
|
|
../networking/network_metadata,
|
|
../consensus_object_pools/block_pools_types,
|
|
".."/[beacon_chain_db, beacon_node_status, beacon_clock],
|
|
./merkle_minimal
|
|
|
|
from std/times import getTime, inSeconds, initTime, `-`
|
|
from ../spec/engine_authentication import getSignedIatToken
|
|
|
|
export
|
|
web3Types, deques, base, DepositTreeSnapshot
|
|
|
|
logScope:
|
|
topics = "eth1"
|
|
|
|
type
  # Byte-array aliases used by the deposit contract ABI declared below.
  # Both `DynamicBytes` length parameters are equal for every alias.
  PubKeyBytes = DynamicBytes[48, 48]
  WithdrawalCredentialsBytes = DynamicBytes[32, 32]
  SignatureBytes = DynamicBytes[96, 96]
  Int64LeBytes = DynamicBytes[8, 8]
|
|
|
|
# Interface of the deposit contract as consumed by this module: one
# state-changing call, two read-only getters and the deposit log event.
contract(DepositContract):
  proc deposit(
    pubkey: PubKeyBytes,
    withdrawalCredentials: WithdrawalCredentialsBytes,
    signature: SignatureBytes,
    deposit_data_root: FixedBytes[32])

  proc get_deposit_root(): FixedBytes[32]
  proc get_deposit_count(): Int64LeBytes

  proc DepositEvent(
    pubkey: PubKeyBytes,
    withdrawalCredentials: WithdrawalCredentialsBytes,
    amount: Int64LeBytes,
    signature: SignatureBytes,
    index: Int64LeBytes) {.event.}
|
|
|
|
const
  # Default per-attempt timeout used by `awaitWithRetries`.
  web3Timeouts = 60.seconds

  # Enabled by compiling with `-d:has_deposit_root_checks`.
  hasDepositRootChecks = defined(has_deposit_root_checks)

  targetBlocksPerLogsRequest = 5000'u64 # This is roughly a day of Eth1 blocks
|
|
|
|
type
  Eth1BlockNumber* = uint64
  Eth1BlockTimestamp* = uint64
  Eth1BlockHeader = web3Types.BlockHeader

  GenesisStateRef = ref phase0.BeaconState

  Eth1Block* = ref object
    ## An Eth1 block as tracked by the deposits chain.
    hash*: Eth2Digest
    number*: Eth1BlockNumber
    timestamp*: Eth1BlockTimestamp
      ## Basic properties of the block
      ## These must be initialized in the constructor

    deposits*: seq[DepositData]
      ## Deposits inside this particular block

    depositRoot*: Eth2Digest
    depositCount*: uint64
      ## Global deposits count and hash tree root of the entire sequence
      ## These are computed when the block is added to the chain (see `addBlock`)

  Eth1Chain* = object
    ## In-memory chain of `Eth1Block`s together with the merkleizers that
    ## track the deposits they contain.
    db: BeaconChainDB
    cfg: RuntimeConfig
    finalizedBlockHash: Eth2Digest
    finalizedDepositsMerkleizer: DepositsMerkleizer
      ## The latest block that reached a 50% majority vote from
      ## the Eth2 validators according to the follow distance and
      ## the ETH1_VOTING_PERIOD

    blocks*: Deque[Eth1Block]
      ## A non-forkable chain of blocks ending at the block with
      ## ETH1_FOLLOW_DISTANCE offset from the head.

    blocksByHash: Table[BlockHash, Eth1Block]
      # Index over `blocks`; kept in sync by `addBlock` and `popFirst`.

    headMerkleizer: DepositsMerkleizer
      ## Merkleizer state after applying all `blocks`

    hasConsensusViolation: bool
      ## The local chain contradicts the observed consensus on the network

  Eth1MonitorState = enum
    Initialized
    Started
    ReadyToRestartToPrimary
    Failed
    Stopping
    Stopped

  Eth1Monitor* = ref object
    ## Owns the deposits chain and the current web3 data provider.
    state: Eth1MonitorState
    startIdx: int
    web3Urls: seq[string]
    eth1Network: Option[Eth1Network]
    depositContractAddress*: Eth1Address
    depositContractDeployedAt: BlockHashOrNumber
    forcePolling: bool
    jwtSecret: Option[seq[byte]]
    blocksPerLogsRequest: uint64

    dataProvider: Web3DataProviderRef
      # May be nil; the engine API wrappers in this file check for that.
    latestEth1Block: Option[FullBlockId]

    depositsChain: Eth1Chain
    eth1Progress: AsyncEvent

    exchangedConfiguration*: bool
      # Set once `exchangeTransitionConfiguration` has reported a match.

    runFut: Future[void]
    stopFut: Future[void]
    getBeaconTime: GetBeaconTimeFn

  Web3DataProvider* = object
    ## A web3 connection plus the deposit contract sender bound to it.
    url: string
    web3: Web3
    ns: Sender[DepositContract]
    blockHeadersSubscription: Subscription

  Web3DataProviderRef* = ref Web3DataProvider

  FullBlockId* = object
    number: Eth1BlockNumber
    hash: BlockHash

  DataProviderFailure* = object of CatchableError
  CorruptDataProvider* = object of DataProviderFailure
  DataProviderTimeout* = object of DataProviderFailure

  DisconnectHandler* = proc () {.gcsafe, raises: [Defect].}

  DepositEventHandler* = proc (
    pubkey: PubKeyBytes,
    withdrawalCredentials: WithdrawalCredentialsBytes,
    amount: Int64LeBytes,
    signature: SignatureBytes,
    merkleTreeIndex: Int64LeBytes,
    j: JsonNode) {.gcsafe, raises: [Defect].}

  BlockProposalEth1Data* = object
    vote*: Eth1Data
    deposits*: seq[Deposit]
    hasMissingDeposits*: bool
|
|
|
|
# Metrics exported by this module.
declareCounter failed_web3_requests,
  "Failed web3 requests"

declareGauge eth1_latest_head,
  "The highest Eth1 block number observed on the network"

declareGauge eth1_synced_head,
  "Block number of the highest synchronized block according to follow distance"

declareGauge eth1_finalized_head,
  "Block number of the highest Eth1 block finalized by Eth2 consensus"

declareGauge eth1_finalized_deposits,
  "Number of deposits that were finalized by the Eth2 consensus"

declareGauge eth1_chain_len,
  "The length of the in-memory chain of Eth1 blocks"
|
|
|
|
template cfg(m: Eth1Monitor): auto =
  ## Shortcut for the runtime config stored in the monitor's deposits chain.
  m.depositsChain.cfg
|
|
|
|
template depositChainBlocks*(m: Eth1Monitor): Deque[Eth1Block] =
  ## Shortcut for the block deque of the monitor's deposits chain.
  m.depositsChain.blocks
|
|
|
|
template finalizedDepositsMerkleizer(m: Eth1Monitor): auto =
  ## Shortcut for the finalized-deposits merkleizer of the deposits chain.
  m.depositsChain.finalizedDepositsMerkleizer
|
|
|
|
template headMerkleizer(m: Eth1Monitor): auto =
  ## Shortcut for the head merkleizer of the monitor's deposits chain.
  m.depositsChain.headMerkleizer
|
|
|
|
proc fixupWeb3Urls*(web3Url: var string) =
  ## Prepends `ws://` to `web3Url` (and logs a warning) when it does not
  ## start with one of the `https://`, `http://`, `wss://` or `ws://`
  ## prefixes. The prefix comparison is ASCII case-insensitive.
  let lowered = web3Url.toLowerAscii
  for scheme in ["https://", "http://", "wss://", "ws://"]:
    if lowered.startsWith(scheme):
      return

  warn "The Web3 URL does not specify a protocol. Assuming a WebSocket server", web3Url
  web3Url = "ws://" & web3Url
|
|
|
|
template toGaugeValue(x: Quantity): int64 =
  ## Gauge conversion for `Quantity`: unwraps the distinct type and
  ## delegates to the overload for its base type.
  toGaugeValue(distinctBase(x))
|
|
|
|
# TODO: Add cfg validation
|
|
# MIN_GENESIS_ACTIVE_VALIDATOR_COUNT should be larger than SLOTS_PER_EPOCH
|
|
# doAssert SECONDS_PER_ETH1_BLOCK * cfg.ETH1_FOLLOW_DISTANCE < GENESIS_DELAY,
|
|
# "Invalid configuration: GENESIS_DELAY is set too low"
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-rc.1/specs/phase0/validator.md#get_eth1_data
func compute_time_at_slot(genesis_time: uint64, slot: Slot): uint64 =
  ## Time of `slot`: `genesis_time` plus `SECONDS_PER_SLOT` for every slot.
  let secondsSinceGenesis = slot * SECONDS_PER_SLOT
  genesis_time + secondsSinceGenesis
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-rc.1/specs/phase0/validator.md#get_eth1_data
func voting_period_start_time(state: ForkedHashedBeaconState): uint64 =
  ## Time of the first slot of the Eth1 voting period that contains the
  ## state's current slot.
  let
    currentSlot = getStateField(state, slot)
    eth1_voting_period_start_slot =
      currentSlot - currentSlot mod SLOTS_PER_ETH1_VOTING_PERIOD.uint64
  compute_time_at_slot(
    getStateField(state, genesis_time), eth1_voting_period_start_slot)
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-rc.1/specs/phase0/validator.md#get_eth1_data
func is_candidate_block(cfg: RuntimeConfig,
                        blk: Eth1Block,
                        period_start: uint64): bool =
  ## True when `blk.timestamp` is at least one and at most two follow
  ## distances (expressed in seconds) before `period_start`.
  let followSeconds = cfg.SECONDS_PER_ETH1_BLOCK * cfg.ETH1_FOLLOW_DISTANCE
  (blk.timestamp + followSeconds <= period_start) and
    (blk.timestamp + followSeconds * 2 >= period_start)
|
|
|
|
func asEth2Digest*(x: BlockHash): Eth2Digest =
  ## Wraps the 32 bytes of a `BlockHash` in an `Eth2Digest`.
  let raw = array[32, byte](x)
  Eth2Digest(data: raw)
|
|
|
|
template asBlockHash*(x: Eth2Digest): BlockHash =
  ## Converts the digest's `data` bytes to a `BlockHash`.
  BlockHash(x.data)
|
|
|
|
from ../spec/datatypes/capella import ExecutionPayload, Withdrawal
|
|
|
|
func asConsensusWithdrawal(w: WithdrawalV1): capella.Withdrawal =
  ## Converts an engine API `WithdrawalV1` to a `capella.Withdrawal`.
  capella.Withdrawal(
    index: uint64(w.index),
    validator_index: uint64(w.validatorIndex),
    address: ExecutionAddress(data: distinctBase(w.address)),
    amount: uint64(w.amount))
|
|
|
|
func asEngineWithdrawal(w: capella.Withdrawal): WithdrawalV1 =
  ## Converts a `capella.Withdrawal` to an engine API `WithdrawalV1`.
  WithdrawalV1(
    index: w.index.Quantity,
    validatorIndex: w.validator_index.Quantity,
    address: w.address.data.Address,
    amount: w.amount.Quantity)
|
|
|
|
func asConsensusExecutionPayload*(rpcExecutionPayload: ExecutionPayloadV1):
    bellatrix.ExecutionPayload =
  ## Converts an engine API `ExecutionPayloadV1` into a
  ## `bellatrix.ExecutionPayload`, field by field.
  bellatrix.ExecutionPayload(
    parent_hash: asEth2Digest(rpcExecutionPayload.parentHash),
    fee_recipient:
      ExecutionAddress(data: distinctBase(rpcExecutionPayload.feeRecipient)),
    state_root: asEth2Digest(rpcExecutionPayload.stateRoot),
    receipts_root: asEth2Digest(rpcExecutionPayload.receiptsRoot),
    logs_bloom: BloomLogs(data: distinctBase(rpcExecutionPayload.logsBloom)),
    prev_randao: asEth2Digest(rpcExecutionPayload.prevRandao),
    block_number: uint64(rpcExecutionPayload.blockNumber),
    gas_limit: uint64(rpcExecutionPayload.gasLimit),
    gas_used: uint64(rpcExecutionPayload.gasUsed),
    timestamp: uint64(rpcExecutionPayload.timestamp),
    extra_data:
      List[byte, MAX_EXTRA_DATA_BYTES].init(
        distinctBase(rpcExecutionPayload.extraData)),
    base_fee_per_gas: rpcExecutionPayload.baseFeePerGas,
    block_hash: asEth2Digest(rpcExecutionPayload.blockHash),
    # Each typed transaction is re-wrapped as a consensus transaction.
    transactions:
      List[bellatrix.Transaction, MAX_TRANSACTIONS_PER_PAYLOAD].init(
        mapIt(rpcExecutionPayload.transactions,
              bellatrix.Transaction.init(it.distinctBase))))
|
|
|
|
func asConsensusExecutionPayload*(rpcExecutionPayload: ExecutionPayloadV2):
    capella.ExecutionPayload =
  ## Converts an engine API `ExecutionPayloadV2` into a
  ## `capella.ExecutionPayload`, including its withdrawals.
  capella.ExecutionPayload(
    parent_hash: asEth2Digest(rpcExecutionPayload.parentHash),
    fee_recipient:
      ExecutionAddress(data: distinctBase(rpcExecutionPayload.feeRecipient)),
    state_root: asEth2Digest(rpcExecutionPayload.stateRoot),
    receipts_root: asEth2Digest(rpcExecutionPayload.receiptsRoot),
    logs_bloom: BloomLogs(data: distinctBase(rpcExecutionPayload.logsBloom)),
    prev_randao: asEth2Digest(rpcExecutionPayload.prevRandao),
    block_number: uint64(rpcExecutionPayload.blockNumber),
    gas_limit: uint64(rpcExecutionPayload.gasLimit),
    gas_used: uint64(rpcExecutionPayload.gasUsed),
    timestamp: uint64(rpcExecutionPayload.timestamp),
    extra_data:
      List[byte, MAX_EXTRA_DATA_BYTES].init(
        distinctBase(rpcExecutionPayload.extraData)),
    base_fee_per_gas: rpcExecutionPayload.baseFeePerGas,
    block_hash: asEth2Digest(rpcExecutionPayload.blockHash),
    # Each typed transaction is re-wrapped as a consensus transaction.
    transactions:
      List[bellatrix.Transaction, MAX_TRANSACTIONS_PER_PAYLOAD].init(
        mapIt(rpcExecutionPayload.transactions,
              bellatrix.Transaction.init(it.distinctBase))),
    withdrawals:
      List[capella.Withdrawal, MAX_WITHDRAWALS_PER_PAYLOAD].init(
        mapIt(rpcExecutionPayload.withdrawals, asConsensusWithdrawal(it))))
|
|
|
|
func asConsensusExecutionPayload*(rpcExecutionPayload: ExecutionPayloadV3):
    eip4844.ExecutionPayload =
  ## Converts an engine API `ExecutionPayloadV3` into an
  ## `eip4844.ExecutionPayload`, including withdrawals and excess data gas.
  eip4844.ExecutionPayload(
    parent_hash: asEth2Digest(rpcExecutionPayload.parentHash),
    fee_recipient:
      ExecutionAddress(data: distinctBase(rpcExecutionPayload.feeRecipient)),
    state_root: asEth2Digest(rpcExecutionPayload.stateRoot),
    receipts_root: asEth2Digest(rpcExecutionPayload.receiptsRoot),
    logs_bloom: BloomLogs(data: distinctBase(rpcExecutionPayload.logsBloom)),
    prev_randao: asEth2Digest(rpcExecutionPayload.prevRandao),
    block_number: uint64(rpcExecutionPayload.blockNumber),
    gas_limit: uint64(rpcExecutionPayload.gasLimit),
    gas_used: uint64(rpcExecutionPayload.gasUsed),
    timestamp: uint64(rpcExecutionPayload.timestamp),
    extra_data:
      List[byte, MAX_EXTRA_DATA_BYTES].init(
        distinctBase(rpcExecutionPayload.extraData)),
    base_fee_per_gas: rpcExecutionPayload.baseFeePerGas,
    excess_data_gas: rpcExecutionPayload.excessDataGas,
    block_hash: asEth2Digest(rpcExecutionPayload.blockHash),
    # Each typed transaction is re-wrapped as a consensus transaction.
    transactions:
      List[bellatrix.Transaction, MAX_TRANSACTIONS_PER_PAYLOAD].init(
        mapIt(rpcExecutionPayload.transactions,
              bellatrix.Transaction.init(it.distinctBase))),
    withdrawals:
      List[capella.Withdrawal, MAX_WITHDRAWALS_PER_PAYLOAD].init(
        mapIt(rpcExecutionPayload.withdrawals, asConsensusWithdrawal(it))))
|
|
|
|
func asEngineExecutionPayload*(executionPayload: bellatrix.ExecutionPayload):
    ExecutionPayloadV1 =
  ## Converts a `bellatrix.ExecutionPayload` into an engine API
  ## `ExecutionPayloadV1`, field by field.
  engine_api.ExecutionPayloadV1(
    parentHash: asBlockHash(executionPayload.parent_hash),
    feeRecipient: Address(executionPayload.fee_recipient.data),
    stateRoot: asBlockHash(executionPayload.state_root),
    receiptsRoot: asBlockHash(executionPayload.receipts_root),
    logsBloom:
      FixedBytes[BYTES_PER_LOGS_BLOOM](executionPayload.logs_bloom.data),
    prevRandao: asBlockHash(executionPayload.prev_randao),
    blockNumber: executionPayload.block_number.Quantity,
    gasLimit: executionPayload.gas_limit.Quantity,
    gasUsed: executionPayload.gas_used.Quantity,
    timestamp: executionPayload.timestamp.Quantity,
    extraData:
      DynamicBytes[0, MAX_EXTRA_DATA_BYTES](executionPayload.extra_data),
    baseFeePerGas: executionPayload.base_fee_per_gas,
    blockHash: asBlockHash(executionPayload.block_hash),
    # Each consensus transaction is re-wrapped as a typed transaction.
    transactions:
      mapIt(executionPayload.transactions, TypedTransaction(it.distinctBase)))
|
|
|
|
func asEngineExecutionPayload*(executionPayload: capella.ExecutionPayload):
    ExecutionPayloadV2 =
  ## Converts a `capella.ExecutionPayload` into an engine API
  ## `ExecutionPayloadV2`, including its withdrawals.
  engine_api.ExecutionPayloadV2(
    parentHash: asBlockHash(executionPayload.parent_hash),
    feeRecipient: Address(executionPayload.fee_recipient.data),
    stateRoot: asBlockHash(executionPayload.state_root),
    receiptsRoot: asBlockHash(executionPayload.receipts_root),
    logsBloom:
      FixedBytes[BYTES_PER_LOGS_BLOOM](executionPayload.logs_bloom.data),
    prevRandao: asBlockHash(executionPayload.prev_randao),
    blockNumber: executionPayload.block_number.Quantity,
    gasLimit: executionPayload.gas_limit.Quantity,
    gasUsed: executionPayload.gas_used.Quantity,
    timestamp: executionPayload.timestamp.Quantity,
    extraData:
      DynamicBytes[0, MAX_EXTRA_DATA_BYTES](executionPayload.extra_data),
    baseFeePerGas: executionPayload.base_fee_per_gas,
    blockHash: asBlockHash(executionPayload.block_hash),
    # Each consensus transaction is re-wrapped as a typed transaction.
    transactions:
      mapIt(executionPayload.transactions, TypedTransaction(it.distinctBase)),
    withdrawals:
      mapIt(executionPayload.withdrawals, asEngineWithdrawal(it)))
|
|
|
|
func asEngineExecutionPayload*(executionPayload: eip4844.ExecutionPayload):
    ExecutionPayloadV3 =
  ## Converts an `eip4844.ExecutionPayload` into an engine API
  ## `ExecutionPayloadV3`, including withdrawals and excess data gas.
  engine_api.ExecutionPayloadV3(
    parentHash: asBlockHash(executionPayload.parent_hash),
    feeRecipient: Address(executionPayload.fee_recipient.data),
    stateRoot: asBlockHash(executionPayload.state_root),
    receiptsRoot: asBlockHash(executionPayload.receipts_root),
    logsBloom:
      FixedBytes[BYTES_PER_LOGS_BLOOM](executionPayload.logs_bloom.data),
    prevRandao: asBlockHash(executionPayload.prev_randao),
    blockNumber: executionPayload.block_number.Quantity,
    gasLimit: executionPayload.gas_limit.Quantity,
    gasUsed: executionPayload.gas_used.Quantity,
    timestamp: executionPayload.timestamp.Quantity,
    extraData:
      DynamicBytes[0, MAX_EXTRA_DATA_BYTES](executionPayload.extra_data),
    baseFeePerGas: executionPayload.base_fee_per_gas,
    excessDataGas: executionPayload.excess_data_gas,
    blockHash: asBlockHash(executionPayload.block_hash),
    # Each consensus transaction is re-wrapped as a typed transaction.
    transactions:
      mapIt(executionPayload.transactions, TypedTransaction(it.distinctBase)),
    withdrawals:
      mapIt(executionPayload.withdrawals, asEngineWithdrawal(it)))
|
|
|
|
func shortLog*(b: Eth1Block): string =
  ## Compact rendering of a block for logs:
  ## `<number>:<short hash>(deposits = <depositCount>)`.
  try:
    result = &"{b.number}:{shortLog b.hash}(deposits = {b.depositCount})"
  except ValueError as exc:
    # A formatting failure here is treated as a programming error.
    raiseAssert exc.msg
|
|
|
|
template findBlock(chain: Eth1Chain, eth1Data: Eth1Data): Eth1Block =
  ## Looks up the block whose hash equals `eth1Data.block_hash`;
  ## yields `nil` when the chain does not hold it.
  chain.blocksByHash.getOrDefault(eth1Data.block_hash.asBlockHash, nil)
|
|
|
|
func makeSuccessorWithoutDeposits(existingBlock: Eth1Block,
                                  successor: BlockObject): Eth1Block =
  ## Builds an `Eth1Block` from the hash, number and timestamp of
  ## `successor`. No deposits are attached and the deposit root/count are
  ## left at their defaults.
  # NOTE(review): `existingBlock` is not read by this function.
  Eth1Block(
    hash: asEth2Digest(successor.hash),
    number: Eth1BlockNumber(successor.number),
    timestamp: Eth1BlockTimestamp(successor.timestamp))
|
|
|
|
func latestCandidateBlock(chain: Eth1Chain, periodStart: uint64): Eth1Block =
  ## Scans `chain.blocks` from newest to oldest and returns the first block
  ## accepted by `is_candidate_block`; the result is `nil` if none matches.
  var idx = chain.blocks.len - 1
  while idx >= 0:
    let candidate = chain.blocks[idx]
    if is_candidate_block(chain.cfg, candidate, periodStart):
      return candidate
    dec idx
|
|
|
|
proc popFirst(chain: var Eth1Chain) =
  ## Drops the oldest block from the chain, removes it from the hash index
  ## and refreshes the `eth1_chain_len` gauge.
  let oldest = popFirst(chain.blocks)
  chain.blocksByHash.del(oldest.hash.asBlockHash)
  eth1_chain_len.set(int64(chain.blocks.len))
|
|
|
|
func getDepositsRoot*(m: DepositsMerkleizer): Eth2Digest =
  ## Final hash of the merkleizer with the total chunk count mixed in.
  let chunkCount = int(m.totalChunks)
  mixInLength(m.getFinalHash, chunkCount)
|
|
|
|
proc addBlock*(chain: var Eth1Chain, newBlock: Eth1Block) =
  ## Appends `newBlock` to the chain. Its deposits are fed into the head
  ## merkleizer first, and the resulting count and root are stored on the
  ## block before it is indexed by hash.
  for depositData in newBlock.deposits:
    chain.headMerkleizer.addChunk(hash_tree_root(depositData).data)

  newBlock.depositCount = chain.headMerkleizer.getChunkCount
  newBlock.depositRoot = chain.headMerkleizer.getDepositsRoot

  chain.blocks.addLast(newBlock)
  chain.blocksByHash[newBlock.hash.asBlockHash] = newBlock

  eth1_chain_len.set(int64(chain.blocks.len))
|
|
|
|
func toVoteData(blk: Eth1Block): Eth1Data =
  ## Builds the `Eth1Data` describing `blk`: its deposit root, deposit
  ## count and block hash.
  result = Eth1Data(
    deposit_root: blk.depositRoot,
    deposit_count: blk.depositCount,
    block_hash: blk.hash)
|
|
|
|
func hash*(x: Eth1Data): Hash =
  ## Hashes an `Eth1Data` by its `block_hash` alone.
  x.block_hash.hash
|
|
|
|
template awaitWithRetries*[T](lazyFutExpr: Future[T],
                              retries = 3,
                              timeout = web3Timeouts): untyped =
  ## Evaluates `lazyFutExpr` and waits for it for at most `timeout`. An
  ## attempt that times out is cancelled; one that fails is logged and
  ## counted in `failed_web3_requests`. The expression is re-evaluated for
  ## each new attempt, with a delay that starts at 16000 ms and doubles
  ## after every attempt. After `retries` unsuccessful attempts a
  ## `DataProviderFailure` is raised; otherwise the future's value is
  ## returned.
  const
    reqType = astToStr(lazyFutExpr)
  var
    backoffMs = 16000
    fut: Future[T]
    failedAttempts = 0

  while true:
    fut = lazyFutExpr
    yield fut or sleepAsync(timeout)
    if not fut.finished:
      # Timed out: make sure the request is gone before retrying.
      await cancelAndWait(fut)
    elif fut.failed:
      when not (fut.error of CatchableError):
        static: doAssert false, "f.error not CatchableError"
      debug "Web3 request failed", req = reqType, err = fut.error.msg
      inc failed_web3_requests
    else:
      break

    inc failedAttempts
    if failedAttempts >= retries:
      var errorMsg = reqType & " failed " & $retries & " times"
      if fut.failed: errorMsg &= ". Last error: " & fut.error.msg
      raise newException(DataProviderFailure, errorMsg)

    await sleepAsync(chronos.milliseconds(backoffMs))
    backoffMs *= 2

  read(fut)
|
|
|
|
proc close(p: Web3DataProviderRef): Future[void] {.async.} =
  ## Unsubscribes from block headers (best effort) and then closes the
  ## web3 connection, waiting at most 30 seconds for the close to finish.
  if not p.blockHeadersSubscription.isNil:
    try:
      awaitWithRetries(p.blockHeadersSubscription.unsubscribe())
    except CatchableError:
      # A failed unsubscribe must not prevent the connection from closing.
      debug "Failed to clean up block headers subscription properly"

  awaitWithTimeout(p.web3.close(), 30.seconds):
    debug "Failed to close data provider in time"
|
|
|
|
proc getBlockByHash(p: Web3DataProviderRef, hash: BlockHash):
    Future[BlockObject] =
  ## Issues `eth_getBlockByHash` for `hash`, passing `false` as the second
  ## argument.
  p.web3.provider.eth_getBlockByHash(hash, false)
|
|
|
|
proc getBlockByNumber*(p: Web3DataProviderRef,
                       number: Eth1BlockNumber): Future[BlockObject] =
  ## Issues `eth_getBlockByNumber` with `number` rendered as `0x`-prefixed
  ## upper-case hex, passing `false` as the second argument.
  let hexNumber =
    try:
      &"0x{number:X}" # No leading 0's!
    except ValueError as exc:
      raiseAssert exc.msg # Never fails
  p.web3.provider.eth_getBlockByNumber(hexNumber, false)
|
|
|
|
proc getPayloadV1*(
    p: Eth1Monitor, payloadId: bellatrix.PayloadID):
    Future[engine_api.ExecutionPayloadV1] =
  ## Requests the payload for `payloadId` via `engine_getPayloadV1`.
  ## Without a monitor or data provider, an already completed future
  ## holding a default `ExecutionPayloadV1` is returned instead.
  # Eth1 monitor can recycle connections without (external) warning; at least,
  # don't crash.
  if p.isNil or p.dataProvider.isNil:
    let emptyPayloadFut =
      newFuture[engine_api.ExecutionPayloadV1]("getPayload")
    emptyPayloadFut.complete(default(engine_api.ExecutionPayloadV1))
    return emptyPayloadFut

  p.dataProvider.web3.provider.engine_getPayloadV1(FixedBytes[8](payloadId))
|
|
|
|
proc getPayloadV2*(
    p: Eth1Monitor, payloadId: bellatrix.PayloadID):
    Future[engine_api.ExecutionPayloadV2] {.async.} =
  ## Requests the payload for `payloadId` via `engine_getPayloadV2` and
  ## returns the response's `executionPayload`. Without a monitor or data
  ## provider a default `ExecutionPayloadV2` is returned.
  # Eth1 monitor can recycle connections without (external) warning; at least,
  # don't crash.
  if p.isNil or p.dataProvider.isNil:
    return default(engine_api.ExecutionPayloadV2)

  let response = await p.dataProvider.web3.provider.engine_getPayloadV2(
    FixedBytes[8](payloadId))
  return response.executionPayload
|
|
|
|
proc getPayloadV3*(
    p: Eth1Monitor, payloadId: bellatrix.PayloadID):
    Future[engine_api.ExecutionPayloadV3] {.async.} =
  ## Requests the payload for `payloadId` via `engine_getPayloadV3` and
  ## returns the response's `executionPayload`. Without a monitor or data
  ## provider a default `ExecutionPayloadV3` is returned.
  # Eth1 monitor can recycle connections without (external) warning; at least,
  # don't crash.
  if p.isNil or p.dataProvider.isNil:
    return default(engine_api.ExecutionPayloadV3)

  let response = await p.dataProvider.web3.provider.engine_getPayloadV3(
    FixedBytes[8](payloadId))
  return response.executionPayload
|
|
|
|
proc getBlobsBundleV1*(
    p: Eth1Monitor, payloadId: bellatrix.PayloadID):
    Future[engine_api.BlobsBundleV1] {.async.} =
  ## Requests the blobs bundle for `payloadId` via
  ## `engine_getBlobsBundleV1`. Without a monitor or data provider a
  ## default `BlobsBundleV1` is returned.
  # Eth1 monitor can recycle connections without (external) warning; at least,
  # don't crash.
  if p.isNil or p.dataProvider.isNil:
    return default(engine_api.BlobsBundleV1)

  let bundle = await p.dataProvider.web3.provider.engine_getBlobsBundleV1(
    FixedBytes[8](payloadId))
  return bundle
|
|
|
|
proc newPayload*(p: Eth1Monitor, payload: engine_api.ExecutionPayloadV1):
    Future[PayloadStatusV1] =
  ## Submits `payload` via `engine_newPayloadV1`. Without a monitor or data
  ## provider, an already completed future with status `syncing` is
  ## returned instead of issuing a request.
  # Eth1 monitor can recycle connections without (external) warning; at least,
  # don't crash.
  # A nil monitor is handled too, matching the `getPayload*` and
  # `forkchoiceUpdated` wrappers in this file.
  if p.isNil or p.dataProvider.isNil:
    let epr = newFuture[PayloadStatusV1]("newPayload")
    epr.complete(PayloadStatusV1(status: PayloadExecutionStatus.syncing))
    return epr

  p.dataProvider.web3.provider.engine_newPayloadV1(payload)
|
|
|
|
proc newPayload*(p: Eth1Monitor, payload: engine_api.ExecutionPayloadV2):
    Future[PayloadStatusV1] =
  ## Submits `payload` via `engine_newPayloadV2`. Without a monitor or data
  ## provider, an already completed future with status `syncing` is
  ## returned instead of issuing a request.
  # Eth1 monitor can recycle connections without (external) warning; at least,
  # don't crash.
  # A nil monitor is handled too, matching the `getPayload*` and
  # `forkchoiceUpdated` wrappers in this file.
  if p.isNil or p.dataProvider.isNil:
    let epr = newFuture[PayloadStatusV1]("newPayload")
    epr.complete(PayloadStatusV1(status: PayloadExecutionStatus.syncing))
    return epr

  p.dataProvider.web3.provider.engine_newPayloadV2(payload)
|
|
|
|
proc newPayload*(p: Eth1Monitor, payload: engine_api.ExecutionPayloadV3):
    Future[PayloadStatusV1] =
  ## Submits `payload` via `engine_newPayloadV3`. Without a monitor or data
  ## provider, an already completed future with status `syncing` is
  ## returned instead of issuing a request.
  # Eth1 monitor can recycle connections without (external) warning; at least,
  # don't crash.
  # A nil monitor is handled too, matching the `getPayload*` and
  # `forkchoiceUpdated` wrappers in this file.
  if p.isNil or p.dataProvider.isNil:
    let epr = newFuture[PayloadStatusV1]("newPayload")
    epr.complete(PayloadStatusV1(status: PayloadExecutionStatus.syncing))
    return epr

  p.dataProvider.web3.provider.engine_newPayloadV3(payload)
|
|
|
|
proc forkchoiceUpdated*(
    p: Eth1Monitor, headBlock, safeBlock, finalizedBlock: Eth2Digest):
    Future[engine_api.ForkchoiceUpdatedResponse] =
  ## Sends `engine_forkchoiceUpdatedV1` with the given head, safe and
  ## finalized block hashes and no payload attributes. Without a monitor or
  ## data provider, an already completed response with payload status
  ## `syncing` is returned instead.
  # Eth1 monitor can recycle connections without (external) warning; at least,
  # don't crash.
  if p.isNil or p.dataProvider.isNil:
    let syncingFut =
      newFuture[engine_api.ForkchoiceUpdatedResponse]("forkchoiceUpdated")
    syncingFut.complete(engine_api.ForkchoiceUpdatedResponse(
      payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing)))
    return syncingFut

  let forkchoiceState = ForkchoiceStateV1(
    headBlockHash: headBlock.asBlockHash,
    safeBlockHash: safeBlock.asBlockHash,
    finalizedBlockHash: finalizedBlock.asBlockHash)

  p.dataProvider.web3.provider.engine_forkchoiceUpdatedV1(
    forkchoiceState, none(engine_api.PayloadAttributesV1))
|
|
|
|
proc forkchoiceUpdated*(
    p: Eth1Monitor, headBlock, safeBlock, finalizedBlock: Eth2Digest,
    timestamp: uint64, randomData: array[32, byte],
    suggestedFeeRecipient: Eth1Address,
    withdrawals: Opt[seq[capella.Withdrawal]]):
    Future[engine_api.ForkchoiceUpdatedResponse] =
  ## Sends a forkchoice update together with payload attributes.
  ## `engine_forkchoiceUpdatedV2` is used when `withdrawals` holds a value
  ## and `engine_forkchoiceUpdatedV1` when it does not. Without a monitor
  ## or data provider, an already completed response with payload status
  ## `syncing` is returned instead.
  # Eth1 monitor can recycle connections without (external) warning; at least,
  # don't crash.
  if p.isNil or p.dataProvider.isNil:
    let syncingFut =
      newFuture[engine_api.ForkchoiceUpdatedResponse]("forkchoiceUpdated")
    syncingFut.complete(engine_api.ForkchoiceUpdatedResponse(
      payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing)))
    return syncingFut

  let forkchoiceState = ForkchoiceStateV1(
    headBlockHash: headBlock.asBlockHash,
    safeBlockHash: safeBlock.asBlockHash,
    finalizedBlockHash: finalizedBlock.asBlockHash)

  if withdrawals.isSome:
    p.dataProvider.web3.provider.engine_forkchoiceUpdatedV2(
      forkchoiceState,
      some(engine_api.PayloadAttributesV2(
        timestamp: Quantity(timestamp),
        prevRandao: FixedBytes[32](randomData),
        suggestedFeeRecipient: suggestedFeeRecipient,
        withdrawals: mapIt(withdrawals.get, asEngineWithdrawal(it)))))
  else:
    p.dataProvider.web3.provider.engine_forkchoiceUpdatedV1(
      forkchoiceState,
      some(engine_api.PayloadAttributesV1(
        timestamp: Quantity(timestamp),
        prevRandao: FixedBytes[32](randomData),
        suggestedFeeRecipient: suggestedFeeRecipient)))
|
|
|
|
# TODO can't be defined within exchangeTransitionConfiguration
# Equality for `Quantity`, borrowed from its base type.
func `==`(x, y: Quantity): bool {.borrow.}
|
|
|
|
type
  EtcStatus {.pure.} = enum
    ## Outcome of `exchangeTransitionConfiguration`.
    exchangeError # no monitor/provider, or the request itself failed
    mismatch      # the engine's configuration differs from the local one
    match         # both configurations agree
|
|
|
|
proc exchangeTransitionConfiguration*(p: Eth1Monitor): Future[EtcStatus] {.async.} =
  ## Sends the local terminal total difficulty and terminal block hash
  ## (with terminal block number 0) to the engine and compares the answer.
  ## Returns `exchangeError` when there is no monitor or data provider or
  ## the request fails, `mismatch` when any of the three values differ, and
  ## `match` otherwise. The first match is logged and recorded in
  ## `p.exchangedConfiguration`.
  # Eth1 monitor can recycle connections without (external) warning; at least,
  # don't crash.
  if p.isNil:
    debug "exchangeTransitionConfiguration: nil Eth1Monitor"
    return EtcStatus.exchangeError

  let dataProvider = p.dataProvider
  if dataProvider.isNil:
    return EtcStatus.exchangeError

  # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.1/src/engine/specification.md#engine_exchangetransitionconfigurationv1
  let consensusCfg = TransitionConfigurationV1(
    terminalTotalDifficulty: p.depositsChain.cfg.TERMINAL_TOTAL_DIFFICULTY,
    terminalBlockHash: p.depositsChain.cfg.TERMINAL_BLOCK_HASH,
    terminalBlockNumber: Quantity 0)
  let executionCfg =
    try:
      awaitWithRetries(
        dataProvider.web3.provider.engine_exchangeTransitionConfigurationV1(
          consensusCfg),
        timeout = 1.seconds)
    except CatchableError as err:
      warn "Failed to exchange transition configuration", err = err.msg
      return EtcStatus.exchangeError

  if consensusCfg.terminalTotalDifficulty != executionCfg.terminalTotalDifficulty:
    error "Engine API configured with different terminal total difficulty",
          engineAPI_value = executionCfg.terminalTotalDifficulty,
          localValue = consensusCfg.terminalTotalDifficulty
    return EtcStatus.mismatch

  if consensusCfg.terminalBlockNumber != executionCfg.terminalBlockNumber:
    warn "Engine API reporting different terminal block number",
          engineAPI_value = executionCfg.terminalBlockNumber.uint64,
          localValue = consensusCfg.terminalBlockNumber.uint64
    return EtcStatus.mismatch

  if consensusCfg.terminalBlockHash != executionCfg.terminalBlockHash:
    warn "Engine API reporting different terminal block hash",
          engineAPI_value = executionCfg.terminalBlockHash,
          localValue = consensusCfg.terminalBlockHash
    return EtcStatus.mismatch

  if not p.exchangedConfiguration:
    # Log successful engine configuration exchange once at startup
    p.exchangedConfiguration = true
    info "Exchanged engine configuration",
          terminalTotalDifficulty = executionCfg.terminalTotalDifficulty,
          terminalBlockHash = executionCfg.terminalBlockHash,
          terminalBlockNumber = executionCfg.terminalBlockNumber.uint64
  return EtcStatus.match
|
|
|
|
template readJsonField(j: JsonNode, fieldName: string, ValueType: type): untyped =
  ## Decodes `j[fieldName]` into a fresh `ValueType` via `fromJson` and
  ## yields the decoded value.
  var decoded: ValueType
  fromJson(j[fieldName], fieldName, decoded)
  decoded
|
|
|
|
template init[N: static int](T: type DynamicBytes[N, N]): T =
  ## Creates a `DynamicBytes[N, N]` backed by `N` zeroed bytes.
  T(newSeq[byte](N))
|
|
|
|
proc fetchTimestampWithRetries(blkParam: Eth1Block, p: Web3DataProviderRef) {.async.} =
  ## Fetches the block with the same hash from the provider (with retries)
  ## and stores its timestamp on the given block.
  # The local name `blk` is part of the request text that
  # `awaitWithRetries` logs, so it is kept as is.
  let blk = blkParam
  let web3block = awaitWithRetries(
    p.getBlockByHash(blk.hash.asBlockHash))
  blk.timestamp = Eth1BlockTimestamp(web3block.timestamp)
|
|
|
|
func depositEventsToBlocks(depositsList: JsonNode): seq[Eth1Block] {.
    raises: [Defect, CatchableError].} =
  ## Groups a JSON array of deposit log events into `Eth1Block`s. A new
  ## block is started whenever the event's block number differs from the
  ## previous event's, and every event adds one `DepositData` to the
  ## current block. Raises `CatchableError` when the input is not a JSON
  ## array and `CorruptDataProvider` when a decoded field has an unexpected
  ## length.
  if depositsList.kind != JArray:
    raise newException(CatchableError,
      "Web3 provider didn't return a list of deposit events")

  var currentBlock: Eth1Block

  for logEvent in depositsList:
    let
      blockNumber = Eth1BlockNumber readJsonField(logEvent, "blockNumber", Quantity)
      blockHash = readJsonField(logEvent, "blockHash", BlockHash)
      logData = strip0xPrefix(logEvent["data"].getStr)

    if currentBlock.isNil or currentBlock.number != blockNumber:
      currentBlock = Eth1Block(
        hash: blockHash.asEth2Digest,
        number: blockNumber
        # The `timestamp` is set in `syncBlockRange` immediately
        # after calling this function, because we don't want to
        # make this function `async`
      )

      result.add currentBlock

    var
      pubkey = init PubKeyBytes
      withdrawalCredentials = init WithdrawalCredentialsBytes
      amount = init Int64LeBytes
      signature = init SignatureBytes
      index = init Int64LeBytes

    # The fields are decoded back to back, each starting where the
    # previous one ended.
    var offset = 0
    offset += decode(logData, offset, pubkey)
    offset += decode(logData, offset, withdrawalCredentials)
    offset += decode(logData, offset, amount)
    offset += decode(logData, offset, signature)
    offset += decode(logData, offset, index)

    let lengthsOk =
      pubkey.len == 48 and
      withdrawalCredentials.len == 32 and
      amount.len == 8 and
      signature.len == 96 and
      index.len == 8
    if not lengthsOk:
      raise newException(CorruptDataProvider, "Web3 provider supplied invalid deposit logs")

    currentBlock.deposits.add DepositData(
      pubkey: ValidatorPubKey.init(pubkey.toArray),
      withdrawal_credentials: Eth2Digest(data: withdrawalCredentials.toArray),
      amount: bytes_to_uint64(amount.toArray),
      signature: ValidatorSig.init(signature.toArray))
|
|
|
|
type
  DepositContractDataStatus = enum
    ## Result of `fetchDepositContractData`.
    Fetched
    VerifiedCorrect
    DepositRootIncorrect
    DepositRootUnavailable
    DepositCountIncorrect
    DepositCountUnavailable
|
|
|
|
template awaitOrRaiseOnTimeout[T](fut: Future[T],
                                  timeout: Duration): T =
  ## Awaits `fut` through `awaitWithTimeout`; the timeout branch raises
  ## `DataProviderTimeout`.
  awaitWithTimeout(fut, timeout):
    raise newException(DataProviderTimeout, "Timeout")
|
|
|
|
when hasDepositRootChecks:
  const
    contractCallTimeout = 60.seconds

  proc fetchDepositContractData(p: Web3DataProviderRef, blk: Eth1Block):
      Future[DepositContractDataStatus] {.async.} =
    ## Queries the deposit contract's root and count at `blk.number`.
    ## A field of `blk` that is still unset (zero root, zero count) is
    ## filled in from the contract; a set field is compared against it.
    ## The status from the count step, when it assigns one, overrides the
    ## status from the root step.
    # Both calls are created before either of them is awaited.
    let
      depositRoot = p.ns.get_deposit_root.call(blockNumber = blk.number)
      rawCount = p.ns.get_deposit_count.call(blockNumber = blk.number)

    try:
      let contractRoot = asEth2Digest(
        awaitOrRaiseOnTimeout(depositRoot, contractCallTimeout))
      if blk.depositRoot.isZero:
        blk.depositRoot = contractRoot
        result = Fetched
      elif blk.depositRoot == contractRoot:
        result = VerifiedCorrect
      else:
        result = DepositRootIncorrect
    except CatchableError as err:
      debug "Failed to fetch deposits root",
        blockNumber = blk.number,
        err = err.msg
      result = DepositRootUnavailable

    try:
      let contractCount = bytes_to_uint64(
        awaitOrRaiseOnTimeout(rawCount, contractCallTimeout).toArray)
      if blk.depositCount == 0:
        blk.depositCount = contractCount
      elif blk.depositCount != contractCount:
        result = DepositCountIncorrect
    except CatchableError as err:
      debug "Failed to fetch deposits count",
            blockNumber = blk.number,
            err = err.msg
      result = DepositCountUnavailable
|
|
|
|
proc onBlockHeaders(p: Web3DataProviderRef,
                    blockHeaderHandler: BlockHeaderHandler,
                    errorHandler: SubscriptionErrorHandler) {.async.} =
  ## Subscribes to new block headers on the provider's web3 connection and
  ## stores the resulting subscription in `p.blockHeadersSubscription`.
  info "Waiting for new Eth1 block headers"

  let subscription = awaitWithRetries(
    p.web3.subscribeForBlockHeaders(blockHeaderHandler, errorHandler))
  p.blockHeadersSubscription = subscription
|
|
|
|
proc pruneOldBlocks(chain: var Eth1Chain, depositIndex: uint64) =
  ## Called on block finalization to delete old and now redundant data.
  ##
  ## Blocks are removed from the front of the chain while their
  ## `depositCount` is below `depositIndex`; their deposits are appended to
  ## the finalized merkleizer. When at least one deposit was absorbed, the
  ## finalized block hash, the persisted deposit tree snapshot and the
  ## finalization metrics are refreshed.
  let chunksBefore = chain.finalizedDepositsMerkleizer.getChunkCount
  var prunedTail: Eth1Block

  while chain.blocks.len > 0 and
        chain.blocks.peekFirst.depositCount < depositIndex:
    let oldest = chain.blocks.peekFirst
    for deposit in oldest.deposits:
      chain.finalizedDepositsMerkleizer.addChunk hash_tree_root(deposit).data
    chain.popFirst()
    prunedTail = oldest

  # Nothing new was finalized: leave the stored snapshot untouched.
  if chain.finalizedDepositsMerkleizer.getChunkCount <= chunksBefore:
    return

  chain.finalizedBlockHash = prunedTail.hash
  chain.db.putDepositTreeSnapshot DepositTreeSnapshot(
    eth1Block: prunedTail.hash,
    depositContractState: chain.finalizedDepositsMerkleizer.toDepositContractState,
    blockHeight: prunedTail.number)

  eth1_finalized_head.set prunedTail.number.toGaugeValue
  eth1_finalized_deposits.set prunedTail.depositCount.toGaugeValue

  debug "Eth1 blocks pruned",
        newTailBlock = prunedTail.hash,
        depositsCount = prunedTail.depositCount
|
|
|
|
func advanceMerkleizer(chain: Eth1Chain,
                       merkleizer: var DepositsMerkleizer,
                       depositIndex: uint64): bool =
  ## Feeds deposits from the in-memory chain into `merkleizer` until it holds
  ## `depositIndex` chunks. Returns true when that chunk count was reached,
  ## false when the chain does not contain enough deposits.
  if chain.blocks.len == 0:
    return depositIndex == merkleizer.getChunkCount

  if chain.blocks.peekLast.depositCount < depositIndex:
    return false

  let
    oldestBlock = chain.blocks[0]
    depositsBeforeOldestBlock =
      oldestBlock.depositCount - oldestBlock.deposits.lenu64

  # advanceMerkleizer should always be called shortly after pruning the chain
  doAssert depositsBeforeOldestBlock == merkleizer.getChunkCount

  for blk in chain.blocks:
    for deposit in blk.deposits:
      if merkleizer.getChunkCount >= depositIndex:
        return true
      merkleizer.addChunk hash_tree_root(deposit).data

  return merkleizer.getChunkCount == depositIndex
|
|
|
|
iterator getDepositsRange*(chain: Eth1Chain, first, last: uint64): DepositData =
  ## Yields the deposits whose global index lies in `[first, last)`, in
  ## chain order.
  # TODO It's possible to make this faster by performing binary search that
  #      will locate the blocks holding the `first` and `last` indices.
  # TODO There is an assumption here that the requested range will be present
  #      in the Eth1Chain. This should hold true at the call sites right now,
  #      but we need to guard the pre-conditions better.
  for blk in chain.blocks:
    # All deposits of this block precede the requested range.
    if blk.depositCount <= first:
      continue

    let blockBaseIdx = blk.depositCount - blk.deposits.lenu64
    # This block (and every later one) starts past the requested range.
    if blockBaseIdx >= last:
      break

    for i, deposit in blk.deposits:
      let globalIdx = blockBaseIdx + uint64(i)
      if globalIdx < first or globalIdx >= last:
        continue
      yield deposit
|
|
|
|
func lowerBound(chain: Eth1Chain, depositCount: uint64): Eth1Block =
  ## Returns the last block, scanning from the front of the chain, whose
  ## `depositCount` does not exceed `depositCount`; `nil` when the very
  ## first block already exceeds it (or the chain is empty).
  # TODO: This can be replaced with a proper binary search in the
  #       future, but the `algorithm` module currently requires an
  #       `openArray`, which the `deques` module can't provide yet.
  for candidate in chain.blocks:
    if candidate.depositCount > depositCount:
      break
    result = candidate
|
|
|
|
proc trackFinalizedState(chain: var Eth1Chain,
                         finalizedEth1Data: Eth1Data,
                         finalizedStateDepositIndex: uint64,
                         blockProposalExpected = false): bool =
  ## This function will return true if the Eth1Monitor is synced
  ## to the finalization point.
  ##
  ## On success, blocks made redundant by finalization are pruned. When the
  ## local deposit history contradicts `finalizedEth1Data`, the chain is
  ## flagged with `hasConsensusViolation` and false is returned.
  ## `blockProposalExpected` only controls whether a lagging chain is
  ## reported at error level.

  if chain.blocks.len == 0:
    debug "Eth1 chain not initialized"
    return false

  let latest = chain.blocks.peekLast
  if latest.depositCount < finalizedEth1Data.deposit_count:
    if blockProposalExpected:
      error "The Eth1 chain is not synced",
            ourDepositsCount = latest.depositCount,
            targetDepositsCount = finalizedEth1Data.deposit_count
    return false

  let matchingBlock = chain.lowerBound(finalizedEth1Data.deposit_count)
  result = if matchingBlock != nil:
    if matchingBlock.depositRoot == finalizedEth1Data.deposit_root:
      true
    else:
      error "Corrupted deposits history detected",
            ourDepositsCount = matchingBlock.depositCount,
            targetDepositsCount = finalizedEth1Data.deposit_count,
            ourDepositsRoot = matchingBlock.depositRoot,
            targetDepositsRoot = finalizedEth1Data.deposit_root
      chain.hasConsensusViolation = true
      false
  else:
    error "The Eth1 chain is in inconsistent state",
          checkpointHash = finalizedEth1Data.block_hash,
          checkpointDeposits = finalizedEth1Data.deposit_count,
          localChainStart = shortLog(chain.blocks.peekFirst),
          localChainEnd = shortLog(chain.blocks.peekLast)
    chain.hasConsensusViolation = true
    false

  if result:
    chain.pruneOldBlocks(finalizedStateDepositIndex)
|
|
|
|
template trackFinalizedState*(m: Eth1Monitor,
                              finalizedEth1Data: Eth1Data,
                              finalizedStateDepositIndex: uint64): bool =
  ## Convenience overload that forwards to the `Eth1Chain` implementation
  ## using the monitor's `depositsChain`.
  trackFinalizedState(
    m.depositsChain, finalizedEth1Data, finalizedStateDepositIndex)
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-rc.1/specs/phase0/validator.md#get_eth1_data
|
|
proc getBlockProposalData*(chain: var Eth1Chain,
                           state: ForkedHashedBeaconState,
                           finalizedEth1Data: Eth1Data,
                           finalizedStateDepositIndex: uint64): BlockProposalEth1Data =
  ## Computes the Eth1 vote and the deposits (with proofs) to include in a
  ## block proposed on top of `state`.
  ##
  ## `result.hasMissingDeposits` is set when deposits are pending but the
  ## local Eth1 chain cannot supply them.
  let
    periodStart = voting_period_start_time(state)
    # Also prunes blocks made redundant by finalization.
    hasLatestDeposits = chain.trackFinalizedState(
      finalizedEth1Data, finalizedStateDepositIndex,
      blockProposalExpected = true)
    stateDepositIdx = getStateField(state, eth1_deposit_index)
    stateDepositsCount = getStateField(state, eth1_data).deposit_count

  # Tally the votes already in the state that we consider acceptable.
  var voteTally = initCountTable[Eth1Data]()
  for vote in getStateField(state, eth1_data_votes):
    let votedBlock = chain.findBlock(vote)
    if votedBlock != nil and
       votedBlock.depositRoot == vote.deposit_root and
       vote.deposit_count >= stateDepositsCount and
       is_candidate_block(chain.cfg, votedBlock, periodStart):
      voteTally.inc vote
    else:
      debug "Ignoring eth1 vote",
            root = vote.block_hash,
            deposits = vote.deposit_count,
            depositsRoot = vote.deposit_root,
            localDeposits = stateDepositsCount

  # A valid state should never have this condition, but it doesn't hurt
  # to be extra defensive here because we are working with uint types
  var pendingDepositsCount =
    if stateDepositsCount > stateDepositIdx:
      stateDepositsCount - stateDepositIdx
    else:
      0

  if voteTally.len == 0:
    let latestBlock = chain.latestCandidateBlock(periodStart)
    if latestBlock == nil:
      debug "No acceptable eth1 votes and no recent candidates. Voting no change"
      result.vote = getStateField(state, eth1_data)
    else:
      debug "No acceptable eth1 votes. Voting for latest candidate"
      result.vote = latestBlock.toVoteData
  else:
    let (winningVote, votes) = voteTally.largest
    debug "Voting on eth1 head with majority", votes
    result.vote = winningVote
    # Our own vote tips the majority: deposits up to the winning vote
    # become pending for this very block.
    if uint64((votes + 1) * 2) > SLOTS_PER_ETH1_VOTING_PERIOD:
      pendingDepositsCount = winningVote.deposit_count - stateDepositIdx

  if pendingDepositsCount == 0:
    return

  if not hasLatestDeposits:
    result.hasMissingDeposits = true
    return

  let
    totalDepositsInNewBlock = min(MAX_DEPOSITS, pendingDepositsCount)
    postStateDepositIdx = stateDepositIdx + pendingDepositsCount
  var
    blockDeposits = newSeqOfCap[DepositData](totalDepositsInNewBlock)
    pendingRoots = newSeqOfCap[Eth2Digest](pendingDepositsCount)

  # Roots of all pending deposits are collected, while only the first
  # `totalDepositsInNewBlock` deposits are kept for the block itself.
  for data in chain.getDepositsRange(stateDepositIdx, postStateDepositIdx):
    if blockDeposits.lenu64 < totalDepositsInNewBlock:
      blockDeposits.add data
    pendingRoots.add hash_tree_root(data)

  var scratchMerkleizer = copy chain.finalizedDepositsMerkleizer
  if not chain.advanceMerkleizer(scratchMerkleizer, stateDepositIdx):
    error "The Eth1 chain is in inconsistent state" # This should not really happen
    result.hasMissingDeposits = true
    return

  let proofs = scratchMerkleizer.addChunksAndGenMerkleProofs(pendingRoots)
  for i in 0 ..< totalDepositsInNewBlock:
    var proof: array[33, Eth2Digest]
    proof[0..31] = proofs.getProof(i.int)
    # The last proof element carries the little-endian deposit count.
    proof[32] = default(Eth2Digest)
    proof[32].data[0..7] = toBytesLE uint64(postStateDepositIdx)
    result.deposits.add Deposit(data: blockDeposits[i], proof: proof)
|
|
|
|
template getBlockProposalData*(
    m: Eth1Monitor,
    state: ForkedHashedBeaconState,
    finalizedEth1Data: Eth1Data,
    finalizedStateDepositIndex: uint64): BlockProposalEth1Data =
  ## Convenience overload that forwards to the `Eth1Chain` implementation
  ## using the monitor's `depositsChain`.
  getBlockProposalData(m.depositsChain,
                       state,
                       finalizedEth1Data,
                       finalizedStateDepositIndex)
|
|
|
|
proc getJsonRpcRequestHeaders(jwtSecret: Option[seq[byte]]):
    auto =
  ## Returns a closure producing the HTTP headers for JSON-RPC requests.
  ## With a JWT secret, every invocation yields a bearer token signed with
  ## the current time; without one, no headers are produced.
  if jwtSecret.isSome:
    let secret = jwtSecret.get
    (proc(): seq[(string, string)] =
      # https://www.rfc-editor.org/rfc/rfc6750#section-6.1.1
      @[("Authorization",
         "Bearer " & getSignedIatToken(
           secret, inSeconds(getTime() - initTime(0, 0))))])
  else:
    (proc(): seq[(string, string)] = @[])
|
|
|
|
proc new*(T: type Web3DataProvider,
          depositContractAddress: Eth1Address,
          web3Url: string,
          jwtSecret: Option[seq[byte]]):
          Future[Result[Web3DataProviderRef, string]] {.async.} =
  ## Connects to `web3Url` (waiting at most 10 seconds) and wraps the
  ## connection, together with a deposit contract sender, in a provider.
  ## Connection failures and timeouts are reported as an `err` string.
  let connecting = newWeb3(web3Url, getJsonRpcRequestHeaders(jwtSecret))
  yield connecting or sleepAsync(10.seconds)

  let connected = connecting.finished and not connecting.failed
  if not connected:
    await cancelAndWait(connecting)
    let reason =
      if connecting.failed:
        "Failed to setup web3 connection: " & connecting.readError.msg
      else:
        "Failed to setup web3 connection"
    return err reason

  let
    web3 = connecting.read
    ns = web3.contractSender(DepositContract, depositContractAddress)

  return ok Web3DataProviderRef(url: web3Url, web3: web3, ns: ns)
|
|
|
|
template getOrDefault[T, E](r: Result[T, E]): T =
  ## Yields the value held by `r`, or `default(T)` as the fallback.
  # The alias keeps `T` usable inside the template body.
  type Value = T
  get(r, default(Value))
|
|
|
|
proc init*(T: type Eth1Chain,
           cfg: RuntimeConfig,
           db: BeaconChainDB,
           depositContractBlockNumber: uint64,
           depositContractBlockHash: Eth2Digest): T =
  ## Creates an `Eth1Chain` whose merkleizers start from the deposit tree
  ## snapshot stored in `db`. Lookup order: current snapshot, then the
  ## upgradable legacy snapshot. If neither exists, a snapshot anchored at
  ## the deposit contract deployment block is written and used. Without a
  ## database, the deployment block and an empty contract state are used.
  var
    finalizedBlockHash = depositContractBlockHash
    depositContractState = default(DepositContractState)

  if db != nil:
    let treeSnapshot = db.getDepositTreeSnapshot()
    if treeSnapshot.isSome:
      finalizedBlockHash = treeSnapshot.get.eth1Block
      depositContractState = treeSnapshot.get.depositContractState
    else:
      let oldSnapshot = db.getUpgradableDepositSnapshot()
      if oldSnapshot.isSome:
        finalizedBlockHash = oldSnapshot.get.eth1Block
        depositContractState = oldSnapshot.get.depositContractState
      else:
        db.putDepositTreeSnapshot DepositTreeSnapshot(
          eth1Block: depositContractBlockHash,
          blockHeight: depositContractBlockNumber)

  let m = DepositsMerkleizer.init(depositContractState)

  T(db: db,
    cfg: cfg,
    finalizedBlockHash: finalizedBlockHash,
    finalizedDepositsMerkleizer: m,
    headMerkleizer: copy m)
|
|
|
|
proc getBlock(provider: Web3DataProviderRef, id: BlockHashOrNumber):
             Future[BlockObject] =
  ## Starts fetching the block identified by `id`, dispatching on whether
  ## the identifier holds a hash or a number.
  if id.isHash:
    provider.getBlockByHash(id.hash.asBlockHash())
  else:
    provider.getBlockByNumber(id.number)
|
|
|
|
proc currentEpoch(m: Eth1Monitor): Epoch =
  ## Epoch derived from the monitor's beacon time callback; `Epoch 0` when
  ## no callback has been configured.
  if m.getBeaconTime == nil:
    return Epoch 0

  m.getBeaconTime().slotOrZero.epoch
|
|
|
|
proc init*(T: type Eth1Monitor,
           cfg: RuntimeConfig,
           depositContractBlockNumber: uint64,
           depositContractBlockHash: Eth2Digest,
           db: BeaconChainDB,
           getBeaconTime: GetBeaconTimeFn,
           web3Urls: seq[string],
           eth1Network: Option[Eth1Network],
           forcePolling: bool,
           jwtSecret: Option[seq[byte]]): T =
  ## Builds a monitor in the `Initialized` state. At least one web3 URL is
  ## required; every URL is passed through `fixupWeb3Urls` before storage.
  doAssert web3Urls.len > 0

  var normalizedUrls = web3Urls
  for url in normalizedUrls.mitems:
    fixupWeb3Urls url

  let deployedAt = BlockHashOrNumber(
    isHash: true,
    hash: depositContractBlockHash)

  T(state: Initialized,
    depositsChain: Eth1Chain.init(
      cfg, db, depositContractBlockNumber, depositContractBlockHash),
    depositContractAddress: cfg.DEPOSIT_CONTRACT_ADDRESS,
    depositContractDeployedAt: deployedAt,
    getBeaconTime: getBeaconTime,
    web3Urls: normalizedUrls,
    eth1Network: eth1Network,
    eth1Progress: newAsyncEvent(),
    forcePolling: forcePolling,
    jwtSecret: jwtSecret,
    blocksPerLogsRequest: targetBlocksPerLogsRequest)
|
|
|
|
proc safeCancel(fut: var Future[void]) =
  ## Cancels `fut` when it exists and is still pending, then resets the
  ## variable to `nil` in every case.
  if not (fut.isNil or fut.finished):
    fut.cancel()
  fut = nil
|
|
|
|
func clear(chain: var Eth1Chain) =
  ## Drops all in-memory blocks, clears the consensus violation flag and
  ## rewinds the head merkleizer to the finalized one.
  chain.hasConsensusViolation = false
  chain.headMerkleizer = copy(chain.finalizedDepositsMerkleizer)
  chain.blocksByHash.clear()
  chain.blocks.clear()
|
|
|
|
proc detectPrimaryProviderComingOnline(m: Eth1Monitor) {.async.} =
  ## Periodically probes the primary (first) web3 URL. Once a probe request
  ## completes while the monitor is `Started`, the state is switched to
  ## `ReadyToRestartToPrimary` and the loop ends.
  const checkInterval = 30.seconds

  let
    primaryUrl = m.web3Urls[0]
    initialRunFut = m.runFut

  # This is a way to detect that the monitor was restarted. When this
  # happens, this function will just return terminating the "async thread"
  while m.runFut == initialRunFut:
    let probeProviderRes = await Web3DataProvider.new(
      m.depositContractAddress,
      primaryUrl,
      m.jwtSecret)

    if probeProviderRes.isOk:
      var probeProvider = probeProviderRes.get

      # Use one of the get/request-type methods from
      # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.1/src/engine/specification.md#underlying-protocol
      # which doesn't take parameters and returns a small structure, to ensure
      # this works with engine API endpoints.
      let probeRequest = probeProvider.web3.provider.eth_syncing()

      yield probeRequest or sleepAsync(web3Timeouts)

      traceAsyncErrors probeProvider.close()

      if probeRequest.completed and m.state == Started:
        m.state = ReadyToRestartToPrimary
        return

    # Either the connection or the probe did not succeed: retry later.
    await sleepAsync(checkInterval)
|
|
|
|
proc doStop(m: Eth1Monitor) {.async.} =
  ## Cancels the running sync future and closes the data provider, waiting
  ## at most 30 seconds for the close to finish.
  safeCancel(m.runFut)

  if m.dataProvider == nil:
    return

  awaitWithTimeout(m.dataProvider.close(), 30.seconds):
    debug "Failed to close data provider in time"
  m.dataProvider = nil
|
|
|
|
proc ensureDataProvider*(m: Eth1Monitor) {.async.} =
  ## Connects `m.dataProvider` when the monitor has none, picking the next
  ## URL in round-robin order. Raises `CatchableError` when the connection
  ## cannot be established. Does nothing for a nil monitor.
  if m.isNil or not m.dataProvider.isNil:
    return

  let web3Url = m.web3Urls[m.startIdx mod m.web3Urls.len]
  inc m.startIdx

  let connection = await Web3DataProvider.new(
    m.depositContractAddress, web3Url, m.jwtSecret)
  if connection.isErr():
    raise (ref CatchableError)(msg: connection.error())

  info "Established connection to execution layer", url = web3Url
  m.dataProvider = connection.get()
|
|
|
|
proc stop(m: Eth1Monitor) {.async.} =
  ## Stops a running monitor. A stop that is already in progress is joined
  ## instead of being started a second time; other states are left alone.
  case m.state
  of Started, ReadyToRestartToPrimary:
    m.state = Stopping
    m.stopFut = m.doStop()
    await m.stopFut
    m.state = Stopped
  of Stopping:
    await m.stopFut
  else:
    discard
|
|
|
|
# Extra number of blocks added on top of the follow-distance based window
# when computing the earliest block of interest.
const votedBlocksSafetyMargin = 50
|
|
|
|
func latestEth1BlockNumber(m: Eth1Monitor): Eth1BlockNumber =
  ## Number of the latest Eth1 block known to the monitor, or 0 when no
  ## block has been observed yet.
  if m.latestEth1Block.isNone:
    return Eth1BlockNumber 0

  Eth1BlockNumber m.latestEth1Block.get.number
|
|
|
|
func earliestBlockOfInterest(m: Eth1Monitor): Eth1BlockNumber =
  ## First block number that must be fully synced: the latest known block
  ## minus twice the follow distance and the voted-blocks safety margin.
  ##
  ## The subtraction saturates at 0. Block numbers are unsigned, so on a
  ## chain whose head is lower than the look-back window a plain
  ## subtraction would wrap around to a huge block number.
  let
    latest = m.latestEth1BlockNumber
    lookback = Eth1BlockNumber(
      (2 * m.cfg.ETH1_FOLLOW_DISTANCE) + votedBlocksSafetyMargin)

  if latest > lookback:
    latest - lookback
  else:
    Eth1BlockNumber 0
|
|
|
|
proc syncBlockRange(m: Eth1Monitor,
                    fromBlock, toBlock,
                    fullSyncFromBlock: Eth1BlockNumber) {.gcsafe, async.} =
  ## Downloads deposit logs for blocks `fromBlock..toBlock` in batches and
  ## appends the resulting blocks to `m.depositsChain`. For blocks above
  ## `fullSyncFromBlock`, the deposit-less blocks in between are fetched
  ## and added as well.
  ##
  ## The batch size is halved after each failed request and grown again
  ## after a success; the last error is re-raised once the batch size can
  ## shrink no further. `CorruptDataProvider` is raised when the deposit
  ## contract state check reports a root or count mismatch.
  doAssert m.dataProvider != nil, "close not called concurrently"
  doAssert m.depositsChain.blocks.len > 0

  var nextBlockToRequest = fromBlock
  while nextBlockToRequest <= toBlock:
    var
      depositLogs: JsonNode = nil
      maxBlockNumberRequested: Eth1BlockNumber
      backoff = 100

    # Retry loop for a single batch of logs.
    while true:
      maxBlockNumberRequested =
        min(toBlock, nextBlockToRequest + m.blocksPerLogsRequest - 1)

      debug "Obtaining deposit log events",
            fromBlock = nextBlockToRequest,
            toBlock = maxBlockNumberRequested,
            backoff

      debug.logTime "Deposit logs obtained":
        # Reduce all request rate until we have a more general solution
        # for dealing with Infura's rate limits
        await sleepAsync(milliseconds(backoff))
        let jsonLogsFut = m.dataProvider.ns.getJsonLogs(
          DepositEvent,
          fromBlock = some blockId(nextBlockToRequest),
          toBlock = some blockId(maxBlockNumberRequested))

        depositLogs = try:
          # Downloading large amounts of deposits may take several minutes
          awaitWithTimeout(jsonLogsFut, web3Timeouts):
            raise newException(DataProviderTimeout,
              "Request time out while obtaining json logs")
        except CatchableError as err:
          debug "Request for deposit logs failed", err = err.msg
          inc failed_web3_requests
          backoff = (backoff * 3) div 2
          m.blocksPerLogsRequest = m.blocksPerLogsRequest div 2
          if m.blocksPerLogsRequest == 0:
            # Even single-block requests fail: give up on this provider.
            m.blocksPerLogsRequest = 1
            raise err
          continue
        m.blocksPerLogsRequest = min(
          (m.blocksPerLogsRequest * 3 + 1) div 2,
          targetBlocksPerLogsRequest)

      nextBlockToRequest = maxBlockNumberRequested + 1
      break

    let blocksWithDeposits = depositEventsToBlocks(depositLogs)

    for blk in blocksWithDeposits:
      await blk.fetchTimestampWithRetries(m.dataProvider)

      if blk.number > fullSyncFromBlock:
        # Fill the gap between the chain tip and this block with blocks
        # that carry no deposits.
        let chainTip = m.depositsChain.blocks.peekLast
        for n in max(chainTip.number + 1, fullSyncFromBlock) ..< blk.number:
          debug "Obtaining block without deposits", blockNum = n
          let blockWithoutDeposits = awaitWithRetries(
            m.dataProvider.getBlockByNumber(n))

          m.depositsChain.addBlock(
            chainTip.makeSuccessorWithoutDeposits(blockWithoutDeposits))
          eth1_synced_head.set blockWithoutDeposits.number.toGaugeValue

      m.depositsChain.addBlock blk
      eth1_synced_head.set blk.number.toGaugeValue

    if blocksWithDeposits.len > 0:
      let newestBlock = blocksWithDeposits[^1]

      let status = when hasDepositRootChecks:
        awaitWithRetries m.dataProvider.fetchDepositContractData(newestBlock)
      else:
        DepositRootUnavailable

      when hasDepositRootChecks:
        debug "Deposit contract state verified",
              status = $status,
              ourCount = newestBlock.depositCount,
              ourRoot = newestBlock.depositRoot

      case status
      of DepositRootIncorrect, DepositCountIncorrect:
        raise newException(CorruptDataProvider,
          "The deposit log events disagree with the deposit contract state")
      else:
        discard

      info "Eth1 sync progress",
        blockNumber = newestBlock.number,
        depositsProcessed = newestBlock.depositCount
|
|
|
|
func init(T: type FullBlockId, blk: Eth1BlockHeader|BlockObject): T =
  ## Captures the number and hash of `blk`.
  FullBlockId(hash: blk.hash, number: Eth1BlockNumber(blk.number))
|
|
|
|
func isNewLastBlock(m: Eth1Monitor, blk: Eth1BlockHeader|BlockObject): bool =
  ## True when no latest block is known yet or `blk` has a higher number
  ## than the latest known one.
  if m.latestEth1Block.isNone:
    true
  else:
    uint64(blk.number) > m.latestEth1BlockNumber
|
|
|
|
proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
  ## Main loop of the monitor: connects to a web3 provider, follows the head
  ## of the Eth1 chain (via subscription or polling) and keeps the deposits
  ## chain synced up to `ETH1_FOLLOW_DISTANCE` blocks behind the head.
  ##
  ## Returns when the beacon node is stopping or when the primary provider
  ## is back online; raises `CorruptDataProvider` on consensus violations
  ## and on prolonged lack of progress.
  if m.state == Started:
    return

  let
    isFirstRun = m.state == Initialized
    providerMayBeStale = m.state in {Failed, ReadyToRestartToPrimary}

  m.state = Started

  if delayBeforeStart != ZeroDuration:
    await sleepAsync(delayBeforeStart)

  # If the monitor died with an exception, the web3 provider may be in
  # an arbitrary state, so we better reset it (not doing this has resulted
  # in resource leaks historically).
  if providerMayBeStale and not m.dataProvider.isNil:
    # We introduce a local var to eliminate the risk of scheduling two
    # competing calls to `close` below.
    let staleProvider = m.dataProvider
    m.dataProvider = nil
    await staleProvider.close()

  await m.ensureDataProvider()
  doAssert m.dataProvider != nil, "close not called concurrently"

  # We might need to reset the chain if the new provider disagrees
  # with the previous one regarding the history of the chain or if
  # we have detected a consensus violation - our view disagreeing with
  # the majority of the validators in the network.
  #
  # Consensus violations happen in practice because the web3 providers
  # sometimes return incomplete or incorrect deposit log events even
  # when they don't indicate any errors in the response. When this
  # happens, we are usually able to download the data successfully
  # on the second attempt.
  if m.latestEth1Block.isSome and m.depositsChain.blocks.len > 0:
    var chainMustReset = m.depositsChain.hasConsensusViolation
    if not chainMustReset:
      let
        lastKnownBlock = m.depositsChain.blocks.peekLast
        matchingBlockAtNewProvider = awaitWithRetries(
          m.dataProvider.getBlockByNumber lastKnownBlock.number)

      chainMustReset =
        lastKnownBlock.hash.asBlockHash != matchingBlockAtNewProvider.hash

    if chainMustReset:
      m.depositsChain.clear()
      m.latestEth1Block = none(FullBlockId)

  template web3Url: string = m.dataProvider.url

  # Running on a fallback provider: watch for the primary coming back.
  if web3Url != m.web3Urls[0]:
    asyncSpawn m.detectPrimaryProviderComingOnline()

  info "Starting Eth1 deposit contract monitoring",
    contract = $m.depositContractAddress

  if isFirstRun and m.eth1Network.isSome:
    try:
      let
        providerChain =
          awaitWithRetries m.dataProvider.web3.provider.eth_chainId()

        # https://eips.ethereum.org/EIPS/eip-155#list-of-chain-ids
        expectedChain = case m.eth1Network.get
          of mainnet: 1.Quantity
          of ropsten: 3.Quantity
          of rinkeby: 4.Quantity
          of goerli:  5.Quantity
          of sepolia: 11155111.Quantity   # https://chainid.network/
      if expectedChain != providerChain:
        fatal "The specified Web3 provider serves data for a different chain",
               expectedChain = distinctBase(expectedChain),
               providerChain = distinctBase(providerChain)
        quit 1
    except CatchableError as exc:
      # Typically because it's not synced through EIP-155, assuming this Web3
      # endpoint has been otherwise working.
      debug "startEth1Syncing: eth_chainId failed: ",
            error = exc.msg

  # HTTP(S) endpoints cannot push block headers, so they must be polled.
  var mustUsePolling = m.forcePolling or
                       web3Url.startsWith("http://") or
                       web3Url.startsWith("https://")

  if not mustUsePolling:
    proc newBlockHeadersHandler(blk: Eth1BlockHeader)
                               {.raises: [Defect], gcsafe.} =
      try:
        if m.isNewLastBlock(blk):
          eth1_latest_head.set blk.number.toGaugeValue
          m.latestEth1Block = some FullBlockId.init(blk)
          m.eth1Progress.fire()
      except Exception:
        # TODO Investigate why this exception is being raised
        raiseAssert "AsyncEvent.fire should not raise exceptions"

    proc subscriptionErrorHandler(err: CatchableError)
                                  {.raises: [Defect], gcsafe.} =
      warn "Failed to subscribe for block headers. Switching to polling",
            err = err.msg
      mustUsePolling = true

    await m.dataProvider.onBlockHeaders(newBlockHeadersHandler,
                                        subscriptionErrorHandler)

  let shouldProcessDeposits =
    not m.depositContractAddress.isZeroMemory and
    not m.depositsChain.finalizedBlockHash.data.isZeroMemory

  var eth1SyncedTo: Eth1BlockNumber
  if shouldProcessDeposits:
    if m.depositsChain.blocks.len == 0:
      # Seed the chain with the finalized block as the sync starting point.
      let startBlock = awaitWithRetries(
        m.dataProvider.getBlockByHash(
          m.depositsChain.finalizedBlockHash.asBlockHash))

      m.depositsChain.addBlock Eth1Block(
        hash: m.depositsChain.finalizedBlockHash,
        number: Eth1BlockNumber startBlock.number,
        timestamp: Eth1BlockTimestamp startBlock.timestamp)

    eth1SyncedTo = Eth1BlockNumber m.depositsChain.blocks[^1].number

    eth1_synced_head.set eth1SyncedTo.toGaugeValue
    eth1_finalized_head.set eth1SyncedTo.toGaugeValue
    eth1_finalized_deposits.set(
      m.depositsChain.finalizedDepositsMerkleizer.getChunkCount.toGaugeValue)

    debug "Starting Eth1 syncing", `from` = shortLog(m.depositsChain.blocks[^1])

  var didPollOnce = false
  while true:
    if bnStatus == BeaconNodeStatus.Stopping:
      await m.stop()
      return

    if m.depositsChain.hasConsensusViolation:
      raise newException(CorruptDataProvider, "Eth1 chain contradicts Eth2 consensus")

    if m.state == ReadyToRestartToPrimary:
      info "Primary web3 provider is back online. Restarting the Eth1 monitor"
      m.startIdx = 0
      return

    # NOTE(review): `nextBlock` is not read later in this proc; the binding
    # is kept because computing it performs the awaits that drive the loop.
    let nextBlock = if mustUsePolling or not didPollOnce:
      let blk = awaitWithRetries(
        m.dataProvider.web3.provider.eth_getBlockByNumber(blockId("latest"), false))

      # Same as when handling events, minus `m.eth1Progress` round trip
      if m.isNewLastBlock(blk):
        eth1_latest_head.set blk.number.toGaugeValue
        m.latestEth1Block = some FullBlockId.init(blk)
      elif mustUsePolling:
        await sleepAsync(m.cfg.SECONDS_PER_ETH1_BLOCK.int.seconds)
        continue
      else:
        doAssert not didPollOnce

      didPollOnce = true
      blk
    else:
      awaitWithTimeout(m.eth1Progress.wait(), 5.minutes):
        raise newException(CorruptDataProvider, "No eth1 chain progress for too long")

      m.eth1Progress.clear()

      doAssert m.latestEth1Block.isSome
      awaitWithRetries m.dataProvider.getBlockByHash(m.latestEth1Block.get.hash)

    if not shouldProcessDeposits:
      continue

    # The chain is still shorter than the follow distance.
    if m.latestEth1BlockNumber <= m.cfg.ETH1_FOLLOW_DISTANCE:
      continue

    let targetBlock = m.latestEth1BlockNumber - m.cfg.ETH1_FOLLOW_DISTANCE
    if targetBlock <= eth1SyncedTo:
      continue

    let fullSyncFrom = m.earliestBlockOfInterest()
    await m.syncBlockRange(eth1SyncedTo + 1, targetBlock, fullSyncFrom)
    eth1SyncedTo = targetBlock
    eth1_synced_head.set eth1SyncedTo.toGaugeValue
|
|
|
|
proc start(m: Eth1Monitor, delayBeforeStart: Duration) {.gcsafe.} =
  ## Launches the sync loop unless one is already registered in `m.runFut`,
  ## and installs a completion callback that schedules a restart after
  ## 5 seconds. A failure of the current run is logged and recorded as
  ## the `Failed` state before restarting.
  if not m.runFut.isNil:
    return

  let syncFut = m.startEth1Syncing(delayBeforeStart)
  m.runFut = syncFut
  syncFut.addCallback do (p: pointer) {.gcsafe.}:
    if syncFut.failed:
      # Only report failures of the run that is still the current one.
      if syncFut == m.runFut:
        warn "Eth1 chain monitoring failure, restarting", err = syncFut.error.msg
        m.state = Failed

    safeCancel m.runFut
    m.start(5.seconds)
|
|
|
|
proc start*(m: Eth1Monitor) =
  ## Starts the monitor immediately (no initial delay).
  start(m, 0.seconds)
|
|
|
|
proc getEth1BlockHash*(
    url: string, blockId: RtBlockIdentifier, jwtSecret: Option[seq[byte]]):
    Future[BlockHash] {.async.} =
  ## Opens a temporary web3 connection to `url` (10 second connect timeout),
  ## returns the hash of the block identified by `blockId` and closes the
  ## connection again, also when the lookup fails.
  let
    connecting = newWeb3(url, getJsonRpcRequestHeaders(jwtSecret))
    web3 = awaitOrRaiseOnTimeout(connecting, 10.seconds)
  try:
    let requestedBlock = awaitWithRetries(
      web3.provider.eth_getBlockByNumber(blockId, false))
    return requestedBlock.hash
  finally:
    await web3.close()
|
|
|
|
func `$`(x: Quantity): string =
  ## Renders the quantity as its decimal `uint64` value.
  $uint64(x)
|
|
|
|
func `$`(x: BlockObject): string =
  ## Renders a block as `<number> [<hash>]`.
  result = $(x.number)
  result.add " ["
  result.add $(x.hash)
  result.add "]"
|
|
|
|
proc testWeb3Provider*(web3Url: Uri,
|
|
depositContractAddress: Eth1Address,
|
|
jwtSecret: Option[seq[byte]]) {.async.} =
|
|
stdout.write "Establishing web3 connection..."
|
|
var web3: Web3
|
|
try:
|
|
web3 = awaitOrRaiseOnTimeout(
|
|
newWeb3($web3Url, getJsonRpcRequestHeaders(jwtSecret)), 5.seconds)
|
|
stdout.write "\rEstablishing web3 connection: Connected\n"
|
|
except CatchableError as err:
|
|
stdout.write "\rEstablishing web3 connection: Failure(" & err.msg & ")\n"
|
|
quit 1
|
|
|
|
template request(actionDesc: static string,
|
|
action: untyped): untyped =
|
|
stdout.write actionDesc & "..."
|
|
stdout.flushFile()
|
|
var res: typeof(read action)
|
|
try:
|
|
res = awaitWithRetries action
|
|
stdout.write "\r" & actionDesc & ": " & $res
|
|
except CatchableError as err:
|
|
stdout.write "\r" & actionDesc & ": Error(" & err.msg & ")"
|
|
stdout.write "\n"
|
|
res
|
|
|
|
let
|
|
chainId = request "Chain ID":
|
|
web3.provider.eth_chainId()
|
|
|
|
latestBlock = request "Latest block":
|
|
web3.provider.eth_getBlockByNumber(blockId("latest"), false)
|
|
|
|
syncStatus = request "Sync status":
|
|
web3.provider.eth_syncing()
|
|
|
|
ns = web3.contractSender(DepositContract, depositContractAddress)
|
|
|
|
depositRoot = request "Deposit root":
|
|
ns.get_deposit_root.call(blockNumber = latestBlock.number.uint64)
|