mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-12 15:24:14 +00:00
bd09e4d864
Normally, running LC and DAG sync at the same time is fine, but on a tiny devnet where some peers may not support the LC data, we can end up in a situation where a peer gets disconnected while DAG is in sync: DAG sync never uses any req/resp on a local devnet (perfect network conditions), so over several minutes the LC sync removes the peer because its sync is stuck. We don't need to actively sync LC from the network if DAG is already synced, which prevents this specific low-peer devnet issue (there are still others). LC is still updated locally when the DAG finalized checkpoint advances.
397 lines
14 KiB
Nim
397 lines
14 KiB
Nim
# beacon_chain
|
|
# Copyright (c) 2022-2024 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
{.push raises: [].}
|
|
|
|
import chronos, chronicles
|
|
import
|
|
../spec/network,
|
|
../networking/eth2_network,
|
|
../beacon_clock,
|
|
"."/[light_client_sync_helpers, light_client_protocol, sync_manager]
|
|
export sync_manager
|
|
|
|
# Attach the "lcman" topic to the log statements of this module.
logScope:
  topics = "lcman"
|
|
|
|
type
  # Key type of endpoints whose request does not carry any key.
  Nothing = object

  # Raised by `doRequest` when a received response fails its sanity check.
  ResponseError = object of CatchableError

  # An endpoint couples a request key type `K` with a response value type `V`.
  Endpoint[K, V] =
    (K, V) # https://github.com/nim-lang/Nim/issues/19531

  # Bootstrap data, requested by block root.
  Bootstrap = Endpoint[Eth2Digest, ForkedLightClientBootstrap]

  # A range of updates, requested by start period and count.
  UpdatesByRange = Endpoint[
    tuple[startPeriod: SyncCommitteePeriod, count: uint64],
    ForkedLightClientUpdate]

  # Finality update, requested without a key.
  FinalityUpdate = Endpoint[Nothing, ForkedLightClientFinalityUpdate]

  # Optimistic update, requested without a key.
  OptimisticUpdate = Endpoint[Nothing, ForkedLightClientOptimisticUpdate]

  # Asynchronous callback that judges a single received value.
  ValueVerifier[V] =
    proc(v: V): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}

  BootstrapVerifier* = ValueVerifier[ForkedLightClientBootstrap]
  UpdateVerifier* = ValueVerifier[ForkedLightClientUpdate]
  FinalityUpdateVerifier* = ValueVerifier[ForkedLightClientFinalityUpdate]
  OptimisticUpdateVerifier* = ValueVerifier[ForkedLightClientOptimisticUpdate]

  # Synchronous, non-raising callbacks through which the manager reads
  # state owned by its creator.
  GetTrustedBlockRootCallback* =
    proc(): Option[Eth2Digest] {.gcsafe, raises: [].}
  GetBoolCallback* = proc(): bool {.gcsafe, raises: [].}
  GetSyncCommitteePeriodCallback* =
    proc(): SyncCommitteePeriod {.gcsafe, raises: [].}

  LightClientManager* = object
    network: Eth2Node # provides the peer pool used for requests
    rng: ref HmacDrbgContext # randomness for the sync task delays
    getTrustedBlockRoot: GetTrustedBlockRootCallback
    bootstrapVerifier: BootstrapVerifier
    updateVerifier: UpdateVerifier
    finalityUpdateVerifier: FinalityUpdateVerifier
    optimisticUpdateVerifier: OptimisticUpdateVerifier
    isLightClientStoreInitialized: GetBoolCallback
    isNextSyncCommitteeKnown: GetBoolCallback
    getFinalizedPeriod: GetSyncCommitteePeriodCallback
    getOptimisticPeriod: GetSyncCommitteePeriodCallback
    getBeaconTime: GetBeaconTimeFn
    shouldInhibitSync: GetBoolCallback # may be nil; `true` pauses the loop
    loopFuture: Future[void].Raising([CancelledError]) # set by `start`
