import os, stats, strformat, tables, chronicles, confutils, stew/[byteutils, io2], eth/db/kvstore_sqlite3, ../beacon_chain/networking/network_metadata, ../beacon_chain/[beacon_chain_db], ../beacon_chain/consensus_object_pools/[blockchain_dag], ../beacon_chain/spec/datatypes/[phase0, altair, merge], ../beacon_chain/spec/[ beaconstate, helpers, state_transition, state_transition_epoch, validator], ../beacon_chain/sszdump, ../research/simutils, ./e2store type Timers = enum tInit = "Initialize DB" tLoadBlock = "Load block from database" tLoadState = "Load state from database" tAdvanceSlot = "Advance slot, non-epoch" tAdvanceEpoch = "Advance slot, epoch" tApplyBlock = "Apply block, no slot processing" tDbLoad = "Database load" tDbStore = "Database store" type DbCmd* {.pure.} = enum bench = "Run a replay benchmark for block and epoch processing" dumpState = "Extract a state from the database as-is - only works for states that have been explicitly stored" putState = "Store a given BeaconState in the database" dumpBlock = "Extract a (trusted) SignedBeaconBlock from the database" putBlock = "Store a given SignedBeaconBlock in the database, potentially updating some of the pointers" pruneDatabase rewindState = "Extract any state from the database based on a given block and slot, replaying if needed" exportEra = "Write an experimental era file" validatorPerf validatorDb = "Create or update attestation performance database" # TODO: # This should probably allow specifying a run-time preset DbConf = object databaseDir* {. defaultValue: "" desc: "Directory where `nbc.sqlite` is stored" name: "db" }: InputDir eth2Network* {. desc: "The Eth2 network preset to use" name: "network" }: Option[string] case cmd* {. command desc: "" .}: DbCmd of DbCmd.bench: benchSlot* {. defaultValue: 0 name: "start-slot" desc: "Starting slot, negative = backwards from head".}: int64 benchSlots* {. defaultValue: 50000 name: "slots" desc: "Number of slots to run benchmark for, 0 = all the way to head".}: uint64 storeBlocks* {. defaultValue: false desc: "Store each read block back into a separate database".}: bool storeStates* {. defaultValue: false desc: "Store a state each epoch into a separate database".}: bool printTimes* {. defaultValue: true desc: "Print csv of block processing time".}: bool resetCache* {. defaultValue: false desc: "Process each block with a fresh cache".}: bool of DbCmd.dumpState: stateRoot* {. argument desc: "State roots to save".}: seq[string] of DbCmd.putState: stateFile {. argument name: "file" desc: "Files to import".}: seq[string] of DbCmd.dumpBlock: blockRootx* {. argument desc: "Block roots to save".}: seq[string] of DbCmd.putBlock: blckFile {. argument name: "file" desc: "Files to import".}: seq[string] setHead {. defaultValue: false name: "set-head" desc: "Update head to this block"}: bool setTail {. defaultValue: false name: "set-tail" desc: "Update tail to this block"}: bool setGenesis {. defaultValue: false name: "set-genesis" desc: "Update genesis to this block"}: bool of DbCmd.pruneDatabase: dryRun* {. defaultValue: false desc: "Don't write to the database copy; only simulate actions; default false".}: bool keepOldStates* {. defaultValue: true desc: "Keep pre-finalization states; default true".}: bool verbose* {. defaultValue: false desc: "Enables verbose output; default false".}: bool of DbCmd.rewindState: blockRoot* {. argument desc: "Block root".}: string slot* {. argument desc: "Slot".}: uint64 of DbCmd.exportEra: era* {. defaultValue: 0 desc: "The era number to write".}: uint64 eraCount* {. defaultValue: 1 desc: "Number of eras to write".}: uint64 of DbCmd.validatorPerf: perfSlot* {. defaultValue: -128 * SLOTS_PER_EPOCH.int64 name: "start-slot" desc: "Starting slot, negative = backwards from head".}: int64 perfSlots* {. defaultValue: 0 name: "slots" desc: "Number of slots to run benchmark for, 0 = all the way to head".}: uint64 of DbCmd.validatorDb: outDir* {. defaultValue: "" name: "out-db" desc: "Output database".}: string perfect* {. defaultValue: false name: "perfect" desc: "Include perfect records (full rewards)".}: bool proc putState(db: BeaconChainDB, state: ForkedHashedBeaconState) = withState(state): db.putStateRoot(state.latest_block_root(), state.data.slot, state.root) db.putState(state.root, state.data) func getSlotRange(dag: ChainDAGRef, startSlot: int64, count: uint64): (Slot, Slot) = let start = if startSlot >= 0: Slot(startSlot) elif uint64(-startSlot) >= dag.head.slot: Slot(0) else: Slot(dag.head.slot - uint64(-startSlot)) ends = if count == 0: dag.head.slot + 1 else: start + count (start, ends) func getBlockRange(dag: ChainDAGRef, start, ends: Slot): seq[BlockRef] = # Range of block in reverse order var blockRefs: seq[BlockRef] cur = dag.head while cur != nil: if cur.slot < ends: if cur.slot < start or cur.slot == 0: # skip genesis break else: blockRefs.add cur cur = cur.parent blockRefs proc cmdBench(conf: DbConf, cfg: RuntimeConfig) = var timers: array[Timers, RunningStat] echo "Opening database..." let db = BeaconChainDB.new(conf.databaseDir.string,) dbBenchmark = BeaconChainDB.new("benchmark") defer: db.close() dbBenchmark.close() if (let v = ChainDAGRef.isInitialized(db); v.isErr()): echo "Database not initialized: ", v.error() quit 1 echo "Initializing block pool..." let validatorMonitor = newClone(ValidatorMonitor.init()) dag = withTimerRet(timers[tInit]): ChainDAGRef.init(cfg, db, validatorMonitor, {}) var (start, ends) = dag.getSlotRange(conf.benchSlot, conf.benchSlots) blockRefs = dag.getBlockRange(start, ends) blocks: ( seq[phase0.TrustedSignedBeaconBlock], seq[altair.TrustedSignedBeaconBlock], seq[merge.TrustedSignedBeaconBlock]) echo &"Loaded {dag.blocks.len} blocks, head slot {dag.head.slot}, selected {blockRefs.len} blocks" doAssert blockRefs.len() > 0, "Must select at least one block" for b in 0.. dag.head.slot: echo "Written all complete eras" break var e2s = E2Store.open(".", name, firstSlot).get() defer: e2s.close() dag.withState(tmpState[], canonical): e2s.appendRecord(stateData.data.phase0Data.data).get() var ancestors: seq[BlockRef] cur = canonical.blck if era != 0: while cur != nil and cur.slot >= firstSlot: ancestors.add(cur) cur = cur.parent for i in 0.. 0, "Must select at least one block" echo "# Analyzing performance for epochs ", blockRefs[^1].slot.epoch, " - ", blockRefs[0].slot.epoch let state = newClone(dag.headState) dag.updateStateData( state[], blockRefs[^1].atSlot(blockRefs[^1].slot - 1), false, cache) proc processEpoch() = let prev_epoch_target_slot = state[].data.get_previous_epoch().compute_start_slot_at_epoch() penultimate_epoch_end_slot = if prev_epoch_target_slot == 0: Slot(0) else: prev_epoch_target_slot - 1 first_slot_empty = state[].data.get_block_root_at_slot(prev_epoch_target_slot) == state[].data.get_block_root_at_slot(penultimate_epoch_end_slot) let first_slot_attesters = block: let committee_count = state[].data.get_committee_count_per_slot( prev_epoch_target_slot.epoch, cache) var indices = HashSet[ValidatorIndex]() for committee_index in 0.. ends: echo "No (new) data found, database at ", minEpoch, ", finalized to ", ends.epoch quit 1 let blockRefs = dag.getBlockRange(start, ends) echo "Analyzing performance for epochs ", start.epoch, " - ", ends.epoch let state = newClone(dag.headState) dag.updateStateData( state[], blockRefs[^1].atSlot(if start > 0: start - 1 else: 0.Slot), false, cache) var inTxn = false proc processEpoch() = echo getStateField(state[].data, slot).epoch if not inTxn: outDb.exec("BEGIN TRANSACTION;").expect("DB") inTxn = true case info.kind of EpochInfoFork.Phase0: template info: untyped = info.phase0Data insertEpochInfo.exec( (getStateField(state[].data, slot).epoch.int64, info.balances.current_epoch_raw.int64, info.balances.previous_epoch_raw.int64, info.balances.current_epoch_attesters_raw.int64, info.balances.current_epoch_target_attesters_raw.int64, info.balances.previous_epoch_attesters_raw.int64, info.balances.previous_epoch_target_attesters_raw.int64, info.balances.previous_epoch_head_attesters_raw.int64) ).expect("DB") for index, status in info.validators.pairs(): if not is_eligible_validator(status): continue let notSlashed = (RewardFlags.isSlashed notin status.flags) source_attester = notSlashed and status.is_previous_epoch_attester.isSome() target_attester = notSlashed and RewardFlags.isPreviousEpochTargetAttester in status.flags head_attester = notSlashed and RewardFlags.isPreviousEpochHeadAttester in status.flags delay = if notSlashed and status.is_previous_epoch_attester.isSome(): some(int64(status.is_previous_epoch_attester.get().delay)) else: none(int64) if conf.perfect or not (source_attester and target_attester and head_attester and delay.isSome() and delay.get() == 1): insertValidatorInfo.exec( (index.int64, getStateField(state[].data, slot).epoch.int64, status.delta.rewards.int64, status.delta.penalties.int64, int64(source_attester), # Source delta int64(target_attester), # Target delta int64(head_attester), # Head delta delay)).expect("DB") of EpochInfoFork.Altair: echo "TODO altair support" if getStateField(state[].data, slot).epoch.int64 mod 16 == 0: inTxn = false outDb.exec("COMMIT;").expect("DB") for bi in 0..