# beacon_chain
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [Defect].}

import
  std/[typetraits, tables],
  stew/[arrayops, assign2, byteutils, endians2, io2, objects, results],
  serialization, chronicles, snappy,
  eth/db/[kvstore, kvstore_sqlite3],
  ./networking/network_metadata, ./beacon_chain_db_immutable,
  ./spec/[eth2_ssz_serialization, eth2_merkleization, forks, state_transition],
  ./spec/datatypes/[phase0, altair, bellatrix],
  ./filepath

export
  phase0, altair, eth2_ssz_serialization, eth2_merkleization, kvstore,
  kvstore_sqlite3

logScope: topics = "bc_db"

type
  DbSeq*[T] = object
    # Append-only sequence of SSZ-encoded `T` stored in one SQL table; the
    # number of rows is cached in `recordCount`
    insertStmt: SqliteStmt[openArray[byte], void]
    selectStmt: SqliteStmt[int64, openArray[byte]]
    recordCount: int64

  FinalizedBlocks* = object
    # A sparse version of DbSeq - can have holes but not duplicate entries
    insertStmt: SqliteStmt[(int64, array[32, byte]), void]
    selectStmt: SqliteStmt[int64, array[32, byte]]
    selectAllStmt: SqliteStmt[NoParams, (int64, array[32, byte])]

    # Lowest / highest slot present in the table, unset while it is empty
    low*: Opt[Slot]
    high*: Opt[Slot]

  DepositsSeq = DbSeq[DepositData]

  DepositContractSnapshot* = object
    eth1Block*: Eth2Digest
    depositContractState*: DepositContractState

  BeaconChainDBV0* = ref object
    ## BeaconChainDBV0 based on old kvstore table that sets the WITHOUT ROWID
    ## option which becomes unbearably slow with large blobs. It is used as a
    ## read-only store to support old versions - by freezing it at its current
    ## data set, downgrading remains possible since it's no longer touched -
    ## anyone downgrading will have to sync up whatever they missed.
    ##
    ## Newer versions read from the new tables first - if the data is not found,
    ## they turn to the old tables for reading. Writing is done only to the new
    ## tables.
    ##
    ## V0 stored most data in a single table, prefixing each key with a tag
    ## identifying the type of data.
    ##
    ## 1.1 introduced BeaconStateNoImmutableValidators storage where immutable
    ## validator data is stored in a separate table and only a partial
    ## BeaconState is written to kvstore
    ##
    ## 1.2 moved BeaconStateNoImmutableValidators to a separate table to
    ## alleviate some of the btree balancing issues - this doubled the speed but
    ## was still slow
    ##
    ## 1.3 creates `kvstore` with rowid, making it quite fast, but doesn't do
    ## anything about existing databases. Versions after that use a separate
    ## file instead (V1)
    backend: KvStoreRef # kvstore
    stateStore: KvStoreRef # state_no_validators

  BeaconChainDB* = ref object
    ## Database storing resolved blocks and states - resolved blocks are such
    ## blocks that form a chain back to the tail block.
    ##
    ## We assume that the database backend is working / not corrupt - as such,
    ## we will raise a Defect any time there is an issue. This should be
    ## revisited in the future, when/if the calling code safely can handle
    ## corruption of this kind.
    ##
    ## The database follows a "mostly-consistent" model where it's possible
    ## that some data has been lost to crashes and restarts - for example,
    ## the state root table might contain entries that don't lead to a state
    ## etc - this makes it easier to defer certain operations such as pruning
    ## and cleanup, but also means that some amount of "junk" is left behind
    ## when the application is restarted or crashes in the wrong moment.
    ##
    ## Generally, sqlite performs a commit at the end of every write, meaning
    ## that data write order is respected - the strategy thus becomes to write
    ## bulk data first, then update pointers like the `head root` entry.
    db*: SqStoreRef

    v0: BeaconChainDBV0
    genesisDeposits*: DepositsSeq

    # immutableValidatorsDb only stores the total count; it's a proxy for SQL
    # queries. (v1.4.0+)
    immutableValidatorsDb*: DbSeq[ImmutableValidatorDataDb2]
    immutableValidators*: seq[ImmutableValidatorData2]

    checkpoint*: proc() {.gcsafe, raises: [Defect].}

    keyValues: KvStoreRef # Random stuff using DbKeyKind - suitable for small values mainly!
    blocks: array[BeaconBlockFork, KvStoreRef] # BlockRoot -> TrustedSignedBeaconBlock
    stateRoots: KvStoreRef # (Slot, BlockRoot) -> StateRoot
    statesNoVal: array[BeaconStateFork, KvStoreRef] # StateRoot -> ForkBeaconStateNoImmutableValidators
    stateDiffs: KvStoreRef ##\
      ## StateRoot -> BeaconStateDiff
      ## Instead of storing full BeaconStates, one can store only the diff from
      ## a different state. As 75% of a typical BeaconState's serialized form's
      ## the validators, which are mostly immutable and append-only, just using
      ## a simple append-diff representation helps significantly. Various roots
      ## are stored in a mod-increment pattern across fixed-sized arrays, which
      ## addresses most of the rest of the BeaconState sizes.

    summaries: KvStoreRef
      ## BlockRoot -> BeaconBlockSummary - permits looking up basic block
      ## information via block root - contains only summaries that were valid
      ## at some point in history - it is however possible that entries exist
      ## that are no longer part of the finalized chain history, thus the
      ## cache should not be used to answer fork choice questions - see
      ## `getHeadBlock` and `finalizedBlocks` instead.
      ##
      ## May contain entries for blocks that are not stored in the database.
      ##
      ## See `finalizedBlocks` for an index in the other direction.

    finalizedBlocks*: FinalizedBlocks
      ## Blocks that are known to be finalized, per the latest head (v1.7.0+)
      ## Only blocks that have passed verification, either via state transition
      ## or backfilling are indexed here - thus, similar to `head`, it is part
      ## of the inner security ring and is used to answer security questions
      ## in the chaindag.
      ##
      ## May contain entries for blocks that are not stored in the database.
      ##
      ## See `summaries` for an index in the other direction.

  DbKeyKind = enum
    # The ordinal of each member is used as a one-byte key prefix (see
    # `subkey`), so members must keep their position
    kHashToState
    kHashToBlock
    kHeadBlock
      ## Pointer to the most recent block selected by the fork choice
    kTailBlock
      ## Pointer to the earliest finalized block - this is the genesis block when
      ## the chain starts, but might advance as the database gets pruned
      ## TODO: determine how aggressively the database should be pruned. For a
      ##       healthy network sync, we probably need to store blocks at least
      ##       past the weak subjectivity period.
    kBlockSlotStateRoot
      ## BlockSlot -> state_root mapping
    kGenesisBlock
      ## Immutable reference to the network genesis state
      ## (needed for satisfying requests to the beacon node API).
    kEth1PersistedTo # Obsolete
    kDepositsFinalizedByEth1 # Obsolete
    kDepositsFinalizedByEth2
      ## A merkleizer checkpoint used for computing merkle proofs of
      ## deposits added to Eth2 blocks (it may lag behind the finalized
      ## eth1 deposits checkpoint).
    kHashToBlockSummary # Block summaries for fast startup
    kSpeculativeDeposits
      ## A merkleizer checkpoint created on the basis of deposit events
      ## that we were not able to verify against a `deposit_root` served
      ## by the web3 provider. This may happen on Geth nodes that serve
      ## only recent contract state data (i.e. only recent `deposit_roots`).
    kHashToStateDiff # Obsolete
    kHashToStateOnlyMutableValidators
    kBackfillBlock # Obsolete, was in `unstable` for a while, but never released

  BeaconBlockSummary* = object
    ## Cache of beacon block summaries - during startup when we construct the
    ## chain dag, loading full blocks takes a lot of time - the block
    ## summary contains a minimal snapshot of what's needed to instantiate
    ## the BlockRef tree.
    slot*: Slot
    parent_root*: Eth2Digest

