From 647fbb84a5ada270c0e42f8a44b7334619dd3efe Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Fri, 19 Jun 2026 01:26:00 +1000 Subject: [PATCH] fix: `allFinishedFailed` -> `allDone` to retain cancellations (#1451) --- storage/errors.nim | 37 ++++++++++++ storage/node.nim | 12 ++-- storage/storage.nim | 38 ++++++++---- tests/storage/node/testnode.nim | 15 +++++ tests/storage/testerrors.nim | 104 ++++++++++++++++++++++++++++++++ 5 files changed, 189 insertions(+), 17 deletions(-) create mode 100644 tests/storage/testerrors.nim diff --git a/storage/errors.nim b/storage/errors.nim index b890a5fa..1095dcb9 100644 --- a/storage/errors.nim +++ b/storage/errors.nim @@ -51,6 +51,9 @@ type FinishedFailed*[T] = tuple[success: seq[Future[T]], failure: seq[Future[T]]] + FinishedFutures*[T] = + tuple[completed: seq[Future[T]], failed: seq[Future[T]], cancelled: seq[Future[T]]] + proc wantBlocksError*(kind: WantBlocksErrorKind, msg: string): ref WantBlocksError = (ref WantBlocksError)(kind: kind, msg: msg) @@ -115,3 +118,37 @@ proc allFinishedValues*[T]( if b.finished: b.value return success values + +proc allDone*[T]( + futs: auto +): Future[FinishedFutures[T]] {.async: (raises: [CancelledError]).} = + ## Returns a future which completes only once every future in ``futs`` has + ## completed, failed or been cancelled. + ## + ## The returned FinishedFutures holds all the Future[T] objects passed to + ## ``allDone``, grouped by their end state, with the input order preserved. + ## + ## If the argument is empty, the returned future COMPLETES immediately. + ## + ## If ``allDone`` is cancelled, the awaited ``futs`` WILL NOT BE cancelled. + ## + ## Unlike nim-chronos' ``allFinished``, the futures are grouped by their end + ## state; a completed Future whose Result value is an error counts as failed. 
+ await allFutures(futs) + var res: FinishedFutures[T] = (@[], @[], @[]) + + for f in futs: + if f.completed: + when T is Result: + # count successfully completed Future containing Results with errors as + # failed + if f.value.isErr: + res.failed.add f + continue + res.completed.add f + elif f.cancelled: + res.cancelled.add f + else: + res.failed.add f + + return res diff --git a/storage/node.nim b/storage/node.nim index f347a4c3..90c40454 100644 --- a/storage/node.nim +++ b/storage/node.nim @@ -120,10 +120,14 @@ proc updateExpiry*( self.networkStore.localStore.ensureExpiry(manifest.treeCid, it, expiry) ) - let res = await allFinishedFailed[?!void](ensuringFutures) - if res.failure.len > 0: - trace "Some blocks failed to update expiry", len = res.failure.len - return failure("Some blocks failed to update expiry (" & $res.failure.len & " )") + let res = await allDone[?!void](ensuringFutures) + if res.failed.len > 0: + trace "Some blocks failed to update expiry", len = res.failed.len + return failure("Some blocks failed to update expiry (" & $res.failed.len & " )") + if res.cancelled.len > 0: + trace "Block expiry update was cancelled in some blocks", len = res.cancelled.len + raise + newException(CancelledError, "Block expiry update was cancelled in some blocks") except CancelledError as exc: raise exc except CatchableError as exc: diff --git a/storage/storage.nim b/storage/storage.nim index a26c04e8..558f24b1 100644 --- a/storage/storage.nim +++ b/storage/storage.nim @@ -163,29 +163,33 @@ proc stop*(s: StorageServer) {.async.} = if s.restServer != nil: futures.add(s.restServer.stop()) - let res = await noCancel allFinishedFailed[void](futures) + let res = await noCancel allDone[void](futures) s.isStarted = false - if res.failure.len > 0: - error "Failed to stop Storage node", failures = res.failure.len + if res.failed.len > 0: + error "Failed to stop Storage node", failures = res.failed.len raise newException( StorageError, - "Failed to stop Storage node: " & 
res.failure.mapIt(it.error.msg).join(", "), + "Failed to stop Storage node: " & res.failed.mapIt(it.error.msg).join(", "), + ) + if res.cancelled.len > 0: + warn "Storage node stop was cancelled due to child stop routine(s) being cancelled, child routines cancelled: ", + cancellations = res.cancelled.len + raise newException( + CancelledError, + "Storage node stop was cancelled due to child stop routine(s) being cancelled, child routines cancelled: " & + $res.cancelled.len, ) proc close*(s: StorageServer) {.async.} = var futures = @[s.storageNode.close(), s.repoStore.close(), s.storageNode.discovery.close()] - let res = await noCancel allFinishedFailed[void](futures) + let res = await noCancel allDone[void](futures) if not s.taskpool.isNil: - try: - s.taskpool.shutdown() - except Exception as exc: - error "Failed to stop the taskpool", failures = res.failure.len - raise newException(StorageError, "Failure in taskpool shutdown: " & exc.msg) + s.taskpool.shutdown() when defaultChroniclesStream.outputs.type.arity >= 3: proc noOutput(logLevel: LogLevel, msg: LogOutputStr) = @@ -197,11 +201,19 @@ proc close*(s: StorageServer) {.async.} = if error =? 
closeFile(s.logFile.get()).errorOption: error "Failed to close log file", errorCode = $error - if res.failure.len > 0: - error "Failed to close Storage node", failures = res.failure.len + if res.failed.len > 0: + error "Failed to close Storage node", failures = res.failed.len raise newException( StorageError, - "Failed to close Storage node: " & res.failure.mapIt(it.error.msg).join(", "), + "Failed to close Storage node: " & res.failed.mapIt(it.error.msg).join(", "), + ) + if res.cancelled.len > 0: + warn "Storage node close was cancelled due to child close routine(s) being cancelled, child routines cancelled: ", + cancellations = res.cancelled.len + raise newException( + CancelledError, + "Storage node close was cancelled due to child close routine(s) being cancelled, child routines cancelled: " & + $res.cancelled.len, ) proc shutdown*(server: StorageServer) {.async.} = diff --git a/tests/storage/node/testnode.nim b/tests/storage/node/testnode.nim index 13091e24..7731c0b7 100644 --- a/tests/storage/node/testnode.nim +++ b/tests/storage/node/testnode.nim @@ -233,3 +233,18 @@ asyncchecksuite "Test Node - Basic": let randomBlock = bt.Block.new("Random block".toBytes).tryGet() check (await node.hasLocalBlock(randomBlock.cid)) == false + + test "updateExpiry succeeds when all blocks have metadata": + let + md = await storeDataGetManifest(localStore, chunker) + expiry = SecondsSince1970(9999999999) + let res = await node.updateExpiry(md.manifestCid, expiry) + check res.isOk + + test "updateExpiry returns failure when block metadata is missing": + let + manifest = Manifest.example + manifestBlk = (await node.storeManifest(manifest)).tryGet() + expiry = SecondsSince1970(9999999999) + let res = await node.updateExpiry(manifestBlk.cid, expiry) + check res.isErr diff --git a/tests/storage/testerrors.nim b/tests/storage/testerrors.nim new file mode 100644 index 00000000..c61c500e --- /dev/null +++ b/tests/storage/testerrors.nim @@ -0,0 +1,104 @@ +import pkg/chronos + +import 
pkg/storage/errors + +import ../asynctest +import ../checktest + +asyncchecksuite "allDone": + test "empty sequence completes immediately with all groups empty": + let futs = newSeq[Future[void]]() + let res = await allDone[void](futs) + check res.completed.len == 0 + check res.failed.len == 0 + check res.cancelled.len == 0 + + test "all completed futures go in the completed group": + let + fut1 = Future[void].Raising([CancelledError]).init("f1") + fut2 = Future[void].Raising([CancelledError]).init("f2") + fut1.complete() + fut2.complete() + let res = await allDone[void](@[fut1, fut2]) + check res.completed.len == 2 + check res.failed.len == 0 + check res.cancelled.len == 0 + + test "all failed futures go in the failed group": + let + fut1 = Future[void].Raising([CancelledError, CatchableError]).init("f1") + fut2 = Future[void].Raising([CancelledError, CatchableError]).init("f2") + fut1.fail(newException(CatchableError, "e1")) + fut2.fail(newException(CatchableError, "e2")) + let res = await allDone[void](@[fut1, fut2]) + check res.completed.len == 0 + check res.failed.len == 2 + check res.cancelled.len == 0 + + test "all cancelled futures go in the cancelled group": + let + fut1 = Future[void].Raising([CancelledError]).init("f1") + fut2 = Future[void].Raising([CancelledError]).init("f2") + await fut1.cancelAndWait() + await fut2.cancelAndWait() + let res = await allDone[void](@[fut1, fut2]) + check res.completed.len == 0 + check res.failed.len == 0 + check res.cancelled.len == 2 + + test "futures are grouped by their end state": + let + completed = + Future[void].Raising([CancelledError, CatchableError]).init("completed") + failed = Future[void].Raising([CancelledError, CatchableError]).init("failed") + cancelled = + Future[void].Raising([CancelledError, CatchableError]).init("cancelled") + completed.complete() + failed.fail(newException(CatchableError, "e")) + await cancelled.cancelAndWait() + let res = await allDone[void](@[completed, failed, cancelled]) + check 
res.completed.len == 1 + check res.failed.len == 1 + check res.cancelled.len == 1 + check res.completed[0] == completed + check res.failed[0] == failed + check res.cancelled[0] == cancelled + + test "order within each group matches the original input order": + let + fut1 = Future[void].Raising([CancelledError]).init("f1") + fut2 = Future[void].Raising([CancelledError]).init("f2") + fut3 = Future[void].Raising([CancelledError]).init("f3") + fut1.complete() + fut2.complete() + fut3.complete() + let res = await allDone[void](@[fut1, fut2, fut3]) + check res.completed.len == 3 + check res.completed[0] == fut1 + check res.completed[1] == fut2 + check res.completed[2] == fut3 + + test "cancelling allDone does not cancel the awaited futures": + let + fut1 = Future[void].Raising([CancelledError]).init("f1") + fut2 = Future[void].Raising([CancelledError]).init("f2") + let doneFut = allDone[void](@[fut1, fut2]) + await doneFut.cancelAndWait() + check doneFut.cancelled + check not fut1.cancelled + check not fut2.cancelled + fut1.complete() + fut2.complete() + + test "failed Result is grouped as a failed future": + let + fut1 = Future[Result[int, string]].Raising([CancelledError]).init("f1") + fut2 = Future[Result[int, string]].Raising([CancelledError]).init("f2") + fut1.complete(Result[int, string].ok(42)) + fut2.complete(Result[int, string].err("error")) + let res = await allDone[Result[int, string]](@[fut1, fut2]) + check res.completed.len == 1 + check res.failed.len == 1 + check res.cancelled.len == 0 + check res.completed[0] == fut1 + check res.failed[0] == fut2