mirror of
https://github.com/codex-storage/nim-codex.git
synced 2025-01-09 20:45:38 +00:00
Merge branch 'master' into feature/blkexc-peer-selection
This commit is contained in:
commit
f0b1e098fc
@ -2,7 +2,6 @@ import std/sugar
|
|||||||
import pkg/questionable
|
import pkg/questionable
|
||||||
import pkg/chronos
|
import pkg/chronos
|
||||||
import ../logutils
|
import ../logutils
|
||||||
import ./then
|
|
||||||
import ./trackedfutures
|
import ./trackedfutures
|
||||||
|
|
||||||
{.push raises:[].}
|
{.push raises:[].}
|
||||||
|
@ -1,207 +0,0 @@
|
|||||||
import pkg/chronos
|
|
||||||
import pkg/questionable
|
|
||||||
import pkg/questionable/results
|
|
||||||
import pkg/upraises
|
|
||||||
|
|
||||||
# Similar to JavaScript's Promise API, `.then` and `.catch` can be used to
|
|
||||||
# handle results and errors of async `Futures` within a synchronous closure.
|
|
||||||
# They can be used as an alternative to `asyncSpawn` which does not return a
|
|
||||||
# value and will raise a `FutureDefect` if there are unhandled errors
|
|
||||||
# encountered. Both `.then` and `.catch` act as callbacks that do not block the
|
|
||||||
# synchronous closure's flow.
|
|
||||||
|
|
||||||
# `.then` is called when the `Future` is successfully completed and can be
|
|
||||||
# chained as many times as desired, calling each `.then` callback in order. When
|
|
||||||
# the `Future` returns `Result[T, ref CatchableError]` (or `?!T`), the value
|
|
||||||
# called in the `.then` callback will be unpacked from the `Result` as a
|
|
||||||
# convenience. In other words, for `Future[?!T]`, the `.then` callback will take
|
|
||||||
# a single parameter `T`. See `tests/utils/testthen.nim` for more examples. To
|
|
||||||
# allow for chaining, `.then` returns its future. If the future is already
|
|
||||||
# complete, the `.then` callback will be executed immediately.
|
|
||||||
|
|
||||||
# `.catch` is called when the `Future` fails. In the case when the `Future`
|
|
||||||
# returns a `Result[T, ref CatchableError` (or `?!T`), `.catch` will be called
|
|
||||||
# if the `Result` contains an error. If the `Future` is already failed (or
|
|
||||||
# `Future[?!T]` contains an error), the `.catch` callback will be executed
|
|
||||||
# immediately.
|
|
||||||
|
|
||||||
# `.cancelled` is called when the `Future` is cancelled. If the `Future` is
|
|
||||||
# already cancelled, the `.cancelled` callback will be executed immediately.
|
|
||||||
|
|
||||||
# More info on JavaScript's Promise API can be found at:
|
|
||||||
# https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise
|
|
||||||
|
|
||||||
runnableExamples:
|
|
||||||
proc asyncProc(): Future[int] {.async.} =
|
|
||||||
await sleepAsync(1.millis)
|
|
||||||
return 1
|
|
||||||
|
|
||||||
asyncProc()
|
|
||||||
.then(proc(i: int) = echo "returned ", i)
|
|
||||||
.catch(proc(e: ref CatchableError) = doAssert false, "will not be triggered")
|
|
||||||
|
|
||||||
# outputs "returned 1"
|
|
||||||
|
|
||||||
proc asyncProcWithError(): Future[int] {.async.} =
|
|
||||||
await sleepAsync(1.millis)
|
|
||||||
raise newException(ValueError, "some error")
|
|
||||||
|
|
||||||
asyncProcWithError()
|
|
||||||
.then(proc(i: int) = doAssert false, "will not be triggered")
|
|
||||||
.catch(proc(e: ref CatchableError) = echo "errored: ", e.msg)
|
|
||||||
|
|
||||||
# outputs "errored: some error"
|
|
||||||
|
|
||||||
type
|
|
||||||
OnSuccess*[T] = proc(val: T) {.gcsafe, upraises: [].}
|
|
||||||
OnError* = proc(err: ref CatchableError) {.gcsafe, upraises: [].}
|
|
||||||
OnCancelled* = proc() {.gcsafe, upraises: [].}
|
|
||||||
|
|
||||||
proc ignoreError(err: ref CatchableError) = discard
|
|
||||||
proc ignoreCancelled() = discard
|
|
||||||
|
|
||||||
template handleFinished(future: FutureBase,
|
|
||||||
onError: OnError,
|
|
||||||
onCancelled: OnCancelled) =
|
|
||||||
|
|
||||||
if not future.finished:
|
|
||||||
return
|
|
||||||
|
|
||||||
if future.cancelled:
|
|
||||||
onCancelled()
|
|
||||||
return
|
|
||||||
|
|
||||||
if future.failed:
|
|
||||||
onError(future.error)
|
|
||||||
return
|
|
||||||
|
|
||||||
proc then*(future: Future[void], onSuccess: OnSuccess[void]): Future[void] =
|
|
||||||
|
|
||||||
proc cb(udata: pointer) =
|
|
||||||
future.handleFinished(ignoreError, ignoreCancelled)
|
|
||||||
onSuccess()
|
|
||||||
|
|
||||||
proc cancellation(udata: pointer) =
|
|
||||||
if not future.finished():
|
|
||||||
future.removeCallback(cb)
|
|
||||||
|
|
||||||
future.addCallback(cb)
|
|
||||||
future.cancelCallback = cancellation
|
|
||||||
return future
|
|
||||||
|
|
||||||
proc then*[T](future: Future[T], onSuccess: OnSuccess[T]): Future[T] =
|
|
||||||
|
|
||||||
proc cb(udata: pointer) =
|
|
||||||
future.handleFinished(ignoreError, ignoreCancelled)
|
|
||||||
|
|
||||||
if val =? future.read.catch:
|
|
||||||
onSuccess(val)
|
|
||||||
|
|
||||||
proc cancellation(udata: pointer) =
|
|
||||||
if not future.finished():
|
|
||||||
future.removeCallback(cb)
|
|
||||||
|
|
||||||
future.addCallback(cb)
|
|
||||||
future.cancelCallback = cancellation
|
|
||||||
return future
|
|
||||||
|
|
||||||
proc then*[T](future: Future[?!T], onSuccess: OnSuccess[T]): Future[?!T] =
|
|
||||||
|
|
||||||
proc cb(udata: pointer) =
|
|
||||||
future.handleFinished(ignoreError, ignoreCancelled)
|
|
||||||
|
|
||||||
try:
|
|
||||||
if val =? future.read:
|
|
||||||
onSuccess(val)
|
|
||||||
except CatchableError as e:
|
|
||||||
ignoreError(e)
|
|
||||||
|
|
||||||
proc cancellation(udata: pointer) =
|
|
||||||
if not future.finished():
|
|
||||||
future.removeCallback(cb)
|
|
||||||
|
|
||||||
future.addCallback(cb)
|
|
||||||
future.cancelCallback = cancellation
|
|
||||||
return future
|
|
||||||
|
|
||||||
proc then*(future: Future[?!void], onSuccess: OnSuccess[void]): Future[?!void] =
|
|
||||||
|
|
||||||
proc cb(udata: pointer) =
|
|
||||||
future.handleFinished(ignoreError, ignoreCancelled)
|
|
||||||
|
|
||||||
try:
|
|
||||||
if future.read.isOk:
|
|
||||||
onSuccess()
|
|
||||||
except CatchableError as e:
|
|
||||||
ignoreError(e)
|
|
||||||
return
|
|
||||||
|
|
||||||
proc cancellation(udata: pointer) =
|
|
||||||
if not future.finished():
|
|
||||||
future.removeCallback(cb)
|
|
||||||
|
|
||||||
future.addCallback(cb)
|
|
||||||
future.cancelCallback = cancellation
|
|
||||||
return future
|
|
||||||
|
|
||||||
proc catch*[T](future: Future[T], onError: OnError) =
|
|
||||||
|
|
||||||
if future.isNil: return
|
|
||||||
|
|
||||||
proc cb(udata: pointer) =
|
|
||||||
future.handleFinished(onError, ignoreCancelled)
|
|
||||||
|
|
||||||
proc cancellation(udata: pointer) =
|
|
||||||
if not future.finished():
|
|
||||||
future.removeCallback(cb)
|
|
||||||
|
|
||||||
future.addCallback(cb)
|
|
||||||
future.cancelCallback = cancellation
|
|
||||||
|
|
||||||
proc catch*[T](future: Future[?!T], onError: OnError) =
|
|
||||||
|
|
||||||
if future.isNil: return
|
|
||||||
|
|
||||||
proc cb(udata: pointer) =
|
|
||||||
future.handleFinished(onError, ignoreCancelled)
|
|
||||||
|
|
||||||
try:
|
|
||||||
if err =? future.read.errorOption:
|
|
||||||
onError(err)
|
|
||||||
except CatchableError as e:
|
|
||||||
onError(e)
|
|
||||||
|
|
||||||
proc cancellation(udata: pointer) =
|
|
||||||
if not future.finished():
|
|
||||||
future.removeCallback(cb)
|
|
||||||
|
|
||||||
future.addCallback(cb)
|
|
||||||
future.cancelCallback = cancellation
|
|
||||||
|
|
||||||
proc cancelled*[T](future: Future[T], onCancelled: OnCancelled): Future[T] =
|
|
||||||
|
|
||||||
proc cb(udata: pointer) =
|
|
||||||
future.handleFinished(ignoreError, onCancelled)
|
|
||||||
|
|
||||||
proc cancellation(udata: pointer) =
|
|
||||||
if not future.finished():
|
|
||||||
future.removeCallback(cb)
|
|
||||||
onCancelled()
|
|
||||||
|
|
||||||
future.addCallback(cb)
|
|
||||||
future.cancelCallback = cancellation
|
|
||||||
return future
|
|
||||||
|
|
||||||
proc cancelled*[T](future: Future[?!T], onCancelled: OnCancelled): Future[?!T] =
|
|
||||||
|
|
||||||
proc cb(udata: pointer) =
|
|
||||||
future.handleFinished(ignoreError, onCancelled)
|
|
||||||
|
|
||||||
proc cancellation(udata: pointer) =
|
|
||||||
if not future.finished():
|
|
||||||
future.removeCallback(cb)
|
|
||||||
onCancelled()
|
|
||||||
|
|
||||||
future.addCallback(cb)
|
|
||||||
future.cancelCallback = cancellation
|
|
||||||
return future
|
|
@ -3,7 +3,6 @@ import ./utils/testkeyutils
|
|||||||
import ./utils/testasyncstatemachine
|
import ./utils/testasyncstatemachine
|
||||||
import ./utils/testasynciter
|
import ./utils/testasynciter
|
||||||
import ./utils/testtimer
|
import ./utils/testtimer
|
||||||
import ./utils/testthen
|
|
||||||
import ./utils/testtrackedfutures
|
import ./utils/testtrackedfutures
|
||||||
|
|
||||||
{.warning[UnusedImport]: off.}
|
{.warning[UnusedImport]: off.}
|
||||||
|
@ -1,414 +0,0 @@
|
|||||||
import pkg/chronos
|
|
||||||
import pkg/questionable
|
|
||||||
import pkg/questionable/results
|
|
||||||
import codex/utils/then
|
|
||||||
|
|
||||||
import ../../asynctest
|
|
||||||
import ../helpers
|
|
||||||
|
|
||||||
proc newError(): ref CatchableError =
|
|
||||||
(ref CatchableError)(msg: "some error")
|
|
||||||
|
|
||||||
asyncchecksuite "then - Future[void]":
|
|
||||||
var error = newError()
|
|
||||||
var future: Future[void]
|
|
||||||
|
|
||||||
setup:
|
|
||||||
future = newFuture[void]("test void")
|
|
||||||
|
|
||||||
teardown:
|
|
||||||
if not future.finished:
|
|
||||||
raiseAssert "test should finish future"
|
|
||||||
|
|
||||||
test "then callback is fired when future is already finished":
|
|
||||||
var firedImmediately = false
|
|
||||||
future.complete()
|
|
||||||
discard future.then(proc() = firedImmediately = true)
|
|
||||||
check eventually firedImmediately
|
|
||||||
|
|
||||||
test "then callback is fired after future is finished":
|
|
||||||
var fired = false
|
|
||||||
discard future.then(proc() = fired = true)
|
|
||||||
future.complete()
|
|
||||||
check eventually fired
|
|
||||||
|
|
||||||
test "catch callback is fired when future is already failed":
|
|
||||||
var actual: ref CatchableError
|
|
||||||
future.fail(error)
|
|
||||||
future.catch(proc(err: ref CatchableError) = actual = err)
|
|
||||||
check eventually actual == error
|
|
||||||
|
|
||||||
test "catch callback is fired after future is failed":
|
|
||||||
var actual: ref CatchableError
|
|
||||||
future.catch(proc(err: ref CatchableError) = actual = err)
|
|
||||||
future.fail(error)
|
|
||||||
check eventually actual == error
|
|
||||||
|
|
||||||
test "cancelled callback is fired when future is already cancelled":
|
|
||||||
var fired = false
|
|
||||||
await future.cancelAndWait()
|
|
||||||
discard future.cancelled(proc() = fired = true)
|
|
||||||
check eventually fired
|
|
||||||
|
|
||||||
test "cancelled callback is fired after future is cancelled":
|
|
||||||
var fired = false
|
|
||||||
discard future.cancelled(proc() = fired = true)
|
|
||||||
await future.cancelAndWait()
|
|
||||||
check eventually fired
|
|
||||||
|
|
||||||
test "does not fire other callbacks when successful":
|
|
||||||
var onSuccessCalled = false
|
|
||||||
var onCancelledCalled = false
|
|
||||||
var onCatchCalled = false
|
|
||||||
|
|
||||||
future
|
|
||||||
.then(proc() = onSuccessCalled = true)
|
|
||||||
.cancelled(proc() = onCancelledCalled = true)
|
|
||||||
.catch(proc(e: ref CatchableError) = onCatchCalled = true)
|
|
||||||
|
|
||||||
future.complete()
|
|
||||||
|
|
||||||
check eventually onSuccessCalled
|
|
||||||
check always (not onCancelledCalled and not onCatchCalled)
|
|
||||||
|
|
||||||
test "does not fire other callbacks when fails":
|
|
||||||
var onSuccessCalled = false
|
|
||||||
var onCancelledCalled = false
|
|
||||||
var onCatchCalled = false
|
|
||||||
|
|
||||||
future
|
|
||||||
.then(proc() = onSuccessCalled = true)
|
|
||||||
.cancelled(proc() = onCancelledCalled = true)
|
|
||||||
.catch(proc(e: ref CatchableError) = onCatchCalled = true)
|
|
||||||
|
|
||||||
future.fail(error)
|
|
||||||
|
|
||||||
check eventually onCatchCalled
|
|
||||||
check always (not onCancelledCalled and not onSuccessCalled)
|
|
||||||
|
|
||||||
test "does not fire other callbacks when cancelled":
|
|
||||||
var onSuccessCalled = false
|
|
||||||
var onCancelledCalled = false
|
|
||||||
var onCatchCalled = false
|
|
||||||
|
|
||||||
future
|
|
||||||
.then(proc() = onSuccessCalled = true)
|
|
||||||
.cancelled(proc() = onCancelledCalled = true)
|
|
||||||
.catch(proc(e: ref CatchableError) = onCatchCalled = true)
|
|
||||||
|
|
||||||
await future.cancelAndWait()
|
|
||||||
|
|
||||||
check eventually onCancelledCalled
|
|
||||||
check always (not onSuccessCalled and not onCatchCalled)
|
|
||||||
|
|
||||||
test "can chain onSuccess when future completes":
|
|
||||||
var onSuccessCalledTimes = 0
|
|
||||||
discard future
|
|
||||||
.then(proc() = inc onSuccessCalledTimes)
|
|
||||||
.then(proc() = inc onSuccessCalledTimes)
|
|
||||||
.then(proc() = inc onSuccessCalledTimes)
|
|
||||||
future.complete()
|
|
||||||
check eventually onSuccessCalledTimes == 3
|
|
||||||
|
|
||||||
asyncchecksuite "then - Future[T]":
|
|
||||||
var error = newError()
|
|
||||||
var future: Future[int]
|
|
||||||
|
|
||||||
setup:
|
|
||||||
future = newFuture[int]("test void")
|
|
||||||
|
|
||||||
teardown:
|
|
||||||
if not future.finished:
|
|
||||||
raiseAssert "test should finish future"
|
|
||||||
|
|
||||||
test "then callback is fired when future is already finished":
|
|
||||||
var cbVal = 0
|
|
||||||
future.complete(1)
|
|
||||||
discard future.then(proc(val: int) = cbVal = val)
|
|
||||||
check eventually cbVal == 1
|
|
||||||
|
|
||||||
test "then callback is fired after future is finished":
|
|
||||||
var cbVal = 0
|
|
||||||
discard future.then(proc(val: int) = cbVal = val)
|
|
||||||
future.complete(1)
|
|
||||||
check eventually cbVal == 1
|
|
||||||
|
|
||||||
test "catch callback is fired when future is already failed":
|
|
||||||
var actual: ref CatchableError
|
|
||||||
future.fail(error)
|
|
||||||
future.catch(proc(err: ref CatchableError) = actual = err)
|
|
||||||
check eventually actual == error
|
|
||||||
|
|
||||||
test "catch callback is fired after future is failed":
|
|
||||||
var actual: ref CatchableError
|
|
||||||
future.catch(proc(err: ref CatchableError) = actual = err)
|
|
||||||
future.fail(error)
|
|
||||||
check eventually actual == error
|
|
||||||
|
|
||||||
test "cancelled callback is fired when future is already cancelled":
|
|
||||||
var fired = false
|
|
||||||
await future.cancelAndWait()
|
|
||||||
discard future.cancelled(proc() = fired = true)
|
|
||||||
check eventually fired
|
|
||||||
|
|
||||||
test "cancelled callback is fired after future is cancelled":
|
|
||||||
var fired = false
|
|
||||||
discard future.cancelled(proc() = fired = true)
|
|
||||||
await future.cancelAndWait()
|
|
||||||
check eventually fired
|
|
||||||
|
|
||||||
test "does not fire other callbacks when successful":
|
|
||||||
var onSuccessCalled = false
|
|
||||||
var onCancelledCalled = false
|
|
||||||
var onCatchCalled = false
|
|
||||||
|
|
||||||
future
|
|
||||||
.then(proc(val: int) = onSuccessCalled = true)
|
|
||||||
.cancelled(proc() = onCancelledCalled = true)
|
|
||||||
.catch(proc(e: ref CatchableError) = onCatchCalled = true)
|
|
||||||
|
|
||||||
future.complete(1)
|
|
||||||
|
|
||||||
check eventually onSuccessCalled
|
|
||||||
check always (not onCancelledCalled and not onCatchCalled)
|
|
||||||
|
|
||||||
test "does not fire other callbacks when fails":
|
|
||||||
var onSuccessCalled = false
|
|
||||||
var onCancelledCalled = false
|
|
||||||
var onCatchCalled = false
|
|
||||||
|
|
||||||
future
|
|
||||||
.then(proc(val: int) = onSuccessCalled = true)
|
|
||||||
.cancelled(proc() = onCancelledCalled = true)
|
|
||||||
.catch(proc(e: ref CatchableError) = onCatchCalled = true)
|
|
||||||
|
|
||||||
future.fail(error)
|
|
||||||
|
|
||||||
check eventually onCatchCalled
|
|
||||||
check always (not onCancelledCalled and not onSuccessCalled)
|
|
||||||
|
|
||||||
test "does not fire other callbacks when cancelled":
|
|
||||||
var onSuccessCalled = false
|
|
||||||
var onCancelledCalled = false
|
|
||||||
var onCatchCalled = false
|
|
||||||
|
|
||||||
future
|
|
||||||
.then(proc(val: int) = onSuccessCalled = true)
|
|
||||||
.cancelled(proc() = onCancelledCalled = true)
|
|
||||||
.catch(proc(e: ref CatchableError) = onCatchCalled = true)
|
|
||||||
|
|
||||||
await future.cancelAndWait()
|
|
||||||
|
|
||||||
check eventually onCancelledCalled
|
|
||||||
check always (not onSuccessCalled and not onCatchCalled)
|
|
||||||
|
|
||||||
test "can chain onSuccess when future completes":
|
|
||||||
var onSuccessCalledTimes = 0
|
|
||||||
discard future
|
|
||||||
.then(proc(val: int) = inc onSuccessCalledTimes)
|
|
||||||
.then(proc(val: int) = inc onSuccessCalledTimes)
|
|
||||||
.then(proc(val: int) = inc onSuccessCalledTimes)
|
|
||||||
future.complete(1)
|
|
||||||
check eventually onSuccessCalledTimes == 3
|
|
||||||
|
|
||||||
asyncchecksuite "then - Future[?!void]":
|
|
||||||
var error = newError()
|
|
||||||
var future: Future[?!void]
|
|
||||||
|
|
||||||
setup:
|
|
||||||
future = newFuture[?!void]("test void")
|
|
||||||
|
|
||||||
teardown:
|
|
||||||
if not future.finished:
|
|
||||||
raiseAssert "test should finish future"
|
|
||||||
|
|
||||||
test "then callback is fired when future is already finished":
|
|
||||||
var firedImmediately = false
|
|
||||||
future.complete(success())
|
|
||||||
discard future.then(proc() = firedImmediately = true)
|
|
||||||
check eventually firedImmediately
|
|
||||||
|
|
||||||
test "then callback is fired after future is finished":
|
|
||||||
var fired = false
|
|
||||||
discard future.then(proc() = fired = true)
|
|
||||||
future.complete(success())
|
|
||||||
check eventually fired
|
|
||||||
|
|
||||||
test "catch callback is fired when future is already failed":
|
|
||||||
var actual: ref CatchableError
|
|
||||||
future.fail(error)
|
|
||||||
future.catch(proc(err: ref CatchableError) = actual = err)
|
|
||||||
check eventually actual == error
|
|
||||||
|
|
||||||
test "catch callback is fired after future is failed":
|
|
||||||
var actual: ref CatchableError
|
|
||||||
future.catch(proc(err: ref CatchableError) = actual = err)
|
|
||||||
future.fail(error)
|
|
||||||
check eventually actual == error
|
|
||||||
|
|
||||||
test "cancelled callback is fired when future is already cancelled":
|
|
||||||
var fired = false
|
|
||||||
await future.cancelAndWait()
|
|
||||||
discard future.cancelled(proc() = fired = true)
|
|
||||||
check eventually fired
|
|
||||||
|
|
||||||
test "cancelled callback is fired after future is cancelled":
|
|
||||||
var fired = false
|
|
||||||
discard future.cancelled(proc() = fired = true)
|
|
||||||
await future.cancelAndWait()
|
|
||||||
check eventually fired
|
|
||||||
|
|
||||||
test "does not fire other callbacks when successful":
|
|
||||||
var onSuccessCalled = false
|
|
||||||
var onCancelledCalled = false
|
|
||||||
var onCatchCalled = false
|
|
||||||
|
|
||||||
future
|
|
||||||
.then(proc() = onSuccessCalled = true)
|
|
||||||
.cancelled(proc() = onCancelledCalled = true)
|
|
||||||
.catch(proc(e: ref CatchableError) = onCatchCalled = true)
|
|
||||||
|
|
||||||
future.complete(success())
|
|
||||||
|
|
||||||
check eventually onSuccessCalled
|
|
||||||
check always (not onCancelledCalled and not onCatchCalled)
|
|
||||||
|
|
||||||
test "does not fire other callbacks when fails":
|
|
||||||
var onSuccessCalled = false
|
|
||||||
var onCancelledCalled = false
|
|
||||||
var onCatchCalled = false
|
|
||||||
|
|
||||||
future
|
|
||||||
.then(proc() = onSuccessCalled = true)
|
|
||||||
.cancelled(proc() = onCancelledCalled = true)
|
|
||||||
.catch(proc(e: ref CatchableError) = onCatchCalled = true)
|
|
||||||
|
|
||||||
future.fail(error)
|
|
||||||
|
|
||||||
check eventually onCatchCalled
|
|
||||||
check always (not onCancelledCalled and not onSuccessCalled)
|
|
||||||
|
|
||||||
test "does not fire other callbacks when cancelled":
|
|
||||||
var onSuccessCalled = false
|
|
||||||
var onCancelledCalled = false
|
|
||||||
var onCatchCalled = false
|
|
||||||
|
|
||||||
future
|
|
||||||
.then(proc() = onSuccessCalled = true)
|
|
||||||
.cancelled(proc() = onCancelledCalled = true)
|
|
||||||
.catch(proc(e: ref CatchableError) = onCatchCalled = true)
|
|
||||||
|
|
||||||
await future.cancelAndWait()
|
|
||||||
|
|
||||||
check eventually onCancelledCalled
|
|
||||||
check always (not onSuccessCalled and not onCatchCalled)
|
|
||||||
|
|
||||||
test "can chain onSuccess when future completes":
|
|
||||||
var onSuccessCalledTimes = 0
|
|
||||||
discard future
|
|
||||||
.then(proc() = inc onSuccessCalledTimes)
|
|
||||||
.then(proc() = inc onSuccessCalledTimes)
|
|
||||||
.then(proc() = inc onSuccessCalledTimes)
|
|
||||||
future.complete(success())
|
|
||||||
check eventually onSuccessCalledTimes == 3
|
|
||||||
|
|
||||||
asyncchecksuite "then - Future[?!T]":
|
|
||||||
var error = newError()
|
|
||||||
var future: Future[?!int]
|
|
||||||
|
|
||||||
setup:
|
|
||||||
future = newFuture[?!int]("test void")
|
|
||||||
|
|
||||||
teardown:
|
|
||||||
if not future.finished:
|
|
||||||
raiseAssert "test should finish future"
|
|
||||||
|
|
||||||
test "then callback is fired when future is already finished":
|
|
||||||
var cbVal = 0
|
|
||||||
future.complete(success(1))
|
|
||||||
discard future.then(proc(val: int) = cbVal = val)
|
|
||||||
check eventually cbVal == 1
|
|
||||||
|
|
||||||
test "then callback is fired after future is finished":
|
|
||||||
var cbVal = 0
|
|
||||||
discard future.then(proc(val: int) = cbVal = val)
|
|
||||||
future.complete(success(1))
|
|
||||||
check eventually cbVal == 1
|
|
||||||
|
|
||||||
test "catch callback is fired when future is already failed":
|
|
||||||
var actual: ref CatchableError
|
|
||||||
future.fail(error)
|
|
||||||
future.catch(proc(err: ref CatchableError) = actual = err)
|
|
||||||
check eventually actual == error
|
|
||||||
|
|
||||||
test "catch callback is fired after future is failed":
|
|
||||||
var actual: ref CatchableError
|
|
||||||
future.catch(proc(err: ref CatchableError) = actual = err)
|
|
||||||
future.fail(error)
|
|
||||||
check eventually actual == error
|
|
||||||
|
|
||||||
test "cancelled callback is fired when future is already cancelled":
|
|
||||||
var fired = false
|
|
||||||
await future.cancelAndWait()
|
|
||||||
discard future.cancelled(proc() = fired = true)
|
|
||||||
check eventually fired
|
|
||||||
|
|
||||||
test "cancelled callback is fired after future is cancelled":
|
|
||||||
var fired = false
|
|
||||||
discard future.cancelled(proc() = fired = true)
|
|
||||||
await future.cancelAndWait()
|
|
||||||
check eventually fired
|
|
||||||
|
|
||||||
test "does not fire other callbacks when successful":
|
|
||||||
var onSuccessCalled = false
|
|
||||||
var onCancelledCalled = false
|
|
||||||
var onCatchCalled = false
|
|
||||||
|
|
||||||
future
|
|
||||||
.then(proc(val: int) = onSuccessCalled = true)
|
|
||||||
.cancelled(proc() = onCancelledCalled = true)
|
|
||||||
.catch(proc(e: ref CatchableError) = onCatchCalled = true)
|
|
||||||
|
|
||||||
future.complete(success(1))
|
|
||||||
|
|
||||||
check eventually onSuccessCalled
|
|
||||||
check always (not onCancelledCalled and not onCatchCalled)
|
|
||||||
|
|
||||||
test "does not fire other callbacks when fails":
|
|
||||||
var onSuccessCalled = false
|
|
||||||
var onCancelledCalled = false
|
|
||||||
var onCatchCalled = false
|
|
||||||
|
|
||||||
future
|
|
||||||
.then(proc(val: int) = onSuccessCalled = true)
|
|
||||||
.cancelled(proc() = onCancelledCalled = true)
|
|
||||||
.catch(proc(e: ref CatchableError) = onCatchCalled = true)
|
|
||||||
|
|
||||||
future.fail(error)
|
|
||||||
|
|
||||||
check eventually onCatchCalled
|
|
||||||
check always (not onCancelledCalled and not onSuccessCalled)
|
|
||||||
|
|
||||||
test "does not fire other callbacks when cancelled":
|
|
||||||
var onSuccessCalled = false
|
|
||||||
var onCancelledCalled = false
|
|
||||||
var onCatchCalled = false
|
|
||||||
|
|
||||||
future
|
|
||||||
.then(proc(val: int) = onSuccessCalled = true)
|
|
||||||
.cancelled(proc() = onCancelledCalled = true)
|
|
||||||
.catch(proc(e: ref CatchableError) = onCatchCalled = true)
|
|
||||||
|
|
||||||
await future.cancelAndWait()
|
|
||||||
|
|
||||||
check eventually onCancelledCalled
|
|
||||||
check always (not onSuccessCalled and not onCatchCalled)
|
|
||||||
|
|
||||||
test "can chain onSuccess when future completes":
|
|
||||||
var onSuccessCalledTimes = 0
|
|
||||||
discard future
|
|
||||||
.then(proc(val: int) = inc onSuccessCalledTimes)
|
|
||||||
.then(proc(val: int) = inc onSuccessCalledTimes)
|
|
||||||
.then(proc(val: int) = inc onSuccessCalledTimes)
|
|
||||||
future.complete(success(1))
|
|
||||||
check eventually onSuccessCalledTimes == 3
|
|
@ -34,10 +34,10 @@ method startedOutput(node: CodexProcess): string =
|
|||||||
method processOptions(node: CodexProcess): set[AsyncProcessOption] =
|
method processOptions(node: CodexProcess): set[AsyncProcessOption] =
|
||||||
return {AsyncProcessOption.StdErrToStdOut}
|
return {AsyncProcessOption.StdErrToStdOut}
|
||||||
|
|
||||||
method outputLineEndings(node: CodexProcess): string =
|
method outputLineEndings(node: CodexProcess): string {.raises: [].} =
|
||||||
return "\n"
|
return "\n"
|
||||||
|
|
||||||
method onOutputLineCaptured(node: CodexProcess, line: string) =
|
method onOutputLineCaptured(node: CodexProcess, line: string) {.raises: [].} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
proc dataDir(node: CodexProcess): string =
|
proc dataDir(node: CodexProcess): string =
|
||||||
|
@ -37,7 +37,7 @@ method startedOutput(node: HardhatProcess): string =
|
|||||||
method processOptions(node: HardhatProcess): set[AsyncProcessOption] =
|
method processOptions(node: HardhatProcess): set[AsyncProcessOption] =
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
method outputLineEndings(node: HardhatProcess): string =
|
method outputLineEndings(node: HardhatProcess): string {.raises: [].} =
|
||||||
return "\n"
|
return "\n"
|
||||||
|
|
||||||
proc openLogFile(node: HardhatProcess, logFilePath: string): IoHandle =
|
proc openLogFile(node: HardhatProcess, logFilePath: string): IoHandle =
|
||||||
|
@ -38,10 +38,10 @@ method startedOutput(node: NodeProcess): string {.base.} =
|
|||||||
method processOptions(node: NodeProcess): set[AsyncProcessOption] {.base.} =
|
method processOptions(node: NodeProcess): set[AsyncProcessOption] {.base.} =
|
||||||
raiseAssert "not implemented"
|
raiseAssert "not implemented"
|
||||||
|
|
||||||
method outputLineEndings(node: NodeProcess): string {.base.} =
|
method outputLineEndings(node: NodeProcess): string {.base, raises: [].} =
|
||||||
raiseAssert "not implemented"
|
raiseAssert "not implemented"
|
||||||
|
|
||||||
method onOutputLineCaptured(node: NodeProcess, line: string) {.base.} =
|
method onOutputLineCaptured(node: NodeProcess, line: string) {.base, raises: [].} =
|
||||||
raiseAssert "not implemented"
|
raiseAssert "not implemented"
|
||||||
|
|
||||||
method start*(node: NodeProcess) {.base, async.} =
|
method start*(node: NodeProcess) {.base, async.} =
|
||||||
@ -74,7 +74,7 @@ proc captureOutput(
|
|||||||
node: NodeProcess,
|
node: NodeProcess,
|
||||||
output: string,
|
output: string,
|
||||||
started: Future[void]
|
started: Future[void]
|
||||||
) {.async.} =
|
) {.async: (raises: []).} =
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
nodeName = node.name
|
nodeName = node.name
|
||||||
@ -98,7 +98,10 @@ proc captureOutput(
|
|||||||
await sleepAsync(1.millis)
|
await sleepAsync(1.millis)
|
||||||
await sleepAsync(1.millis)
|
await sleepAsync(1.millis)
|
||||||
|
|
||||||
except AsyncStreamReadError as e:
|
except CancelledError:
|
||||||
|
discard # do not propagate as captureOutput was asyncSpawned
|
||||||
|
|
||||||
|
except AsyncStreamError as e:
|
||||||
error "error reading output stream", error = e.msgDetail
|
error "error reading output stream", error = e.msgDetail
|
||||||
|
|
||||||
proc startNode*[T: NodeProcess](
|
proc startNode*[T: NodeProcess](
|
||||||
@ -155,7 +158,8 @@ proc waitUntilStarted*(node: NodeProcess) {.async.} =
|
|||||||
|
|
||||||
let started = newFuture[void]()
|
let started = newFuture[void]()
|
||||||
try:
|
try:
|
||||||
discard node.captureOutput(node.startedOutput, started).track(node)
|
let fut = node.captureOutput(node.startedOutput, started).track(node)
|
||||||
|
asyncSpawn fut
|
||||||
await started.wait(60.seconds) # allow enough time for proof generation
|
await started.wait(60.seconds) # allow enough time for proof generation
|
||||||
trace "node started"
|
trace "node started"
|
||||||
except AsyncTimeoutError:
|
except AsyncTimeoutError:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user