ncli_db: import states and blocks from era file (#3313)

This commit is contained in:
Jacek Sieka 2022-01-25 09:28:26 +01:00 committed by GitHub
parent 00a347457a
commit d076e1a11b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 132 additions and 37 deletions

View File

@ -72,10 +72,16 @@ type
## revisited in the future, when/if the calling code safely can handle ## revisited in the future, when/if the calling code safely can handle
## corruption of this kind. ## corruption of this kind.
## ##
## We do however make an effort not to crash on invalid data inside the ## The database follows an "mostly-consistent" model where it's possible
## database - this may have a number of "natural" causes such as switching ## that some data has been lost to crashes and restarts - for example,
## between different versions of the client and accidentally using an old ## the state root table might contain entries that don't lead to a state
## database. ## etc - this makes it easier to defer certain operations such as pruning
## and cleanup, but also means that some amount of "junk" is left behind
## when the application is restarted or crashes in the wrong moment.
##
## Generally, sqlite performs a commit at the end of every write, meaning
## that data write order is respected - the strategy thus becomes to write
## bulk data first, then update pointers like the `head root` entry.
db*: SqStoreRef db*: SqStoreRef
v0: BeaconChainDBV0 v0: BeaconChainDBV0
@ -266,6 +272,24 @@ proc loadImmutableValidators(vals: DbSeq[ImmutableValidatorDataDb2]): seq[Immuta
pubkey: tmp.pubkey.loadValid(), pubkey: tmp.pubkey.loadValid(),
withdrawal_credentials: tmp.withdrawal_credentials) withdrawal_credentials: tmp.withdrawal_credentials)
template withManyWrites*(db: BeaconChainDB, body: untyped) =
# We don't enforce strong ordering or atomicity requirements in the beacon
# chain db in general, relying instead on readers to be able to deal with
# minor inconsistencies - however, putting writes in a transaction is orders
# of magnitude faster when doing many small writes, so we use this as an
# optimization technique and the templace is named accordingly.
mixin expectDb
db.db.exec("BEGIN TRANSACTION;").expectDb()
var commit = false
try:
body
commit = true
finally:
if commit:
db.db.exec("COMMIT TRANSACTION;").expectDb()
else:
db.db.exec("ROLLBACK TRANSACTION;").expectDb()
proc new*(T: type BeaconChainDB, proc new*(T: type BeaconChainDB,
dir: string, dir: string,
inMemory = false, inMemory = false,
@ -489,14 +513,17 @@ proc putBeaconBlockSummary(
db.summaries.putSSZ(root.data, value) db.summaries.putSSZ(root.data, value)
proc putBlock*(db: BeaconChainDB, value: phase0.TrustedSignedBeaconBlock) = proc putBlock*(db: BeaconChainDB, value: phase0.TrustedSignedBeaconBlock) =
db.withManyWrites:
db.blocks.putSnappySSZ(value.root.data, value) db.blocks.putSnappySSZ(value.root.data, value)
db.putBeaconBlockSummary(value.root, value.message.toBeaconBlockSummary()) db.putBeaconBlockSummary(value.root, value.message.toBeaconBlockSummary())
proc putBlock*(db: BeaconChainDB, value: altair.TrustedSignedBeaconBlock) = proc putBlock*(db: BeaconChainDB, value: altair.TrustedSignedBeaconBlock) =
db.withManyWrites:
db.altairBlocks.putSnappySSZ(value.root.data, value) db.altairBlocks.putSnappySSZ(value.root.data, value)
db.putBeaconBlockSummary(value.root, value.message.toBeaconBlockSummary()) db.putBeaconBlockSummary(value.root, value.message.toBeaconBlockSummary())
proc putBlock*(db: BeaconChainDB, value: bellatrix.TrustedSignedBeaconBlock) = proc putBlock*(db: BeaconChainDB, value: bellatrix.TrustedSignedBeaconBlock) =
db.withManyWrites:
db.mergeBlocks.putSnappySSZ(value.root.data, value) db.mergeBlocks.putSnappySSZ(value.root.data, value)
db.putBeaconBlockSummary(value.root, value.message.toBeaconBlockSummary()) db.putBeaconBlockSummary(value.root, value.message.toBeaconBlockSummary())
@ -540,6 +567,10 @@ proc putState*(db: BeaconChainDB, key: Eth2Digest, value: bellatrix.BeaconState)
db.mergeStatesNoVal.putSnappySSZ( db.mergeStatesNoVal.putSnappySSZ(
key.data, toBeaconStateNoImmutableValidators(value)) key.data, toBeaconStateNoImmutableValidators(value))
proc putState*(db: BeaconChainDB, state: ForkyHashedBeaconState) =
db.withManyWrites:
db.putStateRoot(state.latest_block_root(), state.data.slot, state.root)
db.putState(state.root, state.data)
# For testing rollback # For testing rollback
proc putCorruptPhase0State*(db: BeaconChainDB, key: Eth2Digest) = proc putCorruptPhase0State*(db: BeaconChainDB, key: Eth2Digest) =
db.statesNoVal.putSnappySSZ(key.data, Validator()) db.statesNoVal.putSnappySSZ(key.data, Validator())
@ -926,10 +957,9 @@ iterator getAncestorSummaries*(db: BeaconChainDB, root: Eth2Digest):
# Write the newly found summaries in a single transaction - on first migration # Write the newly found summaries in a single transaction - on first migration
# from the old format, this brings down the write from minutes to seconds # from the old format, this brings down the write from minutes to seconds
if newSummaries.len() > 0: if newSummaries.len() > 0:
db.db.exec("BEGIN TRANSACTION;").expectDb() db.withManyWrites:
for s in newSummaries: for s in newSummaries:
db.putBeaconBlockSummary(s.root, s.summary) db.putBeaconBlockSummary(s.root, s.summary)
db.db.exec("COMMIT;").expectDb()
if false: if false:
# When the current version has been online for a bit, we can safely remove # When the current version has been online for a bit, we can safely remove

View File

@ -761,8 +761,7 @@ proc putState(dag: ChainDAGRef, state: StateData) =
# transaction to prevent database inconsistencies, but the state loading code # transaction to prevent database inconsistencies, but the state loading code
# is resilient against one or the other going missing # is resilient against one or the other going missing
withState(state.data): withState(state.data):
dag.db.putStateRoot(state.latest_block_root(), state.data.slot, state.root) dag.db.putState(state)
dag.db.putState(state.root, state.data)
debug "Stored state", putStateDur = Moment.now() - startTick debug "Stored state", putStateDur = Moment.now() - startTick
@ -1537,11 +1536,11 @@ proc preInit*(
quit 1 quit 1
let blck = get_initial_beacon_block(state) let blck = get_initial_beacon_block(state)
db.putGenesisBlock(blck.root)
db.putBlock(blck) db.putBlock(blck)
db.putState(state)
db.putGenesisBlock(blck.root)
db.putStateRoot(state.latest_block_root(), state.data.slot, state.root)
db.putState(state.root, state.data)
blck.root blck.root
else: # tail and genesis are the same else: # tail and genesis are the same
withBlck(tailBlock): withBlck(tailBlock):
@ -1561,12 +1560,11 @@ proc preInit*(
quit 1 quit 1
db.putBlock(blck) db.putBlock(blck)
db.putState(state)
db.putTailBlock(blck.root) db.putTailBlock(blck.root)
db.putHeadBlock(blck.root) db.putHeadBlock(blck.root)
db.putStateRoot(state.latest_block_root(), state.data.slot, state.root)
db.putState(state.root, state.data)
notice "New database from snapshot", notice "New database from snapshot",
genesisBlockRoot = shortLog(genesisBlockRoot), genesisBlockRoot = shortLog(genesisBlockRoot),
genesisStateRoot = shortLog(getStateRoot(genesisState)), genesisStateRoot = shortLog(getStateRoot(genesisState)),

View File

@ -140,8 +140,7 @@ proc doTrustedNodeSync*(
stateRoot = shortLog(state.root), stateRoot = shortLog(state.root),
genesis_validators_root = shortLog(state.data.genesis_validators_root) genesis_validators_root = shortLog(state.data.genesis_validators_root)
db.putStateRoot(state.latest_block_root(), state.data.slot, state.root) db.putState(state)
db.putState(state.root, state.data)
let blck = get_initial_beacon_block(state) let blck = get_initial_beacon_block(state)
@ -271,8 +270,7 @@ proc doTrustedNodeSync*(
info "Writing checkpoint state", info "Writing checkpoint state",
stateRoot = shortLog(state.root) stateRoot = shortLog(state.root)
db.putStateRoot(state.latest_block_root(), state.data.slot, state.root) db.putState(state)
db.putState(state.root, state.data)
withBlck(checkpointBlock): withBlck(checkpointBlock):
info "Writing checkpoint block", info "Writing checkpoint block",

View File

@ -132,7 +132,7 @@ proc readHeader(f: IoHandle): Result[Header, string] =
ok(Header(typ: typ, len: int(len))) ok(Header(typ: typ, len: int(len)))
proc readRecord(f: IoHandle, data: var seq[byte]): Result[Header, string] = proc readRecord*(f: IoHandle, data: var seq[byte]): Result[Header, string] =
let header = ? readHeader(f) let header = ? readHeader(f)
if header.len > 0: if header.len > 0:
? f.checkBytesLeft(header.len) ? f.checkBytesLeft(header.len)

View File

@ -1,12 +1,13 @@
import import
os, stats, strformat, tables, snappy, std/[os, stats, strformat, tables],
snappy, snappy/framing,
chronicles, confutils, stew/[byteutils, io2], eth/db/kvstore_sqlite3, chronicles, confutils, stew/[byteutils, io2], eth/db/kvstore_sqlite3,
../beacon_chain/networking/network_metadata, ../beacon_chain/networking/network_metadata,
../beacon_chain/[beacon_chain_db], ../beacon_chain/[beacon_chain_db],
../beacon_chain/consensus_object_pools/[blockchain_dag], ../beacon_chain/consensus_object_pools/[blockchain_dag],
../beacon_chain/spec/datatypes/[phase0, altair, bellatrix], ../beacon_chain/spec/datatypes/[phase0, altair, bellatrix],
../beacon_chain/spec/[ ../beacon_chain/spec/[
beaconstate, helpers, state_transition, state_transition_epoch, validator, beaconstate, state_transition, state_transition_epoch, validator,
ssz_codec], ssz_codec],
../beacon_chain/sszdump, ../beacon_chain/sszdump,
../research/simutils, ../research/simutils,
@ -35,6 +36,7 @@ type
pruneDatabase pruneDatabase
rewindState = "Extract any state from the database based on a given block and slot, replaying if needed" rewindState = "Extract any state from the database based on a given block and slot, replaying if needed"
exportEra = "Write an experimental era file" exportEra = "Write an experimental era file"
importEra = "Import era files to the database"
validatorPerf validatorPerf
validatorDb = "Create or update attestation performance database" validatorDb = "Create or update attestation performance database"
@ -69,18 +71,22 @@ type
desc: "Store each read block back into a separate database".}: bool desc: "Store each read block back into a separate database".}: bool
storeStates* {. storeStates* {.
defaultValue: false defaultValue: false
name: "store-states"
desc: "Store a state each epoch into a separate database".}: bool desc: "Store a state each epoch into a separate database".}: bool
printTimes* {. printTimes* {.
defaultValue: true defaultValue: true
name: "print-times"
desc: "Print csv of block processing time".}: bool desc: "Print csv of block processing time".}: bool
resetCache* {. resetCache* {.
defaultValue: false defaultValue: false
name: "reset-cache"
desc: "Process each block with a fresh cache".}: bool desc: "Process each block with a fresh cache".}: bool
of DbCmd.dumpState: of DbCmd.dumpState:
stateRoot* {. stateRoot* {.
argument argument
desc: "State roots to save".}: seq[string] name: "state-root"
desc: "State root(s) to save".}: seq[string]
of DbCmd.putState: of DbCmd.putState:
stateFile {. stateFile {.
@ -91,7 +97,8 @@ type
of DbCmd.dumpBlock: of DbCmd.dumpBlock:
blockRootx* {. blockRootx* {.
argument argument
desc: "Block roots to save".}: seq[string] name: "block-root"
desc: "Block root(s) to save".}: seq[string]
of DbCmd.putBlock: of DbCmd.putBlock:
blckFile {. blckFile {.
@ -114,9 +121,11 @@ type
of DbCmd.pruneDatabase: of DbCmd.pruneDatabase:
dryRun* {. dryRun* {.
defaultValue: false defaultValue: false
name: "dry-run"
desc: "Don't write to the database copy; only simulate actions; default false".}: bool desc: "Don't write to the database copy; only simulate actions; default false".}: bool
keepOldStates* {. keepOldStates* {.
defaultValue: true defaultValue: true
name: "keep-old"
desc: "Keep pre-finalization states; default true".}: bool desc: "Keep pre-finalization states; default true".}: bool
verbose* {. verbose* {.
defaultValue: false defaultValue: false
@ -125,6 +134,7 @@ type
of DbCmd.rewindState: of DbCmd.rewindState:
blockRoot* {. blockRoot* {.
argument argument
name: "block-root"
desc: "Block root".}: string desc: "Block root".}: string
slot* {. slot* {.
@ -137,8 +147,15 @@ type
desc: "The era number to write".}: uint64 desc: "The era number to write".}: uint64
eraCount* {. eraCount* {.
defaultValue: 1 defaultValue: 1
name: "count"
desc: "Number of eras to write".}: uint64 desc: "Number of eras to write".}: uint64
of DbCmd.importEra:
eraFiles* {.
argument
name: "file"
desc: "The name of the era file(s) to import".}: seq[string]
of DbCmd.validatorPerf: of DbCmd.validatorPerf:
perfSlot* {. perfSlot* {.
defaultValue: -128 * SLOTS_PER_EPOCH.int64 defaultValue: -128 * SLOTS_PER_EPOCH.int64
@ -166,11 +183,6 @@ type
var shouldShutDown = false var shouldShutDown = false
proc putState(db: BeaconChainDB, state: ForkedHashedBeaconState) =
withState(state):
db.putStateRoot(state.latest_block_root(), state.data.slot, state.root)
db.putState(state.root, state.data)
func getSlotRange(dag: ChainDAGRef, startSlot: int64, count: uint64): (Slot, Slot) = func getSlotRange(dag: ChainDAGRef, startSlot: int64, count: uint64): (Slot, Slot) =
let let
start = start =
@ -335,7 +347,8 @@ proc cmdPutState(conf: DbConf, cfg: RuntimeConfig) =
if shouldShutDown: quit QuitSuccess if shouldShutDown: quit QuitSuccess
let state = newClone(readSszForkedHashedBeaconState( let state = newClone(readSszForkedHashedBeaconState(
cfg, readAllBytes(file).tryGet())) cfg, readAllBytes(file).tryGet()))
db.putState(state[]) withState(state[]):
db.putState(state)
proc cmdDumpBlock(conf: DbConf) = proc cmdDumpBlock(conf: DbConf) =
let db = BeaconChainDB.new(conf.databaseDir.string) let db = BeaconChainDB.new(conf.databaseDir.string)
@ -559,6 +572,60 @@ proc cmdExportEra(conf: DbConf, cfg: RuntimeConfig) =
printTimers(true, timers) printTimers(true, timers)
proc cmdImportEra(conf: DbConf, cfg: RuntimeConfig) =
let db = BeaconChainDB.new(conf.databaseDir.string)
defer: db.close()
type Timers = enum
tBlock
tState
var
blocks = 0
states = 0
others = 0
timers: array[Timers, RunningStat]
var data: seq[byte]
for file in conf.eraFiles:
let f = openFile(file, {OpenFlags.Read}).valueOr:
warn "Can't open ", file
continue
defer: discard closeFile(f)
while true:
let header = readRecord(f, data).valueOr:
break
if header.typ == SnappyBeaconBlock:
withTimer(timers[tBlock]):
let uncompressed = framingFormatUncompress(data)
let blck = try: readSszForkedSignedBeaconBlock(cfg, uncompressed)
except CatchableError as exc:
error "Invalid snappy block", msg = exc.msg, file
continue
withBlck(blck.asTrusted()):
db.putBlock(blck)
blocks += 1
elif header.typ == SnappyBeaconState:
withTimer(timers[tState]):
let uncompressed = framingFormatUncompress(data)
let state = try: newClone(
readSszForkedHashedBeaconState(cfg, uncompressed))
except CatchableError as exc:
error "Invalid snappy state", msg = exc.msg, file
continue
withState(state[]):
db.putState(state)
states += 1
else:
info "Skipping record", typ = toHex(header.typ)
others += 1
notice "Done", blocks, states, others
printTimers(true, timers)
type type
# Validator performance metrics tool based on # Validator performance metrics tool based on
# https://github.com/paulhauner/lighthouse/blob/etl/lcli/src/etl/validator_performance.rs # https://github.com/paulhauner/lighthouse/blob/etl/lcli/src/etl/validator_performance.rs
@ -998,6 +1065,8 @@ when isMainModule:
cmdRewindState(conf, cfg) cmdRewindState(conf, cfg)
of DbCmd.exportEra: of DbCmd.exportEra:
cmdExportEra(conf, cfg) cmdExportEra(conf, cfg)
of DbCmd.importEra:
cmdImportEra(conf, cfg)
of DbCmd.validatorPerf: of DbCmd.validatorPerf:
cmdValidatorPerf(conf, cfg) cmdValidatorPerf(conf, cfg)
of DbCmd.validatorDb: of DbCmd.validatorDb: