diff --git a/beacon_chain/el/el_manager.nim b/beacon_chain/el/el_manager.nim index be5f95195..df7f2564d 100644 --- a/beacon_chain/el/el_manager.nim +++ b/beacon_chain/el/el_manager.nim @@ -353,7 +353,7 @@ proc trackEngineApiRequest(connection: ELConnection, deadline.addCallback do (udata: pointer) {.gcsafe, raises: [].}: if not request.finished: - request.cancel() + request.cancelSoon() engine_api_timeouts.inc(1, [connection.engineUrl.url, requestName]) if not failureAllowed: connection.setDegradedState(requestName, 0, "Request timed out") @@ -944,7 +944,7 @@ proc getPayload*(m: ELManager, var bestPayloadIdx = none int for idx, req in requests: if not req.finished: - req.cancel() + req.cancelSoon() elif req.failed: error "Failed to get execution payload from EL", url = m.elConnections[idx].engineUrl.url, @@ -1059,12 +1059,12 @@ proc selectConnectionForChainSyncing(m: ELManager): Future[ELConnection] {.async await firstCompletedFuture(connectionsFuts) except CancelledError as err: for future in connectionsFuts: - future.cancel() + future.cancelSoon() raise err for future in connectionsFuts: if future != firstConnected: - future.cancel() + future.cancelSoon() return m.elConnections[find(connectionsFuts, firstConnected)] @@ -1447,7 +1447,7 @@ proc exchangeTransitionConfiguration*(m: ELManager) {.async.} = var cancelled = 0 for idx, req in requests: if not req.finished: - req.cancel() + req.cancelSoon() inc cancelled if cancelled == requests.len: @@ -1871,7 +1871,7 @@ proc new*(T: type ELManager, proc safeCancel(fut: var Future[void]) = if not fut.isNil and not fut.finished: - fut.cancel() + fut.cancelSoon() fut = nil func clear(chain: var Eth1Chain) = diff --git a/beacon_chain/future_combinators.nim b/beacon_chain/future_combinators.nim index 7c0d7c5d9..5a22a5a92 100644 --- a/beacon_chain/future_combinators.nim +++ b/beacon_chain/future_combinators.nim @@ -102,4 +102,4 @@ proc firstCompleted*[T](futs: varargs[Future[T]]): Future[T] = subFuture.addCallback(cb, cast[pointer](subFuture)) retFuture.cancelCallback = proc (udata: pointer) = - subFuture.cancel() + subFuture.cancelSoon() diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index bfdf6c348..ed376d9ae 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -1317,7 +1317,7 @@ proc dialPeer(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async.} = await workfut or deadline if workfut.finished(): if not deadline.finished(): - deadline.cancel() + deadline.cancelSoon() inc nbc_successful_dials else: debug "Connection to remote peer timed out" diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index 36c7dd158..d928aa6f3 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -114,19 +114,21 @@ proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType, var fut2 = outgoingEvent(eventType).wait() try: discard await one(fut1, fut2) - if fut1.finished: - if not(fut2.finished): - fut2.cancel() + if fut1.finished(): + if not(fut2.finished()): + await fut2.cancelAndWait() incomingEvent(eventType).clear() else: - if not(fut1.finished): - fut1.cancel() + if not(fut1.finished()): + await fut1.cancelAndWait() outgoingEvent(eventType).clear() except CancelledError as exc: - if not(fut1.finished): - fut1.cancel() - if not(fut2.finished): - fut2.cancel() + var pending: seq[FutureBase] + if not(fut1.finished()): + pending.add(fut1.cancelAndWait()) + if not(fut2.finished()): + pending.add(fut2.cancelAndWait()) + await noCancel allFutures(pending) raise exc elif PeerType.Incoming in filter: await incomingEvent(eventType).wait() diff --git a/beacon_chain/nimbus_signing_node.nim b/beacon_chain/nimbus_signing_node.nim index 7b8c52bab..1b1e5b7d7 100644 --- a/beacon_chain/nimbus_signing_node.nim +++ b/beacon_chain/nimbus_signing_node.nim @@ -420,7 +420,7 @@ proc asyncRun*(sn: SigningNodeRef) {.async.} = pending.add(cancelAndWait(sn.runKeystoreCachePruningLoopFut)) pending.add(sn.stop()) pending.add(sn.close()) - await allFutures(pending) + await noCancel allFutures(pending) template runWithSignals(sn: SigningNodeRef, body: untyped): bool = let future = body @@ -434,7 +434,7 @@ template runWithSignals(sn: SigningNodeRef, body: untyped): bool = pending.add(cancelAndWait(sn.sigintHandleFut)) if not(sn.sigtermHandleFut.finished()): pending.add(cancelAndWait(sn.sigtermHandleFut)) - await allFutures(pending) + await noCancel allFutures(pending) false else: true @@ -446,7 +446,7 @@ template runWithSignals(sn: SigningNodeRef, body: untyped): bool = pending.add(cancelAndWait(sn.sigintHandleFut)) if not(sn.sigtermHandleFut.finished()): pending.add(cancelAndWait(sn.sigtermHandleFut)) - await allFutures(pending) + await noCancel allFutures(pending) false proc runSigningNode(config: SigningNodeConf) {.async.} = diff --git a/beacon_chain/rpc/rest_event_api.nim b/beacon_chain/rpc/rest_event_api.nim index 162aa6dbf..b27f2faed 100644 --- a/beacon_chain/rpc/rest_event_api.nim +++ b/beacon_chain/rpc/rest_event_api.nim @@ -8,6 +8,7 @@ {.push raises: [].} import + std/sequtils, stew/results, chronicles, ./rest_utils, @@ -160,12 +161,6 @@ proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) = # One of the handlers finished, it means that connection has been droped, so # we cancelling all other handlers. let pending = - block: - var res: seq[Future[void]] - for fut in handlers: - if not(fut.finished()): - fut.cancel() - res.add(fut) - res - await allFutures(pending) + handlers.filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) return diff --git a/beacon_chain/sync/light_client_manager.nim b/beacon_chain/sync/light_client_manager.nim index 3b65c5b42..ea0060473 100644 --- a/beacon_chain/sync/light_client_manager.nim +++ b/beacon_chain/sync/light_client_manager.nim @@ -281,7 +281,7 @@ proc query[E]( progressFut.complete() except CancelledError as exc: if not progressFut.finished: - progressFut.cancel() + progressFut.cancelSoon() except CatchableError as exc: discard finally: @@ -311,7 +311,7 @@ proc query[E]( doneFut.complete() break if not workers[i].finished: - workers[i].cancel() + workers[i].cancelSoon() while true: try: await allFutures(workers[0 ..< maxCompleted]) @@ -326,7 +326,7 @@ proc query[E]( continue if not progressFut.finished: - progressFut.cancel() + progressFut.cancelSoon() return progressFut.completed template query[E]( diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim index 85db76752..8fe3b3ff5 100644 --- a/beacon_chain/sync/request_manager.nim +++ b/beacon_chain/sync/request_manager.nim @@ -353,7 +353,7 @@ proc start*(rman: var RequestManager) = proc stop*(rman: RequestManager) = ## Stop Request Manager's loop. if not(isNil(rman.blockLoopFuture)): - rman.blockLoopFuture.cancel() + rman.blockLoopFuture.cancelSoon() if not(isNil(rman.blobLoopFuture)): - rman.blobLoopFuture.cancel() + rman.blobLoopFuture.cancelSoon() diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index 7248a7459..2bd07a20a 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -630,18 +630,15 @@ proc toTimeLeftString*(d: Duration): string = proc syncClose[A, B](man: SyncManager[A, B], guardTaskFut: Future[void], speedTaskFut: Future[void]) {.async.} = - guardTaskFut.cancel() - speedTaskFut.cancel() - await allFutures(guardTaskFut, speedTaskFut) - let pendingTasks = - block: - var res: seq[Future[void]] - for worker in man.workers: - doAssert(worker.status in {Sleeping, WaitingPeer}) - worker.future.cancel() - res.add(worker.future) - res - await allFutures(pendingTasks) + var pending: seq[FutureBase] + if not(guardTaskFut.finished()): + pending.add(guardTaskFut.cancelAndWait()) + if not(speedTaskFut.finished()): + pending.add(speedTaskFut.cancelAndWait()) + for worker in man.workers: + doAssert(worker.status in {Sleeping, WaitingPeer}) + pending.add(worker.future.cancelAndWait()) + await noCancel allFutures(pending) proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = logScope: