From a3d4a3ee5eee30117e62e179425ca33903db6279 Mon Sep 17 00:00:00 2001
From: Eugene Kabanov
Date: Wed, 23 Oct 2024 03:42:46 +0300
Subject: [PATCH] BN: Fix el_manager timeouts issue in block processing.
 (#6665)

* Fix el_manager + block_processor NEWPAYLOAD_TIMEOUT timeouts issue.
  Use a predefined array of exponential timeouts when all requests to the EL
  have failed.

* Increase timeout value to (next_slot.start_time - 1.second)

* Address review comments.

* Do not repeat requests when the node is optimistically synced.
---
 beacon_chain/el/el_manager.nim                  | 90 +++++++++++++++----
 .../gossip_processing/block_processor.nim       | 70 +++++++++++----
 2 files changed, 127 insertions(+), 33 deletions(-)

diff --git a/beacon_chain/el/el_manager.nim b/beacon_chain/el/el_manager.nim
index e8b59306c..d60702aab 100644
--- a/beacon_chain/el/el_manager.nim
+++ b/beacon_chain/el/el_manager.nim
@@ -34,6 +34,10 @@ export
 logScope:
   topics = "elman"
 
+const
+  SleepDurations =
+    [100.milliseconds, 200.milliseconds, 500.milliseconds, 1.seconds]
+
 type
   FixedBytes[N: static int] = web3.FixedBytes[N]
   PubKeyBytes = DynamicBytes[48, 48]
@@ -43,6 +47,11 @@ type
   WithoutTimeout* = distinct int
   Address = web3.Address
 
+  DeadlineObject* = object
+    # TODO (cheatfate): This object declaration could be removed when
+    # `Raising()` macro starts to support procedure arguments.
+    future*: Future[void].Raising([CancelledError])
+
   SomeEnginePayloadWithValue =
     BellatrixExecutionPayloadWithValue |
     GetPayloadV2Response |
@@ -233,6 +242,22 @@ declareCounter engine_api_last_minute_forkchoice_updates_sent,
   "Number of last minute requests to the forkchoiceUpdated Engine API end-point just before block proposals",
   labels = ["url"]
 
+proc init*(t: typedesc[DeadlineObject], d: Duration): DeadlineObject =
+  DeadlineObject(future: sleepAsync(d))
+
+proc variedSleep*(
+    counter: var int,
+    durations: openArray[Duration]
+): Future[void] {.async: (raises: [CancelledError], raw: true).} =
+  doAssert(len(durations) > 0, "Empty durations array!")
+  let index =
+    if (counter < 0) or (counter > high(durations)):
+      high(durations)
+    else:
+      counter
+  inc(counter)
+  sleepAsync(durations[index])
+
 proc close(connection: ELConnection): Future[void] {.async: (raises: []).} =
   if connection.web3.isSome:
     try:
@@ -942,14 +967,20 @@ proc lazyWait(futures: seq[FutureBase]) {.async: (raises: []).} =
 
 proc sendNewPayload*(
     m: ELManager,
-    blck: SomeForkyBeaconBlock
+    blck: SomeForkyBeaconBlock,
+    deadlineObj: DeadlineObject,
+    maxRetriesCount: int
 ): Future[PayloadExecutionStatus] {.async: (raises: [CancelledError]).} =
+  doAssert maxRetriesCount > 0
+
   let
     startTime = Moment.now()
-    deadline = sleepAsync(NEWPAYLOAD_TIMEOUT)
+    deadline = deadlineObj.future
     payload = blck.body.asEngineExecutionPayload
   var
     responseProcessor = ELConsensusViolationDetector.init()
+    sleepCounter = 0
+    retriesCount = 0
 
   while true:
     block mainLoop:
@@ -1033,18 +1064,23 @@ proc sendNewPayload*(
           return PayloadExecutionStatus.syncing
 
       if len(pendingRequests) == 0:
-        # All requests failed, we will continue our attempts until deadline
-        # is not finished.
+        # All requests failed.
+        inc(retriesCount)
+        if retriesCount == maxRetriesCount:
+          return PayloadExecutionStatus.syncing
         # To avoid continous spam of requests when EL node is offline we
-        # going to sleep until next attempt for
-        # (NEWPAYLOAD_TIMEOUT / 4) time (2.seconds).
-        let timeout =
-          chronos.nanoseconds(NEWPAYLOAD_TIMEOUT.nanoseconds div 4)
-        await sleepAsync(timeout)
-
+        # going to sleep until next attempt.
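+        # (variedSleep walks SleepDurations: 100 ms, 200 ms, 500 ms, then
+        # stays at 1 s for every subsequent retry.)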
+        await variedSleep(sleepCounter, SleepDurations)
         break mainLoop
 
+proc sendNewPayload*(
+    m: ELManager,
+    blck: SomeForkyBeaconBlock
+): Future[PayloadExecutionStatus] {.
+    async: (raises: [CancelledError], raw: true).} =
+  sendNewPayload(m, blck, DeadlineObject.init(NEWPAYLOAD_TIMEOUT), high(int))
+
 proc forkchoiceUpdatedForSingleEL(
     connection: ELConnection,
     state: ref ForkchoiceStateV1,
@@ -1072,11 +1108,14 @@ proc forkchoiceUpdated*(
     headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
     payloadAttributes: Opt[PayloadAttributesV1] |
                        Opt[PayloadAttributesV2] |
-                       Opt[PayloadAttributesV3]
+                       Opt[PayloadAttributesV3],
+    deadlineObj: DeadlineObject,
+    maxRetriesCount: int
 ): Future[(PayloadExecutionStatus, Opt[BlockHash])] {.
     async: (raises: [CancelledError]).} =
   doAssert not headBlockHash.isZero
+  doAssert maxRetriesCount > 0
 
   # Allow finalizedBlockHash to be 0 to avoid sync deadlocks.
   #
@@ -1132,9 +1171,12 @@ proc forkchoiceUpdated*(
       safeBlockHash: safeBlockHash.asBlockHash,
       finalizedBlockHash: finalizedBlockHash.asBlockHash)
     startTime = Moment.now
-    deadline = sleepAsync(FORKCHOICEUPDATED_TIMEOUT)
+    deadline = deadlineObj.future
+
   var
     responseProcessor = ELConsensusViolationDetector.init()
+    sleepCounter = 0
+    retriesCount = 0
 
   while true:
     block mainLoop:
@@ -1216,16 +1258,28 @@
 
       if len(pendingRequests) == 0:
         # All requests failed, we will continue our attempts until deadline
         # is not finished.
+        inc(retriesCount)
+        if retriesCount == maxRetriesCount:
+          return (PayloadExecutionStatus.syncing, Opt.none BlockHash)
         # To avoid continous spam of requests when EL node is offline we
-        # going to sleep until next attempt for
-        # (FORKCHOICEUPDATED_TIMEOUT / 4) time (2.seconds).
-        let timeout =
-          chronos.nanoseconds(FORKCHOICEUPDATED_TIMEOUT.nanoseconds div 4)
-        await sleepAsync(timeout)
-
+        # going to sleep until next attempt.
+        await variedSleep(sleepCounter, SleepDurations)
         break mainLoop
 
+proc forkchoiceUpdated*(
+    m: ELManager,
+    headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
+    payloadAttributes: Opt[PayloadAttributesV1] |
+                       Opt[PayloadAttributesV2] |
+                       Opt[PayloadAttributesV3]
+): Future[(PayloadExecutionStatus, Opt[BlockHash])] {.
+    async: (raises: [CancelledError], raw: true).} =
+  forkchoiceUpdated(
+    m, headBlockHash, safeBlockHash, finalizedBlockHash,
+    payloadAttributes, DeadlineObject.init(FORKCHOICEUPDATED_TIMEOUT),
+    high(int))
+
 # TODO can't be defined within exchangeConfigWithSingleEL
 func `==`(x, y: Quantity): bool {.borrow.}
 
diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim
index 918cad42d..7fbb56fa9 100644
--- a/beacon_chain/gossip_processing/block_processor.nim
+++ b/beacon_chain/gossip_processing/block_processor.nim
@@ -20,7 +20,7 @@ from ../consensus_object_pools/consensus_manager import updateHeadWithExecution
 from ../consensus_object_pools/blockchain_dag import
   getBlockRef, getForkedBlock, getProposer, forkAtEpoch,
   loadExecutionBlockHash,
-  markBlockVerified, validatorKey
+  markBlockVerified, validatorKey, is_optimistic
 from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds
 from ../consensus_object_pools/block_dag import BlockRef, root, shortLog, slot
 from ../consensus_object_pools/block_pools_types import
@@ -230,19 +230,24 @@ from web3/engine_api_types import PayloadAttributesV1, PayloadAttributesV2,
   PayloadAttributesV3, PayloadExecutionStatus, PayloadStatusV1
 from ../el/el_manager import
-  ELManager, forkchoiceUpdated, hasConnection, hasProperlyConfiguredConnection,
-  sendNewPayload
+  ELManager, DeadlineObject, forkchoiceUpdated, hasConnection,
+  hasProperlyConfiguredConnection, sendNewPayload, init
 
 proc expectValidForkchoiceUpdated(
     elManager: ELManager, headBlockPayloadAttributesType: typedesc,
     headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
-    receivedBlock: ForkySignedBeaconBlock): Future[void] {.async: (raises: [CancelledError]).} =
+    receivedBlock: ForkySignedBeaconBlock,
+    deadlineObj: DeadlineObject,
+    maxRetriesCount: int
+): Future[void] {.async: (raises: [CancelledError]).} =
   let
     (payloadExecutionStatus, _) = await elManager.forkchoiceUpdated(
       headBlockHash = headBlockHash,
       safeBlockHash = safeBlockHash,
      finalizedBlockHash = finalizedBlockHash,
-      payloadAttributes = Opt.none headBlockPayloadAttributesType)
+      payloadAttributes = Opt.none headBlockPayloadAttributesType,
+      deadlineObj = deadlineObj,
+      maxRetriesCount = maxRetriesCount)
     receivedExecutionBlockHash =
       when typeof(receivedBlock).kind >= ConsensusFork.Bellatrix:
        receivedBlock.message.body.execution_payload.block_hash
@@ -277,8 +282,11 @@ from ../consensus_object_pools/attestation_pool import
 from ../consensus_object_pools/spec_cache import get_attesting_indices
 
 proc newExecutionPayload*(
-    elManager: ELManager, blck: SomeForkyBeaconBlock):
-    Future[Opt[PayloadExecutionStatus]] {.async: (raises: [CancelledError]).} =
+    elManager: ELManager,
+    blck: SomeForkyBeaconBlock,
+    deadlineObj: DeadlineObject,
+    maxRetriesCount: int
+): Future[Opt[PayloadExecutionStatus]] {.async: (raises: [CancelledError]).} =
   template executionPayload: untyped = blck.body.execution_payload
 
@@ -295,7 +303,8 @@ proc newExecutionPayload*(
     executionPayload = shortLog(executionPayload)
 
   try:
-    let payloadStatus = await elManager.sendNewPayload(blck)
+    let payloadStatus =
+      await elManager.sendNewPayload(blck, deadlineObj, maxRetriesCount)
 
     debug "newPayload: succeeded",
       parentHash = executionPayload.parent_hash,
@@ -312,22 +321,34 @@
       blockNumber = executionPayload.block_number
     return Opt.none PayloadExecutionStatus
 
+proc newExecutionPayload*(
+    elManager: ELManager,
+    blck: SomeForkyBeaconBlock
+): Future[Opt[PayloadExecutionStatus]] {.
+    async: (raises: [CancelledError], raw: true).} =
+  newExecutionPayload(
+    elManager, blck, DeadlineObject.init(FORKCHOICEUPDATED_TIMEOUT),
+    high(int))
+
 proc getExecutionValidity(
     elManager: ELManager,
     blck: bellatrix.SignedBeaconBlock | capella.SignedBeaconBlock |
-          deneb.SignedBeaconBlock | electra.SignedBeaconBlock):
-    Future[NewPayloadStatus] {.async: (raises: [CancelledError]).} =
+          deneb.SignedBeaconBlock | electra.SignedBeaconBlock,
+    deadlineObj: DeadlineObject,
+    maxRetriesCount: int
+): Future[NewPayloadStatus] {.async: (raises: [CancelledError]).} =
   if not blck.message.is_execution_block:
     return NewPayloadStatus.valid # vacuously
 
   try:
     let executionPayloadStatus = await elManager.newExecutionPayload(
-      blck.message)
+      blck.message, deadlineObj, maxRetriesCount)
 
     if executionPayloadStatus.isNone:
       return NewPayloadStatus.noResponse
 
     case executionPayloadStatus.get
-    of PayloadExecutionStatus.invalid, PayloadExecutionStatus.invalid_block_hash:
+    of PayloadExecutionStatus.invalid,
+       PayloadExecutionStatus.invalid_block_hash:
       # Blocks come either from gossip or request manager requests. In the
       # former case, they've passed libp2p gosisp validation which implies
      # correct signature for correct proposer,which makes spam expensive,
@@ -413,6 +434,20 @@ proc storeBlock(
     vm = self.validatorMonitor
     dag = self.consensusManager.dag
     wallSlot = wallTime.slotOrZero
+    deadlineTime =
+      block:
+        let slotTime = (wallSlot + 1).start_beacon_time() - 1.seconds
+        if slotTime <= wallTime:
+          0.seconds
+        else:
+          chronos.nanoseconds((slotTime - wallTime).nanoseconds)
+    deadlineObj = DeadlineObject.init(deadlineTime)
+
+  func getRetriesCount(): int =
+    if dag.is_optimistic(dag.head.bid):
+      1
+    else:
+      high(int)
 
   # If the block is missing its parent, it will be re-orphaned below
   self.consensusManager.quarantine[].removeOrphan(signedBlock)
@@ -518,7 +553,8 @@
         NewPayloadStatus.noResponse
       else:
        when typeof(signedBlock).kind >= ConsensusFork.Bellatrix:
-          await self.consensusManager.elManager.getExecutionValidity(signedBlock)
+          await self.consensusManager.elManager.getExecutionValidity(
+            signedBlock, deadlineObj, getRetriesCount())
        else:
          NewPayloadStatus.valid # vacuously
     payloadValid = payloadStatus == NewPayloadStatus.valid
@@ -685,7 +721,9 @@
           self.consensusManager[].optimisticExecutionBlockHash,
         safeBlockHash = newHead.get.safeExecutionBlockHash,
         finalizedBlockHash = newHead.get.finalizedExecutionBlockHash,
-        payloadAttributes = Opt.none attributes)
+        payloadAttributes = Opt.none attributes,
+        deadlineObj = deadlineObj,
+        maxRetriesCount = getRetriesCount())
 
       let consensusFork = self.consensusManager.dag.cfg.consensusForkAtEpoch(
         newHead.get.blck.bid.slot.epoch)
@@ -712,7 +750,9 @@
         headBlockHash = headExecutionBlockHash,
         safeBlockHash = newHead.get.safeExecutionBlockHash,
         finalizedBlockHash = newHead.get.finalizedExecutionBlockHash,
-        receivedBlock = signedBlock)
+        receivedBlock = signedBlock,
+        deadlineObj = deadlineObj,
+        maxRetriesCount = getRetriesCount())
 
     template callForkChoiceUpdated: auto =
       case self.consensusManager.dag.cfg.consensusForkAtEpoch(