mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-25 22:11:06 +00:00
2eb56f6e1b
The various `PeerScore` constants are used for both beacon blocks and LC objects, and will likely also find use for EIP4844 blob sidecars. Rename them to use more generically applicable names that no longer refer to blocks explicitly.
456 lines
15 KiB
Nim
456 lines
15 KiB
Nim
# beacon_chain
|
|
# Copyright (c) 2022 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
# Enable exception tracking for the whole module. Compilers older than
# Nim 1.4 get `Defect` listed explicitly; newer ones use an empty list.
when (NimMajor, NimMinor) >= (1, 4):
  {.push raises: [].}
else:
  {.push raises: [Defect].}
|
|
|
|
import chronos, chronicles, stew/base10
|
|
import
|
|
eth/p2p/discoveryv5/random2,
|
|
../spec/datatypes/[altair],
|
|
../networking/eth2_network,
|
|
../beacon_clock,
|
|
"."/sync_protocol, "."/sync_manager
|
|
export sync_manager
|
|
|
|
# Log statements emitted from this module carry the "lcman" topic.
logScope:
  topics = "lcman"
|
|
|
|
type
  # Key type of endpoints whose requests carry no parameter.
  Nothing = object

  # Raised when a peer's response is inconsistent with the request.
  ResponseError = object of CatchableError

  # Describes a request endpoint: `K` is the request key type and `V` the
  # type of each value contained in the response.
  Endpoint[K, V] =
    (K, V) # https://github.com/nim-lang/Nim/issues/19531

  # Endpoints known to the manager.
  Bootstrap = Endpoint[Eth2Digest, altair.LightClientBootstrap]
  UpdatesByRange =
    Endpoint[Slice[SyncCommitteePeriod], altair.LightClientUpdate]
  FinalityUpdate = Endpoint[Nothing, altair.LightClientFinalityUpdate]
  OptimisticUpdate = Endpoint[Nothing, altair.LightClientOptimisticUpdate]

  # Asynchronous callback that judges a single received value of type `V`.
  ValueVerifier[V] =
    proc(v: V): Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}

  # One verifier type per kind of light client object.
  BootstrapVerifier* = ValueVerifier[altair.LightClientBootstrap]
  UpdateVerifier* = ValueVerifier[altair.LightClientUpdate]
  FinalityUpdateVerifier* = ValueVerifier[altair.LightClientFinalityUpdate]
  OptimisticUpdateVerifier* = ValueVerifier[altair.LightClientOptimisticUpdate]

  # Callbacks through which the manager queries state owned by its user.
  GetTrustedBlockRootCallback* =
    proc(): Option[Eth2Digest] {.gcsafe, raises: [Defect].}
  GetBoolCallback* = proc(): bool {.gcsafe, raises: [Defect].}
  GetSyncCommitteePeriodCallback* =
    proc(): SyncCommitteePeriod {.gcsafe, raises: [Defect].}

  # Fetches light client objects from peers and feeds them to the
  # configured verifiers.
  LightClientManager* = object
    # Peer access and randomness (the latter is used for fetch jitter).
    network: Eth2Node
    rng: ref HmacDrbgContext
    # Root used for the bootstrap request; `none` until one is supplied.
    getTrustedBlockRoot: GetTrustedBlockRootCallback
    # Verifiers, one per kind of received value.
    bootstrapVerifier: BootstrapVerifier
    updateVerifier: UpdateVerifier
    finalityUpdateVerifier: FinalityUpdateVerifier
    optimisticUpdateVerifier: OptimisticUpdateVerifier
    # Progress queries that drive the choice of the next request.
    isLightClientStoreInitialized: GetBoolCallback
    isNextSyncCommitteeKnown: GetBoolCallback
    getFinalizedPeriod: GetSyncCommitteePeriodCallback
    getOptimisticPeriod: GetSyncCommitteePeriodCallback
    # Source of the current wall time.
    getBeaconTime: GetBeaconTimeFn
    # Future of the running sync loop; `nil` while the manager is stopped.
    loopFuture: Future[void]
