From 6328c777783f3dd7fad83a5360193701cb50c145 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 22 Jan 2024 17:34:54 +0100 Subject: [PATCH] raises for gossip (#5808) * raises for gossip * fix light client --- beacon_chain/beacon_node_light_client.nim | 2 +- .../consensus_manager.nim | 72 +++++++++---------- beacon_chain/el/el_manager.nim | 17 +++-- .../gossip_processing/batch_validation.nim | 33 +++++---- .../gossip_processing/block_processor.nim | 10 +-- .../gossip_processing/eth2_processor.nim | 10 +-- .../gossip_processing/gossip_validation.nim | 10 +-- .../light_client_processor.nim | 2 +- .../optimistic_processor.nim | 4 +- beacon_chain/light_client.nim | 4 +- beacon_chain/networking/eth2_network.nim | 13 ++-- beacon_chain/nimbus_beacon_node.nim | 10 +-- beacon_chain/nimbus_light_client.nim | 2 +- beacon_chain/sync/light_client_manager.nim | 41 +++++------ tests/test_validator_client.nim | 4 +- vendor/nim-chronos | 2 +- 16 files changed, 124 insertions(+), 112 deletions(-) diff --git a/beacon_chain/beacon_node_light_client.nim b/beacon_chain/beacon_node_light_client.nim index ce152bc63..ed1b6dd7e 100644 --- a/beacon_chain/beacon_node_light_client.nim +++ b/beacon_chain/beacon_node_light_client.nim @@ -39,7 +39,7 @@ proc initLightClient*( let optimisticHandler = proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock): - Future[void] {.async.} = + Future[void] {.async: (raises: [CancelledError]).} = debug "New LC optimistic block", opt = signedBlock.toBlockId(), dag = node.dag.head.bid, diff --git a/beacon_chain/consensus_object_pools/consensus_manager.nim b/beacon_chain/consensus_object_pools/consensus_manager.nim index b166f332b..0f5ef99f8 100644 --- a/beacon_chain/consensus_object_pools/consensus_manager.nim +++ b/beacon_chain/consensus_object_pools/consensus_manager.nim @@ -28,7 +28,7 @@ from ../validators/action_tracker import ActionTracker, getNextProposalSlot type ConsensusManager* = object expectedSlot: Slot - expectedBlockReceived: Future[bool] + expectedBlockReceived: Future[bool].Raising([CancelledError]) # Validated & Verified # ---------------------------------------------------------------- @@ -97,7 +97,8 @@ proc checkExpectedBlock(self: var ConsensusManager) = self.expectedBlockReceived.complete(true) self.expectedBlockReceived = nil # Don't keep completed futures around! -proc expectBlock*(self: var ConsensusManager, expectedSlot: Slot): Future[bool] = +proc expectBlock*(self: var ConsensusManager, expectedSlot: Slot): Future[bool] + {.async: (raises: [CancelledError], raw: true).} = ## Return a future that will complete when a head is selected whose slot is ## equal or greater than the given slot, or a new expectation is created if self.expectedBlockReceived != nil: @@ -153,7 +154,7 @@ func setOptimisticHead*( self.optimisticHead = (bid: bid, execution_block_hash: execution_block_hash) proc updateExecutionClientHead(self: ref ConsensusManager, - newHead: BeaconHead): Future[Opt[void]] {.async.} = + newHead: BeaconHead): Future[Opt[void]] {.async: (raises: [CancelledError]).} = let headExecutionPayloadHash = self.dag.loadExecutionBlockHash(newHead.blck) if headExecutionPayloadHash.isZero: @@ -318,7 +319,7 @@ proc getGasLimit*(self: ConsensusManager, pubkey: ValidatorPubKey): uint64 = from ../spec/datatypes/bellatrix import PayloadID proc runProposalForkchoiceUpdated*( - self: ref ConsensusManager, wallSlot: Slot): Future[Opt[void]] {.async.} = + self: ref ConsensusManager, wallSlot: Slot): Future[Opt[void]] {.async: (raises: [CancelledError]).} = let nextWallSlot = wallSlot + 1 (validatorIndex, nextProposer) = self.checkNextProposer(wallSlot).valueOr: @@ -351,46 +352,43 @@ proc runProposalForkchoiceUpdated*( if headBlockHash.isZero: return err() - try: - let safeBlockHash = beaconHead.safeExecutionPayloadHash + let safeBlockHash = beaconHead.safeExecutionPayloadHash - withState(self.dag.headState): - template callForkchoiceUpdated(fcPayloadAttributes: auto) = - let (status, _) = await self.elManager.forkchoiceUpdated( - headBlockHash, safeBlockHash, - beaconHead.finalizedExecutionPayloadHash, - payloadAttributes = some fcPayloadAttributes) - debug "Fork-choice updated for proposal", status + withState(self.dag.headState): + template callForkchoiceUpdated(fcPayloadAttributes: auto) = + let (status, _) = await self.elManager.forkchoiceUpdated( + headBlockHash, safeBlockHash, + beaconHead.finalizedExecutionPayloadHash, + payloadAttributes = some fcPayloadAttributes) + debug "Fork-choice updated for proposal", status - static: doAssert high(ConsensusFork) == ConsensusFork.Deneb - when consensusFork >= ConsensusFork.Deneb: - callForkchoiceUpdated(PayloadAttributesV3( - timestamp: Quantity timestamp, - prevRandao: FixedBytes[32] randomData, - suggestedFeeRecipient: feeRecipient, - withdrawals: - toEngineWithdrawals get_expected_withdrawals(forkyState.data), - parentBeaconBlockRoot: beaconHead.blck.bid.root.asBlockHash)) - elif consensusFork >= ConsensusFork.Capella: - callForkchoiceUpdated(PayloadAttributesV2( - timestamp: Quantity timestamp, - prevRandao: FixedBytes[32] randomData, - suggestedFeeRecipient: feeRecipient, - withdrawals: - toEngineWithdrawals get_expected_withdrawals(forkyState.data))) - else: - callForkchoiceUpdated(PayloadAttributesV1( - timestamp: Quantity timestamp, - prevRandao: FixedBytes[32] randomData, - suggestedFeeRecipient: feeRecipient)) - except CatchableError as err: - error "Engine API fork-choice update failed", err = err.msg + static: doAssert high(ConsensusFork) == ConsensusFork.Deneb + when consensusFork >= ConsensusFork.Deneb: + callForkchoiceUpdated(PayloadAttributesV3( + timestamp: Quantity timestamp, + prevRandao: FixedBytes[32] randomData, + suggestedFeeRecipient: feeRecipient, + withdrawals: + toEngineWithdrawals get_expected_withdrawals(forkyState.data), + parentBeaconBlockRoot: beaconHead.blck.bid.root.asBlockHash)) + elif consensusFork >= ConsensusFork.Capella: + callForkchoiceUpdated(PayloadAttributesV2( + timestamp: Quantity timestamp, + prevRandao: FixedBytes[32] randomData, + suggestedFeeRecipient: feeRecipient, + withdrawals: + toEngineWithdrawals get_expected_withdrawals(forkyState.data))) + else: + callForkchoiceUpdated(PayloadAttributesV1( + timestamp: Quantity timestamp, + prevRandao: FixedBytes[32] randomData, + suggestedFeeRecipient: feeRecipient)) ok() proc updateHeadWithExecution*( self: ref ConsensusManager, initialNewHead: BeaconHead, - getBeaconTimeFn: GetBeaconTimeFn) {.async.} = + getBeaconTimeFn: GetBeaconTimeFn) {.async: (raises: [CancelledError]).} = ## Trigger fork choice and update the DAG with the new head block ## This does not automatically prune the DAG after finalization ## `pruneFinalized` must be called for pruning. diff --git a/beacon_chain/el/el_manager.nim b/beacon_chain/el/el_manager.nim index 47bef8140..48b660256 100644 --- a/beacon_chain/el/el_manager.nim +++ b/beacon_chain/el/el_manager.nim @@ -1117,7 +1117,7 @@ proc forkchoiceUpdated*(m: ELManager, payloadAttributes: Option[PayloadAttributesV1] | Option[PayloadAttributesV2] | Option[PayloadAttributesV3]): - Future[(PayloadExecutionStatus, Option[BlockHash])] {.async.} = + Future[(PayloadExecutionStatus, Option[BlockHash])] {.async: (raises: [CancelledError]).} = doAssert not headBlockHash.isZero # Allow finalizedBlockHash to be 0 to avoid sync deadlocks. @@ -1208,12 +1208,20 @@ proc forkchoiceUpdated*(m: ELManager, finalizedBlockHash: finalizedBlockHash, payloadAttributes: payloadAttributesV3)) + template getSelected: untyped = + let + data = + try: + requests[responseProcessor.selectedResponse.get].read + except CatchableError: + raiseAssert "Only completed requests get selected" + (data.status, data.latestValidHash) + if responseProcessor.disagreementAlreadyDetected: return (PayloadExecutionStatus.invalid, none BlockHash) elif responseProcessor.selectedResponse.isSome: assignNextExpectedPayloadParams() - return (requests[responseProcessor.selectedResponse.get].read.status, - requests[responseProcessor.selectedResponse.get].read.latestValidHash) + return getSelected() await requestsCompleted or deadline @@ -1225,8 +1233,7 @@ proc forkchoiceUpdated*(m: ELManager, (PayloadExecutionStatus.invalid, none BlockHash) elif responseProcessor.selectedResponse.isSome: assignNextExpectedPayloadParams() - (requests[responseProcessor.selectedResponse.get].read.status, - requests[responseProcessor.selectedResponse.get].read.latestValidHash) + getSelected() else: (PayloadExecutionStatus.syncing, none BlockHash) diff --git a/beacon_chain/gossip_processing/batch_validation.nim b/beacon_chain/gossip_processing/batch_validation.nim index 467cb7f67..60e5abd2f 100644 --- a/beacon_chain/gossip_processing/batch_validation.nim +++ b/beacon_chain/gossip_processing/batch_validation.nim @@ -85,6 +85,8 @@ type Valid Timeout + FutureBatchResult = Future[BatchResult].Raising([CancelledError]) + Eager = proc(): bool {.gcsafe, raises: [].} ## Callback that returns true if eager processing should be done to lower ## latency at the expense of spending more cycles validating things, @@ -92,7 +94,7 @@ type BatchItem* = object sigset: SignatureSet - fut: Future[BatchResult] + fut: FutureBatchResult Batch* = object ## A batch represents up to BatchedCryptoSize non-aggregated signatures @@ -103,7 +105,7 @@ type VerifierItem = object verifier: ref BatchVerifier signal: ThreadSignalPtr - inflight: Future[void] + inflight: Future[void].Raising([CancelledError]) BatchCrypto* = object batches: Deque[ref Batch] @@ -129,7 +131,7 @@ type # Most scheduled checks require this immutable value, so don't require it # to be provided separately each time - processor: Future[void] + processor: Future[void].Raising([CancelledError]) BatchTask = object ok: Atomic[bool] @@ -237,7 +239,7 @@ proc spawnBatchVerifyTask(tp: Taskpool, task: ptr BatchTask) = proc batchVerifyAsync*( verifier: ref BatchVerifier, signal: ThreadSignalPtr, - batch: ref Batch): Future[bool] {.async.} = + batch: ref Batch): Future[bool] {.async: (raises: [CancelledError]).} = var task = BatchTask( setsPtr: makeUncheckedArray(baseAddr batch[].sigsets), numSets: batch[].sigsets.len, @@ -253,12 +255,17 @@ proc batchVerifyAsync*( doAssert verifier[].taskpool.numThreads > 1, "Must have at least one separate thread or signal will never be fired" verifier[].taskpool.spawnBatchVerifyTask(taskPtr) - await signal.wait() + try: + await signal.wait() + except AsyncError as exc: + warn "Batch verification verification failed - report bug", err = exc.msg + return false + task.ok.load() proc processBatch( batchCrypto: ref BatchCrypto, batch: ref Batch, - verifier: ref BatchVerifier, signal: ThreadSignalPtr) {.async.} = + verifier: ref BatchVerifier, signal: ThreadSignalPtr) {.async: (raises: [CancelledError]).} = let numSets = batch[].sigsets.len() @@ -307,7 +314,7 @@ proc processBatch( batchCrypto[].complete(batch[], ok) -proc processLoop(batchCrypto: ref BatchCrypto) {.async.} = +proc processLoop(batchCrypto: ref BatchCrypto) {.async: (raises: [CancelledError]).} = ## Process pending crypto check after some time has passed - the time is ## chosen such that there's time to fill the batch but not so long that ## latency across the network is negatively affected @@ -354,7 +361,7 @@ proc scheduleProcessor(batchCrypto: ref BatchCrypto) = proc verifySoon( batchCrypto: ref BatchCrypto, name: static string, - sigset: SignatureSet): Future[BatchResult] = + sigset: SignatureSet): Future[BatchResult]{.async: (raises: [CancelledError], raw: true).} = let batch = batchCrypto[].getBatch() fut = newFuture[BatchResult](name) @@ -385,7 +392,7 @@ proc scheduleAttestationCheck*( batchCrypto: ref BatchCrypto, fork: Fork, attestationData: AttestationData, pubkey: CookedPubKey, signature: ValidatorSig - ): Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] = + ): Result[tuple[fut: FutureBatchResult, sig: CookedSig], cstring] = ## Schedule crypto verification of an attestation ## ## The buffer is processed: @@ -410,7 +417,7 @@ proc scheduleAggregateChecks*( signedAggregateAndProof: SignedAggregateAndProof, dag: ChainDAGRef, attesting_indices: openArray[ValidatorIndex] ): Result[tuple[ - aggregatorFut, slotFut, aggregateFut: Future[BatchResult], + aggregatorFut, slotFut, aggregateFut: FutureBatchResult, sig: CookedSig], cstring] = ## Schedule crypto verification of an aggregate ## @@ -462,7 +469,7 @@ proc scheduleSyncCommitteeMessageCheck*( batchCrypto: ref BatchCrypto, fork: Fork, slot: Slot, beacon_block_root: Eth2Digest, pubkey: CookedPubKey, signature: ValidatorSig - ): Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] = + ): Result[tuple[fut: FutureBatchResult, sig: CookedSig], cstring] = ## Schedule crypto verification of an attestation ## ## The buffer is processed: @@ -486,7 +493,7 @@ proc scheduleContributionChecks*( batchCrypto: ref BatchCrypto, fork: Fork, signedContributionAndProof: SignedContributionAndProof, subcommitteeIdx: SyncSubcommitteeIndex, dag: ChainDAGRef): Result[tuple[ - aggregatorFut, proofFut, contributionFut: Future[BatchResult], + aggregatorFut, proofFut, contributionFut: FutureBatchResult, sig: CookedSig], cstring] = ## Schedule crypto verification of all signatures in a ## SignedContributionAndProof message @@ -535,7 +542,7 @@ proc scheduleContributionChecks*( proc scheduleBlsToExecutionChangeCheck*( batchCrypto: ref BatchCrypto, genesis_fork: Fork, signedBLSToExecutionChange: SignedBLSToExecutionChange): - Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] = + Result[tuple[fut: FutureBatchResult, sig: CookedSig], cstring] = ## Schedule crypto verification of all signatures in a ## SignedBLSToExecutionChange message ## diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index bd387c38b..3cc34f192 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -238,7 +238,7 @@ from ../el/el_manager import proc expectValidForkchoiceUpdated( elManager: ELManager, headBlockPayloadAttributesType: typedesc, headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest, - receivedBlock: ForkySignedBeaconBlock): Future[void] {.async.} = + receivedBlock: ForkySignedBeaconBlock): Future[void] {.async: (raises: [CancelledError]).} = let (payloadExecutionStatus, _) = await elManager.forkchoiceUpdated( headBlockHash = headBlockHash, @@ -291,7 +291,7 @@ from ../spec/datatypes/deneb import SignedBeaconBlock, asTrusted, shortLog proc newExecutionPayload*( elManager: ELManager, blck: SomeForkyBeaconBlock): - Future[Opt[PayloadExecutionStatus]] {.async.} = + Future[Opt[PayloadExecutionStatus]] {.async: (raises: [CancelledError]).} = template executionPayload: untyped = blck.body.execution_payload @@ -329,7 +329,7 @@ proc getExecutionValidity( elManager: ELManager, blck: bellatrix.SignedBeaconBlock | capella.SignedBeaconBlock | deneb.SignedBeaconBlock): - Future[NewPayloadStatus] {.async.} = + Future[NewPayloadStatus] {.async: (raises: [CancelledError]).} = if not blck.message.is_execution_block: return NewPayloadStatus.valid # vacuously @@ -413,7 +413,7 @@ proc storeBlock( blobsOpt: Opt[BlobSidecars], maybeFinalized = false, queueTick: Moment = Moment.now(), validationDur = Duration()): - Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async.} = + Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async: (raises: [CancelledError]).} = ## storeBlock is the main entry point for unvalidated blocks - all untrusted ## blocks, regardless of origin, pass through here. When storing a block, ## we will add it to the dag and pass it to all block consumers that need @@ -774,7 +774,7 @@ proc addBlock*( # ------------------------------------------------------------------------------ proc processBlock( - self: ref BlockProcessor, entry: BlockEntry) {.async.} = + self: ref BlockProcessor, entry: BlockEntry) {.async: (raises: [CancelledError]).} = logScope: blockRoot = shortLog(entry.blck.root) diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 072ab0400..d020fc523 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -367,7 +367,7 @@ proc checkForPotentialDoppelganger( proc processAttestation*( self: ref Eth2Processor, src: MsgSource, attestation: Attestation, subnet_id: SubnetId, - checkSignature: bool = true): Future[ValidationRes] {.async.} = + checkSignature: bool = true): Future[ValidationRes] {.async: (raises: [CancelledError]).} = var wallTime = self.getCurrentBeaconTime() let (afterGenesis, wallSlot) = wallTime.toSlot() @@ -415,7 +415,7 @@ proc processAttestation*( proc processSignedAggregateAndProof*( self: ref Eth2Processor, src: MsgSource, signedAggregateAndProof: SignedAggregateAndProof, - checkSignature = true, checkCover = true): Future[ValidationRes] {.async.} = + checkSignature = true, checkCover = true): Future[ValidationRes] {.async: (raises: [CancelledError]).} = var wallTime = self.getCurrentBeaconTime() let (afterGenesis, wallSlot) = wallTime.toSlot() @@ -472,7 +472,7 @@ proc processSignedAggregateAndProof*( proc processBlsToExecutionChange*( self: ref Eth2Processor, src: MsgSource, blsToExecutionChange: SignedBLSToExecutionChange): - Future[ValidationRes] {.async.} = + Future[ValidationRes] {.async: (raises: [CancelledError]).} = logScope: blsToExecutionChange = shortLog(blsToExecutionChange) @@ -568,7 +568,7 @@ proc processSyncCommitteeMessage*( self: ref Eth2Processor, src: MsgSource, syncCommitteeMsg: SyncCommitteeMessage, subcommitteeIdx: SyncSubcommitteeIndex, - checkSignature: bool = true): Future[Result[void, ValidationError]] {.async.} = + checkSignature: bool = true): Future[Result[void, ValidationError]] {.async: (raises: [CancelledError]).} = let wallTime = self.getCurrentBeaconTime() wallSlot = wallTime.slotOrZero() @@ -612,7 +612,7 @@ proc processSyncCommitteeMessage*( proc processSignedContributionAndProof*( self: ref Eth2Processor, src: MsgSource, contributionAndProof: SignedContributionAndProof, - checkSignature: bool = true): Future[Result[void, ValidationError]] {.async.} = + checkSignature: bool = true): Future[Result[void, ValidationError]] {.async: (raises: [CancelledError]).} = let wallTime = self.getCurrentBeaconTime() wallSlot = wallTime.slotOrZero() diff --git a/beacon_chain/gossip_processing/gossip_validation.nim b/beacon_chain/gossip_processing/gossip_validation.nim index d96d997cd..71968278b 100644 --- a/beacon_chain/gossip_processing/gossip_validation.nim +++ b/beacon_chain/gossip_processing/gossip_validation.nim @@ -625,7 +625,7 @@ proc validateAttestation*( subnet_id: SubnetId, checkSignature: bool): Future[Result[ tuple[attesting_index: ValidatorIndex, sig: CookedSig], - ValidationError]] {.async.} = + ValidationError]] {.async: (raises: [CancelledError]).} = # Some of the checks below have been reordered compared to the spec, to # perform the cheap checks first - in particular, we want to avoid loading # an `EpochRef` and checking signatures. This reordering might lead to @@ -796,7 +796,7 @@ proc validateAggregate*( checkSignature = true, checkCover = true): Future[Result[ tuple[attestingIndices: seq[ValidatorIndex], sig: CookedSig], - ValidationError]] {.async.} = + ValidationError]] {.async: (raises: [CancelledError]).} = # Some of the checks below have been reordered compared to the spec, to # perform the cheap checks first - in particular, we want to avoid loading # an `EpochRef` and checking signatures. This reordering might lead to @@ -1004,7 +1004,7 @@ proc validateAggregate*( proc validateBlsToExecutionChange*( pool: ValidatorChangePool, batchCrypto: ref BatchCrypto, signed_address_change: SignedBLSToExecutionChange, - wallEpoch: Epoch): Future[Result[void, ValidationError]] {.async.} = + wallEpoch: Epoch): Future[Result[void, ValidationError]] {.async: (raises: [CancelledError]).} = # [IGNORE] `current_epoch >= CAPELLA_FORK_EPOCH`, where `current_epoch` is # defined by the current wall-clock time. if not (wallEpoch >= pool.dag.cfg.CAPELLA_FORK_EPOCH): @@ -1149,7 +1149,7 @@ proc validateSyncCommitteeMessage*( wallTime: BeaconTime, checkSignature: bool): Future[Result[ - (BlockId, CookedSig, seq[uint64]), ValidationError]] {.async.} = + (BlockId, CookedSig, seq[uint64]), ValidationError]] {.async: (raises: [CancelledError]).} = block: # [IGNORE] The message's slot is for the current slot (with a # `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance), i.e. @@ -1241,7 +1241,7 @@ proc validateContribution*( wallTime: BeaconTime, checkSignature: bool ): Future[Result[ - (BlockId, CookedSig, seq[ValidatorIndex]), ValidationError]] {.async.} = + (BlockId, CookedSig, seq[ValidatorIndex]), ValidationError]] {.async: (raises: [CancelledError]).} = block: # [IGNORE] The contribution's slot is for the current slot # (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) diff --git a/beacon_chain/gossip_processing/light_client_processor.nim b/beacon_chain/gossip_processing/light_client_processor.nim index b29d32902..ed37acb73 100644 --- a/beacon_chain/gossip_processing/light_client_processor.nim +++ b/beacon_chain/gossip_processing/light_client_processor.nim @@ -439,7 +439,7 @@ proc addObject*( self: var LightClientProcessor, src: MsgSource, obj: SomeForkedLightClientObject, - resfut: Future[Result[void, VerifierError]] = nil) = + resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil) = ## Enqueue a Gossip-validated light client object for verification # Backpressure: # Only one object is validated at any time - diff --git a/beacon_chain/gossip_processing/optimistic_processor.nim b/beacon_chain/gossip_processing/optimistic_processor.nim index 0005a7066..684171651 100644 --- a/beacon_chain/gossip_processing/optimistic_processor.nim +++ b/beacon_chain/gossip_processing/optimistic_processor.nim @@ -30,14 +30,14 @@ const type MsgTrustedBlockProcessor* = proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock): Future[void] {. - gcsafe, raises: [].} + async: (raises: [CancelledError]).} OptimisticProcessor* = ref object getBeaconTime: GetBeaconTimeFn optimisticVerifier: MsgTrustedBlockProcessor blocks: Table[Eth2Digest, ref ForkedSignedBeaconBlock] latestOptimisticSlot: Slot - processFut: Future[void] + processFut: Future[void].Raising([CancelledError]) logMoment: Moment proc initOptimisticProcessor*( diff --git a/beacon_chain/light_client.nim b/beacon_chain/light_client.nim index 658d471b0..e757be813 100644 --- a/beacon_chain/light_client.nim +++ b/beacon_chain/light_client.nim @@ -136,8 +136,8 @@ proc createLightClient( strictVerification) proc lightClientVerifier(obj: SomeForkedLightClientObject): - Future[Result[void, VerifierError]] = - let resfut = newFuture[Result[void, VerifierError]]("lightClientVerifier") + Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = + let resfut = Future[Result[void, VerifierError]].Raising([CancelledError]).init("lightClientVerifier") lightClient.processor[].addObject(MsgSource.gossip, obj, resfut) resfut proc bootstrapVerifier(obj: ForkedLightClientBootstrap): auto = diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index ef0f2b28d..94081e413 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -2076,12 +2076,16 @@ proc peerPingerHeartbeat(node: Eth2Node) {.async: (raises: [CancelledError]).} = await allFutures(updateFutures) + reset(updateFutures) + for peer in node.peers.values: if peer.connectionState != Connected: continue if peer.failedMetadataRequests > MetadataRequestMaxFailures: debug "no metadata from peer, kicking it", peer - await peer.disconnect(PeerScoreLow) + updateFutures.add(peer.disconnect(PeerScoreLow)) + + await allFutures(updateFutures) await sleepAsync(5.seconds) @@ -2363,7 +2367,8 @@ proc subscribe*( # Passing in `nil` because we do all message processing in the validator node.pubsub.subscribe(topic, nil) -proc newValidationResultFuture(v: ValidationResult): Future[ValidationResult] = +proc newValidationResultFuture(v: ValidationResult): Future[ValidationResult] + {.async: (raises: [CancelledError], raw: true).} = let res = newFuture[ValidationResult]("eth2_network.execValidator") res.complete(v) res @@ -2406,9 +2411,9 @@ proc addValidator*[MsgType](node: Eth2Node, proc addAsyncValidator*[MsgType](node: Eth2Node, topic: string, msgValidator: proc(msg: MsgType): - Future[ValidationResult] {.gcsafe, raises: [].} ) = + Future[ValidationResult] {.async: (raises: [CancelledError]).} ) = proc execValidator(topic: string, message: GossipMsg): - Future[ValidationResult] {.raises: [].} = + Future[ValidationResult] {.async: (raw: true).} = inc nbc_gossip_messages_received trace "Validating incoming gossip message", len = message.data.len, topic diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 461c32f58..a82d4faec 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -1662,7 +1662,7 @@ proc installMessageValidators(node: BeaconNode) = node.network.addAsyncValidator( getAttestationTopic(digest, subnet_id), proc ( attestation: Attestation - ): Future[ValidationResult] {.async.} = + ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processAttestation( MsgSource.gossip, attestation, subnet_id))) @@ -1672,7 +1672,7 @@ proc installMessageValidators(node: BeaconNode) = node.network.addAsyncValidator( getAggregateAndProofsTopic(digest), proc ( signedAggregateAndProof: SignedAggregateAndProof - ): Future[ValidationResult] {.async.} = + ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processSignedAggregateAndProof( MsgSource.gossip, signedAggregateAndProof))) @@ -1716,7 +1716,7 @@ proc installMessageValidators(node: BeaconNode) = node.network.addAsyncValidator( getSyncCommitteeTopic(digest, idx), proc ( msg: SyncCommitteeMessage - ): Future[ValidationResult] {.async.} = + ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processSyncCommitteeMessage( MsgSource.gossip, msg, idx))) @@ -1726,7 +1726,7 @@ proc installMessageValidators(node: BeaconNode) = node.network.addAsyncValidator( getSyncCommitteeContributionAndProofTopic(digest), proc ( msg: SignedContributionAndProof - ): Future[ValidationResult] {.async.} = + ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processSignedContributionAndProof( MsgSource.gossip, msg))) @@ -1736,7 +1736,7 @@ proc installMessageValidators(node: BeaconNode) = node.network.addAsyncValidator( getBlsToExecutionChangeTopic(digest), proc ( msg: SignedBLSToExecutionChange - ): Future[ValidationResult] {.async.} = + ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processBlsToExecutionChange( MsgSource.gossip, msg))) diff --git a/beacon_chain/nimbus_light_client.nim b/beacon_chain/nimbus_light_client.nim index 8cc6aafc7..4f2a2b190 100644 --- a/beacon_chain/nimbus_light_client.nim +++ b/beacon_chain/nimbus_light_client.nim @@ -106,7 +106,7 @@ programMain: nil optimisticHandler = proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock): - Future[void] {.async.} = + Future[void] {.async: (raises: [CancelledError]).} = notice "New LC optimistic block", opt = signedBlock.toBlockId(), wallSlot = getBeaconTime().slotOrZero diff --git a/beacon_chain/sync/light_client_manager.nim b/beacon_chain/sync/light_client_manager.nim index b4177fa4a..ebbbd342b 100644 --- a/beacon_chain/sync/light_client_manager.nim +++ b/beacon_chain/sync/light_client_manager.nim @@ -35,7 +35,7 @@ type Endpoint[Nothing, ForkedLightClientOptimisticUpdate] ValueVerifier[V] = - proc(v: V): Future[Result[void, VerifierError]] {.gcsafe, raises: [].} + proc(v: V): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} BootstrapVerifier* = ValueVerifier[ForkedLightClientBootstrap] UpdateVerifier* = @@ -65,7 +65,7 @@ type getFinalizedPeriod: GetSyncCommitteePeriodCallback getOptimisticPeriod: GetSyncCommitteePeriodCallback getBeaconTime: GetBeaconTimeFn - loopFuture: Future[void] + loopFuture: Future[void].Raising([CancelledError]) func init*( T: type LightClientManager, @@ -115,8 +115,7 @@ proc doRequest( e: typedesc[Bootstrap], peer: Peer, blockRoot: Eth2Digest -): Future[NetRes[ForkedLightClientBootstrap]] {. - raises: [IOError].} = +): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError], raw: true).} = peer.lightClientBootstrap(blockRoot) # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange @@ -126,8 +125,7 @@ proc doRequest( e: typedesc[UpdatesByRange], peer: Peer, key: tuple[startPeriod: SyncCommitteePeriod, count: uint64] -): Future[LightClientUpdatesByRangeResponse] {. - async.} = +): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [ResponseError, CancelledError]).} = let (startPeriod, count) = key doAssert count > 0 and count <= MAX_REQUEST_LIGHT_CLIENT_UPDATES let response = await peer.lightClientUpdatesByRange(startPeriod, count) @@ -142,14 +140,14 @@ proc doRequest( proc doRequest( e: typedesc[FinalityUpdate], peer: Peer -): Future[NetRes[ForkedLightClientFinalityUpdate]] = +): Future[NetRes[ForkedLightClientFinalityUpdate]] {.async: (raises: [CancelledError], raw: true).} = peer.lightClientFinalityUpdate() # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate proc doRequest( e: typedesc[OptimisticUpdate], peer: Peer -): Future[NetRes[ForkedLightClientOptimisticUpdate]] = +): Future[NetRes[ForkedLightClientOptimisticUpdate]] {.async: (raises: [CancelledError], raw: true).} = peer.lightClientOptimisticUpdate() template valueVerifier[E]( @@ -179,7 +177,7 @@ proc workerTask[E]( self: LightClientManager, e: typedesc[E], key: E.K -): Future[bool] {.async.} = +): Future[bool] {.async: (raises: [CancelledError]).} = var peer: Peer didProgress = false @@ -246,12 +244,6 @@ proc workerTask[E]( raise exc except PeerPoolError as exc: debug "Failed to acquire peer", exc = exc.msg - except CatchableError as exc: - if peer != nil: - peer.updateScore(PeerScoreNoValues) - debug "Unexpected exception while receiving value", exc = exc.msg, - endpoint = E.name, peer, peer_score = peer.getScore() - raise exc finally: if peer != nil: self.network.peerPool.release(peer) @@ -261,13 +253,13 @@ proc query[E]( self: LightClientManager, e: typedesc[E], key: E.K -): Future[bool] {.async.} = +): Future[bool] {.async: (raises: [CancelledError]).} = const PARALLEL_REQUESTS = 2 var workers: array[PARALLEL_REQUESTS, Future[bool]] let - progressFut = newFuture[void]("lcmanProgress") - doneFut = newFuture[void]("lcmanDone") + progressFut = Future[void].Raising([CancelledError]).init("lcmanProgress") + doneFut = Future[void].Raising([CancelledError]).init("lcmanDone") var numCompleted = 0 maxCompleted = workers.len @@ -300,7 +292,10 @@ proc query[E]( workers[i].complete(false) # Wait for any worker to report progress, or for all workers to finish - discard await race(progressFut, doneFut) + try: + discard await race(progressFut, doneFut) + except ValueError: + raiseAssert "race API invariant" finally: for i in 0 ..< maxCompleted: if workers[i] == nil: @@ -330,11 +325,11 @@ proc query[E]( template query[E]( self: LightClientManager, e: typedesc[E] -): Future[bool] = +): Future[bool].Raising([CancelledError]) = self.query(e, Nothing()) # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.6/specs/altair/light-client/light-client.md#light-client-sync-process -proc loop(self: LightClientManager) {.async.} = +proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} = var nextSyncTaskTime = self.getBeaconTime() while true: # Periodically wake and check for changes @@ -391,8 +386,8 @@ proc start*(self: var LightClientManager) = doAssert self.loopFuture == nil self.loopFuture = self.loop() -proc stop*(self: var LightClientManager) {.async.} = +proc stop*(self: var LightClientManager) {.async: (raises: []).} = ## Stop light client manager's loop. if self.loopFuture != nil: - await self.loopFuture.cancelAndWait() + await noCancel self.loopFuture.cancelAndWait() self.loopFuture = nil diff --git a/tests/test_validator_client.nim b/tests/test_validator_client.nim index 4e6b72864..d2220b891 100644 --- a/tests/test_validator_client.nim +++ b/tests/test_validator_client.nim @@ -576,7 +576,7 @@ suite "Validator Client test suite": else: return await request.respond(Http404, "Page not found") else: - return dumbResponse() + return defaultResponse() let server = createServer(initTAddress("127.0.0.1:0"), process, false) server.start() @@ -660,7 +660,7 @@ suite "Validator Client test suite": else: return await request.respond(Http404, "Page not found") else: - return dumbResponse() + return defaultResponse() let server = createServer(initTAddress("127.0.0.1:0"), process, false) server.start() diff --git a/vendor/nim-chronos b/vendor/nim-chronos index 3ca2c5e6b..e296ae30c 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit 3ca2c5e6b510c15ce88c94ed25731b30f7ad46b5 +Subproject commit e296ae30c84bdd1f0b12c50ab551ed080f8a815c