const
  # The largest object we're saving is the BeaconState, and by far, the largest
  # part of it is the validator - each validator takes up at least 129 bytes
  # in phase0, which means 100k validators is >12mb - in addition to this,
  # there are several MB of hashes.
  maxDecompressedDbRecordSize = 64*1024*1024

# Subkeys essentially create "tables" within the key-value store by prefixing
# each entry with a table id

func subkey(kind: DbKeyKind): array[1, byte] =
  ## Key made up of only the one-byte tag of `kind`.
  result[0] = byte ord(kind)

func subkey[N: static int](kind: DbKeyKind, key: array[N, byte]):
    array[N + 1, byte] =
  ## `key` prefixed with the one-byte tag of `kind`.
  result[0] = byte ord(kind)
  result[1 .. ^1] = key

func subkey(kind: type phase0.BeaconState, key: Eth2Digest): auto =
  ## Tagged key (`kHashToState`) for a full phase0 state.
  subkey(kHashToState, key.data)

func subkey(
    kind: type Phase0BeaconStateNoImmutableValidators, key: Eth2Digest): auto =
  ## Tagged key (`kHashToStateOnlyMutableValidators`) for a phase0 state
  ## stored without immutable validator data.
  subkey(kHashToStateOnlyMutableValidators, key.data)

func subkey(kind: type phase0.SignedBeaconBlock, key: Eth2Digest): auto =
  ## Tagged key (`kHashToBlock`) for a phase0 block.
  subkey(kHashToBlock, key.data)

func subkey(kind: type BeaconBlockSummary, key: Eth2Digest): auto =
  ## Tagged key (`kHashToBlockSummary`) for a block summary.
  subkey(kHashToBlockSummary, key.data)

func subkey(root: Eth2Digest, slot: Slot): array[40, byte] =
  ## 40-byte key for a (root, slot) -> state root entry: bytes 0..7 hold the
  ## big-endian slot with byte 0 replaced by the `kBlockSlotStateRoot` tag,
  ## bytes 8..39 hold `root`.
  var ret: array[40, byte]
  # big endian to get a naturally ascending order on slots in sorted indices
  ret[0..<8] = toBytesBE(slot.uint64)
  # .. but 7 bytes should be enough for slots - in return, we get a nicely
  # rounded key length
  ret[0] = byte ord(kBlockSlotStateRoot)
  ret[8..<40] = root.data

  ret

# Abort with an assertion failure - used where stored data cannot be read back
template panic =
  # TODO(zah): Could we recover from a corrupted database?
  #            Review all usages.
  raiseAssert "The database should not be corrupted"

# Unwrap a database result via `expect`, with a fixed message pointing at the
# storage layer
template expectDb(x: auto): untyped =
  # There's no meaningful error handling implemented for a corrupt database or
  # full disk - this requires manual intervention, so we'll panic for now
  x.expect("working database (disk broken/full?)")

proc init*[T](Seq: type DbSeq[T], db: SqStoreRef, name: string): KvResult[Seq] =
  ## Create table `name` if it does not exist, prepare the insert / select
  ## statements and read the current row count into `recordCount`.
  ## Returns an error if table creation or the count query fails.
  # NOTE(review): on the early-return paths below (`?` and `return err`) none
  # of the prepared statements is disposed - confirm whether that matters for
  # statements created with `managed = false`
  ? db.exec(""" CREATE TABLE IF NOT EXISTS """ & name & """( id INTEGER PRIMARY KEY, value BLOB ); """)

  let
    insertStmt = db.prepareStmt(
      "INSERT INTO " & name & "(value) VALUES (?);",
      openArray[byte], void, managed = false).expect("this is a valid statement")

    selectStmt = db.prepareStmt(
      "SELECT value FROM " & name & " WHERE id = ?;",
      int64, openArray[byte], managed = false).expect("this is a valid statement")

    countStmt = db.prepareStmt(
      "SELECT COUNT(1) FROM " & name & ";",
      NoParams, int64, managed = false).expect("this is a valid statement")

  var recordCount = int64 0
  let countQueryRes = countStmt.exec do (res: int64):
    recordCount = res

  let found = ? countQueryRes
  if not found:
    return err("Cannot count existing items")
  # The count statement is only needed during initialization
  countStmt.dispose()

  ok(Seq(insertStmt: insertStmt,
         selectStmt: selectStmt,
         recordCount: recordCount))

proc close*(s: DbSeq) =
  ## Dispose the prepared statements held by `s`.
  s.insertStmt.dispose()
  s.selectStmt.dispose()

proc add*[T](s: var DbSeq[T], val: T) =
  ## SSZ-encode `val`, insert it as a new row and bump the cached count.
  var bytes = SSZ.encode(val)
  s.insertStmt.exec(bytes).expectDb()
  inc s.recordCount

# Number of records, as cached in `recordCount` (no query is issued)
template len*[T](s: DbSeq[T]): int64 =
  s.recordCount

proc get*[T](s: DbSeq[T], idx: int64): T =
  ## Load and SSZ-decode the record at zero-based `idx` - the row is looked up
  ## by `id = idx + 1`. Panics if the row is missing or cannot be decoded.
  # This is used only locally
  let resultAddr = addr result

  let queryRes = s.selectStmt.exec(idx + 1) do (recordBytes: openArray[byte]):
    try:
      resultAddr[] = decode(SSZ, recordBytes, T)
    except SerializationError:
      panic()

  let found = queryRes.expectDb()
  if not found: panic()