|
|
|
|
func init*(
    T: type LightClientManager,
    network: Eth2Node,
    rng: ref HmacDrbgContext,
    getTrustedBlockRoot: GetTrustedBlockRootCallback,
    bootstrapVerifier: BootstrapVerifier,
    updateVerifier: UpdateVerifier,
    finalityUpdateVerifier: FinalityUpdateVerifier,
    optimisticUpdateVerifier: OptimisticUpdateVerifier,
    isLightClientStoreInitialized: GetBoolCallback,
    isNextSyncCommitteeKnown: GetBoolCallback,
    getFinalizedPeriod: GetSyncCommitteePeriodCallback,
    getOptimisticPeriod: GetSyncCommitteePeriodCallback,
    getBeaconTime: GetBeaconTimeFn,
    shouldInhibitSync: GetBoolCallback = nil
): LightClientManager =
  ## Create a light client manager from the given network handle, random
  ## number generator and callbacks. Every argument is stored unchanged in
  ## the field of the same name; `loopFuture` stays unset until `start`.
  ## `shouldInhibitSync` is optional and defaults to `nil`.
  result = LightClientManager(
    network: network,
    rng: rng,
    getTrustedBlockRoot: getTrustedBlockRoot,
    # Verifiers, one per kind of light client object
    bootstrapVerifier: bootstrapVerifier,
    updateVerifier: updateVerifier,
    finalityUpdateVerifier: finalityUpdateVerifier,
    optimisticUpdateVerifier: optimisticUpdateVerifier,
    # Read-only views onto the sync state
    isLightClientStoreInitialized: isLightClientStoreInitialized,
    isNextSyncCommitteeKnown: isNextSyncCommitteeKnown,
    getFinalizedPeriod: getFinalizedPeriod,
    getOptimisticPeriod: getOptimisticPeriod,
    getBeaconTime: getBeaconTime,
    shouldInhibitSync: shouldInhibitSync)
|
|
|
|
proc isGossipSupported*(
    self: LightClientManager,
    period: SyncCommitteePeriod
): bool =
  ## Tell whether gossip for `period` can be accepted. Always `false` while
  ## the light client store is not initialized; otherwise the decision is
  ## delegated to the period-level `isGossipSupported` helper, fed with the
  ## current finalized period and next sync committee knowledge.
  if self.isLightClientStoreInitialized():
    period.isGossipSupported(
      finalizedPeriod = self.getFinalizedPeriod(),
      isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown())
  else:
    false
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap
#
# `Bootstrap` endpoint: ask `peer` for the bootstrap data of `blockRoot`.
# The future of the protocol call is handed back to the caller directly.
proc doRequest(
    e: typedesc[Bootstrap],
    peer: Peer,
    blockRoot: Eth2Digest
): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError], raw: true).} =
  lightClientBootstrap(peer, blockRoot)
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange
type LightClientUpdatesByRangeResponse =
  NetRes[List[ForkedLightClientUpdate, MAX_REQUEST_LIGHT_CLIENT_UPDATES]]

proc doRequest(
    e: typedesc[UpdatesByRange],
    peer: Peer,
    key: tuple[startPeriod: SyncCommitteePeriod, count: uint64]
): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [ResponseError, CancelledError]).} =
  ## `UpdatesByRange` endpoint: ask `peer` for `key.count` updates starting
  ## at `key.startPeriod`. A failed request is returned as is. A successful
  ## one is passed through `checkLightClientUpdates`, and `ResponseError` is
  ## raised if that check rejects it.
  let (startPeriod, count) = key
  doAssert count > 0 and count <= MAX_REQUEST_LIGHT_CLIENT_UPDATES
  let response = await peer.lightClientUpdatesByRange(startPeriod, count)
  if response.isErr:
    # Nothing to validate; the caller inspects the error
    return response

  let checked = distinctBase(response.get)
    .checkLightClientUpdates(startPeriod, count)
  if checked.isErr:
    raise newException(ResponseError, checked.error)
  return response
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate
#
# `FinalityUpdate` endpoint: ask `peer` for its finality update. No key is
# needed; the future of the protocol call is handed back directly.
proc doRequest(
    e: typedesc[FinalityUpdate],
    peer: Peer
): Future[NetRes[ForkedLightClientFinalityUpdate]] {.async: (raises: [CancelledError], raw: true).} =
  lightClientFinalityUpdate(peer)
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate
#
# `OptimisticUpdate` endpoint: ask `peer` for its optimistic update. No key
# is needed; the future of the protocol call is handed back directly.
proc doRequest(
    e: typedesc[OptimisticUpdate],
    peer: Peer
): Future[NetRes[ForkedLightClientOptimisticUpdate]] {.async: (raises: [CancelledError], raw: true).} =
  lightClientOptimisticUpdate(peer)
|
|
|
|
template valueVerifier[E](
    self: LightClientManager,
    e: typedesc[E]
): ValueVerifier[E.V] =
  ## Select, at compile time, the verifier callback of `self` that matches
  ## the value type of endpoint `E`. Unknown value types fail compilation.
  when E.V is ForkedLightClientBootstrap:
    self.bootstrapVerifier
  elif E.V is ForkedLightClientUpdate:
    self.updateVerifier
  elif E.V is ForkedLightClientFinalityUpdate:
    self.finalityUpdateVerifier
  elif E.V is ForkedLightClientOptimisticUpdate:
    self.optimisticUpdateVerifier
  else:
    static: doAssert false
