Eth1 monitor fixes

* Fix a resource leak introduced in https://github.com/status-im/nimbus-eth2/pull/3279

* Don't restart the Eth1 syncing proggress from scratch in case of
  monitor failures during Eth2 syncing.

* Switch to the primary operator as soon as it is back online.

* Log the web3 credentials in fewer places

Other changes:

The 'web3 test' command has been enhanced to obtain and print more
data regarding the selected provider.
This commit is contained in:
Zahary Karadjov 2022-01-31 19:28:26 +02:00 committed by zah
parent 702d9e8c55
commit 215caa21ae
5 changed files with 153 additions and 51 deletions

View File

@ -51,7 +51,7 @@ type
OnReorgCallback* =
proc(data: ReorgInfoObject) {.gcsafe, raises: [Defect].}
OnFinalizedCallback* =
proc(data: FinalizationInfoObject) {.gcsafe, raises: [Defect].}
proc(dag: ChainDAGRef, data: FinalizationInfoObject) {.gcsafe, raises: [Defect].}
KeyedBlockRef* = object
# Special wrapper for BlockRef used in ChainDAG.blocks that allows lookup
@ -250,6 +250,9 @@ func shortLog*(v: EpochKey): string =
# epoch:root when logging epoch, root:slot when logging slot!
$v.epoch & ":" & shortLog(v.blck)
template setFinalizationCb*(dag: ChainDAGRef, cb: OnFinalizedCallback) =
dag.onFinHappened = cb
func shortLog*(v: EpochRef): string =
# epoch:root when logging epoch, root:slot when logging slot!
if v.isNil():

View File

@ -1531,7 +1531,7 @@ proc updateHead*(
dag.finalizedHead.blck.root,
stateRoot,
dag.finalizedHead.slot.epoch)
dag.onFinHappened(data)
dag.onFinHappened(dag, data)
proc isInitialized*(T: type ChainDAGRef, db: BeaconChainDB): Result[void, cstring] =
# Lightweight check to see if we have the minimal information needed to

View File