proc init*(T: type FinalizedBlocks, db: SqStoreRef, name: string,
           readOnly = false): KvResult[T] =
  ## Prepare the statements used to access table `name` and load the current
  ## minimum / maximum id into `low` / `high`. The table itself is only
  ## created when `readOnly` is false.
  if not readOnly:
    ? db.exec(""" CREATE TABLE IF NOT EXISTS """ & name & """( id INTEGER PRIMARY KEY, value BLOB NOT NULL ); """)

  let
    insertStmt = db.prepareStmt(
      "REPLACE INTO " & name & "(id, value) VALUES (?, ?);",
      (int64, array[32, byte]), void, managed = false).expect("this is a valid statement")

    selectStmt = db.prepareStmt(
      "SELECT value FROM " & name & " WHERE id = ?;",
      int64, array[32, byte], managed = false).expect("this is a valid statement")

    selectAllStmt = db.prepareStmt(
      "SELECT id, value FROM " & name & " ORDER BY id;",
      NoParams, (int64, array[32, byte]), managed = false).expect("this is a valid statement")

    maxIdStmt = db.prepareStmt(
      "SELECT MAX(id) FROM " & name & ";",
      NoParams, Option[int64], managed = false).expect("this is a valid statement")

    minIdStmt = db.prepareStmt(
      "SELECT MIN(id) FROM " & name & ";",
      NoParams, Option[int64], managed = false).expect("this is a valid statement")

  var
    low, high: Opt[Slot]
    tmp: Option[int64]

  # `low` / `high` stay unset when the query yields no value
  for rowRes in minIdStmt.exec(tmp):
    expectDb rowRes
    if tmp.isSome():
      low.ok(Slot(tmp.get()))

  for rowRes in maxIdStmt.exec(tmp):
    expectDb rowRes
    if tmp.isSome():
      high.ok(Slot(tmp.get()))

  # The min/max statements are only needed during initialization
  maxIdStmt.dispose()
  minIdStmt.dispose()

  ok(T(insertStmt: insertStmt,
       selectStmt: selectStmt,
       selectAllStmt: selectAllStmt,
       low: low,
       high: high))

proc close*(s: FinalizedBlocks) =
  ## Dispose the prepared statements held by `s`.
  s.insertStmt.dispose()
  s.selectStmt.dispose()
  s.selectAllStmt.dispose()

proc insert*(s: var FinalizedBlocks, slot: Slot, val: Eth2Digest) =
  ## Store `val` for `slot` (the statement is a `REPLACE INTO`) and widen the
  ## cached `low` / `high` bounds to include `slot`.
  doAssert slot.uint64 < int64.high.uint64, "Only reasonable slots supported"
  s.insertStmt.exec((slot.int64, val.data)).expectDb()
  # `get(slot)` falls back to `slot` itself while the bound is still unset
  s.low.ok(min(slot, s.low.get(slot)))
  s.high.ok(max(slot, s.high.get(slot)))

proc get*(s: FinalizedBlocks, idx: Slot): Opt[Eth2Digest] =
  ## Return the root stored for slot `idx`, or an error value if the query
  ## yields no row.
  var row: s.selectStmt.Result
  for rowRes in s.selectStmt.exec(int64(idx), row):
    expectDb rowRes
    return ok(Eth2Digest(data: row))

  err()

iterator pairs*(s: FinalizedBlocks): (Slot, Eth2Digest) =
  ## Yield every stored (slot, root) pair, in the `ORDER BY id` order of the
  ## select-all statement.
  var row: s.selectAllStmt.Result
  for rowRes in s.selectAllStmt.exec(row):
    expectDb rowRes
    yield (Slot(row[0]), Eth2Digest(data: row[1]))

proc loadImmutableValidators(vals: DbSeq[ImmutableValidatorDataDb2]): seq[ImmutableValidatorData2] =
  ## Read every record of `vals` and convert it to its in-memory form
  ## (`pubkey` goes through `loadValid`).
  result = newSeqOfCap[ImmutableValidatorData2](vals.len())
  for i in 0 ..< vals.len:
    let tmp = vals.get(i)
    result.add ImmutableValidatorData2(
      pubkey: tmp.pubkey.loadValid(),
      withdrawal_credentials: tmp.withdrawal_credentials)

template withManyWrites*(dbParam: BeaconChainDB, body: untyped) =
  # We don't enforce strong ordering or atomicity requirements in the beacon
  # chain db in general, relying instead on readers to be able to deal with
  # minor inconsistencies - however, putting writes in a transaction is orders
  # of magnitude faster when doing many small writes, so we use this as an
  # optimization technique and the template is named accordingly.
  #
  # The transaction is committed when `body` runs to completion and rolled
  # back when `body` is left early (`commit` is then still false)
  let db = dbParam
  expectDb db.db.exec("BEGIN TRANSACTION;")
  var commit = false
  try:
    body
    commit = true
  finally:
    if commit:
      expectDb db.db.exec("COMMIT TRANSACTION;")
    else:
      expectDb db.db.exec("ROLLBACK TRANSACTION;")

