nimbus-eth2/ncli/ncli_common.nim
Jacek Sieka 05ffe7b2bf
Prune BlockRef on finalization (#3513)
Up til now, the block dag has been using `BlockRef`, a structure adapted
for a full DAG, to represent all of chain history. This is a correct and
simple design, but does not exploit the linearity of the chain once
parts of it finalize.

By pruning the in-memory `BlockRef` structure at finalization, we save,
at the time of writing, a cool ~250mb (or 25%:ish) chunk of memory
landing us at a steady state of ~750mb normal memory usage for a
validating node.

Above all though, we prevent memory usage from growing proportionally
with the length of the chain, something that would not be sustainable
over time -  instead, the steady state memory usage is roughly
determined by the validator set size which grows much more slowly. With
these changes, the core should remain sustainable memory-wise post-merge
all the way to withdrawals (when the validator set is expected to grow).

In-memory indices are still used for the "hot" unfinalized portion of
the chain - this ensure that consensus performance remains unchanged.

What changes is that for historical access, we use a db-based linear
slot index which is cache-and-disk-friendly, keeping the cost for
accessing historical data at a similar level as before, achieving the
savings at no percievable cost to functionality or performance.

A nice collateral benefit is the almost-instant startup since we no
longer load any large indicies at dag init.

The cost of this functionality instead can be found in the complexity of
having to deal with two ways of traversing the chain - by `BlockRef` and
by slot.

* use `BlockId` instead of `BlockRef` where finalized / historical data
may be required
* simplify clearance pre-advancement
* remove dag.finalizedBlocks (~50:ish mb)
* remove `getBlockAtSlot` - use `getBlockIdAtSlot` instead
* `parent` and `atSlot` for `BlockId` now require a `ChainDAGRef`
instance, unlike `BlockRef` traversal
* prune `BlockRef` parents on finality (~200:ish mb)
* speed up ChainDAG init by not loading finalized history index
* mess up light client server error handling - this need revisiting :)
2022-03-17 17:42:56 +00:00

410 lines
17 KiB
Nim

# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
re, strutils, os, math,
stew/bitops2,
../beacon_chain/spec/[
datatypes/base,
datatypes/phase0,
datatypes/altair,
datatypes/bellatrix,
beaconstate,
state_transition_epoch,
state_transition_block,
signatures],
../beacon_chain/consensus_object_pools/blockchain_dag
type
RewardsAndPenalties* = object
source_outcome*: int64
max_source_reward*: Gwei
target_outcome*: int64
max_target_reward*: Gwei
head_outcome*: int64
max_head_reward*: Gwei
inclusion_delay_outcome*: int64
max_inclusion_delay_reward*: Gwei
sync_committee_outcome*: int64
max_sync_committee_reward*: Gwei
proposer_outcome*: int64
inactivity_penalty*: Gwei
slashing_outcome*: int64
deposits*: Gwei
inclusion_delay*: Option[uint64]
ParticipationFlags* = object
currentEpochParticipation: EpochParticipationFlags
previousEpochParticipation: EpochParticipationFlags
PubkeyToIndexTable = Table[ValidatorPubKey, int]
AuxiliaryState* = object
epochParticipationFlags: ParticipationFlags
pubkeyToIndex: PubkeyToIndexTable
const
epochInfoFileNameDigitsCount = 8
epochFileNameExtension* = ".epoch"
epochNumberRegexStr = r"\d{" & $epochInfoFileNameDigitsCount & r"}\"
func copyParticipationFlags*(auxiliaryState: var AuxiliaryState,
forkedState: ForkedHashedBeaconState) =
withState(forkedState):
when stateFork > BeaconStateFork.Phase0:
template flags: untyped = auxiliaryState.epochParticipationFlags
flags.currentEpochParticipation = state.data.current_epoch_participation
flags.previousEpochParticipation = state.data.previous_epoch_participation
proc getUnaggregatedFilesEpochRange*(dir: string):
tuple[firstEpoch, lastEpoch: Epoch] =
const epochInfoFileNameRegexStr =
epochNumberRegexStr & epochFileNameExtension
var pattern {.global.}: Regex
once: pattern = re(epochInfoFileNameRegexStr)
var smallestEpochFileName =
'9'.repeat(epochInfoFileNameDigitsCount) & epochFileNameExtension
var largestEpochFileName =
'0'.repeat(epochInfoFileNameDigitsCount) & epochFileNameExtension
for (_, fn) in walkDir(dir.string, relative = true):
if fn.match(pattern):
if fn < smallestEpochFileName:
smallestEpochFileName = fn
if fn > largestEpochFileName:
largestEpochFileName = fn
result.firstEpoch = parseUInt(
smallestEpochFileName[0 ..< epochInfoFileNameDigitsCount]).Epoch
result.lastEpoch = parseUInt(
largestEpochFileName[0 ..< epochInfoFileNameDigitsCount]).Epoch
proc getUnaggregatedFilesLastEpoch*(dir: string): Epoch =
dir.getUnaggregatedFilesEpochRange.lastEpoch
proc getAggregatedFilesLastEpoch*(dir: string): Epoch =
const epochInfoFileNameRegexStr =
epochNumberRegexStr & "_" & epochNumberRegexStr & epochFileNameExtension
var pattern {.global.}: Regex
once: pattern = re(epochInfoFileNameRegexStr)
var largestEpochInFileName = 0'u
for (_, fn) in walkDir(dir.string, relative = true):
if fn.match(pattern):
let fileLastEpoch = parseUint(
fn[epochInfoFileNameDigitsCount + 1 .. 2 * epochInfoFileNameDigitsCount])
if fileLastEpoch > largestEpochInFileName:
largestEpochInFileName = fileLastEpoch
largestEpochInFileName.Epoch
func epochAsString*(epoch: Epoch): string =
let strEpoch = $epoch
'0'.repeat(epochInfoFileNameDigitsCount - strEpoch.len) & strEpoch
func getFilePathForEpoch*(epoch: Epoch, dir: string): string =
dir / epochAsString(epoch) & epochFileNameExtension
func getFilePathForEpochs*(startEpoch, endEpoch: Epoch, dir: string): string =
let fileName = epochAsString(startEpoch) & "_" &
epochAsString(endEpoch) & epochFileNameExtension
dir / fileName
func getBlockRange*(dag: ChainDAGRef, start, ends: Slot): seq[BlockId] =
# Range of block in reverse order
doAssert start < ends
result = newSeqOfCap[BlockId](ends - start)
var current = ends
while current > start:
current -= 1
let bsid = dag.getBlockIdAtSlot(current).valueOr:
continue
if bsid.bid.slot < start: # current might be empty
break
result.add(bsid.bid)
current = bsid.bid.slot # skip empty slots
func getOutcome(delta: RewardDelta): int64 =
delta.rewards.int64 - delta.penalties.int64
func collectSlashings(
rewardsAndPenalties: var seq[RewardsAndPenalties],
state: ForkyBeaconState, total_balance: Gwei) =
let
epoch = get_current_epoch(state)
adjusted_total_slashing_balance = get_adjusted_total_slashing_balance(
state, total_balance)
for index in 0 ..< state.validators.len:
let validator = unsafeAddr state.validators.asSeq()[index]
if slashing_penalty_applies(validator[], epoch):
rewardsAndPenalties[index].slashing_outcome +=
validator[].get_slashing_penalty(
adjusted_total_slashing_balance, total_balance).int64
proc collectEpochRewardsAndPenalties*(
rewardsAndPenalties: var seq[RewardsAndPenalties],
state: var phase0.BeaconState, cache: var StateCache, cfg: RuntimeConfig,
flags: UpdateFlags) =
if get_current_epoch(state) == GENESIS_EPOCH:
return
var info: phase0.EpochInfo
info.init(state)
info.process_attestations(state, cache)
doAssert info.validators.len == state.validators.len
rewardsAndPenalties.setLen(state.validators.len)
process_justification_and_finalization(state, info.balances, flags)
let
finality_delay = get_finality_delay(state)
total_balance = info.balances.current_epoch
total_balance_sqrt = integer_squareroot(total_balance)
for index, validator in info.validators.pairs:
if not is_eligible_validator(validator):
continue
let base_reward = get_base_reward_sqrt(
state, index.ValidatorIndex, total_balance_sqrt)
template get_attestation_component_reward_helper(attesting_balance: Gwei): Gwei =
get_attestation_component_reward(attesting_balance,
info.balances.current_epoch, base_reward.uint64, finality_delay)
template rp: untyped = rewardsAndPenalties[index]
rp.source_outcome = get_source_delta(
validator, base_reward, info.balances, finality_delay).getOutcome
rp.max_source_reward = get_attestation_component_reward_helper(
info.balances.previous_epoch_attesters)
rp.target_outcome = get_target_delta(
validator, base_reward, info.balances, finality_delay).getOutcome
rp.max_target_reward = get_attestation_component_reward_helper(
info.balances.previous_epoch_target_attesters)
rp.head_outcome = get_head_delta(
validator, base_reward, info.balances, finality_delay).getOutcome
rp.max_head_reward = get_attestation_component_reward_helper(
info.balances.previous_epoch_head_attesters)
let (inclusion_delay_delta, proposer_delta) = get_inclusion_delay_delta(
validator, base_reward)
rp.inclusion_delay_outcome = inclusion_delay_delta.getOutcome
rp.max_inclusion_delay_reward =
base_reward - state_transition_epoch.get_proposer_reward(base_reward)
rp.inactivity_penalty = get_inactivity_penalty_delta(
validator, base_reward, finality_delay).penalties
if proposer_delta.isSome:
let proposer_index = proposer_delta.get[0]
if proposer_index < info.validators.lenu64:
rewardsAndPenalties[proposer_index].proposer_outcome +=
proposer_delta.get[1].getOutcome
rewardsAndPenalties.collectSlashings(state, info.balances.current_epoch)
proc collectEpochRewardsAndPenalties*(
rewardsAndPenalties: var seq[RewardsAndPenalties],
state: var (altair.BeaconState | bellatrix.BeaconState),
cache: var StateCache, cfg: RuntimeConfig, flags: UpdateFlags) =
if get_current_epoch(state) == GENESIS_EPOCH:
return
var info: altair.EpochInfo
info.init(state)
doAssert info.validators.len == state.validators.len
rewardsAndPenalties.setLen(state.validators.len)
process_justification_and_finalization(state, info.balances, flags)
process_inactivity_updates(cfg, state, info)
let
total_active_balance = info.balances.current_epoch
base_reward_per_increment = get_base_reward_per_increment(
total_active_balance)
finality_delay = get_finality_delay(state)
for flag_index in 0 ..< PARTICIPATION_FLAG_WEIGHTS.len:
for validator_index, delta in get_flag_index_deltas(
state, flag_index, base_reward_per_increment, info, finality_delay):
template rp: untyped = rewardsAndPenalties[validator_index]
let
base_reward = get_base_reward_increment(
state, validator_index, base_reward_per_increment)
active_increments = get_active_increments(info)
unslashed_participating_increment =
get_unslashed_participating_increment(info, flag_index)
max_flag_index_reward = get_flag_index_reward(
state, base_reward, active_increments,
unslashed_participating_increment,
PARTICIPATION_FLAG_WEIGHTS[flag_index].uint64,
finalityDelay)
case flag_index
of TIMELY_SOURCE_FLAG_INDEX:
rp.source_outcome = delta.getOutcome
rp.max_source_reward = max_flag_index_reward
of TIMELY_TARGET_FLAG_INDEX:
rp.target_outcome = delta.getOutcome
rp.max_target_reward = max_flag_index_reward
of TIMELY_HEAD_FLAG_INDEX:
rp.head_outcome = delta.getOutcome
rp.max_head_reward = max_flag_index_reward
else:
raiseAssert(&"Unknown flag index {flag_index}.")
for validator_index, penalty in get_inactivity_penalty_deltas(
cfg, state, info):
rewardsAndPenalties[validator_index].inactivity_penalty += penalty
rewardsAndPenalties.collectSlashings(state, info.balances.current_epoch)
func collectFromSlashedValidator(
rewardsAndPenalties: var seq[RewardsAndPenalties],
state: ForkyBeaconState, slashedIndex, proposerIndex: ValidatorIndex) =
template slashed_validator: untyped = state.validators[slashedIndex]
let slashingPenalty = get_slashing_penalty(state, slashed_validator.effective_balance)
let whistleblowerReward = get_whistleblower_reward(slashed_validator.effective_balance)
rewardsAndPenalties[slashedIndex].slashing_outcome -= slashingPenalty.int64
rewardsAndPenalties[proposerIndex].slashing_outcome += whistleblowerReward.int64
func collectFromProposerSlashings(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock) =
withStateAndBlck(forkedState, forkedBlock):
for proposer_slashing in blck.message.body.proposer_slashings:
doAssert check_proposer_slashing(state.data, proposer_slashing, {}).isOk
let slashedIndex = proposer_slashing.signed_header_1.message.proposer_index
rewardsAndPenalties.collectFromSlashedValidator(state.data,
slashedIndex.ValidatorIndex, blck.message.proposer_index.ValidatorIndex)
func collectFromAttesterSlashings(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock) =
withStateAndBlck(forkedState, forkedBlock):
for attester_slashing in blck.message.body.attester_slashings:
let attester_slashing_validity = check_attester_slashing(
state.data, attester_slashing, {})
doAssert attester_slashing_validity.isOk
for slashedIndex in attester_slashing_validity.value:
rewardsAndPenalties.collectFromSlashedValidator(
state.data, slashedIndex, blck.message.proposer_index.ValidatorIndex)
func collectFromAttestations(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock,
epochParticipationFlags: var ParticipationFlags,
cache: var StateCache) =
withStateAndBlck(forkedState, forkedBlock):
when stateFork > BeaconStateFork.Phase0:
let base_reward_per_increment = get_base_reward_per_increment(
get_total_active_balance(state.data, cache))
doAssert base_reward_per_increment > 0
for attestation in blck.message.body.attestations:
doAssert check_attestation(state.data, attestation, {}, cache).isOk
let proposerReward =
if attestation.data.target.epoch == get_current_epoch(state.data):
get_proposer_reward(
state.data, attestation, base_reward_per_increment, cache,
epochParticipationFlags.currentEpochParticipation)
else:
get_proposer_reward(
state.data, attestation, base_reward_per_increment, cache,
epochParticipationFlags.previousEpochParticipation)
rewardsAndPenalties[blck.message.proposer_index].proposer_outcome +=
proposerReward.int64
let inclusionDelay = state.data.slot - attestation.data.slot
for index in get_attesting_indices(
state.data, attestation.data, attestation.aggregation_bits, cache):
rewardsAndPenalties[index].inclusion_delay = some(inclusionDelay.uint64)
proc collectFromDeposits(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock,
pubkeyToIndex: var PubkeyToIndexTable,
cfg: RuntimeConfig) =
withStateAndBlck(forkedState, forkedBlock):
for deposit in blck.message.body.deposits:
let pubkey = deposit.data.pubkey
let amount = deposit.data.amount
var index = findValidatorIndex(state.data, pubkey)
if index == -1:
index = pubkeyToIndex.getOrDefault(pubkey, -1)
if index != -1:
rewardsAndPenalties[index].deposits += amount
elif verify_deposit_signature(cfg, deposit.data):
pubkeyToIndex[pubkey] = rewardsAndPenalties.len
rewardsAndPenalties.add(
RewardsAndPenalties(deposits: amount))
func collectFromSyncAggregate(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock,
cache: var StateCache) =
withStateAndBlck(forkedState, forkedBlock):
when stateFork > BeaconStateFork.Phase0:
let total_active_balance = get_total_active_balance(state.data, cache)
let participant_reward = get_participant_reward(total_active_balance)
let proposer_reward =
state_transition_block.get_proposer_reward(participant_reward)
let indices = get_sync_committee_cache(state.data, cache).current_sync_committee
template aggregate: untyped = blck.message.body.sync_aggregate
doAssert indices.len == SYNC_COMMITTEE_SIZE
doAssert aggregate.sync_committee_bits.len == SYNC_COMMITTEE_SIZE
doAssert state.data.current_sync_committee.pubkeys.len == SYNC_COMMITTEE_SIZE
for i in 0 ..< SYNC_COMMITTEE_SIZE:
rewardsAndPenalties[indices[i]].max_sync_committee_reward +=
participant_reward
if aggregate.sync_committee_bits[i]:
rewardsAndPenalties[indices[i]].sync_committee_outcome +=
participant_reward.int64
rewardsAndPenalties[blck.message.proposer_index].proposer_outcome +=
proposer_reward.int64
else:
rewardsAndPenalties[indices[i]].sync_committee_outcome -=
participant_reward.int64
proc collectBlockRewardsAndPenalties*(
rewardsAndPenalties: var seq[RewardsAndPenalties],
forkedState: ForkedHashedBeaconState,
forkedBlock: ForkedTrustedSignedBeaconBlock,
auxiliaryState: var AuxiliaryState,
cache: var StateCache, cfg: RuntimeConfig) =
rewardsAndPenalties.collectFromProposerSlashings(forkedState, forkedBlock)
rewardsAndPenalties.collectFromAttesterSlashings(forkedState, forkedBlock)
rewardsAndPenalties.collectFromAttestations(
forkedState, forkedBlock, auxiliaryState.epochParticipationFlags, cache)
rewardsAndPenalties.collectFromDeposits(
forkedState, forkedBlock, auxiliaryState.pubkeyToIndex, cfg)
# This table is needed only to resolve double deposits in the same block, so
# it can be cleared after processing all deposits for the current block.
auxiliaryState.pubkeyToIndex.clear
rewardsAndPenalties.collectFromSyncAggregate(forkedState, forkedBlock, cache)
func serializeToCsv*(rp: RewardsAndPenalties,
avgInclusionDelay = none(float)): string =
for name, value in fieldPairs(rp):
if value isnot Option:
result &= $value & ","
if avgInclusionDelay.isSome:
result.addFloat(avgInclusionDelay.get)
elif rp.inclusion_delay.isSome:
result &= $rp.inclusion_delay.get
result &= "\n"