ncli_db: validator performance database tool

Record attestation performance per epoch in sqlite database
This commit is contained in:
Jacek Sieka 2021-05-27 15:22:38 +02:00 committed by zah
parent 90e3fb246f
commit d16da06c92
3 changed files with 248 additions and 21 deletions

View File

@ -218,7 +218,7 @@ type
internalIds: Table[ValidatorIndex, ValidatorInternalID] internalIds: Table[ValidatorIndex, ValidatorInternalID]
ValidatorInternalID = int32 ValidatorInternalID = int64
## Validator internal ID in the DB ## Validator internal ID in the DB
## This is cached to cost querying cost ## This is cached to cost querying cost
@ -307,13 +307,13 @@ proc checkDB(db: SlashingProtectionDB_v2, genesis_validators_root: Eth2Digest) =
## Check the metadata of the DB ## Check the metadata of the DB
let selectStmt = db.backend.prepareStmt( let selectStmt = db.backend.prepareStmt(
"SELECT * FROM metadata;", "SELECT * FROM metadata;",
NoParams, (int32, Hash32), NoParams, (int64, Hash32),
managed = false # manual memory management managed = false # manual memory management
).get() ).get()
var version: int32 var version: int64
var root: Eth2Digest var root: Eth2Digest
let status = selectStmt.exec do (res: (int32, Hash32)): let status = selectStmt.exec do (res: (int64, Hash32)):
version = res[0] version = res[0]
root.data = res[1] root.data = res[1]
@ -584,12 +584,12 @@ proc getMetadataTable_DbV2*(db: SlashingProtectionDB_v2): Option[Eth2Digest] =
WHERE 1=1 WHERE 1=1
and type='table' and type='table'
and name='metadata' and name='metadata'
""", NoParams, int32, """, NoParams, int64,
managed = false # manual memory management managed = false # manual memory management
).get() ).get()
var hasV2: int32 var hasV2: int64
let v2exists = existenceStmt.exec do (res: int32): let v2exists = existenceStmt.exec do (res: int64):
hasV2 = res hasV2 = res
existenceStmt.dispose() existenceStmt.dispose()
@ -602,13 +602,13 @@ proc getMetadataTable_DbV2*(db: SlashingProtectionDB_v2): Option[Eth2Digest] =
let selectStmt = db.backend.prepareStmt( let selectStmt = db.backend.prepareStmt(
"SELECT * FROM metadata;", "SELECT * FROM metadata;",
NoParams, (int32, Hash32), NoParams, (int64, Hash32),
managed = false # manual memory management managed = false # manual memory management
).get() ).get()
var version: int32 var version: int64
var root: Eth2Digest var root: Eth2Digest
let status = selectStmt.exec do (res: (int32, Hash32)): let status = selectStmt.exec do (res: (int64, Hash32)):
version = res[0] version = res[0]
root.data = res[1] root.data = res[1]

View File