proc new*(T: type BeaconChainDB,
          dir: string,
          inMemory = false,
          readOnly = false
    ): BeaconChainDB =
  ## Open (or create) the database and all its tables.
  ##
  ## With `inMemory` set, an in-memory store named "test" is used and `dir` is
  ## not touched; otherwise `dir` is created via `secureCreatePath` and the
  ## "nbc" store inside it is opened with manual checkpointing.
  ## Any failure results in a Defect.
  # NOTE(review): `readOnly` is passed to `SqStoreRef.init` but not to
  # `FinalizedBlocks.init`, and the table creation in `DbSeq.init` as well as
  # the `DELETE` further down are issued regardless of it - confirm intended
  var db = if inMemory:
      SqStoreRef.init("", "test", readOnly = readOnly, inMemory = true).expect(
        "working database (out of memory?)")
    else:
      let s = secureCreatePath(dir)
      doAssert s.isOk # TODO(zah) Handle this in a better way

      SqStoreRef.init(
        dir, "nbc", readOnly = readOnly, manualCheckpoint = true).expectDb()

  if not readOnly:
    # Remove the deposits table we used before we switched
    # to storing only deposit contract checkpoints
    if db.exec("DROP TABLE IF EXISTS deposits;").isErr:
      debug "Failed to drop the deposits table"

    # An old pubkey->index mapping that hasn't been used on any mainnet release
    if db.exec("DROP TABLE IF EXISTS validatorIndexFromPubKey;").isErr:
      debug "Failed to drop the validatorIndexFromPubKey table"

  var
    # V0 compatibility tables - these were created WITHOUT ROWID which is slow
    # for large blobs
    backend = kvStore db.openKvStore().expectDb()
    # state_no_validators is similar to state_no_validators2 but uses a
    # different key encoding and was created WITHOUT ROWID
    stateStore = kvStore db.openKvStore("state_no_validators").expectDb()

    genesisDepositsSeq =
      DbSeq[DepositData].init(db, "genesis_deposits").expectDb()
    immutableValidatorsDb =
      DbSeq[ImmutableValidatorDataDb2].init(db, "immutable_validators2").expectDb()

    # V1 - expected-to-be small rows get without rowid optimizations
    keyValues = kvStore db.openKvStore("key_values", true).expectDb()
    blocks = [
      kvStore db.openKvStore("blocks").expectDb(),
      kvStore db.openKvStore("altair_blocks").expectDb(),
      kvStore db.openKvStore("merge_blocks").expectDb()]

    stateRoots = kvStore db.openKvStore("state_roots", true).expectDb()

    statesNoVal = [
      kvStore db.openKvStore("state_no_validators2").expectDb(),
      kvStore db.openKvStore("altair_state_no_validators").expectDb(),
      kvStore db.openKvStore("merge_state_no_validators").expectDb()]

    stateDiffs = kvStore db.openKvStore("state_diffs").expectDb()
    summaries = kvStore db.openKvStore("beacon_block_summaries", true).expectDb()
    finalizedBlocks = FinalizedBlocks.init(db, "finalized_blocks").expectDb()

  # Versions prior to 1.4.0 (altair) stored validators in `immutable_validators`
  # which stores validator keys in compressed format - this is
  # slow to load and has been superseded by `immutable_validators2` which uses
  # uncompressed keys instead. We still support upgrading a database from the
  # old format, but don't need to support downgrading, and therefore safely can
  # remove the keys
  let immutableValidatorsDb1 =
      DbSeq[ImmutableValidatorData].init(db, "immutable_validators").expectDb()

  if immutableValidatorsDb.len() < immutableValidatorsDb1.len():
    notice "Migrating validator keys, this may take a minute",
      len = immutableValidatorsDb1.len()
    # Copy only the entries that the new table is still missing
    while immutableValidatorsDb.len() < immutableValidatorsDb1.len():
      let val = immutableValidatorsDb1.get(immutableValidatorsDb.len())
      immutableValidatorsDb.add(ImmutableValidatorDataDb2(
        pubkey: val.pubkey.loadValid().toUncompressed(),
        withdrawal_credentials: val.withdrawal_credentials
      ))
  immutableValidatorsDb1.close()

  # Safe because nobody will be downgrading to pre-altair versions
  # TODO: drop table maybe? that would require not creating the table just above
  discard db.exec("DELETE FROM immutable_validators;")

  T(
    db: db,
    v0: BeaconChainDBV0(
      backend: backend,
      stateStore: stateStore,
    ),
    genesisDeposits: genesisDepositsSeq,
    immutableValidatorsDb: immutableValidatorsDb,
    immutableValidators: loadImmutableValidators(immutableValidatorsDb),
    checkpoint: proc() = db.checkpoint(),
    keyValues: keyValues,
    blocks: blocks,
    stateRoots: stateRoots,
    statesNoVal: statesNoVal,
    stateDiffs: stateDiffs,
    summaries: summaries,
    finalizedBlocks: finalizedBlocks,
  )

proc decodeSSZ[T](data: openArray[byte], output: var T): bool =
  ## Decode plain SSZ `data` into `output`. Returns false (and logs a warning)
  ## on a `SerializationError`.
  try:
    readSszBytes(data, output, updateRoot = false)
    true
  except SerializationError as e:
    # If the data can't be deserialized, it could be because it's from a
    # version of the software that uses a different SSZ encoding
    warn "Unable to deserialize data, old database?",
      err = e.msg, typ = name(T), dataLen = data.len
    false

proc decodeSnappySSZ[T](data: openArray[byte], output: var T): bool =
  ## Snappy-decompress `data` (bounded by `maxDecompressedDbRecordSize`) and
  ## decode the result as SSZ into `output`. Returns false (and logs a
  ## warning) on a `SerializationError`.
  try:
    let decompressed = snappy.decode(data, maxDecompressedDbRecordSize)
    readSszBytes(decompressed, output, updateRoot = false)
    true
  except SerializationError as e:
    # If the data can't be deserialized, it could be because it's from a
    # version of the software that uses a different SSZ encoding
    warn "Unable to deserialize data, old database?",
      err = e.msg, typ = name(T), dataLen = data.len
    false

proc encodeSSZ(v: auto): seq[byte] =
  ## SSZ-encode `v`; an `IOError` is turned into an assertion failure.
  try:
    SSZ.encode(v)
  except IOError as err:
    raiseAssert err.msg

proc encodeSnappySSZ(v: auto): seq[byte] =
  ## SSZ-encode `v` and snappy-compress the result; any `CatchableError` is
  ## turned into an assertion failure.
  try:
    snappy.encode(SSZ.encode(v))
  except CatchableError as err:
    # In-memory encode shouldn't fail!
    raiseAssert err.msg

proc getRaw(db: KvStoreRef, key: openArray[byte], T: type Eth2Digest): Opt[T] =
  ## Read the value stored under `key` as a raw digest. The result is left
  ## unset when there is no entry or when the stored value does not have the
  ## size of an `Eth2Digest` (a warning is logged in the latter case).
  var res: Opt[T]
  proc decode(data: openArray[byte]) =
    if data.len == sizeof(Eth2Digest):
      res.ok Eth2Digest(data: toArray(sizeof(Eth2Digest), data))
    else:
      # If the data can't be deserialized, it could be because it's from a
      # version of the software that uses a different SSZ encoding
      warn "Unable to deserialize data, old database?",
       typ = name(T), dataLen = data.len
      discard

  discard db.get(key, decode).expectDb()

  res

proc putRaw(db: KvStoreRef, key: openArray[byte], v: Eth2Digest) =
  ## Store the raw bytes of `v` under `key`.
  db.put(key, v.data).expectDb()

type GetResult = enum
  # Outcome of a lookup that also decodes the stored value
  found = "Found"
  notFound = "Not found"
  corrupted = "Corrupted"

proc getSSZ[T](db: KvStoreRef, key: openArray[byte], output: var T): GetResult =
  ## Look up `key` and decode the plain-SSZ value into `output`.
  ## Returns `notFound` if the decode callback is never invoked, `corrupted`
  ## if decoding fails and `found` otherwise.
  var status = GetResult.notFound

  # TODO address is needed because there's no way to express lifetimes in nim
  #      we'll use unsafeAddr to find the code later
  var outputPtr = unsafeAddr output # callback is local, ptr wont escape
  proc decode(data: openArray[byte]) =
    status =
      if decodeSSZ(data, outputPtr[]): GetResult.found
      else: GetResult.corrupted

  discard db.get(key, decode).expectDb()

  status

proc putSSZ(db: KvStoreRef, key: openArray[byte], v: auto) =
  ## Store `v` under `key` as plain SSZ.
  db.put(key, encodeSSZ(v)).expectDb()

proc getSnappySSZ[T](db: KvStoreRef, key: openArray[byte], output: var T): GetResult =
  ## Look up `key` and decode the snappy-compressed SSZ value into `output`.
  ## Returns `notFound` if the decode callback is never invoked, `corrupted`
  ## if decoding fails and `found` otherwise.
  var status = GetResult.notFound

  # TODO address is needed because there's no way to express lifetimes in nim
  #      we'll use unsafeAddr to find the code later
  var outputPtr = unsafeAddr output # callback is local, ptr wont escape
  proc decode(data: openArray[byte]) =
    status =
      if decodeSnappySSZ(data, outputPtr[]): GetResult.found
      else: GetResult.corrupted

  discard db.get(key, decode).expectDb()

  status

proc putSnappySSZ(db: KvStoreRef, key: openArray[byte], v: auto) =
  ## Store `v` under `key` as snappy-compressed SSZ.
  db.put(key, encodeSnappySSZ(v)).expectDb()

proc close*(db: BeaconChainDBV0) =
  ## Close both V0 stores, ignoring the results.
  discard db.stateStore.close()
  discard db.backend.close()

proc close*(db: BeaconChainDB) =
  ## Close all tables and the underlying database. Calling it again is a
  ## no-op because `db.db` is reset to nil at the end.
  if db.db == nil: return

  # Close things roughly in reverse order
  db.finalizedBlocks.close()
  discard db.summaries.close()
  discard db.stateDiffs.close()
  for kv in db.statesNoVal:
    discard kv.close()
  discard db.stateRoots.close()
  for kv in db.blocks:
    discard kv.close()
  discard db.keyValues.close()
  db.immutableValidatorsDb.close()
  db.genesisDeposits.close()
  db.v0.close()
  db.db.close()

  db.db = nil

func toBeaconBlockSummary*(v: SomeForkyBeaconBlock): BeaconBlockSummary =
  ## Extract the `slot` and `parent_root` of `v` into a summary.
  BeaconBlockSummary(
    slot: v.slot,
    parent_root: v.parent_root,
  )

proc putBeaconBlockSummary(
    db: BeaconChainDB, root: Eth2Digest, value: BeaconBlockSummary) =
  ## Store `value` in the summaries table under `root`.
  # Summaries are too simple / small to compress, store them as plain SSZ
  db.summaries.putSSZ(root.data, value)

proc putBlock*(db: BeaconChainDB, value: ForkyTrustedSignedBeaconBlock) =
  ## Store `value` in the block table of its fork, keyed by `value.root`,
  ## together with its summary - both writes share one transaction.
  db.withManyWrites:
    db.blocks[type(value).toFork].putSnappySSZ(value.root.data, value)
    db.putBeaconBlockSummary(value.root, value.message.toBeaconBlockSummary())

proc updateImmutableValidators*(
    db: BeaconChainDB, validators: openArray[Validator]) =
  ## Append the immutable data of every validator in `validators` that is not
  ## yet present in `db.immutableValidators`. The on-disk table is only
  ## written when the database is not read-only; the in-memory list is always
  ## updated.
  # Must be called before storing a state that references the new validators
  let numValidators = validators.len

  while db.immutableValidators.len() < numValidators:
    let immutableValidator =
      getImmutableValidatorData(validators[db.immutableValidators.len()])
    if not db.db.readOnly:
      db.immutableValidatorsDb.add ImmutableValidatorDataDb2(
        pubkey: immutableValidator.pubkey.toUncompressed(),
        withdrawal_credentials: immutableValidator.withdrawal_credentials)
    db.immutableValidators.add immutableValidator

# The three overloads below reinterpret a full state as its
# "no immutable validators" counterpart via `isomorphicCast`
template toBeaconStateNoImmutableValidators(state: phase0.BeaconState):
    Phase0BeaconStateNoImmutableValidators =
  isomorphicCast[Phase0BeaconStateNoImmutableValidators](state)

template toBeaconStateNoImmutableValidators(state: altair.BeaconState):
    AltairBeaconStateNoImmutableValidators =
  isomorphicCast[AltairBeaconStateNoImmutableValidators](state)

template toBeaconStateNoImmutableValidators(state: bellatrix.BeaconState):
    BellatrixBeaconStateNoImmutableValidators =
  isomorphicCast[BellatrixBeaconStateNoImmutableValidators](state)

proc putState*(db: BeaconChainDB, key: Eth2Digest, value: ForkyBeaconState) =
  ## Store `value` under state root `key` in the table of its fork, after
  ## making sure the immutable validator data it relies on has been recorded.
  db.updateImmutableValidators(value.validators.asSeq())
  db.statesNoVal[type(value).toFork()].putSnappySSZ(
    key.data, toBeaconStateNoImmutableValidators(value))

proc putState*(db: BeaconChainDB, state: ForkyHashedBeaconState) =
  ## Store `state` together with the
  ## (latest block root, slot) -> state root mapping, in one transaction.
  db.withManyWrites:
    db.putStateRoot(state.latest_block_root, state.data.slot, state.root)
    db.putState(state.root, state.data)

# For testing rollback
proc putCorruptState*(
    db: BeaconChainDB, fork: static BeaconStateFork, key: Eth2Digest) =
  ## Store an encoded default `Validator` where a state is expected, so that
  ## loading `key` as a state fails to decode.
  db.statesNoVal[fork].putSnappySSZ(key.data, Validator())

func stateRootKey(root: Eth2Digest, slot: Slot): array[40, byte] =
  ## 40-byte key for the state roots table: 8 bytes of big-endian slot
  ## followed by the 32 bytes of `root`.
  var ret: array[40, byte]
  # big endian to get a naturally ascending order on slots in sorted indices
  ret[0..<8] = toBytesBE(slot.uint64)
  ret[8..<40] = root.data

  ret

proc putStateRoot*(db: BeaconChainDB, root: Eth2Digest, slot: Slot,
    value: Eth2Digest) =
  ## Record `value` as the state root for (`root`, `slot`).
  db.stateRoots.putRaw(stateRootKey(root, slot), value)

proc putStateDiff*(db: BeaconChainDB, root: Eth2Digest, value: BeaconStateDiff) =
  ## Store the state diff `value` under `root`.
  db.stateDiffs.putSnappySSZ(root.data, value)

proc delBlock*(db: BeaconChainDB, key: Eth2Digest) =
  ## Delete `key` from the block table of every fork and from the summaries
  ## table, in one transaction.
  db.withManyWrites:
    for kv in db.blocks:
      kv.del(key.data).expectDb()
    db.summaries.del(key.data).expectDb()

proc delState*(db: BeaconChainDB, key: Eth2Digest) =
  ## Delete `key` from the state table of every fork, in one transaction.
  db.withManyWrites:
    for kv in db.statesNoVal:
      kv.del(key.data).expectDb()

proc delStateRoot*(db: BeaconChainDB, root: Eth2Digest, slot: Slot) =
  ## Delete the state root entry for (`root`, `slot`).
  db.stateRoots.del(stateRootKey(root, slot)).expectDb()

proc delStateDiff*(db: BeaconChainDB, root: Eth2Digest) =
  ## Delete the state diff stored under `root`.
  db.stateDiffs.del(root.data).expectDb()

proc putHeadBlock*(db: BeaconChainDB, key: Eth2Digest) =
  ## Store `key` as the head block pointer.
  db.keyValues.putRaw(subkey(kHeadBlock), key)

proc putTailBlock*(db: BeaconChainDB, key: Eth2Digest) =
  ## Store `key` as the tail block pointer.
  db.keyValues.putRaw(subkey(kTailBlock), key)

