introduce SyncCommitteeMsgPool to eth2_processor and nimbus_beacon_node (#2831)

tersec 2021-08-28 22:27:51 +00:00 committed by GitHub
parent 166e22a43b
commit 0418fbada2
4 changed files with 145 additions and 34 deletions


@@ -49,6 +49,7 @@ type
dag*: ChainDAGRef
quarantine*: QuarantineRef
attestationPool*: ref AttestationPool
syncCommitteeMsgPool*: ref SyncCommitteeMsgPool
exitPool*: ref ExitPool
eth1Monitor*: Eth1Monitor
rpcServer*: RpcServer
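
For orientation, the wiring behind this new field is spread over several hunks below; gathered in one place it looks roughly like this (a sketch assembled from other lines of this commit, not a verbatim excerpt):

# 1. create the pool alongside the other per-node pools (nimbus_beacon_node.nim)
syncCommitteeMsgPool = newClone(SyncCommitteeMsgPool.init())

# 2. hand it to the gossip processor so the new validators can register messages
processor = Eth2Processor.new(
  config.doppelgangerDetection,
  blockProcessor, dag, attestationPool, exitPool, validatorPool,
  syncCommitteeMsgPool, quarantine, rng, getBeaconTime)

# 3. keep a reference on the BeaconNode itself (set via its constructor below),
#    so onSlotEnd can call node.syncCommitteeMsgPool[].clearPerSlotData()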


@@ -66,6 +66,7 @@ type
dag*: ChainDAGRef
attestationPool*: ref AttestationPool
validatorPool: ref ValidatorPool
syncCommitteeMsgPool: SyncCommitteeMsgPoolRef
doppelgangerDetection*: DoppelgangerProtection
@@ -98,6 +99,7 @@ proc new*(T: type Eth2Processor,
attestationPool: ref AttestationPool,
exitPool: ref ExitPool,
validatorPool: ref ValidatorPool,
syncCommitteeMsgPool: SyncCommitteeMsgPoolRef,
quarantine: QuarantineRef,
rng: ref BrHmacDrbgContext,
getBeaconTime: GetBeaconTimeFn): ref Eth2Processor =
@@ -110,6 +112,7 @@ proc new*(T: type Eth2Processor,
attestationPool: attestationPool,
exitPool: exitPool,
validatorPool: validatorPool,
syncCommitteeMsgPool: syncCommitteeMsgPool,
quarantine: quarantine,
getCurrentBeaconTime: getBeaconTime,
batchCrypto: BatchCrypto.new(
@@ -342,3 +345,58 @@ proc voluntaryExitValidator*(
beacon_voluntary_exits_received.inc()
ValidationResult.Accept
proc syncCommitteeMsgValidator*(
self: ref Eth2Processor,
syncCommitteeMsg: SyncCommitteeMessage,
committeeIdx: SyncCommitteeIndex,
checkSignature: bool = true): ValidationResult =
logScope:
syncCommitteeMsg = shortLog(syncCommitteeMsg)
committeeIdx
let wallTime = self.getCurrentBeaconTime()
# Potential under/overflows are fine; would just create odd metrics and logs
let delay = wallTime - syncCommitteeMsg.slot.toBeaconTime
debug "Sync committee message received", delay
# Now proceed to validation
let v = validateSyncCommitteeMessage(self.dag, self.syncCommitteeMsgPool,
syncCommitteeMsg, committeeIdx, wallTime,
checkSignature)
if v.isErr():
debug "Dropping sync committee message", validationError = v.error
return v.error[0]
trace "Sync committee message validated"
ValidationResult.Accept
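
A note on the return convention, inferred from v.error[0] above rather than stated anywhere in this diff: validateSyncCommitteeMessage appears to return a Result whose error side carries the gossip verdict (and, judging by the validationError log field, a reason alongside it), so the proc can log why a message was dropped while still handing libp2p a plain ValidationResult. A hypothetical non-gossip call site (processor, msg and committeeIdx are assumed to exist; only syncCommitteeMsgValidator itself comes from this commit):

# Hypothetical call site, shown only to illustrate handling of the verdict.
let verdict = processor.syncCommitteeMsgValidator(msg, committeeIdx)
if verdict != ValidationResult.Accept:
  debug "sync committee message not accepted", verdict = $verdict
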
proc syncCommitteeContributionValidator*(
self: ref Eth2Processor,
contributionAndProof: SignedContributionAndProof,
checkSignature: bool = true): ValidationResult =
logScope:
contributionAndProof = shortLog(contributionAndProof.message.contribution)
signature = shortLog(contributionAndProof.signature)
aggregator_index = contributionAndProof.message.aggregator_index
let wallTime = self.getCurrentBeaconTime()
# Potential under/overflows are fine; would just create odd metrics and logs
let delay = wallTime - contributionAndProof.message.contribution.slot.toBeaconTime
debug "Contribution received", delay
# Now proceed to validation
let v = validateSignedContributionAndProof(self.dag, self.syncCommitteeMsgPool,
contributionAndProof, wallTime,
checkSignature)
if v.isErr():
let (_, wallSlot) = wallTime.toSlot()
debug "Dropping contribution",
validationError = v.error,
selection_proof = contributionAndProof.message.selection_proof,
wallSlot
return v.error[0]
ValidationResult.Accept
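
For context (from the Altair networking spec rather than from this diff): sync committee messages travel on a small number of per-subnet gossip topics, while signed contributions use a single topic, which is why the message validator above takes a committeeIdx and the contribution validator does not. Illustrative constants only; the repo's own topic helpers live in its spec/network module and may spell these differently:

const
  SYNC_COMMITTEE_SUBNET_COUNT = 4  # Altair p2p spec value
  syncCommitteeTopic = "sync_committee_{subnet_id}"  # one topic per subcommittee subnet
  contributionTopic = "sync_committee_contribution_and_proof"  # single aggregate topic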


@@ -40,7 +40,7 @@ import
validator],
./consensus_object_pools/[
blockchain_dag, block_quarantine, block_clearance, block_pools_types,
attestation_pool, exit_pool, spec_cache],
attestation_pool, sync_committee_msg_pool, exit_pool, spec_cache],
./eth1/eth1_monitor
from eth/common/eth_types import BlockHashOrNumber
@@ -314,6 +314,7 @@ proc init*(T: type BeaconNode,
rng, config, netKeys, cfg, dag.forkDigests, getBeaconTime,
getStateField(dag.headState.data, genesis_validators_root))
attestationPool = newClone(AttestationPool.init(dag, quarantine))
syncCommitteeMsgPool = newClone(SyncCommitteeMsgPool.init())
exitPool = newClone(ExitPool.init(dag))
case config.slashingDbKind
@@ -344,7 +345,7 @@ proc init*(T: type BeaconNode,
processor = Eth2Processor.new(
config.doppelgangerDetection,
blockProcessor, dag, attestationPool, exitPool, validatorPool,
quarantine, rng, getBeaconTime)
syncCommitteeMsgPool, quarantine, rng, getBeaconTime)
var node = BeaconNode(
nickname: nickname,
@@ -358,6 +359,7 @@ proc init*(T: type BeaconNode,
gossipState: GossipState.Disconnected,
quarantine: quarantine,
attestationPool: attestationPool,
syncCommitteeMsgPool: syncCommitteeMsgPool,
attachedValidators: validatorPool,
exitPool: exitPool,
eth1Monitor: eth1Monitor,
@@ -982,6 +984,8 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
# the database are synced with the filesystem.
node.db.checkpoint()
node.syncCommitteeMsgPool[].clearPerSlotData()
# -1 is a more useful output than 18446744073709551615 as an indicator of
# no future attestation/proposal known.
template displayInt64(x: Slot): int64 =
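
The clearPerSlotData() call added above reflects that sync committee messages and contributions only matter for the slot they were produced in, so whatever the gossip validators accumulated during slot N can be discarded before slot N+1. A minimal lifecycle sketch using only the pool calls that appear in this commit (the proc name is illustrative; the real call sits at the end of onSlotEnd, after the database checkpoint):

proc perSlotHousekeeping(pool: ref SyncCommitteeMsgPool) =
  # during the slot, validateSyncCommitteeMessage / validateSignedContributionAndProof
  # fill the pool as a side effect of gossip validation
  pool[].clearPerSlotData()  # drop the slot-scoped data before the next slot starts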


@@ -14,7 +14,7 @@ import
# Nimble packages
stew/[assign2, byteutils, objects],
chronos, metrics,
chronicles,
chronicles, chronicles/timings,
json_serialization/std/[options, sets, net], serialization/errors,
eth/db/kvstore,
eth/keys, eth/p2p/discoveryv5/[protocol, enr],
@@ -24,7 +24,8 @@ import
../spec/[
eth2_merkleization, forks, helpers, network, signatures, state_transition],
../consensus_object_pools/[
spec_cache, blockchain_dag, block_clearance, attestation_pool, exit_pool],
spec_cache, blockchain_dag, block_clearance, attestation_pool, exit_pool,
sync_committee_msg_pool],
../eth1/eth1_monitor,
../networking/eth2_network,
../sszdump, ../sync/sync_manager,
@@ -38,15 +39,28 @@ const delayBuckets = [-Inf, -4.0, -2.0, -1.0, -0.5, -0.1, -0.05,
declareCounter beacon_attestations_sent,
"Number of beacon chain attestations sent by this peer"
declareHistogram beacon_attestation_sent_delay,
"Time(s) between slot start and attestation sent moment",
buckets = delayBuckets
declareCounter beacon_sync_committee_messages_sent,
"Number of sync committee messages sent by this peer"
declareCounter beacon_sync_committee_contributions_sent,
"Number of sync committee contributions sent by this peer"
declareHistogram beacon_sync_committee_message_sent_delay,
"Time(s) between slot start and sync committee message sent moment",
buckets = delayBuckets
declareCounter beacon_blocks_proposed,
"Number of beacon chain blocks sent by this peer"
declareGauge(attached_validator_balance,
"Validator balance at slot end of the first 64 validators, in Gwei",
labels = ["pubkey"])
declarePublicGauge(attached_validator_balance_total,
"Validator balance of all attached validators, in Gwei")
@@ -181,6 +195,43 @@ proc sendAttestation*(
result = $ok
false
proc sendSyncCommitteeMessage*(
node: BeaconNode, msg: SyncCommitteeMessage,
committeeIdx: SyncCommitteeIndex, checkSignature: bool): Future[bool] {.async.} =
# Validate sync committee message before sending it via gossip
# validation will also register the message with the sync committee
# message pool. Notably, although libp2p calls the data handler for
# any subscription on the subnet topic, it does not perform validation.
let ok = node.processor.syncCommitteeMsgValidator(
msg, committeeIdx, checkSignature)
return case ok
of ValidationResult.Accept:
node.network.broadcastSyncCommitteeMessage(msg, committeeIdx)
beacon_sync_committee_messages_sent.inc()
true
else:
notice "Produced sync committee message failed validation",
msg, result = $ok
false
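
A hypothetical producer-side call (building and signing the SyncCommitteeMessage is not part of this commit): a duty handler that has already assembled msg for its assigned subcommittee simply awaits the proc above. Passing checkSignature = false for locally signed messages mirrors how attestations are handled and is an assumption here, not something this diff shows:

# Hypothetical call site; only sendSyncCommitteeMessage itself comes from this diff.
let sent = await node.sendSyncCommitteeMessage(msg, committeeIdx, checkSignature = false)
if not sent:
  notice "locally produced sync committee message was not broadcast"
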
proc sendSyncCommitteeContribution*(
node: BeaconNode,
msg: SignedContributionAndProof,
checkSignature: bool): Future[bool] {.async.} =
let ok = node.processor.syncCommitteeContributionValidator(
msg, checkSignature)
return case ok
of ValidationResult.Accept:
node.network.broadcastSignedContributionAndProof(msg)
beacon_sync_committee_contributions_sent.inc()
true
else:
notice "Produced sync committee contribution failed validation",
msg, result = $ok
false
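
And the aggregator-side counterpart, under the same caveats (producing and signing the ContributionAndProof happens elsewhere):

# Hypothetical call site; only sendSyncCommitteeContribution itself comes from this diff.
let sent = await node.sendSyncCommitteeContribution(signedContributionAndProof,
                                                    checkSignature = false)
if not sent:
  notice "locally produced sync committee contribution was not broadcast"
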
proc createAndSendAttestation(node: BeaconNode,
fork: Fork,
genesis_validators_root: Eth2Digest,
@@ -573,41 +624,38 @@ proc sendAggregatedAttestations(
    aggregationSlot

proc updateValidatorMetrics*(node: BeaconNode) =
-  when defined(metrics):
-    # Technically, this only needs to be done on epoch transitions and if there's
-    # a reorg that spans an epoch transition, but it's easier to implement this
-    # way for now.
+  # Technically, this only needs to be done on epoch transitions and if there's
+  # a reorg that spans an epoch transition, but it's easier to implement this
+  # way for now.

-    # We'll limit labelled metrics to the first 64, so that we don't overload
-    # Prometheus.
+  # We'll limit labelled metrics to the first 64, so that we don't overload
+  # Prometheus.

-    var total: Gwei
-    var i = 0
-    for _, v in node.attachedValidators[].validators:
-      let balance =
-        if v.index.isNone():
-          0.Gwei
-        elif v.index.get().uint64 >=
-            getStateField(node.dag.headState.data, balances).lenu64:
-          debug "Cannot get validator balance, index out of bounds",
-            pubkey = shortLog(v.pubkey), index = v.index.get(),
-            balances = getStateField(node.dag.headState.data, balances).len,
-            stateRoot = getStateRoot(node.dag.headState.data)
-          0.Gwei
-        else:
-          getStateField(node.dag.headState.data, balances)[v.index.get()]
+  var total: Gwei
+  var i = 0
+  for _, v in node.attachedValidators[].validators:
+    let balance =
+      if v.index.isNone():
+        0.Gwei
+      elif v.index.get().uint64 >=
+          getStateField(node.dag.headState.data, balances).lenu64:
+        debug "Cannot get validator balance, index out of bounds",
+          pubkey = shortLog(v.pubkey), index = v.index.get(),
+          balances = getStateField(node.dag.headState.data, balances).len,
+          stateRoot = getStateRoot(node.dag.headState.data)
+        0.Gwei
+      else:
+        getStateField(node.dag.headState.data, balances)[v.index.get()]

-      if i < 64:
-        attached_validator_balance.set(
-          balance.toGaugeValue, labelValues = [shortLog(v.pubkey)])
+    if i < 64:
+      attached_validator_balance.set(
+        balance.toGaugeValue, labelValues = [shortLog(v.pubkey)])

-      inc i
-      total += balance
+    inc i
+    total += balance

-    node.attachedValidatorBalanceTotal = total
-    attached_validator_balance_total.set(total.toGaugeValue)
-  else:
-    discard
+  node.attachedValidatorBalanceTotal = total
+  attached_validator_balance_total.set(total.toGaugeValue)

proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
## Perform validator duties - create blocks, vote and aggregate existing votes
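
Aside on the updateValidatorMetrics change above: the metric calls themselves (set/inc) are already inexpensive no-ops in builds without -d:metrics, so removing the when defined(metrics) wrapper mainly means node.attachedValidatorBalanceTotal is now kept up to date in every build, at the cost of always running the small balance-summing loop. A tiny illustration of the assumed nim-metrics behaviour (the counter name is hypothetical):

import metrics

declareCounter example_things_done, "illustrative counter, not part of this commit"
example_things_done.inc()  # real increment with -d:metrics, cheap no-op without it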