BN: Fix el_manager timeouts issue in block processing. (#6665)

* Fix the el_manager + block_processor NEWPAYLOAD_TIMEOUT timeout issue.
Use a predefined array of exponentially increasing sleep durations when all requests to the EL have failed.
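
For context, a minimal self-contained sketch (chronos) of the backoff pattern: the SleepDurations array and variedSleep helper mirror the ones added to el_manager.nim in this commit, while retryLoop and its six-attempt bound are purely illustrative.

import chronos

const
  SleepDurations =
    [100.milliseconds, 200.milliseconds, 500.milliseconds, 1.seconds]

proc variedSleep(
    counter: var int,
    durations: openArray[Duration]
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
  # Pick the next duration; once the array is exhausted keep reusing the
  # last (largest) entry.
  doAssert(len(durations) > 0, "Empty durations array!")
  let index =
    if (counter < 0) or (counter > high(durations)):
      high(durations)
    else:
      counter
  inc(counter)
  sleepAsync(durations[index])

proc retryLoop() {.async.} =
  # Illustrative driver only: sleeps 100ms, 200ms, 500ms, then 1s for each
  # further failed attempt (six attempts chosen arbitrarily here).
  var sleepCounter = 0
  for _ in 0 ..< 6:
    await variedSleep(sleepCounter, SleepDurations)

when isMainModule:
  waitFor retryLoop()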

* Increase the timeout value to (next_slot.start_time - 1.second).
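
Roughly, the deadline is the start of the next slot minus one second, clamped at zero; a hedged sketch assuming 12-second slots, with elDeadline, slotStart and wallTime as illustrative stand-ins for the beacon-time values used in block_processor.nim:

import chronos

const slotDuration = 12.seconds  # assumed mainnet SECONDS_PER_SLOT

proc elDeadline(slotStart, wallTime: Moment): Duration =
  # One second before the next slot starts, clamped to zero when that
  # point is already in the past.
  let cutoff = slotStart + slotDuration - 1.seconds
  if cutoff <= wallTime:
    0.seconds
  else:
    cutoff - wallTime

when isMainModule:
  let now = Moment.now()
  # Three seconds into a 12-second slot leaves an 8-second EL deadline.
  echo elDeadline(now, now + 3.seconds)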

* Address review comments.

* Do not repeat requests when the node is optimistically synced.
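
A small sketch of the resulting retry policy (retriesFor and headIsOptimistic are illustrative names; the commit's getRetriesCount in block_processor.nim does the same thing):

func retriesFor(headIsOptimistic: bool): int =
  # When the head is already optimistically synced there is no point in
  # hammering an unresponsive EL again: one round of requests is enough
  # before reporting `syncing`. Otherwise retry until the deadline fires.
  if headIsOptimistic: 1 else: high(int)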
Eugene Kabanov 2024-10-23 03:42:46 +03:00 committed by GitHub
parent ead72deaa2
commit a3d4a3ee5e
2 changed files with 127 additions and 33 deletions

el_manager.nim

@@ -34,6 +34,10 @@ export
logScope:
topics = "elman"
const
SleepDurations =
[100.milliseconds, 200.milliseconds, 500.milliseconds, 1.seconds]
type
FixedBytes[N: static int] = web3.FixedBytes[N]
PubKeyBytes = DynamicBytes[48, 48]
@@ -43,6 +47,11 @@ type
WithoutTimeout* = distinct int
Address = web3.Address
DeadlineObject* = object
# TODO (cheatfate): This object declaration could be removed when
# `Raising()` macro starts to support procedure arguments.
future*: Future[void].Raising([CancelledError])
SomeEnginePayloadWithValue =
BellatrixExecutionPayloadWithValue |
GetPayloadV2Response |
@@ -233,6 +242,22 @@ declareCounter engine_api_last_minute_forkchoice_updates_sent,
"Number of last minute requests to the forkchoiceUpdated Engine API end-point just before block proposals",
labels = ["url"]
proc init*(t: typedesc[DeadlineObject], d: Duration): DeadlineObject =
DeadlineObject(future: sleepAsync(d))
proc variedSleep*(
counter: var int,
durations: openArray[Duration]
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
doAssert(len(durations) > 0, "Empty durations array!")
let index =
if (counter < 0) or (counter > high(durations)):
high(durations)
else:
counter
inc(counter)
sleepAsync(durations[index])
proc close(connection: ELConnection): Future[void] {.async: (raises: []).} =
if connection.web3.isSome:
try:
@@ -942,14 +967,20 @@ proc lazyWait(futures: seq[FutureBase]) {.async: (raises: []).} =
proc sendNewPayload*(
m: ELManager,
blck: SomeForkyBeaconBlock
blck: SomeForkyBeaconBlock,
deadlineObj: DeadlineObject,
maxRetriesCount: int
): Future[PayloadExecutionStatus] {.async: (raises: [CancelledError]).} =
doAssert maxRetriesCount > 0
let
startTime = Moment.now()
deadline = sleepAsync(NEWPAYLOAD_TIMEOUT)
deadline = deadlineObj.future
payload = blck.body.asEngineExecutionPayload
var
responseProcessor = ELConsensusViolationDetector.init()
sleepCounter = 0
retriesCount = 0
while true:
block mainLoop:
@@ -1033,18 +1064,23 @@
return PayloadExecutionStatus.syncing
if len(pendingRequests) == 0:
# All requests failed, we will continue our attempts until deadline
# is not finished.
# All requests failed.
inc(retriesCount)
if retriesCount == maxRetriesCount:
return PayloadExecutionStatus.syncing
# To avoid continuous spam of requests when the EL node is offline we are
# going to sleep until next attempt for
# (NEWPAYLOAD_TIMEOUT / 4) time (2.seconds).
let timeout =
chronos.nanoseconds(NEWPAYLOAD_TIMEOUT.nanoseconds div 4)
await sleepAsync(timeout)
# going to sleep until next attempt.
await variedSleep(sleepCounter, SleepDurations)
break mainLoop
proc sendNewPayload*(
m: ELManager,
blck: SomeForkyBeaconBlock
): Future[PayloadExecutionStatus] {.
async: (raises: [CancelledError], raw: true).} =
sendNewPayload(m, blck, DeadlineObject.init(NEWPAYLOAD_TIMEOUT), high(int))
proc forkchoiceUpdatedForSingleEL(
connection: ELConnection,
state: ref ForkchoiceStateV1,
@@ -1072,11 +1108,14 @@ proc forkchoiceUpdated*(
headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
payloadAttributes: Opt[PayloadAttributesV1] |
Opt[PayloadAttributesV2] |
Opt[PayloadAttributesV3]
Opt[PayloadAttributesV3],
deadlineObj: DeadlineObject,
maxRetriesCount: int
): Future[(PayloadExecutionStatus, Opt[BlockHash])] {.
async: (raises: [CancelledError]).} =
doAssert not headBlockHash.isZero
doAssert maxRetriesCount > 0
# Allow finalizedBlockHash to be 0 to avoid sync deadlocks.
#
@@ -1132,9 +1171,12 @@
safeBlockHash: safeBlockHash.asBlockHash,
finalizedBlockHash: finalizedBlockHash.asBlockHash)
startTime = Moment.now
deadline = sleepAsync(FORKCHOICEUPDATED_TIMEOUT)
deadline = deadlineObj.future
var
responseProcessor = ELConsensusViolationDetector.init()
sleepCounter = 0
retriesCount = 0
while true:
block mainLoop:
@@ -1216,16 +1258,28 @@
if len(pendingRequests) == 0:
# All requests failed, we will continue our attempts until deadline
# is not finished.
inc(retriesCount)
if retriesCount == maxRetriesCount:
return (PayloadExecutionStatus.syncing, Opt.none BlockHash)
# To avoid continuous spam of requests when the EL node is offline we are
# going to sleep until next attempt for
# (FORKCHOICEUPDATED_TIMEOUT / 4) time (2.seconds).
let timeout =
chronos.nanoseconds(FORKCHOICEUPDATED_TIMEOUT.nanoseconds div 4)
await sleepAsync(timeout)
# going to sleep until next attempt.
await variedSleep(sleepCounter, SleepDurations)
break mainLoop
proc forkchoiceUpdated*(
m: ELManager,
headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
payloadAttributes: Opt[PayloadAttributesV1] |
Opt[PayloadAttributesV2] |
Opt[PayloadAttributesV3]
): Future[(PayloadExecutionStatus, Opt[BlockHash])] {.
async: (raises: [CancelledError], raw: true).} =
forkchoiceUpdated(
m, headBlockHash, safeBlockHash, finalizedBlockHash,
payloadAttributes, DeadlineObject.init(FORKCHOICEUPDATED_TIMEOUT),
high(int))
# TODO can't be defined within exchangeConfigWithSingleEL
func `==`(x, y: Quantity): bool {.borrow.}

block_processor.nim

@@ -20,7 +20,7 @@ from ../consensus_object_pools/consensus_manager import
updateHeadWithExecution
from ../consensus_object_pools/blockchain_dag import
getBlockRef, getForkedBlock, getProposer, forkAtEpoch, loadExecutionBlockHash,
markBlockVerified, validatorKey
markBlockVerified, validatorKey, is_optimistic
from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds
from ../consensus_object_pools/block_dag import BlockRef, root, shortLog, slot
from ../consensus_object_pools/block_pools_types import
@@ -230,19 +230,24 @@ from web3/engine_api_types import
PayloadAttributesV1, PayloadAttributesV2, PayloadAttributesV3,
PayloadExecutionStatus, PayloadStatusV1
from ../el/el_manager import
ELManager, forkchoiceUpdated, hasConnection, hasProperlyConfiguredConnection,
sendNewPayload
ELManager, DeadlineObject, forkchoiceUpdated, hasConnection,
hasProperlyConfiguredConnection, sendNewPayload, init
proc expectValidForkchoiceUpdated(
elManager: ELManager, headBlockPayloadAttributesType: typedesc,
headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
receivedBlock: ForkySignedBeaconBlock): Future[void] {.async: (raises: [CancelledError]).} =
receivedBlock: ForkySignedBeaconBlock,
deadlineObj: DeadlineObject,
maxRetriesCount: int
): Future[void] {.async: (raises: [CancelledError]).} =
let
(payloadExecutionStatus, _) = await elManager.forkchoiceUpdated(
headBlockHash = headBlockHash,
safeBlockHash = safeBlockHash,
finalizedBlockHash = finalizedBlockHash,
payloadAttributes = Opt.none headBlockPayloadAttributesType)
payloadAttributes = Opt.none headBlockPayloadAttributesType,
deadlineObj = deadlineObj,
maxRetriesCount = maxRetriesCount)
receivedExecutionBlockHash =
when typeof(receivedBlock).kind >= ConsensusFork.Bellatrix:
receivedBlock.message.body.execution_payload.block_hash
@@ -277,8 +282,11 @@ from ../consensus_object_pools/attestation_pool import
from ../consensus_object_pools/spec_cache import get_attesting_indices
proc newExecutionPayload*(
elManager: ELManager, blck: SomeForkyBeaconBlock):
Future[Opt[PayloadExecutionStatus]] {.async: (raises: [CancelledError]).} =
elManager: ELManager,
blck: SomeForkyBeaconBlock,
deadlineObj: DeadlineObject,
maxRetriesCount: int
): Future[Opt[PayloadExecutionStatus]] {.async: (raises: [CancelledError]).} =
template executionPayload: untyped = blck.body.execution_payload
@@ -295,7 +303,8 @@ proc newExecutionPayload*(
executionPayload = shortLog(executionPayload)
try:
let payloadStatus = await elManager.sendNewPayload(blck)
let payloadStatus =
await elManager.sendNewPayload(blck, deadlineObj, maxRetriesCount)
debug "newPayload: succeeded",
parentHash = executionPayload.parent_hash,
@@ -312,22 +321,34 @@
blockNumber = executionPayload.block_number
return Opt.none PayloadExecutionStatus
proc newExecutionPayload*(
elManager: ELManager,
blck: SomeForkyBeaconBlock
): Future[Opt[PayloadExecutionStatus]] {.
async: (raises: [CancelledError], raw: true).} =
newExecutionPayload(
elManager, blck, DeadlineObject.init(FORKCHOICEUPDATED_TIMEOUT),
high(int))
proc getExecutionValidity(
elManager: ELManager,
blck: bellatrix.SignedBeaconBlock | capella.SignedBeaconBlock |
deneb.SignedBeaconBlock | electra.SignedBeaconBlock):
Future[NewPayloadStatus] {.async: (raises: [CancelledError]).} =
deneb.SignedBeaconBlock | electra.SignedBeaconBlock,
deadlineObj: DeadlineObject,
maxRetriesCount: int
): Future[NewPayloadStatus] {.async: (raises: [CancelledError]).} =
if not blck.message.is_execution_block:
return NewPayloadStatus.valid # vacuously
try:
let executionPayloadStatus = await elManager.newExecutionPayload(
blck.message)
blck.message, deadlineObj, maxRetriesCount)
if executionPayloadStatus.isNone:
return NewPayloadStatus.noResponse
case executionPayloadStatus.get
of PayloadExecutionStatus.invalid, PayloadExecutionStatus.invalid_block_hash:
of PayloadExecutionStatus.invalid,
PayloadExecutionStatus.invalid_block_hash:
# Blocks come either from gossip or request manager requests. In the
# former case, they've passed libp2p gossip validation which implies
# correct signature for correct proposer, which makes spam expensive,
@@ -413,6 +434,20 @@ proc storeBlock(
vm = self.validatorMonitor
dag = self.consensusManager.dag
wallSlot = wallTime.slotOrZero
deadlineTime =
block:
let slotTime = (wallSlot + 1).start_beacon_time() - 1.seconds
if slotTime <= wallTime:
0.seconds
else:
chronos.nanoseconds((slotTime - wallTime).nanoseconds)
deadlineObj = DeadlineObject.init(deadlineTime)
func getRetriesCount(): int =
if dag.is_optimistic(dag.head.bid):
1
else:
high(int)
# If the block is missing its parent, it will be re-orphaned below
self.consensusManager.quarantine[].removeOrphan(signedBlock)
@@ -518,7 +553,8 @@ proc storeBlock(
NewPayloadStatus.noResponse
else:
when typeof(signedBlock).kind >= ConsensusFork.Bellatrix:
await self.consensusManager.elManager.getExecutionValidity(signedBlock)
await self.consensusManager.elManager.getExecutionValidity(
signedBlock, deadlineObj, getRetriesCount())
else:
NewPayloadStatus.valid # vacuously
payloadValid = payloadStatus == NewPayloadStatus.valid
@@ -685,7 +721,9 @@ proc storeBlock(
self.consensusManager[].optimisticExecutionBlockHash,
safeBlockHash = newHead.get.safeExecutionBlockHash,
finalizedBlockHash = newHead.get.finalizedExecutionBlockHash,
payloadAttributes = Opt.none attributes)
payloadAttributes = Opt.none attributes,
deadlineObj = deadlineObj,
maxRetriesCount = getRetriesCount())
let consensusFork = self.consensusManager.dag.cfg.consensusForkAtEpoch(
newHead.get.blck.bid.slot.epoch)
@@ -712,7 +750,9 @@ proc storeBlock(
headBlockHash = headExecutionBlockHash,
safeBlockHash = newHead.get.safeExecutionBlockHash,
finalizedBlockHash = newHead.get.finalizedExecutionBlockHash,
receivedBlock = signedBlock)
receivedBlock = signedBlock,
deadlineObj = deadlineObj,
maxRetriesCount = getRetriesCount())
template callForkChoiceUpdated: auto =
case self.consensusManager.dag.cfg.consensusForkAtEpoch(