|
|
|
|
iterator values(v: auto): auto =
  ## Helper for `workerTask`: a scalar is yielded once, as if it were a
  ## single-element collection, while a `List` yields each of its elements.
  ## This lets both response shapes share one processing loop.
  when v isnot List:
    yield v
  else:
    for element in v:
      yield element
|
|
|
|
proc workerTask[E](
    self: LightClientManager,
    e: typedesc[E],
    key: E.K
): Future[bool] {.async: (raises: [CancelledError]).} =
  ## Perform one request for endpoint `E` against a peer taken from the peer
  ## pool without waiting, run every received value through the matching
  ## verifier and adjust the peer's score according to the outcome.
  ## Returns `true` if the request made progress. The peer, once acquired,
  ## is always given back to the pool.
  var
    peer: Peer
    didProgress = false
  try:
    peer = self.network.peerPool.acquireNoWait()
    let value =
      when E.K is Nothing:
        await E.doRequest(peer)
      else:
        await E.doRequest(peer, key)
    if value.isOk:
      var gotUsefulValue = false
      for item in value.get().values:
        let verdict = await self.valueVerifier(E)(item)
        if verdict.isOk:
          # Reward, peer returned something useful
          gotUsefulValue = true
          didProgress = true
        else:
          case verdict.error
          of VerifierError.MissingParent:
            # Stop, requires different request to progress
            return didProgress
          of VerifierError.Duplicate:
            # Ignore, a concurrent request may have already fulfilled this
            when E.V is ForkedLightClientBootstrap:
              didProgress = true
            else:
              discard
          of VerifierError.UnviableFork:
            # Descore, peer is on an incompatible fork version
            withForkyObject(item):
              when lcDataFork > LightClientDataFork.None:
                notice "Received value from an unviable fork",
                  value = forkyObject,
                  endpoint = E.name, peer, peer_score = peer.getScore()
              else:
                notice "Received value from an unviable fork",
                  endpoint = E.name, peer, peer_score = peer.getScore()
            peer.updateScore(PeerScoreUnviableFork)
            return didProgress
          of VerifierError.Invalid:
            # Descore, received data is malformed
            withForkyObject(item):
              when lcDataFork > LightClientDataFork.None:
                warn "Received invalid value", value = forkyObject.shortLog,
                  endpoint = E.name, peer, peer_score = peer.getScore()
              else:
                warn "Received invalid value",
                  endpoint = E.name, peer, peer_score = peer.getScore()
            peer.updateScore(PeerScoreBadValues)
            return didProgress
      if gotUsefulValue:
        peer.updateScore(PeerScoreGoodValues)
    else:
      # The request itself failed; descore before logging the new score
      peer.updateScore(PeerScoreNoValues)
      debug "Failed to receive value on request", value,
        endpoint = E.name, peer, peer_score = peer.getScore()
  except ResponseError as exc:
    # The response arrived but did not pass its sanity check
    warn "Received invalid response", error = exc.msg,
      endpoint = E.name, peer, peer_score = peer.getScore()
    peer.updateScore(PeerScoreBadValues)
  except CancelledError as exc:
    raise exc
  except PeerPoolError as exc:
    debug "Failed to acquire peer", exc = exc.msg
  finally:
    # `peer` is still nil when the acquisition itself failed
    if peer != nil:
      self.network.peerPool.release(peer)
  return didProgress
