mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-06-26 12:29:30 +00:00
Merge branch 'master' into feat/libstorage-metrics
This commit is contained in:
commit
b6306003d1
@ -51,6 +51,9 @@ type
|
||||
|
||||
FinishedFailed*[T] = tuple[success: seq[Future[T]], failure: seq[Future[T]]]
|
||||
|
||||
FinishedFutures*[T] =
|
||||
tuple[completed: seq[Future[T]], failed: seq[Future[T]], cancelled: seq[Future[T]]]
|
||||
|
||||
proc wantBlocksError*(kind: WantBlocksErrorKind, msg: string): ref WantBlocksError =
|
||||
(ref WantBlocksError)(kind: kind, msg: msg)
|
||||
|
||||
@ -115,3 +118,37 @@ proc allFinishedValues*[T](
|
||||
if b.finished:
|
||||
b.value
|
||||
return success values
|
||||
|
||||
proc allDone*[T](
|
||||
futs: auto
|
||||
): Future[FinishedFutures[T]] {.async: (raises: [CancelledError]).} =
|
||||
## Returns a future which will complete only when all futures in ``futs``
|
||||
## will be completed, failed or cancelled.
|
||||
##
|
||||
## Returned FinishedFutures will hold all the Future[T] objects passed to
|
||||
## ``allDone``, grouped by their end state, with the order preserved.
|
||||
##
|
||||
## If the argument is empty, the returned future COMPLETES immediately.
|
||||
##
|
||||
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
|
||||
##
|
||||
## This is different from nim-chronos' ``allFinished``, in that the completed
|
||||
## futures are grouped by their end state.
|
||||
await allFutures(futs)
|
||||
var res: FinishedFutures[T] = (@[], @[], @[])
|
||||
|
||||
for f in futs:
|
||||
if f.completed:
|
||||
when T is Result:
|
||||
# count successfully completed Future containing Results with errors as
|
||||
# failed
|
||||
if f.value.isErr:
|
||||
res.failed.add f
|
||||
continue
|
||||
res.completed.add f
|
||||
elif f.cancelled:
|
||||
res.cancelled.add f
|
||||
else:
|
||||
res.failed.add f
|
||||
|
||||
return res
|
||||
|
||||
@ -120,10 +120,14 @@ proc updateExpiry*(
|
||||
self.networkStore.localStore.ensureExpiry(manifest.treeCid, it, expiry)
|
||||
)
|
||||
|
||||
let res = await allFinishedFailed[?!void](ensuringFutures)
|
||||
if res.failure.len > 0:
|
||||
trace "Some blocks failed to update expiry", len = res.failure.len
|
||||
return failure("Some blocks failed to update expiry (" & $res.failure.len & " )")
|
||||
let res = await allDone[?!void](ensuringFutures)
|
||||
if res.failed.len > 0:
|
||||
trace "Some blocks failed to update expiry", len = res.failed.len
|
||||
return failure("Some blocks failed to update expiry (" & $res.failed.len & " )")
|
||||
if res.cancelled.len > 0:
|
||||
trace "Block expiry update was cancelled in some blocks", len = res.cancelled.len
|
||||
raise
|
||||
newException(CancelledError, "Block expiry update was cancelled in some blocks")
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
|
||||
@ -163,29 +163,33 @@ proc stop*(s: StorageServer) {.async.} =
|
||||
if s.restServer != nil:
|
||||
futures.add(s.restServer.stop())
|
||||
|
||||
let res = await noCancel allFinishedFailed[void](futures)
|
||||
let res = await noCancel allDone[void](futures)
|
||||
|
||||
s.isStarted = false
|
||||
|
||||
if res.failure.len > 0:
|
||||
error "Failed to stop Storage node", failures = res.failure.len
|
||||
if res.failed.len > 0:
|
||||
error "Failed to stop Storage node", failures = res.failed.len
|
||||
raise newException(
|
||||
StorageError,
|
||||
"Failed to stop Storage node: " & res.failure.mapIt(it.error.msg).join(", "),
|
||||
"Failed to stop Storage node: " & res.failed.mapIt(it.error.msg).join(", "),
|
||||
)
|
||||
if res.cancelled.len > 0:
|
||||
warn "Storage node stop was cancelled due to child stop routine(s) being cancelled, child routines cancelled: ",
|
||||
cancellations = res.cancelled.len
|
||||
raise newException(
|
||||
CancelledError,
|
||||
"Storage node stop was cancelled due to child stop routine(s) being cancelled, child routines cancelled: " &
|
||||
$res.cancelled.len,
|
||||
)
|
||||
|
||||
proc close*(s: StorageServer) {.async.} =
|
||||
var futures =
|
||||
@[s.storageNode.close(), s.repoStore.close(), s.storageNode.discovery.close()]
|
||||
|
||||
let res = await noCancel allFinishedFailed[void](futures)
|
||||
let res = await noCancel allDone[void](futures)
|
||||
|
||||
if not s.taskpool.isNil:
|
||||
try:
|
||||
s.taskpool.shutdown()
|
||||
except Exception as exc:
|
||||
error "Failed to stop the taskpool", failures = res.failure.len
|
||||
raise newException(StorageError, "Failure in taskpool shutdown: " & exc.msg)
|
||||
s.taskpool.shutdown()
|
||||
|
||||
when defaultChroniclesStream.outputs.type.arity >= 3:
|
||||
proc noOutput(logLevel: LogLevel, msg: LogOutputStr) =
|
||||
@ -197,11 +201,19 @@ proc close*(s: StorageServer) {.async.} =
|
||||
if error =? closeFile(s.logFile.get()).errorOption:
|
||||
error "Failed to close log file", errorCode = $error
|
||||
|
||||
if res.failure.len > 0:
|
||||
error "Failed to close Storage node", failures = res.failure.len
|
||||
if res.failed.len > 0:
|
||||
error "Failed to close Storage node", failures = res.failed.len
|
||||
raise newException(
|
||||
StorageError,
|
||||
"Failed to close Storage node: " & res.failure.mapIt(it.error.msg).join(", "),
|
||||
"Failed to close Storage node: " & res.failed.mapIt(it.error.msg).join(", "),
|
||||
)
|
||||
if res.cancelled.len > 0:
|
||||
warn "Storage node close was cancelled due to child close routine(s) being cancelled, child routines cancelled: ",
|
||||
cancellations = res.cancelled.len
|
||||
raise newException(
|
||||
CancelledError,
|
||||
"Storage node close was cancelled due to child close routine(s) being cancelled, child routines cancelled: " &
|
||||
$res.cancelled.len,
|
||||
)
|
||||
|
||||
proc shutdown*(server: StorageServer) {.async.} =
|
||||
|
||||
@ -233,3 +233,18 @@ asyncchecksuite "Test Node - Basic":
|
||||
let randomBlock = bt.Block.new("Random block".toBytes).tryGet()
|
||||
|
||||
check (await node.hasLocalBlock(randomBlock.cid)) == false
|
||||
|
||||
test "updateExpiry succeeds when all blocks have metadata":
|
||||
let
|
||||
md = await storeDataGetManifest(localStore, chunker)
|
||||
expiry = SecondsSince1970(9999999999)
|
||||
let res = await node.updateExpiry(md.manifestCid, expiry)
|
||||
check res.isOk
|
||||
|
||||
test "updateExpiry returns failure when block metadata is missing":
|
||||
let
|
||||
manifest = Manifest.example
|
||||
manifestBlk = (await node.storeManifest(manifest)).tryGet()
|
||||
expiry = SecondsSince1970(9999999999)
|
||||
let res = await node.updateExpiry(manifestBlk.cid, expiry)
|
||||
check res.isErr
|
||||
|
||||
104
tests/storage/testerrors.nim
Normal file
104
tests/storage/testerrors.nim
Normal file
@ -0,0 +1,104 @@
|
||||
import pkg/chronos
|
||||
|
||||
import pkg/storage/errors
|
||||
|
||||
import ../asynctest
|
||||
import ../checktest
|
||||
|
||||
asyncchecksuite "allDone":
|
||||
test "empty sequence completes immediately with all groups empty":
|
||||
let futs = newSeq[Future[void]]()
|
||||
let res = await allDone[void](futs)
|
||||
check res.completed.len == 0
|
||||
check res.failed.len == 0
|
||||
check res.cancelled.len == 0
|
||||
|
||||
test "all completed futures go in the completed group":
|
||||
let
|
||||
fut1 = Future[void].Raising([CancelledError]).init("f1")
|
||||
fut2 = Future[void].Raising([CancelledError]).init("f2")
|
||||
fut1.complete()
|
||||
fut2.complete()
|
||||
let res = await allDone[void](@[fut1, fut2])
|
||||
check res.completed.len == 2
|
||||
check res.failed.len == 0
|
||||
check res.cancelled.len == 0
|
||||
|
||||
test "all failed futures go in the failed group":
|
||||
let
|
||||
fut1 = Future[void].Raising([CancelledError, CatchableError]).init("f1")
|
||||
fut2 = Future[void].Raising([CancelledError, CatchableError]).init("f2")
|
||||
fut1.fail(newException(CatchableError, "e1"))
|
||||
fut2.fail(newException(CatchableError, "e2"))
|
||||
let res = await allDone[void](@[fut1, fut2])
|
||||
check res.completed.len == 0
|
||||
check res.failed.len == 2
|
||||
check res.cancelled.len == 0
|
||||
|
||||
test "all cancelled futures go in the cancelled group":
|
||||
let
|
||||
fut1 = Future[void].Raising([CancelledError]).init("f1")
|
||||
fut2 = Future[void].Raising([CancelledError]).init("f2")
|
||||
await fut1.cancelAndWait()
|
||||
await fut2.cancelAndWait()
|
||||
let res = await allDone[void](@[fut1, fut2])
|
||||
check res.completed.len == 0
|
||||
check res.failed.len == 0
|
||||
check res.cancelled.len == 2
|
||||
|
||||
test "futures are grouped by their end state":
|
||||
let
|
||||
completed =
|
||||
Future[void].Raising([CancelledError, CatchableError]).init("completed")
|
||||
failed = Future[void].Raising([CancelledError, CatchableError]).init("failed")
|
||||
cancelled =
|
||||
Future[void].Raising([CancelledError, CatchableError]).init("cancelled")
|
||||
completed.complete()
|
||||
failed.fail(newException(CatchableError, "e"))
|
||||
await cancelled.cancelAndWait()
|
||||
let res = await allDone[void](@[completed, failed, cancelled])
|
||||
check res.completed.len == 1
|
||||
check res.failed.len == 1
|
||||
check res.cancelled.len == 1
|
||||
check res.completed[0] == completed
|
||||
check res.failed[0] == failed
|
||||
check res.cancelled[0] == cancelled
|
||||
|
||||
test "order within each group matches the original input order":
|
||||
let
|
||||
fut1 = Future[void].Raising([CancelledError]).init("f1")
|
||||
fut2 = Future[void].Raising([CancelledError]).init("f2")
|
||||
fut3 = Future[void].Raising([CancelledError]).init("f3")
|
||||
fut1.complete()
|
||||
fut2.complete()
|
||||
fut3.complete()
|
||||
let res = await allDone[void](@[fut1, fut2, fut3])
|
||||
check res.completed.len == 3
|
||||
check res.completed[0] == fut1
|
||||
check res.completed[1] == fut2
|
||||
check res.completed[2] == fut3
|
||||
|
||||
test "cancelling allDone does not cancel the awaited futures":
|
||||
let
|
||||
fut1 = Future[void].Raising([CancelledError]).init("f1")
|
||||
fut2 = Future[void].Raising([CancelledError]).init("f2")
|
||||
let doneFut = allDone[void](@[fut1, fut2])
|
||||
await doneFut.cancelAndWait()
|
||||
check doneFut.cancelled
|
||||
check not fut1.cancelled
|
||||
check not fut2.cancelled
|
||||
fut1.complete()
|
||||
fut2.complete()
|
||||
|
||||
test "failed Result is grouped as a failed future":
|
||||
let
|
||||
fut1 = Future[Result[int, string]].Raising([CancelledError]).init("f1")
|
||||
fut2 = Future[Result[int, string]].Raising([CancelledError]).init("f2")
|
||||
fut1.complete(Result[int, string].ok(42))
|
||||
fut2.complete(Result[int, string].err("error"))
|
||||
let res = await allDone[Result[int, string]](@[fut1, fut2])
|
||||
check res.completed.len == 1
|
||||
check res.failed.len == 1
|
||||
check res.cancelled.len == 0
|
||||
check res.completed[0] == fut1
|
||||
check res.failed[0] == fut2
|
||||
Loading…
x
Reference in New Issue
Block a user