2021-01-11 22:27:01 +01:00
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{. push raises : [ Defect ] . }
2020-08-20 18:30:47 +02:00
import
std / [ math , tables ] ,
stew / results ,
2021-01-11 22:27:01 +01:00
chronicles , chronos , metrics ,
2020-08-20 18:30:47 +02:00
. / spec / [ crypto , datatypes , digest ] ,
. / block_pools / [ clearance , chain_dag ] ,
2020-10-27 18:21:35 +01:00
. / attestation_aggregation , . / exit_pool , . / validator_pool ,
2020-08-20 18:30:47 +02:00
. / beacon_node_types , . / attestation_pool ,
. / time , . / conf , . / sszdump
# Metrics for tracking attestation and beacon block loss
# Gossip reception counters: one per message kind handled by this module.
declareCounter(beacon_attestations_received,
  " Number of beacon chain attestations received by this peer ")
declareCounter(beacon_aggregates_received,
  " Number of beacon chain aggregate attestations received by this peer ")
declareCounter(beacon_blocks_received,
  " Number of beacon chain blocks received by this peer ")
declareCounter(beacon_attester_slashings_received,
  " Number of beacon chain attester slashings received by this peer ")
declareCounter(beacon_proposer_slashings_received,
  " Number of beacon chain proposer slashings received by this peer ")
declareCounter(beacon_voluntary_exits_received,
  " Number of beacon chain voluntary exits received by this peer ")

# Incremented each time a gossiped attestation turns out to be signed by a
# validator that is also attached to this node.
declareCounter(beacon_duplicate_validator_protection_activated,
  " Number of times duplicate validator protection was activated ")
2020-08-20 18:30:47 +02:00
# Bucket boundaries shared by the three "delay since slot start" histograms.
const delayBuckets = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, Inf]

declareHistogram(beacon_attestation_delay,
  " Time(s) between slot start and attestation reception ",
  buckets = delayBuckets)

declareHistogram(beacon_aggregate_delay,
  " Time(s) between slot start and aggregate reception ",
  buckets = delayBuckets)

declareHistogram(beacon_block_delay,
  " Time(s) between slot start and beacon block reception ",
  buckets = delayBuckets)

# Wall time spent inside storeBlock(), observed for successful stores only.
declareHistogram(beacon_store_block_duration_seconds,
  " storeBlock() duration ",
  buckets = [0.25, 0.5, 1, 2, 4, 8, Inf])
type
  # Callback used by the processor to obtain the current wall-clock time.
  GetWallTimeFn* = proc(): BeaconTime {.gcsafe, raises: [Defect].}

  SyncBlock* = object
    ## A signed block together with an optional future through which the
    ## submitter is told the verification result (see `done`, `fail` and
    ## `complete`). `resfut` may be nil when nobody waits for the result.
    blk*: SignedBeaconBlock
    resfut*: Future[Result[void, BlockError]]

  BlockEntry* = object
    ## Item type of the block processing queue.
    v*: SyncBlock

  AttestationEntry* = object
    ## Item type of the attestation queue: the attestation plus the set of
    ## validator indices produced by its validation.
    v*: Attestation
    attesting_indices*: HashSet[ValidatorIndex]

  # Aggregates travel through their own queue but share the entry layout.
  AggregateEntry* = AttestationEntry

  Eth2Processor* = object
    ## State shared by the gossip validators and the queue processing loop.
    config*: BeaconNodeConf
    getWallTime*: GetWallTimeFn
    chainDag*: ChainDAGRef
    attestationPool*: ref AttestationPool
    exitPool: ref ExitPool
    validatorPool: ref ValidatorPool
    quarantine*: QuarantineRef

    # Completed by `storeBlock` the first time a block is stored successfully
    # while this future is still pending.
    blockReceivedDuringSlot*: Future[void]

    # Validated items waiting to be handled by `runQueueProcessingLoop`.
    blocksQueue*: AsyncQueue[BlockEntry]
    attestationsQueue*: AsyncQueue[AttestationEntry]
    aggregatesQueue*: AsyncQueue[AggregateEntry]

    # Epoch window consulted by `checkForPotentialSelfSlashing`.
    gossipSlashingProtection*: DupProtection