proc putGenesisBlock*(db: BeaconChainDB, key: Eth2Digest) =
  ## Store `key` as the genesis block pointer.
  db.keyValues.putRaw(subkey(kGenesisBlock), key)

proc putEth2FinalizedTo*(db: BeaconChainDB,
                         eth1Checkpoint: DepositContractSnapshot) =
  ## Store `eth1Checkpoint` under the `kDepositsFinalizedByEth2` key.
  db.keyValues.putSnappySSZ(subkey(kDepositsFinalizedByEth2), eth1Checkpoint)

proc getPhase0Block(
    db: BeaconChainDBV0, key: Eth2Digest): Opt[phase0.TrustedSignedBeaconBlock] =
  ## Load a phase0 block from the V0 store; the result is an error value
  ## unless the lookup returns `found`.
  # We only store blocks that we trust in the database
  result.ok(default(phase0.TrustedSignedBeaconBlock))
  if db.backend.getSnappySSZ(
      subkey(phase0.SignedBeaconBlock, key), result.get) != GetResult.found:
    result.err()
  else:
    # set root after deserializing (so it doesn't get zeroed)
    result.get().root = key

proc getBlock*(
    db: BeaconChainDB, key: Eth2Digest,
    T: type phase0.TrustedSignedBeaconBlock): Opt[T] =
  ## Load a phase0 block, trying the current block table first and the V0
  ## store second.
  # We only store blocks that we trust in the database
  result.ok(default(T))
  if db.blocks[T.toFork].getSnappySSZ(key.data, result.get) != GetResult.found:
    # During the initial releases phase0, we stored blocks in a different table
    result = db.v0.getPhase0Block(key)
  else:
    # set root after deserializing (so it doesn't get zeroed)
    result.get().root = key

proc getBlock*[
    X: altair.TrustedSignedBeaconBlock | bellatrix.TrustedSignedBeaconBlock](
    db: BeaconChainDB, key: Eth2Digest,
    T: type X): Opt[T] =
  ## Load an altair / bellatrix block from the table of its fork; the result
  ## is an error value unless the lookup returns `found`.
  # We only store blocks that we trust in the database
  result.ok(default(T))
  if db.blocks[T.toFork].getSnappySSZ(key.data, result.get) == GetResult.found:
    # set root after deserializing (so it doesn't get zeroed)
    result.get().root = key
  else:
    result.err()

proc getPhase0BlockSSZ(db: BeaconChainDBV0, key: Eth2Digest, data: var seq[byte]): bool =
  ## Write the snappy-decompressed bytes of the V0 phase0 block `key` into
  ## `data`. Returns true only if `get` reports the entry and decompression
  ## did not raise.
  let dataPtr = unsafeAddr data # Short-lived
  var success = true
  proc decode(data: openArray[byte]) =
    try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize)
    except CatchableError: success = false
  db.backend.get(subkey(phase0.SignedBeaconBlock, key), decode).expectDb() and success

# SSZ implementations are separate so as to avoid unnecessary data copies
proc getBlockSSZ*(
    db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
    T: type phase0.TrustedSignedBeaconBlock): bool =
  ## Write the snappy-decompressed bytes of phase0 block `key` into `data`,
  ## falling back to the V0 store when the current table does not yield it.
  let dataPtr = unsafeAddr data # Short-lived
  var success = true
  proc decode(data: openArray[byte]) =
    try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize)
    except CatchableError: success = false
  db.blocks[BeaconBlockFork.Phase0].get(key.data, decode).expectDb() and success or
    db.v0.getPhase0BlockSSZ(key, data)

proc getBlockSSZ*[
    X: altair.TrustedSignedBeaconBlock | bellatrix.TrustedSignedBeaconBlock](
    db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
    T: type X): bool =
  ## Write the snappy-decompressed bytes of altair / bellatrix block `key`
  ## into `data`. Returns true only if `get` reports the entry and
  ## decompression did not raise.
  let dataPtr = unsafeAddr data # Short-lived
  var success = true
  proc decode(data: openArray[byte]) =
    try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize)
    except CatchableError: success = false
  db.blocks[T.toFork].get(key.data, decode).expectDb() and success

proc getBlockSSZ*(
    db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
    fork: BeaconBlockFork): bool =
  ## Runtime-fork dispatch to the typed `getBlockSSZ` overloads.
  case fork
  of BeaconBlockFork.Phase0:
    getBlockSSZ(db, key, data, phase0.TrustedSignedBeaconBlock)
  of BeaconBlockFork.Altair:
    getBlockSSZ(db, key, data, altair.TrustedSignedBeaconBlock)
  of BeaconBlockFork.Bellatrix:
    getBlockSSZ(db, key, data, bellatrix.TrustedSignedBeaconBlock)

proc getStateOnlyMutableValidators(
    immutableValidators: openArray[ImmutableValidatorData2],
    store: KvStoreRef, key: openArray[byte], output: var ForkyBeaconState,
    rollback: RollbackProc): bool =
  ## Load state into `output` - BeaconState is large so we want to avoid
  ## re-allocating it if possible
  ## Return `true` iff the entry was found in the database and `output` was
  ## overwritten.
  ## Rollback will be called only if output was partially written - if it was
  ## not found at all, rollback will not be called
  # TODO rollback is needed to deal with bug - use `noRollback` to ignore:
  #      https://github.com/nim-lang/Nim/issues/14126
  # TODO RVO is inefficient for large objects:
  #      https://github.com/nim-lang/Nim/issues/13879

  case store.getSnappySSZ(key, toBeaconStateNoImmutableValidators(output))
  of GetResult.found:
    let numValidators = output.validators.len
    doAssert immutableValidators.len >= numValidators

    # Fill in the fields that are not part of the stored representation
    for i in 0 ..< numValidators:
      let
        # Bypass hash cache invalidation
        dstValidator = addr output.validators.data[i]

      assign(
        dstValidator.pubkey,
        immutableValidators[i].pubkey.toPubKey())
      assign(
        dstValidator.withdrawal_credentials,
        immutableValidators[i].withdrawal_credentials)

    output.validators.resetCache()

    true
  of GetResult.notFound:
    false
  of GetResult.corrupted:
    rollback()
    false

