val_mon: register locally produced aggregates (#3352)

These use a separate flow, and were previously only registered from the
network

* don't log successes in totals mode (TMI)
* remove `attestation-sent` event which is unused
This commit is contained in:
Jacek Sieka 2022-02-04 08:33:20 +01:00 committed by GitHub
parent 9c18765b3b
commit 49282e9477
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 108 additions and 99 deletions

View File

@ -69,7 +69,6 @@ type
attachedValidatorBalanceTotal*: uint64
gossipState*: GossipState
beaconClock*: BeaconClock
onAttestationSent*: OnAttestationCallback
restKeysCache*: Table[ValidatorPubKey, ValidatorIndex]
validatorMonitor*: ref ValidatorMonitor
stateTtlCache*: StateTtlCache

View File

@ -360,7 +360,7 @@ proc aggregateValidator*(
wallTime)
self.validatorMonitor[].registerAggregate(
src, wallTime, signedAggregateAndProof, attesting_indices)
src, wallTime, signedAggregateAndProof.message, attesting_indices)
beacon_aggregates_received.inc()
beacon_aggregate_delay.observe(delay.toFloatSeconds())
@ -518,7 +518,7 @@ proc contributionValidator*(
contributionAndProof, v.get()[0])
self.validatorMonitor[].registerSyncContribution(
src, wallTime, contributionAndProof, v.get()[1])
src, wallTime, contributionAndProof.message, v.get()[1])
beacon_sync_committee_contributions_received.inc()

View File