@ -5,7 +5,7 @@ import
../beacon_chain/[beacon_chain_db, extras], ../beacon_chain/[beacon_chain_db, extras],
../beacon_chain/consensus_object_pools/[blockchain_dag, statedata_helpers], ../beacon_chain/consensus_object_pools/[blockchain_dag, statedata_helpers],
../beacon_chain/spec/[crypto, datatypes, digest, helpers, state_transition, ../beacon_chain/spec/[crypto, datatypes, digest, helpers, state_transition,
presets], state_transition_epoch, presets],
../beacon_chain/ssz, ../beacon_chain/ssz/sszdump, ../beacon_chain/ssz, ../beacon_chain/ssz/sszdump,
../research/simutils, ./e2store ../research/simutils, ./e2store
@ -28,6 +28,7 @@ type
rewindState rewindState
exportEra exportEra
validatorPerf validatorPerf
validatorDb = "Create or update attestation performance database"
# TODO: # TODO:
# This should probably allow specifying a run-time preset # This should probably allow specifying a run-time preset
@ -115,9 +116,17 @@ type
defaultValue: 0 defaultValue: 0
name: "slots" name: "slots"
desc: "Number of slots to run benchmark for, 0 = all the way to head".}: uint64 desc: "Number of slots to run benchmark for, 0 = all the way to head".}: uint64
of validatorDb:
outDir* {.
defaultValue: ""
name: "out-db"
desc: "Output database".}: string
perfect* {.
defaultValue: false
name: "perfect"
desc: "Include perfect records (full rewards)".}: bool
proc getBlockRange(dag: ChainDAGRef, startSlot: int64, count: uint64): seq[BlockRef] = proc getSlotRange(dag: ChainDAGRef, startSlot: int64, count: uint64): (Slot, Slot) =
# Range of block in reverse order
let let
start = start =
if startSlot >= 0: Slot(startSlot) if startSlot >= 0: Slot(startSlot)
@ -126,6 +135,10 @@ proc getBlockRange(dag: ChainDAGRef, startSlot: int64, count: uint64): seq[Block
ends = ends =
if count == 0: dag.head.slot + 1 if count == 0: dag.head.slot + 1
else: start + count else: start + count
(start, ends)
proc getBlockRange(dag: ChainDAGRef, start, ends: Slot): seq[BlockRef] =
# Range of block in reverse order
var var
blockRefs: seq[BlockRef] blockRefs: seq[BlockRef]
cur = dag.head cur = dag.head
@ -160,7 +173,8 @@ proc cmdBench(conf: DbConf, runtimePreset: RuntimePreset) =
ChainDAGRef.init(runtimePreset, db, {}) ChainDAGRef.init(runtimePreset, db, {})
var var
blockRefs = dag.getBlockRange(conf.benchSlot, conf.benchSlots) (start, ends) = dag.getSlotRange(conf.benchSlot, conf.benchSlots)
blockRefs = dag.getBlockRange(start, ends)
blocks: seq[TrustedSignedBeaconBlock] blocks: seq[TrustedSignedBeaconBlock]
echo &"Loaded {dag.blocks.len} blocks, head slot {dag.head.slot}, selected {blockRefs.len} blocks" echo &"Loaded {dag.blocks.len} blocks, head slot {dag.head.slot}, selected {blockRefs.len} blocks"
@ -438,7 +452,8 @@ proc cmdValidatorPerf(conf: DbConf, runtimePreset: RuntimePreset) =
let dag = ChainDAGRef.init(runtimePreset, db, {}) let dag = ChainDAGRef.init(runtimePreset, db, {})
var var
blockRefs = dag.getBlockRange(conf.perfSlot, conf.perfSlots) (start, ends) = dag.getSlotRange(conf.perfSlot, conf.perfSlots)
blockRefs = dag.getBlockRange(start, ends)
perfs = newSeq[ValidatorPerformance]( perfs = newSeq[ValidatorPerformance](
getStateField(dag.headState, validators).len()) getStateField(dag.headState, validators).len())
cache = StateCache() cache = StateCache()
@ -504,7 +519,6 @@ proc cmdValidatorPerf(conf: DbConf, runtimePreset: RuntimePreset) =
else: else:
perf.attestation_misses += 1; perf.attestation_misses += 1;
for bi in 0..<blockRefs.len: for bi in 0..<blockRefs.len:
blck = db.getBlock(blockRefs[blockRefs.len - bi - 1].root).get() blck = db.getBlock(blockRefs[blockRefs.len - bi - 1].root).get()
while getStateField(state[], slot) < blck.message.slot: while getStateField(state[], slot) < blck.message.slot:
@ -520,10 +534,14 @@ proc cmdValidatorPerf(conf: DbConf, runtimePreset: RuntimePreset) =
echo "State transition failed (!)" echo "State transition failed (!)"
quit 1 quit 1
# Capture rewards from the epoch leading up to the last block # Capture rewards of empty slots as well
let nextEpochStart = (blck.message.slot.epoch + 1).compute_start_slot_at_epoch while getStateField(state[], slot) < ends:
doAssert process_slots(state[].data, nextEpochStart, cache, rewards, {}) let ok = process_slots(
processEpoch() state[].data, getStateField(state[], slot) + 1, cache, rewards, {})
doAssert ok, "Slot processing can't fail with correct inputs"
if getStateField(state[], slot).isEpoch():
processEpoch()
echo "validator_index,attestation_hits,attestation_misses,head_attestation_hits,head_attestation_misses,target_attestation_hits,target_attestation_misses,delay_avg,first_slot_head_attester_when_first_slot_empty,first_slot_head_attester_when_first_slot_not_empty" echo "validator_index,attestation_hits,attestation_misses,head_attestation_hits,head_attestation_misses,target_attestation_hits,target_attestation_misses,delay_avg,first_slot_head_attester_when_first_slot_empty,first_slot_head_attester_when_first_slot_not_empty"
@ -546,6 +564,213 @@ proc cmdValidatorPerf(conf: DbConf, runtimePreset: RuntimePreset) =
perf.first_slot_head_attester_when_first_slot_empty,",", perf.first_slot_head_attester_when_first_slot_empty,",",
perf.first_slot_head_attester_when_first_slot_not_empty perf.first_slot_head_attester_when_first_slot_not_empty
proc cmdValidatorDb(conf: DbConf, runtimePreset: RuntimePreset) =
# Create a database with performance information for every epoch
echo "Opening database..."
let
db = BeaconChainDB.new(
runtimePreset, conf.databaseDir.string,)
defer:
db.close()
if not ChainDAGRef.isInitialized(db):
echo "Database not initialized"
quit 1
echo "Initializing block pool..."
let dag = ChainDAGRef.init(runtimePreset, db, {})
let outDb = SqStoreRef.init(conf.outDir, "validatorDb").expect("DB")
defer: outDb.close()
outDb.exec("""
CREATE TABLE IF NOT EXISTS validators_raw(
validator_index INTEGER PRIMARY KEY,
pubkey BLOB NOT NULL,
withdrawal_credentials BLOB NOT NULL
);
""").expect("DB")
# For convenient viewing
outDb.exec("""
CREATE VIEW IF NOT EXISTS validators AS
SELECT
validator_index,
'0x' || lower(hex(pubkey)) as pubkey,
'0x' || lower(hex(withdrawal_credentials)) as with_cred
FROM validators_raw;
""").expect("DB")
outDb.exec("""
CREATE TABLE IF NOT EXISTS epoch_info(
epoch INTEGER PRIMARY KEY,
current_epoch_raw INTEGER NOT NULL,
previous_epoch_raw INTEGER NOT NULL,
current_epoch_attesters_raw INTEGER NOT NULL,
current_epoch_target_attesters_raw INTEGER NOT NULL,
previous_epoch_attesters_raw INTEGER NOT NULL,
previous_epoch_target_attesters_raw INTEGER NOT NULL,
previous_epoch_head_attesters_raw INTEGER NOT NULL
);
""").expect("DB")
outDb.exec("""
CREATE TABLE IF NOT EXISTS validator_epoch_info(
validator_index INTEGER,
epoch INTEGER,
rewards INTEGER NOT NULL,
penalties INTEGER NOT NULL,
source_attester INTEGER NOT NULL,
target_attester INTEGER NOT NULL,
head_attester INTEGER NOT NULL,
inclusion_delay INTEGER NULL,
PRIMARY KEY(validator_index, epoch)
);
""").expect("DB")
let
insertValidator = outDb.prepareStmt("""
INSERT INTO validators_raw(
validator_index,
pubkey,
withdrawal_credentials)
VALUES(?, ?, ?);""",
(int64, array[48, byte], array[32, byte]), void).expect("DB")
insertEpochInfo = outDb.prepareStmt("""
INSERT INTO epoch_info(
epoch,
current_epoch_raw,
previous_epoch_raw,
current_epoch_attesters_raw,
current_epoch_target_attesters_raw,
previous_epoch_attesters_raw,
previous_epoch_target_attesters_raw,
previous_epoch_head_attesters_raw)
VALUES(?, ?, ?, ?, ?, ?, ?, ?);""",
(int64, int64, int64, int64, int64, int64, int64, int64), void).expect("DB")
insertValidatorInfo = outDb.prepareStmt("""
INSERT INTO validator_epoch_info(
validator_index,
epoch,
rewards,
penalties,
source_attester,
target_attester,
head_attester,
inclusion_delay)
VALUES(?, ?, ?, ?, ?, ?, ?, ?);""",
(int64, int64, int64, int64, int64, int64, int64, Option[int64]), void).expect("DB")
var vals: int64
discard outDb.exec("SELECT count(*) FROM validators", ()) do (res: int64):
vals = res
outDb.exec("BEGIN TRANSACTION;").expect("DB")
for i in vals..<getStateField(dag.headState, validators).len():
insertValidator.exec((
i,
getStateField(dag.headState, validators).data[i].pubkey.toRaw(),
getStateField(dag.headState, validators).data[i].withdrawal_credentials.data)).expect("DB")
outDb.exec("COMMIT;").expect("DB")
var minEpoch: Epoch
discard outDb.exec("SELECT MAX(epoch) FROM epoch_info", ()) do (res: int64):
minEpoch = (res + 1).Epoch
var
cache = StateCache()
rewards = RewardInfo()
blck: TrustedSignedBeaconBlock
let
start = minEpoch.compute_start_slot_at_epoch()
ends = dag.finalizedHead.slot # Avoid dealing with changes
if start > ends:
echo "No (new) data found, database at ", minEpoch, ", finalized to ", ends.epoch
quit 1
let blockRefs = dag.getBlockRange(start, ends)
echo "Analyzing performance for epochs ",
start.epoch, " - ", ends.epoch
let state = newClone(dag.headState)
dag.updateStateData(
state[], blockRefs[^1].atSlot(if start > 0: start - 1 else: 0.Slot),
false, cache)
proc processEpoch() =
echo getStateField(state[], slot).epoch
outDb.exec("BEGIN TRANSACTION;").expect("DB")
insertEpochInfo.exec(
(getStateField(state[], slot).epoch.int64,
rewards.total_balances.current_epoch_raw.int64,
rewards.total_balances.previous_epoch_raw.int64,
rewards.total_balances.current_epoch_attesters_raw.int64,
rewards.total_balances.current_epoch_target_attesters_raw.int64,
rewards.total_balances.previous_epoch_attesters_raw.int64,
rewards.total_balances.previous_epoch_target_attesters_raw.int64,
rewards.total_balances.previous_epoch_head_attesters_raw.int64)
).expect("DB")
for index, status in rewards.statuses.pairs():
if not is_eligible_validator(status):
continue
let
notSlashed = (RewardFlags.isSlashed notin status.flags)
source_attester =
notSlashed and status.is_previous_epoch_attester.isSome()
target_attester =
notSlashed and RewardFlags.isPreviousEpochTargetAttester in status.flags
head_attester =
notSlashed and RewardFlags.isPreviousEpochHeadAttester in status.flags
delay =
if notSlashed and status.is_previous_epoch_attester.isSome():
some(int64(status.is_previous_epoch_attester.get().delay))
else:
none(int64)
if conf.perfect or not
(source_attester and target_attester and head_attester and
delay.isSome() and delay.get() == 1):
insertValidatorInfo.exec(
(index.int64,
getStateField(state[], slot).epoch.int64,
status.delta.rewards.int64,
status.delta.penalties.int64,
int64(source_attester), # Source delta
int64(target_attester), # Target delta
int64(head_attester), # Head delta
delay)).expect("DB")
outDb.exec("COMMIT;").expect("DB")
for bi in 0..<blockRefs.len:
blck = db.getBlock(blockRefs[blockRefs.len - bi - 1].root).get()
while getStateField(state[], slot) < blck.message.slot:
let ok = process_slots(
state[].data, getStateField(state[], slot) + 1, cache, rewards, {})
doAssert ok, "Slot processing can't fail with correct inputs"
if getStateField(state[], slot).isEpoch():
processEpoch()
if not state_transition(
runtimePreset, state[].data, blck, cache, rewards, {slotProcessed}, noRollback):
echo "State transition failed (!)"
quit 1
# Capture rewards of empty slots as well, including the epoch that got
# finalized
while getStateField(state[], slot) <= ends:
let ok = process_slots(
state[].data, getStateField(state[], slot) + 1, cache, rewards, {})
doAssert ok, "Slot processing can't fail with correct inputs"
if getStateField(state[], slot).isEpoch():
processEpoch()
when isMainModule: when isMainModule:
var var
conf = DbConf.load() conf = DbConf.load()
@ -566,3 +791,5 @@ when isMainModule:
cmdExportEra(conf, runtimePreset) cmdExportEra(conf, runtimePreset)
of validatorPerf: of validatorPerf:
cmdValidatorPerf(conf, runtimePreset) cmdValidatorPerf(conf, runtimePreset)
of validatorDb:
cmdValidatorDb(conf, runtimePreset)

2
vendor/nim-eth vendored

@ -1 +1 @@
Subproject commit 2a292cfb621643d61112f6c049dbd9fe9ba37343 Subproject commit 3514ee6484038b7e66540f5b5853831198562412