proc getState(
    db: BeaconChainDBV0,
    immutableValidators: openArray[ImmutableValidatorData2],
    key: Eth2Digest, output: var phase0.BeaconState,
    rollback: RollbackProc): bool =
  ## Load a phase0 state from the V0 stores, trying each location listed
  ## below in turn. Returns true as soon as one of them yields the state.
  # Nimbus 1.0 reads and writes genesis BeaconState to `backend`
  # Nimbus 1.1 writes a genesis BeaconStateNoImmutableValidators to `backend` and
  # reads both BeaconState and BeaconStateNoImmutableValidators from `backend`
  # Nimbus 1.2 writes a genesis BeaconStateNoImmutableValidators to `stateStore`
  # and reads BeaconState from `backend` and BeaconStateNoImmutableValidators
  # from `stateStore`. We will try to read the state from all these locations.
  if getStateOnlyMutableValidators(
      immutableValidators, db.stateStore,
      subkey(Phase0BeaconStateNoImmutableValidators, key), output, rollback):
    return true
  if getStateOnlyMutableValidators(
      immutableValidators, db.backend,
      subkey(Phase0BeaconStateNoImmutableValidators, key), output, rollback):
    return true

  case db.backend.getSnappySSZ(subkey(phase0.BeaconState, key), output)
  of GetResult.found:
    true
  of GetResult.notFound:
    false
  of GetResult.corrupted:
    rollback()
    false

proc getState*(
    db: BeaconChainDB, key: Eth2Digest, output: var phase0.BeaconState,
    rollback: RollbackProc): bool =
  ## Load state into `output` - BeaconState is large so we want to avoid
  ## re-allocating it if possible
  ## Return `true` iff the entry was found in the database and `output` was
  ## overwritten.
  ## Rollback will be called only if output was partially written - if it was
  ## not found at all, rollback will not be called
  # TODO rollback is needed to deal with bug - use `noRollback` to ignore:
  #      https://github.com/nim-lang/Nim/issues/14126
  # TODO RVO is inefficient for large objects:
  #      https://github.com/nim-lang/Nim/issues/13879
  type T = type(output)

  # Current table first, V0 stores as fallback
  if not getStateOnlyMutableValidators(
      db.immutableValidators, db.statesNoVal[T.toFork], key.data, output,
      rollback):
    db.v0.getState(db.immutableValidators, key, output, rollback)
  else:
    true

proc getState*(
    db: BeaconChainDB, key: Eth2Digest,
    output: var (altair.BeaconState | bellatrix.BeaconState),
    rollback: RollbackProc): bool =
  ## Load state into `output` - BeaconState is large so we want to avoid
  ## re-allocating it if possible
  ## Return `true` iff the entry was found in the database and `output` was
  ## overwritten.
  ## Rollback will be called only if output was partially written - if it was
  ## not found at all, rollback will not be called
  # TODO rollback is needed to deal with bug - use `noRollback` to ignore:
  #      https://github.com/nim-lang/Nim/issues/14126
  # TODO RVO is inefficient for large objects:
  #      https://github.com/nim-lang/Nim/issues/13879
  type T = type(output)
  getStateOnlyMutableValidators(
    db.immutableValidators, db.statesNoVal[T.toFork], key.data, output,
    rollback)

proc getStateRoot(db: BeaconChainDBV0,
                   root: Eth2Digest,
                   slot: Slot): Opt[Eth2Digest] =
  ## V0 lookup of the state root for (`root`, `slot`), using the tagged key.
  db.backend.getRaw(subkey(root, slot), Eth2Digest)

proc getStateRoot*(db: BeaconChainDB,
                   root: Eth2Digest,
                   slot: Slot): Opt[Eth2Digest] =
  ## State root for (`root`, `slot`) - current table first, then V0.
  db.stateRoots.getRaw(stateRootKey(root, slot), Eth2Digest) or
    db.v0.getStateRoot(root, slot)

proc getStateDiff*(db: BeaconChainDB,
                   root: Eth2Digest): Opt[BeaconStateDiff] =
  ## State diff stored under `root`; an error value unless the lookup
  ## returns `found`.
  result.ok(BeaconStateDiff())
  if db.stateDiffs.getSnappySSZ(root.data, result.get) != GetResult.found:
    result.err

proc getHeadBlock(db: BeaconChainDBV0): Opt[Eth2Digest] =
  ## Head block pointer from the V0 store.
  db.backend.getRaw(subkey(kHeadBlock), Eth2Digest)

proc getHeadBlock*(db: BeaconChainDB): Opt[Eth2Digest] =
  ## Head block pointer - current table first, then V0.
  db.keyValues.getRaw(subkey(kHeadBlock), Eth2Digest) or
    db.v0.getHeadBlock()

proc getTailBlock(db: BeaconChainDBV0): Opt[Eth2Digest] =
  ## Tail block pointer from the V0 store.
  db.backend.getRaw(subkey(kTailBlock), Eth2Digest)

proc getTailBlock*(db: BeaconChainDB): Opt[Eth2Digest] =
  ## Tail block pointer - current table first, then V0.
  db.keyValues.getRaw(subkey(kTailBlock), Eth2Digest) or
    db.v0.getTailBlock()

proc getGenesisBlock(db: BeaconChainDBV0): Opt[Eth2Digest] =
  ## Genesis block pointer from the V0 store.
  db.backend.getRaw(subkey(kGenesisBlock), Eth2Digest)

proc getGenesisBlock*(db: BeaconChainDB): Opt[Eth2Digest] =
  ## Genesis block pointer - current table first, then V0.
  db.keyValues.getRaw(subkey(kGenesisBlock), Eth2Digest) or
    db.v0.getGenesisBlock()

proc getEth2FinalizedTo(db: BeaconChainDBV0): Opt[DepositContractSnapshot] =
  ## Deposit contract snapshot from the V0 store; an error value unless the
  ## lookup returns `found`.
  result.ok(DepositContractSnapshot())
  let r = db.backend.getSnappySSZ(subkey(kDepositsFinalizedByEth2), result.get)
  if r != found: result.err()

proc getEth2FinalizedTo*(db: BeaconChainDB): Opt[DepositContractSnapshot] =
  ## Deposit contract snapshot - current table first, then V0.
  result.ok(DepositContractSnapshot())
  let r = db.keyValues.getSnappySSZ(subkey(kDepositsFinalizedByEth2), result.get)
  if r != found: return db.v0.getEth2FinalizedTo()

proc containsBlock*(db: BeaconChainDBV0, key: Eth2Digest): bool =
  ## True if the V0 store has a phase0 block entry for `key`.
  db.backend.contains(subkey(phase0.SignedBeaconBlock, key)).expectDb()

proc containsBlock*(
    db: BeaconChainDB, key: Eth2Digest,
    T: type phase0.TrustedSignedBeaconBlock): bool =
  ## True if a phase0 block `key` exists in the current table or in V0.
  db.blocks[T.toFork].contains(key.data).expectDb() or
    db.v0.containsBlock(key)

proc containsBlock*[
    X: altair.TrustedSignedBeaconBlock | bellatrix.TrustedSignedBeaconBlock](
    db: BeaconChainDB, key: Eth2Digest, T: type X): bool =
  ## True if the block table of `X`'s fork has an entry for `key`.
  db.blocks[X.toFork].contains(key.data).expectDb()

proc containsBlock*(db: BeaconChainDB, key: Eth2Digest, fork: BeaconBlockFork): bool =
  ## Runtime-fork variant - only phase0 also consults the V0 store.
  case fork
  of BeaconBlockFork.Phase0: containsBlock(db, key, phase0.TrustedSignedBeaconBlock)
  else: db.blocks[fork].contains(key.data).expectDb()

