nimbus-eth2/beacon_chain/beacon_chain_db_light_client.nim
Etan Kissling 7e276937dc
make LC data fork aware (#4493)
In a future fork, light client data will be extended with execution info
to support more use cases. To anticipate such an upgrade, introduce
`Forky` and `Forked` types, and ready the database schema.
Because the mapping of sync committee periods to fork versions is not
necessarily unique (fork schedule not in sync with period boundaries),
an additional column is added to `period` -> `LightClientUpdate` table.
2023-01-12 18:11:38 +01:00

384 lines
14 KiB
Nim

# beacon_chain
# Copyright (c) 2022-2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
# Status libraries
stew/base10,
chronicles,
eth/db/kvstore_sqlite3,
# Beacon chain internals
spec/datatypes/altair,
spec/[eth2_ssz_serialization, helpers],
./db_limits
logScope: topics = "lcdata"
# `lc_altair_current_branches` holds merkle proofs needed to
# construct `LightClientBootstrap` objects.
# SSZ because this data does not compress well, and because this data
# needs to be bundled together with other data to fulfill requests.
#
# `lc_best_updates` holds full `LightClientUpdate` objects in SSZ form.
# These objects are frequently queried in bulk, but there is only one per
# sync committee period, so storing the full sync committee is acceptable.
# This data could be stored as SZSSZ to avoid on-the-fly compression when a
# libp2p request is handled. However, the space savings are quite small.
# Furthermore, `LightClientUpdate` is consulted on each new block to attempt
# improving it. Continuously decompressing and recompressing seems inefficient.
# Finally, the libp2p context bytes depend on `attested_header.slot` to derive
# the underlying fork digest; the `kind` column is not sufficient to derive
# the fork digest, because the same storage format may be used across forks.
# SSZ storage selected due to the small size and reduced logic complexity.
#
# `lc_sealed_periods` contains the sync committee periods for which
# full light client data was imported. Data for these periods may no longer
# improve regardless of further block processing. The listed periods are skipped
# when restarting the program.
type
CurrentSyncCommitteeBranchStore = object
containsStmt: SqliteStmt[int64, int64]
getStmt: SqliteStmt[int64, seq[byte]]
putStmt: SqliteStmt[(int64, seq[byte]), void]
keepFromStmt: SqliteStmt[int64, void]
BestLightClientUpdateStore = object
getStmt: SqliteStmt[int64, (int64, seq[byte])]
putStmt: SqliteStmt[(int64, int64, seq[byte]), void]
delStmt: SqliteStmt[int64, void]
delFromStmt: SqliteStmt[int64, void]
keepFromStmt: SqliteStmt[int64, void]
SealedSyncCommitteePeriodStore = object
containsStmt: SqliteStmt[int64, int64]
putStmt: SqliteStmt[int64, void]
delFromStmt: SqliteStmt[int64, void]
keepFromStmt: SqliteStmt[int64, void]
LightClientDataDB* = ref object
backend: SqStoreRef
## SQLite backend
currentBranches: CurrentSyncCommitteeBranchStore
## Slot -> altair.CurrentSyncCommitteeBranch
## Cached data for creating future `LightClientBootstrap` instances.
## Key is the block slot of which the post state was used to get the data.
## Data stored for all finalized epoch boundary blocks.
bestUpdates: BestLightClientUpdateStore
## SyncCommitteePeriod -> (LightClientDataFork, LightClientUpdate)
## Stores the `LightClientUpdate` with the most `sync_committee_bits` per
## `SyncCommitteePeriod`. Sync committee finality gives precedence.
sealedPeriods: SealedSyncCommitteePeriodStore
## {SyncCommitteePeriod}
## Tracks the finalized sync committee periods for which complete data
## has been imported (from `dag.tail.slot`).
proc initCurrentBranchesStore(
backend: SqStoreRef,
name: string): KvResult[CurrentSyncCommitteeBranchStore] =
? backend.exec("""
CREATE TABLE IF NOT EXISTS `""" & name & """` (
`slot` INTEGER PRIMARY KEY, -- `Slot` (up through 2^63-1)
`branch` BLOB -- `altair.CurrentSyncCommitteeBranch` (SSZ)
);
""")
let
containsStmt = backend.prepareStmt("""
SELECT 1 AS `exists`
FROM `""" & name & """`
WHERE `slot` = ?;
""", int64, int64, managed = false).expect("SQL query OK")
getStmt = backend.prepareStmt("""
SELECT `branch`
FROM `""" & name & """`
WHERE `slot` = ?;
""", int64, seq[byte], managed = false).expect("SQL query OK")
putStmt = backend.prepareStmt("""
INSERT INTO `""" & name & """` (
`slot`, `branch`
) VALUES (?, ?);
""", (int64, seq[byte]), void, managed = false).expect("SQL query OK")
keepFromStmt = backend.prepareStmt("""
DELETE FROM `""" & name & """`
WHERE `slot` < ?;
""", int64, void, managed = false).expect("SQL query OK")
ok CurrentSyncCommitteeBranchStore(
containsStmt: containsStmt,
getStmt: getStmt,
putStmt: putStmt,
keepFromStmt: keepFromStmt)
func close(store: CurrentSyncCommitteeBranchStore) =
store.containsStmt.dispose()
store.getStmt.dispose()
store.putStmt.dispose()
store.keepFromStmt.dispose()
func hasCurrentSyncCommitteeBranch*(
db: LightClientDataDB, slot: Slot): bool =
if not slot.isSupportedBySQLite:
return false
var exists: int64
for res in db.currentBranches.containsStmt.exec(slot.int64, exists):
res.expect("SQL query OK")
doAssert exists == 1
return true
false
proc getCurrentSyncCommitteeBranch*(
db: LightClientDataDB, slot: Slot): altair.CurrentSyncCommitteeBranch =
if not slot.isSupportedBySQLite:
return default(altair.CurrentSyncCommitteeBranch)
var branch: seq[byte]
for res in db.currentBranches.getStmt.exec(slot.int64, branch):
res.expect("SQL query OK")
try:
return SSZ.decode(branch, altair.CurrentSyncCommitteeBranch)
except SszError as exc:
error "LC data store corrupted", store = "currentBranches",
slot, exc = exc.msg
return default(altair.CurrentSyncCommitteeBranch)
func putCurrentSyncCommitteeBranch*(
db: LightClientDataDB, slot: Slot,
branch: altair.CurrentSyncCommitteeBranch) =
if not slot.isSupportedBySQLite:
return
let res = db.currentBranches.putStmt.exec((slot.int64, SSZ.encode(branch)))
res.expect("SQL query OK")
proc initBestUpdatesStore(
backend: SqStoreRef,
name, legacyAltairName: string,
): KvResult[BestLightClientUpdateStore] =
? backend.exec("""
CREATE TABLE IF NOT EXISTS `""" & name & """` (
`period` INTEGER PRIMARY KEY, -- `SyncCommitteePeriod`
`kind` INTEGER, -- `LightClientDataFork`
`update` BLOB -- `LightClientUpdate` (SSZ)
);
""")
if backend.hasTable(legacyAltairName).expect("SQL query OK"):
info "Importing Altair light client data"
# SyncCommitteePeriod -> altair.LightClientUpdate
const legacyKind = Base10.toString(ord(LightClientDataFork.Altair).uint)
? backend.exec("""
INSERT OR IGNORE INTO `""" & name & """` (
`period`, `kind`, `update`
)
SELECT `period`, """ & legacyKind & """ AS `kind`, `update`
FROM `""" & legacyAltairName & """`;
""")
? backend.exec("""
DROP TABLE `""" & legacyAltairName & """`;
""")
let
getStmt = backend.prepareStmt("""
SELECT `kind`, `update`
FROM `""" & name & """`
WHERE `period` = ?;
""", int64, (int64, seq[byte]), managed = false)
.expect("SQL query OK")
putStmt = backend.prepareStmt("""
REPLACE INTO `""" & name & """` (
`period`, `kind`, `update`
) VALUES (?, ?, ?);
""", (int64, int64, seq[byte]), void, managed = false)
.expect("SQL query OK")
delStmt = backend.prepareStmt("""
DELETE FROM `""" & name & """`
WHERE `period` = ?;
""", int64, void, managed = false).expect("SQL query OK")
delFromStmt = backend.prepareStmt("""
DELETE FROM `""" & name & """`
WHERE `period` >= ?;
""", int64, void, managed = false).expect("SQL query OK")
keepFromStmt = backend.prepareStmt("""
DELETE FROM `""" & name & """`
WHERE `period` < ?;
""", int64, void, managed = false).expect("SQL query OK")
ok BestLightClientUpdateStore(
getStmt: getStmt,
putStmt: putStmt,
delStmt: delStmt,
delFromStmt: delFromStmt,
keepFromStmt: keepFromStmt)
func close(store: BestLightClientUpdateStore) =
store.getStmt.dispose()
store.putStmt.dispose()
store.delStmt.dispose()
store.delFromStmt.dispose()
store.keepFromStmt.dispose()
proc getBestUpdate*(
db: LightClientDataDB, period: SyncCommitteePeriod
): ForkedLightClientUpdate =
doAssert period.isSupportedBySQLite
var update: (int64, seq[byte])
for res in db.bestUpdates.getStmt.exec(period.int64, update):
res.expect("SQL query OK")
try:
case update[0]
of ord(LightClientDataFork.Altair).int64:
return ForkedLightClientUpdate(
kind: LightClientDataFork.Altair,
altairData: SSZ.decode(update[1], altair.LightClientUpdate))
else:
warn "Unsupported LC data store kind", store = "bestUpdates",
period, kind = update[0]
return default(ForkedLightClientUpdate)
except SszError as exc:
error "LC data store corrupted", store = "bestUpdates",
period, kind = update[0], exc = exc.msg
return default(ForkedLightClientUpdate)
func putBestUpdate*(
db: LightClientDataDB, period: SyncCommitteePeriod,
update: ForkedLightClientUpdate) =
doAssert period.isSupportedBySQLite
withForkyUpdate(update):
when lcDataFork >= LightClientDataFork.Altair:
let numParticipants = forkyUpdate.sync_aggregate.num_active_participants
if numParticipants < MIN_SYNC_COMMITTEE_PARTICIPANTS:
let res = db.bestUpdates.delStmt.exec(period.int64)
res.expect("SQL query OK")
else:
let res = db.bestUpdates.putStmt.exec(
(period.int64, lcDataFork.int64, SSZ.encode(forkyUpdate)))
res.expect("SQL query OK")
else:
let res = db.bestUpdates.delStmt.exec(period.int64)
res.expect("SQL query OK")
proc putUpdateIfBetter*(
db: LightClientDataDB, period: SyncCommitteePeriod,
update: ForkedLightClientUpdate) =
let existing = db.getBestUpdate(period)
if is_better_update(update, existing):
db.putBestUpdate(period, update)
proc initSealedPeriodsStore(
backend: SqStoreRef,
name: string): KvResult[SealedSyncCommitteePeriodStore] =
? backend.exec("""
CREATE TABLE IF NOT EXISTS `""" & name & """` (
`period` INTEGER PRIMARY KEY -- `SyncCommitteePeriod`
);
""")
let
containsStmt = backend.prepareStmt("""
SELECT 1 AS `exists`
FROM `""" & name & """`
WHERE `period` = ?;
""", int64, int64, managed = false).expect("SQL query OK")
putStmt = backend.prepareStmt("""
INSERT INTO `""" & name & """` (
`period`
) VALUES (?);
""", int64, void, managed = false).expect("SQL query OK")
delFromStmt = backend.prepareStmt("""
DELETE FROM `""" & name & """`
WHERE `period` >= ?;
""", int64, void, managed = false).expect("SQL query OK")
keepFromStmt = backend.prepareStmt("""
DELETE FROM `""" & name & """`
WHERE `period` < ?;
""", int64, void, managed = false).expect("SQL query OK")
ok SealedSyncCommitteePeriodStore(
containsStmt: containsStmt,
putStmt: putStmt,
delFromStmt: delFromStmt,
keepFromStmt: keepFromStmt)
func close(store: SealedSyncCommitteePeriodStore) =
store.containsStmt.dispose()
store.putStmt.dispose()
store.delFromStmt.dispose()
store.keepFromStmt.dispose()
func isPeriodSealed*(
db: LightClientDataDB, period: SyncCommitteePeriod): bool =
doAssert period.isSupportedBySQLite
var exists: int64
for res in db.sealedPeriods.containsStmt.exec(period.int64, exists):
res.expect("SQL query OK")
doAssert exists == 1
return true
false
func sealPeriod*(
db: LightClientDataDB, period: SyncCommitteePeriod) =
doAssert period.isSupportedBySQLite
let res = db.sealedPeriods.putStmt.exec(period.int64)
res.expect("SQL query OK")
func delNonFinalizedPeriodsFrom*(
db: LightClientDataDB, minPeriod: SyncCommitteePeriod) =
doAssert minPeriod.isSupportedBySQLite
let res1 = db.sealedPeriods.delFromStmt.exec(minPeriod.int64)
res1.expect("SQL query OK")
let res2 = db.bestUpdates.delFromStmt.exec(minPeriod.int64)
res2.expect("SQL query OK")
# `currentBranches` only has finalized data
func keepPeriodsFrom*(
db: LightClientDataDB, minPeriod: SyncCommitteePeriod) =
doAssert minPeriod.isSupportedBySQLite
let res1 = db.sealedPeriods.keepFromStmt.exec(minPeriod.int64)
res1.expect("SQL query OK")
let res2 = db.bestUpdates.keepFromStmt.exec(minPeriod.int64)
res2.expect("SQL query OK")
let
minSlot = min(minPeriod.start_slot, int64.high.Slot)
res3 = db.currentBranches.keepFromStmt.exec(minSlot.int64)
res3.expect("SQL query OK")
type LightClientDataDBNames* = object
altairCurrentBranches*: string
legacyAltairBestUpdates*: string
bestUpdates*: string
sealedPeriods*: string
proc initLightClientDataDB*(
backend: SqStoreRef,
names: LightClientDataDBNames): KvResult[LightClientDataDB] =
let
currentBranches =
? backend.initCurrentBranchesStore(names.altairCurrentBranches)
bestUpdates =
? backend.initBestUpdatesStore(
names.bestUpdates, names.legacyAltairBestUpdates)
sealedPeriods =
? backend.initSealedPeriodsStore(names.sealedPeriods)
ok LightClientDataDB(
backend: backend,
currentBranches: currentBranches,
bestUpdates: bestUpdates,
sealedPeriods: sealedPeriods)
proc close*(db: LightClientDataDB) =
if db.backend != nil:
db.currentBranches.close()
db.bestUpdates.close()
db.sealedPeriods.close()
db[].reset()