more strict read-only database mode (#4362)
* avoid creating pre-altair backwards compatibility tables * allow running ncli_db era export without above tables present * drop unused pre-altair backwards compatibility tables * run benchmark on read-ronly database * fix running benchmark from genesis
This commit is contained in:
parent
07885a7210
commit
cd160b5650
|
@ -240,45 +240,56 @@ template expectDb(x: auto): untyped =
|
|||
# full disk - this requires manual intervention, so we'll panic for now
|
||||
x.expect("working database (disk broken/full?)")
|
||||
|
||||
proc init*[T](Seq: type DbSeq[T], db: SqStoreRef, name: string): KvResult[Seq] =
|
||||
? db.exec("""
|
||||
CREATE TABLE IF NOT EXISTS """ & name & """(
|
||||
id INTEGER PRIMARY KEY,
|
||||
value BLOB
|
||||
);
|
||||
""")
|
||||
proc init*[T](
|
||||
Seq: type DbSeq[T], db: SqStoreRef, name: string,
|
||||
readOnly = false): KvResult[Seq] =
|
||||
let hasTable = if db.readOnly or readOnly:
|
||||
? db.hasTable(name)
|
||||
else:
|
||||
? db.exec("""
|
||||
CREATE TABLE IF NOT EXISTS '""" & name & """'(
|
||||
id INTEGER PRIMARY KEY,
|
||||
value BLOB
|
||||
);
|
||||
""")
|
||||
true
|
||||
if hasTable:
|
||||
let
|
||||
insertStmt = db.prepareStmt(
|
||||
"INSERT INTO '" & name & "'(value) VALUES (?);",
|
||||
openArray[byte], void, managed = false).expect("this is a valid statement")
|
||||
|
||||
let
|
||||
insertStmt = db.prepareStmt(
|
||||
"INSERT INTO " & name & "(value) VALUES (?);",
|
||||
openArray[byte], void, managed = false).expect("this is a valid statement")
|
||||
selectStmt = db.prepareStmt(
|
||||
"SELECT value FROM '" & name & "' WHERE id = ?;",
|
||||
int64, openArray[byte], managed = false).expect("this is a valid statement")
|
||||
|
||||
selectStmt = db.prepareStmt(
|
||||
"SELECT value FROM " & name & " WHERE id = ?;",
|
||||
int64, openArray[byte], managed = false).expect("this is a valid statement")
|
||||
countStmt = db.prepareStmt(
|
||||
"SELECT COUNT(1) FROM '" & name & "';",
|
||||
NoParams, int64, managed = false).expect("this is a valid statement")
|
||||
|
||||
countStmt = db.prepareStmt(
|
||||
"SELECT COUNT(1) FROM " & name & ";",
|
||||
NoParams, int64, managed = false).expect("this is a valid statement")
|
||||
var recordCount = int64 0
|
||||
let countQueryRes = countStmt.exec do (res: int64):
|
||||
recordCount = res
|
||||
|
||||
var recordCount = int64 0
|
||||
let countQueryRes = countStmt.exec do (res: int64):
|
||||
recordCount = res
|
||||
let found = ? countQueryRes
|
||||
if not found:
|
||||
return err("Cannot count existing items")
|
||||
countStmt.dispose()
|
||||
|
||||
let found = ? countQueryRes
|
||||
if not found:
|
||||
return err("Cannot count existing items")
|
||||
countStmt.dispose()
|
||||
ok(Seq(insertStmt: insertStmt,
|
||||
selectStmt: selectStmt,
|
||||
recordCount: recordCount))
|
||||
else:
|
||||
ok(Seq())
|
||||
|
||||
ok(Seq(insertStmt: insertStmt,
|
||||
selectStmt: selectStmt,
|
||||
recordCount: recordCount))
|
||||
|
||||
proc close*(s: DbSeq) =
|
||||
proc close*(s: var DbSeq) =
|
||||
s.insertStmt.dispose()
|
||||
s.selectStmt.dispose()
|
||||
|
||||
reset(s)
|
||||
|
||||
proc add*[T](s: var DbSeq[T], val: T) =
|
||||
doAssert(distinctBase(s.insertStmt) != nil, "database closed or table not preset")
|
||||
var bytes = SSZ.encode(val)
|
||||
s.insertStmt.exec(bytes).expectDb()
|
||||
inc s.recordCount
|
||||
|
@ -288,6 +299,8 @@ template len*[T](s: DbSeq[T]): int64 =
|
|||
|
||||
proc get*[T](s: DbSeq[T], idx: int64): T =
|
||||
# This is used only locally
|
||||
doAssert(distinctBase(s.selectStmt) != nil, $T & " table not present for read at " & $(idx))
|
||||
|
||||
let resultAddr = addr result
|
||||
|
||||
let queryRes = s.selectStmt.exec(idx + 1) do (recordBytes: openArray[byte]):
|
||||
|
@ -302,81 +315,91 @@ proc get*[T](s: DbSeq[T], idx: int64): T =
|
|||
|
||||
proc init*(T: type FinalizedBlocks, db: SqStoreRef, name: string,
|
||||
readOnly = false): KvResult[T] =
|
||||
if not readOnly:
|
||||
let hasTable = if db.readOnly or readOnly:
|
||||
? db.hasTable(name)
|
||||
else:
|
||||
? db.exec("""
|
||||
CREATE TABLE IF NOT EXISTS """ & name & """(
|
||||
CREATE TABLE IF NOT EXISTS '""" & name & """'(
|
||||
id INTEGER PRIMARY KEY,
|
||||
value BLOB NOT NULL
|
||||
);
|
||||
""")
|
||||
);""")
|
||||
true
|
||||
|
||||
let
|
||||
insertStmt = db.prepareStmt(
|
||||
"REPLACE INTO " & name & "(id, value) VALUES (?, ?);",
|
||||
(int64, array[32, byte]), void, managed = false).expect("this is a valid statement")
|
||||
if hasTable:
|
||||
let
|
||||
insertStmt = db.prepareStmt(
|
||||
"REPLACE INTO '" & name & "'(id, value) VALUES (?, ?);",
|
||||
(int64, array[32, byte]), void, managed = false).expect("this is a valid statement")
|
||||
|
||||
selectStmt = db.prepareStmt(
|
||||
"SELECT value FROM " & name & " WHERE id = ?;",
|
||||
int64, array[32, byte], managed = false).expect("this is a valid statement")
|
||||
selectAllStmt = db.prepareStmt(
|
||||
"SELECT id, value FROM " & name & " ORDER BY id;",
|
||||
NoParams, (int64, array[32, byte]), managed = false).expect("this is a valid statement")
|
||||
selectStmt = db.prepareStmt(
|
||||
"SELECT value FROM '" & name & "' WHERE id = ?;",
|
||||
int64, array[32, byte], managed = false).expect("this is a valid statement")
|
||||
selectAllStmt = db.prepareStmt(
|
||||
"SELECT id, value FROM '" & name & "' ORDER BY id;",
|
||||
NoParams, (int64, array[32, byte]), managed = false).expect("this is a valid statement")
|
||||
|
||||
maxIdStmt = db.prepareStmt(
|
||||
"SELECT MAX(id) FROM " & name & ";",
|
||||
NoParams, Option[int64], managed = false).expect("this is a valid statement")
|
||||
maxIdStmt = db.prepareStmt(
|
||||
"SELECT MAX(id) FROM '" & name & "';",
|
||||
NoParams, Option[int64], managed = false).expect("this is a valid statement")
|
||||
|
||||
minIdStmt = db.prepareStmt(
|
||||
"SELECT MIN(id) FROM " & name & ";",
|
||||
NoParams, Option[int64], managed = false).expect("this is a valid statement")
|
||||
minIdStmt = db.prepareStmt(
|
||||
"SELECT MIN(id) FROM '" & name & "';",
|
||||
NoParams, Option[int64], managed = false).expect("this is a valid statement")
|
||||
|
||||
var
|
||||
low, high: Opt[Slot]
|
||||
tmp: Option[int64]
|
||||
var
|
||||
low, high: Opt[Slot]
|
||||
tmp: Option[int64]
|
||||
|
||||
for rowRes in minIdStmt.exec(tmp):
|
||||
expectDb rowRes
|
||||
if tmp.isSome():
|
||||
low.ok(Slot(tmp.get()))
|
||||
for rowRes in minIdStmt.exec(tmp):
|
||||
expectDb rowRes
|
||||
if tmp.isSome():
|
||||
low.ok(Slot(tmp.get()))
|
||||
|
||||
for rowRes in maxIdStmt.exec(tmp):
|
||||
expectDb rowRes
|
||||
if tmp.isSome():
|
||||
high.ok(Slot(tmp.get()))
|
||||
for rowRes in maxIdStmt.exec(tmp):
|
||||
expectDb rowRes
|
||||
if tmp.isSome():
|
||||
high.ok(Slot(tmp.get()))
|
||||
|
||||
maxIdStmt.dispose()
|
||||
minIdStmt.dispose()
|
||||
maxIdStmt.dispose()
|
||||
minIdStmt.dispose()
|
||||
|
||||
ok(T(insertStmt: insertStmt,
|
||||
selectStmt: selectStmt,
|
||||
selectAllStmt: selectAllStmt,
|
||||
low: low,
|
||||
high: high))
|
||||
ok(T(insertStmt: insertStmt,
|
||||
selectStmt: selectStmt,
|
||||
selectAllStmt: selectAllStmt,
|
||||
low: low,
|
||||
high: high))
|
||||
else:
|
||||
ok(T())
|
||||
|
||||
proc close*(s: FinalizedBlocks) =
|
||||
proc close*(s: var FinalizedBlocks) =
|
||||
s.insertStmt.dispose()
|
||||
s.selectStmt.dispose()
|
||||
s.selectAllStmt.dispose()
|
||||
reset(s)
|
||||
|
||||
proc insert*(s: var FinalizedBlocks, slot: Slot, val: Eth2Digest) =
|
||||
doAssert slot.uint64 < int64.high.uint64, "Only reasonable slots supported"
|
||||
doAssert(distinctBase(s.insertStmt) != nil, "database closed or table not present")
|
||||
|
||||
s.insertStmt.exec((slot.int64, val.data)).expectDb()
|
||||
s.low.ok(min(slot, s.low.get(slot)))
|
||||
s.high.ok(max(slot, s.high.get(slot)))
|
||||
|
||||
proc get*(s: FinalizedBlocks, idx: Slot): Opt[Eth2Digest] =
|
||||
if distinctBase(s.selectStmt) == nil: return Opt.none(Eth2Digest)
|
||||
var row: s.selectStmt.Result
|
||||
for rowRes in s.selectStmt.exec(int64(idx), row):
|
||||
expectDb rowRes
|
||||
return ok(Eth2Digest(data: row))
|
||||
|
||||
err()
|
||||
return Opt.none(Eth2Digest)
|
||||
|
||||
iterator pairs*(s: FinalizedBlocks): (Slot, Eth2Digest) =
|
||||
var row: s.selectAllStmt.Result
|
||||
for rowRes in s.selectAllStmt.exec(row):
|
||||
expectDb rowRes
|
||||
yield (Slot(row[0]), Eth2Digest(data: row[1]))
|
||||
if distinctBase(s.selectAllStmt) != nil:
|
||||
var row: s.selectAllStmt.Result
|
||||
for rowRes in s.selectAllStmt.exec(row):
|
||||
expectDb rowRes
|
||||
yield (Slot(row[0]), Eth2Digest(data: row[1]))
|
||||
|
||||
proc loadImmutableValidators(vals: DbSeq[ImmutableValidatorDataDb2]): seq[ImmutableValidatorData2] =
|
||||
result = newSeqOfCap[ImmutableValidatorData2](vals.len())
|
||||
|
@ -420,24 +443,29 @@ template withManyWrites*(dbParam: BeaconChainDB, body: untyped) =
|
|||
if isInsideTransaction(db.db): # calls `sqlite3_get_autocommit`
|
||||
expectDb db.db.exec("ROLLBACK TRANSACTION;")
|
||||
|
||||
proc new*(T: type BeaconChainDB,
|
||||
dir: string,
|
||||
inMemory = false,
|
||||
proc new*(T: type BeaconChainDBV0,
|
||||
db: SqStoreRef,
|
||||
readOnly = false
|
||||
): BeaconChainDBV0 =
|
||||
var
|
||||
# V0 compatibility tables - these were created WITHOUT ROWID which is slow
|
||||
# for large blobs
|
||||
backendV0 = kvStore db.openKvStore(
|
||||
readOnly = db.readOnly or readOnly).expectDb()
|
||||
# state_no_validators is similar to state_no_validators2 but uses a
|
||||
# different key encoding and was created WITHOUT ROWID
|
||||
stateStoreV0 = kvStore db.openKvStore(
|
||||
"state_no_validators", readOnly = db.readOnly or readOnly).expectDb()
|
||||
|
||||
BeaconChainDBV0(
|
||||
backend: backendV0,
|
||||
stateStore: stateStoreV0,
|
||||
)
|
||||
|
||||
proc new*(T: type BeaconChainDB,
|
||||
db: SqStoreRef
|
||||
): BeaconChainDB =
|
||||
var db = if inMemory:
|
||||
SqStoreRef.init("", "test", readOnly = readOnly, inMemory = true).expect(
|
||||
"working database (out of memory?)")
|
||||
else:
|
||||
if (let res = secureCreatePath(dir); res.isErr):
|
||||
fatal "Failed to create create database directory",
|
||||
path = dir, err = ioErrorMsg(res.error)
|
||||
quit 1
|
||||
|
||||
SqStoreRef.init(
|
||||
dir, "nbc", readOnly = readOnly, manualCheckpoint = true).expectDb()
|
||||
|
||||
if not readOnly:
|
||||
if not db.readOnly:
|
||||
# Remove the deposits table we used before we switched
|
||||
# to storing only deposit contract checkpoints
|
||||
if db.exec("DROP TABLE IF EXISTS deposits;").isErr:
|
||||
|
@ -448,13 +476,6 @@ proc new*(T: type BeaconChainDB,
|
|||
debug "Failed to drop the validatorIndexFromPubKey table"
|
||||
|
||||
var
|
||||
# V0 compatibility tables - these were created WITHOUT ROWID which is slow
|
||||
# for large blobs
|
||||
backend = kvStore db.openKvStore().expectDb()
|
||||
# state_no_validators is similar to state_no_validators2 but uses a
|
||||
# different key encoding and was created WITHOUT ROWID
|
||||
stateStore = kvStore db.openKvStore("state_no_validators").expectDb()
|
||||
|
||||
genesisDepositsSeq =
|
||||
DbSeq[DepositData].init(db, "genesis_deposits").expectDb()
|
||||
immutableValidatorsDb =
|
||||
|
@ -491,30 +512,28 @@ proc new*(T: type BeaconChainDB,
|
|||
# uncompressed keys instead. We still support upgrading a database from the
|
||||
# old format, but don't need to support downgrading, and therefore safely can
|
||||
# remove the keys
|
||||
let immutableValidatorsDb1 =
|
||||
DbSeq[ImmutableValidatorData].init(db, "immutable_validators").expectDb()
|
||||
block:
|
||||
var immutableValidatorsDb1 = DbSeq[ImmutableValidatorData].init(
|
||||
db, "immutable_validators", readOnly = true).expectDb()
|
||||
|
||||
if immutableValidatorsDb.len() < immutableValidatorsDb1.len():
|
||||
notice "Migrating validator keys, this may take a minute",
|
||||
len = immutableValidatorsDb1.len()
|
||||
while immutableValidatorsDb.len() < immutableValidatorsDb1.len():
|
||||
let val = immutableValidatorsDb1.get(immutableValidatorsDb.len())
|
||||
immutableValidatorsDb.add(ImmutableValidatorDataDb2(
|
||||
pubkey: val.pubkey.loadValid().toUncompressed(),
|
||||
withdrawal_credentials: val.withdrawal_credentials
|
||||
))
|
||||
immutableValidatorsDb1.close()
|
||||
if immutableValidatorsDb.len() < immutableValidatorsDb1.len():
|
||||
notice "Migrating validator keys, this may take a minute",
|
||||
len = immutableValidatorsDb1.len()
|
||||
while immutableValidatorsDb.len() < immutableValidatorsDb1.len():
|
||||
let val = immutableValidatorsDb1.get(immutableValidatorsDb.len())
|
||||
immutableValidatorsDb.add(ImmutableValidatorDataDb2(
|
||||
pubkey: val.pubkey.loadValid().toUncompressed(),
|
||||
withdrawal_credentials: val.withdrawal_credentials
|
||||
))
|
||||
immutableValidatorsDb1.close()
|
||||
|
||||
# Safe because nobody will be downgrading to pre-altair versions
|
||||
# TODO: drop table maybe? that would require not creating the table just above
|
||||
discard db.exec("DELETE FROM immutable_validators;")
|
||||
if not db.readOnly:
|
||||
# Safe because nobody will be downgrading to pre-altair versions
|
||||
discard db.exec("DROP TABLE IF EXISTS immutable_validators;")
|
||||
|
||||
T(
|
||||
db: db,
|
||||
v0: BeaconChainDBV0(
|
||||
backend: backend,
|
||||
stateStore: stateStore,
|
||||
),
|
||||
v0: BeaconChainDBV0.new(db, readOnly = true),
|
||||
genesisDeposits: genesisDepositsSeq,
|
||||
immutableValidatorsDb: immutableValidatorsDb,
|
||||
immutableValidators: loadImmutableValidators(immutableValidatorsDb),
|
||||
|
@ -529,6 +548,25 @@ proc new*(T: type BeaconChainDB,
|
|||
lcData: lcData
|
||||
)
|
||||
|
||||
proc new*(T: type BeaconChainDB,
|
||||
dir: string,
|
||||
inMemory = false,
|
||||
readOnly = false
|
||||
): BeaconChainDB =
|
||||
let db =
|
||||
if inMemory:
|
||||
SqStoreRef.init("", "test", readOnly = readOnly, inMemory = true).expect(
|
||||
"working database (out of memory?)")
|
||||
else:
|
||||
if (let res = secureCreatePath(dir); res.isErr):
|
||||
fatal "Failed to create create database directory",
|
||||
path = dir, err = ioErrorMsg(res.error)
|
||||
quit 1
|
||||
|
||||
SqStoreRef.init(
|
||||
dir, "nbc", readOnly = readOnly, manualCheckpoint = true).expectDb()
|
||||
BeaconChainDB.new(db)
|
||||
|
||||
template getLightClientDataDB*(db: BeaconChainDB): LightClientDataDB =
|
||||
db.lcData
|
||||
|
||||
|
@ -1316,11 +1354,12 @@ iterator getAncestorSummaries*(db: BeaconChainDB, root: Eth2Digest):
|
|||
for s in newSummaries:
|
||||
db.putBeaconBlockSummary(s.root, s.summary)
|
||||
|
||||
# Clean up pre-altair summaries - by now, we will have moved them to the
|
||||
# new table
|
||||
db.db.exec(
|
||||
"DELETE FROM kvstore WHERE key >= ? and key < ?",
|
||||
([byte ord(kHashToBlockSummary)], [byte ord(kHashToBlockSummary) + 1])).expectDb()
|
||||
if db.db.hasTable("kvstore").expectDb():
|
||||
# Clean up pre-altair summaries - by now, we will have moved them to the
|
||||
# new table
|
||||
db.db.exec(
|
||||
"DELETE FROM kvstore WHERE key >= ? and key < ?",
|
||||
([byte ord(kHashToBlockSummary)], [byte ord(kHashToBlockSummary) + 1])).expectDb()
|
||||
|
||||
var row: stmt.Result
|
||||
for rowRes in exec(stmt, root.data, row):
|
||||
|
@ -1353,12 +1392,12 @@ iterator getAncestorSummaries*(db: BeaconChainDB, root: Eth2Digest):
|
|||
|
||||
# Test operations used to create broken and/or legacy database
|
||||
|
||||
proc putStateV0*(db: BeaconChainDB, key: Eth2Digest, value: phase0.BeaconState) =
|
||||
proc putStateV0*(db: BeaconChainDBV0, key: Eth2Digest, value: phase0.BeaconState) =
|
||||
# Writes to KVStore, as done in 1.0.12 and earlier
|
||||
db.v0.backend.putSnappySSZ(subkey(type value, key), value)
|
||||
db.backend.putSnappySSZ(subkey(type value, key), value)
|
||||
|
||||
proc putBlockV0*(db: BeaconChainDB, value: phase0.TrustedSignedBeaconBlock) =
|
||||
proc putBlockV0*(db: BeaconChainDBV0, value: phase0.TrustedSignedBeaconBlock) =
|
||||
# Write to KVStore, as done in 1.0.12 and earlier
|
||||
# In particular, no summary is written here - it should be recreated
|
||||
# automatically
|
||||
db.v0.backend.putSnappySSZ(subkey(phase0.SignedBeaconBlock, value.root), value)
|
||||
db.backend.putSnappySSZ(subkey(phase0.SignedBeaconBlock, value.root), value)
|
||||
|
|
|
@ -215,7 +215,7 @@ proc cmdBench(conf: DbConf, cfg: RuntimeConfig) =
|
|||
|
||||
echo "Opening database..."
|
||||
let
|
||||
db = BeaconChainDB.new(conf.databaseDir.string,)
|
||||
db = BeaconChainDB.new(conf.databaseDir.string, readOnly = true)
|
||||
dbBenchmark = BeaconChainDB.new("benchmark")
|
||||
defer:
|
||||
db.close()
|
||||
|
@ -233,7 +233,7 @@ proc cmdBench(conf: DbConf, cfg: RuntimeConfig) =
|
|||
|
||||
var
|
||||
(start, ends) = dag.getSlotRange(conf.benchSlot, conf.benchSlots)
|
||||
blockRefs = dag.getBlockRange(start, ends)
|
||||
blockRefs = dag.getBlockRange(max(start, Slot 1), ends)
|
||||
blocks: (
|
||||
seq[phase0.TrustedSignedBeaconBlock],
|
||||
seq[altair.TrustedSignedBeaconBlock],
|
||||
|
@ -245,6 +245,7 @@ proc cmdBench(conf: DbConf, cfg: RuntimeConfig) =
|
|||
|
||||
for b in 0 ..< blockRefs.len:
|
||||
let blck = blockRefs[blockRefs.len - b - 1]
|
||||
|
||||
withTimer(timers[tLoadBlock]):
|
||||
case cfg.blockForkAtEpoch(blck.slot.epoch)
|
||||
of BeaconBlockFork.Phase0:
|
||||
|
@ -501,7 +502,8 @@ proc cmdExportEra(conf: DbConf, cfg: RuntimeConfig) =
|
|||
else: some((era - 1).start_slot)
|
||||
endSlot = era.start_slot
|
||||
eraBid = dag.atSlot(dag.head.bid, endSlot).valueOr:
|
||||
echo "Skipping ", era, ", blocks not available"
|
||||
echo "Skipping era ", era, ", blocks not available"
|
||||
era += 1
|
||||
continue
|
||||
|
||||
if endSlot > dag.head.slot:
|
||||
|
|
|
@ -699,15 +699,18 @@ suite "Old database versions" & preset():
|
|||
|
||||
test "pre-1.1.0":
|
||||
# only kvstore, no immutable validator keys
|
||||
|
||||
let db = BeaconChainDB.new("", inMemory = true)
|
||||
let
|
||||
sq = SqStoreRef.init("", "test", inMemory = true).expect(
|
||||
"working database (out of memory?)")
|
||||
v0 = BeaconChainDBV0.new(sq, readOnly = false)
|
||||
db = BeaconChainDB.new(sq)
|
||||
|
||||
# preInit a database to a v1.0.12 state
|
||||
v0.putStateV0(genState[].root, genState[].data)
|
||||
v0.putBlockV0(genBlock)
|
||||
|
||||
db.putStateRoot(
|
||||
genState[].latest_block_root, genState[].data.slot, genState[].root)
|
||||
db.putStateV0(genState[].root, genState[].data)
|
||||
|
||||
db.putBlockV0(genBlock)
|
||||
db.putTailBlock(genBlock.root)
|
||||
db.putHeadBlock(genBlock.root)
|
||||
db.putGenesisBlock(genBlock.root)
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 6499ee2bc5d264fdc68f5f08b647222a5c5252fa
|
||||
Subproject commit 8f4ef19fc91a6dbd075a7af5a20082060bdc0920
|
Loading…
Reference in New Issue