fix: allFinishedFailed -> allDone to retain cancellations (#1451)

This commit is contained in:
Eric 2026-06-19 01:26:00 +10:00 committed by GitHub
parent 7931aba01a
commit 647fbb84a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 189 additions and 17 deletions

View File

@ -51,6 +51,9 @@ type
FinishedFailed*[T] = tuple[success: seq[Future[T]], failure: seq[Future[T]]]
FinishedFutures*[T] =
tuple[completed: seq[Future[T]], failed: seq[Future[T]], cancelled: seq[Future[T]]]
proc wantBlocksError*(kind: WantBlocksErrorKind, msg: string): ref WantBlocksError =
(ref WantBlocksError)(kind: kind, msg: msg)
@ -115,3 +118,37 @@ proc allFinishedValues*[T](
if b.finished:
b.value
return success values
proc allDone*[T](
futs: auto
): Future[FinishedFutures[T]] {.async: (raises: [CancelledError]).} =
## Returns a future which will complete only when all futures in ``futs``
## will be completed, failed or cancelled.
##
## Returned FinishedFutures will hold all the Future[T] objects passed to
## ``allDone``, grouped by their end state, with the order preserved.
##
## If the argument is empty, the returned future COMPLETES immediately.
##
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
##
## This is different from nim-chronos' ``allFinished``, in that the completed
## futures are grouped by their end state.
await allFutures(futs)
var res: FinishedFutures[T] = (@[], @[], @[])
for f in futs:
if f.completed:
when T is Result:
# count successfully completed Future containing Results with errors as
# failed
if f.value.isErr:
res.failed.add f
continue
res.completed.add f
elif f.cancelled:
res.cancelled.add f
else:
res.failed.add f
return res

View File

@ -120,10 +120,14 @@ proc updateExpiry*(
self.networkStore.localStore.ensureExpiry(manifest.treeCid, it, expiry)
)
let res = await allFinishedFailed[?!void](ensuringFutures)
if res.failure.len > 0:
trace "Some blocks failed to update expiry", len = res.failure.len
return failure("Some blocks failed to update expiry (" & $res.failure.len & " )")
let res = await allDone[?!void](ensuringFutures)
if res.failed.len > 0:
trace "Some blocks failed to update expiry", len = res.failed.len
return failure("Some blocks failed to update expiry (" & $res.failed.len & " )")
if res.cancelled.len > 0:
trace "Block expiry update was cancelled in some blocks", len = res.cancelled.len
raise
newException(CancelledError, "Block expiry update was cancelled in some blocks")
except CancelledError as exc:
raise exc
except CatchableError as exc:

View File

@ -163,29 +163,33 @@ proc stop*(s: StorageServer) {.async.} =
if s.restServer != nil:
futures.add(s.restServer.stop())
let res = await noCancel allFinishedFailed[void](futures)
let res = await noCancel allDone[void](futures)
s.isStarted = false
if res.failure.len > 0:
error "Failed to stop Storage node", failures = res.failure.len
if res.failed.len > 0:
error "Failed to stop Storage node", failures = res.failed.len
raise newException(
StorageError,
"Failed to stop Storage node: " & res.failure.mapIt(it.error.msg).join(", "),
"Failed to stop Storage node: " & res.failed.mapIt(it.error.msg).join(", "),
)
if res.cancelled.len > 0:
warn "Storage node stop was cancelled due to child stop routine(s) being cancelled, child routines cancelled: ",
cancellations = res.cancelled.len
raise newException(
CancelledError,
"Storage node stop was cancelled due to child stop routine(s) being cancelled, child routines cancelled: " &
$res.cancelled.len,
)
proc close*(s: StorageServer) {.async.} =
var futures =
@[s.storageNode.close(), s.repoStore.close(), s.storageNode.discovery.close()]
let res = await noCancel allFinishedFailed[void](futures)
let res = await noCancel allDone[void](futures)
if not s.taskpool.isNil:
try:
s.taskpool.shutdown()
except Exception as exc:
error "Failed to stop the taskpool", failures = res.failure.len
raise newException(StorageError, "Failure in taskpool shutdown: " & exc.msg)
s.taskpool.shutdown()
when defaultChroniclesStream.outputs.type.arity >= 3:
proc noOutput(logLevel: LogLevel, msg: LogOutputStr) =
@ -197,11 +201,19 @@ proc close*(s: StorageServer) {.async.} =
if error =? closeFile(s.logFile.get()).errorOption:
error "Failed to close log file", errorCode = $error
if res.failure.len > 0:
error "Failed to close Storage node", failures = res.failure.len
if res.failed.len > 0:
error "Failed to close Storage node", failures = res.failed.len
raise newException(
StorageError,
"Failed to close Storage node: " & res.failure.mapIt(it.error.msg).join(", "),
"Failed to close Storage node: " & res.failed.mapIt(it.error.msg).join(", "),
)
if res.cancelled.len > 0:
warn "Storage node close was cancelled due to child close routine(s) being cancelled, child routines cancelled: ",
cancellations = res.cancelled.len
raise newException(
CancelledError,
"Storage node close was cancelled due to child close routine(s) being cancelled, child routines cancelled: " &
$res.cancelled.len,
)
proc shutdown*(server: StorageServer) {.async.} =

View File

@ -233,3 +233,18 @@ asyncchecksuite "Test Node - Basic":
let randomBlock = bt.Block.new("Random block".toBytes).tryGet()
check (await node.hasLocalBlock(randomBlock.cid)) == false
test "updateExpiry succeeds when all blocks have metadata":
let
md = await storeDataGetManifest(localStore, chunker)
expiry = SecondsSince1970(9999999999)
let res = await node.updateExpiry(md.manifestCid, expiry)
check res.isOk
test "updateExpiry returns failure when block metadata is missing":
let
manifest = Manifest.example
manifestBlk = (await node.storeManifest(manifest)).tryGet()
expiry = SecondsSince1970(9999999999)
let res = await node.updateExpiry(manifestBlk.cid, expiry)
check res.isErr

View File

@ -0,0 +1,104 @@
import pkg/chronos
import pkg/storage/errors
import ../asynctest
import ../checktest
asyncchecksuite "allDone":
test "empty sequence completes immediately with all groups empty":
let futs = newSeq[Future[void]]()
let res = await allDone[void](futs)
check res.completed.len == 0
check res.failed.len == 0
check res.cancelled.len == 0
test "all completed futures go in the completed group":
let
fut1 = Future[void].Raising([CancelledError]).init("f1")
fut2 = Future[void].Raising([CancelledError]).init("f2")
fut1.complete()
fut2.complete()
let res = await allDone[void](@[fut1, fut2])
check res.completed.len == 2
check res.failed.len == 0
check res.cancelled.len == 0
test "all failed futures go in the failed group":
let
fut1 = Future[void].Raising([CancelledError, CatchableError]).init("f1")
fut2 = Future[void].Raising([CancelledError, CatchableError]).init("f2")
fut1.fail(newException(CatchableError, "e1"))
fut2.fail(newException(CatchableError, "e2"))
let res = await allDone[void](@[fut1, fut2])
check res.completed.len == 0
check res.failed.len == 2
check res.cancelled.len == 0
test "all cancelled futures go in the cancelled group":
let
fut1 = Future[void].Raising([CancelledError]).init("f1")
fut2 = Future[void].Raising([CancelledError]).init("f2")
await fut1.cancelAndWait()
await fut2.cancelAndWait()
let res = await allDone[void](@[fut1, fut2])
check res.completed.len == 0
check res.failed.len == 0
check res.cancelled.len == 2
test "futures are grouped by their end state":
let
completed =
Future[void].Raising([CancelledError, CatchableError]).init("completed")
failed = Future[void].Raising([CancelledError, CatchableError]).init("failed")
cancelled =
Future[void].Raising([CancelledError, CatchableError]).init("cancelled")
completed.complete()
failed.fail(newException(CatchableError, "e"))
await cancelled.cancelAndWait()
let res = await allDone[void](@[completed, failed, cancelled])
check res.completed.len == 1
check res.failed.len == 1
check res.cancelled.len == 1
check res.completed[0] == completed
check res.failed[0] == failed
check res.cancelled[0] == cancelled
test "order within each group matches the original input order":
let
fut1 = Future[void].Raising([CancelledError]).init("f1")
fut2 = Future[void].Raising([CancelledError]).init("f2")
fut3 = Future[void].Raising([CancelledError]).init("f3")
fut1.complete()
fut2.complete()
fut3.complete()
let res = await allDone[void](@[fut1, fut2, fut3])
check res.completed.len == 3
check res.completed[0] == fut1
check res.completed[1] == fut2
check res.completed[2] == fut3
test "cancelling allDone does not cancel the awaited futures":
let
fut1 = Future[void].Raising([CancelledError]).init("f1")
fut2 = Future[void].Raising([CancelledError]).init("f2")
let doneFut = allDone[void](@[fut1, fut2])
await doneFut.cancelAndWait()
check doneFut.cancelled
check not fut1.cancelled
check not fut2.cancelled
fut1.complete()
fut2.complete()
test "failed Result is grouped as a failed future":
let
fut1 = Future[Result[int, string]].Raising([CancelledError]).init("f1")
fut2 = Future[Result[int, string]].Raising([CancelledError]).init("f2")
fut1.complete(Result[int, string].ok(42))
fut2.complete(Result[int, string].err("error"))
let res = await allDone[Result[int, string]](@[fut1, fut2])
check res.completed.len == 1
check res.failed.len == 1
check res.cancelled.len == 0
check res.completed[0] == fut1
check res.failed[0] == fut2