proc containsBlock*(db: BeaconChainDB, key: Eth2Digest): bool =
  ## True if `key` is present as a block of any fork, checked in the order
  ## bellatrix, altair, phase0.
  db.containsBlock(key, bellatrix.TrustedSignedBeaconBlock) or
    db.containsBlock(key, altair.TrustedSignedBeaconBlock) or
    db.containsBlock(key, phase0.TrustedSignedBeaconBlock)

proc containsState*(db: BeaconChainDBV0, key: Eth2Digest): bool =
  ## True if any of the V0 locations has a state entry for `key`.
  let sk = subkey(Phase0BeaconStateNoImmutableValidators, key)
  db.stateStore.contains(sk).expectDb() or
    db.backend.contains(sk).expectDb() or
    db.backend.contains(subkey(phase0.BeaconState, key)).expectDb()

proc containsState*(db: BeaconChainDB, key: Eth2Digest, legacy: bool = true): bool =
  ## True if a state entry for `key` exists in any fork's table; the V0
  ## stores are consulted as well when `legacy` is set.
  db.statesNoVal[BeaconStateFork.Bellatrix].contains(key.data).expectDb or
  db.statesNoVal[BeaconStateFork.Altair].contains(key.data).expectDb or
  db.statesNoVal[BeaconStateFork.Phase0].contains(key.data).expectDb or
    (legacy and db.v0.containsState(key))

proc getBeaconBlockSummary*(db: BeaconChainDB, root: Eth2Digest):
    Opt[BeaconBlockSummary] =
  ## Summary stored under `root`; an error value unless the lookup returns
  ## `found`.
  var summary: BeaconBlockSummary
  if db.summaries.getSSZ(root.data, summary) == GetResult.found:
    ok(summary)
  else:
    err()

proc loadStateRoots*(db: BeaconChainDB): Table[(Slot, Eth2Digest), Eth2Digest] =
  ## Load all known state roots - just because we have a state root doesn't
  ## mean we also have a state (and vice versa)!
  var state_roots = initTable[(Slot, Eth2Digest), Eth2Digest](1024)

  discard db.state_roots.find([], proc(k, v: openArray[byte]) =
    if k.len() == 40 and v.len() == 32:
      # For legacy reasons, the first byte of the slot is not part of the slot
      # but rather a subkey identifier - see subkey
      var tmp = toArray(8, k.toOpenArray(0, 7))
      tmp[0] = 0
      state_roots[
        (Slot(uint64.fromBytesBE(tmp)),
        Eth2Digest(data: toArray(sizeof(Eth2Digest), k.toOpenArray(8, 39))))] =
        Eth2Digest(data: toArray(sizeof(Eth2Digest), v))
    else:
      warn "Invalid state root in database", klen = k.len(), vlen = v.len()
  )

  state_roots

proc loadSummaries*(db: BeaconChainDB): Table[Eth2Digest, BeaconBlockSummary] =
  ## Load every summary into a table keyed by block root; entries whose key
  ## has the wrong size or whose value fails to decode are skipped with a
  ## warning.
  # Load summaries into table - there's no telling what order they're in so we
  # load them all - bugs in nim prevent this code from living in the iterator.
  var summaries = initTable[Eth2Digest, BeaconBlockSummary](1024*1024)

  discard db.summaries.find([], proc(k, v: openArray[byte]) =
    var output: BeaconBlockSummary

    if k.len() == sizeof(Eth2Digest) and decodeSSZ(v, output):
      summaries[Eth2Digest(data: toArray(sizeof(Eth2Digest), k))] = output
    else:
      warn "Invalid summary in database", klen = k.len(), vlen = v.len()
  )

  summaries

type RootedSummary = tuple[root: Eth2Digest, summary: BeaconBlockSummary]
iterator getAncestorSummaries*(db: BeaconChainDB, root: Eth2Digest): RootedSummary =
  ## Load a chain of ancestors for blck - iterates over the block starting from
  ## root and moving parent by parent
  ##
  ## The search will go on until an ancestor cannot be found.

  var
    res: RootedSummary
    newSummaries: seq[RootedSummary]

  res.root = root

  # Yield summaries in reverse chain order by walking the parent references.
  # If a summary is missing, try loading it from the older version or create one
  # from block data.
const summariesQuery = """ WITH RECURSIVE next(v) as ( SELECT value FROM beacon_block_summaries WHERE `key` == ? UNION ALL SELECT value FROM beacon_block_summaries INNER JOIN next ON `key` == substr(v, 9, 32) ) SELECT v FROM next; """ let stmt = expectDb db.db.prepareStmt( summariesQuery, array[32, byte], array[sizeof(BeaconBlockSummary), byte], managed = false) defer: # in case iteration is stopped along the way # Write the newly found summaries in a single transaction - on first migration # from the old format, this brings down the write from minutes to seconds stmt.dispose() if not db.db.readOnly: if newSummaries.len() > 0: db.withManyWrites: for s in newSummaries: db.putBeaconBlockSummary(s.root, s.summary) # Clean up pre-altair summaries - by now, we will have moved them to the # new table db.db.exec( "DELETE FROM kvstore WHERE key >= ? and key < ?", ([byte ord(kHashToBlockSummary)], [byte ord(kHashToBlockSummary) + 1])).expectDb() var row: stmt.Result for rowRes in exec(stmt, root.data, row): expectDb rowRes if decodeSSZ(row, res.summary): yield res res.root = res.summary.parent_root # Backwards compat for reading old databases, or those that for whatever # reason lost a summary along the way.. 
while true: if db.v0.backend.getSnappySSZ( subkey(BeaconBlockSummary, res.root), res.summary) == GetResult.found: discard # Just yield below elif (let blck = db.getBlock(res.root, phase0.TrustedSignedBeaconBlock); blck.isSome()): res.summary = blck.get().message.toBeaconBlockSummary() elif (let blck = db.getBlock(res.root, altair.TrustedSignedBeaconBlock); blck.isSome()): res.summary = blck.get().message.toBeaconBlockSummary() elif (let blck = db.getBlock(res.root, bellatrix.TrustedSignedBeaconBlock); blck.isSome()): res.summary = blck.get().message.toBeaconBlockSummary() else: break yield res # Next time, load them from the right place newSummaries.add(res) res.root = res.summary.parent_root # Test operations used to create broken and/or legacy database proc putStateV0*(db: BeaconChainDB, key: Eth2Digest, value: phase0.BeaconState) = # Writes to KVStore, as done in 1.0.12 and earlier db.v0.backend.putSnappySSZ(subkey(type value, key), value) proc putBlockV0*(db: BeaconChainDB, value: phase0.TrustedSignedBeaconBlock) = # Write to KVStore, as done in 1.0.12 and earlier # In particular, no summary is written here - it should be recreated # automatically db.v0.backend.putSnappySSZ(subkey(phase0.SignedBeaconBlock, value.root), value)