From ef20e831a870220509f6a60893b9f1138278d995 Mon Sep 17 00:00:00 2001 From: zah Date: Thu, 9 Mar 2023 19:29:04 +0200 Subject: [PATCH] More metrics for the EL monitor (#4707) * `engine_api_response_time` provides a histogram for the Engine API response times for each unique pair ot URL and request type. * All engine API requests are now tracked Other changes: The client will no longer exit on start-up if it fails to connect to a properly configured EL node. --- beacon_chain/eth1/eth1_monitor.nim | 205 ++++++++++++++++++----------- 1 file changed, 125 insertions(+), 80 deletions(-) diff --git a/beacon_chain/eth1/eth1_monitor.nim b/beacon_chain/eth1/eth1_monitor.nim index ca95acd5c..a420ae704 100644 --- a/beacon_chain/eth1/eth1_monitor.nim +++ b/beacon_chain/eth1/eth1_monitor.nim @@ -259,6 +259,11 @@ declareCounter engine_api_responses, "Number of successful requests to the newPayload Engine API end-point", labels = ["url", "request", "status"] +declareHistogram engine_api_response_time, + "Time(s) used to generate signature usign remote signer", + buckets = [0.1, 0.25, 0.5, 1.0, 2.0, 5.0], + labels = ["url", "request"] + declareCounter engine_api_timeouts, "Number of timed-out requests to Engine API end-point", labels = ["url", "request"] @@ -267,9 +272,41 @@ declareCounter engine_api_last_minute_forkchoice_updates_sent, "Number of last minute requests to the forkchoiceUpdated Engine API end-point just before block proposals", labels = ["url"] +proc setDegradedState(connection: ELConnection, + requestName: string, + statusCode: int, errMsg: string) = + case connection.state + of NeverTested, Working: + warn "Connection to EL node degraded", + url = url(connection.engineUrl), + failedRequest = requestName, + statusCode, err = errMsg + of Degraded: + discard + connection.state = Degraded + +proc setWorkingState(connection: ELConnection) = + case connection.state + of Degraded: + info "Connection to EL node restored", + url = url(connection.engineUrl) + of NeverTested, Working: + discard + connection.state = Working + proc trackEngineApiRequest(connection: ELConnection, request: FutureBase, requestName: string, - deadline: Future[void]) = + startTime: Moment, deadline: Future[void]) = + request.addCallback do (udata: pointer) {.gcsafe, raises: [Defect].}: + # TODO `udata` is nil here. How come? + # This forces us to create a GC cycle between the Future and the closure + if request.completed: + engine_api_response_time.observe( + float(milliseconds(Moment.now - startTime)) / 1000.0, + [connection.engineUrl.url, requestName]) + + connection.setWorkingState() + deadline.addCallback do (udata: pointer) {.gcsafe, raises: [Defect].}: if not request.finished: request.cancel() @@ -283,23 +320,7 @@ proc trackEngineApiRequest(connection: ELConnection, 0 if request.failed: - case connection.state - of NeverTested, Working: - warn "Connection to EL node degraded", - url = url(connection.engineUrl), - failedRequest = requestName, - statusCode - of Degraded: - discard - connection.state = Degraded - else: - case connection.state - of Degraded: - info "Connection to EL node restored", - url = url(connection.engineUrl) - of NeverTested, Working: - discard - connection.state = Working + connection.setDegradedState(requestName, statusCode, request.error.msg) engine_api_responses.inc(1, [connection.engineUrl.url, requestName, $statusCode]) @@ -308,6 +329,21 @@ template awaitOrRaiseOnTimeout[T](fut: Future[T], awaitWithTimeout(fut, timeout): raise newException(DataProviderTimeout, "Timeout") +template trackedRequestWithTimeout[T](connection: ELConnection, + requestName: static string, + lazyRequestExpression: Future[T], + timeout: Duration): T = + let + connectionParam = connection + startTime = Moment.now + deadline = sleepAsync(timeout) + request = lazyRequestExpression + + connectionParam.trackEngineApiRequest(request, requestName, startTime, deadline) + + awaitWithTimeout(request, deadline): + raise newException(DataProviderTimeout, "Timeout") + template cfg(m: ELManager): auto = m.eth1Chain.cfg @@ -886,7 +922,8 @@ proc getPayload*(m: ELManager, if req.read.executionPayload.extraData.len > MAX_EXTRA_DATA_BYTES: warn "Execution client provided a block with invalid extraData (size exceeds limit)", - size = req.read.executionPayload.extraData.len, limit = MAX_EXTRA_DATA_BYTES + size = req.read.executionPayload.extraData.len, + limit = MAX_EXTRA_DATA_BYTES continue if bestPayloadIdx.isNone: @@ -911,8 +948,10 @@ proc waitELToSyncDeposits(connection: ELConnection, while true: try: - discard awaitOrRaiseOnTimeout(rpcClient.getBlockByHash(minimalRequiredBlock), - web3RequestsTimeout) + discard connection.trackedRequestWithTimeout( + "getBlockByHash", + rpcClient.getBlockByHash(minimalRequiredBlock), + web3RequestsTimeout) connection.depositContractSyncStatus = DepositContractSyncStatus.synced return except CancelledError as err: @@ -973,7 +1012,10 @@ proc getBlobsBundleFromASyncedEL( connection = await m.selectConnectionForChainSyncing() rpcClient = await connection.connectedRpcClient() - return await rpcClient.engine_getBlobsBundleV1(FixedBytes[8] payloadId) + return connection.trackedRequestWithTimeout( + "getBlobsBundle", + rpcClient.engine_getBlobsBundleV1(FixedBytes[8] payloadId), + GETBLOBS_TIMEOUT) proc getBlobsBundleV1*( m: ELManager, payloadId: bellatrix.PayloadID): @@ -981,16 +1023,10 @@ proc getBlobsBundleV1*( if m.elConnections.len == 0: return Opt.none BlobsBundleV1 - return Opt.some: - try: - awaitWithTimeout( - m.getBlobsBundleFromASyncedEL(payload_id), - GETBLOBS_TIMEOUT): - # beacon_block_payload_errors.inc() - warn "Getting blobs sidecar from Engine API timed out", payload_id - return Opt.none BlobsBundleV1 - except CatchableError: - return Opt.none BlobsBundleV1 + result = try: + Opt.some(await m.getBlobsBundleFromASyncedEL(payload_id)) + except CatchableError: + Opt.none BlobsBundleV1 proc sendNewPayloadToSingleEL(connection: ELConnection, payload: engine_api.ExecutionPayloadV1): @@ -1104,10 +1140,11 @@ proc sendNewPayload*(m: ELManager, Future[PayloadExecutionStatus] {.async.} = let earlyDeadline = sleepAsync(chronos.seconds 1) + startTime = Moment.now deadline = sleepAsync(NEWPAYLOAD_TIMEOUT) requests = m.elConnections.mapIt: let req = sendNewPayloadToSingleEL(it, payload) - trackEngineApiRequest(it, req, "newPayload", deadline) + trackEngineApiRequest(it, req, "newPayload", startTime, deadline) req requestsCompleted = allFutures(requests) @@ -1211,10 +1248,11 @@ proc forkchoiceUpdated*(m: ELManager, safeBlockHash: safeBlockHash.asBlockHash, finalizedBlockHash: finalizedBlockHash.asBlockHash) earlyDeadline = sleepAsync(chronos.seconds 1) + startTime = Moment.now deadline = sleepAsync(FORKCHOICEUPDATED_TIMEOUT) requests = m.elConnections.mapIt: let req = it.forkchoiceUpdatedForSingleEL(state, payloadAttributes) - trackEngineApiRequest(it, req, "forkchoiceUpdated", deadline) + trackEngineApiRequest(it, req, "forkchoiceUpdated", startTime, deadline) req requestsCompleted = allFutures(requests) @@ -1267,7 +1305,10 @@ proc exchangeConfigWithSingleEL(m: ELManager, connection: ELConnection) {.async. try: let providerChain = - awaitOrRaiseOnTimeout(rpcClient.eth_chainId(), web3RequestsTimeout) + connection.trackedRequestWithTimeout( + "chainId", + rpcClient.eth_chainId(), + web3RequestsTimeout) # https://eips.ethereum.org/EIPS/eip-155#list-of-chain-ids expectedChain = case m.eth1Network.get @@ -1296,7 +1337,8 @@ proc exchangeConfigWithSingleEL(m: ELManager, connection: ELConnection) {.async. terminalBlockHash: m.eth1Chain.cfg.TERMINAL_BLOCK_HASH, terminalBlockNumber: Quantity 0) elConf = try: - awaitOrRaiseOnTimeout( + connection.trackedRequestWithTimeout( + "exchangeTransitionConfiguration", rpcClient.engine_exchangeTransitionConfigurationV1(ourConf), timeout = 1.seconds) except CatchableError as err: @@ -1352,11 +1394,16 @@ template readJsonField(j: JsonNode, fieldName: string, ValueType: type): untyped template init[N: static int](T: type DynamicBytes[N, N]): T = T newSeq[byte](N) -proc fetchTimestampWithRetries(rpcClient: RpcClient, - blk: Eth1Block) {.async.} = - let web3block = awaitOrRaiseOnTimeout( +proc fetchTimestamp(connection: ELConnection, + rpcClient: RpcClient, + blk: Eth1Block) {.async.} = + debug "Fetching block timestamp", blockNum = blk.number + + let web3block = connection.trackedRequestWithTimeout( + "getBlockByHash", rpcClient.getBlockByHash(blk.hash.asBlockHash), web3RequestsTimeout) + blk.timestamp = Eth1BlockTimestamp web3block.timestamp func depositEventsToBlocks(depositsList: JsonNode): seq[Eth1Block] {. @@ -1424,16 +1471,24 @@ when hasDepositRootChecks: const contractCallTimeout = 60.seconds - proc fetchDepositContractData(rpcClient: RpcClient, + proc fetchDepositContractData(connection: ELConnection, + rpcClient: RpcClient, depositContact: Sender[DepositContract], blk: Eth1Block): Future[DepositContractDataStatus] {.async.} = let + startTime = Moment.now + deadline = sleepAsync(contractCallTimeout) depositRoot = depositContract.get_deposit_root.call(blockNumber = blk.number) rawCount = depositContract.get_deposit_count.call(blockNumber = blk.number) + connection.trackEngineApiRequest( + depositRoot, "get_deposit_root", startTime, deadline) + connection.trackEngineApiRequest( + rawCount, "get_deposit_count", startTime, deadline) + try: let fetchedRoot = asEth2Digest( - awaitOrRaiseOnTimeout(depositRoot, contractCallTimeout)) + awaitWithTimeout(depositRoot, deadline)) if blk.depositRoot.isZero: blk.depositRoot = fetchedRoot result = Fetched @@ -1449,7 +1504,7 @@ when hasDepositRootChecks: try: let fetchedCount = bytes_to_uint64( - awaitOrRaiseOnTimeout(rawCount, contractCallTimeout).toArray) + awaitWithTimeout(rawCount, deadline).toArray) if blk.depositCount == 0: blk.depositCount = fetchedCount elif blk.depositCount != fetchedCount: @@ -1777,6 +1832,7 @@ func earliestBlockOfInterest(m: ELManager, latestEth1BlockNumber: Eth1BlockNumbe latestEth1BlockNumber - (2 * m.cfg.ETH1_FOLLOW_DISTANCE) - votedBlocksSafetyMargin proc syncBlockRange(m: ELManager, + connection: ELConnection, rpcClient: RpcClient, depositContract: Sender[DepositContract], fromBlock, toBlock, @@ -1803,14 +1859,20 @@ proc syncBlockRange(m: ELManager, # Reduce all request rate until we have a more general solution # for dealing with Infura's rate limits await sleepAsync(milliseconds(backoff)) - let jsonLogsFut = depositContract.getJsonLogs( - DepositEvent, - fromBlock = some blockId(currentBlock), - toBlock = some blockId(maxBlockNumberRequested)) + let + startTime = Moment.now + deadline = sleepAsync 30.seconds + jsonLogsFut = depositContract.getJsonLogs( + DepositEvent, + fromBlock = some blockId(currentBlock), + toBlock = some blockId(maxBlockNumberRequested)) + + connection.trackEngineApiRequest( + jsonLogsFut, "getLogs", startTime, deadline) depositLogs = try: # Downloading large amounts of deposits may take several minutes - awaitWithTimeout(jsonLogsFut, 60.seconds): + awaitWithTimeout(jsonLogsFut, deadline): raise newException(DataProviderTimeout, "Request time out while obtaining json logs") except CatchableError as err: @@ -1833,14 +1895,14 @@ proc syncBlockRange(m: ELManager, for i in 0 ..< blocksWithDeposits.len: let blk = blocksWithDeposits[i] - debug "Fetching block timestamp", blockNum = blk.number - await rpcClient.fetchTimestampWithRetries(blk) + await fetchTimestamp(connection, rpcClient, blk) if blk.number > fullSyncFromBlock: let lastBlock = m.eth1Chain.blocks.peekLast for n in max(lastBlock.number + 1, fullSyncFromBlock) ..< blk.number: debug "Obtaining block without deposits", blockNum = n - let blockWithoutDeposits = awaitOrRaiseOnTimeout( + let blockWithoutDeposits = connection.trackedRequestWithTimeout( + "getBlockByNumber", rpcClient.getBlockByNumber(n), web3RequestsTimeout) @@ -1856,9 +1918,7 @@ proc syncBlockRange(m: ELManager, template lastBlock: auto = blocksWithDeposits[lastIdx] let status = when hasDepositRootChecks: - awaitOrRaiseOnTimeout( - rpcClient.fetchDepositContractData(depositContract, lastBlock), - web3RequestsTimeout) + rpcClient.fetchDepositContractData(depositContract, lastBlock) else: DepositRootUnavailable @@ -1895,18 +1955,11 @@ func hasProperlyConfiguredConnection*(m: ELManager): bool = proc startExchangeTransitionConfigurationLoop(m: ELManager) {.async.} = debug "Starting exchange transition configuration loop" - if not m.hasProperlyConfiguredConnection: - await m.exchangeTransitionConfiguration() - if not m.hasProperlyConfiguredConnection: - fatal "The Bellatrix hard fork requires the beacon node to be connected to a properly configured Engine API end-point. " & - "See https://nimbus.guide/merge.html for more details." - quit 1 - while true: # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.2/src/engine/specification.md#engine_exchangetransitionconfigurationv1 - await sleepAsync(60.seconds) debug "Exchange transition configuration tick" traceAsyncErrors m.exchangeTransitionConfiguration() + await sleepAsync(60.seconds) proc syncEth1Chain(m: ELManager, connection: ELConnection) {.async.} = let rpcClient = await connection.connectedRpcClient() @@ -1939,7 +1992,8 @@ proc syncEth1Chain(m: ELManager, connection: ELConnection) {.async.} = let needsReset = m.eth1Chain.hasConsensusViolation or (block: let lastKnownBlock = m.eth1Chain.blocks.peekLast - matchingBlockAtNewProvider = awaitOrRaiseOnTimeout( + matchingBlockAtNewProvider = connection.trackedRequestWithTimeout( + "getBlockByNumber", rpcClient.getBlockByNumber(lastKnownBlock.number), web3RequestsTimeout) @@ -1955,8 +2009,10 @@ proc syncEth1Chain(m: ELManager, connection: ELConnection) {.async.} = if m.eth1Chain.blocks.len == 0: let finalizedBlockHash = m.eth1Chain.finalizedBlockHash.asBlockHash let startBlock = - awaitOrRaiseOnTimeout(rpcClient.getBlockByHash(finalizedBlockHash), - web3RequestsTimeout) + connection.trackedRequestWithTimeout( + "getBlockByHash", + rpcClient.getBlockByHash(finalizedBlockHash), + web3RequestsTimeout) m.eth1Chain.addBlock Eth1Block( hash: m.eth1Chain.finalizedBlockHash, @@ -1984,7 +2040,8 @@ proc syncEth1Chain(m: ELManager, connection: ELConnection) {.async.} = raise newException(CorruptDataProvider, "Eth1 chain contradicts Eth2 consensus") let latestBlock = try: - awaitOrRaiseOnTimeout( + connection.trackedRequestWithTimeout( + "getBlockByNumber", rpcClient.eth_getBlockByNumber(blockId("latest"), false), web3RequestsTimeout) except CatchableError as err: @@ -2008,7 +2065,8 @@ proc syncEth1Chain(m: ELManager, connection: ELConnection) {.async.} = latestBlock.number.uint64 > m.cfg.ETH1_FOLLOW_DISTANCE: let depositContract = connection.web3.get.contractSender( DepositContract, m.depositContractAddress) - await m.syncBlockRange(rpcClient, + await m.syncBlockRange(connection, + rpcClient, depositContract, eth1SyncedTo + 1, m.syncTargetBlock.get, @@ -2032,8 +2090,7 @@ proc startChainSyncingLoop(m: ELManager) {.async.} = try: await syncEth1Chain(m, connection) except CatchableError as err: - error "EL connection failure while syncing deposits", - url = connection.engineUrl.url, err = err.msg + # The error is already logged by trackEngineApiRequest await sleepAsync(5.seconds) proc start*(m: ELManager) {.gcsafe.} = @@ -2049,18 +2106,6 @@ proc start*(m: ELManager) {.gcsafe.} = m.exchangeTransitionConfigurationLoopFut = m.startExchangeTransitionConfigurationLoop() -proc getEth1BlockHash*( - url: EngineApiUrl, blockId: RtBlockIdentifier, jwtSecret: Option[seq[byte]]): - Future[BlockHash] {.async.} = - let web3 = awaitOrRaiseOnTimeout(url.newWeb3(), 10.seconds) - try: - let blk = awaitOrRaiseOnTimeout( - web3.provider.eth_getBlockByNumber(blockId, false), - web3RequestsTimeout) - return blk.hash - finally: - await web3.close() - func `$`(x: Quantity): string = $(x.uint64)