Split Eth2Processor in prep for batching (#2396)
* Split Eth2Processor into a gossip part and a consensus part, and materialize the shared block queue
* Update initialization in test_sync_manager
parent ef4a5b0cc3
commit c47d636cb3
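At a glance, the split separates three cooperating objects: Eth2Processor keeps the gossip-validation duties, VerifQueueManager owns the block/attestation queues between gossip and consensus, and ConsensusManager owns fork choice and head updates. A minimal wiring sketch, following the BeaconNode.init and run changes further down (the surrounding config, clock and pool setup is assumed to already exist in the caller):

    let
      consensusManager = ConsensusManager.new(
        chainDag, attestationPool, quarantine)
      verifQueues = VerifQueueManager.new(
        config, consensusManager,
        proc(): BeaconTime = beaconClock.now())
      processor = Eth2Processor.new(
        config, verifQueues,
        chainDag, attestationPool, exitPool, validatorPool,
        quarantine, proc(): BeaconTime = beaconClock.now())

    # Later, at startup, the shared queue gets its own processing loop:
    asyncSpawn runQueueProcessingLoop(verifQueues)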
@@ -24,6 +24,8 @@ type
  BeaconTime* = distinct Duration ## Nanoseconds from beacon genesis time

  GetWallTimeFn* = proc(): BeaconTime {.gcsafe, raises: [Defect].}

proc init*(T: type BeaconClock, genesis_time: uint64): T =
  # ~290 billion years into the future
  doAssert genesis_time <= high(int64).uint64

@@ -16,7 +16,7 @@ import
  # Local modules
  ./conf, ./beacon_clock, ./beacon_chain_db,
  ./beacon_node_types,
  ./gossip_processing/eth2_processor,
  ./gossip_processing/[eth2_processor, gossip_to_consensus, consensus_manager],
  ./networking/eth2_network,
  ./eth1/eth1_monitor,
  ./consensus_object_pools/[blockchain_dag, block_quarantine, attestation_pool],

@@ -57,6 +57,8 @@ type
    genesisSnapshotContent*: string
    attestationSubnets*: AttestationSubnets
    processor*: ref Eth2Processor
    verifQueues*: ref VerifQueueManager
    consensusManager*: ref ConsensusManager
    attachedValidatorBalanceTotal*: uint64

const
@@ -0,0 +1,103 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [Defect].}

import
  chronicles, chronos,
  ../spec/[crypto, datatypes, digest],
  ../consensus_object_pools/[blockchain_dag, attestation_pool]

# TODO: Move to "consensus_object_pools" folder

type
  ConsensusManager* = object
    expectedSlot: Slot
    expectedBlockReceived: Future[bool]

    # Validated & Verified
    # ----------------------------------------------------------------
    chainDag*: ChainDAGRef
    attestationPool*: ref AttestationPool

    # Missing info
    # ----------------------------------------------------------------
    quarantine*: QuarantineRef

# Initialization
# ------------------------------------------------------------------------------

proc new*(T: type ConsensusManager,
          chainDag: ChainDAGRef,
          attestationPool: ref AttestationPool,
          quarantine: QuarantineRef
         ): ref ConsensusManager =
  (ref ConsensusManager)(
    chainDag: chainDag,
    attestationPool: attestationPool,
    quarantine: quarantine
  )

# Consensus Management
# -----------------------------------------------------------------------------------

proc checkExpectedBlock(self: var ConsensusManager) =
  if self.expectedBlockReceived == nil:
    return

  if self.chainDag.head.slot < self.expectedSlot:
    return

  self.expectedBlockReceived.complete(true)
  self.expectedBlockReceived = nil # Don't keep completed futures around!

proc expectBlock*(self: var ConsensusManager, expectedSlot: Slot): Future[bool] =
  ## Return a future that will complete when a head is selected whose slot is
  ## equal or greater than the given slot, or a new expectation is created
  if self.expectedBlockReceived != nil:
    # Reset the old future to not leave it hanging.. an alternative would be to
    # cancel it, but it doesn't make any practical difference for now
    self.expectedBlockReceived.complete(false)

  let fut = newFuture[bool]("ConsensusManager.expectBlock")
  self.expectedSlot = expectedSlot
  self.expectedBlockReceived = fut

  # It might happen that by the time we're expecting a block, it might have
  # already been processed!
  self.checkExpectedBlock()

  return fut

proc updateHead*(self: var ConsensusManager, wallSlot: Slot) =
  ## Trigger fork choice and update the DAG with the new head block
  ## This does not automatically prune the DAG after finalization
  ## `pruneFinalized` must be called for pruning.

  # Grab the new head according to our latest attestation data
  let newHead = self.attestationPool[].selectHead(wallSlot)
  if newHead.isNil():
    warn "Head selection failed, using previous head",
      head = shortLog(self.chainDag.head), wallSlot
    return

  # Store the new head in the chain DAG - this may cause epochs to be
  # justified and finalized
  self.chainDag.updateHead(newHead, self.quarantine)

  self.checkExpectedBlock()

proc pruneStateCachesAndForkChoice*(self: var ConsensusManager) =
  ## Prune unneeded and invalidated data after finalization
  ## - the DAG state checkpoints
  ## - the DAG EpochRef
  ## - the attestation pool/fork choice

  # Cleanup DAG & fork choice if we have a finalized head
  if self.chainDag.needStateCachesAndForkChoicePruning():
    self.chainDag.pruneStateCachesDAG()
    self.attestationPool[].prune()
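A hedged usage sketch of the expectBlock/updateHead pair introduced above, mirroring the validator_duties change further down in this commit; the proc name and the cutoff parameter are illustrative only, and chronos plus this module are assumed to be imported:

    proc waitForBlockThenSelectHead(cm: ref ConsensusManager,
                                    slot: Slot, cutoff: Duration) {.async.} =
      # Wait until a head at `slot` (or later) is selected, or give up at `cutoff`
      if not await cm[].expectBlock(slot).withTimeout(cutoff):
        # Deadline passed without the expected block - run fork choice ourselves
        cm[].updateHead(slot)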
@@ -13,7 +13,7 @@ import
  chronicles, chronos, metrics,
  ../spec/[crypto, datatypes, digest],
  ../consensus_object_pools/[block_clearance, blockchain_dag, exit_pool, attestation_pool],
  ./gossip_validation,
  ./gossip_validation, ./gossip_to_consensus,
  ../validators/validator_pool,
  ../beacon_node_types,
  ../beacon_clock, ../conf, ../ssz/sszdump

@@ -50,245 +50,55 @@ declareHistogram beacon_store_block_duration_seconds,
  "storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]

type
  GetWallTimeFn* = proc(): BeaconTime {.gcsafe, raises: [Defect].}

  SyncBlock* = object
    blk*: SignedBeaconBlock
    resfut*: Future[Result[void, BlockError]]

  BlockEntry* = object
    v*: SyncBlock

  AttestationEntry* = object
    v*: Attestation
    attesting_indices*: seq[ValidatorIndex]

  AggregateEntry* = AttestationEntry

  Eth2Processor* = object
    config*: BeaconNodeConf
    getWallTime*: GetWallTimeFn

    # Local sources of truth for validation
    # ----------------------------------------------------------------
    chainDag*: ChainDAGRef
    attestationPool*: ref AttestationPool
    exitPool: ref ExitPool
    validatorPool: ref ValidatorPool
    quarantine*: QuarantineRef
    expectedSlot: Slot
    expectedBlockReceived: Future[bool]

    blocksQueue*: AsyncQueue[BlockEntry]
    attestationsQueue*: AsyncQueue[AttestationEntry]
    aggregatesQueue*: AsyncQueue[AggregateEntry]

    doppelgangerDetection*: DoppelgangerProtection

proc checkExpectedBlock(self: var Eth2Processor) =
  if self.expectedBlockReceived == nil:
    return
    # Gossip validated -> enqueue for further verification
    # ----------------------------------------------------------------
    verifQueues: ref VerifQueueManager

  if self.chainDag.head.slot < self.expectedSlot:
    return
    # Validated with no further verification required
    # ----------------------------------------------------------------
    exitPool: ref ExitPool

  self.expectedBlockReceived.complete(true)
  self.expectedBlockReceived = nil # Don't keep completed futures around!
    # Missing information
    # ----------------------------------------------------------------
    quarantine*: QuarantineRef

proc expectBlock*(self: var Eth2Processor, expectedSlot: Slot): Future[bool] =
  ## Return a future that will complete when a head is selected whose slot is
  ## equal or greater than the given slot, or a new expectation is created
  if self.expectedBlockReceived != nil:
    # Reset the old future to not leave it hanging.. an alternative would be to
    # cancel it, but it doesn't make any practical difference for now
    self.expectedBlockReceived.complete(false)
# Initialization
# ------------------------------------------------------------------------------

  let fut = newFuture[bool]("Eth2Processor.expectBlock")
  self.expectedSlot = expectedSlot
  self.expectedBlockReceived = fut
proc new*(T: type Eth2Processor,
          config: BeaconNodeConf,
          verifQueues: ref VerifQueueManager,
          chainDag: ChainDAGRef,
          attestationPool: ref AttestationPool,
          exitPool: ref ExitPool,
          validatorPool: ref ValidatorPool,
          quarantine: QuarantineRef,
          getWallTime: GetWallTimeFn): ref Eth2Processor =
  (ref Eth2Processor)(
    config: config,
    getWallTime: getWallTime,
    verifQueues: verifQueues,
    chainDag: chainDag,
    attestationPool: attestationPool,
    exitPool: exitPool,
    validatorPool: validatorPool,
    quarantine: quarantine
  )

  # It might happen that by the time we're expecting a block, it might have
  # already been processed!
  self.checkExpectedBlock()

  return fut

proc updateHead*(self: var Eth2Processor, wallSlot: Slot) =
  ## Trigger fork choice and update the DAG with the new head block
  ## This does not automatically prune the DAG after finalization
  ## `pruneFinalized` must be called for pruning.

  # TODO: DAG & fork choice procs are unrelated to gossip validation

  # Grab the new head according to our latest attestation data
  let newHead = self.attestationPool[].selectHead(wallSlot)
  if newHead.isNil():
    warn "Head selection failed, using previous head",
      head = shortLog(self.chainDag.head), wallSlot
    return

  # Store the new head in the chain DAG - this may cause epochs to be
  # justified and finalized
  self.chainDag.updateHead(newHead, self.quarantine)

  self.checkExpectedBlock()

proc pruneStateCachesAndForkChoice*(self: var Eth2Processor) =
  ## Prune unneeded and invalidated data after finalization
  ## - the DAG state checkpoints
  ## - the DAG EpochRef
  ## - the attestation pool/fork choice

  # TODO: DAG & fork choice procs are unrelated to gossip validation

  # Cleanup DAG & fork choice if we have a finalized head
  if self.chainDag.needStateCachesAndForkChoicePruning():
    self.chainDag.pruneStateCachesDAG()
    self.attestationPool[].prune()

proc dumpBlock[T](
    self: Eth2Processor, signedBlock: SignedBeaconBlock,
    res: Result[T, (ValidationResult, BlockError)]) =
  if self.config.dumpEnabled and res.isErr:
    case res.error[1]
    of Invalid:
      dump(
        self.config.dumpDirInvalid, signedBlock)
    of MissingParent:
      dump(
        self.config.dumpDirIncoming, signedBlock)
    else:
      discard

proc done*(blk: SyncBlock) =
  ## Send signal to [Sync/Request]Manager that the block ``blk`` has passed
  ## verification successfully.
  if blk.resfut != nil:
    blk.resfut.complete(Result[void, BlockError].ok())

proc fail*(blk: SyncBlock, error: BlockError) =
  ## Send signal to [Sync/Request]Manager that the block ``blk`` has NOT passed
  ## verification with specific ``error``.
  if blk.resfut != nil:
    blk.resfut.complete(Result[void, BlockError].err(error))

proc complete*(blk: SyncBlock, res: Result[void, BlockError]) =
  ## Send signal to [Sync/Request]Manager about result ``res`` of block ``blk``
  ## verification.
  if blk.resfut != nil:
    blk.resfut.complete(res)

proc storeBlock(
    self: var Eth2Processor, signedBlock: SignedBeaconBlock,
    wallSlot: Slot): Result[void, BlockError] =
  let
    start = Moment.now()
    attestationPool = self.attestationPool

  let blck = self.chainDag.addRawBlock(self.quarantine, signedBlock) do (
      blckRef: BlockRef, trustedBlock: TrustedSignedBeaconBlock,
      epochRef: EpochRef, state: HashedBeaconState):
    # Callback add to fork choice if valid
    attestationPool[].addForkChoice(
      epochRef, blckRef, trustedBlock.message, wallSlot)

  self.dumpBlock(signedBlock, blck)

  # There can be a scenario where we receive a block we already received.
  # However this block was before the last finalized epoch and so its parent
  # was pruned from the ForkChoice.
  if blck.isErr:
    return err(blck.error[1])

  let duration = (Moment.now() - start).toFloatSeconds()
  beacon_store_block_duration_seconds.observe(duration)
  ok()

proc processAttestation(
    self: var Eth2Processor, entry: AttestationEntry) =
  logScope:
    signature = shortLog(entry.v.signature)

  let
    wallTime = self.getWallTime()
    (afterGenesis, wallSlot) = wallTime.toSlot()

  if not afterGenesis:
    error "Processing attestation before genesis, clock turned back?"
    quit 1

  trace "Processing attestation"
  self.attestationPool[].addAttestation(
    entry.v, entry.attesting_indices, wallSlot)

proc processAggregate(
    self: var Eth2Processor, entry: AggregateEntry) =
  logScope:
    signature = shortLog(entry.v.signature)

  let
    wallTime = self.getWallTime()
    (afterGenesis, wallSlot) = wallTime.toSlot()

  if not afterGenesis:
    error "Processing aggregate before genesis, clock turned back?"
    quit 1

  trace "Processing aggregate"
  self.attestationPool[].addAttestation(
    entry.v, entry.attesting_indices, wallSlot)

proc processBlock(self: var Eth2Processor, entry: BlockEntry) =
  logScope:
    blockRoot = shortLog(entry.v.blk.root)

  let
    wallTime = self.getWallTime()
    (afterGenesis, wallSlot) = wallTime.toSlot()

  if not afterGenesis:
    error "Processing block before genesis, clock turned back?"
    quit 1

  let
    start = now(chronos.Moment)
    res = self.storeBlock(entry.v.blk, wallSlot)
    storeDone = now(chronos.Moment)

  if res.isOk():
    # Eagerly update head in case the new block gets selected
    self.updateHead(wallSlot) # This also eagerly prunes the blocks DAG to prevent processing forks.
    # self.pruneStateCachesDAG() # Amortized pruning, we don't prune states & fork choice here but in `onSlotEnd`()

    let updateDone = now(chronos.Moment)
    let storeBlockDuration = storeDone - start
    let updateHeadDuration = updateDone - storeDone
    let overallDuration = updateDone - start
    let storeSpeed =
      block:
        let secs = float(chronos.seconds(1).nanoseconds)
        if not(overallDuration.isZero()):
          let v = secs / float(overallDuration.nanoseconds)
          round(v * 10_000) / 10_000
        else:
          0.0
    debug "Block processed",
      local_head_slot = self.chainDag.head.slot,
      store_speed = storeSpeed,
      block_slot = entry.v.blk.message.slot,
      store_block_duration = $storeBlockDuration,
      update_head_duration = $updateHeadDuration,
      overall_duration = $overallDuration

    if entry.v.resFut != nil:
      entry.v.resFut.complete(Result[void, BlockError].ok())
  elif res.error() in {BlockError.Duplicate, BlockError.Old}:
    # These are harmless / valid outcomes - for the purpose of scoring peers,
    # they are ok
    if entry.v.resFut != nil:
      entry.v.resFut.complete(Result[void, BlockError].ok())
  else:
    if entry.v.resFut != nil:
      entry.v.resFut.complete(Result[void, BlockError].err(res.error()))

{.pop.} # TODO AsyncQueue.addLast raises Exception in theory but not in practice
# Gossip Management
# -----------------------------------------------------------------------------------

proc blockValidator*(
    self: var Eth2Processor,
@@ -322,7 +132,7 @@ proc blockValidator*(
  let blck = self.chainDag.isValidBeaconBlock(
    self.quarantine, signedBlock, wallTime, {})

  self.dumpBlock(signedBlock, blck)
  self.verifQueues[].dumpBlock(signedBlock, blck)

  if not blck.isOk:
    return blck.error[0]

@@ -336,13 +146,15 @@ proc blockValidator*(
  # sync, we don't lose the gossip blocks, but also don't block the gossip
  # propagation of seemingly good blocks
  trace "Block validated"
  asyncSpawn self.blocksQueue.addLast(
    BlockEntry(v: SyncBlock(blk: signedBlock)))
  try:
    self.verifQueues[].addBlock(SyncBlock(blk: signedBlock))
  except Exception as e:
    # Chronos can in theory raise an untyped exception in `internalCheckComplete`
    # but in practice that's always a Defect not a Catchable exception
    raiseAssert e.msg

  ValidationResult.Accept

{.push raises: [Defect].}

proc checkForPotentialDoppelganger(
    self: var Eth2Processor, attestationData: AttestationData,
    attesterIndices: openArray[ValidatorIndex], wallSlot: Slot) =

@@ -405,20 +217,8 @@ proc attestationValidator*(

  self.checkForPotentialDoppelganger(attestation.data, v.value, wallSlot)

  while self.attestationsQueue.full():
    try:
      notice "Queue full, dropping attestation",
        dropped = shortLog(self.attestationsQueue[0].v)
      discard self.attestationsQueue.popFirstNoWait()
    except AsyncQueueEmptyError as exc:
      raiseAssert "If queue is full, we have at least one item! " & exc.msg

  trace "Attestation validated"
  try:
    self.attestationsQueue.addLastNoWait(
      AttestationEntry(v: attestation, attesting_indices: v.get()))
  except AsyncQueueFullError as exc:
    raiseAssert "We just checked that queue is not full! " & exc.msg
  self.verifQueues[].addAttestation(attestation, v.get())

  ValidationResult.Accept

@@ -460,25 +260,12 @@ proc aggregateValidator*(
  self.checkForPotentialDoppelganger(
    signedAggregateAndProof.message.aggregate.data, v.value, wallSlot)

  while self.aggregatesQueue.full():
    try:
      notice "Queue full, dropping aggregate",
        dropped = shortLog(self.aggregatesQueue[0].v)
      discard self.aggregatesQueue.popFirstNoWait()
    except AsyncQueueEmptyError as exc:
      raiseAssert "We just checked that queue is not full! " & exc.msg

  trace "Aggregate validated",
    aggregator_index = signedAggregateAndProof.message.aggregator_index,
    selection_proof = signedAggregateAndProof.message.selection_proof,
    wallSlot

  try:
    self.aggregatesQueue.addLastNoWait(AggregateEntry(
      v: signedAggregateAndProof.message.aggregate,
      attesting_indices: v.get()))
  except AsyncQueueFullError as exc:
    raiseAssert "We just checked that queue is not full! " & exc.msg
  self.verifQueues[].addAggregate(signedAggregateAndProof, v.get())

  ValidationResult.Accept

@@ -527,95 +314,4 @@ proc voluntaryExitValidator*(

  ValidationResult.Accept

{.pop.} # TODO raises in chronos

proc runQueueProcessingLoop*(self: ref Eth2Processor) {.async.} =
  # Blocks in eth2 arrive on a schedule for every slot:
  #
  # * Block arrives at time 0
  # * Attestations arrives at time 4
  # * Aggregate arrives at time 8

  var
    blockFut = self[].blocksQueue.popFirst()
    aggregateFut = self[].aggregatesQueue.popFirst()
    attestationFut = self[].attestationsQueue.popFirst()

  while true:
    # Cooperative concurrency: one idle calculation step per loop - because
    # we run both networking and CPU-heavy things like block processing
    # on the same thread, we need to make sure that there is steady progress
    # on the networking side or we get long lockups that lead to timeouts.
    const
      # We cap waiting for an idle slot in case there's a lot of network traffic
      # taking up all CPU - we don't want to _completely_ stop processing blocks
      # in this case (attestations will get dropped) - doing so also allows us
      # to benefit from more batching / larger network reads when under load.
      idleTimeout = 10.milliseconds

      # Attestation processing is fairly quick and therefore done in batches to
      # avoid some of the `Future` overhead
      attestationBatch = 16

    discard await idleAsync().withTimeout(idleTimeout)

    # Avoid one more `await` when there's work to do
    if not (blockFut.finished or aggregateFut.finished or attestationFut.finished):
      trace "Waiting for processing work"
      await blockFut or aggregateFut or attestationFut

    # Only run one task per idle iteration, in priority order: blocks are needed
    # for all other processing - then come aggregates which are cheap to
    # process but might have a big impact on fork choice - last come
    # attestations which individually have the smallest effect on chain progress
    if blockFut.finished:
      self[].processBlock(blockFut.read())
      blockFut = self[].blocksQueue.popFirst()
    elif aggregateFut.finished:
      # aggregates will be dropped under heavy load on producer side
      self[].processAggregate(aggregateFut.read())
      for i in 0..<attestationBatch: # process a few at a time - this is fairly fast
        if self[].aggregatesQueue.empty():
          break
        self[].processAggregate(self[].aggregatesQueue.popFirstNoWait())

      aggregateFut = self[].aggregatesQueue.popFirst()
    elif attestationFut.finished:
      # attestations will be dropped under heavy load on producer side
      self[].processAttestation(attestationFut.read())

      for i in 0..<attestationBatch: # process a few at a time - this is fairly fast
        if self[].attestationsQueue.empty():
          break
        self[].processAttestation(self[].attestationsQueue.popFirstNoWait())

      attestationFut = self[].attestationsQueue.popFirst()

proc new*(T: type Eth2Processor,
          config: BeaconNodeConf,
          chainDag: ChainDAGRef,
          attestationPool: ref AttestationPool,
          exitPool: ref ExitPool,
          validatorPool: ref ValidatorPool,
          quarantine: QuarantineRef,
          getWallTime: GetWallTimeFn): ref Eth2Processor =
  (ref Eth2Processor)(
    config: config,
    getWallTime: getWallTime,
    chainDag: chainDag,
    attestationPool: attestationPool,
    exitPool: exitPool,
    validatorPool: validatorPool,
    quarantine: quarantine,
    blocksQueue: newAsyncQueue[BlockEntry](1),
    # limit to the max number of aggregates we expect to see in one slot
    aggregatesQueue: newAsyncQueue[AggregateEntry](
      (TARGET_AGGREGATORS_PER_COMMITTEE * MAX_COMMITTEES_PER_SLOT).int),
    # This queue is a bit harder to bound reasonably - we want to get a good
    # spread of votes across committees - ideally at least TARGET_COMMITTEE_SIZE
    # per committee - assuming randomness in vote arrival, this limit should
    # cover that but of course, when votes arrive depends on a number of
    # factors that are not entire random
    attestationsQueue: newAsyncQueue[AttestationEntry](
      (TARGET_COMMITTEE_SIZE * MAX_COMMITTEES_PER_SLOT).int),
  )
{.pop.}
@@ -0,0 +1,397 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import
  std/math,
  stew/results,
  chronicles, chronos, metrics,
  ../spec/[crypto, datatypes, digest],
  ../consensus_object_pools/[block_clearance, blockchain_dag, attestation_pool],
  ./consensus_manager,
  ../beacon_node_types,
  ../beacon_clock, ../conf, ../ssz/sszdump

# Gossip Queue Manager
# ------------------------------------------------------------------------------
# The queue manager moves blocks from "Gossip validated" to "Consensus verified"

declareHistogram beacon_store_block_duration_seconds,
  "storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]

type
  SyncBlock* = object
    blk*: SignedBeaconBlock
    resfut*: Future[Result[void, BlockError]]

  BlockEntry* = object
    # Exported for "test_sync_manager"
    v*: SyncBlock

  AttestationEntry = object
    v: Attestation
    attesting_indices: seq[ValidatorIndex]

  AggregateEntry* = AttestationEntry

  VerifQueueManager* = object
    ## This manages the queues of blocks and attestations.
    ## Blocks and attestations are enqueued in a gossip-validated state
    ##
    ## from:
    ## - Gossip (when synced)
    ## - SyncManager (during sync)
    ## - RequestManager (missing ancestor blocks)
    ##
    ## are then consensus-verified and added to:
    ## - the blockchain DAG
    ## - database
    ## - attestation pool
    ## - fork choice
    ##
    ## The queue manager doesn't manage exits (voluntary, attester slashing or proposer slashing)
    ## as they don't need extra verification and can be added to the exit pool as soon as they are gossip-validated.

    # Config
    # ----------------------------------------------------------------
    dumpEnabled: bool
    dumpDirInvalid: string
    dumpDirIncoming: string

    # Clock
    # ----------------------------------------------------------------
    getWallTime: GetWallTimeFn

    # Producers
    # ----------------------------------------------------------------
    blocksQueue*: AsyncQueue[BlockEntry] # Exported for "test_sync_manager"
    attestationsQueue: AsyncQueue[AttestationEntry]
    aggregatesQueue: AsyncQueue[AggregateEntry]

    # Consumer
    # ----------------------------------------------------------------
    consensusManager: ref ConsensusManager
      ## Blockchain DAG, AttestationPool and Quarantine

{.push raises: [Defect].}

# Initialization
# ------------------------------------------------------------------------------

proc new*(T: type VerifQueueManager,
          conf: BeaconNodeConf,
          consensusManager: ref ConsensusManager,
          getWallTime: GetWallTimeFn): ref VerifQueueManager =
  (ref VerifQueueManager)(
    dumpEnabled: conf.dumpEnabled,
    dumpDirInvalid: conf.dumpDirInvalid,
    dumpDirIncoming: conf.dumpDirIncoming,

    getWallTime: getWallTime,

    blocksQueue: newAsyncQueue[BlockEntry](1),
    # limit to the max number of aggregates we expect to see in one slot
    aggregatesQueue: newAsyncQueue[AggregateEntry](
      (TARGET_AGGREGATORS_PER_COMMITTEE * MAX_COMMITTEES_PER_SLOT).int),
    # This queue is a bit harder to bound reasonably - we want to get a good
    # spread of votes across committees - ideally at least TARGET_COMMITTEE_SIZE
    # per committee - assuming randomness in vote arrival, this limit should
    # cover that but of course, when votes arrive depends on a number of
    # factors that are not entirely random
    attestationsQueue: newAsyncQueue[AttestationEntry](
      (TARGET_COMMITTEE_SIZE * MAX_COMMITTEES_PER_SLOT).int),

    consensusManager: consensusManager
  )

# Sync callbacks
# ------------------------------------------------------------------------------

proc done*(blk: SyncBlock) =
  ## Send signal to [Sync/Request]Manager that the block ``blk`` has passed
  ## verification successfully.
  if blk.resfut != nil:
    blk.resfut.complete(Result[void, BlockError].ok())

proc fail*(blk: SyncBlock, error: BlockError) =
  ## Send signal to [Sync/Request]Manager that the block ``blk`` has NOT passed
  ## verification with specific ``error``.
  if blk.resfut != nil:
    blk.resfut.complete(Result[void, BlockError].err(error))

proc complete*(blk: SyncBlock, res: Result[void, BlockError]) =
  ## Send signal to [Sync/Request]Manager about result ``res`` of block ``blk``
  ## verification.
  if blk.resfut != nil:
    blk.resfut.complete(res)

# Enqueue
# ------------------------------------------------------------------------------

{.pop.}

proc addBlock*(self: var VerifQueueManager, syncBlock: SyncBlock) {.raises: [Exception].} =
  ## Enqueue a Gossip-validated block for consensus verification
  # Backpressure:
  #   If no item can be enqueued because buffer is full,
  #   we suspend here.
  # Producers:
  # - Gossip (when synced)
  # - SyncManager (during sync)
  # - RequestManager (missing ancestor blocks)

  # TODO: solve the signature requiring raise: [Exception]
  #       even when push/pop is used

  asyncSpawn(
    try:
      self.blocksQueue.addLast(BlockEntry(v: syncBlock))
    except Exception as e:
      # Chronos can in theory raise an untyped exception in `internalCheckComplete`
      # which asyncSpawn doesn't like.
      raiseAssert e.msg
  )

{.push raises: [Defect].}

proc addAttestation*(self: var VerifQueueManager, att: Attestation, att_indices: seq[ValidatorIndex]) =
  ## Enqueue a Gossip-validated attestation for consensus verification
  # Backpressure:
  #   no handling
  # Producer:
  # - Gossip (when synced)
  while self.attestationsQueue.full():
    try:
      notice "Queue full, dropping oldest attestation",
        dropped = shortLog(self.attestationsQueue[0].v)
      discard self.attestationsQueue.popFirstNoWait()
    except AsyncQueueEmptyError as exc:
      raiseAssert "If queue is full, we have at least one item! " & exc.msg

  try:
    self.attestationsQueue.addLastNoWait(
      AttestationEntry(v: att, attesting_indices: att_indices))
  except AsyncQueueFullError as exc:
    raiseAssert "We just checked that queue is not full! " & exc.msg

proc addAggregate*(self: var VerifQueueManager, agg: SignedAggregateAndProof, att_indices: seq[ValidatorIndex]) =
  ## Enqueue a Gossip-validated aggregate attestation for consensus verification
  # Backpressure:
  #   no handling
  # Producer:
  # - Gossip (when synced)

  while self.aggregatesQueue.full():
    try:
      notice "Queue full, dropping oldest aggregate",
        dropped = shortLog(self.aggregatesQueue[0].v)
      discard self.aggregatesQueue.popFirstNoWait()
    except AsyncQueueEmptyError as exc:
      raiseAssert "We just checked that queue is not full! " & exc.msg

  try:
    self.aggregatesQueue.addLastNoWait(AggregateEntry(
      v: agg.message.aggregate,
      attesting_indices: att_indices))
  except AsyncQueueFullError as exc:
    raiseAssert "We just checked that queue is not full! " & exc.msg

# Storage
# ------------------------------------------------------------------------------

proc dumpBlock*[T](
    self: VerifQueueManager, signedBlock: SignedBeaconBlock,
    res: Result[T, (ValidationResult, BlockError)]) =
  if self.dumpEnabled and res.isErr:
    case res.error[1]
    of Invalid:
      dump(
        self.dumpDirInvalid, signedBlock)
    of MissingParent:
      dump(
        self.dumpDirIncoming, signedBlock)
    else:
      discard

proc storeBlock(
    self: var VerifQueueManager, signedBlock: SignedBeaconBlock,
    wallSlot: Slot): Result[void, BlockError] =
  let
    start = Moment.now()
    attestationPool = self.consensusManager.attestationPool

  let blck = self.consensusManager.chainDag.addRawBlock(self.consensusManager.quarantine, signedBlock) do (
      blckRef: BlockRef, trustedBlock: TrustedSignedBeaconBlock,
      epochRef: EpochRef, state: HashedBeaconState):
    # Callback add to fork choice if valid
    attestationPool[].addForkChoice(
      epochRef, blckRef, trustedBlock.message, wallSlot)

  self.dumpBlock(signedBlock, blck)

  # There can be a scenario where we receive a block we already received.
  # However this block was before the last finalized epoch and so its parent
  # was pruned from the ForkChoice.
  if blck.isErr:
    return err(blck.error[1])

  let duration = (Moment.now() - start).toFloatSeconds()
  beacon_store_block_duration_seconds.observe(duration)
  ok()

# Event Loop
# ------------------------------------------------------------------------------

proc processAttestation(
    self: var VerifQueueManager, entry: AttestationEntry) =
  logScope:
    signature = shortLog(entry.v.signature)

  let
    wallTime = self.getWallTime()
    (afterGenesis, wallSlot) = wallTime.toSlot()

  if not afterGenesis:
    error "Processing attestation before genesis, clock turned back?"
    quit 1

  trace "Processing attestation"
  self.consensusManager.attestationPool[].addAttestation(
    entry.v, entry.attesting_indices, wallSlot)

proc processAggregate(
    self: var VerifQueueManager, entry: AggregateEntry) =
  logScope:
    signature = shortLog(entry.v.signature)

  let
    wallTime = self.getWallTime()
    (afterGenesis, wallSlot) = wallTime.toSlot()

  if not afterGenesis:
    error "Processing aggregate before genesis, clock turned back?"
    quit 1

  trace "Processing aggregate"
  self.consensusManager.attestationPool[].addAttestation(
    entry.v, entry.attesting_indices, wallSlot)

proc processBlock(self: var VerifQueueManager, entry: BlockEntry) =
  logScope:
    blockRoot = shortLog(entry.v.blk.root)

  let
    wallTime = self.getWallTime()
    (afterGenesis, wallSlot) = wallTime.toSlot()

  if not afterGenesis:
    error "Processing block before genesis, clock turned back?"
    quit 1

  let
    start = now(chronos.Moment)
    res = self.storeBlock(entry.v.blk, wallSlot)
    storeDone = now(chronos.Moment)

  if res.isOk():
    # Eagerly update head in case the new block gets selected
    self.consensusManager[].updateHead(wallSlot) # This also eagerly prunes the blocks DAG to prevent processing forks.
    # self.consensusManager.pruneStateCachesDAG() # Amortized pruning, we don't prune states & fork choice here but in `onSlotEnd`()

    let updateDone = now(chronos.Moment)
    let storeBlockDuration = storeDone - start
    let updateHeadDuration = updateDone - storeDone
    let overallDuration = updateDone - start
    let storeSpeed =
      block:
        let secs = float(chronos.seconds(1).nanoseconds)
        if not(overallDuration.isZero()):
          let v = secs / float(overallDuration.nanoseconds)
          round(v * 10_000) / 10_000
        else:
          0.0
    debug "Block processed",
      local_head_slot = self.consensusManager.chainDag.head.slot,
      store_speed = storeSpeed,
      block_slot = entry.v.blk.message.slot,
      store_block_duration = $storeBlockDuration,
      update_head_duration = $updateHeadDuration,
      overall_duration = $overallDuration

    if entry.v.resFut != nil:
      entry.v.resFut.complete(Result[void, BlockError].ok())
  elif res.error() in {BlockError.Duplicate, BlockError.Old}:
    # These are harmless / valid outcomes - for the purpose of scoring peers,
    # they are ok
    if entry.v.resFut != nil:
      entry.v.resFut.complete(Result[void, BlockError].ok())
  else:
    if entry.v.resFut != nil:
      entry.v.resFut.complete(Result[void, BlockError].err(res.error()))

{.pop.} # Chronos: Error: can raise an unlisted exception: ref Exception

proc runQueueProcessingLoop*(self: ref VerifQueueManager) {.async.} =
  # Blocks in eth2 arrive on a schedule for every slot:
  #
  # * Block arrives at time 0
  # * Attestations arrives at time 4
  # * Aggregate arrives at time 8

  var
    blockFut = self[].blocksQueue.popFirst()
    aggregateFut = self[].aggregatesQueue.popFirst()
    attestationFut = self[].attestationsQueue.popFirst()

  while true:
    # Cooperative concurrency: one idle calculation step per loop - because
    # we run both networking and CPU-heavy things like block processing
    # on the same thread, we need to make sure that there is steady progress
    # on the networking side or we get long lockups that lead to timeouts.
    const
      # We cap waiting for an idle slot in case there's a lot of network traffic
      # taking up all CPU - we don't want to _completely_ stop processing blocks
      # in this case (attestations will get dropped) - doing so also allows us
      # to benefit from more batching / larger network reads when under load.
      idleTimeout = 10.milliseconds

      # Attestation processing is fairly quick and therefore done in batches to
      # avoid some of the `Future` overhead
      attestationBatch = 16

    discard await idleAsync().withTimeout(idleTimeout)

    # Avoid one more `await` when there's work to do
    if not (blockFut.finished or aggregateFut.finished or attestationFut.finished):
      trace "Waiting for processing work"
      await blockFut or aggregateFut or attestationFut

    # Only run one task per idle iteration, in priority order: blocks are needed
    # for all other processing - then come aggregates which are cheap to
    # process but might have a big impact on fork choice - last come
    # attestations which individually have the smallest effect on chain progress
    if blockFut.finished:
      self[].processBlock(blockFut.read())
      blockFut = self[].blocksQueue.popFirst()
    elif aggregateFut.finished:
      # aggregates will be dropped under heavy load on producer side
      self[].processAggregate(aggregateFut.read())
      for i in 0..<attestationBatch: # process a few at a time - this is fairly fast
        if self[].aggregatesQueue.empty():
          break
        self[].processAggregate(self[].aggregatesQueue.popFirstNoWait())

      aggregateFut = self[].aggregatesQueue.popFirst()
    elif attestationFut.finished:
      # attestations will be dropped under heavy load on producer side
      self[].processAttestation(attestationFut.read())

      for i in 0..<attestationBatch: # process a few at a time - this is fairly fast
        if self[].attestationsQueue.empty():
          break
        self[].processAttestation(self[].attestationsQueue.popFirstNoWait())

      attestationFut = self[].attestationsQueue.popFirst()
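For producers, the contract is: wrap the block in a SyncBlock carrying a fresh result future, hand it to addBlock, then await the future, which processBlock completes with ok or err. A hedged sketch of that pattern (the proc name is illustrative only; SyncQueue.validate and RequestManager.validate in the hunks below do exactly this):

    proc verifyViaQueue(vq: ref VerifQueueManager,
                        blk: SignedBeaconBlock): Future[Result[void, BlockError]] {.async.} =
      let sblock = SyncBlock(
        blk: blk,
        resfut: newFuture[Result[void, BlockError]]("example.verifyViaQueue"))
      vq[].addBlock(sblock)        # enqueue; may suspend internally on backpressure
      return await sblock.resfut   # completed by processBlock after storeBlock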
@@ -29,7 +29,7 @@ import
  nimbus_binary_common, ssz/merkleization, statusbar,
  beacon_clock, version],
  ./networking/[eth2_discovery, eth2_network, network_metadata],
  ./gossip_processing/eth2_processor,
  ./gossip_processing/[eth2_processor, gossip_to_consensus, consensus_manager],
  ./validators/[attestation_aggregation, validator_duties, validator_pool, slashing_protection, keystore_management],
  ./sync/[sync_manager, sync_protocol, request_manager],
  ./rpc/[beacon_api, config_api, debug_api, event_api, nimbus_api, node_api,

@@ -311,9 +311,19 @@ proc init*(T: type BeaconNode,
      disagreementBehavior = kChooseV2
    )
    validatorPool = newClone(ValidatorPool.init(slashingProtectionDB))

    consensusManager = ConsensusManager.new(
      chainDag, attestationPool, quarantine
    )
    verifQueues = VerifQueueManager.new(
      config, consensusManager,
      proc(): BeaconTime = beaconClock.now())
    processor = Eth2Processor.new(
      config, chainDag, attestationPool, exitPool, validatorPool,
      quarantine, proc(): BeaconTime = beaconClock.now())
      config,
      verifQueues,
      chainDag, attestationPool, exitPool, validatorPool,
      quarantine,
      proc(): BeaconTime = beaconClock.now())

  var res = BeaconNode(
    nickname: nickname,

@@ -335,7 +345,9 @@ proc init*(T: type BeaconNode,
    topicBeaconBlocks: topicBeaconBlocks,
    topicAggregateAndProofs: topicAggregateAndProofs,
    processor: processor,
    requestManager: RequestManager.init(network, processor.blocksQueue)
    verifQueues: verifQueues,
    consensusManager: consensusManager,
    requestManager: RequestManager.init(network, verifQueues)
  )

  # set topic validation routine

@@ -896,7 +908,7 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =

  # Delay part of pruning until latency critical duties are done.
  # The other part of pruning, `pruneBlocksDAG`, is done eagerly.
  node.processor[].pruneStateCachesAndForkChoice()
  node.consensusManager[].pruneStateCachesAndForkChoice()

  when declared(GC_fullCollect):
    # The slots in the beacon node work as frames in a game: we want to make

@@ -980,7 +992,7 @@ proc onSlotStart(
  if node.config.verifyFinalization:
    verifyFinalization(node, wallSlot)

  node.processor[].updateHead(wallSlot)
  node.consensusManager[].updateHead(wallSlot)

  await node.handleValidatorDuties(lastSlot, wallSlot)

@@ -1115,7 +1127,7 @@ proc startSyncManager(node: BeaconNode) =

  node.syncManager = newSyncManager[Peer, PeerID](
    node.network.peerPool, getLocalHeadSlot, getLocalWallSlot,
    getFirstSlotAtFinalizedEpoch, node.processor.blocksQueue, chunkSize = 32
    getFirstSlotAtFinalizedEpoch, node.verifQueues, chunkSize = 32
  )
  node.syncManager.start()

@@ -1193,7 +1205,7 @@ proc run*(node: BeaconNode) =
    let startTime = node.beaconClock.now()
    asyncSpawn runSlotLoop(node, startTime)
    asyncSpawn runOnSecondLoop(node)
    asyncSpawn runQueueProcessingLoop(node.processor)
    asyncSpawn runQueueProcessingLoop(node.verifQueues)

    node.requestManager.start()
    node.startSyncManager()
@@ -34,7 +34,7 @@ template loadOrExitFalse(signature: ValidatorSig): blscurve.Signature =
  ## Exists the **caller** with false if the signature is invalid
  let sig = signature.load()
  if sig.isNone:
    return false # this exists the calling scope, as templates are inlined.
    return false # this exits the calling scope, as templates are inlined.
  sig.unsafeGet()

template loadWithCacheOrExitFalse(pubkey: ValidatorPubKey): blscurve.PublicKey =

@@ -42,7 +42,7 @@ template loadWithCacheOrExitFalse(pubkey: ValidatorPubKey): blscurve.PublicKey =
  ## Exists the **caller** with false if the public key is invalid
  let pk = pubkey.loadWithCache()
  if pk.isNone:
    return false # this exists the calling scope, as templates are inlined.
    return false # this exits the calling scope, as templates are inlined.
  pk.unsafeGet()

func addSignatureSet[T](
@@ -4,7 +4,7 @@ import ../spec/[datatypes, digest],
       ../networking/eth2_network,
       ../beacon_node_types,
       ../ssz/merkleization,
       ../gossip_processing/eth2_processor,
       ../gossip_processing/gossip_to_consensus,
       ./sync_protocol, ./sync_manager
export sync_manager

@@ -22,7 +22,7 @@ type
  RequestManager* = object
    network*: Eth2Node
    inpQueue*: AsyncQueue[FetchRecord]
    outQueue*: AsyncQueue[BlockEntry]
    verifQueues: ref VerifQueueManager
    loopFuture: Future[void]

func shortLog*(x: seq[Eth2Digest]): string =

@@ -32,11 +32,11 @@ func shortLog*(x: seq[FetchRecord]): string =
  "[" & x.mapIt(shortLog(it.root)).join(", ") & "]"

proc init*(T: type RequestManager, network: Eth2Node,
           outputQueue: AsyncQueue[BlockEntry]): RequestManager =
           verifQueues: ref VerifQueueManager): RequestManager =
  RequestManager(
    network: network,
    inpQueue: newAsyncQueue[FetchRecord](),
    outQueue: outputQueue
    verifQueues: verifQueues
  )

proc checkResponse(roots: openArray[Eth2Digest],

@@ -59,7 +59,7 @@ proc validate(rman: RequestManager,
    blk: b,
    resfut: newFuture[Result[void, BlockError]]("request.manager.validate")
  )
  await rman.outQueue.addLast(BlockEntry(v: sblock))
  rman.verifQueues[].addBlock(sblock)
  return await sblock.resfut

proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
|
@ -4,7 +4,7 @@ import stew/results, chronos, chronicles
|
|||
import ../spec/[datatypes, digest, helpers, eth2_apis/callsigs_types],
|
||||
../networking/[peer_pool, eth2_network]
|
||||
|
||||
import ../gossip_processing/eth2_processor
|
||||
import ../gossip_processing/gossip_to_consensus
|
||||
import ../consensus_object_pools/block_pools_types
|
||||
export datatypes, digest, chronos, chronicles, results, block_pools_types
|
||||
|
||||
|
@ -86,7 +86,7 @@ type
|
|||
debtsCount: uint64
|
||||
readyQueue: HeapQueue[SyncResult[T]]
|
||||
rewind: Option[RewindPoint]
|
||||
outQueue: AsyncQueue[BlockEntry]
|
||||
verifQueues: ref VerifQueueManager
|
||||
|
||||
SyncWorkerStatus* {.pure.} = enum
|
||||
Sleeping, WaitingPeer, UpdatingStatus, Requesting, Downloading, Processing
|
||||
|
@ -113,7 +113,7 @@ type
|
|||
chunkSize: uint64
|
||||
queue: SyncQueue[A]
|
||||
syncFut: Future[void]
|
||||
outQueue: AsyncQueue[BlockEntry]
|
||||
verifQueues: ref VerifQueueManager
|
||||
inProgress*: bool
|
||||
insSyncSpeed*: float
|
||||
avgSyncSpeed*: float
|
||||
|
@ -139,7 +139,7 @@ proc validate*[T](sq: SyncQueue[T],
|
|||
blk: blk,
|
||||
resfut: newFuture[Result[void, BlockError]]("sync.manager.validate")
|
||||
)
|
||||
await sq.outQueue.addLast(BlockEntry(v: sblock))
|
||||
sq.verifQueues[].addBlock(sblock)
|
||||
return await sblock.resfut
|
||||
|
||||
proc getShortMap*[T](req: SyncRequest[T],
|
||||
|
@ -240,16 +240,16 @@ proc isEmpty*[T](sr: SyncRequest[T]): bool {.inline.} =
|
|||
proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
|
||||
start, last: Slot, chunkSize: uint64,
|
||||
getFinalizedSlotCb: GetSlotCallback,
|
||||
outputQueue: AsyncQueue[BlockEntry],
|
||||
queueSize: int = -1): SyncQueue[T] =
|
||||
verifQueues: ref VerifQueueManager,
|
||||
syncQueueSize: int = -1): SyncQueue[T] =
|
||||
## Create new synchronization queue with parameters
|
||||
##
|
||||
## ``start`` and ``last`` are starting and finishing Slots.
|
||||
##
|
||||
## ``chunkSize`` maximum number of slots in one request.
|
||||
##
|
||||
## ``queueSize`` maximum queue size for incoming data. If ``queueSize > 0``
|
||||
## queue will help to keep backpressure under control. If ``queueSize <= 0``
|
||||
## ``syncQueueSize`` maximum queue size for incoming data. If ``syncQueueSize > 0``
|
||||
## queue will help to keep backpressure under control. If ``syncQueueSize <= 0``
|
||||
## then queue size is unlimited (default).
|
||||
##
|
||||
## ``updateCb`` procedure which will be used to send downloaded blocks to
|
||||
|
@ -298,7 +298,7 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
|
|||
startSlot: start,
|
||||
lastSlot: last,
|
||||
chunkSize: chunkSize,
|
||||
queueSize: queueSize,
|
||||
queueSize: syncQueueSize,
|
||||
getFinalizedSlot: getFinalizedSlotCb,
|
||||
waiters: newSeq[SyncWaiter[T]](),
|
||||
counter: 1'u64,
|
||||
|
@ -306,7 +306,7 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
|
|||
debtsQueue: initHeapQueue[SyncRequest[T]](),
|
||||
inpSlot: start,
|
||||
outSlot: start,
|
||||
outQueue: outputQueue
|
||||
verifQueues: verifQueues
|
||||
)
|
||||
|
||||
proc `<`*[T](a, b: SyncRequest[T]): bool {.inline.} =
|
||||
|
@ -666,7 +666,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
|
|||
getLocalHeadSlotCb: GetSlotCallback,
|
||||
getLocalWallSlotCb: GetSlotCallback,
|
||||
getFinalizedSlotCb: GetSlotCallback,
|
||||
outputQueue: AsyncQueue[BlockEntry],
|
||||
verifQueues: ref VerifQueueManager,
|
||||
maxStatusAge = uint64(SLOTS_PER_EPOCH * 4),
|
||||
maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
|
||||
sleepTime = (int(SLOTS_PER_EPOCH) *
|
||||
|
@ -677,7 +677,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
|
|||
): SyncManager[A, B] =
|
||||
|
||||
let queue = SyncQueue.init(A, getLocalHeadSlotCb(), getLocalWallSlotCb(),
|
||||
chunkSize, getFinalizedSlotCb, outputQueue, 1)
|
||||
chunkSize, getFinalizedSlotCb, verifQueues, 1)
|
||||
|
||||
result = SyncManager[A, B](
|
||||
pool: pool,
|
||||
|
@ -689,7 +689,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
|
|||
sleepTime: sleepTime,
|
||||
chunkSize: chunkSize,
|
||||
queue: queue,
|
||||
outQueue: outputQueue,
|
||||
verifQueues: verifQueues,
|
||||
notInSyncEvent: newAsyncEvent(),
|
||||
inRangeEvent: newAsyncEvent(),
|
||||
notInRangeEvent: newAsyncEvent(),
|
||||
|
@ -1109,7 +1109,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
|
|||
man.queue = SyncQueue.init(A, man.getLocalHeadSlot(),
|
||||
man.getLocalWallSlot(),
|
||||
man.chunkSize, man.getFinalizedSlot,
|
||||
man.outQueue, 1)
|
||||
man.verifQueues, 1)
|
||||
man.notInSyncEvent.fire()
|
||||
man.inProgress = true
|
||||
else:
|
||||
|
|
|
@@ -30,7 +30,8 @@ import
  ".."/[beacon_node_common, beacon_node_types, version],
  ../ssz, ../ssz/sszdump, ../sync/sync_manager,
  ./slashing_protection, ./attestation_aggregation,
  ./validator_pool, ./keystore_management
  ./validator_pool, ./keystore_management,
  ../gossip_processing/consensus_manager

# Metrics for tracking attestation and beacon block loss
const delayBuckets = [-Inf, -4.0, -2.0, -1.0, -0.5, -0.1, -0.05,

@@ -679,7 +680,7 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
      attestationCutoff = shortLog(attestationCutoff.offset)

    # Wait either for the block or the attestation cutoff time to arrive
    if await node.processor[].expectBlock(slot).withTimeout(attestationCutoff.offset):
    if await node.consensusManager[].expectBlock(slot).withTimeout(attestationCutoff.offset):
      # The expected block arrived (or expectBlock was called again which
      # shouldn't happen as this is the only place we use it) - according to the
      # spec, we should now wait for abs(slotTimingEntropy) - in our async loop

@@ -706,7 +707,7 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
      await sleepAsync(afterBlockCutoff.offset)

    # Time passed - we might need to select a new head in that case
    node.processor[].updateHead(slot)
    node.consensusManager[].updateHead(slot)
    head = node.chainDag.head

  handleAttestations(node, head, slot)
@@ -2,7 +2,7 @@

import unittest
import chronos
import ../beacon_chain/gossip_processing/eth2_processor,
import ../beacon_chain/gossip_processing/[eth2_processor, gossip_to_consensus],
       ../beacon_chain/sync/sync_manager

type

@@ -17,6 +17,13 @@ proc updateScore(peer: SomeTPeer, score: int) =
proc getFirstSlotAtFinalizedEpoch(): Slot =
  Slot(0)

proc newVerifQueues(): ref VerifQueueManager =
  # We only want VerifQueueManager.blocksQueue to be an AsyncQueue of size 1
  # The rest of the fields are unused
  (ref VerifQueueManager)(
    blocksQueue: newAsyncQueue[BlockEntry](1)
  )

suite "SyncManager test suite":
  proc createChain(start, finish: Slot): seq[SignedBeaconBlock] =
    doAssert(start <= finish)
@@ -29,7 +36,7 @@ suite "SyncManager test suite":

  test "[SyncQueue] Start and finish slots equal":
    let p1 = SomeTPeer()
    let aq = newAsyncQueue[BlockEntry](1)
    let aq = newVerifQueues()
    var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(0), 1'u64,
                               getFirstSlotAtFinalizedEpoch, aq)
    check len(queue) == 1

@@ -46,7 +53,7 @@ suite "SyncManager test suite":
      r11.slot == Slot(0) and r11.count == 1'u64 and r11.step == 1'u64

  test "[SyncQueue] Two full requests success/fail":
    let aq = newAsyncQueue[BlockEntry](1)
    let aq = newVerifQueues()
    var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(1), 1'u64,
                               getFirstSlotAtFinalizedEpoch, aq)
    let p1 = SomeTPeer()

@@ -75,7 +82,7 @@ suite "SyncManager test suite":
      r22.slot == Slot(1) and r22.count == 1'u64 and r22.step == 1'u64

  test "[SyncQueue] Full and incomplete success/fail start from zero":
    let aq = newAsyncQueue[BlockEntry](1)
    let aq = newVerifQueues()
    var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 2'u64,
                               getFirstSlotAtFinalizedEpoch, aq)
    let p1 = SomeTPeer()

@@ -115,7 +122,7 @@ suite "SyncManager test suite":
      r33.slot == Slot(4) and r33.count == 1'u64 and r33.step == 1'u64

  test "[SyncQueue] Full and incomplete success/fail start from non-zero":
    let aq = newAsyncQueue[BlockEntry](1)
    let aq = newVerifQueues()
    var queue = SyncQueue.init(SomeTPeer, Slot(1), Slot(5), 3'u64,
                               getFirstSlotAtFinalizedEpoch, aq)
    let p1 = SomeTPeer()

@@ -144,7 +151,7 @@ suite "SyncManager test suite":
      r42.slot == Slot(4) and r42.count == 2'u64 and r42.step == 1'u64

  test "[SyncQueue] Smart and stupid success/fail":
    let aq = newAsyncQueue[BlockEntry](1)
    let aq = newVerifQueues()
    var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64,
                               getFirstSlotAtFinalizedEpoch, aq)
    let p1 = SomeTPeer()

@@ -173,7 +180,7 @@ suite "SyncManager test suite":
      r52.slot == Slot(4) and r52.count == 1'u64 and r52.step == 1'u64

  test "[SyncQueue] One smart and one stupid + debt split + empty":
    let aq = newAsyncQueue[BlockEntry](1)
    let aq = newVerifQueues()
    var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64,
                               getFirstSlotAtFinalizedEpoch, aq)
    let p1 = SomeTPeer()

@@ -218,12 +225,12 @@ suite "SyncManager test suite":
        sblock.v.fail(BlockError.Invalid)
      sblock.v.done()

    var aq = newAsyncQueue[BlockEntry](1)
    let aq = newVerifQueues()
    var chain = createChain(Slot(0), Slot(2))
    var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(2), 1'u64,
                               getFirstSlotAtFinalizedEpoch, aq, 1)

    var validatorFut = simpleValidator(aq)
    var validatorFut = simpleValidator(aq[].blocksQueue)
    let p1 = SomeTPeer()
    let p2 = SomeTPeer()
    let p3 = SomeTPeer()

@@ -267,7 +274,7 @@ suite "SyncManager test suite":
        sblock.v.fail(BlockError.Invalid)
      sblock.v.done()

    var aq = newAsyncQueue[BlockEntry](1)
    let aq = newVerifQueues()
    var chain = createChain(Slot(5), Slot(11))
    var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(11), 2'u64,
                               getFirstSlotAtFinalizedEpoch, aq, 2)

@@ -277,7 +284,7 @@ suite "SyncManager test suite":
    let p3 = SomeTPeer()
    let p4 = SomeTPeer()

    var validatorFut = simpleValidator(aq)
    var validatorFut = simpleValidator(aq[].blocksQueue)

    var r21 = queue.pop(Slot(11), p1)
    var r22 = queue.pop(Slot(11), p2)

@@ -322,7 +329,7 @@ suite "SyncManager test suite":
        sblock.v.fail(BlockError.Invalid)
      sblock.v.done()

    var aq = newAsyncQueue[BlockEntry](1)
    let aq = newVerifQueues()
    var chain = createChain(Slot(5), Slot(18))
    var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(18), 2'u64,
                               getFirstSlotAtFinalizedEpoch, aq, 2)

@@ -334,7 +341,7 @@ suite "SyncManager test suite":
    let p6 = SomeTPeer()
    let p7 = SomeTPeer()

    var validatorFut = simpleValidator(aq)
    var validatorFut = simpleValidator(aq[].blocksQueue)

    var r21 = queue.pop(Slot(20), p1)
    var r22 = queue.pop(Slot(20), p2)
@@ -0,0 +1 @@
-d:"libp2p_pki_schemes=secp256k1"