@ -95,6 +95,7 @@ type
Eth1MonitorState = enum
Initialized
Started
ReadyToRestartToPrimary
Failed
Stopping
Stopped
@ -392,7 +393,6 @@ template awaitWithRetries*[T](lazyFutExpr: Future[T],
timeout = web3Timeouts): untyped =
const
reqType = astToStr(lazyFutExpr)
var
retryDelayMs = 16000
f: Future[T]
@ -714,7 +714,8 @@ func lowerBound(chain: Eth1Chain, depositCount: uint64): Eth1Block =
proc trackFinalizedState(chain: var Eth1Chain,
finalizedEth1Data: Eth1Data,
finalizedStateDepositIndex: uint64): bool =
finalizedStateDepositIndex: uint64,
blockProposalExpected = false): bool =
# Returns true if the Eth1Monitor is synced to the finalization point
if chain.blocks.len == 0:
debug "Eth1 chain not initialized"
@ -722,9 +723,10 @@ proc trackFinalizedState(chain: var Eth1Chain,
let latest = chain.blocks.peekLast
if latest.voteData.deposit_count < finalizedEth1Data.deposit_count:
warn "Eth1 chain not synced",
ourDepositsCount = latest.voteData.deposit_count,
targetDepositsCount = finalizedEth1Data.deposit_count
if blockProposalExpected:
error "The Eth1 chain is not synced",
ourDepositsCount = latest.voteData.deposit_count,
targetDepositsCount = finalizedEth1Data.deposit_count
return false
let matchingBlock = chain.lowerBound(finalizedEth1Data.deposit_count)
@ -764,7 +766,8 @@ proc getBlockProposalData*(chain: var Eth1Chain,
let
periodStart = voting_period_start_time(state)
hasLatestDeposits = chain.trackFinalizedState(finalizedEth1Data,
finalizedStateDepositIndex)
finalizedStateDepositIndex,
blockProposalExpected = true)
var otherVotesCountTable = initCountTable[Eth1Data]()
for vote in getStateField(state, eth1_data_votes):
@ -900,7 +903,6 @@ proc init*(T: type Eth1Monitor,
eth1Network: Option[Eth1Network],
forcePolling: bool): T =
doAssert web3Urls.len > 0
var web3Urls = web3Urls
for url in mitems(web3Urls):
fixupWeb3Urls url
@ -926,11 +928,41 @@ func clear(chain: var Eth1Chain) =
chain.blocksByHash.clear()
chain.hasConsensusViolation = false
proc resetState(m: Eth1Monitor) {.async.} =
safeCancel m.runFut
proc detectPrimaryProviderComingOnline(m: Eth1Monitor) {.async.} =
const checkInterval = chronos.seconds(30)
m.depositsChain.clear()
m.latestEth1Block = none(FullBlockId)
let
web3Url = m.web3Urls[0]
initialRunFut = m.runFut
# This is a way to detect that the monitor was restarted. When this
# happens, this function will just return terminating the "async thread"
while m.runFut == initialRunFut:
let tempProviderRes = await Web3DataProvider.new(
m.depositContractAddress,
web3Url)
if tempProviderRes.isErr:
await sleepAsync(checkInterval)
continue
var tempProvider = tempProviderRes.get
var testRequest = tempProvider.web3.provider.net_version()
yield testRequest
try: await tempProvider.close()
except CatchableError as err:
debug "Failed to close temp web3 provider", err = err.msg
if testRequest.failed:
await sleepAsync(checkInterval)
elif m.state == Started:
m.state = ReadyToRestartToPrimary
return
proc doStop(m: Eth1Monitor) {.async.} =
safeCancel m.runFut
if m.dataProvider != nil:
await m.dataProvider.close()
@ -950,9 +982,9 @@ proc ensureDataProvider*(m: Eth1Monitor) {.async.} =
v.get()
proc stop(m: Eth1Monitor) {.async.} =
if m.state == Started:
if m.state in {Started, ReadyToRestartToPrimary}:
m.state = Stopping
m.stopFut = resetState(m)
m.stopFut = m.doStop()
await m.stopFut
m.state = Stopped
elif m.state == Stopping:
@ -1002,7 +1034,6 @@ proc syncBlockRange(m: Eth1Monitor,
# Reduce all request rate until we have a more general solution
# for dealing with Infura's rate limits
await sleepAsync(milliseconds(backoff))
let jsonLogsFut = m.dataProvider.ns.getJsonLogs(
DepositEvent,
fromBlock = some blockId(currentBlock),
@ -1142,25 +1173,61 @@ func init(T: type FullBlockId, blk: Eth1BlockHeader|BlockObject): T =
FullBlockId(number: Eth1BlockNumber blk.number, hash: blk.hash)
proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
if m.state == Failed:
await m.resetState()
elif m.state == Stopping:
await m.stopFut
if m.state in {Started, ReadyToRestartToPrimary}:
return
let isFirstRun = m.state == Initialized
if delayBeforeStart != ZeroDuration:
await sleepAsync(delayBeforeStart)
# If the monitor died with an exception, the web3 provider may be in
# an arbitary state, so we better reset it (not doing this has resulted
# in resource leaks historically).
if not m.dataProvider.isNil and m.state == Failed:
# We introduce a local var to eliminate the risk of scheduling two
# competing calls to `close` below.
let provider = m.dataProvider
m.dataProvider = nil
await provider.close()
m.state = Started
await m.ensureDataProvider()
let
web3Url = m.web3Urls[(m.startIdx + m.web3Urls.len - 1) mod m.web3Urls.len]
web3 = m.dataProvider.web3
# We might need to reset the chain if the new provider disagrees
# with the previous one regarding the history of the chain or if
# we have detected a conensus violation - our view disagreeing with
# the majority of the validators in the network.
#
# Consensus violations happen in practice because the web3 providers
# sometimes return incomplete or incorrect deposit log events even
# when they don't indicate any errors in the response. When this
# happens, we are usually able to download the data successfully
# on the second attempt.
if m.latestEth1Block.isSome and m.depositsChain.blocks.len > 0:
let needsReset = m.depositsChain.hasConsensusViolation or (block:
let
lastKnownBlock = m.depositsChain.blocks.peekLast
matchingBlockAtNewProvider = awaitWithRetries(
m.dataProvider.getBlockByNumber lastKnownBlock.number)
lastKnownBlock.voteData.block_hash.asBlockHash != matchingBlockAtNewProvider.hash)
if needsReset:
m.depositsChain.clear()
m.latestEth1Block = none(FullBlockId)
template web3Url: string = m.dataProvider.url
if web3Url != m.web3Urls[0]:
asyncSpawn m.detectPrimaryProviderComingOnline()
info "Starting Eth1 deposit contract monitoring",
contract = $m.depositContractAddress, url = web3Url
contract = $m.depositContractAddress
if m.state == Initialized and m.eth1Network.isSome:
if isFirstRun and m.eth1Network.isSome:
let
providerNetwork = awaitWithRetries web3.provider.net_version()
providerNetwork = awaitWithRetries m.dataProvider.web3.provider.net_version()
expectedNetwork = case m.eth1Network.get
of mainnet: "1"
of rinkeby: "4"
@ -1170,7 +1237,6 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
expectedNetwork, providerNetwork
quit 1
m.state = Started
var mustUsePolling = m.forcePolling or
web3Url.startsWith("http://") or
web3Url.startsWith("https://")
@ -1190,24 +1256,24 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
proc subscriptionErrorHandler(err: CatchableError)
{.raises: [Defect], gcsafe.} =
warn "Failed to subscribe for block headers. Switching to polling",
web3Url, err = err.msg
err = err.msg
mustUsePolling = true
await m.dataProvider.onBlockHeaders(newBlockHeadersHandler,
subscriptionErrorHandler)
let startBlock = awaitWithRetries(
m.dataProvider.getBlockByHash(m.depositsChain.finalizedBlockHash.asBlockHash))
if m.depositsChain.blocks.len == 0:
let startBlock = awaitWithRetries(
m.dataProvider.getBlockByHash(m.depositsChain.finalizedBlockHash.asBlockHash))
doAssert m.depositsChain.blocks.len == 0
m.depositsChain.addBlock Eth1Block(
number: Eth1BlockNumber startBlock.number,
timestamp: Eth1BlockTimestamp startBlock.timestamp,
voteData: eth1DataFromMerkleizer(
m.depositsChain.finalizedBlockHash,
m.depositsChain.finalizedDepositsMerkleizer))
m.depositsChain.addBlock Eth1Block(
number: Eth1BlockNumber startBlock.number,
timestamp: Eth1BlockTimestamp startBlock.timestamp,
voteData: eth1DataFromMerkleizer(
m.depositsChain.finalizedBlockHash,
m.depositsChain.finalizedDepositsMerkleizer))
var eth1SyncedTo = Eth1BlockNumber startBlock.number
var eth1SyncedTo = Eth1BlockNumber m.depositsChain.blocks.peekLast.number
eth1_synced_head.set eth1SyncedTo.toGaugeValue
eth1_finalized_head.set eth1SyncedTo.toGaugeValue
eth1_finalized_deposits.set(
@ -1229,6 +1295,11 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
if m.depositsChain.hasConsensusViolation:
raise newException(CorruptDataProvider, "Eth1 chain contradicts Eth2 consensus")
if m.state == ReadyToRestartToPrimary:
info "Primary web3 provider is back online. Restarting the Eth1 monitor"
m.startIdx = 0
return
if mustUsePolling:
let blk = awaitWithRetries(
m.dataProvider.web3.provider.eth_getBlockByNumber(blockId("latest"), false))
@ -1271,7 +1342,6 @@ proc start(m: Eth1Monitor, delayBeforeStart: Duration) =
if runFut.error[] of CatchableError:
if runFut == m.runFut:
warn "Eth1 chain monitoring failure, restarting", err = runFut.error.msg
m.dataProvider = nil
m.state = Failed
else:
fatal "Fatal exception reached", err = runFut.error.msg
@ -1303,13 +1373,35 @@ proc testWeb3Provider*(web3Url: Uri,
let
web3 = mustSucceed "connect to web3 provider":
await newWeb3($web3Url)
network = mustSucceed "get network version":
networkVersion = mustSucceed "get network version":
awaitWithRetries web3.provider.net_version()
latestBlock = mustSucceed "get latest block":
awaitWithRetries web3.provider.eth_getBlockByNumber(blockId("latest"), false)
syncStatus = mustSucceed "get sync status":
awaitWithRetries web3.provider.eth_syncing()
listening = mustSucceed "get network listening":
awaitWithRetries web3.provider.net_listening()
peers =
try:
awaitWithRetries web3.provider.net_peerCount()
except:
0
clientVersion = mustSucceed "get client version":
awaitWithRetries web3.provider.web3_clientVersion()
protocolVersion = mustSucceed "get protocol version":
awaitWithRetries web3.provider.eth_protocolVersion()
mining = mustSucceed "get mining status":
awaitWithRetries web3.provider.eth_mining()
echo "Network: ", network
echo "Client Version: ", clientVersion
echo "Protocol Version: ", protocolVersion, " (", $protocolVersion.fromHex[:int], ")"
echo "Network Version: ", networkVersion
echo "Network Listening: ", listening
echo "Network Peers: ", peers
echo "Syncing: ", syncStatus
echo "Latest block: ", latestBlock.number.uint64
echo "Last Known Nonce: ", web3.lastKnownNonce
echo "Mining: ", mining
let ns = web3.contractSender(DepositContract, depositContractAddress)
try:
@ -1318,7 +1410,6 @@ proc testWeb3Provider*(web3Url: Uri,
echo "Deposit root: ", depositRoot
except CatchableError as err:
echo "Web3 provider is not archive mode: ", err.msg
when hasGenesisDetection:
proc init*(T: type Eth1Monitor,
cfg: RuntimeConfig,

View File

@ -169,8 +169,19 @@ proc init*(T: type BeaconNode,
eventBus.emit("head-change", data)
proc onChainReorg(data: ReorgInfoObject) =
eventBus.emit("chain-reorg", data)
proc onFinalization(data: FinalizationInfoObject) =
eventBus.emit("finalization", data)
proc makeOnFinalizationCb(
# This `nimcall` functions helps for keeping track of what
# needs to be captured by the onFinalization closure.
eventBus: AsyncEventBus,
eth1Monitor: Eth1Monitor): OnFinalizedCallback {.nimcall.} =
static: doAssert (eventBus is ref) and (eth1Monitor is ref)
return proc(dag: ChainDAGRef, data: FinalizationInfoObject) =
if eth1Monitor != nil:
let finalizedEpochRef = dag.getFinalizedEpochRef()
discard trackFinalizedState(eth1Monitor,
finalizedEpochRef.eth1_data,
finalizedEpochRef.eth1_deposit_index)
eventBus.emit("finalization", data)
proc onSyncContribution(data: SignedContributionAndProof) =
eventBus.emit("sync-contribution-and-proof", data)
@ -341,7 +352,7 @@ proc init*(T: type BeaconNode,
else: {}
dag = ChainDAGRef.init(
cfg, db, validatorMonitor, chainDagFlags, onBlockAdded, onHeadChanged,
onChainReorg, onFinalization)
onChainReorg)
quarantine = newClone(Quarantine.init())
databaseGenesisValidatorsRoot =
getStateField(dag.headState.data, genesis_validators_root)
@ -509,6 +520,8 @@ proc init*(T: type BeaconNode,
else:
nil
dag.setFinalizationCb makeOnFinalizationCb(eventBus, eth1Monitor)
var node = BeaconNode(
nickname: nickname,
graffitiBytes: if config.graffiti.isSome: config.graffiti.get
@ -1106,11 +1119,6 @@ proc onSlotStart(
await node.handleValidatorDuties(lastSlot, wallSlot)
if node.eth1Monitor != nil and (wallSlot mod SLOTS_PER_EPOCH) == 0:
let finalizedEpochRef = node.dag.getFinalizedEpochRef()
discard node.eth1Monitor.trackFinalizedState(
finalizedEpochRef.eth1_data, finalizedEpochRef.eth1_deposit_index)
await onSlotEnd(node, wallSlot)
proc handleMissingBlocks(node: BeaconNode) =

2
vendor/nim-json-rpc vendored

@ -1 +1 @@
Subproject commit 5a281760803907f4989cacf109b516381dfbbe11
Subproject commit 97ba55bbf6246ce798f44871ca84a4e96c59167c