2020-10-28 08:55:36 +01:00
proc updateHead*(self: var Eth2Processor, wallSlot: Slot) =
  ## Runs head selection on the attestation pool for `wallSlot` and, when a
  ## head is found, installs it in the chain DAG. Nothing is returned: if
  ## selection yields `nil` a warning is logged and the previous head is kept.
  let selected = self.attestationPool[].selectHead(wallSlot)

  if not selected.isNil():
    # Remember the finalized block before the update so that a change can be
    # detected afterwards - updating the head may justify / finalize epochs.
    let finalizedBefore = self.chainDag.finalizedHead.blck

    self.chainDag.updateHead(selected, self.quarantine)

    # Finalization moved: let the attestation pool prune its state.
    if finalizedBefore != self.chainDag.finalizedHead.blck:
      self.attestationPool[].prune()
  else:
    warn " Head selection failed, using previous head ",
      head = shortLog(self.chainDag.head), wallSlot
proc dumpBlock[T](
    self: Eth2Processor, signedBlock: SignedBeaconBlock,
    res: Result[T, (ValidationResult, BlockError)]) =
  ## Writes `signedBlock` to one of the configured dump directories when
  ## dumping is enabled and `res` is an error: `Invalid` blocks go to
  ## `dumpDirInvalid`, `MissingParent` blocks to `dumpDirIncoming`. Any other
  ## error kind is not dumped.
  if not (self.config.dumpEnabled and res.isErr):
    return

  let reason = res.error[1]
  case reason
  of Invalid:
    dump(self.config.dumpDirInvalid, signedBlock)
  of MissingParent:
    dump(self.config.dumpDirIncoming, signedBlock)
  else:
    discard
proc done*(blk: SyncBlock) =
  ## Tell the [Sync/Request]Manager that block ``blk`` passed verification.
  ## Does nothing when the block carries no result future.
  if blk.resfut.isNil:
    return
  blk.resfut.complete(Result[void, BlockError].ok())
proc fail*(blk: SyncBlock, error: BlockError) =
  ## Tell the [Sync/Request]Manager that block ``blk`` did NOT pass
  ## verification, reporting ``error`` as the reason.
  ## Does nothing when the block carries no result future.
  if blk.resfut.isNil:
    return
  blk.resfut.complete(Result[void, BlockError].err(error))
2020-11-20 11:00:22 +01:00
proc complete*(blk: SyncBlock, res: Result[void, BlockError]) =
  ## Forward the verification result ``res`` of block ``blk`` to the
  ## [Sync/Request]Manager.
  ## Does nothing when the block carries no result future.
  if blk.resfut.isNil:
    return
  blk.resfut.complete(res)
proc storeBlock(
    self: var Eth2Processor, signedBlock: SignedBeaconBlock,
    wallSlot: Slot): Result[void, BlockError] =
  ## Adds `signedBlock` to the chain DAG; on success the block is also
  ## registered with fork choice through the callback given to `addRawBlock`.
  ## Returns the `BlockError` part of the failure when the block is rejected.
  ## The store duration metric is only observed for successful stores.
  let
    startedAt = Moment.now()
    # `self` is a `var` parameter, so the pool ref is copied into a local
    # that the callback below can use.
    pool = self.attestationPool

  let added = self.chainDag.addRawBlock(self.quarantine, signedBlock) do (
      blckRef: BlockRef, signedBlock: SignedBeaconBlock,
      epochRef: EpochRef, state: HashedBeaconState):
    # Invoked for blocks that were accepted: feed them to fork choice
    pool[].addForkChoice(
      epochRef, blckRef, signedBlock.message, wallSlot)

  # Signal whoever is waiting for a block in this slot (attestation sending)
  if added.isOk and not self.blockReceivedDuringSlot.finished:
    self.blockReceivedDuringSlot.complete()

  self.dumpBlock(signedBlock, added)

  # A block we have already seen can show up again; if it predates the last
  # finalized epoch its parent has been pruned from fork choice, so an error
  # here is an expected outcome and simply reported to the caller.
  if added.isErr:
    return err(added.error[1])

  beacon_store_block_duration_seconds.observe(
    (Moment.now() - startedAt).toFloatSeconds())

  ok()
