rm beacon_chain/networking/network_metadata_downloads.nim

This commit is contained in:
Jenkins 2024-03-19 01:40:49 +02:00
parent 7d329f877f
commit 8a605ab9e1
5 changed files with 20 additions and 166 deletions

View File

@ -13,60 +13,41 @@ import
state_transition],
./spec/datatypes/[phase0, altair, bellatrix],
"."/filepath
from ./spec/datatypes/capella import BeaconState
from ./spec/datatypes/deneb import TrustedSignedBeaconBlock
export
phase0, altair, eth2_ssz_serialization, eth2_merkleization, kvstore,
eth2_ssz_serialization, eth2_merkleization, kvstore,
kvstore_sqlite3, deposit_snapshots
type
DbSeq[T] = object
insertStmt: SqliteStmt[openArray[byte], void]
selectStmt: SqliteStmt[int64, openArray[byte]]
recordCount: int64
FinalizedBlocks = object
insertStmt: SqliteStmt[(int64, array[32, byte]), void]
selectStmt: SqliteStmt[int64, array[32, byte]]
selectAllStmt: SqliteStmt[NoParams, (int64, array[32, byte])]
low: Opt[Slot]
high: Opt[Slot]
DepositsSeq = DbSeq[DepositData]
BeaconChainDBV0 = ref object
backend: KvStoreRef # kvstore
stateStore: KvStoreRef # state_no_validators
BeaconChainDB* = ref object
db: SqStoreRef
v0: BeaconChainDBV0
genesisDeposits: DepositsSeq
immutableValidatorsDb: DbSeq[ImmutableValidatorDataDb2]
immutableValidators: seq[ImmutableValidatorData2]
checkpoint: proc() {.gcsafe, raises: [].}
keyValues: KvStoreRef # Random stuff using DbKeyKind - suitable for small values mainly!
blocks: array[ConsensusFork, KvStoreRef] # BlockRoot -> TrustedSignedBeaconBlock
blobs: KvStoreRef # (BlockRoot -> BlobSidecar)
stateRoots: KvStoreRef # (Slot, BlockRoot) -> StateRoot
statesNoVal: array[ConsensusFork, KvStoreRef] # StateRoot -> ForkBeaconStateNoImmutableValidators
stateDiffs: KvStoreRef ##\
summaries: KvStoreRef
finalizedBlocks: FinalizedBlocks
DbKeyKind = enum
kHashToState
kHashToBlock
@ -83,31 +64,23 @@ type
kHashToStateOnlyMutableValidators
kBackfillBlock # Obsolete, was in `unstable` for a while, but never released
kDepositTreeSnapshot # EIP-4881-compatible deposit contract state snapshot
BeaconBlockSummary = object
slot: Slot
parent_root: Eth2Digest
func subkey(kind: DbKeyKind): array[1, byte] =
result[0] = byte ord(kind)
func subkey[N: static int](kind: DbKeyKind, key: array[N, byte]):
array[N + 1, byte] =
result[0] = byte ord(kind)
result[1 .. ^1] = key
func subkey(kind: type phase0.BeaconState, key: Eth2Digest): auto =
subkey(kHashToState, key.data)
func subkey(kind: type phase0.SignedBeaconBlock, key: Eth2Digest): auto =
subkey(kHashToBlock, key.data)
func subkey(kind: type BeaconBlockSummary, key: Eth2Digest): auto =
subkey(kHashToBlockSummary, key.data)
template expectDb(x: auto): untyped =
x.expect("working database (disk broken/full?)")
proc init[T](
Seq: type DbSeq[T], db: SqStoreRef, name: string,
readOnly = false): KvResult[Seq] =
@ -126,60 +99,46 @@ proc init[T](
insertStmt = db.prepareStmt(
"INSERT INTO '" & name & "'(value) VALUES (?);",
openArray[byte], void, managed = false).expect("this is a valid statement")
selectStmt = db.prepareStmt(
"SELECT value FROM '" & name & "' WHERE id = ?;",
int64, openArray[byte], managed = false).expect("this is a valid statement")
countStmt = db.prepareStmt(
"SELECT COUNT(1) FROM '" & name & "';",
NoParams, int64, managed = false).expect("this is a valid statement")
var recordCount = int64 0
let countQueryRes = countStmt.exec do (res: int64):
recordCount = res
let found = ? countQueryRes
if not found:
return err("Cannot count existing items")
countStmt.dispose()
ok(Seq(insertStmt: insertStmt,
selectStmt: selectStmt,
recordCount: recordCount))
else:
ok(Seq())
proc close(s: var DbSeq) =
s.insertStmt.dispose()
s.selectStmt.dispose()
reset(s)
proc add[T](s: var DbSeq[T], val: T) =
doAssert(distinctBase(s.insertStmt) != nil, "database closed or table not preset")
let bytes = SSZ.encode(val)
s.insertStmt.exec(bytes).expectDb()
inc s.recordCount
template len[T](s: DbSeq[T]): int64 =
s.recordCount
proc get[T](s: DbSeq[T], idx: int64): T =
doAssert(distinctBase(s.selectStmt) != nil, $T & " table not present for read at " & $(idx))
let resultAddr = addr result
let queryRes = s.selectStmt.exec(idx + 1) do (recordBytes: openArray[byte]):
try:
resultAddr[] = decode(SSZ, recordBytes, T)
except SerializationError as exc:
raiseAssert "cannot decode " & $T & " at index " & $idx & ": " & exc.msg
let found = queryRes.expectDb()
if not found:
raiseAssert $T & " not found at index " & $(idx)
proc init(T: type FinalizedBlocks, db: SqStoreRef, name: string,
readOnly = false): KvResult[T] =
let hasTable = if db.readOnly or readOnly:
@ -191,45 +150,36 @@ proc init(T: type FinalizedBlocks, db: SqStoreRef, name: string,
value BLOB NOT NULL
);""")
true
if hasTable:
let
insertStmt = db.prepareStmt(
"REPLACE INTO '" & name & "'(id, value) VALUES (?, ?);",
(int64, array[32, byte]), void, managed = false).expect("this is a valid statement")
selectStmt = db.prepareStmt(
"SELECT value FROM '" & name & "' WHERE id = ?;",
int64, array[32, byte], managed = false).expect("this is a valid statement")
selectAllStmt = db.prepareStmt(
"SELECT id, value FROM '" & name & "' ORDER BY id;",
NoParams, (int64, array[32, byte]), managed = false).expect("this is a valid statement")
maxIdStmt = db.prepareStmt(
"SELECT MAX(id) FROM '" & name & "';",
NoParams, Option[int64], managed = false).expect("this is a valid statement")
minIdStmt = db.prepareStmt(
"SELECT MIN(id) FROM '" & name & "';",
NoParams, Option[int64], managed = false).expect("this is a valid statement")
var
low, high: Opt[Slot]
tmp: Option[int64]
for rowRes in minIdStmt.exec(tmp):
expectDb rowRes
if tmp.isSome():
low.ok(Slot(tmp.get()))
for rowRes in maxIdStmt.exec(tmp):
expectDb rowRes
if tmp.isSome():
high.ok(Slot(tmp.get()))
maxIdStmt.dispose()
minIdStmt.dispose()
ok(T(insertStmt: insertStmt,
selectStmt: selectStmt,
selectAllStmt: selectAllStmt,
@ -237,22 +187,18 @@ proc init(T: type FinalizedBlocks, db: SqStoreRef, name: string,
high: high))
else:
ok(T())
proc close(s: var FinalizedBlocks) =
s.insertStmt.dispose()
s.selectStmt.dispose()
s.selectAllStmt.dispose()
reset(s)
proc get(s: FinalizedBlocks, idx: Slot): Opt[Eth2Digest] =
if distinctBase(s.selectStmt) == nil: return Opt.none(Eth2Digest)
var row: s.selectStmt.Result
for rowRes in s.selectStmt.exec(int64(idx), row):
expectDb rowRes
return ok(Eth2Digest(data: row))
return Opt.none(Eth2Digest)
proc loadImmutableValidators(vals: DbSeq[ImmutableValidatorDataDb2]): seq[ImmutableValidatorData2] =
result = newSeqOfCap[ImmutableValidatorData2](vals.len())
for i in 0 ..< vals.len:
@ -260,14 +206,12 @@ proc loadImmutableValidators(vals: DbSeq[ImmutableValidatorDataDb2]): seq[Immuta
result.add ImmutableValidatorData2(
pubkey: tmp.pubkey.loadValid(),
withdrawal_credentials: tmp.withdrawal_credentials)
proc new(T: type BeaconChainDBV0,
db: SqStoreRef,
readOnly = false
): BeaconChainDBV0 =
BeaconChainDBV0(
)
proc new*(T: type BeaconChainDB,
db: SqStoreRef,
cfg: RuntimeConfig = defaultRuntimeConfig
@ -275,13 +219,11 @@ proc new*(T: type BeaconChainDB,
if not db.readOnly:
discard db.exec("DROP TABLE IF EXISTS deposits;")
discard db.exec("DROP TABLE IF EXISTS validatorIndexFromPubKey;")
var
genesisDepositsSeq =
DbSeq[DepositData].init(db, "genesis_deposits").expectDb()
immutableValidatorsDb =
DbSeq[ImmutableValidatorDataDb2].init(db, "immutable_validators2").expectDb()
keyValues = kvStore db.openKvStore("key_values", true).expectDb()
blocks = [
kvStore db.openKvStore("blocks").expectDb(),
@ -290,9 +232,7 @@ proc new*(T: type BeaconChainDB,
kvStore db.openKvStore("capella_blocks").expectDb(),
kvStore db.openKvStore("deneb_blocks").expectDb(),
kvStore db.openKvStore("electra_blocks").expectDb()]
stateRoots = kvStore db.openKvStore("state_roots", true).expectDb()
statesNoVal = [
kvStore db.openKvStore("state_no_validators2").expectDb(),
kvStore db.openKvStore("altair_state_no_validators").expectDb(),
@ -300,19 +240,15 @@ proc new*(T: type BeaconChainDB,
kvStore db.openKvStore("capella_state_no_validator_pubkeys").expectDb(),
kvStore db.openKvStore("deneb_state_no_validator_pubkeys").expectDb(),
kvStore db.openKvStore("electra_state_no_validator_pubkeys").expectDb()]
stateDiffs = kvStore db.openKvStore("state_diffs").expectDb()
summaries = kvStore db.openKvStore("beacon_block_summaries", true).expectDb()
finalizedBlocks = FinalizedBlocks.init(db, "finalized_blocks").expectDb()
var blobs : KvStoreRef
if cfg.DENEB_FORK_EPOCH != FAR_FUTURE_EPOCH:
blobs = kvStore db.openKvStore("deneb_blobs").expectDb()
block:
var immutableValidatorsDb1 = DbSeq[ImmutableValidatorData].init(
db, "immutable_validators", readOnly = true).expectDb()
if immutableValidatorsDb.len() < immutableValidatorsDb1.len():
while immutableValidatorsDb.len() < immutableValidatorsDb1.len():
let val = immutableValidatorsDb1.get(immutableValidatorsDb.len())
@ -321,10 +257,8 @@ proc new*(T: type BeaconChainDB,
withdrawal_credentials: val.withdrawal_credentials
))
immutableValidatorsDb1.close()
if not db.readOnly:
discard db.exec("DROP TABLE IF EXISTS immutable_validators;")
T(
db: db,
v0: BeaconChainDBV0.new(db, readOnly = true),
@ -341,7 +275,6 @@ proc new*(T: type BeaconChainDB,
summaries: summaries,
finalizedBlocks: finalizedBlocks
)
proc new*(T: type BeaconChainDB,
dir: string,
cfg: RuntimeConfig = defaultRuntimeConfig,
@ -355,7 +288,6 @@ proc new*(T: type BeaconChainDB,
else:
if (let res = secureCreatePath(dir); res.isErr):
quit 1
SqStoreRef.init(
dir, "nbc", readOnly = readOnly, manualCheckpoint = true).expectDb()
BeaconChainDB.new(db, cfg)

View File

@ -1,12 +1,3 @@
# beacon_chain
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
std/[sequtils, strutils, os],
stew/[byteutils, objects], stew/shims/macros, nimcrypto/hash,
@ -476,3 +467,17 @@ else:
func bakedGenesisValidatorsRoot*(metadata: Eth2NetworkMetadata): Opt[Eth2Digest] =
Opt.none Eth2Digest
import stew/io2
proc fetchGenesisBytes*(
metadata: Eth2NetworkMetadata): seq[byte] =
case metadata.genesis.kind
of NoGenesis:
raiseAssert "fetchGenesisBytes should be called only when metadata.hasGenesis is true"
of BakedIn:
result = @(metadata.genesis.bakedBytes)
of BakedInUrl:
raiseAssert "genesis state downlading unsuppoorted"
of UserSuppliedFile:
result = readAllBytes(metadata.genesis.path).tryGet()

View File

@ -1,82 +0,0 @@
# beacon_chain
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
std/uri,
stew/io2, chronos, chronos/apps/http/httpclient, snappy,
../spec/[digest, forks], ../spec/datatypes/base
import ./network_metadata
export network_metadata
type
HttpFetchError* = object of CatchableError
status*: int
DigestMismatchError* = object of CatchableError
proc downloadFile(url: Uri): Future[seq[byte]] {.async.} =
var httpSession = HttpSessionRef.new()
let response = await httpSession.fetch(url)
if response[0] == 200:
return response[1]
else:
raise (ref HttpFetchError)(
msg: "Unexpected status code " & $response[0] & " when fetching " & $url,
status: response[0])
proc fetchGenesisBytes*(
metadata: Eth2NetworkMetadata,
genesisStateUrlOverride = none(Uri)): Future[seq[byte]] {.async.} =
case metadata.genesis.kind
of NoGenesis:
raiseAssert "fetchGenesisBytes should be called only when metadata.hasGenesis is true"
of BakedIn:
result = @(metadata.genesis.bakedBytes)
of BakedInUrl:
result = await downloadFile(genesisStateUrlOverride.get(parseUri metadata.genesis.url))
# Under the built-in default URL, we serve a snappy-encoded BeaconState in order
# to reduce the size of the downloaded file with roughly 50% (this precise ratio
# depends on the number of validator recors). The user is still free to provide
# any URL which may serve an uncompressed state (e.g. a Beacon API endpoint)
#
# Since a SSZ-encoded BeaconState will start with a LittleEndian genesis time
# (64 bits) while a snappy framed stream will always start with a fixed header
# that will decoded as a timestamp with the value 5791996851603375871 (year 2153).
#
# TODO: A more complete solution will implement compression on the HTTP level,
# by relying on the Content-Encoding header to determine the compression
# algorithm. The detection method used here will not interfere with such
# an implementation and it may remain useful when dealing with misconfigured
# HTTP servers.
if result.isSnappyFramedStream:
result = decodeFramed(result)
let state = newClone(readSszForkedHashedBeaconState(metadata.cfg, result))
withState(state[]):
if forkyState.root != metadata.genesis.digest:
raise (ref DigestMismatchError)(
msg: "The downloaded genesis state cannot be verified (checksum mismatch)")
of UserSuppliedFile:
result = readAllBytes(metadata.genesis.path).tryGet()
proc sourceDesc*(metadata: GenesisMetadata): string =
case metadata.kind
of NoGenesis:
"no genesis"
of BakedIn:
metadata.networkName
of BakedInUrl:
metadata.url
of UserSuppliedFile:
metadata.path
when isMainModule:
let holeskyMetadata = getMetadataForNetwork("holesky")
io2.writeFile(
"holesky-genesis.ssz",
waitFor holeskyMetadata.fetchGenesisBytes()
).expect("success")

View File

@ -4,7 +4,6 @@ import
std/[os, times],
chronos,
stew/io2,
./networking/network_metadata_downloads,
./spec/datatypes/[altair, bellatrix, phase0],
./spec/deposit_snapshots,
./validators/[keystore_management, beacon_validators],
@ -76,7 +75,7 @@ proc init*(T: type BeaconNode,
quit 1
elif metadata.hasGenesis:
try:
await metadata.fetchGenesisBytes(config.genesisStateUrl)
metadata.fetchGenesisBytes()
except CatchableError as err:
quit 1
else:

View File

@ -49,6 +49,10 @@ switch("passC", "-fvisibility=hidden")
switch("passC", "-fno-omit-frame-pointer")
switch("passL", "-fno-omit-frame-pointer")
when false:
--define:nimStackTraceOverride
switch("import", "libbacktrace")
when false:
switch("passC", "-fstack-protector-all")
switch("passL", "-fstack-protector-all")
@ -61,10 +65,6 @@ when false:
switch("define", "nim_compiler_path=" & currentDir & "env.sh nim")
switch("define", "withoutPCRE")
when true:
--define:nimStackTraceOverride
switch("import", "libbacktrace")
var canEnableDebuggingSymbols = true
if defined(macosx):
# The default open files limit is too low on macOS (512), breaking the