diff --git a/beacon_chain/beacon_node_common.nim b/beacon_chain/beacon_node_common.nim index 2a51d4b7a..01e7ba46a 100644 --- a/beacon_chain/beacon_node_common.nim +++ b/beacon_chain/beacon_node_common.nim @@ -49,6 +49,7 @@ type dag*: ChainDAGRef quarantine*: QuarantineRef attestationPool*: ref AttestationPool + syncCommitteeMsgPool*: ref SyncCommitteeMsgPool exitPool*: ref ExitPool eth1Monitor*: Eth1Monitor rpcServer*: RpcServer diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 0886037c4..a5cfb77d0 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -66,6 +66,7 @@ type dag*: ChainDAGRef attestationPool*: ref AttestationPool validatorPool: ref ValidatorPool + syncCommitteeMsgPool: SyncCommitteeMsgPoolRef doppelgangerDetection*: DoppelgangerProtection @@ -98,6 +99,7 @@ proc new*(T: type Eth2Processor, attestationPool: ref AttestationPool, exitPool: ref ExitPool, validatorPool: ref ValidatorPool, + syncCommitteeMsgPool: SyncCommitteeMsgPoolRef, quarantine: QuarantineRef, rng: ref BrHmacDrbgContext, getBeaconTime: GetBeaconTimeFn): ref Eth2Processor = @@ -110,6 +112,7 @@ proc new*(T: type Eth2Processor, attestationPool: attestationPool, exitPool: exitPool, validatorPool: validatorPool, + syncCommitteeMsgPool: syncCommitteeMsgPool, quarantine: quarantine, getCurrentBeaconTime: getBeaconTime, batchCrypto: BatchCrypto.new( @@ -342,3 +345,58 @@ proc voluntaryExitValidator*( beacon_voluntary_exits_received.inc() ValidationResult.Accept + +proc syncCommitteeMsgValidator*( + self: ref Eth2Processor, + syncCommitteeMsg: SyncCommitteeMessage, + committeeIdx: SyncCommitteeIndex, + checkSignature: bool = true): ValidationResult = + logScope: + syncCommitteeMsg = shortLog(syncCommitteeMsg) + committeeIdx + + let wallTime = self.getCurrentBeaconTime() + + # Potential under/overflows are fine; would just create odd metrics and logs + let delay = wallTime - syncCommitteeMsg.slot.toBeaconTime + debug "Sync committee message received", delay + + # Now proceed to validation + let v = validateSyncCommitteeMessage(self.dag, self.syncCommitteeMsgPool, + syncCommitteeMsg, committeeIdx, wallTime, + checkSignature) + if v.isErr(): + debug "Dropping sync committee message", validationError = v.error + return v.error[0] + + trace "Sync committee message validated" + ValidationResult.Accept + +proc syncCommitteeContributionValidator*( + self: ref Eth2Processor, + contributionAndProof: SignedContributionAndProof, + checkSignature: bool = true): ValidationResult = + logScope: + contributionAndProof = shortLog(contributionAndProof.message.contribution) + signature = shortLog(contributionAndProof.signature) + aggregator_index = contributionAndProof.message.aggregator_index + + let wallTime = self.getCurrentBeaconTime() + + # Potential under/overflows are fine; would just create odd metrics and logs + let delay = wallTime - contributionAndProof.message.contribution.slot.toBeaconTime + debug "Contribution received", delay + + # Now proceed to validation + let v = validateSignedContributionAndProof(self.dag, self.syncCommitteeMsgPool, + contributionAndProof, wallTime, + checkSignature) + if v.isErr(): + let (_, wallSlot) = wallTime.toSlot() + debug "Dropping contribution", + validationError = v.error, + selection_proof = contributionAndProof.message.selection_proof, + wallSlot + return v.error[0] + + ValidationResult.Accept diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 9e62afd02..1a9101a90 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -40,7 +40,7 @@ import validator], ./consensus_object_pools/[ blockchain_dag, block_quarantine, block_clearance, block_pools_types, - attestation_pool, exit_pool, spec_cache], + attestation_pool, sync_committee_msg_pool, exit_pool, spec_cache], ./eth1/eth1_monitor from eth/common/eth_types import BlockHashOrNumber @@ -314,6 +314,7 @@ proc init*(T: type BeaconNode, rng, config, netKeys, cfg, dag.forkDigests, getBeaconTime, getStateField(dag.headState.data, genesis_validators_root)) attestationPool = newClone(AttestationPool.init(dag, quarantine)) + syncCommitteeMsgPool = newClone(SyncCommitteeMsgPool.init()) exitPool = newClone(ExitPool.init(dag)) case config.slashingDbKind @@ -344,7 +345,7 @@ proc init*(T: type BeaconNode, processor = Eth2Processor.new( config.doppelgangerDetection, blockProcessor, dag, attestationPool, exitPool, validatorPool, - quarantine, rng, getBeaconTime) + syncCommitteeMsgPool, quarantine, rng, getBeaconTime) var node = BeaconNode( nickname: nickname, @@ -358,6 +359,7 @@ proc init*(T: type BeaconNode, gossipState: GossipState.Disconnected, quarantine: quarantine, attestationPool: attestationPool, + syncCommitteeMsgPool: syncCommitteeMsgPool, attachedValidators: validatorPool, exitPool: exitPool, eth1Monitor: eth1Monitor, @@ -982,6 +984,8 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} = # the database are synced with the filesystem. node.db.checkpoint() + node.syncCommitteeMsgPool[].clearPerSlotData() + # -1 is a more useful output than 18446744073709551615 as an indicator of # no future attestation/proposal known. template displayInt64(x: Slot): int64 = diff --git a/beacon_chain/validators/validator_duties.nim b/beacon_chain/validators/validator_duties.nim index 6bc4e5103..abad111ea 100644 --- a/beacon_chain/validators/validator_duties.nim +++ b/beacon_chain/validators/validator_duties.nim @@ -14,7 +14,7 @@ import # Nimble packages stew/[assign2, byteutils, objects], chronos, metrics, - chronicles, + chronicles, chronicles/timings, json_serialization/std/[options, sets, net], serialization/errors, eth/db/kvstore, eth/keys, eth/p2p/discoveryv5/[protocol, enr], @@ -24,7 +24,8 @@ import ../spec/[ eth2_merkleization, forks, helpers, network, signatures, state_transition], ../consensus_object_pools/[ - spec_cache, blockchain_dag, block_clearance, attestation_pool, exit_pool], + spec_cache, blockchain_dag, block_clearance, attestation_pool, exit_pool, + sync_committee_msg_pool], ../eth1/eth1_monitor, ../networking/eth2_network, ../sszdump, ../sync/sync_manager, @@ -38,15 +39,28 @@ const delayBuckets = [-Inf, -4.0, -2.0, -1.0, -0.5, -0.1, -0.05, declareCounter beacon_attestations_sent, "Number of beacon chain attestations sent by this peer" + declareHistogram beacon_attestation_sent_delay, "Time(s) between slot start and attestation sent moment", buckets = delayBuckets + +declareCounter beacon_sync_committee_messages_sent, + "Number of sync committee messages sent by this peer" + +declareCounter beacon_sync_committee_contributions_sent, + "Number of sync committee contributions sent by this peer" + +declareHistogram beacon_sync_committee_message_sent_delay, + "Time(s) between slot start and sync committee message sent moment", + buckets = delayBuckets + declareCounter beacon_blocks_proposed, "Number of beacon chain blocks sent by this peer" declareGauge(attached_validator_balance, "Validator balance at slot end of the first 64 validators, in Gwei", labels = ["pubkey"]) + declarePublicGauge(attached_validator_balance_total, "Validator balance of all attached validators, in Gwei") @@ -181,6 +195,43 @@ proc sendAttestation*( result = $ok false +proc sendSyncCommitteeMessage*( + node: BeaconNode, msg: SyncCommitteeMessage, + committeeIdx: SyncCommitteeIndex, checkSignature: bool): Future[bool] {.async.} = + # Validate sync committee message before sending it via gossip + # validation will also register the message with the sync committee + # message pool. Notably, although libp2p calls the data handler for + # any subscription on the subnet topic, it does not perform validation. + let ok = node.processor.syncCommitteeMsgValidator( + msg, committeeIdx, checkSignature) + + return case ok + of ValidationResult.Accept: + node.network.broadcastSyncCommitteeMessage(msg, committeeIdx) + beacon_sync_committee_messages_sent.inc() + true + else: + notice "Produced sync committee message failed validation", + msg, result = $ok + false + +proc sendSyncCommitteeContribution*( + node: BeaconNode, + msg: SignedContributionAndProof, + checkSignature: bool): Future[bool] {.async.} = + let ok = node.processor.syncCommitteeContributionValidator( + msg, checkSignature) + + return case ok + of ValidationResult.Accept: + node.network.broadcastSignedContributionAndProof(msg) + beacon_sync_committee_contributions_sent.inc() + true + else: + notice "Produced sync committee contribution failed validation", + msg, result = $ok + false + proc createAndSendAttestation(node: BeaconNode, fork: Fork, genesis_validators_root: Eth2Digest, @@ -573,41 +624,38 @@ proc sendAggregatedAttestations( aggregationSlot proc updateValidatorMetrics*(node: BeaconNode) = - when defined(metrics): - # Technically, this only needs to be done on epoch transitions and if there's - # a reorg that spans an epoch transition, but it's easier to implement this - # way for now. + # Technically, this only needs to be done on epoch transitions and if there's + # a reorg that spans an epoch transition, but it's easier to implement this + # way for now. - # We'll limit labelled metrics to the first 64, so that we don't overload - # Prometheus. + # We'll limit labelled metrics to the first 64, so that we don't overload + # Prometheus. - var total: Gwei - var i = 0 - for _, v in node.attachedValidators[].validators: - let balance = - if v.index.isNone(): - 0.Gwei - elif v.index.get().uint64 >= - getStateField(node.dag.headState.data, balances).lenu64: - debug "Cannot get validator balance, index out of bounds", - pubkey = shortLog(v.pubkey), index = v.index.get(), - balances = getStateField(node.dag.headState.data, balances).len, - stateRoot = getStateRoot(node.dag.headState.data) - 0.Gwei - else: - getStateField(node.dag.headState.data, balances)[v.index.get()] + var total: Gwei + var i = 0 + for _, v in node.attachedValidators[].validators: + let balance = + if v.index.isNone(): + 0.Gwei + elif v.index.get().uint64 >= + getStateField(node.dag.headState.data, balances).lenu64: + debug "Cannot get validator balance, index out of bounds", + pubkey = shortLog(v.pubkey), index = v.index.get(), + balances = getStateField(node.dag.headState.data, balances).len, + stateRoot = getStateRoot(node.dag.headState.data) + 0.Gwei + else: + getStateField(node.dag.headState.data, balances)[v.index.get()] - if i < 64: - attached_validator_balance.set( - balance.toGaugeValue, labelValues = [shortLog(v.pubkey)]) + if i < 64: + attached_validator_balance.set( + balance.toGaugeValue, labelValues = [shortLog(v.pubkey)]) - inc i - total += balance + inc i + total += balance - node.attachedValidatorBalanceTotal = total - attached_validator_balance_total.set(total.toGaugeValue) - else: - discard + node.attachedValidatorBalanceTotal = total + attached_validator_balance_total.set(total.toGaugeValue) proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} = ## Perform validator duties - create blocks, vote and aggregate existing votes