2020-08-20 18:30:47 +02:00
proc processAttestation(
    self: var Eth2Processor, entry: AttestationEntry) =
  ## Hands a previously validated attestation to the attestation pool, using
  ## the slot derived from the current wall time. Terminates the process if
  ## the wall clock reports a time before genesis.
  logScope:
    signature = shortLog(entry.v.signature)

  let (pastGenesis, currentSlot) = self.getWallTime().toSlot()

  if not pastGenesis:
    error " Processing attestation before genesis, clock turned back? "
    quit 1

  trace " Processing attestation "

  self.attestationPool[].addAttestation(
    entry.v, entry.attesting_indices, currentSlot)
2020-08-20 18:30:47 +02:00
proc processAggregate(
    self: var Eth2Processor, entry: AggregateEntry) =
  ## Hands a previously validated aggregate to the attestation pool, using
  ## the slot derived from the current wall time. Terminates the process if
  ## the wall clock reports a time before genesis.
  logScope:
    signature = shortLog(entry.v.signature)

  let (pastGenesis, currentSlot) = self.getWallTime().toSlot()

  if not pastGenesis:
    error " Processing aggregate before genesis, clock turned back? "
    quit 1

  trace " Processing aggregate "

  self.attestationPool[].addAttestation(
    entry.v, entry.attesting_indices, currentSlot)
2020-08-20 18:30:47 +02:00
proc processBlock(self: var Eth2Processor, entry: BlockEntry) =
  ## Stores a queued block, eagerly re-runs head selection when the store
  ## succeeded, and reports the outcome through the entry's result future.
  ##
  ## `Duplicate` and `Old` store errors are reported to the submitter as
  ## success; every other error is forwarded unchanged. Terminates the
  ## process if the wall clock reports a time before genesis.
  logScope:
    blockRoot = shortLog(entry.v.blk.root)

  let
    wallTime = self.getWallTime()
    (afterGenesis, wallSlot) = wallTime.toSlot()

  if not afterGenesis:
    error " Processing block before genesis, clock turned back? "
    quit 1

  let
    start = now(chronos.Moment)
    res = self.storeBlock(entry.v.blk, wallSlot)
    storeDone = now(chronos.Moment)

  if res.isOk():
    # Eagerly update head in case the new block gets selected
    self.updateHead(wallSlot)

    let
      updateDone = now(chronos.Moment)
      storeBlockDuration = storeDone - start
      updateHeadDuration = updateDone - storeDone
      overallDuration = updateDone - start
      # Blocks per second implied by this single block, rounded to four
      # decimals; 0.0 when no measurable time elapsed.
      storeSpeed =
        block:
          let secs = float(chronos.seconds(1).nanoseconds)
          if not(overallDuration.isZero()):
            let v = secs / float(overallDuration.nanoseconds)
            round(v * 10_000) / 10_000
          else:
            0.0

    debug " Block processed ",
      local_head_slot = self.chainDag.head.slot,
      store_speed = storeSpeed,
      block_slot = entry.v.blk.message.slot,
      store_block_duration = $storeBlockDuration,
      update_head_duration = $updateHeadDuration,
      overall_duration = $overallDuration

    # `done` / `fail` perform the nil check on the result future themselves
    entry.v.done()
  elif res.error() in {BlockError.Duplicate, BlockError.Old}:
    # These are harmless / valid outcomes - for the purpose of scoring peers,
    # they are ok
    entry.v.done()
  else:
    entry.v.fail(res.error())
