create new database in separate file (#2596)
The V1 table structure shows great improvements in performance, but if there's an old `kvstore` without rowids, these benefits are nullified: reorgs during writes and deletes remain expensive (even if the degradation is reduced somewhat). This PR creates the tables in a new file instead, and uses the old file as a read-only store - this has several interesting properties: * the old database is left completely untouched - this guarantees that downgrades work smoothly (they'll only need to resync their missing portions) * starting sync after this PR means only a v1 database is created * v0 databases stick around - no migration is performed (for now) Future PRs can introduce migration of the data from one database to another - a simple copy would take hours, which is downtime we want to avoid - at that point, it might make sense to migrate straight to era files instead.
This commit is contained in:
parent
d69e06e519
commit
eebc828778
|
@ -17,6 +17,8 @@ import
|
|||
./ssz/[ssz_serialization, merkleization],
|
||||
./filepath
|
||||
|
||||
logScope: topics = "bc_db"
|
||||
|
||||
type
|
||||
DbSeq*[T] = object
|
||||
insertStmt: SqliteStmt[openArray[byte], void]
|
||||
|
@ -50,7 +52,12 @@ type
|
|||
##
|
||||
## 1.2 moved BeaconStateNoImmutableValidators to a separate table to
|
||||
## alleviate some of the btree balancing issues - this doubled the speed but
|
||||
## was still slow
|
||||
## was still
|
||||
##
|
||||
## 1.3 creates `kvstore` with rowid, making it quite fast, but doesn't do
|
||||
## anything about existing databases. Versions after that use a separate
|
||||
## file instead (V1)
|
||||
db: SqStoreRef
|
||||
backend: KvStoreRef # kvstore
|
||||
stateStore: KvStoreRef # state_no_validators
|
||||
|
||||
|
@ -186,13 +193,13 @@ template expectDb(x: auto): untyped =
|
|||
# full disk - this requires manual intervention, so we'll panic for now
|
||||
x.expect("working database (disk broken/full?)")
|
||||
|
||||
proc init*[T](Seq: type DbSeq[T], db: SqStoreRef, name: string): Seq =
|
||||
db.exec("""
|
||||
proc init*[T](Seq: type DbSeq[T], db: SqStoreRef, name: string): KvResult[Seq] =
|
||||
? db.exec("""
|
||||
CREATE TABLE IF NOT EXISTS """ & name & """(
|
||||
id INTEGER PRIMARY KEY,
|
||||
value BLOB
|
||||
);
|
||||
""").expectDb()
|
||||
""")
|
||||
|
||||
let
|
||||
insertStmt = db.prepareStmt(
|
||||
|
@ -211,13 +218,14 @@ proc init*[T](Seq: type DbSeq[T], db: SqStoreRef, name: string): Seq =
|
|||
let countQueryRes = countStmt.exec do (res: int64):
|
||||
recordCount = res
|
||||
|
||||
let found = countQueryRes.expectDb()
|
||||
if not found: panic()
|
||||
let found = ? countQueryRes
|
||||
if not found:
|
||||
return err("Cannot count existing items")
|
||||
countStmt.dispose()
|
||||
|
||||
Seq(insertStmt: insertStmt,
|
||||
selectStmt: selectStmt,
|
||||
recordCount: recordCount)
|
||||
ok(Seq(insertStmt: insertStmt,
|
||||
selectStmt: selectStmt,
|
||||
recordCount: recordCount))
|
||||
|
||||
proc close*(s: DbSeq) =
|
||||
s.insertStmt.dispose()
|
||||
|
@ -249,41 +257,75 @@ proc loadImmutableValidators(db: BeaconChainDB): seq[ImmutableValidatorData] =
|
|||
for i in 0 ..< db.immutableValidators.len:
|
||||
result.add db.immutableValidators.get(i)
|
||||
|
||||
proc copyDbSeq[T](db0, db: SqStoreRef, name: string) =
|
||||
var
|
||||
src = DbSeq[T].init(db0, "genesis_deposits")
|
||||
|
||||
if src.isErr():
|
||||
debug "Skipping migration", name, msg = src.error()
|
||||
return
|
||||
|
||||
var tgt = DbSeq[T].init(db, "genesis_deposits").expectDb()
|
||||
|
||||
while tgt.len < src.get().len():
|
||||
tgt.add src.get().get(tgt.len())
|
||||
|
||||
src.get().close()
|
||||
tgt.close()
|
||||
|
||||
proc migrateV0(db0, db: SqStoreRef) =
|
||||
# Minimal migration of tables that are the same in V0 and V1 - a full
|
||||
# migration might take hours
|
||||
copyDbSeq[DepositData](db0, db, "genesis_deposits")
|
||||
copyDbSeq[ImmutableValidatorData](db0, db, "immutable_validators")
|
||||
|
||||
proc new*(T: type BeaconChainDB,
|
||||
preset: RuntimePreset,
|
||||
dir: string,
|
||||
inMemory = false,
|
||||
): BeaconChainDB =
|
||||
var db = if inMemory:
|
||||
SqStoreRef.init("", "test", inMemory = true).expect(
|
||||
"working database (out of memory?)")
|
||||
var
|
||||
(db0, db) = if inMemory:
|
||||
(SqStoreRef.init("", "test", readOnly = true, inMemory = true),
|
||||
SqStoreRef.init("", "test", inMemory = true).expect(
|
||||
"working database (out of memory?)"))
|
||||
else:
|
||||
let s = secureCreatePath(dir)
|
||||
doAssert s.isOk # TODO(zah) Handle this in a better way
|
||||
# The original "nbc" database had tables created "WITHOUT ROWID" which
|
||||
# turned out to perform poorly with large blobs when rebalancing its
|
||||
# internal tree - this affects all tables in the database, so we need
|
||||
# to create a new database instead.
|
||||
# In this version, the old database is used as a read-only data source
|
||||
# which enables easy rollback for users that want to downgrade - later,
|
||||
# a migration should be performed instead.
|
||||
(SqStoreRef.init(dir, "nbc", readOnly = true),
|
||||
SqStoreRef.init(
|
||||
dir, "nbcv1", manualCheckpoint = true).expectDb())
|
||||
|
||||
SqStoreRef.init(
|
||||
dir, "nbc", manualCheckpoint = true).expectDb()
|
||||
let v0 =
|
||||
if not db0.isErr():
|
||||
debug "Using V0 database as backing store"
|
||||
migrateV0(db0.get(), db)
|
||||
|
||||
# Remove the deposits table we used before we switched
|
||||
# to storing only deposit contract checkpoints
|
||||
if db.exec("DROP TABLE IF EXISTS deposits;").isErr:
|
||||
debug "Failed to drop the deposits table"
|
||||
BeaconChainDBV0(
|
||||
db: db0.get(),
|
||||
# kvstore used to be created "WITHOUT ROWID"
|
||||
backend: kvStore db0.get().openKvStore().get(nil),
|
||||
# state_no_validators is similar to state_no_validators2 but uses a
|
||||
# different key encoding and was created WITHOUT ROWID
|
||||
stateStore: kvStore db0.get().openKvStore("state_no_validators").get(nil),
|
||||
)
|
||||
else:
|
||||
debug "No V0 database found, running in v1 mode", msg = db0.error()
|
||||
|
||||
# An old pubkey->index mapping that hasn't been used on any mainnet release
|
||||
if db.exec("DROP TABLE IF EXISTS validatorIndexFromPubKey;").isErr:
|
||||
debug "Failed to drop the validatorIndexFromPubKey table"
|
||||
BeaconChainDBV0()
|
||||
|
||||
var
|
||||
# V0 compatibility tables
|
||||
backend = kvStore db.openKvStore().expectDb()
|
||||
# state_no_validators is similar to state_no_validators2 but uses a
|
||||
# different key encoding and was created WITHOUT ROWID
|
||||
stateStore = kvStore db.openKvStore("state_no_validators").expectDb()
|
||||
|
||||
genesisDepositsSeq =
|
||||
DbSeq[DepositData].init(db, "genesis_deposits")
|
||||
DbSeq[DepositData].init(db, "genesis_deposits").expectDb()
|
||||
immutableValidatorsSeq =
|
||||
DbSeq[ImmutableValidatorData].init(db, "immutable_validators")
|
||||
DbSeq[ImmutableValidatorData].init(db, "immutable_validators").expectDb()
|
||||
|
||||
# V1 - expected-to-be small rows get without rowid optimizations
|
||||
keyValues = kvStore db.openKvStore("key_values", true).expectDb()
|
||||
|
@ -295,10 +337,7 @@ proc new*(T: type BeaconChainDB,
|
|||
|
||||
T(
|
||||
db: db,
|
||||
v0: BeaconChainDBV0(
|
||||
backend: backend,
|
||||
stateStore: stateStore,
|
||||
),
|
||||
v0: v0,
|
||||
preset: preset,
|
||||
genesisDeposits: genesisDepositsSeq,
|
||||
immutableValidators: immutableValidatorsSeq,
|
||||
|
@ -312,6 +351,8 @@ proc new*(T: type BeaconChainDB,
|
|||
summaries: summaries,
|
||||
)
|
||||
|
||||
func hasV0*(db: BeaconChainDB): bool = not isNil(db.v0.db)
|
||||
|
||||
proc decodeSSZ[T](data: openArray[byte], output: var T): bool =
|
||||
try:
|
||||
readSszBytes(data, output, updateRoot = false)
|
||||
|
@ -409,8 +450,17 @@ proc putSnappySSZ(db: KvStoreRef, key: openArray[byte], v: auto) =
|
|||
db.put(key, encodeSnappySSZ(v)).expectDb()
|
||||
|
||||
proc close*(db: BeaconChainDBV0) =
|
||||
discard db.stateStore.close()
|
||||
discard db.backend.close()
|
||||
if isNil(db.db): return
|
||||
|
||||
if not isNil(db.stateStore):
|
||||
discard db.stateStore.close()
|
||||
db.stateStore = nil
|
||||
if not isNil(db.backend):
|
||||
discard db.backend.close()
|
||||
db.backend = nil
|
||||
|
||||
db.db.close()
|
||||
db.db = nil
|
||||
|
||||
proc close*(db: BeaconchainDB) =
|
||||
if db.db == nil: return
|
||||
|
@ -515,12 +565,15 @@ proc putEth2FinalizedTo*(db: BeaconChainDB,
|
|||
|
||||
proc getBlock(db: BeaconChainDBV0, key: Eth2Digest): Opt[TrustedSignedBeaconBlock] =
|
||||
# We only store blocks that we trust in the database
|
||||
result.ok(TrustedSignedBeaconBlock())
|
||||
if db.backend.getSnappySSZ(subkey(SignedBeaconBlock, key), result.get) != GetResult.found:
|
||||
result.err()
|
||||
if not isNil(db.backend):
|
||||
result.ok(TrustedSignedBeaconBlock())
|
||||
if db.backend.getSnappySSZ(subkey(SignedBeaconBlock, key), result.get) != GetResult.found:
|
||||
result.err()
|
||||
else:
|
||||
# set root after deserializing (so it doesn't get zeroed)
|
||||
result.get().root = key
|
||||
else:
|
||||
# set root after deserializing (so it doesn't get zeroed)
|
||||
result.get().root = key
|
||||
result.err()
|
||||
|
||||
proc getBlock*(db: BeaconChainDB, key: Eth2Digest): Opt[TrustedSignedBeaconBlock] =
|
||||
# We only store blocks that we trust in the database
|
||||
|
@ -581,10 +634,14 @@ proc getState(
|
|||
# Nimbus 1.2 writes a genesis BeaconStateNoImmutableValidators to `stateStore`
|
||||
# and reads BeaconState from `backend` and BeaconStateNoImmutableValidators
|
||||
# from `stateStore`. We will try to read the state from all these locations.
|
||||
if getStateOnlyMutableValidators(
|
||||
if not isNil(db.stateStore) and getStateOnlyMutableValidators(
|
||||
immutableValidatorsMem, db.stateStore,
|
||||
subkey(BeaconStateNoImmutableValidators, key), output, rollback):
|
||||
return true
|
||||
|
||||
if isNil(db.backend):
|
||||
return false
|
||||
|
||||
if getStateOnlyMutableValidators(
|
||||
immutableValidatorsMem, db.backend,
|
||||
subkey(BeaconStateNoImmutableValidators, key), output, rollback):
|
||||
|
@ -621,7 +678,10 @@ proc getState*(
|
|||
proc getStateRoot(db: BeaconChainDBV0,
|
||||
root: Eth2Digest,
|
||||
slot: Slot): Opt[Eth2Digest] =
|
||||
db.backend.getRaw(subkey(root, slot), Eth2Digest)
|
||||
if not isNil(db.backend):
|
||||
db.backend.getRaw(subkey(root, slot), Eth2Digest)
|
||||
else:
|
||||
err()
|
||||
|
||||
proc getStateRoot*(db: BeaconChainDB,
|
||||
root: Eth2Digest,
|
||||
|
@ -636,30 +696,42 @@ proc getStateDiff*(db: BeaconChainDB,
|
|||
result.err
|
||||
|
||||
proc getHeadBlock(db: BeaconChainDBV0): Opt[Eth2Digest] =
|
||||
db.backend.getRaw(subkey(kHeadBlock), Eth2Digest)
|
||||
if not isNil(db.backend):
|
||||
db.backend.getRaw(subkey(kHeadBlock), Eth2Digest)
|
||||
else:
|
||||
err()
|
||||
|
||||
proc getHeadBlock*(db: BeaconChainDB): Opt[Eth2Digest] =
|
||||
db.keyValues.getRaw(subkey(kHeadBlock), Eth2Digest) or
|
||||
db.v0.getHeadBlock()
|
||||
|
||||
proc getTailBlock(db: BeaconChainDBV0): Opt[Eth2Digest] =
|
||||
db.backend.getRaw(subkey(kTailBlock), Eth2Digest)
|
||||
if not isNil(db.backend):
|
||||
db.backend.getRaw(subkey(kTailBlock), Eth2Digest)
|
||||
else:
|
||||
err()
|
||||
|
||||
proc getTailBlock*(db: BeaconChainDB): Opt[Eth2Digest] =
|
||||
db.keyValues.getRaw(subkey(kTailBlock), Eth2Digest) or
|
||||
db.v0.getTailBlock()
|
||||
|
||||
proc getGenesisBlockRoot(db: BeaconChainDBV0): Eth2Digest =
|
||||
db.backend.getRaw(subkey(kGenesisBlockRoot), Eth2Digest).expectDb()
|
||||
proc getGenesisBlockRoot(db: BeaconChainDBV0): Opt[Eth2Digest] =
|
||||
if not isNil(db.backend):
|
||||
db.backend.getRaw(subkey(kGenesisBlockRoot), Eth2Digest)
|
||||
else:
|
||||
err()
|
||||
|
||||
proc getGenesisBlockRoot*(db: BeaconChainDB): Eth2Digest =
|
||||
db.keyValues.getRaw(subkey(kGenesisBlockRoot), Eth2Digest).expect(
|
||||
"The database must be seeded with the genesis state")
|
||||
proc getGenesisBlockRoot*(db: BeaconChainDB): Opt[Eth2Digest] =
|
||||
db.keyValues.getRaw(subkey(kGenesisBlockRoot), Eth2Digest) or
|
||||
db.v0.getGenesisBlockRoot()
|
||||
|
||||
proc getEth2FinalizedTo(db: BeaconChainDBV0): Opt[DepositContractSnapshot] =
|
||||
result.ok(DepositContractSnapshot())
|
||||
let r = db.backend.getSnappySSZ(subkey(kDepositsFinalizedByEth2), result.get)
|
||||
if r != found: result.err()
|
||||
if not isNil(db.backend):
|
||||
result.ok(DepositContractSnapshot())
|
||||
let r = db.backend.getSnappySSZ(subkey(kDepositsFinalizedByEth2), result.get)
|
||||
if r != found: result.err()
|
||||
else:
|
||||
result.err()
|
||||
|
||||
proc getEth2FinalizedTo*(db: BeaconChainDB): Opt[DepositContractSnapshot] =
|
||||
result.ok(DepositContractSnapshot())
|
||||
|
@ -667,16 +739,18 @@ proc getEth2FinalizedTo*(db: BeaconChainDB): Opt[DepositContractSnapshot] =
|
|||
if r != found: return db.v0.getEth2FinalizedTo()
|
||||
|
||||
proc containsBlock*(db: BeaconChainDBV0, key: Eth2Digest): bool =
|
||||
db.backend.contains(subkey(SignedBeaconBlock, key)).expectDb()
|
||||
not isNil(db.backend) and
|
||||
db.backend.contains(subkey(SignedBeaconBlock, key)).expectDb()
|
||||
|
||||
proc containsBlock*(db: BeaconChainDB, key: Eth2Digest): bool =
|
||||
db.blocks.contains(key.data).expectDb() or db.v0.containsBlock(key)
|
||||
|
||||
proc containsState*(db: BeaconChainDBV0, key: Eth2Digest): bool =
|
||||
let sk = subkey(BeaconStateNoImmutableValidators, key)
|
||||
db.stateStore.contains(sk).expectDb() or
|
||||
db.backend.contains(sk).expectDb() or
|
||||
db.backend.contains(subkey(BeaconState, key)).expectDb
|
||||
(not isNil(db.stateStore) and db.stateStore.contains(sk).expectDb()) or
|
||||
(not isNil(db.backend) and (
|
||||
db.backend.contains(sk).expectDb() or
|
||||
db.backend.contains(subkey(BeaconState, key)).expectDb()))
|
||||
|
||||
proc containsState*(db: BeaconChainDB, key: Eth2Digest): bool =
|
||||
db.statesNoVal.contains(key.data).expectDb or db.v0.containsState(key)
|
||||
|
@ -692,8 +766,8 @@ iterator getAncestors*(db: BeaconChainDB, root: Eth2Digest):
|
|||
res: TrustedSignedBeaconBlock
|
||||
root = root
|
||||
while db.blocks.getSnappySSZ(root.data, res) == GetResult.found or
|
||||
db.v0.backend.getSnappySSZ(
|
||||
subkey(SignedBeaconBlock, root), res) == GetResult.found:
|
||||
(not isNil(db.v0.backend) and db.v0.backend.getSnappySSZ(
|
||||
subkey(SignedBeaconBlock, root), res) == GetResult.found):
|
||||
res.root = root
|
||||
yield res
|
||||
root = res.message.parent_root
|
||||
|
@ -743,9 +817,11 @@ iterator getAncestorSummaries*(db: BeaconChainDB, root: Eth2Digest):
|
|||
res.summary = summary[]
|
||||
yield res
|
||||
do: # Summary was not found in summary table, look elsewhere
|
||||
if db.v0.backend.getSnappySSZ(subkey(BeaconBlockSummary, res.root), res.summary) == GetResult.found:
|
||||
if (not isNil(db.v0.backend) and db.v0.backend.getSnappySSZ(
|
||||
subkey(BeaconBlockSummary, res.root), res.summary) == GetResult.found):
|
||||
yield res
|
||||
elif db.v0.backend.getSnappySSZ(subkey(SignedBeaconBlock, res.root), blck) == GetResult.found:
|
||||
elif (not isNil(db.v0.backend) and db.v0.backend.getSnappySSZ(
|
||||
subkey(SignedBeaconBlock, res.root), blck) == GetResult.found):
|
||||
res.summary = blck.message.toBeaconBlockSummary()
|
||||
yield res
|
||||
else:
|
||||
|
@ -754,12 +830,3 @@ iterator getAncestorSummaries*(db: BeaconChainDB, root: Eth2Digest):
|
|||
db.putBeaconBlockSummary(res.root, res.summary)
|
||||
|
||||
res.root = res.summary.parent_root
|
||||
|
||||
if false:
|
||||
# When the current version has been online for a bit, we can safely remove
|
||||
# summaries from kvstore by enabling this little snippet - if users were
|
||||
# to downgrade after the summaries have been purged, the old versions that
|
||||
# use summaries can also recreate them on the fly from blocks.
|
||||
db.db.exec(
|
||||
"DELETE FROM kvstore WHERE key >= ? and key < ?",
|
||||
([byte ord(kHashToBlockSummary)], [byte ord(kHashToBlockSummary) + 1])).expectDb()
|
||||
|
|
|
@ -343,7 +343,8 @@ proc init*(T: type ChainDAGRef,
|
|||
tailRef
|
||||
else:
|
||||
let
|
||||
genesisBlockRoot = db.getGenesisBlockRoot()
|
||||
genesisBlockRoot = db.getGenesisBlockRoot().expect(
|
||||
"preInit should have initialized the database with a genesis block root")
|
||||
genesisBlock = db.getBlock(genesisBlockRoot).expect(
|
||||
"preInit should have initialized the database with a genesis block")
|
||||
BlockRef.init(genesisBlockRoot, genesisBlock.message)
|
||||
|
|
|
@ -107,8 +107,7 @@ proc init*(T: type BeaconNode,
|
|||
raises: [Defect, CatchableError].} =
|
||||
let
|
||||
db = BeaconChainDB.new(
|
||||
runtimePreset, config.databaseDir,
|
||||
inMemory = false)
|
||||
runtimePreset, config.databaseDir, inMemory = false)
|
||||
|
||||
var
|
||||
genesisState, checkpointState: ref BeaconState
|
||||
|
@ -233,7 +232,8 @@ proc init*(T: type BeaconNode,
|
|||
# Doesn't use std/random directly, but dependencies might
|
||||
randomize(rng[].rand(high(int)))
|
||||
|
||||
info "Loading block dag from database", path = config.databaseDir
|
||||
info "Loading block dag from database",
|
||||
path = config.databaseDir, hasV0 = db.hasV0
|
||||
|
||||
let
|
||||
chainDagFlags = if config.verifyFinalization: {verifyFinalization}
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 8abe6b7144d87315cae5c2b7a74d792f3dc527aa
|
||||
Subproject commit cf3b7a30bfe666689c38c7a2b8704fc8cf85ece0
|
Loading…
Reference in New Issue