mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-15 17:15:37 +00:00
0f8a3a5ae8
* checkpoint database at end of each slot To avoid spending time on synchronizing with the file system while doing processing, the manual checkpointing mode turns off fsync during processing and instead checkpoints the database when the slot has ended. From an sqlite perspecitve, in WAL mode this guaranees database consistency but may lead to data loss which is fine - anything missing from the beacon chain database can be recovered on the next startup. * log sync status and delay in slot start message * bump
474 lines
17 KiB
Nim
474 lines
17 KiB
Nim
{.push raises: [Defect].}
|
|
|
|
import
|
|
typetraits, tables,
|
|
stew/[results, objects, endians2, io2],
|
|
serialization, chronicles, snappy,
|
|
eth/db/[kvstore, kvstore_sqlite3],
|
|
./network_metadata,
|
|
./spec/[datatypes, digest, crypto, state_transition],
|
|
./ssz/[ssz_serialization, merkleization],
|
|
merkle_minimal, filepath
|
|
|
|
type
|
|
DbSeq*[T] = object
|
|
insertStmt: SqliteStmt[openArray[byte], void]
|
|
selectStmt: SqliteStmt[int64, openArray[byte]]
|
|
recordCount: int64
|
|
|
|
DbMap*[K, V] = object
|
|
db: SqStoreRef
|
|
keyspace: int
|
|
|
|
DepositsSeq = DbSeq[DepositData]
|
|
|
|
DepositsMerkleizer* = SszMerkleizer[depositContractLimit]
|
|
|
|
DepositContractSnapshot* = object
|
|
eth1Block*: Eth2Digest
|
|
depositContractState*: DepositContractState
|
|
|
|
BeaconChainDB* = ref object
|
|
## Database storing resolved blocks and states - resolved blocks are such
|
|
## blocks that form a chain back to the tail block.
|
|
##
|
|
## We assume that the database backend is working / not corrupt - as such,
|
|
## we will raise a Defect any time there is an issue. This should be
|
|
## revisited in the future, when/if the calling code safely can handle
|
|
## corruption of this kind.
|
|
##
|
|
## We do however make an effort not to crash on invalid data inside the
|
|
## database - this may have a number of "natural" causes such as switching
|
|
## between different versions of the client and accidentally using an old
|
|
## database.
|
|
backend: KvStoreRef
|
|
preset: RuntimePreset
|
|
genesisDeposits*: DepositsSeq
|
|
checkpoint*: proc() {.gcsafe.}
|
|
|
|
Keyspaces* = enum
|
|
defaultKeyspace = "kvstore"
|
|
validatorIndexFromPubKey
|
|
|
|
DbKeyKind = enum
|
|
kHashToState
|
|
kHashToBlock
|
|
kHeadBlock
|
|
## Pointer to the most recent block selected by the fork choice
|
|
kTailBlock
|
|
## Pointer to the earliest finalized block - this is the genesis block when
|
|
## the chain starts, but might advance as the database gets pruned
|
|
## TODO: determine how aggressively the database should be pruned. For a
|
|
## healthy network sync, we probably need to store blocks at least
|
|
## past the weak subjectivity period.
|
|
kBlockSlotStateRoot
|
|
## BlockSlot -> state_root mapping
|
|
kGenesisBlockRoot
|
|
## Immutable reference to the network genesis state
|
|
## (needed for satisfying requests to the beacon node API).
|
|
kEth1PersistedTo
|
|
## (Obsolete) Used to point to the the latest ETH1 block hash which
|
|
## satisfied the follow distance and had its deposits persisted to disk.
|
|
kDepositsFinalizedByEth1
|
|
## A merkleizer checkpoint which can be used for computing the
|
|
## `deposit_root` of all eth1 finalized deposits (i.e. deposits
|
|
## confirmed by ETH1_FOLLOW_DISTANCE blocks). The `deposit_root`
|
|
## is acknowledged and confirmed by the attached web3 provider.
|
|
kDepositsFinalizedByEth2
|
|
## A merkleizer checkpoint used for computing merkle proofs of
|
|
## deposits added to Eth2 blocks (it may lag behind the finalized
|
|
## eth1 deposits checkpoint).
|
|
kHashToBlockSummary
|
|
## Cache of beacon block summaries - during startup when we construct the
|
|
## chain dag, loading full blocks takes a lot of time - the block
|
|
## summary contains a minimal snapshot of what's needed to instanciate
|
|
## the BlockRef tree.
|
|
kSpeculativeDeposits
|
|
## A merkelizer checkpoint created on the basis of deposit events
|
|
## that we were not able to verify against a `deposit_root` served
|
|
## by the web3 provider. This may happen on Geth nodes that serve
|
|
## only recent contract state data (i.e. only recent `deposit_roots`).
|
|
|
|
BeaconBlockSummary* = object
|
|
slot*: Slot
|
|
parent_root*: Eth2Digest
|
|
|
|
const
|
|
# The largest object we're saving is the BeaconState, and by far, the largest
|
|
# part of it is the validator - each validator takes up at least 129 bytes
|
|
# in phase0, which means 100k validators is >12mb - in addition to this,
|
|
# there are several MB of hashes.
|
|
maxDecompressedDbRecordSize = 64*1024*1024
|
|
|
|
# Subkeys essentially create "tables" within the key-value store by prefixing
|
|
# each entry with a table id
|
|
|
|
func subkey(kind: DbKeyKind): array[1, byte] =
|
|
result[0] = byte ord(kind)
|
|
|
|
func subkey[N: static int](kind: DbKeyKind, key: array[N, byte]):
|
|
array[N + 1, byte] =
|
|
result[0] = byte ord(kind)
|
|
result[1 .. ^1] = key
|
|
|
|
func subkey(kind: type BeaconState, key: Eth2Digest): auto =
|
|
subkey(kHashToState, key.data)
|
|
|
|
func subkey(kind: type SignedBeaconBlock, key: Eth2Digest): auto =
|
|
subkey(kHashToBlock, key.data)
|
|
|
|
func subkey(kind: type BeaconBlockSummary, key: Eth2Digest): auto =
|
|
subkey(kHashToBlockSummary, key.data)
|
|
|
|
func subkey(root: Eth2Digest, slot: Slot): array[40, byte] =
|
|
var ret: array[40, byte]
|
|
# big endian to get a naturally ascending order on slots in sorted indices
|
|
ret[0..<8] = toBytesBE(slot.uint64)
|
|
# .. but 7 bytes should be enough for slots - in return, we get a nicely
|
|
# rounded key length
|
|
ret[0] = byte ord(kBlockSlotStateRoot)
|
|
ret[8..<40] = root.data
|
|
|
|
ret
|
|
|
|
template panic =
|
|
# TODO(zah): Could we recover from a corrupted database?
|
|
# Review all usages.
|
|
raiseAssert "The database should not be corrupted"
|
|
|
|
proc init*[T](Seq: type DbSeq[T], db: SqStoreRef, name: string): Seq =
|
|
db.exec("""
|
|
CREATE TABLE IF NOT EXISTS """ & name & """(
|
|
id INTEGER PRIMARY KEY,
|
|
value BLOB
|
|
);
|
|
""").expect "working database"
|
|
|
|
let
|
|
insertStmt = db.prepareStmt(
|
|
"INSERT INTO " & name & "(value) VALUES (?);",
|
|
openArray[byte], void).expect("this is a valid statement")
|
|
|
|
selectStmt = db.prepareStmt(
|
|
"SELECT value FROM " & name & " WHERE id = ?;",
|
|
int64, openArray[byte]).expect("this is a valid statement")
|
|
|
|
countStmt = db.prepareStmt(
|
|
"SELECT COUNT(1) FROM " & name & ";",
|
|
NoParams, int64).expect("this is a valid statement")
|
|
|
|
var recordCount = int64 0
|
|
let countQueryRes = countStmt.exec do (res: int64):
|
|
recordCount = res
|
|
|
|
let found = countQueryRes.expect("working database")
|
|
if not found: panic()
|
|
|
|
Seq(insertStmt: insertStmt,
|
|
selectStmt: selectStmt,
|
|
recordCount: recordCount)
|
|
|
|
proc add*[T](s: var DbSeq[T], val: T) =
|
|
var bytes = SSZ.encode(val)
|
|
s.insertStmt.exec(bytes).expect "working database"
|
|
inc s.recordCount
|
|
|
|
template len*[T](s: DbSeq[T]): uint64 =
|
|
s.recordCount.uint64
|
|
|
|
proc get*[T](s: DbSeq[T], idx: uint64): T =
|
|
# This is used only locally
|
|
let resultAddr = addr result
|
|
|
|
let queryRes = s.selectStmt.exec(int64(idx) + 1) do (recordBytes: openArray[byte]):
|
|
try:
|
|
resultAddr[] = decode(SSZ, recordBytes, T)
|
|
except SerializationError:
|
|
panic()
|
|
|
|
let found = queryRes.expect("working database")
|
|
if not found: panic()
|
|
|
|
proc createMap*(db: SqStoreRef, keyspace: int;
|
|
K, V: distinct type): DbMap[K, V] =
|
|
DbMap[K, V](db: db, keyspace: keyspace)
|
|
|
|
proc insert*[K, V](m: var DbMap[K, V], key: K, value: V) =
|
|
m.db.put(m.keyspace, SSZ.encode key, SSZ.encode value).expect("working database")
|
|
|
|
proc contains*[K, V](m: DbMap[K, V], key: K): bool =
|
|
contains(m.db, SSZ.encode key).expect("working database")
|
|
|
|
template insert*[K, V](t: var Table[K, V], key: K, value: V) =
|
|
add(t, key, value)
|
|
|
|
proc init*(T: type BeaconChainDB,
|
|
preset: RuntimePreset,
|
|
dir: string,
|
|
inMemory = false): BeaconChainDB =
|
|
if inMemory:
|
|
# TODO
|
|
# To support testing, the inMemory store should offer the complete
|
|
# functionalityof the database-backed one (i.e. tracking of deposits
|
|
# and validators)
|
|
T(backend: kvStore MemStoreRef.init(),
|
|
preset: preset)
|
|
else:
|
|
let s = secureCreatePath(dir)
|
|
doAssert s.isOk # TODO(zah) Handle this in a better way
|
|
|
|
let sqliteStore = SqStoreRef.init(
|
|
dir, "nbc", Keyspaces, manualCheckpoint = true).expect("working database")
|
|
|
|
# Remove the deposits table we used before we switched
|
|
# to storing only deposit contract checkpoints
|
|
if sqliteStore.exec("DROP TABLE IF EXISTS deposits;").isErr:
|
|
debug "Failed to drop the deposits table"
|
|
|
|
var
|
|
validatorKeyToIndex = initTable[ValidatorPubKey, ValidatorIndex]()
|
|
genesisDepositsSeq = DbSeq[DepositData].init(sqliteStore, "genesis_deposits")
|
|
|
|
|
|
T(backend: kvStore sqliteStore,
|
|
preset: preset,
|
|
genesisDeposits: genesisDepositsSeq,
|
|
checkpoint: proc() = sqliteStore.checkpoint()
|
|
)
|
|
|
|
proc snappyEncode(inp: openArray[byte]): seq[byte] =
|
|
try:
|
|
snappy.encode(inp)
|
|
except CatchableError as err:
|
|
raiseAssert err.msg
|
|
|
|
proc put(db: BeaconChainDB, key: openArray[byte], v: Eth2Digest) =
|
|
db.backend.put(key, v.data).expect("working database")
|
|
|
|
proc put(db: BeaconChainDB, key: openArray[byte], v: auto) =
|
|
db.backend.put(key, snappyEncode(SSZ.encode(v))).expect("working database")
|
|
|
|
proc get(db: BeaconChainDB, key: openArray[byte], T: type Eth2Digest): Opt[T] =
|
|
var res: Opt[T]
|
|
proc decode(data: openArray[byte]) =
|
|
if data.len == 32:
|
|
res.ok Eth2Digest(data: toArray(32, data))
|
|
else:
|
|
# If the data can't be deserialized, it could be because it's from a
|
|
# version of the software that uses a different SSZ encoding
|
|
warn "Unable to deserialize data, old database?",
|
|
typ = name(T), dataLen = data.len
|
|
discard
|
|
|
|
discard db.backend.get(key, decode).expect("working database")
|
|
|
|
res
|
|
|
|
type GetResult = enum
|
|
found = "Found"
|
|
notFound = "Not found"
|
|
corrupted = "Corrupted"
|
|
|
|
proc get[T](db: BeaconChainDB, key: openArray[byte], output: var T): GetResult =
|
|
var status = GetResult.notFound
|
|
|
|
# TODO address is needed because there's no way to express lifetimes in nim
|
|
# we'll use unsafeAddr to find the code later
|
|
var outputPtr = unsafeAddr output # callback is local, ptr wont escape
|
|
proc decode(data: openArray[byte]) =
|
|
try:
|
|
let decompressed = snappy.decode(data, maxDecompressedDbRecordSize)
|
|
if decompressed.len > 0:
|
|
outputPtr[] = SSZ.decode(decompressed, T, updateRoot = false)
|
|
status = GetResult.found
|
|
else:
|
|
warn "Corrupt snappy record found in database", typ = name(T)
|
|
status = GetResult.corrupted
|
|
except SerializationError as e:
|
|
# If the data can't be deserialized, it could be because it's from a
|
|
# version of the software that uses a different SSZ encoding
|
|
warn "Unable to deserialize data, old database?",
|
|
err = e.msg, typ = name(T), dataLen = data.len
|
|
status = GetResult.corrupted
|
|
|
|
discard db.backend.get(key, decode).expect("working database")
|
|
|
|
status
|
|
|
|
proc close*(db: BeaconChainDB) =
|
|
discard db.backend.close()
|
|
|
|
func toBeaconBlockSummary(v: SomeBeaconBlock): BeaconBlockSummary =
|
|
BeaconBlockSummary(
|
|
slot: v.slot,
|
|
parent_root: v.parent_root,
|
|
)
|
|
|
|
proc putBlock*(db: BeaconChainDB, value: SignedBeaconBlock) =
|
|
db.put(subkey(type value, value.root), value)
|
|
db.put(subkey(BeaconBlockSummary, value.root), value.message.toBeaconBlockSummary())
|
|
proc putBlock*(db: BeaconChainDB, value: TrustedSignedBeaconBlock) =
|
|
db.put(subkey(SignedBeaconBlock, value.root), value)
|
|
db.put(subkey(BeaconBlockSummary, value.root), value.message.toBeaconBlockSummary())
|
|
|
|
proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconState) =
|
|
# TODO prune old states - this is less easy than it seems as we never know
|
|
# when or if a particular state will become finalized.
|
|
|
|
db.put(subkey(type value, key), value)
|
|
|
|
proc putState*(db: BeaconChainDB, value: BeaconState) =
|
|
db.putState(hash_tree_root(value), value)
|
|
|
|
proc putStateRoot*(db: BeaconChainDB, root: Eth2Digest, slot: Slot,
|
|
value: Eth2Digest) =
|
|
db.put(subkey(root, slot), value)
|
|
|
|
proc delBlock*(db: BeaconChainDB, key: Eth2Digest) =
|
|
db.backend.del(subkey(SignedBeaconBlock, key)).expect("working database")
|
|
db.backend.del(subkey(BeaconBlockSummary, key)).expect("working database")
|
|
|
|
proc delState*(db: BeaconChainDB, key: Eth2Digest) =
|
|
db.backend.del(subkey(BeaconState, key)).expect("working database")
|
|
|
|
proc delStateRoot*(db: BeaconChainDB, root: Eth2Digest, slot: Slot) =
|
|
db.backend.del(subkey(root, slot)).expect("working database")
|
|
|
|
proc putHeadBlock*(db: BeaconChainDB, key: Eth2Digest) =
|
|
db.put(subkey(kHeadBlock), key)
|
|
|
|
proc putTailBlock*(db: BeaconChainDB, key: Eth2Digest) =
|
|
db.put(subkey(kTailBlock), key)
|
|
|
|
proc putGenesisBlockRoot*(db: BeaconChainDB, key: Eth2Digest) =
|
|
db.put(subkey(kGenesisBlockRoot), key)
|
|
|
|
proc putEth1FinalizedTo*(db: BeaconChainDB,
|
|
eth1Checkpoint: DepositContractSnapshot) =
|
|
db.put(subkey(kDepositsFinalizedByEth1), eth1Checkpoint)
|
|
|
|
proc putEth2FinalizedTo*(db: BeaconChainDB,
|
|
eth1Checkpoint: DepositContractSnapshot) =
|
|
db.put(subkey(kDepositsFinalizedByEth2), eth1Checkpoint)
|
|
|
|
proc putSpeculativeDeposits*(db: BeaconChainDB,
|
|
eth1Checkpoint: DepositContractSnapshot) =
|
|
db.put(subkey(kSpeculativeDeposits), eth1Checkpoint)
|
|
|
|
proc getBlock*(db: BeaconChainDB, key: Eth2Digest): Opt[TrustedSignedBeaconBlock] =
|
|
# We only store blocks that we trust in the database
|
|
result.ok(TrustedSignedBeaconBlock())
|
|
if db.get(subkey(SignedBeaconBlock, key), result.get) != GetResult.found:
|
|
result.err()
|
|
else:
|
|
# set root after deserializing (so it doesn't get zeroed)
|
|
result.get().root = key
|
|
|
|
proc getBlockSummary*(db: BeaconChainDB, key: Eth2Digest): Opt[BeaconBlockSummary] =
|
|
# We only store blocks that we trust in the database
|
|
result.ok(BeaconBlockSummary())
|
|
if db.get(subkey(BeaconBlockSummary, key), result.get) != GetResult.found:
|
|
result.err()
|
|
|
|
proc getState*(
|
|
db: BeaconChainDB, key: Eth2Digest, output: var BeaconState,
|
|
rollback: RollbackProc): bool =
|
|
## Load state into `output` - BeaconState is large so we want to avoid
|
|
## re-allocating it if possible
|
|
## Return `true` iff the entry was found in the database and `output` was
|
|
## overwritten.
|
|
## Rollback will be called only if output was partially written - if it was
|
|
## not found at all, rollback will not be called
|
|
# TODO rollback is needed to deal with bug - use `noRollback` to ignore:
|
|
# https://github.com/nim-lang/Nim/issues/14126
|
|
# TODO RVO is inefficient for large objects:
|
|
# https://github.com/nim-lang/Nim/issues/13879
|
|
case db.get(subkey(BeaconState, key), output)
|
|
of GetResult.found:
|
|
true
|
|
of GetResult.notFound:
|
|
false
|
|
of GetResult.corrupted:
|
|
rollback(output)
|
|
false
|
|
|
|
proc getStateRoot*(db: BeaconChainDB,
|
|
root: Eth2Digest,
|
|
slot: Slot): Opt[Eth2Digest] =
|
|
db.get(subkey(root, slot), Eth2Digest)
|
|
|
|
proc getHeadBlock*(db: BeaconChainDB): Opt[Eth2Digest] =
|
|
db.get(subkey(kHeadBlock), Eth2Digest)
|
|
|
|
proc getTailBlock*(db: BeaconChainDB): Opt[Eth2Digest] =
|
|
db.get(subkey(kTailBlock), Eth2Digest)
|
|
|
|
proc getGenesisBlockRoot*(db: BeaconChainDB): Eth2Digest =
|
|
db.get(subkey(kGenesisBlockRoot), Eth2Digest).expect(
|
|
"The database must be seeded with the genesis state")
|
|
|
|
proc getEth1FinalizedTo*(db: BeaconChainDB): Opt[DepositContractSnapshot] =
|
|
result.ok(DepositContractSnapshot())
|
|
let r = db.get(subkey(kDepositsFinalizedByEth1), result.get)
|
|
if r != found: result.err()
|
|
|
|
proc getEth2FinalizedTo*(db: BeaconChainDB): Opt[DepositContractSnapshot] =
|
|
result.ok(DepositContractSnapshot())
|
|
let r = db.get(subkey(kDepositsFinalizedByEth2), result.get)
|
|
if r != found: result.err()
|
|
|
|
proc getSpeculativeDeposits*(db: BeaconChainDB): Opt[DepositContractSnapshot] =
|
|
result.ok(DepositContractSnapshot())
|
|
let r = db.get(subkey(kSpeculativeDeposits), result.get)
|
|
if r != found: result.err()
|
|
|
|
proc delSpeculativeDeposits*(db: BeaconChainDB) =
|
|
db.backend.del(subkey(kSpeculativeDeposits)).expect("working database")
|
|
|
|
proc containsBlock*(db: BeaconChainDB, key: Eth2Digest): bool =
|
|
db.backend.contains(subkey(SignedBeaconBlock, key)).expect("working database")
|
|
|
|
proc containsState*(db: BeaconChainDB, key: Eth2Digest): bool =
|
|
db.backend.contains(subkey(BeaconState, key)).expect("working database")
|
|
|
|
iterator getAncestors*(db: BeaconChainDB, root: Eth2Digest):
|
|
TrustedSignedBeaconBlock =
|
|
## Load a chain of ancestors for blck - returns a list of blocks with the
|
|
## oldest block last (blck will be at result[0]).
|
|
##
|
|
## The search will go on until the ancestor cannot be found.
|
|
|
|
var
|
|
res: TrustedSignedBeaconBlock
|
|
root = root
|
|
while db.get(subkey(SignedBeaconBlock, root), res) == GetResult.found:
|
|
res.root = root
|
|
yield res
|
|
root = res.message.parent_root
|
|
|
|
iterator getAncestorSummaries*(db: BeaconChainDB, root: Eth2Digest):
|
|
tuple[root: Eth2Digest, summary: BeaconBlockSummary] =
|
|
## Load a chain of ancestors for blck - returns a list of blocks with the
|
|
## oldest block last (blck will be at result[0]).
|
|
##
|
|
## The search will go on until the ancestor cannot be found.
|
|
|
|
var
|
|
res: tuple[root: Eth2Digest, summary: BeaconBlockSummary]
|
|
tmp: TrustedSignedBeaconBlock
|
|
root = root
|
|
|
|
while true:
|
|
if db.get(subkey(BeaconBlockSummary, root), res.summary) == GetResult.found:
|
|
res.root = root
|
|
yield res
|
|
elif db.get(subkey(SignedBeaconBlock, root), tmp) == GetResult.found:
|
|
res.summary = tmp.message.toBeaconBlockSummary()
|
|
db.put(subkey(BeaconBlockSummary, root), res.summary)
|
|
res.root = root
|
|
yield res
|
|
else:
|
|
break
|
|
|
|
root = res.summary.parent_root
|