Allow Portal beacon LC sync to start from a stored LC bootstrap (#2715)

Portal beacon LC sync can be started now from a provided trusted
block root or, in case it has been running before, from a
previously verified and stored LC bootstrap.

This required altering the the beacon db on how the bootstraps
are stored.
This commit is contained in:
Kim De Mey 2024-10-09 10:21:00 +02:00 committed by GitHub
parent 11646ad3c4
commit 5edb0b320f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 219 additions and 51 deletions

View File

@ -34,11 +34,19 @@ type
getBulkStmt: SqliteStmt[(int64, int64), seq[byte]]
putStmt: SqliteStmt[(int64, seq[byte]), void]
delStmt: SqliteStmt[int64, void]
keepFromStmt: SqliteStmt[int64, void]
BootstrapStore = ref object
getStmt: SqliteStmt[array[32, byte], seq[byte]]
getLatestStmt: SqliteStmt[NoParams, seq[byte]]
putStmt: SqliteStmt[(array[32, byte], seq[byte], int64), void]
keepFromStmt: SqliteStmt[int64, void]
BeaconDb* = ref object
backend: SqStoreRef
kv: KvStoreRef
kv: KvStoreRef # TODO: kv only used for summaries at this point
dataRadius*: UInt256
bootstraps: BootstrapStore
bestUpdates: BestLightClientUpdateStore
forkDigests: ForkDigests
cfg*: RuntimeConfig
@ -66,15 +74,14 @@ template disposeSafe(s: untyped): untyped =
s.dispose()
s = typeof(s)(nil)
proc initBestUpdatesStore(
backend: SqStoreRef, name: string
): KvResult[BestLightClientUpdateStore] =
proc initBootstrapStore(backend: SqStoreRef, name: string): KvResult[BootstrapStore] =
?backend.exec(
"""
CREATE TABLE IF NOT EXISTS `""" & name &
"""` (
`period` INTEGER PRIMARY KEY, -- `SyncCommitteePeriod`
`update` BLOB -- `altair.LightClientUpdate` (SSZ)
`contentId` BLOB PRIMARY KEY, -- `ContentId`
`bootstrap` BLOB, -- `LightClientBootstrap` (SSZ)
`slot` INTEGER UNIQUE -- `Slot`
);
"""
)
@ -83,11 +90,85 @@ proc initBestUpdatesStore(
getStmt = backend
.prepareStmt(
"""
SELECT `update`
FROM `""" & name &
SELECT `bootstrap`
FROM `""" & name &
"""`
WHERE `period` = ?;
""",
WHERE `contentId` = ?;
""",
array[32, byte],
seq[byte],
managed = false,
)
.expect("SQL query OK")
getLatestStmt = backend
.prepareStmt(
"""
SELECT `bootstrap`
FROM `""" & name &
"""`
WHERE `slot` = (SELECT MAX(slot) FROM `""" & name &
"""`);
""",
NoParams,
seq[byte],
managed = false,
)
.expect("SQL query OK")
putStmt = backend
.prepareStmt(
"""
REPLACE INTO `""" & name &
"""` (
`contentId`, `bootstrap`, `slot`
) VALUES (?, ?, ?);
""",
(array[32, byte], seq[byte], int64),
void,
managed = false,
)
.expect("SQL query OK")
keepFromStmt = backend
.prepareStmt(
"""
DELETE FROM `""" & name &
"""`
WHERE `slot` < ?;
""",
int64,
void,
managed = false,
)
.expect("SQL query OK")
ok BootstrapStore(
getStmt: getStmt,
getLatestStmt: getLatestStmt,
putStmt: putStmt,
keepFromStmt: keepFromStmt,
)
proc initBestUpdateStore(
backend: SqStoreRef, name: string
): KvResult[BestLightClientUpdateStore] =
?backend.exec(
"""
CREATE TABLE IF NOT EXISTS `""" & name &
"""` (
`period` INTEGER PRIMARY KEY, -- `SyncCommitteePeriod`
`update` BLOB -- `LightClientUpdate` (SSZ)
);
"""
)
let
getStmt = backend
.prepareStmt(
"""
SELECT `update`
FROM `""" & name &
"""`
WHERE `period` = ?;
""",
int64,
seq[byte],
managed = false,
@ -96,11 +177,11 @@ proc initBestUpdatesStore(
getBulkStmt = backend
.prepareStmt(
"""
SELECT `update`
FROM `""" & name &
SELECT `update`
FROM `""" & name &
"""`
WHERE `period` >= ? AND `period` < ?;
""",
WHERE `period` >= ? AND `period` < ?;
""",
(int64, int64),
seq[byte],
managed = false,
@ -109,11 +190,11 @@ proc initBestUpdatesStore(
putStmt = backend
.prepareStmt(
"""
REPLACE INTO `""" & name &
REPLACE INTO `""" & name &
"""` (
`period`, `update`
) VALUES (?, ?);
""",
`period`, `update`
) VALUES (?, ?);
""",
(int64, seq[byte]),
void,
managed = false,
@ -122,10 +203,22 @@ proc initBestUpdatesStore(
delStmt = backend
.prepareStmt(
"""
DELETE FROM `""" & name &
DELETE FROM `""" & name &
"""`
WHERE `period` = ?;
""",
WHERE `period` = ?;
""",
int64,
void,
managed = false,
)
.expect("SQL query OK")
keepFromStmt = backend
.prepareStmt(
"""
DELETE FROM `""" & name &
"""`
WHERE `period` < ?;
""",
int64,
void,
managed = false,
@ -133,14 +226,25 @@ proc initBestUpdatesStore(
.expect("SQL query OK")
ok BestLightClientUpdateStore(
getStmt: getStmt, getBulkStmt: getBulkStmt, putStmt: putStmt, delStmt: delStmt
getStmt: getStmt,
getBulkStmt: getBulkStmt,
putStmt: putStmt,
delStmt: delStmt,
keepFromStmt: keepFromStmt,
)
func close*(store: var BestLightClientUpdateStore) =
func close(store: var BestLightClientUpdateStore) =
store.getStmt.disposeSafe()
store.getBulkStmt.disposeSafe()
store.putStmt.disposeSafe()
store.delStmt.disposeSafe()
store.keepFromStmt.disposeSafe()
func close(store: var BootstrapStore) =
store.getStmt.disposeSafe()
store.getLatestStmt.disposeSafe()
store.putStmt.disposeSafe()
store.keepFromStmt.disposeSafe()
proc new*(
T: type BeaconDb, networkData: NetworkInitData, path: string, inMemory = false
@ -155,17 +259,24 @@ proc new*(
SqStoreRef.init(path, "lc").expectDb()
kvStore = kvStore db.openKvStore().expectDb()
bestUpdates = initBestUpdatesStore(db, "lcu").expectDb()
bootstraps = initBootstrapStore(db, "lc_bootstraps").expectDb()
bestUpdates = initBestUpdateStore(db, "lc_best_updates").expectDb()
BeaconDb(
backend: db,
kv: kvStore,
dataRadius: UInt256.high(), # Radius to max to accept all data
bootstraps: bootstraps,
bestUpdates: bestUpdates,
cfg: networkData.metadata.cfg,
forkDigests: (newClone networkData.forks)[],
)
proc close*(db: BeaconDb) =
db.bootstraps.close()
db.bestUpdates.close()
discard db.kv.close()
## Private KvStoreRef Calls
proc get(kv: KvStoreRef, key: openArray[byte]): results.Opt[seq[byte]] =
var res: results.Opt[seq[byte]] = Opt.none(seq[byte])
@ -217,6 +328,42 @@ proc getBestUpdate*(
res.expect("SQL query OK")
return decodeLightClientUpdateForked(db.forkDigests, update)
proc getBootstrap*(db: BeaconDb, contentId: ContentId): Opt[seq[byte]] =
doAssert distinctBase(db.bootstraps.getStmt) != nil
var bootstrap: seq[byte]
for res in db.bootstraps.getStmt.exec(contentId.toBytesBE(), bootstrap):
res.expect("SQL query OK")
return ok(bootstrap)
proc getLatestBootstrap*(db: BeaconDb): Opt[ForkedLightClientBootstrap] =
doAssert distinctBase(db.bootstraps.getLatestStmt) != nil
var bootstrap: seq[byte]
for res in db.bootstraps.getLatestStmt.exec(bootstrap):
res.expect("SQL query OK")
let forkedBootstrap = decodeLightClientBootstrapForked(db.forkDigests, bootstrap).valueOr:
raiseAssert "Stored bootstrap must be valid"
return ok(forkedBootstrap)
proc getLatestBlockRoot*(db: BeaconDb): Opt[Digest] =
let bootstrap = db.getLatestBootstrap()
if bootstrap.isSome():
withForkyBootstrap(bootstrap.value()):
when lcDataFork > LightClientDataFork.None:
Opt.some(hash_tree_root(forkyBootstrap.header.beacon))
else:
raiseAssert "Stored bootstrap must >= Altair"
else:
Opt.none(Digest)
proc putBootstrap*(
db: BeaconDb, contentId: ContentId, bootstrap: seq[byte], slot: Slot
) =
db.bootstraps.putStmt.exec((contentId.toBytesBE(), bootstrap, slot.int64)).expect(
"SQL query OK"
)
proc putBootstrap*(
db: BeaconDb, blockRoot: Digest, bootstrap: ForkedLightClientBootstrap
) =
@ -231,7 +378,7 @@ proc putBootstrap*(
)
encodedBootstrap = encodeBootstrapForked(forkDigest, bootstrap)
db.put(contentId, encodedBootstrap)
db.putBootstrap(contentId, encodedBootstrap, forkyBootstrap.header.beacon.slot)
func putLightClientUpdate*(db: BeaconDb, period: uint64, update: seq[byte]) =
# Put an encoded ForkedLightClientUpdate in the db.
@ -288,6 +435,14 @@ proc getLastFinalityUpdate*(db: BeaconDb): Opt[ForkedLightClientFinalityUpdate]
raiseAssert "Stored finality update must be valid"
)
func keepUpdatesFrom*(db: BeaconDb, minPeriod: SyncCommitteePeriod) =
let res = db.bestUpdates.keepFromStmt.exec(minPeriod.int64)
res.expect("SQL query OK")
func keepBootstrapsFrom*(db: BeaconDb, minSlot: Slot) =
let res = db.bootstraps.keepFromStmt.exec(minSlot.int64)
res.expect("SQL query OK")
proc createGetHandler*(db: BeaconDb): DbGetHandler =
return (
proc(contentKey: ContentKeyByteList, contentId: ContentId): results.Opt[seq[byte]] =
@ -299,7 +454,7 @@ proc createGetHandler*(db: BeaconDb): DbGetHandler =
of unused:
raiseAssert "Should not be used and fail at decoding"
of lightClientBootstrap:
db.get(contentId)
db.getBootstrap(contentId)
of lightClientUpdate:
let
# TODO: add validation that startPeriod is not from the future,
@ -365,7 +520,12 @@ proc createStoreHandler*(db: BeaconDb): DbStoreHandler =
of unused:
raiseAssert "Should not be used and fail at decoding"
of lightClientBootstrap:
db.put(contentId, content)
let bootstrap = decodeLightClientBootstrapForked(db.forkDigests, content).valueOr:
return
withForkyObject(bootstrap):
when lcDataFork > LightClientDataFork.None:
db.putBootstrap(contentId, content, forkyObject.header.beacon.slot)
of lightClientUpdate:
let updates = decodeSsz(content, ForkedLightClientUpdateBytesList).valueOr:
return

View File

@ -34,7 +34,6 @@ type
processor*: ref LightClientProcessor
manager: LightClientManager
onFinalizedHeader*, onOptimisticHeader*: LightClientHeaderCallback
trustedBlockRoot*: Opt[Eth2Digest]
func getFinalizedHeader*(lightClient: LightClient): ForkedLightClientHeader =
withForkyStore(lightClient.store[]):
@ -54,6 +53,9 @@ func getOptimisticHeader*(lightClient: LightClient): ForkedLightClientHeader =
else:
default(ForkedLightClientHeader)
func trustedBlockRoot(lightClient: LightClient): Opt[Eth2Digest] =
lightClient.network.trustedBlockRoot
proc new*(
T: type LightClient,
network: BeaconNetwork,
@ -76,8 +78,9 @@ proc new*(
func getTrustedBlockRoot(): Option[Eth2Digest] =
# TODO: use Opt in LC processor
if lightClient.trustedBlockRoot.isSome():
some(lightClient.trustedBlockRoot.value)
let trustedBlockRootOpt = lightClient.trustedBlockRoot()
if trustedBlockRootOpt.isSome():
some(trustedBlockRootOpt.value)
else:
none(Eth2Digest)
@ -178,7 +181,8 @@ proc new*(
)
proc start*(lightClient: LightClient) =
info "Starting beacon light client", trusted_block_root = lightClient.trustedBlockRoot
info "Starting beacon light client",
trusted_block_root = lightClient.trustedBlockRoot()
lightClient.manager.start()
proc stop*(lightClient: LightClient) {.async: (raises: []).} =

View File

@ -13,7 +13,6 @@ import
chronicles,
eth/p2p/discoveryv5/[protocol, enr],
beacon_chain/spec/forks,
beacon_chain/spec/datatypes/[phase0, altair, bellatrix],
beacon_chain/gossip_processing/light_client_processor,
../wire/[portal_protocol, portal_stream, portal_protocol_config],
"."/[beacon_content, beacon_db, beacon_validation, beacon_chain_historical_summaries]
@ -21,7 +20,7 @@ import
export beacon_content, beacon_db
logScope:
topics = "beacon_network"
topics = "portal_beacon"
type BeaconNetwork* = ref object
portalProtocol*: PortalProtocol
@ -29,7 +28,7 @@ type BeaconNetwork* = ref object
processor*: ref LightClientProcessor
contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])]
forkDigests*: ForkDigests
trustedBlockRoot: Opt[Eth2Digest]
trustedBlockRoot*: Opt[Eth2Digest]
processContentLoop: Future[void]
statusLogLoop: Future[void]
@ -209,12 +208,19 @@ proc new*(
config = portalConfig,
)
let beaconBlockRoot =
# TODO: Need to have some form of weak subjectivity check here.
if trustedBlockRoot.isNone():
beaconDb.getLatestBlockRoot()
else:
trustedBlockRoot
BeaconNetwork(
portalProtocol: portalProtocol,
beaconDb: beaconDb,
contentQueue: contentQueue,
forkDigests: forkDigests,
trustedBlockRoot: trustedBlockRoot,
trustedBlockRoot: beaconBlockRoot,
)
proc lightClientVerifier(
@ -396,5 +402,7 @@ proc stop*(n: BeaconNetwork) {.async: (raises: []).} =
await noCancel(allFutures(futures))
n.beaconDb.close()
n.processContentLoop = nil
n.statusLogLoop = nil

View File

@ -108,7 +108,7 @@ proc new*(
loadAccumulator()
beaconNetwork =
if PortalSubnetwork.beacon in subnetworks and config.trustedBlockRoot.isSome():
if PortalSubnetwork.beacon in subnetworks:
let
beaconDb = BeaconDb.new(networkData, config.dataDir / "db" / "beacon_db")
beaconNetwork = BeaconNetwork.new(
@ -166,7 +166,6 @@ proc new*(
beaconLightClient.onFinalizedHeader = onFinalizedHeader
beaconLightClient.onOptimisticHeader = onOptimisticHeader
beaconLightClient.trustedBlockRoot = config.trustedBlockRoot
# TODO:
# Quite dirty. Use register validate callbacks instead. Or, revisit

View File

@ -18,19 +18,17 @@ type BeaconNode* = ref object
beaconNetwork*: BeaconNetwork
proc newLCNode*(
rng: ref HmacDrbgContext, port: int, networkData: NetworkInitData
rng: ref HmacDrbgContext,
port: int,
networkData: NetworkInitData,
trustedBlockRoot: Opt[Digest] = Opt.none(Digest),
): BeaconNode =
let
node = initDiscoveryNode(rng, PrivateKey.random(rng[]), localAddress(port))
db = BeaconDb.new(networkData, "", inMemory = true)
streamManager = StreamManager.new(node)
network = BeaconNetwork.new(
PortalNetwork.none,
node,
db,
streamManager,
networkData.forks,
Opt.none(Eth2Digest),
PortalNetwork.none, node, db, streamManager, networkData.forks, trustedBlockRoot
)
return BeaconNode(discoveryProtocol: node, beaconNetwork: network)

View File

@ -40,13 +40,13 @@ procSuite "Beacon Light Client":
optimisticHeaders = newAsyncQueue[ForkedLightClientHeader]()
# Test data is retrieved from mainnet
networkData = loadNetworkData("mainnet")
lcNode1 = newLCNode(rng, 20302, networkData)
lcNode2 = newLCNode(rng, 20303, networkData)
altairData = SSZ.decode(bootstrapBytes, altair.LightClientBootstrap)
bootstrap = ForkedLightClientBootstrap(
kind: LightClientDataFork.Altair, altairData: altairData
)
bootstrapHeaderHash = hash_tree_root(altairData.header)
bootstrapBlockRoot = hash_tree_root(altairData.header)
lcNode1 = newLCNode(rng, 20302, networkData, Opt.some(bootstrapBlockRoot))
lcNode2 = newLCNode(rng, 20303, networkData, Opt.some(bootstrapBlockRoot))
check:
lcNode1.portalProtocol().addNode(lcNode2.localNode()) == Added
@ -56,7 +56,7 @@ procSuite "Beacon Light Client":
(await lcNode2.portalProtocol().ping(lcNode1.localNode())).isOk()
let
bootstrapKey = LightClientBootstrapKey(blockHash: bootstrapHeaderHash)
bootstrapKey = LightClientBootstrapKey(blockHash: bootstrapBlockRoot)
bootstrapContentKey = ContentKey(
contentType: lightClientBootstrap, lightClientBootstrapKey: bootstrapKey
)
@ -76,7 +76,6 @@ procSuite "Beacon Light Client":
lc.onFinalizedHeader = headerCallback(finalizedHeaders)
lc.onOptimisticHeader = headerCallback(optimisticHeaders)
lc.trustedBlockRoot = Opt.some bootstrapHeaderHash
# When running start the beacon light client will first try to retrieve the
# bootstrap for given trustedBlockRoot
@ -90,5 +89,5 @@ procSuite "Beacon Light Client":
receivedOptimisticHeader = await optimisticHeaders.get()
check:
hash_tree_root(receivedFinalHeader.altairData) == bootstrapHeaderHash
hash_tree_root(receivedOptimisticHeader.altairData) == bootstrapHeaderHash
hash_tree_root(receivedFinalHeader.altairData) == bootstrapBlockRoot
hash_tree_root(receivedOptimisticHeader.altairData) == bootstrapBlockRoot