Rework fluffy beacon lc db to allow storing processed content (#1821)

This commit is contained in:
Kim De Mey 2023-10-18 16:59:44 +02:00 committed by GitHub
parent 04c7ed8ec9
commit 0472b75e23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 225 additions and 86 deletions

View File

@ -174,7 +174,7 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
# Portal works only over mainnet data currently
networkData = loadNetworkData("mainnet")
beaconLightClientDb = LightClientDb.new(
config.dataDir / "db" / "beacon_lc_db")
networkData, config.dataDir / "db" / "beacon_lc_db")
lightClientNetwork = LightClientNetwork.new(
d,
beaconLightClientDb,

View File

@ -79,6 +79,10 @@ type
ForkedLightClientUpdateList* =
List[ForkedLightClientUpdate, MAX_REQUEST_LIGHT_CLIENT_UPDATES]
func forkDigestAtEpoch*(
    forkDigests: ForkDigests, epoch: Epoch, cfg: RuntimeConfig): ForkDigest =
  ## Look up the fork digest for `epoch` by delegating to
  ## `ForkDigests.atEpoch` with the runtime config `cfg`.
  atEpoch(forkDigests, epoch, cfg)
func encode*(contentKey: ContentKey): ByteList =
ByteList.init(SSZ.encode(contentKey))

View File

@ -13,26 +13,33 @@ import
eth/db/kvstore,
eth/db/kvstore_sqlite3,
stint,
stew/[results, byteutils],
stew/results,
ssz_serialization,
beacon_chain/db_limits,
beacon_chain/spec/datatypes/[phase0, altair, bellatrix],
beacon_chain/spec/forks,
beacon_chain/spec/forks_light_client,
./beacon_light_client_content,
./beacon_light_client_init_loader,
../wire/portal_protocol
from beacon_chain/spec/helpers import is_better_update, toMeta
export kvstore_sqlite3
# We only store one best optimistic and one best final update
const
bestFinalUpdateKey = toContentId(ByteList.init(toBytes("bestFinal")))
bestOptimisticUpdateKey = toContentId(ByteList.init(toBytes("bestOptimistic")))
type
BestLightClientUpdateStore = ref object
putStmt: SqliteStmt[(int64, seq[byte]), void]
getStmt: SqliteStmt[int64, seq[byte]]
getBulkStmt: SqliteStmt[(int64, int64), seq[byte]]
putStmt: SqliteStmt[(int64, seq[byte]), void]
delStmt: SqliteStmt[int64, void]
LightClientDb* = ref object
backend: SqStoreRef
kv: KvStoreRef
lcuStore: BestLightClientUpdateStore
bestUpdates: BestLightClientUpdateStore
forkDigests: ForkDigests
cfg: RuntimeConfig
finalityUpdateCache: Opt[LightClientFinalityUpdateCache]
optimisticUpdateCache: Opt[LightClientOptimisticUpdateCache]
@ -51,6 +58,11 @@ template expectDb(x: auto): untyped =
# full disk - this requires manual intervention, so we'll panic for now
x.expect("working database (disk broken/full?)")
template disposeSafe(s: untyped): untyped =
  ## Dispose `s` unless its underlying (distinct base) value is already nil,
  ## then reset `s` to nil so that a second call does nothing.
  if not isNil(distinctBase(s)):
    dispose(s)
    s = typeof(s)(nil)
proc initBestUpdatesStore(
backend: SqStoreRef,
name: string): KvResult[BestLightClientUpdateStore] =
@ -62,60 +74,63 @@ proc initBestUpdatesStore(
""")
let
putStmt = backend.prepareStmt("""
REPLACE INTO `""" & name & """` (
`period`, `update`
) VALUES (?, ?);
""", (int64, seq[byte]), void, managed = false).expect("SQL query OK")
getStmt = backend.prepareStmt("""
SELECT `update`
FROM `""" & name & """`
WHERE `period` = ?;
""", int64, seq[byte], managed = false).expect("SQL query OK")
getBulkStmt = backend.prepareStmt("""
SELECT `update`
FROM `""" & name & """`
WHERE `period` >= ? AND `period` < ?;
""", (int64, int64), seq[byte], managed = false).expect("SQL query OK")
putStmt = backend.prepareStmt("""
REPLACE INTO `""" & name & """` (
`period`, `update`
) VALUES (?, ?);
""", (int64, seq[byte]), void, managed = false).expect("SQL query OK")
delStmt = backend.prepareStmt("""
DELETE FROM `""" & name & """`
WHERE `period` = ?;
""", int64, void, managed = false).expect("SQL query OK")
ok BestLightClientUpdateStore(
getStmt: getStmt,
getBulkStmt: getBulkStmt,
putStmt: putStmt,
getBulkStmt: getBulkStmt
delStmt: delStmt
)
func close(store: var BestLightClientUpdateStore) =
  ## Release the four prepared statements held by `store`.
  ## Each one goes through `disposeSafe`, so statements that are already nil
  ## are skipped.
  disposeSafe(store.getStmt)
  disposeSafe(store.getBulkStmt)
  disposeSafe(store.putStmt)
  disposeSafe(store.delStmt)
proc new*(
T: type LightClientDb, path: string, inMemory = false):
T: type LightClientDb, networkData: NetworkInitData,
path: string, inMemory = false):
LightClientDb =
let db =
if inMemory:
SqStoreRef.init("", "lc-test", inMemory = true).expect(
"working database (out of memory?)")
else:
SqStoreRef.init(path, "lc").expectDb()
let
db =
if inMemory:
SqStoreRef.init("", "lc-test", inMemory = true).expect(
"working database (out of memory?)")
else:
SqStoreRef.init(path, "lc").expectDb()
let kvStore = kvStore db.openKvStore().expectDb()
let lcuStore = initBestUpdatesStore(db, "lcu").expectDb()
kvStore = kvStore db.openKvStore().expectDb()
bestUpdates = initBestUpdatesStore(db, "lcu").expectDb()
LightClientDb(
backend: db,
kv: kvStore,
lcuStore: lcuStore
bestUpdates: bestUpdates,
cfg: networkData.metadata.cfg,
forkDigests: (newClone networkData.forks)[]
)
# TODO Add checks that uint64 can be safely casted to int64
proc getLightClientUpdates(
db: LightClientDb, start: uint64, to: uint64
): ForkedLightClientUpdateBytesList =
var updates: ForkedLightClientUpdateBytesList
var update: seq[byte]
for res in db.lcuStore.getBulkStmt.exec((start.int64, to.int64), update):
res.expect("SQL query OK")
let byteList = List[byte, MAX_LIGHT_CLIENT_UPDATE_SIZE].init(update)
discard updates.add(byteList)
return updates
func putLightClientUpdate(
db: LightClientDb, period: uint64, update: seq[byte]) =
let res = db.lcuStore.putStmt.exec((period.int64, update))
res.expect("SQL query OK")
## Private KvStoreRef Calls
proc get(kv: KvStoreRef, key: openArray[byte]): results.Opt[seq[byte]] =
var res: results.Opt[seq[byte]] = Opt.none(seq[byte])
proc onData(data: openArray[byte]) = res = ok(@data)
@ -131,6 +146,7 @@ proc get(db: LightClientDb, key: openArray[byte]): results.Opt[seq[byte]] =
proc put(db: LightClientDb, key, value: openArray[byte]) =
db.kv.put(key, value).expectDb()
## Public ContentId based ContentDB calls
proc get*(db: LightClientDb, key: ContentId): results.Opt[seq[byte]] =
# TODO: Here it is unfortunate that ContentId is a uint256 instead of Digest256.
db.get(key.toBytesBE())
@ -138,6 +154,97 @@ proc get*(db: LightClientDb, key: ContentId): results.Opt[seq[byte]] =
proc put*(db: LightClientDb, key: ContentId, value: openArray[byte]) =
db.put(key.toBytesBE(), value)
# TODO Add checks that uint64 can be safely casted to int64
proc getLightClientUpdates(
    db: LightClientDb, start: uint64, to: uint64):
    ForkedLightClientUpdateBytesList =
  ## Collect the encoded LightClientUpdates stored for the periods in the
  ## half-open range `start` .. `to` (the bulk query selects
  ## `period >= start AND period < to`).
  var encoded: seq[byte]
  for queryRes in db.bestUpdates.getBulkStmt.exec(
      (start.int64, to.int64), encoded):
    queryRes.expect("SQL query OK")
    # The return value of `add` is deliberately discarded.
    discard result.add(
      List[byte, MAX_LIGHT_CLIENT_UPDATE_SIZE].init(encoded))
proc getBestUpdate*(
    db: LightClientDb, period: SyncCommitteePeriod):
    Result[ForkedLightClientUpdate, string] =
  ## Get the best ForkedLightClientUpdate for given period
  ## Note: Only the best one for a given period is being stored.
  ##
  ## Returns the result of `decodeLightClientUpdateForked` for the stored
  ## bytes, or an error when no row exists for `period`.
  doAssert period.isSupportedBySQLite
  doAssert distinctBase(db.bestUpdates.getStmt) != nil

  var update: seq[byte]
  for res in db.bestUpdates.getStmt.exec(period.int64, update):
    res.expect("SQL query OK")
    # At most the first row is used; the result is returned right away.
    return decodeLightClientUpdateForked(db.forkDigests, update)

  # No row was found for this period. Report that explicitly instead of
  # falling off the end of the proc and relying on the default-initialized
  # `Result` being interpreted as an (empty-message) error.
  err("No light client update found in the database for the given period")
proc putBootstrap*(
    db: LightClientDb,
    blockRoot: Digest, bootstrap: ForkedLightClientBootstrap) =
  ## Encode `bootstrap` and store it under the content id derived from the
  ## bootstrap content key of `blockRoot`.
  ## Nothing is stored when the bootstrap's fork is `LightClientDataFork.None`.
  withForkyBootstrap(bootstrap):
    when lcDataFork > LightClientDataFork.None:
      let
        # The fork digest is selected by the epoch of the bootstrap header slot.
        bootstrapEpoch = epoch(forkyBootstrap.header.beacon.slot)
        bootstrapForkDigest =
          forkDigestAtEpoch(db.forkDigests, bootstrapEpoch, db.cfg)
        storageId = toContentId(bootstrapContentKey(blockRoot))

      db.put(storageId, encodeBootstrapForked(bootstrapForkDigest, bootstrap))
func putLightClientUpdate*(
    db: LightClientDb, period: uint64, update: seq[byte]) =
  ## Store an already encoded ForkedLightClientUpdate for `period`.
  ## The put statement is a `REPLACE INTO`, so an existing row for the same
  ## period is overwritten.
  db.bestUpdates.putStmt.exec((period.int64, update)).expect("SQL query OK")
func putBestUpdate*(
    db: LightClientDb, period: SyncCommitteePeriod,
    update: ForkedLightClientUpdate) =
  ## Store `update` as the row for `period`, encoded with the fork digest of
  ## its attested header's epoch.
  ##
  ## The row for `period` is deleted instead when the update has fewer than
  ## `MIN_SYNC_COMMITTEE_PARTICIPANTS` active participants, or when its fork
  ## is `LightClientDataFork.None`.
  doAssert not db.backend.readOnly  # All `stmt` are non-nil
  doAssert period.isSupportedBySQLite

  withForkyUpdate(update):
    when lcDataFork > LightClientDataFork.None:
      if forkyUpdate.sync_aggregate.num_active_participants >=
          MIN_SYNC_COMMITTEE_PARTICIPANTS:
        let
          attestedEpoch = epoch(forkyUpdate.attested_header.beacon.slot)
          updateForkDigest =
            forkDigestAtEpoch(db.forkDigests, attestedEpoch, db.cfg)
          encoded = encodeForkedLightClientObject(update, updateForkDigest)
        db.bestUpdates.putStmt.exec((period.int64, encoded)).expect(
          "SQL query OK")
      else:
        # Too few participants: drop whatever is stored for this period.
        db.bestUpdates.delStmt.exec(period.int64).expect("SQL query OK")
    else:
      db.bestUpdates.delStmt.exec(period.int64).expect("SQL query OK")
proc putUpdateIfBetter*(
    db: LightClientDb,
    period: SyncCommitteePeriod, update: ForkedLightClientUpdate) =
  ## Store `update` for `period` when `getBestUpdate` yields no current
  ## update for that period, or when `is_better_update` ranks `update` above
  ## the current one. Otherwise the database is left untouched.
  let stored = db.getBestUpdate(period)

  # `or` short-circuits, so `stored.get()` is only evaluated on success.
  if stored.isErr() or is_better_update(update, stored.get()):
    db.putBestUpdate(period, update)
proc putUpdateIfBetter*(
    db: LightClientDb, period: SyncCommitteePeriod, update: seq[byte]) =
  ## Decode the encoded `update` and hand it to the
  ## `ForkedLightClientUpdate` overload. Bytes that fail to decode are
  ## silently ignored.
  let decoded = decodeLightClientUpdateForked(db.forkDigests, update)

  # TODO:
  # Need to go over the usage in offer/accept vs findcontent/content
  # and in some (all?) decoding has already been verified.
  if decoded.isOk():
    db.putUpdateIfBetter(period, decoded.get())
proc createGetHandler*(db: LightClientDb): DbGetHandler =
return (
proc(contentKey: ByteList, contentId: ContentId): results.Opt[seq[byte]] =
@ -220,7 +327,9 @@ proc createStoreHandler*(db: LightClientDb): DbStoreHandler =
let updates = updatesResult.get()
for update in updates.asSeq():
db.putLightClientUpdate(period, update.asSeq())
# Only put the update if it is better, although currently a new offer
# should not be accepted, as acceptance is based only on the period.
db.putUpdateIfBetter(SyncCommitteePeriod(period), update.asSeq())
inc period
elif ck.contentType == lightClientFinalityUpdate:
db.finalityUpdateCache =

View File

@ -15,7 +15,8 @@ import
beacon_chain/spec/[forks_light_client, digest],
beacon_chain/beacon_clock,
beacon_chain/sync/light_client_sync_helpers,
"."/[beacon_light_client_network, beacon_light_client_content]
"."/[beacon_light_client_network, beacon_light_client_content,
beacon_light_client_db]
from beacon_chain/consensus_object_pools/block_pools_types import VerifierError
@ -221,6 +222,31 @@ proc workerTask[E](
warn "Received invalid value", endpoint = E.name
return didProgress
else:
# TODO:
# This is data coming from either the network or the database.
# Either way it comes in encoded and is passed along until here in its
# decoded format. It only gets stored in the database here as it is
# required to pass validation first (didProgress == true). Next it
# gets encoded again before being dropped in the database. Optimisations
# are possible here if the beacon_light_client_manager and the
# manager are better interfaced with each other.
when E.V is ForkedLightClientBootstrap:
withForkyObject(val):
when lcDataFork > LightClientDataFork.None:
self.network.lightClientDb.putBootstrap(key, val)
else:
notice "Received value from an unviable fork",
endpoint = E.name
elif E.V is ForkedLightClientUpdate:
withForkyObject(val):
when lcDataFork > LightClientDataFork.None:
let period =
forkyObject.attested_header.beacon.slot.sync_committee_period
self.network.lightClientDb.putUpdateIfBetter(period, val)
else:
notice "Received value from an unviable fork",
endpoint = E.name
didProgress = true
else:
debug "Failed to receive value on request", value, endpoint = E.name
@ -323,7 +349,6 @@ proc loop(self: LightClientManager) {.async.} =
didProgress =
case syncTask.kind
of LcSyncKind.UpdatesByRange:
discard
await self.query(UpdatesByRange,
(startPeriod: syncTask.startPeriod, count: syncTask.count))
of LcSyncKind.FinalityUpdate:

View File

@ -43,7 +43,12 @@ proc getContent(
let
contentKeyEncoded = encode(contentKey)
contentId = toContentId(contentKeyEncoded)
contentRes = await n.portalProtocol.contentLookup(
localContent = n.portalProtocol.dbGet(contentKeyEncoded, contentId)
if localContent.isSome():
return localContent
let contentRes = await n.portalProtocol.contentLookup(
contentKeyEncoded, contentId)
if contentRes.isNone():
@ -196,6 +201,12 @@ proc validateContent(
# Later on we need to either provide a list of acceptable bootstraps (not
# really scalable and requires quite some configuration) or find some
# way to prove these.
# They could be proven at the moment of creation by checking the
# finalized_header of a finality update, and verifying the
# current_sync_committee with the header state root and
# current_sync_committee_branch?
# Perhaps this can be expanded to also verify backfill by storing
# the past beacon headers as well (this is more or less stored in a proof
# format for the history network also).
return true
else:
return false

View File

@ -11,6 +11,7 @@ import
beacon_chain/spec/forks,
../../network/wire/[portal_protocol, portal_stream],
../../network/beacon_light_client/[
beacon_light_client_init_loader,
beacon_light_client_network
],
../test_helpers
@ -19,24 +20,15 @@ type LightClientNode* = ref object
discoveryProtocol*: discv5_protocol.Protocol
lightClientNetwork*: LightClientNetwork
const testForkDigests* =
ForkDigests(
phase0: ForkDigest([0'u8, 0, 0, 1]),
altair: ForkDigest([0'u8, 0, 0, 2]),
bellatrix: ForkDigest([0'u8, 0, 0, 3]),
capella: ForkDigest([0'u8, 0, 0, 4]),
deneb: ForkDigest([0'u8, 0, 0, 5])
)
proc newLCNode*(
rng: ref HmacDrbgContext,
port: int,
forkDigests: ForkDigests = testForkDigests): LightClientNode =
networkData: NetworkInitData): LightClientNode =
let
node = initDiscoveryNode(rng, PrivateKey.random(rng[]), localAddress(port))
db = LightClientDb.new("", inMemory = true)
db = LightClientDb.new(networkData, "", inMemory = true)
streamManager = StreamManager.new(node)
network = LightClientNetwork.new(node, db, streamManager, forkDigests)
network = LightClientNetwork.new(node, db, streamManager, networkData.forks)
return LightClientNode(discoveryProtocol: node, lightClientNetwork: network)

View File

@ -38,8 +38,8 @@ procSuite "Portal Beacon Light Client":
optimisticHeaders = newAsyncQueue[ForkedLightClientHeader]()
# Test data is retrieved from mainnet
networkData = loadNetworkData("mainnet")
lcNode1 = newLCNode(rng, 20302, networkData.forks)
lcNode2 = newLCNode(rng, 20303, networkData.forks)
lcNode1 = newLCNode(rng, 20302, networkData)
lcNode2 = newLCNode(rng, 20303, networkData)
altairData = SSZ.decode(bootstrapBytes, altair.LightClientBootstrap)
bootstrap = ForkedLightClientBootstrap(
kind: LightClientDataFork.Altair, altairData: altairData)

View File

@ -183,7 +183,14 @@ suite "Beacon Light Client Content Encodings":
# TODO: These tests are less useful now and should instead be altered to
# use the consensus test vectors to simply test if encoding / decoding works
# fine for the different forks.
let forkDigests = testForkDigests
const forkDigests =
ForkDigests(
phase0: ForkDigest([0'u8, 0, 0, 1]),
altair: ForkDigest([0'u8, 0, 0, 2]),
bellatrix: ForkDigest([0'u8, 0, 0, 3]),
capella: ForkDigest([0'u8, 0, 0, 4]),
deneb: ForkDigest([0'u8, 0, 0, 5])
)
test "LightClientBootstrap":
let

View File

@ -11,7 +11,8 @@ import
beacon_chain/spec/forks,
beacon_chain/spec/datatypes/altair,
../../network/wire/portal_protocol,
../../network/beacon_light_client/beacon_light_client_network,
../../network/beacon_light_client/[beacon_light_client_network,
beacon_light_client_init_loader],
"."/[light_client_test_data, beacon_light_client_test_helpers]
procSuite "Beacon Light Client Content Network":
@ -19,9 +20,10 @@ procSuite "Beacon Light Client Content Network":
asyncTest "Get bootstrap by trusted block hash":
let
lcNode1 = newLCNode(rng, 20302)
lcNode2 = newLCNode(rng, 20303)
forkDigests = testForkDigests
networkData = loadNetworkData("mainnet")
lcNode1 = newLCNode(rng, 20302, networkData)
lcNode2 = newLCNode(rng, 20303, networkData)
forkDigests = (newClone networkData.forks)[]
check:
lcNode1.portalProtocol().addNode(lcNode2.localNode()) == Added
@ -66,9 +68,10 @@ procSuite "Beacon Light Client Content Network":
asyncTest "Get latest optimistic and finality updates":
let
lcNode1 = newLCNode(rng, 20302)
lcNode2 = newLCNode(rng, 20303)
forkDigests = testForkDigests
networkData = loadNetworkData("mainnet")
lcNode1 = newLCNode(rng, 20302, networkData)
lcNode2 = newLCNode(rng, 20303, networkData)
forkDigests = (newClone networkData.forks)[]
check:
lcNode1.portalProtocol().addNode(lcNode2.localNode()) == Added
@ -139,9 +142,10 @@ procSuite "Beacon Light Client Content Network":
asyncTest "Get range of light client updates":
let
lcNode1 = newLCNode(rng, 20302)
lcNode2 = newLCNode(rng, 20303)
forkDigests = testForkDigests
networkData = loadNetworkData("mainnet")
lcNode1 = newLCNode(rng, 20302, networkData)
lcNode2 = newLCNode(rng, 20303, networkData)
forkDigests = (newClone networkData.forks)[]
check:
lcNode1.portalProtocol().addNode(lcNode2.localNode()) == Added

View File

@ -40,11 +40,6 @@ import
const
restRequestsTimeout = 30.seconds
# TODO: Move somewhere common
func forkDigestAtEpoch(
forkDigests: ForkDigests, epoch: Epoch, cfg: RuntimeConfig): ForkDigest =
forkDigests.atEpoch(epoch, cfg)
# TODO: From nimbus_binary_common, but we don't want to import that.
proc sleepAsync(t: TimeDiff): Future[void] =
sleepAsync(nanoseconds(

View File

@ -326,10 +326,6 @@ proc asPortalBlockData*(
(hash, headerWithProof, body)
func forkDigestAtEpoch(
forkDigests: ForkDigests, epoch: Epoch, cfg: RuntimeConfig): ForkDigest =
forkDigests.atEpoch(epoch, cfg)
proc getBlockReceipts(
client: RpcClient, transactions: seq[TypedTransaction], blockHash: Hash256):
Future[Result[seq[Receipt], string]] {.async.} =

View File

@ -44,10 +44,6 @@ proc getBeaconData*(): (
return (metadata.cfg, forkDigests, beaconClock)
func forkDigestAtEpoch(
forkDigests: ForkDigests, epoch: Epoch, cfg: RuntimeConfig): ForkDigest =
forkDigests.atEpoch(epoch, cfg)
proc exportLCBootstrapUpdate*(
restUrl: string, dataDir: string,
trustedBlockRoot: Eth2Digest,