2021-01-11 22:27:01 +01:00
{. pop . } # TODO AsyncQueue.addLast raises Exception in theory but not in practise
2020-08-20 18:30:47 +02:00
proc blockValidator*(
    self: var Eth2Processor,
    signedBlock: SignedBeaconBlock): ValidationResult =
  ## Gossip validation for beacon blocks. Blocks that pass are counted in the
  ## metrics and scheduled for processing on the block queue, and `Accept` is
  ## returned. Blocks seen before genesis or already present in the chain DAG
  ## yield `Ignore`; otherwise the verdict of `isValidBeaconBlock` is used.
  logScope:
    signedBlock = shortLog(signedBlock.message)
    blockRoot = shortLog(signedBlock.root)

  let
    wallTime = self.getWallTime()
    (pastGenesis, wallSlot) = wallTime.toSlot()

  if not pastGenesis:
    # Nothing is wrong with the block itself, so the sender is not penalized
    return ValidationResult.Ignore

  logScope: wallSlot

  let delay = wallTime - signedBlock.message.slot.toBeaconTime

  if signedBlock.root in self.chainDag.blocks:
    # Gossip already hashes messages to detect repeats, but it forgets what
    # it has seen fairly aggressively, so known blocks still arrive here
    debug " Dropping already-seen gossip block ", delay
    # "[IGNORE] The block is the first block ..."
    return ValidationResult.Ignore

  # Block processing starts here - SSZ decoding has already happened by now,
  # which may have taken a significant amount of time
  debug " Block received ", delay

  let verdict = self.chainDag.isValidBeaconBlock(
    self.quarantine, signedBlock, wallTime, {})

  self.dumpBlock(signedBlock, verdict)

  if verdict.isErr:
    return verdict.error[0]

  beacon_blocks_received.inc()
  beacon_block_delay.observe(delay.toFloatSeconds())

  # The block is enqueued from a freestanding task, which makes the block
  # queue effectively unbounded: gossip blocks arriving while sync is busy
  # are not lost, and gossip propagation of seemingly good blocks is not
  # held up waiting for queue space
  trace " Block validated "
  asyncSpawn self.blocksQueue.addLast(
    BlockEntry(v: SyncBlock(blk: signedBlock)))

  ValidationResult.Accept
2020-08-20 18:30:47 +02:00
2021-01-11 22:27:01 +01:00
{. push raises : [ Defect ] . }
2020-10-27 18:21:35 +01:00
proc checkForPotentialSelfSlashing(
    self: var Eth2Processor, attestationData: AttestationData,
    attesterIndices: HashSet[ValidatorIndex], wallSlot: Slot) =
  ## Looks for validators in `attesterIndices` that are also attached to this
  ## node while the wall epoch lies inside the probing window. Each hit is
  ## logged and counted; when the configured mode is `stop` the process
  ## exits with `QuitFailure`.
  const
    # Attestations stay valid for 32 slots, so for
    # ATTESTATION_PROPAGATION_SLOT_RANGE div SLOTS_PER_EPOCH epochs after the
    # attestation slot we could be looking at our own reflections. That is
    # one extra epoch on mainnet and four on minimal. In contrast to the
    # attestation validation checks, the spec value of the constant is used.
    # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#configuration
    ATTESTATION_PROPAGATION_SLOT_RANGE = 32
    GUARD_EPOCHS = ATTESTATION_PROPAGATION_SLOT_RANGE div SLOTS_PER_EPOCH

  # Unless gossipSlashingProtection is dontcheck or stop, the default "warn"
  # applies.
  let
    epoch = wallSlot.epoch
    inProbeWindow =
      epoch < self.gossipSlashingProtection.broadcastStartEpoch and
      epoch >= self.gossipSlashingProtection.probeEpoch and
      epoch <= self.gossipSlashingProtection.probeEpoch + GUARD_EPOCHS

  if not inProbeWindow:
    return

  let targetBlock = self.chainDag.getRef(attestationData.target.root)
  doAssert not targetBlock.isNil # the attestation was validated by the caller

  let epochRef = self.chainDag.getEpochRef(
    targetBlock, attestationData.target.epoch)

  for validatorIndex in attesterIndices:
    let validatorPubkey = epochRef.validator_keys[validatorIndex]

    # A default-valued result means the key is not attached to this node
    if self.validatorPool[].getValidator(validatorPubkey) ==
        default(AttachedValidator):
      continue

    warn " Duplicate validator detected; would be slashed ",
      validatorIndex,
      validatorPubkey
    beacon_duplicate_validator_protection_activated.inc()

    if self.config.gossipSlashingProtection == GossipSlashingProtectionMode.stop:
      warn " We believe you are currently running another instance of the same validator. We ' ve disconnected you from the network as this presents a significant slashing risk. Possible next steps are (a) making sure you ' ve disconnected your validator from your old machine before restarting the client; and (b) running the client again with the gossip-slashing-protection option disabled, only if you are absolutely sure this is the only instance of your validator running, and reporting the issue at https://github.com/status-im/nimbus-eth2/issues. "
      quit QuitFailure
