From 47f1f7ff1a12c951904c220ba6d92b0739d7d4bd Mon Sep 17 00:00:00 2001
From: Zahary Karadjov
Date: Mon, 17 Jan 2022 14:58:33 +0200
Subject: [PATCH] More efficient reward data persistence; Address review comments

The new format is based on compressed CSV files in two channels:

* Detailed per-epoch data
* Aggregated "daily" summaries (by default, one file per 225 epochs,
  which is roughly one day)

The use of append-only CSV files significantly speeds up epoch
processing during data generation. The use of compression results
in smaller overall storage requirements. The use of the aggregated
files has a very minor cost in both CPU and storage, but leads to
near-interactive speed for report generation.

Other changes:

- Implemented support for graceful shutdowns to avoid corrupting
  the saved files.

- Fixed a memory leak caused by missing `StateCache` cleanup on
  each iteration.

- Addressed review comments.

- Moved the rewards and penalties calculation code into a separate
  module.

Required invasive changes to existing modules:

- The `data` field of the `KeyedBlockRef` type is made public, so
  that it can be used by the validator rewards monitor's Chain DAG
  update procedure.

- The `getForkedBlock` procedure from the `blockchain_dag.nim`
  module is made public for the same reason.
---
 AllTests-mainnet.md                           |   5 +-
 Makefile                                      |   3 +-
 .../block_pools_types.nim                     |   2 +-
 .../consensus_object_pools/blockchain_dag.nim |   2 +-
 beacon_chain/networking/eth2_network.nim      |   2 +-
 beacon_chain/spec/beaconstate.nim             |  12 +-
 beacon_chain/spec/datatypes/base.nim          |   5 +
 beacon_chain/spec/state_transition_block.nim  |   2 +-
 beacon_chain/spec/state_transition_epoch.nim  |  12 +-
 ncli/ncli_common.nim                          | 370 ++++++++++
 ncli/ncli_db.nim                              | 636 +++---------------
 ncli/validator_db_aggregator.nim              | 236 +++++++
 ncli/validator_db_reports.ipynb               | 359 ++++++++++
 13 files changed, 1092 insertions(+), 554 deletions(-)
 create mode 100644 ncli/ncli_common.nim
 create mode 100644 ncli/validator_db_aggregator.nim
 create mode 100644 ncli/validator_db_reports.ipynb

diff --git a/AllTests-mainnet.md b/AllTests-mainnet.md
index 445d5c5de..06d0a9a33 100644
--- a/AllTests-mainnet.md
+++ b/AllTests-mainnet.md
@@ -75,8 +75,9 @@ OK: 16/16 Fail: 0/16 Skip: 0/16
 + Smoke test initialize_beacon_state_from_eth1 [Preset: mainnet] OK
 + get_beacon_proposer_index OK
 + latest_block_root OK
++ process_slots OK
 ```
-OK: 3/3 Fail: 0/3 Skip: 0/3
+OK: 4/4 Fail: 0/4 Skip: 0/4
 ## Beacon time
 ```diff
 + basics OK
@@ -441,4 +442,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
 OK: 1/1 Fail: 0/1 Skip: 0/1

 ---TOTAL---
-OK: 237/239 Fail: 0/239 Skip: 2/239
+OK: 238/240 Fail: 0/240 Skip: 2/240

diff --git a/Makefile b/Makefile
index 21b201c49..3b0891c86 100644
--- a/Makefile
+++ b/Makefile
@@ -47,7 +47,8 @@ TOOLS := \
 	ncli_db \
 	stack_sizes \
 	nimbus_validator_client \
-	nimbus_signing_node
+	nimbus_signing_node \
+	validator_db_aggregator
 .PHONY: $(TOOLS)

 # bench_bls_sig_agggregation TODO reenable after bls v0.10.1 changes

diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim
index eac2714b5..07459cce5 100644
--- a/beacon_chain/consensus_object_pools/block_pools_types.nim
+++ b/beacon_chain/consensus_object_pools/block_pools_types.nim
@@ -61,7 +61,7 @@ type
     # by root without keeping a Table that keeps a separate copy of the digest
     # At the time of writing, a Table[Eth2Digest, BlockRef] adds about 100mb of
     # unnecessary overhead.
- data: BlockRef + data*: BlockRef ChainDAGRef* = ref object ## Pool of blocks responsible for keeping a DAG of resolved blocks. diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 749199594..2914721f1 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -342,7 +342,7 @@ proc getStateData( true -proc getForkedBlock(db: BeaconChainDB, root: Eth2Digest): +proc getForkedBlock*(db: BeaconChainDB, root: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] = # When we only have a digest, we don't know which fork it's from so we try # them one by one - this should be used sparingly diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 5af07c32f..70054014f 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -1851,7 +1851,7 @@ template gossipMaxSize(T: untyped): uint32 = T is altair.SignedBeaconBlock: GOSSIP_MAX_SIZE else: - static: raiseAssert "unknown type" + {.fatal: "unknown type".} static: doAssert maxSize <= maxGossipMaxSize() maxSize.uint32 diff --git a/beacon_chain/spec/beaconstate.nim b/beacon_chain/spec/beaconstate.nim index c54de5800..88832e2fb 100644 --- a/beacon_chain/spec/beaconstate.nim +++ b/beacon_chain/spec/beaconstate.nim @@ -130,27 +130,27 @@ proc get_slashing_penalty*(state: ForkyBeaconState, validator_effective_balance div MIN_SLASHING_PENALTY_QUOTIENT elif state is altair.BeaconState: validator_effective_balance div MIN_SLASHING_PENALTY_QUOTIENT_ALTAIR - elif state is merge.BeaconState: + elif state is bellatrix.BeaconState: validator_effective_balance div MIN_SLASHING_PENALTY_QUOTIENT_MERGE else: - raiseAssert "invalid BeaconState type" + {.fatal: "invalid BeaconState type".} # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/phase0/beacon-chain.md#slash_validator # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/altair/beacon-chain.md#modified-slash_validator # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/merge/beacon-chain.md#modified-slash_validator -proc get_whistleblower_reward*(validator_effective_balance: Gwei): Gwei = +func get_whistleblower_reward*(validator_effective_balance: Gwei): Gwei = validator_effective_balance div WHISTLEBLOWER_REWARD_QUOTIENT # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/phase0/beacon-chain.md#slash_validator # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/altair/beacon-chain.md#modified-slash_validator # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/merge/beacon-chain.md#modified-slash_validator -proc get_proposer_reward(state: ForkyBeaconState, whistleblower_reward: Gwei): Gwei = +func get_proposer_reward(state: ForkyBeaconState, whistleblower_reward: Gwei): Gwei = when state is phase0.BeaconState: whistleblower_reward div PROPOSER_REWARD_QUOTIENT - elif state is altair.BeaconState or state is merge.BeaconState: + elif state is altair.BeaconState or state is bellatrix.BeaconState: whistleblower_reward * PROPOSER_WEIGHT div WEIGHT_DENOMINATOR else: - raiseAssert "invalid BeaconState type" + {.fatal: "invalid BeaconState type".} # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/phase0/beacon-chain.md#slash_validator # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/altair/beacon-chain.md#modified-slash_validator diff --git a/beacon_chain/spec/datatypes/base.nim 
b/beacon_chain/spec/datatypes/base.nim index f9f65d74a..eaf543711 100644 --- a/beacon_chain/spec/datatypes/base.nim +++ b/beacon_chain/spec/datatypes/base.nim @@ -895,3 +895,8 @@ template isomorphicCast*[T, U](x: U): T = doAssert sizeof(T) == sizeof(U) doAssert getSizeofSig(T()) == getSizeofSig(U()) cast[ptr T](unsafeAddr x)[] + +proc clear*(cache: var StateCache) = + cache.shuffled_active_validator_indices.clear + cache.beacon_proposer_indices.clear + cache.sync_committees.clear diff --git a/beacon_chain/spec/state_transition_block.nim b/beacon_chain/spec/state_transition_block.nim index 69e2cc767..45ca9f0c0 100644 --- a/beacon_chain/spec/state_transition_block.nim +++ b/beacon_chain/spec/state_transition_block.nim @@ -434,7 +434,7 @@ func get_participant_reward*(total_active_balance: Gwei): Gwei = get_base_reward_per_increment(total_active_balance) * total_active_increments max_participant_rewards = total_base_rewards * SYNC_REWARD_WEIGHT div WEIGHT_DENOMINATOR div SLOTS_PER_EPOCH - return max_participant_rewards div SYNC_COMMITTEE_SIZE + max_participant_rewards div SYNC_COMMITTEE_SIZE # https://github.com/ethereum/consensus-specs/blob/v1.1.0-alpha.6/specs/altair/beacon-chain.md#sync-committee-processing func get_proposer_reward*(participant_reward: Gwei): Gwei = diff --git a/beacon_chain/spec/state_transition_epoch.nim b/beacon_chain/spec/state_transition_epoch.nim index a9e7994c7..f8bbaa9f2 100644 --- a/beacon_chain/spec/state_transition_epoch.nim +++ b/beacon_chain/spec/state_transition_epoch.nim @@ -859,22 +859,22 @@ func get_adjusted_total_slashing_balance*( let multiplier = # tradeoff here about interleaving phase0/altair, but for these # single-constant changes... - uint64(when state is phase0.BeaconState: + when state is phase0.BeaconState: PROPORTIONAL_SLASHING_MULTIPLIER elif state is altair.BeaconState: PROPORTIONAL_SLASHING_MULTIPLIER_ALTAIR elif state is bellatrix.BeaconState: PROPORTIONAL_SLASHING_MULTIPLIER_MERGE else: - raiseAssert "process_slashings: incorrect BeaconState type") - return min(sum(state.slashings.data) * multiplier, total_balance) + {.fatal: "process_slashings: incorrect BeaconState type".} + min(sum(state.slashings.data) * multiplier, total_balance) # https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/phase0/beacon-chain.md#slashings # https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/beacon-chain.md#slashings # https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/merge/beacon-chain.md#slashings func slashing_penalty_applies*(validator: Validator, epoch: Epoch): bool = - return validator.slashed and - epoch + EPOCHS_PER_SLASHINGS_VECTOR div 2 == validator.withdrawable_epoch + validator.slashed and + epoch + EPOCHS_PER_SLASHINGS_VECTOR div 2 == validator.withdrawable_epoch # https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/phase0/beacon-chain.md#slashings # https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/beacon-chain.md#slashings @@ -886,7 +886,7 @@ func get_slashing_penalty*(validator: Validator, # numerator to avoid uint64 overflow let penalty_numerator = validator.effective_balance div increment * adjusted_total_slashing_balance - return penalty_numerator div total_balance * increment + penalty_numerator div total_balance * increment # https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/phase0/beacon-chain.md#slashings # https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/beacon-chain.md#slashings diff --git a/ncli/ncli_common.nim b/ncli/ncli_common.nim 
new file mode 100644
index 000000000..3ec5117b6
--- /dev/null
+++ b/ncli/ncli_common.nim
@@ -0,0 +1,370 @@
+import
+  re, strutils, os, options, tables, strformat,
+  ../beacon_chain/spec/[
+    datatypes/base,
+    datatypes/phase0,
+    datatypes/altair,
+    datatypes/merge,
+    beaconstate,
+    state_transition_epoch,
+    state_transition_block,
+    signatures],
+  ../beacon_chain/consensus_object_pools/block_pools_types
+
+type
+  RewardsAndPenalties* = object
+    source_outcome*: int64
+    max_source_reward*: Gwei
+    target_outcome*: int64
+    max_target_reward*: Gwei
+    head_outcome*: int64
+    max_head_reward*: Gwei
+    inclusion_delay_outcome*: int64
+    max_inclusion_delay_reward*: Gwei
+    sync_committee_outcome*: int64
+    max_sync_committee_reward*: Gwei
+    proposer_outcome*: int64
+    inactivity_penalty*: Gwei
+    slashing_outcome*: int64
+    deposits*: Gwei
+    inclusion_delay*: Option[uint64]
+
+  ParticipationFlags* = object
+    currentEpochParticipation: EpochParticipationFlags
+    previousEpochParticipation: EpochParticipationFlags
+
+  PubkeyToIndexTable = Table[ValidatorPubKey, int]
+
+  AuxiliaryState* = object
+    epochParticipationFlags: ParticipationFlags
+    pubkeyToIndex: PubkeyToIndexTable
+
+const
+  epochInfoFileNameDigitsCount = 8
+  epochFileNameExtension* = ".epoch"
+
+proc copyParticipationFlags*(auxiliaryState: var AuxiliaryState,
+                             forkedState: ForkedHashedBeaconState) =
+  withState(forkedState):
+    when stateFork > BeaconStateFork.Phase0:
+      template flags: untyped = auxiliaryState.epochParticipationFlags
+      flags.currentEpochParticipation = state.data.current_epoch_participation
+      flags.previousEpochParticipation = state.data.previous_epoch_participation
+
+proc getEpochRange*(dir: string):
+    tuple[firstEpoch, lastEpoch: Epoch] =
+  const epochInfoFileNameRegexStr =
+    r"\d{" & $epochInfoFileNameDigitsCount & r"}\" & epochFileNameExtension
+  var pattern {.global.}: Regex
+  once: pattern = re(epochInfoFileNameRegexStr)
+  var smallestEpochFileName =
+    '9'.repeat(epochInfoFileNameDigitsCount) & epochFileNameExtension
+  var largestEpochFileName =
+    '0'.repeat(epochInfoFileNameDigitsCount) & epochFileNameExtension
+  for (_, fn) in walkDir(dir.string, relative = true):
+    if fn.match(pattern):
+      if fn < smallestEpochFileName:
+        smallestEpochFileName = fn
+      if fn > largestEpochFileName:
+        largestEpochFileName = fn
+  result.firstEpoch = parseUInt(
+    smallestEpochFileName[0 ..< epochInfoFileNameDigitsCount]).Epoch
+  result.lastEpoch = parseUInt(
+    largestEpochFileName[0 ..< epochInfoFileNameDigitsCount]).Epoch
+
+proc epochAsString*(epoch: Epoch): string =
+  let strEpoch = $epoch
+  '0'.repeat(epochInfoFileNameDigitsCount - strEpoch.len) & strEpoch
+
+proc getFilePathForEpoch*(epoch: Epoch, dir: string): string =
+  dir / epochAsString(epoch) & epochFileNameExtension
+
+func getBlockRange*(dag: ChainDAGRef, start, ends: Slot): seq[BlockRef] =
+  # Range of blocks in reverse order
+  doAssert start < ends
+  result = newSeqOfCap[BlockRef](ends - start)
+  var current = dag.head
+  while current != nil:
+    if current.slot < ends:
+      if current.slot < start or current.slot == 0: # skip genesis
+        break
+      else:
+        result.add current
+    current = current.parent
+
+func getOutcome(delta: RewardDelta): int64 =
+  delta.rewards.int64 - delta.penalties.int64
+
+proc collectSlashings(
+    rewardsAndPenalties: var seq[RewardsAndPenalties],
+    state: ForkyBeaconState, total_balance: Gwei) =
+  let
+    epoch = get_current_epoch(state)
+    adjusted_total_slashing_balance = get_adjusted_total_slashing_balance(
+      state, total_balance)
+
+  for index in 0 ..< state.validators.len:
+    let validator
= unsafeAddr state.validators.asSeq()[index] + if slashing_penalty_applies(validator[], epoch): + rewardsAndPenalties[index].slashing_outcome += + validator[].get_slashing_penalty( + adjusted_total_slashing_balance, total_balance).int64 + +proc collectEpochRewardsAndPenalties*( + rewardsAndPenalties: var seq[RewardsAndPenalties], + state: phase0.BeaconState, cache: var StateCache, cfg: RuntimeConfig) = + if get_current_epoch(state) == GENESIS_EPOCH: + return + + var info: phase0.EpochInfo + + info.init(state) + info.process_attestations(state, cache) + doAssert info.validators.len == state.validators.len + rewardsAndPenalties.setLen(state.validators.len) + + let + finality_delay = get_finality_delay(state) + total_balance = info.balances.current_epoch + total_balance_sqrt = integer_squareroot(total_balance) + + for index, validator in info.validators.pairs: + if not is_eligible_validator(validator): + continue + + let base_reward = get_base_reward_sqrt( + state, index.ValidatorIndex, total_balance_sqrt) + + template get_attestation_component_reward_helper(attesting_balance: Gwei): Gwei = + get_attestation_component_reward(attesting_balance, + info.balances.current_epoch, base_reward.uint64, finality_delay) + + template rp: untyped = rewardsAndPenalties[index] + + rp.source_outcome = get_source_delta( + validator, base_reward, info.balances, finality_delay).getOutcome + rp.max_source_reward = get_attestation_component_reward_helper( + info.balances.previous_epoch_attesters) + + rp.target_outcome = get_target_delta( + validator, base_reward, info.balances, finality_delay).getOutcome + rp.max_target_reward = get_attestation_component_reward_helper( + info.balances.previous_epoch_target_attesters) + + rp.head_outcome = get_head_delta( + validator, base_reward, info.balances, finality_delay).getOutcome + rp.max_head_reward = get_attestation_component_reward_helper( + info.balances.previous_epoch_head_attesters) + + let (inclusion_delay_delta, proposer_delta) = get_inclusion_delay_delta( + validator, base_reward) + rp.inclusion_delay_outcome = inclusion_delay_delta.getOutcome + rp.max_inclusion_delay_reward = + base_reward - state_transition_epoch.get_proposer_reward(base_reward) + + rp.inactivity_penalty = get_inactivity_penalty_delta( + validator, base_reward, finality_delay).penalties + + if proposer_delta.isSome: + let proposer_index = proposer_delta.get[0] + if proposer_index < info.validators.lenu64: + rewardsAndPenalties[proposer_index].proposer_outcome += + proposer_delta.get[1].getOutcome + + rewardsAndPenalties.collectSlashings(state, info.balances.current_epoch) + +proc collectEpochRewardsAndPenalties*( + rewardsAndPenalties: var seq[RewardsAndPenalties], + state: altair.BeaconState | merge.BeaconState, + cache: var StateCache, cfg: RuntimeConfig) = + if get_current_epoch(state) == GENESIS_EPOCH: + return + + var info: altair.EpochInfo + info.init(state) + doAssert info.validators.len == state.validators.len + rewardsAndPenalties.setLen(state.validators.len) + + let + total_active_balance = info.balances.current_epoch + base_reward_per_increment = get_base_reward_per_increment( + total_active_balance) + + for flag_index in 0 ..< PARTICIPATION_FLAG_WEIGHTS.len: + for validator_index, delta in get_flag_index_deltas( + state, flag_index, base_reward_per_increment, info): + template rp: untyped = rewardsAndPenalties[validator_index] + + let + base_reward = get_base_reward_increment( + state, validator_index, base_reward_per_increment) + active_increments = get_active_increments(info) + 
unslashed_participating_increment = + get_unslashed_participating_increment(info, flag_index) + max_flag_index_reward = get_flag_index_reward( + state, base_reward, active_increments, + unslashed_participating_increment, + PARTICIPATION_FLAG_WEIGHTS[flag_index].uint64) + + case flag_index + of TIMELY_SOURCE_FLAG_INDEX: + rp.source_outcome = delta.getOutcome + rp.max_source_reward = max_flag_index_reward + of TIMELY_TARGET_FLAG_INDEX: + rp.target_outcome = delta.getOutcome + rp.max_target_reward = max_flag_index_reward + of TIMELY_HEAD_FLAG_INDEX: + rp.head_outcome = delta.getOutcome + rp.max_head_reward = max_flag_index_reward + else: + raiseAssert(&"Unknown flag index {flag_index}.") + + for validator_index, penalty in get_inactivity_penalty_deltas( + cfg, state, info): + rewardsAndPenalties[validator_index].inactivity_penalty += penalty + + rewardsAndPenalties.collectSlashings(state, info.balances.current_epoch) + +proc collectFromSlashedValidator( + rewardsAndPenalties: var seq[RewardsAndPenalties], + state: ForkyBeaconState, slashedIndex, proposerIndex: ValidatorIndex) = + template slashed_validator: untyped = state.validators[slashedIndex] + let slashingPenalty = get_slashing_penalty(state, slashed_validator.effective_balance) + let whistleblowerReward = get_whistleblower_reward(slashed_validator.effective_balance) + rewardsAndPenalties[slashedIndex].slashing_outcome -= slashingPenalty.int64 + rewardsAndPenalties[proposerIndex].slashing_outcome += whistleblowerReward.int64 + +proc collectFromProposerSlashings( + rewardsAndPenalties: var seq[RewardsAndPenalties], + forkedState: ForkedHashedBeaconState, + forkedBlock: ForkedTrustedSignedBeaconBlock) = + withStateAndBlck(forkedState, forkedBlock): + for proposer_slashing in blck.message.body.proposer_slashings: + doAssert check_proposer_slashing(state.data, proposer_slashing, {}).isOk + let slashedIndex = proposer_slashing.signed_header_1.message.proposer_index + rewardsAndPenalties.collectFromSlashedValidator(state.data, + slashedIndex.ValidatorIndex, blck.message.proposer_index.ValidatorIndex) + +proc collectFromAttesterSlashings( + rewardsAndPenalties: var seq[RewardsAndPenalties], + forkedState: ForkedHashedBeaconState, + forkedBlock: ForkedTrustedSignedBeaconBlock) = + withStateAndBlck(forkedState, forkedBlock): + for attester_slashing in blck.message.body.attester_slashings: + let attester_slashing_validity = check_attester_slashing( + state.data, attester_slashing, {}) + doAssert attester_slashing_validity.isOk + for slashedIndex in attester_slashing_validity.value: + rewardsAndPenalties.collectFromSlashedValidator( + state.data, slashedIndex, blck.message.proposer_index.ValidatorIndex) + +proc collectFromAttestations( + rewardsAndPenalties: var seq[RewardsAndPenalties], + forkedState: ForkedHashedBeaconState, + forkedBlock: ForkedTrustedSignedBeaconBlock, + epochParticipationFlags: var ParticipationFlags, + cache: var StateCache) = + withStateAndBlck(forkedState, forkedBlock): + when stateFork > BeaconStateFork.Phase0: + let base_reward_per_increment = get_base_reward_per_increment( + get_total_active_balance(state.data, cache)) + doAssert base_reward_per_increment > 0 + for attestation in blck.message.body.attestations: + doAssert check_attestation(state.data, attestation, {}, cache).isOk + let proposerReward = + if attestation.data.target.epoch == get_current_epoch(state.data): + get_proposer_reward( + state.data, attestation, base_reward_per_increment, cache, + epochParticipationFlags.currentEpochParticipation) + else: + 
get_proposer_reward( + state.data, attestation, base_reward_per_increment, cache, + epochParticipationFlags.previousEpochParticipation) + rewardsAndPenalties[blck.message.proposer_index].proposer_outcome += + proposerReward.int64 + let inclusionDelay = state.data.slot - attestation.data.slot + for index in get_attesting_indices( + state.data, attestation.data, attestation.aggregation_bits, cache): + rewardsAndPenalties[index].inclusion_delay = some(inclusionDelay.uint64) + +proc collectFromDeposits( + rewardsAndPenalties: var seq[RewardsAndPenalties], + forkedState: ForkedHashedBeaconState, + forkedBlock: ForkedTrustedSignedBeaconBlock, + pubkeyToIndex: var PubkeyToIndexTable, + cfg: RuntimeConfig) = + withStateAndBlck(forkedState, forkedBlock): + for deposit in blck.message.body.deposits: + let pubkey = deposit.data.pubkey + let amount = deposit.data.amount + var index = findValidatorIndex(state.data, pubkey) + if index == -1: + index = pubkeyToIndex.getOrDefault(pubkey, -1) + if index != -1: + rewardsAndPenalties[index].deposits += amount + elif verify_deposit_signature(cfg, deposit.data): + pubkeyToIndex[pubkey] = rewardsAndPenalties.len + rewardsAndPenalties.add( + RewardsAndPenalties(deposits: amount)) + +proc collectFromSyncAggregate( + rewardsAndPenalties: var seq[RewardsAndPenalties], + forkedState: ForkedHashedBeaconState, + forkedBlock: ForkedTrustedSignedBeaconBlock, + cache: var StateCache) = + withStateAndBlck(forkedState, forkedBlock): + when stateFork > BeaconStateFork.Phase0: + let total_active_balance = get_total_active_balance(state.data, cache) + let participant_reward = get_participant_reward(total_active_balance) + let proposer_reward = + state_transition_block.get_proposer_reward(participant_reward) + let indices = get_sync_committee_cache(state.data, cache).current_sync_committee + + template aggregate: untyped = blck.message.body.sync_aggregate + + doAssert indices.len == SYNC_COMMITTEE_SIZE + doAssert aggregate.sync_committee_bits.len == SYNC_COMMITTEE_SIZE + doAssert state.data.current_sync_committee.pubkeys.len == SYNC_COMMITTEE_SIZE + + for i in 0 ..< SYNC_COMMITTEE_SIZE: + rewardsAndPenalties[indices[i]].max_sync_committee_reward += + participant_reward + if aggregate.sync_committee_bits[i]: + rewardsAndPenalties[indices[i]].sync_committee_outcome += + participant_reward.int64 + rewardsAndPenalties[blck.message.proposer_index].proposer_outcome += + proposer_reward.int64 + else: + rewardsAndPenalties[indices[i]].sync_committee_outcome -= + participant_reward.int64 + +proc collectBlockRewardsAndPenalties*( + rewardsAndPenalties: var seq[RewardsAndPenalties], + forkedState: ForkedHashedBeaconState, + forkedBlock: ForkedTrustedSignedBeaconBlock, + auxiliaryState: var AuxiliaryState, + cache: var StateCache, cfg: RuntimeConfig) = + rewardsAndPenalties.collectFromProposerSlashings(forkedState, forkedBlock) + rewardsAndPenalties.collectFromAttesterSlashings(forkedState, forkedBlock) + rewardsAndPenalties.collectFromAttestations( + forkedState, forkedBlock, auxiliaryState.epochParticipationFlags, cache) + rewardsAndPenalties.collectFromDeposits( + forkedState, forkedBlock, auxiliaryState.pubkeyToIndex, cfg) + # This table is needed only to resolve double deposits in the same block, so + # it can be cleared after processing all deposits for the current block. 
+  auxiliaryState.pubkeyToIndex.clear
+  rewardsAndPenalties.collectFromSyncAggregate(forkedState, forkedBlock, cache)
+
+proc getStartEpoch*(outDir: string): Epoch =
+  outDir.getEpochRange.lastEpoch + 1
+
+func serializeToCsv*(rp: RewardsAndPenalties,
+                     avgInclusionDelay = none(float)): string =
+  for name, value in fieldPairs(rp):
+    if value isnot Option:
+      result &= $value & ","
+  if avgInclusionDelay.isSome:
+    result.addFloat(avgInclusionDelay.get)
+  elif rp.inclusion_delay.isSome:
+    result &= $rp.inclusion_delay.get
+  result &= "\n"
diff --git a/ncli/ncli_db.nim b/ncli/ncli_db.nim
index 2ad5feb30..57f7238ea 100644
--- a/ncli/ncli_db.nim
+++ b/ncli/ncli_db.nim
@@ -1,5 +1,5 @@
 import
-  os, stats, strformat, tables,
+  os, stats, strformat, tables, snappy,
   chronicles, confutils, stew/[byteutils, io2], eth/db/kvstore_sqlite3,
   ../beacon_chain/networking/network_metadata,
   ../beacon_chain/[beacon_chain_db],
@@ -7,9 +7,13 @@ import
   ../beacon_chain/spec/datatypes/[phase0, altair, merge],
   ../beacon_chain/spec/[
     beaconstate, helpers, state_transition, state_transition_epoch, validator,
-    state_transition_block, signatures],
+    ssz_codec],
   ../beacon_chain/sszdump,
-  ../research/simutils, ./e2store
+  ../research/simutils,
+  ./e2store, ./ncli_common
+
+when defined(posix):
+  import system/ansi_c

 type Timers = enum
   tInit = "Initialize DB"
@@ -146,18 +150,21 @@ type
         desc: "Number of slots to run benchmark for, 0 = all the way to head".}: uint64
     of DbCmd.validatorDb:
       outDir* {.
-        defaultValue: ""
-        name: "out-db"
-        desc: "Output database".}: string
-      perfect* {.
-        defaultValue: false
-        name: "perfect"
-        desc: "Include perfect records (full rewards)".}: bool
+        name: "out-dir"
+        abbr: "o"
+        desc: "Output directory".}: string
       startEpoch* {.
-        defaultValue: 0
         name: "start-epoch"
-        desc: "Epoch from which to start recording statistics. " &
-              "By default one more than the last epoch in the database.".}: uint
+        abbr: "s"
+        desc: "Epoch from which to start recording statistics. " &
+              "By default one past the last epoch in the output directory".}: Option[uint]
+      endEpoch* {.
+        name: "end-epoch"
+        abbr: "e"
+        desc: "The last epoch for which to record statistics. "
& + "By default the last epoch in the input database".}: Option[uint] + +var shouldShutDown = false proc putState(db: BeaconChainDB, state: ForkedHashedBeaconState) = withState(state): @@ -175,21 +182,6 @@ func getSlotRange(dag: ChainDAGRef, startSlot: int64, count: uint64): (Slot, Slo else: start + count (start, ends) -func getBlockRange(dag: ChainDAGRef, start, ends: Slot): seq[BlockRef] = - # Range of block in reverse order - var - blockRefs: seq[BlockRef] - cur = dag.head - - while cur != nil: - if cur.slot < ends: - if cur.slot < start or cur.slot == 0: # skip genesis - break - else: - blockRefs.add cur - cur = cur.parent - blockRefs - proc cmdBench(conf: DbConf, cfg: RuntimeConfig) = var timers: array[Timers, RunningStat] @@ -249,6 +241,7 @@ proc cmdBench(conf: DbConf, cfg: RuntimeConfig) = template processBlocks(blocks: auto) = for b in blocks.mitems(): + if shouldShutDown: quit QuitSuccess while getStateField(stateData[].data, slot) < b.message.slot: let isEpoch = (getStateField(stateData[].data, slot) + 1).is_epoch() withTimer(timers[if isEpoch: tAdvanceEpoch else: tAdvanceSlot]): @@ -317,6 +310,7 @@ proc cmdDumpState(conf: DbConf) = mergeState = (ref merge.HashedBeaconState)() for stateRoot in conf.stateRoot: + if shouldShutDown: quit QuitSuccess template doit(state: untyped) = try: state.root = Eth2Digest.fromHex(stateRoot) @@ -338,6 +332,7 @@ proc cmdPutState(conf: DbConf, cfg: RuntimeConfig) = defer: db.close() for file in conf.stateFile: + if shouldShutDown: quit QuitSuccess let state = newClone(readSszForkedHashedBeaconState( cfg, readAllBytes(file).tryGet())) db.putState(state[]) @@ -347,6 +342,7 @@ proc cmdDumpBlock(conf: DbConf) = defer: db.close() for blockRoot in conf.blockRootx: + if shouldShutDown: quit QuitSuccess try: let root = Eth2Digest.fromHex(blockRoot) if (let blck = db.getPhase0Block(root); blck.isSome): @@ -365,6 +361,8 @@ proc cmdPutBlock(conf: DbConf, cfg: RuntimeConfig) = defer: db.close() for file in conf.blckFile: + if shouldShutDown: quit QuitSuccess + let blck = readSszForkedSignedBeaconBlock( cfg, readAllBytes(file).tryGet()) @@ -413,6 +411,7 @@ proc copyPrunedDatabase( copyDb.putBlock(db.getPhase0Block(genesisBlock.get).get) for signedBlock in getAncestors(db, headBlock.get): + if shouldShutDown: quit QuitSuccess if not dry_run: copyDb.putBlock(signedBlock) copyDb.checkpoint() @@ -527,6 +526,7 @@ proc cmdExportEra(conf: DbConf, cfg: RuntimeConfig) = timers: array[Timers, RunningStat] for era in conf.era ..< conf.era + conf.eraCount: + if shouldShutDown: quit QuitSuccess let firstSlot = if era == 0: none(Slot) @@ -663,6 +663,8 @@ proc cmdValidatorPerf(conf: DbConf, cfg: RuntimeConfig) = of EpochInfoFork.Altair: echo "TODO altair" + if shouldShutDown: quit QuitSuccess + for bi in 0 ..< blockRefs.len: blck = db.getPhase0Block(blockRefs[blockRefs.len - bi - 1].root).get() while getStateField(state[].data, slot) < blck.message.slot: @@ -718,8 +720,7 @@ proc createValidatorsRawTable(db: SqStoreRef) = db.exec(""" CREATE TABLE IF NOT EXISTS validators_raw( validator_index INTEGER PRIMARY KEY, - pubkey BLOB NOT NULL UNIQUE, - withdrawal_credentials BLOB NOT NULL + pubkey BLOB NOT NULL UNIQUE ); """).expect("DB") @@ -728,167 +729,17 @@ proc createValidatorsView(db: SqStoreRef) = CREATE VIEW IF NOT EXISTS validators AS SELECT validator_index, - '0x' || lower(hex(pubkey)) as pubkey, - '0x' || lower(hex(withdrawal_credentials)) as with_cred + '0x' || lower(hex(pubkey)) as pubkey FROM validators_raw; """).expect("DB") -proc createPhase0EpochInfoTable(db: 
SqStoreRef) = - db.exec(""" - CREATE TABLE IF NOT EXISTS phase0_epoch_info( - epoch INTEGER PRIMARY KEY, - current_epoch_raw INTEGER NOT NULL, - previous_epoch_raw INTEGER NOT NULL, - current_epoch_attesters_raw INTEGER NOT NULL, - current_epoch_target_attesters_raw INTEGER NOT NULL, - previous_epoch_attesters_raw INTEGER NOT NULL, - previous_epoch_target_attesters_raw INTEGER NOT NULL, - previous_epoch_head_attesters_raw INTEGER NOT NULL - ); - """).expect("DB") - -proc createAltairEpochInfoTable(db: SqStoreRef) = - db.exec(""" - CREATE TABLE IF NOT EXISTS altair_epoch_info( - epoch INTEGER PRIMARY KEY, - previous_epoch_timely_source_balance INTEGER NOT NULL, - previous_epoch_timely_target_balance INTEGER NOT NULL, - previous_epoch_timely_head_balance INTEGER NOT NULL, - current_epoch_timely_target_balance INTEGER NOT NULL, - current_epoch_total_active_balance INTEGER NOT NULL - ); - """).expect("DB") - -proc createValidatorEpochInfoTable(db: SqStoreRef) = - db.exec(""" - CREATE TABLE IF NOT EXISTS validator_epoch_info( - validator_index INTEGER, - epoch INTEGER, - source_outcome INTEGER NOT NULL, - max_source_reward INTEGER NOT NULL, - target_outcome INTEGER NOT NULL, - max_target_reward INTEGER NOT NULL, - head_outcome INTEGER NOT NULL, - max_head_reward INTEGER NOT NULL, - inclusion_delay_outcome INTEGER NOT NULL, - max_inclusion_delay_reward INTEGER NOT NULL, - sync_committee_outcome INTEGER NOT NULL, - max_sync_committee_reward INTEGER NOT NULL, - proposer_outcome INTEGER NOT NULL, - inactivity_penalty INTEGER NOT NULL, - slashing_outcome INTEGER NOT NULL, - inclusion_delay INTEGER NULL, - PRIMARY KEY(validator_index, epoch) - ); - """).expect("DB") - proc createInsertValidatorProc(db: SqStoreRef): auto = db.prepareStmt(""" INSERT OR IGNORE INTO validators_raw( validator_index, - pubkey, - withdrawal_credentials) - VALUES(?, ?, ?);""", - (int64, array[48, byte], array[32, byte]), void).expect("DB") - -proc createInsertPhase0EpochInfoProc(db: SqStoreRef): auto = - db.prepareStmt(""" - INSERT OR IGNORE INTO phase0_epoch_info( - epoch, - current_epoch_raw, - previous_epoch_raw, - current_epoch_attesters_raw, - current_epoch_target_attesters_raw, - previous_epoch_attesters_raw, - previous_epoch_target_attesters_raw, - previous_epoch_head_attesters_raw) - VALUES(?, ?, ?, ?, ?, ?, ?, ?);""", - (int64, int64, int64, int64, int64, int64, int64, int64), void).expect("DB") - -proc createInsertAltairEpochInfoProc(db: SqStoreRef): auto = - db.prepareStmt(""" - INSERT OR IGNORE INTO altair_epoch_info( - epoch, - previous_epoch_timely_source_balance, - previous_epoch_timely_target_balance, - previous_epoch_timely_head_balance, - current_epoch_timely_target_balance, - current_epoch_total_active_balance) - VALUES(?, ?, ?, ?, ?, ?);""", - (int64, int64, int64, int64, int64, int64), void).expect("DB") - -proc createInsertValidatorEpochInfoProc(db: SqStoreRef): auto = - db.prepareStmt(""" - INSERT OR IGNORE INTO validator_epoch_info( - validator_index, - epoch, - source_outcome, - max_source_reward, - target_outcome, - max_target_reward, - head_outcome, - max_head_reward, - inclusion_delay_outcome, - max_inclusion_delay_reward, - sync_committee_outcome, - max_sync_committee_reward, - proposer_outcome, - inactivity_penalty, - slashing_outcome, - inclusion_delay) - VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""", - (int64, int64, int64, int64, int64, int64, int64, int64, int64, int64, - int64, int64, int64, int64, int64, Option[int64]), void).expect("DB") - -type - RewardsAndPenalties = object 
- source_outcome: int64 - max_source_reward: Gwei - target_outcome: int64 - max_target_reward: Gwei - head_outcome: int64 - max_head_reward: Gwei - inclusion_delay_outcome: int64 - max_inclusion_delay_reward: Gwei - sync_committee_outcome: int64 - max_sync_committee_reward: Gwei - proposer_outcome: int64 - inactivity_penalty: Gwei - slashing_outcome: int64 - deposits: Gwei - inclusion_delay: Option[int64] - - ParticipationFlags = object - currentEpochParticipation: EpochParticipationFlags - previousEpochParticipation: EpochParticipationFlags - - PubkeyToIndexTable = Table[ValidatorPubKey, int] - - AuxiliaryState = object - epochParticipationFlags: ParticipationFlags - pubkeyToIndex: PubkeyToIndexTable - -proc copyParticipationFlags(auxiliaryState: var AuxiliaryState, - forkedState: ForkedHashedBeaconState) = - withState(forkedState): - when stateFork > BeaconStateFork.Phase0: - template flags: untyped = auxiliaryState.epochParticipationFlags - flags.currentEpochParticipation = state.data.current_epoch_participation - flags.previousEpochParticipation = state.data.previous_epoch_participation - -proc isPerfect(info: RewardsAndPenalties): bool = - info.slashing_outcome >= 0 and - info.source_outcome == info.max_source_reward.int64 and - info.target_outcome == info.max_target_reward.int64 and - info.head_outcome == info.max_head_reward.int64 and - info.inclusion_delay_outcome == info.max_inclusion_delay_reward.int64 and - info.sync_committee_outcome == info.max_sync_committee_reward.int64 - -proc getMaxEpochFromDbTable(db: SqStoreRef, tableName: string): int64 = - var queryResult: int64 - discard db.exec(&"SELECT MAX(epoch) FROM {tableName}", ()) do (res: int64): - queryResult = res - return queryResult + pubkey) + VALUES(?, ?);""", + (int64, array[48, byte]), void).expect("DB") proc collectBalances(balances: var seq[uint64], forkedState: ForkedHashedBeaconState) = withState(forkedState): @@ -950,280 +801,17 @@ template inTransaction(db: SqStoreRef, dbName: string, body: untyped) = proc insertValidators(db: SqStoreRef, state: ForkedHashedBeaconState, startIndex, endIndex: int64) = var insertValidator {.global.}: SqliteStmt[ - (int64, array[48, byte], array[32, byte]), void] + (int64, array[48, byte]), void] once: insertValidator = db.createInsertValidatorProc withState(state): db.inTransaction("DB"): for i in startIndex ..< endIndex: - insertValidator.exec((i, state.data.validators[i].pubkey.toRaw, - state.data.validators[i].withdrawal_credentials.data)).expect("DB") - -proc getOutcome(delta: RewardDelta): int64 = - delta.rewards.int64 - delta.penalties.int64 - -proc collectSlashings( - rewardsAndPenalties: var seq[RewardsAndPenalties], - state: ForkyBeaconState, total_balance: Gwei) = - let - epoch = get_current_epoch(state) - adjusted_total_slashing_balance = get_adjusted_total_slashing_balance( - state, total_balance) - - for index in 0 ..< state.validators.len: - let validator = unsafeAddr state.validators.asSeq()[index] - if slashing_penalty_applies(validator[], epoch): - rewardsAndPenalties[index].slashing_outcome += - validator[].get_slashing_penalty( - adjusted_total_slashing_balance, total_balance).int64 - -proc collectEpochRewardsAndPenalties( - rewardsAndPenalties: var seq[RewardsAndPenalties], - state: phase0.BeaconState, cache: var StateCache, cfg: RuntimeConfig) = - if get_current_epoch(state) == GENESIS_EPOCH: - return - - var info: phase0.EpochInfo - - info.init(state) - info.processAttestations(state, cache) - doAssert info.validators.len == state.validators.len - 
rewardsAndPenalties.setLen(state.validators.len) - - let - finality_delay = get_finality_delay(state) - total_balance = info.balances.current_epoch - total_balance_sqrt = integer_squareroot(total_balance) - - for index, validator in info.validators.pairs: - if not is_eligible_validator(validator): - continue - - let base_reward = get_base_reward_sqrt( - state, index.ValidatorIndex, total_balance_sqrt) - - template get_attestation_component_reward_helper(attesting_balance: Gwei): Gwei = - get_attestation_component_reward(attesting_balance, - info.balances.current_epoch, base_reward.uint64, finality_delay) - - template rp: untyped = rewardsAndPenalties[index] - - rp.source_outcome = get_source_delta( - validator, base_reward, info.balances, finality_delay).getOutcome - rp.max_source_reward = get_attestation_component_reward_helper( - info.balances.previous_epoch_attesters) - - rp.target_outcome = get_target_delta( - validator, base_reward, info.balances, finality_delay).getOutcome - rp.max_target_reward = get_attestation_component_reward_helper( - info.balances.previous_epoch_target_attesters) - - rp.head_outcome = get_head_delta( - validator, base_reward, info.balances, finality_delay).getOutcome - rp.max_head_reward = get_attestation_component_reward_helper( - info.balances.previous_epoch_head_attesters) - - let (inclusion_delay_delta, proposer_delta) = get_inclusion_delay_delta( - validator, base_reward) - rp.inclusion_delay_outcome = inclusion_delay_delta.getOutcome - rp.max_inclusion_delay_reward = - base_reward - state_transition_epoch.get_proposer_reward(base_reward) - - rp.inactivity_penalty = get_inactivity_penalty_delta( - validator, base_reward, finality_delay).penalties - - if proposer_delta.isSome: - let proposer_index = proposer_delta.get[0] - if proposer_index < info.validators.lenu64: - rewardsAndPenalties[proposer_index].proposer_outcome += - proposer_delta.get[1].getOutcome - - rewardsAndPenalties.collectSlashings(state, info.balances.current_epoch) - -proc collectEpochRewardsAndPenalties( - rewardsAndPenalties: var seq[RewardsAndPenalties], - state: altair.BeaconState | merge.BeaconState, - cache: var StateCache, cfg: RuntimeConfig) = - if get_current_epoch(state) == GENESIS_EPOCH: - return - - var info: altair.EpochInfo - info.init(state) - doAssert info.validators.len == state.validators.len - rewardsAndPenalties.setLen(state.validators.len) - - let - total_active_balance = info.balances.current_epoch - base_reward_per_increment = get_base_reward_per_increment( - total_active_balance) - - for flag_index in 0 ..< PARTICIPATION_FLAG_WEIGHTS.len: - for validator_index, delta in get_flag_index_deltas( - state, flag_index, base_reward_per_increment, info): - template rp: untyped = rewardsAndPenalties[validator_index] - - let - base_reward = get_base_reward_increment( - state, validator_index, base_reward_per_increment) - active_increments = get_active_increments(info) - unslashed_participating_increment = - get_unslashed_participating_increment(info, flag_index) - max_flag_index_reward = get_flag_index_reward( - state, base_reward, active_increments, - unslashed_participating_increment, - PARTICIPATION_FLAG_WEIGHTS[flag_index].uint64) - - case flag_index - of TIMELY_SOURCE_FLAG_INDEX: - rp.source_outcome = delta.getOutcome - rp.max_source_reward = max_flag_index_reward - of TIMELY_TARGET_FLAG_INDEX: - rp.target_outcome = delta.getOutcome - rp.max_target_reward = max_flag_index_reward - of TIMELY_HEAD_FLAG_INDEX: - rp.head_outcome = delta.getOutcome - rp.max_head_reward = 
max_flag_index_reward - else: - raiseAssert(&"Unknown flag index {flag_index}.") - - for validator_index, penalty in get_inactivity_penalty_deltas( - cfg, state, info): - rewardsAndPenalties[validator_index].inactivity_penalty += penalty - - rewardsAndPenalties.collectSlashings(state, info.balances.current_epoch) - -proc collectFromSlashedValidator( - rewardsAndPenalties: var seq[RewardsAndPenalties], - state: ForkyBeaconState, slashedIndex, proposerIndex: ValidatorIndex) = - template slashed_validator: untyped = state.validators[slashedIndex] - let slashingPenalty = get_slashing_penalty(state, slashed_validator.effective_balance) - let whistleblowerReward = get_whistleblower_reward(slashed_validator.effective_balance) - rewardsAndPenalties[slashedIndex].slashing_outcome -= slashingPenalty.int64 - rewardsAndPenalties[proposerIndex].slashing_outcome += whistleblowerReward.int64 - -proc collectFromProposerSlashings( - rewardsAndPenalties: var seq[RewardsAndPenalties], - forkedState: ForkedHashedBeaconState, - forkedBlock: ForkedTrustedSignedBeaconBlock) = - withStateAndBlck(forkedState, forkedBlock): - for proposer_slashing in blck.message.body.proposer_slashings: - doAssert check_proposer_slashing(state.data, proposer_slashing, {}).isOk - let slashedIndex = proposer_slashing.signed_header_1.message.proposer_index - rewardsAndPenalties.collectFromSlashedValidator(state.data, - slashedIndex.ValidatorIndex, blck.message.proposer_index.ValidatorIndex) - -proc collectFromAttesterSlashings( - rewardsAndPenalties: var seq[RewardsAndPenalties], - forkedState: ForkedHashedBeaconState, - forkedBlock: ForkedTrustedSignedBeaconBlock) = - withStateAndBlck(forkedState, forkedBlock): - for attester_slashing in blck.message.body.attester_slashings: - let attester_slashing_validity = check_attester_slashing( - state.data, attester_slashing, {}) - doAssert attester_slashing_validity.isOk - for slashedIndex in attester_slashing_validity.value: - rewardsAndPenalties.collectFromSlashedValidator( - state.data, slashedIndex, blck.message.proposer_index.ValidatorIndex) - -proc collectFromAttestations( - rewardsAndPenalties: var seq[RewardsAndPenalties], - forkedState: ForkedHashedBeaconState, - forkedBlock: ForkedTrustedSignedBeaconBlock, - epochParticipationFlags: var ParticipationFlags, - cache: var StateCache) = - withStateAndBlck(forkedState, forkedBlock): - when stateFork > BeaconStateFork.Phase0: - let base_reward_per_increment = get_base_reward_per_increment( - get_total_active_balance(state.data, cache)) - doAssert base_reward_per_increment > 0 - for attestation in blck.message.body.attestations: - doAssert check_attestation(state.data, attestation, {}, cache).isOk - let proposerReward = - if attestation.data.target.epoch == get_current_epoch(state.data): - get_proposer_reward( - state.data, attestation, base_reward_per_increment, cache, - epochParticipationFlags.currentEpochParticipation) - else: - get_proposer_reward( - state.data, attestation, base_reward_per_increment, cache, - epochParticipationFlags.previousEpochParticipation) - rewardsAndPenalties[blck.message.proposer_index].proposer_outcome += - proposerReward.int64 - let inclusionDelay = state.data.slot - attestation.data.slot - for index in get_attesting_indices( - state.data, attestation.data, attestation.aggregation_bits, cache): - rewardsAndPenalties[index].inclusion_delay = some(inclusionDelay.int64) - -proc collectFromDeposits( - rewardsAndPenalties: var seq[RewardsAndPenalties], - forkedState: ForkedHashedBeaconState, - forkedBlock: 
ForkedTrustedSignedBeaconBlock, - pubkeyToIndex: var PubkeyToIndexTable, - cfg: RuntimeConfig) = - withStateAndBlck(forkedState, forkedBlock): - for deposit in blck.message.body.deposits: - let pubkey = deposit.data.pubkey - let amount = deposit.data.amount - var index = findValidatorIndex(state.data, pubkey) - if index == -1: - index = pubkeyToIndex.getOrDefault(pubkey, -1) - if index != -1: - rewardsAndPenalties[index].deposits += amount - elif verify_deposit_signature(cfg, deposit.data): - pubkeyToIndex[pubkey] = rewardsAndPenalties.len - rewardsAndPenalties.add( - RewardsAndPenalties(deposits: amount)) - -proc collectFromSyncAggregate( - rewardsAndPenalties: var seq[RewardsAndPenalties], - forkedState: ForkedHashedBeaconState, - forkedBlock: ForkedTrustedSignedBeaconBlock, - cache: var StateCache) = - withStateAndBlck(forkedState, forkedBlock): - when stateFork > BeaconStateFork.Phase0: - let total_active_balance = get_total_active_balance(state.data, cache) - let participant_reward = get_participant_reward(total_active_balance) - let proposer_reward = - state_transition_block.get_proposer_reward(participant_reward) - let indices = get_sync_committee_cache(state.data, cache).current_sync_committee - - template aggregate: untyped = blck.message.body.sync_aggregate - - doAssert indices.len == SYNC_COMMITTEE_SIZE - doAssert aggregate.sync_committee_bits.len == SYNC_COMMITTEE_SIZE - doAssert state.data.current_sync_committee.pubkeys.len == SYNC_COMMITTEE_SIZE - - for i in 0 ..< SYNC_COMMITTEE_SIZE: - rewardsAndPenalties[indices[i]].max_sync_committee_reward += - participant_reward - if aggregate.sync_committee_bits[i]: - rewardsAndPenalties[indices[i]].sync_committee_outcome += - participant_reward.int64 - rewardsAndPenalties[blck.message.proposer_index].proposer_outcome += - proposer_reward.int64 - else: - rewardsAndPenalties[indices[i]].sync_committee_outcome -= - participant_reward.int64 - -proc collectBlockRewardsAndPenalties( - rewardsAndPenalties: var seq[RewardsAndPenalties], - forkedState: ForkedHashedBeaconState, - forkedBlock: ForkedTrustedSignedBeaconBlock, - auxiliaryState: var AuxiliaryState, - cache: var StateCache, cfg: RuntimeConfig) = - rewardsAndPenalties.collectFromProposerSlashings(forkedState, forkedBlock) - rewardsAndPenalties.collectFromAttesterSlashings(forkedState, forkedBlock) - rewardsAndPenalties.collectFromAttestations( - forkedState, forkedBlock, auxiliaryState.epochParticipationFlags, cache) - rewardsAndPenalties.collectFromDeposits( - forkedState, forkedBlock, auxiliaryState.pubkeyToIndex, cfg) - # This table is needed only to resolve double deposits in the same block, so - # it can be cleared after processing all deposits for the current block. - auxiliaryState.pubkeyToIndex.clear - rewardsAndPenalties.collectFromSyncAggregate(forkedState, forkedBlock, cache) + insertValidator.exec( + (i, state.data.validators[i].pubkey.toRaw)).expect("DB") proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) = # Create a database with performance information for every epoch - echo "Opening database..." + info "Opening database..." 
let db = BeaconChainDB.new(conf.databaseDir.string, false, true)
  defer: db.close()
@@ -1241,43 +829,43 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
   outDb.createValidatorsRawTable
   outDb.createValidatorsView
-  outDb.createPhase0EpochInfoTable
-  outDb.createAltairEpochInfoTable
-  outDb.createValidatorEpochInfoTable

   let
-    insertPhase0EpochInfo = outDb.createInsertPhase0EpochInfoProc
-    insertAltairEpochInfo = outDb.createInsertAltairEpochInfoProc
-    insertValidatorInfo = outDb.createInsertValidatorEpochInfoProc
-    minEpoch =
-      if conf.startEpoch == 0:
-        Epoch(max(outDb.getMaxEpochFromDbTable("phase0_epoch_info"),
-          outDb.getMaxEpochFromDbTable("altair_epoch_info")) + 1)
+    startEpoch =
+      if conf.startEpoch.isSome:
+        Epoch(conf.startEpoch.get)
       else:
-        Epoch(conf.startEpoch)
-    start = minEpoch.start_slot()
-    ends = dag.finalizedHead.slot # Avoid dealing with changes
+        getStartEpoch(conf.outDir)
+    endEpoch =
+      if conf.endEpoch.isSome:
+        Epoch(conf.endEpoch.get)
+      else:
+        dag.finalizedHead.slot.epoch # Avoid dealing with changes

-  if start > ends:
-    echo "No (new) data found, database at ", minEpoch, ", finalized to ", ends.epoch
-    quit 1
+  if startEpoch > endEpoch:
+    fatal "Start epoch cannot be bigger than end epoch.",
+          startEpoch = startEpoch, endEpoch = endEpoch
+    quit QuitFailure

-  let blockRefs = dag.getBlockRange(start, ends)
+  info "Analyzing performance for epochs.",
+       startEpoch = startEpoch, endEpoch = endEpoch

-  echo "Analyzing performance for epochs ",
-    start.epoch, " - ", ends.epoch
+  let
+    startSlot = startEpoch.start_slot
+    endSlot = endEpoch.start_slot + SLOTS_PER_EPOCH
+    blockRefs = dag.getBlockRange(startSlot, endSlot)

   let tmpState = newClone(dag.headState)
   var cache = StateCache()
-  let slot = if start > 0: start - 1 else: 0.Slot
+  let slot = if startSlot > 0: startSlot - 1 else: 0.Slot
   if blockRefs.len > 0:
-    dag.updateStateData(tmpState[], blockRefs[^1].atSlot(slot), false, cache)
+    discard dag.updateStateData(tmpState[], blockRefs[^1].atSlot(slot), false, cache)
   else:
-    dag.updateStateData(tmpState[], dag.head.atSlot(slot), false, cache)
+    discard dag.updateStateData(tmpState[], dag.head.atSlot(slot), false, cache)

-  let dbValidatorsCount = outDb.getDbValidatorsCount()
+  let savedValidatorsCount = outDb.getDbValidatorsCount
   var validatorsCount = getStateField(tmpState[].data, validators).len
-  outDb.insertValidators(tmpState[].data, dbValidatorsCount, validatorsCount)
+  outDb.insertValidators(tmpState[].data, savedValidatorsCount, validatorsCount)

   var previousEpochBalances: seq[uint64]
   collectBalances(previousEpochBalances, tmpState[].data)
@@ -1290,8 +878,10 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
   auxiliaryState.copyParticipationFlags(tmpState[].data)

   proc processEpoch() =
-    let epoch = getStateField(tmpState[].data, slot).epoch.int64
-    echo epoch
+    let epoch = getStateField(tmpState[].data, slot).epoch
+    info "Processing epoch ...", epoch = epoch
+
+    var csvLines = newStringOfCap(1000000)

     withState(tmpState[].data):
       withEpochInfo(forkedInfo):
         doAssert state.data.balances.len == rewardsAndPenalties.len

         for index, validator in info.validators.pairs:
-          template outputInfo: untyped = rewardsAndPenalties[index]
+          template rp: untyped = rewardsAndPenalties[index]
           checkBalance(index, validator, state.data.balances[index],
-                       previousEpochBalances[index], outputInfo)
+                       previousEpochBalances[index], rp)

-          let delay =
-            when infoFork == EpochInfoFork.Phase0:
+          when infoFork
== EpochInfoFork.Phase0: + rp.inclusion_delay = block: let notSlashed = (RewardFlags.isSlashed notin validator.flags) if notSlashed and validator.is_previous_epoch_attester.isSome(): - some(int64(validator.is_previous_epoch_attester.get().delay)) + some(validator.is_previous_epoch_attester.get().delay.uint64) else: - none(int64) - else: - rewardsAndPenalties[index].inclusion_delay + none(uint64) + csvLines.add rp.serializeToCsv - if conf.perfect or not outputInfo.isPerfect: - insertValidatorInfo.exec(( - index.int64, - epoch, - outputInfo.source_outcome, - outputInfo.max_source_reward.int64, - outputInfo.target_outcome, - outputInfo.max_target_reward.int64, - outputInfo.head_outcome, - outputInfo.max_head_reward.int64, - outputInfo.inclusion_delay_outcome, - outputInfo.max_inclusion_delay_reward.int64, - outputInfo.sync_committee_outcome, - outputInfo.max_sync_committee_reward.int64, - outputInfo.proposer_outcome, - outputInfo.inactivity_penalty.int64, - outputInfo.slashing_outcome, - delay)).expect("DB") + let fileName = getFilePathForEpoch(epoch, conf.outDir) + var res = io2.removeFile(fileName) + doAssert res.isOk + res = io2.writeFile(fileName, snappy.encode(csvLines.toBytes)) + doAssert res.isOk + if shouldShutDown: quit QuitSuccess collectBalances(previousEpochBalances, tmpState[].data) - case forkedInfo.kind - of EpochInfoFork.Phase0: - template info: untyped = forkedInfo.phase0Data - insertPhase0EpochInfo.exec(( - epoch, - info.balances.current_epoch_raw.int64, - info.balances.previous_epoch_raw.int64, - info.balances.current_epoch_attesters_raw.int64, - info.balances.current_epoch_target_attesters_raw.int64, - info.balances.previous_epoch_attesters_raw.int64, - info.balances.previous_epoch_target_attesters_raw.int64, - info.balances.previous_epoch_head_attesters_raw.int64) - ).expect("DB") - of EpochInfoFork.Altair: - template info: untyped = forkedInfo.altairData - insertAltairEpochInfo.exec(( - epoch, - info.balances.previous_epoch[0].int64, - info.balances.previous_epoch[1].int64, - info.balances.previous_epoch[2].int64, - info.balances.current_epoch_TIMELY_TARGET.int64, - info.balances.current_epoch.int64)).expect("DB") - proc processSlots(ends: Slot, endsFlags: UpdateFlags) = var currentSlot = getStateField(tmpState[].data, slot) while currentSlot < ends: @@ -1370,17 +924,17 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) = rewardsAndPenalties.collectEpochRewardsAndPenalties( state.data, cache, cfg) - let ok = process_slots(cfg, tmpState[].data, nextSlot, cache, forkedInfo, flags) - doAssert ok, "Slot processing can't fail with correct inputs" + let res = process_slots(cfg, tmpState[].data, nextSlot, cache, forkedInfo, flags) + doAssert res.isOk, "Slot processing can't fail with correct inputs" currentSlot = nextSlot if currentSlot.isEpoch: - outDb.inTransaction("DB"): - processEpoch() + processEpoch() rewardsAndPenalties.setLen(0) rewardsAndPenalties.setLen(validatorsCount) auxiliaryState.copyParticipationFlags(tmpState[].data) + clear cache for bi in 0 ..< blockRefs.len: let forkedBlock = dag.getForkedBlock(blockRefs[blockRefs.len - bi - 1]) @@ -1393,8 +947,8 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) = let res = state_transition_block( cfg, tmpState[].data, blck, cache, {}, noRollback) if res.isErr: - echo "State transition failed (!)" - quit 1 + fatal "State transition failed (!)" + quit QuitFailure let newValidatorsCount = getStateField(tmpState[].data, validators).len if newValidatorsCount > validatorsCount: @@ -1403,15 +957,27 @@ proc 
cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
       rewardsAndPenalties.setLen(newValidatorsCount)
       previousEpochBalances.setLen(newValidatorsCount)
       # ... and add the new validators to the database.
-      outDb.insertValidators(tmpState[].data, validatorsCount, newValidatorsCount)
+      outDb.insertValidators(
+        tmpState[].data, validatorsCount, newValidatorsCount)
       validatorsCount = newValidatorsCount

   # Capture rewards of empty slots as well, including the epoch that got
   # finalized
-  let ok = processSlots(ends, {})
-  doAssert ok, "Slot processing can't fail with correct inputs"
+  processSlots(endSlot, {})
+
+proc controlCHook {.noconv.} =
+  notice "Shutting down after having received SIGINT."
+  shouldShutDown = true
+
+proc exitOnSigterm(signal: cint) {.noconv.} =
+  notice "Shutting down after having received SIGTERM."
+  shouldShutDown = true

 when isMainModule:
+  setControlCHook(controlCHook)
+  when defined(posix):
+    c_signal(SIGTERM, exitOnSigterm)
+
   var
     conf = DbConf.load()
     cfg = getRuntimeConfig(conf.eth2Network)
diff --git a/ncli/validator_db_aggregator.nim b/ncli/validator_db_aggregator.nim
new file mode 100644
index 000000000..69a4511e2
--- /dev/null
+++ b/ncli/validator_db_aggregator.nim
@@ -0,0 +1,236 @@
+import
+  std/[os, strutils, streams, parsecsv],
+  stew/[io2, byteutils], chronicles, confutils, snappy,
+  ../beacon_chain/spec/datatypes/base,
+  ./ncli_common
+
+when defined(posix):
+  import system/ansi_c
+
+type
+  AggregatorConf = object
+    startEpoch {.
+      name: "start-epoch"
+      abbr: "s"
+      desc: "The first epoch to be aggregated. " &
+            "By default, the first epoch for which a file exists" .}: Option[uint64]
+    endEpoch {.
+      name: "end-epoch"
+      abbr: "e"
+      desc: "The last epoch to be aggregated. " &
+            "By default, the last epoch for which a file exists" .}: Option[uint64]
+    resolution {.
+      defaultValue: 225,
+      name: "resolution"
+      abbr: "r"
+      desc: "How many epochs to aggregate into a single file" .}: uint
+    inputDir {.
+      name: "input-dir"
+      abbr: "i"
+      desc: "The directory with the epoch info files" .}: InputDir
+    outputDir {.
+      defaultValue: ""
+      name: "output-dir"
+      abbr: "o"
+      desc: "The directory where the aggregated files will be written. " &
+            "By default, the input directory is used".}: InputDir
+
+var shutDown = false
+
+proc determineStartAndEndEpochs(config: AggregatorConf):
+    tuple[startEpoch, endEpoch: Epoch] =
+  if config.startEpoch.isNone or config.endEpoch.isNone:
+    (result.startEpoch, result.endEpoch) = getEpochRange(config.inputDir.string)
+  if config.startEpoch.isSome:
+    result.startEpoch = config.startEpoch.get.Epoch
+  if config.endEpoch.isSome:
+    result.endEpoch = config.endEpoch.get.Epoch
+  if result.startEpoch > result.endEpoch:
+    fatal "Start epoch cannot be bigger than the end epoch.",
+          startEpoch = result.startEpoch, endEpoch = result.endEpoch
+    quit QuitFailure
+
+proc checkIntegrity(startEpoch, endEpoch: Epoch, dir: string) =
+  for epoch in startEpoch ..
endEpoch: + let filePath = getFilePathForEpoch(epoch, dir) + if not filePath.fileExists: + fatal "File for epoch does not exist.", epoch = epoch, filePath = filePath + quit QuitFailure + +proc parseRow(csvRow: CsvRow): RewardsAndPenalties = + result = RewardsAndPenalties( + source_outcome: parseBiggestInt(csvRow[0]), + max_source_reward: parseBiggestUInt(csvRow[1]), + target_outcome: parseBiggestInt(csvRow[2]), + max_target_reward: parseBiggestUInt(csvRow[3]), + head_outcome: parseBiggestInt(csvRow[4]), + max_head_reward: parseBiggestUInt(csvRow[5]), + inclusion_delay_outcome: parseBiggestInt(csvRow[6]), + max_inclusion_delay_reward: parseBiggestUInt(csvRow[7]), + sync_committee_outcome: parseBiggestInt(csvRow[8]), + max_sync_committee_reward: parseBiggestUInt(csvRow[9]), + proposer_outcome: parseBiggestInt(csvRow[10]), + inactivity_penalty: parseBiggestUInt(csvRow[11]), + slashing_outcome: parseBiggestInt(csvRow[12]), + deposits: parseBiggestUInt(csvRow[13])) + if csvRow[14].len > 0: + result.inclusion_delay = some(parseBiggestUInt(csvRow[14])) + +proc `+=`(lhs: var RewardsAndPenalties, rhs: RewardsAndPenalties) = + lhs.source_outcome += rhs.source_outcome + lhs.max_source_reward += rhs.max_source_reward + lhs.target_outcome += rhs.target_outcome + lhs.max_target_reward += rhs.max_target_reward + lhs.head_outcome += rhs.head_outcome + lhs.max_head_reward += rhs.max_head_reward + lhs.inclusion_delay_outcome += rhs.inclusion_delay_outcome + lhs.max_inclusion_delay_reward += rhs.max_inclusion_delay_reward + lhs.sync_committee_outcome += rhs.sync_committee_outcome + lhs.max_sync_committee_reward += rhs.max_sync_committee_reward + lhs.proposer_outcome += rhs.proposer_outcome + lhs.inactivity_penalty += rhs.inactivity_penalty + lhs.slashing_outcome += rhs.slashing_outcome + lhs.deposits += rhs.deposits + if lhs.inclusion_delay.isSome: + if rhs.inclusion_delay.isSome: + lhs.inclusion_delay.get += rhs.inclusion_delay.get + else: + if rhs.inclusion_delay.isSome: + lhs.inclusion_delay = some(rhs.inclusion_delay.get) + +proc average(rp: var RewardsAndPenalties, + averageInclusionDelay: var Option[float], + epochsCount: uint, inclusionDelaysCount: uint64) = + rp.source_outcome = rp.source_outcome div epochsCount.int64 + rp.max_source_reward = rp.max_source_reward div epochsCount + rp.target_outcome = rp.target_outcome div epochsCount.int64 + rp.max_target_reward = rp.max_target_reward div epochsCount + rp.head_outcome = rp.head_outcome div epochsCount.int64 + rp.max_head_reward = rp.max_head_reward div epochsCount + rp.inclusion_delay_outcome = rp.inclusion_delay_outcome div epochsCount.int64 + rp.max_inclusion_delay_reward = rp.max_inclusion_delay_reward div epochsCount + rp.sync_committee_outcome = rp.sync_committee_outcome div epochsCount.int64 + rp.max_sync_committee_reward = rp.max_sync_committee_reward div epochsCount + rp.proposer_outcome = rp.proposer_outcome div epochsCount.int64 + rp.inactivity_penalty = rp.inactivity_penalty div epochsCount + rp.slashing_outcome = rp.slashing_outcome div epochsCount.int64 + if rp.inclusion_delay.isSome: + doAssert inclusionDelaysCount != 0 + averageInclusionDelay = some( + rp.inclusion_delay.get.float / inclusionDelaysCount.float) + else: + doAssert inclusionDelaysCount == 0 + averageInclusionDelay = none(float) + +proc getFilePathForEpochs(startEpoch, endEpoch: Epoch, dir: string): string = + let fileName = epochAsString(startEpoch) & "_" & + epochAsString(endEpoch) & epochFileNameExtension + dir / fileName + +proc aggregateEpochs(startEpoch, endEpoch: 
Epoch, resolution: uint, + inputDir, outputDir: string) = + if startEpoch > endEpoch: + fatal "Start epoch cannot be greater than the end epoch.", + startEpoch = startEpoch, endEpoch = endEpoch + quit QuitFailure + + info "Aggregating epochs ...", startEpoch = startEpoch, endEpoch = endEpoch, + inputDir = inputDir, outputDir = outputDir + + var rewardsAndPenalties: seq[RewardsAndPenalties] + var participationEpochsCount: seq[uint] + var inclusionDelaysCount: seq[uint] + var epochsAggregated = 0'u + + for epoch in startEpoch .. endEpoch: + let filePath = getFilePathForEpoch(epoch, inputDir) + info "Processing file ...", file = filePath + + let data = io2.readAllBytes(filePath) + doAssert data.isOk + let dataStream = newStringStream( + string.fromBytes(snappy.decode( + data.get.toOpenArray(0, data.get.len - 1)))) + + var csvParser: CsvParser + csvParser.open(dataStream, filePath) + + var validatorsCount = 0'u + while csvParser.readRow: + inc validatorsCount + let rp = parseRow(csvParser.row) + + if validatorsCount > participationEpochsCount.len.uint: + rewardsAndPenalties.add rp + participationEpochsCount.add 1 + if rp.inclusionDelay.isSome: + inclusionDelaysCount.add 1 + else: + inclusionDelaysCount.add 0 + else: + rewardsAndPenalties[validatorsCount - 1] += rp + inc participationEpochsCount[validatorsCount - 1] + if rp.inclusionDelay.isSome: + inc inclusionDelaysCount[validatorsCount - 1] + + inc epochsAggregated + + if epochsAggregated == resolution or epoch == endEpoch or shutDown: + var csvLines: string + for i in 0 ..< participationEpochsCount.len: + var averageInclusionDelay: Option[float] + average(rewardsAndPenalties[i], averageInclusionDelay, + participationEpochsCount[i], inclusionDelaysCount[i]) + csvLines &= serializeToCsv( + rewardsAndPenalties[i], averageInclusionDelay) + + let fileName = getFilePathForEpochs( + epoch - epochsAggregated + 1, epoch, outputDir) + info "Writing file ...", fileName = fileName + + var res = io2.removeFile(fileName) + doAssert res.isOk + res = io2.writeFile(fileName, snappy.encode(csvLines.toBytes)) + doAssert res.isOk + + if shutDown: + quit QuitSuccess + + participationEpochsCount.setLen(0) + rewardsAndPenalties.setLen(0) + inclusionDelaysCount.setLen(0) + epochsAggregated = 0 + +proc controlCHook {.noconv.} = + notice "Shutting down after receiving SIGINT." + shutDown = true + +proc exitOnSigterm(signal: cint) {.noconv.} = + notice "Shutting down after receiving SIGTERM." 
+ shutDown = true + +proc main = + setControlCHook(controlCHook) + when defined(posix): + c_signal(SIGTERM, exitOnSigterm) + + let config = load AggregatorConf + let (startEpoch, endEpoch) = config.determineStartAndEndEpochs + if endEpoch == 0: + fatal "No epoch info files found in the directory.", + inputDir = config.inputDir + quit QuitFailure + + checkIntegrity(startEpoch, endEpoch, config.inputDir.string) + + let outputDir = + if config.outputDir.string.len > 0: + config.outputDir + else: + config.inputDir + + aggregateEpochs(startEpoch, endEpoch, config.resolution, + config.inputDir.string, outputDir.string) + +when isMainModule: + main() diff --git a/ncli/validator_db_reports.ipynb b/ncli/validator_db_reports.ipynb new file mode 100644 index 000000000..344be6d60 --- /dev/null +++ b/ncli/validator_db_reports.ipynb @@ -0,0 +1,360 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "071e1cb8", + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autotime\n", + "%matplotlib inline\n", + "import string\n", + "import sqlite3\n", + "import os\n", + "import re\n", + "import pandas as pd\n", + "import matplotlib.pyplot as plt\n", + "import numpy as np\n", + "import snappy\n", + "from scipy.interpolate import make_interp_spline\n", + "from pathlib import Path\n", + "from io import StringIO" + ] + }, + { + "cell_type": "markdown", + "id": "f00bca92", + "metadata": {}, + "source": [ + "**Database connection:**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f69cd8b3", + "metadata": {}, + "outputs": [], + "source": [ + "database_dir = \"../build/data/mainnetValidatorDb/validatorDb.sqlite3\"\n", + "connection = sqlite3.connect(database_dir)" + ] + }, + { + "cell_type": "markdown", + "id": "8ccd8945", + "metadata": {}, + "source": [ + "**Rewards and penalties components:**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "229adad0", + "metadata": {}, + "outputs": [], + "source": [ + "SOURCE = \"source\"\n", + "TARGET = \"target\"\n", + "HEAD = \"head\"\n", + "INCLUSION_DELAY = \"inclusion_delay\"\n", + "SYNC_COMMITTEE = \"sync_committee\"\n", + "\n", + "CSV_DATA_COLUMNS_NAMES = [\n", + " \"source_outcome\",\n", + " \"max_source_reward\",\n", + " \"target_outcome\",\n", + " \"max_target_reward\",\n", + " \"head_outcome\",\n", + " \"max_head_reward\",\n", + " \"inclusion_delay_outcome\",\n", + " \"max_inclusion_delay_reward\",\n", + " \"sync_committee_outcome\",\n", + " \"max_sync_committee_reward\",\n", + " \"proposer_outcome\",\n", + " \"inactivity_penalty\",\n", + " \"slashing_outcome\",\n", + " \"deposits\",\n", + " \"inclusion_delay\"]" + ] + }, + { + "cell_type": "markdown", + "id": "9a747287", + "metadata": {}, + "source": [ + "**Helper functions:**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "63fb9f21", + "metadata": {}, + "outputs": [], + "source": [ + "def valid_public_key(public_key):\n", + " \"\"\"Checks whether a string is a valid hex representation of a public key of an Eth2 validator.\"\"\"\n", + " if len(public_key) != 96:\n", + " return False\n", + " return all(c in string.hexdigits for c in public_key)\n", + "\n", + "def idx(public_key):\n", + " \"\"\"Returns a validator's index given its public key.\"\"\"\n", + " \n", + " if public_key.startswith(\"0x\"):\n", + " public_key = public_key[2:]\n", + " \n", + " if not valid_public_key(public_key):\n", + " raise ValueError(f\"The string '{public_key}' is not a valid public key of a validator.\")\n", + " \n", + " QUERY_FIELD = 
\"validator_index\"\n", + " query = f\"SELECT {QUERY_FIELD} FROM validators_raw WHERE pubkey=x'{public_key}';\"\n", + " query_result = pd.read_sql_query(query, connection)\n", + " \n", + " if len(query_result[QUERY_FIELD]) == 0:\n", + " raise ValueError(f\"Not found a validator with a public key '{public_key}'.\")\n", + " \n", + " if len(query_result[QUERY_FIELD]) > 1:\n", + " raise ValueError(f\"Found multiple validators with a public key '{public_key}'.\")\n", + " \n", + " return query_result[QUERY_FIELD][0]" + ] + }, + { + "cell_type": "markdown", + "id": "946762c1", + "metadata": {}, + "source": [ + "**Input parameters:**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e9aca3ed", + "metadata": {}, + "outputs": [], + "source": [ + "start_epoch = 10000\n", + "end_epoch = 20000\n", + "resolution = 225\n", + "files_dir = \"../build/data/mainnetCompactedValidatorDb/\"\n", + "use_compacted_files = True\n", + "rewards = [SOURCE, TARGET, HEAD, INCLUSION_DELAY, SYNC_COMMITTEE]\n", + "validators_sets = {\n", + " \"set1\": list(range(10)),\n", + " \"set2\": list(map(idx, [\n", + " \"0x8efba2238a00d678306c6258105b058e3c8b0c1f36e821de42da7319c4221b77aa74135dab1860235e19d6515575c381\",\n", + " \"0xa2dce641f347a9e46f58458390e168fa4b3a0166d74fc495457cb00c8e4054b5d284c62aa0d9578af1996c2e08e36fb6\",\n", + " \"0x98b7d0eac7ab95d34dbf2b7baa39a8ec451671328c063ab1207c2055d9d5d6f1115403dc5ea19a1111a404823bd9a6e9\",\n", + " \"0xb0fd08e2e06d1f4d90d0d6843feb543ebeca684cde397fe230e6cdf6f255d234f2c26f4b36c07170dfdfcbbe355d0848\",\n", + " \"0xab7a5aa955382906be3d76e322343bd439e8690f286ecf2f2a7646363b249f5c133d0501d766ccf1aa1640f0283047b3\",\n", + " \"0x980c0c001645a00b71c720935ce193e1ed0e917782c4cb07dd476a4fdb7decb8d91daf2770eb413055f0c1d14b5ed6df\",\n", + " \"0xac7cbdc535ce8254eb9cdedf10d5b1e75de4cd5e91756c3467d0492b01b70b5c6a81530e9849c6b696c8bc157861d0c3\",\n", + " \"0x98ea289db7ea9714699ec93701a3b6db43900e04ae5497be01fa8cc5a56754c23589eaf1f674de718e291376f452d68c\",\n", + " \"0x92451d4c099e51f54ab20f5c1a4edf405595c60122ccfb0f39250b7e80986fe0fe457bacd8a887e9087cd6fc323f492c\",\n", + " \"0xa06f6c678f0129aec056df309a4fe18760116ecaea2292947c5a9cc997632ff437195309783c269ffca7bb2704e675a0\"\n", + " ])),\n", + " \"set3\": list(range(20, 30))\n", + " }" + ] + }, + { + "cell_type": "markdown", + "id": "5e0fb2da", + "metadata": {}, + "source": [ + "**Loading the data and losses calculation:**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "485a2d7e", + "metadata": {}, + "outputs": [], + "source": [ + "COMPACTED_EPOCH_INFO_FILE_PATTERN = re.compile(r\"(\\d{8})\\_(\\d{8})\\.epoch\")\n", + "\n", + "def get_first_and_last_epoch(file_name):\n", + " m = re.match(COMPACTED_EPOCH_INFO_FILE_PATTERN, file_name)\n", + " if m == None:\n", + " return None\n", + " return int(m.group(1)), int(m.group(2))\n", + "\n", + "def isEpochInfoFile(file_name):\n", + " r = get_first_and_last_epoch(file_name)\n", + " if r == None:\n", + " return False\n", + " file_start_epoch, file_end_epoch = r\n", + " if file_start_epoch > file_end_epoch:\n", + " return False\n", + " if file_end_epoch < start_epoch:\n", + " return False\n", + " if file_start_epoch > end_epoch:\n", + " return False\n", + " return True\n", + "\n", + "def adjust_constraints(sorted_file_names):\n", + " first_start_epoch, first_end_epoch = get_first_and_last_epoch(sorted_file_names[0])\n", + " _, last_end_epoch = get_first_and_last_epoch(sorted_file_names[-1])\n", + " start_epoch = first_start_epoch\n", + " end_epoch = 
last_end_epoch\n", + " resolution = first_end_epoch - first_start_epoch + 1\n", + "\n", + "def read_csv(file_path):\n", + " return pd.read_csv(\n", + " StringIO(snappy.decompress(file_path.read_bytes()).decode(\"utf-8\")),\n", + " names = CSV_DATA_COLUMNS_NAMES, usecols = set(range(0, 10)))\n", + "\n", + "def get_outcome_var(component):\n", + " return component + \"_outcome\"\n", + "\n", + "def get_max_reward_var(component):\n", + " return \"max_\" + component + \"_reward\"\n", + "\n", + "def sum_max_values(t):\n", + " return sum(getattr(t, get_max_reward_var(reward)) for reward in rewards)\n", + "\n", + "def sum_actual_values(t):\n", + " return sum(getattr(t, get_outcome_var(reward)) for reward in rewards)\n", + "\n", + "total_losses_per_epoch_point = {}\n", + "validators_per_epoch_point = {}\n", + "average_losses_per_epoch_point = {}\n", + "\n", + "def compute_total_losses(epoch_point, epochs = None):\n", + " for set_name, validators in validators_sets.items():\n", + " if set_name not in total_losses_per_epoch_point:\n", + " total_losses_per_epoch_point[set_name] = {}\n", + " validators_per_epoch_point[set_name] = {}\n", + " if epoch_point not in total_losses_per_epoch_point[set_name]:\n", + " total_losses_per_epoch_point[set_name][epoch_point] = 0\n", + " validators_per_epoch_point[set_name][epoch_point] = 0\n", + " for validator_index in validators:\n", + " validator_info = data.iloc[validator_index]\n", + " validator_losses = \\\n", + " sum_max_values(validator_info) - sum_actual_values(validator_info)\n", + " total_losses_per_epoch_point[set_name][epoch_point] += \\\n", + " validator_losses if epochs is None else validator_losses * epochs\n", + " validators_per_epoch_point[set_name][epoch_point] += \\\n", + " 1 if epochs is None else epochs\n", + "\n", + "def compute_average_losses():\n", + " for set_name in validators_sets:\n", + " if set_name not in average_losses_per_epoch_point:\n", + " average_losses_per_epoch_point[set_name] = {}\n", + " for epoch_point, total_losses in total_losses_per_epoch_point[set_name].items():\n", + " average_losses_per_epoch_point[set_name][epoch_point] = \\\n", + " total_losses / validators_per_epoch_point[set_name][epoch_point]\n", + "\n", + "if use_compacted_files:\n", + " file_names = [file_name for file_name in os.listdir(files_dir)\n", + " if isEpochInfoFile(file_name)]\n", + " file_names.sort()\n", + " adjust_constraints(file_names)\n", + "\n", + " for file_name in file_names:\n", + " data = read_csv(Path(files_dir + file_name))\n", + " file_first_epoch, file_last_epoch = get_first_and_last_epoch(file_name)\n", + " file_epochs_range = file_last_epoch - file_first_epoch + 1\n", + " epoch_point = file_first_epoch // resolution\n", + " compute_total_losses(epoch_point, file_epochs_range)\n", + "else:\n", + " for epoch in range(start_epoch, end_epoch + 1):\n", + " data = read_csv(Path(files_dir + \"{:08}.epoch\".format(epoch)))\n", + " epoch_point = epoch // resolution\n", + " compute_total_losses(epoch_point)\n", + "\n", + "compute_average_losses()" + ] + }, + { + "cell_type": "markdown", + "id": "800ee35b", + "metadata": {}, + "source": [ + "**Average losses graph:** " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "62d1e96d", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "plt.subplots(figsize = (20, 5))\n", + "plt.title(\"Average losses per epoch\")\n", + "plt.xlabel(\"Epoch\")\n", + "plt.ylabel(\"Gwei\")\n", + "\n", + "for name, value in average_losses_per_epoch_point.items():\n", + " epochs = 
np.array([ep * resolution + resolution // 2 for ep in value.keys()])\n", + " values = np.array(list(value.values()))\n", + " spline = make_interp_spline(epochs, values)\n", + " num_samples = (end_epoch - start_epoch + 1) // resolution * 100\n", + " x = np.linspace(epochs.min(), epochs.max(), num_samples)\n", + " y = spline(x)\n", + " plt.plot(x, y, label=name)\n", + "\n", + "plt.legend(loc=\"best\")" + ] + }, + { + "cell_type": "markdown", + "id": "0fff538c", + "metadata": {}, + "source": [ + "**Total losses:**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6ab52601", + "metadata": {}, + "outputs": [], + "source": [ + "sets_total_losses = {}\n", + "for set_name, epoch_points in total_losses_per_epoch_point.items():\n", + " sets_total_losses[set_name] = 0\n", + " for _, losses in epoch_points.items():\n", + " sets_total_losses[set_name] += losses\n", + "\n", + "plt.title(\"Total losses\")\n", + "plt.xlabel(\"Set\")\n", + "plt.ylabel(\"ETH\")\n", + "plt.bar(list(sets_total_losses.keys()), [loss * 1e-9 for loss in sets_total_losses.values()])\n", + "print(sets_total_losses)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}
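
The per-epoch files written by `cmdValidatorDb` are snappy-compressed, header-less CSVs named `{epoch:08}.epoch`, one row per validator index, with the 15 columns listed in `CSV_DATA_COLUMNS_NAMES`. A minimal standalone sketch of a reader under those assumptions, using the same `python-snappy` and `pandas` modules the notebook imports (the helper name `load_epoch` is illustrative, not part of the patch):

from io import StringIO
from pathlib import Path

import pandas as pd
import snappy  # python-snappy, as imported by the notebook

# Column names mirror CSV_DATA_COLUMNS_NAMES in validator_db_reports.ipynb.
COLUMNS = [
    "source_outcome", "max_source_reward",
    "target_outcome", "max_target_reward",
    "head_outcome", "max_head_reward",
    "inclusion_delay_outcome", "max_inclusion_delay_reward",
    "sync_committee_outcome", "max_sync_committee_reward",
    "proposer_outcome", "inactivity_penalty",
    "slashing_outcome", "deposits", "inclusion_delay",
]

def load_epoch(files_dir, epoch):
    """Decode one per-epoch file; row i holds the values for validator i."""
    raw = Path(files_dir, "{:08}.epoch".format(epoch)).read_bytes()
    return pd.read_csv(StringIO(snappy.decompress(raw).decode("utf-8")),
                       names=COLUMNS)

For example, `load_epoch("../build/data/mainnetValidatorDb/", 10000)` yields a frame whose `max_*_reward` and `*_outcome` columns can be differenced to compute per-validator losses, as the notebook's `sum_max_values`/`sum_actual_values` helpers do.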
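
Both `ncli_db` and `validator_db_aggregator` implement graceful shutdown the same way: the SIGINT/SIGTERM handlers only set a flag, and the main loop consults it right after a complete file has been written, so an interrupt cannot leave a truncated `.epoch` file behind. A hedged Python analogue of that pattern (all names here are illustrative):

import signal

should_shut_down = False

def _request_shutdown(signum, frame):
    # Handlers only flip a flag; the write loop decides when stopping is safe.
    global should_shut_down
    should_shut_down = True

signal.signal(signal.SIGINT, _request_shutdown)
signal.signal(signal.SIGTERM, _request_shutdown)

# for epoch in range(start_epoch, end_epoch + 1):
#     write_epoch_file(epoch)       # hypothetical unit of work
#     if should_shut_down:          # checked only between complete writes
#         raise SystemExit(0)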
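
When the aggregator flushes a window (every `resolution` epochs, at the end epoch, or on shutdown), each accumulated field is integer-divided by the number of epochs in which the validator appeared, while the inclusion delay is averaged only over the epochs that actually recorded one, mirroring `average` in validator_db_aggregator.nim. A small sketch of that rule under an illustrative dictionary layout (note Python's `//` floors while Nim's `div` truncates toward zero, so they differ on negative sums):

def combine_window(sums, epochs_present, delays_count):
    # Integer-divide every summed field except the optional inclusion delay.
    averaged = {field: total // epochs_present
                for field, total in sums.items() if field != "inclusion_delay"}
    total_delay = sums.get("inclusion_delay")  # None if never an attester
    averaged["inclusion_delay"] = (
        None if total_delay is None else total_delay / delays_count)
    return averaged

# E.g. a validator present in all 225 epochs whose 200 recorded delays sum
# to 300 ends up with an average inclusion delay of 300 / 200 = 1.5.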