|
|
|
|
proc query[E](
    self: LightClientManager,
    e: typedesc[E],
    key: E.K
): Future[bool] {.async: (raises: [CancelledError]).} =
  ## Run `PARALLEL_REQUESTS` concurrent `workerTask`s for endpoint `E` and
  ## wait until either one of them reports progress or all of them have
  ## finished. Workers still running at that point are cancelled and awaited
  ## before returning. Returns `true` if any worker reported progress.
  const PARALLEL_REQUESTS = 2
  var workers: array[PARALLEL_REQUESTS, Future[bool]]

  let
    # Completed by the first worker that reports progress
    progressFut = Future[void].Raising([CancelledError]).init("lcmanProgress")
    # Completed once every started worker has run its callback
    doneFut = Future[void].Raising([CancelledError]).init("lcmanDone")
  var
    finishedWorkers = 0
    expectedWorkers = workers.len

  proc onWorkerFinished(future: pointer) =
    try:
      let workerProgressed = cast[Future[bool]](future).read()
      if workerProgressed and not progressFut.finished:
        progressFut.complete()
    except CancelledError:
      if not progressFut.finished:
        progressFut.cancelSoon()
    except CatchableError:
      discard
    finally:
      inc finishedWorkers
      if finishedWorkers == expectedWorkers:
        doneFut.complete()

  try:
    # Start concurrent workers
    for idx in 0 ..< workers.len:
      try:
        workers[idx] = self.workerTask(e, key)
        workers[idx].addCallback(onWorkerFinished)
      except CancelledError as exc:
        raise exc
      except CatchableError:
        workers[idx] = newFuture[bool]()
        workers[idx].complete(false)

    # Wait for any worker to report progress, or for all workers to finish
    try:
      discard await race(progressFut, doneFut)
    except ValueError:
      raiseAssert "race API invariant"
  finally:
    # Cancel whatever is still running. A nil slot marks the point where
    # worker start-up was interrupted, so only the slots before it count.
    for idx in 0 ..< expectedWorkers:
      if workers[idx] == nil:
        expectedWorkers = idx
        if finishedWorkers == expectedWorkers:
          doneFut.complete()
        break
      if not workers[idx].finished:
        workers[idx].cancelSoon()
    # Keep waiting through cancellation until all started workers are done
    while true:
      try:
        await allFutures(workers[0 ..< expectedWorkers])
        break
      except CancelledError:
        continue
    # Likewise wait until every worker callback has been accounted for
    while true:
      try:
        await doneFut
        break
      except CancelledError:
        continue

  if not progressFut.finished:
    progressFut.cancelSoon()
  return progressFut.completed
|
|
|
|
template query[E](
    self: LightClientManager,
    e: typedesc[E]
): Future[bool].Raising([CancelledError]) =
  ## Convenience overload for endpoints that take no request key: forwards
  ## to the keyed `query` with an empty `Nothing` value.
  self.query(e, default(Nothing))
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.5/specs/altair/light-client/light-client.md#light-client-sync-process
proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} =
  ## Endless sync loop. It polls every two seconds while the next sync task
  ## is not yet due, sync is inhibited or no peer is available. It fetches
  ## bootstrap data while the store is uninitialized, and afterwards runs
  ## the sync task selected by `nextLightClientSyncTask`, rescheduling
  ## itself after each one. Only ends through cancellation.
  var nextSyncTaskTime = self.getBeaconTime()
  while true:
    # Periodically wake and check for changes
    let
      currentTime = self.getBeaconTime()
      mustWait =
        currentTime < nextSyncTaskTime or
        (self.shouldInhibitSync != nil and self.shouldInhibitSync()) or
        self.network.peerPool.lenAvailable < 1
    if mustWait:
      await sleepAsync(chronos.seconds(2))
      continue

    # Obtain bootstrap data once a trusted block root is supplied
    if not self.isLightClientStoreInitialized():
      let root = self.getTrustedBlockRoot()
      if root.isSome:
        let bootstrapped = await self.query(Bootstrap, root.get)
        if bootstrapped:
          nextSyncTaskTime = currentTime
        else:
          nextSyncTaskTime =
            currentTime + self.rng.computeDelayWithJitter(chronos.seconds(0))
      else:
        await sleepAsync(chronos.seconds(2))
      continue

    # Fetch updates
    let
      task = nextLightClientSyncTask(
        current = currentTime.slotOrZero().sync_committee_period,
        finalized = self.getFinalizedPeriod(),
        optimistic = self.getOptimisticPeriod(),
        isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown())

      taskProgressed =
        case task.kind
        of LcSyncKind.UpdatesByRange:
          await self.query(UpdatesByRange,
            (startPeriod: task.startPeriod, count: task.count))
        of LcSyncKind.FinalityUpdate:
          await self.query(FinalityUpdate)
        of LcSyncKind.OptimisticUpdate:
          await self.query(OptimisticUpdate)

    # Periods are read again here, after the task has run
    nextSyncTaskTime = currentTime + self.rng.nextLcSyncTaskDelay(
      currentTime,
      finalized = self.getFinalizedPeriod(),
      optimistic = self.getOptimisticPeriod(),
      isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown(),
      didLatestSyncTaskProgress = taskProgressed)
|
|
|
|
proc start*(self: var LightClientManager) =
  ## Launch the manager's sync loop and keep its future in `loopFuture`.
  ## Asserts that no loop future is currently set.
  doAssert self.loopFuture == nil
  self.loopFuture = loop(self)
|
|
|
|
proc stop*(self: var LightClientManager) {.async: (raises: []).} =
  ## Cancel the manager's sync loop, wait for it to wind down and clear
  ## `loopFuture`. Does nothing when no loop future is set.
  if not self.loopFuture.isNil:
    await noCancel self.loopFuture.cancelAndWait()
    self.loopFuture = nil
|