|
|
|
|
func init*(
    T: type LightClientManager,
    network: Eth2Node,
    rng: ref HmacDrbgContext,
    getTrustedBlockRoot: GetTrustedBlockRootCallback,
    bootstrapVerifier: BootstrapVerifier,
    updateVerifier: UpdateVerifier,
    finalityUpdateVerifier: FinalityUpdateVerifier,
    optimisticUpdateVerifier: OptimisticUpdateVerifier,
    isLightClientStoreInitialized: GetBoolCallback,
    isNextSyncCommitteeKnown: GetBoolCallback,
    getFinalizedPeriod: GetSyncCommitteePeriodCallback,
    getOptimisticPeriod: GetSyncCommitteePeriodCallback,
    getBeaconTime: GetBeaconTimeFn
): LightClientManager =
  ## Create a light client manager from its collaborators.
  ##
  ## Every argument is stored in the field of the same name. `loopFuture` is
  ## left at its default (`nil`), so the sync loop is not running yet.
  T(
    network: network,
    rng: rng,
    getTrustedBlockRoot: getTrustedBlockRoot,
    bootstrapVerifier: bootstrapVerifier,
    updateVerifier: updateVerifier,
    finalityUpdateVerifier: finalityUpdateVerifier,
    optimisticUpdateVerifier: optimisticUpdateVerifier,
    isLightClientStoreInitialized: isLightClientStoreInitialized,
    isNextSyncCommitteeKnown: isNextSyncCommitteeKnown,
    getFinalizedPeriod: getFinalizedPeriod,
    getOptimisticPeriod: getOptimisticPeriod,
    getBeaconTime: getBeaconTime)
|
|
|
|
proc isGossipSupported*(
    self: LightClientManager,
    period: SyncCommitteePeriod
): bool =
  ## Tell whether gossip for `period` can be accepted already.
  ##
  ## Returns `false` while the light client store is not initialized.
  ## Otherwise `period` may be at most the finalized period, or one period
  ## beyond it when the next sync committee is known.
  if not self.isLightClientStoreInitialized():
    return false

  let
    finalized = self.getFinalizedPeriod()
    highestSupported =
      if self.isNextSyncCommitteeKnown():
        finalized + 1
      else:
        finalized
  period <= highestSupported
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-alpha.0/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap
|
|
proc doRequest(
    e: typedesc[Bootstrap],
    peer: Peer,
    blockRoot: Eth2Digest
): Future[NetRes[altair.LightClientBootstrap]] {.
    raises: [Defect, IOError].} =
  ## Request the light client bootstrap for `blockRoot` from `peer`.
  ## `e` only selects this overload; it is not used otherwise.
  lightClientBootstrap(peer, blockRoot)
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-alpha.0/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange
|
|
# Result of an updates-by-range request: a list of updates bounded by
# `MAX_REQUEST_LIGHT_CLIENT_UPDATES` entries, wrapped in `NetRes`.
type
  LightClientUpdatesByRangeResponse =
    NetRes[List[altair.LightClientUpdate, MAX_REQUEST_LIGHT_CLIENT_UPDATES]]
|
|
proc doRequest(
    e: typedesc[UpdatesByRange],
    peer: Peer,
    periods: Slice[SyncCommitteePeriod]
): Future[LightClientUpdatesByRangeResponse] {.
    async, raises: [Defect, IOError].} =
  ## Request the light client updates for `periods` from `peer` and check
  ## that a successful response is consistent with the request.
  ##
  ## At most `MAX_REQUEST_LIGHT_CLIENT_UPDATES` periods are requested,
  ## starting at `periods.a`. A successful response must
  ## - not contain more values than were requested,
  ## - use the same sync committee period for the attested header and the
  ##   signature slot of each update,
  ## - list updates in strictly increasing period order, starting no
  ##   earlier than `periods.a` (periods may be skipped), and
  ## - not contain any update beyond `periods.b`.
  ##
  ## Fails with `ResponseError` if any of these checks does not hold.
  ## An unsuccessful response is returned unchanged.
  let
    startPeriod = periods.a
    lastPeriod = periods.b
    reqCount = min(periods.len, MAX_REQUEST_LIGHT_CLIENT_UPDATES).uint64
  let response = await peer.lightClientUpdatesByRange(startPeriod, reqCount)
  if response.isOk:
    if response.get.lenu64 > reqCount:
      raise newException(ResponseError, "Too many values in response" &
        " (" & Base10.toString(response.get.lenu64) &
        " > " & Base10.toString(reqCount.uint) & ")")
    var expectedPeriod = startPeriod
    for update in response.get:
      let
        attestedPeriod = update.attested_header.slot.sync_committee_period
        signaturePeriod = update.signature_slot.sync_committee_period
      if attestedPeriod != signaturePeriod:
        raise newException(ResponseError, "Conflicting sync committee periods" &
          " (signature: " & Base10.toString(distinctBase(signaturePeriod)) &
          " != " & Base10.toString(distinctBase(attestedPeriod)) & ")")
      if attestedPeriod < expectedPeriod:
        raise newException(ResponseError, "Unexpected sync committee period" &
          " (" & Base10.toString(distinctBase(attestedPeriod)) &
          " < " & Base10.toString(distinctBase(expectedPeriod)) & ")")
      # The upper bound has to be checked for every update, not only when a
      # period was skipped: once an earlier update skipped ahead,
      # `expectedPeriod` itself may already lie beyond `lastPeriod`.
      if attestedPeriod > lastPeriod:
        raise newException(ResponseError, "Sync committee period too high" &
          " (" & Base10.toString(distinctBase(attestedPeriod)) &
          " > " & Base10.toString(distinctBase(lastPeriod)) & ")")
      # Any following update has to belong to a later period.
      expectedPeriod = attestedPeriod + 1
  return response
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-alpha.0/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate
|
|
proc doRequest(
    e: typedesc[FinalityUpdate],
    peer: Peer
): Future[NetRes[altair.LightClientFinalityUpdate]] {.
    raises: [Defect, IOError].} =
  ## Request the light client finality update from `peer`.
  ## `e` only selects this overload; it is not used otherwise.
  lightClientFinalityUpdate(peer)
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-alpha.0/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate
|
|
proc doRequest(
    e: typedesc[OptimisticUpdate],
    peer: Peer
): Future[NetRes[altair.LightClientOptimisticUpdate]] {.
    raises: [Defect, IOError].} =
  ## Request the light client optimistic update from `peer`.
  ## `e` only selects this overload; it is not used otherwise.
  lightClientOptimisticUpdate(peer)
