diff --git a/beacon_chain/validator_client/api.nim b/beacon_chain/validator_client/api.nim index 4d55696c8..89855588b 100644 --- a/beacon_chain/validator_client/api.nim +++ b/beacon_chain/validator_client/api.nim @@ -86,7 +86,7 @@ proc lazyWaiter(node: BeaconNodeServerRef, request: FutureBase, ApiFailure.Communication, requestName, strategy, node, $request.error.msg) node.updateStatus(RestBeaconNodeStatus.Offline, failure) - except CancelledError as exc: + except CancelledError: await cancelAndWait(request) proc lazyWait(nodes: seq[BeaconNodeServerRef], requests: seq[FutureBase], @@ -257,7 +257,7 @@ template firstSuccessParallel*( for future in pendingRequests.items(): if not(future.finished()): pendingCancel.add(future.cancelAndWait()) - await allFutures(pendingCancel) + await noCancel allFutures(pendingCancel) raise exc except CatchableError as exc: # This should not be happened, because allFutures() and race() did not @@ -423,7 +423,7 @@ template bestSuccess*( if not(future.finished()): pendingCancel.add(future.cancelAndWait()) # Awaiting cancellations. - await allFutures(pendingCancel) + await noCancel allFutures(pendingCancel) raise exc except CatchableError as exc: # This should not be happened, because allFutures() and race() @@ -526,7 +526,7 @@ template onceToAll*( pendingCancel.add(fut.cancelAndWait()) if not(isNil(timerFut)) and not(timerFut.finished()): pendingCancel.add(timerFut.cancelAndWait()) - await allFutures(pendingCancel) + await noCancel allFutures(pendingCancel) raise exc except CatchableError: # This should not be happened, because allFutures() and race() did not @@ -664,7 +664,7 @@ template firstSuccessSequential*( pending.add(bodyFut.cancelAndWait()) if not(isNil(timerFut)) and not(timerFut.finished()): pending.add(timerFut.cancelAndWait()) - await allFutures(pending) + await noCancel allFutures(pending) raise exc except CatchableError as exc: # This case should not happen. diff --git a/beacon_chain/validator_client/attestation_service.nim b/beacon_chain/validator_client/attestation_service.nim index 6d9739802..e060bd6f1 100644 --- a/beacon_chain/validator_client/attestation_service.nim +++ b/beacon_chain/validator_client/attestation_service.nim @@ -205,10 +205,9 @@ proc produceAndPublishAttestations*(service: AttestationServiceRef, try: await allFutures(pendingAttestations) except CancelledError as exc: - for fut in pendingAttestations: - if not(fut.finished()): - fut.cancel() - await allFutures(pendingAttestations) + let pending = pendingAttestations + .filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) raise exc for future in pendingAttestations: @@ -300,10 +299,9 @@ proc produceAndPublishAggregates(service: AttestationServiceRef, try: await allFutures(pendingAggregates) except CancelledError as exc: - for fut in pendingAggregates: - if not(fut.finished()): - fut.cancel() - await allFutures(pendingAggregates) + let pending = pendingAggregates + .filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) raise exc for future in pendingAggregates: @@ -393,7 +391,7 @@ proc spawnAttestationTasks(service: AttestationServiceRef, except CancelledError as exc: # Cancelling all the pending tasks. let pending = tasks.filterIt(not(it.finished())).mapIt(it.cancelAndWait()) - await allFutures(pending) + await noCancel allFutures(pending) raise exc except CatchableError as exc: error "Unexpected error while processing attestation duties", diff --git a/beacon_chain/validator_client/block_service.nim b/beacon_chain/validator_client/block_service.nim index 0cd889bbc..ad4cad057 100644 --- a/beacon_chain/validator_client/block_service.nim +++ b/beacon_chain/validator_client/block_service.nim @@ -465,7 +465,7 @@ proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch, debug "Cancelling running proposal duty task", slot = task.duty.slot, validator = shortLog(task.duty.pubkey) - task.future.cancel() + task.future.cancelSoon() else: # If task is already running for proper slot, we keep it alive. debug "Keep running previous proposal duty task", @@ -714,18 +714,18 @@ proc runBlockPollMonitor(service: BlockServiceRef, break res if blockReceived: - var pending: seq[Future[void]] - for future in pendingTasks: - if not(future.finished()): pending.add(future.cancelAndWait()) - await allFutures(pending) + let pending = + pendingTasks.filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + # We use `noCancel` here because its cleanup and we have `break` + # after it. + await noCancel allFutures(pending) break pendingTasks.keepItIf(it != completedFuture) if len(pendingTasks) == 0: break except CancelledError as exc: - var pending: seq[Future[void]] - for future in pendingTasks: - if not(future.finished()): pending.add(future.cancelAndWait()) - await allFutures(pending) + let pending = + pendingTasks.filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) raise exc except CatchableError as exc: warn "An unexpected error occurred while running block monitoring", @@ -755,10 +755,9 @@ proc runBlockMonitor(service: BlockServiceRef) {.async.} = try: await allFutures(pendingTasks) except CancelledError as exc: - var pending: seq[Future[void]] - for future in pendingTasks: - if not(future.finished()): pending.add(future.cancelAndWait()) - await allFutures(pending) + let pending = + pendingTasks.filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) raise exc except CatchableError as exc: warn "An unexpected error occurred while running block monitoring", @@ -781,12 +780,12 @@ proc mainLoop(service: BlockServiceRef) {.async.} = err_msg = exc.msg # We going to cleanup all the pending proposer tasks. - var res: seq[Future[void]] + var res: seq[FutureBase] for epoch, data in vc.proposers.pairs(): for duty in data.duties.items(): if not(duty.future.finished()): res.add(duty.future.cancelAndWait()) - await allFutures(res) + await noCancel allFutures(res) proc init*(t: typedesc[BlockServiceRef], vc: ValidatorClientRef): Future[BlockServiceRef] {.async.} = diff --git a/beacon_chain/validator_client/common.nim b/beacon_chain/validator_client/common.nim index ea27720ef..7202bc9aa 100644 --- a/beacon_chain/validator_client/common.nim +++ b/beacon_chain/validator_client/common.nim @@ -1581,10 +1581,9 @@ proc fillSyncCommitteeSelectionProofs*( try: discard await race(pendingRequests) except CancelledError as exc: - var pending: seq[Future[void]] - for future in pendingRequests: - if not(future.finished()): pending.add(future.cancelAndWait()) - await allFutures(pending) + let pending = pendingRequests + .filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) raise exc (requests, pendingRequests) = @@ -1660,10 +1659,9 @@ proc fillAttestationSelectionProofs*( try: discard await race(pendingRequests) except CancelledError as exc: - var pending: seq[Future[void]] - for future in pendingRequests: - if not(future.finished()): pending.add(future.cancelAndWait()) - await allFutures(pending) + let pending = pendingRequests + .filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) raise exc (requests, pendingRequests) = diff --git a/beacon_chain/validator_client/fallback_service.nim b/beacon_chain/validator_client/fallback_service.nim index 83f1690e3..fa2750687 100644 --- a/beacon_chain/validator_client/fallback_service.nim +++ b/beacon_chain/validator_client/fallback_service.nim @@ -333,11 +333,9 @@ proc checkNodes*(service: FallbackServiceRef): Future[bool] {.async.} = if fut.completed() and fut.read(): res = true except CancelledError as exc: - var pending: seq[Future[void]] - for future in pendingChecks: - if not(future.finished()): - pending.add(future.cancelAndWait()) - await allFutures(pending) + let pending = pendingChecks + .filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) raise exc return res @@ -456,10 +454,9 @@ proc processTimeMonitoring(service: FallbackServiceRef) {.async.} = pendingChecks.add(service.runTimeMonitor(node)) await allFutures(pendingChecks) except CancelledError as exc: - var pending: seq[Future[void]] - for future in pendingChecks: - if not(future.finished()): pending.add(future.cancelAndWait()) - await allFutures(pending) + let pending = pendingChecks + .filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) raise exc except CatchableError as exc: warn "An unexpected error occurred while running time monitoring", diff --git a/beacon_chain/validator_client/sync_committee_service.nim b/beacon_chain/validator_client/sync_committee_service.nim index e815f10f6..3906d94fd 100644 --- a/beacon_chain/validator_client/sync_committee_service.nim +++ b/beacon_chain/validator_client/sync_committee_service.nim @@ -113,10 +113,9 @@ proc produceAndPublishSyncCommitteeMessages(service: SyncCommitteeServiceRef, try: await allFutures(pendingSyncCommitteeMessages) except CancelledError as exc: - for fut in pendingSyncCommitteeMessages: - if not(fut.finished()): - fut.cancel() - await allFutures(pendingSyncCommitteeMessages) + let pending = pendingSyncCommitteeMessages + .filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) raise exc for future in pendingSyncCommitteeMessages: @@ -253,10 +252,9 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef, try: discard await race(pendingFutures) except CancelledError as exc: - var pending: seq[Future[void]] - for future in pendingFutures: - if not(future.finished()): pending.add(future.cancelAndWait()) - await allFutures(pending) + let pending = pendingFutures + .filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) raise exc var completed: seq[int] @@ -308,12 +306,11 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef, var errored, succeed, failed = 0 try: await allFutures(pendingAggregates) - except CancelledError as err: - for fut in pendingAggregates: - if not(fut.finished()): - fut.cancel() - await allFutures(pendingAggregates) - raise err + except CancelledError as exc: + let pending = pendingAggregates + .filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) + raise exc for future in pendingAggregates: if future.completed():