print attestation/aggregate drop notice once per slot (#2475)
* add metrics for queue-related drops * avoid importing beacon node conf in processor
This commit is contained in:
parent
adec9d878e
commit
10d99c166c
|
@ -17,7 +17,7 @@ import
|
|||
./batch_validation,
|
||||
../validators/validator_pool,
|
||||
../beacon_node_types,
|
||||
../beacon_clock, ../conf, ../ssz/sszdump
|
||||
../beacon_clock, ../ssz/sszdump
|
||||
|
||||
# Metrics for tracking attestation and beacon block loss
|
||||
declareCounter beacon_attestations_received,
|
||||
|
@ -52,7 +52,7 @@ declareHistogram beacon_store_block_duration_seconds,
|
|||
|
||||
type
|
||||
Eth2Processor* = object
|
||||
config*: BeaconNodeConf
|
||||
doppelGangerDetectionEnabled*: bool
|
||||
getWallTime*: GetWallTimeFn
|
||||
|
||||
# Local sources of truth for validation
|
||||
|
@ -83,7 +83,7 @@ type
|
|||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc new*(T: type Eth2Processor,
|
||||
config: BeaconNodeConf,
|
||||
doppelGangerDetectionEnabled: bool,
|
||||
verifQueues: ref VerifQueueManager,
|
||||
chainDag: ChainDAGRef,
|
||||
attestationPool: ref AttestationPool,
|
||||
|
@ -93,7 +93,7 @@ proc new*(T: type Eth2Processor,
|
|||
rng: ref BrHmacDrbgContext,
|
||||
getWallTime: GetWallTimeFn): ref Eth2Processor =
|
||||
(ref Eth2Processor)(
|
||||
config: config,
|
||||
doppelGangerDetectionEnabled: doppelGangerDetectionEnabled,
|
||||
getWallTime: getWallTime,
|
||||
verifQueues: verifQueues,
|
||||
chainDag: chainDag,
|
||||
|
@ -182,7 +182,7 @@ proc checkForPotentialDoppelganger(
|
|||
validatorPubkey,
|
||||
attestationSlot = attestationData.slot
|
||||
doppelganger_detection_activated.inc()
|
||||
if self.config.doppelgangerDetection:
|
||||
if self.doppelgangerDetectionEnabled:
|
||||
warn "We believe you are currently running another instance of the same validator. We've disconnected you from the network as this presents a significant slashing risk. Possible next steps are (a) making sure you've disconnected your validator from your old machine before restarting the client; and (b) running the client again with the gossip-slashing-protection option disabled, only if you are absolutely sure this is the only instance of your validator running, and reporting the issue at https://github.com/status-im/nimbus-eth2/issues."
|
||||
quit QuitFailure
|
||||
|
||||
|
@ -277,8 +277,6 @@ proc aggregateValidator*(
|
|||
|
||||
return ValidationResult.Accept
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
proc attesterSlashingValidator*(
|
||||
self: var Eth2Processor, attesterSlashing: AttesterSlashing):
|
||||
ValidationResult =
|
||||
|
@ -323,5 +321,3 @@ proc voluntaryExitValidator*(
|
|||
beacon_voluntary_exits_received.inc()
|
||||
|
||||
ValidationResult.Accept
|
||||
|
||||
{.pop.}
|
||||
|
|
|
@ -12,8 +12,8 @@ import
|
|||
../spec/[crypto, datatypes, digest],
|
||||
../consensus_object_pools/[block_clearance, blockchain_dag, attestation_pool],
|
||||
./consensus_manager,
|
||||
../beacon_node_types,
|
||||
../beacon_clock, ../conf, ../ssz/sszdump
|
||||
".."/[beacon_clock, beacon_node_types],
|
||||
../ssz/sszdump
|
||||
|
||||
# Gossip Queue Manager
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -22,6 +22,12 @@ import
|
|||
declareHistogram beacon_store_block_duration_seconds,
|
||||
"storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]
|
||||
|
||||
declareCounter beacon_attestations_dropped_queue_full,
|
||||
"Number of attestations dropped because queue is full"
|
||||
|
||||
declareCounter beacon_aggregates_dropped_queue_full,
|
||||
"Number of aggregates dropped because queue is full"
|
||||
|
||||
type
|
||||
SyncBlock* = object
|
||||
blk*: SignedBeaconBlock
|
||||
|
@ -72,7 +78,11 @@ type
|
|||
# is there a point to separate
|
||||
# attestations & aggregates here?
|
||||
attestationsQueue: AsyncQueue[AttestationEntry]
|
||||
attestationsDropped: int
|
||||
attestationsDropTime: tuple[afterGenesis: bool, slot: Slot]
|
||||
aggregatesQueue: AsyncQueue[AggregateEntry]
|
||||
aggregatesDropped: int
|
||||
aggregatesDropTime: tuple[afterGenesis: bool, slot: Slot]
|
||||
|
||||
# Consumer
|
||||
# ----------------------------------------------------------------
|
||||
|
@ -85,13 +95,14 @@ type
|
|||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc new*(T: type VerifQueueManager,
|
||||
conf: BeaconNodeConf,
|
||||
dumpEnabled: bool,
|
||||
dumpDirInvalid, dumpDirIncoming: string,
|
||||
consensusManager: ref ConsensusManager,
|
||||
getWallTime: GetWallTimeFn): ref VerifQueueManager =
|
||||
(ref VerifQueueManager)(
|
||||
dumpEnabled: conf.dumpEnabled,
|
||||
dumpDirInvalid: conf.dumpDirInvalid,
|
||||
dumpDirIncoming: conf.dumpDirIncoming,
|
||||
dumpEnabled: dumpEnabled,
|
||||
dumpDirInvalid: dumpDirInvalid,
|
||||
dumpDirIncoming: dumpDirIncoming,
|
||||
|
||||
getWallTime: getWallTime,
|
||||
|
||||
|
@ -107,7 +118,9 @@ proc new*(T: type VerifQueueManager,
|
|||
attestationsQueue: newAsyncQueue[AttestationEntry](
|
||||
(TARGET_COMMITTEE_SIZE * MAX_COMMITTEES_PER_SLOT).int),
|
||||
|
||||
consensusManager: consensusManager
|
||||
consensusManager: consensusManager,
|
||||
attestationsDropTime: getWallTime().toSlot(),
|
||||
aggregatesDropTime: getWallTime().toSlot(),
|
||||
)
|
||||
|
||||
# Sync callbacks
|
||||
|
@ -154,13 +167,22 @@ proc addAttestation*(self: var VerifQueueManager, att: Attestation, att_indices:
|
|||
# Producer:
|
||||
# - Gossip (when synced)
|
||||
while self.attestationsQueue.full():
|
||||
self.attestationsDropped += 1
|
||||
beacon_attestations_dropped_queue_full.inc()
|
||||
|
||||
try:
|
||||
notice "Queue full, dropping oldest attestation",
|
||||
dropped = shortLog(self.attestationsQueue[0].v)
|
||||
discard self.attestationsQueue.popFirstNoWait()
|
||||
except AsyncQueueEmptyError as exc:
|
||||
raiseAssert "If queue is full, we have at least one item! " & exc.msg
|
||||
|
||||
if self.attestationsDropped > 0:
|
||||
let now = self.getWallTime().toSlot() # Print notice once per slot
|
||||
if now != self.attestationsDropTime:
|
||||
notice "Queue full, attestations dropped",
|
||||
count = self.attestationsDropped
|
||||
self.attestationsDropTime = now
|
||||
self.attestationsDropped = 0
|
||||
|
||||
try:
|
||||
self.attestationsQueue.addLastNoWait(
|
||||
AttestationEntry(v: att, attesting_indices: att_indices))
|
||||
|
@ -175,13 +197,22 @@ proc addAggregate*(self: var VerifQueueManager, agg: SignedAggregateAndProof, at
|
|||
# - Gossip (when synced)
|
||||
|
||||
while self.aggregatesQueue.full():
|
||||
self.aggregatesDropped += 1
|
||||
beacon_aggregates_dropped_queue_full.inc()
|
||||
|
||||
try:
|
||||
notice "Queue full, dropping oldest aggregate",
|
||||
dropped = shortLog(self.aggregatesQueue[0].v)
|
||||
discard self.aggregatesQueue.popFirstNoWait()
|
||||
except AsyncQueueEmptyError as exc:
|
||||
raiseAssert "We just checked that queue is not full! " & exc.msg
|
||||
|
||||
if self.aggregatesDropped > 0:
|
||||
let now = self.getWallTime().toSlot() # Print notice once per slot
|
||||
if now != self.aggregatesDropTime:
|
||||
notice "Queue full, aggregates dropped",
|
||||
count = self.aggregatesDropped
|
||||
self.aggregatesDropTime = now
|
||||
self.aggregatesDropped = 0
|
||||
|
||||
try:
|
||||
self.aggregatesQueue.addLastNoWait(AggregateEntry(
|
||||
v: agg.message.aggregate,
|
||||
|
|
|
@ -332,10 +332,11 @@ proc init*(T: type BeaconNode,
|
|||
chainDag, attestationPool, quarantine
|
||||
)
|
||||
verifQueues = VerifQueueManager.new(
|
||||
config, consensusManager,
|
||||
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
|
||||
consensusManager,
|
||||
proc(): BeaconTime = beaconClock.now())
|
||||
processor = Eth2Processor.new(
|
||||
config,
|
||||
config.doppelgangerDetection,
|
||||
verifQueues,
|
||||
chainDag, attestationPool, exitPool, validatorPool,
|
||||
quarantine,
|
||||
|
|
Loading…
Reference in New Issue