Fix counters in validator monitor totals mode (#3332)

The current counters set gauges etc. to the value of the _last_ validator
to be processed; as the name of the feature implies, we should be using
sums across all monitored validators instead.

* fix missing beacon state metrics on startup, pre-first-head-selection
* fix epoch metrics not being updated on cross-epoch reorg
Jacek Sieka 2022-01-31 08:36:29 +01:00 committed by GitHub
parent d583e8e4ac
commit ad327a8769
2 changed files with 227 additions and 134 deletions
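
A minimal standalone sketch of why the old code reported only the last validator's value and what the fix changes (the metric and type names below are made up for illustration; the real change lives in the `setAll`/`observeAll` templates in the diff that follows):

import metrics

declareGauge example_prev_epoch_attestations_total,
  "Attestations seen in the previous epoch", labels = ["validator"]

type ExampleSummary = object
  id: string           # validator key/index used as the metric label
  attestations: int64  # per-validator count collected during the epoch

proc publish(summaries: openArray[ExampleSummary], totals: bool) =
  if totals:
    # totals mode: accumulate first, then publish one series labelled
    # "total" - setting the gauge inside the loop would leave it holding
    # whatever the last monitored validator happened to have
    var agg: int64
    for s in summaries:
      agg += s.attestations
    example_prev_epoch_attestations_total.set(agg, ["total"])
  else:
    # per-validator mode: one labelled series per monitored validator
    for s in summaries:
      example_prev_epoch_attestations_total.set(s.attestations, [s.id])

The same idea applies to the histogram metrics: in totals mode, `observeAll` still observes each validator's min-delay value individually, but folds every observation onto the "total" label instead of a per-validator one.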


@@ -422,6 +422,38 @@ proc getForkedBlock*(
dag.getForkedBlock(blck.bid).expect(
"BlockRef block should always load, database corrupt?")
proc updateBeaconMetrics(state: StateData, cache: var StateCache) =
# https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#additional-metrics
# both non-negative, so difference can't overflow or underflow int64
beacon_head_root.set(state.blck.root.toGaugeValue)
beacon_head_slot.set(state.blck.slot.toGaugeValue)
withState(state.data):
beacon_pending_deposits.set(
(state.data.eth1_data.deposit_count -
state.data.eth1_deposit_index).toGaugeValue)
beacon_processed_deposits_total.set(
state.data.eth1_deposit_index.toGaugeValue)
beacon_current_justified_epoch.set(
state.data.current_justified_checkpoint.epoch.toGaugeValue)
beacon_current_justified_root.set(
state.data.current_justified_checkpoint.root.toGaugeValue)
beacon_previous_justified_epoch.set(
state.data.previous_justified_checkpoint.epoch.toGaugeValue)
beacon_previous_justified_root.set(
state.data.previous_justified_checkpoint.root.toGaugeValue)
beacon_finalized_epoch.set(
state.data.finalized_checkpoint.epoch.toGaugeValue)
beacon_finalized_root.set(
state.data.finalized_checkpoint.root.toGaugeValue)
let active_validators = count_active_validators(
state.data, state.data.slot.epoch, cache).toGaugeValue
beacon_active_validators.set(active_validators)
beacon_current_active_validators.set(active_validators)
proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
validatorMonitor: ref ValidatorMonitor, updateFlags: UpdateFlags,
onBlockCb: OnBlockCallback = nil, onHeadCb: OnHeadCallback = nil,
@@ -605,7 +637,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
genesisRef, tailRef, headRef, tailRoot, headRoot, stateFork, configFork
quit 1
assign(dag.clearanceState, dag.headState)
# db state is likely an epoch boundary state which is what we want for epochs
assign(dag.epochRefState, dag.headState)
dag.forkDigests = newClone ForkDigests.init(
@@ -631,6 +663,11 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
quit 1
# Clearance most likely happens from head - assign it after rewinding head
assign(dag.clearanceState, dag.headState)
updateBeaconMetrics(dag.headState, cache)
# The tail block is "implicitly" finalized as it was given either as a
# checkpoint block, or is the genesis, thus we use it as a lower bound when
# computing the finalized head
@@ -658,8 +695,6 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
let stateTick = Moment.now()
dag.clearanceState = dag.headState
# Pruning metadata
dag.lastPrunePoint = dag.finalizedHead
@@ -1374,6 +1409,8 @@ proc updateHead*(
dag.db.putHeadBlock(newHead.root)
updateBeaconMetrics(dag.headState, cache)
withState(dag.headState.data):
when stateFork >= BeaconStateFork.Altair:
dag.headSyncCommittees = state.data.get_sync_committee_cache(cache)
@@ -1433,17 +1470,6 @@ proc updateHead*(
prevDepBlock.root)
dag.onHeadChanged(data)
# https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#additional-metrics
# both non-negative, so difference can't overflow or underflow int64
beacon_pending_deposits.set(
getStateField(dag.headState.data, eth1_data).deposit_count.toGaugeValue -
getStateField(dag.headState.data, eth1_deposit_index).toGaugeValue)
beacon_processed_deposits_total.set(
getStateField(dag.headState.data, eth1_deposit_index).toGaugeValue)
beacon_head_root.set newHead.root.toGaugeValue
beacon_head_slot.set newHead.slot.toGaugeValue
withState(dag.headState.data):
# Every time the head changes, the "canonical" view of balances and other
# state-related metrics change - notify the validator monitor.
@@ -1451,31 +1477,6 @@ proc updateHead*(
# of such updates happening - at most once per valid block.
dag.validatorMonitor[].registerState(state.data)
if lastHead.slot.epoch != newHead.slot.epoch:
# Epoch updated - in theory, these could happen when the wall clock
# changes epoch, even if there is no new block / head, but we'll delay
# updating them until a block confirms the change
beacon_current_justified_epoch.set(
getStateField(
dag.headState.data, current_justified_checkpoint).epoch.toGaugeValue)
beacon_current_justified_root.set(
getStateField(
dag.headState.data, current_justified_checkpoint).root.toGaugeValue)
beacon_previous_justified_epoch.set(
getStateField(
dag.headState.data, previous_justified_checkpoint).epoch.toGaugeValue)
beacon_previous_justified_root.set(
getStateField(
dag.headState.data, previous_justified_checkpoint).root.toGaugeValue)
let
epochRef = getEpochRef(dag, dag.headState, cache)
number_of_active_validators =
epochRef.shuffled_active_validator_indices.lenu64().toGaugeValue
beacon_active_validators.set(number_of_active_validators)
beacon_current_active_validators.set(number_of_active_validators)
if finalizedHead != dag.finalizedHead:
debug "Reached new finalization checkpoint",
head = shortLog(dag.headState.blck),
@@ -1504,11 +1505,6 @@ proc updateHead*(
dag.updateFinalizedBlocks()
beacon_finalized_epoch.set(getStateField(
dag.headState.data, finalized_checkpoint).epoch.toGaugeValue)
beacon_finalized_root.set(getStateField(
dag.headState.data, finalized_checkpoint).root.toGaugeValue)
# Pruning the block dag is required every time the finalized head changes
# in order to clear out blocks that are no longer viable and should
# therefore no longer be considered as part of the chain we're following


@@ -83,6 +83,8 @@ declareHistogram validator_monitor_prev_epoch_sync_contribution_min_delay_seconds,
"The min delay between when the validator should send the sync contribution and when it was received.", labels = ["validator"]
declareGauge validator_monitor_validator_in_current_sync_committee,
"Is the validator in the current sync committee (1 for true and 0 for false)", labels = ["validator"]
declareGauge validator_monitor_validator_in_next_sync_committee,
"Is the validator in the next sync committee (1 for true and 0 for false)", labels = ["validator"]
declareGauge validator_monitor_validators_total,
"Count of validators that are specifically monitored by this beacon node"
@@ -125,6 +127,9 @@ declareCounter validator_monitor_proposer_slashing_total,
declareCounter validator_monitor_attester_slashing_total,
"Number of attester slashings seen", labels = ["src", "validator"]
const
total = "total" # what we use for label when using "totals" mode
type
EpochSummary = object
## Similar to the state transition, we collect everything that happens in
@@ -187,6 +192,9 @@ type
template toGaugeValue(v: bool): int64 =
if v: 1 else: 0
template toGaugeValue(v: TimeDiff): float =
toFloatSeconds(v)
proc update_if_lt[T](current: var Option[T], val: T) =
if current.isNone() or val < current.get():
current = some(val)
@@ -207,7 +215,7 @@ proc addMonitor*(
template metricId: string =
mixin self, id
if self.totals: "total" else: id
if self.totals: total else: id
proc addAutoMonitor*(
self: var ValidatorMonitor, pubkey: ValidatorPubKey,
@@ -242,81 +250,110 @@ proc updateEpoch(self: var ValidatorMonitor, epoch: Epoch) =
return
let
clearMonitor = epoch > self.epoch + 1
monitorEpoch = self.epoch
# index of the EpochSummary that we'll first report, then clear
summaryIdx = epoch.summaryIdx
if clearMonitor:
self.epoch = epoch
validator_monitor_validators_total.set(self.monitors.len().int64)
if epoch > monitorEpoch + 1:
# More than one epoch has passed since the last check, which makes it
# difficult to report correctly with the amount of data we store - skip
# this round and hope things improve
notice "Resetting validator monitoring", epoch, monitorEpoch = self.epoch
self.epoch = epoch
validator_monitor_validators_total.set(self.monitors.len().int64)
notice "Resetting validator monitoring", epoch, monitorEpoch
for (_, monitor) in self.monitors.mpairs():
if clearMonitor:
monitor.summaries = default(type(monitor.summaries))
continue
reset(monitor.summaries)
return
let
id = monitor.id
template setAll(metric, name: untyped) =
if self.totals:
var agg: int64
for monitor {.inject.} in self.monitors.mvalues:
agg += monitor.summaries[summaryIdx].name
metric.set(agg, [total])
else:
for monitor {.inject.} in self.monitors.mvalues:
metric.set(monitor.summaries[summaryIdx].name, [monitor.id])
let summary = monitor.summaries[summaryIdx]
template observeAll(metric, name: untyped) =
for monitor {.inject.} in self.monitors.mvalues:
if monitor.summaries[summaryIdx].name.isSome():
metric.observe(
monitor.summaries[summaryIdx].name.get.toGaugeValue(),
[if self.totals: total else: monitor.id])
validator_monitor_prev_epoch_attestations_total.set(
summary.attestations, [metricId])
if summary.attestation_min_delay.isSome():
validator_monitor_prev_epoch_attestations_min_delay_seconds.observe(
summary.attestation_min_delay.get().toFloatSeconds(), [metricId])
setAll(
validator_monitor_prev_epoch_attestations_total,
attestations)
validator_monitor_prev_epoch_attestation_aggregate_inclusions.set(
summary.attestation_aggregate_inclusions, [metricId])
validator_monitor_prev_epoch_attestation_block_inclusions.set(
summary.attestation_block_inclusions, [metricId])
observeAll(
validator_monitor_prev_epoch_attestations_min_delay_seconds,
attestation_min_delay)
if summary.attestation_min_block_inclusion_distance.isSome():
setAll(
validator_monitor_prev_epoch_attestation_aggregate_inclusions,
attestation_aggregate_inclusions)
setAll(
validator_monitor_prev_epoch_attestation_block_inclusions,
attestation_block_inclusions)
setAll(
validator_monitor_prev_epoch_sync_committee_messages_total,
sync_committee_messages)
observeAll(
validator_monitor_prev_epoch_sync_committee_messages_min_delay_seconds,
sync_committee_message_min_delay)
setAll(
validator_monitor_prev_epoch_sync_contribution_inclusions,
sync_signature_contribution_inclusions)
setAll(
validator_monitor_prev_epoch_sync_signature_block_inclusions,
sync_signature_block_inclusions)
setAll(
validator_monitor_prev_epoch_sync_contributions_total,
sync_contributions)
observeAll(
validator_monitor_prev_epoch_sync_contribution_min_delay_seconds,
sync_contribution_min_delay)
setAll(
validator_monitor_prev_epoch_aggregates_total,
aggregates)
observeAll(
validator_monitor_prev_epoch_aggregates_min_delay_seconds,
aggregate_min_delay)
setAll(
validator_monitor_prev_epoch_exits_total,
exits)
setAll(
validator_monitor_prev_epoch_proposer_slashings_total,
proposer_slashings)
setAll(
validator_monitor_prev_epoch_attester_slashings_total,
attester_slashings)
if not self.totals:
for monitor in self.monitors.mvalues:
if monitor.summaries[summaryIdx].
attestation_min_block_inclusion_distance.isSome:
validator_monitor_prev_epoch_attestation_block_min_inclusion_distance.set(
summary.attestation_min_block_inclusion_distance.get().int64, [metricId])
monitor.summaries[summaryIdx].
attestation_min_block_inclusion_distance.get().int64, [monitor.id])
validator_monitor_prev_epoch_sync_committee_messages_total.set(
summary.sync_committee_messages, [metricId])
if summary.sync_committee_message_min_delay.isSome():
validator_monitor_prev_epoch_sync_committee_messages_min_delay_seconds.observe(
summary.sync_committee_message_min_delay.get().toFloatSeconds(), [metricId])
validator_monitor_prev_epoch_sync_contribution_inclusions.set(
summary.sync_signature_contribution_inclusions, [metricId])
validator_monitor_prev_epoch_sync_signature_block_inclusions.set(
summary.sync_signature_block_inclusions, [metricId])
validator_monitor_prev_epoch_sync_contributions_total.set(
summary.sync_contributions, [metricId])
if summary.sync_contribution_min_delay.isSome():
validator_monitor_prev_epoch_sync_contribution_min_delay_seconds.observe(
summary.sync_contribution_min_delay.get().toFloatSeconds(), [metricId])
validator_monitor_prev_epoch_aggregates_total.set(
summary.aggregates, [metricId])
if summary.aggregate_min_delay.isSome():
validator_monitor_prev_epoch_aggregates_min_delay_seconds.observe(
summary.aggregate_min_delay.get().toFloatSeconds(), [metricId])
validator_monitor_prev_epoch_exits_total.set(
summary.exits, [metricId])
validator_monitor_prev_epoch_proposer_slashings_total.set(
summary.proposer_slashings, [metricId])
validator_monitor_prev_epoch_attester_slashings_total.set(
summary.attester_slashings, [metricId])
monitor.summaries[summaryIdx] = default(type(monitor.summaries[summaryIdx]))
for monitor in self.monitors.mvalues:
reset(monitor.summaries[summaryIdx])
func is_active_unslashed_in_previous_epoch(status: RewardStatus): bool =
let flags = status.flags
@@ -354,6 +391,8 @@ proc registerEpochInfo*(
if epoch < 2 or self.monitors.len == 0:
return
var in_current_sync_committee, in_next_sync_committee: int64
withEpochInfo(info):
for pubkey, monitor in self.monitors:
if monitor.index.isNone:
@@ -435,6 +474,7 @@ proc registerEpochInfo*(
let current_epoch = epoch - 1
if state.current_sync_committee.pubkeys.data.contains(pubkey):
if not self.totals:
validator_monitor_validator_in_current_sync_committee.set(1, [metricId])
self.withEpochSummary(monitor[], current_epoch):
@@ -443,12 +483,35 @@ proc registerEpochInfo*(
expected = SLOTS_PER_EPOCH,
epoch = current_epoch,
validator = id
in_current_sync_committee += 1
else:
if not self.totals:
validator_monitor_validator_in_current_sync_committee.set(0, [metricId])
debug "Validator isn't part of the current sync committee",
epoch = current_epoch,
validator = id
if state.next_sync_committee.pubkeys.data.contains(pubkey):
if not self.totals:
validator_monitor_validator_in_next_sync_committee.set(1, [metricId])
self.withEpochSummary(monitor[], current_epoch):
info "Validator in next sync committee",
epoch = current_epoch,
validator = id
in_next_sync_committee += 1
else:
if not self.totals:
validator_monitor_validator_in_next_sync_committee.set(0, [metricId])
if self.totals:
validator_monitor_validator_in_current_sync_committee.set(
in_current_sync_committee, [total])
validator_monitor_validator_in_next_sync_committee.set(
in_next_sync_committee, [total])
self.updateEpoch(epoch)
proc registerState*(self: var ValidatorMonitor, state: ForkyBeaconState) =
@@ -467,7 +530,38 @@ proc registerState*(self: var ValidatorMonitor, state: ForkyBeaconState) =
current_epoch = state.slot.epoch
# Update metrics for monitored validators according to the latest rewards
for (_, monitor) in self.monitors.mpairs():
if self.totals:
var
balance: uint64
effective_balance: uint64
slashed: int64
active: int64
exited: int64
withdrawable: int64
for monitor in self.monitors.mvalues:
if not monitor[].index.isSome():
continue
let idx = monitor[].index.get()
if state.balances.lenu64 <= idx.uint64:
continue
balance += state.balances[idx]
effective_balance += state.validators[idx].effective_balance
if state.validators[idx].slashed: slashed += 1
if is_active_validator(state.validators[idx], current_epoch): active += 1
if is_exited_validator(state.validators[idx], current_epoch): exited += 1
if is_withdrawable_validator(state.validators[idx], current_epoch): withdrawable += 1
validator_monitor_balance_gwei.set(balance.toGaugeValue(), [total])
validator_monitor_effective_balance_gwei.set(effective_balance.toGaugeValue(), [total])
validator_monitor_slashed.set(slashed, [total])
validator_monitor_active.set(active, [total])
validator_monitor_exited.set(exited, [total])
validator_monitor_withdrawable.set(withdrawable, [total])
else:
for monitor in self.monitors.mvalues():
if not monitor[].index.isSome():
continue
@@ -477,25 +571,25 @@ proc registerState*(self: var ValidatorMonitor, state: ForkyBeaconState) =
let id = monitor[].id
validator_monitor_balance_gwei.set(
state.balances[idx].toGaugeValue(), [metricId])
state.balances[idx].toGaugeValue(), [id])
validator_monitor_effective_balance_gwei.set(
state.validators[idx].effective_balance.toGaugeValue(), [metricId])
state.validators[idx].effective_balance.toGaugeValue(), [id])
validator_monitor_slashed.set(
state.validators[idx].slashed.toGaugeValue(), [metricId])
state.validators[idx].slashed.toGaugeValue(), [id])
validator_monitor_active.set(
is_active_validator(state.validators[idx], current_epoch).toGaugeValue(), [metricId])
is_active_validator(state.validators[idx], current_epoch).toGaugeValue(), [id])
validator_monitor_exited.set(
is_exited_validator(state.validators[idx], current_epoch).toGaugeValue(), [metricId])
is_exited_validator(state.validators[idx], current_epoch).toGaugeValue(), [id])
validator_monitor_withdrawable.set(
is_withdrawable_validator(state.validators[idx], current_epoch).toGaugeValue(), [metricId])
is_withdrawable_validator(state.validators[idx], current_epoch).toGaugeValue(), [id])
validator_activation_eligibility_epoch.set(
state.validators[idx].activation_eligibility_epoch.toGaugeValue(), [metricId])
state.validators[idx].activation_eligibility_epoch.toGaugeValue(), [id])
validator_activation_epoch.set(
state.validators[idx].activation_epoch.toGaugeValue(), [metricId])
state.validators[idx].activation_epoch.toGaugeValue(), [id])
validator_exit_epoch.set(
state.validators[idx].exit_epoch.toGaugeValue(), [metricId])
state.validators[idx].exit_epoch.toGaugeValue(), [id])
validator_withdrawable_epoch.set(
state.validators[idx].withdrawable_epoch.toGaugeValue(), [metricId])
state.validators[idx].withdrawable_epoch.toGaugeValue(), [id])
template withMonitor(self: var ValidatorMonitor, key: ValidatorPubKey, body: untyped): untyped =
self.monitors.withValue(key, valuex):
@@ -524,7 +618,7 @@ proc registerAttestation*(
let id = monitor.id
validator_monitor_unaggregated_attestation_total.inc(1, [$src, metricId])
validator_monitor_unaggregated_attestation_delay_seconds.observe(
delay.toFloatSeconds(), [$src, metricId])
delay.toGaugeValue(), [$src, metricId])
info "Attestation seen",
attestation = shortLog(attestation),
@@ -549,7 +643,7 @@ proc registerAggregate*(
let id = monitor.id
validator_monitor_aggregated_attestation_total.inc(1, [$src, metricId])
validator_monitor_aggregated_attestation_delay_seconds.observe(
delay.toFloatSeconds(), [$src, metricId])
delay.toGaugeValue(), [$src, metricId])
info "Aggregated attestion seen",
aggregate = shortLog(signed_aggregate_and_proof.message.aggregate),
@@ -564,7 +658,7 @@ proc registerAggregate*(
let id = monitor.id
validator_monitor_attestation_in_aggregate_total.inc(1, [$src, metricId])
validator_monitor_attestation_in_aggregate_delay_seconds.observe(
delay.toFloatSeconds(), [$src, metricId])
delay.toGaugeValue(), [$src, metricId])
info "Attestation included in aggregate",
aggregate = shortLog(signed_aggregate_and_proof.message.aggregate),
@@ -585,7 +679,10 @@ proc registerAttestationInBlock*(
epoch = data.slot.epoch
validator_monitor_attestation_in_block_total.inc(1, ["block", metricId])
validator_monitor_attestation_in_block_delay_slots.set(inclusion_lag.int64, ["block", metricId])
if not self.totals:
validator_monitor_attestation_in_block_delay_slots.set(
inclusion_lag.int64, ["block", metricId])
info "Attestation included in block",
attestation_data = shortLog(data),
@@ -611,7 +708,7 @@ proc registerBeaconBlock*(
validator_monitor_beacon_block_total.inc(1, [$src, metricId])
validator_monitor_beacon_block_delay_seconds.observe(
delay.toFloatSeconds(), [$src, metricId])
delay.toGaugeValue(), [$src, metricId])
info "Block seen",
blck = shortLog(blck), src, epoch = slot.epoch, validator = id
@@ -629,7 +726,7 @@ proc registerSyncCommitteeMessage*(
validator_monitor_sync_committee_messages_total.inc(1, [$src, metricId])
validator_monitor_sync_committee_messages_delay_seconds.observe(
delay.toFloatSeconds(), [$src, metricId])
delay.toGaugeValue(), [$src, metricId])
info "Sync committee message seen",
syncCommitteeMessage = shortLog(sync_committee_message.beacon_block_root),
@@ -655,7 +752,7 @@ proc registerSyncContribution*(
let id = monitor.id
validator_monitor_sync_contributions_total.inc(1, [$src, metricId])
validator_monitor_sync_contributions_delay_seconds.observe(
delay.toFloatSeconds(), [$src, metricId])
delay.toGaugeValue(), [$src, metricId])
info "Sync contribution seen",
contribution = shortLog(sync_contribution.message.contribution),