mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-02-01 09:17:52 +00:00
Fix EL manager to wait for non-syncing/accepted response. (#6812)
* Fix EL manager to wait for non-syncing/accepted response. * Update copyright year.
This commit is contained in:
parent
e580fd2b69
commit
b831692968
@ -1,5 +1,5 @@
|
|||||||
# beacon_chain
|
# beacon_chain
|
||||||
# Copyright (c) 2018-2024 Status Research & Development GmbH
|
# Copyright (c) 2018-2025 Status Research & Development GmbH
|
||||||
# Licensed and distributed under either of
|
# Licensed and distributed under either of
|
||||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
@ -918,11 +918,13 @@ func compareStatuses(
|
|||||||
type
|
type
|
||||||
ELConsensusViolationDetector = object
|
ELConsensusViolationDetector = object
|
||||||
selectedResponse: Opt[int]
|
selectedResponse: Opt[int]
|
||||||
|
selectedStatus: Opt[PayloadExecutionStatus]
|
||||||
disagreementAlreadyDetected: bool
|
disagreementAlreadyDetected: bool
|
||||||
|
|
||||||
func init(T: type ELConsensusViolationDetector): T =
|
func init(T: type ELConsensusViolationDetector): T =
|
||||||
ELConsensusViolationDetector(
|
ELConsensusViolationDetector(
|
||||||
selectedResponse: Opt.none(int),
|
selectedResponse: Opt.none(int),
|
||||||
|
selectedStatus: Opt.none(PayloadExecutionStatus),
|
||||||
disagreementAlreadyDetected: false
|
disagreementAlreadyDetected: false
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -939,11 +941,13 @@ proc processResponse(
|
|||||||
let status = requests[idx].value().status
|
let status = requests[idx].value().status
|
||||||
if d.selectedResponse.isNone:
|
if d.selectedResponse.isNone:
|
||||||
d.selectedResponse = Opt.some(idx)
|
d.selectedResponse = Opt.some(idx)
|
||||||
|
d.selectedStatus = Opt.some(status)
|
||||||
elif not d.disagreementAlreadyDetected:
|
elif not d.disagreementAlreadyDetected:
|
||||||
let prevStatus = requests[d.selectedResponse.get].value().status
|
let prevStatus = requests[d.selectedResponse.get].value().status
|
||||||
case compareStatuses(status, prevStatus)
|
case compareStatuses(status, prevStatus)
|
||||||
of newStatusIsPreferable:
|
of newStatusIsPreferable:
|
||||||
d.selectedResponse = Opt.some(idx)
|
d.selectedResponse = Opt.some(idx)
|
||||||
|
d.selectedStatus = Opt.some(status)
|
||||||
of oldStatusIsOk:
|
of oldStatusIsOk:
|
||||||
discard
|
discard
|
||||||
of disagreement:
|
of disagreement:
|
||||||
@ -955,6 +959,21 @@ proc processResponse(
|
|||||||
url2 = connections[idx].engineUrl.url,
|
url2 = connections[idx].engineUrl.url,
|
||||||
status2 = status
|
status2 = status
|
||||||
|
|
||||||
|
proc couldBeBetter(d: ELConsensusViolationDetector): bool =
|
||||||
|
const
|
||||||
|
SyncingOrAccepted = {
|
||||||
|
PayloadExecutionStatus.syncing,
|
||||||
|
PayloadExecutionStatus.accepted
|
||||||
|
}
|
||||||
|
if d.disagreementAlreadyDetected:
|
||||||
|
return false
|
||||||
|
if d.selectedStatus.isNone():
|
||||||
|
return true
|
||||||
|
if d.selectedStatus.get() in SyncingOrAccepted:
|
||||||
|
true
|
||||||
|
else:
|
||||||
|
false
|
||||||
|
|
||||||
proc lazyWait(futures: seq[FutureBase]) {.async: (raises: []).} =
|
proc lazyWait(futures: seq[FutureBase]) {.async: (raises: []).} =
|
||||||
block:
|
block:
|
||||||
let pending = futures.filterIt(not(it.finished()))
|
let pending = futures.filterIt(not(it.finished()))
|
||||||
@ -1070,11 +1089,14 @@ proc sendNewPayload*(
|
|||||||
await noCancel allFutures(pending)
|
await noCancel allFutures(pending)
|
||||||
return PayloadExecutionStatus.invalid
|
return PayloadExecutionStatus.invalid
|
||||||
elif responseProcessor.selectedResponse.isSome():
|
elif responseProcessor.selectedResponse.isSome():
|
||||||
# We spawn task which will wait for all other responses which are
|
if (len(pendingRequests) == 0) or
|
||||||
# still pending, after 30.seconds all pending requests will be
|
not(responseProcessor.couldBeBetter()):
|
||||||
# cancelled.
|
# We spawn task which will wait for all other responses which are
|
||||||
asyncSpawn lazyWait(pendingRequests.mapIt(FutureBase(it)))
|
# still pending, after 30.seconds all pending requests will be
|
||||||
return requests[responseProcessor.selectedResponse.get].value().status
|
# cancelled.
|
||||||
|
asyncSpawn lazyWait(pendingRequests.mapIt(FutureBase(it)))
|
||||||
|
return
|
||||||
|
requests[responseProcessor.selectedResponse.get].value().status
|
||||||
|
|
||||||
if timeoutExceeded:
|
if timeoutExceeded:
|
||||||
# Timeout exceeded, cancelling all pending requests.
|
# Timeout exceeded, cancelling all pending requests.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user