2020-08-20 18:30:47 +02:00
proc attestationValidator*(
    self: var Eth2Processor,
    attestation: Attestation,
    committeeIndex: uint64,
    checksExpensive: bool = true): ValidationResult =
  ## Gossip validation for single attestations. A valid attestation is
  ## counted in the metrics, checked against the locally attached validators
  ## and appended to the attestation queue; when that queue is full the
  ## oldest entries are dropped to make room. Returns `Accept` on success,
  ## `Ignore` before genesis, otherwise the verdict of `validateAttestation`.
  logScope:
    attestation = shortLog(attestation)
    committeeIndex

  let
    wallTime = self.getWallTime()
    (pastGenesis, wallSlot) = wallTime.toSlot()

  if not pastGenesis:
    notice " Attestation before genesis "
    return ValidationResult.Ignore

  logScope: wallSlot

  # Under/overflow here is harmless - it only leads to odd metrics and logs
  let delay = wallTime - attestation.data.slot.toBeaconTime
  debug " Attestation received ", delay

  let checked = self.attestationPool[].validateAttestation(
    attestation, wallTime, committeeIndex, checksExpensive)
  if checked.isErr():
    debug " Dropping attestation ", err = checked.error()
    return checked.error[0]

  beacon_attestations_received.inc()
  beacon_attestation_delay.observe(delay.toFloatSeconds())

  self.checkForPotentialSelfSlashing(attestation.data, checked.value, wallSlot)

  # Make room by discarding from the front of the queue
  try:
    while self.attestationsQueue.full():
      notice " Queue full, dropping attestation ",
        dropped = shortLog(self.attestationsQueue[0].v)
      discard self.attestationsQueue.popFirstNoWait()
  except AsyncQueueEmptyError as exc:
    raiseAssert " If queue is full, we have at least one item! " & exc.msg

  trace " Attestation validated "

  try:
    self.attestationsQueue.addLastNoWait(
      AttestationEntry(v: attestation, attesting_indices: checked.get()))
  except AsyncQueueFullError as exc:
    raiseAssert " We just checked that queue is not full! " & exc.msg

  ValidationResult.Accept
