From 215caa21ae79fb4e8a87beaaacfed76c336cafd1 Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Mon, 31 Jan 2022 19:28:26 +0200 Subject: [PATCH] Eth1 monitor fixes * Fix a resource leak introduced in https://github.com/status-im/nimbus-eth2/pull/3279 * Don't restart the Eth1 syncing progress from scratch in case of monitor failures during Eth2 syncing. * Switch to the primary operator as soon as it is back online. * Log the web3 credentials in fewer places Other changes: The 'web3 test' command has been enhanced to obtain and print more data regarding the selected provider. --- .../block_pools_types.nim | 5 +- .../consensus_object_pools/blockchain_dag.nim | 2 +- beacon_chain/eth1/eth1_monitor.nim | 171 ++++++++++++++---- beacon_chain/nimbus_beacon_node.nim | 24 ++- vendor/nim-json-rpc | 2 +- 5 files changed, 153 insertions(+), 51 deletions(-) diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim index 19117e905..df612f346 100644 --- a/beacon_chain/consensus_object_pools/block_pools_types.nim +++ b/beacon_chain/consensus_object_pools/block_pools_types.nim @@ -51,7 +51,7 @@ type OnReorgCallback* = proc(data: ReorgInfoObject) {.gcsafe, raises: [Defect].} OnFinalizedCallback* = - proc(data: FinalizationInfoObject) {.gcsafe, raises: [Defect].} + proc(dag: ChainDAGRef, data: FinalizationInfoObject) {.gcsafe, raises: [Defect].} KeyedBlockRef* = object # Special wrapper for BlockRef used in ChainDAG.blocks that allows lookup @@ -250,6 +250,9 @@ func shortLog*(v: EpochKey): string = # epoch:root when logging epoch, root:slot when logging slot! $v.epoch & ":" & shortLog(v.blck) +template setFinalizationCb*(dag: ChainDAGRef, cb: OnFinalizedCallback) = + dag.onFinHappened = cb + func shortLog*(v: EpochRef): string = # epoch:root when logging epoch, root:slot when logging slot! 
if v.isNil(): diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index d7fed9132..288ce544f 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -1531,7 +1531,7 @@ proc updateHead*( dag.finalizedHead.blck.root, stateRoot, dag.finalizedHead.slot.epoch) - dag.onFinHappened(data) + dag.onFinHappened(dag, data) proc isInitialized*(T: type ChainDAGRef, db: BeaconChainDB): Result[void, cstring] = # Lightweight check to see if we have the minimal information needed to diff --git a/beacon_chain/eth1/eth1_monitor.nim b/beacon_chain/eth1/eth1_monitor.nim index 08fdc15c9..1b0063fda 100644 --- a/beacon_chain/eth1/eth1_monitor.nim +++ b/beacon_chain/eth1/eth1_monitor.nim @@ -95,6 +95,7 @@ type Eth1MonitorState = enum Initialized Started + ReadyToRestartToPrimary Failed Stopping Stopped @@ -392,7 +393,6 @@ template awaitWithRetries*[T](lazyFutExpr: Future[T], timeout = web3Timeouts): untyped = const reqType = astToStr(lazyFutExpr) - var retryDelayMs = 16000 f: Future[T] @@ -714,7 +714,8 @@ func lowerBound(chain: Eth1Chain, depositCount: uint64): Eth1Block = proc trackFinalizedState(chain: var Eth1Chain, finalizedEth1Data: Eth1Data, - finalizedStateDepositIndex: uint64): bool = + finalizedStateDepositIndex: uint64, + blockProposalExpected = false): bool = # Returns true if the Eth1Monitor is synced to the finalization point if chain.blocks.len == 0: debug "Eth1 chain not initialized" @@ -722,9 +723,10 @@ proc trackFinalizedState(chain: var Eth1Chain, let latest = chain.blocks.peekLast if latest.voteData.deposit_count < finalizedEth1Data.deposit_count: - warn "Eth1 chain not synced", - ourDepositsCount = latest.voteData.deposit_count, - targetDepositsCount = finalizedEth1Data.deposit_count + if blockProposalExpected: + error "The Eth1 chain is not synced", + ourDepositsCount = latest.voteData.deposit_count, + targetDepositsCount = 
finalizedEth1Data.deposit_count return false let matchingBlock = chain.lowerBound(finalizedEth1Data.deposit_count) @@ -764,7 +766,8 @@ proc getBlockProposalData*(chain: var Eth1Chain, let periodStart = voting_period_start_time(state) hasLatestDeposits = chain.trackFinalizedState(finalizedEth1Data, - finalizedStateDepositIndex) + finalizedStateDepositIndex, + blockProposalExpected = true) var otherVotesCountTable = initCountTable[Eth1Data]() for vote in getStateField(state, eth1_data_votes): @@ -900,7 +903,6 @@ proc init*(T: type Eth1Monitor, eth1Network: Option[Eth1Network], forcePolling: bool): T = doAssert web3Urls.len > 0 - var web3Urls = web3Urls for url in mitems(web3Urls): fixupWeb3Urls url @@ -926,11 +928,41 @@ func clear(chain: var Eth1Chain) = chain.blocksByHash.clear() chain.hasConsensusViolation = false -proc resetState(m: Eth1Monitor) {.async.} = - safeCancel m.runFut +proc detectPrimaryProviderComingOnline(m: Eth1Monitor) {.async.} = + const checkInterval = chronos.seconds(30) - m.depositsChain.clear() - m.latestEth1Block = none(FullBlockId) + let + web3Url = m.web3Urls[0] + initialRunFut = m.runFut + + # This is a way to detect that the monitor was restarted. 
When this + # happens, this function will just return terminating the "async thread" + while m.runFut == initialRunFut: + let tempProviderRes = await Web3DataProvider.new( + m.depositContractAddress, + web3Url) + + if tempProviderRes.isErr: + await sleepAsync(checkInterval) + continue + + var tempProvider = tempProviderRes.get + var testRequest = tempProvider.web3.provider.net_version() + + yield testRequest + + try: await tempProvider.close() + except CatchableError as err: + debug "Failed to close temp web3 provider", err = err.msg + + if testRequest.failed: + await sleepAsync(checkInterval) + elif m.state == Started: + m.state = ReadyToRestartToPrimary + return + +proc doStop(m: Eth1Monitor) {.async.} = + safeCancel m.runFut if m.dataProvider != nil: await m.dataProvider.close() @@ -950,9 +982,9 @@ proc ensureDataProvider*(m: Eth1Monitor) {.async.} = v.get() proc stop(m: Eth1Monitor) {.async.} = - if m.state == Started: + if m.state in {Started, ReadyToRestartToPrimary}: m.state = Stopping - m.stopFut = resetState(m) + m.stopFut = m.doStop() await m.stopFut m.state = Stopped elif m.state == Stopping: @@ -1002,7 +1034,6 @@ proc syncBlockRange(m: Eth1Monitor, # Reduce all request rate until we have a more general solution # for dealing with Infura's rate limits await sleepAsync(milliseconds(backoff)) - let jsonLogsFut = m.dataProvider.ns.getJsonLogs( DepositEvent, fromBlock = some blockId(currentBlock), @@ -1142,25 +1173,61 @@ func init(T: type FullBlockId, blk: Eth1BlockHeader|BlockObject): T = FullBlockId(number: Eth1BlockNumber blk.number, hash: blk.hash) proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} = - if m.state == Failed: - await m.resetState() - elif m.state == Stopping: - await m.stopFut + if m.state in {Started, ReadyToRestartToPrimary}: + return + + let isFirstRun = m.state == Initialized if delayBeforeStart != ZeroDuration: await sleepAsync(delayBeforeStart) + # If the monitor died with an exception, the web3 provider may 
be in + # an arbitary state, so we better reset it (not doing this has resulted + # in resource leaks historically). + if not m.dataProvider.isNil and m.state == Failed: + # We introduce a local var to eliminate the risk of scheduling two + # competing calls to `close` below. + let provider = m.dataProvider + m.dataProvider = nil + await provider.close() + + m.state = Started await m.ensureDataProvider() - let - web3Url = m.web3Urls[(m.startIdx + m.web3Urls.len - 1) mod m.web3Urls.len] - web3 = m.dataProvider.web3 + + # We might need to reset the chain if the new provider disagrees + # with the previous one regarding the history of the chain or if + # we have detected a conensus violation - our view disagreeing with + # the majority of the validators in the network. + # + # Consensus violations happen in practice because the web3 providers + # sometimes return incomplete or incorrect deposit log events even + # when they don't indicate any errors in the response. When this + # happens, we are usually able to download the data successfully + # on the second attempt. 
+ if m.latestEth1Block.isSome and m.depositsChain.blocks.len > 0: + let needsReset = m.depositsChain.hasConsensusViolation or (block: + let + lastKnownBlock = m.depositsChain.blocks.peekLast + matchingBlockAtNewProvider = awaitWithRetries( + m.dataProvider.getBlockByNumber lastKnownBlock.number) + + lastKnownBlock.voteData.block_hash.asBlockHash != matchingBlockAtNewProvider.hash) + + if needsReset: + m.depositsChain.clear() + m.latestEth1Block = none(FullBlockId) + + template web3Url: string = m.dataProvider.url + + if web3Url != m.web3Urls[0]: + asyncSpawn m.detectPrimaryProviderComingOnline() info "Starting Eth1 deposit contract monitoring", - contract = $m.depositContractAddress, url = web3Url + contract = $m.depositContractAddress - if m.state == Initialized and m.eth1Network.isSome: + if isFirstRun and m.eth1Network.isSome: let - providerNetwork = awaitWithRetries web3.provider.net_version() + providerNetwork = awaitWithRetries m.dataProvider.web3.provider.net_version() expectedNetwork = case m.eth1Network.get of mainnet: "1" of rinkeby: "4" @@ -1170,7 +1237,6 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} = expectedNetwork, providerNetwork quit 1 - m.state = Started var mustUsePolling = m.forcePolling or web3Url.startsWith("http://") or web3Url.startsWith("https://") @@ -1190,24 +1256,24 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} = proc subscriptionErrorHandler(err: CatchableError) {.raises: [Defect], gcsafe.} = warn "Failed to subscribe for block headers. 
Switching to polling", - web3Url, err = err.msg + err = err.msg mustUsePolling = true await m.dataProvider.onBlockHeaders(newBlockHeadersHandler, subscriptionErrorHandler) - let startBlock = awaitWithRetries( - m.dataProvider.getBlockByHash(m.depositsChain.finalizedBlockHash.asBlockHash)) + if m.depositsChain.blocks.len == 0: + let startBlock = awaitWithRetries( + m.dataProvider.getBlockByHash(m.depositsChain.finalizedBlockHash.asBlockHash)) - doAssert m.depositsChain.blocks.len == 0 - m.depositsChain.addBlock Eth1Block( - number: Eth1BlockNumber startBlock.number, - timestamp: Eth1BlockTimestamp startBlock.timestamp, - voteData: eth1DataFromMerkleizer( - m.depositsChain.finalizedBlockHash, - m.depositsChain.finalizedDepositsMerkleizer)) + m.depositsChain.addBlock Eth1Block( + number: Eth1BlockNumber startBlock.number, + timestamp: Eth1BlockTimestamp startBlock.timestamp, + voteData: eth1DataFromMerkleizer( + m.depositsChain.finalizedBlockHash, + m.depositsChain.finalizedDepositsMerkleizer)) - var eth1SyncedTo = Eth1BlockNumber startBlock.number + var eth1SyncedTo = Eth1BlockNumber m.depositsChain.blocks.peekLast.number eth1_synced_head.set eth1SyncedTo.toGaugeValue eth1_finalized_head.set eth1SyncedTo.toGaugeValue eth1_finalized_deposits.set( @@ -1229,6 +1295,11 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} = if m.depositsChain.hasConsensusViolation: raise newException(CorruptDataProvider, "Eth1 chain contradicts Eth2 consensus") + if m.state == ReadyToRestartToPrimary: + info "Primary web3 provider is back online. 
Restarting the Eth1 monitor" + m.startIdx = 0 + return + if mustUsePolling: let blk = awaitWithRetries( m.dataProvider.web3.provider.eth_getBlockByNumber(blockId("latest"), false)) @@ -1271,7 +1342,6 @@ proc start(m: Eth1Monitor, delayBeforeStart: Duration) = if runFut.error[] of CatchableError: if runFut == m.runFut: warn "Eth1 chain monitoring failure, restarting", err = runFut.error.msg - m.dataProvider = nil m.state = Failed else: fatal "Fatal exception reached", err = runFut.error.msg @@ -1303,13 +1373,35 @@ proc testWeb3Provider*(web3Url: Uri, let web3 = mustSucceed "connect to web3 provider": await newWeb3($web3Url) - network = mustSucceed "get network version": + networkVersion = mustSucceed "get network version": awaitWithRetries web3.provider.net_version() latestBlock = mustSucceed "get latest block": awaitWithRetries web3.provider.eth_getBlockByNumber(blockId("latest"), false) + syncStatus = mustSucceed "get sync status": + awaitWithRetries web3.provider.eth_syncing() + listening = mustSucceed "get network listening": + awaitWithRetries web3.provider.net_listening() + peers = + try: + awaitWithRetries web3.provider.net_peerCount() + except: + 0 + clientVersion = mustSucceed "get client version": + awaitWithRetries web3.provider.web3_clientVersion() + protocolVersion = mustSucceed "get protocol version": + awaitWithRetries web3.provider.eth_protocolVersion() + mining = mustSucceed "get mining status": + awaitWithRetries web3.provider.eth_mining() - echo "Network: ", network + echo "Client Version: ", clientVersion + echo "Protocol Version: ", protocolVersion, " (", $protocolVersion.fromHex[:int], ")" + echo "Network Version: ", networkVersion + echo "Network Listening: ", listening + echo "Network Peers: ", peers + echo "Syncing: ", syncStatus echo "Latest block: ", latestBlock.number.uint64 + echo "Last Known Nonce: ", web3.lastKnownNonce + echo "Mining: ", mining let ns = web3.contractSender(DepositContract, depositContractAddress) try: @@ -1318,7 
+1410,6 @@ proc testWeb3Provider*(web3Url: Uri, echo "Deposit root: ", depositRoot except CatchableError as err: echo "Web3 provider is not archive mode: ", err.msg - when hasGenesisDetection: proc init*(T: type Eth1Monitor, cfg: RuntimeConfig, diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 12753012c..db30594b7 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -169,8 +169,19 @@ proc init*(T: type BeaconNode, eventBus.emit("head-change", data) proc onChainReorg(data: ReorgInfoObject) = eventBus.emit("chain-reorg", data) - proc onFinalization(data: FinalizationInfoObject) = - eventBus.emit("finalization", data) + proc makeOnFinalizationCb( + # This `nimcall` function helps keep track of what + # needs to be captured by the onFinalization closure. + eventBus: AsyncEventBus, + eth1Monitor: Eth1Monitor): OnFinalizedCallback {.nimcall.} = + static: doAssert (eventBus is ref) and (eth1Monitor is ref) + return proc(dag: ChainDAGRef, data: FinalizationInfoObject) = + if eth1Monitor != nil: + let finalizedEpochRef = dag.getFinalizedEpochRef() + discard trackFinalizedState(eth1Monitor, + finalizedEpochRef.eth1_data, + finalizedEpochRef.eth1_deposit_index) + eventBus.emit("finalization", data) proc onSyncContribution(data: SignedContributionAndProof) = eventBus.emit("sync-contribution-and-proof", data) @@ -341,7 +352,7 @@ proc init*(T: type BeaconNode, else: {} dag = ChainDAGRef.init( cfg, db, validatorMonitor, chainDagFlags, onBlockAdded, onHeadChanged, - onChainReorg) + onChainReorg) quarantine = newClone(Quarantine.init()) databaseGenesisValidatorsRoot = getStateField(dag.headState.data, genesis_validators_root) @@ -509,6 +520,8 @@ proc init*(T: type BeaconNode, else: nil + dag.setFinalizationCb makeOnFinalizationCb(eventBus, eth1Monitor) + var node = BeaconNode( nickname: nickname, graffitiBytes: if config.graffiti.isSome: config.graffiti.get @@ -1106,11 
+1119,6 @@ proc onSlotStart( await node.handleValidatorDuties(lastSlot, wallSlot) - if node.eth1Monitor != nil and (wallSlot mod SLOTS_PER_EPOCH) == 0: - let finalizedEpochRef = node.dag.getFinalizedEpochRef() - discard node.eth1Monitor.trackFinalizedState( - finalizedEpochRef.eth1_data, finalizedEpochRef.eth1_deposit_index) - await onSlotEnd(node, wallSlot) proc handleMissingBlocks(node: BeaconNode) = diff --git a/vendor/nim-json-rpc b/vendor/nim-json-rpc index 5a2817608..97ba55bbf 160000 --- a/vendor/nim-json-rpc +++ b/vendor/nim-json-rpc @@ -1 +1 @@ -Subproject commit 5a281760803907f4989cacf109b516381dfbbe11 +Subproject commit 97ba55bbf6246ce798f44871ca84a4e96c59167c