Update the portal beacon lc manager (#1770)
Similar changes as were done on the libp2p version, to get the same improvements and to reuse more of the lc sync helpers code.
parent cc7a7db74b
commit c405f79a84
@@ -14,6 +14,7 @@ import
  beacon_chain/spec/datatypes/[phase0, altair, bellatrix, capella, deneb],
  beacon_chain/spec/[forks_light_client, digest],
  beacon_chain/beacon_clock,
  beacon_chain/sync/light_client_sync_helpers,
  "."/[beacon_light_client_network, beacon_light_client_content]

from beacon_chain/consensus_object_pools/block_pools_types import VerifierError

@@ -23,6 +24,7 @@ logScope:

type
  Nothing = object
  ResponseError = object of CatchableError
  SlotInfo = object
    finalizedSlot: Slot
    optimisticSlot: Slot

@@ -33,7 +35,9 @@ type
  Bootstrap =
    Endpoint[Eth2Digest, ForkedLightClientBootstrap]
  UpdatesByRange =
    Endpoint[Slice[SyncCommitteePeriod], ForkedLightClientUpdate]
    Endpoint[
      tuple[startPeriod: SyncCommitteePeriod, count: uint64],
      ForkedLightClientUpdate]
  FinalityUpdate =
    Endpoint[SlotInfo, ForkedLightClientFinalityUpdate]
  OptimisticUpdate =
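For readers unfamiliar with the pattern, here is a minimal self-contained sketch of the Endpoint/doRequest scheme used in this module. The key and value types are placeholders, not the real ones, and the real doRequest issues a network request and returns a Future:

type
  Endpoint[K, V] = (K, V)
  Bootstrap = Endpoint[string, int]   # placeholder key/value types

proc doRequest(e: typedesc[Bootstrap], key: string): int =
  # Overloaded on the endpoint typedesc so a generic query proc can dispatch
  # on the endpoint type; here it just derives a value from the key.
  key.len

echo Bootstrap.doRequest("0xabc")  # prints 5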
@@ -109,23 +113,7 @@ proc getFinalizedPeriod(self: LightClientManager): SyncCommitteePeriod =
proc getOptimisticPeriod(self: LightClientManager): SyncCommitteePeriod =
  self.getOptimisticSlot().sync_committee_period

proc isGossipSupported*(
    self: LightClientManager,
    period: SyncCommitteePeriod
): bool =
  ## Indicate whether the light client is sufficiently synced to accept gossip.
  if not self.isLightClientStoreInitialized():
    return false

  let
    finalizedPeriod = self.getFinalizedPeriod()
    isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()
  if isNextSyncCommitteeKnown:
    period <= finalizedPeriod + 1
  else:
    period <= finalizedPeriod

# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap
proc doRequest(
    e: typedesc[Bootstrap],
    n: LightClientNetwork,
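A worked example of the gossip-acceptance rule shown above, as a simplified standalone proc with plain uint64 periods and a hypothetical name: with a finalized period of 800, gossip for period 801 is accepted only while the next sync committee is known.

proc isGossipSupportedExample(period, finalizedPeriod: uint64,
                              isNextSyncCommitteeKnown: bool): bool =
  if isNextSyncCommitteeKnown:
    period <= finalizedPeriod + 1
  else:
    period <= finalizedPeriod

doAssert isGossipSupportedExample(801, 800, true)
doAssert not isGossipSupportedExample(801, 800, false)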
@@ -133,22 +121,24 @@ proc doRequest(
): Future[NetRes[ForkedLightClientBootstrap]] =
  n.getLightClientBootstrap(blockRoot)

# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange
type LightClientUpdatesByRangeResponse = NetRes[ForkedLightClientUpdateList]
proc doRequest(
    e: typedesc[UpdatesByRange],
    n: LightClientNetwork,
    periods: Slice[SyncCommitteePeriod]
): Future[LightClientUpdatesByRangeResponse] =
  let
    startPeriod = periods.a
    reqCount = min(periods.len, MAX_REQUEST_LIGHT_CLIENT_UPDATES).uint64
  n.getLightClientUpdatesByRange(
    distinctBase(startPeriod),
    reqCount
  )
    key: tuple[startPeriod: SyncCommitteePeriod, count: uint64]
): Future[LightClientUpdatesByRangeResponse] {.async.} =
  let (startPeriod, count) = key
  doAssert count > 0 and count <= MAX_REQUEST_LIGHT_CLIENT_UPDATES
  let response = await n.getLightClientUpdatesByRange(startPeriod, count)
  if response.isOk:
    let e = distinctBase(response.get)
      .checkLightClientUpdates(startPeriod, count)
    if e.isErr:
      raise newException(ResponseError, e.error)
  return response

# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate
proc doRequest(
    e: typedesc[FinalityUpdate],
    n: LightClientNetwork,
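As a rough illustration of what the shared checkLightClientUpdates helper guards against, here is a hypothetical, simplified check (not the helper's actual implementation): a LightClientUpdatesByRange response may not contain more updates than requested, and the update periods must run consecutively from the requested start period.

proc checkUpdatePeriods(updatePeriods: openArray[uint64],
                        startPeriod: uint64, count: uint64): string =
  ## Returns "" when the response looks valid, an error message otherwise.
  if updatePeriods.len.uint64 > count:
    return "too many updates returned"
  for i, period in updatePeriods:
    if period != startPeriod + i.uint64:
      return "unexpected update period"
  ""

doAssert checkUpdatePeriods([800'u64, 801, 802], 800, 4) == ""
doAssert checkUpdatePeriods([800'u64, 802], 800, 4) == "unexpected update period"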
@@ -159,7 +149,7 @@ proc doRequest(
    distinctBase(slotInfo.optimisticSlot)
  )

# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate
proc doRequest(
    e: typedesc[OptimisticUpdate],
    n: LightClientNetwork,
@@ -237,7 +227,9 @@ proc workerTask[E](
      else:
        didProgress = true
    else:
      debug "Failed to receive value on request", value
      debug "Failed to receive value on request", value, endpoint = E.name
  except ResponseError as exc:
    warn "Received invalid response", error = exc.msg, endpoint = E.name
  except CancelledError as exc:
    raise exc
  except CatchableError as exc:
@@ -251,15 +243,16 @@ proc query[E](
    e: typedesc[E],
    key: E.K
): Future[bool] =
  # TODO Consider making few requests concurrently
  return self.workerTask(e, key)

template query(
    self: LightClientManager,
    e: typedesc[UpdatesByRange],
    key: SyncCommitteePeriod
): Future[bool] =
  self.query(e, key .. key)
  # Note:
  # The libp2p version does concurrent requests here, but they are done for
  # the same key and thus act as redundant requests to avoid waiting on an
  # unresponsive peer.
  # In Portal this is already built into the lookups and thus not really
  # needed. One important difference is that the lookup's concurrent requests
  # are already cancelled when one peer returns the content, but before the
  # content gets validated. This is an improvement to make for all Portal
  # content requests, see: https://github.com/status-im/nimbus-eth1/issues/1769
  self.workerTask(e, key)

template query[E](
    self: LightClientManager,
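A small sketch of the redundant-request pattern mentioned in the note above, using std asyncdispatch instead of chronos and hypothetical peer names; it only illustrates racing the same request against two peers and taking whichever answers first.

import std/asyncdispatch

proc request(peer: string, delayMs: int): Future[string] {.async.} =
  await sleepAsync(delayMs)
  return "response from " & peer

proc firstResponse(): Future[string] {.async.} =
  let
    a = request("peer A", 50)
    b = request("peer B", 200)
    either = a or b
  # Complete as soon as either request answers; the real lookup code also
  # cancels the slower request once one peer has returned the content.
  await either
  return (if a.finished: a.read else: b.read)

echo waitFor firstResponse()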
@@ -296,13 +289,13 @@ func fetchTime(
    jitterDelay = chronos.seconds(self.rng[].rand(jitterSeconds).int64)
  return wallTime + minDelay + jitterDelay

# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/light-client.md#light-client-sync-process
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/altair/light-client/light-client.md#light-client-sync-process
proc loop(self: LightClientManager) {.async.} =
  var nextFetchTime = self.getBeaconTime()
  var nextSyncTaskTime = self.getBeaconTime()
  while true:
    # Periodically wake and check for changes
    let wallTime = self.getBeaconTime()
    if wallTime < nextFetchTime:
    if wallTime < nextSyncTaskTime:
      await sleepAsync(chronos.seconds(2))
      continue

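A minimal sketch, assuming (not quoting) the behaviour of the jitter helpers used below, of why the reschedule time gets a random component: retries from many clients should not line up.

import std/random

proc delayWithJitter(baseSeconds: float): float =
  # Base delay plus up to 50% random jitter.
  baseSeconds + rand(baseSeconds / 2)

randomize()
echo delayWithJitter(8.0)  # somewhere between 8.0 and 12.0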
@@ -314,46 +307,46 @@ proc loop(self: LightClientManager) {.async.} =
        continue

      let didProgress = await self.query(Bootstrap, trustedBlockRoot.get)
      if not didProgress:
        nextFetchTime = self.fetchTime(wallTime, Soon)
      nextSyncTaskTime =
        if didProgress:
          wallTime
        else:
          wallTime + self.rng.computeDelayWithJitter(chronos.seconds(0))
      continue

    # Fetch updates
    var allowWaitNextPeriod = false
    let
      current = wallTime.slotOrZero().sync_committee_period

      syncTask = nextLightClientSyncTask(
        current = current,
        finalized = self.getFinalizedPeriod(),
        optimistic = self.getOptimisticPeriod(),
        isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown())

      finalizedSlot = self.getFinalizedSlot()
      optimisticSlot = self.getOptimisticSlot()
      finalized = finalizedSlot.sync_committee_period
      optimistic = optimisticSlot.sync_committee_period
      current = wallTime.slotOrZero().sync_committee_period
      isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()

      didProgress =
        if finalized == optimistic and not isNextSyncCommitteeKnown:
          if finalized >= current:
            await self.query(UpdatesByRange, finalized)
          else:
            await self.query(UpdatesByRange, finalized ..< current)
        elif finalized + 1 < current:
          await self.query(UpdatesByRange, finalized + 1 ..< current)
        elif finalized != optimistic:
        case syncTask.kind
        of LcSyncKind.UpdatesByRange:
          discard
          await self.query(UpdatesByRange,
            (startPeriod: syncTask.startPeriod, count: syncTask.count))
        of LcSyncKind.FinalityUpdate:
          await self.query(FinalityUpdate, SlotInfo(
            finalizedSlot: finalizedSlot,
            optimisticSlot: optimisticSlot
          ))
        else:
          allowWaitNextPeriod = true
        of LcSyncKind.OptimisticUpdate:
          await self.query(OptimisticUpdate, optimisticSlot)

      schedulingMode =
        if not didProgress or not self.isGossipSupported(current):
          Soon
        elif not allowWaitNextPeriod:
          CurrentPeriod
        else:
          NextPeriod

    nextFetchTime = self.fetchTime(wallTime, schedulingMode)
    nextSyncTaskTime = wallTime + self.rng.nextLcSyncTaskDelay(
      wallTime,
      finalized = self.getFinalizedPeriod(),
      optimistic = self.getOptimisticPeriod(),
      isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown(),
      didLatestSyncTaskProgress = didProgress)

proc start*(self: var LightClientManager) =
  ## Start light client manager's loop.

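A rough, hypothetical sketch of how the next sync task can be derived from the local periods, mirroring the if/elif chain that the shared nextLightClientSyncTask helper replaces here. The helper's exact rules may differ, and the sketch assumes current >= finalized.

type
  LcTaskKind = enum
    UpdatesByRange, FinalityUpdate, OptimisticUpdate
  LcTask = object
    case kind: LcTaskKind
    of UpdatesByRange:
      startPeriod, count: uint64
    else:
      discard

proc pickSyncTask(current, finalized, optimistic: uint64,
                  isNextSyncCommitteeKnown: bool): LcTask =
  if not isNextSyncCommitteeKnown:
    # Next sync committee unknown: fetch updates starting at the finalized period.
    LcTask(kind: UpdatesByRange, startPeriod: finalized,
           count: max(current - finalized, 1'u64))
  elif finalized + 1 < current:
    # Multiple periods behind: catch up on the missing range.
    LcTask(kind: UpdatesByRange, startPeriod: finalized + 1,
           count: current - finalized - 1)
  elif finalized != optimistic:
    LcTask(kind: FinalityUpdate)
  else:
    LcTask(kind: OptimisticUpdate)

doAssert pickSyncTask(805, 800, 805, true).kind == UpdatesByRange
doAssert pickSyncTask(805, 805, 805, true).kind == OptimisticUpdate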
@@ -59,7 +59,8 @@ proc getLightClientBootstrap*(

  let
    bootstrap = bootstrapContentLookup.unsafeGet()
    decodingResult = decodeLightClientBootstrapForked(l.forkDigests, bootstrap.content)
    decodingResult = decodeLightClientBootstrapForked(
      l.forkDigests, bootstrap.content)

  if decodingResult.isErr:
    return Opt.none(ForkedLightClientBootstrap)
@@ -70,11 +71,12 @@ proc getLightClientBootstrap*(

proc getLightClientUpdatesByRange*(
    l: LightClientNetwork,
    startPeriod: uint64,
    startPeriod: SyncCommitteePeriod,
    count: uint64):
    Future[results.Opt[ForkedLightClientUpdateList]] {.async.} =
  let
    bk = LightClientUpdateKey(startPeriod: startPeriod, count: count)
    bk = LightClientUpdateKey(
      startPeriod: distinctBase(startPeriod), count: count)
    ck = ContentKey(
      contentType: lightClientUpdate,
      lightClientUpdateKey: bk
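SyncCommitteePeriod is a distinct uint64 in the beacon chain code, which is why distinctBase is needed when building the content key. A minimal standalone sketch of the conversion:

import std/typetraits  # provides distinctBase for distinct types

type SyncCommitteePeriod = distinct uint64

let period = SyncCommitteePeriod(800)
let raw: uint64 = distinctBase(period)  # back to the underlying uint64
echo raw  # 800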
@@ -107,13 +109,13 @@ proc getUpdate(
  let
    keyEncoded = encode(ck)
    contentID = toContentId(keyEncoded)
    updateLooukup = await l.portalProtocol.contentLookup(keyEncoded, contentId)
    updateLookup = await l.portalProtocol.contentLookup(keyEncoded, contentId)

  if updateLooukup.isNone():
  if updateLookup.isNone():
    warn "Failed fetching update from the network", contentKey = keyEncoded
    return Opt.none(seq[byte])

  return ok(updateLooukup.get().content)
  return ok(updateLookup.get().content)

# TODO:
# Currently both getLightClientFinalityUpdate and getLightClientOptimisticUpdate
@@ -214,7 +216,6 @@ proc validateContent(
    return false

  let contentId = contentIdOpt.get()

  n.portalProtocol.storeContent(contentKey, contentId, contentItem)

  info "Received offered content validated successfully", contentKey

@@ -181,7 +181,7 @@ procSuite "Beacon Light Client Content Network":

    let updatesResult =
      await lcNode1.lightClientNetwork.getLightClientUpdatesByRange(
        startPeriod.uint64,
        startPeriod,
        uint64(2)
      )