merge LC db into main BN db (#3832)

* merge LC db into main BN db

To treat derived LC data similar to derived state caches, merge it into
the main beacon node DB.

* shorten table names, group with lc prefix
This commit is contained in:
Etan Kissling 2022-07-04 13:46:32 -07:00 committed by GitHub
parent 1221bb66e8
commit aff53e962f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 112 additions and 110 deletions

View File

@ -15,7 +15,7 @@ import
./networking/network_metadata, ./beacon_chain_db_immutable, ./networking/network_metadata, ./beacon_chain_db_immutable,
./spec/[eth2_ssz_serialization, eth2_merkleization, forks, state_transition], ./spec/[eth2_ssz_serialization, eth2_merkleization, forks, state_transition],
./spec/datatypes/[phase0, altair, bellatrix], ./spec/datatypes/[phase0, altair, bellatrix],
./filepath "."/[beacon_chain_db_light_client, filepath]
export export
phase0, altair, eth2_ssz_serialization, eth2_merkleization, kvstore, phase0, altair, eth2_ssz_serialization, eth2_merkleization, kvstore,
@ -145,6 +145,9 @@ type
## ##
## See `summaries` for an index in the other direction. ## See `summaries` for an index in the other direction.
lcData: LightClientDataDB
## Persistent light client data to avoid expensive recomputations
DbKeyKind = enum DbKeyKind = enum
kHashToState kHashToState
kHashToBlock kHashToBlock
@ -458,6 +461,11 @@ proc new*(T: type BeaconChainDB,
summaries = kvStore db.openKvStore("beacon_block_summaries", true).expectDb() summaries = kvStore db.openKvStore("beacon_block_summaries", true).expectDb()
finalizedBlocks = FinalizedBlocks.init(db, "finalized_blocks").expectDb() finalizedBlocks = FinalizedBlocks.init(db, "finalized_blocks").expectDb()
lcData = db.initLightClientDataDB(LightClientDataDBNames(
altairCurrentBranches: "lc_altair_current_branches",
altairBestUpdates: "lc_altair_best_updates",
sealedPeriods: "lc_sealed_periods")).expectDb()
# Versions prior to 1.4.0 (altair) stored validators in `immutable_validators` # Versions prior to 1.4.0 (altair) stored validators in `immutable_validators`
# which stores validator keys in compressed format - this is # which stores validator keys in compressed format - this is
# slow to load and has been superceded by `immutable_validators2` which uses # slow to load and has been superceded by `immutable_validators2` which uses
@ -499,8 +507,12 @@ proc new*(T: type BeaconChainDB,
stateDiffs: stateDiffs, stateDiffs: stateDiffs,
summaries: summaries, summaries: summaries,
finalizedBlocks: finalizedBlocks, finalizedBlocks: finalizedBlocks,
lcData: lcData
) )
template getLightClientDataDB*(db: BeaconChainDB): LightClientDataDB =
db.lcData
proc decodeSSZ[T](data: openArray[byte], output: var T): bool = proc decodeSSZ[T](data: openArray[byte], output: var T): bool =
try: try:
readSszBytes(data, output, updateRoot = false) readSszBytes(data, output, updateRoot = false)
@ -637,6 +649,7 @@ proc close*(db: BeaconChainDB) =
if db.db == nil: return if db.db == nil: return
# Close things roughly in reverse order # Close things roughly in reverse order
db.lcData.close()
db.finalizedBlocks.close() db.finalizedBlocks.close()
discard db.summaries.close() discard db.summaries.close()
discard db.stateDiffs.close() discard db.stateDiffs.close()

View File

@ -66,7 +66,7 @@ type
delFromStmt: SqliteStmt[int64, void] delFromStmt: SqliteStmt[int64, void]
keepFromStmt: SqliteStmt[int64, void] keepFromStmt: SqliteStmt[int64, void]
LightClientDataDB* = object LightClientDataDB* = ref object
backend: SqStoreRef backend: SqStoreRef
## SQLite backend ## SQLite backend
@ -92,35 +92,36 @@ template isSupportedBySQLite(slot: Slot): bool =
template isSupportedBySQLite(period: SyncCommitteePeriod): bool = template isSupportedBySQLite(period: SyncCommitteePeriod): bool =
period <= int64.high.SyncCommitteePeriod period <= int64.high.SyncCommitteePeriod
proc initCurrentSyncCommitteeBranchStore( proc initCurrentBranchesStore(
backend: SqStoreRef): KvResult[CurrentSyncCommitteeBranchStore] = backend: SqStoreRef,
name: string): KvResult[CurrentSyncCommitteeBranchStore] =
? backend.exec(""" ? backend.exec("""
CREATE TABLE IF NOT EXISTS `altair_current_sync_committee_branches` ( CREATE TABLE IF NOT EXISTS `""" & name & """` (
`slot` INTEGER PRIMARY KEY, -- `Slot` (up through 2^63-1) `slot` INTEGER PRIMARY KEY, -- `Slot` (up through 2^63-1)
`branch` BLOB -- `altair.CurrentSyncCommitteeBranch` (SSZ) `branch` BLOB -- `altair.CurrentSyncCommitteeBranch` (SSZ)
); );
""") """)
let let
containsStmt = ? backend.prepareStmt(""" containsStmt = backend.prepareStmt("""
SELECT 1 AS `exists` SELECT 1 AS `exists`
FROM `altair_current_sync_committee_branches` FROM `""" & name & """`
WHERE `slot` = ?; WHERE `slot` = ?;
""", int64, int64) """, int64, int64, managed = false).expect("SQL query OK")
getStmt = ? backend.prepareStmt(""" getStmt = backend.prepareStmt("""
SELECT `branch` SELECT `branch`
FROM `altair_current_sync_committee_branches` FROM `""" & name & """`
WHERE `slot` = ?; WHERE `slot` = ?;
""", int64, seq[byte]) """, int64, seq[byte], managed = false).expect("SQL query OK")
putStmt = ? backend.prepareStmt(""" putStmt = backend.prepareStmt("""
INSERT INTO `altair_current_sync_committee_branches` ( INSERT INTO `""" & name & """` (
`slot`, `branch` `slot`, `branch`
) VALUES (?, ?); ) VALUES (?, ?);
""", (int64, seq[byte]), void) """, (int64, seq[byte]), void, managed = false).expect("SQL query OK")
keepFromStmt = ? backend.prepareStmt(""" keepFromStmt = backend.prepareStmt("""
DELETE FROM `altair_current_sync_committee_branches` DELETE FROM `""" & name & """`
WHERE `slot` < ?; WHERE `slot` < ?;
""", int64, void) """, int64, void, managed = false).expect("SQL query OK")
ok CurrentSyncCommitteeBranchStore( ok CurrentSyncCommitteeBranchStore(
containsStmt: containsStmt, containsStmt: containsStmt,
@ -128,6 +129,12 @@ proc initCurrentSyncCommitteeBranchStore(
putStmt: putStmt, putStmt: putStmt,
keepFromStmt: keepFromStmt) keepFromStmt: keepFromStmt)
func close(store: CurrentSyncCommitteeBranchStore) =
store.containsStmt.dispose()
store.getStmt.dispose()
store.putStmt.dispose()
store.keepFromStmt.dispose()
func hasCurrentSyncCommitteeBranch*( func hasCurrentSyncCommitteeBranch*(
db: LightClientDataDB, slot: Slot): bool = db: LightClientDataDB, slot: Slot): bool =
if not slot.isSupportedBySQLite: if not slot.isSupportedBySQLite:
@ -161,38 +168,39 @@ func putCurrentSyncCommitteeBranch*(
let res = db.currentBranches.putStmt.exec((slot.int64, SSZ.encode(branch))) let res = db.currentBranches.putStmt.exec((slot.int64, SSZ.encode(branch)))
res.expect("SQL query OK") res.expect("SQL query OK")
proc initBestUpdateStore( proc initBestUpdatesStore(
backend: SqStoreRef): KvResult[BestLightClientUpdateStore] = backend: SqStoreRef,
name: string): KvResult[BestLightClientUpdateStore] =
? backend.exec(""" ? backend.exec("""
CREATE TABLE IF NOT EXISTS `altair_best_updates` ( CREATE TABLE IF NOT EXISTS `""" & name & """` (
`period` INTEGER PRIMARY KEY, -- `SyncCommitteePeriod` `period` INTEGER PRIMARY KEY, -- `SyncCommitteePeriod`
`update` BLOB -- `altair.LightClientUpdate` (SSZ) `update` BLOB -- `altair.LightClientUpdate` (SSZ)
); );
""") """)
let let
getStmt = ? backend.prepareStmt(""" getStmt = backend.prepareStmt("""
SELECT `update` SELECT `update`
FROM `altair_best_updates` FROM `""" & name & """`
WHERE `period` = ?; WHERE `period` = ?;
""", int64, seq[byte]) """, int64, seq[byte], managed = false).expect("SQL query OK")
putStmt = ? backend.prepareStmt(""" putStmt = backend.prepareStmt("""
REPLACE INTO `altair_best_updates` ( REPLACE INTO `""" & name & """` (
`period`, `update` `period`, `update`
) VALUES (?, ?); ) VALUES (?, ?);
""", (int64, seq[byte]), void) """, (int64, seq[byte]), void, managed = false).expect("SQL query OK")
delStmt = ? backend.prepareStmt(""" delStmt = backend.prepareStmt("""
DELETE FROM `altair_best_updates` DELETE FROM `""" & name & """`
WHERE `period` = ?; WHERE `period` = ?;
""", int64, void) """, int64, void, managed = false).expect("SQL query OK")
delFromStmt = ? backend.prepareStmt(""" delFromStmt = backend.prepareStmt("""
DELETE FROM `altair_best_updates` DELETE FROM `""" & name & """`
WHERE `period` >= ?; WHERE `period` >= ?;
""", int64, void) """, int64, void, managed = false).expect("SQL query OK")
keepFromStmt = ? backend.prepareStmt(""" keepFromStmt = backend.prepareStmt("""
DELETE FROM `altair_best_updates` DELETE FROM `""" & name & """`
WHERE `period` < ?; WHERE `period` < ?;
""", int64, void) """, int64, void, managed = false).expect("SQL query OK")
ok BestLightClientUpdateStore( ok BestLightClientUpdateStore(
getStmt: getStmt, getStmt: getStmt,
@ -201,6 +209,13 @@ proc initBestUpdateStore(
delFromStmt: delFromStmt, delFromStmt: delFromStmt,
keepFromStmt: keepFromStmt) keepFromStmt: keepFromStmt)
func close(store: BestLightClientUpdateStore) =
store.getStmt.dispose()
store.putStmt.dispose()
store.delStmt.dispose()
store.delFromStmt.dispose()
store.keepFromStmt.dispose()
proc getBestUpdate*( proc getBestUpdate*(
db: LightClientDataDB, period: SyncCommitteePeriod db: LightClientDataDB, period: SyncCommitteePeriod
): altair.LightClientUpdate = ): altair.LightClientUpdate =
@ -235,33 +250,34 @@ proc putUpdateIfBetter*(
if is_better_update(update, existing): if is_better_update(update, existing):
db.putBestUpdate(period, update) db.putBestUpdate(period, update)
proc initSealedPeriodStore( proc initSealedPeriodsStore(
backend: SqStoreRef): KvResult[SealedSyncCommitteePeriodStore] = backend: SqStoreRef,
name: string): KvResult[SealedSyncCommitteePeriodStore] =
? backend.exec(""" ? backend.exec("""
CREATE TABLE IF NOT EXISTS `sealed_sync_committee_periods` ( CREATE TABLE IF NOT EXISTS `""" & name & """` (
`period` INTEGER PRIMARY KEY -- `SyncCommitteePeriod` `period` INTEGER PRIMARY KEY -- `SyncCommitteePeriod`
); );
""") """)
let let
containsStmt = ? backend.prepareStmt(""" containsStmt = backend.prepareStmt("""
SELECT 1 AS `exists` SELECT 1 AS `exists`
FROM `sealed_sync_committee_periods` FROM `""" & name & """`
WHERE `period` = ?; WHERE `period` = ?;
""", int64, int64) """, int64, int64, managed = false).expect("SQL query OK")
putStmt = ? backend.prepareStmt(""" putStmt = backend.prepareStmt("""
INSERT INTO `sealed_sync_committee_periods` ( INSERT INTO `""" & name & """` (
`period` `period`
) VALUES (?); ) VALUES (?);
""", int64, void) """, int64, void, managed = false).expect("SQL query OK")
delFromStmt = ? backend.prepareStmt(""" delFromStmt = backend.prepareStmt("""
DELETE FROM `sealed_sync_committee_periods` DELETE FROM `""" & name & """`
WHERE `period` >= ?; WHERE `period` >= ?;
""", int64, void) """, int64, void, managed = false).expect("SQL query OK")
keepFromStmt = ? backend.prepareStmt(""" keepFromStmt = backend.prepareStmt("""
DELETE FROM `sealed_sync_committee_periods` DELETE FROM `""" & name & """`
WHERE `period` < ?; WHERE `period` < ?;
""", int64, void) """, int64, void, managed = false).expect("SQL query OK")
ok SealedSyncCommitteePeriodStore( ok SealedSyncCommitteePeriodStore(
containsStmt: containsStmt, containsStmt: containsStmt,
@ -269,6 +285,12 @@ proc initSealedPeriodStore(
delFromStmt: delFromStmt, delFromStmt: delFromStmt,
keepFromStmt: keepFromStmt) keepFromStmt: keepFromStmt)
func close(store: SealedSyncCommitteePeriodStore) =
store.containsStmt.dispose()
store.putStmt.dispose()
store.delFromStmt.dispose()
store.keepFromStmt.dispose()
func isPeriodSealed*( func isPeriodSealed*(
db: LightClientDataDB, period: SyncCommitteePeriod): bool = db: LightClientDataDB, period: SyncCommitteePeriod): bool =
doAssert period.isSupportedBySQLite doAssert period.isSupportedBySQLite
@ -305,36 +327,21 @@ func keepPeriodsFrom*(
res3 = db.currentBranches.keepFromStmt.exec(minSlot.int64) res3 = db.currentBranches.keepFromStmt.exec(minSlot.int64)
res3.expect("SQL query OK") res3.expect("SQL query OK")
type LightClientDataDBNames* = object
altairCurrentBranches*: string
altairBestUpdates*: string
sealedPeriods*: string
proc initLightClientDataDB*( proc initLightClientDataDB*(
dir: string, inMemory = false): Opt[LightClientDataDB] = backend: SqStoreRef,
logScope: names: LightClientDataDBNames): KvResult[LightClientDataDB] =
path = dir
inMemory
if not inMemory:
let res = secureCreatePath(dir)
if res.isErr:
warn "Failed to create DB directory", err = ioErrorMsg(res.error)
return err()
const dbName = "lcdataV1"
let let
backend = SqStoreRef.init(dir, dbName, inMemory = inMemory).valueOr: currentBranches =
warn "Failed to create LC data DB", err = error ? backend.initCurrentBranchesStore(names.altairCurrentBranches)
return err() bestUpdates =
? backend.initBestUpdatesStore(names.altairBestUpdates)
currentBranches = backend.initCurrentSyncCommitteeBranchStore().valueOr: sealedPeriods =
warn "Failed to init LC store", store = "currentBranches", err = error ? backend.initSealedPeriodsStore(names.sealedPeriods)
backend.close()
return err()
bestUpdates = backend.initBestUpdateStore().valueOr:
warn "Failed to init LC store", store = "bestUpdates", err = error
backend.close()
return err()
sealedPeriods = backend.initSealedPeriodStore().valueOr:
warn "Failed to init LC store", store = "sealedPeriods", err = error
backend.close()
return err()
ok LightClientDataDB( ok LightClientDataDB(
backend: backend, backend: backend,
@ -342,7 +349,9 @@ proc initLightClientDataDB*(
bestUpdates: bestUpdates, bestUpdates: bestUpdates,
sealedPeriods: sealedPeriods) sealedPeriods: sealedPeriods)
proc close*(db: var LightClientDataDB) = proc close*(db: LightClientDataDB) =
if db.backend != nil: if db.backend != nil:
db.backend.close() db.currentBranches.close()
db.reset() db.bestUpdates.close()
db.sealedPeriods.close()
db[].reset()

View File

@ -1056,9 +1056,6 @@ func outWalletFile*(config: BeaconNodeConf): Option[OutFile] =
func databaseDir*(config: AnyConf): string = func databaseDir*(config: AnyConf): string =
config.dataDir / "db" config.dataDir / "db"
func cachesDir*(config: AnyConf): string =
config.databaseDir / "caches"
func runAsService*(config: BeaconNodeConf): bool = func runAsService*(config: BeaconNodeConf): bool =
config.cmd == noCommand and config.runAsServiceFlag config.cmd == noCommand and config.runAsServiceFlag

View File

@ -13,7 +13,7 @@
import import
# Beacon chain internals # Beacon chain internals
../spec/datatypes/altair, ../spec/datatypes/altair,
../light_client_data_db, ../beacon_chain_db_light_client,
./block_dag ./block_dag
type type
@ -62,8 +62,6 @@ type
## The earliest slot for which light client data is imported. ## The earliest slot for which light client data is imported.
LightClientDataConfig* = object LightClientDataConfig* = object
dbDir*: Option[string]
## Directory to store light client data DB in
serve*: bool serve*: bool
## Whether to make local light client data available or not ## Whether to make local light client data available or not
importMode*: LightClientDataImportMode importMode*: LightClientDataImportMode

View File

@ -580,7 +580,6 @@ proc updateBeaconMetrics(
import blockchain_dag_light_client import blockchain_dag_light_client
export export
blockchain_dag_light_client.closeLightClientDataStore,
blockchain_dag_light_client.getLightClientBootstrap, blockchain_dag_light_client.getLightClientBootstrap,
blockchain_dag_light_client.getLightClientUpdateForPeriod, blockchain_dag_light_client.getLightClientUpdateForPeriod,
blockchain_dag_light_client.getLightClientFinalityUpdate, blockchain_dag_light_client.getLightClientFinalityUpdate,
@ -722,7 +721,8 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
vanityLogs: vanityLogs, vanityLogs: vanityLogs,
lcDataStore: initLightClientDataStore(lcDataConfig, cfg), lcDataStore: initLightClientDataStore(
lcDataConfig, cfg, db.getLightClientDataDB()),
onBlockAdded: onBlockCb, onBlockAdded: onBlockCb,
onHeadChanged: onHeadCb, onHeadChanged: onHeadCb,

View File

@ -15,7 +15,7 @@ import
stew/[bitops2, objects], stew/[bitops2, objects],
# Beacon chain internals # Beacon chain internals
../spec/datatypes/[phase0, altair, bellatrix], ../spec/datatypes/[phase0, altair, bellatrix],
../light_client_data_db, ../beacon_chain_db_light_client,
"."/[block_pools_types, blockchain_dag] "."/[block_pools_types, blockchain_dag]
logScope: topics = "chaindag" logScope: topics = "chaindag"
@ -124,31 +124,18 @@ proc syncCommitteeRootForPeriod(
do: err() do: err()
proc initLightClientDataStore*( proc initLightClientDataStore*(
config: LightClientDataConfig, cfg: RuntimeConfig): LightClientDataStore = config: LightClientDataConfig,
cfg: RuntimeConfig,
db: LightClientDataDB): LightClientDataStore =
## Initialize light client data store. ## Initialize light client data store.
var lcDataStore = LightClientDataStore( LightClientDataStore(
db: db,
serve: config.serve, serve: config.serve,
importMode: config.importMode, importMode: config.importMode,
maxPeriods: config.maxPeriods.get(cfg.defaultLightClientDataMaxPeriods), maxPeriods: config.maxPeriods.get(cfg.defaultLightClientDataMaxPeriods),
onLightClientFinalityUpdate: config.onLightClientFinalityUpdate, onLightClientFinalityUpdate: config.onLightClientFinalityUpdate,
onLightClientOptimisticUpdate: config.onLightClientOptimisticUpdate) onLightClientOptimisticUpdate: config.onLightClientOptimisticUpdate)
if config.serve or config.importMode != LightClientDataImportMode.None:
lcDataStore.db =
if config.dbDir.isSome:
initLightClientDataDB(config.dbDir.get, inMemory = false).valueOr:
warn "Falling back to in-memory LC data DB"
initLightClientDataDB(config.dbDir.get, inMemory = true).expect(
"In-memory LC data DB expected to succeed")
else:
initLightClientDataDB(".", inMemory = true).expect(
"In-memory LC data DB expected to succeed")
lcDataStore
proc closeLightClientDataStore*(dag: ChainDAGRef) =
dag.lcDataStore.db.close()
func targetLightClientTailSlot(dag: ChainDAGRef): Slot = func targetLightClientTailSlot(dag: ChainDAGRef): Slot =
## Earliest slot for which light client data is retained. ## Earliest slot for which light client data is retained.
let let

View File

@ -192,7 +192,6 @@ proc loadChainDag(
cfg, db, validatorMonitor, extraFlags + chainDagFlags, config.eraDir, cfg, db, validatorMonitor, extraFlags + chainDagFlags, config.eraDir,
vanityLogs = getPandas(detectTTY(config.logStdout)), vanityLogs = getPandas(detectTTY(config.logStdout)),
lcDataConfig = LightClientDataConfig( lcDataConfig = LightClientDataConfig(
dbDir: some config.cachesDir,
serve: config.lightClientDataServe.get, serve: config.lightClientDataServe.get,
importMode: config.lightClientDataImportMode.get, importMode: config.lightClientDataImportMode.get,
maxPeriods: config.lightClientDataMaxPeriods, maxPeriods: config.lightClientDataMaxPeriods,
@ -1500,7 +1499,6 @@ proc stop(node: BeaconNode) =
except CatchableError as exc: except CatchableError as exc:
warn "Couldn't stop network", msg = exc.msg warn "Couldn't stop network", msg = exc.msg
node.dag.closeLightClientDataStore()
node.attachedValidators.slashingProtection.close() node.attachedValidators.slashingProtection.close()
node.db.close() node.db.close()
notice "Databases closed" notice "Databases closed"