persist LC data across restarts (#3823)

* persist LC data across restarts

With the Altair spec `LightClientUpdate` structure taking its final form
it is finally possible to persist LC data across restarts without having
to worry about data migration due to spec changes. A separate `lcdataV1`
database is created in the `caches` subdirectory to hold known LC data.
A full database with default settings (129 periods) uses <15 MB disk.

* extend LC data DB rationale

* wording

* add `isSupportedBySQLite` helper and explicit return

* remove redundant `return`
This commit is contained in:
Etan Kissling 2022-06-30 15:04:39 +02:00 committed by GitHub
parent 24c435abae
commit 499abd927f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 482 additions and 81 deletions

View File

@ -1056,6 +1056,9 @@ func outWalletFile*(config: BeaconNodeConf): Option[OutFile] =
func databaseDir*(config: AnyConf): string =
config.dataDir / "db"
func cachesDir*(config: AnyConf): string =
config.databaseDir / "caches"
func runAsService*(config: BeaconNodeConf): bool =
config.cmd == noCommand and config.runAsServiceFlag

View File

@ -15,6 +15,7 @@ import
stew/bitops2,
# Beacon chain internals
../spec/datatypes/altair,
../light_client_data_db,
./block_dag
type
@ -49,15 +50,6 @@ type
## Key is the block ID of which the post state was used to get the data.
## Data stored for the finalized head block and all non-finalized blocks.
currentBranches*: Table[Slot, altair.CurrentSyncCommitteeBranch]
## Cached data for creating future `LightClientBootstrap` instances.
## Key is the block slot of which the post state was used to get the data.
## Data stored for all finalized epoch boundary blocks.
bestUpdates*: Table[SyncCommitteePeriod, altair.LightClientUpdate]
## Stores the `LightClientUpdate` with the most `sync_committee_bits` per
## `SyncCommitteePeriod`. Sync committee finality gives precedence.
pendingBest*:
Table[(SyncCommitteePeriod, Eth2Digest), altair.LightClientUpdate]
## Same as `bestUpdates`, but for `SyncCommitteePeriod` with not yet
@ -72,6 +64,8 @@ type
## The earliest slot for which light client data is imported.
LightClientDataConfig* = object
dbDir*: Option[string]
## Directory to store light client data DB in
serve*: bool
## Whether to make local light client data available or not
importMode*: LightClientDataImportMode
@ -89,6 +83,8 @@ type
cache*: LightClientDataCache
## Cached data to accelerate creating light client data
db*: LightClientDataDB
## Persistent light client data to avoid expensive recomputations
# -----------------------------------
# Config

View File

@ -581,6 +581,7 @@ proc updateBeaconMetrics(
import blockchain_dag_light_client
export
blockchain_dag_light_client.closeLightClientDataStore,
blockchain_dag_light_client.getLightClientBootstrap,
blockchain_dag_light_client.getLightClientUpdateForPeriod,
blockchain_dag_light_client.getLightClientFinalityUpdate,

View File

@ -15,6 +15,7 @@ import
stew/[bitops2, objects],
# Beacon chain internals
../spec/datatypes/[phase0, altair, bellatrix],
../light_client_data_db,
"."/[block_pools_types, blockchain_dag]
logScope: topics = "chaindag"
@ -125,13 +126,29 @@ proc syncCommitteeRootForPeriod(
proc initLightClientDataStore*(
config: LightClientDataConfig, cfg: RuntimeConfig): LightClientDataStore =
## Initialize light client data store.
LightClientDataStore(
var lcDataStore = LightClientDataStore(
serve: config.serve,
importMode: config.importMode,
maxPeriods: config.maxPeriods.get(cfg.defaultLightClientDataMaxPeriods),
onLightClientFinalityUpdate: config.onLightClientFinalityUpdate,
onLightClientOptimisticUpdate: config.onLightClientOptimisticUpdate)
if config.serve or config.importMode != LightClientDataImportMode.None:
lcDataStore.db =
if config.dbDir.isSome:
initLightClientDataDB(config.dbDir.get, inMemory = false).valueOr:
warn "Falling back to in-memory LC data DB"
initLightClientDataDB(config.dbDir.get, inMemory = true).expect(
"In-memory LC data DB expected to succeed")
else:
initLightClientDataDB(".", inMemory = true).expect(
"In-memory LC data DB expected to succeed")
lcDataStore
proc closeLightClientDataStore*(dag: ChainDAGRef) =
dag.lcDataStore.db.close()
func targetLightClientTailSlot(dag: ChainDAGRef): Slot =
## Earliest slot for which light client data is retained.
let
@ -151,11 +168,13 @@ func handleUnexpectedLightClientError(dag: ChainDAGRef, buggedSlot: Slot) =
proc initLightClientBootstrapForPeriod(
dag: ChainDAGRef,
period: SyncCommitteePeriod) =
period: SyncCommitteePeriod): Opt[void] =
## Compute and cache `LightClientBootstrap` data for all finalized
## epoch boundary blocks within a given sync committee period.
if not dag.isNextSyncCommitteeFinalized(period):
return
return ok()
if dag.lcDataStore.db.isPeriodSealed(period):
return ok()
let startTick = Moment.now()
debug "Caching historic LC bootstrap data", period
@ -172,6 +191,7 @@ proc initLightClientBootstrapForPeriod(
lowBoundarySlot = lowSlot.nextEpochBoundarySlot
highBoundarySlot = highSlot.nextEpochBoundarySlot
var
res = ok()
tmpState = assignClone(dag.headState)
tmpCache: StateCache
nextBoundarySlot = lowBoundarySlot
@ -180,30 +200,33 @@ proc initLightClientBootstrapForPeriod(
let
bsi = dag.getExistingBlockIdAtSlot(nextBoundarySlot).valueOr:
dag.handleUnexpectedLightClientError(nextBoundarySlot)
res.err()
continue
bid = bsi.bid
boundarySlot = bid.slot.nextEpochBoundarySlot
if boundarySlot == nextBoundarySlot and bid.slot >= lowSlot and
not dag.lcDataStore.cache.currentBranches.hasKey(bid.slot):
not dag.lcDataStore.db.hasCurrentSyncCommitteeBranch(bid.slot):
if not dag.updateExistingState(
tmpState[], bid.atSlot, save = false, tmpCache):
dag.handleUnexpectedLightClientError(bid.slot)
res.err()
continue
let branch = withState(tmpState[]):
when stateFork >= BeaconStateFork.Altair:
state.data.build_proof(altair.CURRENT_SYNC_COMMITTEE_INDEX).get
else: raiseAssert "Unreachable"
dag.lcDataStore.cache.currentBranches[bid.slot] = branch
dag.lcDataStore.db.putCurrentSyncCommitteeBranch(bid.slot, branch)
res
proc initLightClientUpdateForPeriod(
dag: ChainDAGRef, period: SyncCommitteePeriod) =
dag: ChainDAGRef, period: SyncCommitteePeriod): Opt[void] =
## Compute and cache the best `LightClientUpdate` within a given
## sync committee period up through the finalized head block.
## Non-finalized blocks are processed incrementally.
## Non-finalized blocks are processed incrementally by other functions.
if not dag.isNextSyncCommitteeFinalized(period):
return
if dag.lcDataStore.cache.bestUpdates.hasKey(period):
return
return ok()
if dag.lcDataStore.db.isPeriodSealed(period):
return ok()
let startTick = Moment.now()
debug "Computing best historic LC update", period
@ -212,35 +235,35 @@ proc initLightClientUpdateForPeriod(
# replicated on every `return`, and the log statement allocates another
# copy of the arguments on the stack for each instantiation (~1 MB stack!)
debug "Best historic LC update computed",
period, update = dag.lcDataStore.cache.bestUpdates.getOrDefault(period),
period, update = dag.lcDataStore.db.getBestUpdate(period),
computeDur = endTick - startTick
defer: logBest()
proc maxParticipantsBlock(
dag: ChainDAGRef, highBid: BlockId, lowSlot: Slot
): tuple[bid: Opt[BlockId], ok: bool] =
): tuple[bid: Opt[BlockId], res: Opt[void]] =
## Determine the earliest block with most sync committee signatures among
## ancestors of `highBid` with at least `lowSlot` as parent block slot.
## Return `err` if no block with `MIN_SYNC_COMMITTEE_PARTICIPANTS` exists.
## `bool` in result indicates whether no unexpected errors occurred.
## `res` in result indicates whether no unexpected errors occurred.
var
maxParticipants = MIN_SYNC_COMMITTEE_PARTICIPANTS
maxBid: Opt[BlockId]
allOk = true
res = Opt[void].ok()
bid = highBid
while true:
if bid.slot <= lowSlot:
break
let parentBid = dag.existingParent(bid).valueOr:
dag.handleUnexpectedLightClientError(bid.slot)
allOk = false
res.err()
break
if parentBid.slot < lowSlot:
break
let
bdata = dag.getExistingForkedBlock(bid).valueOr:
dag.handleUnexpectedLightClientError(bid.slot)
allOk = false
res.err()
break
numParticipants =
withBlck(bdata):
@ -251,7 +274,7 @@ proc initLightClientUpdateForPeriod(
maxParticipants = numParticipants
maxBid.ok bid
bid = parentBid
(bid: maxBid, ok: allOk)
(bid: maxBid, res: res)
# Determine the block in the period with highest sync committee participation
let
@ -259,22 +282,26 @@ proc initLightClientUpdateForPeriod(
periodEndSlot = periodStartSlot + SLOTS_PER_SYNC_COMMITTEE_PERIOD - 1
lowSlot = max(periodStartSlot, dag.targetLightClientTailSlot)
highSlot = min(periodEndSlot, dag.finalizedHead.blck.slot)
fullPeriodCovered = (dag.finalizedHead.slot > periodEndSlot)
highBsi = dag.getExistingBlockIdAtSlot(highSlot).valueOr:
dag.handleUnexpectedLightClientError(highSlot)
return
return err()
highBid = highBsi.bid
maxParticipantsRes = dag.maxParticipantsBlock(highBid, lowSlot)
maxParticipantsBid = maxParticipantsRes.bid.valueOr:
if maxParticipantsRes.ok: # No single valid block exists in the period
dag.lcDataStore.cache.bestUpdates[period] =
default(altair.LightClientUpdate)
return
const update = default(altair.LightClientUpdate)
if fullPeriodCovered and maxParticipantsRes.res.isOk: # No block in period
dag.lcDataStore.db.putBestUpdate(period, update)
else:
dag.lcDataStore.db.putUpdateIfBetter(period, update)
return maxParticipantsRes.res
# The block with highest participation may refer to a `finalized_checkpoint`
# in a different sync committee period. If that is the case, search for a
# later block with a `finalized_checkpoint` within the given sync committee
# period, despite it having a lower sync committee participation
var
res = ok()
tmpState = assignClone(dag.headState)
signatureBid {.noinit.}, finalizedBid {.noinit.}: BlockId
signatureBid.slot = FAR_FUTURE_SLOT
@ -286,12 +313,15 @@ proc initLightClientUpdateForPeriod(
let
nextLowSlot = signatureBid.slot + 1
signatureRes = dag.maxParticipantsBlock(highBid, nextLowSlot)
if signatureRes.res.isErr:
res.err()
signatureBid = signatureRes.bid.valueOr:
signatureBid = maxParticipantsBid
break
let
attestedBid = dag.existingParent(signatureBid).valueOr:
dag.handleUnexpectedLightClientError(signatureBid.slot)
res.err()
continue
finalizedEpoch = block:
dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do:
@ -301,12 +331,14 @@ proc initLightClientUpdateForPeriod(
else: raiseAssert "Unreachable"
do:
dag.handleUnexpectedLightClientError(attestedBid.slot)
res.err()
continue
finalizedSlot = finalizedEpoch.start_slot
finalizedBsi =
if finalizedSlot >= dag.tail.slot:
dag.getExistingBlockIdAtSlot(finalizedSlot).valueOr:
dag.handleUnexpectedLightClientError(finalizedSlot)
res.err()
continue
else:
continue
@ -320,11 +352,11 @@ proc initLightClientUpdateForPeriod(
var update {.noinit.}: altair.LightClientUpdate
let attestedBid = dag.existingParent(signatureBid).valueOr:
dag.handleUnexpectedLightClientError(signatureBid.slot)
return
return err()
dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do:
let bdata = dag.getExistingForkedBlock(bid).valueOr:
dag.handleUnexpectedLightClientError(bid.slot)
return
return err()
withStateAndBlck(state, bdata):
when stateFork >= BeaconStateFork.Altair:
update.attested_header = blck.toBeaconBlockHeader()
@ -339,24 +371,44 @@ proc initLightClientUpdateForPeriod(
else: raiseAssert "Unreachable"
do:
dag.handleUnexpectedLightClientError(attestedBid.slot)
return
return err()
if finalizedBid.slot == FAR_FUTURE_SLOT or finalizedBid.slot == GENESIS_SLOT:
update.finalized_header.reset()
else:
let bdata = dag.getExistingForkedBlock(finalizedBid).valueOr:
dag.handleUnexpectedLightClientError(finalizedBid.slot)
return
return err()
withBlck(bdata):
update.finalized_header = blck.toBeaconBlockHeader()
let bdata = dag.getExistingForkedBlock(signatureBid).valueOr:
dag.handleUnexpectedLightClientError(signatureBid.slot)
return
return err()
withBlck(bdata):
when stateFork >= BeaconStateFork.Altair:
update.sync_aggregate = blck.asSigned().message.body.sync_aggregate
else: raiseAssert "Unreachable"
update.signature_slot = signatureBid.slot
dag.lcDataStore.cache.bestUpdates[period] = update
if fullPeriodCovered and res.isOk:
dag.lcDataStore.db.putBestUpdate(period, update)
else:
dag.lcDataStore.db.putUpdateIfBetter(period, update)
res
proc initLightClientDataForPeriod(
dag: ChainDAGRef, period: SyncCommitteePeriod): Opt[void] =
## Import light client data for a given sync committee period.
if dag.lcDataStore.db.isPeriodSealed(period):
return ok()
let
fullPeriodCovered = (dag.finalizedHead.slot >= (period + 1).start_slot)
res1 = dag.initLightClientBootstrapForPeriod(period)
res2 = dag.initLightClientUpdateForPeriod(period)
if res1.isErr or res2.isErr:
return err()
if fullPeriodCovered:
dag.lcDataStore.db.sealPeriod(period)
ok()
proc getLightClientData(
dag: ChainDAGRef,
@ -503,7 +555,7 @@ proc createLightClientUpdates(
let isCommitteeFinalized = dag.isNextSyncCommitteeFinalized(attested_period)
var best =
if isCommitteeFinalized:
dag.lcDataStore.cache.bestUpdates.getOrDefault(attested_period)
dag.lcDataStore.db.getBestUpdate(attested_period)
else:
let key = (attested_period, state.syncCommitteeRoot)
dag.lcDataStore.cache.pendingBest.getOrDefault(key)
@ -539,7 +591,7 @@ proc createLightClientUpdates(
best.signature_slot = signature_slot
if isCommitteeFinalized:
dag.lcDataStore.cache.bestUpdates[attested_period] = best
dag.lcDataStore.db.putBestUpdate(attested_period, best)
debug "Best LC update improved", period = attested_period, update = best
else:
let key = (attested_period, state.syncCommitteeRoot)
@ -556,6 +608,9 @@ proc initLightClientDataCache*(dag: ChainDAGRef) =
if dag.lcDataStore.importMode == LightClientDataImportMode.None:
return
# Prune non-finalized data
dag.lcDataStore.db.delPeriodsFrom(dag.firstNonFinalizedPeriod)
# Initialize tail slot
let targetTailSlot = dag.targetLightClientTailSlot
dag.lcDataStore.cache.tailSlot = max(dag.head.slot, targetTailSlot)
@ -576,12 +631,11 @@ proc initLightClientDataCache*(dag: ChainDAGRef) =
return
dag.lcDataStore.cache.tailSlot = finalizedSlot
let finalizedPeriod = finalizedSlot.sync_committee_period
dag.initLightClientBootstrapForPeriod(finalizedPeriod)
dag.initLightClientUpdateForPeriod(finalizedPeriod)
var res = dag.initLightClientDataForPeriod(finalizedPeriod)
let lightClientStartTick = Moment.now()
logScope: lightClientDataMaxPeriods = dag.lcDataStore.maxPeriods
debug "Initializing cached LC data"
debug "Initializing cached LC data", res
# Build list of block to process.
# As it is slow to load states in descending order,
@ -593,6 +647,7 @@ proc initLightClientDataCache*(dag: ChainDAGRef) =
blocks.add bid
bid = dag.existingParent(bid).valueOr:
dag.handleUnexpectedLightClientError(bid.slot)
res.err()
break
if bid.slot == finalizedSlot:
blocks.add bid
@ -608,9 +663,11 @@ proc initLightClientDataCache*(dag: ChainDAGRef) =
if not dag.updateExistingState(
dag.headState, bid.atSlot, save = false, cache):
dag.handleUnexpectedLightClientError(bid.slot)
res.err()
continue
let bdata = dag.getExistingForkedBlock(bid).valueOr:
dag.handleUnexpectedLightClientError(bid.slot)
res.err()
continue
withStateAndBlck(dag.headState, bdata):
when stateFork >= BeaconStateFork.Altair:
@ -625,7 +682,9 @@ proc initLightClientDataCache*(dag: ChainDAGRef) =
let lightClientEndTick = Moment.now()
debug "Initialized cached LC data",
initDur = lightClientEndTick - lightClientStartTick
initDur = lightClientEndTick - lightClientStartTick, res
if res.isErr:
return
if dag.lcDataStore.importMode == LightClientDataImportMode.OnDemand:
return
@ -637,17 +696,12 @@ proc initLightClientDataCache*(dag: ChainDAGRef) =
# https://github.com/nim-lang/Nim/pull/19926
var period = finalizedPeriod - 1
while period >= targetTailPeriod:
dag.initLightClientBootstrapForPeriod(period)
dag.initLightClientUpdateForPeriod(period)
if dag.initLightClientDataForPeriod(period).isErr:
res.err()
if period <= targetTailPeriod:
break
dec period
if dag.lcDataStore.cache.tailSlot == targetTailSlot:
debug "Historic LC data imported"
else:
# `handleUnexpectedLightClientError` updates `tailSlot`
warn "Error while importing historic LC data",
errorSlot = dag.lcDataStore.cache.tailSlot - 1, targetTailSlot
debug "Historic LC data imported", res
proc processNewBlockForLightClient*(
dag: ChainDAGRef,
@ -695,13 +749,13 @@ proc processHeadChangeForLightClient*(dag: ChainDAGRef) =
dag.handleUnexpectedLightClientError(period.start_slot)
continue
key = (period, syncCommitteeRoot)
dag.lcDataStore.cache.bestUpdates[period] =
dag.lcDataStore.cache.pendingBest.getOrDefault(key)
dag.lcDataStore.db.putBestUpdate(
period, dag.lcDataStore.cache.pendingBest.getOrDefault(key))
withState(dag.headState): # Common case separate to avoid `tmpState` copy
when stateFork >= BeaconStateFork.Altair:
let key = (headPeriod, state.syncCommitteeRoot)
dag.lcDataStore.cache.bestUpdates[headPeriod] =
dag.lcDataStore.cache.pendingBest.getOrDefault(key)
dag.lcDataStore.db.putBestUpdate(
headPeriod, dag.lcDataStore.cache.pendingBest.getOrDefault(key))
else: raiseAssert "Unreachable" # `tailSlot` cannot be before Altair
proc processFinalizationForLightClient*(
@ -727,13 +781,25 @@ proc processFinalizationForLightClient*(
break
bid = bsi.bid
if bid.slot >= lowSlot:
dag.lcDataStore.cache.currentBranches[bid.slot] =
dag.getLightClientData(bid).current_sync_committee_branch
dag.lcDataStore.db.putCurrentSyncCommitteeBranch(
bid.slot, dag.getLightClientData(bid).current_sync_committee_branch)
boundarySlot = bid.slot.nextEpochBoundarySlot
if boundarySlot < SLOTS_PER_EPOCH:
break
boundarySlot -= SLOTS_PER_EPOCH
# Seal sync committee periods for which data can no longer improve further
let
oldFinalizedPeriod = oldFinalizedHead.slot.sync_committee_period
newFinalizedPeriod = dag.finalizedHead.slot.sync_committee_period
if newFinalizedPeriod > oldFinalizedPeriod:
for period in countdown(newFinalizedPeriod - 1, oldFinalizedPeriod):
if dag.lcDataStore.cache.tailSlot > period.start_slot:
break
debug "Best LC update sealed",
period, update = dag.lcDataStore.db.getBestUpdate(period)
dag.lcDataStore.db.sealPeriod(period)
# Prune light client data that is no longer referrable by future updates
var bidsToDelete: seq[BlockId]
for bid, data in dag.lcDataStore.cache.data:
@ -743,25 +809,9 @@ proc processFinalizationForLightClient*(
for bid in bidsToDelete:
dag.lcDataStore.cache.data.del bid
let
targetTailSlot = dag.targetLightClientTailSlot
targetTailPeriod = targetTailSlot.sync_committee_period
# Prune bootstrap data that is no longer relevant
var slotsToDelete: seq[Slot]
for slot in dag.lcDataStore.cache.currentBranches.keys:
if slot < targetTailSlot:
slotsToDelete.add slot
for slot in slotsToDelete:
dag.lcDataStore.cache.currentBranches.del slot
# Prune best `LightClientUpdate` that are no longer relevant
var periodsToDelete: seq[SyncCommitteePeriod]
for period in dag.lcDataStore.cache.bestUpdates.keys:
if period < targetTailPeriod:
periodsToDelete.add period
for period in periodsToDelete:
dag.lcDataStore.cache.bestUpdates.del period
# Prune seal tracking data that is no longer relevant
let targetTailPeriod = dag.targetLightClientTailSlot.sync_committee_period
dag.lcDataStore.db.keepPeriodsFrom(targetTailPeriod)
# Prune best `LightClientUpdate` referring to non-finalized sync committees
# that are no longer relevant, i.e., orphaned or too old
@ -786,13 +836,13 @@ proc getLightClientBootstrap*(
withBlck(bdata):
let slot = blck.message.slot
when stateFork >= BeaconStateFork.Altair:
if slot < dag.lcDataStore.cache.tailSlot:
if slot < dag.targetLightClientTailSlot:
debug "LC bootstrap unavailable: Block too old", slot
return err()
if slot > dag.finalizedHead.blck.slot:
debug "LC bootstrap unavailable: Not finalized", blockRoot
return err()
var branch = dag.lcDataStore.cache.currentBranches.getOrDefault(slot)
var branch = dag.lcDataStore.db.getCurrentSyncCommitteeBranch(slot)
if branch.isZeroMemory:
if dag.lcDataStore.importMode == LightClientDataImportMode.OnDemand:
let bsi = ? dag.getExistingBlockIdAtSlot(slot)
@ -803,7 +853,7 @@ proc getLightClientBootstrap*(
state.data.build_proof(altair.CURRENT_SYNC_COMMITTEE_INDEX).get
else: raiseAssert "Unreachable"
do: return err()
dag.lcDataStore.cache.currentBranches[slot] = branch
dag.lcDataStore.db.putCurrentSyncCommitteeBranch(slot, branch)
else:
debug "LC bootstrap unavailable: Data not cached", slot
return err()
@ -829,8 +879,9 @@ proc getLightClientUpdateForPeriod*(
return
if dag.lcDataStore.importMode == LightClientDataImportMode.OnDemand:
dag.initLightClientUpdateForPeriod(period)
result = some(dag.lcDataStore.cache.bestUpdates.getOrDefault(period))
if dag.initLightClientUpdateForPeriod(period).isErr:
return
result = some(dag.lcDataStore.db.getBestUpdate(period))
let numParticipants = countOnes(result.get.sync_aggregate.sync_committee_bits)
if numParticipants < MIN_SYNC_COMMITTEE_PARTICIPANTS:
result.reset()

View File

@ -0,0 +1,348 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
# This implements the pre-release proposal of the libp2p based light client sync
# protocol. See https://github.com/ethereum/consensus-specs/pull/2802
import
# Standard library
std/os,
# Status libraries
chronicles,
eth/db/kvstore_sqlite3,
# Beacon chain internals
spec/datatypes/altair,
spec/[eth2_ssz_serialization, helpers],
./filepath
logScope: topics = "lcdata"
# `altair_current_sync_committee_branches` holds merkle proofs needed to
# construct `LightClientBootstrap` objects. The sync committee needs to
# be computed from the main DAG on-demand (usually a fast state access).
# SSZ because this data does not compress well, and because this data
# needs to be bundled together with other data to fulfill requests.
#
# `altair_best_updates` holds full `LightClientUpdate` objects in SSZ form.
# These objects are frequently queried in bunk, but there is only one per
# sync committee period, so storing the full sync committee is acceptable.
# This data could be stored as SZSSZ to avoid on-the-fly compression when a
# libp2p request is handled. However, the space savings are quite small.
# Furthermore, `LightClientUpdate` is consulted on each new block to attempt
# improving it. Continuously decompressing and recompressing seems inefficient.
# Finally, the libp2p context bytes depend on `attested_header.slot` to derive
# the underlying fork digest. The table name is insufficient to determine this
# unless one is made for each fork, even if there was no structural change.
# SSZ storage selected due to the small size and reduced logic complexity.
#
# `sealed_sync_committee_periods` contains the sync committee periods for which
# full light client data was imported. Data for these periods may no longer
# improve regardless of further block processing. The listed periods are skipped
# when restarting the program.
type
CurrentSyncCommitteeBranchStore = object
containsStmt: SqliteStmt[int64, int64]
getStmt: SqliteStmt[int64, seq[byte]]
putStmt: SqliteStmt[(int64, seq[byte]), void]
keepFromStmt: SqliteStmt[int64, void]
BestLightClientUpdateStore = object
getStmt: SqliteStmt[int64, seq[byte]]
putStmt: SqliteStmt[(int64, seq[byte]), void]
delStmt: SqliteStmt[int64, void]
delFromStmt: SqliteStmt[int64, void]
keepFromStmt: SqliteStmt[int64, void]
SealedSyncCommitteePeriodStore = object
containsStmt: SqliteStmt[int64, int64]
putStmt: SqliteStmt[int64, void]
delFromStmt: SqliteStmt[int64, void]
keepFromStmt: SqliteStmt[int64, void]
LightClientDataDB* = object
backend: SqStoreRef
## SQLite backend
currentBranches: CurrentSyncCommitteeBranchStore
## Slot -> altair.CurrentSyncCommitteeBranch
## Cached data for creating future `LightClientBootstrap` instances.
## Key is the block slot of which the post state was used to get the data.
## Data stored for all finalized epoch boundary blocks.
bestUpdates: BestLightClientUpdateStore
## SyncCommitteePeriod -> altair.LightClientUpdate
## Stores the `LightClientUpdate` with the most `sync_committee_bits` per
## `SyncCommitteePeriod`. Sync committee finality gives precedence.
sealedPeriods: SealedSyncCommitteePeriodStore
## {SyncCommitteePeriod}
## Tracks the finalized sync committee periods for which complete data
## has been imported (from `dag.tail.slot`).
# No `uint64` support in Sqlite
template isSupportedBySQLite(slot: Slot): bool =
slot <= int64.high.Slot
template isSupportedBySQLite(period: SyncCommitteePeriod): bool =
period <= int64.high.SyncCommitteePeriod
proc initCurrentSyncCommitteeBranchStore(
backend: SqStoreRef): KvResult[CurrentSyncCommitteeBranchStore] =
? backend.exec("""
CREATE TABLE IF NOT EXISTS `altair_current_sync_committee_branches` (
`slot` INTEGER PRIMARY KEY, -- `Slot` (up through 2^63-1)
`branch` BLOB -- `altair.CurrentSyncCommitteeBranch` (SSZ)
);
""")
let
containsStmt = ? backend.prepareStmt("""
SELECT 1 AS `exists`
FROM `altair_current_sync_committee_branches`
WHERE `slot` = ?;
""", int64, int64)
getStmt = ? backend.prepareStmt("""
SELECT `branch`
FROM `altair_current_sync_committee_branches`
WHERE `slot` = ?;
""", int64, seq[byte])
putStmt = ? backend.prepareStmt("""
INSERT INTO `altair_current_sync_committee_branches` (
`slot`, `branch`
) VALUES (?, ?);
""", (int64, seq[byte]), void)
keepFromStmt = ? backend.prepareStmt("""
DELETE FROM `altair_current_sync_committee_branches`
WHERE `slot` < ?;
""", int64, void)
ok CurrentSyncCommitteeBranchStore(
containsStmt: containsStmt,
getStmt: getStmt,
putStmt: putStmt,
keepFromStmt: keepFromStmt)
func hasCurrentSyncCommitteeBranch*(
db: LightClientDataDB, slot: Slot): bool =
if not slot.isSupportedBySQLite:
return false
var exists: int64
for res in db.currentBranches.containsStmt.exec(slot.int64, exists):
res.expect("SQL query OK")
doAssert exists == 1
return true
false
proc getCurrentSyncCommitteeBranch*(
db: LightClientDataDB, slot: Slot): altair.CurrentSyncCommitteeBranch =
if not slot.isSupportedBySQLite:
return default(altair.CurrentSyncCommitteeBranch)
var branch: seq[byte]
for res in db.currentBranches.getStmt.exec(slot.int64, branch):
res.expect("SQL query OK")
try:
return SSZ.decode(branch, altair.CurrentSyncCommitteeBranch)
except MalformedSszError, SszSizeMismatchError:
error "LC store corrupted", store = "currentBranches", slot,
exc = getCurrentException().name, err = getCurrentExceptionMsg()
return default(altair.CurrentSyncCommitteeBranch)
func putCurrentSyncCommitteeBranch*(
db: LightClientDataDB, slot: Slot,
branch: altair.CurrentSyncCommitteeBranch) =
if not slot.isSupportedBySQLite:
return
let res = db.currentBranches.putStmt.exec((slot.int64, SSZ.encode(branch)))
res.expect("SQL query OK")
proc initBestUpdateStore(
backend: SqStoreRef): KvResult[BestLightClientUpdateStore] =
? backend.exec("""
CREATE TABLE IF NOT EXISTS `altair_best_updates` (
`period` INTEGER PRIMARY KEY, -- `SyncCommitteePeriod`
`update` BLOB -- `altair.LightClientUpdate` (SSZ)
);
""")
let
getStmt = ? backend.prepareStmt("""
SELECT `update`
FROM `altair_best_updates`
WHERE `period` = ?;
""", int64, seq[byte])
putStmt = ? backend.prepareStmt("""
REPLACE INTO `altair_best_updates` (
`period`, `update`
) VALUES (?, ?);
""", (int64, seq[byte]), void)
delStmt = ? backend.prepareStmt("""
DELETE FROM `altair_best_updates`
WHERE `period` = ?;
""", int64, void)
delFromStmt = ? backend.prepareStmt("""
DELETE FROM `altair_best_updates`
WHERE `period` >= ?;
""", int64, void)
keepFromStmt = ? backend.prepareStmt("""
DELETE FROM `altair_best_updates`
WHERE `period` < ?;
""", int64, void)
ok BestLightClientUpdateStore(
getStmt: getStmt,
putStmt: putStmt,
delStmt: delStmt,
delFromStmt: delFromStmt,
keepFromStmt: keepFromStmt)
proc getBestUpdate*(
db: LightClientDataDB, period: SyncCommitteePeriod
): altair.LightClientUpdate =
doAssert period.isSupportedBySQLite
var update: seq[byte]
for res in db.bestUpdates.getStmt.exec(period.int64, update):
res.expect("SQL query OK")
try:
return SSZ.decode(update, altair.LightClientUpdate)
except MalformedSszError, SszSizeMismatchError:
error "LC store corrupted", store = "bestUpdates", period,
exc = getCurrentException().name, err = getCurrentExceptionMsg()
return default(altair.LightClientUpdate)
func putBestUpdate*(
db: LightClientDataDB, period: SyncCommitteePeriod,
update: altair.LightClientUpdate) =
doAssert period.isSupportedBySQLite
let numParticipants = countOnes(update.sync_aggregate.sync_committee_bits)
if numParticipants < MIN_SYNC_COMMITTEE_PARTICIPANTS:
let res = db.bestUpdates.delStmt.exec(period.int64)
res.expect("SQL query OK")
else:
let res = db.bestUpdates.putStmt.exec(
(period.int64, SSZ.encode(update)))
res.expect("SQL query OK")
proc putUpdateIfBetter*(
db: LightClientDataDB, period: SyncCommitteePeriod,
update: altair.LightClientUpdate) =
let existing = db.getBestUpdate(period)
if is_better_update(update, existing):
db.putBestUpdate(period, update)
proc initSealedPeriodStore(
backend: SqStoreRef): KvResult[SealedSyncCommitteePeriodStore] =
? backend.exec("""
CREATE TABLE IF NOT EXISTS `sealed_sync_committee_periods` (
`period` INTEGER PRIMARY KEY -- `SyncCommitteePeriod`
);
""")
let
containsStmt = ? backend.prepareStmt("""
SELECT 1 AS `exists`
FROM `sealed_sync_committee_periods`
WHERE `period` = ?;
""", int64, int64)
putStmt = ? backend.prepareStmt("""
INSERT INTO `sealed_sync_committee_periods` (
`period`
) VALUES (?);
""", int64, void)
delFromStmt = ? backend.prepareStmt("""
DELETE FROM `sealed_sync_committee_periods`
WHERE `period` >= ?;
""", int64, void)
keepFromStmt = ? backend.prepareStmt("""
DELETE FROM `sealed_sync_committee_periods`
WHERE `period` < ?;
""", int64, void)
ok SealedSyncCommitteePeriodStore(
containsStmt: containsStmt,
putStmt: putStmt,
delFromStmt: delFromStmt,
keepFromStmt: keepFromStmt)
func isPeriodSealed*(
db: LightClientDataDB, period: SyncCommitteePeriod): bool =
doAssert period.isSupportedBySQLite
var exists: int64
for res in db.sealedPeriods.containsStmt.exec(period.int64, exists):
res.expect("SQL query OK")
doAssert exists == 1
return true
false
func sealPeriod*(
db: LightClientDataDB, period: SyncCommitteePeriod) =
doAssert period.isSupportedBySQLite
let res = db.sealedPeriods.putStmt.exec(period.int64)
res.expect("SQL query OK")
func delPeriodsFrom*(
db: LightClientDataDB, minPeriod: SyncCommitteePeriod) =
doAssert minPeriod.isSupportedBySQLite
let res1 = db.sealedPeriods.delFromStmt.exec(minPeriod.int64)
res1.expect("SQL query OK")
let res2 = db.bestUpdates.delFromStmt.exec(minPeriod.int64)
res2.expect("SQL query OK")
func keepPeriodsFrom*(
db: LightClientDataDB, minPeriod: SyncCommitteePeriod) =
doAssert minPeriod.isSupportedBySQLite
let res1 = db.sealedPeriods.keepFromStmt.exec(minPeriod.int64)
res1.expect("SQL query OK")
let res2 = db.bestUpdates.keepFromStmt.exec(minPeriod.int64)
res2.expect("SQL query OK")
let
minSlot = min(minPeriod.start_slot, int64.high.Slot)
res3 = db.currentBranches.keepFromStmt.exec(minSlot.int64)
res3.expect("SQL query OK")
proc initLightClientDataDB*(
dir: string, inMemory = false): Opt[LightClientDataDB] =
logScope:
path = dir
inMemory
if not inMemory:
let res = secureCreatePath(dir)
if res.isErr:
warn "Failed to create DB directory", err = ioErrorMsg(res.error)
return err()
const dbName = "lcdataV1"
let
backend = SqStoreRef.init(dir, dbName, inMemory = inMemory).valueOr:
warn "Failed to create LC data DB", err = error
return err()
currentBranches = backend.initCurrentSyncCommitteeBranchStore().valueOr:
warn "Failed to init LC store", store = "currentBranches", err = error
backend.close()
return err()
bestUpdates = backend.initBestUpdateStore().valueOr:
warn "Failed to init LC store", store = "bestUpdates", err = error
backend.close()
return err()
sealedPeriods = backend.initSealedPeriodStore().valueOr:
warn "Failed to init LC store", store = "sealedPeriods", err = error
backend.close()
return err()
ok LightClientDataDB(
backend: backend,
currentBranches: currentBranches,
bestUpdates: bestUpdates,
sealedPeriods: sealedPeriods)
proc close*(db: var LightClientDataDB) =
if db.backend != nil:
db.backend.close()
db.reset()

View File

@ -192,6 +192,7 @@ proc loadChainDag(
cfg, db, validatorMonitor, extraFlags + chainDagFlags, config.eraDir,
vanityLogs = getPandas(detectTTY(config.logStdout)),
lcDataConfig = LightClientDataConfig(
dbDir: some config.cachesDir,
serve: config.lightClientDataServe.get,
importMode: config.lightClientDataImportMode.get,
maxPeriods: config.lightClientDataMaxPeriods,
@ -1500,6 +1501,7 @@ proc stop(node: BeaconNode) =
except CatchableError as exc:
warn "Couldn't stop network", msg = exc.msg
node.dag.closeLightClientDataStore()
node.attachedValidators.slashingProtection.close()
node.db.close()
notice "Databases closed"