More efficient reward data persistence; address review comments

The new format is based on compressed CSV files in two channels:

* Detailed per-epoch data
* Aggregated "daily" summaries

The use of append-only CSV files significantly speeds up epoch
processing during data generation, while the compression keeps the
overall storage requirements low. Maintaining the aggregated files
adds only a very minor CPU and storage cost, but it brings report
generation to near-interactive speed.
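
For illustration, here is a condensed sketch of the per-epoch write
path, assuming the `RewardsAndPenalties`, `serializeToCsv` and
`getFilePathForEpoch` definitions from the new `ncli_common.nim` module
below (the helper name `writeEpochFile` is ours; the buffering, Snappy
compression and error handling mirror the code in this commit):

```nim
import snappy, stew/[io2, byteutils]

# One ".epoch" file per epoch: all per-validator CSV rows are
# accumulated in a single string (append-only, no random access) and
# the whole buffer is Snappy-compressed before being written out.
proc writeEpochFile(rewardsAndPenalties: seq[RewardsAndPenalties],
                    epoch: Epoch, outDir: string) =
  var csvLines = newStringOfCap(1000000)
  for rp in rewardsAndPenalties:
    csvLines.add rp.serializeToCsv
  let fileName = getFilePathForEpoch(epoch, outDir)
  doAssert io2.removeFile(fileName).isOk   # drop a stale file, if any
  doAssert io2.writeFile(fileName, snappy.encode(csvLines.toBytes)).isOk
```

The aggregated channel is produced by the new `validator_db_aggregator`
tool, which folds every `resolution` epochs (225 by default, roughly one
day) into a single summary file of per-validator averages.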

Other changes:

- Implemented support for graceful shutdowns to avoid corrupting
  the saved files (see the sketch after this list).

- Fixed a memory leak caused by missing `StateCache` cleanup on each
  iteration.

- Addressed review comments

- Moved the rewards and penalties calculation code to a separate module
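
The graceful shutdown mentioned above is cooperative: the signal
handlers only set a flag, and the long-running loops poll it at safe
points, so an `.epoch` file is never left half-written. A minimal
sketch, matching the hooks added to `ncli_db` and
`validator_db_aggregator` in this commit:

```nim
import chronicles

when defined(posix):
  import system/ansi_c

var shouldShutDown = false

proc controlCHook {.noconv.} =
  notice "Shutting down after having received SIGINT."
  shouldShutDown = true

proc exitOnSigterm(signal: cint) {.noconv.} =
  notice "Shutting down after having received SIGTERM."
  shouldShutDown = true

setControlCHook(controlCHook)
when defined(posix):
  c_signal(SIGTERM, exitOnSigterm)

# The processing loops then check the flag at safe points, e.g.:
#   if shouldShutDown: quit QuitSuccess
```

The `StateCache` leak fix is visible below as a new `clear` proc that
empties the shuffled-indices, proposer-index and sync-committee tables;
`cmdValidatorDb` now calls `clear cache` after every processed epoch.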

Required invasive changes to existing modules:

- The `data` field of the `KeyedBlockRef` type is made public to be used
  by the validator rewards monitor's Chain DAG update procedure.

- The `getForkedBlock` procedure from the `blockchain_dag.nim` module
  is made public to be used by the validator rewards monitor's Chain DAG
  update procedure.
Zahary Karadjov 2022-01-17 14:58:33 +02:00 committed by zah
parent 29aad0241b
commit 47f1f7ff1a
13 changed files with 1092 additions and 554 deletions


@@ -75,8 +75,9 @@ OK: 16/16 Fail: 0/16 Skip: 0/16
 + Smoke test initialize_beacon_state_from_eth1 [Preset: mainnet] OK
 + get_beacon_proposer_index OK
 + latest_block_root OK
++ process_slots OK
 ```
-OK: 3/3 Fail: 0/3 Skip: 0/3
+OK: 4/4 Fail: 0/4 Skip: 0/4
## Beacon time
```diff
+ basics OK
@@ -441,4 +442,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
 OK: 1/1 Fail: 0/1 Skip: 0/1
 ---TOTAL---
-OK: 237/239 Fail: 0/239 Skip: 2/239
+OK: 238/240 Fail: 0/240 Skip: 2/240


@@ -47,7 +47,8 @@ TOOLS := \
 ncli_db \
 stack_sizes \
 nimbus_validator_client \
-nimbus_signing_node
+nimbus_signing_node \
+validator_db_aggregator
 .PHONY: $(TOOLS)
 # bench_bls_sig_agggregation TODO reenable after bls v0.10.1 changes


@@ -61,7 +61,7 @@ type
 # by root without keeping a Table that keeps a separate copy of the digest
 # At the time of writing, a Table[Eth2Digest, BlockRef] adds about 100mb of
 # unnecessary overhead.
-data: BlockRef
+data*: BlockRef
 ChainDAGRef* = ref object
 ## Pool of blocks responsible for keeping a DAG of resolved blocks.


@@ -342,7 +342,7 @@ proc getStateData(
 true
-proc getForkedBlock(db: BeaconChainDB, root: Eth2Digest):
+proc getForkedBlock*(db: BeaconChainDB, root: Eth2Digest):
 Opt[ForkedTrustedSignedBeaconBlock] =
 # When we only have a digest, we don't know which fork it's from so we try
 # them one by one - this should be used sparingly


@@ -1851,7 +1851,7 @@ template gossipMaxSize(T: untyped): uint32 =
 T is altair.SignedBeaconBlock:
 GOSSIP_MAX_SIZE
 else:
-static: raiseAssert "unknown type"
+{.fatal: "unknown type".}
 static: doAssert maxSize <= maxGossipMaxSize()
 maxSize.uint32


@@ -130,27 +130,27 @@ proc get_slashing_penalty*(state: ForkyBeaconState,
 validator_effective_balance div MIN_SLASHING_PENALTY_QUOTIENT
 elif state is altair.BeaconState:
 validator_effective_balance div MIN_SLASHING_PENALTY_QUOTIENT_ALTAIR
-elif state is merge.BeaconState:
+elif state is bellatrix.BeaconState:
 validator_effective_balance div MIN_SLASHING_PENALTY_QUOTIENT_MERGE
 else:
-raiseAssert "invalid BeaconState type"
+{.fatal: "invalid BeaconState type".}
 # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/phase0/beacon-chain.md#slash_validator
 # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/altair/beacon-chain.md#modified-slash_validator
 # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/merge/beacon-chain.md#modified-slash_validator
-proc get_whistleblower_reward*(validator_effective_balance: Gwei): Gwei =
+func get_whistleblower_reward*(validator_effective_balance: Gwei): Gwei =
 validator_effective_balance div WHISTLEBLOWER_REWARD_QUOTIENT
 # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/phase0/beacon-chain.md#slash_validator
 # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/altair/beacon-chain.md#modified-slash_validator
 # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/merge/beacon-chain.md#modified-slash_validator
-proc get_proposer_reward(state: ForkyBeaconState, whistleblower_reward: Gwei): Gwei =
+func get_proposer_reward(state: ForkyBeaconState, whistleblower_reward: Gwei): Gwei =
 when state is phase0.BeaconState:
 whistleblower_reward div PROPOSER_REWARD_QUOTIENT
-elif state is altair.BeaconState or state is merge.BeaconState:
+elif state is altair.BeaconState or state is bellatrix.BeaconState:
 whistleblower_reward * PROPOSER_WEIGHT div WEIGHT_DENOMINATOR
 else:
-raiseAssert "invalid BeaconState type"
+{.fatal: "invalid BeaconState type".}
 # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/phase0/beacon-chain.md#slash_validator
 # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/altair/beacon-chain.md#modified-slash_validator


@@ -895,3 +895,8 @@ template isomorphicCast*[T, U](x: U): T =
 doAssert sizeof(T) == sizeof(U)
 doAssert getSizeofSig(T()) == getSizeofSig(U())
 cast[ptr T](unsafeAddr x)[]
+proc clear*(cache: var StateCache) =
+  cache.shuffled_active_validator_indices.clear
+  cache.beacon_proposer_indices.clear
+  cache.sync_committees.clear


@@ -434,7 +434,7 @@ func get_participant_reward*(total_active_balance: Gwei): Gwei =
 get_base_reward_per_increment(total_active_balance) * total_active_increments
 max_participant_rewards =
 total_base_rewards * SYNC_REWARD_WEIGHT div WEIGHT_DENOMINATOR div SLOTS_PER_EPOCH
-return max_participant_rewards div SYNC_COMMITTEE_SIZE
+max_participant_rewards div SYNC_COMMITTEE_SIZE
 # https://github.com/ethereum/consensus-specs/blob/v1.1.0-alpha.6/specs/altair/beacon-chain.md#sync-committee-processing
 func get_proposer_reward*(participant_reward: Gwei): Gwei =


@@ -859,22 +859,22 @@ func get_adjusted_total_slashing_balance*(
 let multiplier =
 # tradeoff here about interleaving phase0/altair, but for these
 # single-constant changes...
-uint64(when state is phase0.BeaconState:
+when state is phase0.BeaconState:
 PROPORTIONAL_SLASHING_MULTIPLIER
 elif state is altair.BeaconState:
 PROPORTIONAL_SLASHING_MULTIPLIER_ALTAIR
 elif state is bellatrix.BeaconState:
 PROPORTIONAL_SLASHING_MULTIPLIER_MERGE
 else:
-raiseAssert "process_slashings: incorrect BeaconState type")
-return min(sum(state.slashings.data) * multiplier, total_balance)
+{.fatal: "process_slashings: incorrect BeaconState type".}
+min(sum(state.slashings.data) * multiplier, total_balance)
 # https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/phase0/beacon-chain.md#slashings
 # https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/beacon-chain.md#slashings
 # https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/merge/beacon-chain.md#slashings
 func slashing_penalty_applies*(validator: Validator, epoch: Epoch): bool =
-return validator.slashed and
-epoch + EPOCHS_PER_SLASHINGS_VECTOR div 2 == validator.withdrawable_epoch
+validator.slashed and
+epoch + EPOCHS_PER_SLASHINGS_VECTOR div 2 == validator.withdrawable_epoch
# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/phase0/beacon-chain.md#slashings
# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/beacon-chain.md#slashings
@@ -886,7 +886,7 @@ func get_slashing_penalty*(validator: Validator,
 # numerator to avoid uint64 overflow
 let penalty_numerator = validator.effective_balance div increment *
 adjusted_total_slashing_balance
-return penalty_numerator div total_balance * increment
+penalty_numerator div total_balance * increment
# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/phase0/beacon-chain.md#slashings
# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/beacon-chain.md#slashings

ncli/ncli_common.nim (new file, 370 lines)

@@ -0,0 +1,370 @@
import
re, strutils, os,
../beacon_chain/spec/[
datatypes/base,
datatypes/phase0,
datatypes/altair,
datatypes/merge,
beaconstate,
state_transition_epoch,
state_transition_block,
signatures],
../beacon_chain/consensus_object_pools/block_pools_types
type
RewardsAndPenalties* = object
source_outcome*: int64
max_source_reward*: Gwei
target_outcome*: int64
max_target_reward*: Gwei
head_outcome*: int64
max_head_reward*: Gwei
inclusion_delay_outcome*: int64
max_inclusion_delay_reward*: Gwei
sync_committee_outcome*: int64
max_sync_committee_reward*: Gwei
proposer_outcome*: int64
inactivity_penalty*: Gwei
slashing_outcome*: int64
deposits*: Gwei
inclusion_delay*: Option[uint64]
ParticipationFlags* = object
currentEpochParticipation: EpochParticipationFlags
previousEpochParticipation: EpochParticipationFlags
PubkeyToIndexTable = Table[ValidatorPubKey, int]
AuxiliaryState* = object
epochParticipationFlags: ParticipationFlags
pubkeyToIndex: PubkeyToIndexTable
const
epochInfoFileNameDigitsCount = 8
epochFileNameExtension* = ".epoch"
proc copyParticipationFlags*(auxiliaryState: var AuxiliaryState,
forkedState: ForkedHashedBeaconState) =
withState(forkedState):
when stateFork > BeaconStateFork.Phase0:
template flags: untyped = auxiliaryState.epochParticipationFlags
flags.currentEpochParticipation = state.data.current_epoch_participation
flags.previousEpochParticipation = state.data.previous_epoch_participation
proc getEpochRange*(dir: string):
tuple[firstEpoch, lastEpoch: Epoch] =
const epochInfoFileNameRegexStr =
r"\d{" & $epochInfoFileNameDigitsCount & r"}\" & epochFileNameExtension
var pattern {.global.}: Regex
once: pattern = re(epochInfoFileNameRegexStr)
var smallestEpochFileName =
'9'.repeat(epochInfoFileNameDigitsCount) & epochFileNameExtension
var largestEpochFileName =
'0'.repeat(epochInfoFileNameDigitsCount) & epochFileNameExtension
for (_, fn) in walkDir(dir.string, relative = true):
if fn.match(pattern):
if fn < smallestEpochFileName:
smallestEpochFileName = fn
if fn > largestEpochFileName:
largestEpochFileName = fn
result.firstEpoch = parseUInt(
smallestEpochFileName[0 ..< epochInfoFileNameDigitsCount]).Epoch
result.lastEpoch = parseUInt(
largestEpochFileName[0 ..< epochInfoFileNameDigitsCount]).Epoch
proc epochAsString*(epoch: Epoch): string =
let strEpoch = $epoch
'0'.repeat(epochInfoFileNameDigitsCount - strEpoch.len) & strEpoch
proc getFilePathForEpoch*(epoch: Epoch, dir: string): string =
dir / epochAsString(epoch) & epochFileNameExtension
func getBlockRange*(dag: ChainDAGRef, start, ends: Slot): seq[BlockRef] =
# Range of block in reverse order
doAssert start < ends
result = newSeqOfCap[BlockRef](ends - start)
var current = dag.head
while current != nil:
if current.slot < ends:
if current.slot < start or current.slot == 0: # skip genesis
break
else:
result.add current
current = current.parent
func getOutcome(delta: RewardDelta): int64 =
delta.rewards.int64 - delta.penalties.int64
proc collectSlashings(
rewardsAndPenalties: var seq[RewardsAndPenalties],
state: ForkyBeaconState, total_balance: Gwei) =
let
epoch = get_current_epoch(state)
adjusted_total_slashing_balance = get_adjusted_total_slashing_balance(
state, total_balance)
for index in 0 ..< state.validators.len:
let validator = unsafeAddr state.validators.asSeq()[index]
if slashing_penalty_applies(validator[], epoch):
rewardsAndPenalties[index].slashing_outcome +=
validator[].get_slashing_penalty(
adjusted_total_slashing_balance, total_balance).int64
proc collectEpochRewardsAndPenalties*(
rewardsAndPenalties: var seq[RewardsAndPenalties],
state: phase0.BeaconState, cache: var StateCache, cfg: RuntimeConfig) =
if get_current_epoch(state) == GENESIS_EPOCH:
return
var info: phase0.EpochInfo
info.init(state)
info.process_attestations(state, cache)
doAssert info.validators.len == state.validators.len
rewardsAndPenalties.setLen(state.validators.len)
let
finality_delay = get_finality_delay(state)
total_balance = info.balances.current_epoch
total_balance_sqrt = integer_squareroot(total_balance)
for index, validator in info.validators.pairs:
if not is_eligible_validator(validator):
continue
let base_reward = get_base_reward_sqrt(
state, index.ValidatorIndex, total_balance_sqrt)
template get_attestation_component_reward_helper(attesting_balance: Gwei): Gwei =
get_attestation_component_reward(attesting_balance,
info.balances.current_epoch, base_reward.uint64, finality_delay)
template rp: untyped = rewardsAndPenalties[index]
rp.source_outcome = get_source_delta(
validator, base_reward, info.balances, finality_delay).getOutcome
rp.max_source_reward = get_attestation_component_reward_helper(
info.balances.previous_epoch_attesters)
rp.target_outcome = get_target_delta(
validator, base_reward, info.balances, finality_delay).getOutcome
rp.max_target_reward = get_attestation_component_reward_helper(
info.balances.previous_epoch_target_attesters)
rp.head_outcome = get_head_delta(
validator, base_reward, info.balances, finality_delay).getOutcome
rp.max_head_reward = get_attestation_component_reward_helper(
info.balances.previous_epoch_head_attesters)
let (inclusion_delay_delta, proposer_delta) = get_inclusion_delay_delta(
validator, base_reward)
rp.inclusion_delay_outcome = inclusion_delay_delta.getOutcome
rp.max_inclusion_delay_reward =
base_reward - state_transition_epoch.get_proposer_reward(base_reward)
rp.inactivity_penalty = get_inactivity_penalty_delta(
validator, base_reward, finality_delay).penalties
if proposer_delta.isSome:
let proposer_index = proposer_delta.get[0]
if proposer_index < info.validators.lenu64:
rewardsAndPenalties[proposer_index].proposer_outcome +=
proposer_delta.get[1].getOutcome
rewardsAndPenalties.collectSlashings(state, info.balances.current_epoch)
proc collectEpochRewardsAndPenalties*(
rewardsAndPenalties: var seq[RewardsAndPenalties],
state: altair.BeaconState | merge.BeaconState,
cache: var StateCache, cfg: RuntimeConfig) =
if get_current_epoch(state) == GENESIS_EPOCH:
return
var info: altair.EpochInfo
info.init(state)
doAssert info.validators.len == state.validators.len
rewardsAndPenalties.setLen(state.validators.len)
let
total_active_balance = info.balances.current_epoch
base_reward_per_increment = get_base_reward_per_increment(
total_active_balance)
for flag_index in 0 ..< PARTICIPATION_FLAG_WEIGHTS.len:
for validator_index, delta in get_flag_index_deltas(
state, flag_index, base_reward_per_increment, info):
template rp: untyped = rewardsAndPenalties[validator_index]
let
base_reward = get_base_reward_increment(
state, validator_index, base_reward_per_increment)
active_increments = get_active_increments(info)
unslashed_participating_increment =
get_unslashed_participating_increment(info, flag_index)
max_flag_index_reward = get_flag_index_reward(
state, base_reward, active_increments,
unslashed_participating_increment,
PARTICIPATION_FLAG_WEIGHTS[flag_index].uint64)
case flag_index
of TIMELY_SOURCE_FLAG_INDEX:
rp.source_outcome = delta.getOutcome
rp.max_source_reward = max_flag_index_reward
of TIMELY_TARGET_FLAG_INDEX:
rp.target_outcome = delta.getOutcome
rp.max_target_reward = max_flag_index_reward
of TIMELY_HEAD_FLAG_INDEX:
rp.head_outcome = delta.getOutcome
rp.max_head_reward = max_flag_index_reward
else:
raiseAssert(&"Unknown flag index {flag_index}.")
for validator_index, penalty in get_inactivity_penalty_deltas(
cfg, state, info):
rewardsAndPenalties[validator_index].inactivity_penalty += penalty
rewardsAndPenalties.collectSlashings(state, info.balances.current_epoch)
proc collectFromSlashedValidator(
rewardsAndPenalties: var seq[RewardsAndPenalties],
state: ForkyBeaconState, slashedIndex, proposerIndex: ValidatorIndex) =
template slashed_validator: untyped = state.validators[slashedIndex]
let slashingPenalty = get_slashing_penalty(state, slashed_validator.effective_balance)
let whistleblowerReward = get_whistleblower_reward(slashed_validator.effective_balance)
rewardsAndPenalties[slashedIndex].slashing_outcome -= slashingPenalty.int64
rewardsAndPenalties[proposerIndex].slashing_outcome += whistleblowerReward.int64
proc collectFromProposerSlashings(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock) =
withStateAndBlck(forkedState, forkedBlock):
for proposer_slashing in blck.message.body.proposer_slashings:
doAssert check_proposer_slashing(state.data, proposer_slashing, {}).isOk
let slashedIndex = proposer_slashing.signed_header_1.message.proposer_index
rewardsAndPenalties.collectFromSlashedValidator(state.data,
slashedIndex.ValidatorIndex, blck.message.proposer_index.ValidatorIndex)
proc collectFromAttesterSlashings(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock) =
withStateAndBlck(forkedState, forkedBlock):
for attester_slashing in blck.message.body.attester_slashings:
let attester_slashing_validity = check_attester_slashing(
state.data, attester_slashing, {})
doAssert attester_slashing_validity.isOk
for slashedIndex in attester_slashing_validity.value:
rewardsAndPenalties.collectFromSlashedValidator(
state.data, slashedIndex, blck.message.proposer_index.ValidatorIndex)
proc collectFromAttestations(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock,
epochParticipationFlags: var ParticipationFlags,
cache: var StateCache) =
withStateAndBlck(forkedState, forkedBlock):
when stateFork > BeaconStateFork.Phase0:
let base_reward_per_increment = get_base_reward_per_increment(
get_total_active_balance(state.data, cache))
doAssert base_reward_per_increment > 0
for attestation in blck.message.body.attestations:
doAssert check_attestation(state.data, attestation, {}, cache).isOk
let proposerReward =
if attestation.data.target.epoch == get_current_epoch(state.data):
get_proposer_reward(
state.data, attestation, base_reward_per_increment, cache,
epochParticipationFlags.currentEpochParticipation)
else:
get_proposer_reward(
state.data, attestation, base_reward_per_increment, cache,
epochParticipationFlags.previousEpochParticipation)
rewardsAndPenalties[blck.message.proposer_index].proposer_outcome +=
proposerReward.int64
let inclusionDelay = state.data.slot - attestation.data.slot
for index in get_attesting_indices(
state.data, attestation.data, attestation.aggregation_bits, cache):
rewardsAndPenalties[index].inclusion_delay = some(inclusionDelay.uint64)
proc collectFromDeposits(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock,
pubkeyToIndex: var PubkeyToIndexTable,
cfg: RuntimeConfig) =
withStateAndBlck(forkedState, forkedBlock):
for deposit in blck.message.body.deposits:
let pubkey = deposit.data.pubkey
let amount = deposit.data.amount
var index = findValidatorIndex(state.data, pubkey)
if index == -1:
index = pubkeyToIndex.getOrDefault(pubkey, -1)
if index != -1:
rewardsAndPenalties[index].deposits += amount
elif verify_deposit_signature(cfg, deposit.data):
pubkeyToIndex[pubkey] = rewardsAndPenalties.len
rewardsAndPenalties.add(
RewardsAndPenalties(deposits: amount))
proc collectFromSyncAggregate(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock,
cache: var StateCache) =
withStateAndBlck(forkedState, forkedBlock):
when stateFork > BeaconStateFork.Phase0:
let total_active_balance = get_total_active_balance(state.data, cache)
let participant_reward = get_participant_reward(total_active_balance)
let proposer_reward =
state_transition_block.get_proposer_reward(participant_reward)
let indices = get_sync_committee_cache(state.data, cache).current_sync_committee
template aggregate: untyped = blck.message.body.sync_aggregate
doAssert indices.len == SYNC_COMMITTEE_SIZE
doAssert aggregate.sync_committee_bits.len == SYNC_COMMITTEE_SIZE
doAssert state.data.current_sync_committee.pubkeys.len == SYNC_COMMITTEE_SIZE
for i in 0 ..< SYNC_COMMITTEE_SIZE:
rewardsAndPenalties[indices[i]].max_sync_committee_reward +=
participant_reward
if aggregate.sync_committee_bits[i]:
rewardsAndPenalties[indices[i]].sync_committee_outcome +=
participant_reward.int64
rewardsAndPenalties[blck.message.proposer_index].proposer_outcome +=
proposer_reward.int64
else:
rewardsAndPenalties[indices[i]].sync_committee_outcome -=
participant_reward.int64
proc collectBlockRewardsAndPenalties*(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock,
auxiliaryState: var AuxiliaryState,
cache: var StateCache, cfg: RuntimeConfig) =
rewardsAndPenalties.collectFromProposerSlashings(forkedState, forkedBlock)
rewardsAndPenalties.collectFromAttesterSlashings(forkedState, forkedBlock)
rewardsAndPenalties.collectFromAttestations(
forkedState, forkedBlock, auxiliaryState.epochParticipationFlags, cache)
rewardsAndPenalties.collectFromDeposits(
forkedState, forkedBlock, auxiliaryState.pubkeyToIndex, cfg)
# This table is needed only to resolve double deposits in the same block, so
# it can be cleared after processing all deposits for the current block.
auxiliaryState.pubkeyToIndex.clear
rewardsAndPenalties.collectFromSyncAggregate(forkedState, forkedBlock, cache)
proc getStartEpoch*(outDir: string): Epoch =
outDir.getEpochRange.lastEpoch + 1
func serializeToCsv*(rp: RewardsAndPenalties,
avgInclusionDelay = none(float)): string =
for name, value in fieldPairs(rp):
if value isnot Option:
result &= $value & ","
if avgInclusionDelay.isSome:
result.addFloat(avgInclusionDelay.get)
elif rp.inclusion_delay.isSome:
result &= $rp.inclusion_delay.get
result &= "\n"


@@ -1,5 +1,5 @@
 import
-os, stats, strformat, tables,
+os, stats, strformat, tables, snappy,
 chronicles, confutils, stew/[byteutils, io2], eth/db/kvstore_sqlite3,
chronicles, confutils, stew/[byteutils, io2], eth/db/kvstore_sqlite3,
../beacon_chain/networking/network_metadata,
../beacon_chain/[beacon_chain_db],
@@ -7,9 +7,13 @@ import
../beacon_chain/spec/datatypes/[phase0, altair, merge],
../beacon_chain/spec/[
beaconstate, helpers, state_transition, state_transition_epoch, validator,
state_transition_block, signatures],
ssz_codec],
../beacon_chain/sszdump,
../research/simutils, ./e2store
../research/simutils,
./e2store, ./ncli_common
when defined(posix):
import system/ansi_c
type Timers = enum
tInit = "Initialize DB"
@@ -146,18 +150,21 @@ type
 desc: "Number of slots to run benchmark for, 0 = all the way to head".}: uint64
 of DbCmd.validatorDb:
 outDir* {.
-defaultValue: ""
-name: "out-db"
-desc: "Output database".}: string
-perfect* {.
-defaultValue: false
-name: "perfect"
-desc: "Include perfect records (full rewards)".}: bool
+name: "out-dir"
+abbr: "o"
+desc: "Output directory".}: string
 startEpoch* {.
-defaultValue: 0
 name: "start-epoch"
-desc: "Epoch from which to start recording statistics. " &
-"By default one more than the last epoch in the database.".}: uint
+abbr: "s"
+desc: "Epoch from which to start recording statistics." &
+"By default one past the last epoch in the output directory".}: Option[uint]
+endEpoch* {.
+name: "end-epoch"
+abbr: "e"
+desc: "The last for which to record statistics." &
+"By default the last epoch in the input database".}: Option[uint]
+var shouldShutDown = false
proc putState(db: BeaconChainDB, state: ForkedHashedBeaconState) =
withState(state):
@@ -175,21 +182,6 @@ func getSlotRange(dag: ChainDAGRef, startSlot: int64, count: uint64): (Slot, Slot)
 else: start + count
 (start, ends)
-func getBlockRange(dag: ChainDAGRef, start, ends: Slot): seq[BlockRef] =
-# Range of block in reverse order
-var
-blockRefs: seq[BlockRef]
-cur = dag.head
-while cur != nil:
-if cur.slot < ends:
-if cur.slot < start or cur.slot == 0: # skip genesis
-break
-else:
-blockRefs.add cur
-cur = cur.parent
-blockRefs
proc cmdBench(conf: DbConf, cfg: RuntimeConfig) =
var timers: array[Timers, RunningStat]
@@ -249,6 +241,7 @@ proc cmdBench(conf: DbConf, cfg: RuntimeConfig) =
 template processBlocks(blocks: auto) =
 for b in blocks.mitems():
+if shouldShutDown: quit QuitSuccess
while getStateField(stateData[].data, slot) < b.message.slot:
let isEpoch = (getStateField(stateData[].data, slot) + 1).is_epoch()
withTimer(timers[if isEpoch: tAdvanceEpoch else: tAdvanceSlot]):
@@ -317,6 +310,7 @@ proc cmdDumpState(conf: DbConf) =
 mergeState = (ref merge.HashedBeaconState)()
 for stateRoot in conf.stateRoot:
+if shouldShutDown: quit QuitSuccess
template doit(state: untyped) =
try:
state.root = Eth2Digest.fromHex(stateRoot)
@@ -338,6 +332,7 @@ proc cmdPutState(conf: DbConf, cfg: RuntimeConfig) =
 defer: db.close()
 for file in conf.stateFile:
+if shouldShutDown: quit QuitSuccess
let state = newClone(readSszForkedHashedBeaconState(
cfg, readAllBytes(file).tryGet()))
db.putState(state[])
@@ -347,6 +342,7 @@ proc cmdDumpBlock(conf: DbConf) =
 defer: db.close()
 for blockRoot in conf.blockRootx:
+if shouldShutDown: quit QuitSuccess
try:
let root = Eth2Digest.fromHex(blockRoot)
if (let blck = db.getPhase0Block(root); blck.isSome):
@@ -365,6 +361,8 @@ proc cmdPutBlock(conf: DbConf, cfg: RuntimeConfig) =
 defer: db.close()
 for file in conf.blckFile:
+if shouldShutDown: quit QuitSuccess
let blck = readSszForkedSignedBeaconBlock(
cfg, readAllBytes(file).tryGet())
@@ -413,6 +411,7 @@ proc copyPrunedDatabase(
 copyDb.putBlock(db.getPhase0Block(genesisBlock.get).get)
 for signedBlock in getAncestors(db, headBlock.get):
+if shouldShutDown: quit QuitSuccess
if not dry_run:
copyDb.putBlock(signedBlock)
copyDb.checkpoint()
@@ -527,6 +526,7 @@ proc cmdExportEra(conf: DbConf, cfg: RuntimeConfig) =
 timers: array[Timers, RunningStat]
 for era in conf.era ..< conf.era + conf.eraCount:
+if shouldShutDown: quit QuitSuccess
let
firstSlot =
if era == 0: none(Slot)
@@ -663,6 +663,8 @@ proc cmdValidatorPerf(conf: DbConf, cfg: RuntimeConfig) =
 of EpochInfoFork.Altair:
 echo "TODO altair"
+if shouldShutDown: quit QuitSuccess
for bi in 0 ..< blockRefs.len:
blck = db.getPhase0Block(blockRefs[blockRefs.len - bi - 1].root).get()
while getStateField(state[].data, slot) < blck.message.slot:
@@ -718,8 +720,7 @@ proc createValidatorsRawTable(db: SqStoreRef) =
 db.exec("""
 CREATE TABLE IF NOT EXISTS validators_raw(
 validator_index INTEGER PRIMARY KEY,
-pubkey BLOB NOT NULL UNIQUE,
-withdrawal_credentials BLOB NOT NULL
+pubkey BLOB NOT NULL UNIQUE
 );
 """).expect("DB")
@@ -728,167 +729,17 @@ proc createValidatorsView(db: SqStoreRef) =
 CREATE VIEW IF NOT EXISTS validators AS
 SELECT
 validator_index,
-'0x' || lower(hex(pubkey)) as pubkey,
-'0x' || lower(hex(withdrawal_credentials)) as with_cred
+'0x' || lower(hex(pubkey)) as pubkey
 FROM validators_raw;
 """).expect("DB")
proc createPhase0EpochInfoTable(db: SqStoreRef) =
db.exec("""
CREATE TABLE IF NOT EXISTS phase0_epoch_info(
epoch INTEGER PRIMARY KEY,
current_epoch_raw INTEGER NOT NULL,
previous_epoch_raw INTEGER NOT NULL,
current_epoch_attesters_raw INTEGER NOT NULL,
current_epoch_target_attesters_raw INTEGER NOT NULL,
previous_epoch_attesters_raw INTEGER NOT NULL,
previous_epoch_target_attesters_raw INTEGER NOT NULL,
previous_epoch_head_attesters_raw INTEGER NOT NULL
);
""").expect("DB")
proc createAltairEpochInfoTable(db: SqStoreRef) =
db.exec("""
CREATE TABLE IF NOT EXISTS altair_epoch_info(
epoch INTEGER PRIMARY KEY,
previous_epoch_timely_source_balance INTEGER NOT NULL,
previous_epoch_timely_target_balance INTEGER NOT NULL,
previous_epoch_timely_head_balance INTEGER NOT NULL,
current_epoch_timely_target_balance INTEGER NOT NULL,
current_epoch_total_active_balance INTEGER NOT NULL
);
""").expect("DB")
proc createValidatorEpochInfoTable(db: SqStoreRef) =
db.exec("""
CREATE TABLE IF NOT EXISTS validator_epoch_info(
validator_index INTEGER,
epoch INTEGER,
source_outcome INTEGER NOT NULL,
max_source_reward INTEGER NOT NULL,
target_outcome INTEGER NOT NULL,
max_target_reward INTEGER NOT NULL,
head_outcome INTEGER NOT NULL,
max_head_reward INTEGER NOT NULL,
inclusion_delay_outcome INTEGER NOT NULL,
max_inclusion_delay_reward INTEGER NOT NULL,
sync_committee_outcome INTEGER NOT NULL,
max_sync_committee_reward INTEGER NOT NULL,
proposer_outcome INTEGER NOT NULL,
inactivity_penalty INTEGER NOT NULL,
slashing_outcome INTEGER NOT NULL,
inclusion_delay INTEGER NULL,
PRIMARY KEY(validator_index, epoch)
);
""").expect("DB")
proc createInsertValidatorProc(db: SqStoreRef): auto =
db.prepareStmt("""
INSERT OR IGNORE INTO validators_raw(
validator_index,
pubkey,
withdrawal_credentials)
VALUES(?, ?, ?);""",
(int64, array[48, byte], array[32, byte]), void).expect("DB")
proc createInsertPhase0EpochInfoProc(db: SqStoreRef): auto =
db.prepareStmt("""
INSERT OR IGNORE INTO phase0_epoch_info(
epoch,
current_epoch_raw,
previous_epoch_raw,
current_epoch_attesters_raw,
current_epoch_target_attesters_raw,
previous_epoch_attesters_raw,
previous_epoch_target_attesters_raw,
previous_epoch_head_attesters_raw)
VALUES(?, ?, ?, ?, ?, ?, ?, ?);""",
(int64, int64, int64, int64, int64, int64, int64, int64), void).expect("DB")
proc createInsertAltairEpochInfoProc(db: SqStoreRef): auto =
db.prepareStmt("""
INSERT OR IGNORE INTO altair_epoch_info(
epoch,
previous_epoch_timely_source_balance,
previous_epoch_timely_target_balance,
previous_epoch_timely_head_balance,
current_epoch_timely_target_balance,
current_epoch_total_active_balance)
VALUES(?, ?, ?, ?, ?, ?);""",
(int64, int64, int64, int64, int64, int64), void).expect("DB")
proc createInsertValidatorEpochInfoProc(db: SqStoreRef): auto =
db.prepareStmt("""
INSERT OR IGNORE INTO validator_epoch_info(
validator_index,
epoch,
source_outcome,
max_source_reward,
target_outcome,
max_target_reward,
head_outcome,
max_head_reward,
inclusion_delay_outcome,
max_inclusion_delay_reward,
sync_committee_outcome,
max_sync_committee_reward,
proposer_outcome,
inactivity_penalty,
slashing_outcome,
inclusion_delay)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""",
(int64, int64, int64, int64, int64, int64, int64, int64, int64, int64,
int64, int64, int64, int64, int64, Option[int64]), void).expect("DB")
type
RewardsAndPenalties = object
source_outcome: int64
max_source_reward: Gwei
target_outcome: int64
max_target_reward: Gwei
head_outcome: int64
max_head_reward: Gwei
inclusion_delay_outcome: int64
max_inclusion_delay_reward: Gwei
sync_committee_outcome: int64
max_sync_committee_reward: Gwei
proposer_outcome: int64
inactivity_penalty: Gwei
slashing_outcome: int64
deposits: Gwei
inclusion_delay: Option[int64]
ParticipationFlags = object
currentEpochParticipation: EpochParticipationFlags
previousEpochParticipation: EpochParticipationFlags
PubkeyToIndexTable = Table[ValidatorPubKey, int]
AuxiliaryState = object
epochParticipationFlags: ParticipationFlags
pubkeyToIndex: PubkeyToIndexTable
proc copyParticipationFlags(auxiliaryState: var AuxiliaryState,
forkedState: ForkedHashedBeaconState) =
withState(forkedState):
when stateFork > BeaconStateFork.Phase0:
template flags: untyped = auxiliaryState.epochParticipationFlags
flags.currentEpochParticipation = state.data.current_epoch_participation
flags.previousEpochParticipation = state.data.previous_epoch_participation
proc isPerfect(info: RewardsAndPenalties): bool =
info.slashing_outcome >= 0 and
info.source_outcome == info.max_source_reward.int64 and
info.target_outcome == info.max_target_reward.int64 and
info.head_outcome == info.max_head_reward.int64 and
info.inclusion_delay_outcome == info.max_inclusion_delay_reward.int64 and
info.sync_committee_outcome == info.max_sync_committee_reward.int64
proc getMaxEpochFromDbTable(db: SqStoreRef, tableName: string): int64 =
var queryResult: int64
discard db.exec(&"SELECT MAX(epoch) FROM {tableName}", ()) do (res: int64):
queryResult = res
return queryResult
pubkey)
VALUES(?, ?);""",
(int64, array[48, byte]), void).expect("DB")
proc collectBalances(balances: var seq[uint64], forkedState: ForkedHashedBeaconState) =
withState(forkedState):
@@ -950,280 +801,17 @@ template inTransaction(db: SqStoreRef, dbName: string, body: untyped) =
 proc insertValidators(db: SqStoreRef, state: ForkedHashedBeaconState,
 startIndex, endIndex: int64) =
 var insertValidator {.global.}: SqliteStmt[
-(int64, array[48, byte], array[32, byte]), void]
+(int64, array[48, byte]), void]
 once: insertValidator = db.createInsertValidatorProc
 withState(state):
 db.inTransaction("DB"):
 for i in startIndex ..< endIndex:
-insertValidator.exec((i, state.data.validators[i].pubkey.toRaw,
-state.data.validators[i].withdrawal_credentials.data)).expect("DB")
proc getOutcome(delta: RewardDelta): int64 =
delta.rewards.int64 - delta.penalties.int64
proc collectSlashings(
rewardsAndPenalties: var seq[RewardsAndPenalties],
state: ForkyBeaconState, total_balance: Gwei) =
let
epoch = get_current_epoch(state)
adjusted_total_slashing_balance = get_adjusted_total_slashing_balance(
state, total_balance)
for index in 0 ..< state.validators.len:
let validator = unsafeAddr state.validators.asSeq()[index]
if slashing_penalty_applies(validator[], epoch):
rewardsAndPenalties[index].slashing_outcome +=
validator[].get_slashing_penalty(
adjusted_total_slashing_balance, total_balance).int64
proc collectEpochRewardsAndPenalties(
rewardsAndPenalties: var seq[RewardsAndPenalties],
state: phase0.BeaconState, cache: var StateCache, cfg: RuntimeConfig) =
if get_current_epoch(state) == GENESIS_EPOCH:
return
var info: phase0.EpochInfo
info.init(state)
info.processAttestations(state, cache)
doAssert info.validators.len == state.validators.len
rewardsAndPenalties.setLen(state.validators.len)
let
finality_delay = get_finality_delay(state)
total_balance = info.balances.current_epoch
total_balance_sqrt = integer_squareroot(total_balance)
for index, validator in info.validators.pairs:
if not is_eligible_validator(validator):
continue
let base_reward = get_base_reward_sqrt(
state, index.ValidatorIndex, total_balance_sqrt)
template get_attestation_component_reward_helper(attesting_balance: Gwei): Gwei =
get_attestation_component_reward(attesting_balance,
info.balances.current_epoch, base_reward.uint64, finality_delay)
template rp: untyped = rewardsAndPenalties[index]
rp.source_outcome = get_source_delta(
validator, base_reward, info.balances, finality_delay).getOutcome
rp.max_source_reward = get_attestation_component_reward_helper(
info.balances.previous_epoch_attesters)
rp.target_outcome = get_target_delta(
validator, base_reward, info.balances, finality_delay).getOutcome
rp.max_target_reward = get_attestation_component_reward_helper(
info.balances.previous_epoch_target_attesters)
rp.head_outcome = get_head_delta(
validator, base_reward, info.balances, finality_delay).getOutcome
rp.max_head_reward = get_attestation_component_reward_helper(
info.balances.previous_epoch_head_attesters)
let (inclusion_delay_delta, proposer_delta) = get_inclusion_delay_delta(
validator, base_reward)
rp.inclusion_delay_outcome = inclusion_delay_delta.getOutcome
rp.max_inclusion_delay_reward =
base_reward - state_transition_epoch.get_proposer_reward(base_reward)
rp.inactivity_penalty = get_inactivity_penalty_delta(
validator, base_reward, finality_delay).penalties
if proposer_delta.isSome:
let proposer_index = proposer_delta.get[0]
if proposer_index < info.validators.lenu64:
rewardsAndPenalties[proposer_index].proposer_outcome +=
proposer_delta.get[1].getOutcome
rewardsAndPenalties.collectSlashings(state, info.balances.current_epoch)
proc collectEpochRewardsAndPenalties(
rewardsAndPenalties: var seq[RewardsAndPenalties],
state: altair.BeaconState | merge.BeaconState,
cache: var StateCache, cfg: RuntimeConfig) =
if get_current_epoch(state) == GENESIS_EPOCH:
return
var info: altair.EpochInfo
info.init(state)
doAssert info.validators.len == state.validators.len
rewardsAndPenalties.setLen(state.validators.len)
let
total_active_balance = info.balances.current_epoch
base_reward_per_increment = get_base_reward_per_increment(
total_active_balance)
for flag_index in 0 ..< PARTICIPATION_FLAG_WEIGHTS.len:
for validator_index, delta in get_flag_index_deltas(
state, flag_index, base_reward_per_increment, info):
template rp: untyped = rewardsAndPenalties[validator_index]
let
base_reward = get_base_reward_increment(
state, validator_index, base_reward_per_increment)
active_increments = get_active_increments(info)
unslashed_participating_increment =
get_unslashed_participating_increment(info, flag_index)
max_flag_index_reward = get_flag_index_reward(
state, base_reward, active_increments,
unslashed_participating_increment,
PARTICIPATION_FLAG_WEIGHTS[flag_index].uint64)
case flag_index
of TIMELY_SOURCE_FLAG_INDEX:
rp.source_outcome = delta.getOutcome
rp.max_source_reward = max_flag_index_reward
of TIMELY_TARGET_FLAG_INDEX:
rp.target_outcome = delta.getOutcome
rp.max_target_reward = max_flag_index_reward
of TIMELY_HEAD_FLAG_INDEX:
rp.head_outcome = delta.getOutcome
rp.max_head_reward = max_flag_index_reward
else:
raiseAssert(&"Unknown flag index {flag_index}.")
for validator_index, penalty in get_inactivity_penalty_deltas(
cfg, state, info):
rewardsAndPenalties[validator_index].inactivity_penalty += penalty
rewardsAndPenalties.collectSlashings(state, info.balances.current_epoch)
proc collectFromSlashedValidator(
rewardsAndPenalties: var seq[RewardsAndPenalties],
state: ForkyBeaconState, slashedIndex, proposerIndex: ValidatorIndex) =
template slashed_validator: untyped = state.validators[slashedIndex]
let slashingPenalty = get_slashing_penalty(state, slashed_validator.effective_balance)
let whistleblowerReward = get_whistleblower_reward(slashed_validator.effective_balance)
rewardsAndPenalties[slashedIndex].slashing_outcome -= slashingPenalty.int64
rewardsAndPenalties[proposerIndex].slashing_outcome += whistleblowerReward.int64
proc collectFromProposerSlashings(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock) =
withStateAndBlck(forkedState, forkedBlock):
for proposer_slashing in blck.message.body.proposer_slashings:
doAssert check_proposer_slashing(state.data, proposer_slashing, {}).isOk
let slashedIndex = proposer_slashing.signed_header_1.message.proposer_index
rewardsAndPenalties.collectFromSlashedValidator(state.data,
slashedIndex.ValidatorIndex, blck.message.proposer_index.ValidatorIndex)
proc collectFromAttesterSlashings(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock) =
withStateAndBlck(forkedState, forkedBlock):
for attester_slashing in blck.message.body.attester_slashings:
let attester_slashing_validity = check_attester_slashing(
state.data, attester_slashing, {})
doAssert attester_slashing_validity.isOk
for slashedIndex in attester_slashing_validity.value:
rewardsAndPenalties.collectFromSlashedValidator(
state.data, slashedIndex, blck.message.proposer_index.ValidatorIndex)
proc collectFromAttestations(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock,
epochParticipationFlags: var ParticipationFlags,
cache: var StateCache) =
withStateAndBlck(forkedState, forkedBlock):
when stateFork > BeaconStateFork.Phase0:
let base_reward_per_increment = get_base_reward_per_increment(
get_total_active_balance(state.data, cache))
doAssert base_reward_per_increment > 0
for attestation in blck.message.body.attestations:
doAssert check_attestation(state.data, attestation, {}, cache).isOk
let proposerReward =
if attestation.data.target.epoch == get_current_epoch(state.data):
get_proposer_reward(
state.data, attestation, base_reward_per_increment, cache,
epochParticipationFlags.currentEpochParticipation)
else:
get_proposer_reward(
state.data, attestation, base_reward_per_increment, cache,
epochParticipationFlags.previousEpochParticipation)
rewardsAndPenalties[blck.message.proposer_index].proposer_outcome +=
proposerReward.int64
let inclusionDelay = state.data.slot - attestation.data.slot
for index in get_attesting_indices(
state.data, attestation.data, attestation.aggregation_bits, cache):
rewardsAndPenalties[index].inclusion_delay = some(inclusionDelay.int64)
proc collectFromDeposits(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock,
pubkeyToIndex: var PubkeyToIndexTable,
cfg: RuntimeConfig) =
withStateAndBlck(forkedState, forkedBlock):
for deposit in blck.message.body.deposits:
let pubkey = deposit.data.pubkey
let amount = deposit.data.amount
var index = findValidatorIndex(state.data, pubkey)
if index == -1:
index = pubkeyToIndex.getOrDefault(pubkey, -1)
if index != -1:
rewardsAndPenalties[index].deposits += amount
elif verify_deposit_signature(cfg, deposit.data):
pubkeyToIndex[pubkey] = rewardsAndPenalties.len
rewardsAndPenalties.add(
RewardsAndPenalties(deposits: amount))
proc collectFromSyncAggregate(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock,
cache: var StateCache) =
withStateAndBlck(forkedState, forkedBlock):
when stateFork > BeaconStateFork.Phase0:
let total_active_balance = get_total_active_balance(state.data, cache)
let participant_reward = get_participant_reward(total_active_balance)
let proposer_reward =
state_transition_block.get_proposer_reward(participant_reward)
let indices = get_sync_committee_cache(state.data, cache).current_sync_committee
template aggregate: untyped = blck.message.body.sync_aggregate
doAssert indices.len == SYNC_COMMITTEE_SIZE
doAssert aggregate.sync_committee_bits.len == SYNC_COMMITTEE_SIZE
doAssert state.data.current_sync_committee.pubkeys.len == SYNC_COMMITTEE_SIZE
for i in 0 ..< SYNC_COMMITTEE_SIZE:
rewardsAndPenalties[indices[i]].max_sync_committee_reward +=
participant_reward
if aggregate.sync_committee_bits[i]:
rewardsAndPenalties[indices[i]].sync_committee_outcome +=
participant_reward.int64
rewardsAndPenalties[blck.message.proposer_index].proposer_outcome +=
proposer_reward.int64
else:
rewardsAndPenalties[indices[i]].sync_committee_outcome -=
participant_reward.int64
proc collectBlockRewardsAndPenalties(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock,
auxiliaryState: var AuxiliaryState,
cache: var StateCache, cfg: RuntimeConfig) =
rewardsAndPenalties.collectFromProposerSlashings(forkedState, forkedBlock)
rewardsAndPenalties.collectFromAttesterSlashings(forkedState, forkedBlock)
rewardsAndPenalties.collectFromAttestations(
forkedState, forkedBlock, auxiliaryState.epochParticipationFlags, cache)
rewardsAndPenalties.collectFromDeposits(
forkedState, forkedBlock, auxiliaryState.pubkeyToIndex, cfg)
# This table is needed only to resolve double deposits in the same block, so
# it can be cleared after processing all deposits for the current block.
auxiliaryState.pubkeyToIndex.clear
rewardsAndPenalties.collectFromSyncAggregate(forkedState, forkedBlock, cache)
+insertValidator.exec(
+(i, state.data.validators[i].pubkey.toRaw)).expect("DB")
proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
# Create a database with performance information for every epoch
echo "Opening database..."
info "Opening database..."
let db = BeaconChainDB.new(conf.databaseDir.string, false, true)
defer: db.close()
@@ -1241,43 +829,43 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
 outDb.createValidatorsRawTable
 outDb.createValidatorsView
-outDb.createPhase0EpochInfoTable
-outDb.createAltairEpochInfoTable
-outDb.createValidatorEpochInfoTable
 let
-insertPhase0EpochInfo = outDb.createInsertPhase0EpochInfoProc
-insertAltairEpochInfo = outDb.createInsertAltairEpochInfoProc
-insertValidatorInfo = outDb.createInsertValidatorEpochInfoProc
-minEpoch =
-if conf.startEpoch == 0:
-Epoch(max(outDb.getMaxEpochFromDbTable("phase0_epoch_info"),
-outDb.getMaxEpochFromDbTable("altair_epoch_info")) + 1)
+startEpoch =
+if conf.startEpoch.isNone:
+getStartEpoch(conf.outDir)
 else:
-Epoch(conf.startEpoch)
-start = minEpoch.start_slot()
-ends = dag.finalizedHead.slot # Avoid dealing with changes
+Epoch(conf.startEpoch.get)
+endEpoch =
+if conf.endEpoch.isSome:
+Epoch(conf.endEpoch.get)
+else:
+dag.finalizedHead.slot.epoch # Avoid dealing with changes
-if start > ends:
-echo "No (new) data found, database at ", minEpoch, ", finalized to ", ends.epoch
-quit 1
+if startEpoch > endEpoch:
+fatal "Start epoch cannot be bigger than end epoch.",
+startEpoch = startEpoch, endEpoch = endEpoch
+quit QuitFailure
-let blockRefs = dag.getBlockRange(start, ends)
-echo "Analyzing performance for epochs ",
-start.epoch, " - ", ends.epoch
+info "Analyzing performance for epochs.",
+startEpoch = startEpoch, endEpoch = endEpoch
+let
+startSlot = startEpoch.start_slot
+endSlot = endEpoch.start_slot + SLOTS_PER_EPOCH
+blockRefs = dag.getBlockRange(startSlot, endSlot)
 let tmpState = newClone(dag.headState)
 var cache = StateCache()
-let slot = if start > 0: start - 1 else: 0.Slot
+let slot = if startSlot > 0: startSlot - 1 else: 0.Slot
 if blockRefs.len > 0:
-dag.updateStateData(tmpState[], blockRefs[^1].atSlot(slot), false, cache)
+discard dag.updateStateData(tmpState[], blockRefs[^1].atSlot(slot), false, cache)
 else:
-dag.updateStateData(tmpState[], dag.head.atSlot(slot), false, cache)
+discard dag.updateStateData(tmpState[], dag.head.atSlot(slot), false, cache)
-let dbValidatorsCount = outDb.getDbValidatorsCount()
+let savedValidatorsCount = outDb.getDbValidatorsCount
 var validatorsCount = getStateField(tmpState[].data, validators).len
-outDb.insertValidators(tmpState[].data, dbValidatorsCount, validatorsCount)
+outDb.insertValidators(tmpState[].data, savedValidatorsCount, validatorsCount)
 var previousEpochBalances: seq[uint64]
 collectBalances(previousEpochBalances, tmpState[].data)
@@ -1290,8 +878,10 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
 auxiliaryState.copyParticipationFlags(tmpState[].data)
 proc processEpoch() =
-let epoch = getStateField(tmpState[].data, slot).epoch.int64
-echo epoch
+let epoch = getStateField(tmpState[].data, slot).epoch
+info "Processing epoch ...", epoch = epoch
+var csvLines = newStringOfCap(1000000)
 withState(tmpState[].data):
 withEpochInfo(forkedInfo):
@@ -1300,65 +890,29 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
 doAssert state.data.balances.len == rewardsAndPenalties.len
 for index, validator in info.validators.pairs:
-template outputInfo: untyped = rewardsAndPenalties[index]
+template rp: untyped = rewardsAndPenalties[index]
 checkBalance(index, validator, state.data.balances[index],
-previousEpochBalances[index], outputInfo)
+previousEpochBalances[index], rp)
-let delay =
-when infoFork == EpochInfoFork.Phase0:
+when infoFork == EpochInfoFork.Phase0:
+rp.inclusion_delay = block:
 let notSlashed = (RewardFlags.isSlashed notin validator.flags)
 if notSlashed and validator.is_previous_epoch_attester.isSome():
-some(int64(validator.is_previous_epoch_attester.get().delay))
+some(validator.is_previous_epoch_attester.get().delay.uint64)
 else:
-none(int64)
-else:
-rewardsAndPenalties[index].inclusion_delay
+none(uint64)
+csvLines.add rp.serializeToCsv
-if conf.perfect or not outputInfo.isPerfect:
-insertValidatorInfo.exec((
-index.int64,
-epoch,
-outputInfo.source_outcome,
-outputInfo.max_source_reward.int64,
-outputInfo.target_outcome,
-outputInfo.max_target_reward.int64,
-outputInfo.head_outcome,
-outputInfo.max_head_reward.int64,
-outputInfo.inclusion_delay_outcome,
-outputInfo.max_inclusion_delay_reward.int64,
-outputInfo.sync_committee_outcome,
-outputInfo.max_sync_committee_reward.int64,
-outputInfo.proposer_outcome,
-outputInfo.inactivity_penalty.int64,
-outputInfo.slashing_outcome,
-delay)).expect("DB")
+let fileName = getFilePathForEpoch(epoch, conf.outDir)
+var res = io2.removeFile(fileName)
+doAssert res.isOk
+res = io2.writeFile(fileName, snappy.encode(csvLines.toBytes))
+doAssert res.isOk
+if shouldShutDown: quit QuitSuccess
 collectBalances(previousEpochBalances, tmpState[].data)
-case forkedInfo.kind
-of EpochInfoFork.Phase0:
-template info: untyped = forkedInfo.phase0Data
-insertPhase0EpochInfo.exec((
-epoch,
-info.balances.current_epoch_raw.int64,
-info.balances.previous_epoch_raw.int64,
-info.balances.current_epoch_attesters_raw.int64,
-info.balances.current_epoch_target_attesters_raw.int64,
-info.balances.previous_epoch_attesters_raw.int64,
-info.balances.previous_epoch_target_attesters_raw.int64,
-info.balances.previous_epoch_head_attesters_raw.int64)
-).expect("DB")
-of EpochInfoFork.Altair:
-template info: untyped = forkedInfo.altairData
-insertAltairEpochInfo.exec((
-epoch,
-info.balances.previous_epoch[0].int64,
-info.balances.previous_epoch[1].int64,
-info.balances.previous_epoch[2].int64,
-info.balances.current_epoch_TIMELY_TARGET.int64,
-info.balances.current_epoch.int64)).expect("DB")
proc processSlots(ends: Slot, endsFlags: UpdateFlags) =
var currentSlot = getStateField(tmpState[].data, slot)
while currentSlot < ends:
@@ -1370,17 +924,17 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
 rewardsAndPenalties.collectEpochRewardsAndPenalties(
 state.data, cache, cfg)
-let ok = process_slots(cfg, tmpState[].data, nextSlot, cache, forkedInfo, flags)
-doAssert ok, "Slot processing can't fail with correct inputs"
+let res = process_slots(cfg, tmpState[].data, nextSlot, cache, forkedInfo, flags)
+doAssert res.isOk, "Slot processing can't fail with correct inputs"
 currentSlot = nextSlot
 if currentSlot.isEpoch:
-outDb.inTransaction("DB"):
-processEpoch()
+processEpoch()
 rewardsAndPenalties.setLen(0)
+rewardsAndPenalties.setLen(validatorsCount)
 auxiliaryState.copyParticipationFlags(tmpState[].data)
+clear cache
for bi in 0 ..< blockRefs.len:
let forkedBlock = dag.getForkedBlock(blockRefs[blockRefs.len - bi - 1])
@@ -1393,8 +947,8 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
 let res = state_transition_block(
 cfg, tmpState[].data, blck, cache, {}, noRollback)
 if res.isErr:
-echo "State transition failed (!)"
-quit 1
+fatal "State transition failed (!)"
+quit QuitFailure
let newValidatorsCount = getStateField(tmpState[].data, validators).len
if newValidatorsCount > validatorsCount:
@@ -1403,15 +957,27 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
 rewardsAndPenalties.setLen(newValidatorsCount)
 previousEpochBalances.setLen(newValidatorsCount)
 # ... and add the new validators to the database.
-outDb.insertValidators(tmpState[].data, validatorsCount, newValidatorsCount)
+outDb.insertValidators(
+tmpState[].data, validatorsCount, newValidatorsCount)
 validatorsCount = newValidatorsCount
 # Capture rewards of empty slots as well, including the epoch that got
 # finalized
-let ok = processSlots(ends, {})
-doAssert ok, "Slot processing can't fail with correct inputs"
+processSlots(endSlot, {})
+proc controlCHook {.noconv.} =
+  notice "Shutting down after having received SIGINT."
+  shouldShutDown = true
+proc exitOnSigterm(signal: cint) {.noconv.} =
+  notice "Shutting down after having received SIGTERM."
+  shouldShutDown = true
 when isMainModule:
+  setControlCHook(controlCHook)
+  when defined(posix):
+    c_signal(SIGTERM, exitOnSigterm)
var
conf = DbConf.load()
cfg = getRuntimeConfig(conf.eth2Network)


@@ -0,0 +1,236 @@
import
std/[os, strutils, streams, parsecsv],
stew/[io2, byteutils], chronicles, confutils, snappy,
../beacon_chain/spec/datatypes/base,
./ncli_common
when defined(posix):
import system/ansi_c
type
AggregatorConf = object
startEpoch {.
name: "start-epoch"
abbr: "s"
desc: "The first epoch which to be aggregated. " &
"By default use the first epoch for which has a file" .}: Option[uint64]
endEpoch {.
name: "end-epoch"
abbr: "e"
desc: "The last epoch which to be aggregated. " &
"By default use the last epoch for which has a file" .}: Option[uint64]
resolution {.
defaultValue: 225,
name: "resolution"
abbr: "r"
desc: "How many epochs to be aggregated in a single file" .}: uint
inputDir {.
name: "input-dir"
abbr: "i"
desc: "The directory with the epoch info files" .}: InputDir
outputDir {.
defaultValue: ""
name: "output-dir"
abbr: "o"
desc: "The directory where aggregated file to be written. " &
"By default use the same directory as the input one"}: InputDir
var shutDown = false
proc determineStartAndEndEpochs(config: AggregatorConf):
tuple[startEpoch, endEpoch: Epoch] =
if config.startEpoch.isNone or config.endEpoch.isNone:
(result.startEpoch, result.endEpoch) = getEpochRange(config.inputDir.string)
if config.startEpoch.isSome:
result.startEpoch = config.startEpoch.get.Epoch
if config.endEpoch.isSome:
result.endEpoch = config.endEpoch.get.Epoch
if result.startEpoch > result.endEpoch:
fatal "Start epoch cannot be bigger than the end epoch.",
startEpoch = result.startEpoch, endEpoch = result.endEpoch
quit QuitFailure
proc checkIntegrity(startEpoch, endEpoch: Epoch, dir: string) =
for epoch in startEpoch .. endEpoch:
let filePath = getFilePathForEpoch(epoch, dir)
if not filePath.fileExists:
fatal "File for epoch does not exist.", epoch = epoch, filePath = filePath
quit QuitFailure
proc parseRow(csvRow: CsvRow): RewardsAndPenalties =
result = RewardsAndPenalties(
source_outcome: parseBiggestInt(csvRow[0]),
max_source_reward: parseBiggestUInt(csvRow[1]),
target_outcome: parseBiggestInt(csvRow[2]),
max_target_reward: parseBiggestUInt(csvRow[3]),
head_outcome: parseBiggestInt(csvRow[4]),
max_head_reward: parseBiggestUInt(csvRow[5]),
inclusion_delay_outcome: parseBiggestInt(csvRow[6]),
max_inclusion_delay_reward: parseBiggestUInt(csvRow[7]),
sync_committee_outcome: parseBiggestInt(csvRow[8]),
max_sync_committee_reward: parseBiggestUInt(csvRow[9]),
proposer_outcome: parseBiggestInt(csvRow[10]),
inactivity_penalty: parseBiggestUInt(csvRow[11]),
slashing_outcome: parseBiggestInt(csvRow[12]),
deposits: parseBiggestUInt(csvRow[13]))
if csvRow[14].len > 0:
result.inclusion_delay = some(parseBiggestUInt(csvRow[14]))
proc `+=`(lhs: var RewardsAndPenalties, rhs: RewardsAndPenalties) =
lhs.source_outcome += rhs.source_outcome
lhs.max_source_reward += rhs.max_source_reward
lhs.target_outcome += rhs.target_outcome
lhs.max_target_reward += rhs.max_target_reward
lhs.head_outcome += rhs.head_outcome
lhs.max_head_reward += rhs.max_head_reward
lhs.inclusion_delay_outcome += rhs.inclusion_delay_outcome
lhs.max_inclusion_delay_reward += rhs.max_inclusion_delay_reward
lhs.sync_committee_outcome += rhs.sync_committee_outcome
lhs.max_sync_committee_reward += rhs.max_sync_committee_reward
lhs.proposer_outcome += rhs.proposer_outcome
lhs.inactivity_penalty += rhs.inactivity_penalty
lhs.slashing_outcome += rhs.slashing_outcome
lhs.deposits += rhs.deposits
if lhs.inclusion_delay.isSome:
if rhs.inclusion_delay.isSome:
lhs.inclusion_delay.get += rhs.inclusion_delay.get
else:
if rhs.inclusion_delay.isSome:
lhs.inclusion_delay = some(rhs.inclusion_delay.get)
proc average(rp: var RewardsAndPenalties,
averageInclusionDelay: var Option[float],
epochsCount: uint, inclusionDelaysCount: uint64) =
rp.source_outcome = rp.source_outcome div epochsCount.int64
rp.max_source_reward = rp.max_source_reward div epochsCount
rp.target_outcome = rp.target_outcome div epochsCount.int64
rp.max_target_reward = rp.max_target_reward div epochsCount
rp.head_outcome = rp.head_outcome div epochsCount.int64
rp.max_head_reward = rp.max_head_reward div epochsCount
rp.inclusion_delay_outcome = rp.inclusion_delay_outcome div epochsCount.int64
rp.max_inclusion_delay_reward = rp.max_inclusion_delay_reward div epochsCount
rp.sync_committee_outcome = rp.sync_committee_outcome div epochsCount.int64
rp.max_sync_committee_reward = rp.max_sync_committee_reward div epochsCount
rp.proposer_outcome = rp.proposer_outcome div epochsCount.int64
rp.inactivity_penalty = rp.inactivity_penalty div epochsCount
rp.slashing_outcome = rp.slashing_outcome div epochsCount.int64
if rp.inclusion_delay.isSome:
doAssert inclusionDelaysCount != 0
averageInclusionDelay = some(
rp.inclusion_delay.get.float / inclusionDelaysCount.float)
else:
doAssert inclusionDelaysCount == 0
averageInclusionDelay = none(float)
proc getFilePathForEpochs(startEpoch, endEpoch: Epoch, dir: string): string =
let fileName = epochAsString(startEpoch) & "_" &
epochAsString(endEpoch) & epochFileNameExtension
dir / fileName
proc aggregateEpochs(startEpoch, endEpoch: Epoch, resolution: uint,
inputDir, outputDir: string) =
if startEpoch > endEpoch:
fatal "Start epoch cannot be larger than the end one.",
startEpoch = startEpoch, endEpoch = endEpoch
quit QuitFailure
info "Aggregating epochs ...", startEpoch = startEpoch, endEpoch = endEpoch,
inputDir = inputDir, outputDir = outputDir
var rewardsAndPenalties: seq[RewardsAndPenalties]
var participationEpochsCount: seq[uint]
var inclusionDelaysCount: seq[uint]
var epochsAggregated = 0'u
for epoch in startEpoch .. endEpoch:
let filePath = getFilePathForEpoch(epoch, inputDir)
info "Processing file ...", file = filePath
let data = io2.readAllBytes(filePath)
doAssert data.isOk
let dataStream = newStringStream(
string.fromBytes(snappy.decode(
data.get.toOpenArray(0, data.get.len - 1))))
var csvParser: CsvParser
csvParser.open(dataStream, filePath)
var validatorsCount = 0'u
while csvParser.readRow:
inc validatorsCount
let rp = parseRow(csvParser.row)
if validatorsCount > participationEpochsCount.len.uint:
rewardsAndPenalties.add rp
participationEpochsCount.add 1
if rp.inclusionDelay.isSome:
inclusionDelaysCount.add 1
else:
inclusionDelaysCount.add 0
else:
rewardsAndPenalties[validatorsCount - 1] += rp
inc participationEpochsCount[validatorsCount - 1]
if rp.inclusionDelay.isSome:
inc inclusionDelaysCount[validatorsCount - 1]
inc epochsAggregated
if epochsAggregated == resolution or epoch == endEpoch or shutDown:
var csvLines: string
for i in 0 ..< participationEpochsCount.len:
var averageInclusionDelay: Option[float]
average(rewardsAndPenalties[i], averageInclusionDelay,
participationEpochsCount[i], inclusionDelaysCount[i])
csvLines &= serializeToCsv(
rewardsAndPenalties[i], averageInclusionDelay)
let fileName = getFilePathForEpochs(
epoch - epochsAggregated + 1, epoch, outputDir)
info "Writing file ...", fileName = fileName
      var res = io2.removeFile(fileName)
      doAssert res.isOk
      res = io2.writeFile(fileName, snappy.encode(csvLines.toBytes))
      doAssert res.isOk
if shutDown:
quit QuitSuccess
participationEpochsCount.setLen(0)
rewardsAndPenalties.setLen(0)
inclusionDelaysCount.setLen(0)
epochsAggregated = 0
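
# The handlers below only raise a flag; the aggregation loop checks it after
# each processed epoch and finishes writing the current output file before
# quitting, so an interrupted run does not leave corrupted files behind.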
proc controlCHook {.noconv.} =
  notice "Shutting down after receiving SIGINT."
  shutDown = true

proc exitOnSigterm(signal: cint) {.noconv.} =
  notice "Shutting down after receiving SIGTERM."
  shutDown = true

proc main =
setControlCHook(controlCHook)
when defined(posix):
c_signal(SIGTERM, exitOnSigterm)
let config = load AggregatorConf
let (startEpoch, endEpoch) = config.determineStartAndEndEpochs
if endEpoch == 0:
fatal "Not found epoch info files in the directory.",
inputDir = config.inputDir
quit QuitFailure
checkIntegrity(startEpoch, endEpoch, config.inputDir.string)
let outputDir =
if config.outputDir.string.len > 0:
config.outputDir
else:
config.inputDir
aggregateEpochs(startEpoch, endEpoch, config.resolution,
                  config.inputDir.string, outputDir.string)

when isMainModule:
main()
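
# A sketch of a typical invocation (the flag spellings here are assumptions;
# confutils derives the actual names from the AggregatorConf field names):
#
#   validator_db_aggregator --input-dir=<dir-with-.epoch-files> --resolution=225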

View File

@ -0,0 +1,359 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "071e1cb8",
"metadata": {},
"outputs": [],
"source": [
"%load_ext autotime\n",
"%matplotlib inline\n",
"import string\n",
"import sqlite3\n",
"import os\n",
"import re\n",
"import pandas as pd\n",
"import matplotlib.pyplot as plt\n",
"import numpy as np\n",
"import snappy\n",
"from scipy.interpolate import make_interp_spline\n",
"from pathlib import Path\n",
"from io import StringIO"
]
},
{
"cell_type": "markdown",
"id": "f00bca92",
"metadata": {},
"source": [
"**Database connection:**"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f69cd8b3",
"metadata": {},
"outputs": [],
"source": [
"database_dir = \"../build/data/mainnetValidatorDb/validatorDb.sqlite3\"\n",
"connection = sqlite3.connect(database_dir)"
]
},
{
"cell_type": "markdown",
"id": "8ccd8945",
"metadata": {},
"source": [
"**Rewards and penalties components:**"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "229adad0",
"metadata": {},
"outputs": [],
"source": [
"SOURCE = \"source\"\n",
"TARGET = \"target\"\n",
"HEAD = \"head\"\n",
"INCLUSION_DELAY = \"inclusion_delay\"\n",
"SYNC_COMMITTEE = \"sync_committee\"\n",
"\n",
"CSV_DATA_COLUMNS_NAMES = [\n",
" \"source_outcome\",\n",
" \"max_source_reward\",\n",
" \"target_outcome\",\n",
" \"max_target_reward\",\n",
" \"head_outcome\",\n",
" \"max_head_reward\",\n",
" \"inclusion_delay_outcome\",\n",
" \"max_inclusion_delay_reward\",\n",
" \"sync_committee_outcome\",\n",
" \"max_sync_committee_reward\",\n",
" \"proposer_outcome\",\n",
" \"inactivity_penalty\",\n",
" \"slashing_outcome\",\n",
" \"deposits\",\n",
" \"inclusion_delay\"]"
]
},
{
"cell_type": "markdown",
"id": "9a747287",
"metadata": {},
"source": [
"**Helper functions:**"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "63fb9f21",
"metadata": {},
"outputs": [],
"source": [
"def valid_public_key(public_key):\n",
" \"\"\"Checks whether a string is a valid hex representation of a public key of an Eth2 validator.\"\"\"\n",
" if len(public_key) != 96:\n",
" return False\n",
" return all(c in string.hexdigits for c in public_key)\n",
"\n",
"def idx(public_key):\n",
" \"\"\"Returns validator's index by its public key.\"\"\"\n",
" \n",
" if public_key.startswith(\"0x\"):\n",
" public_key = public_key[2:]\n",
" \n",
" if not valid_public_key(public_key):\n",
" raise ValueError(f\"The string '{public_key}' is not a valid public key of a validator.\")\n",
" \n",
" QUERY_FIELD = \"validator_index\"\n",
" query = f\"SELECT {QUERY_FIELD} FROM validators_raw WHERE pubkey=x'{public_key}';\"\n",
" query_result = pd.read_sql_query(query, connection)\n",
" \n",
" if len(query_result[QUERY_FIELD]) == 0:\n",
" raise ValueError(f\"Not found a validator with a public key '{public_key}'.\")\n",
" \n",
" if len(query_result[QUERY_FIELD]) > 1:\n",
" raise ValueError(f\"Found multiple validators with a public key '{public_key}'.\")\n",
" \n",
" return query_result[QUERY_FIELD][0]"
]
},
{
"cell_type": "markdown",
"id": "946762c1",
"metadata": {},
"source": [
"**Input parameters:**"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e9aca3ed",
"metadata": {},
"outputs": [],
"source": [
"start_epoch = 10000\n",
"end_epoch = 20000\n",
"resolution = 225\n",
"files_dir = \"../build/data/mainnetCompactedValidatorDb/\"\n",
"use_compacted_files = True\n",
"rewards = [SOURCE, TARGET, HEAD, INCLUSION_DELAY, SYNC_COMMITTEE]\n",
"validators_sets = {\n",
" \"set1\": list(range(10)),\n",
" \"set2\": list(map(idx, [\n",
" \"0x8efba2238a00d678306c6258105b058e3c8b0c1f36e821de42da7319c4221b77aa74135dab1860235e19d6515575c381\",\n",
" \"0xa2dce641f347a9e46f58458390e168fa4b3a0166d74fc495457cb00c8e4054b5d284c62aa0d9578af1996c2e08e36fb6\",\n",
" \"0x98b7d0eac7ab95d34dbf2b7baa39a8ec451671328c063ab1207c2055d9d5d6f1115403dc5ea19a1111a404823bd9a6e9\",\n",
" \"0xb0fd08e2e06d1f4d90d0d6843feb543ebeca684cde397fe230e6cdf6f255d234f2c26f4b36c07170dfdfcbbe355d0848\",\n",
" \"0xab7a5aa955382906be3d76e322343bd439e8690f286ecf2f2a7646363b249f5c133d0501d766ccf1aa1640f0283047b3\",\n",
" \"0x980c0c001645a00b71c720935ce193e1ed0e917782c4cb07dd476a4fdb7decb8d91daf2770eb413055f0c1d14b5ed6df\",\n",
" \"0xac7cbdc535ce8254eb9cdedf10d5b1e75de4cd5e91756c3467d0492b01b70b5c6a81530e9849c6b696c8bc157861d0c3\",\n",
" \"0x98ea289db7ea9714699ec93701a3b6db43900e04ae5497be01fa8cc5a56754c23589eaf1f674de718e291376f452d68c\",\n",
" \"0x92451d4c099e51f54ab20f5c1a4edf405595c60122ccfb0f39250b7e80986fe0fe457bacd8a887e9087cd6fc323f492c\",\n",
" \"0xa06f6c678f0129aec056df309a4fe18760116ecaea2292947c5a9cc997632ff437195309783c269ffca7bb2704e675a0\"\n",
" ])),\n",
" \"set3\": list(range(20, 30))\n",
" }"
]
},
{
"cell_type": "markdown",
"id": "5e0fb2da",
"metadata": {},
"source": [
"**Loading the data and losses calculation:**"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "485a2d7e",
"metadata": {},
"outputs": [],
"source": [
"COMPACTED_EPOCH_INFO_FILE_PATTERN = re.compile(r\"(\\d{8})\\_(\\d{8})\\.epoch\")\n",
"\n",
"def get_first_and_last_epoch(file_name):\n",
" m = re.match(COMPACTED_EPOCH_INFO_FILE_PATTERN, file_name)\n",
" if m == None:\n",
" return None\n",
" return int(m.group(1)), int(m.group(2))\n",
"\n",
"def isEpochInfoFile(file_name):\n",
" r = get_first_and_last_epoch(file_name)\n",
" if r == None:\n",
" return False\n",
" file_start_epoch, file_end_epoch = r\n",
" if file_start_epoch > file_end_epoch:\n",
" return False\n",
" if file_end_epoch < start_epoch:\n",
" return False\n",
" if file_start_epoch > end_epoch:\n",
" return False\n",
" return True\n",
"\n",
"def adjust_constraints(sorted_file_names):\n",
" first_start_epoch, first_end_epoch = get_first_and_last_epoch(sorted_file_names[0])\n",
" _, last_end_epoch = get_first_and_last_epoch(sorted_file_names[-1])\n",
" start_epoch = first_start_epoch\n",
" end_epoch = last_end_epoch\n",
" resolution = first_end_epoch - first_start_epoch + 1\n",
"\n",
"def read_csv(file_path):\n",
" return pd.read_csv(\n",
" StringIO(snappy.decompress(file_path.read_bytes()).decode(\"utf-8\")),\n",
" names = CSV_DATA_COLUMNS_NAMES, usecols = set(range(0, 10)))\n",
"\n",
"def get_outcome_var(component):\n",
" return component + \"_outcome\"\n",
"\n",
"def get_max_reward_var(component):\n",
" return \"max_\" + component + \"_reward\"\n",
"\n",
"def sum_max_values(t):\n",
" return sum(getattr(t, get_max_reward_var(reward)) for reward in rewards)\n",
"\n",
"def sum_actual_values(t):\n",
" return sum(getattr(t, get_outcome_var(reward)) for reward in rewards)\n",
"\n",
"total_losses_per_epoch_point = {}\n",
"validators_per_epoch_point = {}\n",
"average_losses_per_epoch_point = {}\n",
"\n",
"def compute_total_losses(epoch_point, epochs = None):\n",
" for set_name, validators in validators_sets.items():\n",
" if not set_name in total_losses_per_epoch_point:\n",
" total_losses_per_epoch_point[set_name] = {}\n",
" validators_per_epoch_point[set_name] = {}\n",
" if not epoch_point in total_losses_per_epoch_point[set_name]:\n",
" total_losses_per_epoch_point[set_name][epoch_point] = 0\n",
" validators_per_epoch_point[set_name][epoch_point] = 0\n",
" for validator_index in validators:\n",
" validator_info = data.iloc[validator_index]\n",
" validator_losses = \\\n",
" sum_max_values(validator_info) - sum_actual_values(validator_info)\n",
" total_losses_per_epoch_point[set_name][epoch_point] += \\\n",
" validator_losses if epochs == None else validator_losses * epochs\n",
" validators_per_epoch_point[set_name][epoch_point] += \\\n",
" 1 if epochs == None else epochs\n",
"\n",
"def compute_average_losses():\n",
" for set_name in validators_sets:\n",
" if not set_name in average_losses_per_epoch_point:\n",
" average_losses_per_epoch_point[set_name] = {}\n",
" for epoch_point, total_losses in total_losses_per_epoch_point[set_name].items():\n",
" average_losses_per_epoch_point[set_name][epoch_point] = \\\n",
" total_losses / validators_per_epoch_point[set_name][epoch_point]\n",
"\n",
"if use_compacted_files:\n",
" file_names = [file_name for file_name in os.listdir(files_dir)\n",
" if isEpochInfoFile(file_name)]\n",
" file_names.sort()\n",
" adjust_constraints(file_names)\n",
"\n",
" for file_name in file_names:\n",
" data = read_csv(Path(files_dir + file_name))\n",
" file_first_epoch, file_last_epoch = get_first_and_last_epoch(file_name)\n",
" file_epochs_range = file_last_epoch - file_first_epoch + 1\n",
" epoch_point = file_first_epoch // resolution\n",
" compute_total_losses(epoch_point, file_epochs_range)\n",
"else:\n",
" for epoch in range(start_epoch, end_epoch + 1):\n",
" data = read_csv(Path(files_dir + \"{:08}.epoch\".format(epoch)))\n",
" epoch_point = epoch // resolution\n",
" compute_total_losses(epoch_point)\n",
"\n",
"compute_average_losses()"
]
},
{
"cell_type": "markdown",
"id": "800ee35b",
"metadata": {},
"source": [
"**Average losses graph:** "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "62d1e96d",
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"plt.subplots(figsize = (20, 5))\n",
"plt.title(\"Average losses per epoch\")\n",
"plt.xlabel(\"Epoch\")\n",
"plt.ylabel(\"Gwei\")\n",
"\n",
"for name, value in average_losses_per_epoch_point.items():\n",
" epochs = np.array([ep * resolution + resolution // 2 for ep in value.keys()])\n",
" values = np.array(list(value.values()))\n",
" spline = make_interp_spline(epochs, values)\n",
" num_samples = (end_epoch - start_epoch + 1) // resolution * 100\n",
" x = np.linspace(epochs.min(), epochs.max(), num_samples)\n",
" y = spline(x)\n",
" plt.plot(x, y, label=name)\n",
"\n",
"plt.legend(loc=\"best\")"
]
},
{
"cell_type": "markdown",
"id": "0fff538c",
"metadata": {},
"source": [
"**Total losses:**"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6ab52601",
"metadata": {},
"outputs": [],
"source": [
"sets_total_losses = {}\n",
"for set_name, epoch_points in total_losses_per_epoch_point.items():\n",
" sets_total_losses[set_name] = 0\n",
" for _, losses in epoch_points.items():\n",
" sets_total_losses[set_name] += losses\n",
"\n",
"plt.title(\"Total losses\")\n",
"plt.xlabel(\"Set\")\n",
"plt.ylabel(\"Ethers\")\n",
"plt.bar(list(sets_total_losses.keys()), [loss * 1e-9 for loss in sets_total_losses.values()])\n",
"print(sets_total_losses)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}