|
|
|
|
template valueVerifier[E](
    self: LightClientManager,
    e: typedesc[E]
): ValueVerifier[E.V] =
  ## Pick the verifier callback of `self` that matches the value type of
  ## endpoint `E`. The choice is made at compile time; an endpoint with any
  ## other value type fails the build.
  when E.V is altair.LightClientBootstrap:
    self.bootstrapVerifier
  elif E.V is altair.LightClientUpdate:
    self.updateVerifier
  elif E.V is altair.LightClientFinalityUpdate:
    self.finalityUpdateVerifier
  elif E.V is altair.LightClientOptimisticUpdate:
    self.optimisticUpdateVerifier
  else:
    static: doAssert false
|
|
|
|
iterator values(v: auto): auto =
  ## Helper that lets `workerTask` handle single-value and list responses
  ## with the same loop: a `List` yields each of its elements, anything else
  ## yields itself exactly once.
  when v is List:
    for element in v:
      yield element
  else:
    yield v
|
|
|
|
proc workerTask[E](
    self: LightClientManager,
    e: typedesc[E],
    key: E.K
): Future[bool] {.async.} =
  ## Acquire one peer, send it a single request for endpoint `E`, and pass
  ## every received value to the verifier of that endpoint.
  ##
  ## Returns `true` if at least one value was accepted by the verifier; for
  ## bootstrap requests a `Duplicate` verdict counts as progress as well.
  ## The peer's score is adjusted according to the outcome, and an acquired
  ## peer is always released back to the pool.
  ##
  ## `ResponseError` and `PeerPoolError` are logged and swallowed;
  ## `CancelledError` and any other `CatchableError` are re-raised.
  var
    peer: Peer
    didProgress = false
  try:
    peer = self.network.peerPool.acquireNoWait()
    let value =
      when E.K is Nothing:
        await E.doRequest(peer)
      else:
        await E.doRequest(peer, key)
    if value.isErr:
      peer.updateScore(PeerScoreNoValues)
      debug "Failed to receive value on request", value,
        endpoint = E.name, peer, peer_score = peer.getScore()
    else:
      var anyAccepted = false
      for item in value.get().values:
        let verdict = await self.valueVerifier(E)(item)
        if verdict.isOk:
          # Reward, peer returned something useful
          anyAccepted = true
          didProgress = true
          continue

        case verdict.error
        of VerifierError.MissingParent:
          # Stop, requires different request to progress
          return didProgress
        of VerifierError.Duplicate:
          # Ignore, a concurrent request may have already fulfilled this
          when E.V is altair.LightClientBootstrap:
            didProgress = true
          else:
            discard
        of VerifierError.UnviableFork:
          # Descore, peer is on an incompatible fork version
          notice "Received value from an unviable fork",
            value = item.shortLog,
            endpoint = E.name, peer, peer_score = peer.getScore()
          peer.updateScore(PeerScoreUnviableFork)
          return didProgress
        of VerifierError.Invalid:
          # Descore, received data is malformed
          warn "Received invalid value", value = item.shortLog,
            endpoint = E.name, peer, peer_score = peer.getScore()
          peer.updateScore(PeerScoreBadValues)
          return didProgress
      if anyAccepted:
        peer.updateScore(PeerScoreGoodValues)
  except ResponseError as exc:
    warn "Received invalid response", error = exc.msg,
      endpoint = E.name, peer, peer_score = peer.getScore()
    peer.updateScore(PeerScoreBadValues)
  except CancelledError as exc:
    raise exc
  except PeerPoolError as exc:
    debug "Failed to acquire peer", exc = exc.msg
  except CatchableError as exc:
    if peer != nil:
      peer.updateScore(PeerScoreNoValues)
    debug "Unexpected exception while receiving value", exc = exc.msg,
      endpoint = E.name, peer, peer_score = peer.getScore()
    raise exc
  finally:
    # Hand the peer back regardless of how the request ended.
    if peer != nil:
      self.network.peerPool.release(peer)
  return didProgress
