Store the deposits and the immutable validator data in Sqlite
This commit is contained in:
parent
e69af00e3a
commit
ce1fda1195
|
@ -10,10 +10,9 @@ import
|
|||
|
||||
type
|
||||
DbSeq*[T] = object
|
||||
db: SqStoreRef
|
||||
name: string
|
||||
file: File
|
||||
endPos: uint64
|
||||
insertStmt: SqliteStmt[openarray[byte], void]
|
||||
selectStmt: SqliteStmt[int64, seq[byte]]
|
||||
recordCount: int64
|
||||
|
||||
DbMap*[K, V] = object
|
||||
db: SqStoreRef
|
||||
|
@ -40,7 +39,6 @@ type
|
|||
|
||||
Keyspaces* = enum
|
||||
defaultKeyspace = "kvstore"
|
||||
seqMetadata
|
||||
validatorIndexFromPubKey
|
||||
|
||||
DbKeyKind = enum
|
||||
|
@ -99,63 +97,57 @@ template panic =
|
|||
# Review all usages.
|
||||
raiseAssert "The database should not be corrupted"
|
||||
|
||||
proc createSeq*(db: SqStoreRef, baseDir, seqFile: string, T: type): DbSeq[T] =
|
||||
var endPos: uint64 = 0
|
||||
proc onData(data: openArray[byte]) =
|
||||
endPos = uint64.fromBytesBE(data)
|
||||
proc createSeq*(db: SqStoreRef, baseDir, name: string, T: type): DbSeq[T] =
|
||||
db.exec("""
|
||||
CREATE TABLE IF NOT EXISTS """ & name & """(
|
||||
id INTEGER PRIMARY KEY,
|
||||
value BLOB
|
||||
);
|
||||
""").expect "working database"
|
||||
|
||||
discard db.get(
|
||||
ord seqMetadata,
|
||||
seqFile.toOpenArrayByte(0, seqFile.len - 1),
|
||||
onData).expect("working database")
|
||||
let
|
||||
insertStmt = db.prepareStmt(
|
||||
"INSERT INTO " & name & "(value) VALUES (?);",
|
||||
openarray[byte], void).expect("this is a valid statement")
|
||||
|
||||
let f = try: open(baseDir / seqFile, fmWrite)
|
||||
except IOError: panic()
|
||||
selectStmt = db.prepareStmt(
|
||||
"SELECT value FROM " & name & " WHERE id = ?;",
|
||||
int64, seq[byte]).expect("this is a valid statement")
|
||||
|
||||
let fileSize = try: getFileSize(f).uint64
|
||||
except IOError: panic()
|
||||
if endPos > fileSize: panic()
|
||||
countStmt = db.prepareStmt(
|
||||
"SELECT COUNT(*) FROM " & name & ";",
|
||||
NoParams, int64).expect("this is a valid statement")
|
||||
|
||||
DbSeq[T](db: db, name: seqFile, file: f, endPos: endPos)
|
||||
var recordCount = int64 0
|
||||
let countQueryRes = countStmt.exec do (res: int64):
|
||||
recordCount = res
|
||||
|
||||
let found = countQueryRes.expect("working database")
|
||||
if not found: panic()
|
||||
|
||||
DbSeq[T](insertStmt: insertStmt,
|
||||
selectStmt: selectStmt,
|
||||
recordCount: recordCount)
|
||||
|
||||
proc add*[T](s: var DbSeq[T], val: T) =
|
||||
var bytes = SSZ.encode(val)
|
||||
try:
|
||||
setFilePos(s.file, s.endPos.int64)
|
||||
write(s.file, bytes)
|
||||
except IOError:
|
||||
panic()
|
||||
s.endPos += bytes.len.uint64
|
||||
s.insertStmt.exec(bytes).expect "working database"
|
||||
|
||||
proc len*[T](s: DbSeq[T]): uint64 =
|
||||
const elemSize = fixedPortionSize(T).uint64
|
||||
s.endPos div elemSize
|
||||
template len*[T](s: DbSeq[T]): uint64 =
|
||||
s.recordCount.uint64
|
||||
|
||||
proc get*[T](s: DbSeq[T], idx: uint64): T =
|
||||
const size = uint64 fixedPortionSize(T)
|
||||
var recordBytes: array[size, byte]
|
||||
# This is used only locally
|
||||
let resultAddr = addr result
|
||||
|
||||
let queryRes = s.selectStmt.exec(int64(idx) + 1) do (recordBytes: seq[byte]):
|
||||
try:
|
||||
# TODO: check for invalid coercion here
|
||||
let pos = size * idx
|
||||
setFilePos(s.file, pos.int64)
|
||||
let bytesRead = readBytes(s.file, recordBytes, 0, size)
|
||||
# TODO Can we recover from a corrupted database?
|
||||
if bytesRead.uint64 != size: panic()
|
||||
except IOError:
|
||||
panic()
|
||||
|
||||
try:
|
||||
decode(SSZ, recordBytes, T)
|
||||
resultAddr[] = decode(SSZ, recordBytes, T)
|
||||
except SerializationError:
|
||||
panic()
|
||||
|
||||
proc flush*(s: DbSeq) =
|
||||
s.file.flushFile()
|
||||
s.db.put(
|
||||
ord seqMetadata,
|
||||
s.name.toOpenArrayByte(0, s.name.len - 1),
|
||||
s.endPos.toBytesBE()).expect("working database")
|
||||
let found = queryRes.expect("working database")
|
||||
if not found: panic()
|
||||
|
||||
proc createMap*(db: SqStoreRef, keyspace: int;
|
||||
K, V: distinct type): DbMap[K, V] =
|
||||
|
@ -179,7 +171,7 @@ proc init*(T: type BeaconChainDB, dir: string, inMemory = false): BeaconChainDB
|
|||
else:
|
||||
T(backend: kvStore sqliteStore,
|
||||
deposits: createSeq(sqliteStore, dir, "deposits", DepositData),
|
||||
validators: createSeq(sqliteStore, dir, "validators", ImmutableValidatorData),
|
||||
validators: createSeq(sqliteStore, dir, "immutableValidatorData", ImmutableValidatorData),
|
||||
validatorsByKey: createMap(sqliteStore, int validatorIndexFromPubKey,
|
||||
ValidatorPubKey, ValidatorIndex))
|
||||
|
||||
|
|
|
@ -511,8 +511,6 @@ proc persistFinalizedBlocks(m: MainchainMonitor, timeNow: float): tuple[
|
|||
|
||||
if prevBlock != nil:
|
||||
# TODO commit transaction
|
||||
m.db.validators.flush()
|
||||
m.db.deposits.flush()
|
||||
m.db.putEth1PersistedTo prevBlock.voteData.block_hash
|
||||
|
||||
# TODO Commit
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit c1037213910ac5ec2156740698d7ecc9dcbfbaeb
|
||||
Subproject commit cc0d15ccacaac220695943d28c04f2d82be9b979
|
Loading…
Reference in New Issue