diff --git a/AllTests-mainnet.md b/AllTests-mainnet.md index becaa5983..d54c3cddf 100644 --- a/AllTests-mainnet.md +++ b/AllTests-mainnet.md @@ -84,13 +84,14 @@ OK: 3/3 Fail: 0/3 Skip: 0/3 OK: 1/1 Fail: 0/1 Skip: 0/1 ## Block pool processing [Preset: mainnet] ```diff ++ Adding the same block twice returns a Duplicate error [Preset: mainnet] OK + Simple block add&get [Preset: mainnet] OK + getRef returns nil for missing blocks OK + loading tail block works [Preset: mainnet] OK + updateHead updates head and headState [Preset: mainnet] OK + updateStateData sanity [Preset: mainnet] OK ``` -OK: 5/5 Fail: 0/5 Skip: 0/5 +OK: 6/6 Fail: 0/6 Skip: 0/6 ## Block processor [Preset: mainnet] ```diff + Reverse order block add & get [Preset: mainnet] OK @@ -395,4 +396,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1 OK: 1/1 Fail: 0/1 Skip: 0/1 ---TOTAL--- -OK: 215/217 Fail: 0/217 Skip: 2/217 +OK: 216/218 Fail: 0/218 Skip: 2/218 diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 9a39019ab..9d6358d77 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -23,13 +23,14 @@ import sync_committee_msg_pool], ./spec/datatypes/base, ./sync/[sync_manager, request_manager], - ./validators/[action_tracker, validator_pool] + ./validators/[action_tracker, validator_monitor, validator_pool] export osproc, chronos, httpserver, presto, action_tracker, beacon_clock, beacon_chain_db, conf, attestation_pool, sync_committee_msg_pool, validator_pool, eth2_network, eth1_monitor, request_manager, sync_manager, - eth2_processor, blockchain_dag, block_quarantine, base, exit_pool + eth2_processor, blockchain_dag, block_quarantine, base, exit_pool, + validator_monitor type RpcServer* = RpcHttpServer @@ -70,6 +71,7 @@ type beaconClock*: BeaconClock onAttestationSent*: OnAttestationCallback restKeysCache*: Table[ValidatorPubKey, ValidatorIndex] + validatorMonitor*: ref ValidatorMonitor const MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index efd80659a..31f09fe8f 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -372,6 +372,20 @@ type name: "terminal-total-difficulty-override" }: Option[uint64] + validatorMonitorAuto* {. + desc: "Automatically monitor locally active validators (BETA)" + defaultValue: false + name: "validator-monitor-auto" }: bool + + validatorMonitorPubkeys* {. + desc: "One or more validators to monitor - works best when --subscribe-all-subnets is enabled (BETA)" + name: "validator-monitor-pubkey" }: seq[ValidatorPubKey] + + validatorMonitorTotals* {. + desc: "Publish metrics to single 'totals' label for better collection performance when monitoring many validators (BETA)" + defaultValue: false + name: "validator-monitor-totals" }: bool + of createTestnet: testnetDepositsFile* {. desc: "A LaunchPad deposits file for the genesis state validators" @@ -750,6 +764,12 @@ func parseCmdArg*(T: type PubKey0x, input: TaintedString): T {.raises: [ValueError, Defect].} = PubKey0x(hexToPaddedByteArray[RawPubKeySize](input.string)) +func parseCmdArg*(T: type ValidatorPubKey, input: TaintedString): T + {.raises: [ValueError, Defect].} = + let res = ValidatorPubKey.fromHex(input.string) + if res.isErr(): raise (ref ValueError)(msg: $res.error()) + res.get() + func completeCmdArg*(T: type PubKey0x, input: TaintedString): seq[string] = return @[] diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim index 34b963062..ee2607629 100644 --- a/beacon_chain/consensus_object_pools/block_pools_types.nim +++ b/beacon_chain/consensus_object_pools/block_pools_types.nim @@ -16,9 +16,12 @@ import ../spec/[signatures_batch, forks, helpers], ../spec/datatypes/[phase0, altair, merge], ".."/beacon_chain_db, + ../validators/validator_monitor, ./block_dag -export options, sets, tables, hashes, helpers, beacon_chain_db, block_dag +export + options, sets, tables, hashes, helpers, beacon_chain_db, block_dag, + validator_monitor # ChainDAG and types related to forming a DAG of blocks, keeping track of their # relationships and allowing various forms of lookups @@ -83,6 +86,8 @@ type db*: BeaconChainDB ##\ ## ColdDB - Stores the canonical chain + validatorMonitor*: ref ValidatorMonitor + # ----------------------------------- # ChainDAGRef - DAG of candidate chains @@ -317,4 +322,3 @@ func init*(t: typedesc[FinalizationInfoObject], blockRoot: Eth2Digest, state_root: stateRoot, epoch: epoch ) - diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 1d07930e1..2583cece0 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -344,8 +344,9 @@ proc getForkedBlock(db: BeaconChainDB, root: Eth2Digest): err() proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, - updateFlags: UpdateFlags, onBlockCb: OnBlockCallback = nil, - onHeadCb: OnHeadCallback = nil, onReorgCb: OnReorgCallback = nil, + validatorMonitor: ref ValidatorMonitor, updateFlags: UpdateFlags, + onBlockCb: OnBlockCallback = nil, onHeadCb: OnHeadCallback = nil, + onReorgCb: OnReorgCallback = nil, onFinCb: OnFinalizedCallback = nil): ChainDAGRef = # TODO we require that the db contains both a head and a tail block - # asserting here doesn't seem like the right way to go about it however.. @@ -487,6 +488,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, let dag = ChainDAGRef( db: db, + validatorMonitor: validatorMonitor, blocks: blocks, backfillBlocks: backfillBlocks, genesis: genesisRef, @@ -796,6 +798,7 @@ proc advanceSlots( # target doAssert getStateField(state.data, slot) <= slot while getStateField(state.data, slot) < slot: + let preEpoch = getStateField(state.data, slot).epoch loadStateCache(dag, cache, state.blck, getStateField(state.data, slot).epoch) doAssert process_slots( @@ -805,6 +808,16 @@ proc advanceSlots( if save: dag.putState(state) + # The reward information in the state transition is computed for epoch + # transitions - when transitioning into epoch N, the activities in epoch + # N-2 are translated into balance updates, and this is what we capture + # in the monitor. This may be inaccurate during a deep reorg (>1 epoch) + # which is an acceptable tradeoff for monitoring. + withState(state.data): + let postEpoch = state.data.slot.epoch + if preEpoch != postEpoch: + dag.validatorMonitor[].registerEpochInfo(postEpoch, info, state.data) + proc applyBlock( dag: ChainDAGRef, state: var StateData, blck: BlockData, flags: UpdateFlags, @@ -1295,6 +1308,13 @@ proc updateHead*( beacon_head_root.set newHead.root.toGaugeValue beacon_head_slot.set newHead.slot.toGaugeValue + withState(dag.headState.data): + # Every time the head changes, the "canonical" view of balances and other + # state-related metrics change - notify the validator monitor. + # Doing this update during head update ensures there's a reasonable number + # of such updates happening - at most once per valid block. + dag.validatorMonitor[].registerState(state.data) + if lastHead.slot.epoch != newHead.slot.epoch: # Epoch updated - in theory, these could happen when the wall clock # changes epoch, even if there is no new block / head, but we'll delay diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index 924177b95..f0ebcba02 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -35,6 +35,7 @@ type resfut*: Future[Result[void, BlockError]] queueTick*: Moment # Moment when block was enqueued validationDur*: Duration # Time it took to perform gossip validation + src*: MsgSource BlockProcessor* = object ## This manages the processing of blocks from different sources @@ -67,6 +68,7 @@ type # Consumer # ---------------------------------------------------------------- consensusManager: ref ConsensusManager + validatorMonitor: ref ValidatorMonitor ## Blockchain DAG, AttestationPool and Quarantine getBeaconTime: GetBeaconTimeFn @@ -80,6 +82,7 @@ proc new*(T: type BlockProcessor, dumpDirInvalid, dumpDirIncoming: string, rng: ref BrHmacDrbgContext, taskpool: TaskPoolPtr, consensusManager: ref ConsensusManager, + validatorMonitor: ref ValidatorMonitor, getBeaconTime: GetBeaconTimeFn): ref BlockProcessor = (ref BlockProcessor)( dumpEnabled: dumpEnabled, @@ -87,6 +90,7 @@ proc new*(T: type BlockProcessor, dumpDirIncoming: dumpDirIncoming, blockQueue: newAsyncQueue[BlockEntry](), consensusManager: consensusManager, + validatorMonitor: validatorMonitor, getBeaconTime: getBeaconTime, verifier: BatchVerifier(rng: rng, taskpool: taskpool) ) @@ -101,7 +105,7 @@ proc hasBlocks*(self: BlockProcessor): bool = # ------------------------------------------------------------------------------ proc addBlock*( - self: var BlockProcessor, blck: ForkedSignedBeaconBlock, + self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, resfut: Future[Result[void, BlockError]] = nil, validationDur = Duration()) = ## Enqueue a Gossip-validated block for consensus verification @@ -126,7 +130,8 @@ proc addBlock*( self.blockQueue.addLastNoWait(BlockEntry( blck: blck, resfut: resfut, queueTick: Moment.now(), - validationDur: validationDur)) + validationDur: validationDur, + src: src)) except AsyncQueueFullError: raiseAssert "unbounded queue" @@ -153,12 +158,18 @@ proc dumpBlock*[T]( proc storeBlock*( self: var BlockProcessor, - signedBlock: ForkySignedBeaconBlock, - wallSlot: Slot, queueTick: Moment = Moment.now(), + src: MsgSource, wallTime: BeaconTime, + signedBlock: ForkySignedBeaconBlock, queueTick: Moment = Moment.now(), validationDur = Duration()): Result[BlockRef, BlockError] = + ## storeBlock is the main entry point for unvalidated blocks - all untrusted + ## blocks, regardless of origin, pass through here. When storing a block, + ## we will add it to the dag and pass it to all block consumers that need + ## to know about it, such as the fork choice and the monitoring let attestationPool = self.consensusManager.attestationPool startTick = Moment.now() + wallSlot = wallTime.slotOrZero() + vm = self.validatorMonitor dag = self.consensusManager.dag # The block is certainly not missing any more @@ -176,6 +187,23 @@ proc storeBlock*( attestationPool[].addForkChoice( epochRef, blckRef, trustedBlock.message, wallSlot) + vm[].registerBeaconBlock( + src, wallTime, trustedBlock.message) + + for attestation in trustedBlock.message.body.attestations: + for idx in get_attesting_indices( + epochRef, attestation.data, attestation.aggregation_bits): + vm[].registerAttestationInBlock(attestation.data, idx, + trustedBlock.message) + + withState(dag[].clearanceState.data): + when stateFork >= BeaconStateFork.Altair and + Trusted isnot phase0.TrustedSignedBeaconBlock: # altair+ + for i in trustedBlock.message.body.sync_aggregate.sync_committee_bits.oneIndices(): + vm[].registerSyncAggregateInBlock( + trustedBlock.message.slot, trustedBlock.root, + state.data.current_sync_committee.pubkeys.data[i]) + self.dumpBlock(signedBlock, blck) # There can be a scenario where we receive a block we already received. @@ -212,7 +240,7 @@ proc storeBlock*( for quarantined in self.consensusManager.quarantine[].pop(blck.get().root): # Process the blocks that had the newly accepted block as parent - self.addBlock(quarantined) + self.addBlock(MsgSource.gossip, quarantined) blck @@ -233,7 +261,7 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) = let res = withBlck(entry.blck): - self.storeBlock(blck, wallSlot, entry.queueTick, entry.validationDur) + self.storeBlock(entry.src, wallTime, blck, entry.queueTick, entry.validationDur) if entry.resfut != nil: entry.resfut.complete( diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 0a2bbbc70..4ecc79b45 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -112,6 +112,9 @@ type # ---------------------------------------------------------------- blockProcessor: ref BlockProcessor + # Validator monitoring + validatorMonitor: ref ValidatorMonitor + # Validated with no further verification required # ---------------------------------------------------------------- exitPool: ref ExitPool @@ -135,6 +138,7 @@ type proc new*(T: type Eth2Processor, doppelGangerDetectionEnabled: bool, blockProcessor: ref BlockProcessor, + validatorMonitor: ref ValidatorMonitor, dag: ChainDAGRef, attestationPool: ref AttestationPool, exitPool: ref ExitPool, @@ -150,6 +154,7 @@ proc new*(T: type Eth2Processor, doppelgangerDetection: DoppelgangerProtection( nodeLaunchSlot: getBeaconTime().slotOrZero), blockProcessor: blockProcessor, + validatorMonitor: validatorMonitor, dag: dag, attestationPool: attestationPool, exitPool: exitPool, @@ -171,9 +176,8 @@ proc new*(T: type Eth2Processor, # could be used to push out valid messages. proc blockValidator*( - self: var Eth2Processor, - signedBlock: phase0.SignedBeaconBlock | altair.SignedBeaconBlock | - merge.SignedBeaconBlock): ValidationRes = + self: var Eth2Processor, src: MsgSource, + signedBlock: ForkySignedBeaconBlock): ValidationRes = let wallTime = self.getCurrentBeaconTime() (afterGenesis, wallSlot) = wallTime.toSlot() @@ -207,7 +211,7 @@ proc blockValidator*( trace "Block validated" self.blockProcessor[].addBlock( - ForkedSignedBeaconBlock.init(signedBlock), + src, ForkedSignedBeaconBlock.init(signedBlock), validationDur = self.getCurrentBeaconTime() - wallTime) # Validator monitor registration for blocks is done by the processor @@ -250,9 +254,8 @@ proc checkForPotentialDoppelganger( quit QuitFailure proc attestationValidator*( - self: ref Eth2Processor, - attestation: Attestation, - subnet_id: SubnetId, + self: ref Eth2Processor, src: MsgSource, + attestation: Attestation, subnet_id: SubnetId, checkSignature: bool = true): Future[ValidationRes] {.async.} = let wallTime = self.getCurrentBeaconTime() var (afterGenesis, wallSlot) = wallTime.toSlot() @@ -286,8 +289,12 @@ proc attestationValidator*( self.attestationPool[].addAttestation( attestation, [attester_index], sig, wallSlot) + self.validatorMonitor[].registerAttestation( + src, wallTime, attestation, attester_index) + beacon_attestations_received.inc() beacon_attestation_delay.observe(delay.toFloatSeconds()) + ok() else: debug "Dropping attestation", validationError = v.error @@ -295,7 +302,7 @@ proc attestationValidator*( err(v.error()) proc aggregateValidator*( - self: ref Eth2Processor, + self: ref Eth2Processor, src: MsgSource, signedAggregateAndProof: SignedAggregateAndProof): Future[ValidationRes] {.async.} = let wallTime = self.getCurrentBeaconTime() var (afterGenesis, wallSlot) = wallTime.toSlot() @@ -334,6 +341,9 @@ proc aggregateValidator*( self.attestationPool[].addAttestation( signedAggregateAndProof.message.aggregate, attesting_indices, sig, wallSlot) + self.validatorMonitor[].registerAggregate( + src, wallTime, signedAggregateAndProof, attesting_indices) + beacon_aggregates_received.inc() beacon_aggregate_delay.observe(delay.toFloatSeconds()) @@ -345,8 +355,8 @@ proc aggregateValidator*( err(v.error()) proc attesterSlashingValidator*( - self: var Eth2Processor, attesterSlashing: AttesterSlashing): - ValidationRes = + self: var Eth2Processor, src: MsgSource, + attesterSlashing: AttesterSlashing): ValidationRes = logScope: attesterSlashing = shortLog(attesterSlashing) @@ -359,6 +369,8 @@ proc attesterSlashingValidator*( self.exitPool[].addMessage(attesterSlashing) + self.validatorMonitor[].registerAttesterSlashing(src, attesterSlashing) + beacon_attester_slashings_received.inc() else: debug "Dropping attester slashing", validationError = v.error @@ -367,8 +379,8 @@ proc attesterSlashingValidator*( v proc proposerSlashingValidator*( - self: var Eth2Processor, proposerSlashing: ProposerSlashing): - Result[void, ValidationError] = + self: var Eth2Processor, src: MsgSource, + proposerSlashing: ProposerSlashing): Result[void, ValidationError] = logScope: proposerSlashing = shortLog(proposerSlashing) @@ -377,18 +389,21 @@ proc proposerSlashingValidator*( let v = self.exitPool[].validateProposerSlashing(proposerSlashing) if v.isOk(): trace "Proposer slashing validated" - beacon_proposer_slashings_received.inc() - else: - debug "Dropping proposer slashing", validationError = v.error self.exitPool[].addMessage(proposerSlashing) + self.validatorMonitor[].registerProposerSlashing(src, proposerSlashing) + + beacon_proposer_slashings_received.inc() + else: + debug "Dropping proposer slashing", validationError = v.error beacon_proposer_slashings_dropped.inc(1, [$v.error[0]]) + v proc voluntaryExitValidator*( - self: var Eth2Processor, signedVoluntaryExit: SignedVoluntaryExit): - Result[void, ValidationError] = + self: var Eth2Processor, src: MsgSource, + signedVoluntaryExit: SignedVoluntaryExit): Result[void, ValidationError] = logScope: signedVoluntaryExit = shortLog(signedVoluntaryExit) @@ -400,6 +415,9 @@ proc voluntaryExitValidator*( self.exitPool[].addMessage(signedVoluntaryExit) + self.validatorMonitor[].registerVoluntaryExit( + src, signedVoluntaryExit.message) + beacon_voluntary_exits_received.inc() else: debug "Dropping voluntary exit", error = v.error @@ -408,7 +426,7 @@ proc voluntaryExitValidator*( v proc syncCommitteeMessageValidator*( - self: ref Eth2Processor, + self: ref Eth2Processor, src: MsgSource, syncCommitteeMsg: SyncCommitteeMessage, subcommitteeIdx: SyncSubcommitteeIndex, checkSignature: bool = true): Future[Result[void, ValidationError]] {.async.} = @@ -441,6 +459,9 @@ proc syncCommitteeMessageValidator*( subcommitteeIdx, positions) + self.validatorMonitor[].registerSyncCommitteeMessage( + src, wallTime, syncCommitteeMsg) + beacon_sync_committee_messages_received.inc() ok() @@ -450,7 +471,7 @@ proc syncCommitteeMessageValidator*( err(v.error()) proc contributionValidator*( - self: ref Eth2Processor, + self: ref Eth2Processor, src: MsgSource, contributionAndProof: SignedContributionAndProof, checkSignature: bool = true): Future[Result[void, ValidationError]] {.async.} = let @@ -475,7 +496,12 @@ proc contributionValidator*( return if v.isOk(): trace "Contribution validated" - self.syncCommitteeMsgPool[].addContribution(contributionAndProof, v.get) + self.syncCommitteeMsgPool[].addContribution( + contributionAndProof, v.get()[0]) + + self.validatorMonitor[].registerSyncContribution( + src, wallTime, contributionAndProof, v.get()[1]) + beacon_sync_committee_contributions_received.inc() ok() diff --git a/beacon_chain/gossip_processing/gossip_validation.nim b/beacon_chain/gossip_processing/gossip_validation.nim index 969e8b77d..72f3e7c87 100644 --- a/beacon_chain/gossip_processing/gossip_validation.nim +++ b/beacon_chain/gossip_processing/gossip_validation.nim @@ -856,8 +856,7 @@ proc validateContribution*( msg: SignedContributionAndProof, wallTime: BeaconTime, checkSignature: bool): - Future[Result[CookedSig, ValidationError]] {.async.} = - + Future[Result[(CookedSig, seq[ValidatorIndex]), ValidationError]] {.async.} = let syncCommitteeSlot = msg.message.contribution.slot @@ -901,6 +900,12 @@ proc validateContribution*( # that is, any(contribution.aggregation_bits). return errReject("SignedContributionAndProof: aggregation bits empty") + # TODO we take a copy of the participants to avoid the data going stale + # between validation and use - nonetheless, a design that avoids it and + # stays safe would be nice + let participants = dag.syncCommitteeParticipants( + msg.message.contribution.slot, subcommitteeIdx) + let sig = if checkSignature: let deferredCrypto = batchCrypto.scheduleContributionChecks( fork, genesis_validators_root, msg, subcommitteeIdx, dag) @@ -951,4 +956,4 @@ proc validateContribution*( return errReject("SyncCommitteeMessage: unable to load signature") sig.get() - return ok(sig) + return ok((sig, participants)) diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 7ed1649b2..951782dd9 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -31,7 +31,7 @@ import ./networking/[eth2_discovery, eth2_network, network_metadata], ./gossip_processing/[eth2_processor, block_processor, consensus_manager], ./validators/[ - validator_duties, validator_pool, + validator_duties, validator_monitor, validator_pool, slashing_protection, keystore_management], ./sync/[sync_protocol], ./rpc/[rest_api, rpc_api], @@ -294,11 +294,19 @@ proc init(T: type BeaconNode, info "Loading block dag from database", path = config.databaseDir + let + validatorMonitor = newClone(ValidatorMonitor.init( + config.validatorMonitorAuto)) + + for key in config.validatorMonitorPubkeys: + validatorMonitor[].addMonitor(key, none(ValidatorIndex)) + let chainDagFlags = if config.verifyFinalization: {verifyFinalization} else: {} - dag = ChainDAGRef.init(cfg, db, chainDagFlags, onBlockAdded, onHeadChanged, - onChainReorg, onFinalization) + dag = ChainDAGRef.init( + cfg, db, validatorMonitor, chainDagFlags, onBlockAdded, onHeadChanged, + onChainReorg, onFinalization) quarantine = newClone(Quarantine.init()) databaseGenesisValidatorsRoot = getStateField(dag.headState.data, genesis_validators_root) @@ -408,7 +416,7 @@ proc init(T: type BeaconNode, ) blockProcessor = BlockProcessor.new( config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, - rng, taskpool, consensusManager, getBeaconTime) + rng, taskpool, consensusManager, validatorMonitor, getBeaconTime) blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock): Future[Result[void, BlockError]] = # The design with a callback for block verification is unusual compared @@ -416,12 +424,13 @@ proc init(T: type BeaconNode, # taken in the sync/request managers - this is an architectural compromise # that should probably be reimagined more holistically in the future. let resfut = newFuture[Result[void, BlockError]]("blockVerifier") - blockProcessor[].addBlock(signedBlock, resfut) + blockProcessor[].addBlock(MsgSource.gossip, signedBlock, resfut) resfut processor = Eth2Processor.new( config.doppelgangerDetection, - blockProcessor, dag, attestationPool, exitPool, validatorPool, - syncCommitteeMsgPool, quarantine, rng, getBeaconTime, taskpool) + blockProcessor, validatorMonitor, dag, attestationPool, exitPool, + validatorPool, syncCommitteeMsgPool, quarantine, rng, getBeaconTime, + taskpool) syncManager = newSyncManager[Peer, PeerID]( network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, blockVerifier) @@ -453,6 +462,7 @@ proc init(T: type BeaconNode, gossipState: GossipState.Disconnected, beaconClock: beaconClock, onAttestationSent: onAttestationSent, + validatorMonitor: validatorMonitor ) debug "Loading validators", validatorsDir = config.validatorsDir() @@ -464,9 +474,11 @@ proc init(T: type BeaconNode, # we start with a reasonable ENR let wallSlot = node.beaconClock.now().slotOrZero() for validator in node.attachedValidators[].validators.values(): + if config.validatorMonitorAuto: + validatorMonitor[].addMonitor(validator.pubkey, validator.index) + if validator.index.isSome(): node.actionTracker.knownValidators[validator.index.get()] = wallSlot - let stabilitySubnets = node.actionTracker.stabilitySubnets(wallSlot) # Here, we also set the correct ENR should we be in all subnets mode! @@ -1011,7 +1023,8 @@ proc installMessageValidators(node: BeaconNode) = node.network.addValidator( getBeaconBlocksTopic(node.dag.forkDigests.phase0), proc (signedBlock: phase0.SignedBeaconBlock): ValidationResult = - toValidationResult(node.processor[].blockValidator(signedBlock))) + toValidationResult(node.processor[].blockValidator( + MsgSource.gossip, signedBlock))) template installPhase0Validators(digest: auto) = for it in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64: @@ -1022,32 +1035,37 @@ proc installMessageValidators(node: BeaconNode) = # This proc needs to be within closureScope; don't lift out of loop. proc(attestation: Attestation): Future[ValidationResult] {.async.} = return toValidationResult( - await node.processor.attestationValidator(attestation, subnet_id))) + await node.processor.attestationValidator( + MsgSource.gossip, attestation, subnet_id))) node.network.addAsyncValidator( getAggregateAndProofsTopic(digest), proc(signedAggregateAndProof: SignedAggregateAndProof): Future[ValidationResult] {.async.} = return toValidationResult( - await node.processor.aggregateValidator(signedAggregateAndProof))) + await node.processor.aggregateValidator( + MsgSource.gossip, signedAggregateAndProof))) node.network.addValidator( getAttesterSlashingsTopic(digest), proc (attesterSlashing: AttesterSlashing): ValidationResult = toValidationResult( - node.processor[].attesterSlashingValidator(attesterSlashing))) + node.processor[].attesterSlashingValidator( + MsgSource.gossip, attesterSlashing))) node.network.addValidator( getProposerSlashingsTopic(digest), proc (proposerSlashing: ProposerSlashing): ValidationResult = toValidationResult( - node.processor[].proposerSlashingValidator(proposerSlashing))) + node.processor[].proposerSlashingValidator( + MsgSource.gossip, proposerSlashing))) node.network.addValidator( getVoluntaryExitsTopic(digest), proc (signedVoluntaryExit: SignedVoluntaryExit): ValidationResult = toValidationResult( - node.processor[].voluntaryExitValidator(signedVoluntaryExit))) + node.processor[].voluntaryExitValidator( + MsgSource.gossip, signedVoluntaryExit))) installPhase0Validators(node.dag.forkDigests.phase0) @@ -1059,12 +1077,14 @@ proc installMessageValidators(node: BeaconNode) = node.network.addValidator( getBeaconBlocksTopic(node.dag.forkDigests.altair), proc (signedBlock: altair.SignedBeaconBlock): ValidationResult = - toValidationResult(node.processor[].blockValidator(signedBlock))) + toValidationResult(node.processor[].blockValidator( + MsgSource.gossip, signedBlock))) node.network.addValidator( getBeaconBlocksTopic(node.dag.forkDigests.merge), proc (signedBlock: merge.SignedBeaconBlock): ValidationResult = - toValidationResult(node.processor[].blockValidator(signedBlock))) + toValidationResult(node.processor[].blockValidator( + MsgSource.gossip, signedBlock))) template installSyncCommitteeeValidators(digest: auto) = for committeeIdx in allSyncSubcommittees(): @@ -1075,13 +1095,14 @@ proc installMessageValidators(node: BeaconNode) = # This proc needs to be within closureScope; don't lift out of loop. proc(msg: SyncCommitteeMessage): Future[ValidationResult] {.async.} = return toValidationResult( - await node.processor.syncCommitteeMessageValidator(msg, idx))) + await node.processor.syncCommitteeMessageValidator( + MsgSource.gossip, msg, idx))) node.network.addAsyncValidator( getSyncCommitteeContributionAndProofTopic(digest), proc(msg: SignedContributionAndProof): Future[ValidationResult] {.async.} = return toValidationResult( - await node.processor.contributionValidator(msg))) + await node.processor.contributionValidator(MsgSource.gossip, msg))) installSyncCommitteeeValidators(node.dag.forkDigests.altair) installSyncCommitteeeValidators(node.dag.forkDigests.merge) diff --git a/beacon_chain/rpc/rest_validator_api.nim b/beacon_chain/rpc/rest_validator_api.nim index 1ac2baae7..8452f1df2 100644 --- a/beacon_chain/rpc/rest_validator_api.nim +++ b/beacon_chain/rpc/rest_validator_api.nim @@ -543,6 +543,12 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) = request.slot, subnet_id, request.validator_index, request.is_aggregator) + let validator_pubkey = getStateField( + node.dag.headState.data, validators).asSeq()[request.validator_index].pubkey + + node.validatorMonitor[].addAutoMonitor( + validator_pubkey, ValidatorIndex(request.validator_index)) + return RestApiResponse.jsonMsgResponse(BeaconCommitteeSubscriptionSuccess) # https://ethereum.github.io/beacon-APIs/#/Validator/prepareSyncCommitteeSubnets @@ -569,6 +575,12 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) = lenu64(getStateField(node.dag.headState.data, validators)): return RestApiResponse.jsonError(Http400, InvalidValidatorIndexValueError) + let validator_pubkey = getStateField( + node.dag.headState.data, validators).asSeq()[item.validator_index].pubkey + + node.validatorMonitor[].addAutoMonitor( + validator_pubkey, ValidatorIndex(item.validator_index)) + subs warn "Sync committee subscription request served, but not implemented" diff --git a/beacon_chain/spec/helpers.nim b/beacon_chain/spec/helpers.nim index 6813d8746..2ffd623c6 100644 --- a/beacon_chain/spec/helpers.nim +++ b/beacon_chain/spec/helpers.nim @@ -369,6 +369,13 @@ func is_active_validator*(validator: Validator, epoch: Epoch): bool = ## Check if ``validator`` is active validator.activation_epoch <= epoch and epoch < validator.exit_epoch +func is_exited_validator*(validator: Validator, epoch: Epoch): bool = + ## Check if ``validator`` is exited + validator.exit_epoch <= epoch + +func is_withdrawable_validator*(validator: Validator, epoch: Epoch): bool = + epoch >= validator.withdrawable_epoch + # https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/phase0/beacon-chain.md#get_active_validator_indices iterator get_active_validator_indices*(state: ForkyBeaconState, epoch: Epoch): ValidatorIndex = diff --git a/beacon_chain/validators/validator_duties.nim b/beacon_chain/validators/validator_duties.nim index 11d1fb1b9..799d10b8b 100644 --- a/beacon_chain/validators/validator_duties.nim +++ b/beacon_chain/validators/validator_duties.nim @@ -218,7 +218,7 @@ proc sendAttestation*( # libp2p calls the data handler for any subscription on the subnet # topic, it does not perform validation. let res = await node.processor.attestationValidator( - attestation, subnet_id, checkSignature) + MsgSource.api, attestation, subnet_id, checkSignature) return if res.isGoodForSending: @@ -241,8 +241,9 @@ proc sendSyncCommitteeMessage*( # validation will also register the message with the sync committee # message pool. Notably, although libp2p calls the data handler for # any subscription on the subnet topic, it does not perform validation. - let res = await node.processor.syncCommitteeMessageValidator(msg, subcommitteeIdx, - checkSignature) + let res = await node.processor.syncCommitteeMessageValidator( + MsgSource.api, msg, subcommitteeIdx, checkSignature) + return if res.isGoodForSending: node.network.broadcastSyncCommitteeMessage(msg, subcommitteeIdx) @@ -340,7 +341,7 @@ proc sendSyncCommitteeContribution*( msg: SignedContributionAndProof, checkSignature: bool): Future[SendResult] {.async.} = let res = await node.processor.contributionValidator( - msg, checkSignature) + MsgSource.api, msg, checkSignature) return if res.isGoodForSending: @@ -571,7 +572,7 @@ proc proposeBlock(node: BeaconNode, # storeBlock puts the block in the chaindag, and if accepted, takes care # of side effects such as event api notification newBlockRef = node.blockProcessor[].storeBlock( - signedBlock, wallTime.slotOrZero()) + MsgSource.api, wallTime, signedBlock) if newBlockRef.isErr: warn "Unable to add proposed block to block pool", @@ -1156,7 +1157,7 @@ proc sendAggregateAndProof*(node: BeaconNode, proof: SignedAggregateAndProof): Future[SendResult] {. async.} = # REST/JSON-RPC API helper procedure. - let res = await node.processor.aggregateValidator(proof) + let res = await node.processor.aggregateValidator(MsgSource.api, proof) return if res.isGoodForSending: node.network.broadcastAggregateAndProof(proof) @@ -1176,7 +1177,7 @@ proc sendAggregateAndProof*(node: BeaconNode, proc sendVoluntaryExit*(node: BeaconNode, exit: SignedVoluntaryExit): SendResult = # REST/JSON-RPC API helper procedure. - let res = node.processor[].voluntaryExitValidator(exit) + let res = node.processor[].voluntaryExitValidator(MsgSource.api, exit) if res.isGoodForSending: node.network.broadcastVoluntaryExit(exit) ok() @@ -1188,7 +1189,7 @@ proc sendVoluntaryExit*(node: BeaconNode, proc sendAttesterSlashing*(node: BeaconNode, slashing: AttesterSlashing): SendResult = # REST/JSON-RPC API helper procedure. - let res = node.processor[].attesterSlashingValidator(slashing) + let res = node.processor[].attesterSlashingValidator(MsgSource.api, slashing) if res.isGoodForSending: node.network.broadcastAttesterSlashing(slashing) ok() @@ -1200,7 +1201,7 @@ proc sendAttesterSlashing*(node: BeaconNode, proc sendProposerSlashing*(node: BeaconNode, slashing: ProposerSlashing): SendResult = # REST/JSON-RPC API helper procedure. - let res = node.processor[].proposerSlashingValidator(slashing) + let res = node.processor[].proposerSlashingValidator(MsgSource.api, slashing) if res.isGoodForSending: node.network.broadcastProposerSlashing(slashing) ok() @@ -1236,7 +1237,7 @@ proc sendBeaconBlock*(node: BeaconNode, forked: ForkedSignedBeaconBlock wallTime = node.beaconClock.now() accepted = withBlck(forked): let newBlockRef = node.blockProcessor[].storeBlock( - blck, wallTime.slotOrZero()) + MsgSource.api, wallTime, blck) # The boolean we return tells the caller whether the block was integrated # into the chain diff --git a/beacon_chain/validators/validator_monitor.nim b/beacon_chain/validators/validator_monitor.nim new file mode 100644 index 000000000..4d3c05c8e --- /dev/null +++ b/beacon_chain/validators/validator_monitor.nim @@ -0,0 +1,749 @@ +import + std/[options, tables], + metrics, chronicles, + ../spec/[crypto, beaconstate, forks, helpers, presets], + ../spec/datatypes/[phase0, altair], + ../beacon_clock + +logScope: topics = "val_mon" + +# Validator monitoring based on the same feature in Lighthouse - using the same +# metrics allows users to more easily reuse monitoring setups + +declareGauge validator_monitor_balance_gwei, + "The validator's balance in gwei.", labels = ["validator"] +declareGauge validator_monitor_effective_balance_gwei, + "The validator's effective balance in gwei.", labels = ["validator"] +declareGauge validator_monitor_slashed, + "Set to 1 if the validator is slashed.", labels = ["validator"] +declareGauge validator_monitor_active, + "Set to 1 if the validator is active.", labels = ["validator"] +declareGauge validator_monitor_exited, + "Set to 1 if the validator is exited.", labels = ["validator"] +declareGauge validator_monitor_withdrawable, + "Set to 1 if the validator is withdrawable.", labels = ["validator"] +declareGauge validator_activation_eligibility_epoch, + "Set to the epoch where the validator will be eligible for activation.", labels = ["validator"] +declareGauge validator_activation_epoch, + "Set to the epoch where the validator will activate.", labels = ["validator"] +declareGauge validator_exit_epoch, + "Set to the epoch where the validator will exit.", labels = ["validator"] +declareGauge validator_withdrawable_epoch, + "Set to the epoch where the validator will be withdrawable.", labels = ["validator"] + +declareCounter validator_monitor_prev_epoch_on_chain_attester_hit, + "Incremented if the validator is flagged as a previous epoch attester during per epoch processing", labels = ["validator"] +declareCounter validator_monitor_prev_epoch_on_chain_attester_miss, + "Incremented if the validator is not flagged as a previous epoch attester during per epoch processing", labels = ["validator"] +declareCounter validator_monitor_prev_epoch_on_chain_head_attester_hit, + "Incremented if the validator is flagged as a previous epoch head attester during per epoch processing", labels = ["validator"] +declareCounter validator_monitor_prev_epoch_on_chain_head_attester_miss, + "Incremented if the validator is not flagged as a previous epoch head attester during per epoch processing", labels = ["validator"] +declareCounter validator_monitor_prev_epoch_on_chain_target_attester_hit, + "Incremented if the validator is flagged as a previous epoch target attester during per epoch processing", labels = ["validator"] +declareCounter validator_monitor_prev_epoch_on_chain_target_attester_miss, + "Incremented if the validator is not flagged as a previous epoch target attester during per epoch processing", labels = ["validator"] +declareCounter validator_monitor_prev_epoch_on_chain_source_attester_hit, + "Incremented if the validator is flagged as a previous epoch source attester during per epoch processing", labels = ["validator"] +declareCounter validator_monitor_prev_epoch_on_chain_source_attester_miss, + "Incremented if the validator is not flagged as a previous epoch source attester during per epoch processing", labels = ["validator"] + +declareGauge validator_monitor_prev_epoch_attestations_total, + "The number of unagg. attestations seen in the previous epoch.", labels = ["validator"] +declareHistogram validator_monitor_prev_epoch_attestations_min_delay_seconds, + "The min delay between when the validator should send the attestation and when it was received.", labels = ["validator"] +declareGauge validator_monitor_prev_epoch_attestation_aggregate_inclusions, + "The count of times an attestation was seen inside an aggregate.", labels = ["validator"] +declareGauge validator_monitor_prev_epoch_attestation_block_inclusions, + "The count of times an attestation was seen inside a block.", labels = ["validator"] +declareGauge validator_monitor_prev_epoch_attestation_block_min_inclusion_distance, + "The minimum inclusion distance observed for the inclusion of an attestation in a block.", labels = ["validator"] + +declareGauge validator_monitor_prev_epoch_aggregates_total, + "The number of aggregates seen in the previous epoch.", labels = ["validator"] +declareHistogram validator_monitor_prev_epoch_aggregates_min_delay_seconds, + "The min delay between when the validator should send the aggregate and when it was received.", labels = ["validator"] +declareGauge validator_monitor_prev_epoch_exits_total, + "The number of exits seen in the previous epoch.", labels = ["validator"] +declareGauge validator_monitor_prev_epoch_proposer_slashings_total, + "The number of proposer slashings seen in the previous epoch.", labels = ["validator"] +declareGauge validator_monitor_prev_epoch_attester_slashings_total, + "The number of attester slashings seen in the previous epoch.", labels = ["validator"] +declareGauge validator_monitor_prev_epoch_sync_committee_messages_total, + "The number of sync committee messages seen in the previous epoch.", labels = ["validator"] +declareHistogram validator_monitor_prev_epoch_sync_committee_messages_min_delay_seconds, + "The min delay between when the validator should send the sync committee message and when it was received.", labels = ["validator"] +declareGauge validator_monitor_prev_epoch_sync_contribution_inclusions, + "The count of times a sync signature was seen inside a sync contribution.", labels = ["validator"] +declareGauge validator_monitor_prev_epoch_sync_signature_block_inclusions, + "The count of times a sync signature was seen inside a block.", labels = ["validator"] +declareGauge validator_monitor_prev_epoch_sync_contributions_total, + "The number of sync contributions seen in the previous epoch.", labels = ["validator"] +declareHistogram validator_monitor_prev_epoch_sync_contribution_min_delay_seconds, + "The min delay between when the validator should send the sync contribution and when it was received.", labels = ["validator"] +declareGauge validator_monitor_validator_in_current_sync_committee, + "Is the validator in the current sync committee (1 for true and 0 for false)", labels = ["validator"] + +declareGauge validator_monitor_validators_total, + "Count of validators that are specifically monitored by this beacon node" +declareCounter validator_monitor_unaggregated_attestation_total, + "Number of unaggregated attestations seen", labels = ["src", "validator"] +declareHistogram validator_monitor_unaggregated_attestation_delay_seconds, + "The delay between when the validator should send the attestation and when it was received.", labels = ["src", "validator"] +declareCounter validator_monitor_sync_committee_messages_total, + "Number of sync committee messages seen", labels = ["src", "validator"] +declareHistogram validator_monitor_sync_committee_messages_delay_seconds, + "The delay between when the validator should send the sync committee message and when it was received.", labels = ["src", "validator"] +declareCounter validator_monitor_sync_contributions_total, + "Number of sync contributions seen", labels = ["src", "validator"] +declareHistogram validator_monitor_sync_contributions_delay_seconds, + "The delay between when the aggregator should send the sync contribution and when it was received.", labels = ["src", "validator"] +declareCounter validator_monitor_aggregated_attestation_total, + "Number of aggregated attestations seen", labels = ["src", "validator"] +declareHistogram validator_monitor_aggregated_attestation_delay_seconds, + "The delay between then the validator should send the aggregate and when it was received.", labels = ["src", "validator"] +declareCounter validator_monitor_attestation_in_aggregate_total, + "Number of times an attestation has been seen in an aggregate", labels = ["src", "validator"] +declareCounter validator_monitor_sync_committee_message_in_contribution_total, + "Number of times a sync committee message has been seen in a sync contribution", labels = ["src", "validator"] +declareHistogram validator_monitor_attestation_in_aggregate_delay_seconds, + "The delay between when the validator should send the aggregate and when it was received.", labels = ["src", "validator"] +declareCounter validator_monitor_attestation_in_block_total, + "Number of times an attestation has been seen in a block", labels = ["src", "validator"] +declareCounter validator_monitor_sync_committee_message_in_block_total, + "Number of times a validator's sync committee message has been seen in a sync aggregate", labels = ["src", "validator"] +declareGauge validator_monitor_attestation_in_block_delay_slots, + "The excess slots (beyond the minimum delay) between the attestation slot and the block slot.", labels = ["src", "validator"] +declareCounter validator_monitor_beacon_block_total, + "Number of beacon blocks seen", labels = ["src", "validator"] +declareHistogram validator_monitor_beacon_block_delay_seconds, + "The delay between when the validator should send the block and when it was received.", labels = ["src", "validator"] +declareCounter validator_monitor_exit_total, + "Number of beacon exits seen", labels = ["src", "validator"] +declareCounter validator_monitor_proposer_slashing_total, + "Number of proposer slashings seen", labels = ["src", "validator"] +declareCounter validator_monitor_attester_slashing_total, + "Number of attester slashings seen", labels = ["src", "validator"] + +type + EpochSummary = object + ## Similar to the state transition, we collect everything that happens in + ## an epoch during that epoch and the one that follows it, then at the end + ## of the monitoring period, we report the statistics to the user. + ## In case of a deep reorg (>1 epoch) this information will be off, but will + ## repair itself in the next epoch, which is a reasonable trade-off between + ## correctness and utility. + ## + ## It should be noted that some metrics may be slightly inaccurate given the + ## nature of gossip processing: in particular, old messages may reappear + ## on the network and therefore be double-counted. + attestations: int64 + attestation_min_delay: Option[Duration] + attestation_aggregate_inclusions: int64 + attestation_block_inclusions: int64 + attestation_min_block_inclusion_distance: Option[uint64] + + aggregates: int64 + aggregate_min_delay: Option[Duration] + + sync_committee_messages: int64 + sync_committee_message_min_delay: Option[Duration] + + sync_signature_block_inclusions: int64 + sync_signature_contribution_inclusions: int64 + + sync_contributions: int64 + sync_contribution_min_delay: Option[Duration] + + exits: int64 + proposer_slashings: int64 + attester_slashings: int64 + + MonitoredValidator = object + id: string # A short id is used above all for metrics + pubkey: ValidatorPubKey + index: Option[ValidatorIndex] + summaries: array[2, EpochSummary] # We monitor the current and previous epochs + + ValidatorMonitor* = object + epoch: Epoch # The most recent epoch seen in monitoring + + monitors: Table[ValidatorPubKey, ref MonitoredValidator] + indices: Table[uint64, ref MonitoredValidator] + + knownValidators: int + autoRegister: bool + totals: bool + + MsgSource* {.pure.} = enum + # From where a message is being sent - for compatibility with lighthouse, we + # don't differentiate sync and requests, but rather use "gossip" - we also + # don't differentiate in-beacon validators but use "api" as if they were + # VC:s - this simplifies the initial implementation but should likely be + # expanded in the future. + gossip = "gossip" + api = "api" + +template toGaugeValue(v: bool): int64 = + if v: 1 else: 0 + +proc update_if_lt[T](current: var Option[T], val: T) = + if current.isNone() or val < current.get(): + current = some(val) + +proc addMonitor*( + self: var ValidatorMonitor, pubkey: ValidatorPubKey, + index: Option[ValidatorIndex]) = + if pubkey in self.monitors: + return + + let id = shortLog(pubkey) + let monitor = (ref MonitoredValidator)(id: id, index: index) + + self.monitors[pubkey] = monitor + + if index.isSome(): + self.indices[index.get().uint64] = monitor + +template metricId: string = + mixin self, id + if self.totals: "total" else: id + +proc addAutoMonitor*( + self: var ValidatorMonitor, pubkey: ValidatorPubKey, + index: ValidatorIndex) = + if not self.autoRegister: + return + + # automatic monitors must be registered with index - we don't look for them in + # the state + self.addMonitor(pubkey, some(index)) + + info "Started monitoring validator", + validator = shortLog(pubkey), pubkey, index + +proc init*(T: type ValidatorMonitor, autoRegister = false, totals = false): T = + T(autoRegister: autoRegister, totals: totals) + +template summaryIdx(epoch: Epoch): int = (epoch.uint64 mod 2).int + +template withEpochSummary( + self: var ValidatorMonitor, monitor: var MonitoredValidator, + epochParam: Epoch, body: untyped) = + let epoch = epochParam + if epoch == self.epoch or epoch + 1 == self.epoch: + template epochSummary: untyped {.inject.} = monitor.summaries[summaryIdx(epoch)] + body + +proc updateEpoch(self: var ValidatorMonitor, epoch: Epoch) = + # Called at the start of a new epoch to provide a summary of the events 2 + # epochs back then clear the slate for new reporting. + if epoch <= self.epoch: + return + + let + clearMonitor = epoch > self.epoch + 1 + # index of the EpochSummary that we'll first report, then clear + summaryIdx = epoch.summaryIdx + + if clearMonitor: + # More than one epoch passed since the last check which makes it difficult + # to report correctly with the amount of data we store - skip this round + # and hope things improve + notice "Resetting validator monitoring", epoch, monitorEpoch = self.epoch + + self.epoch = epoch + + validator_monitor_validators_total.set(self.monitors.len().int64) + + for (_, monitor) in self.monitors.mpairs(): + if clearMonitor: + monitor.summaries = default(type(monitor.summaries)) + continue + + let + id = monitor.id + + let summary = monitor.summaries[summaryIdx] + + validator_monitor_prev_epoch_attestations_total.set( + summary.attestations, [metricId]) + + if summary.attestation_min_delay.isSome(): + validator_monitor_prev_epoch_attestations_min_delay_seconds.observe( + summary.attestation_min_delay.get().toFloatSeconds(), [metricId]) + + validator_monitor_prev_epoch_attestation_aggregate_inclusions.set( + summary.attestation_aggregate_inclusions, [metricId]) + validator_monitor_prev_epoch_attestation_block_inclusions.set( + summary.attestation_block_inclusions, [metricId]) + + if summary.attestation_min_block_inclusion_distance.isSome(): + validator_monitor_prev_epoch_attestation_block_min_inclusion_distance.set( + summary.attestation_min_block_inclusion_distance.get().int64, [metricId]) + + validator_monitor_prev_epoch_sync_committee_messages_total.set( + summary.sync_committee_messages, [metricId]) + + if summary.sync_committee_message_min_delay.isSome(): + validator_monitor_prev_epoch_sync_committee_messages_min_delay_seconds.observe( + summary.sync_committee_message_min_delay.get().toFloatSeconds(), [metricId]) + + validator_monitor_prev_epoch_sync_contribution_inclusions.set( + summary.sync_signature_contribution_inclusions, [metricId]) + validator_monitor_prev_epoch_sync_signature_block_inclusions.set( + summary.sync_signature_block_inclusions, [metricId]) + + validator_monitor_prev_epoch_sync_contributions_total.set( + summary.sync_contributions, [metricId]) + if summary.sync_contribution_min_delay.isSome(): + validator_monitor_prev_epoch_sync_contribution_min_delay_seconds.observe( + summary.sync_contribution_min_delay.get().toFloatSeconds(), [metricId]) + + validator_monitor_prev_epoch_aggregates_total.set( + summary.aggregates, [metricId]) + + if summary.aggregate_min_delay.isSome(): + validator_monitor_prev_epoch_aggregates_min_delay_seconds.observe( + summary.aggregate_min_delay.get().toFloatSeconds(), [metricId]) + + validator_monitor_prev_epoch_exits_total.set( + summary.exits, [metricId]) + + validator_monitor_prev_epoch_proposer_slashings_total.set( + summary.proposer_slashings, [metricId]) + + validator_monitor_prev_epoch_attester_slashings_total.set( + summary.attester_slashings, [metricId]) + + monitor.summaries[summaryIdx] = default(type(monitor.summaries[summaryIdx])) + +func is_active_unslashed_in_previous_epoch(status: RewardStatus): bool = + let flags = status.flags + RewardFlags.isActiveInPreviousEpoch in flags and + RewardFlags.isSlashed notin flags + +func is_previous_epoch_source_attester(status: RewardStatus): bool = + status.is_previous_epoch_attester.isSome() + +func is_previous_epoch_head_attester(status: RewardStatus): bool = + RewardFlags.isPreviousEpochHeadAttester in status.flags + +func is_previous_epoch_target_attester(status: RewardStatus): bool = + RewardFlags.isPreviousEpochTargetAttester in status.flags + +func is_previous_epoch_source_attester(status: ParticipationInfo): bool = + ParticipationFlag.timelySourceAttester in status.flags + +func is_previous_epoch_head_attester(status: ParticipationInfo): bool = + ParticipationFlag.timelyHeadAttester in status.flags + +func is_previous_epoch_target_attester(status: ParticipationInfo): bool = + ParticipationFlag.timelyTargetAttester in status.flags + +func is_active_unslashed_in_previous_epoch(status: ParticipationInfo): bool = + ParticipationFlag.eligible in status.flags + +proc registerEpochInfo*( + self: var ValidatorMonitor, epoch: Epoch, info: ForkedEpochInfo, + state: ForkyBeaconState) = + # Register rewards, as computed during the epoch transition that lands in + # `epoch` - the rewards will be from attestations that were created at + # `epoch - 2`. + + if epoch < 2 or self.monitors.len == 0: + return + + withEpochInfo(info): + for pubkey, monitor in self.monitors: + if monitor.index.isNone: + continue + + let + idx = monitor.index.get() + + if info.validators.lenu64 <= idx.uint64: + # No summary for this validator (yet?) + debug "No reward information for validator", + id = monitor.id, idx + continue + + let + prev_epoch = epoch - 2 + id = monitor.id + + let status = info.validators[idx] + + if not status.is_active_unslashed_in_previous_epoch(): + # Monitored validator is not active, due to awaiting activation + # or being exited/withdrawn. Do not attempt to report on its + # attestations. + continue + + let + previous_epoch_matched_source = status.is_previous_epoch_source_attester() + previous_epoch_matched_target = status.is_previous_epoch_target_attester() + previous_epoch_matched_head = status.is_previous_epoch_head_attester() + + # Indicates if any attestation made it on-chain. + # For Base states, this will be *any* attestation whatsoever. For Altair states, + # this will be any attestation that matched a "timely" flag. + if previous_epoch_matched_source: + # These two metrics are the same - keep both around for LH compatibility + validator_monitor_prev_epoch_on_chain_attester_hit.inc(1, [metricId]) + validator_monitor_prev_epoch_on_chain_source_attester_hit.inc(1, [metricId]) + + info "Previous epoch attestation included", + timely_source = previous_epoch_matched_source, + timely_target = previous_epoch_matched_target, + timely_head = previous_epoch_matched_head, + epoch = prev_epoch, + validator = id + else: + validator_monitor_prev_epoch_on_chain_attester_miss.inc(1, [metricId]) + validator_monitor_prev_epoch_on_chain_source_attester_miss.inc(1, [metricId]) + + warn "Previous epoch attestation missing", + epoch = prev_epoch, + validator = id + + # Indicates if any on-chain attestation hit the head. + if previous_epoch_matched_head: + validator_monitor_prev_epoch_on_chain_head_attester_hit.inc(1, [metricId]) + else: + validator_monitor_prev_epoch_on_chain_head_attester_miss.inc(1, [metricId]) + notice "Attestation failed to match head", + epoch = prev_epoch, + validator = id + + # Indicates if any on-chain attestation hit the target. + if previous_epoch_matched_target: + validator_monitor_prev_epoch_on_chain_target_attester_hit.inc(1, [metricId]) + else: + validator_monitor_prev_epoch_on_chain_target_attester_miss.inc(1, [metricId]) + + notice "Attestation failed to match target", + epoch = prev_epoch, + validator = id + + when state isnot phase0.BeaconState: # altair+ + # Indicates the number of sync committee signatures that made it into + # a sync aggregate in the current_epoch (state.epoch - 1). + # Note: Unlike attestations, sync committee signatures must be included in the + # immediate next slot. Hence, num included sync aggregates for `state.epoch - 1` + # is available right after state transition to state.epoch. + let current_epoch = epoch - 1 + + if state.current_sync_committee.pubkeys.data.contains(pubkey): + validator_monitor_validator_in_current_sync_committee.set(1, [metricId]) + + self.withEpochSummary(monitor[], current_epoch): + info "Current epoch sync signatures", + included = epochSummary.sync_signature_block_inclusions, + expected = SLOTS_PER_EPOCH, + epoch = current_epoch, + validator = id + else: + validator_monitor_validator_in_current_sync_committee.set(0, [metricId]) + debug "Validator isn't part of the current sync committee", + epoch = current_epoch, + validator = id + + self.updateEpoch(epoch) + +proc registerState*(self: var ValidatorMonitor, state: ForkyBeaconState) = + # Update indices for the validators we're monitoring + for v in self.knownValidators.. ⚠️ This feature is currently in BETA - the details of its implementation may change in response to community feedback. + +The validator monitoring feature allows for tracking the life-cycle and performance of one or more validators in detail. Monitoring can be carried out for any validator, with slightly more detail for validators that are running through the same beacon node. + +Every time the validator performs a duty, the duty is recorded and the monitor keeps track of the reward-related events for having performed it. For example: + +* When attesting, the attestation is added to an aggregate, then a block, before a reward is applied to the state +* When performing sync committee duties, likewise + +Validator actions can be traced either through logging, or comprehensive metrics that allow for creating alerts in monitoring tools. The metrics are based on the same feature in [Lighthouse](https://lighthouse-book.sigmaprime.io/validator-monitoring.html), thus dashboards and alerts can be used with either client. + +## Enabling validator monitoring + +The monitor can be enabled either for all keys that are used with a particular beacon node, or for a specific list of validators, or both. + +``` +# Enable automatic monitoring of all validators used with this beacon node +./run-mainnet-beacon-node.sh --validator-monitor-auto + +# Enable monitoring of one or more specific validators +./run-mainnet-beacon-node.sh \ + --validator-monitor-pubkey=0xa1d1ad0714035353258038e964ae9675dc0252ee22cea896825c01458e1807bfad2f9969338798548d9858a571f7425c \ + --validator-monitor-pubkey=0xb2ff4716ed345b05dd1dfc6a5a9fa70856d8c75dcc9e881dd2f766d5f891326f0d10e96f3a444ce6c912b69c22c6754d + +# Publish metrics as totals for all monitored validators instead of each validator separately - used for limiting the load on metrics when monitoring many validators +./run-mainnet-beacon-node.sh --validator-monitor-totals +``` + +## Understanding monitoring + +When a validator performs a duty, such as signing an attestation or a sync committee message, this is broadcast to the network. Other nodes pick it up and package the message into an aggregate and later a block. The block is included in the canonical chain and a reward is given two epochs (~13 minutes) later. + +The monitor tracks these actions and will log each step at the `INF` level. If any step is missed, a `NOT` log is shown instead. + +The typical lifecycle of an attestation might look something like the following: + +``` +INF 2021-11-22 11:32:44.228+01:00 Attestation seen topics="val_mon" attestation="(aggregation_bits: 0b0000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, data: (slot: 2656363, index: 11, beacon_block_root: \"bbe7fc25\", source: \"83010:a8a1b125\", target: \"83011:6db281cd\"), signature: \"b88ef2f2\")" src=api epoch=83011 validator=b93c290b +INF 2021-11-22 11:32:51.293+01:00 Attestation included in aggregate topics="val_mon" aggregate="(aggregation_bits: 0b1111111101011111001101111111101100111111110100111011111110110101110111111010111111011101111011101111111111101111100001111111100111, data: (slot: 2656363, index: 11, beacon_block_root: \"bbe7fc25\", source: \"83010:a8a1b125\", target: \"83011:6db281cd\"), signature: \"8576b3fc\")" src=gossip epoch=83011 validator=b93c290b +INF 2021-11-22 11:33:07.193+01:00 Attestation included in block attestation_data="(slot: 2656364, index: 9, beacon_block_root: \"c7761767\", source: \"83010:a8a1b125\", target: \"83011:6db281cd\")" block_slot=2656365 inclusion_lag_slots=0 epoch=83011 validator=b65b6e1b +``` + +The lifecycle of a particular message can be traced by following the `epoch=.... validator=...` fields in the message. + +Failures at any point are recorded at a higher logging level, such as `NOT`(ice): + +``` +NOT 2021-11-17 20:53:42.108+01:00 Attestation failed to match head topics="chaindag" epoch=81972 validator=... +``` + +> ⚠️ It should be noted that metrics are tracked for the current history - in the case of a reorg on the chain - in particular a deep reorg - no attempt is made to revisit previously reported values. In the case that finality is delayed, the risk of stale metrics increases. + +Likewise, many metrics, such as aggregation inclusion, reflect conditions on the network - it may happen that the same message is counted more than once under certain conditions. + +## Monitoring metrics + +The full list of metrics supported by the validator monitoring feature can be seen in the [source code](https://github.com/status-im/nimbus-eth2/blob/unstable/beacon_chain/validators/validator_monitor.nim) or by examining the metrics output: + +``` +curl -s localhost:8008/metrics | grep HELP.*validator_ +``` diff --git a/ncli/ncli_db.nim b/ncli/ncli_db.nim index 7e2431fb7..2ac8cd5a4 100644 --- a/ncli/ncli_db.nim +++ b/ncli/ncli_db.nim @@ -200,8 +200,10 @@ proc cmdBench(conf: DbConf, cfg: RuntimeConfig) = quit 1 echo "Initializing block pool..." - let dag = withTimerRet(timers[tInit]): - ChainDAGRef.init(cfg, db, {}) + let + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = withTimerRet(timers[tInit]): + ChainDAGRef.init(cfg, db, validatorMonitor, {}) var (start, ends) = dag.getSlotRange(conf.benchSlot, conf.benchSlots) @@ -473,7 +475,10 @@ proc cmdRewindState(conf: DbConf, cfg: RuntimeConfig) = quit 1 echo "Initializing block pool..." - let dag = init(ChainDAGRef, cfg, db, {}) + + let + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init(ChainDAGRef, cfg, db, validatorMonitor, {}) let blckRef = dag.getRef(fromHex(Eth2Digest, conf.blockRoot)) if blckRef == nil: @@ -502,7 +507,8 @@ proc cmdExportEra(conf: DbConf, cfg: RuntimeConfig) = echo "Initializing block pool..." let - dag = init(ChainDAGRef, cfg, db, {}) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init(ChainDAGRef, cfg, db, validatorMonitor, {}) let tmpState = assignClone(dag.headState) @@ -564,7 +570,9 @@ proc cmdValidatorPerf(conf: DbConf, cfg: RuntimeConfig) = quit 1 echo "# Initializing block pool..." - let dag = ChainDAGRef.init(cfg, db, {}) + let + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = ChainDAGRef.init(cfg, db, validatorMonitor, {}) var (start, ends) = dag.getSlotRange(conf.perfSlot, conf.perfSlots) @@ -702,7 +710,9 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) = quit 1 echo "Initializing block pool..." - let dag = ChainDAGRef.init(cfg, db, {}) + let + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = ChainDAGRef.init(cfg, db, validatorMonitor, {}) let outDb = SqStoreRef.init(conf.outDir, "validatorDb").expect("DB") defer: outDb.close() diff --git a/research/block_sim.nim b/research/block_sim.nim index 60449a95a..848de7d62 100644 --- a/research/block_sim.nim +++ b/research/block_sim.nim @@ -83,7 +83,8 @@ cli do(slots = SLOTS_PER_EPOCH * 6, putInitialDepositContractSnapshot(db, depositContractSnapshot) var - dag = ChainDAGRef.init(cfg, db, {}) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = ChainDAGRef.init(cfg, db, validatorMonitor, {}) eth1Chain = Eth1Chain.init(cfg, db) merkleizer = depositContractSnapshot.createMerkleizer taskpool = Taskpool.new() @@ -231,7 +232,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6, doAssert res.isOk syncCommitteePool[].addContribution( - signedContributionAndProof, res.get()) + signedContributionAndProof, res.get()[0]) proc getNewBlock[T]( stateData: var StateData, slot: Slot, cache: var StateCache): T = diff --git a/tests/consensus_spec/test_fixture_fork_choice.nim b/tests/consensus_spec/test_fixture_fork_choice.nim index 0980b9843..5c0b7f813 100644 --- a/tests/consensus_spec/test_fixture_fork_choice.nim +++ b/tests/consensus_spec/test_fixture_fork_choice.nim @@ -92,15 +92,15 @@ proc initialLoad( forkedState[], forkedState[], asTrusted(signedBlock) ) - let dag = ChainDAGRef.init( - defaultRuntimeConfig, - db, - updateFlags = {} - ) - let fkChoice = newClone(ForkChoice.init( - dag.getFinalizedEpochRef(), - dag.finalizedHead.blck - )) + + let + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = ChainDAGRef.init( + defaultRuntimeConfig, db, validatorMonitor, {}) + fkChoice = newClone(ForkChoice.init( + dag.getFinalizedEpochRef(), + dag.finalizedHead.blck + )) (dag, fkChoice) diff --git a/tests/test_attestation_pool.nim b/tests/test_attestation_pool.nim index 96c98de5f..38c9a0de5 100644 --- a/tests/test_attestation_pool.nim +++ b/tests/test_attestation_pool.nim @@ -58,7 +58,10 @@ suite "Attestation pool processing" & preset(): setup: # Genesis state that results in 6 members per committee var - dag = init(ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 6), {}) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init( + ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 6), + validatorMonitor, {}) taskpool = Taskpool.new() verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool) quarantine = newClone(Quarantine.init()) diff --git a/tests/test_beacon_chain_db.nim b/tests/test_beacon_chain_db.nim index d892b7686..8b6ec87f7 100644 --- a/tests/test_beacon_chain_db.nim +++ b/tests/test_beacon_chain_db.nim @@ -66,7 +66,8 @@ func withDigest(blck: merge.TrustedBeaconBlock): proc getTestStates(stateFork: BeaconStateFork): auto = let db = makeTestDB(SLOTS_PER_EPOCH) - dag = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {}) var testStates = getTestStates(dag.headState.data, stateFork) # Ensure transitions beyond just adding validators and increasing slots @@ -312,7 +313,8 @@ suite "Beacon chain DB" & preset(): test "sanity check phase 0 getState rollback" & preset(): var db = makeTestDB(SLOTS_PER_EPOCH) - dag = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {}) state = (ref ForkedHashedBeaconState)( kind: BeaconStateFork.Phase0, phase0Data: phase0.HashedBeaconState(data: phase0.BeaconState( @@ -334,7 +336,8 @@ suite "Beacon chain DB" & preset(): test "sanity check Altair and cross-fork getState rollback" & preset(): var db = makeTestDB(SLOTS_PER_EPOCH) - dag = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {}) state = (ref ForkedHashedBeaconState)( kind: BeaconStateFork.Altair, altairData: altair.HashedBeaconState(data: altair.BeaconState( @@ -359,7 +362,8 @@ suite "Beacon chain DB" & preset(): test "sanity check Merge and cross-fork getState rollback" & preset(): var db = makeTestDB(SLOTS_PER_EPOCH) - dag = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {}) state = (ref ForkedHashedBeaconState)( kind: BeaconStateFork.Merge, mergeData: merge.HashedBeaconState(data: merge.BeaconState( diff --git a/tests/test_block_processor.nim b/tests/test_block_processor.nim index b8475f346..18a2da34f 100644 --- a/tests/test_block_processor.nim +++ b/tests/test_block_processor.nim @@ -27,7 +27,8 @@ suite "Block processor" & preset(): setup: var db = makeTestDB(SLOTS_PER_EPOCH) - dag = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {}) taskpool = Taskpool.new() verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool) quarantine = newClone(Quarantine.init()) @@ -40,11 +41,11 @@ suite "Block processor" & preset(): getTimeFn = proc(): BeaconTime = b2.message.slot.toBeaconTime() processor = BlockProcessor.new( false, "", "", keys.newRng(), taskpool, consensusManager, - getTimeFn) + validatorMonitor, getTimeFn) test "Reverse order block add & get" & preset(): let missing = processor[].storeBlock( - b2, b2.message.slot) + MsgSource.gossip, b2.message.slot.toBeaconTime(), b2) check: missing.error == BlockError.MissingParent check: @@ -54,7 +55,7 @@ suite "Block processor" & preset(): let status = processor[].storeBlock( - b1, b2.message.slot) + MsgSource.gossip, b2.message.slot.toBeaconTime(), b1) b1Get = dag.get(b1.root) check: @@ -84,7 +85,8 @@ suite "Block processor" & preset(): # check that init also reloads block graph var - dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + validatorMonitor2 = newClone(ValidatorMonitor.init()) + dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor2, {}) check: # ensure we loaded the correct head state diff --git a/tests/test_blockchain_dag.nim b/tests/test_blockchain_dag.nim index bccf79d15..11108da03 100644 --- a/tests/test_blockchain_dag.nim +++ b/tests/test_blockchain_dag.nim @@ -50,7 +50,8 @@ suite "Block pool processing" & preset(): setup: var db = makeTestDB(SLOTS_PER_EPOCH) - dag = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {}) verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new()) quarantine = Quarantine.init() state = newClone(dag.headState.data) @@ -159,6 +160,15 @@ suite "Block pool processing" & preset(): dag.getBlockRange(Slot(3), 2, blocks.toOpenArray(0, 1)) == 2 blocks[2..<2].len == 0 + test "Adding the same block twice returns a Duplicate error" & preset(): + let + b10 = dag.addHeadBlock(verifier, b1, nilPhase0Callback) + b11 = dag.addHeadBlock(verifier, b1, nilPhase0Callback) + + check: + b11.error == BlockError.Duplicate + not b10[].isNil + test "updateHead updates head and headState" & preset(): let b1Add = dag.addHeadBlock(verifier, b1, nilPhase0Callback) @@ -230,7 +240,8 @@ suite "Block pool altair processing" & preset(): var db = makeTestDB(SLOTS_PER_EPOCH) - dag = init(ChainDAGRef, cfg, db, {}) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init(ChainDAGRef, cfg, db, validatorMonitor, {}) verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new()) quarantine = Quarantine.init() state = newClone(dag.headState.data) @@ -304,7 +315,8 @@ suite "chain DAG finalization tests" & preset(): setup: var db = makeTestDB(SLOTS_PER_EPOCH) - dag = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {}) verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new()) quarantine = Quarantine.init() cache = StateCache() @@ -397,7 +409,8 @@ suite "chain DAG finalization tests" & preset(): db.getStateRoot(finalizedCheckpoint.blck.root, finalizedCheckpoint.slot).isSome let - dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + validatorMonitor2 = newClone(ValidatorMonitor.init()) + dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor2, {}) # check that the state reloaded from database resembles what we had before check: @@ -437,7 +450,8 @@ suite "chain DAG finalization tests" & preset(): check: added.isOk() var - dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + validatorMonitor2 = newClone(ValidatorMonitor.init()) + dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor2, {}) # check that we can apply the block after the orphaning let added2 = dag2.addHeadBlock(verifier, blck, nilPhase0Callback) @@ -485,7 +499,8 @@ suite "chain DAG finalization tests" & preset(): cur = cur.parent let - dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + validatorMonitor2 = newClone(ValidatorMonitor.init()) + dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor2, {}) # check that the state reloaded from database resembles what we had before check: @@ -525,7 +540,8 @@ suite "Old database versions" & preset(): db.putGenesisBlock(genBlock.root) var - dag = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init(ChainDAGRef, defaultRuntimeConfig, db,validatorMonitor, {}) state = newClone(dag.headState.data) cache = StateCache() att0 = makeFullAttestations(state[], dag.tail.root, 0.Slot, cache) @@ -546,7 +562,8 @@ suite "Diverging hardforks": var db = makeTestDB(SLOTS_PER_EPOCH) - dag = init(ChainDAGRef, phase0RuntimeConfig, db, {}) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init(ChainDAGRef, phase0RuntimeConfig, db, validatorMonitor, {}) verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new()) quarantine = newClone(Quarantine.init()) cache = StateCache() @@ -569,7 +586,10 @@ suite "Diverging hardforks": check b1Add.isOk() dag.updateHead(b1Add[], quarantine[]) - var dagAltair = init(ChainDAGRef, altairRuntimeConfig, db, {}) + let validatorMonitorAltair = newClone(ValidatorMonitor.init()) + + var dagAltair = init( + ChainDAGRef, altairRuntimeConfig, db, validatorMonitorAltair, {}) discard AttestationPool.init(dagAltair, quarantine) test "Non-tail block in common": @@ -598,7 +618,10 @@ suite "Diverging hardforks": check b2Add.isOk() dag.updateHead(b2Add[], quarantine[]) - var dagAltair = init(ChainDAGRef, altairRuntimeConfig, db, {}) + let validatorMonitor = newClone(ValidatorMonitor.init()) + + var dagAltair = init( + ChainDAGRef, altairRuntimeConfig, db, validatorMonitor, {}) discard AttestationPool.init(dagAltair, quarantine) suite "Backfill": @@ -632,7 +655,9 @@ suite "Backfill": ChainDAGRef.preInit( db, genState[], tailState[], tailBlock.asTrusted()) - let dag = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + let + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {}) check: dag.getRef(tailBlock.root) == dag.tail @@ -687,12 +712,17 @@ suite "Backfill": ChainDAGRef.preInit( db, genState[], tailState[], tailBlock.asTrusted()) - let dag = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + let + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {}) check: dag.addBackfillBlock(blocks[^2].phase0Data).isOk() - let dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + let + validatorMonitor2 = newClone(ValidatorMonitor.init()) + + dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor2, {}) check: dag.getRef(tailBlock.root) == dag.tail diff --git a/tests/test_exit_pool.nim b/tests/test_exit_pool.nim index 3aa23577e..323a2d90b 100644 --- a/tests/test_exit_pool.nim +++ b/tests/test_exit_pool.nim @@ -17,8 +17,10 @@ import "."/[testutil, testdbutil] suite "Exit pool testing suite": setup: let + validatorMonitor = newClone(ValidatorMonitor.init()) dag = init( - ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 3), {}) + ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 3), + validatorMonitor, {}) pool = newClone(ExitPool.init(dag)) test "addExitMessage/getProposerSlashingMessage": diff --git a/tests/test_gossip_validation.nim b/tests/test_gossip_validation.nim index 5aefe0d62..5e0c60521 100644 --- a/tests/test_gossip_validation.nim +++ b/tests/test_gossip_validation.nim @@ -36,7 +36,10 @@ suite "Gossip validation " & preset(): setup: # Genesis state that results in 3 members per committee var - dag = init(ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 3), {}) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init( + ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 3), + validatorMonitor, {}) taskpool = Taskpool.new() verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool) quarantine = newClone(Quarantine.init()) @@ -185,11 +188,11 @@ suite "Gossip validation - Extra": # Not based on preset config batchCrypto = BatchCrypto.new(keys.newRng(), eager = proc(): bool = false, taskpool) var verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new()) - dag = block: let - dag = ChainDAGRef.init(cfg, makeTestDB(num_validators), {}) - quarantine = newClone(Quarantine.init()) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = ChainDAGRef.init( + cfg, makeTestDB(num_validators), validatorMonitor, {}) var cache = StateCache() for blck in makeTestBlocks( dag.headState.data, cache, int(SLOTS_PER_EPOCH), false, cfg = cfg): diff --git a/tests/test_statediff.nim b/tests/test_statediff.nim index 5516aa4d0..d044d8d14 100644 --- a/tests/test_statediff.nim +++ b/tests/test_statediff.nim @@ -23,7 +23,8 @@ suite "state diff tests" & preset(): setup: var db = makeTestDB(SLOTS_PER_EPOCH) - dag = init(ChainDAGRef, defaultRuntimeConfig, db, {}) + validatorMonitor = newClone(ValidatorMonitor.init()) + dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {}) test "random slot differences" & preset(): let testStates = getTestStates(dag.headState.data, BeaconStateFork.Altair)