Validator monitoring (#2925)

Validator monitoring, based on and mostly compatible with the
implementation in Lighthouse, tracks additional logs and metrics for
specified validators so as to stay on top of their performance.

The implementation works roughly as follows:
* Validator pubkeys are singled out for monitoring - these may or may
not be running on the node
* For every action the validator takes, we record steps in the
process, such as messages being seen on the network or published via
the API
* When the dust settles at the end of an epoch, we report the
information from one epoch before that, which coincides with the
balances being updated - this is a tradeoff between being correct
(waiting for finalization) and providing relevant information in a
timely manner (see the sketch below)
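
A minimal sketch of that reporting scheme (illustrative stand-in types and names such as `Monitor` and `advance`, not the module's real internals): per-epoch data lives in a two-slot buffer keyed by `epoch mod 2`, so advancing to epoch N reports and then recycles the slot that last held epoch N-2.

```nim
# Illustration only - the real code is in
# beacon_chain/validators/validator_monitor.nim, which keeps one such
# summary per monitored validator.
type
  EpochSummary = object
    attestations: int64              # events recorded during one epoch

  Monitor = object
    epoch: uint64                    # most recent epoch seen
    summaries: array[2, EpochSummary]

func summaryIdx(epoch: uint64): int = int(epoch mod 2)

proc advance(m: var Monitor, epoch: uint64) =
  if epoch <= m.epoch or epoch < 2:
    return
  # The slot about to be recycled holds data from `epoch - 2` - report it
  # before clearing; this is the "one epoch before" mentioned above.
  echo "epoch ", epoch - 2, ": ", m.summaries[summaryIdx(epoch)].attestations
  m.summaries[summaryIdx(epoch)] = EpochSummary()
  m.epoch = epoch
```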
Jacek Sieka 2021-12-20 20:20:31 +01:00 committed by GitHub
parent 6ef3834f4a
commit c270ec21e4
25 changed files with 1127 additions and 111 deletions


@ -84,13 +84,14 @@ OK: 3/3 Fail: 0/3 Skip: 0/3
OK: 1/1 Fail: 0/1 Skip: 0/1
## Block pool processing [Preset: mainnet]
```diff
+ Adding the same block twice returns a Duplicate error [Preset: mainnet] OK
+ Simple block add&get [Preset: mainnet] OK
+ getRef returns nil for missing blocks OK
+ loading tail block works [Preset: mainnet] OK
+ updateHead updates head and headState [Preset: mainnet] OK
+ updateStateData sanity [Preset: mainnet] OK
```
OK: 5/5 Fail: 0/5 Skip: 0/5
OK: 6/6 Fail: 0/6 Skip: 0/6
## Block processor [Preset: mainnet]
```diff
+ Reverse order block add & get [Preset: mainnet] OK
@ -395,4 +396,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 1/1 Fail: 0/1 Skip: 0/1
---TOTAL---
OK: 215/217 Fail: 0/217 Skip: 2/217
OK: 216/218 Fail: 0/218 Skip: 2/218


@ -23,13 +23,14 @@ import
sync_committee_msg_pool],
./spec/datatypes/base,
./sync/[sync_manager, request_manager],
./validators/[action_tracker, validator_pool]
./validators/[action_tracker, validator_monitor, validator_pool]
export
osproc, chronos, httpserver, presto, action_tracker, beacon_clock,
beacon_chain_db, conf, attestation_pool, sync_committee_msg_pool,
validator_pool, eth2_network, eth1_monitor, request_manager, sync_manager,
eth2_processor, blockchain_dag, block_quarantine, base, exit_pool
eth2_processor, blockchain_dag, block_quarantine, base, exit_pool,
validator_monitor
type
RpcServer* = RpcHttpServer
@ -70,6 +71,7 @@ type
beaconClock*: BeaconClock
onAttestationSent*: OnAttestationCallback
restKeysCache*: Table[ValidatorPubKey, ValidatorIndex]
validatorMonitor*: ref ValidatorMonitor
const
MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT


@ -372,6 +372,20 @@ type
name: "terminal-total-difficulty-override"
}: Option[uint64]
validatorMonitorAuto* {.
desc: "Automatically monitor locally active validators (BETA)"
defaultValue: false
name: "validator-monitor-auto" }: bool
validatorMonitorPubkeys* {.
desc: "One or more validators to monitor - works best when --subscribe-all-subnets is enabled (BETA)"
name: "validator-monitor-pubkey" }: seq[ValidatorPubKey]
validatorMonitorTotals* {.
desc: "Publish metrics to single 'totals' label for better collection performance when monitoring many validators (BETA)"
defaultValue: false
name: "validator-monitor-totals" }: bool
of createTestnet:
testnetDepositsFile* {.
desc: "A LaunchPad deposits file for the genesis state validators"
@ -750,6 +764,12 @@ func parseCmdArg*(T: type PubKey0x, input: TaintedString): T
{.raises: [ValueError, Defect].} =
PubKey0x(hexToPaddedByteArray[RawPubKeySize](input.string))
func parseCmdArg*(T: type ValidatorPubKey, input: TaintedString): T
{.raises: [ValueError, Defect].} =
let res = ValidatorPubKey.fromHex(input.string)
if res.isErr(): raise (ref ValueError)(msg: $res.error())
res.get()
func completeCmdArg*(T: type PubKey0x, input: TaintedString): seq[string] =
return @[]
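
As a hedged sanity sketch of the parsing added above (the import path is an assumption about where this repository exposes `ValidatorPubKey.fromHex`; the input string is made up): invalid hex yields an error result, which the converter turns into a `ValueError` so that confutils can report a bad `--validator-monitor-pubkey` value.

```nim
import ./spec/crypto  # assumed path, as seen from beacon_chain/conf.nim

# Malformed input is rejected; parseCmdArg re-raises the error text as
# ValueError so the CLI prints a useful message.
doAssert ValidatorPubKey.fromHex("not-a-pubkey").isErr()
```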


@ -16,9 +16,12 @@ import
../spec/[signatures_batch, forks, helpers],
../spec/datatypes/[phase0, altair, merge],
".."/beacon_chain_db,
../validators/validator_monitor,
./block_dag
export options, sets, tables, hashes, helpers, beacon_chain_db, block_dag
export
options, sets, tables, hashes, helpers, beacon_chain_db, block_dag,
validator_monitor
# ChainDAG and types related to forming a DAG of blocks, keeping track of their
# relationships and allowing various forms of lookups
@ -83,6 +86,8 @@ type
db*: BeaconChainDB ##\
## ColdDB - Stores the canonical chain
validatorMonitor*: ref ValidatorMonitor
# -----------------------------------
# ChainDAGRef - DAG of candidate chains
@ -317,4 +322,3 @@ func init*(t: typedesc[FinalizationInfoObject], blockRoot: Eth2Digest,
state_root: stateRoot,
epoch: epoch
)


@ -344,8 +344,9 @@ proc getForkedBlock(db: BeaconChainDB, root: Eth2Digest):
err()
proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
updateFlags: UpdateFlags, onBlockCb: OnBlockCallback = nil,
onHeadCb: OnHeadCallback = nil, onReorgCb: OnReorgCallback = nil,
validatorMonitor: ref ValidatorMonitor, updateFlags: UpdateFlags,
onBlockCb: OnBlockCallback = nil, onHeadCb: OnHeadCallback = nil,
onReorgCb: OnReorgCallback = nil,
onFinCb: OnFinalizedCallback = nil): ChainDAGRef =
# TODO we require that the db contains both a head and a tail block -
# asserting here doesn't seem like the right way to go about it however..
@ -487,6 +488,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
let dag = ChainDAGRef(
db: db,
validatorMonitor: validatorMonitor,
blocks: blocks,
backfillBlocks: backfillBlocks,
genesis: genesisRef,
@ -796,6 +798,7 @@ proc advanceSlots(
# target
doAssert getStateField(state.data, slot) <= slot
while getStateField(state.data, slot) < slot:
let preEpoch = getStateField(state.data, slot).epoch
loadStateCache(dag, cache, state.blck, getStateField(state.data, slot).epoch)
doAssert process_slots(
@ -805,6 +808,16 @@ proc advanceSlots(
if save:
dag.putState(state)
# The reward information in the state transition is computed for epoch
# transitions - when transitioning into epoch N, the activities in epoch
# N-2 are translated into balance updates, and this is what we capture
# in the monitor. This may be inaccurate during a deep reorg (>1 epoch)
# which is an acceptable tradeoff for monitoring.
withState(state.data):
let postEpoch = state.data.slot.epoch
if preEpoch != postEpoch:
dag.validatorMonitor[].registerEpochInfo(postEpoch, info, state.data)
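
To make the timing concrete (a plain-integer illustration, not part of the diff): the monitor is only told about an epoch when `advanceSlots` crosses an epoch boundary, and the transition it just performed applied the balance updates earned for duties two epochs earlier.

```nim
# Standalone illustration of the boundary check above.
const SLOTS_PER_EPOCH = 32                # mainnet preset value
var slot = 63
let preEpoch = slot div SLOTS_PER_EPOCH   # epoch 1
inc slot                                  # process_slots advanced one slot
let postEpoch = slot div SLOTS_PER_EPOCH  # epoch 2
doAssert preEpoch != postEpoch            # boundary crossed - report epoch info
doAssert postEpoch - 2 == 0               # rewards just applied are for epoch 0
```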
proc applyBlock(
dag: ChainDAGRef,
state: var StateData, blck: BlockData, flags: UpdateFlags,
@ -1295,6 +1308,13 @@ proc updateHead*(
beacon_head_root.set newHead.root.toGaugeValue
beacon_head_slot.set newHead.slot.toGaugeValue
withState(dag.headState.data):
# Every time the head changes, the "canonical" view of balances and other
# state-related metrics change - notify the validator monitor.
# Doing this update during head update ensures there's a reasonable number
# of such updates happening - at most once per valid block.
dag.validatorMonitor[].registerState(state.data)
if lastHead.slot.epoch != newHead.slot.epoch:
# Epoch updated - in theory, these could happen when the wall clock
# changes epoch, even if there is no new block / head, but we'll delay


@ -35,6 +35,7 @@ type
resfut*: Future[Result[void, BlockError]]
queueTick*: Moment # Moment when block was enqueued
validationDur*: Duration # Time it took to perform gossip validation
src*: MsgSource
BlockProcessor* = object
## This manages the processing of blocks from different sources
@ -67,6 +68,7 @@ type
# Consumer
# ----------------------------------------------------------------
consensusManager: ref ConsensusManager
validatorMonitor: ref ValidatorMonitor
## Blockchain DAG, AttestationPool and Quarantine
getBeaconTime: GetBeaconTimeFn
@ -80,6 +82,7 @@ proc new*(T: type BlockProcessor,
dumpDirInvalid, dumpDirIncoming: string,
rng: ref BrHmacDrbgContext, taskpool: TaskPoolPtr,
consensusManager: ref ConsensusManager,
validatorMonitor: ref ValidatorMonitor,
getBeaconTime: GetBeaconTimeFn): ref BlockProcessor =
(ref BlockProcessor)(
dumpEnabled: dumpEnabled,
@ -87,6 +90,7 @@ proc new*(T: type BlockProcessor,
dumpDirIncoming: dumpDirIncoming,
blockQueue: newAsyncQueue[BlockEntry](),
consensusManager: consensusManager,
validatorMonitor: validatorMonitor,
getBeaconTime: getBeaconTime,
verifier: BatchVerifier(rng: rng, taskpool: taskpool)
)
@ -101,7 +105,7 @@ proc hasBlocks*(self: BlockProcessor): bool =
# ------------------------------------------------------------------------------
proc addBlock*(
self: var BlockProcessor, blck: ForkedSignedBeaconBlock,
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
resfut: Future[Result[void, BlockError]] = nil,
validationDur = Duration()) =
## Enqueue a Gossip-validated block for consensus verification
@ -126,7 +130,8 @@ proc addBlock*(
self.blockQueue.addLastNoWait(BlockEntry(
blck: blck,
resfut: resfut, queueTick: Moment.now(),
validationDur: validationDur))
validationDur: validationDur,
src: src))
except AsyncQueueFullError:
raiseAssert "unbounded queue"
@ -153,12 +158,18 @@ proc dumpBlock*[T](
proc storeBlock*(
self: var BlockProcessor,
signedBlock: ForkySignedBeaconBlock,
wallSlot: Slot, queueTick: Moment = Moment.now(),
src: MsgSource, wallTime: BeaconTime,
signedBlock: ForkySignedBeaconBlock, queueTick: Moment = Moment.now(),
validationDur = Duration()): Result[BlockRef, BlockError] =
## storeBlock is the main entry point for unvalidated blocks - all untrusted
## blocks, regardless of origin, pass through here. When storing a block,
## we will add it to the dag and pass it to all block consumers that need
## to know about it, such as the fork choice and the monitoring
let
attestationPool = self.consensusManager.attestationPool
startTick = Moment.now()
wallSlot = wallTime.slotOrZero()
vm = self.validatorMonitor
dag = self.consensusManager.dag
# The block is certainly not missing any more
@ -176,6 +187,23 @@ proc storeBlock*(
attestationPool[].addForkChoice(
epochRef, blckRef, trustedBlock.message, wallSlot)
vm[].registerBeaconBlock(
src, wallTime, trustedBlock.message)
for attestation in trustedBlock.message.body.attestations:
for idx in get_attesting_indices(
epochRef, attestation.data, attestation.aggregation_bits):
vm[].registerAttestationInBlock(attestation.data, idx,
trustedBlock.message)
withState(dag[].clearanceState.data):
when stateFork >= BeaconStateFork.Altair and
Trusted isnot phase0.TrustedSignedBeaconBlock: # altair+
for i in trustedBlock.message.body.sync_aggregate.sync_committee_bits.oneIndices():
vm[].registerSyncAggregateInBlock(
trustedBlock.message.slot, trustedBlock.root,
state.data.current_sync_committee.pubkeys.data[i])
self.dumpBlock(signedBlock, blck)
# There can be a scenario where we receive a block we already received.
@ -212,7 +240,7 @@ proc storeBlock*(
for quarantined in self.consensusManager.quarantine[].pop(blck.get().root):
# Process the blocks that had the newly accepted block as parent
self.addBlock(quarantined)
self.addBlock(MsgSource.gossip, quarantined)
blck
@ -233,7 +261,7 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) =
let
res = withBlck(entry.blck):
self.storeBlock(blck, wallSlot, entry.queueTick, entry.validationDur)
self.storeBlock(entry.src, wallTime, blck, entry.queueTick, entry.validationDur)
if entry.resfut != nil:
entry.resfut.complete(


@ -112,6 +112,9 @@ type
# ----------------------------------------------------------------
blockProcessor: ref BlockProcessor
# Validator monitoring
validatorMonitor: ref ValidatorMonitor
# Validated with no further verification required
# ----------------------------------------------------------------
exitPool: ref ExitPool
@ -135,6 +138,7 @@ type
proc new*(T: type Eth2Processor,
doppelGangerDetectionEnabled: bool,
blockProcessor: ref BlockProcessor,
validatorMonitor: ref ValidatorMonitor,
dag: ChainDAGRef,
attestationPool: ref AttestationPool,
exitPool: ref ExitPool,
@ -150,6 +154,7 @@ proc new*(T: type Eth2Processor,
doppelgangerDetection: DoppelgangerProtection(
nodeLaunchSlot: getBeaconTime().slotOrZero),
blockProcessor: blockProcessor,
validatorMonitor: validatorMonitor,
dag: dag,
attestationPool: attestationPool,
exitPool: exitPool,
@ -171,9 +176,8 @@ proc new*(T: type Eth2Processor,
# could be used to push out valid messages.
proc blockValidator*(
self: var Eth2Processor,
signedBlock: phase0.SignedBeaconBlock | altair.SignedBeaconBlock |
merge.SignedBeaconBlock): ValidationRes =
self: var Eth2Processor, src: MsgSource,
signedBlock: ForkySignedBeaconBlock): ValidationRes =
let
wallTime = self.getCurrentBeaconTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
@ -207,7 +211,7 @@ proc blockValidator*(
trace "Block validated"
self.blockProcessor[].addBlock(
ForkedSignedBeaconBlock.init(signedBlock),
src, ForkedSignedBeaconBlock.init(signedBlock),
validationDur = self.getCurrentBeaconTime() - wallTime)
# Validator monitor registration for blocks is done by the processor
@ -250,9 +254,8 @@ proc checkForPotentialDoppelganger(
quit QuitFailure
proc attestationValidator*(
self: ref Eth2Processor,
attestation: Attestation,
subnet_id: SubnetId,
self: ref Eth2Processor, src: MsgSource,
attestation: Attestation, subnet_id: SubnetId,
checkSignature: bool = true): Future[ValidationRes] {.async.} =
let wallTime = self.getCurrentBeaconTime()
var (afterGenesis, wallSlot) = wallTime.toSlot()
@ -286,8 +289,12 @@ proc attestationValidator*(
self.attestationPool[].addAttestation(
attestation, [attester_index], sig, wallSlot)
self.validatorMonitor[].registerAttestation(
src, wallTime, attestation, attester_index)
beacon_attestations_received.inc()
beacon_attestation_delay.observe(delay.toFloatSeconds())
ok()
else:
debug "Dropping attestation", validationError = v.error
@ -295,7 +302,7 @@ proc attestationValidator*(
err(v.error())
proc aggregateValidator*(
self: ref Eth2Processor,
self: ref Eth2Processor, src: MsgSource,
signedAggregateAndProof: SignedAggregateAndProof): Future[ValidationRes] {.async.} =
let wallTime = self.getCurrentBeaconTime()
var (afterGenesis, wallSlot) = wallTime.toSlot()
@ -334,6 +341,9 @@ proc aggregateValidator*(
self.attestationPool[].addAttestation(
signedAggregateAndProof.message.aggregate, attesting_indices, sig, wallSlot)
self.validatorMonitor[].registerAggregate(
src, wallTime, signedAggregateAndProof, attesting_indices)
beacon_aggregates_received.inc()
beacon_aggregate_delay.observe(delay.toFloatSeconds())
@ -345,8 +355,8 @@ proc aggregateValidator*(
err(v.error())
proc attesterSlashingValidator*(
self: var Eth2Processor, attesterSlashing: AttesterSlashing):
ValidationRes =
self: var Eth2Processor, src: MsgSource,
attesterSlashing: AttesterSlashing): ValidationRes =
logScope:
attesterSlashing = shortLog(attesterSlashing)
@ -359,6 +369,8 @@ proc attesterSlashingValidator*(
self.exitPool[].addMessage(attesterSlashing)
self.validatorMonitor[].registerAttesterSlashing(src, attesterSlashing)
beacon_attester_slashings_received.inc()
else:
debug "Dropping attester slashing", validationError = v.error
@ -367,8 +379,8 @@ proc attesterSlashingValidator*(
v
proc proposerSlashingValidator*(
self: var Eth2Processor, proposerSlashing: ProposerSlashing):
Result[void, ValidationError] =
self: var Eth2Processor, src: MsgSource,
proposerSlashing: ProposerSlashing): Result[void, ValidationError] =
logScope:
proposerSlashing = shortLog(proposerSlashing)
@ -377,18 +389,21 @@ proc proposerSlashingValidator*(
let v = self.exitPool[].validateProposerSlashing(proposerSlashing)
if v.isOk():
trace "Proposer slashing validated"
beacon_proposer_slashings_received.inc()
else:
debug "Dropping proposer slashing", validationError = v.error
self.exitPool[].addMessage(proposerSlashing)
self.validatorMonitor[].registerProposerSlashing(src, proposerSlashing)
beacon_proposer_slashings_received.inc()
else:
debug "Dropping proposer slashing", validationError = v.error
beacon_proposer_slashings_dropped.inc(1, [$v.error[0]])
v
proc voluntaryExitValidator*(
self: var Eth2Processor, signedVoluntaryExit: SignedVoluntaryExit):
Result[void, ValidationError] =
self: var Eth2Processor, src: MsgSource,
signedVoluntaryExit: SignedVoluntaryExit): Result[void, ValidationError] =
logScope:
signedVoluntaryExit = shortLog(signedVoluntaryExit)
@ -400,6 +415,9 @@ proc voluntaryExitValidator*(
self.exitPool[].addMessage(signedVoluntaryExit)
self.validatorMonitor[].registerVoluntaryExit(
src, signedVoluntaryExit.message)
beacon_voluntary_exits_received.inc()
else:
debug "Dropping voluntary exit", error = v.error
@ -408,7 +426,7 @@ proc voluntaryExitValidator*(
v
proc syncCommitteeMessageValidator*(
self: ref Eth2Processor,
self: ref Eth2Processor, src: MsgSource,
syncCommitteeMsg: SyncCommitteeMessage,
subcommitteeIdx: SyncSubcommitteeIndex,
checkSignature: bool = true): Future[Result[void, ValidationError]] {.async.} =
@ -441,6 +459,9 @@ proc syncCommitteeMessageValidator*(
subcommitteeIdx,
positions)
self.validatorMonitor[].registerSyncCommitteeMessage(
src, wallTime, syncCommitteeMsg)
beacon_sync_committee_messages_received.inc()
ok()
@ -450,7 +471,7 @@ proc syncCommitteeMessageValidator*(
err(v.error())
proc contributionValidator*(
self: ref Eth2Processor,
self: ref Eth2Processor, src: MsgSource,
contributionAndProof: SignedContributionAndProof,
checkSignature: bool = true): Future[Result[void, ValidationError]] {.async.} =
let
@ -475,7 +496,12 @@ proc contributionValidator*(
return if v.isOk():
trace "Contribution validated"
self.syncCommitteeMsgPool[].addContribution(contributionAndProof, v.get)
self.syncCommitteeMsgPool[].addContribution(
contributionAndProof, v.get()[0])
self.validatorMonitor[].registerSyncContribution(
src, wallTime, contributionAndProof, v.get()[1])
beacon_sync_committee_contributions_received.inc()
ok()


@ -856,8 +856,7 @@ proc validateContribution*(
msg: SignedContributionAndProof,
wallTime: BeaconTime,
checkSignature: bool):
Future[Result[CookedSig, ValidationError]] {.async.} =
Future[Result[(CookedSig, seq[ValidatorIndex]), ValidationError]] {.async.} =
let
syncCommitteeSlot = msg.message.contribution.slot
@ -901,6 +900,12 @@ proc validateContribution*(
# that is, any(contribution.aggregation_bits).
return errReject("SignedContributionAndProof: aggregation bits empty")
# TODO we take a copy of the participants to avoid the data going stale
# between validation and use - nonetheless, a design that avoids it and
# stays safe would be nice
let participants = dag.syncCommitteeParticipants(
msg.message.contribution.slot, subcommitteeIdx)
let sig = if checkSignature:
let deferredCrypto = batchCrypto.scheduleContributionChecks(
fork, genesis_validators_root, msg, subcommitteeIdx, dag)
@ -951,4 +956,4 @@ proc validateContribution*(
return errReject("SyncCommitteeMessage: unable to load signature")
sig.get()
return ok(sig)
return ok((sig, participants))


@ -31,7 +31,7 @@ import
./networking/[eth2_discovery, eth2_network, network_metadata],
./gossip_processing/[eth2_processor, block_processor, consensus_manager],
./validators/[
validator_duties, validator_pool,
validator_duties, validator_monitor, validator_pool,
slashing_protection, keystore_management],
./sync/[sync_protocol],
./rpc/[rest_api, rpc_api],
@ -294,11 +294,19 @@ proc init(T: type BeaconNode,
info "Loading block dag from database", path = config.databaseDir
let
validatorMonitor = newClone(ValidatorMonitor.init(
config.validatorMonitorAuto))
for key in config.validatorMonitorPubkeys:
validatorMonitor[].addMonitor(key, none(ValidatorIndex))
let
chainDagFlags = if config.verifyFinalization: {verifyFinalization}
else: {}
dag = ChainDAGRef.init(cfg, db, chainDagFlags, onBlockAdded, onHeadChanged,
onChainReorg, onFinalization)
dag = ChainDAGRef.init(
cfg, db, validatorMonitor, chainDagFlags, onBlockAdded, onHeadChanged,
onChainReorg, onFinalization)
quarantine = newClone(Quarantine.init())
databaseGenesisValidatorsRoot =
getStateField(dag.headState.data, genesis_validators_root)
@ -408,7 +416,7 @@ proc init(T: type BeaconNode,
)
blockProcessor = BlockProcessor.new(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
rng, taskpool, consensusManager, getBeaconTime)
rng, taskpool, consensusManager, validatorMonitor, getBeaconTime)
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock):
Future[Result[void, BlockError]] =
# The design with a callback for block verification is unusual compared
@ -416,12 +424,13 @@ proc init(T: type BeaconNode,
# taken in the sync/request managers - this is an architectural compromise
# that should probably be reimagined more holistically in the future.
let resfut = newFuture[Result[void, BlockError]]("blockVerifier")
blockProcessor[].addBlock(signedBlock, resfut)
blockProcessor[].addBlock(MsgSource.gossip, signedBlock, resfut)
resfut
processor = Eth2Processor.new(
config.doppelgangerDetection,
blockProcessor, dag, attestationPool, exitPool, validatorPool,
syncCommitteeMsgPool, quarantine, rng, getBeaconTime, taskpool)
blockProcessor, validatorMonitor, dag, attestationPool, exitPool,
validatorPool, syncCommitteeMsgPool, quarantine, rng, getBeaconTime,
taskpool)
syncManager = newSyncManager[Peer, PeerID](
network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot,
getFirstSlotAtFinalizedEpoch, getBackfillSlot, blockVerifier)
@ -453,6 +462,7 @@ proc init(T: type BeaconNode,
gossipState: GossipState.Disconnected,
beaconClock: beaconClock,
onAttestationSent: onAttestationSent,
validatorMonitor: validatorMonitor
)
debug "Loading validators", validatorsDir = config.validatorsDir()
@ -464,9 +474,11 @@ proc init(T: type BeaconNode,
# we start with a reasonable ENR
let wallSlot = node.beaconClock.now().slotOrZero()
for validator in node.attachedValidators[].validators.values():
if config.validatorMonitorAuto:
validatorMonitor[].addMonitor(validator.pubkey, validator.index)
if validator.index.isSome():
node.actionTracker.knownValidators[validator.index.get()] = wallSlot
let
stabilitySubnets = node.actionTracker.stabilitySubnets(wallSlot)
# Here, we also set the correct ENR should we be in all subnets mode!
@ -1011,7 +1023,8 @@ proc installMessageValidators(node: BeaconNode) =
node.network.addValidator(
getBeaconBlocksTopic(node.dag.forkDigests.phase0),
proc (signedBlock: phase0.SignedBeaconBlock): ValidationResult =
toValidationResult(node.processor[].blockValidator(signedBlock)))
toValidationResult(node.processor[].blockValidator(
MsgSource.gossip, signedBlock)))
template installPhase0Validators(digest: auto) =
for it in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64:
@ -1022,32 +1035,37 @@ proc installMessageValidators(node: BeaconNode) =
# This proc needs to be within closureScope; don't lift out of loop.
proc(attestation: Attestation): Future[ValidationResult] {.async.} =
return toValidationResult(
await node.processor.attestationValidator(attestation, subnet_id)))
await node.processor.attestationValidator(
MsgSource.gossip, attestation, subnet_id)))
node.network.addAsyncValidator(
getAggregateAndProofsTopic(digest),
proc(signedAggregateAndProof: SignedAggregateAndProof):
Future[ValidationResult] {.async.} =
return toValidationResult(
await node.processor.aggregateValidator(signedAggregateAndProof)))
await node.processor.aggregateValidator(
MsgSource.gossip, signedAggregateAndProof)))
node.network.addValidator(
getAttesterSlashingsTopic(digest),
proc (attesterSlashing: AttesterSlashing): ValidationResult =
toValidationResult(
node.processor[].attesterSlashingValidator(attesterSlashing)))
node.processor[].attesterSlashingValidator(
MsgSource.gossip, attesterSlashing)))
node.network.addValidator(
getProposerSlashingsTopic(digest),
proc (proposerSlashing: ProposerSlashing): ValidationResult =
toValidationResult(
node.processor[].proposerSlashingValidator(proposerSlashing)))
node.processor[].proposerSlashingValidator(
MsgSource.gossip, proposerSlashing)))
node.network.addValidator(
getVoluntaryExitsTopic(digest),
proc (signedVoluntaryExit: SignedVoluntaryExit): ValidationResult =
toValidationResult(
node.processor[].voluntaryExitValidator(signedVoluntaryExit)))
node.processor[].voluntaryExitValidator(
MsgSource.gossip, signedVoluntaryExit)))
installPhase0Validators(node.dag.forkDigests.phase0)
@ -1059,12 +1077,14 @@ proc installMessageValidators(node: BeaconNode) =
node.network.addValidator(
getBeaconBlocksTopic(node.dag.forkDigests.altair),
proc (signedBlock: altair.SignedBeaconBlock): ValidationResult =
toValidationResult(node.processor[].blockValidator(signedBlock)))
toValidationResult(node.processor[].blockValidator(
MsgSource.gossip, signedBlock)))
node.network.addValidator(
getBeaconBlocksTopic(node.dag.forkDigests.merge),
proc (signedBlock: merge.SignedBeaconBlock): ValidationResult =
toValidationResult(node.processor[].blockValidator(signedBlock)))
toValidationResult(node.processor[].blockValidator(
MsgSource.gossip, signedBlock)))
template installSyncCommitteeeValidators(digest: auto) =
for committeeIdx in allSyncSubcommittees():
@ -1075,13 +1095,14 @@ proc installMessageValidators(node: BeaconNode) =
# This proc needs to be within closureScope; don't lift out of loop.
proc(msg: SyncCommitteeMessage): Future[ValidationResult] {.async.} =
return toValidationResult(
await node.processor.syncCommitteeMessageValidator(msg, idx)))
await node.processor.syncCommitteeMessageValidator(
MsgSource.gossip, msg, idx)))
node.network.addAsyncValidator(
getSyncCommitteeContributionAndProofTopic(digest),
proc(msg: SignedContributionAndProof): Future[ValidationResult] {.async.} =
return toValidationResult(
await node.processor.contributionValidator(msg)))
await node.processor.contributionValidator(MsgSource.gossip, msg)))
installSyncCommitteeeValidators(node.dag.forkDigests.altair)
installSyncCommitteeeValidators(node.dag.forkDigests.merge)


@ -543,6 +543,12 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
request.slot, subnet_id, request.validator_index,
request.is_aggregator)
let validator_pubkey = getStateField(
node.dag.headState.data, validators).asSeq()[request.validator_index].pubkey
node.validatorMonitor[].addAutoMonitor(
validator_pubkey, ValidatorIndex(request.validator_index))
return RestApiResponse.jsonMsgResponse(BeaconCommitteeSubscriptionSuccess)
# https://ethereum.github.io/beacon-APIs/#/Validator/prepareSyncCommitteeSubnets
@ -569,6 +575,12 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
lenu64(getStateField(node.dag.headState.data, validators)):
return RestApiResponse.jsonError(Http400,
InvalidValidatorIndexValueError)
let validator_pubkey = getStateField(
node.dag.headState.data, validators).asSeq()[item.validator_index].pubkey
node.validatorMonitor[].addAutoMonitor(
validator_pubkey, ValidatorIndex(item.validator_index))
subs
warn "Sync committee subscription request served, but not implemented"


@ -369,6 +369,13 @@ func is_active_validator*(validator: Validator, epoch: Epoch): bool =
## Check if ``validator`` is active
validator.activation_epoch <= epoch and epoch < validator.exit_epoch
func is_exited_validator*(validator: Validator, epoch: Epoch): bool =
## Check if ``validator`` is exited
validator.exit_epoch <= epoch
func is_withdrawable_validator*(validator: Validator, epoch: Epoch): bool =
epoch >= validator.withdrawable_epoch
# https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/phase0/beacon-chain.md#get_active_validator_indices
iterator get_active_validator_indices*(state: ForkyBeaconState, epoch: Epoch):
ValidatorIndex =


@ -218,7 +218,7 @@ proc sendAttestation*(
# libp2p calls the data handler for any subscription on the subnet
# topic, it does not perform validation.
let res = await node.processor.attestationValidator(
attestation, subnet_id, checkSignature)
MsgSource.api, attestation, subnet_id, checkSignature)
return
if res.isGoodForSending:
@ -241,8 +241,9 @@ proc sendSyncCommitteeMessage*(
# validation will also register the message with the sync committee
# message pool. Notably, although libp2p calls the data handler for
# any subscription on the subnet topic, it does not perform validation.
let res = await node.processor.syncCommitteeMessageValidator(msg, subcommitteeIdx,
checkSignature)
let res = await node.processor.syncCommitteeMessageValidator(
MsgSource.api, msg, subcommitteeIdx, checkSignature)
return
if res.isGoodForSending:
node.network.broadcastSyncCommitteeMessage(msg, subcommitteeIdx)
@ -340,7 +341,7 @@ proc sendSyncCommitteeContribution*(
msg: SignedContributionAndProof,
checkSignature: bool): Future[SendResult] {.async.} =
let res = await node.processor.contributionValidator(
msg, checkSignature)
MsgSource.api, msg, checkSignature)
return
if res.isGoodForSending:
@ -571,7 +572,7 @@ proc proposeBlock(node: BeaconNode,
# storeBlock puts the block in the chaindag, and if accepted, takes care
# of side effects such as event api notification
newBlockRef = node.blockProcessor[].storeBlock(
signedBlock, wallTime.slotOrZero())
MsgSource.api, wallTime, signedBlock)
if newBlockRef.isErr:
warn "Unable to add proposed block to block pool",
@ -1156,7 +1157,7 @@ proc sendAggregateAndProof*(node: BeaconNode,
proof: SignedAggregateAndProof): Future[SendResult] {.
async.} =
# REST/JSON-RPC API helper procedure.
let res = await node.processor.aggregateValidator(proof)
let res = await node.processor.aggregateValidator(MsgSource.api, proof)
return
if res.isGoodForSending:
node.network.broadcastAggregateAndProof(proof)
@ -1176,7 +1177,7 @@ proc sendAggregateAndProof*(node: BeaconNode,
proc sendVoluntaryExit*(node: BeaconNode,
exit: SignedVoluntaryExit): SendResult =
# REST/JSON-RPC API helper procedure.
let res = node.processor[].voluntaryExitValidator(exit)
let res = node.processor[].voluntaryExitValidator(MsgSource.api, exit)
if res.isGoodForSending:
node.network.broadcastVoluntaryExit(exit)
ok()
@ -1188,7 +1189,7 @@ proc sendVoluntaryExit*(node: BeaconNode,
proc sendAttesterSlashing*(node: BeaconNode,
slashing: AttesterSlashing): SendResult =
# REST/JSON-RPC API helper procedure.
let res = node.processor[].attesterSlashingValidator(slashing)
let res = node.processor[].attesterSlashingValidator(MsgSource.api, slashing)
if res.isGoodForSending:
node.network.broadcastAttesterSlashing(slashing)
ok()
@ -1200,7 +1201,7 @@ proc sendAttesterSlashing*(node: BeaconNode,
proc sendProposerSlashing*(node: BeaconNode,
slashing: ProposerSlashing): SendResult =
# REST/JSON-RPC API helper procedure.
let res = node.processor[].proposerSlashingValidator(slashing)
let res = node.processor[].proposerSlashingValidator(MsgSource.api, slashing)
if res.isGoodForSending:
node.network.broadcastProposerSlashing(slashing)
ok()
@ -1236,7 +1237,7 @@ proc sendBeaconBlock*(node: BeaconNode, forked: ForkedSignedBeaconBlock
wallTime = node.beaconClock.now()
accepted = withBlck(forked):
let newBlockRef = node.blockProcessor[].storeBlock(
blck, wallTime.slotOrZero())
MsgSource.api, wallTime, blck)
# The boolean we return tells the caller whether the block was integrated
# into the chain


@ -0,0 +1,749 @@
import
std/[options, tables],
metrics, chronicles,
../spec/[crypto, beaconstate, forks, helpers, presets],
../spec/datatypes/[phase0, altair],
../beacon_clock
logScope: topics = "val_mon"
# Validator monitoring based on the same feature in Lighthouse - using the same
# metrics allows users to more easily reuse monitoring setups
declareGauge validator_monitor_balance_gwei,
"The validator's balance in gwei.", labels = ["validator"]
declareGauge validator_monitor_effective_balance_gwei,
"The validator's effective balance in gwei.", labels = ["validator"]
declareGauge validator_monitor_slashed,
"Set to 1 if the validator is slashed.", labels = ["validator"]
declareGauge validator_monitor_active,
"Set to 1 if the validator is active.", labels = ["validator"]
declareGauge validator_monitor_exited,
"Set to 1 if the validator is exited.", labels = ["validator"]
declareGauge validator_monitor_withdrawable,
"Set to 1 if the validator is withdrawable.", labels = ["validator"]
declareGauge validator_activation_eligibility_epoch,
"Set to the epoch where the validator will be eligible for activation.", labels = ["validator"]
declareGauge validator_activation_epoch,
"Set to the epoch where the validator will activate.", labels = ["validator"]
declareGauge validator_exit_epoch,
"Set to the epoch where the validator will exit.", labels = ["validator"]
declareGauge validator_withdrawable_epoch,
"Set to the epoch where the validator will be withdrawable.", labels = ["validator"]
declareCounter validator_monitor_prev_epoch_on_chain_attester_hit,
"Incremented if the validator is flagged as a previous epoch attester during per epoch processing", labels = ["validator"]
declareCounter validator_monitor_prev_epoch_on_chain_attester_miss,
"Incremented if the validator is not flagged as a previous epoch attester during per epoch processing", labels = ["validator"]
declareCounter validator_monitor_prev_epoch_on_chain_head_attester_hit,
"Incremented if the validator is flagged as a previous epoch head attester during per epoch processing", labels = ["validator"]
declareCounter validator_monitor_prev_epoch_on_chain_head_attester_miss,
"Incremented if the validator is not flagged as a previous epoch head attester during per epoch processing", labels = ["validator"]
declareCounter validator_monitor_prev_epoch_on_chain_target_attester_hit,
"Incremented if the validator is flagged as a previous epoch target attester during per epoch processing", labels = ["validator"]
declareCounter validator_monitor_prev_epoch_on_chain_target_attester_miss,
"Incremented if the validator is not flagged as a previous epoch target attester during per epoch processing", labels = ["validator"]
declareCounter validator_monitor_prev_epoch_on_chain_source_attester_hit,
"Incremented if the validator is flagged as a previous epoch source attester during per epoch processing", labels = ["validator"]
declareCounter validator_monitor_prev_epoch_on_chain_source_attester_miss,
"Incremented if the validator is not flagged as a previous epoch source attester during per epoch processing", labels = ["validator"]
declareGauge validator_monitor_prev_epoch_attestations_total,
"The number of unagg. attestations seen in the previous epoch.", labels = ["validator"]
declareHistogram validator_monitor_prev_epoch_attestations_min_delay_seconds,
"The min delay between when the validator should send the attestation and when it was received.", labels = ["validator"]
declareGauge validator_monitor_prev_epoch_attestation_aggregate_inclusions,
"The count of times an attestation was seen inside an aggregate.", labels = ["validator"]
declareGauge validator_monitor_prev_epoch_attestation_block_inclusions,
"The count of times an attestation was seen inside a block.", labels = ["validator"]
declareGauge validator_monitor_prev_epoch_attestation_block_min_inclusion_distance,
"The minimum inclusion distance observed for the inclusion of an attestation in a block.", labels = ["validator"]
declareGauge validator_monitor_prev_epoch_aggregates_total,
"The number of aggregates seen in the previous epoch.", labels = ["validator"]
declareHistogram validator_monitor_prev_epoch_aggregates_min_delay_seconds,
"The min delay between when the validator should send the aggregate and when it was received.", labels = ["validator"]
declareGauge validator_monitor_prev_epoch_exits_total,
"The number of exits seen in the previous epoch.", labels = ["validator"]
declareGauge validator_monitor_prev_epoch_proposer_slashings_total,
"The number of proposer slashings seen in the previous epoch.", labels = ["validator"]
declareGauge validator_monitor_prev_epoch_attester_slashings_total,
"The number of attester slashings seen in the previous epoch.", labels = ["validator"]
declareGauge validator_monitor_prev_epoch_sync_committee_messages_total,
"The number of sync committee messages seen in the previous epoch.", labels = ["validator"]
declareHistogram validator_monitor_prev_epoch_sync_committee_messages_min_delay_seconds,
"The min delay between when the validator should send the sync committee message and when it was received.", labels = ["validator"]
declareGauge validator_monitor_prev_epoch_sync_contribution_inclusions,
"The count of times a sync signature was seen inside a sync contribution.", labels = ["validator"]
declareGauge validator_monitor_prev_epoch_sync_signature_block_inclusions,
"The count of times a sync signature was seen inside a block.", labels = ["validator"]
declareGauge validator_monitor_prev_epoch_sync_contributions_total,
"The number of sync contributions seen in the previous epoch.", labels = ["validator"]
declareHistogram validator_monitor_prev_epoch_sync_contribution_min_delay_seconds,
"The min delay between when the validator should send the sync contribution and when it was received.", labels = ["validator"]
declareGauge validator_monitor_validator_in_current_sync_committee,
"Is the validator in the current sync committee (1 for true and 0 for false)", labels = ["validator"]
declareGauge validator_monitor_validators_total,
"Count of validators that are specifically monitored by this beacon node"
declareCounter validator_monitor_unaggregated_attestation_total,
"Number of unaggregated attestations seen", labels = ["src", "validator"]
declareHistogram validator_monitor_unaggregated_attestation_delay_seconds,
"The delay between when the validator should send the attestation and when it was received.", labels = ["src", "validator"]
declareCounter validator_monitor_sync_committee_messages_total,
"Number of sync committee messages seen", labels = ["src", "validator"]
declareHistogram validator_monitor_sync_committee_messages_delay_seconds,
"The delay between when the validator should send the sync committee message and when it was received.", labels = ["src", "validator"]
declareCounter validator_monitor_sync_contributions_total,
"Number of sync contributions seen", labels = ["src", "validator"]
declareHistogram validator_monitor_sync_contributions_delay_seconds,
"The delay between when the aggregator should send the sync contribution and when it was received.", labels = ["src", "validator"]
declareCounter validator_monitor_aggregated_attestation_total,
"Number of aggregated attestations seen", labels = ["src", "validator"]
declareHistogram validator_monitor_aggregated_attestation_delay_seconds,
"The delay between then the validator should send the aggregate and when it was received.", labels = ["src", "validator"]
declareCounter validator_monitor_attestation_in_aggregate_total,
"Number of times an attestation has been seen in an aggregate", labels = ["src", "validator"]
declareCounter validator_monitor_sync_committee_message_in_contribution_total,
"Number of times a sync committee message has been seen in a sync contribution", labels = ["src", "validator"]
declareHistogram validator_monitor_attestation_in_aggregate_delay_seconds,
"The delay between when the validator should send the aggregate and when it was received.", labels = ["src", "validator"]
declareCounter validator_monitor_attestation_in_block_total,
"Number of times an attestation has been seen in a block", labels = ["src", "validator"]
declareCounter validator_monitor_sync_committee_message_in_block_total,
"Number of times a validator's sync committee message has been seen in a sync aggregate", labels = ["src", "validator"]
declareGauge validator_monitor_attestation_in_block_delay_slots,
"The excess slots (beyond the minimum delay) between the attestation slot and the block slot.", labels = ["src", "validator"]
declareCounter validator_monitor_beacon_block_total,
"Number of beacon blocks seen", labels = ["src", "validator"]
declareHistogram validator_monitor_beacon_block_delay_seconds,
"The delay between when the validator should send the block and when it was received.", labels = ["src", "validator"]
declareCounter validator_monitor_exit_total,
"Number of beacon exits seen", labels = ["src", "validator"]
declareCounter validator_monitor_proposer_slashing_total,
"Number of proposer slashings seen", labels = ["src", "validator"]
declareCounter validator_monitor_attester_slashing_total,
"Number of attester slashings seen", labels = ["src", "validator"]
type
EpochSummary = object
## Similar to the state transition, we collect everything that happens in
## an epoch during that epoch and the one that follows it, then at the end
## of the monitoring period, we report the statistics to the user.
## In case of a deep reorg (>1 epoch) this information will be off, but will
## repair itself in the next epoch, which is a reasonable trade-off between
## correctness and utility.
##
## It should be noted that some metrics may be slightly inaccurate given the
## nature of gossip processing: in particular, old messages may reappear
## on the network and therefore be double-counted.
attestations: int64
attestation_min_delay: Option[Duration]
attestation_aggregate_inclusions: int64
attestation_block_inclusions: int64
attestation_min_block_inclusion_distance: Option[uint64]
aggregates: int64
aggregate_min_delay: Option[Duration]
sync_committee_messages: int64
sync_committee_message_min_delay: Option[Duration]
sync_signature_block_inclusions: int64
sync_signature_contribution_inclusions: int64
sync_contributions: int64
sync_contribution_min_delay: Option[Duration]
exits: int64
proposer_slashings: int64
attester_slashings: int64
MonitoredValidator = object
id: string # A short id is used above all for metrics
pubkey: ValidatorPubKey
index: Option[ValidatorIndex]
summaries: array[2, EpochSummary] # We monitor the current and previous epochs
ValidatorMonitor* = object
epoch: Epoch # The most recent epoch seen in monitoring
monitors: Table[ValidatorPubKey, ref MonitoredValidator]
indices: Table[uint64, ref MonitoredValidator]
knownValidators: int
autoRegister: bool
totals: bool
MsgSource* {.pure.} = enum
# Where a message came from - for compatibility with Lighthouse, we
# don't differentiate sync and requests, but rather use "gossip" - we also
# don't differentiate in-beacon validators but use "api" as if they were
# VCs - this simplifies the initial implementation but should likely be
# expanded in the future.
gossip = "gossip"
api = "api"
template toGaugeValue(v: bool): int64 =
if v: 1 else: 0
proc update_if_lt[T](current: var Option[T], val: T) =
if current.isNone() or val < current.get():
current = some(val)
proc addMonitor*(
self: var ValidatorMonitor, pubkey: ValidatorPubKey,
index: Option[ValidatorIndex]) =
if pubkey in self.monitors:
return
let id = shortLog(pubkey)
let monitor = (ref MonitoredValidator)(id: id, index: index)
self.monitors[pubkey] = monitor
if index.isSome():
self.indices[index.get().uint64] = monitor
template metricId: string =
mixin self, id
if self.totals: "total" else: id
proc addAutoMonitor*(
self: var ValidatorMonitor, pubkey: ValidatorPubKey,
index: ValidatorIndex) =
if not self.autoRegister:
return
# automatic monitors must be registered with index - we don't look for them in
# the state
self.addMonitor(pubkey, some(index))
info "Started monitoring validator",
validator = shortLog(pubkey), pubkey, index
proc init*(T: type ValidatorMonitor, autoRegister = false, totals = false): T =
T(autoRegister: autoRegister, totals: totals)
template summaryIdx(epoch: Epoch): int = (epoch.uint64 mod 2).int
template withEpochSummary(
self: var ValidatorMonitor, monitor: var MonitoredValidator,
epochParam: Epoch, body: untyped) =
let epoch = epochParam
if epoch == self.epoch or epoch + 1 == self.epoch:
template epochSummary: untyped {.inject.} = monitor.summaries[summaryIdx(epoch)]
body
proc updateEpoch(self: var ValidatorMonitor, epoch: Epoch) =
# Called at the start of a new epoch to provide a summary of the events 2
# epochs back then clear the slate for new reporting.
if epoch <= self.epoch:
return
let
clearMonitor = epoch > self.epoch + 1
# index of the EpochSummary that we'll first report, then clear
summaryIdx = epoch.summaryIdx
if clearMonitor:
# More than one epoch passed since the last check which makes it difficult
# to report correctly with the amount of data we store - skip this round
# and hope things improve
notice "Resetting validator monitoring", epoch, monitorEpoch = self.epoch
self.epoch = epoch
validator_monitor_validators_total.set(self.monitors.len().int64)
for (_, monitor) in self.monitors.mpairs():
if clearMonitor:
monitor.summaries = default(type(monitor.summaries))
continue
let
id = monitor.id
let summary = monitor.summaries[summaryIdx]
validator_monitor_prev_epoch_attestations_total.set(
summary.attestations, [metricId])
if summary.attestation_min_delay.isSome():
validator_monitor_prev_epoch_attestations_min_delay_seconds.observe(
summary.attestation_min_delay.get().toFloatSeconds(), [metricId])
validator_monitor_prev_epoch_attestation_aggregate_inclusions.set(
summary.attestation_aggregate_inclusions, [metricId])
validator_monitor_prev_epoch_attestation_block_inclusions.set(
summary.attestation_block_inclusions, [metricId])
if summary.attestation_min_block_inclusion_distance.isSome():
validator_monitor_prev_epoch_attestation_block_min_inclusion_distance.set(
summary.attestation_min_block_inclusion_distance.get().int64, [metricId])
validator_monitor_prev_epoch_sync_committee_messages_total.set(
summary.sync_committee_messages, [metricId])
if summary.sync_committee_message_min_delay.isSome():
validator_monitor_prev_epoch_sync_committee_messages_min_delay_seconds.observe(
summary.sync_committee_message_min_delay.get().toFloatSeconds(), [metricId])
validator_monitor_prev_epoch_sync_contribution_inclusions.set(
summary.sync_signature_contribution_inclusions, [metricId])
validator_monitor_prev_epoch_sync_signature_block_inclusions.set(
summary.sync_signature_block_inclusions, [metricId])
validator_monitor_prev_epoch_sync_contributions_total.set(
summary.sync_contributions, [metricId])
if summary.sync_contribution_min_delay.isSome():
validator_monitor_prev_epoch_sync_contribution_min_delay_seconds.observe(
summary.sync_contribution_min_delay.get().toFloatSeconds(), [metricId])
validator_monitor_prev_epoch_aggregates_total.set(
summary.aggregates, [metricId])
if summary.aggregate_min_delay.isSome():
validator_monitor_prev_epoch_aggregates_min_delay_seconds.observe(
summary.aggregate_min_delay.get().toFloatSeconds(), [metricId])
validator_monitor_prev_epoch_exits_total.set(
summary.exits, [metricId])
validator_monitor_prev_epoch_proposer_slashings_total.set(
summary.proposer_slashings, [metricId])
validator_monitor_prev_epoch_attester_slashings_total.set(
summary.attester_slashings, [metricId])
monitor.summaries[summaryIdx] = default(type(monitor.summaries[summaryIdx]))
func is_active_unslashed_in_previous_epoch(status: RewardStatus): bool =
let flags = status.flags
RewardFlags.isActiveInPreviousEpoch in flags and
RewardFlags.isSlashed notin flags
func is_previous_epoch_source_attester(status: RewardStatus): bool =
status.is_previous_epoch_attester.isSome()
func is_previous_epoch_head_attester(status: RewardStatus): bool =
RewardFlags.isPreviousEpochHeadAttester in status.flags
func is_previous_epoch_target_attester(status: RewardStatus): bool =
RewardFlags.isPreviousEpochTargetAttester in status.flags
func is_previous_epoch_source_attester(status: ParticipationInfo): bool =
ParticipationFlag.timelySourceAttester in status.flags
func is_previous_epoch_head_attester(status: ParticipationInfo): bool =
ParticipationFlag.timelyHeadAttester in status.flags
func is_previous_epoch_target_attester(status: ParticipationInfo): bool =
ParticipationFlag.timelyTargetAttester in status.flags
func is_active_unslashed_in_previous_epoch(status: ParticipationInfo): bool =
ParticipationFlag.eligible in status.flags
proc registerEpochInfo*(
self: var ValidatorMonitor, epoch: Epoch, info: ForkedEpochInfo,
state: ForkyBeaconState) =
# Register rewards, as computed during the epoch transition that lands in
# `epoch` - the rewards will be from attestations that were created at
# `epoch - 2`.
if epoch < 2 or self.monitors.len == 0:
return
withEpochInfo(info):
for pubkey, monitor in self.monitors:
if monitor.index.isNone:
continue
let
idx = monitor.index.get()
if info.validators.lenu64 <= idx.uint64:
# No summary for this validator (yet?)
debug "No reward information for validator",
id = monitor.id, idx
continue
let
prev_epoch = epoch - 2
id = monitor.id
let status = info.validators[idx]
if not status.is_active_unslashed_in_previous_epoch():
# Monitored validator is not active, due to awaiting activation
# or being exited/withdrawn. Do not attempt to report on its
# attestations.
continue
let
previous_epoch_matched_source = status.is_previous_epoch_source_attester()
previous_epoch_matched_target = status.is_previous_epoch_target_attester()
previous_epoch_matched_head = status.is_previous_epoch_head_attester()
# Indicates if any attestation made it on-chain.
# For Base states, this will be *any* attestation whatsoever. For Altair states,
# this will be any attestation that matched a "timely" flag.
if previous_epoch_matched_source:
# These two metrics are the same - keep both around for LH compatibility
validator_monitor_prev_epoch_on_chain_attester_hit.inc(1, [metricId])
validator_monitor_prev_epoch_on_chain_source_attester_hit.inc(1, [metricId])
info "Previous epoch attestation included",
timely_source = previous_epoch_matched_source,
timely_target = previous_epoch_matched_target,
timely_head = previous_epoch_matched_head,
epoch = prev_epoch,
validator = id
else:
validator_monitor_prev_epoch_on_chain_attester_miss.inc(1, [metricId])
validator_monitor_prev_epoch_on_chain_source_attester_miss.inc(1, [metricId])
warn "Previous epoch attestation missing",
epoch = prev_epoch,
validator = id
# Indicates if any on-chain attestation hit the head.
if previous_epoch_matched_head:
validator_monitor_prev_epoch_on_chain_head_attester_hit.inc(1, [metricId])
else:
validator_monitor_prev_epoch_on_chain_head_attester_miss.inc(1, [metricId])
notice "Attestation failed to match head",
epoch = prev_epoch,
validator = id
# Indicates if any on-chain attestation hit the target.
if previous_epoch_matched_target:
validator_monitor_prev_epoch_on_chain_target_attester_hit.inc(1, [metricId])
else:
validator_monitor_prev_epoch_on_chain_target_attester_miss.inc(1, [metricId])
notice "Attestation failed to match target",
epoch = prev_epoch,
validator = id
when state isnot phase0.BeaconState: # altair+
# Indicates the number of sync committee signatures that made it into
# a sync aggregate in the current_epoch (state.epoch - 1).
# Note: Unlike attestations, sync committee signatures must be included in the
# immediate next slot. Hence, num included sync aggregates for `state.epoch - 1`
# is available right after state transition to state.epoch.
let current_epoch = epoch - 1
if state.current_sync_committee.pubkeys.data.contains(pubkey):
validator_monitor_validator_in_current_sync_committee.set(1, [metricId])
self.withEpochSummary(monitor[], current_epoch):
info "Current epoch sync signatures",
included = epochSummary.sync_signature_block_inclusions,
expected = SLOTS_PER_EPOCH,
epoch = current_epoch,
validator = id
else:
validator_monitor_validator_in_current_sync_committee.set(0, [metricId])
debug "Validator isn't part of the current sync committee",
epoch = current_epoch,
validator = id
self.updateEpoch(epoch)
proc registerState*(self: var ValidatorMonitor, state: ForkyBeaconState) =
# Update indices for the validators we're monitoring
for v in self.knownValidators..<state.validators.len:
self.monitors.withValue(state.validators[v].pubkey, monitor):
monitor[][].index = some(ValidatorIndex(v))
self.indices[uint64(v)] = monitor[]
info "Started monitoring validator",
validator = monitor[][].id, pubkey = state.validators[v].pubkey, index = v
self.knownValidators = state.validators.len
let
current_epoch = state.slot.epoch
# Update metrics for monitored validators according to the latest rewards
for (_, monitor) in self.monitors.mpairs():
if not monitor[].index.isSome():
continue
let idx = monitor[].index.get()
if state.balances.lenu64 <= idx.uint64:
continue
let id = monitor[].id
validator_monitor_balance_gwei.set(
state.balances[idx].toGaugeValue(), [metricId])
validator_monitor_effective_balance_gwei.set(
state.validators[idx].effective_balance.toGaugeValue(), [metricId])
validator_monitor_slashed.set(
state.validators[idx].slashed.toGaugeValue(), [metricId])
validator_monitor_active.set(
is_active_validator(state.validators[idx], current_epoch).toGaugeValue(), [metricId])
validator_monitor_exited.set(
is_exited_validator(state.validators[idx], current_epoch).toGaugeValue(), [metricId])
validator_monitor_withdrawable.set(
is_withdrawable_validator(state.validators[idx], current_epoch).toGaugeValue(), [metricId])
validator_activation_eligibility_epoch.set(
state.validators[idx].activation_eligibility_epoch.toGaugeValue(), [metricId])
validator_activation_epoch.set(
state.validators[idx].activation_epoch.toGaugeValue(), [metricId])
validator_exit_epoch.set(
state.validators[idx].exit_epoch.toGaugeValue(), [metricId])
validator_withdrawable_epoch.set(
state.validators[idx].withdrawable_epoch.toGaugeValue(), [metricId])
template withMonitor(self: var ValidatorMonitor, key: ValidatorPubKey, body: untyped): untyped =
self.monitors.withValue(key, valuex):
template monitor: untyped {.inject.} = valuex[][]
body
template withMonitor(self: var ValidatorMonitor, idx: uint64, body: untyped): untyped =
self.indices.withValue(idx, valuex):
template monitor: untyped {.inject.} = valuex[][]
body
template withMonitor(self: var ValidatorMonitor, idx: ValidatorIndex, body: untyped): untyped =
withMonitor(self, idx.uint64, body)
proc delay(slot: Slot, time: BeaconTime, offset: Duration): Duration =
time - slot.toBeaconTime(offset)
proc registerAttestation*(
self: var ValidatorMonitor,
src: MsgSource,
seen_timestamp: BeaconTime,
attestation: Attestation,
idx: ValidatorIndex) =
let
slot = attestation.data.slot
delay = delay(slot, seen_timestamp, attestationSlotOffset)
self.withMonitor(idx):
let id = monitor.id
validator_monitor_unaggregated_attestation_total.inc(1, [$src, metricId])
validator_monitor_unaggregated_attestation_delay_seconds.observe(
delay.toFloatSeconds(), [$src, metricId])
info "Attestation seen",
attestation = shortLog(attestation),
src, epoch = slot.epoch, validator = id
self.withEpochSummary(monitor, slot.epoch):
epochSummary.attestations += 1
update_if_lt(epochSummary.attestation_min_delay, delay)
proc registerAggregate*(
self: var ValidatorMonitor,
src: MsgSource,
seen_timestamp: BeaconTime,
signed_aggregate_and_proof: SignedAggregateAndProof,
attesting_indices: openArray[ValidatorIndex]) =
let
slot = signed_aggregate_and_proof.message.aggregate.data.slot
delay = delay(slot, seen_timestamp, aggregateSlotOffset)
aggregator_index = signed_aggregate_and_proof.message.aggregator_index
self.withMonitor(aggregator_index):
let id = monitor.id
validator_monitor_aggregated_attestation_total.inc(1, [$src, metricId])
validator_monitor_aggregated_attestation_delay_seconds.observe(
delay.toFloatSeconds(), [$src, metricId])
info "Aggregated attestion seen",
aggregate = shortLog(signed_aggregate_and_proof.message.aggregate),
src, epoch = slot.epoch, validator = id
self.withEpochSummary(monitor, slot.epoch):
epochSummary.aggregates += 1
update_if_lt(epochSummary.aggregate_min_delay, delay)
for idx in attesting_indices:
self.withMonitor(idx):
let id = monitor.id
validator_monitor_attestation_in_aggregate_total.inc(1, [$src, metricId])
validator_monitor_attestation_in_aggregate_delay_seconds.observe(
delay.toFloatSeconds(), [$src, metricId])
info "Attestation included in aggregate",
aggregate = shortLog(signed_aggregate_and_proof.message.aggregate),
src, epoch = slot.epoch, validator = id
self.withEpochSummary(monitor, slot.epoch):
epochSummary.attestation_aggregate_inclusions += 1
proc registerAttestationInBlock*(
self: var ValidatorMonitor,
data: AttestationData,
attesting_index: ValidatorIndex,
blck: auto) =
self.withMonitor(attesting_index):
let
id = monitor.id
inclusion_lag = (blck.slot - data.slot) - MIN_ATTESTATION_INCLUSION_DELAY
epoch = data.slot.epoch
validator_monitor_attestation_in_block_total.inc(1, ["block", id])
validator_monitor_attestation_in_block_delay_slots.set(inclusion_lag.int64, ["block", id])
info "Attestation included in block",
attestation_data = shortLog(data),
block_slot = blck.slot,
inclusion_lag_slots = inclusion_lag,
epoch = epoch, validator = id
self.withEpochSummary(monitor, epoch):
epochSummary.attestation_block_inclusions += 1
update_if_lt(
epochSummary.attestation_min_block_inclusion_distance, inclusion_lag)
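
As a worked example of the lag computed above (plain numbers, assuming the spec constant MIN_ATTESTATION_INCLUSION_DELAY of 1 slot): an attestation for slot 100 that first appears in the block at slot 101 was included as early as possible, so its excess delay is zero; a block three slots later than that records a lag of 3.

```nim
# Earliest possible inclusion is one slot after the attested slot (lag 0).
doAssert (101 - 100) - 1 == 0
doAssert (104 - 100) - 1 == 3
```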
proc registerBeaconBlock*(
self: var ValidatorMonitor,
src: MsgSource,
seen_timestamp: BeaconTime,
blck: auto) =
self.withMonitor(blck.proposer_index):
let
id = monitor.id
slot = blck.slot
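      # Blocks are due at the start of the slot, so no extra offset is applied to the deadline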
delay = delay(slot, seen_timestamp, seconds(0))
validator_monitor_beacon_block_total.inc(1, [$src, metricId])
validator_monitor_beacon_block_delay_seconds.observe(
delay.toFloatSeconds(), [$src, metricId])
info "Block seen",
blck = shortLog(blck), src, epoch = slot.epoch, validator = id

proc registerSyncCommitteeMessage*(
self: var ValidatorMonitor,
src: MsgSource,
seen_timestamp: BeaconTime,
sync_committee_message: SyncCommitteeMessage) =
self.withMonitor(sync_committee_message.validator_index):
let
id = monitor.id
slot = sync_committee_message.slot
delay = delay(slot, seen_timestamp, syncCommitteeMessageSlotOffset)
validator_monitor_sync_committee_messages_total.inc(1, [$src, metricId])
validator_monitor_sync_committee_messages_delay_seconds.observe(
delay.toFloatSeconds(), [$src, metricId])
info "Sync committee message seen",
syncCommitteeMessage = shortLog(sync_committee_message.beacon_block_root),
src, epoch = slot.epoch, validator = id
self.withEpochSummary(monitor, slot.epoch):
epochSummary.sync_committee_messages += 1
update_if_lt(epochSummary.sync_committee_message_min_delay, delay)

proc registerSyncContribution*(
self: var ValidatorMonitor,
src: MsgSource,
seen_timestamp: BeaconTime,
sync_contribution: SignedContributionAndProof,
participants: openArray[ValidatorIndex]) =
let
slot = sync_contribution.message.contribution.slot
beacon_block_root = sync_contribution.message.contribution.beacon_block_root
delay = delay(slot, seen_timestamp, syncContributionSlotOffset)
let aggregator_index = sync_contribution.message.aggregator_index
self.withMonitor(aggregator_index):
let id = monitor.id
validator_monitor_sync_contributions_total.inc(1, [$src, metricId])
validator_monitor_sync_contributions_delay_seconds.observe(
delay.toFloatSeconds(), [$src, metricId])
info "Sync contribution seen",
contribution = shortLog(sync_contribution.message.contribution),
src, epoch = slot.epoch, validator = id
self.withEpochSummary(monitor, slot.epoch):
epochSummary.sync_contributions += 1
update_if_lt(epochSummary.sync_contribution_min_delay, delay)
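  # Also credit each monitored validator whose sync committee message was included in this contribution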
for participant in participants:
self.withMonitor(participant):
let id = monitor.id
validator_monitor_sync_committee_message_in_contribution_total.inc(1, [$src, metricId])
info "Sync signature included in contribution",
contribution = shortLog(sync_contribution.message.contribution),
src, epoch = slot.epoch, validator = id
self.withEpochSummary(monitor, slot.epoch):
epochSummary.sync_signature_contribution_inclusions += 1

proc registerSyncAggregateInBlock*(
self: var ValidatorMonitor, slot: Slot, beacon_block_root: Eth2Digest,
pubkey: ValidatorPubKey) =
self.withMonitor(pubkey):
let id = monitor.id
validator_monitor_sync_committee_message_in_block_total.inc(1, ["block", id])
info "Sync signature included in block",
head = beacon_block_root, slot = slot, validator = id
self.withEpochSummary(monitor, slot.epoch):
epochSummary.sync_signature_block_inclusions += 1

proc registerVoluntaryExit*(
self: var ValidatorMonitor, src: MsgSource, exit: VoluntaryExit) =
self.withMonitor(exit.validator_index.ValidatorIndex):
let
id = monitor.id
epoch = exit.epoch
validator_monitor_exit_total.inc(1, [$src, metricId])
notice "Voluntary exit seen",
epoch = epoch, validator = id, src = src
self.withEpochSummary(monitor, epoch):
epochSummary.exits += 1

proc registerProposerSlashing*(
self: var ValidatorMonitor, src: MsgSource, slashing: ProposerSlashing) =
let proposer = slashing.signed_header_1.message.proposer_index
self.withMonitor(proposer):
let
id = monitor.id
slot = slashing.signed_header_1.message.slot
root_1 = hash_tree_root(slashing.signed_header_1.message)
root_2 = hash_tree_root(slashing.signed_header_2.message)
validator_monitor_proposer_slashing_total.inc(1, [$src, metricId])
warn "Proposer slashing seen",
root_2 = root_2, root_1 = root_1, slot = slot, validator = id, src = src
self.withEpochSummary(monitor, slot.epoch):
epochSummary.proposer_slashings += 1

proc registerAttesterSlashing*(
self: var ValidatorMonitor, src: MsgSource, slashing: AttesterSlashing) =
let data = slashing.attestation_1.data
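  # Only validators that participated in both conflicting attestations are subject to slashing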
for idx in slashing.attestation_2.attesting_indices:
if idx notin slashing.attestation_1.attesting_indices.asSeq:
continue
self.withMonitor(idx):
let
id = monitor.id
slot = data.slot
validator_monitor_attester_slashing_total.inc(1, [$src, metricId])
warn "Attester slashing seen",
slot = slot, validator = id, src = src
self.withEpochSummary(monitor, slot.epoch):
epochSummary.attester_slashings += 1

View File

@ -24,6 +24,7 @@
- [Perform a voluntary exit](./voluntary-exit.md)
- [Add an additional validator](./additional-validator.md)
- [Monitor attestation performance](./attestation-performance.md)
- [Monitor specific validators](./validator-monitor.md)
# How-to (misc)
- [Upgrade / downgrade Nimbus](./keep-updated.md)

View File

@ -0,0 +1,63 @@
# Validator monitoring

> ⚠️ This feature is currently in BETA - the details of its implementation may change in response to community feedback.

The validator monitoring feature tracks the life-cycle and performance of one or more validators in detail. Any validator can be monitored, with slightly more detail available for validators whose duties are performed through the same beacon node.

Every time a validator performs a duty, the monitor records it and keeps track of the reward-related events that follow. For example:

* When attesting, the attestation is first added to an aggregate, then to a block, before a reward is applied to the state
* Sync committee duties follow the same pattern: message, contribution, block inclusion and reward

Validator actions can be traced either through logging or through comprehensive metrics that allow for creating alerts in monitoring tools. The metrics are based on the same feature in [Lighthouse](https://lighthouse-book.sigmaprime.io/validator-monitoring.html), thus dashboards and alerts can be shared between the two clients.

## Enabling validator monitoring

The monitor can be enabled either for all keys that are used with a particular beacon node, or for a specific list of validators, or both.

```
# Enable automatic monitoring of all validators used with this beacon node
./run-mainnet-beacon-node.sh --validator-monitor-auto
# Enable monitoring of one or more specific validators
./run-mainnet-beacon-node.sh \
--validator-monitor-pubkey=0xa1d1ad0714035353258038e964ae9675dc0252ee22cea896825c01458e1807bfad2f9969338798548d9858a571f7425c \
--validator-monitor-pubkey=0xb2ff4716ed345b05dd1dfc6a5a9fa70856d8c75dcc9e881dd2f766d5f891326f0d10e96f3a444ce6c912b69c22c6754d
# Publish metrics as totals for all monitored validators instead of each validator separately - used for limiting the load on metrics when monitoring many validators
./run-mainnet-beacon-node.sh --validator-monitor-totals
```
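
The options can also be combined - for example, the following sketch (using the flags shown above) monitors all local validators while keeping the metrics overhead low:

```
# Monitor local validators, publishing only aggregate totals to the metrics server
./run-mainnet-beacon-node.sh --validator-monitor-auto --validator-monitor-totals
```
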
## Understanding monitoring

When a validator performs a duty, such as signing an attestation or a sync committee message, this is broadcast to the network. Other nodes pick it up and package the message into an aggregate and later a block. The block is included in the canonical chain and a reward is given two epochs (~13 minutes) later.

The monitor tracks these actions and will log each step at the `INF` level. If any step is missed, a `NOT` log is shown instead.

The typical lifecycle of an attestation might look something like the following:
```
INF 2021-11-22 11:32:44.228+01:00 Attestation seen topics="val_mon" attestation="(aggregation_bits: 0b0000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, data: (slot: 2656363, index: 11, beacon_block_root: \"bbe7fc25\", source: \"83010:a8a1b125\", target: \"83011:6db281cd\"), signature: \"b88ef2f2\")" src=api epoch=83011 validator=b93c290b
INF 2021-11-22 11:32:51.293+01:00 Attestation included in aggregate topics="val_mon" aggregate="(aggregation_bits: 0b1111111101011111001101111111101100111111110100111011111110110101110111111010111111011101111011101111111111101111100001111111100111, data: (slot: 2656363, index: 11, beacon_block_root: \"bbe7fc25\", source: \"83010:a8a1b125\", target: \"83011:6db281cd\"), signature: \"8576b3fc\")" src=gossip epoch=83011 validator=b93c290b
INF 2021-11-22 11:33:07.193+01:00 Attestation included in block attestation_data="(slot: 2656364, index: 9, beacon_block_root: \"c7761767\", source: \"83010:a8a1b125\", target: \"83011:6db281cd\")" block_slot=2656365 inclusion_lag_slots=0 epoch=83011 validator=b65b6e1b
```
The lifecycle of a particular message can be traced by following the `epoch=... validator=...` fields in the log entries.
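
For example, to follow a single validator through one epoch, the logs can be filtered on those fields - a hypothetical sketch, where the log file name and the `validator`/`epoch` values must be adjusted to your setup:

```
# Show all lifecycle events recorded for one monitored validator in one epoch
grep 'validator=b93c290b' beacon_node.log | grep 'epoch=83011'
```
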
Failures at any point are recorded at a higher logging level, such as `NOT`(ice):
```
NOT 2021-11-17 20:53:42.108+01:00 Attestation failed to match head topics="chaindag" epoch=81972 validator=...
```
> ⚠️ Note that metrics are tracked for the current view of history: if the chain undergoes a reorg, in particular a deep one, no attempt is made to revisit previously reported values. When finality is delayed, the risk of stale metrics increases.

Likewise, many metrics, such as aggregate inclusion counts, reflect conditions on the network: under certain conditions, the same message may be counted more than once.

## Monitoring metrics

The full list of metrics supported by the validator monitoring feature can be seen in the [source code](https://github.com/status-im/nimbus-eth2/blob/unstable/beacon_chain/validators/validator_monitor.nim) or by examining the metrics output:
```
curl -s localhost:8008/metrics | grep HELP.*validator_
```
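
For example, to check that attestations from the monitored validators keep landing in blocks, a single counter can be watched - a sketch, where the exact label layout depends on the `--validator-monitor-totals` setting:

```
# Per-validator (or total) count of attestations seen in blocks
curl -s localhost:8008/metrics | grep validator_monitor_attestation_in_block_total
```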

View File

@ -200,8 +200,10 @@ proc cmdBench(conf: DbConf, cfg: RuntimeConfig) =
quit 1
echo "Initializing block pool..."
let dag = withTimerRet(timers[tInit]):
ChainDAGRef.init(cfg, db, {})
let
validatorMonitor = newClone(ValidatorMonitor.init())
dag = withTimerRet(timers[tInit]):
ChainDAGRef.init(cfg, db, validatorMonitor, {})
var
(start, ends) = dag.getSlotRange(conf.benchSlot, conf.benchSlots)
@ -473,7 +475,10 @@ proc cmdRewindState(conf: DbConf, cfg: RuntimeConfig) =
quit 1
echo "Initializing block pool..."
let dag = init(ChainDAGRef, cfg, db, {})
let
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(ChainDAGRef, cfg, db, validatorMonitor, {})
let blckRef = dag.getRef(fromHex(Eth2Digest, conf.blockRoot))
if blckRef == nil:
@ -502,7 +507,8 @@ proc cmdExportEra(conf: DbConf, cfg: RuntimeConfig) =
echo "Initializing block pool..."
let
dag = init(ChainDAGRef, cfg, db, {})
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(ChainDAGRef, cfg, db, validatorMonitor, {})
let tmpState = assignClone(dag.headState)
@ -564,7 +570,9 @@ proc cmdValidatorPerf(conf: DbConf, cfg: RuntimeConfig) =
quit 1
echo "# Initializing block pool..."
let dag = ChainDAGRef.init(cfg, db, {})
let
validatorMonitor = newClone(ValidatorMonitor.init())
dag = ChainDAGRef.init(cfg, db, validatorMonitor, {})
var
(start, ends) = dag.getSlotRange(conf.perfSlot, conf.perfSlots)
@ -702,7 +710,9 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
quit 1
echo "Initializing block pool..."
let dag = ChainDAGRef.init(cfg, db, {})
let
validatorMonitor = newClone(ValidatorMonitor.init())
dag = ChainDAGRef.init(cfg, db, validatorMonitor, {})
let outDb = SqStoreRef.init(conf.outDir, "validatorDb").expect("DB")
defer: outDb.close()

View File

@ -83,7 +83,8 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
putInitialDepositContractSnapshot(db, depositContractSnapshot)
var
dag = ChainDAGRef.init(cfg, db, {})
validatorMonitor = newClone(ValidatorMonitor.init())
dag = ChainDAGRef.init(cfg, db, validatorMonitor, {})
eth1Chain = Eth1Chain.init(cfg, db)
merkleizer = depositContractSnapshot.createMerkleizer
taskpool = Taskpool.new()
@ -231,7 +232,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
doAssert res.isOk
syncCommitteePool[].addContribution(
signedContributionAndProof, res.get())
signedContributionAndProof, res.get()[0])
proc getNewBlock[T](
stateData: var StateData, slot: Slot, cache: var StateCache): T =

View File

@ -92,15 +92,15 @@ proc initialLoad(
forkedState[], forkedState[],
asTrusted(signedBlock)
)
let dag = ChainDAGRef.init(
defaultRuntimeConfig,
db,
updateFlags = {}
)
let fkChoice = newClone(ForkChoice.init(
dag.getFinalizedEpochRef(),
dag.finalizedHead.blck
))
let
validatorMonitor = newClone(ValidatorMonitor.init())
dag = ChainDAGRef.init(
defaultRuntimeConfig, db, validatorMonitor, {})
fkChoice = newClone(ForkChoice.init(
dag.getFinalizedEpochRef(),
dag.finalizedHead.blck
))
(dag, fkChoice)

View File

@ -58,7 +58,10 @@ suite "Attestation pool processing" & preset():
setup:
# Genesis state that results in 6 members per committee
var
dag = init(ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 6), {})
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(
ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 6),
validatorMonitor, {})
taskpool = Taskpool.new()
verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool)
quarantine = newClone(Quarantine.init())

View File

@ -66,7 +66,8 @@ func withDigest(blck: merge.TrustedBeaconBlock):
proc getTestStates(stateFork: BeaconStateFork): auto =
let
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
var testStates = getTestStates(dag.headState.data, stateFork)
# Ensure transitions beyond just adding validators and increasing slots
@ -312,7 +313,8 @@ suite "Beacon chain DB" & preset():
test "sanity check phase 0 getState rollback" & preset():
var
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
state = (ref ForkedHashedBeaconState)(
kind: BeaconStateFork.Phase0,
phase0Data: phase0.HashedBeaconState(data: phase0.BeaconState(
@ -334,7 +336,8 @@ suite "Beacon chain DB" & preset():
test "sanity check Altair and cross-fork getState rollback" & preset():
var
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
state = (ref ForkedHashedBeaconState)(
kind: BeaconStateFork.Altair,
altairData: altair.HashedBeaconState(data: altair.BeaconState(
@ -359,7 +362,8 @@ suite "Beacon chain DB" & preset():
test "sanity check Merge and cross-fork getState rollback" & preset():
var
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
state = (ref ForkedHashedBeaconState)(
kind: BeaconStateFork.Merge,
mergeData: merge.HashedBeaconState(data: merge.BeaconState(

View File

@ -27,7 +27,8 @@ suite "Block processor" & preset():
setup:
var
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
taskpool = Taskpool.new()
verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool)
quarantine = newClone(Quarantine.init())
@ -40,11 +41,11 @@ suite "Block processor" & preset():
getTimeFn = proc(): BeaconTime = b2.message.slot.toBeaconTime()
processor = BlockProcessor.new(
false, "", "", keys.newRng(), taskpool, consensusManager,
getTimeFn)
validatorMonitor, getTimeFn)
test "Reverse order block add & get" & preset():
let missing = processor[].storeBlock(
b2, b2.message.slot)
MsgSource.gossip, b2.message.slot.toBeaconTime(), b2)
check: missing.error == BlockError.MissingParent
check:
@ -54,7 +55,7 @@ suite "Block processor" & preset():
let
status = processor[].storeBlock(
b1, b2.message.slot)
MsgSource.gossip, b2.message.slot.toBeaconTime(), b1)
b1Get = dag.get(b1.root)
check:
@ -84,7 +85,8 @@ suite "Block processor" & preset():
# check that init also reloads block graph
var
dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {})
validatorMonitor2 = newClone(ValidatorMonitor.init())
dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor2, {})
check:
# ensure we loaded the correct head state

View File

@ -50,7 +50,8 @@ suite "Block pool processing" & preset():
setup:
var
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new())
quarantine = Quarantine.init()
state = newClone(dag.headState.data)
@ -159,6 +160,15 @@ suite "Block pool processing" & preset():
dag.getBlockRange(Slot(3), 2, blocks.toOpenArray(0, 1)) == 2
blocks[2..<2].len == 0
test "Adding the same block twice returns a Duplicate error" & preset():
let
b10 = dag.addHeadBlock(verifier, b1, nilPhase0Callback)
b11 = dag.addHeadBlock(verifier, b1, nilPhase0Callback)
check:
b11.error == BlockError.Duplicate
not b10[].isNil
test "updateHead updates head and headState" & preset():
let
b1Add = dag.addHeadBlock(verifier, b1, nilPhase0Callback)
@ -230,7 +240,8 @@ suite "Block pool altair processing" & preset():
var
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, cfg, db, {})
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(ChainDAGRef, cfg, db, validatorMonitor, {})
verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new())
quarantine = Quarantine.init()
state = newClone(dag.headState.data)
@ -304,7 +315,8 @@ suite "chain DAG finalization tests" & preset():
setup:
var
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new())
quarantine = Quarantine.init()
cache = StateCache()
@ -397,7 +409,8 @@ suite "chain DAG finalization tests" & preset():
db.getStateRoot(finalizedCheckpoint.blck.root, finalizedCheckpoint.slot).isSome
let
dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {})
validatorMonitor2 = newClone(ValidatorMonitor.init())
dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor2, {})
# check that the state reloaded from database resembles what we had before
check:
@ -437,7 +450,8 @@ suite "chain DAG finalization tests" & preset():
check: added.isOk()
var
dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {})
validatorMonitor2 = newClone(ValidatorMonitor.init())
dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor2, {})
# check that we can apply the block after the orphaning
let added2 = dag2.addHeadBlock(verifier, blck, nilPhase0Callback)
@ -485,7 +499,8 @@ suite "chain DAG finalization tests" & preset():
cur = cur.parent
let
dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {})
validatorMonitor2 = newClone(ValidatorMonitor.init())
dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor2, {})
# check that the state reloaded from database resembles what we had before
check:
@ -525,7 +540,8 @@ suite "Old database versions" & preset():
db.putGenesisBlock(genBlock.root)
var
dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
validatorMonitor = newClone(ValidatorMonitor.init())
      dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
state = newClone(dag.headState.data)
cache = StateCache()
att0 = makeFullAttestations(state[], dag.tail.root, 0.Slot, cache)
@ -546,7 +562,8 @@ suite "Diverging hardforks":
var
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, phase0RuntimeConfig, db, {})
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(ChainDAGRef, phase0RuntimeConfig, db, validatorMonitor, {})
verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new())
quarantine = newClone(Quarantine.init())
cache = StateCache()
@ -569,7 +586,10 @@ suite "Diverging hardforks":
check b1Add.isOk()
dag.updateHead(b1Add[], quarantine[])
var dagAltair = init(ChainDAGRef, altairRuntimeConfig, db, {})
let validatorMonitorAltair = newClone(ValidatorMonitor.init())
var dagAltair = init(
ChainDAGRef, altairRuntimeConfig, db, validatorMonitorAltair, {})
discard AttestationPool.init(dagAltair, quarantine)
test "Non-tail block in common":
@ -598,7 +618,10 @@ suite "Diverging hardforks":
check b2Add.isOk()
dag.updateHead(b2Add[], quarantine[])
var dagAltair = init(ChainDAGRef, altairRuntimeConfig, db, {})
let validatorMonitor = newClone(ValidatorMonitor.init())
var dagAltair = init(
ChainDAGRef, altairRuntimeConfig, db, validatorMonitor, {})
discard AttestationPool.init(dagAltair, quarantine)
suite "Backfill":
@ -632,7 +655,9 @@ suite "Backfill":
ChainDAGRef.preInit(
db, genState[], tailState[], tailBlock.asTrusted())
let dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
let
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
check:
dag.getRef(tailBlock.root) == dag.tail
@ -687,12 +712,17 @@ suite "Backfill":
ChainDAGRef.preInit(
db, genState[], tailState[], tailBlock.asTrusted())
let dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
let
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
check:
dag.addBackfillBlock(blocks[^2].phase0Data).isOk()
let dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {})
let
validatorMonitor2 = newClone(ValidatorMonitor.init())
dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor2, {})
check:
dag.getRef(tailBlock.root) == dag.tail

View File

@ -17,8 +17,10 @@ import "."/[testutil, testdbutil]
suite "Exit pool testing suite":
setup:
let
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(
ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 3), {})
ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 3),
validatorMonitor, {})
pool = newClone(ExitPool.init(dag))
test "addExitMessage/getProposerSlashingMessage":

View File

@ -36,7 +36,10 @@ suite "Gossip validation " & preset():
setup:
# Genesis state that results in 3 members per committee
var
dag = init(ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 3), {})
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(
ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 3),
validatorMonitor, {})
taskpool = Taskpool.new()
verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool)
quarantine = newClone(Quarantine.init())
@ -185,11 +188,11 @@ suite "Gossip validation - Extra": # Not based on preset config
batchCrypto = BatchCrypto.new(keys.newRng(), eager = proc(): bool = false, taskpool)
var
verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new())
dag = block:
let
dag = ChainDAGRef.init(cfg, makeTestDB(num_validators), {})
quarantine = newClone(Quarantine.init())
validatorMonitor = newClone(ValidatorMonitor.init())
dag = ChainDAGRef.init(
cfg, makeTestDB(num_validators), validatorMonitor, {})
var cache = StateCache()
for blck in makeTestBlocks(
dag.headState.data, cache, int(SLOTS_PER_EPOCH), false, cfg = cfg):

View File

@ -23,7 +23,8 @@ suite "state diff tests" & preset():
setup:
var
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
test "random slot differences" & preset():
let testStates = getTestStates(dag.headState.data, BeaconStateFork.Altair)