diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index 507288bba..f09cdf16d 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -169,12 +169,11 @@ from ../consensus_object_pools/spec_cache import get_attesting_indices from ../spec/datatypes/phase0 import TrustedSignedBeaconBlock proc storeBlock*( - self: ref BlockProcessor, + self: var BlockProcessor, src: MsgSource, wallTime: BeaconTime, signedBlock: ForkySignedBeaconBlock, payloadValid: bool, queueTick: Moment = Moment.now(), - validationDur = Duration()): - Future[Result[BlockRef, BlockError]] {.async.} = + validationDur = Duration()): Result[BlockRef, BlockError] = ## storeBlock is the main entry point for unvalidated blocks - all untrusted ## blocks, regardless of origin, pass through here. When storing a block, ## we will add it to the dag and pass it to all block consumers that need @@ -217,7 +216,7 @@ proc storeBlock*( trustedBlock.message.slot, trustedBlock.root, state.data.current_sync_committee.pubkeys.data[i]) - self[].dumpBlock(signedBlock, blck) + self.dumpBlock(signedBlock, blck) # There can be a scenario where we receive a block we already received. # However this block was before the last finalized epoch and so its parent @@ -260,11 +259,8 @@ proc storeBlock*( # called valid blocks have already been registered as verified. The head # can lag a slot behind wall clock, complicating detecting synced status # for validating, otherwise. - # - # TODO have a third version which is fire-and-forget for when it is merge - # but payloadValid is true, i.e. fcU is for EL's benefit, not CL. Current - # behavior adds unnecessary latency to CL event loop. - await self.consensusManager.updateHeadWithExecution(wallTime.slotOrZero) + asyncSpawn self.consensusManager.updateHeadWithExecution( + wallTime.slotOrZero) let updateHeadTick = Moment.now() @@ -281,7 +277,7 @@ proc storeBlock*( for quarantined in self.consensusManager.quarantine[].pop(blck.get().root): # Process the blocks that had the newly accepted block as parent - self[].addBlock(MsgSource.gossip, quarantined) + self.addBlock(MsgSource.gossip, quarantined) return blck @@ -324,8 +320,7 @@ proc addBlock*( # ------------------------------------------------------------------------------ proc processBlock( - self: ref BlockProcessor, entry: BlockEntry, payloadValid: bool) - {.async.} = + self: var BlockProcessor, entry: BlockEntry, payloadValid: bool) = logScope: blockRoot = shortLog(entry.blck.root) @@ -338,7 +333,7 @@ proc processBlock( quit 1 let res = withBlck(entry.blck): - await self.storeBlock( + self.storeBlock( entry.src, wallTime, blck, payloadValid, entry.queueTick, entry.validationDur) @@ -489,7 +484,7 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} = else: if executionPayloadStatus == PayloadExecutionStatus.valid or self[].is_optimistic_candidate_block(blck.blck): - await self.processBlock( + self[].processBlock( blck, executionPayloadStatus == PayloadExecutionStatus.valid) else: debug "runQueueProcessingLoop: block cannot be optimistically imported", diff --git a/beacon_chain/gossip_processing/consensus_manager.nim b/beacon_chain/gossip_processing/consensus_manager.nim index 71c179769..9c5bd31ff 100644 --- a/beacon_chain/gossip_processing/consensus_manager.nim +++ b/beacon_chain/gossip_processing/consensus_manager.nim @@ -183,20 +183,24 @@ proc updateHeadWithExecution*(self: ref ConsensusManager, wallSlot: Slot) ## `pruneFinalized` must be called for pruning. # Grab the new head according to our latest attestation data - let newHead = self.attestationPool[].selectOptimisticHead( - wallSlot.start_beacon_time).valueOr: - warn "Head selection failed, using previous head", - head = shortLog(self.dag.head), wallSlot - return + try: + let newHead = self.attestationPool[].selectOptimisticHead( + wallSlot.start_beacon_time).valueOr: + warn "Head selection failed, using previous head", + head = shortLog(self.dag.head), wallSlot + return - # Ensure dag.updateHead has most current information - await self.updateExecutionClientHead(newHead) + # Ensure dag.updateHead has most current information + await self.updateExecutionClientHead(newHead) - # Store the new head in the chain DAG - this may cause epochs to be - # justified and finalized - self.dag.updateHead(newHead, self.quarantine[]) + # Store the new head in the chain DAG - this may cause epochs to be + # justified and finalized + self.dag.updateHead(newHead, self.quarantine[]) - self[].checkExpectedBlock() + self[].checkExpectedBlock() + except CatchableError as exc: + debug "updateHeadWithExecution error", + error = exc.msg proc pruneStateCachesAndForkChoice*(self: var ConsensusManager) = ## Prune unneeded and invalidated data after finalization diff --git a/beacon_chain/validators/message_router.nim b/beacon_chain/validators/message_router.nim index 2c55860a1..4e33b1b36 100644 --- a/beacon_chain/validators/message_router.nim +++ b/beacon_chain/validators/message_router.nim @@ -151,7 +151,7 @@ proc routeSignedBeaconBlock*( signature = shortLog(blck.signature), error = res.error() let - newBlockRef = await router[].blockProcessor.storeBlock( + newBlockRef = router[].blockProcessor[].storeBlock( MsgSource.api, sendTime, blck, true) # The boolean we return tells the caller whether the block was integrated diff --git a/tests/test_block_processor.nim b/tests/test_block_processor.nim index 32d8a77fe..2d9fc6033 100644 --- a/tests/test_block_processor.nim +++ b/tests/test_block_processor.nim @@ -47,8 +47,9 @@ suite "Block processor" & preset(): validatorMonitor, getTimeFn, safeSlotsToImportOptimistically = 128) test "Reverse order block add & get" & preset(): - let missing = waitFor processor.storeBlock( - MsgSource.gossip, b2.message.slot.start_beacon_time(), b2, payloadValid = true) + let missing = processor[].storeBlock( + MsgSource.gossip, b2.message.slot.start_beacon_time(), b2, + payloadValid = true) check: missing.error == BlockError.MissingParent check: @@ -57,8 +58,9 @@ suite "Block processor" & preset(): FetchRecord(root: b1.root) in quarantine[].checkMissing() let - status = waitFor processor.storeBlock( - MsgSource.gossip, b2.message.slot.start_beacon_time(), b1, payloadValid = true) + status = processor[].storeBlock( + MsgSource.gossip, b2.message.slot.start_beacon_time(), b1, + payloadValid = true) b1Get = dag.getBlockRef(b1.root) check: