From e6b9bfc9e4246f2e098a21e3ba6f4bdabc206a73 Mon Sep 17 00:00:00 2001
From: Eugene Kabanov
Date: Tue, 14 May 2024 21:03:30 +0300
Subject: [PATCH] el_manager initial refactor. (#6228)

* Initial commit.

* Address review comments and fix missing primitive.

* Fix developer build.

* More asyncraises updates.

* Refactor and optimize forkchoiceUpdated() and sendNewPayload().

* Fix runtime assertion.

* Refactor getPayload().
---
 beacon_chain/el/el_manager.nim | 1278 +++++++++++++++++++-------------
 1 file changed, 760 insertions(+), 518 deletions(-)

diff --git a/beacon_chain/el/el_manager.nim b/beacon_chain/el/el_manager.nim
index daee037d1..bdd648a91 100644
--- a/beacon_chain/el/el_manager.nim
+++ b/beacon_chain/el/el_manager.nim
@@ -8,18 +8,18 @@
 {.push raises: [].}

 import
-  std/[strformat, typetraits, json],
+  std/[strformat, typetraits, json, sequtils],
   # Nimble packages:
   chronos, metrics, chronicles/timings,
   json_rpc/[client, errors], web3, web3/[engine_api, primitives, conversions],
   eth/common/eth_types,
-  eth/async_utils, results,
+  results,
   stew/[assign2, byteutils, objects],
   # Local modules:
   ../spec/[eth2_merkleization, forks],
   ../networking/network_metadata,
-  ".."/[beacon_node_status, future_combinators],
+  ".."/beacon_node_status,
   "."/[eth1_chain, el_conf]

 from std/sequtils import anyIt, mapIt

@@ -39,6 +39,7 @@ type
   WithdrawalCredentialsBytes = DynamicBytes[32, 32]
   SignatureBytes = DynamicBytes[96, 96]
   Int64LeBytes = DynamicBytes[8, 8]
+  WithoutTimeout* = distinct int

 contract(DepositContract):
   proc deposit(pubkey: PubKeyBytes,
@@ -56,6 +57,7 @@ contract(DepositContract):
                index: Int64LeBytes) {.event.}

 const
+  noTimeout = WithoutTimeout(0)
   hasDepositRootChecks = defined(has_deposit_root_checks)

   targetBlocksPerLogsRequest = 1000'u64
@@ -101,6 +103,9 @@ type
     finalizedBlockHash*: Eth2Digest
     payloadAttributes*: PayloadAttributesV3

+  ELManagerState* {.pure.} = enum
+    Running, Closing, Closed
+
   ELManager* = ref object
     eth1Network: Option[Eth1Network]
       ## If this value is supplied the EL manager will check whether
@@ -133,7 +138,7 @@ type
     chainSyncingLoopFut: Future[void]
     exchangeTransitionConfigurationLoopFut: Future[void]
-    stopFut: Future[void]
+    managerState: ELManagerState

     nextExpectedPayloadParams*: Option[NextExpectedPayloadParams]

@@ -147,7 +152,7 @@ type
     notSynced
     synced

-  ConnectionState = enum
+  ELConnectionState {.pure.} = enum
     NeverTested
     Working
     Degraded

@@ -155,7 +160,7 @@ type
   ELConnection* = ref object
     engineUrl: EngineApiUrl

-    web3: Option[Web3]
+    web3: Opt[Web3]
       ## This will be `none` before connecting and while we are
       ## reconnecting after a lost connection. You can wait on
       ## the future below for the moment the connection is active.
@@ -167,7 +172,7 @@ type
       ## The latest status of the `exchangeTransitionConfiguration`
       ## exchange.
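+    ## Health of this connection. Transitions between `Working` and
+    ## `Degraded` are damped by `hysteresisCounter`: a flip happens only
+    ## once `connectionStateChangeHysteresisThreshold` is crossed (see
+    ## `increaseCounterTowardsStateChange`).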
-    state: ConnectionState
+    state: ELConnectionState
     hysteresisCounter: int

     depositContractSyncStatus: DepositContractSyncStatus

@@ -182,6 +187,7 @@ type
   DataProviderFailure* = object of CatchableError
   CorruptDataProvider* = object of DataProviderFailure
   DataProviderTimeout* = object of DataProviderFailure
+  DataProviderConnectionFailure* = object of DataProviderFailure

   DisconnectHandler* = proc () {.gcsafe, raises: [].}

@@ -229,10 +235,19 @@ declareCounter engine_api_last_minute_forkchoice_updates_sent,
   "Number of last minute requests to the forkchoiceUpdated Engine API end-point just before block proposals",
   labels = ["url"]

-proc close(connection: ELConnection): Future[void] {.async.} =
+proc close(connection: ELConnection): Future[void] {.async: (raises: []).} =
   if connection.web3.isSome:
-    awaitWithTimeout(connection.web3.get.close(), 30.seconds):
-      debug "Failed to close data provider in time"
+    try:
+      let web3 = connection.web3.get
+      await noCancel web3.close().wait(30.seconds)
+    except AsyncTimeoutError:
+      debug "Failed to close execution layer data provider in time",
+            timeout = 30.seconds
+    except CatchableError as exc:
+      # TODO (cheatfate): This handler should be removed once `nim-web3`
+      # adopts `asyncraises`.
+      debug "Failed to close execution layer", error = $exc.name,
+            reason = $exc.msg

 proc increaseCounterTowardsStateChange(connection: ELConnection): bool =
   result = connection.hysteresisCounter >= connectionStateChangeHysteresisThreshold
@@ -249,13 +264,15 @@ proc decreaseCounterTowardsStateChange(connection: ELConnection) =
     # between success and failure is roughly 50:50%
     connection.hysteresisCounter = connection.hysteresisCounter div 5

-proc setDegradedState(connection: ELConnection,
-                      requestName: string,
-                      statusCode: int, errMsg: string) =
+proc setDegradedState(
+    connection: ELConnection,
+    requestName: string,
+    statusCode: int,
+    errMsg: string
+): Future[void] {.async: (raises: []).} =
   debug "Failed EL Request", requestName, statusCode, err = errMsg
-
   case connection.state
-  of NeverTested, Working:
+  of ELConnectionState.NeverTested, ELConnectionState.Working:
     if connection.increaseCounterTowardsStateChange():
       warn "Connection to EL node degraded",
            url = url(connection.engineUrl),
@@ -264,79 +281,75 @@ proc setDegradedState(connection: ELConnection,

       connection.state = Degraded

-    asyncSpawn connection.close()
-    connection.web3 = none[Web3]()
-  of Degraded:
+    await connection.close()
+    connection.web3 = Opt.none(Web3)
+  of ELConnectionState.Degraded:
     connection.decreaseCounterTowardsStateChange()

 proc setWorkingState(connection: ELConnection) =
   case connection.state
-  of NeverTested:
+  of ELConnectionState.NeverTested:
     connection.hysteresisCounter = 0
     connection.state = Working
-  of Degraded:
+  of ELConnectionState.Degraded:
     if connection.increaseCounterTowardsStateChange():
       info "Connection to EL node restored",
            url = url(connection.engineUrl)
-
       connection.state = Working
-  of Working:
+  of ELConnectionState.Working:
     connection.decreaseCounterTowardsStateChange()

-proc trackEngineApiRequest(connection: ELConnection,
-                           request: FutureBase, requestName: string,
-                           startTime: Moment, deadline: Future[void],
-                           failureAllowed = false) =
-  request.addCallback do (udata: pointer) {.gcsafe, raises: [].}:
-    # TODO `udata` is nil here. How come?
-    # This forces us to create a GC cycle between the Future and the closure
-    if request.completed:
-      engine_api_request_duration_seconds.observe(
-        float(milliseconds(Moment.now - startTime)) / 1000.0,
+proc engineApiRequest[T](
+    connection: ELConnection,
+    request: Future[T],
+    requestName: string,
+    startTime: Moment,
+    deadline: Future[void] | Duration | WithoutTimeout,
+    failureAllowed = false
+): Future[T] {.async: (raises: [CatchableError]).} =
+  ## This procedure raises `CancelledError` and `DataProviderTimeout`
+  ## exceptions, as well as everything that `request` itself could raise.
+  try:
+    let res =
+      when deadline is WithoutTimeout:
+        await request
+      else:
+        await request.wait(deadline)
+    engine_api_request_duration_seconds.observe(
+      float(milliseconds(Moment.now - startTime)) / 1000.0,
       [connection.engineUrl.url, requestName])
-
-      connection.setWorkingState()
-
-  deadline.addCallback do (udata: pointer) {.gcsafe, raises: [].}:
-    if not request.finished:
-      request.cancelSoon()
+    engine_api_responses.inc(
+      1, [connection.engineUrl.url, requestName, "200"])
+    connection.setWorkingState()
+    res
+  except AsyncTimeoutError:
+    engine_api_timeouts.inc(1, [connection.engineUrl.url, requestName])
+    if not(failureAllowed):
+      await connection.setDegradedState(requestName, 0, "Request timed out")
+    raise newException(DataProviderTimeout, "Request timed out")
+  except CancelledError as exc:
+    when deadline is WithoutTimeout:
+      # When `deadline` is set to `noTimeout`, a cancellation here usually
+      # means a timeout that is handled by the caller.
      engine_api_timeouts.inc(1, [connection.engineUrl.url, requestName])
-      if not failureAllowed:
-        connection.setDegradedState(requestName, 0, "Request timed out")
+      if not(failureAllowed):
+        await connection.setDegradedState(requestName, 0, "Request timed out")
    else:
-      let statusCode = if not request.failed:
-        200
-      elif request.error of ErrorResponse:
+      if not(failureAllowed):
+        await connection.setDegradedState(requestName, 0, "Request interrupted")
+    raise exc
+  except CatchableError as exc:
+    let statusCode =
+      if request.error of ErrorResponse:
        ((ref ErrorResponse) request.error).status
      else:
        0
-
-      if request.failed and not failureAllowed:
-        connection.setDegradedState(requestName, statusCode, request.error.msg)
-
-      engine_api_responses.inc(1, [connection.engineUrl.url, requestName, $statusCode])
-
-template awaitOrRaiseOnTimeout[T](fut: Future[T],
-                                  timeout: Duration): T =
-  awaitWithTimeout(fut, timeout):
-    raise newException(DataProviderTimeout, "Timeout")
-
-template trackedRequestWithTimeout[T](connection: ELConnection,
-                                      requestName: static string,
-                                      lazyRequestExpression: Future[T],
-                                      timeout: Duration,
-                                      failureAllowed = false): T =
-  let
-    connectionParam = connection
-    startTime = Moment.now
-    deadline = sleepAsync(timeout)
-    request = lazyRequestExpression
-
-  connectionParam.trackEngineApiRequest(
-    request, requestName, startTime, deadline, failureAllowed)
-
-  awaitWithTimeout(request, deadline):
-    raise newException(DataProviderTimeout, "Timeout")
+    engine_api_responses.inc(
+      1, [connection.engineUrl.url, requestName, $statusCode])
+    if not(failureAllowed):
+      await connection.setDegradedState(
+        requestName, statusCode, request.error.msg)
+    raise exc

 func raiseIfNil(web3block: BlockObject): BlockObject {.raises: [ValueError].} =
   if web3block == nil:
@@ -689,8 +702,7 @@ func getJsonRpcRequestHeaders(jwtSecret: Opt[seq[byte]]):
 proc newWeb3*(engineUrl: EngineApiUrl): Future[Web3] =
   newWeb3(engineUrl.url,
-          getJsonRpcRequestHeaders(engineUrl.jwtSecret),
-          httpFlags = {HttpClientFlag.NewConnectionAlways})
+          getJsonRpcRequestHeaders(engineUrl.jwtSecret), httpFlags = {})

 proc establishEngineApiConnection(url: EngineApiUrl):
                                   Future[Result[Web3, string]] {.
@@ -705,35 +717,41 @@ proc establishEngineApiConnection(url: EngineApiUrl):
     err exc.msg

 proc tryConnecting(connection: ELConnection): Future[bool] {.
-     async: (raises: [CancelledError]).} =
+    async: (raises: [CancelledError]).} =
   if connection.isConnected:
     return true

   if connection.connectingFut == nil or
      connection.connectingFut.finished: # The previous attempt was not successful
-    connection.connectingFut = establishEngineApiConnection(connection.engineUrl)
+    connection.connectingFut =
+      establishEngineApiConnection(connection.engineUrl)

   let web3Res = await connection.connectingFut
   if web3Res.isErr:
     warn "Engine API connection failed", err = web3Res.error
-    return false
+    false
   else:
-    connection.web3 = some web3Res.get
-    return true
+    connection.web3 = Opt.some(web3Res.get)
+    true

 proc connectedRpcClient(connection: ELConnection): Future[RpcClient] {.
-     async: (raises: [CancelledError]).} =
+    async: (raises: [CancelledError]).} =
   while not connection.isConnected:
-    if not await connection.tryConnecting():
+    if not(await connection.tryConnecting()):
       await sleepAsync(chronos.seconds(10))

-  return connection.web3.get.provider
+  connection.web3.get.provider

-proc getBlockByHash(rpcClient: RpcClient, hash: BlockHash): Future[BlockObject] =
+proc getBlockByHash(
+    rpcClient: RpcClient,
+    hash: BlockHash
+): Future[BlockObject] {.async: (raises: [CatchableError], raw: true).} =
   rpcClient.eth_getBlockByHash(hash, false)

-proc getBlockByNumber*(rpcClient: RpcClient,
-                       number: Eth1BlockNumber): Future[BlockObject] =
+proc getBlockByNumber*(
+    rpcClient: RpcClient,
+    number: Eth1BlockNumber
+): Future[BlockObject] {.async: (raises: [CatchableError], raw: true).} =
   let hexNumber = try:
     let num = distinctBase(number)
     &"0x{num:X}" # No leading 0's!
@@ -782,7 +800,8 @@ proc getPayloadFromSingleEL( timestamp: uint64, randomData: Eth2Digest, suggestedFeeRecipient: Eth1Address, - withdrawals: seq[WithdrawalV1]): Future[GetPayloadResponseType] {.async.} = + withdrawals: seq[WithdrawalV1] +): Future[GetPayloadResponseType] {.async: (raises: [CatchableError]).} = let rpcClient = await connection.connectedRpcClient() @@ -877,15 +896,16 @@ template kind(T: typedesc[ExecutionPayloadV1OrV2|ExecutionPayloadV2]): Consensus template kind(T: type ExecutionPayloadV3): ConsensusFork = ConsensusFork.Deneb -proc getPayload*(m: ELManager, - PayloadType: type ForkyExecutionPayloadForSigning, - consensusHead: Eth2Digest, - headBlock, safeBlock, finalizedBlock: Eth2Digest, - timestamp: uint64, - randomData: Eth2Digest, - suggestedFeeRecipient: Eth1Address, - withdrawals: seq[capella.Withdrawal]): - Future[Opt[PayloadType]] {.async: (raises: [CancelledError]).} = +proc getPayload*( + m: ELManager, + PayloadType: type ForkyExecutionPayloadForSigning, + consensusHead: Eth2Digest, + headBlock, safeBlock, finalizedBlock: Eth2Digest, + timestamp: uint64, + randomData: Eth2Digest, + suggestedFeeRecipient: Eth1Address, + withdrawals: seq[capella.Withdrawal] +): Future[Opt[PayloadType]] {.async: (raises: [CancelledError]).} = if m.elConnections.len == 0: return err() @@ -900,77 +920,105 @@ proc getPayload*(m: ELManager, let timeout = GETPAYLOAD_TIMEOUT + extraProcessingOverhead deadline = sleepAsync(timeout) - requests = m.elConnections.mapIt(it.getPayloadFromSingleEL( - EngineApiResponseType(PayloadType), - isFcUpToDate, consensusHead, headBlock, safeBlock, finalizedBlock, - timestamp, randomData, suggestedFeeRecipient, engineApiWithdrawals - )) - requestsCompleted = allFutures(requests) - # TODO cancel requests on cancellation - await requestsCompleted or deadline + var bestPayloadIdx = Opt.none(int) - var bestPayloadIdx = none int - for idx, req in requests: - if not req.finished: - warn "Timeout while getting execution payload", - url = m.elConnections[idx].engineUrl.url - req.cancelSoon() - elif req.failed: - warn "Failed to get execution payload from EL", + while true: + let requests = + m.elConnections.mapIt( + it.getPayloadFromSingleEL(EngineApiResponseType(PayloadType), + isFcUpToDate, consensusHead, headBlock, safeBlock, finalizedBlock, + timestamp, randomData, suggestedFeeRecipient, engineApiWithdrawals)) + + let timeoutExceeded = + try: + await allFutures(requests).wait(deadline) + false + except AsyncTimeoutError as exc: + true + except CancelledError as exc: + let pending = + requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) + raise exc + + for idx, req in requests: + if not(req.finished()): + warn "Timeout while getting execution payload", + url = m.elConnections[idx].engineUrl.url + elif req.failed(): + warn "Failed to get execution payload from EL", url = m.elConnections[idx].engineUrl.url, - err = req.error.msg - else: - const payloadFork = PayloadType.kind - when payloadFork >= ConsensusFork.Capella: - when payloadFork == ConsensusFork.Capella: - # TODO: The engine_api module may offer an alternative API where it is guaranteed - # to return the correct response type (i.e. the rule below will be enforced - # during deserialization). 
-        if req.value().executionPayload.withdrawals.isNone:
-          warn "Execution client returned a block without a 'withdrawals' field for a post-Shanghai block",
-            url = m.elConnections[idx].engineUrl.url
-          continue
-
-        if engineApiWithdrawals != req.value().executionPayload.withdrawals.maybeDeref:
-          # otherwise it formats as "@[(index: ..., validatorIndex: ...,
-          # address: ..., amount: ...), (index: ..., validatorIndex: ...,
-          # address: ..., amount: ...)]"
-          warn "Execution client did not return correct withdrawals",
-            withdrawals_from_cl_len = engineApiWithdrawals.len,
-            withdrawals_from_el_len =
-              req.value().executionPayload.withdrawals.maybeDeref.len,
-            withdrawals_from_cl =
-              mapIt(engineApiWithdrawals, it.asConsensusWithdrawal),
-            withdrawals_from_el =
-              mapIt(
-                req.value().executionPayload.withdrawals.maybeDeref,
-                it.asConsensusWithdrawal),
-            url = m.elConnections[idx].engineUrl.url
-
-      if req.value().executionPayload.extraData.len > MAX_EXTRA_DATA_BYTES:
-        warn "Execution client provided a block with invalid extraData (size exceeds limit)",
-          url = m.elConnections[idx].engineUrl.url,
-          size = req.value().executionPayload.extraData.len,
-          limit = MAX_EXTRA_DATA_BYTES
-        continue
-
-      if bestPayloadIdx.isNone:
-        bestPayloadIdx = some idx
+      else:
-        if cmpGetPayloadResponses(req.value(), requests[bestPayloadIdx.get].value()) > 0:
-          bestPayloadIdx = some idx
+        const payloadFork = PayloadType.kind
+        when payloadFork >= ConsensusFork.Capella:
+          when payloadFork == ConsensusFork.Capella:
+            # TODO: The engine_api module may offer an alternative API where
+            # it is guaranteed to return the correct response type (i.e. the
+            # rule below will be enforced during deserialization).
+            if req.value().executionPayload.withdrawals.isNone:
+              warn "Execution client returned a block without a " &
+                   "'withdrawals' field for a post-Shanghai block",
+                url = m.elConnections[idx].engineUrl.url
+              continue

-  deadline.cancelSoon()
+            if engineApiWithdrawals !=
+                req.value().executionPayload.withdrawals.maybeDeref:
+              # otherwise it formats as "@[(index: ..., validatorIndex: ...,
+              # address: ..., amount: ...), (index: ..., validatorIndex: ...,
+              # address: ..., amount: ...)]"
+              # TODO (cheatfate): should we have a `continue` statement at
+              # the end of this branch, so that such a payload cannot be
+              # chosen as the best one?
+              warn "Execution client did not return correct withdrawals",
+                withdrawals_from_cl_len = engineApiWithdrawals.len,
+                withdrawals_from_el_len =
+                  req.value().executionPayload.withdrawals.maybeDeref.len,
+                withdrawals_from_cl =
+                  mapIt(engineApiWithdrawals, it.asConsensusWithdrawal),
+                withdrawals_from_el =
+                  mapIt(
+                    req.value().executionPayload.withdrawals.maybeDeref,
+                    it.asConsensusWithdrawal),
+                url = m.elConnections[idx].engineUrl.url
+              # If we have more than one EL connection we consider this as
+              # a failure.
+ if len(requests) > 1: + continue - if bestPayloadIdx.isSome: - return ok requests[bestPayloadIdx.get].value().asConsensusType - else: - return err() + if req.value().executionPayload.extraData.len > MAX_EXTRA_DATA_BYTES: + warn "Execution client provided a block with invalid extraData " & + "(size exceeds limit)", + url = m.elConnections[idx].engineUrl.url, + size = req.value().executionPayload.extraData.len, + limit = MAX_EXTRA_DATA_BYTES + continue -proc waitELToSyncDeposits(connection: ELConnection, - minimalRequiredBlock: BlockHash) {.async.} = - var rpcClient = await connection.connectedRpcClient() + if bestPayloadIdx.isNone: + bestPayloadIdx = Opt.some(idx) + else: + if cmpGetPayloadResponses( + req.value(), requests[bestPayloadIdx.get].value()) > 0: + bestPayloadIdx = Opt.some(idx) + + let pending = + requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) + + if bestPayloadIdx.isSome(): + return ok(requests[bestPayloadIdx.get()].value().asConsensusType) + + if timeoutExceeded: + break + + err() + +proc waitELToSyncDeposits( + connection: ELConnection, + minimalRequiredBlock: BlockHash +) {.async: (raises: [CancelledError]).} = + var rpcClient: RpcClient = nil if connection.depositContractSyncStatus == DepositContractSyncStatus.synced: return @@ -978,36 +1026,38 @@ proc waitELToSyncDeposits(connection: ELConnection, var attempt = 0 while true: + if isNil(rpcClient): + rpcClient = await connection.connectedRpcClient() + try: - discard raiseIfNil connection.trackedRequestWithTimeout( - "getBlockByHash", + discard raiseIfNil await connection.engineApiRequest( rpcClient.getBlockByHash(minimalRequiredBlock), - web3RequestsTimeout, - failureAllowed = true) + "getBlockByHash", Moment.now(), + web3RequestsTimeout, failureAllowed = true) connection.depositContractSyncStatus = DepositContractSyncStatus.synced return - except CancelledError as err: - trace "waitELToSyncDepositContract cancelled", + except CancelledError as exc: + trace "waitELToSyncDepositContract interrupted", url = connection.engineUrl.url - raise err - except CatchableError as err: + raise exc + except CatchableError as exc: connection.depositContractSyncStatus = DepositContractSyncStatus.notSynced if attempt == 0: - warn "Failed to obtain the most recent known block from the execution " & - "layer node (the node is probably not synced)", + warn "Failed to obtain the most recent known block from the " & + "execution layer node (the node is probably not synced)", url = connection.engineUrl.url, blk = minimalRequiredBlock, - err = err.msg + reason = exc.msg elif attempt mod 60 == 0: # This warning will be produced every 30 minutes warn "Still failing to obtain the most recent known block from the " & "execution layer node (the node is probably still not synced)", url = connection.engineUrl.url, blk = minimalRequiredBlock, - err = err.msg - inc attempt + reason = exc.msg + inc(attempt) await sleepAsync(seconds(30)) - rpcClient = await connection.connectedRpcClient() + rpcClient = nil func networkHasDepositContract(m: ELManager): bool = not m.cfg.DEPOSIT_CONTRACT_ADDRESS.isDefaultValue @@ -1018,59 +1068,76 @@ func mostRecentKnownBlock(m: ELManager): BlockHash = else: m.depositContractBlockHash -proc selectConnectionForChainSyncing(m: ELManager): Future[ELConnection] {.async.} = +proc selectConnectionForChainSyncing( + m: ELManager +): Future[ELConnection] {.async: (raises: [CancelledError, + DataProviderConnectionFailure]).} = doAssert m.elConnections.len > 0 - let connectionsFuts 
= mapIt( - m.elConnections, + let pendingConnections = m.elConnections.mapIt( if m.networkHasDepositContract: FutureBase waitELToSyncDeposits(it, m.mostRecentKnownBlock) else: FutureBase connectedRpcClient(it)) - # TODO: Ideally, the cancellation will be handled automatically - # by a helper like `firstCompletedFuture` - let firstConnected = try: - await firstCompletedFuture(connectionsFuts) - except CancelledError as err: - for future in connectionsFuts: - future.cancelSoon() - raise err + while true: + var pendingFutures = pendingConnections + try: + discard await race(pendingFutures) + except ValueError as exc: + raiseAssert "pendingFutures should not be empty at this moment" + except CancelledError as exc: + let pending = pendingConnections.filterIt(not(it.finished())). + mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) + raise exc - for future in connectionsFuts: - if future != firstConnected: - future.cancelSoon() + pendingFutures.reset() + for index, future in pendingConnections.pairs(): + if future.completed(): + let pending = pendingConnections.filterIt(not(it.finished())). + mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) + return m.elConnections[index] + elif not(future.finished()): + pendingFutures.add(future) - return m.elConnections[find(connectionsFuts, firstConnected)] + if len(pendingFutures) == 0: + raise newException(DataProviderConnectionFailure, + "Unable to establish connection for chain syncing") -proc sendNewPayloadToSingleEL(connection: ELConnection, - payload: engine_api.ExecutionPayloadV1): - Future[PayloadStatusV1] {.async.} = +proc sendNewPayloadToSingleEL( + connection: ELConnection, + payload: engine_api.ExecutionPayloadV1 +): Future[PayloadStatusV1] {.async: (raises: [CatchableError]).} = let rpcClient = await connection.connectedRpcClient() - return await rpcClient.engine_newPayloadV1(payload) + await rpcClient.engine_newPayloadV1(payload) -proc sendNewPayloadToSingleEL(connection: ELConnection, - payload: engine_api.ExecutionPayloadV2): - Future[PayloadStatusV1] {.async.} = +proc sendNewPayloadToSingleEL( + connection: ELConnection, + payload: engine_api.ExecutionPayloadV2 +): Future[PayloadStatusV1] {.async: (raises: [CatchableError]).} = let rpcClient = await connection.connectedRpcClient() - return await rpcClient.engine_newPayloadV2(payload) + await rpcClient.engine_newPayloadV2(payload) -proc sendNewPayloadToSingleEL(connection: ELConnection, - payload: engine_api.ExecutionPayloadV3, - versioned_hashes: seq[engine_api.VersionedHash], - parent_beacon_block_root: FixedBytes[32]): - Future[PayloadStatusV1] {.async.} = +proc sendNewPayloadToSingleEL( + connection: ELConnection, + payload: engine_api.ExecutionPayloadV3, + versioned_hashes: seq[engine_api.VersionedHash], + parent_beacon_block_root: FixedBytes[32] +): Future[PayloadStatusV1] {.async: (raises: [CatchableError]).} = let rpcClient = await connection.connectedRpcClient() - return await rpcClient.engine_newPayloadV3( + await rpcClient.engine_newPayloadV3( payload, versioned_hashes, parent_beacon_block_root) -proc sendNewPayloadToSingleEL(connection: ELConnection, - payload: engine_api.ExecutionPayloadV4, - versioned_hashes: seq[engine_api.VersionedHash], - parent_beacon_block_root: FixedBytes[32]): - Future[PayloadStatusV1] {.async.} = +proc sendNewPayloadToSingleEL( + connection: ELConnection, + payload: engine_api.ExecutionPayloadV4, + versioned_hashes: seq[engine_api.VersionedHash], + parent_beacon_block_root: FixedBytes[32] +): Future[PayloadStatusV1] {.async: 
(raises: [CatchableError]).} = let rpcClient = await connection.connectedRpcClient() - return await rpcClient.engine_newPayloadV4( + await rpcClient.engine_newPayloadV4( payload, versioned_hashes, parent_beacon_block_root) type @@ -1079,7 +1146,9 @@ type oldStatusIsOk disagreement -func compareStatuses(newStatus, prevStatus: PayloadExecutionStatus): StatusRelation = +func compareStatuses( + newStatus, prevStatus: PayloadExecutionStatus +): StatusRelation = case prevStatus of PayloadExecutionStatus.syncing: if newStatus == PayloadExecutionStatus.syncing: @@ -1126,110 +1195,155 @@ func compareStatuses(newStatus, prevStatus: PayloadExecutionStatus): StatusRelat type ELConsensusViolationDetector = object - selectedResponse: Option[int] + selectedResponse: Opt[int] disagreementAlreadyDetected: bool func init(T: type ELConsensusViolationDetector): T = - ELConsensusViolationDetector(selectedResponse: none int, - disagreementAlreadyDetected: false) + ELConsensusViolationDetector( + selectedResponse: Opt.none(int), + disagreementAlreadyDetected: false + ) -proc processResponse[ELResponseType]( +proc processResponse( d: var ELConsensusViolationDetector, + elResponseType: typedesc, connections: openArray[ELConnection], - requests: openArray[Future[ELResponseType]], + requests: auto, idx: int) = if not requests[idx].completed: return - let status = try: requests[idx].read.status - except CatchableError: raiseAssert "checked above" + let status = requests[idx].value().status if d.selectedResponse.isNone: - d.selectedResponse = some idx + d.selectedResponse = Opt.some(idx) elif not d.disagreementAlreadyDetected: - let prevStatus = try: requests[d.selectedResponse.get].read.status - except CatchableError: raiseAssert "previously checked" + let prevStatus = requests[d.selectedResponse.get].value().status case compareStatuses(status, prevStatus) of newStatusIsPreferable: - d.selectedResponse = some idx + d.selectedResponse = Opt.some(idx) of oldStatusIsOk: discard of disagreement: d.disagreementAlreadyDetected = true error "Execution layer consensus violation detected", - responseType = name(ELResponseType), + responseType = name(elResponseType), url1 = connections[d.selectedResponse.get].engineUrl.url, status1 = prevStatus, url2 = connections[idx].engineUrl.url, status2 = status -proc sendNewPayload*(m: ELManager, blck: SomeForkyBeaconBlock): - Future[PayloadExecutionStatus] {.async.} = +proc lazyWait(futures: seq[FutureBase]) {.async: (raises: []).} = + block: + let pending = futures.filterIt(not(it.finished())) + if len(pending) > 0: + try: + await allFutures(pending).wait(30.seconds) + except CancelledError: + discard + except AsyncTimeoutError: + discard + + block: + let pending = futures.filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + if len(pending) > 0: + await noCancel allFutures(pending) + +proc sendNewPayload*( + m: ELManager, + blck: SomeForkyBeaconBlock +): Future[PayloadExecutionStatus] {.async: (raises: [CancelledError]).} = let - earlyDeadline = sleepAsync(chronos.seconds 1) - startTime = Moment.now + startTime = Moment.now() deadline = sleepAsync(NEWPAYLOAD_TIMEOUT) payload = blck.body.execution_payload.asEngineExecutionPayload - requests = m.elConnections.mapIt: - let req = - when payload is engine_api.ExecutionPayloadV3 or - payload is engine_api.ExecutionPayloadV4: - # https://github.com/ethereum/consensus-specs/blob/v1.4.0-alpha.1/specs/deneb/beacon-chain.md#process_execution_payload - # Verify the execution payload is valid - # [Modified in Deneb] Pass `versioned_hashes` to 
Execution Engine - let versioned_hashes = mapIt( - blck.body.blob_kzg_commitments, - engine_api.VersionedHash(kzg_commitment_to_versioned_hash(it))) - sendNewPayloadToSingleEL( - it, payload, versioned_hashes, - FixedBytes[32] blck.parent_root.data) - elif payload is engine_api.ExecutionPayloadV1 or - payload is engine_api.ExecutionPayloadV2: - sendNewPayloadToSingleEL(it, payload) - else: - static: doAssert false - trackEngineApiRequest(it, req, "newPayload", startTime, deadline) - req - - requestsCompleted = allFutures(requests) - - await requestsCompleted or earlyDeadline - var - stillPending = newSeq[Future[PayloadStatusV1]]() - responseProcessor = init ELConsensusViolationDetector + responseProcessor = ELConsensusViolationDetector.init() - for idx, req in requests: - if not req.finished: - stillPending.add req - elif req.completed: - responseProcessor.processResponse(m.elConnections, requests, idx) + while true: + block mainLoop: + let + requests = m.elConnections.mapIt: + let req = + when payload is engine_api.ExecutionPayloadV3 or + payload is engine_api.ExecutionPayloadV4: + # https://github.com/ethereum/consensus-specs/blob/v1.4.0-alpha.1/specs/deneb/beacon-chain.md#process_execution_payload + # Verify the execution payload is valid + # [Modified in Deneb] Pass `versioned_hashes` to Execution Engine + let versioned_hashes = mapIt( + blck.body.blob_kzg_commitments, + engine_api.VersionedHash(kzg_commitment_to_versioned_hash(it))) + sendNewPayloadToSingleEL( + it, payload, versioned_hashes, + FixedBytes[32] blck.parent_root.data) + elif payload is engine_api.ExecutionPayloadV1 or + payload is engine_api.ExecutionPayloadV2: + sendNewPayloadToSingleEL(it, payload) + else: + static: doAssert false + engineApiRequest(it, req, "newPayload", startTime, noTimeout) - if responseProcessor.disagreementAlreadyDetected: - return PayloadExecutionStatus.invalid - elif responseProcessor.selectedResponse.isSome: - return requests[responseProcessor.selectedResponse.get].read.status + var pendingRequests = requests - await requestsCompleted or deadline + while true: + let timeoutExceeded = + try: + discard await race(pendingRequests).wait(deadline) + false + except AsyncTimeoutError: + true + except ValueError: + raiseAssert "pendingRequests should not be empty!" + except CancelledError as exc: + let pending = + requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) + raise exc - for idx, req in requests: - if req.completed and req in stillPending: - responseProcessor.processResponse(m.elConnections, requests, idx) + var stillPending: type(pendingRequests) + for request in pendingRequests: + if not(request.finished()): + stillPending.add(request) + elif request.completed(): + let index = requests.find(request) + doAssert(index >= 0) + responseProcessor.processResponse(type(payload), + m.elConnections, requests, index) + pendingRequests = stillPending - return if responseProcessor.disagreementAlreadyDetected: - PayloadExecutionStatus.invalid - elif responseProcessor.selectedResponse.isSome: - requests[responseProcessor.selectedResponse.get].read.status - else: - PayloadExecutionStatus.syncing + if responseProcessor.disagreementAlreadyDetected: + let pending = + pendingRequests.filterIt(not(it.finished())). 
+              mapIt(it.cancelAndWait())
+          await noCancel allFutures(pending)
+          return PayloadExecutionStatus.invalid
+        elif responseProcessor.selectedResponse.isSome():
+          # We spawn a task that waits for all other responses which are
+          # still pending; after 30 seconds, any requests still pending
+          # will be cancelled.
+          asyncSpawn lazyWait(pendingRequests.mapIt(FutureBase(it)))
+          return requests[responseProcessor.selectedResponse.get].value().status
+
+        if timeoutExceeded:
+          # Timeout exceeded, cancelling all pending requests.
+          let pending =
+            pendingRequests.filterIt(not(it.finished())).
+              mapIt(it.cancelAndWait())
+          await noCancel allFutures(pending)
+          return PayloadExecutionStatus.syncing
+
+        if len(pendingRequests) == 0:
+          # All requests have failed; keep retrying until the deadline
+          # expires.
+          break mainLoop

 proc forkchoiceUpdatedForSingleEL(
     connection: ELConnection,
     state: ref ForkchoiceStateV1,
     payloadAttributes: Option[PayloadAttributesV1] |
                        Option[PayloadAttributesV2] |
-                       Option[PayloadAttributesV3]):
-    Future[PayloadStatusV1] {.async.} =
+                       Option[PayloadAttributesV3]
+): Future[PayloadStatusV1] {.async: (raises: [CatchableError]).} =
   let
     rpcClient = await connection.connectedRpcClient()
     response = await rpcClient.forkchoiceUpdated(state[], payloadAttributes)
@@ -1245,13 +1359,15 @@ proc forkchoiceUpdatedForSingleEL(

   return response.payloadStatus

-proc forkchoiceUpdated*(m: ELManager,
-                        headBlockHash, safeBlockHash,
-                        finalizedBlockHash: Eth2Digest,
-                        payloadAttributes: Option[PayloadAttributesV1] |
-                                           Option[PayloadAttributesV2] |
-                                           Option[PayloadAttributesV3]):
-    Future[(PayloadExecutionStatus, Option[BlockHash])] {.async: (raises: [CancelledError]).} =
+proc forkchoiceUpdated*(
+    m: ELManager,
+    headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
+    payloadAttributes: Option[PayloadAttributesV1] |
+                       Option[PayloadAttributesV2] |
+                       Option[PayloadAttributesV3]
+): Future[(PayloadExecutionStatus, Option[BlockHash])] {.
+    async: (raises: [CancelledError]).} =
+
   doAssert not headBlockHash.isZero

   # Allow finalizedBlockHash to be 0 to avoid sync deadlocks.
@@ -1310,82 +1426,107 @@ proc forkchoiceUpdated*(m: ELManager,
     earlyDeadline = sleepAsync(chronos.seconds 1)
     startTime = Moment.now
     deadline = sleepAsync(FORKCHOICEUPDATED_TIMEOUT)
-    requests = m.elConnections.mapIt:
-      let req = it.forkchoiceUpdatedForSingleEL(state, payloadAttributes)
-      trackEngineApiRequest(it, req, "forkchoiceUpdated", startTime, deadline)
-      req
-    requestsCompleted = allFutures(requests)
-
-  await requestsCompleted or earlyDeadline

   var
-    stillPending = newSeq[Future[PayloadStatusV1]]()
-    responseProcessor = init ELConsensusViolationDetector
+    responseProcessor = ELConsensusViolationDetector.init()

-  for idx, req in requests:
-    if not req.finished:
-      stillPending.add req
-    elif req.completed:
-      responseProcessor.processResponse(m.elConnections, requests, idx)
+  while true:
+    block mainLoop:
+      let requests =
+        m.elConnections.mapIt:
+          let req = it.forkchoiceUpdatedForSingleEL(state, payloadAttributes)
+          engineApiRequest(it, req, "forkchoiceUpdated", startTime, noTimeout)

-  template assignNextExpectedPayloadParams() =
-    # Ensure that there's no race condition window where getPayload's check for
-    # whether it needs to trigger a new fcU payload, due to cache invalidation,
-    # falsely suggests that the expected payload matches, and similarly that if
-    # the fcU fails or times out for other reasons, the expected payload params
-    # remain synchronized with EL state.
-    assign(
-      m.nextExpectedPayloadParams,
-      some NextExpectedPayloadParams(
-        headBlockHash: headBlockHash,
-        safeBlockHash: safeBlockHash,
-        finalizedBlockHash: finalizedBlockHash,
-        payloadAttributes: payloadAttributesV3))
+      var pendingRequests = requests

-  template getSelected: untyped =
-    let
-      data =
-        try:
-          requests[responseProcessor.selectedResponse.get].read
-        except CatchableError:
-          raiseAssert "Only completed requests get selected"
-    (data.status, data.latestValidHash)
+      while true:
+        let timeoutExceeded =
+          try:
+            discard await race(pendingRequests).wait(deadline)
+            false
+          except ValueError:
+            raiseAssert "pendingRequests should not be empty!"
+          except AsyncTimeoutError:
+            true
+          except CancelledError as exc:
+            let pending =
+              pendingRequests.filterIt(not(it.finished())).
+                mapIt(it.cancelAndWait())
+            await noCancel allFutures(pending)
+            raise exc

-  if responseProcessor.disagreementAlreadyDetected:
-    return (PayloadExecutionStatus.invalid, none BlockHash)
-  elif responseProcessor.selectedResponse.isSome:
-    assignNextExpectedPayloadParams()
-    return getSelected()
+        var stillPending: type(pendingRequests)
+        for request in pendingRequests:
+          if not(request.finished()):
+            stillPending.add(request)
+          elif request.completed():
+            let index = requests.find(request)
+            doAssert(index >= 0)
+            responseProcessor.processResponse(
+              PayloadStatusV1, m.elConnections, requests, index)
+        pendingRequests = stillPending

-  await requestsCompleted or deadline
+        template assignNextExpectedPayloadParams() =
+          # Ensure that there's no race condition window where getPayload's
+          # check for whether it needs to trigger a new fcU payload, due to
+          # cache invalidation, falsely suggests that the expected payload
+          # matches, and similarly that if the fcU fails or times out for other
+          # reasons, the expected payload params remain synchronized with
+          # EL state.
+          assign(
+            m.nextExpectedPayloadParams,
+            some NextExpectedPayloadParams(
+              headBlockHash: headBlockHash,
+              safeBlockHash: safeBlockHash,
+              finalizedBlockHash: finalizedBlockHash,
+              payloadAttributes: payloadAttributesV3))

-  for idx, req in requests:
-    if req.completed and req in stillPending:
-      responseProcessor.processResponse(m.elConnections, requests, idx)
+        template getSelected: untyped =
+          let data = requests[responseProcessor.selectedResponse.get].value()
+          (data.status, data.latestValidHash)

-  return if responseProcessor.disagreementAlreadyDetected:
-    (PayloadExecutionStatus.invalid, none BlockHash)
-  elif responseProcessor.selectedResponse.isSome:
-    assignNextExpectedPayloadParams()
-    getSelected()
-  else:
-    (PayloadExecutionStatus.syncing, none BlockHash)
+        if responseProcessor.disagreementAlreadyDetected:
+          let pending =
+            pendingRequests.filterIt(not(it.finished())).
+              mapIt(it.cancelAndWait())
+          await noCancel allFutures(pending)
+          return (PayloadExecutionStatus.invalid, none BlockHash)
+        elif responseProcessor.selectedResponse.isSome:
+          # We spawn a task that waits for all other responses which are
+          # still pending; after 30 seconds, any requests still pending
+          # will be cancelled.
+          asyncSpawn lazyWait(pendingRequests.mapIt(FutureBase(it)))
+          assignNextExpectedPayloadParams()
+          return getSelected()
+
+        if timeoutExceeded:
+          # Timeout exceeded, cancelling all pending requests.
+          let pending =
+            pendingRequests.filterIt(not(it.finished())).
+              mapIt(it.cancelAndWait())
+          await noCancel allFutures(pending)
+          return (PayloadExecutionStatus.syncing, none BlockHash)
+
+        if len(pendingRequests) == 0:
+          # All requests have failed; keep retrying until the deadline
+          # expires.
+          break mainLoop

 # TODO can't be defined within exchangeConfigWithSingleEL
 func `==`(x, y: Quantity): bool {.borrow.}

-proc exchangeConfigWithSingleEL(m: ELManager, connection: ELConnection) {.async.} =
+proc exchangeConfigWithSingleEL(
+    m: ELManager,
+    connection: ELConnection
+) {.async: (raises: [CancelledError]).} =
   let rpcClient = await connection.connectedRpcClient()

   if m.eth1Network.isSome and
      connection.etcStatus == EtcStatus.notExchangedYet:
    try:
      let
-        providerChain =
-          connection.trackedRequestWithTimeout(
-            "chainId",
-            rpcClient.eth_chainId(),
-            web3RequestsTimeout)
+        providerChain = await connection.engineApiRequest(
+          rpcClient.eth_chainId(), "chainId", Moment.now(),
+          web3RequestsTimeout)

        # https://chainid.network/
        expectedChain = case m.eth1Network.get
@@ -1400,33 +1541,54 @@ proc exchangeConfigWithSingleEL(m: ELManager, connection: ELConnection) {.async.
               actualChain = distinctBase(providerChain)
          connection.etcStatus = EtcStatus.mismatch
          return
+    except CancelledError as exc:
+      debug "Configuration exchange was interrupted"
+      raise exc
    except CatchableError as exc:
      # Typically because it's not synced through EIP-155, assuming this Web3
      # endpoint has been otherwise working.
-      debug "Failed to obtain eth_chainId",
-            error = exc.msg
+      debug "Failed to obtain eth_chainId", reason = exc.msg

   connection.etcStatus = EtcStatus.match

-proc exchangeTransitionConfiguration*(m: ELManager) {.async.} =
+proc exchangeTransitionConfiguration*(
+    m: ELManager
+) {.async: (raises: [CancelledError]).} =
   if m.elConnections.len == 0:
     return

-  let
-    deadline = sleepAsync(3.seconds)
-    requests = m.elConnections.mapIt(m.exchangeConfigWithSingleEL(it))
-    requestsCompleted = allFutures(requests)
+  let requests = m.elConnections.mapIt(m.exchangeConfigWithSingleEL(it))
+  try:
+    await allFutures(requests).wait(3.seconds)
+  except AsyncTimeoutError:
+    discard
+  except CancelledError as exc:
+    let pending = requests.filterIt(not(it.finished())).
+ mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) + raise exc - await requestsCompleted or deadline + let (pending, failed, finished) = + block: + var + failed = 0 + done = 0 + pending: seq[Future[void]] + for req in requests: + if not req.finished(): + pending.add(req.cancelAndWait()) + else: + if req.completed(): + inc(done) + else: + inc(failed) + (pending, failed, done) - var cancelled = 0 - for idx, req in requests: - if not req.finished: - req.cancelSoon() - inc cancelled + await noCancel allFutures(pending) - if cancelled == requests.len: - warn "Failed to exchange configuration with the configured EL end-points" + if (len(pending) > 0) or (failed != 0): + warn "Failed to exchange configuration with the configured EL end-points", + completed = finished, failed = failed, timed_out = len(pending) template readJsonField(logEvent, field: untyped, ValueType: type): untyped = if logEvent.field.isNone: @@ -1437,20 +1599,22 @@ template readJsonField(logEvent, field: untyped, ValueType: type): untyped = template init[N: static int](T: type DynamicBytes[N, N]): T = T newSeq[byte](N) -proc fetchTimestamp(connection: ELConnection, - rpcClient: RpcClient, - blk: Eth1Block) {.async.} = +proc fetchTimestamp( + connection: ELConnection, + rpcClient: RpcClient, + blk: Eth1Block +) {.async: (raises: [CatchableError]).} = debug "Fetching block timestamp", blockNum = blk.number - let web3block = raiseIfNil connection.trackedRequestWithTimeout( - "getBlockByHash", + let web3block = raiseIfNil await connection.engineApiRequest( rpcClient.getBlockByHash(blk.hash.asBlockHash), - web3RequestsTimeout) + "getBlockByHash", Moment.now(), web3RequestsTimeout) - blk.timestamp = Eth1BlockTimestamp web3block.timestamp + blk.timestamp = Eth1BlockTimestamp(web3block.timestamp) -func depositEventsToBlocks(depositsList: openArray[JsonString]): seq[Eth1Block] {. 
- raises: [CatchableError].} = +func depositEventsToBlocks( + depositsList: openArray[JsonString] +): seq[Eth1Block] {.raises: [CatchableError].} = var lastEth1Block: Eth1Block for logEventData in depositsList: @@ -1489,7 +1653,8 @@ func depositEventsToBlocks(depositsList: openArray[JsonString]): seq[Eth1Block] amount.len != 8 or signature.len != 96 or index.len != 8: - raise newException(CorruptDataProvider, "Web3 provider supplied invalid deposit logs") + raise newException(CorruptDataProvider, + "Web3 provider supplied invalid deposit logs") lastEth1Block.deposits.add DepositData( pubkey: ValidatorPubKey.init(pubkey.toArray), @@ -1510,58 +1675,68 @@ when hasDepositRootChecks: const contractCallTimeout = 60.seconds - proc fetchDepositContractData(connection: ELConnection, - rpcClient: RpcClient, - depositContract: Sender[DepositContract], - blk: Eth1Block): Future[DepositContractDataStatus] {.async.} = + proc fetchDepositContractData( + connection: ELConnection, + rpcClient: RpcClient, + depositContract: Sender[DepositContract], + blk: Eth1Block + ): Future[DepositContractDataStatus] {.async: (raises: [CancelledError]).} = let - startTime = Moment.now + startTime = Moment.now() deadline = sleepAsync(contractCallTimeout) - depositRoot = depositContract.get_deposit_root.call(blockNumber = blk.number) - rawCount = depositContract.get_deposit_count.call(blockNumber = blk.number) - - # We allow failures on these requests becaues the clients - # are expected to prune the state data for historical blocks - connection.trackEngineApiRequest( - depositRoot, "get_deposit_root", startTime, deadline, - failureAllowed = true) - connection.trackEngineApiRequest( - rawCount, "get_deposit_count", startTime, deadline, - failureAllowed = true) + depositRootFut = + depositContract.get_deposit_root.call(blockNumber = blk.number) + rawCountFut = + depositContract.get_deposit_count.call(blockNumber = blk.number) + engineFut1 = connection.engineApiRequest( + depositRootFut, "get_deposit_root", startTime, deadline, + failureAllowed = true) + engineFut2 = connection.engineApiRequest( + rawCountFut, "get_deposit_count", startTime, deadline, + failureAllowed = true) try: - let fetchedRoot = asEth2Digest(block: - awaitWithTimeout(depositRoot, deadline): - raise newException(DataProviderTimeout, - "Request time out while obtaining deposits root")) + await allFutures(engineFut1, engineFut2) + except CancelledError as exc: + var pending: seq[Future[void]] + if not(engineFut1.finished()): + pending.add(engineFut1.cancelAndWait()) + if not(engineFut2.finished()): + pending.add(engineFut2.cancelAndWait()) + await noCancel allFutures(pending) + raise exc + + var res: DepositContractDataStatus + + try: + # `engineFut1` could hold timeout exception `DataProviderTimeout`. 
+ discard engineFut1.read() + let fetchedRoot = asEth2Digest(depositRootFut.read()) if blk.depositRoot.isZero: blk.depositRoot = fetchedRoot - result = Fetched + res = Fetched elif blk.depositRoot == fetchedRoot: - result = VerifiedCorrect + res = VerifiedCorrect else: - result = DepositRootIncorrect - except CatchableError as err: - debug "Failed to fetch deposits root", - blockNumber = blk.number, - err = err.msg - result = DepositRootUnavailable + res = DepositRootIncorrect + except CatchableError as exc: + debug "Failed to fetch deposits root", block_number = blk.number, + reason = exc.msg + res = DepositRootUnavailable try: - let fetchedCount = bytes_to_uint64((block: - awaitWithTimeout(rawCount, deadline): - raise newException(DataProviderTimeout, - "Request time out while obtaining deposits count")).toArray) + # `engineFut2` could hold timeout exception `DataProviderTimeout`. + discard engineFut2.read() + let fetchedCount = bytes_to_uint64(rawCountFut.read().toArray) if blk.depositCount == 0: blk.depositCount = fetchedCount elif blk.depositCount != fetchedCount: - result = DepositCountIncorrect - except CatchableError as err: - debug "Failed to fetch deposits count", - blockNumber = blk.number, - err = err.msg - result = DepositCountUnavailable - + res = DepositCountIncorrect + except CatchableError as exc: + debug "Failed to fetch deposits count", block_number = blk.number, + reason = exc.msg + res = DepositCountUnavailable + res template trackFinalizedState*(m: ELManager, finalizedEth1Data: Eth1Data, @@ -1576,8 +1751,7 @@ template getBlockProposalData*(m: ELManager, getBlockProposalData( m.eth1Chain, state, finalizedEth1Data, finalizedStateDepositIndex) -func new*(T: type ELConnection, - engineUrl: EngineApiUrl): T = +func new*(T: type ELConnection, engineUrl: EngineApiUrl): T = ELConnection( engineUrl: engineUrl, depositContractSyncStatus: DepositContractSyncStatus.unknown) @@ -1603,28 +1777,23 @@ proc new*(T: type ELManager, depositContractBlockHash: depositContractBlockHash.asBlockHash, elConnections: mapIt(engineApiUrls, ELConnection.new(it)), eth1Network: eth1Network, - blocksPerLogsRequest: targetBlocksPerLogsRequest) + blocksPerLogsRequest: targetBlocksPerLogsRequest, + managerState: ELManagerState.Running) -proc safeCancel(fut: var Future[void]) = - if not fut.isNil and not fut.finished: - fut.cancelSoon() - fut = nil - -proc doStop(m: ELManager) {.async.} = - safeCancel m.chainSyncingLoopFut - safeCancel m.exchangeTransitionConfigurationLoopFut - - if m.elConnections.len > 0: - let closeConnectionFutures = mapIt(m.elConnections, close(it)) - await allFutures(closeConnectionFutures) - -proc stop(m: ELManager) {.async.} = - if not m.stopFut.isNil: - await m.stopFut - else: - m.stopFut = m.doStop() - await m.stopFut - m.stopFut = nil +proc stop(m: ELManager) {.async: (raises: []).} = + if m.managerState notin {ELManagerState.Closing, ELManagerState.Closed}: + m.managerState = ELManagerState.Closing + var pending: seq[Future[void].Raising([])] + if not(m.chainSyncingLoopFut.isNil()) and + not(m.chainSyncingLoopFut.finished()): + pending.add(m.chainSyncingLoopFut.cancelAndWait()) + if not(m.exchangeTransitionConfigurationLoopFut.isNil()) and + not(m.exchangeTransitionConfigurationLoopFut.finished()): + pending.add(m.exchangeTransitionConfigurationLoopFut.cancelAndWait()) + for connection in m.elConnections: + pending.add(connection.close()) + await noCancel allFutures(pending) + m.managerState = ELManagerState.Closed const votedBlocksSafetyMargin = 50 @@ -1642,12 +1811,14 @@ 
func earliestBlockOfInterest( else: 0.Eth1BlockNumber -proc syncBlockRange(m: ELManager, - connection: ELConnection, - rpcClient: RpcClient, - depositContract: Sender[DepositContract], - fromBlock, toBlock, - fullSyncFromBlock: Eth1BlockNumber) {.gcsafe, async.} = +proc syncBlockRange( + m: ELManager, + connection: ELConnection, + rpcClient: RpcClient, + depositContract: Sender[DepositContract], + fromBlock, toBlock, + fullSyncFromBlock: Eth1BlockNumber +) {.async: (raises: [CatchableError]).} = doAssert m.eth1Chain.blocks.len > 0 var currentBlock = fromBlock @@ -1670,31 +1841,27 @@ proc syncBlockRange(m: ELManager, # Reduce all request rate until we have a more general solution # for dealing with Infura's rate limits await sleepAsync(milliseconds(backoff)) - let - startTime = Moment.now - deadline = sleepAsync 30.seconds - jsonLogsFut = depositContract.getJsonLogs( - DepositEvent, - fromBlock = some blockId(currentBlock), - toBlock = some blockId(maxBlockNumberRequested)) - connection.trackEngineApiRequest( - jsonLogsFut, "getLogs", startTime, deadline) - - depositLogs = try: - # Downloading large amounts of deposits may take several minutes - awaitWithTimeout(jsonLogsFut, deadline): - raise newException(DataProviderTimeout, - "Request time out while obtaining json logs") - except CatchableError as err: - debug "Request for deposit logs failed", err = err.msg - inc failed_web3_requests - backoff = (backoff * 3) div 2 - m.blocksPerLogsRequest = m.blocksPerLogsRequest div 2 - if m.blocksPerLogsRequest == 0: - m.blocksPerLogsRequest = 1 - raise err - continue + let depositLogs = + try: + await connection.engineApiRequest( + depositContract.getJsonLogs( + DepositEvent, + fromBlock = some blockId(currentBlock), + toBlock = some blockId(maxBlockNumberRequested)), + "getLogs", Moment.now(), 30.seconds) + except CancelledError as exc: + debug "Request for deposit logs was interrupted" + raise exc + except CatchableError as exc: + debug "Request for deposit logs failed", reason = exc.msg + inc failed_web3_requests + backoff = (backoff * 3) div 2 + m.blocksPerLogsRequest = m.blocksPerLogsRequest div 2 + if m.blocksPerLogsRequest == 0: + m.blocksPerLogsRequest = 1 + raise exc + continue m.blocksPerLogsRequest = min( (m.blocksPerLogsRequest * 3 + 1) div 2, targetBlocksPerLogsRequest) @@ -1707,14 +1874,32 @@ proc syncBlockRange(m: ELManager, for i in 0 ..< blocksWithDeposits.len: let blk = blocksWithDeposits[i] if blk.number > fullSyncFromBlock: - await fetchTimestamp(connection, rpcClient, blk) + try: + await fetchTimestamp(connection, rpcClient, blk) + except CancelledError as exc: + debug "Request for block timestamp was interrupted", + block_number = blk.number + raise exc + except CatchableError as exc: + debug "Request for block timestamp failed", + block_number = blk.number, reason = exc.msg + let lastBlock = m.eth1Chain.blocks.peekLast for n in max(lastBlock.number + 1, fullSyncFromBlock) ..< blk.number: debug "Obtaining block without deposits", blockNum = n - let noDepositsBlock = raiseIfNil connection.trackedRequestWithTimeout( - "getBlockByNumber", - rpcClient.getBlockByNumber(n), - web3RequestsTimeout) + let noDepositsBlock = + try: + raiseIfNil await connection.engineApiRequest( + rpcClient.getBlockByNumber(n), + "getBlockByNumber", Moment.now(), web3RequestsTimeout) + except CancelledError as exc: + debug "The process of obtaining the block was interrupted", + block_number = n + raise exc + except CatchableError as exc: + debug "Request for block failed", block_number = n, + reason = 
exc.msg + raise exc m.eth1Chain.addBlock( lastBlock.makeSuccessorWithoutDeposits(noDepositsBlock)) @@ -1727,11 +1912,12 @@ proc syncBlockRange(m: ELManager, let lastIdx = blocksWithDeposits.len - 1 template lastBlock: auto = blocksWithDeposits[lastIdx] - let status = when hasDepositRootChecks: - await fetchDepositContractData( - connection, rpcClient, depositContract, lastBlock) - else: - DepositRootUnavailable + let status = + when hasDepositRootChecks: + await fetchDepositContractData( + connection, rpcClient, depositContract, lastBlock) + else: + DepositRootUnavailable when hasDepositRootChecks: debug "Deposit contract state verified", @@ -1763,18 +1949,27 @@ func hasProperlyConfiguredConnection*(m: ELManager): bool = false -proc startExchangeTransitionConfigurationLoop(m: ELManager) {.async.} = +proc startExchangeTransitionConfigurationLoop( + m: ELManager +) {.async: (raises: [CancelledError]).} = debug "Starting exchange transition configuration loop" while true: # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/paris.md#specification-3 debug "Exchange transition configuration tick" - traceAsyncErrors m.exchangeTransitionConfiguration() + await m.exchangeTransitionConfiguration() await sleepAsync(60.seconds) -proc syncEth1Chain(m: ELManager, connection: ELConnection) {.async.} = - let rpcClient = awaitOrRaiseOnTimeout(connection.connectedRpcClient(), - 1.seconds) +proc syncEth1Chain( + m: ELManager, + connection: ELConnection +) {.async: (raises: [CatchableError]).} = + let rpcClient = + try: + await connection.connectedRpcClient().wait(1.seconds) + except AsyncTimeoutError: + raise newException(DataProviderTimeout, "Connection timed out") + let # BEWARE # `connectedRpcClient` guarantees that connection.web3 will not be @@ -1812,10 +2007,20 @@ proc syncEth1Chain(m: ELManager, connection: ELConnection) {.async.} = let needsReset = m.eth1Chain.hasConsensusViolation or (block: let lastKnownBlock = m.eth1Chain.blocks.peekLast - matchingBlockAtNewEl = raiseIfNil connection.trackedRequestWithTimeout( - "getBlockByNumber", - rpcClient.getBlockByNumber(lastKnownBlock.number), - web3RequestsTimeout) + matchingBlockAtNewEl = + try: + raiseIfNil await connection.engineApiRequest( + rpcClient.getBlockByNumber(lastKnownBlock.number), + "getBlockByNumber", Moment.now(), web3RequestsTimeout) + except CancelledError as exc: + debug "getBlockByNumber request has been interrupted", + last_known_block_number = lastKnownBlock.number + raise exc + except CatchableError as exc: + debug "getBlockByNumber request failed", + last_known_block_number = lastKnownBlock.number, + reason = exc.msg + raise exc lastKnownBlock.hash.asBlockHash != matchingBlockAtNewEl.hash) @@ -1828,10 +2033,20 @@ proc syncEth1Chain(m: ELManager, connection: ELConnection) {.async.} = if shouldProcessDeposits: if m.eth1Chain.blocks.len == 0: let finalizedBlockHash = m.eth1Chain.finalizedBlockHash.asBlockHash - let startBlock = raiseIfNil connection.trackedRequestWithTimeout( - "getBlockByHash", - rpcClient.getBlockByHash(finalizedBlockHash), - web3RequestsTimeout) + let startBlock = + try: + raiseIfNil await connection.engineApiRequest( + rpcClient.getBlockByHash(finalizedBlockHash), + "getBlockByHash", Moment.now(), web3RequestsTimeout) + except CancelledError as exc: + debug "getBlockByHash() request has been interrupted", + finalized_block_hash = finalizedBlockHash + raise exc + except CatchableError as exc: + debug "getBlockByHash() request has failed", + finalized_block_hash = finalizedBlockHash, + reason 
= exc.msg
+          raise exc

       m.eth1Chain.addBlock Eth1Block(
         hash: m.eth1Chain.finalizedBlockHash,
@@ -1852,21 +2067,27 @@ proc syncEth1Chain(m: ELManager, connection: ELConnection) {.async.} =

     debug "syncEth1Chain tick",
           shouldProcessDeposits, latestBlockNumber, eth1SyncedTo

+    # TODO (cheatfate): This should be removed
     if bnStatus == BeaconNodeStatus.Stopping:
-      await m.stop()
+      await noCancel m.stop()
       return

     if m.eth1Chain.hasConsensusViolation:
-      raise newException(CorruptDataProvider, "Eth1 chain contradicts Eth2 consensus")
+      raise newException(CorruptDataProvider,
+                         "Eth1 chain contradicts Eth2 consensus")
+
+    let latestBlock =
+      try:
+        raiseIfNil await connection.engineApiRequest(
+          rpcClient.eth_getBlockByNumber(blockId("latest"), false),
+          "getBlockByNumber", Moment.now(), web3RequestsTimeout)
+      except CancelledError as exc:
+        debug "Latest block request has been interrupted"
+        raise exc
+      except CatchableError as exc:
+        warn "Failed to obtain the latest block from the EL", reason = exc.msg
+        raise exc

-    let latestBlock = try:
-      raiseIfNil connection.trackedRequestWithTimeout(
-        "getBlockByNumber",
-        rpcClient.eth_getBlockByNumber(blockId("latest"), false),
-        web3RequestsTimeout)
-    except CatchableError as err:
-      warn "Failed to obtain the latest block from the EL", err = err.msg
-      raise err

     latestBlockNumber = latestBlock.number

     m.syncTargetBlock = some(
@@ -1884,42 +2105,59 @@ proc syncEth1Chain(m: ELManager, connection: ELConnection) {.async.} =

     if shouldProcessDeposits and
        latestBlock.number.uint64 > m.cfg.ETH1_FOLLOW_DISTANCE:
-      await m.syncBlockRange(connection,
-                             rpcClient,
-                             depositContract,
-                             eth1SyncedTo + 1,
-                             m.syncTargetBlock.get,
-                             m.earliestBlockOfInterest(latestBlock.number))
+      try:
+        await m.syncBlockRange(connection,
+                               rpcClient,
+                               depositContract,
+                               eth1SyncedTo + 1,
+                               m.syncTargetBlock.get,
+                               m.earliestBlockOfInterest(latestBlock.number))
+      except CancelledError as exc:
+        debug "Syncing block range process has been interrupted"
+        raise exc
+      except CatchableError as exc:
+        debug "Syncing block range process has failed", reason = exc.msg
+        raise exc

       eth1SyncedTo = m.syncTargetBlock.get
       eth1_synced_head.set eth1SyncedTo.toGaugeValue

-proc startChainSyncingLoop(m: ELManager) {.async.} =
+proc startChainSyncingLoop(
+    m: ELManager
+) {.async: (raises: []).} =
  info "Starting execution layer deposit syncing",
       contract = $m.depositContractAddress

  var syncedConnectionFut = m.selectConnectionForChainSyncing()
  info "Connection attempt started"

-  while true:
+  var runLoop = true
+  while runLoop:
    try:
-      await syncedConnectionFut or sleepAsync(60.seconds)
-      if not syncedConnectionFut.finished:
-        notice "No synced execution layer available for deposit syncing"
+      let connection = await syncedConnectionFut.wait(60.seconds)
+      await syncEth1Chain(m, connection)
+    except AsyncTimeoutError:
+      notice "No synced EL nodes available for deposit syncing"
+      try:
        await sleepAsync(chronos.seconds(30))
-        continue
-
-      await syncEth1Chain(m, syncedConnectionFut.read)
-    except CatchableError:
-      await sleepAsync(10.seconds)
-
-      # A more detailed error is already logged by trackEngineApiRequest
+      except CancelledError:
+        runLoop = false
+    except CancelledError:
+      runLoop = false
+    except CatchableError as exc:
+      try:
+        await sleepAsync(10.seconds)
+      except CancelledError:
+        runLoop = false
+        break
      debug "Restarting the deposit syncing loop"
-
+      # A more detailed error is already logged by engineApiRequest
      # To be extra safe, we will make a fresh connection attempt
      await
syncedConnectionFut.cancelAndWait() syncedConnectionFut = m.selectConnectionForChainSyncing() + debug "EL chain syncing process has been stopped" + proc start*(m: ELManager, syncChain = true) {.gcsafe.} = if m.elConnections.len == 0: return @@ -1939,19 +2177,22 @@ func `$`(x: Quantity): string = func `$`(x: BlockObject): string = $(x.number) & " [" & $(x.hash) & "]" -proc testWeb3Provider*(web3Url: Uri, - depositContractAddress: Eth1Address, - jwtSecret: Opt[seq[byte]]) {.async.} = +proc testWeb3Provider*( + web3Url: Uri, + depositContractAddress: Eth1Address, + jwtSecret: Opt[seq[byte]] +) {.async: (raises: [CatchableError]).} = + stdout.write "Establishing web3 connection..." - var web3: Web3 - try: - web3 = awaitOrRaiseOnTimeout( - newWeb3($web3Url, getJsonRpcRequestHeaders(jwtSecret)), - 5.seconds) - stdout.write "\rEstablishing web3 connection: Connected\n" - except CatchableError as err: - stdout.write "\rEstablishing web3 connection: Failure(" & err.msg & ")\n" - quit 1 + let web3 = + try: + await newWeb3($web3Url, + getJsonRpcRequestHeaders(jwtSecret)).wait(5.seconds) + except CatchableError as exc: + stdout.write "\rEstablishing web3 connection: Failure(" & exc.msg & ")\n" + quit 1 + + stdout.write "\rEstablishing web3 connection: Connected\n" template request(actionDesc: static string, action: untyped): untyped = @@ -1959,7 +2200,8 @@ proc testWeb3Provider*(web3Url: Uri, stdout.flushFile() var res: typeof(read action) try: - res = awaitOrRaiseOnTimeout(action, web3RequestsTimeout) + let fut = action + res = await fut.wait(web3RequestsTimeout) when res is BlockObject: res = raiseIfNil res stdout.write "\r" & actionDesc & ": " & $res