@ -159,8 +159,6 @@ proc init*(T: type BeaconNode,
proc onAttestationReceived(data: Attestation) =
eventBus.emit("attestation-received", data)
proc onAttestationSent(data: Attestation) =
eventBus.emit("attestation-sent", data)
proc onVoluntaryExitAdded(data: SignedVoluntaryExit) =
eventBus.emit("voluntary-exit", data)
proc onBlockAdded(data: ForkedTrustedSignedBeaconBlock) =
@ -552,7 +550,6 @@ proc init*(T: type BeaconNode,
consensusManager: consensusManager,
gossipState: {},
beaconClock: beaconClock,
onAttestationSent: onAttestationSent,
validatorMonitor: validatorMonitor,
stateTtlCache: stateTtlCache
)

View File

@ -222,8 +222,6 @@ proc sendAttestation*(
if res.isGoodForSending:
node.network.broadcastAttestation(subnet_id, attestation)
beacon_attestations_sent.inc()
if not(isNil(node.onAttestationSent)):
node.onAttestationSent(attestation)
ok()
else:
notice "Produced attestation failed validation",
@ -867,24 +865,26 @@ proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
await proposeBlock(node, validator, proposer.get(), head, slot)
proc makeAggregateAndProof*(
pool: var AttestationPool, epochRef: EpochRef, slot: Slot, index: CommitteeIndex,
validatorIndex: ValidatorIndex, slot_signature: ValidatorSig): Option[AggregateAndProof] =
doAssert validatorIndex in get_beacon_committee(epochRef, slot, index)
pool: var AttestationPool, epochRef: EpochRef, slot: Slot,
committee_index: CommitteeIndex,
validator_index: ValidatorIndex,
slot_signature: ValidatorSig): Opt[AggregateAndProof] =
doAssert validator_index in get_beacon_committee(epochRef, slot, committee_index)
# TODO for testing purposes, refactor this into the condition check
# and just calculation
# https://github.com/ethereum/consensus-specs/blob/v1.1.9/specs/phase0/validator.md#aggregation-selection
if not is_aggregator(epochRef, slot, index, slot_signature):
return none(AggregateAndProof)
if not is_aggregator(epochRef, slot, committee_index, slot_signature):
return err()
let maybe_slot_attestation = getAggregatedAttestation(pool, slot, index)
let maybe_slot_attestation = getAggregatedAttestation(pool, slot, committee_index)
if maybe_slot_attestation.isNone:
return none(AggregateAndProof)
return err()
# https://github.com/ethereum/consensus-specs/blob/v1.1.9/specs/phase0/validator.md#construct-aggregate
# https://github.com/ethereum/consensus-specs/blob/v1.1.9/specs/phase0/validator.md#aggregateandproof
some(AggregateAndProof(
aggregator_index: validatorIndex.uint64,
ok(AggregateAndProof(
aggregator_index: validator_index.uint64,
aggregate: maybe_slot_attestation.get,
selection_proof: slot_signature))
@ -929,43 +929,47 @@ proc sendAggregatedAttestations(
doAssert slotSigsData.len == slotSigs.len
for i in 0..<slotSigs.len:
let
slotSig = slotSigs[i].read()
data = slotSigsData[i]
if slotSig.isErr():
error "Unable to create slot signature using remote signer",
validator = shortLog(data.v),
slot, error_msg = slotSig.error()
continue
let aggregateAndProof =
makeAggregateAndProof(node.attestationPool[], epochRef, slot,
data.committee_index,
data.validator_index,
slotSig.get())
slotSig = slotSigs[i].read().valueOr:
error "Unable to create slot signature using remote signer",
validator = shortLog(data.v),
slot, error = error
continue
aggregateAndProof = makeAggregateAndProof(
node.attestationPool[], epochRef, slot, data.committee_index,
data.validator_index, slotSig).valueOr:
# Don't broadcast when, e.g., this validator isn't aggregator
continue
# Don't broadcast when, e.g., this node isn't aggregator
if aggregateAndProof.isSome:
let sig =
block:
let res = await signAggregateAndProof(data.v,
aggregateAndProof.get, fork, genesis_validators_root)
if res.isErr():
error "Unable to sign aggregated attestation using remote signer",
validator = shortLog(data.v), error_msg = res.error()
return
res.get()
var signedAP = SignedAggregateAndProof(
message: aggregateAndProof.get,
sig = block:
let res = await signAggregateAndProof(data.v,
aggregateAndProof, fork, genesis_validators_root)
if res.isErr():
error "Unable to sign aggregated attestation using remote signer",
validator = shortLog(data.v), error_msg = res.error()
return
res.get()
signedAP = SignedAggregateAndProof(
message: aggregateAndProof,
signature: sig)
node.network.broadcastAggregateAndProof(signedAP)
# The subnet on which the attestations (should have) arrived
let subnet_id = compute_subnet_for_attestation(
committees_per_slot, slot, data.committee_index)
notice "Aggregated attestation sent",
attestation = shortLog(signedAP.message.aggregate),
aggregator_index = signedAP.message.aggregator_index,
signature = shortLog(signedAP.signature),
validator = shortLog(data.v),
subnet_id
node.network.broadcastAggregateAndProof(signedAP)
# The subnet on which the attestations (should have) arrived
let subnet_id = compute_subnet_for_attestation(
committees_per_slot, slot, data.committee_index)
notice "Aggregated attestation sent",
aggregate = shortLog(signedAP.message.aggregate),
aggregator_index = signedAP.message.aggregator_index,
signature = shortLog(signedAP.signature),
validator = shortLog(data.v),
subnet_id
node.validatorMonitor[].registerAggregate(
MsgSource.api, node.beaconClock.now(), signedAP.message,
get_attesting_indices(
epochRef, slot,
data.committee_index,
aggregateAndProof.aggregate.aggregation_bits))
proc updateValidatorMetrics*(node: BeaconNode) =
# Technically, this only needs to be done on epoch transitions and if there's

View File

@ -441,13 +441,13 @@ proc registerEpochInfo*(
# These two metrics are the same - keep both around for LH compatibility
validator_monitor_prev_epoch_on_chain_attester_hit.inc(1, [metricId])
validator_monitor_prev_epoch_on_chain_source_attester_hit.inc(1, [metricId])
info "Previous epoch attestation included",
timely_source = previous_epoch_matched_source,
timely_target = previous_epoch_matched_target,
timely_head = previous_epoch_matched_head,
epoch = prev_epoch,
validator = id
if not self.totals:
info "Previous epoch attestation included",
timely_source = previous_epoch_matched_source,
timely_target = previous_epoch_matched_target,
timely_head = previous_epoch_matched_head,
epoch = prev_epoch,
validator = id
else:
validator_monitor_prev_epoch_on_chain_attester_miss.inc(1, [metricId])
validator_monitor_prev_epoch_on_chain_source_attester_miss.inc(1, [metricId])
@ -487,12 +487,12 @@ proc registerEpochInfo*(
if not self.totals:
validator_monitor_validator_in_current_sync_committee.set(1, [metricId])
self.withEpochSummary(monitor[], current_epoch):
info "Current epoch sync signatures",
included = epochSummary.sync_signature_block_inclusions,
expected = SLOTS_PER_EPOCH,
epoch = current_epoch,
validator = id
self.withEpochSummary(monitor[], current_epoch):
info "Current epoch sync signatures",
included = epochSummary.sync_signature_block_inclusions,
expected = SLOTS_PER_EPOCH,
epoch = current_epoch,
validator = id
in_current_sync_committee += 1
else:
@ -506,10 +506,10 @@ proc registerEpochInfo*(
if not self.totals:
validator_monitor_validator_in_next_sync_committee.set(1, [metricId])
self.withEpochSummary(monitor[], current_epoch):
info "Validator in next sync committee",
epoch = current_epoch,
validator = id
in_next_sync_committee += 1
else:
@ -630,9 +630,10 @@ proc registerAttestation*(
validator_monitor_unaggregated_attestation_delay_seconds.observe(
delay.toGaugeValue(), [$src, metricId])
info "Attestation seen",
attestation = shortLog(attestation),
src, epoch = slot.epoch, validator = id
if not self.totals:
info "Attestation seen",
attestation = shortLog(attestation),
src, epoch = slot.epoch, validator = id
self.withEpochSummary(monitor, slot.epoch):
epochSummary.attestations += 1
@ -642,12 +643,12 @@ proc registerAggregate*(
self: var ValidatorMonitor,
src: MsgSource,
seen_timestamp: BeaconTime,
signed_aggregate_and_proof: SignedAggregateAndProof,
aggregate_and_proof: AggregateAndProof,
attesting_indices: openArray[ValidatorIndex]) =
let
slot = signed_aggregate_and_proof.message.aggregate.data.slot
slot = aggregate_and_proof.aggregate.data.slot
delay = seen_timestamp - slot.aggregate_deadline()
aggregator_index = signed_aggregate_and_proof.message.aggregator_index
aggregator_index = aggregate_and_proof.aggregator_index
self.withMonitor(aggregator_index):
let id = monitor.id
@ -655,9 +656,10 @@ proc registerAggregate*(
validator_monitor_aggregated_attestation_delay_seconds.observe(
delay.toGaugeValue(), [$src, metricId])
info "Aggregated attestion seen",
aggregate = shortLog(signed_aggregate_and_proof.message.aggregate),
src, epoch = slot.epoch, validator = id
if not self.totals:
info "Aggregated attestion seen",
aggregate = shortLog(aggregate_and_proof.aggregate),
src, epoch = slot.epoch, validator = id
self.withEpochSummary(monitor, slot.epoch):
epochSummary.aggregates += 1
@ -670,9 +672,10 @@ proc registerAggregate*(
validator_monitor_attestation_in_aggregate_delay_seconds.observe(
delay.toGaugeValue(), [$src, metricId])
info "Attestation included in aggregate",
aggregate = shortLog(signed_aggregate_and_proof.message.aggregate),
src, epoch = slot.epoch, validator = id
if not self.totals:
info "Attestation included in aggregate",
aggregate = shortLog(aggregate_and_proof.aggregate),
src, epoch = slot.epoch, validator = id
self.withEpochSummary(monitor, slot.epoch):
epochSummary.attestation_aggregate_inclusions += 1
@ -694,11 +697,12 @@ proc registerAttestationInBlock*(
validator_monitor_attestation_in_block_delay_slots.set(
inclusion_lag.int64, ["block", metricId])
info "Attestation included in block",
attestation_data = shortLog(data),
block_slot = blck.slot,
inclusion_lag_slots = inclusion_lag,
epoch = epoch, validator = id
if not self.totals:
info "Attestation included in block",
attestation_data = shortLog(data),
block_slot = blck.slot,
inclusion_lag_slots = inclusion_lag,
epoch = epoch, validator = id
self.withEpochSummary(monitor, epoch):
epochSummary.attestation_block_inclusions += 1
@ -720,8 +724,9 @@ proc registerBeaconBlock*(
validator_monitor_beacon_block_delay_seconds.observe(
delay.toGaugeValue(), [$src, metricId])
info "Block seen",
blck = shortLog(blck), src, epoch = slot.epoch, validator = id
if not self.totals:
info "Block seen",
blck = shortLog(blck), src, epoch = slot.epoch, validator = id
proc registerSyncCommitteeMessage*(
self: var ValidatorMonitor,
@ -738,9 +743,10 @@ proc registerSyncCommitteeMessage*(
validator_monitor_sync_committee_messages_delay_seconds.observe(
delay.toGaugeValue(), [$src, metricId])
info "Sync committee message seen",
syncCommitteeMessage = shortLog(sync_committee_message.beacon_block_root),
src, epoch = slot.epoch, validator = id
if not self.totals:
info "Sync committee message seen",
syncCommitteeMessage = shortLog(sync_committee_message.beacon_block_root),
src, epoch = slot.epoch, validator = id
self.withEpochSummary(monitor, slot.epoch):
epochSummary.sync_committee_messages += 1
@ -750,23 +756,24 @@ proc registerSyncContribution*(
self: var ValidatorMonitor,
src: MsgSource,
seen_timestamp: BeaconTime,
sync_contribution: SignedContributionAndProof,
contribution_and_proof: ContributionAndProof,
participants: openArray[ValidatorIndex]) =
let
slot = sync_contribution.message.contribution.slot
beacon_block_root = sync_contribution.message.contribution.beacon_block_root
slot = contribution_and_proof.contribution.slot
beacon_block_root = contribution_and_proof.contribution.beacon_block_root
delay = seen_timestamp - slot.sync_contribution_deadline()
let aggregator_index = sync_contribution.message.aggregator_index
let aggregator_index = contribution_and_proof.aggregator_index
self.withMonitor(aggregator_index):
let id = monitor.id
validator_monitor_sync_contributions.inc(1, [$src, metricId])
validator_monitor_sync_contributions_delay_seconds.observe(
delay.toGaugeValue(), [$src, metricId])
info "Sync contribution seen",
contribution = shortLog(sync_contribution.message.contribution),
src, epoch = slot.epoch, validator = id
if not self.totals:
info "Sync contribution seen",
contribution = shortLog(contribution_and_proof.contribution),
src, epoch = slot.epoch, validator = id
self.withEpochSummary(monitor, slot.epoch):
epochSummary.sync_contributions += 1
@ -777,9 +784,10 @@ proc registerSyncContribution*(
let id = monitor.id
validator_monitor_sync_committee_message_in_contribution.inc(1, [$src, metricId])
info "Sync signature included in contribution",
contribution = shortLog(sync_contribution.message.contribution),
src, epoch = slot.epoch, validator = id
if not self.totals:
info "Sync signature included in contribution",
contribution = shortLog(contribution_and_proof.contribution),
src, epoch = slot.epoch, validator = id
self.withEpochSummary(monitor, slot.epoch):
epochSummary.sync_signature_contribution_inclusions += 1
@ -791,8 +799,9 @@ proc registerSyncAggregateInBlock*(
let id = monitor.id
validator_monitor_sync_committee_message_in_block.inc(1, ["block", metricId])
info "Sync signature included in block",
head = beacon_block_root, slot = slot, validator = id
if not self.totals:
info "Sync signature included in block",
head = beacon_block_root, slot = slot, validator = id
self.withEpochSummary(monitor, slot.epoch):
epochSummary.sync_signature_block_inclusions += 1