Store all deposit-derived data in memory

This commit is contained in:
Zahary Karadjov 2020-10-15 14:49:02 +03:00 committed by zah
parent 7a577b2cef
commit 5f6bdc6709
8 changed files with 110 additions and 64 deletions

View File

@ -1,11 +1,11 @@
{.push raises: [Defect].}
import
typetraits,
typetraits, tables,
stew/[results, objects, endians2, io2],
serialization, chronicles, snappy,
eth/db/[kvstore, kvstore_sqlite3],
./spec/[datatypes, digest, crypto, state_transition],
./spec/[datatypes, digest, crypto, state_transition, signatures],
./ssz/[ssz_serialization, merkleization]
type
@ -18,6 +18,10 @@ type
db: SqStoreRef
keyspace: int
DepositsSeq = DbSeq[DepositData]
ImmutableValidatorDataSeq = seq[ImmutableValidatorData]
ValidatorKeyToIndexMap = Table[ValidatorPubKey, ValidatorIndex]
BeaconChainDB* = ref object
## Database storing resolved blocks and states - resolved blocks are such
## blocks that form a chain back to the tail block.
@ -32,10 +36,11 @@ type
## between different versions of the client and accidentally using an old
## database.
backend: KvStoreRef
preset: RuntimePreset
deposits*: DbSeq[DepositData]
validators*: DbSeq[ImmutableValidatorData]
validatorsByKey*: DbMap[ValidatorPubKey, ValidatorIndex]
deposits*: DepositsSeq
immutableValidatorData*: ImmutableValidatorDataSeq
validatorKeyToIndex*: ValidatorKeyToIndexMap
Keyspaces* = enum
defaultKeyspace = "kvstore"
@ -97,7 +102,7 @@ template panic =
# Review all usages.
raiseAssert "The database should not be corrupted"
proc createSeq*(db: SqStoreRef, baseDir, name: string, T: type): DbSeq[T] =
proc init*[T](Seq: type DbSeq[T], db: SqStoreRef, name: string): Seq =
db.exec("""
CREATE TABLE IF NOT EXISTS """ & name & """(
id INTEGER PRIMARY KEY,
@ -125,9 +130,9 @@ proc createSeq*(db: SqStoreRef, baseDir, name: string, T: type): DbSeq[T] =
let found = countQueryRes.expect("working database")
if not found: panic()
DbSeq[T](insertStmt: insertStmt,
selectStmt: selectStmt,
recordCount: recordCount)
Seq(insertStmt: insertStmt,
selectStmt: selectStmt,
recordCount: recordCount)
proc add*[T](s: var DbSeq[T], val: T) =
var bytes = SSZ.encode(val)
@ -154,27 +159,73 @@ proc createMap*(db: SqStoreRef, keyspace: int;
K, V: distinct type): DbMap[K, V] =
DbMap[K, V](db: db, keyspace: keyspace)
proc insert*[K, V](m: DbMap[K, V], key: K, value: V) =
proc insert*[K, V](m: var DbMap[K, V], key: K, value: V) =
m.db.put(m.keyspace, SSZ.encode key, SSZ.encode value).expect("working database")
proc contains*[K, V](m: DbMap[K, V], key: K): bool =
contains(m.db, SSZ.encode key).expect("working database")
proc init*(T: type BeaconChainDB, dir: string, inMemory = false): BeaconChainDB =
let s = createPath(dir, 0o750)
doAssert s.isOk # TODO Handle this in a better way
template insert*[K, V](t: var Table[K, V], key: K, value: V) =
add(t, key, value)
let sqliteStore = SqStoreRef.init(dir, "nbc", Keyspaces).expect(
"working database")
proc produceDerivedData(deposit: DepositData,
preset: RuntimePreset,
deposits: var DepositsSeq,
validators: var ImmutableValidatorDataSeq,
validatorKeyToIndex: var ValidatorKeyToIndexMap) =
if verify_deposit_signature(preset, deposit):
let pubkey = deposit.pubkey
if pubkey notin validatorKeyToIndex:
let idx = ValidatorIndex validators.len
validators.add ImmutableValidatorData(
pubkey: pubkey,
withdrawal_credentials: deposit.withdrawal_credentials)
validatorKeyToIndex.insert(pubkey, idx)
proc processDeposit*(db: BeaconChainDB, newDeposit: DepositData) =
db.deposits.add newDeposit
produceDerivedData(
newDeposit,
db.preset,
db.deposits,
db.immutableValidatorData,
db.validatorKeyToIndex)
proc init*(T: type BeaconChainDB,
preset: RuntimePreset,
dir: string,
inMemory = false): BeaconChainDB =
if inMemory:
T(backend: kvStore MemStoreRef.init())
# TODO
# The inMemory store shuold offer the complete functionality
# of the database-backed one (i.e. tracking of deposits and validators)
T(backend: kvStore MemStoreRef.init(), preset: preset)
else:
let s = createPath(dir, 0o750)
doAssert s.isOk # TODO Handle this in a better way
let sqliteStore = SqStoreRef.init(dir, "nbc", Keyspaces).expect(
"working database")
var
immutableValidatorData = newSeq[ImmutableValidatorData]()
validatorKeyToIndex = initTable[ValidatorPubKey, ValidatorIndex]()
depositsSeq = DbSeq[DepositData].init(sqliteStore, "deposits")
for i in 0 ..< depositsSeq.len:
produceDerivedData(
depositsSeq.get(i),
preset,
depositsSeq,
immutableValidatorData,
validatorKeyToIndex)
T(backend: kvStore sqliteStore,
deposits: createSeq(sqliteStore, dir, "deposits", DepositData),
validators: createSeq(sqliteStore, dir, "immutableValidatorData", ImmutableValidatorData),
validatorsByKey: createMap(sqliteStore, int validatorIndexFromPubKey,
ValidatorPubKey, ValidatorIndex))
preset: preset,
deposits: depositsSeq,
immutableValidatorData: immutableValidatorData,
validatorKeyToIndex: validatorKeyToIndex)
proc snappyEncode(inp: openArray[byte]): seq[byte] =
try:

View File

@ -98,7 +98,7 @@ proc init*(T: type BeaconNode,
netKeys = getPersistentNetKeys(rng[], conf)
nickname = if conf.nodeName == "auto": shortForm(netKeys)
else: conf.nodeName
db = BeaconChainDB.init(conf.databaseDir)
db = BeaconChainDB.init(conf.runtimePreset, conf.databaseDir)
var
mainchainMonitor: MainchainMonitor

View File

@ -624,7 +624,7 @@ proc createWalletInteractively*(
"consisting of 24 words. In case you lose your wallet and you " &
"need to restore it on a different machine, you can use the " &
"seed phrase to re-generate your signing and withdrawal keys."
echoP "The seed phrase should be kept secretly in a safe location as if " &
echoP "The seed phrase should be kept secret in a safe location as if " &
"you are protecting a sensitive password. It can be used to withdraw " &
"funds from your wallet."
echoP "We will display the seed phrase on the next screen. Please make sure " &
@ -663,9 +663,10 @@ proc createWalletInteractively*(
clearScreen()
echoP "To confirm that you've saved the seed phrase, please enter the first and the " &
"last three words of it. In case you've saved the seek phrase in your clipboard, " &
"we strongly advice clearing the clipboard now."
echoP "To confirm that you've saved the seed phrase, please enter the " &
"first and the last three words of it. In case you've saved the " &
"seek phrase in your clipboard, we strongly advice clearing the " &
"clipboard now."
echo ""
for i in countdown(2, 0):

View File

@ -2,7 +2,7 @@ import
std/[deques, tables, hashes, options, strformat, strutils],
chronos, web3, web3/ethtypes as web3Types, json, chronicles,
eth/common/eth_types, eth/async_utils,
spec/[datatypes, digest, crypto, beaconstate, helpers, signatures],
spec/[datatypes, digest, crypto, beaconstate, helpers],
ssz, beacon_chain_db, network_metadata, merkle_minimal, beacon_node_status
from times import epochTime
@ -272,7 +272,7 @@ proc readJsonDeposits(depositsList: JsonNode): seq[Eth1Block] =
proc fetchDepositData*(p: Web3DataProviderRef,
fromBlock, toBlock: Eth1BlockNumber): Future[seq[Eth1Block]]
{.async, locks: 0.} =
info "Obtaining deposit log events", fromBlock, toBlock
debug "Obtaining deposit log events", fromBlock, toBlock
return readJsonDeposits(await p.ns.getJsonLogs(DepositEvent,
fromBlock = some blockId(fromBlock),
toBlock = some blockId(toBlock)))
@ -415,20 +415,12 @@ proc persistFinalizedBlocks(m: MainchainMonitor, timeNow: float): tuple[
break
for deposit in blk.deposits:
m.db.deposits.add deposit.data
if verify_deposit_signature(m.preset, deposit.data):
let pubkey = deposit.data.pubkey
if pubkey notin m.db.validatorsByKey:
let idx = m.db.validators.len
m.db.validators.add ImmutableValidatorData(
pubkey: pubkey,
withdrawal_credentials: deposit.data.withdrawal_credentials)
m.db.validatorsByKey.insert(pubkey, ValidatorIndex idx)
m.db.processDeposit(deposit.data)
# TODO The len property is currently stored in memory which
# makes it unsafe in the face of failed transactions
blk.knownValidatorsCount = some m.db.validators.len
let activeValidatorsCount = m.db.immutableValidatorData.lenu64
blk.knownValidatorsCount = some activeValidatorsCount
discard m.eth1Chain.blocks.popFirst()
m.eth1Chain.blocksByHash.del blk.voteData.block_hash.asBlockHash
@ -436,7 +428,7 @@ proc persistFinalizedBlocks(m: MainchainMonitor, timeNow: float): tuple[
let blockGenesisTime = genesis_time_from_eth1_timestamp(m.preset,
blk.timestamp)
if blockGenesisTime >= m.preset.MIN_GENESIS_TIME and
m.db.validators.len >= m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT:
activeValidatorsCount >= m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT:
result = (blk, prevBlock)
prevBlock = blk

View File

@ -6,7 +6,8 @@ import
../beacon_chain/block_pools/[chain_dag],
../beacon_chain/spec/[crypto, datatypes, digest, helpers,
state_transition, presets],
../beacon_chain/sszdump, ../research/simutils
../beacon_chain/[ssz, sszdump],
../research/simutils
type Timers = enum
tInit = "Initialize DB"
@ -84,8 +85,8 @@ proc cmdBench(conf: DbConf, runtimePreset: RuntimePreset) =
echo "Opening database..."
let
db = BeaconChainDB.init(conf.databaseDir.string)
dbBenchmark = BeaconChainDB.init("benchmark")
db = BeaconChainDB.init(runtimePreset, conf.databaseDir.string)
dbBenchmark = BeaconChainDB.init(runtimePreset, "benchmark")
defer: db.close()
if not ChainDAGRef.isInitialized(db):
@ -136,8 +137,8 @@ proc cmdBench(conf: DbConf, runtimePreset: RuntimePreset) =
printTimers(false, timers)
proc cmdDumpState(conf: DbConf) =
let db = BeaconChainDB.init(conf.databaseDir.string)
proc cmdDumpState(conf: DbConf, preset: RuntimePreset) =
let db = BeaconChainDB.init(preset, conf.databaseDir.string)
defer: db.close()
for stateRoot in conf.stateRoot:
@ -151,8 +152,8 @@ proc cmdDumpState(conf: DbConf) =
except CatchableError as e:
echo "Couldn't load ", stateRoot, ": ", e.msg
proc cmdDumpBlock(conf: DbConf) =
let db = BeaconChainDB.init(conf.databaseDir.string)
proc cmdDumpBlock(conf: DbConf, preset: RuntimePreset) =
let db = BeaconChainDB.init(preset, conf.databaseDir.string)
defer: db.close()
for blockRoot in conf.blockRootx:
@ -236,11 +237,11 @@ proc copyPrunedDatabase(
copyDb.putHeadBlock(headBlock.get)
copyDb.putTailBlock(tailBlock.get)
proc cmdPrune(conf: DbConf) =
proc cmdPrune(conf: DbConf, preset: RuntimePreset) =
let
db = BeaconChainDB.init(conf.databaseDir.string)
db = BeaconChainDB.init(preset, conf.databaseDir.string)
# TODO: add the destination as CLI paramter
copyDb = BeaconChainDB.init("pruned_db")
copyDb = BeaconChainDB.init(preset, "pruned_db")
defer:
db.close()
@ -248,9 +249,9 @@ proc cmdPrune(conf: DbConf) =
db.copyPrunedDatabase(copyDb, conf.dryRun, conf.verbose, conf.keepOldStates)
proc cmdRewindState(conf: DbConf, runtimePreset: RuntimePreset) =
proc cmdRewindState(conf: DbConf, preset: RuntimePreset) =
echo "Opening database..."
let db = BeaconChainDB.init(conf.databaseDir.string)
let db = BeaconChainDB.init(preset, conf.databaseDir.string)
defer: db.close()
if not ChainDAGRef.isInitialized(db):
@ -258,7 +259,7 @@ proc cmdRewindState(conf: DbConf, runtimePreset: RuntimePreset) =
quit 1
echo "Initializing block pool..."
let dag = init(ChainDAGRef, runtimePreset, db)
let dag = init(ChainDAGRef, preset, db)
let blckRef = dag.getRef(fromHex(Eth2Digest, conf.blockRoot))
if blckRef == nil:
@ -278,10 +279,10 @@ when isMainModule:
of bench:
cmdBench(conf, runtimePreset)
of dumpState:
cmdDumpState(conf)
cmdDumpState(conf, runtimePreset)
of dumpBlock:
cmdDumpBlock(conf)
cmdDumpBlock(conf, runtimePreset)
of pruneDatabase:
cmdPrune(conf)
cmdPrune(conf, runtimePreset)
of rewindState:
cmdRewindState(conf, runtimePreset)

View File

@ -46,16 +46,17 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
let
state = loadGenesis(validators, true)
genesisBlock = get_initial_beacon_block(state[].data)
runtimePreset = defaultRuntimePreset
echo "Starting simulation..."
let db = BeaconChainDB.init("block_sim_db")
let db = BeaconChainDB.init(runtimePreset, "block_sim_db")
defer: db.close()
ChainDAGRef.preInit(db, state[].data, state[].data, genesisBlock)
var
chainDag = init(ChainDAGRef, defaultRuntimePreset, db)
chainDag = init(ChainDAGRef, runtimePreset, db)
quarantine = QuarantineRef()
attPool = AttestationPool.init(chainDag, quarantine)
timers: array[Timers, RunningStat]
@ -108,7 +109,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
eth1data = get_eth1data_stub(
state.eth1_deposit_index, slot.compute_epoch_at_slot())
message = makeBeaconBlock(
defaultRuntimePreset,
runtimePreset,
hashedState,
proposerIdx,
head.root,

View File

@ -38,14 +38,14 @@ func withDigest(blck: TrustedBeaconBlock): TrustedSignedBeaconBlock =
suiteReport "Beacon chain DB" & preset():
wrappedTimedTest "empty database" & preset():
var
db = init(BeaconChainDB, "", inMemory = true)
db = BeaconChainDB.init(defaultRuntimePreset, "", inMemory = true)
check:
db.getStateRef(Eth2Digest()).isNil
db.getBlock(Eth2Digest()).isNone
wrappedTimedTest "sanity check blocks" & preset():
var
db = init(BeaconChainDB, "", inMemory = true)
db = BeaconChainDB.init(defaultRuntimePreset, "", inMemory = true)
let
signedBlock = withDigest(TrustedBeaconBlock())
@ -70,7 +70,7 @@ suiteReport "Beacon chain DB" & preset():
wrappedTimedTest "sanity check states" & preset():
var
db = init(BeaconChainDB, "", inMemory = true)
db = BeaconChainDB.init(defaultRuntimePreset, "", inMemory = true)
let
state = BeaconStateRef()
@ -86,7 +86,7 @@ suiteReport "Beacon chain DB" & preset():
wrappedTimedTest "find ancestors" & preset():
var
db = init(BeaconChainDB, "", inMemory = true)
db = BeaconChainDB.init(defaultRuntimePreset, "", inMemory = true)
let
a0 = withDigest(
@ -120,7 +120,7 @@ suiteReport "Beacon chain DB" & preset():
# serialization where an all-zero default-initialized bls signature could
# not be deserialized because the deserialization was too strict.
var
db = init(BeaconChainDB, "", inMemory = true)
db = BeaconChainDB.init(defaultRuntimePreset, "", inMemory = true)
let
state = initialize_beacon_state_from_eth1(

View File

@ -98,7 +98,7 @@ template timedTest*(name, body) =
testTimes.add (f, name)
proc makeTestDB*(tailState: BeaconState, tailBlock: SignedBeaconBlock): BeaconChainDB =
result = init(BeaconChainDB, "", inMemory = true)
result = BeaconChainDB.init(defaultRuntimePreset, "", inMemory = true)
ChainDAGRef.preInit(result, tailState, tailState, tailBlock)
proc makeTestDB*(validators: Natural): BeaconChainDB =