|
|
|
|
proc query[E](
    self: LightClientManager,
    e: typedesc[E],
    key: E.K
): Future[bool] {.async.} =
  ## Run `PARALLEL_REQUESTS` concurrent `workerTask`s for endpoint `E` with
  ## the same `key`. Waiting ends as soon as one worker reports progress or
  ## all workers have finished; workers still running at that point are
  ## cancelled and awaited before returning.
  ##
  ## Returns `true` if any worker reported progress.
  const PARALLEL_REQUESTS = 2
  var
    workers: array[PARALLEL_REQUESTS, Future[bool]]
    numCompleted = 0
    maxCompleted = workers.len
  let
    progressFut = newFuture[void]("lcmanProgress")
    doneFut = newFuture[void]("lcmanDone")

  proc handleFinishedWorker(future: pointer) =
    # Invoked once per finished worker: forwards its outcome to
    # `progressFut` and completes `doneFut` after the last worker.
    try:
      let didProgress = cast[Future[bool]](future).read()
      if didProgress and not progressFut.finished:
        progressFut.complete()
    except CancelledError:
      if not progressFut.finished:
        progressFut.cancel()
    except CatchableError:
      discard
    finally:
      inc numCompleted
      if numCompleted == maxCompleted:
        doneFut.complete()

  try:
    # Start concurrent workers
    for i in 0 ..< workers.len:
      try:
        workers[i] = self.workerTask(e, key)
        workers[i].addCallback(handleFinishedWorker)
      except CancelledError as exc:
        raise exc
      except CatchableError:
        # NOTE(review): this fallback future gets no callback, so it is not
        # counted in `numCompleted` - confirm this path cannot be reached.
        workers[i] = newFuture[bool]()
        workers[i].complete(false)

    # Wait for any worker to report progress, or for all workers to finish
    discard await race(progressFut, doneFut)
  finally:
    # Cancel whatever is still running. If starting the workers was
    # interrupted, only the workers that were actually created count.
    for i in 0 ..< maxCompleted:
      if workers[i] == nil:
        maxCompleted = i
        if numCompleted == maxCompleted:
          doneFut.complete()
        break
      if not workers[i].finished:
        workers[i].cancel()

    # Cancellation of this task must not skip the cleanup below, so both
    # waits are retried until they succeed.
    while true:
      try:
        await allFutures(workers[0 ..< maxCompleted])
        break
      except CancelledError:
        continue
    while true:
      try:
        await doneFut
        break
      except CancelledError:
        continue

  if not progressFut.finished:
    progressFut.cancel()
  return progressFut.completed
|
|
|
|
template query(
    self: LightClientManager,
    e: typedesc[UpdatesByRange],
    key: SyncCommitteePeriod
): Future[bool] =
  ## Convenience overload for a single period: queries the one-element
  ## range `key .. key`.
  self.query(e, key .. key)
|
|
|
|
template query[E](
    self: LightClientManager,
    e: typedesc[E]
): Future[bool] =
  ## Convenience overload for endpoints without a request key: passes a
  ## `Nothing` value as the key.
  self.query(e, Nothing())
|
|
|
|
type
  # Selects how far in the future `fetchTime` schedules the next fetch.
  SchedulingMode = enum
    Soon,          # no remaining time; only the minimum delay applies
    CurrentPeriod, # relative to the end of the current sync committee period
    NextPeriod     # relative to the length of one sync committee period
