From 9db5588dbd1b47c599f041595535933b56fec695 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Thu, 31 Jul 2025 12:07:18 +0200 Subject: [PATCH] Initial commit --- .gitignore | 6 + .vscode/settings.json | 13 + LICENSE.md | 5 + README.md | 3 + asynciterators.nim | 7 + asynciterators.nimble | 14 + asynciterators/asynciter.nim | 219 +++++++++++++++ asynciterators/asyncresultiter.nim | 246 +++++++++++++++++ asynciterators/iter.nim | 212 +++++++++++++++ config.nims | 4 + tests/config.nims | 7 + tests/test.nimble | 18 ++ tests/testasynciter.nim | 167 ++++++++++++ tests/testasyncresultiter.nim | 421 +++++++++++++++++++++++++++++ tests/testiter.nim | 122 +++++++++ 15 files changed, 1464 insertions(+) create mode 100644 .gitignore create mode 100644 .vscode/settings.json create mode 100644 LICENSE.md create mode 100644 README.md create mode 100644 asynciterators.nim create mode 100644 asynciterators.nimble create mode 100644 asynciterators/asynciter.nim create mode 100644 asynciterators/asyncresultiter.nim create mode 100644 asynciterators/iter.nim create mode 100644 config.nims create mode 100644 tests/config.nims create mode 100644 tests/test.nimble create mode 100644 tests/testasynciter.nim create mode 100644 tests/testasyncresultiter.nim create mode 100644 tests/testiter.nim diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..442aeef --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +* +!*/ +!*.* +nimbledeps +nimble.develop +nimble.paths diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..f9e0027 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,13 @@ +{ + "nim.projectMapping": [ + { + "projectFile": "tests/testiter.nim", + "fileRegex": "tests/.*\\.nim" + }, + { + "projectFile": "asynciterators.nim", + "fileRegex": "asynciterators/.*\\.nim" + } + ], + "nim.formatOnSave": true, +} diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..efac7ea --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,5 @@ +Licensed and distributed under either of +[MIT license](http://opensource.org/licenses/MIT) or +[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0) +at your option. These files may not be copied, modified, or distributed except +according to those terms. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..66378cb --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# asynciterators + +TBD... \ No newline at end of file diff --git a/asynciterators.nim b/asynciterators.nim new file mode 100644 index 0000000..ff2cfbe --- /dev/null +++ b/asynciterators.nim @@ -0,0 +1,7 @@ +import ./asynciterators/iter +import ./asynciterators/asynciter +import ./asynciterators/asyncresultiter + +export iter +import asynciter +import asyncresultiter diff --git a/asynciterators.nimble b/asynciterators.nimble new file mode 100644 index 0000000..7808d83 --- /dev/null +++ b/asynciterators.nimble @@ -0,0 +1,14 @@ +version = "0.1.0" +author = "Async Iterators Authors" +description = "Nim Async iterator helpers for Chronos" +license = "MIT" + +requires "questionable >= 0.10.15 & < 0.11.0" +requires "results" +requires "chronos" + +task test, "Runs the test suite": + withDir "tests/": + delEnv "NIMBLE_DIR" # use nimbledeps dir + exec "nimble install -d -y" + exec "nimble test -y" diff --git a/asynciterators/asynciter.nim b/asynciterators/asynciter.nim new file mode 100644 index 0000000..7b0c42f --- /dev/null +++ b/asynciterators/asynciter.nim @@ -0,0 +1,219 @@ +## Nim-Codex +## Copyright (c) 2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import std/sugar + +import pkg/questionable +import pkg/chronos + +import ./iter + +## AsyncIter[T] is similar to `Iter[Future[T]]` with +## addition of methods specific to asynchronous processing. +## +## Public interface: +## +## Attributes +## - next - allows to set a custom function to be called when the next item is requested +## +## Operations: +## - new - to create a new async iterator (AsyncIter) +## - finish - to finish the async iterator +## - finished - to check if the async iterator is finished +## - next - to get the next item from the async iterator +## - items - to iterate over the async iterator +## - pairs - to iterate over the async iterator and return the index of each item +## - mapFuture - to convert a (raising) Future[T] to a (raising) Future[U] using a function fn: auto -> Future[U] - we use auto to handle both raising and non-raising futures +## - mapAsync - to convert a regular sync iterator (Iter) to an async iterator (AsyncIter) +## - map - to convert one async iterator (AsyncIter) to another async iterator (AsyncIter) +## - mapFilter - to convert one async iterator (AsyncIter) to another async iterator (AsyncIter) and apply filtering at the same time +## - filter - to filter an async iterator (AsyncIter) and return another async iterator (AsyncIter) +## - delayBy - to delay each item returned by async iterator by a given duration +## - empty - to create an empty async iterator (AsyncIter) + +type + AsyncIterFunc[T, U] = proc(fut: T): Future[U] {.async.} + AsyncIterIsFinished = proc(): bool {.raises: [], gcsafe.} + AsyncIterGenNext[T] = proc(): Future[T] {.async.} + + AsyncIter*[T] = ref object + finished: bool + next*: AsyncIterGenNext[T] + +proc flatMap[T, U](fut: Future[T], fn: AsyncIterFunc[T, U]): Future[U] {.async.} = + let t = await fut + await fn(t) + +######################################################################## +## AsyncIter public interface methods +######################################################################## + +proc new*[T]( + _: type AsyncIter[T], + genNext: AsyncIterGenNext[T], + isFinished: AsyncIterIsFinished, + finishOnErr: bool = true, +): AsyncIter[T] = + ## Creates a new Iter using elements returned by supplier function `genNext`. + ## Iter is finished whenever `isFinished` returns true. + ## + + var iter = AsyncIter[T]() + + proc next(): Future[T] {.async.} = + if not iter.finished: + var item: T + try: + item = await genNext() + except CancelledError as err: + iter.finish + raise err + except CatchableError as err: + if finishOnErr or isFinished(): + iter.finish + raise err + + if isFinished(): + iter.finish + return item + else: + raise newException( + CatchableError, "AsyncIter is finished but next item was requested" + ) + + if isFinished(): + iter.finish + + iter.next = next + return iter + +# forward declaration +proc mapAsync*[T, U](iter: Iter[T], fn: AsyncIterFunc[T, U]): AsyncIter[U] + +proc new*[U, V: Ordinal](_: type AsyncIter[U], slice: HSlice[U, V]): AsyncIter[U] = + ## Creates new Iter from a slice + ## + + let iter = Iter[U].new(slice) + mapAsync[U, U]( + iter, + proc(i: U): Future[U] {.async.} = + i, + ) + +proc new*[U, V, S: Ordinal]( + _: type AsyncIter[U], a: U, b: V, step: S = 1 +): AsyncIter[U] = + ## Creates new Iter in range a..b with specified step (default 1) + ## + + let iter = Iter[U].new(a, b, step) + mapAsync[U, U]( + iter, + proc(i: U): Future[U] {.async.} = + i, + ) + +proc finish*[T](self: AsyncIter[T]): void = + self.finished = true + +proc finished*[T](self: AsyncIter[T]): bool = + self.finished + +iterator items*[T](self: AsyncIter[T]): Future[T] = + while not self.finished: + yield self.next() + +iterator pairs*[T](self: AsyncIter[T]): tuple[key: int, val: Future[T]] {.inline.} = + var i = 0 + while not self.finished: + yield (i, self.next()) + inc(i) + +proc mapFuture*[T, U](fut: Future[T], fn: AsyncIterFunc[T, U]): Future[U] {.async.} = + let t = await fut + fn(t) + +proc mapAsync*[T, U](iter: Iter[T], fn: AsyncIterFunc[T, U]): AsyncIter[U] = + AsyncIter[U].new(genNext = () => fn(iter.next()), isFinished = () => iter.finished()) + +proc map*[T, U](iter: AsyncIter[T], fn: AsyncIterFunc[T, U]): AsyncIter[U] = + AsyncIter[U].new( + genNext = () => iter.next().flatMap(fn), isFinished = () => iter.finished + ) + +proc mapFilter*[T, U]( + iter: AsyncIter[T], mapPredicate: AsyncIterFunc[T, Option[U]] +): Future[AsyncIter[U]] {.async: (raises: [CancelledError]).} = + var nextFutU: Option[Future[U]] + + proc tryFetch(): Future[void] {.async: (raises: [CancelledError]).} = + nextFutU = Future[U].none + while not iter.finished: + let futT = iter.next() + try: + if u =? await futT.flatMap(mapPredicate): + let futU = newFuture[U]("AsyncIter.mapFilterAsync") + futU.complete(u) + nextFutU = some(futU) + break + except CancelledError as err: + raise err + except CatchableError as err: + let errFut = newFuture[U]("AsyncIter.mapFilterAsync") + errFut.fail(err) + nextFutU = some(errFut) + break + + proc genNext(): Future[U] {.async.} = + let futU = nextFutU.unsafeGet + await tryFetch() + await futU + + proc isFinished(): bool = + nextFutU.isNone + + await tryFetch() + AsyncIter[U].new(genNext, isFinished) + +proc filter*[T]( + iter: AsyncIter[T], predicate: AsyncIterFunc[T, bool] +): Future[AsyncIter[T]] {.async: (raises: [CancelledError]).} = + proc wrappedPredicate(t: T): Future[Option[T]] {.async.} = + if await predicate(t): + some(t) + else: + T.none + + await mapFilter[T, T](iter, wrappedPredicate) + +proc delayBy*[T](iter: AsyncIter[T], d: Duration): AsyncIter[T] = + ## Delays emitting each item by given duration + ## + + map[T, T]( + iter, + proc(t: T): Future[T] {.async.} = + await sleepAsync(d) + t, + ) + +proc empty*[T](_: type AsyncIter[T]): AsyncIter[T] = + ## Creates an empty AsyncIter + ## + + proc genNext(): Future[T] {.async.} = + raise newException(CatchableError, "Next item requested from an empty AsyncIter") + + proc isFinished(): bool = + true + + AsyncIter[T].new(genNext, isFinished) diff --git a/asynciterators/asyncresultiter.nim b/asynciterators/asyncresultiter.nim new file mode 100644 index 0000000..0920e41 --- /dev/null +++ b/asynciterators/asyncresultiter.nim @@ -0,0 +1,246 @@ +## Nim-Codex +## Copyright (c) 2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import std/sugar + +import pkg/questionable +import pkg/questionable/results +import pkg/chronos + +import ./iter + +## AsyncResultIter[T] is similar to `AsyncIterator[Future[T]]` +## but does not throw exceptions others than CancelledError. +## +## Instead of throwing exception, it uses Result to communicate errors ( +## thus the name AsyncResultIter). +## +## Public interface: +## +## Attributes +## - next - allows to set a custom function to be called when the next item is requested +## +## Operations: +## - new - to create a new async iterator (AsyncResultIter) +## - finish - to finish the async iterator +## - finished - to check if the async iterator is finished +## - next - to get the next item from the async iterator +## - items - to iterate over the async iterator +## - pairs - to iterate over the async iterator and return the index of each item +## - mapFuture - to convert a (raising) Future[T] to a (raising) Future[U] using a function fn: auto -> Future[U] - we use auto to handle both raising and non-raising futures +## - mapAsync - to convert a regular sync iterator (Iter) to an async iterator (AsyncResultIter) +## - map - to convert one async iterator (AsyncResultIter) to another async iterator (AsyncResultIter) +## - mapFilter - to convert one async iterator (AsyncResultIter) to another async iterator (AsyncResultIter) and apply filtering at the same time +## - filter - to filter an async iterator (AsyncResultIter) and return another async iterator (AsyncResultIter) +## - delayBy - to delay each item returned by async iterator by a given duration +## - empty - to create an empty async iterator (AsyncResultIter) + +type + AsyncResultIterFunc[T, U] = + proc(fut: T): Future[U] {.async: (raises: [CancelledError]).} + AsyncResultIterIsFinished = proc(): bool {.raises: [], gcsafe.} + AsyncResultIterGenNext[T] = proc(): Future[T] {.async: (raises: [CancelledError]).} + + AsyncResultIter*[T] = ref object + finished: bool + next*: AsyncResultIterGenNext[?!T] + +proc flatMap[T, U]( + fut: auto, fn: AsyncResultIterFunc[?!T, ?!U] +): Future[?!U] {.async: (raises: [CancelledError]).} = + let t = await fut + await fn(t) + +proc flatMap[T, U]( + fut: auto, fn: AsyncResultIterFunc[?!T, Option[?!U]] +): Future[Option[?!U]] {.async: (raises: [CancelledError]).} = + let t = await fut + await fn(t) + +######################################################################## +## AsyncResultIter public interface methods +######################################################################## + +proc new*[T]( + _: type AsyncResultIter[T], + genNext: AsyncResultIterGenNext[?!T], + isFinished: AsyncResultIterIsFinished, + finishOnErr: bool = true, +): AsyncResultIter[T] = + ## Creates a new Iter using elements returned by supplier function `genNext`. + ## Iter is finished whenever `isFinished` returns true. + ## + + var iter = AsyncResultIter[T]() + + proc next(): Future[?!T] {.async: (raises: [CancelledError]).} = + try: + if not iter.finished: + let item = await genNext() + if finishOnErr and err =? item.errorOption: + iter.finished = true + return failure(err) + if isFinished(): + iter.finished = true + return item + else: + return failure("AsyncResultIter is finished but next item was requested") + except CancelledError as err: + iter.finished = true + raise err + + if isFinished(): + iter.finished = true + + iter.next = next + return iter + +# forward declaration +proc mapAsync*[T, U]( + iter: Iter[T], fn: AsyncResultIterFunc[T, ?!U], finishOnErr: bool = true +): AsyncResultIter[U] + +proc new*[U, V: Ordinal]( + _: type AsyncResultIter[U], slice: HSlice[U, V], finishOnErr: bool = true +): AsyncResultIter[U] = + ## Creates new Iter from a slice + ## + + let iter = Iter[U].new(slice) + mapAsync[U, U]( + iter, + proc(i: U): Future[?!U] {.async: (raises: [CancelledError]).} = + success[U](i), + finishOnErr = finishOnErr, + ) + +proc new*[U, V, S: Ordinal]( + _: type AsyncResultIter[U], a: U, b: V, step: S = 1, finishOnErr: bool = true +): AsyncResultIter[U] = + ## Creates new Iter in range a..b with specified step (default 1) + ## + + let iter = Iter[U].new(a, b, step) + mapAsync[U, U]( + iter, + proc(i: U): Future[?!U] {.async: (raises: [CancelledError]).} = + U.success(i), + finishOnErr = finishOnErr, + ) + +proc finish*[T](self: AsyncResultIter[T]): void = + self.finished = true + +proc finished*[T](self: AsyncResultIter[T]): bool = + self.finished + +iterator items*[T](self: AsyncResultIter[T]): auto {.inline.} = + while not self.finished: + yield self.next() + +iterator pairs*[T](self: AsyncResultIter[T]): auto {.inline.} = + var i = 0 + while not self.finished: + yield (i, self.next()) + inc(i) + +proc mapFuture*[T, U]( + fut: auto, fn: AsyncResultIterFunc[T, U] +): Future[U] {.async: (raises: [CancelledError]).} = + let t = await fut + await fn(t) + +proc mapAsync*[T, U]( + iter: Iter[T], fn: AsyncResultIterFunc[T, ?!U], finishOnErr: bool = true +): AsyncResultIter[U] = + AsyncResultIter[U].new( + genNext = () => fn(iter.next()), + isFinished = () => iter.finished(), + finishOnErr = finishOnErr, + ) + +proc map*[T, U]( + iter: AsyncResultIter[T], + fn: AsyncResultIterFunc[?!T, ?!U], + finishOnErr: bool = true, +): AsyncResultIter[U] = + AsyncResultIter[U].new( + genNext = () => iter.next().flatMap(fn), + isFinished = () => iter.finished, + finishOnErr = finishOnErr, + ) + +proc mapFilter*[T, U]( + iter: AsyncResultIter[T], + mapPredicate: AsyncResultIterFunc[?!T, Option[?!U]], + finishOnErr: bool = true, +): Future[AsyncResultIter[U]] {.async: (raises: [CancelledError]).} = + var nextU: Option[?!U] + + proc filter(): Future[void] {.async: (raises: [CancelledError]).} = + nextU = none(?!U) + while not iter.finished: + let futT = iter.next() + if mappedValue =? await futT.flatMap(mapPredicate): + nextU = some(mappedValue) + break + + proc genNext(): Future[?!U] {.async: (raises: [CancelledError]).} = + let u = nextU.unsafeGet + await filter() + u + + proc isFinished(): bool = + nextU.isNone + + await filter() + AsyncResultIter[U].new(genNext, isFinished, finishOnErr = finishOnErr) + +proc filter*[T]( + iter: AsyncResultIter[T], + predicate: AsyncResultIterFunc[?!T, bool], + finishOnErr: bool = true, +): Future[AsyncResultIter[T]] {.async: (raises: [CancelledError]).} = + proc wrappedPredicate( + t: ?!T + ): Future[Option[?!T]] {.async: (raises: [CancelledError]).} = + if await predicate(t): + some(t) + else: + none(?!T) + + await mapFilter[T, T](iter, wrappedPredicate, finishOnErr = finishOnErr) + +proc delayBy*[T]( + iter: AsyncResultIter[T], d: Duration, finishOnErr: bool = true +): AsyncResultIter[T] = + ## Delays emitting each item by given duration + ## + + map[T, T]( + iter, + proc(t: ?!T): Future[?!T] {.async: (raises: [CancelledError]).} = + await sleepAsync(d) + return t, + finishOnErr = finishOnErr, + ) + +proc empty*[T](_: type AsyncResultIter[T]): AsyncResultIter[T] = + ## Creates an empty AsyncResultIter + ## + + proc genNext(): Future[?!T] {.async: (raises: [CancelledError]).} = + T.failure("Next item requested from an empty AsyncResultIter") + + proc isFinished(): bool = + true + + AsyncResultIter[T].new(genNext, isFinished) diff --git a/asynciterators/iter.nim b/asynciterators/iter.nim new file mode 100644 index 0000000..607332c --- /dev/null +++ b/asynciterators/iter.nim @@ -0,0 +1,212 @@ +## Nim-Codex +## Copyright (c) 2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import std/sugar + +import pkg/questionable +import pkg/questionable/results + +## Public interface: +## +## Attributes +## - next - allows to set a custom function to be called when the next item is requested +## +## Operations: +## - new - to create a new iterator (Iter) +## - finish - to finish the iterator +## - finished - to check if the iterator is finished +## - next - to get the next item from the iterator +## - items - to iterate over the iterator +## - pairs - to iterate over the iterator and return the index of each item +## - map - to convert one iterator (Iter) to another iterator (Iter) +## - mapFilter - to convert one iterator (Iter) to another iterator (Iter) and apply filtering at the same time +## - filter - to filter an iterator (Iter) and return another iterator (Iter) +## - empty - to create an empty async iterator (AsyncIter) + +type + IterFunction[T, U] = proc(value: T): U {.raises: [CatchableError], gcsafe.} + IterIsFinished = proc(): bool {.raises: [], gcsafe.} + IterGenNext[T] = proc(): T {.raises: [CatchableError], gcsafe.} + Iterator[T] = iterator (): T + + Iter*[T] = ref object + finished: bool + next*: IterGenNext[T] + +######################################################################## +## Iter public interface methods +######################################################################## + +proc new*[T]( + _: type Iter[T], + genNext: IterGenNext[T], + isFinished: IterIsFinished, + finishOnErr: bool = true, +): Iter[T] = + ## Creates a new Iter using elements returned by supplier function `genNext`. + ## Iter is finished whenever `isFinished` returns true. + ## + + var iter = Iter[T]() + + proc next(): T {.raises: [CatchableError].} = + if not iter.finished: + var item: T + try: + item = genNext() + except CatchableError as err: + if finishOnErr or isFinished(): + iter.finish + raise err + + if isFinished(): + iter.finish + return item + else: + raise newException(CatchableError, "Iter is finished but next item was requested") + + if isFinished(): + iter.finish + + iter.next = next + return iter + +proc new*[U, V, S: Ordinal](_: type Iter[U], a: U, b: V, step: S = 1): Iter[U] = + ## Creates a new Iter in range a..b with specified step (default 1) + ## + + var i = a + + proc genNext(): U = + let u = i + inc(i, step) + u + + proc isFinished(): bool = + (step > 0 and i > b) or (step < 0 and i < b) + + Iter[U].new(genNext, isFinished) + +proc new*[U, V: Ordinal](_: type Iter[U], slice: HSlice[U, V]): Iter[U] = + ## Creates a new Iter from a slice + ## + + Iter[U].new(slice.a.int, slice.b.int, 1) + +proc new*[T](_: type Iter[T], items: seq[T]): Iter[T] = + ## Creates a new Iter from a sequence + ## + + Iter[int].new(0 ..< items.len).map((i: int) => items[i]) + +proc new*[T](_: type Iter[T], iter: Iterator[T]): Iter[T] = + ## Creates a new Iter from an iterator + ## + var nextOrErr: Option[?!T] + proc tryNext(): void = + nextOrErr = none(?!T) + while not iter.finished: + try: + let t: T = iter() + if not iter.finished: + nextOrErr = some(success(t)) + break + except CatchableError as err: + nextOrErr = some(T.failure(err)) + + proc genNext(): T {.raises: [CatchableError].} = + if nextOrErr.isNone: + raise newException(CatchableError, "Iterator finished but genNext was called") + + without u =? nextOrErr.unsafeGet, err: + raise err + + tryNext() + return u + + proc isFinished(): bool = + nextOrErr.isNone + + tryNext() + Iter[T].new(genNext, isFinished) + +proc finish*[T](self: Iter[T]): void = + self.finished = true + +proc finished*[T](self: Iter[T]): bool = + self.finished + +iterator items*[T](self: Iter[T]): T = + while not self.finished: + yield self.next() + +iterator pairs*[T](self: Iter[T]): tuple[key: int, val: T] {.inline.} = + var i = 0 + while not self.finished: + yield (i, self.next()) + inc(i) + +proc map*[T, U](iter: Iter[T], fn: IterFunction[T, U]): Iter[U] = + Iter[U].new(genNext = () => fn(iter.next()), isFinished = () => iter.finished) + +proc mapFilter*[T, U]( + iter: Iter[T], mapPredicate: IterFunction[T, Option[U]] +): Iter[U] = + var nextUOrErr: Option[?!U] + + proc tryFetch(): void = + nextUOrErr = none(?!U) + while not iter.finished: + try: + let t = iter.next() + if u =? mapPredicate(t): + nextUOrErr = some(success(u)) + break + except CatchableError as err: + nextUOrErr = some(U.failure(err)) + + proc genNext(): U {.raises: [CatchableError].} = + if nextUOrErr.isNone: + raise newException(CatchableError, "Iterator finished but genNext was called") + + # at this point nextUOrErr should always be some(..) + without u =? nextUOrErr.unsafeGet, err: + raise err + + tryFetch() + return u + + proc isFinished(): bool = + nextUOrErr.isNone + + tryFetch() + Iter[U].new(genNext, isFinished) + +proc filter*[T](iter: Iter[T], predicate: IterFunction[T, bool]): Iter[T] = + proc wrappedPredicate(t: T): Option[T] = + if predicate(t): + some(t) + else: + T.none + + mapFilter[T, T](iter, wrappedPredicate) + +proc empty*[T](_: type Iter[T]): Iter[T] = + ## Creates an empty Iter + ## + + proc genNext(): T {.raises: [CatchableError].} = + raise newException(CatchableError, "Next item requested from an empty Iter") + + proc isFinished(): bool = + true + + Iter[T].new(genNext, isFinished) diff --git a/config.nims b/config.nims new file mode 100644 index 0000000..8ee48d2 --- /dev/null +++ b/config.nims @@ -0,0 +1,4 @@ +# begin Nimble config (version 2) +when withDir(thisDir(), system.fileExists("nimble.paths")): + include "nimble.paths" +# end Nimble config diff --git a/tests/config.nims b/tests/config.nims new file mode 100644 index 0000000..a76986a --- /dev/null +++ b/tests/config.nims @@ -0,0 +1,7 @@ +--path: + "../" +import "../config.nims" +# begin Nimble config (version 2) +when withDir(thisDir(), system.fileExists("nimble.paths")): + include "nimble.paths" +# end Nimble config diff --git a/tests/test.nimble b/tests/test.nimble new file mode 100644 index 0000000..6d987fc --- /dev/null +++ b/tests/test.nimble @@ -0,0 +1,18 @@ +version = "0.1.0" +author = "Async Iterators Authors" +description = "Tests for Nim Async iterator helpers for Chronos" +license = "MIT" + +requires "questionable >= 0.10.15 & < 0.11.0" +requires "results" +requires "chronos" +requires "asynctest >= 0.5.4 & < 0.6.0" + +task test, "Runs the test suite": + var options = "-f -r --skipParentCfg" + when (NimMajor, NimMinor) >= (1, 4): + options &= " --warningAsError[UnsafeDefault]:on" + options &= " --warningAsError[ProveInit]:on" + exec "nim c " & options & " testiter.nim" + exec "nim c " & options & " testasynciter.nim" + exec "nim c " & options & " testasyncresultiter.nim" diff --git a/tests/testasynciter.nim b/tests/testasynciter.nim new file mode 100644 index 0000000..d400d0c --- /dev/null +++ b/tests/testasynciter.nim @@ -0,0 +1,167 @@ +import std/sugar +import std/sequtils + +import pkg/questionable +import pkg/chronos + +import asynciterators/iter +import asynciterators/asynciter + +import pkg/asynctest/chronos/unittest2 + +suite "Test AsyncIter": + test "Should be finished": + let iter = AsyncIter[int].empty() + + check: + iter.finished == true + + test "Should map each item using `map`": + let + iter1 = AsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = map[int, string]( + iter1, + proc(i: int): Future[string] {.async.} = + $i, + ) + + var collected: seq[string] + + for fut in iter2: + collected.add(await fut) + + check: + collected == @["0", "1", "2", "3", "4"] + + test "Should leave only odd items using `filter`": + let + iter1 = AsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = await filter[int]( + iter1, + proc(i: int): Future[bool] {.async.} = + (i mod 2) == 1, + ) + + var collected: seq[int] + + for fut in iter2: + collected.add(await fut) + + check: + collected == @[1, 3] + + test "Should leave only odd items using `mapFilter`": + let + iter1 = AsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = await mapFilter[int, string]( + iter1, + proc(i: int): Future[?string] {.async.} = + if (i mod 2) == 1: + some($i) + else: + string.none, + ) + + var collected: seq[string] + + for fut in iter2: + collected.add(await fut) + + check: + collected == @["1", "3"] + + test "Should yield all items before err using `map`": + let + iter1 = AsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = map[int, string]( + iter1, + proc(i: int): Future[string] {.async.} = + if i < 3: + return $i + else: + raise newException(CatchableError, "Some error"), + ) + + var collected: seq[string] + + expect CatchableError: + for fut in iter2: + collected.add(await fut) + + check: + collected == @["0", "1", "2"] + iter2.finished + + test "Should yield all items before err using `filter`": + let + iter1 = AsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = await filter[int]( + iter1, + proc(i: int): Future[bool] {.async.} = + if i < 3: + return true + else: + raise newException(CatchableError, "Some error"), + ) + + var collected: seq[int] + + expect CatchableError: + for fut in iter2: + collected.add(await fut) + + check: + collected == @[0, 1, 2] + iter2.finished + + test "Should yield all items before err using `mapFilter`": + let + iter1 = AsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = await mapFilter[int, string]( + iter1, + proc(i: int): Future[?string] {.async.} = + if i < 3: + return some($i) + else: + raise newException(CatchableError, "Some error"), + ) + + var collected: seq[string] + + expect CatchableError: + for fut in iter2: + collected.add(await fut) + + check: + collected == @["0", "1", "2"] + iter2.finished + + test "Should propagate cancellation error immediately": + let fut = newFuture[?string]("testasynciter") + + let + iter1 = AsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = await mapFilter[int, string]( + iter1, + proc(i: int): Future[?string] {.async.} = + if i < 3: + return some($i) + else: + return await fut, + ) + + proc cancelFut(): Future[void] {.async.} = + await sleepAsync(100.millis) + await fut.cancelAndWait() + + asyncSpawn(cancelFut()) + + var collected: seq[string] + + expect CancelledError: + for fut in iter2: + collected.add(await fut) + + check: + collected == @["0", "1"] + iter2.finished diff --git a/tests/testasyncresultiter.nim b/tests/testasyncresultiter.nim new file mode 100644 index 0000000..a06b53e --- /dev/null +++ b/tests/testasyncresultiter.nim @@ -0,0 +1,421 @@ +import std/sugar +import std/sequtils + +import pkg/questionable +import pkg/questionable/results +import pkg/chronos + +import asynciterators/iter +import asynciterators/asyncresultiter + +import pkg/asynctest/chronos/unittest2 + + +suite "Test AsyncResultIter": + test "Should be finished": + let iter = AsyncResultIter[int].empty() + + check: + iter.finished == true + + test "using with async generator": + let value = 1 + var intIter = Iter.new(0 ..< 5) + let expectedSeq = newSeqWith(5, intIter.next()) + intIter = Iter.new(0 ..< 5) + proc asyncGen(): Future[?!int] {.async: (raw: true, raises: [CancelledError]).} = + let fut = newFuture[?!int]() + fut.complete(success(intIter.next())) + return fut + + let iter = AsyncResultIter[int].new(asyncGen, () => intIter.finished) + + var collected: seq[int] + for iFut in iter: + let iRes = await iFut + if i =? iRes: + collected.add(i) + else: + fail() + + check collected == expectedSeq + let nextRes = await iter.next() + assert nextRes.isFailure + check nextRes.error.msg == "AsyncResultIter is finished but next item was requested" + + test "getting async iter for simple sync range iterator": + let iter1 = AsyncResultIter[int].new(0 ..< 5) + + var collected: seq[int] + for iFut in iter1: + let iRes = await iFut + if i =? iRes: + collected.add(i) + else: + fail() + check: + collected == @[0, 1, 2, 3, 4] + + test "Should map each item using `map`": + let iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) + + let iter2 = map[int, string]( + iter1, + proc(iRes: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = + if i =? iRes: + return success($i) + else: + return failure("Some error"), + ) + + var collected: seq[string] + + for fut in iter2: + if i =? (await fut): + collected.add(i) + else: + fail() + + check: + collected == @["0", "1", "2", "3", "4"] + + test "Should leave only odd items using `filter`": + let + iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = await filter[int]( + iter1, + proc(i: ?!int): Future[bool] {.async: (raises: [CancelledError]).} = + if i =? i: + return (i mod 2) == 1 + else: + return false, + ) + + var collected: seq[int] + + for fut in iter2: + if i =? (await fut): + collected.add(i) + else: + fail() + + check: + collected == @[1, 3] + + test "Should leave only odd items using `mapFilter`": + let + iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = await mapFilter[int, string]( + iter1, + proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = + if i =? i: + if (i mod 2) == 1: + return some(success($i)) + Result[system.string, ref CatchableError].none, + ) + + var collected: seq[string] + + for fut in iter2: + if i =? (await fut): + collected.add(i) + else: + fail() + + check: + collected == @["1", "3"] + + test "Collecting errors on `map` when finish on error is true": + let + iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = map[int, string]( + iter1, + proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = + if i =? i: + if i < 3: + return success($i) + else: + return failure("Error on item: " & $i) + return failure("Unexpected error"), + ) + + var collectedSuccess: seq[string] + var collectedFailure: seq[string] + + for fut in iter2: + without i =? (await fut), err: + collectedFailure.add(err.msg) + continue + collectedSuccess.add(i) + + check: + collectedSuccess == @["0", "1", "2"] + collectedFailure == @["Error on item: 3"] + iter2.finished + + test "Collecting errors on `map` when finish on error is false": + let + iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = map[int, string]( + iter1, + proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = + if i =? i: + if i < 3: + return success($i) + else: + return failure("Error on item: " & $i) + return failure("Unexpected error"), + finishOnErr = false, + ) + + var collectedSuccess: seq[string] + var collectedFailure: seq[string] + + for fut in iter2: + without i =? (await fut), err: + collectedFailure.add(err.msg) + continue + collectedSuccess.add(i) + + check: + collectedSuccess == @["0", "1", "2"] + collectedFailure == @["Error on item: 3", "Error on item: 4"] + iter2.finished + + test "Collecting errors on `map` when errors are mixed with successes": + let + iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = map[int, string]( + iter1, + proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = + if i =? i: + if i == 1 or i == 3: + return success($i) + else: + return failure("Error on item: " & $i) + return failure("Unexpected error"), + finishOnErr = false, + ) + + var collectedSuccess: seq[string] + var collectedFailure: seq[string] + + for fut in iter2: + without i =? (await fut), err: + collectedFailure.add(err.msg) + continue + collectedSuccess.add(i) + + check: + collectedSuccess == @["1", "3"] + collectedFailure == @["Error on item: 0", "Error on item: 2", "Error on item: 4"] + iter2.finished + + test "Collecting errors on `mapFilter` when finish on error is true": + let + iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = await mapFilter[int, string]( + iter1, + proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = + if i =? i: + if i == 1: + return some(string.failure("Error on item: " & $i)) + elif i < 3: + return some(success($i)) + else: + return Result[system.string, ref CatchableError].none + return some(string.failure("Unexpected error")), + ) + + var collectedSuccess: seq[string] + var collectedFailure: seq[string] + + for fut in iter2: + without i =? (await fut), err: + collectedFailure.add(err.msg) + continue + collectedSuccess.add(i) + + check: + collectedSuccess == @["0"] + collectedFailure == @["Error on item: 1"] + iter2.finished + + test "Collecting errors on `mapFilter` when finish on error is false": + let + iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = await mapFilter[int, string]( + iter1, + proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = + if i =? i: + if i == 1: + return some(string.failure("Error on item: " & $i)) + elif i < 3: + return some(success($i)) + else: + return Result[system.string, ref CatchableError].none + return some(string.failure("Unexpected error")), + finishOnErr = false, + ) + + var collectedSuccess: seq[string] + var collectedFailure: seq[string] + + for fut in iter2: + without i =? (await fut), err: + collectedFailure.add(err.msg) + continue + collectedSuccess.add(i) + + check: + collectedSuccess == @["0", "2"] + collectedFailure == @["Error on item: 1"] + iter2.finished + + test "Collecting errors on `filter` when finish on error is false": + let + iter1 = AsyncResultIter[int].new(0 ..< 5) + iter2 = map[int, string]( + iter1, + proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = + if i =? i: + if i == 1 or i == 2: + return failure("Error on item: " & $i) + elif i < 4: + return success($i) + return failure("Unexpected error"), + finishOnErr = false, + ) + iter3 = await filter[string]( + iter2, + proc(i: ?!string): Future[bool] {.async: (raises: [CancelledError]).} = + without i =? i, err: + if err.msg == "Error on item: 1": + return false + else: + return true + if i == "0": + return false + else: + return true, + finishOnErr = false, + ) + + var collectedSuccess: seq[string] + var collectedFailure: seq[string] + + for fut in iter3: + without i =? (await fut), err: + collectedFailure.add(err.msg) + continue + collectedSuccess.add(i) + + check: + collectedSuccess == @["3"] + collectedFailure == @["Error on item: 2", "Unexpected error"] + iter3.finished + + test "Collecting errors on `filter` when finish on error is true": + let + iter1 = AsyncResultIter[int].new(0 ..< 5) + iter2 = map[int, string]( + iter1, + proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = + if i =? i: + if i == 3: + return failure("Error on item: " & $i) + elif i < 3: + return success($i) + return failure("Unexpected error"), + finishOnErr = false, + ) + iter3 = await filter[string]( + iter2, + proc(i: ?!string): Future[bool] {.async: (raises: [CancelledError]).} = + without i =? i, err: + if err.msg == "Unexpected error": + return false + else: + return true + if i == "0": + return false + else: + return true, + ) + + var collectedSuccess: seq[string] + var collectedFailure: seq[string] + + for fut in iter3: + without i =? (await fut), err: + collectedFailure.add(err.msg) + continue + collectedSuccess.add(i) + + check: + collectedSuccess == @["1", "2"] + # On error iterator finishes and returns the error of the item + # that caused the error = that's why we see it here + collectedFailure == @["Error on item: 3"] + iter3.finished + + test "Should propagate cancellation error immediately": + # This test can be a bit tricky to understand because it is + # quite tightly coupled with the way the iterator is implemented. + # When `mapFilter` is called, it already performs first iteration + # step: this is necessary, so that if there is nothing there left + # after filtering, the iterator state should be market as "finished" + # before event trying to call `next()` for the very first time (a standard + # practice is for the called to check if the iterator is finished before + # attempting to call `next()`). Thus, internally, the value that is to be + # returned for the first iteration is already resolved and ready to be returned. + # And this follows in the same for the next iterations. On calling `next()` + # the iterator first makes a temporary copy of the value already captured in + # the precious step, awaits for the next value (and if there is no more values + # to be returned it marks the iterator as finished), and then returns the + # local copy of the previously captured value. + # Now, to make sure that this mechanism works, and to document its + # cancellation semantics, this test shows that when the async predicate + # function is cancelled, this cancellation has immediate effect, which means + # that `next()` (or more precisely `getNext()` in `mapFilter` function), is + # interrupted immediately. If this is the case, the the iterator be interrupted + # before `next()` returns this locally captured value from the previous + # iteration and this is exactly the reason why at the end of the test + # we expect only values "0" and "1" to be collected while value "2" - although + # already resolved and ready to be returned, is not returned because of the + # cancellation of the async predicate function. + + let fut: Future[Option[?!string]].Raising([CancelledError]) = + Future[Option[?!string]].Raising([CancelledError]).init("testasyncresultiter") + + let iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) + let iter2 = await mapFilter[int, string]( + iter1, + proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = + if i =? i: + if (i < 3): + return some(success($i)) + return await fut, + ) + + proc cancelFut(): Future[void] {.async.} = + await sleepAsync(100.millis) + await fut.cancelAndWait() + + asyncSpawn(cancelFut()) + + var collected: seq[string] + + expect CancelledError: + for fut in iter2: + if i =? (await fut): + collected.add(i) + else: + fail() + + check: + # We expect only values "0" and "1" to be collected + # and not value "2" that - although resolved and ready to be returned - + # will not be returned because of the cancellation. + collected == @["0", "1"] + iter2.finished diff --git a/tests/testiter.nim b/tests/testiter.nim new file mode 100644 index 0000000..fc0430f --- /dev/null +++ b/tests/testiter.nim @@ -0,0 +1,122 @@ +import std/sugar +import std/sequtils + +import pkg/questionable +import pkg/chronos +import pkg/asynctest/chronos/unittest2 + +import asynciterators/iter + +suite "Test Iter": + test "Should be finished": + let iter = Iter[int].empty() + + check: + iter.finished == true + + test "Should be iterable with `items`": + let iter = Iter.new(0 ..< 5) + + let items = collect: + for v in iter: + v + + check: + items == @[0, 1, 2, 3, 4] + + test "Should be iterable with `pairs`": + let iter = Iter.new(0 ..< 5) + + let pairs = collect: + for i, v in iter: + (i, v) + + check: + pairs == @[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)] + + test "Should map each item using `map`": + let iter = Iter.new(0 ..< 5).map((i: int) => $i) + + check: + iter.toSeq() == @["0", "1", "2", "3", "4"] + + test "Should leave only odd items using `filter`": + let iter = Iter.new(0 ..< 5).filter((i: int) => (i mod 2) == 1) + + check: + iter.toSeq() == @[1, 3] + + test "Should leave only odd items using `mapFilter`": + let + iter1 = Iter.new(0 ..< 5) + iter2 = mapFilter[int, string]( + iter1, + proc(i: int): ?string = + if (i mod 2) == 1: + some($i) + else: + string.none, + ) + + check: + iter2.toSeq() == @["1", "3"] + + test "Should yield all items before err using `map`": + let iter = Iter.new(0 ..< 5).map( + proc(i: int): string = + if i < 3: + return $i + else: + raise newException(CatchableError, "Some error") + ) + + var collected: seq[string] + + expect CatchableError: + for i in iter: + collected.add(i) + + check: + collected == @["0", "1", "2"] + iter.finished + + test "Should yield all items before err using `filter`": + let iter = Iter.new(0 ..< 5).filter( + proc(i: int): bool = + if i < 3: + return true + else: + raise newException(CatchableError, "Some error") + ) + + var collected: seq[int] + + expect CatchableError: + for i in iter: + collected.add(i) + + check: + collected == @[0, 1, 2] + iter.finished + + test "Should yield all items before err using `mapFilter`": + let + iter1 = Iter.new(0 ..< 5) + iter2 = mapFilter[int, string]( + iter1, + proc(i: int): ?string = + if i < 3: + return some($i) + else: + raise newException(CatchableError, "Some error"), + ) + + var collected: seq[string] + + expect CatchableError: + for i in iter2: + collected.add(i) + + check: + collected == @["0", "1", "2"] + iter2.finished