From 3e80de34542afd887b63feea4edfac9e1bfd2081 Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Mon, 31 Jul 2023 15:09:34 +1000 Subject: [PATCH] Cancel and wait for asyncstatemachine futures when stopping (#493) * Simplify `.then` (promise api) and tests * Remove tracked future when cancelled. Add tracked future tests * Track and cancel statemachine futures The futures created in each asyncstatemachine instance are tracked, and each future is cancelled and waited in `stop`. Change `asyncstatemachine.stop` to be async so `machine.trackedFutures.cancelAndWait` could be called. Add a constructor for `asyncstatemachine` that initialises the `trackedFutures` instance, and call the constructor from derived class constructors. --- codex/purchasing/purchase.nim | 15 +- codex/sales.nim | 2 +- codex/sales/salesagent.nim | 15 +- codex/sales/slotqueue.nim | 3 +- codex/utils/asyncstatemachine.nim | 46 +- codex/utils/then.nim | 137 ++--- codex/{sales => utils}/trackedfutures.nim | 27 +- tests/codex/sales/testsalesagent.nim | 1 + tests/codex/testutils.nim | 1 + tests/codex/utils/testasyncstatemachine.nim | 4 +- tests/codex/utils/testthen.nim | 621 +++++++++++--------- tests/codex/utils/testtrackedfutures.nim | 67 +++ 12 files changed, 536 insertions(+), 403 deletions(-) rename codex/{sales => utils}/trackedfutures.nim (64%) create mode 100644 tests/codex/utils/testtrackedfutures.nim diff --git a/codex/purchasing/purchase.nim b/codex/purchasing/purchase.nim index 07498044..add60a2f 100644 --- a/codex/purchasing/purchase.nim +++ b/codex/purchasing/purchase.nim @@ -31,13 +31,14 @@ func new*( clock: Clock ): Purchase = ## create a new instance of a Purchase - ## - Purchase( - future: Future[void].new(), - requestId: requestId, - market: market, - clock: clock - ) + ## + var purchase = Purchase.new() + purchase.future = Future[void].new() + purchase.requestId = requestId + purchase.market = market + purchase.clock = clock + + return purchase func new*( _: type Purchase, diff --git a/codex/sales.nim b/codex/sales.nim index 8a7f0cb5..c025d867 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -15,10 +15,10 @@ import ./sales/salescontext import ./sales/salesagent import ./sales/statemachine import ./sales/slotqueue -import ./sales/trackedfutures import ./sales/states/preparing import ./sales/states/unknown import ./utils/then +import ./utils/trackedfutures ## Sales holds a list of available storage that it may sell. ## diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index ef7b255b..3f84ff9b 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -32,12 +32,13 @@ proc newSalesAgent*(context: SalesContext, requestId: RequestId, slotIndex: UInt256, request: ?StorageRequest): SalesAgent = - SalesAgent( - context: context, - data: SalesData( - requestId: requestId, - slotIndex: slotIndex, - request: request)) + var agent = SalesAgent.new() + agent.context = context + agent.data = SalesData( + requestId: requestId, + slotIndex: slotIndex, + request: request) + return agent proc retrieveRequest*(agent: SalesAgent) {.async.} = let data = agent.data @@ -96,5 +97,5 @@ proc unsubscribe*(agent: SalesAgent) {.async.} = agent.subscribed = false proc stop*(agent: SalesAgent) {.async.} = - procCall Machine(agent).stop() + await Machine(agent).stop() await agent.unsubscribe() diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim index b512360d..80e95aec 100644 --- a/codex/sales/slotqueue.nim +++ b/codex/sales/slotqueue.nim @@ -1,5 +1,4 @@ import std/sequtils -import std/sugar import std/tables import pkg/chronicles import pkg/chronos @@ -7,13 +6,13 @@ import pkg/questionable import pkg/questionable/results import pkg/upraises import ./reservations -import ./trackedfutures import ../errors import ../rng import ../utils import ../contracts/requests import ../utils/asyncheapqueue import ../utils/then +import ../utils/trackedfutures logScope: topics = "marketplace slotqueue" diff --git a/codex/utils/asyncstatemachine.nim b/codex/utils/asyncstatemachine.nim index 13392008..3d49f741 100644 --- a/codex/utils/asyncstatemachine.nim +++ b/codex/utils/asyncstatemachine.nim @@ -1,7 +1,10 @@ +import std/sugar import pkg/questionable import pkg/chronos import pkg/chronicles import pkg/upraises +import ./trackedfutures +import ./then push: {.upraises:[].} @@ -10,8 +13,8 @@ type state: State running: Future[void] scheduled: AsyncQueue[Event] - scheduling: Future[void] started: bool + trackedFutures: TrackedFutures State* = ref object of RootObj Query*[T] = proc(state: State): T Event* = proc(state: State): ?State {.gcsafe, upraises:[].} @@ -19,6 +22,9 @@ type logScope: topics = "statemachine" +proc new*[T: Machine](_: type T): T = + T(trackedFutures: TrackedFutures.new()) + method `$`*(state: State): string {.base.} = raiseAssert "not implemented" @@ -60,21 +66,21 @@ proc run(machine: Machine, state: State) {.async.} = discard proc scheduler(machine: Machine) {.async.} = - proc onRunComplete(udata: pointer) {.gcsafe.} = - var fut = cast[FutureBase](udata) - if fut.failed(): - machine.schedule(machine.onError(fut.error)) - + var running: Future[void] try: - while true: - let event = await machine.scheduled.get() + while machine.started: + let event = await machine.scheduled.get().track(machine) if next =? event(machine.state): - if not machine.running.isNil: - await machine.running.cancelAndWait() + if not running.isNil and not running.finished: + await running.cancelAndWait() machine.state = next debug "enter state", state = machine.state - machine.running = machine.run(machine.state) - machine.running.addCallback(onRunComplete) + running = machine.run(machine.state) + running + .track(machine) + .catch((err: ref CatchableError) => + machine.schedule(machine.onError(err)) + ) except CancelledError: discard @@ -84,18 +90,20 @@ proc start*(machine: Machine, initialState: State) = if machine.scheduled.isNil: machine.scheduled = newAsyncQueue[Event]() - machine.scheduling = machine.scheduler() + machine.started = true + machine.scheduler() + .track(machine) + .catch((err: ref CatchableError) => + error("Error in scheduler", error = err.msg) + ) machine.schedule(Event.transition(machine.state, initialState)) -proc stop*(machine: Machine) = +proc stop*(machine: Machine) {.async.} = if not machine.started: return - if not machine.scheduling.isNil: - machine.scheduling.cancel() - if not machine.running.isNil: - machine.running.cancel() + machine.started = false + await machine.trackedFutures.cancelTracked() machine.state = nil - machine.started = false diff --git a/codex/utils/then.nim b/codex/utils/then.nim index 2bb5699e..fbcf7bf3 100644 --- a/codex/utils/then.nim +++ b/codex/utils/then.nim @@ -22,12 +22,11 @@ import pkg/upraises # `.catch` is called when the `Future` fails. In the case when the `Future` # returns a `Result[T, ref CatchableError` (or `?!T`), `.catch` will be called # if the `Result` contains an error. If the `Future` is already failed (or -# `Future[?!T]` contains an error), the `.catch` callback will be excuted +# `Future[?!T]` contains an error), the `.catch` callback will be executed # immediately. -# NOTE: Cancelled `Futures` are discarded as bubbling the `CancelledError` to -# the synchronous closure will likely cause an unintended and unhandled -# exception. +# `.cancelled` is called when the `Future` is cancelled. If the `Future` is +# already cancelled, the `.cancelled` callback will be executed immediately. # More info on JavaScript's Promise API can be found at: # https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise @@ -56,44 +55,30 @@ runnableExamples: type OnSuccess*[T] = proc(val: T) {.gcsafe, upraises: [].} OnError* = proc(err: ref CatchableError) {.gcsafe, upraises: [].} + OnCancelled* = proc() {.gcsafe, upraises: [].} proc ignoreError(err: ref CatchableError) = discard +proc ignoreCancelled() = discard + +template handleFinished(future: FutureBase, + onError: OnError, + onCancelled: OnCancelled) = -template returnOrError(future: FutureBase, onError: OnError) = if not future.finished: return if future.cancelled: - # do not bubble as closure is synchronous + onCancelled() return if future.failed: onError(future.error) return - -proc then*(future: Future[void], - onError: OnError): - Future[void] = +proc then*(future: Future[void], onSuccess: OnSuccess[void]): Future[void] = proc cb(udata: pointer) = - future.returnOrError(onError) - - proc cancellation(udata: pointer) = - if not future.finished(): - future.removeCallback(cb) - - future.addCallback(cb) - future.cancelCallback = cancellation - return future - -proc then*(future: Future[void], - onSuccess: OnSuccess[void], - onError: OnError = ignoreError): - Future[void] = - - proc cb(udata: pointer) = - future.returnOrError(onError) + future.handleFinished(ignoreError, ignoreCancelled) onSuccess() proc cancellation(udata: pointer) = @@ -104,42 +89,13 @@ proc then*(future: Future[void], future.cancelCallback = cancellation return future -proc then*[T](future: Future[T], - onSuccess: OnSuccess[T], - onError: OnError = ignoreError): - Future[T] = +proc then*[T](future: Future[T], onSuccess: OnSuccess[T]): Future[T] = proc cb(udata: pointer) = - future.returnOrError(onError) + future.handleFinished(ignoreError, ignoreCancelled) - without val =? future.read.catch, err: - onError(err) - return - onSuccess(val) - - proc cancellation(udata: pointer) = - if not future.finished(): - future.removeCallback(cb) - - future.addCallback(cb) - future.cancelCallback = cancellation - return future - -proc then*[T](future: Future[?!T], - onSuccess: OnSuccess[T], - onError: OnError = ignoreError): - Future[?!T] = - - proc cb(udata: pointer) = - future.returnOrError(onError) - - try: - without val =? future.read, err: - onError(err) - return + if val =? future.read.catch: onSuccess(val) - except CatchableError as e: - onError(e) proc cancellation(udata: pointer) = if not future.finished(): @@ -149,18 +105,16 @@ proc then*[T](future: Future[?!T], future.cancelCallback = cancellation return future -proc then*(future: Future[?!void], - onError: OnError = ignoreError): - Future[?!void] = +proc then*[T](future: Future[?!T], onSuccess: OnSuccess[T]): Future[?!T] = proc cb(udata: pointer) = - future.returnOrError(onError) + future.handleFinished(ignoreError, ignoreCancelled) try: - if err =? future.read.errorOption: - onError(err) + if val =? future.read: + onSuccess(val) except CatchableError as e: - onError(e) + ignoreError(e) proc cancellation(udata: pointer) = if not future.finished(): @@ -170,22 +124,17 @@ proc then*(future: Future[?!void], future.cancelCallback = cancellation return future -proc then*(future: Future[?!void], - onSuccess: OnSuccess[void], - onError: OnError = ignoreError): - Future[?!void] = +proc then*(future: Future[?!void], onSuccess: OnSuccess[void]): Future[?!void] = proc cb(udata: pointer) = - future.returnOrError(onError) + future.handleFinished(ignoreError, ignoreCancelled) try: - if err =? future.read.errorOption: - onError(err) - return + if future.read.isOk: + onSuccess() except CatchableError as e: - onError(e) + ignoreError(e) return - onSuccess() proc cancellation(udata: pointer) = if not future.finished(): @@ -197,8 +146,10 @@ proc then*(future: Future[?!void], proc catch*[T](future: Future[T], onError: OnError) = + if future.isNil: return + proc cb(udata: pointer) = - future.returnOrError(onError) + future.handleFinished(onError, ignoreCancelled) proc cancellation(udata: pointer) = if not future.finished(): @@ -209,8 +160,10 @@ proc catch*[T](future: Future[T], onError: OnError) = proc catch*[T](future: Future[?!T], onError: OnError) = + if future.isNil: return + proc cb(udata: pointer) = - future.returnOrError(onError) + future.handleFinished(onError, ignoreCancelled) try: if err =? future.read.errorOption: @@ -224,3 +177,31 @@ proc catch*[T](future: Future[?!T], onError: OnError) = future.addCallback(cb) future.cancelCallback = cancellation + +proc cancelled*[T](future: Future[T], onCancelled: OnCancelled): Future[T] = + + proc cb(udata: pointer) = + future.handleFinished(ignoreError, onCancelled) + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + onCancelled() + + future.addCallback(cb) + future.cancelCallback = cancellation + return future + +proc cancelled*[T](future: Future[?!T], onCancelled: OnCancelled): Future[?!T] = + + proc cb(udata: pointer) = + future.handleFinished(ignoreError, onCancelled) + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + onCancelled() + + future.addCallback(cb) + future.cancelCallback = cancellation + return future diff --git a/codex/sales/trackedfutures.nim b/codex/utils/trackedfutures.nim similarity index 64% rename from codex/sales/trackedfutures.nim rename to codex/utils/trackedfutures.nim index b9d9a565..ea26c4ae 100644 --- a/codex/sales/trackedfutures.nim +++ b/codex/utils/trackedfutures.nim @@ -12,21 +12,25 @@ type logScope: topics = "trackable futures" -proc track*[T](self: TrackedFutures, fut: Future[T]): Future[T] = - logScope: - id = fut.id +proc len*(self: TrackedFutures): int = self.futures.len - proc removeFuture() = - if not self.cancelling and not fut.isNil: - trace "removing tracked future" - self.futures.del(fut.id) +proc removeFuture(self: TrackedFutures, future: FutureBase) = + if not self.cancelling and not future.isNil: + trace "removing tracked future" + self.futures.del(future.id) + +proc track*[T](self: TrackedFutures, fut: Future[T]): Future[T] = + if self.cancelling: + return fut + + trace "tracking future", id = fut.id + self.futures[fut.id] = FutureBase(fut) fut - .then((val: T) => removeFuture()) - .catch((e: ref CatchableError) => removeFuture()) + .then((val: T) => self.removeFuture(fut)) + .cancelled(() => self.removeFuture(fut)) + .catch((e: ref CatchableError) => self.removeFuture(fut)) - trace "tracking future" - self.futures[fut.id] = FutureBase(fut) return fut proc track*[T, U](future: Future[T], self: U): Future[T] = @@ -43,4 +47,5 @@ proc cancelTracked*(self: TrackedFutures) {.async.} = trace "cancelling tracked future", id = future.id await future.cancelAndWait() + self.futures.clear() self.cancelling = false diff --git a/tests/codex/sales/testsalesagent.nim b/tests/codex/sales/testsalesagent.nim index 90951ff6..690d7902 100644 --- a/tests/codex/sales/testsalesagent.nim +++ b/tests/codex/sales/testsalesagent.nim @@ -10,6 +10,7 @@ import pkg/codex/proving import ../helpers/mockmarket import ../helpers/mockclock import ../helpers/eventually +import ../helpers import ../examples var onCancelCalled = false diff --git a/tests/codex/testutils.nim b/tests/codex/testutils.nim index 0e602db2..6b4b2366 100644 --- a/tests/codex/testutils.nim +++ b/tests/codex/testutils.nim @@ -3,5 +3,6 @@ import ./utils/testkeyutils import ./utils/testasyncstatemachine import ./utils/testtimer import ./utils/testthen +import ./utils/testtrackedfutures {.warning[UnusedImport]: off.} diff --git a/tests/codex/utils/testasyncstatemachine.nim b/tests/codex/utils/testasyncstatemachine.nim index eb0de264..4336657a 100644 --- a/tests/codex/utils/testasyncstatemachine.nim +++ b/tests/codex/utils/testasyncstatemachine.nim @@ -99,7 +99,7 @@ asyncchecksuite "async state machines": test "stops scheduling and current state": machine.start(State2.new()) await sleepAsync(1.millis) - machine.stop() + await machine.stop() machine.schedule(moveToNextStateEvent) await sleepAsync(1.millis) check runs == [0, 1, 0, 0] @@ -130,5 +130,5 @@ asyncchecksuite "async state machines": machine.start(State2.new()) check eventually machine.query(description).isSome - machine.stop() + await machine.stop() check machine.query(description).isNone diff --git a/tests/codex/utils/testthen.nim b/tests/codex/utils/testthen.nim index 7a5037bf..0b73d9ef 100644 --- a/tests/codex/utils/testthen.nim +++ b/tests/codex/utils/testthen.nim @@ -5,340 +5,409 @@ import pkg/questionable/results import codex/utils/then import ../helpers +proc newError(): ref CatchableError = + (ref CatchableError)(msg: "some error") + asyncchecksuite "then - Future[void]": - var returnsVoidWasRun: bool - var error = (ref CatchableError)(msg: "some error") + var error = newError() + var future: Future[void] setup: - returnsVoidWasRun = false + future = newFuture[void]("test void") - proc returnsVoid() {.async.} = - await sleepAsync 1.millis - returnsVoidWasRun = true + teardown: + if not future.finished: + raiseAssert "test should finish future" - proc returnsVoidError() {.async.} = - raise error + test "then callback is fired when future is already finished": + var firedImmediately = false + future.complete() + discard future.then(proc() = firedImmediately = true) + check eventually firedImmediately - proc returnsVoidCancelled() {.async.} = - await sleepAsync(1.seconds) + test "then callback is fired after future is finished": + var fired = false + discard future.then(proc() = fired = true) + future.complete() + check eventually fired - proc wasCancelled(error: ref CancelledError): bool = - not error.isNil and error.msg == "Future operation cancelled!" + test "catch callback is fired when future is already failed": + var actual: ref CatchableError + future.fail(error) + future.catch(proc(err: ref CatchableError) = actual = err) + check eventually actual == error - test "calls async proc when returns Future[void]": - discard returnsVoid().then( - proc(err: ref CatchableError) = discard - ) - check eventually returnsVoidWasRun + test "catch callback is fired after future is failed": + var actual: ref CatchableError + future.catch(proc(err: ref CatchableError) = actual = err) + future.fail(error) + check eventually actual == error - test "calls onSuccess when Future[void] complete": + test "cancelled callback is fired when future is already cancelled": + var fired = false + await future.cancelAndWait() + discard future.cancelled(proc() = fired = true) + check eventually fired + + test "cancelled callback is fired after future is cancelled": + var fired = false + discard future.cancelled(proc() = fired = true) + await future.cancelAndWait() + check eventually fired + + test "does not fire other callbacks when successful": var onSuccessCalled = false - discard returnsVoid().then( - proc() = onSuccessCalled = true, - proc(err: ref CatchableError) = discard - ) - check eventually returnsVoidWasRun - check eventually onSuccessCalled + var onCancelledCalled = false + var onCatchCalled = false - test "can pass only onSuccess for Future[void]": + future + .then(proc() = onSuccessCalled = true) + .cancelled(proc() = onCancelledCalled = true) + .catch(proc(e: ref CatchableError) = onCatchCalled = true) + + future.complete() + + check eventually onSuccessCalled + check always (not onCancelledCalled and not onCatchCalled) + + test "does not fire other callbacks when fails": var onSuccessCalled = false - discard returnsVoid().then( - proc() = onSuccessCalled = true - ) - check eventually returnsVoidWasRun - check eventually onSuccessCalled + var onCancelledCalled = false + var onCatchCalled = false - test "can chain onSuccess when Future[void] complete": + future + .then(proc() = onSuccessCalled = true) + .cancelled(proc() = onCancelledCalled = true) + .catch(proc(e: ref CatchableError) = onCatchCalled = true) + + future.fail(error) + + check eventually onCatchCalled + check always (not onCancelledCalled and not onSuccessCalled) + + test "does not fire other callbacks when cancelled": + var onSuccessCalled = false + var onCancelledCalled = false + var onCatchCalled = false + + future + .then(proc() = onSuccessCalled = true) + .cancelled(proc() = onCancelledCalled = true) + .catch(proc(e: ref CatchableError) = onCatchCalled = true) + + await future.cancelAndWait() + + check eventually onCancelledCalled + check always (not onSuccessCalled and not onCatchCalled) + + test "can chain onSuccess when future completes": var onSuccessCalledTimes = 0 - discard returnsVoid() + discard future .then(proc() = inc onSuccessCalledTimes) .then(proc() = inc onSuccessCalledTimes) .then(proc() = inc onSuccessCalledTimes) + future.complete() check eventually onSuccessCalledTimes == 3 - test "calls onError when Future[void] fails": - var errorActual: ref CatchableError - discard returnsVoidError().then( - proc() = discard, - proc(e: ref CatchableError) = errorActual = e - ) - check eventually error == errorActual - - test "calls onError when Future[void] fails": - var errorActual: ref CatchableError - discard returnsVoidError().then( - proc(e: ref CatchableError) = errorActual = e - ) - check eventually error == errorActual - - test "catch callback fired when Future[void] fails": - var errorActual: ref CatchableError - returnsVoidError().catch( - proc(e: ref CatchableError) = errorActual = e - ) - check eventually error == errorActual - - test "does not fire onSuccess callback when Future[void] fails": - var onSuccessCalled = false - - returnsVoidError() - .then(proc() = onSuccessCalled = true) - .then(proc() = onSuccessCalled = true) - .catch(proc(e: ref CatchableError) = discard) - - check always (not onSuccessCalled) - asyncchecksuite "then - Future[T]": - var returnsValWasRun: bool - var error = (ref CatchableError)(msg: "some error") + var error = newError() + var future: Future[int] setup: - returnsValWasRun = false + future = newFuture[int]("test void") - proc returnsVal(): Future[int] {.async.} = - await sleepAsync 1.millis - returnsValWasRun = true - return 1 + teardown: + if not future.finished: + raiseAssert "test should finish future" - proc returnsValError(): Future[int] {.async.} = - raise error + test "then callback is fired when future is already finished": + var cbVal = 0 + future.complete(1) + discard future.then(proc(val: int) = cbVal = val) + check eventually cbVal == 1 - proc returnsValCancelled(): Future[int] {.async.} = - await sleepAsync(1.seconds) + test "then callback is fired after future is finished": + var cbVal = 0 + discard future.then(proc(val: int) = cbVal = val) + future.complete(1) + check eventually cbVal == 1 - proc wasCancelled(error: ref CancelledError): bool = - not error.isNil and error.msg == "Future operation cancelled!" + test "catch callback is fired when future is already failed": + var actual: ref CatchableError + future.fail(error) + future.catch(proc(err: ref CatchableError) = actual = err) + check eventually actual == error - test "calls onSuccess when Future[T] complete": - var returnedVal = 0 - discard returnsVal().then( - proc(val: int) = returnedVal = val, - proc(err: ref CatchableError) = discard - ) - check eventually returnsValWasRun - check eventually returnedVal == 1 + test "catch callback is fired after future is failed": + var actual: ref CatchableError + future.catch(proc(err: ref CatchableError) = actual = err) + future.fail(error) + check eventually actual == error - test "can pass only onSuccess for Future[T]": - var returnedVal = 0 - discard returnsVal().then( - proc(val: int) = returnedVal = val - ) - check eventually returnsValWasRun - check eventually returnedVal == 1 + test "cancelled callback is fired when future is already cancelled": + var fired = false + await future.cancelAndWait() + discard future.cancelled(proc() = fired = true) + check eventually fired - test "can chain onSuccess when Future[T] complete": - var onSuccessCalledWith: seq[int] = @[] - discard returnsVal() - .then(proc(val: int) = onSuccessCalledWith.add(val)) - .then(proc(val: int) = onSuccessCalledWith.add(val)) - .then(proc(val: int) = onSuccessCalledWith.add(val)) - check eventually onSuccessCalledWith == @[1, 1, 1] + test "cancelled callback is fired after future is cancelled": + var fired = false + discard future.cancelled(proc() = fired = true) + await future.cancelAndWait() + check eventually fired - test "calls onError when Future[T] fails": - var errorActual: ref CatchableError - discard returnsValError().then( - proc(val: int) = discard, - proc(e: ref CatchableError) = errorActual = e - ) - check eventually error == errorActual - - test "catch callback fired when Future[T] fails": - var errorActual: ref CatchableError - returnsValError().catch( - proc(e: ref CatchableError) = errorActual = e - ) - check eventually error == errorActual - - test "does not fire onSuccess callback when Future[T] fails": + test "does not fire other callbacks when successful": var onSuccessCalled = false + var onCancelledCalled = false + var onCatchCalled = false - returnsValError() + future .then(proc(val: int) = onSuccessCalled = true) + .cancelled(proc() = onCancelledCalled = true) + .catch(proc(e: ref CatchableError) = onCatchCalled = true) + + future.complete(1) + + check eventually onSuccessCalled + check always (not onCancelledCalled and not onCatchCalled) + + test "does not fire other callbacks when fails": + var onSuccessCalled = false + var onCancelledCalled = false + var onCatchCalled = false + + future .then(proc(val: int) = onSuccessCalled = true) - .catch(proc(e: ref CatchableError) = discard) + .cancelled(proc() = onCancelledCalled = true) + .catch(proc(e: ref CatchableError) = onCatchCalled = true) - check always (not onSuccessCalled) + future.fail(error) -asyncchecksuite "then - Future[?!void]": - var returnsResultVoidWasRun: bool - var error = (ref CatchableError)(msg: "some error") + check eventually onCatchCalled + check always (not onCancelledCalled and not onSuccessCalled) - setup: - returnsResultVoidWasRun = false - - proc returnsResultVoid(): Future[?!void] {.async.} = - await sleepAsync 1.millis - returnsResultVoidWasRun = true - return success() - - proc returnsResultVoidError(): Future[?!void] {.async.} = - return failure(error) - - - proc returnsResultVoidErrorUncaught(): Future[?!void] {.async.} = - raise error - - proc returnsResultVoidCancelled(): Future[?!void] {.async.} = - await sleepAsync(1.seconds) - return success() - - proc wasCancelled(error: ref CancelledError): bool = - not error.isNil and error.msg == "Future operation cancelled!" - - test "calls onSuccess when Future[?!void] complete": + test "does not fire other callbacks when cancelled": var onSuccessCalled = false - discard returnsResultVoid().then( - proc() = onSuccessCalled = true, - proc(err: ref CatchableError) = discard - ) - check eventually returnsResultVoidWasRun - check eventually onSuccessCalled + var onCancelledCalled = false + var onCatchCalled = false - test "can pass only onSuccess for Future[?!void]": - var onSuccessCalled = false - discard returnsResultVoid().then( - proc() = onSuccessCalled = true - ) - check eventually returnsResultVoidWasRun - check eventually onSuccessCalled + future + .then(proc(val: int) = onSuccessCalled = true) + .cancelled(proc() = onCancelledCalled = true) + .catch(proc(e: ref CatchableError) = onCatchCalled = true) - test "can chain onSuccess when Future[?!void] complete": + await future.cancelAndWait() + + check eventually onCancelledCalled + check always (not onSuccessCalled and not onCatchCalled) + + test "can chain onSuccess when future completes": var onSuccessCalledTimes = 0 - discard returnsResultVoid() - .then(proc() = inc onSuccessCalledTimes) - .then(proc() = inc onSuccessCalledTimes) - .then(proc() = inc onSuccessCalledTimes) + discard future + .then(proc(val: int) = inc onSuccessCalledTimes) + .then(proc(val: int) = inc onSuccessCalledTimes) + .then(proc(val: int) = inc onSuccessCalledTimes) + future.complete(1) check eventually onSuccessCalledTimes == 3 - test "calls onError when Future[?!void] fails": - var errorActual: ref CatchableError - discard returnsResultVoidError().then( - proc() = discard, - proc(e: ref CatchableError) = errorActual = e - ) - await sleepAsync(10.millis) - check eventually error == errorActual - - test "calls onError when Future[?!void] fails": - var errorActual: ref CatchableError - discard returnsResultVoidError().then( - proc(e: ref CatchableError) = errorActual = e - ) - check eventually error == errorActual - - test "catch callback fired when Future[?!void] fails": - var errorActual: ref CatchableError - returnsResultVoidError().catch( - proc(e: ref CatchableError) = errorActual = e - ) - check eventually error == errorActual - - test "does not fire onSuccess callback when Future[?!void] fails": - var onSuccessCalled = false - - returnsResultVoidError() - .then(proc() = onSuccessCalled = true) - .then(proc() = onSuccessCalled = true) - .catch(proc(e: ref CatchableError) = discard) - - check always (not onSuccessCalled) - - test "catch callback fired when Future[?!void] fails with uncaught error": - var errorActual: ref CatchableError - returnsResultVoidErrorUncaught().catch( - proc(e: ref CatchableError) = errorActual = e - ) - check eventually error == errorActual - -asyncchecksuite "then - Future[?!T]": - var returnsResultValWasRun: bool - var error = (ref CatchableError)(msg: "some error") +asyncchecksuite "then - Future[?!void]": + var error = newError() + var future: Future[?!void] setup: - returnsResultValWasRun = false + future = newFuture[?!void]("test void") - proc returnsResultVal(): Future[?!int] {.async.} = - await sleepAsync 1.millis - returnsResultValWasRun = true - return success(2) + teardown: + if not future.finished: + raiseAssert "test should finish future" - proc returnsResultValError(): Future[?!int] {.async.} = - return failure(error) + test "then callback is fired when future is already finished": + var firedImmediately = false + future.complete(success()) + discard future.then(proc() = firedImmediately = true) + check eventually firedImmediately - proc returnsResultValErrorUncaught(): Future[?!int] {.async.} = - raise error + test "then callback is fired after future is finished": + var fired = false + discard future.then(proc() = fired = true) + future.complete(success()) + check eventually fired - proc returnsResultValCancelled(): Future[?!int] {.async.} = - await sleepAsync(1.seconds) - return success(3) + test "catch callback is fired when future is already failed": + var actual: ref CatchableError + future.fail(error) + future.catch(proc(err: ref CatchableError) = actual = err) + check eventually actual == error - proc wasCancelled(error: ref CancelledError): bool = - not error.isNil and error.msg == "Future operation cancelled!" + test "catch callback is fired after future is failed": + var actual: ref CatchableError + future.catch(proc(err: ref CatchableError) = actual = err) + future.fail(error) + check eventually actual == error - test "calls onSuccess when Future[?!T] completes": - var actualVal = 0 - discard returnsResultVal().then( - proc(val: int) = actualVal = val, - proc(err: ref CatchableError) = discard - ) - check eventually returnsResultValWasRun - check eventually actualVal == 2 + test "cancelled callback is fired when future is already cancelled": + var fired = false + await future.cancelAndWait() + discard future.cancelled(proc() = fired = true) + check eventually fired - test "can pass only onSuccess for Future[?!T]": - var actualVal = 0 - discard returnsResultVal().then( - proc(val: int) = actualVal = val - ) - check eventually returnsResultValWasRun - check eventually actualVal == 2 + test "cancelled callback is fired after future is cancelled": + var fired = false + discard future.cancelled(proc() = fired = true) + await future.cancelAndWait() + check eventually fired - test "can chain onSuccess when Future[?!T] complete": - var onSuccessCalledWith: seq[int] = @[] - discard returnsResultVal() - .then(proc(val: int) = onSuccessCalledWith.add val) - .then(proc(val: int) = onSuccessCalledWith.add val) - .then(proc(val: int) = onSuccessCalledWith.add val) - check eventually onSuccessCalledWith == @[2, 2, 2] - - test "calls onError when Future[?!T] fails": - var errorActual: ref CatchableError - discard returnsResultValError().then( - proc(val: int) = discard, - proc(e: ref CatchableError) = errorActual = e - ) - check eventually error == errorActual - - test "calls onError when Future[?!T] fails": - var errorActual: ref CatchableError - discard returnsResultValError().then( - proc(val: int) = discard, - proc(e: ref CatchableError) = errorActual = e - ) - check eventually error == errorActual - - test "catch callback fired when Future[?!T] fails": - var errorActual: ref CatchableError - returnsResultValError().catch( - proc(e: ref CatchableError) = errorActual = e - ) - check eventually error == errorActual - - test "does not fire onSuccess callback when Future[?!T] fails": + test "does not fire other callbacks when successful": var onSuccessCalled = false + var onCancelledCalled = false + var onCatchCalled = false - returnsResultValError() + future + .then(proc() = onSuccessCalled = true) + .cancelled(proc() = onCancelledCalled = true) + .catch(proc(e: ref CatchableError) = onCatchCalled = true) + + future.complete(success()) + + check eventually onSuccessCalled + check always (not onCancelledCalled and not onCatchCalled) + + test "does not fire other callbacks when fails": + var onSuccessCalled = false + var onCancelledCalled = false + var onCatchCalled = false + + future + .then(proc() = onSuccessCalled = true) + .cancelled(proc() = onCancelledCalled = true) + .catch(proc(e: ref CatchableError) = onCatchCalled = true) + + future.fail(error) + + check eventually onCatchCalled + check always (not onCancelledCalled and not onSuccessCalled) + + test "does not fire other callbacks when cancelled": + var onSuccessCalled = false + var onCancelledCalled = false + var onCatchCalled = false + + future + .then(proc() = onSuccessCalled = true) + .cancelled(proc() = onCancelledCalled = true) + .catch(proc(e: ref CatchableError) = onCatchCalled = true) + + await future.cancelAndWait() + + check eventually onCancelledCalled + check always (not onSuccessCalled and not onCatchCalled) + + test "can chain onSuccess when future completes": + var onSuccessCalledTimes = 0 + discard future + .then(proc() = inc onSuccessCalledTimes) + .then(proc() = inc onSuccessCalledTimes) + .then(proc() = inc onSuccessCalledTimes) + future.complete(success()) + check eventually onSuccessCalledTimes == 3 + +asyncchecksuite "then - Future[?!T]": + var error = newError() + var future: Future[?!int] + + setup: + future = newFuture[?!int]("test void") + + teardown: + if not future.finished: + raiseAssert "test should finish future" + + test "then callback is fired when future is already finished": + var cbVal = 0 + future.complete(success(1)) + discard future.then(proc(val: int) = cbVal = val) + check eventually cbVal == 1 + + test "then callback is fired after future is finished": + var cbVal = 0 + discard future.then(proc(val: int) = cbVal = val) + future.complete(success(1)) + check eventually cbVal == 1 + + test "catch callback is fired when future is already failed": + var actual: ref CatchableError + future.fail(error) + future.catch(proc(err: ref CatchableError) = actual = err) + check eventually actual == error + + test "catch callback is fired after future is failed": + var actual: ref CatchableError + future.catch(proc(err: ref CatchableError) = actual = err) + future.fail(error) + check eventually actual == error + + test "cancelled callback is fired when future is already cancelled": + var fired = false + await future.cancelAndWait() + discard future.cancelled(proc() = fired = true) + check eventually fired + + test "cancelled callback is fired after future is cancelled": + var fired = false + discard future.cancelled(proc() = fired = true) + await future.cancelAndWait() + check eventually fired + + test "does not fire other callbacks when successful": + var onSuccessCalled = false + var onCancelledCalled = false + var onCatchCalled = false + + future .then(proc(val: int) = onSuccessCalled = true) + .cancelled(proc() = onCancelledCalled = true) + .catch(proc(e: ref CatchableError) = onCatchCalled = true) + + future.complete(success(1)) + + check eventually onSuccessCalled + check always (not onCancelledCalled and not onCatchCalled) + + test "does not fire other callbacks when fails": + var onSuccessCalled = false + var onCancelledCalled = false + var onCatchCalled = false + + future .then(proc(val: int) = onSuccessCalled = true) - .catch(proc(e: ref CatchableError) = discard) + .cancelled(proc() = onCancelledCalled = true) + .catch(proc(e: ref CatchableError) = onCatchCalled = true) - check always (not onSuccessCalled) + future.fail(error) - test "catch callback fired when Future[?!T] fails with uncaught error": - var errorActual: ref CatchableError + check eventually onCatchCalled + check always (not onCancelledCalled and not onSuccessCalled) - returnsResultValErrorUncaught() - .then(proc(val: int) = discard) - .then(proc(val: int) = discard) - .catch(proc(e: ref CatchableError) = errorActual = e) + test "does not fire other callbacks when cancelled": + var onSuccessCalled = false + var onCancelledCalled = false + var onCatchCalled = false - check eventually error == errorActual + future + .then(proc(val: int) = onSuccessCalled = true) + .cancelled(proc() = onCancelledCalled = true) + .catch(proc(e: ref CatchableError) = onCatchCalled = true) + + await future.cancelAndWait() + + check eventually onCancelledCalled + check always (not onSuccessCalled and not onCatchCalled) + + test "can chain onSuccess when future completes": + var onSuccessCalledTimes = 0 + discard future + .then(proc(val: int) = inc onSuccessCalledTimes) + .then(proc(val: int) = inc onSuccessCalledTimes) + .then(proc(val: int) = inc onSuccessCalledTimes) + future.complete(success(1)) + check eventually onSuccessCalledTimes == 3 diff --git a/tests/codex/utils/testtrackedfutures.nim b/tests/codex/utils/testtrackedfutures.nim new file mode 100644 index 00000000..78756a8a --- /dev/null +++ b/tests/codex/utils/testtrackedfutures.nim @@ -0,0 +1,67 @@ +import pkg/asynctest +import pkg/chronos +import codex/utils/trackedfutures +import ../helpers/eventually +import ../helpers + +type Module = object + trackedFutures: TrackedFutures + +asyncchecksuite "tracked futures": + var module: Module + + setup: + module = Module(trackedFutures: TrackedFutures.new()) + + test "starts with zero tracked futures": + check module.trackedFutures.len == 0 + + test "tracks unfinished futures": + let fut = newFuture[void]("test") + discard fut.track(module) + check module.trackedFutures.len == 1 + + test "does not track completed futures": + let fut = newFuture[void]("test") + fut.complete() + discard fut.track(module) + check eventually module.trackedFutures.len == 0 + + test "does not track failed futures": + let fut = newFuture[void]("test") + fut.fail((ref CatchableError)(msg: "some error")) + discard fut.track(module) + check eventually module.trackedFutures.len == 0 + + test "does not track cancelled futures": + let fut = newFuture[void]("test") + await fut.cancelAndWait() + discard fut.track(module) + check eventually module.trackedFutures.len == 0 + + test "removes tracked future when finished": + let fut = newFuture[void]("test") + discard fut.track(module) + fut.complete() + check eventually module.trackedFutures.len == 0 + + test "removes tracked future when cancelled": + let fut = newFuture[void]("test") + discard fut.track(module) + await fut.cancelAndWait() + check eventually module.trackedFutures.len == 0 + + test "cancels and removes all tracked futures": + let fut1 = newFuture[void]("test1") + let fut2 = newFuture[void]("test2") + let fut3 = newFuture[void]("test3") + discard fut1.track(module) + discard fut2.track(module) + discard fut3.track(module) + await module.trackedFutures.cancelTracked() + check eventually fut1.cancelled + check eventually fut2.cancelled + check eventually fut3.cancelled + check eventually module.trackedFutures.len == 0 + +