|
|
|
|
func fetchTime(
    self: LightClientManager,
    wallTime: BeaconTime,
    schedulingMode: SchedulingMode
): BeaconTime =
  ## Compute the time of the next fetch attempt relative to `wallTime`.
  ##
  ## A time budget is derived from `schedulingMode`: zero for `Soon`, the
  ## time until the start of the last slot of the current sync committee
  ## period for `CurrentPeriod`, and the duration of one full sync
  ## committee period for `NextPeriod`. The base delay is one eighth of
  ## that budget but at least 10 seconds; a jitter drawn from `self.rng`
  ## with twice the base delay (in seconds) as its bound is added on top.
  let timeBudget =
    case schedulingMode
    of Soon:
      chronos.seconds(0)
    of CurrentPeriod:
      let
        wallPeriod = wallTime.slotOrZero().sync_committee_period
        deadlineSlot = (wallPeriod + 1).start_slot - 1
        deadline = deadlineSlot.start_beacon_time()
      chronos.nanoseconds((deadline - wallTime).nanoseconds)
    of NextPeriod:
      chronos.seconds(
        (SLOTS_PER_SYNC_COMMITTEE_PERIOD * SECONDS_PER_SLOT).int64)
  let
    baseDelay = max(timeBudget div 8, chronos.seconds(10))
    jitterBound = (baseDelay * 2).seconds
    jitter = chronos.seconds(self.rng[].rand(jitterBound).int64)
  wallTime + baseDelay + jitter
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-alpha.0/specs/altair/light-client/light-client.md#light-client-sync-process
|
|
proc loop(self: LightClientManager) {.async.} =
  ## Endless sync loop of the light client manager.
  ##
  ## Wakes up every 2 seconds until the next fetch time is reached and a
  ## peer is available. It first obtains bootstrap data for the trusted
  ## block root while the store is uninitialized, then requests updates by
  ## range, a finality update or an optimistic update, depending on the
  ## finalized, optimistic and current periods. The next fetch time is
  ## rescheduled after every attempt.
  var nextFetchTime = self.getBeaconTime()
  while true:
    # Periodically wake and check for changes
    let wallTime = self.getBeaconTime()
    if wallTime < nextFetchTime or
        self.network.peerPool.lenAvailable < 1:
      await sleepAsync(chronos.seconds(2))
      continue

    # Obtain bootstrap data once a trusted block root is supplied
    if not self.isLightClientStoreInitialized():
      let trustedBlockRoot = self.getTrustedBlockRoot()
      if trustedBlockRoot.isNone:
        await sleepAsync(chronos.seconds(2))
        continue

      let bootstrapped = await self.query(Bootstrap, trustedBlockRoot.get)
      if not bootstrapped:
        nextFetchTime = self.fetchTime(wallTime, Soon)
        continue

    # Fetch updates
    let
      finalized = self.getFinalizedPeriod()
      optimistic = self.getOptimisticPeriod()
      current = wallTime.slotOrZero().sync_committee_period
      isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()
    var
      allowWaitNextPeriod = false
      didProgress: bool

    if finalized == optimistic and not isNextSyncCommitteeKnown:
      # The next sync committee of the finalized period is still unknown
      if finalized >= current:
        didProgress = await self.query(UpdatesByRange, finalized)
      else:
        didProgress = await self.query(UpdatesByRange, finalized ..< current)
    elif finalized + 1 < current:
      # More than one period behind the current one
      didProgress =
        await self.query(UpdatesByRange, finalized + 1 ..< current)
    elif finalized != optimistic:
      didProgress = await self.query(FinalityUpdate)
    else:
      allowWaitNextPeriod = true
      didProgress = await self.query(OptimisticUpdate)

    # Retry soon unless progress was made and gossip covers `current`
    let schedulingMode =
      if didProgress and self.isGossipSupported(current):
        if allowWaitNextPeriod:
          NextPeriod
        else:
          CurrentPeriod
      else:
        Soon

    nextFetchTime = self.fetchTime(wallTime, schedulingMode)
|
|
|
|
proc start*(self: var LightClientManager) =
  ## Launch the manager's sync loop and remember its future in
  ## `loopFuture`. Asserts that no loop future is set yet.
  doAssert self.loopFuture == nil
  self.loopFuture = self.loop()
|
|
|
|
proc stop*(self: var LightClientManager) {.async.} =
  ## Cancel the manager's sync loop, wait until the cancellation has been
  ## processed, and clear `loopFuture`. Does nothing if no loop is set.
  if self.loopFuture == nil:
    return
  await self.loopFuture.cancelAndWait()
  self.loopFuture = nil
|