Prevent the build up of parallel 'waitELToSync' operations

This commit is contained in:
Zahary Karadjov 2023-03-10 12:55:55 +02:00
parent 57b2151f95
commit 89dccedcc9
No known key found for this signature in database
GPG Key ID: C1F42EAFF38D570F
1 changed files with 21 additions and 31 deletions

View File

@ -976,6 +976,7 @@ proc waitELToSyncDeposits(connection: ELConnection,
url = connection.engineUrl.url,
blk = minimalRequiredBlock,
err = err.msg
inc attempt
await sleepAsync(seconds(30))
rpcClient = await connection.connectedRpcClient()
@ -998,39 +999,21 @@ proc selectConnectionForChainSyncing(m: ELManager): Future[ELConnection] {.async
else:
FutureBase connectedRpcClient(it))
let firstConnected = await firstCompletedFuture(connectionsFuts)
# TODO: Ideally, the cancellation will be handled automatically
# by a helper like `firstCompletedFuture`
let firstConnected = try:
await firstCompletedFuture(connectionsFuts)
except CancelledError as err:
for future in connectionsFuts:
future.cancel()
raise err
for future in connectionsFuts:
if future != firstConnected:
future.cancel()
return m.elConnections[find(connectionsFuts, firstConnected)]
proc getBlobsBundleFromASyncedEL(
m: ELManager,
payloadId: bellatrix.PayloadID): Future[BlobsBundleV1] {.async.} =
let
connection = await m.selectConnectionForChainSyncing()
rpcClient = await connection.connectedRpcClient()
return connection.trackedRequestWithTimeout(
"getBlobsBundle",
rpcClient.engine_getBlobsBundleV1(FixedBytes[8] payloadId),
GETBLOBS_TIMEOUT)
proc getBlobsBundleV1*(
m: ELManager, payloadId: bellatrix.PayloadID):
Future[Opt[BlobsBundleV1]] {.async.} =
if m.elConnections.len == 0:
return Opt.none BlobsBundleV1
result = try:
Opt.some(await m.getBlobsBundleFromASyncedEL(payload_id))
except CatchableError:
Opt.none BlobsBundleV1
proc sendNewPayloadToSingleEL(connection: ELConnection,
payload: engine_api.ExecutionPayloadV1):
Future[PayloadStatusV1] {.async.} =
@ -2087,20 +2070,27 @@ proc startChainSyncingLoop(m: ELManager) {.async.} =
info "Starting execution layer deposits syncing",
contract = $m.depositContractAddress
var syncedConnectionFut = m.selectConnectionForChainSyncing()
info "Connection attempt started"
while true:
let connection = awaitWithTimeout(
m.selectConnectionForChainSyncing(),
chronos.seconds(60)):
try:
await syncedConnectionFut or sleepAsync(60.seconds)
if not syncedConnectionFut.finished:
warn "No suitable EL connection for deposit syncing"
await sleepAsync(chronos.seconds(30))
continue
try:
await syncEth1Chain(m, connection)
await syncEth1Chain(m, syncedConnectionFut.read)
except CatchableError as err:
await sleepAsync(10.seconds)
# A more detailed error is already logged by trackEngineApiRequest
debug "Restarting the deposit syncing loop"
await sleepAsync(5.seconds)
# To be extra safe, we will make a fresh connection attempt
await syncedConnectionFut.cancelAndWait()
syncedConnectionFut = m.selectConnectionForChainSyncing()
proc start*(m: ELManager) {.gcsafe.} =
if m.elConnections.len == 0: