From c47d636cb3418e597dbcaeb2d5e382a0471293e5 Mon Sep 17 00:00:00 2001
From: Mamy Ratsimbazafy
Date: Thu, 11 Mar 2021 11:10:57 +0100
Subject: [PATCH] Split Eth2Processor in prep for batching (#2396)

* Split Eth2Processor in gossip and consensus part and materialize the shared block queue
* Update initialization in test_sync_manager
---
 beacon_chain/beacon_clock.nim                  |   2 +
 beacon_chain/beacon_node_common.nim            |   4 +-
 .../gossip_processing/consensus_manager.nim    | 103 +++++
 .../gossip_processing/eth2_processor.nim       | 396 ++---------------
 .../gossip_processing/gossip_to_consensus.nim  | 397 ++++++++++++++++++
 beacon_chain/nimbus_beacon_node.nim            |  28 +-
 beacon_chain/spec/signatures_batch.nim         |   4 +-
 beacon_chain/sync/request_manager.nim          |  10 +-
 beacon_chain/sync/sync_manager.nim             |  28 +-
 beacon_chain/validators/validator_duties.nim   |   7 +-
 tests/test_sync_manager.nim                    |  33 +-
 tests/test_sync_manager.nim.cfg                |   1 +
 12 files changed, 617 insertions(+), 396 deletions(-)
 create mode 100644 beacon_chain/gossip_processing/consensus_manager.nim
 create mode 100644 beacon_chain/gossip_processing/gossip_to_consensus.nim
 create mode 100644 tests/test_sync_manager.nim.cfg

diff --git a/beacon_chain/beacon_clock.nim b/beacon_chain/beacon_clock.nim
index 0875860c5..7a9b1845e 100644
--- a/beacon_chain/beacon_clock.nim
+++ b/beacon_chain/beacon_clock.nim
@@ -24,6 +24,8 @@ type
   BeaconTime* = distinct Duration ## Nanoseconds from beacon genesis time
 
+  GetWallTimeFn* = proc(): BeaconTime {.gcsafe, raises: [Defect].}
+
 proc init*(T: type BeaconClock, genesis_time: uint64): T =
   # ~290 billion years into the future
   doAssert genesis_time <= high(int64).uint64
diff --git a/beacon_chain/beacon_node_common.nim b/beacon_chain/beacon_node_common.nim
index 95ccc668e..b3fb9e657 100644
--- a/beacon_chain/beacon_node_common.nim
+++ b/beacon_chain/beacon_node_common.nim
@@ -16,7 +16,7 @@ import
   # Local modules
   ./conf, ./beacon_clock, ./beacon_chain_db, ./beacon_node_types,
-  ./gossip_processing/eth2_processor,
+  ./gossip_processing/[eth2_processor, gossip_to_consensus, consensus_manager],
   ./networking/eth2_network,
   ./eth1/eth1_monitor,
   ./consensus_object_pools/[blockchain_dag, block_quarantine, attestation_pool],
@@ -57,6 +57,8 @@ type
     genesisSnapshotContent*: string
     attestationSubnets*: AttestationSubnets
     processor*: ref Eth2Processor
+    verifQueues*: ref VerifQueueManager
+    consensusManager*: ref ConsensusManager
    attachedValidatorBalanceTotal*: uint64
 
 const
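
For orientation, a minimal sketch of how the pieces introduced above could be wired together at node startup. `ConsensusManager.new` and `Eth2Processor.new` follow the constructors visible later in this patch; the `VerifQueueManager.new` call, its parameters and the surrounding variable names are assumptions, since the `gossip_to_consensus.nim` and `nimbus_beacon_node.nim` hunks are not reproduced in full in this excerpt.

  # Illustrative wiring only - VerifQueueManager.new is a hypothetical
  # constructor; the other calls mirror signatures shown in this patch.
  let
    consensusManager = ConsensusManager.new(
      chainDag, attestationPool, quarantine)
    verifQueues = VerifQueueManager.new(          # assumed signature
      config, consensusManager, getWallTime)
    processor = Eth2Processor.new(
      config, verifQueues,
      chainDag, attestationPool, exitPool, validatorPool,
      quarantine, getWallTime)

  node.processor = processor
  node.verifQueues = verifQueues
  node.consensusManager = consensusManager
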
diff --git a/beacon_chain/gossip_processing/consensus_manager.nim b/beacon_chain/gossip_processing/consensus_manager.nim
new file mode 100644
index 000000000..6a26ab0f2
--- /dev/null
+++ b/beacon_chain/gossip_processing/consensus_manager.nim
@@ -0,0 +1,103 @@
+# beacon_chain
+# Copyright (c) 2018-2021 Status Research & Development GmbH
+# Licensed and distributed under either of
+#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
+#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
+# at your option. This file may not be copied, modified, or distributed except according to those terms.
+
+{.push raises: [Defect].}
+
+import
+  chronicles, chronos,
+  ../spec/[crypto, datatypes, digest],
+  ../consensus_object_pools/[blockchain_dag, attestation_pool]
+
+# TODO: Move to "consensus_object_pools" folder
+
+type
+  ConsensusManager* = object
+    expectedSlot: Slot
+    expectedBlockReceived: Future[bool]
+
+    # Validated & Verified
+    # ----------------------------------------------------------------
+    chainDag*: ChainDAGRef
+    attestationPool*: ref AttestationPool
+
+    # Missing info
+    # ----------------------------------------------------------------
+    quarantine*: QuarantineRef
+
+# Initialization
+# ------------------------------------------------------------------------------
+
+proc new*(T: type ConsensusManager,
+          chainDag: ChainDAGRef,
+          attestationPool: ref AttestationPool,
+          quarantine: QuarantineRef
+         ): ref ConsensusManager =
+  (ref ConsensusManager)(
+    chainDag: chainDag,
+    attestationPool: attestationPool,
+    quarantine: quarantine
+  )
+
+# Consensus Management
+# -----------------------------------------------------------------------------------
+
+proc checkExpectedBlock(self: var ConsensusManager) =
+  if self.expectedBlockReceived == nil:
+    return
+
+  if self.chainDag.head.slot < self.expectedSlot:
+    return
+
+  self.expectedBlockReceived.complete(true)
+  self.expectedBlockReceived = nil # Don't keep completed futures around!
+
+proc expectBlock*(self: var ConsensusManager, expectedSlot: Slot): Future[bool] =
+  ## Return a future that will complete when a head is selected whose slot is
+  ## equal or greater than the given slot, or a new expectation is created
+  if self.expectedBlockReceived != nil:
+    # Reset the old future to not leave it hanging.. an alternative would be to
+    # cancel it, but it doesn't make any practical difference for now
+    self.expectedBlockReceived.complete(false)
+
+  let fut = newFuture[bool]("ConsensusManager.expectBlock")
+  self.expectedSlot = expectedSlot
+  self.expectedBlockReceived = fut
+
+  # It might happen that by the time we're expecting a block, it might have
+  # already been processed!
+  self.checkExpectedBlock()
+
+  return fut
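
`expectBlock` is consumed from the validator duty loop; a minimal usage sketch, assuming the `node.consensusManager` field added in `beacon_node_common.nim` and an illustrative cutoff duration (the real cutoff is derived from slot timing in `validator_duties.nim`):

  # Wait for the expected block of `slot` before attesting, but only up to a
  # cutoff; `expectBlock` completes with true once fork choice picks a head at
  # or past `slot`, and with false when a newer expectation replaces this one.
  let cutoff = chronos.seconds(4)  # illustrative value, not from this patch
  if await node.consensusManager[].expectBlock(slot).withTimeout(cutoff):
    debug "Expected block arrived before cutoff", slot
  else:
    debug "Cutoff reached without the expected block, attesting anyway", slot
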
+
+proc updateHead*(self: var ConsensusManager, wallSlot: Slot) =
+  ## Trigger fork choice and update the DAG with the new head block
+  ## This does not automatically prune the DAG after finalization
+  ## `pruneFinalized` must be called for pruning.
+
+  # Grab the new head according to our latest attestation data
+  let newHead = self.attestationPool[].selectHead(wallSlot)
+  if newHead.isNil():
+    warn "Head selection failed, using previous head",
+      head = shortLog(self.chainDag.head), wallSlot
+    return
+
+  # Store the new head in the chain DAG - this may cause epochs to be
+  # justified and finalized
+  self.chainDag.updateHead(newHead, self.quarantine)
+
+  self.checkExpectedBlock()
+
+proc pruneStateCachesAndForkChoice*(self: var ConsensusManager) =
+  ## Prune unneeded and invalidated data after finalization
+  ## - the DAG state checkpoints
+  ## - the DAG EpochRef
+  ## - the attestation pool/fork choice
+
+  # Cleanup DAG & fork choice if we have a finalized head
+  if self.chainDag.needStateCachesAndForkChoicePruning():
+    self.chainDag.pruneStateCachesDAG()
+    self.attestationPool[].prune()
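
A comment removed from `processBlock` later in this patch notes that state-cache and fork-choice pruning is amortized to `onSlotEnd` rather than run per block; a sketch of that call pattern, assuming a slot-end hook of roughly this shape (its real signature lives in `nimbus_beacon_node.nim`, which is not shown here):

  # Assumed shape of the slot-end hook: head selection stays on the block
  # processing hot path, while pruning of DAG state checkpoints, EpochRefs and
  # fork choice data happens once per slot after finalization advances.
  proc onSlotEnd(node: BeaconNode, slot: Slot) =
    node.consensusManager[].pruneStateCachesAndForkChoice()
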
diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim
index d22ec6c23..477b5fb86 100644
--- a/beacon_chain/gossip_processing/eth2_processor.nim
+++ b/beacon_chain/gossip_processing/eth2_processor.nim
@@ -13,7 +13,7 @@ import
   chronicles, chronos, metrics,
   ../spec/[crypto, datatypes, digest],
   ../consensus_object_pools/[block_clearance, blockchain_dag, exit_pool, attestation_pool],
-  ./gossip_validation,
+  ./gossip_validation, ./gossip_to_consensus,
   ../validators/validator_pool,
   ../beacon_node_types, ../beacon_clock,
   ../conf, ../ssz/sszdump
@@ -50,245 +50,55 @@ declareHistogram beacon_store_block_duration_seconds,
   "storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]
 
 type
-  GetWallTimeFn* = proc(): BeaconTime {.gcsafe, raises: [Defect].}
-
-  SyncBlock* = object
-    blk*: SignedBeaconBlock
-    resfut*: Future[Result[void, BlockError]]
-
-  BlockEntry* = object
-    v*: SyncBlock
-
-  AttestationEntry* = object
-    v*: Attestation
-    attesting_indices*: seq[ValidatorIndex]
-
-  AggregateEntry* = AttestationEntry
-
   Eth2Processor* = object
     config*: BeaconNodeConf
     getWallTime*: GetWallTimeFn
+
+    # Local sources of truth for validation
+    # ----------------------------------------------------------------
     chainDag*: ChainDAGRef
     attestationPool*: ref AttestationPool
-    exitPool: ref ExitPool
     validatorPool: ref ValidatorPool
-    quarantine*: QuarantineRef
-    expectedSlot: Slot
-    expectedBlockReceived: Future[bool]
-
-    blocksQueue*: AsyncQueue[BlockEntry]
-    attestationsQueue*: AsyncQueue[AttestationEntry]
-    aggregatesQueue*: AsyncQueue[AggregateEntry]
 
     doppelgangerDetection*: DoppelgangerProtection
 
-proc checkExpectedBlock(self: var Eth2Processor) =
-  if self.expectedBlockReceived == nil:
-    return
+    # Gossip validated -> enqueue for further verification
+    # ----------------------------------------------------------------
+    verifQueues: ref VerifQueueManager
 
-  if self.chainDag.head.slot < self.expectedSlot:
-    return
+    # Validated with no further verification required
+    # ----------------------------------------------------------------
+    exitPool: ref ExitPool
 
-  self.expectedBlockReceived.complete(true)
-  self.expectedBlockReceived = nil # Don't keep completed futures around!
+    # Missing information
+    # ----------------------------------------------------------------
+    quarantine*: QuarantineRef
 
-proc expectBlock*(self: var Eth2Processor, expectedSlot: Slot): Future[bool] =
-  ## Return a future that will complete when a head is selected whose slot is
-  ## equal or greater than the given slot, or a new expectation is created
-  if self.expectedBlockReceived != nil:
-    # Reset the old future to not leave it hanging.. an alternative would be to
-    # cancel it, but it doesn't make any practical difference for now
-    self.expectedBlockReceived.complete(false)
+# Initialization
+# ------------------------------------------------------------------------------
 
-  let fut = newFuture[bool]("Eth2Processor.expectBlock")
-  self.expectedSlot = expectedSlot
-  self.expectedBlockReceived = fut
+proc new*(T: type Eth2Processor,
+          config: BeaconNodeConf,
+          verifQueues: ref VerifQueueManager,
+          chainDag: ChainDAGRef,
+          attestationPool: ref AttestationPool,
+          exitPool: ref ExitPool,
+          validatorPool: ref ValidatorPool,
+          quarantine: QuarantineRef,
+          getWallTime: GetWallTimeFn): ref Eth2Processor =
+  (ref Eth2Processor)(
+    config: config,
+    getWallTime: getWallTime,
+    verifQueues: verifQueues,
+    chainDag: chainDag,
+    attestationPool: attestationPool,
+    exitPool: exitPool,
+    validatorPool: validatorPool,
+    quarantine: quarantine
+  )
 
-  # It might happen that by the time we're expecting a block, it might have
-  # already been processed!
-  self.checkExpectedBlock()
-
-  return fut
-
-proc updateHead*(self: var Eth2Processor, wallSlot: Slot) =
-  ## Trigger fork choice and update the DAG with the new head block
-  ## This does not automatically prune the DAG after finalization
-  ## `pruneFinalized` must be called for pruning.
-
-  # TODO: DAG & fork choice procs are unrelated to gossip validation
-
-  # Grab the new head according to our latest attestation data
-  let newHead = self.attestationPool[].selectHead(wallSlot)
-  if newHead.isNil():
-    warn "Head selection failed, using previous head",
-      head = shortLog(self.chainDag.head), wallSlot
-    return
-
-  # Store the new head in the chain DAG - this may cause epochs to be
-  # justified and finalized
-  self.chainDag.updateHead(newHead, self.quarantine)
-
-  self.checkExpectedBlock()
-
-proc pruneStateCachesAndForkChoice*(self: var Eth2Processor) =
-  ## Prune unneeded and invalidated data after finalization
-  ## - the DAG state checkpoints
-  ## - the DAG EpochRef
-  ## - the attestation pool/fork choice
-
-  # TODO: DAG & fork choice procs are unrelated to gossip validation
-
-  # Cleanup DAG & fork choice if we have a finalized head
-  if self.chainDag.needStateCachesAndForkChoicePruning():
-    self.chainDag.pruneStateCachesDAG()
-    self.attestationPool[].prune()
-
-proc dumpBlock[T](
-    self: Eth2Processor, signedBlock: SignedBeaconBlock,
-    res: Result[T, (ValidationResult, BlockError)]) =
-  if self.config.dumpEnabled and res.isErr:
-    case res.error[1]
-    of Invalid:
-      dump(
-        self.config.dumpDirInvalid, signedBlock)
-    of MissingParent:
-      dump(
-        self.config.dumpDirIncoming, signedBlock)
-    else:
-      discard
-
-proc done*(blk: SyncBlock) =
-  ## Send signal to [Sync/Request]Manager that the block ``blk`` has passed
-  ## verification successfully.
-  if blk.resfut != nil:
-    blk.resfut.complete(Result[void, BlockError].ok())
-
-proc fail*(blk: SyncBlock, error: BlockError) =
-  ## Send signal to [Sync/Request]Manager that the block ``blk`` has NOT passed
-  ## verification with specific ``error``.
-  if blk.resfut != nil:
-    blk.resfut.complete(Result[void, BlockError].err(error))
-
-proc complete*(blk: SyncBlock, res: Result[void, BlockError]) =
-  ## Send signal to [Sync/Request]Manager about result ``res`` of block ``blk``
-  ## verification.
-  if blk.resfut != nil:
-    blk.resfut.complete(res)
-
-proc storeBlock(
-    self: var Eth2Processor, signedBlock: SignedBeaconBlock,
-    wallSlot: Slot): Result[void, BlockError] =
-  let
-    start = Moment.now()
-    attestationPool = self.attestationPool
-
-  let blck = self.chainDag.addRawBlock(self.quarantine, signedBlock) do (
-      blckRef: BlockRef, trustedBlock: TrustedSignedBeaconBlock,
-      epochRef: EpochRef, state: HashedBeaconState):
-    # Callback add to fork choice if valid
-    attestationPool[].addForkChoice(
-      epochRef, blckRef, trustedBlock.message, wallSlot)
-
-  self.dumpBlock(signedBlock, blck)
-
-  # There can be a scenario where we receive a block we already received.
-  # However this block was before the last finalized epoch and so its parent
-  # was pruned from the ForkChoice.
-  if blck.isErr:
-    return err(blck.error[1])
-
-  let duration = (Moment.now() - start).toFloatSeconds()
-  beacon_store_block_duration_seconds.observe(duration)
-  ok()
-
-proc processAttestation(
-    self: var Eth2Processor, entry: AttestationEntry) =
-  logScope:
-    signature = shortLog(entry.v.signature)
-
-  let
-    wallTime = self.getWallTime()
-    (afterGenesis, wallSlot) = wallTime.toSlot()
-
-  if not afterGenesis:
-    error "Processing attestation before genesis, clock turned back?"
-    quit 1
-
-  trace "Processing attestation"
-  self.attestationPool[].addAttestation(
-    entry.v, entry.attesting_indices, wallSlot)
-
-proc processAggregate(
-    self: var Eth2Processor, entry: AggregateEntry) =
-  logScope:
-    signature = shortLog(entry.v.signature)
-
-  let
-    wallTime = self.getWallTime()
-    (afterGenesis, wallSlot) = wallTime.toSlot()
-
-  if not afterGenesis:
-    error "Processing aggregate before genesis, clock turned back?"
-    quit 1
-
-  trace "Processing aggregate"
-  self.attestationPool[].addAttestation(
-    entry.v, entry.attesting_indices, wallSlot)
-
-proc processBlock(self: var Eth2Processor, entry: BlockEntry) =
-  logScope:
-    blockRoot = shortLog(entry.v.blk.root)
-
-  let
-    wallTime = self.getWallTime()
-    (afterGenesis, wallSlot) = wallTime.toSlot()
-
-  if not afterGenesis:
-    error "Processing block before genesis, clock turned back?"
-    quit 1
-
-  let
-    start = now(chronos.Moment)
-    res = self.storeBlock(entry.v.blk, wallSlot)
-    storeDone = now(chronos.Moment)
-
-  if res.isOk():
-    # Eagerly update head in case the new block gets selected
-    self.updateHead(wallSlot) # This also eagerly prunes the blocks DAG to prevent processing forks.
-    # self.pruneStateCachesDAG() # Amortized pruning, we don't prune states & fork choice here but in `onSlotEnd`()
-
-    let updateDone = now(chronos.Moment)
-    let storeBlockDuration = storeDone - start
-    let updateHeadDuration = updateDone - storeDone
-    let overallDuration = updateDone - start
-    let storeSpeed =
-      block:
-        let secs = float(chronos.seconds(1).nanoseconds)
-        if not(overallDuration.isZero()):
-          let v = secs / float(overallDuration.nanoseconds)
-          round(v * 10_000) / 10_000
-        else:
-          0.0
-    debug "Block processed",
-      local_head_slot = self.chainDag.head.slot,
-      store_speed = storeSpeed,
-      block_slot = entry.v.blk.message.slot,
-      store_block_duration = $storeBlockDuration,
-      update_head_duration = $updateHeadDuration,
-      overall_duration = $overallDuration
-
-    if entry.v.resFut != nil:
-      entry.v.resFut.complete(Result[void, BlockError].ok())
-  elif res.error() in {BlockError.Duplicate, BlockError.Old}:
-    # These are harmless / valid outcomes - for the purpose of scoring peers,
-    # they are ok
-    if entry.v.resFut != nil:
-      entry.v.resFut.complete(Result[void, BlockError].ok())
-  else:
-    if entry.v.resFut != nil:
-      entry.v.resFut.complete(Result[void, BlockError].err(res.error()))
-
-{.pop.} # TODO AsyncQueue.addLast raises Exception in theory but not in practice
+# Gossip Management
+# -----------------------------------------------------------------------------------
 
 proc blockValidator*(
     self: var Eth2Processor,
@@ -322,7 +132,7 @@ proc blockValidator*(
   let blck = self.chainDag.isValidBeaconBlock(
     self.quarantine, signedBlock, wallTime, {})
 
-  self.dumpBlock(signedBlock, blck)
+  self.verifQueues[].dumpBlock(signedBlock, blck)
 
   if not blck.isOk:
     return blck.error[0]
@@ -336,13 +146,15 @@ proc blockValidator*(
   # sync, we don't lose the gossip blocks, but also don't block the gossip
   # propagation of seemingly good blocks
   trace "Block validated"
-  asyncSpawn self.blocksQueue.addLast(
-    BlockEntry(v: SyncBlock(blk: signedBlock)))
+  try:
+    self.verifQueues[].addBlock(SyncBlock(blk: signedBlock))
+  except Exception as e:
+    # Chronos can in theory raise an untyped exception in `internalCheckComplete`
+    # but in practice that's always a Defect not a Catchable exception
+    raiseAssert e.msg
 
   ValidationResult.Accept
 
-{.push raises: [Defect].}
-
 proc checkForPotentialDoppelganger(
     self: var Eth2Processor, attestationData: AttestationData,
     attesterIndices: openArray[ValidatorIndex], wallSlot: Slot) =
@@ -405,20 +217,8 @@ proc attestationValidator*(
   self.checkForPotentialDoppelganger(attestation.data, v.value, wallSlot)
 
-  while self.attestationsQueue.full():
-    try:
-      notice "Queue full, dropping attestation",
-        dropped = shortLog(self.attestationsQueue[0].v)
-      discard self.attestationsQueue.popFirstNoWait()
-    except AsyncQueueEmptyError as exc:
-      raiseAssert "If queue is full, we have at least one item! " & exc.msg
-
   trace "Attestation validated"
-  try:
-    self.attestationsQueue.addLastNoWait(
-      AttestationEntry(v: attestation, attesting_indices: v.get()))
-  except AsyncQueueFullError as exc:
-    raiseAssert "We just checked that queue is not full! " & exc.msg
+  self.verifQueues[].addAttestation(attestation, v.get())
 
   ValidationResult.Accept
 
@@ -460,25 +260,12 @@ proc aggregateValidator*(
   self.checkForPotentialDoppelganger(
     signedAggregateAndProof.message.aggregate.data, v.value, wallSlot)
 
-  while self.aggregatesQueue.full():
-    try:
-      notice "Queue full, dropping aggregate",
-        dropped = shortLog(self.aggregatesQueue[0].v)
-      discard self.aggregatesQueue.popFirstNoWait()
-    except AsyncQueueEmptyError as exc:
-      raiseAssert "We just checked that queue is not full! " & exc.msg
-
   trace "Aggregate validated",
     aggregator_index = signedAggregateAndProof.message.aggregator_index,
     selection_proof = signedAggregateAndProof.message.selection_proof,
     wallSlot
 
-  try:
-    self.aggregatesQueue.addLastNoWait(AggregateEntry(
-      v: signedAggregateAndProof.message.aggregate,
-      attesting_indices: v.get()))
-  except AsyncQueueFullError as exc:
-    raiseAssert "We just checked that queue is not full! " & exc.msg
+  self.verifQueues[].addAggregate(signedAggregateAndProof, v.get())
 
   ValidationResult.Accept
 
@@ -527,95 +314,4 @@ proc voluntaryExitValidator*(
 
   ValidationResult.Accept
 
-{.pop.} # TODO raises in chronos
-
-proc runQueueProcessingLoop*(self: ref Eth2Processor) {.async.} =
-  # Blocks in eth2 arrive on a schedule for every slot:
-  #
-  # * Block arrives at time 0
-  # * Attestations arrives at time 4
-  # * Aggregate arrives at time 8
-
-  var
-    blockFut = self[].blocksQueue.popFirst()
-    aggregateFut = self[].aggregatesQueue.popFirst()
-    attestationFut = self[].attestationsQueue.popFirst()
-
-  while true:
-    # Cooperative concurrency: one idle calculation step per loop - because
-    # we run both networking and CPU-heavy things like block processing
-    # on the same thread, we need to make sure that there is steady progress
-    # on the networking side or we get long lockups that lead to timeouts.
-    const
-      # We cap waiting for an idle slot in case there's a lot of network traffic
-      # taking up all CPU - we don't want to _completely_ stop processing blocks
-      # in this case (attestations will get dropped) - doing so also allows us
-      # to benefit from more batching / larger network reads when under load.
-      idleTimeout = 10.milliseconds
-
-      # Attestation processing is fairly quick and therefore done in batches to
-      # avoid some of the `Future` overhead
-      attestationBatch = 16
-
-    discard await idleAsync().withTimeout(idleTimeout)
-
-    # Avoid one more `await` when there's work to do
-    if not (blockFut.finished or aggregateFut.finished or attestationFut.finished):
-      trace "Waiting for processing work"
-      await blockFut or aggregateFut or attestationFut
-
-    # Only run one task per idle iteration, in priority order: blocks are needed
-    # for all other processing - then come aggregates which are cheap to
-    # process but might have a big impact on fork choice - last come
-    # attestations which individually have the smallest effect on chain progress
-    if blockFut.finished:
-      self[].processBlock(blockFut.read())
-      blockFut = self[].blocksQueue.popFirst()
-    elif aggregateFut.finished:
-      # aggregates will be dropped under heavy load on producer side
-      self[].processAggregate(aggregateFut.read())
-      for i in 0.. 0``
-  ## queue will help to keep backpressure under control. If ``queueSize <= 0``
+  ## ``syncQueueSize`` maximum queue size for incoming data. If ``syncQueueSize > 0``
+  ## queue will help to keep backpressure under control. If ``syncQueueSize <= 0``
   ## then queue size is unlimited (default).
   ##
   ## ``updateCb`` procedure which will be used to send downloaded blocks to
@@ -298,7 +298,7 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
     startSlot: start,
     lastSlot: last,
     chunkSize: chunkSize,
-    queueSize: queueSize,
+    queueSize: syncQueueSize,
     getFinalizedSlot: getFinalizedSlotCb,
     waiters: newSeq[SyncWaiter[T]](),
    counter: 1'u64,
@@ -306,7 +306,7 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
     debtsQueue: initHeapQueue[SyncRequest[T]](),
     inpSlot: start,
     outSlot: start,
-    outQueue: outputQueue
+    verifQueues: verifQueues
   )
 
 proc `<`*[T](a, b: SyncRequest[T]): bool {.inline.} =
@@ -666,7 +666,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
                            getLocalHeadSlotCb: GetSlotCallback,
                            getLocalWallSlotCb: GetSlotCallback,
                            getFinalizedSlotCb: GetSlotCallback,
-                           outputQueue: AsyncQueue[BlockEntry],
+                           verifQueues: ref VerifQueueManager,
                            maxStatusAge = uint64(SLOTS_PER_EPOCH * 4),
                            maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
                            sleepTime = (int(SLOTS_PER_EPOCH) *
@@ -677,7 +677,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
                           ): SyncManager[A, B] =
   let queue = SyncQueue.init(A, getLocalHeadSlotCb(), getLocalWallSlotCb(),
-                             chunkSize, getFinalizedSlotCb, outputQueue, 1)
+                             chunkSize, getFinalizedSlotCb, verifQueues, 1)
 
   result = SyncManager[A, B](
     pool: pool,
@@ -689,7 +689,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
     sleepTime: sleepTime,
     chunkSize: chunkSize,
     queue: queue,
-    outQueue: outputQueue,
+    verifQueues: verifQueues,
     notInSyncEvent: newAsyncEvent(),
     inRangeEvent: newAsyncEvent(),
     notInRangeEvent: newAsyncEvent(),
@@ -1109,7 +1109,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
       man.queue = SyncQueue.init(A, man.getLocalHeadSlot(),
                                  man.getLocalWallSlot(),
                                  man.chunkSize, man.getFinalizedSlot,
-                                 man.outQueue, 1)
+                                 man.verifQueues, 1)
       man.notInSyncEvent.fire()
       man.inProgress = true
     else:
diff --git a/beacon_chain/validators/validator_duties.nim b/beacon_chain/validators/validator_duties.nim
index b17ee8f31..e1098e424 100644
--- a/beacon_chain/validators/validator_duties.nim
+++ b/beacon_chain/validators/validator_duties.nim
@@ -30,7 +30,8 @@ import
   ".."/[beacon_node_common, beacon_node_types, version],
   ../ssz, ../ssz/sszdump, ../sync/sync_manager,
   ./slashing_protection, ./attestation_aggregation,
-  ./validator_pool, ./keystore_management
+  ./validator_pool, ./keystore_management,
+  ../gossip_processing/consensus_manager
 
 # Metrics for tracking attestation and beacon block loss
 const delayBuckets = [-Inf, -4.0, -2.0, -1.0, -0.5, -0.1, -0.05,
@@ -679,7 +680,7 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
       attestationCutoff = shortLog(attestationCutoff.offset)
 
     # Wait either for the block or the attestation cutoff time to arrive
-    if await node.processor[].expectBlock(slot).withTimeout(attestationCutoff.offset):
+    if await node.consensusManager[].expectBlock(slot).withTimeout(attestationCutoff.offset):
       # The expected block arrived (or expectBlock was called again which
       # shouldn't happen as this is the only place we use it) - according to the
      # spec, we should now wait for abs(slotTimingEntropy) - in our async loop
@@ -706,7 +707,7 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
      await sleepAsync(afterBlockCutoff.offset)
 
     # Time passed - we might need to select a new head in that case
-    node.processor[].updateHead(slot)
+    node.consensusManager[].updateHead(slot)
     head = node.chainDag.head
 
   handleAttestations(node, head, slot)
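
With the `newSyncManager` signature above, the call site in `nimbus_beacon_node.nim` (not included in this excerpt) now hands the shared `VerifQueueManager` to the sync manager instead of a bare `AsyncQueue[BlockEntry]`; roughly, and with placeholder type parameters, pool field and callback names:

  # Sketch of the updated call shape - peer/pool types and callbacks are
  # placeholders; only the verifQueues parameter is taken from this patch.
  let syncManager = newSyncManager[Peer, PeerID](
    network.peerPool,
    getLocalHeadSlot,
    getLocalWallSlot,
    getFirstSlotAtFinalizedEpoch,
    node.verifQueues)
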
diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim
index 01603e15c..495bd6563 100644
--- a/tests/test_sync_manager.nim
+++ b/tests/test_sync_manager.nim
@@ -2,7 +2,7 @@
 
 import unittest
 import chronos
-import ../beacon_chain/gossip_processing/eth2_processor,
+import ../beacon_chain/gossip_processing/[eth2_processor, gossip_to_consensus],
        ../beacon_chain/sync/sync_manager
 
 type
@@ -17,6 +17,13 @@ proc updateScore(peer: SomeTPeer, score: int) =
 proc getFirstSlotAtFinalizedEpoch(): Slot =
   Slot(0)
 
+proc newVerifQueues(): ref VerifQueueManager =
+  # We only want VerifQueueManager.blocksQueue to be an AsyncQueue of size 1
+  # The rest of the fields are unused
+  (ref VerifQueueManager)(
+    blocksQueue: newAsyncQueue[BlockEntry](1)
+  )
+
 suite "SyncManager test suite":
   proc createChain(start, finish: Slot): seq[SignedBeaconBlock] =
     doAssert(start <= finish)
@@ -29,7 +36,7 @@ suite "SyncManager test suite":
 
   test "[SyncQueue] Start and finish slots equal":
     let p1 = SomeTPeer()
-    let aq = newAsyncQueue[BlockEntry](1)
+    let aq = newVerifQueues()
     var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(0), 1'u64,
                                getFirstSlotAtFinalizedEpoch, aq)
     check len(queue) == 1
@@ -46,7 +53,7 @@ suite "SyncManager test suite":
       r11.slot == Slot(0) and r11.count == 1'u64 and r11.step == 1'u64
 
   test "[SyncQueue] Two full requests success/fail":
-    let aq = newAsyncQueue[BlockEntry](1)
+    let aq = newVerifQueues()
     var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(1), 1'u64,
                                getFirstSlotAtFinalizedEpoch, aq)
     let p1 = SomeTPeer()
@@ -75,7 +82,7 @@ suite "SyncManager test suite":
       r22.slot == Slot(1) and r22.count == 1'u64 and r22.step == 1'u64
 
   test "[SyncQueue] Full and incomplete success/fail start from zero":
-    let aq = newAsyncQueue[BlockEntry](1)
+    let aq = newVerifQueues()
     var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 2'u64,
                                getFirstSlotAtFinalizedEpoch, aq)
     let p1 = SomeTPeer()
@@ -115,7 +122,7 @@ suite "SyncManager test suite":
       r33.slot == Slot(4) and r33.count == 1'u64 and r33.step == 1'u64
 
   test "[SyncQueue] Full and incomplete success/fail start from non-zero":
-    let aq = newAsyncQueue[BlockEntry](1)
+    let aq = newVerifQueues()
     var queue = SyncQueue.init(SomeTPeer, Slot(1), Slot(5), 3'u64,
                                getFirstSlotAtFinalizedEpoch, aq)
     let p1 = SomeTPeer()
@@ -144,7 +151,7 @@ suite "SyncManager test suite":
       r42.slot == Slot(4) and r42.count == 2'u64 and r42.step == 1'u64
 
   test "[SyncQueue] Smart and stupid success/fail":
-    let aq = newAsyncQueue[BlockEntry](1)
+    let aq = newVerifQueues()
     var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64,
                                getFirstSlotAtFinalizedEpoch, aq)
     let p1 = SomeTPeer()
@@ -173,7 +180,7 @@ suite "SyncManager test suite":
       r52.slot == Slot(4) and r52.count == 1'u64 and r52.step == 1'u64
 
   test "[SyncQueue] One smart and one stupid + debt split + empty":
-    let aq = newAsyncQueue[BlockEntry](1)
+    let aq = newVerifQueues()
    var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64,
                                getFirstSlotAtFinalizedEpoch, aq)
     let p1 = SomeTPeer()
@@ -218,12 +225,12 @@ suite "SyncManager test suite":
         sblock.v.fail(BlockError.Invalid)
       sblock.v.done()
 
-    var aq = newAsyncQueue[BlockEntry](1)
+    let aq = newVerifQueues()
     var chain = createChain(Slot(0), Slot(2))
     var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(2), 1'u64,
                                getFirstSlotAtFinalizedEpoch, aq, 1)
-    var validatorFut = simpleValidator(aq)
+    var validatorFut = simpleValidator(aq[].blocksQueue)
     let p1 = SomeTPeer()
     let p2 = SomeTPeer()
     let p3 = SomeTPeer()
@@ -267,7 +274,7 @@ suite "SyncManager test suite":
         sblock.v.fail(BlockError.Invalid)
       sblock.v.done()
 
-    var aq = newAsyncQueue[BlockEntry](1)
+    let aq = newVerifQueues()
     var chain = createChain(Slot(5), Slot(11))
     var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(11), 2'u64,
                                getFirstSlotAtFinalizedEpoch, aq, 2)
@@ -277,7 +284,7 @@ suite "SyncManager test suite":
     let p3 = SomeTPeer()
     let p4 = SomeTPeer()
 
-    var validatorFut = simpleValidator(aq)
+    var validatorFut = simpleValidator(aq[].blocksQueue)
 
     var r21 = queue.pop(Slot(11), p1)
     var r22 = queue.pop(Slot(11), p2)
@@ -322,7 +329,7 @@ suite "SyncManager test suite":
         sblock.v.fail(BlockError.Invalid)
       sblock.v.done()
 
-    var aq = newAsyncQueue[BlockEntry](1)
+    let aq = newVerifQueues()
     var chain = createChain(Slot(5), Slot(18))
     var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(18), 2'u64,
                                getFirstSlotAtFinalizedEpoch, aq, 2)
@@ -334,7 +341,7 @@ suite "SyncManager test suite":
     let p6 = SomeTPeer()
     let p7 = SomeTPeer()
 
-    var validatorFut = simpleValidator(aq)
+    var validatorFut = simpleValidator(aq[].blocksQueue)
 
     var r21 = queue.pop(Slot(20), p1)
     var r22 = queue.pop(Slot(20), p2)
diff --git a/tests/test_sync_manager.nim.cfg b/tests/test_sync_manager.nim.cfg
new file mode 100644
index 000000000..e88af1eb3
--- /dev/null
+++ b/tests/test_sync_manager.nim.cfg
@@ -0,0 +1 @@
+-d:"libp2p_pki_schemes=secp256k1"
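
To close the loop, a compact sketch of the consumer side these tests rely on: draining the shared `blocksQueue` and signalling each `SyncBlock` back to the sync queue through the `done`/`fail` helpers, in the spirit of the suite's `simpleValidator` (whose exact body is not reproduced in this excerpt, so the validity rule below is made up):

  # Illustrative consumer - completes each SyncBlock's result future so the
  # SyncQueue can score peers and advance.
  proc simpleValidator(queue: AsyncQueue[BlockEntry]) {.async.} =
    while true:
      let entry = await queue.popFirst()
      if uint64(entry.v.blk.message.slot) mod 3 == 0:
        entry.v.fail(BlockError.Invalid)   # pretend every third block is bad
      else:
        entry.v.done()
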