2020-08-20 18:30:47 +02:00
proc aggregateValidator*(
    self: var Eth2Processor,
    signedAggregateAndProof: SignedAggregateAndProof): ValidationResult =
  ## Gossip validation for aggregate attestations. A valid aggregate is
  ## counted in the metrics, checked against the locally attached validators
  ## and appended to the aggregate queue; when that queue is full the oldest
  ## entries are dropped to make room. Returns `Accept` on success, `Ignore`
  ## before genesis, otherwise the verdict of `validateAggregate`.
  logScope:
    aggregate = shortLog(signedAggregateAndProof.message.aggregate)
    signature = shortLog(signedAggregateAndProof.signature)

  let
    wallTime = self.getWallTime()
    (afterGenesis, wallSlot) = wallTime.toSlot()

  if not afterGenesis:
    notice " Aggregate before genesis "
    return ValidationResult.Ignore

  logScope: wallSlot

  # Potential under/overflows are fine; would just create odd logs
  let delay =
    wallTime - signedAggregateAndProof.message.aggregate.data.slot.toBeaconTime
  debug " Aggregate received ", delay

  let v = self.attestationPool[].validateAggregate(
    signedAggregateAndProof, wallTime)
  if v.isErr:
    debug " Dropping aggregate ",
      err = v.error,
      aggregator_index = signedAggregateAndProof.message.aggregator_index,
      selection_proof = signedAggregateAndProof.message.selection_proof,
      wallSlot
    return v.error[0]

  beacon_aggregates_received.inc()
  beacon_aggregate_delay.observe(delay.toFloatSeconds())

  self.checkForPotentialSelfSlashing(
    signedAggregateAndProof.message.aggregate.data, v.value, wallSlot)

  # Make room by discarding from the front of the queue
  while self.aggregatesQueue.full():
    try:
      notice " Queue full, dropping aggregate ",
        dropped = shortLog(self.aggregatesQueue[0].v)
      discard self.aggregatesQueue.popFirstNoWait()
    except AsyncQueueEmptyError as exc:
      # A full queue cannot be empty; the message states that invariant
      # (the same wording is used for the attestation queue).
      raiseAssert " If queue is full, we have at least one item! " & exc.msg

  trace " Aggregate validated ",
    aggregator_index = signedAggregateAndProof.message.aggregator_index,
    selection_proof = signedAggregateAndProof.message.selection_proof,
    wallSlot

  try:
    self.aggregatesQueue.addLastNoWait(AggregateEntry(
      v: signedAggregateAndProof.message.aggregate,
      attesting_indices: v.get()))
  except AsyncQueueFullError as exc:
    raiseAssert " We just checked that queue is not full! " & exc.msg

  ValidationResult.Accept
2020-08-20 18:30:47 +02:00
2020-09-14 14:26:31 +00:00
proc attesterSlashingValidator*(
    self: var Eth2Processor, attesterSlashing: AttesterSlashing):
    ValidationResult =
  ## Gossip validation for attester slashings: delegates to the exit pool,
  ## counts accepted messages and returns the pool's verdict on rejection.
  logScope:
    attesterSlashing = shortLog(attesterSlashing)

  let outcome = self.exitPool[].validateAttesterSlashing(attesterSlashing)

  if outcome.isOk:
    beacon_attester_slashings_received.inc()
    ValidationResult.Accept
  else:
    debug " Dropping attester slashing ", err = outcome.error
    outcome.error[0]
2020-09-18 11:53:09 +00:00
2020-09-14 14:26:31 +00:00
proc proposerSlashingValidator*(
    self: var Eth2Processor, proposerSlashing: ProposerSlashing):
    ValidationResult =
  ## Gossip validation for proposer slashings: delegates to the exit pool,
  ## counts accepted messages and returns the pool's verdict on rejection.
  logScope:
    proposerSlashing = shortLog(proposerSlashing)

  let outcome = self.exitPool[].validateProposerSlashing(proposerSlashing)

  if outcome.isOk:
    beacon_proposer_slashings_received.inc()
    ValidationResult.Accept
  else:
    debug " Dropping proposer slashing ", err = outcome.error
    outcome.error[0]
2020-09-18 11:53:09 +00:00
2020-09-14 14:26:31 +00:00
proc voluntaryExitValidator*(
    self: var Eth2Processor, signedVoluntaryExit: SignedVoluntaryExit):
    ValidationResult =
  ## Gossip validation for voluntary exits: delegates to the exit pool,
  ## counts accepted messages and returns the pool's verdict on rejection.
  logScope:
    signedVoluntaryExit = shortLog(signedVoluntaryExit)

  let outcome = self.exitPool[].validateVoluntaryExit(signedVoluntaryExit)

  if outcome.isOk:
    beacon_voluntary_exits_received.inc()
    ValidationResult.Accept
  else:
    debug " Dropping voluntary exit ", err = outcome.error
    outcome.error[0]
2020-09-18 11:53:09 +00:00
2021-01-11 22:27:01 +01:00
{. pop . } # TODO raises in chronos
2020-08-20 18:30:47 +02:00
proc runQueueProcessingLoop*(self: ref Eth2Processor) {.async.} =
  ## Endless loop that drains the three processing queues, handling at most
  ## one item per iteration with blocks taking priority over aggregates and
  ## aggregates over attestations.
  #
  # Within every slot, eth2 traffic follows a schedule:
  #
  # * the block arrives at time 0
  # * attestations arrive at time 4
  # * aggregates arrive at time 8
  var
    pendingBlock = self[].blocksQueue.popFirst()
    pendingAggregate = self[].aggregatesQueue.popFirst()
    pendingAttestation = self[].attestationsQueue.popFirst()

  while true:
    # Cooperative concurrency: networking and CPU-heavy work such as block
    # processing share one thread, so every iteration first yields until the
    # loop is idle - otherwise networking stalls and peers time out.
    #
    # The wait is capped: under heavy network traffic an idle moment may
    # never come, and block processing must not stop _completely_ in that
    # case (attestations will get dropped). The cap also gives more batching
    # / larger network reads when under load.
    discard await idleAsync().withTimeout(100.milliseconds)

    # Skip the extra `await` when some work is already available
    if not pendingBlock.finished and not pendingAggregate.finished and
        not pendingAttestation.finished:
      trace " Waiting for processing work "
      await pendingBlock or pendingAggregate or pendingAttestation

    # One task per idle iteration, in priority order: everything else depends
    # on blocks; aggregates are cheap to process yet may weigh heavily on
    # fork choice; single attestations affect chain progress the least
    if pendingBlock.finished:
      self[].processBlock(pendingBlock.read())
      pendingBlock = self[].blocksQueue.popFirst()
    elif pendingAggregate.finished:
      # under heavy load the producer side drops aggregates
      self[].processAggregate(pendingAggregate.read())
      pendingAggregate = self[].aggregatesQueue.popFirst()
    elif pendingAttestation.finished:
      # under heavy load the producer side drops attestations
      self[].processAttestation(pendingAttestation.read())
      pendingAttestation = self[].attestationsQueue.popFirst()
proc new*(T: type Eth2Processor,
          config: BeaconNodeConf,
          chainDag: ChainDAGRef,
          attestationPool: ref AttestationPool,
          exitPool: ref ExitPool,
          validatorPool: ref ValidatorPool,
          quarantine: QuarantineRef,
          getWallTime: GetWallTimeFn): ref Eth2Processor =
  ## Creates a processor wired to the given collaborators, with a pending
  ## `blockReceivedDuringSlot` future and empty, bounded processing queues.
  let
    # Queue capacities: a single slot for blocks, MAX_ATTESTATIONS for
    # aggregates and four times TARGET_COMMITTEE_SIZE for attestations
    blocksCapacity = 1
    aggregatesCapacity = MAX_ATTESTATIONS.int
    attestationsCapacity = TARGET_COMMITTEE_SIZE.int * 4

  (ref Eth2Processor)(
    config: config,
    getWallTime: getWallTime,
    chainDag: chainDag,
    attestationPool: attestationPool,
    exitPool: exitPool,
    validatorPool: validatorPool,
    quarantine: quarantine,
    blockReceivedDuringSlot: newFuture[void](),
    blocksQueue: newAsyncQueue[BlockEntry](blocksCapacity),
    aggregatesQueue: newAsyncQueue[AggregateEntry](aggregatesCapacity),
    attestationsQueue: newAsyncQueue[AttestationEntry](attestationsCapacity),
  )