2021-01-11 21:27:01 +00:00
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{. push raises : [ Defect ] . }
2020-08-20 16:30:47 +00:00
import
std / [ math , tables ] ,
stew / results ,
2021-01-11 21:27:01 +00:00
chronicles , chronos , metrics ,
2021-03-05 13:12:00 +00:00
.. / spec / [ crypto , datatypes , digest ] ,
.. / consensus_object_pools / [ block_clearance , blockchain_dag , exit_pool , attestation_pool ] ,
2021-03-06 07:32:55 +00:00
. / gossip_validation ,
2021-03-05 13:12:00 +00:00
.. / validators / validator_pool ,
.. / beacon_node_types ,
.. / beacon_clock , .. / conf , .. / ssz / sszdump
2020-08-20 16:30:47 +00:00
# Metrics for tracking attestation and beacon block loss
declareCounter beacon_attestations_received ,
" Number of beacon chain attestations received by this peer "
declareCounter beacon_aggregates_received ,
" Number of beacon chain aggregate attestations received by this peer "
declareCounter beacon_blocks_received ,
" Number of beacon chain blocks received by this peer "
2020-09-14 14:26:31 +00:00
declareCounter beacon_attester_slashings_received ,
" Number of beacon chain attester slashings received by this peer "
declareCounter beacon_proposer_slashings_received ,
" Number of beacon chain proposer slashings received by this peer "
declareCounter beacon_voluntary_exits_received ,
" Number of beacon chain voluntary exits received by this peer "
2020-08-20 16:30:47 +00:00
2021-02-03 17:11:42 +00:00
declareCounter doppelganger_detection_activated ,
" Number of times doppelganger detection was activated "
2020-10-27 17:21:35 +00:00
2020-08-20 16:30:47 +00:00
const delayBuckets = [ 2 .0 , 4 .0 , 6 .0 , 8 .0 , 10 .0 , 12 .0 , 14 .0 , Inf ]
declareHistogram beacon_attestation_delay ,
" Time(s) between slot start and attestation reception " , buckets = delayBuckets
declareHistogram beacon_aggregate_delay ,
" Time(s) between slot start and aggregate reception " , buckets = delayBuckets
declareHistogram beacon_block_delay ,
" Time(s) between slot start and beacon block reception " , buckets = delayBuckets
declareHistogram beacon_store_block_duration_seconds ,
" storeBlock() duration " , buckets = [ 0 .25 , 0 .5 , 1 , 2 , 4 , 8 , Inf ]
type
GetWallTimeFn * = proc ( ) : BeaconTime {. gcsafe , raises : [ Defect ] . }
SyncBlock * = object
blk * : SignedBeaconBlock
resfut * : Future [ Result [ void , BlockError ] ]
2020-08-27 07:34:12 +00:00
BlockEntry * = object
v * : SyncBlock
AttestationEntry * = object
v * : Attestation
2021-02-08 07:27:30 +00:00
attesting_indices * : seq [ ValidatorIndex ]
2020-08-27 07:34:12 +00:00
AggregateEntry * = AttestationEntry
2020-08-20 16:30:47 +00:00
Eth2Processor * = object
config * : BeaconNodeConf
getWallTime * : GetWallTimeFn
chainDag * : ChainDAGRef
attestationPool * : ref AttestationPool
2020-09-14 14:26:31 +00:00
exitPool : ref ExitPool
2020-10-27 17:21:35 +00:00
validatorPool : ref ValidatorPool
2020-08-20 16:30:47 +00:00
quarantine * : QuarantineRef
2021-03-01 16:36:06 +00:00
expectedSlot : Slot
expectedBlockReceived : Future [ bool ]
2020-08-20 16:30:47 +00:00
blocksQueue * : AsyncQueue [ BlockEntry ]
attestationsQueue * : AsyncQueue [ AttestationEntry ]
aggregatesQueue * : AsyncQueue [ AggregateEntry ]
2021-02-01 11:18:16 +00:00
doppelgangerDetection * : DoppelgangerProtection
2020-10-27 17:21:35 +00:00
2021-03-01 16:36:06 +00:00
proc checkExpectedBlock ( self : var Eth2Processor ) =
if self . expectedBlockReceived = = nil :
return
if self . chainDag . head . slot < self . expectedSlot :
return
self . expectedBlockReceived . complete ( true )
self . expectedBlockReceived = nil # Don't keep completed futures around!
proc expectBlock * ( self : var Eth2Processor , expectedSlot : Slot ) : Future [ bool ] =
## Return a future that will complete when a head is selected whose slot is
## equal or greater than the given slot, or a new expectation is created
if self . expectedBlockReceived ! = nil :
# Reset the old future to not leave it hanging.. an alternative would be to
# cancel it, but it doesn't make any practical difference for now
self . expectedBlockReceived . complete ( false )
let fut = newFuture [ bool ] ( " Eth2Processor.expectBlock " )
self . expectedSlot = expectedSlot
self . expectedBlockReceived = fut
# It might happen that by the time we're expecting a block, it might have
# already been processed!
self . checkExpectedBlock ( )
return fut
2020-10-28 07:55:36 +00:00
proc updateHead * ( self : var Eth2Processor , wallSlot : Slot ) =
2021-03-09 14:36:17 +00:00
## Trigger fork choice and update the DAG with the new head block
## This does not automatically prune the DAG after finalization
## `pruneFinalized` must be called for pruning.
# TODO: DAG & fork choice procs are unrelated to gossip validation
2020-08-20 16:30:47 +00:00
# Grab the new head according to our latest attestation data
let newHead = self . attestationPool [ ] . selectHead ( wallSlot )
2020-08-26 15:23:34 +00:00
if newHead . isNil ( ) :
2020-10-28 07:55:36 +00:00
warn " Head selection failed, using previous head " ,
head = shortLog ( self . chainDag . head ) , wallSlot
return
2020-08-20 16:30:47 +00:00
# Store the new head in the chain DAG - this may cause epochs to be
# justified and finalized
2020-08-31 09:00:38 +00:00
self . chainDag . updateHead ( newHead , self . quarantine )
2020-08-20 16:30:47 +00:00
2021-03-01 16:36:06 +00:00
self . checkExpectedBlock ( )
2021-03-09 14:36:17 +00:00
proc pruneStateCachesAndForkChoice * ( self : var Eth2Processor ) =
## Prune unneeded and invalidated data after finalization
## - the DAG state checkpoints
## - the DAG EpochRef
## - the attestation pool/fork choice
# TODO: DAG & fork choice procs are unrelated to gossip validation
# Cleanup DAG & fork choice if we have a finalized head
if self . chainDag . needStateCachesAndForkChoicePruning ( ) :
self . chainDag . pruneStateCachesDAG ( )
self . attestationPool [ ] . prune ( )
2020-08-20 16:30:47 +00:00
proc dumpBlock [ T ] (
self : Eth2Processor , signedBlock : SignedBeaconBlock ,
2020-09-18 11:53:09 +00:00
res : Result [ T , ( ValidationResult , BlockError ) ] ) =
2020-08-20 16:30:47 +00:00
if self . config . dumpEnabled and res . isErr :
2020-09-18 11:53:09 +00:00
case res . error [ 1 ]
2020-08-20 16:30:47 +00:00
of Invalid :
dump (
self . config . dumpDirInvalid , signedBlock )
of MissingParent :
dump (
self . config . dumpDirIncoming , signedBlock )
else :
discard
proc done * ( blk : SyncBlock ) =
## Send signal to [Sync/Request]Manager that the block ``blk`` has passed
## verification successfully.
if blk . resfut ! = nil :
blk . resfut . complete ( Result [ void , BlockError ] . ok ( ) )
proc fail * ( blk : SyncBlock , error : BlockError ) =
## Send signal to [Sync/Request]Manager that the block ``blk`` has NOT passed
## verification with specific ``error``.
if blk . resfut ! = nil :
blk . resfut . complete ( Result [ void , BlockError ] . err ( error ) )
2020-11-20 10:00:22 +00:00
proc complete * ( blk : SyncBlock , res : Result [ void , BlockError ] ) =
2020-08-20 16:30:47 +00:00
## Send signal to [Sync/Request]Manager about result ``res`` of block ``blk``
## verification.
if blk . resfut ! = nil :
blk . resfut . complete ( res )
proc storeBlock (
self : var Eth2Processor , signedBlock : SignedBeaconBlock ,
wallSlot : Slot ) : Result [ void , BlockError ] =
let
start = Moment . now ( )
attestationPool = self . attestationPool
2020-09-14 14:50:03 +00:00
let blck = self . chainDag . addRawBlock ( self . quarantine , signedBlock ) do (
2021-01-25 18:45:48 +00:00
blckRef : BlockRef , trustedBlock : TrustedSignedBeaconBlock ,
2020-09-14 14:50:03 +00:00
epochRef : EpochRef , state : HashedBeaconState ) :
# Callback add to fork choice if valid
attestationPool [ ] . addForkChoice (
2021-01-25 18:45:48 +00:00
epochRef , blckRef , trustedBlock . message , wallSlot )
2020-08-20 16:30:47 +00:00
self . dumpBlock ( signedBlock , blck )
# There can be a scenario where we receive a block we already received.
# However this block was before the last finalized epoch and so its parent
# was pruned from the ForkChoice.
if blck . isErr :
2020-09-18 11:53:09 +00:00
return err ( blck . error [ 1 ] )
2020-08-20 16:30:47 +00:00
2020-11-11 13:39:36 +00:00
let duration = ( Moment . now ( ) - start ) . toFloatSeconds ( )
beacon_store_block_duration_seconds . observe ( duration )
2020-11-16 09:44:18 +00:00
ok ( )
2020-08-20 16:30:47 +00:00
proc processAttestation (
self : var Eth2Processor , entry : AttestationEntry ) =
logScope :
signature = shortLog ( entry . v . signature )
let
wallTime = self . getWallTime ( )
( afterGenesis , wallSlot ) = wallTime . toSlot ( )
if not afterGenesis :
error " Processing attestation before genesis, clock turned back? "
quit 1
2020-08-27 07:34:12 +00:00
trace " Processing attestation "
self . attestationPool [ ] . addAttestation (
entry . v , entry . attesting_indices , wallSlot )
2020-08-20 16:30:47 +00:00
proc processAggregate (
self : var Eth2Processor , entry : AggregateEntry ) =
logScope :
signature = shortLog ( entry . v . signature )
let
wallTime = self . getWallTime ( )
( afterGenesis , wallSlot ) = wallTime . toSlot ( )
if not afterGenesis :
error " Processing aggregate before genesis, clock turned back? "
quit 1
2020-08-27 07:34:12 +00:00
trace " Processing aggregate "
self . attestationPool [ ] . addAttestation (
entry . v , entry . attesting_indices , wallSlot )
2020-08-20 16:30:47 +00:00
proc processBlock ( self : var Eth2Processor , entry : BlockEntry ) =
logScope :
blockRoot = shortLog ( entry . v . blk . root )
let
wallTime = self . getWallTime ( )
( afterGenesis , wallSlot ) = wallTime . toSlot ( )
if not afterGenesis :
error " Processing block before genesis, clock turned back? "
quit 1
let
start = now ( chronos . Moment )
res = self . storeBlock ( entry . v . blk , wallSlot )
storeDone = now ( chronos . Moment )
if res . isOk ( ) :
# Eagerly update head in case the new block gets selected
2021-03-09 14:36:17 +00:00
self . updateHead ( wallSlot ) # This also eagerly prunes the blocks DAG to prevent processing forks.
# self.pruneStateCachesDAG() # Amortized pruning, we don't prune states & fork choice here but in `onSlotEnd`()
2020-10-28 07:55:36 +00:00
2020-08-20 16:30:47 +00:00
let updateDone = now ( chronos . Moment )
let storeBlockDuration = storeDone - start
let updateHeadDuration = updateDone - storeDone
let overallDuration = updateDone - start
let storeSpeed =
block :
let secs = float ( chronos . seconds ( 1 ) . nanoseconds )
if not ( overallDuration . isZero ( ) ) :
let v = secs / float ( overallDuration . nanoseconds )
round ( v * 10_000 ) / 10_000
else :
0 .0
debug " Block processed " ,
local_head_slot = self . chainDag . head . slot ,
store_speed = storeSpeed ,
block_slot = entry . v . blk . message . slot ,
store_block_duration = $ storeBlockDuration ,
update_head_duration = $ updateHeadDuration ,
overall_duration = $ overallDuration
if entry . v . resFut ! = nil :
entry . v . resFut . complete ( Result [ void , BlockError ] . ok ( ) )
elif res . error ( ) in { BlockError . Duplicate , BlockError . Old } :
# These are harmless / valid outcomes - for the purpose of scoring peers,
# they are ok
if entry . v . resFut ! = nil :
entry . v . resFut . complete ( Result [ void , BlockError ] . ok ( ) )
else :
if entry . v . resFut ! = nil :
entry . v . resFut . complete ( Result [ void , BlockError ] . err ( res . error ( ) ) )
2021-03-09 14:36:17 +00:00
{. pop . } # TODO AsyncQueue.addLast raises Exception in theory but not in practice
2021-01-11 21:27:01 +00:00
2020-08-20 16:30:47 +00:00
proc blockValidator * (
self : var Eth2Processor ,
2020-09-18 11:53:09 +00:00
signedBlock : SignedBeaconBlock ) : ValidationResult =
2020-08-20 16:30:47 +00:00
logScope :
signedBlock = shortLog ( signedBlock . message )
blockRoot = shortLog ( signedBlock . root )
let
wallTime = self . getWallTime ( )
( afterGenesis , wallSlot ) = wallTime . toSlot ( )
if not afterGenesis :
2020-10-20 12:31:20 +00:00
return ValidationResult . Ignore # not an issue with block, so don't penalize
2020-08-20 16:30:47 +00:00
logScope : wallSlot
let delay = wallTime - signedBlock . message . slot . toBeaconTime
if signedBlock . root in self . chainDag . blocks :
# The gossip algorithm itself already does one round of hashing to find
# already-seen data, but it is fairly aggressive about forgetting about
# what it has seen already
debug " Dropping already-seen gossip block " , delay
2020-10-20 12:31:20 +00:00
return ValidationResult . Ignore # "[IGNORE] The block is the first block ..."
2020-08-20 16:30:47 +00:00
# Start of block processing - in reality, we have already gone through SSZ
# decoding at this stage, which may be significant
debug " Block received " , delay
let blck = self . chainDag . isValidBeaconBlock (
2020-10-28 13:04:21 +00:00
self . quarantine , signedBlock , wallTime , { } )
2020-08-20 16:30:47 +00:00
self . dumpBlock ( signedBlock , blck )
if not blck . isOk :
2020-09-18 11:53:09 +00:00
return blck . error [ 0 ]
2020-08-20 16:30:47 +00:00
2020-08-27 07:34:12 +00:00
beacon_blocks_received . inc ( )
2020-11-11 13:39:36 +00:00
beacon_block_delay . observe ( delay . toFloatSeconds ( ) )
2020-08-27 07:34:12 +00:00
2020-08-20 16:30:47 +00:00
# Block passed validation - enqueue it for processing. The block processing
# queue is effectively unbounded as we use a freestanding task to enqueue
# the block - this is done so that when blocks arrive concurrently with
# sync, we don't lose the gossip blocks, but also don't block the gossip
# propagation of seemingly good blocks
2020-08-27 07:34:12 +00:00
trace " Block validated "
2021-01-11 21:27:01 +00:00
asyncSpawn self . blocksQueue . addLast (
2020-08-20 16:30:47 +00:00
BlockEntry ( v : SyncBlock ( blk : signedBlock ) ) )
2020-10-20 12:31:20 +00:00
ValidationResult . Accept
2020-08-20 16:30:47 +00:00
2021-01-11 21:27:01 +00:00
{. push raises : [ Defect ] . }
2021-01-29 12:38:52 +00:00
proc checkForPotentialDoppelganger (
2020-10-27 17:21:35 +00:00
self : var Eth2Processor , attestationData : AttestationData ,
2021-02-08 07:27:30 +00:00
attesterIndices : openArray [ ValidatorIndex ] , wallSlot : Slot ) =
2020-10-27 17:21:35 +00:00
let epoch = wallSlot . epoch
2021-03-01 10:09:05 +00:00
# Only check for current epoch, not potential attestations bouncing around
# from up to several minutes prior.
if attestationData . slot . epoch < epoch :
return
2021-02-01 11:18:16 +00:00
if epoch < self . doppelgangerDetection . broadcastStartEpoch :
2020-10-27 17:21:35 +00:00
let tgtBlck = self . chainDag . getRef ( attestationData . target . root )
doAssert not tgtBlck . isNil # because attestation is valid above
let epochRef = self . chainDag . getEpochRef (
tgtBlck , attestationData . target . epoch )
for validatorIndex in attesterIndices :
let validatorPubkey = epochRef . validator_keys [ validatorIndex ]
if self . validatorPool [ ] . getValidator ( validatorPubkey ) ! =
default ( AttachedValidator ) :
warn " Duplicate validator detected; would be slashed " ,
validatorIndex ,
2021-03-01 10:09:05 +00:00
validatorPubkey ,
attestationSlot = attestationData . slot
2021-02-03 17:11:42 +00:00
doppelganger_detection_activated . inc ( )
if self . config . doppelgangerDetection :
2020-10-27 17:21:35 +00:00
warn " We believe you are currently running another instance of the same validator. We ' ve disconnected you from the network as this presents a significant slashing risk. Possible next steps are (a) making sure you ' ve disconnected your validator from your old machine before restarting the client; and (b) running the client again with the gossip-slashing-protection option disabled, only if you are absolutely sure this is the only instance of your validator running, and reporting the issue at https://github.com/status-im/nimbus-eth2/issues. "
quit QuitFailure
2020-08-20 16:30:47 +00:00
proc attestationValidator * (
self : var Eth2Processor ,
attestation : Attestation ,
2020-12-23 12:59:04 +00:00
committeeIndex : uint64 ,
checksExpensive : bool = true ) : ValidationResult =
2020-08-20 16:30:47 +00:00
logScope :
attestation = shortLog ( attestation )
committeeIndex
let
wallTime = self . getWallTime ( )
( afterGenesis , wallSlot ) = wallTime . toSlot ( )
if not afterGenesis :
notice " Attestation before genesis "
2020-10-20 12:31:20 +00:00
return ValidationResult . Ignore
2020-08-20 16:30:47 +00:00
logScope : wallSlot
2020-09-02 16:16:25 +00:00
# Potential under/overflows are fine; would just create odd metrics and logs
2020-08-20 16:30:47 +00:00
let delay = wallTime - attestation . data . slot . toBeaconTime
2020-10-09 08:58:54 +00:00
debug " Attestation received " , delay
2020-08-27 07:34:12 +00:00
let v = self . attestationPool [ ] . validateAttestation (
2020-12-23 12:59:04 +00:00
attestation , wallTime , committeeIndex , checksExpensive )
2020-08-27 07:34:12 +00:00
if v . isErr ( ) :
debug " Dropping attestation " , err = v . error ( )
2020-09-18 11:53:09 +00:00
return v . error [ 0 ]
2020-08-20 16:30:47 +00:00
beacon_attestations_received . inc ( )
2020-11-11 13:39:36 +00:00
beacon_attestation_delay . observe ( delay . toFloatSeconds ( ) )
2020-08-20 16:30:47 +00:00
2021-01-29 12:38:52 +00:00
self . checkForPotentialDoppelganger ( attestation . data , v . value , wallSlot )
2020-10-27 17:21:35 +00:00
2020-08-20 16:30:47 +00:00
while self . attestationsQueue . full ( ) :
2020-12-08 08:59:40 +00:00
try :
notice " Queue full, dropping attestation " ,
dropped = shortLog ( self . attestationsQueue [ 0 ] . v )
discard self . attestationsQueue . popFirstNoWait ( )
2021-01-11 21:27:01 +00:00
except AsyncQueueEmptyError as exc :
2020-12-08 08:59:40 +00:00
raiseAssert " If queue is full, we have at least one item! " & exc . msg
2020-08-20 16:30:47 +00:00
2020-08-27 07:34:12 +00:00
trace " Attestation validated "
2020-12-08 08:59:40 +00:00
try :
self . attestationsQueue . addLastNoWait (
AttestationEntry ( v : attestation , attesting_indices : v . get ( ) ) )
2021-01-11 21:27:01 +00:00
except AsyncQueueFullError as exc :
2020-12-08 08:59:40 +00:00
raiseAssert " We just checked that queue is not full! " & exc . msg
2020-08-20 16:30:47 +00:00
2020-10-20 12:31:20 +00:00
ValidationResult . Accept
2020-08-20 16:30:47 +00:00
proc aggregateValidator * (
2020-09-18 11:53:09 +00:00
self : var Eth2Processor ,
signedAggregateAndProof : SignedAggregateAndProof ) : ValidationResult =
2020-08-20 16:30:47 +00:00
logScope :
aggregate = shortLog ( signedAggregateAndProof . message . aggregate )
signature = shortLog ( signedAggregateAndProof . signature )
let
wallTime = self . getWallTime ( )
( afterGenesis , wallSlot ) = wallTime . toSlot ( )
if not afterGenesis :
notice " Aggregate before genesis "
2020-10-20 12:31:20 +00:00
return ValidationResult . Ignore
2020-08-20 16:30:47 +00:00
logScope : wallSlot
2020-09-02 16:16:25 +00:00
# Potential under/overflows are fine; would just create odd logs
2020-08-20 16:30:47 +00:00
let delay =
wallTime - signedAggregateAndProof . message . aggregate . data . slot . toBeaconTime
debug " Aggregate received " , delay
2020-08-27 07:34:12 +00:00
let v = self . attestationPool [ ] . validateAggregate (
signedAggregateAndProof , wallTime )
if v . isErr :
2020-12-14 20:58:32 +00:00
debug " Dropping aggregate " ,
err = v . error ,
aggregator_index = signedAggregateAndProof . message . aggregator_index ,
selection_proof = signedAggregateAndProof . message . selection_proof ,
wallSlot
2020-09-18 11:53:09 +00:00
return v . error [ 0 ]
2020-08-20 16:30:47 +00:00
beacon_aggregates_received . inc ( )
2020-11-11 13:39:36 +00:00
beacon_aggregate_delay . observe ( delay . toFloatSeconds ( ) )
2020-08-20 16:30:47 +00:00
2021-01-29 12:38:52 +00:00
self . checkForPotentialDoppelganger (
2020-10-27 17:21:35 +00:00
signedAggregateAndProof . message . aggregate . data , v . value , wallSlot )
2020-08-20 16:30:47 +00:00
while self . aggregatesQueue . full ( ) :
2020-12-08 08:59:40 +00:00
try :
notice " Queue full, dropping aggregate " ,
dropped = shortLog ( self . aggregatesQueue [ 0 ] . v )
discard self . aggregatesQueue . popFirstNoWait ( )
2021-01-11 21:27:01 +00:00
except AsyncQueueEmptyError as exc :
2020-12-08 08:59:40 +00:00
raiseAssert " We just checked that queue is not full! " & exc . msg
2020-08-20 16:30:47 +00:00
2020-12-14 20:58:32 +00:00
trace " Aggregate validated " ,
aggregator_index = signedAggregateAndProof . message . aggregator_index ,
selection_proof = signedAggregateAndProof . message . selection_proof ,
wallSlot
2020-12-08 08:59:40 +00:00
try :
self . aggregatesQueue . addLastNoWait ( AggregateEntry (
v : signedAggregateAndProof . message . aggregate ,
attesting_indices : v . get ( ) ) )
2021-01-11 21:27:01 +00:00
except AsyncQueueFullError as exc :
2020-12-08 08:59:40 +00:00
raiseAssert " We just checked that queue is not full! " & exc . msg
2020-08-20 16:30:47 +00:00
2020-10-20 12:31:20 +00:00
ValidationResult . Accept
2020-08-20 16:30:47 +00:00
2020-09-14 14:26:31 +00:00
proc attesterSlashingValidator * (
2020-09-18 11:53:09 +00:00
self : var Eth2Processor , attesterSlashing : AttesterSlashing ) :
ValidationResult =
2020-09-14 14:26:31 +00:00
logScope :
attesterSlashing = shortLog ( attesterSlashing )
let v = self . exitPool [ ] . validateAttesterSlashing ( attesterSlashing )
if v . isErr :
debug " Dropping attester slashing " , err = v . error
2020-09-18 11:53:09 +00:00
return v . error [ 0 ]
2020-09-14 14:26:31 +00:00
beacon_attester_slashings_received . inc ( )
2020-10-20 12:31:20 +00:00
ValidationResult . Accept
2020-09-18 11:53:09 +00:00
2020-09-14 14:26:31 +00:00
proc proposerSlashingValidator * (
2020-09-18 11:53:09 +00:00
self : var Eth2Processor , proposerSlashing : ProposerSlashing ) :
ValidationResult =
2020-09-14 14:26:31 +00:00
logScope :
proposerSlashing = shortLog ( proposerSlashing )
let v = self . exitPool [ ] . validateProposerSlashing ( proposerSlashing )
if v . isErr :
debug " Dropping proposer slashing " , err = v . error
2020-09-18 11:53:09 +00:00
return v . error [ 0 ]
2020-09-14 14:26:31 +00:00
beacon_proposer_slashings_received . inc ( )
2020-10-20 12:31:20 +00:00
ValidationResult . Accept
2020-09-18 11:53:09 +00:00
2020-09-14 14:26:31 +00:00
proc voluntaryExitValidator * (
2020-09-24 17:05:49 +00:00
self : var Eth2Processor , signedVoluntaryExit : SignedVoluntaryExit ) :
ValidationResult =
2020-09-14 14:26:31 +00:00
logScope :
2020-09-24 17:05:49 +00:00
signedVoluntaryExit = shortLog ( signedVoluntaryExit )
2020-09-14 14:26:31 +00:00
2020-09-24 17:05:49 +00:00
let v = self . exitPool [ ] . validateVoluntaryExit ( signedVoluntaryExit )
2020-09-14 14:26:31 +00:00
if v . isErr :
debug " Dropping voluntary exit " , err = v . error
2020-09-18 11:53:09 +00:00
return v . error [ 0 ]
2020-09-14 14:26:31 +00:00
beacon_voluntary_exits_received . inc ( )
2020-10-20 12:31:20 +00:00
ValidationResult . Accept
2020-09-18 11:53:09 +00:00
2021-01-11 21:27:01 +00:00
{. pop . } # TODO raises in chronos
2020-08-20 16:30:47 +00:00
proc runQueueProcessingLoop * ( self : ref Eth2Processor ) {. async . } =
# Blocks in eth2 arrive on a schedule for every slot:
#
# * Block arrives at time 0
# * Attestations arrives at time 4
# * Aggregate arrives at time 8
var
blockFut = self [ ] . blocksQueue . popFirst ( )
aggregateFut = self [ ] . aggregatesQueue . popFirst ( )
attestationFut = self [ ] . attestationsQueue . popFirst ( )
while true :
2021-01-11 21:27:01 +00:00
# Cooperative concurrency: one idle calculation step per loop - because
# we run both networking and CPU-heavy things like block processing
# on the same thread, we need to make sure that there is steady progress
# on the networking side or we get long lockups that lead to timeouts.
2021-02-08 15:13:02 +00:00
const
# We cap waiting for an idle slot in case there's a lot of network traffic
# taking up all CPU - we don't want to _completely_ stop processing blocks
# in this case (attestations will get dropped) - doing so also allows us
# to benefit from more batching / larger network reads when under load.
idleTimeout = 10 . milliseconds
# Attestation processing is fairly quick and therefore done in batches to
# avoid some of the `Future` overhead
attestationBatch = 16
discard await idleAsync ( ) . withTimeout ( idleTimeout )
2021-01-11 21:27:01 +00:00
# Avoid one more `await` when there's work to do
if not ( blockFut . finished or aggregateFut . finished or attestationFut . finished ) :
trace " Waiting for processing work "
await blockFut or aggregateFut or attestationFut
# Only run one task per idle iteration, in priority order: blocks are needed
# for all other processing - then come aggregates which are cheap to
# process but might have a big impact on fork choice - last come
# attestations which individually have the smallest effect on chain progress
if blockFut . finished :
self [ ] . processBlock ( blockFut . read ( ) )
2020-08-20 16:30:47 +00:00
blockFut = self [ ] . blocksQueue . popFirst ( )
2021-01-11 21:27:01 +00:00
elif aggregateFut . finished :
# aggregates will be dropped under heavy load on producer side
self [ ] . processAggregate ( aggregateFut . read ( ) )
2021-02-08 15:13:02 +00:00
for i in 0 .. < attestationBatch : # process a few at a time - this is fairly fast
2021-01-26 09:10:57 +00:00
if self [ ] . aggregatesQueue . empty ( ) :
break
self [ ] . processAggregate ( self [ ] . aggregatesQueue . popFirstNoWait ( ) )
2020-08-20 16:30:47 +00:00
aggregateFut = self [ ] . aggregatesQueue . popFirst ( )
2021-01-11 21:27:01 +00:00
elif attestationFut . finished :
# attestations will be dropped under heavy load on producer side
self [ ] . processAttestation ( attestationFut . read ( ) )
2021-01-26 09:10:57 +00:00
2021-02-08 15:13:02 +00:00
for i in 0 .. < attestationBatch : # process a few at a time - this is fairly fast
2021-01-26 09:10:57 +00:00
if self [ ] . attestationsQueue . empty ( ) :
break
self [ ] . processAttestation ( self [ ] . attestationsQueue . popFirstNoWait ( ) )
2020-08-20 16:30:47 +00:00
attestationFut = self [ ] . attestationsQueue . popFirst ( )
proc new * ( T : type Eth2Processor ,
config : BeaconNodeConf ,
chainDag : ChainDAGRef ,
attestationPool : ref AttestationPool ,
2020-09-14 14:26:31 +00:00
exitPool : ref ExitPool ,
2020-10-27 17:21:35 +00:00
validatorPool : ref ValidatorPool ,
2020-08-20 16:30:47 +00:00
quarantine : QuarantineRef ,
getWallTime : GetWallTimeFn ) : ref Eth2Processor =
( ref Eth2Processor ) (
config : config ,
getWallTime : getWallTime ,
chainDag : chainDag ,
attestationPool : attestationPool ,
2020-09-14 14:26:31 +00:00
exitPool : exitPool ,
2020-10-27 17:21:35 +00:00
validatorPool : validatorPool ,
2020-08-20 16:30:47 +00:00
quarantine : quarantine ,
blocksQueue : newAsyncQueue [ BlockEntry ] ( 1 ) ,
2021-01-26 09:10:57 +00:00
# limit to the max number of aggregates we expect to see in one slot
aggregatesQueue : newAsyncQueue [ AggregateEntry ] (
( TARGET_AGGREGATORS_PER_COMMITTEE * MAX_COMMITTEES_PER_SLOT ) . int ) ,
# This queue is a bit harder to bound reasonably - we want to get a good
# spread of votes across committees - ideally at least TARGET_COMMITTEE_SIZE
# per committee - assuming randomness in vote arrival, this limit should
# cover that but of course, when votes arrive depends on a number of
# factors that are not entire random
attestationsQueue : newAsyncQueue [ AttestationEntry ] (
( TARGET_COMMITTEE_SIZE * MAX_COMMITTEES_PER_SLOT ) . int ) ,
2020-08-20 16:30:47 +00:00
)