From 89dccedcc9e9146fa986932e96f894126f2343f0 Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Fri, 10 Mar 2023 12:55:55 +0200 Subject: [PATCH] Prevent the build up of parallel 'waitELToSync' operations --- beacon_chain/eth1/eth1_monitor.nim | 52 ++++++++++++------------------ 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/beacon_chain/eth1/eth1_monitor.nim b/beacon_chain/eth1/eth1_monitor.nim index 6540d9a59..9ae161a21 100644 --- a/beacon_chain/eth1/eth1_monitor.nim +++ b/beacon_chain/eth1/eth1_monitor.nim @@ -976,6 +976,7 @@ proc waitELToSyncDeposits(connection: ELConnection, url = connection.engineUrl.url, blk = minimalRequiredBlock, err = err.msg + inc attempt await sleepAsync(seconds(30)) rpcClient = await connection.connectedRpcClient() @@ -998,39 +999,21 @@ proc selectConnectionForChainSyncing(m: ELManager): Future[ELConnection] {.async else: FutureBase connectedRpcClient(it)) - let firstConnected = await firstCompletedFuture(connectionsFuts) - # TODO: Ideally, the cancellation will be handled automatically # by a helper like `firstCompletedFuture` + let firstConnected = try: + await firstCompletedFuture(connectionsFuts) + except CancelledError as err: + for future in connectionsFuts: + future.cancel() + raise err + for future in connectionsFuts: if future != firstConnected: future.cancel() return m.elConnections[find(connectionsFuts, firstConnected)] -proc getBlobsBundleFromASyncedEL( - m: ELManager, - payloadId: bellatrix.PayloadID): Future[BlobsBundleV1] {.async.} = - let - connection = await m.selectConnectionForChainSyncing() - rpcClient = await connection.connectedRpcClient() - - return connection.trackedRequestWithTimeout( - "getBlobsBundle", - rpcClient.engine_getBlobsBundleV1(FixedBytes[8] payloadId), - GETBLOBS_TIMEOUT) - -proc getBlobsBundleV1*( - m: ELManager, payloadId: bellatrix.PayloadID): - Future[Opt[BlobsBundleV1]] {.async.} = - if m.elConnections.len == 0: - return Opt.none BlobsBundleV1 - - result = try: - Opt.some(await m.getBlobsBundleFromASyncedEL(payload_id)) - except CatchableError: - Opt.none BlobsBundleV1 - proc sendNewPayloadToSingleEL(connection: ELConnection, payload: engine_api.ExecutionPayloadV1): Future[PayloadStatusV1] {.async.} = @@ -2087,20 +2070,27 @@ proc startChainSyncingLoop(m: ELManager) {.async.} = info "Starting execution layer deposits syncing", contract = $m.depositContractAddress + var syncedConnectionFut = m.selectConnectionForChainSyncing() + info "Connection attempt started" + while true: - let connection = awaitWithTimeout( - m.selectConnectionForChainSyncing(), - chronos.seconds(60)): + try: + await syncedConnectionFut or sleepAsync(60.seconds) + if not syncedConnectionFut.finished: warn "No suitable EL connection for deposit syncing" await sleepAsync(chronos.seconds(30)) continue - try: - await syncEth1Chain(m, connection) + await syncEth1Chain(m, syncedConnectionFut.read) except CatchableError as err: + await sleepAsync(10.seconds) + # A more detailed error is already logged by trackEngineApiRequest debug "Restarting the deposit syncing loop" - await sleepAsync(5.seconds) + + # To be extra safe, we will make a fresh connection attempt + await syncedConnectionFut.cancelAndWait() + syncedConnectionFut = m.selectConnectionForChainSyncing() proc start*(m: ELManager) {.gcsafe.} = if m.elConnections.len == 0: