persist LC sync progress across restarts (#4371)

Persist the latest finalized header and sync committee across restarts
of `nimbus_light_client` to avoid redoing the time-consuming bootstrap step.
This commit is contained in:
Etan Kissling 2022-11-30 04:45:03 +01:00 committed by GitHub
parent c46dc3d7e8
commit 2e09011d49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 269 additions and 24 deletions

View File

@ -16,7 +16,8 @@ import
eth/db/kvstore_sqlite3,
# Beacon chain internals
spec/datatypes/altair,
spec/[eth2_ssz_serialization, helpers]
spec/[eth2_ssz_serialization, helpers],
./db_limits
logScope: topics = "lcdata"
@ -83,12 +84,6 @@ type
## Tracks the finalized sync committee periods for which complete data
## has been imported (from `dag.tail.slot`).
# No `uint64` support in Sqlite
template isSupportedBySQLite(slot: Slot): bool =
slot <= int64.high.Slot
template isSupportedBySQLite(period: SyncCommitteePeriod): bool =
period <= int64.high.SyncCommitteePeriod
proc initCurrentBranchesStore(
backend: SqStoreRef,
name: string): KvResult[CurrentSyncCommitteeBranchStore] =
@ -152,9 +147,9 @@ proc getCurrentSyncCommitteeBranch*(
res.expect("SQL query OK")
try:
return SSZ.decode(branch, altair.CurrentSyncCommitteeBranch)
except MalformedSszError, SszSizeMismatchError:
error "LC store corrupted", store = "currentBranches", slot,
exc = getCurrentException().name, err = getCurrentExceptionMsg()
except SszError as exc:
error "LC data store corrupted", store = "currentBranches",
slot, exc = exc.msg
return default(altair.CurrentSyncCommitteeBranch)
func putCurrentSyncCommitteeBranch*(
@ -222,9 +217,9 @@ proc getBestUpdate*(
res.expect("SQL query OK")
try:
return SSZ.decode(update, altair.LightClientUpdate)
except MalformedSszError, SszSizeMismatchError:
error "LC store corrupted", store = "bestUpdates", period,
exc = getCurrentException().name, err = getCurrentExceptionMsg()
except SszError as exc:
error "LC data store corrupted", store = "bestUpdates",
period, exc = exc.msg
return default(altair.LightClientUpdate)
func putBestUpdate*(

View File

@ -1169,8 +1169,11 @@ func outWalletFile*(config: BeaconNodeConf): Option[OutFile] =
else:
fail()
func databaseDir*(config: AnyConf): string =
config.dataDir / "db"
func databaseDir*(dataDir: OutDir): string =
dataDir / "db"
template databaseDir*(config: AnyConf): string =
config.dataDir.databaseDir
func runAsService*(config: BeaconNodeConf): bool =
config.cmd == noCommand and config.runAsServiceFlag

View File

@ -140,6 +140,9 @@ type LightClientConf* = object
defaultValue: 0
name: "stop-at-epoch" .}: uint64
template databaseDir*(config: LightClientConf): string =
  ## Database directory of the light client, derived from `config.dataDir`
  ## via the `databaseDir` overload that accepts the data directory itself.
  databaseDir(config.dataDir)
template loadJwtSecret*(
rng: var HmacDrbgContext,
config: LightClientConf,

View File

@ -0,0 +1,19 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import spec/datatypes/constants
# No `uint64` support in Sqlite
template isSupportedBySQLite*(slot: Slot): bool =
  ## `true` when `slot` does not exceed `int64.high`, i.e. when it can be
  ## passed to SQLite as a signed 64-bit integer.
  slot <= Slot(high(int64))
template isSupportedBySQLite*(period: SyncCommitteePeriod): bool =
  ## `true` when `period` does not exceed `int64.high`, i.e. when it can be
  ## passed to SQLite as a signed 64-bit integer.
  period <= SyncCommitteePeriod(high(int64))

View File

@ -53,6 +53,13 @@ func optimisticHeader*(lightClient: LightClient): Opt[BeaconBlockHeader] =
else:
err()
func finalizedSyncCommittee*(
    lightClient: LightClient): Opt[altair.SyncCommittee] =
  ## Current sync committee held by the light client store, or an error
  ## result while the store has not been populated yet.
  if lightClient.store[].isNone:
    err()
  else:
    ok lightClient.store[].get.current_sync_committee
proc createLightClient(
network: Eth2Node,
rng: ref HmacDrbgContext,

View File

@ -0,0 +1,187 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
# Status libraries
chronicles,
eth/db/kvstore_sqlite3,
# Beacon chain internals
spec/datatypes/altair,
spec/[eth2_ssz_serialization, helpers],
./db_limits
logScope: topics = "lcdb"
# `altair_lc_headers` holds the latest `LightClientStore.finalized_header`.
#
# `altair_sync_committees` holds finalized `SyncCommittee` by period, needed to
# continue an interrupted sync process without having to obtain bootstrap info.
type
  # Primary key values used by the headers table.
  LightClientHeaderKind {.pure.} = enum
    Finalized = 1

  # Prepared statements operating on the headers table.
  LightClientHeadersStore = object
    getStmt: SqliteStmt[int64, seq[byte]]
    putStmt: SqliteStmt[(int64, seq[byte]), void]

  # Prepared statements operating on the sync committees table.
  SyncCommitteeStore = object
    getStmt: SqliteStmt[int64, seq[byte]]
    putStmt: SqliteStmt[(int64, seq[byte]), void]
    keepFromStmt: SqliteStmt[int64, void]

  LightClientDB* = ref object
    ## Persistent light client state, kept in a SQLite database.

    backend: SqStoreRef
      ## Underlying SQLite backend.

    headers: LightClientHeadersStore
      ## Maps `LightClientHeaderKind` to `BeaconBlockHeader`;
      ## keeps the most recent light client headers.

    syncCommittees: SyncCommitteeStore
      ## Maps `SyncCommitteePeriod` to `altair.SyncCommittee`;
      ## keeps finalized sync committees indexed by period.
func initLightClientHeadersStore(
backend: SqStoreRef,
name: string): KvResult[LightClientHeadersStore] =
## Creates the table `name` if it does not exist yet and prepares the
## statements used to read and write its rows.
## A failing `CREATE TABLE` is propagated to the caller through `?`; a
## statement that cannot be prepared trips `expect`.
# Schema: one row per `LightClientHeaderKind`, holding an SSZ encoded header.
? backend.exec("""
CREATE TABLE IF NOT EXISTS `""" & name & """` (
`kind` INTEGER PRIMARY KEY, -- `LightClientHeaderKind`
`header` BLOB -- `BeaconBlockHeader` (SSZ)
);
""")
# NOTE(review): `managed = false` presumably leaves disposal of the statements
# to this module (see the store's `close`) - confirm against kvstore_sqlite3.
let
# Looks up the header blob stored for a given kind.
getStmt = backend.prepareStmt("""
SELECT `header`
FROM `""" & name & """`
WHERE `kind` = ?;
""", int64, seq[byte], managed = false).expect("SQL query OK")
# Inserts the header for a kind, replacing any existing row with that key.
putStmt = backend.prepareStmt("""
REPLACE INTO `""" & name & """` (
`kind`, `header`
) VALUES (?, ?);
""", (int64, seq[byte]), void, managed = false).expect("SQL query OK")
ok LightClientHeadersStore(
getStmt: getStmt,
putStmt: putStmt)
func close(store: LightClientHeadersStore) =
  ## Disposes both prepared statements owned by the headers store.
  dispose(store.getStmt)
  dispose(store.putStmt)
proc getLatestFinalizedHeader*(db: LightClientDB): Opt[BeaconBlockHeader] =
  ## Loads the header stored under `LightClientHeaderKind.Finalized`.
  ## Yields an error result when no such row exists or when the stored blob
  ## cannot be SSZ decoded (the latter is also logged).
  let kind = int64(LightClientHeaderKind.Finalized)
  var encoded: seq[byte]
  for queryRes in db.headers.getStmt.exec(kind, encoded):
    queryRes.expect("SQL query OK")
    try:
      return ok SSZ.decode(encoded, BeaconBlockHeader)
    except SszError as sszErr:
      error "LC store corrupted", store = "headers",
        kind = "Finalized", exc = sszErr.msg
      return err()
func putLatestFinalizedHeader*(
    db: LightClientDB,
    header: BeaconBlockHeader) =
  ## Stores `header` as the latest finalized header, then deletes every
  ## stored sync committee whose period precedes the period of `header`.
  let
    kind = int64(LightClientHeaderKind.Finalized)
    encoded = SSZ.encode(header)
  db.headers.putStmt.exec((kind, encoded)).expect("SQL query OK")

  # Sync committees of older periods are no longer needed once the
  # finalized header has advanced.
  let period = sync_committee_period(header.slot)
  doAssert period.isSupportedBySQLite
  db.syncCommittees.keepFromStmt.exec(int64(period)).expect("SQL query OK")
func initSyncCommitteesStore(
backend: SqStoreRef,
name: string): KvResult[SyncCommitteeStore] =
## Creates the table `name` if it does not exist yet and prepares the
## statements used to read, write and prune its rows.
## A failing `CREATE TABLE` is propagated to the caller through `?`; a
## statement that cannot be prepared trips `expect`.
# Schema: one row per sync committee period, holding an SSZ encoded committee.
? backend.exec("""
CREATE TABLE IF NOT EXISTS `""" & name & """` (
`period` INTEGER PRIMARY KEY, -- `SyncCommitteePeriod`
`sync_committee` BLOB -- `altair.SyncCommittee` (SSZ)
);
""")
# NOTE(review): `managed = false` presumably leaves disposal of the statements
# to this module (see the store's `close`) - confirm against kvstore_sqlite3.
let
# Looks up the sync committee blob stored for a given period.
getStmt = backend.prepareStmt("""
SELECT `sync_committee`
FROM `""" & name & """`
WHERE `period` = ?;
""", int64, seq[byte], managed = false).expect("SQL query OK")
# Inserts the committee for a period, replacing any existing row with that key.
putStmt = backend.prepareStmt("""
REPLACE INTO `""" & name & """` (
`period`, `sync_committee`
) VALUES (?, ?);
""", (int64, seq[byte]), void, managed = false).expect("SQL query OK")
# Deletes all rows older than the given period; that period itself is kept.
keepFromStmt = backend.prepareStmt("""
DELETE FROM `""" & name & """`
WHERE `period` < ?;
""", int64, void, managed = false).expect("SQL query OK")
ok SyncCommitteeStore(
getStmt: getStmt,
putStmt: putStmt,
keepFromStmt: keepFromStmt)
func close(store: SyncCommitteeStore) =
  ## Disposes all three prepared statements owned by the sync committee store.
  dispose(store.getStmt)
  dispose(store.putStmt)
  dispose(store.keepFromStmt)
proc getSyncCommittee*(
    db: LightClientDB,
    period: SyncCommitteePeriod): Opt[altair.SyncCommittee] =
  ## Loads the sync committee stored for `period`.
  ## Yields an error result when no such row exists or when the stored blob
  ## cannot be SSZ decoded (the latter is also logged).
  ## `period` must be representable as `int64`.
  doAssert period.isSupportedBySQLite
  var encoded: seq[byte]
  for queryRes in db.syncCommittees.getStmt.exec(int64(period), encoded):
    queryRes.expect("SQL query OK")
    try:
      return ok SSZ.decode(encoded, altair.SyncCommittee)
    except SszError as sszErr:
      error "LC store corrupted", store = "syncCommittees",
        period, exc = sszErr.msg
      return err()
func putSyncCommittee*(
    db: LightClientDB,
    period: SyncCommitteePeriod,
    syncCommittee: altair.SyncCommittee) =
  ## Stores `syncCommittee` (SSZ encoded) for `period`, replacing whatever
  ## was stored for that period before.
  ## `period` must be representable as `int64`.
  doAssert period.isSupportedBySQLite
  let row = (int64(period), SSZ.encode(syncCommittee))
  db.syncCommittees.putStmt.exec(row).expect("SQL query OK")
type
  LightClientDBNames* = object
    ## Table names handed to `initLightClientDB`.
    altairHeaders*: string
      ## Name of the table holding light client headers.
    altairSyncCommittees*: string
      ## Name of the table holding sync committees.
func initLightClientDB*(
    backend: SqStoreRef,
    names: LightClientDBNames): KvResult[LightClientDB] =
  ## Sets up both stores on `backend`, using the table names from `names`,
  ## and bundles them into a `LightClientDB`.
  ## The first store that fails to initialize aborts with its error.
  let headersStore =
    ? initLightClientHeadersStore(backend, names.altairHeaders)
  let committeesStore =
    ? initSyncCommitteesStore(backend, names.altairSyncCommittees)

  ok LightClientDB(
    backend: backend,
    headers: headersStore,
    syncCommittees: committeesStore)
func close*(db: LightClientDB) =
  ## Closes both stores and resets `db` to its default state.
  ## Does nothing when `db` has no backend (e.g. it was closed before).
  if db.backend.isNil:
    return
  close(db.headers)
  close(db.syncCommittees)
  reset(db[])

View File

@ -7,14 +7,14 @@
import
std/os,
chronicles, chronicles/chronos_tools, chronos,
eth/keys,
chronicles, chronicles/chronos_tools, chronos, stew/io2,
eth/db/kvstore_sqlite3, eth/keys,
./eth1/eth1_monitor,
./gossip_processing/optimistic_processor,
./networking/topic_params,
./spec/beaconstate,
./spec/datatypes/[phase0, altair, bellatrix],
"."/[light_client, nimbus_binary_common, version]
"."/[filepath, light_client, light_client_db, nimbus_binary_common, version]
from ./consensus_object_pools/consensus_manager import runForkchoiceUpdated
from ./gossip_processing/block_processor import newExecutionPayload
@ -45,6 +45,18 @@ programMain:
notice "Launching light client",
version = fullVersionStr, cmdParams = commandLineParams(), config
# Make sure the database directory exists before opening the database;
# nothing can be persisted without it, so failure aborts startup.
let dbDir = config.databaseDir
if (let res = secureCreatePath(dbDir); res.isErr):
  # Log message fixed: the word "create" was duplicated.
  fatal "Failed to create database directory",
    path = dbDir, err = ioErrorMsg(res.error)
  quit 1
let backend = SqStoreRef.init(dbDir, "nlc").expect("Database OK")
defer: backend.close()
let db = backend.initLightClientDB(LightClientDBNames(
altairHeaders: "altair_lc_headers",
altairSyncCommittees: "altair_sync_committees")).expect("Database OK")
defer: db.close()
let metadata = loadEth2Network(config.eth2Network)
for node in metadata.bootstrapNodes:
config.bootstrapNodes.add node
@ -145,6 +157,12 @@ programMain:
info "New LC finalized header",
finalized_header = shortLog(finalizedHeader)
let
period = finalizedHeader.slot.sync_committee_period
syncCommittee = lightClient.finalizedSyncCommittee.expect("Bootstrap OK")
db.putSyncCommittee(period, syncCommittee)
db.putLatestFinalizedHeader(finalizedHeader)
proc onOptimisticHeader(
lightClient: LightClient, optimisticHeader: BeaconBlockHeader) =
info "New LC optimistic header",
@ -155,6 +173,16 @@ programMain:
lightClient.onOptimisticHeader = onOptimisticHeader
lightClient.trustedBlockRoot = some config.trustedBlockRoot
let latestHeader = db.getLatestFinalizedHeader()
if latestHeader.isOk:
let
period = latestHeader.get.slot.sync_committee_period
syncCommittee = db.getSyncCommittee(period)
if syncCommittee.isErr:
error "LC store lacks sync committee", finalized_header = latestHeader.get
else:
lightClient.resetToFinalizedHeader(latestHeader.get, syncCommittee.get)
# Full blocks gossip is required to portably drive an EL client:
# - EL clients may not sync when only driven with `forkChoiceUpdated`,
# e.g., Geth: "Forkchoice requested unknown head"
@ -166,11 +194,7 @@ programMain:
# Therefore, this current mechanism is to be seen as temporary; it is not
# optimized for reducing code duplication, e.g., with `nimbus_beacon_node`.
func shouldSyncOptimistically(wallSlot: Slot): bool =
# Check whether an EL is connected
if eth1Monitor == nil:
return false
func isSynced(wallSlot: Slot): bool =
# Check whether light client is used
let optimisticHeader = lightClient.optimisticHeader.valueOr:
return false
@ -182,6 +206,13 @@ programMain:
true
func shouldSyncOptimistically(wallSlot: Slot): bool =
# Check whether an EL is connected
if eth1Monitor == nil:
return false
isSynced(wallSlot)
var blocksGossipState: GossipState = {}
proc updateBlocksGossipStatus(slot: Slot) =
let
@ -243,7 +274,7 @@ programMain:
syncStatus =
if optimisticHeader.isNone:
"bootstrapping(" & $config.trustedBlockRoot & ")"
elif not shouldSyncOptimistically(wallSlot):
elif not isSynced(wallSlot):
"syncing"
else:
"synced"