# beacon_chain # Copyright (c) 2022-2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. {.push raises: [].} import chronos, chronicles import ../spec/network, ../networking/eth2_network, ../beacon_clock, "."/[light_client_sync_helpers, light_client_protocol, sync_manager] export sync_manager logScope: topics = "lcman" type Nothing = object ResponseError = object of CatchableError Endpoint[K, V] = (K, V) # https://github.com/nim-lang/Nim/issues/19531 Bootstrap = Endpoint[Eth2Digest, ForkedLightClientBootstrap] UpdatesByRange = Endpoint[ tuple[startPeriod: SyncCommitteePeriod, count: uint64], ForkedLightClientUpdate] FinalityUpdate = Endpoint[Nothing, ForkedLightClientFinalityUpdate] OptimisticUpdate = Endpoint[Nothing, ForkedLightClientOptimisticUpdate] ValueVerifier[V] = proc(v: V): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} BootstrapVerifier* = ValueVerifier[ForkedLightClientBootstrap] UpdateVerifier* = ValueVerifier[ForkedLightClientUpdate] FinalityUpdateVerifier* = ValueVerifier[ForkedLightClientFinalityUpdate] OptimisticUpdateVerifier* = ValueVerifier[ForkedLightClientOptimisticUpdate] GetTrustedBlockRootCallback* = proc(): Option[Eth2Digest] {.gcsafe, raises: [].} GetBoolCallback* = proc(): bool {.gcsafe, raises: [].} GetSyncCommitteePeriodCallback* = proc(): SyncCommitteePeriod {.gcsafe, raises: [].} LightClientManager* = object network: Eth2Node rng: ref HmacDrbgContext getTrustedBlockRoot: GetTrustedBlockRootCallback bootstrapVerifier: BootstrapVerifier updateVerifier: UpdateVerifier finalityUpdateVerifier: FinalityUpdateVerifier optimisticUpdateVerifier: OptimisticUpdateVerifier isLightClientStoreInitialized: GetBoolCallback isNextSyncCommitteeKnown: GetBoolCallback getFinalizedPeriod: GetSyncCommitteePeriodCallback getOptimisticPeriod: GetSyncCommitteePeriodCallback getBeaconTime: GetBeaconTimeFn shouldInhibitSync: GetBoolCallback loopFuture: Future[void].Raising([CancelledError]) func init*( T: type LightClientManager, network: Eth2Node, rng: ref HmacDrbgContext, getTrustedBlockRoot: GetTrustedBlockRootCallback, bootstrapVerifier: BootstrapVerifier, updateVerifier: UpdateVerifier, finalityUpdateVerifier: FinalityUpdateVerifier, optimisticUpdateVerifier: OptimisticUpdateVerifier, isLightClientStoreInitialized: GetBoolCallback, isNextSyncCommitteeKnown: GetBoolCallback, getFinalizedPeriod: GetSyncCommitteePeriodCallback, getOptimisticPeriod: GetSyncCommitteePeriodCallback, getBeaconTime: GetBeaconTimeFn, shouldInhibitSync: GetBoolCallback = nil ): LightClientManager = ## Initialize light client manager. LightClientManager( network: network, rng: rng, getTrustedBlockRoot: getTrustedBlockRoot, bootstrapVerifier: bootstrapVerifier, updateVerifier: updateVerifier, finalityUpdateVerifier: finalityUpdateVerifier, optimisticUpdateVerifier: optimisticUpdateVerifier, isLightClientStoreInitialized: isLightClientStoreInitialized, isNextSyncCommitteeKnown: isNextSyncCommitteeKnown, getFinalizedPeriod: getFinalizedPeriod, getOptimisticPeriod: getOptimisticPeriod, getBeaconTime: getBeaconTime, shouldInhibitSync: shouldInhibitSync) proc isGossipSupported*( self: LightClientManager, period: SyncCommitteePeriod ): bool = ## Indicate whether the light client is sufficiently synced to accept gossip. if not self.isLightClientStoreInitialized(): return false period.isGossipSupported( finalizedPeriod = self.getFinalizedPeriod(), isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()) # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap proc doRequest( e: typedesc[Bootstrap], peer: Peer, blockRoot: Eth2Digest ): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError], raw: true).} = peer.lightClientBootstrap(blockRoot) # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange type LightClientUpdatesByRangeResponse = NetRes[List[ForkedLightClientUpdate, MAX_REQUEST_LIGHT_CLIENT_UPDATES]] proc doRequest( e: typedesc[UpdatesByRange], peer: Peer, key: tuple[startPeriod: SyncCommitteePeriod, count: uint64] ): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [ResponseError, CancelledError]).} = let (startPeriod, count) = key doAssert count > 0 and count <= MAX_REQUEST_LIGHT_CLIENT_UPDATES let response = await peer.lightClientUpdatesByRange(startPeriod, count) if response.isOk: let e = distinctBase(response.get) .checkLightClientUpdates(startPeriod, count) if e.isErr: raise newException(ResponseError, e.error) return response # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate proc doRequest( e: typedesc[FinalityUpdate], peer: Peer ): Future[NetRes[ForkedLightClientFinalityUpdate]] {.async: (raises: [CancelledError], raw: true).} = peer.lightClientFinalityUpdate() # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate proc doRequest( e: typedesc[OptimisticUpdate], peer: Peer ): Future[NetRes[ForkedLightClientOptimisticUpdate]] {.async: (raises: [CancelledError], raw: true).} = peer.lightClientOptimisticUpdate() template valueVerifier[E]( self: LightClientManager, e: typedesc[E] ): ValueVerifier[E.V] = when E.V is ForkedLightClientBootstrap: self.bootstrapVerifier elif E.V is ForkedLightClientUpdate: self.updateVerifier elif E.V is ForkedLightClientFinalityUpdate: self.finalityUpdateVerifier elif E.V is ForkedLightClientOptimisticUpdate: self.optimisticUpdateVerifier else: static: doAssert false iterator values(v: auto): auto = ## Local helper for `workerTask` to share the same implementation for both ## scalar and aggregate values, by treating scalars as 1-length aggregates. when v is List: for i in v: yield i else: yield v proc workerTask[E]( self: LightClientManager, e: typedesc[E], key: E.K ): Future[bool] {.async: (raises: [CancelledError]).} = var peer: Peer didProgress = false try: peer = self.network.peerPool.acquireNoWait() let value = when E.K is Nothing: await E.doRequest(peer) else: await E.doRequest(peer, key) if value.isOk: var applyReward = false for val in value.get().values: let res = await self.valueVerifier(E)(val) if res.isErr: case res.error of VerifierError.MissingParent: # Stop, requires different request to progress return didProgress of VerifierError.Duplicate: # Ignore, a concurrent request may have already fulfilled this when E.V is ForkedLightClientBootstrap: didProgress = true else: discard of VerifierError.UnviableFork: # Descore, peer is on an incompatible fork version withForkyObject(val): when lcDataFork > LightClientDataFork.None: notice "Received value from an unviable fork", value = forkyObject, endpoint = E.name, peer, peer_score = peer.getScore() else: notice "Received value from an unviable fork", endpoint = E.name, peer, peer_score = peer.getScore() peer.updateScore(PeerScoreUnviableFork) return didProgress of VerifierError.Invalid: # Descore, received data is malformed withForkyObject(val): when lcDataFork > LightClientDataFork.None: warn "Received invalid value", value = forkyObject.shortLog, endpoint = E.name, peer, peer_score = peer.getScore() else: warn "Received invalid value", endpoint = E.name, peer, peer_score = peer.getScore() peer.updateScore(PeerScoreBadValues) return didProgress else: # Reward, peer returned something useful applyReward = true didProgress = true if applyReward: peer.updateScore(PeerScoreGoodValues) else: peer.updateScore(PeerScoreNoValues) debug "Failed to receive value on request", value, endpoint = E.name, peer, peer_score = peer.getScore() except ResponseError as exc: warn "Received invalid response", error = exc.msg, endpoint = E.name, peer, peer_score = peer.getScore() peer.updateScore(PeerScoreBadValues) except CancelledError as exc: raise exc except PeerPoolError as exc: debug "Failed to acquire peer", exc = exc.msg finally: if peer != nil: self.network.peerPool.release(peer) return didProgress proc query[E]( self: LightClientManager, e: typedesc[E], key: E.K ): Future[bool] {.async: (raises: [CancelledError]).} = const PARALLEL_REQUESTS = 2 var workers: array[PARALLEL_REQUESTS, Future[bool]] let progressFut = Future[void].Raising([CancelledError]).init("lcmanProgress") doneFut = Future[void].Raising([CancelledError]).init("lcmanDone") var numCompleted = 0 maxCompleted = workers.len proc handleFinishedWorker(future: pointer) = try: let didProgress = cast[Future[bool]](future).read() if didProgress and not progressFut.finished: progressFut.complete() except CancelledError: if not progressFut.finished: progressFut.cancelSoon() except CatchableError: discard finally: inc numCompleted if numCompleted == maxCompleted: doneFut.complete() try: # Start concurrent workers for i in 0 ..< workers.len: try: workers[i] = self.workerTask(e, key) workers[i].addCallback(handleFinishedWorker) except CancelledError as exc: raise exc except CatchableError: workers[i] = newFuture[bool]() workers[i].complete(false) # Wait for any worker to report progress, or for all workers to finish try: discard await race(progressFut, doneFut) except ValueError: raiseAssert "race API invariant" finally: for i in 0 ..< maxCompleted: if workers[i] == nil: maxCompleted = i if numCompleted == maxCompleted: doneFut.complete() break if not workers[i].finished: workers[i].cancelSoon() while true: try: await allFutures(workers[0 ..< maxCompleted]) break except CancelledError: continue while true: try: await doneFut break except CancelledError: continue if not progressFut.finished: progressFut.cancelSoon() return progressFut.completed template query[E]( self: LightClientManager, e: typedesc[E] ): Future[bool].Raising([CancelledError]) = self.query(e, Nothing()) # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.7/specs/altair/light-client/light-client.md#light-client-sync-process proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} = var nextSyncTaskTime = self.getBeaconTime() while true: # Periodically wake and check for changes let wallTime = self.getBeaconTime() if wallTime < nextSyncTaskTime or (self.shouldInhibitSync != nil and self.shouldInhibitSync()) or self.network.peerPool.lenAvailable < 1: await sleepAsync(chronos.seconds(2)) continue # Obtain bootstrap data once a trusted block root is supplied if not self.isLightClientStoreInitialized(): let trustedBlockRoot = self.getTrustedBlockRoot() if trustedBlockRoot.isNone: await sleepAsync(chronos.seconds(2)) continue let didProgress = await self.query(Bootstrap, trustedBlockRoot.get) nextSyncTaskTime = if didProgress: wallTime else: wallTime + self.rng.computeDelayWithJitter(chronos.seconds(0)) continue # Fetch updates let current = wallTime.slotOrZero().sync_committee_period syncTask = nextLightClientSyncTask( current = current, finalized = self.getFinalizedPeriod(), optimistic = self.getOptimisticPeriod(), isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()) didProgress = case syncTask.kind of LcSyncKind.UpdatesByRange: await self.query(UpdatesByRange, (startPeriod: syncTask.startPeriod, count: syncTask.count)) of LcSyncKind.FinalityUpdate: await self.query(FinalityUpdate) of LcSyncKind.OptimisticUpdate: await self.query(OptimisticUpdate) nextSyncTaskTime = wallTime + self.rng.nextLcSyncTaskDelay( wallTime, finalized = self.getFinalizedPeriod(), optimistic = self.getOptimisticPeriod(), isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown(), didLatestSyncTaskProgress = didProgress) proc start*(self: var LightClientManager) = ## Start light client manager's loop. doAssert self.loopFuture == nil self.loopFuture = self.loop() proc stop*(self: var LightClientManager) {.async: (raises: []).} = ## Stop light client manager's loop. if self.loopFuture != nil: await noCancel self.loopFuture.cancelAndWait() self.loopFuture = nil