From 2e09011d49aa647f42c4d59d0de4dc2c98258300 Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Wed, 30 Nov 2022 04:45:03 +0100 Subject: [PATCH] persist LC sync progress across restarts (#4371) Persist the latest finalized header and sync committee across restarts of `nimbus_light_client` to avoid redoing time-consuming bootstrap step. --- beacon_chain/beacon_chain_db_light_client.nim | 21 +- beacon_chain/conf.nim | 7 +- beacon_chain/conf_light_client.nim | 3 + beacon_chain/db_limits.nim | 19 ++ beacon_chain/light_client.nim | 7 + beacon_chain/light_client_db.nim | 187 ++++++++++++++++++ beacon_chain/nimbus_light_client.nim | 49 ++++- 7 files changed, 269 insertions(+), 24 deletions(-) create mode 100644 beacon_chain/db_limits.nim create mode 100644 beacon_chain/light_client_db.nim diff --git a/beacon_chain/beacon_chain_db_light_client.nim b/beacon_chain/beacon_chain_db_light_client.nim index 4326a4f62..dcf3ed374 100644 --- a/beacon_chain/beacon_chain_db_light_client.nim +++ b/beacon_chain/beacon_chain_db_light_client.nim @@ -16,7 +16,8 @@ import eth/db/kvstore_sqlite3, # Beacon chain internals spec/datatypes/altair, - spec/[eth2_ssz_serialization, helpers] + spec/[eth2_ssz_serialization, helpers], + ./db_limits logScope: topics = "lcdata" @@ -83,12 +84,6 @@ type ## Tracks the finalized sync committee periods for which complete data ## has been imported (from `dag.tail.slot`). -# No `uint64` support in Sqlite -template isSupportedBySQLite(slot: Slot): bool = - slot <= int64.high.Slot -template isSupportedBySQLite(period: SyncCommitteePeriod): bool = - period <= int64.high.SyncCommitteePeriod - proc initCurrentBranchesStore( backend: SqStoreRef, name: string): KvResult[CurrentSyncCommitteeBranchStore] = @@ -152,9 +147,9 @@ proc getCurrentSyncCommitteeBranch*( res.expect("SQL query OK") try: return SSZ.decode(branch, altair.CurrentSyncCommitteeBranch) - except MalformedSszError, SszSizeMismatchError: - error "LC store corrupted", store = "currentBranches", slot, - exc = getCurrentException().name, err = getCurrentExceptionMsg() + except SszError as exc: + error "LC data store corrupted", store = "currentBranches", + slot, exc = exc.msg return default(altair.CurrentSyncCommitteeBranch) func putCurrentSyncCommitteeBranch*( @@ -222,9 +217,9 @@ proc getBestUpdate*( res.expect("SQL query OK") try: return SSZ.decode(update, altair.LightClientUpdate) - except MalformedSszError, SszSizeMismatchError: - error "LC store corrupted", store = "bestUpdates", period, - exc = getCurrentException().name, err = getCurrentExceptionMsg() + except SszError as exc: + error "LC data store corrupted", store = "bestUpdates", + period, exc = exc.msg return default(altair.LightClientUpdate) func putBestUpdate*( diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index 3a4a4ef97..e39aba8c1 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -1169,8 +1169,11 @@ func outWalletFile*(config: BeaconNodeConf): Option[OutFile] = else: fail() -func databaseDir*(config: AnyConf): string = - config.dataDir / "db" +func databaseDir*(dataDir: OutDir): string = + dataDir / "db" + +template databaseDir*(config: AnyConf): string = + config.dataDir.databaseDir func runAsService*(config: BeaconNodeConf): bool = config.cmd == noCommand and config.runAsServiceFlag diff --git a/beacon_chain/conf_light_client.nim b/beacon_chain/conf_light_client.nim index 16e569318..4c2d32967 100644 --- a/beacon_chain/conf_light_client.nim +++ b/beacon_chain/conf_light_client.nim @@ -140,6 +140,9 @@ type LightClientConf* = object defaultValue: 0 name: "stop-at-epoch" .}: uint64 +template databaseDir*(config: LightClientConf): string = + config.dataDir.databaseDir + template loadJwtSecret*( rng: var HmacDrbgContext, config: LightClientConf, diff --git a/beacon_chain/db_limits.nim b/beacon_chain/db_limits.nim new file mode 100644 index 000000000..4774c2294 --- /dev/null +++ b/beacon_chain/db_limits.nim @@ -0,0 +1,19 @@ +# beacon_chain +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import spec/datatypes/constants + +# No `uint64` support in Sqlite +template isSupportedBySQLite*(slot: Slot): bool = + slot <= int64.high.Slot +template isSupportedBySQLite*(period: SyncCommitteePeriod): bool = + period <= int64.high.SyncCommitteePeriod diff --git a/beacon_chain/light_client.nim b/beacon_chain/light_client.nim index 5343732f0..65c348fe2 100644 --- a/beacon_chain/light_client.nim +++ b/beacon_chain/light_client.nim @@ -53,6 +53,13 @@ func optimisticHeader*(lightClient: LightClient): Opt[BeaconBlockHeader] = else: err() +func finalizedSyncCommittee*( + lightClient: LightClient): Opt[altair.SyncCommittee] = + if lightClient.store[].isSome: + ok lightClient.store[].get.current_sync_committee + else: + err() + proc createLightClient( network: Eth2Node, rng: ref HmacDrbgContext, diff --git a/beacon_chain/light_client_db.nim b/beacon_chain/light_client_db.nim new file mode 100644 index 000000000..df6cd07ee --- /dev/null +++ b/beacon_chain/light_client_db.nim @@ -0,0 +1,187 @@ +# beacon_chain +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + # Status libraries + chronicles, + eth/db/kvstore_sqlite3, + # Beacon chain internals + spec/datatypes/altair, + spec/[eth2_ssz_serialization, helpers], + ./db_limits + +logScope: topics = "lcdb" + +# `altair_lc_headers` holds the latest `LightClientStore.finalized_header`. +# +# `altair_sync_committees` holds finalized `SyncCommittee` by period, needed to +# continue an interrupted sync process without having to obtain bootstrap info. + +type + LightClientHeaderKind {.pure.} = enum + Finalized = 1 + + LightClientHeadersStore = object + getStmt: SqliteStmt[int64, seq[byte]] + putStmt: SqliteStmt[(int64, seq[byte]), void] + + SyncCommitteeStore = object + getStmt: SqliteStmt[int64, seq[byte]] + putStmt: SqliteStmt[(int64, seq[byte]), void] + keepFromStmt: SqliteStmt[int64, void] + + LightClientDB* = ref object + backend: SqStoreRef + ## SQLite backend + + headers: LightClientHeadersStore + ## LightClientHeaderKind -> BeaconBlockHeader + ## Stores the latest light client headers. + + syncCommittees: SyncCommitteeStore + ## SyncCommitteePeriod -> altair.SyncCommittee + ## Stores finalized `SyncCommittee` by sync committee period. + +func initLightClientHeadersStore( + backend: SqStoreRef, + name: string): KvResult[LightClientHeadersStore] = + ? backend.exec(""" + CREATE TABLE IF NOT EXISTS `""" & name & """` ( + `kind` INTEGER PRIMARY KEY, -- `LightClientHeaderKind` + `header` BLOB -- `BeaconBlockHeader` (SSZ) + ); + """) + + let + getStmt = backend.prepareStmt(""" + SELECT `header` + FROM `""" & name & """` + WHERE `kind` = ?; + """, int64, seq[byte], managed = false).expect("SQL query OK") + putStmt = backend.prepareStmt(""" + REPLACE INTO `""" & name & """` ( + `kind`, `header` + ) VALUES (?, ?); + """, (int64, seq[byte]), void, managed = false).expect("SQL query OK") + + ok LightClientHeadersStore( + getStmt: getStmt, + putStmt: putStmt) + +func close(store: LightClientHeadersStore) = + store.getStmt.dispose() + store.putStmt.dispose() + +proc getLatestFinalizedHeader*(db: LightClientDB): Opt[BeaconBlockHeader] = + var header: seq[byte] + for res in db.headers.getStmt.exec( + LightClientHeaderKind.Finalized.int64, header): + res.expect("SQL query OK") + try: + return ok SSZ.decode(header, BeaconBlockHeader) + except SszError as exc: + error "LC store corrupted", store = "headers", + kind = "Finalized", exc = exc.msg + return err() + +func putLatestFinalizedHeader*( + db: LightClientDB, header: BeaconBlockHeader) = + block: + let res = db.headers.putStmt.exec( + (LightClientHeaderKind.Finalized.int64, SSZ.encode(header))) + res.expect("SQL query OK") + block: + let period = header.slot.sync_committee_period + doAssert period.isSupportedBySQLite + let res = db.syncCommittees.keepFromStmt.exec(period.int64) + res.expect("SQL query OK") + +func initSyncCommitteesStore( + backend: SqStoreRef, + name: string): KvResult[SyncCommitteeStore] = + ? backend.exec(""" + CREATE TABLE IF NOT EXISTS `""" & name & """` ( + `period` INTEGER PRIMARY KEY, -- `SyncCommitteePeriod` + `sync_committee` BLOB -- `altair.SyncCommittee` (SSZ) + ); + """) + + let + getStmt = backend.prepareStmt(""" + SELECT `sync_committee` + FROM `""" & name & """` + WHERE `period` = ?; + """, int64, seq[byte], managed = false).expect("SQL query OK") + putStmt = backend.prepareStmt(""" + REPLACE INTO `""" & name & """` ( + `period`, `sync_committee` + ) VALUES (?, ?); + """, (int64, seq[byte]), void, managed = false).expect("SQL query OK") + keepFromStmt = backend.prepareStmt(""" + DELETE FROM `""" & name & """` + WHERE `period` < ?; + """, int64, void, managed = false).expect("SQL query OK") + + ok SyncCommitteeStore( + getStmt: getStmt, + putStmt: putStmt, + keepFromStmt: keepFromStmt) + +func close(store: SyncCommitteeStore) = + store.getStmt.dispose() + store.putStmt.dispose() + store.keepFromStmt.dispose() + +proc getSyncCommittee*( + db: LightClientDB, period: SyncCommitteePeriod): Opt[altair.SyncCommittee] = + doAssert period.isSupportedBySQLite + var syncCommittee: seq[byte] + for res in db.syncCommittees.getStmt.exec(period.int64, syncCommittee): + res.expect("SQL query OK") + try: + return ok SSZ.decode(syncCommittee, altair.SyncCommittee) + except SszError as exc: + error "LC store corrupted", store = "syncCommittees", + period, exc = exc.msg + return err() + +func putSyncCommittee*( + db: LightClientDB, period: SyncCommitteePeriod, + syncCommittee: altair.SyncCommittee) = + doAssert period.isSupportedBySQLite + let res = db.syncCommittees.putStmt.exec( + (period.int64, SSZ.encode(syncCommittee))) + res.expect("SQL query OK") + +type LightClientDBNames* = object + altairHeaders*: string + altairSyncCommittees*: string + +func initLightClientDB*( + backend: SqStoreRef, + names: LightClientDBNames): KvResult[LightClientDB] = + let + headers = + ? backend.initLightClientHeadersStore(names.altairHeaders) + syncCommittees = + ? backend.initSyncCommitteesStore(names.altairSyncCommittees) + + ok LightClientDB( + backend: backend, + headers: headers, + syncCommittees: syncCommittees) + +func close*(db: LightClientDB) = + if db.backend != nil: + db.headers.close() + db.syncCommittees.close() + db[].reset() diff --git a/beacon_chain/nimbus_light_client.nim b/beacon_chain/nimbus_light_client.nim index 72e382030..af5564ceb 100644 --- a/beacon_chain/nimbus_light_client.nim +++ b/beacon_chain/nimbus_light_client.nim @@ -7,14 +7,14 @@ import std/os, - chronicles, chronicles/chronos_tools, chronos, - eth/keys, + chronicles, chronicles/chronos_tools, chronos, stew/io2, + eth/db/kvstore_sqlite3, eth/keys, ./eth1/eth1_monitor, ./gossip_processing/optimistic_processor, ./networking/topic_params, ./spec/beaconstate, ./spec/datatypes/[phase0, altair, bellatrix], - "."/[light_client, nimbus_binary_common, version] + "."/[filepath, light_client, light_client_db, nimbus_binary_common, version] from ./consensus_object_pools/consensus_manager import runForkchoiceUpdated from ./gossip_processing/block_processor import newExecutionPayload @@ -45,6 +45,18 @@ programMain: notice "Launching light client", version = fullVersionStr, cmdParams = commandLineParams(), config + let dbDir = config.databaseDir + if (let res = secureCreatePath(dbDir); res.isErr): + fatal "Failed to create create database directory", + path = dbDir, err = ioErrorMsg(res.error) + quit 1 + let backend = SqStoreRef.init(dbDir, "nlc").expect("Database OK") + defer: backend.close() + let db = backend.initLightClientDB(LightClientDBNames( + altairHeaders: "altair_lc_headers", + altairSyncCommittees: "altair_sync_committees")).expect("Database OK") + defer: db.close() + let metadata = loadEth2Network(config.eth2Network) for node in metadata.bootstrapNodes: config.bootstrapNodes.add node @@ -145,6 +157,12 @@ programMain: info "New LC finalized header", finalized_header = shortLog(finalizedHeader) + let + period = finalizedHeader.slot.sync_committee_period + syncCommittee = lightClient.finalizedSyncCommittee.expect("Bootstrap OK") + db.putSyncCommittee(period, syncCommittee) + db.putLatestFinalizedHeader(finalizedHeader) + proc onOptimisticHeader( lightClient: LightClient, optimisticHeader: BeaconBlockHeader) = info "New LC optimistic header", @@ -155,6 +173,16 @@ programMain: lightClient.onOptimisticHeader = onOptimisticHeader lightClient.trustedBlockRoot = some config.trustedBlockRoot + let latestHeader = db.getLatestFinalizedHeader() + if latestHeader.isOk: + let + period = latestHeader.get.slot.sync_committee_period + syncCommittee = db.getSyncCommittee(period) + if syncCommittee.isErr: + error "LC store lacks sync committee", finalized_header = latestHeader.get + else: + lightClient.resetToFinalizedHeader(latestHeader.get, syncCommittee.get) + # Full blocks gossip is required to portably drive an EL client: # - EL clients may not sync when only driven with `forkChoiceUpdated`, # e.g., Geth: "Forkchoice requested unknown head" @@ -166,11 +194,7 @@ programMain: # Therefore, this current mechanism is to be seen as temporary; it is not # optimized for reducing code duplication, e.g., with `nimbus_beacon_node`. - func shouldSyncOptimistically(wallSlot: Slot): bool = - # Check whether an EL is connected - if eth1Monitor == nil: - return false - + func isSynced(wallSlot: Slot): bool = # Check whether light client is used let optimisticHeader = lightClient.optimisticHeader.valueOr: return false @@ -182,6 +206,13 @@ programMain: true + func shouldSyncOptimistically(wallSlot: Slot): bool = + # Check whether an EL is connected + if eth1Monitor == nil: + return false + + isSynced(wallSlot) + var blocksGossipState: GossipState = {} proc updateBlocksGossipStatus(slot: Slot) = let @@ -243,7 +274,7 @@ programMain: syncStatus = if optimisticHeader.isNone: "bootstrapping(" & $config.trustedBlockRoot & ")" - elif not shouldSyncOptimistically(wallSlot): + elif not isSynced(wallSlot): "syncing" else: "synced"