nimbus-eth2/beacon_chain/future_combinators.nim

99 lines
3.1 KiB
Nim
Raw Normal View History

Support for driving multiple EL nodes from a single Nimbus BN (#4465) * Support for driving multiple EL nodes from a single Nimbus BN Full list of changes: * Eth1Monitor has been renamed to ELManager to match its current responsibilities better. * The ELManager is no longer optional in the code (it won't have a nil value under any circumstances). * The support for subscribing to headers was removed as it only worked with WebSockets and contributed significant complexity while bringing only a very minor advantage. * The `--web3-url` parameter has been deprecated in favor of a new `--el` parameter. The new parameter has a reasonable default value and supports specifying a different JWT for each connection. Each connection can also be configured with a different set of responsibilities (e.g. download deposits, validate blocks and/or produce blocks). On the command-line, these properties can be configured through URL properties stored in the #anchor part of the URL. In TOML files, they come with a very natural syntax (although the URL scheme is also supported). * The previously scattered EL-related state and logic is now moved to `eth1_monitor.nim` (this module will be renamed to `el_manager.nim` in a follow-up commit). State is assigned properly either to the `ELManager` or to the individual `ELConnection` objects where appropriate. The ELManager executes all Engine API requests against all attached EL nodes, in parallel. It compares their results and if there is a disagreement regarding the validity of a certain payload, this is detected and the beacon node is protected from publishing a block with a potential execution layer consensus bug in it. The BN provides metrics per EL node for the number of successful or failed requests for each type of Engine API request. If an EL node goes offline and connectivity is restored later, we report the problem and the remedy in edge-triggered fashion. 
* More progress towards implementing Deneb block production in the VC and comparing the value of blocks produced by the EL and the builder API. * Adds a Makefile target for the zhejiang testnet
2023-03-05 03:40:21 +02:00
# TODO: These should be added to Chronos's asyncfutures2 module.
#       See https://github.com/status-im/nim-chronos/pull/339
import
chronos
proc firstCompletedFuture*(futs: varargs[FutureBase]): Future[FutureBase] =
  ## Returns a future which will complete and return the completed
  ## FutureBase as soon as one of the futures in ``futs`` is completed.
  ##
  ## If the argument is empty, the returned future FAILS immediately
  ## (with a ``ValueError``).
  ##
  ## On success, the returned Future will hold the completed FutureBase.
  ##
  ## If all futures fail naturally or due to cancellation, the returned
  ## future will be failed as well (with a ``CatchableError``).
  ##
  ## On cancellation, futures in ``futs`` WILL NOT BE cancelled.
  let retFuture = newFuture[FutureBase]("chronos.firstCompletedFuture()")

  # Because we can't capture varargs[T] in closures we need to create copy.
  let nfuts = @futs

  # If one of the inputs has already completed, hand it back right away
  # without registering any callbacks.
  for fut in nfuts:
    if fut.completed():
      retFuture.complete(fut)
      return retFuture

  if len(nfuts) == 0:
    retFuture.fail(newException(ValueError, "Empty Future[T] list"))
    # Return the failed future itself: a bare `return` here would yield the
    # implicit `result`, which is still nil at this point.
    return retFuture

  # Number of inputs that finished without completing (failed or cancelled).
  var failedFutures = 0

  # Declared before it is assigned so the closure can refer to itself when
  # unregistering from the remaining futures.
  var cb: proc(udata: pointer) {.gcsafe, raises: [Defect].}
  cb = proc(udata: pointer) {.gcsafe, raises: [Defect].} =
    if not(retFuture.finished()):
      # `udata` is the future this callback instance was registered on.
      let rfut = cast[FutureBase](udata)
      if rfut.completed:
        # We have a winner: stop listening on every other input.
        for fut in nfuts:
          if fut != rfut:
            fut.removeCallback(cb)
        retFuture.complete(rfut)
      else:
        inc failedFutures
        if failedFutures == nfuts.len:
          retFuture.fail(newException(CatchableError,
            "None of the operations completed successfully"))

  proc cancellation(udata: pointer) =
    # On cancel we remove all our callbacks only.
    for fut in nfuts:
      if not(fut.finished()):
        fut.removeCallback(cb)

  for fut in nfuts:
    fut.addCallback(cb, cast[pointer](fut))

  retFuture.cancelCallback = cancellation
  return retFuture
proc firstCompleted*[T](futs: varargs[Future[T]]): Future[T] =
  ## On success, the returned Future will hold the result of the first
  ## completed input Future.
  ##
  ## If the varargs list is empty, the returned future FAILS immediately
  ## (with a ``ValueError``).
  ##
  ## If all futures fail naturally or due to cancellation, the returned
  ## future will be failed as well.
  ##
  ## On cancellation, futures in ``futs`` WILL NOT BE cancelled.
  if len(futs) == 0:
    # Handle the empty case up front so the documented contract holds
    # without having to inspect the helper's result.
    let emptyFuture = newFuture[T]("chronos.firstCompleted()")
    emptyFuture.fail(newException(ValueError, "Empty Future[T] list"))
    return emptyFuture

  let subFuture = firstCompletedFuture(futs)
  if subFuture.completed:
    # One of the inputs was already completed: return that very future.
    return Future[T](subFuture.read)

  let retFuture = newFuture[T]("chronos.firstCompleted()")

  if subFuture.finished: # It must be failed or cancelled
    retFuture.fail(subFuture.error)
    return retFuture

  proc cb(udata: pointer) {.gcsafe, raises: [Defect].} =
    # `udata` is the selector future this callback was registered on.
    let subFuture = cast[Future[FutureBase]](udata)
    if subFuture.completed:
      # Unwrap twice: first the winning future, then its value.
      retFuture.complete(Future[T](subFuture.read).read)
    else:
      retFuture.fail(subFuture.error)

  subFuture.addCallback(cb, cast[pointer](subFuture))

  # Cancelling the result only cancels the selector future.
  retFuture.cancelCallback = proc (udata: pointer) =
    subFuture.cancel()

  # Pending case: hand the caller the future that `cb` will resolve.
  return retFuture