From c5c1f979c0c3d3003a1f5be4e4c7dd731389a39f Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Tue, 24 Jun 2025 15:16:44 +0200 Subject: [PATCH] refactors "Iter" and "AsyncIter" to be better separated and having adequate annotations --- codex/utils/asynciter.nim | 125 +++++++++++++++++++++++++------------- codex/utils/iter.nim | 97 +++++++++++++++++++---------- 2 files changed, 149 insertions(+), 73 deletions(-) diff --git a/codex/utils/asynciter.nim b/codex/utils/asynciter.nim index d87ff67f..12df2f40 100644 --- a/codex/utils/asynciter.nim +++ b/codex/utils/asynciter.nim @@ -1,3 +1,14 @@ +## Nim-Codex +## Copyright (c) 2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + import std/sugar import pkg/questionable @@ -5,43 +16,52 @@ import pkg/chronos import ./iter -export iter - -## AsyncIter[T] is similar to `Iter[Future[T]]` with addition of methods specific to asynchronous processing +## AsyncIter[T] is similar to `Iter[Future[T]]` with +## addition of methods specific to asynchronous processing. ## +## Public interface: +## +## Attributes +## - next - allows to set a custom function to be called when the next item is requested +## +## Operations: +## - new - to create a new async iterator (AsyncIter) +## - finish - to finish the async iterator +## - finished - to check if the async iterator is finished +## - next - to get the next item from the async iterator +## - items - to iterate over the async iterator +## - pairs - to iterate over the async iterator and return the index of each item +## - mapFuture - to convert a (raising) Future[T] to a (raising) Future[U] using a function fn: auto -> Future[U] - we use auto to handle both raising and non-raising futures +## - mapAsync - to convert a regular sync iterator (Iter) to an async iterator (AsyncIter) +## - map - to convert one async iterator (AsyncIter) to another async iterator (AsyncIter) +## - mapFilter - to convert one async iterator (AsyncIter) to another async iterator (AsyncIter) and apply filtering at the same time +## - filter - to filter an async iterator (AsyncIter) and return another async iterator (AsyncIter) +## - delayBy - to delay each item returned by async iterator by a given duration +## - empty - to create an empty async iterator (AsyncIter) -type AsyncIter*[T] = ref object - finished: bool - next*: GenNext[Future[T]] +type + AsyncIterFunc[T, U] = + proc(fut: T): Future[U] {.async.} + AsyncIterIsFinished = proc(): bool {.raises: [], gcsafe.} + AsyncIterGenNext[T] = + proc(): Future[T] {.async.} -proc finish*[T](self: AsyncIter[T]): void = - self.finished = true + AsyncIter*[T] = ref object + finished: bool + next*: AsyncIterGenNext[T] -proc finished*[T](self: AsyncIter[T]): bool = - self.finished - -iterator items*[T](self: AsyncIter[T]): Future[T] = - while not self.finished: - yield self.next() - -iterator pairs*[T](self: AsyncIter[T]): tuple[key: int, val: Future[T]] {.inline.} = - var i = 0 - while not self.finished: - yield (i, self.next()) - inc(i) - -proc map*[T, U](fut: Future[T], fn: Function[T, U]): Future[U] {.async.} = - let t = await fut - fn(t) - -proc flatMap*[T, U](fut: Future[T], fn: Function[T, Future[U]]): Future[U] {.async.} = +proc flatMap[T, U](fut: Future[T], fn: AsyncIterFunc[T, U]): Future[U] {.async.} = let t = await fut await fn(t) +######################################################################## +## AsyncIter public interface methods +######################################################################## + proc new*[T]( _: type AsyncIter[T], - genNext: GenNext[Future[T]], - isFinished: IsFinished, + genNext: AsyncIterGenNext[T], + isFinished: AsyncIterIsFinished, finishOnErr: bool = true, ): AsyncIter[T] = ## Creates a new Iter using elements returned by supplier function `genNext`. @@ -77,8 +97,8 @@ proc new*[T]( iter.next = next return iter -proc mapAsync*[T, U](iter: Iter[T], fn: Function[T, Future[U]]): AsyncIter[U] = - AsyncIter[U].new(genNext = () => fn(iter.next()), isFinished = () => iter.finished()) +# forward declaration +proc mapAsync*[T, U](iter: Iter[T], fn: AsyncIterFunc[T, U]): AsyncIter[U] proc new*[U, V: Ordinal](_: type AsyncIter[U], slice: HSlice[U, V]): AsyncIter[U] = ## Creates new Iter from a slice @@ -104,25 +124,36 @@ proc new*[U, V, S: Ordinal]( i, ) -proc empty*[T](_: type AsyncIter[T]): AsyncIter[T] = - ## Creates an empty AsyncIter - ## +proc finish*[T](self: AsyncIter[T]): void = + self.finished = true - proc genNext(): Future[T] {.raises: [CatchableError].} = - raise newException(CatchableError, "Next item requested from an empty AsyncIter") +proc finished*[T](self: AsyncIter[T]): bool = + self.finished - proc isFinished(): bool = - true +iterator items*[T](self: AsyncIter[T]): Future[T] = + while not self.finished: + yield self.next() - AsyncIter[T].new(genNext, isFinished) +iterator pairs*[T](self: AsyncIter[T]): tuple[key: int, val: Future[T]] {.inline.} = + var i = 0 + while not self.finished: + yield (i, self.next()) + inc(i) -proc map*[T, U](iter: AsyncIter[T], fn: Function[T, Future[U]]): AsyncIter[U] = +proc mapFuture*[T, U](fut: Future[T], fn: AsyncIterFunc[T, U]): Future[U] {.async.} = + let t = await fut + fn(t) + +proc mapAsync*[T, U](iter: Iter[T], fn: AsyncIterFunc[T, U]): AsyncIter[U] = + AsyncIter[U].new(genNext = () => fn(iter.next()), isFinished = () => iter.finished()) + +proc map*[T, U](iter: AsyncIter[T], fn: AsyncIterFunc[T, U]): AsyncIter[U] = AsyncIter[U].new( genNext = () => iter.next().flatMap(fn), isFinished = () => iter.finished ) proc mapFilter*[T, U]( - iter: AsyncIter[T], mapPredicate: Function[T, Future[Option[U]]] + iter: AsyncIter[T], mapPredicate: AsyncIterFunc[T, Option[U]] ): Future[AsyncIter[U]] {.async: (raises: [CancelledError]).} = var nextFutU: Option[Future[U]] @@ -156,7 +187,7 @@ proc mapFilter*[T, U]( AsyncIter[U].new(genNext, isFinished) proc filter*[T]( - iter: AsyncIter[T], predicate: Function[T, Future[bool]] + iter: AsyncIter[T], predicate: AsyncIterFunc[T, bool] ): Future[AsyncIter[T]] {.async: (raises: [CancelledError]).} = proc wrappedPredicate(t: T): Future[Option[T]] {.async.} = if await predicate(t): @@ -176,3 +207,15 @@ proc delayBy*[T](iter: AsyncIter[T], d: Duration): AsyncIter[T] = await sleepAsync(d) t, ) + +proc empty*[T](_: type AsyncIter[T]): AsyncIter[T] = + ## Creates an empty AsyncIter + ## + + proc genNext(): Future[T] {.async.} = + raise newException(CatchableError, "Next item requested from an empty AsyncIter") + + proc isFinished(): bool = + true + + AsyncIter[T].new(genNext, isFinished) \ No newline at end of file diff --git a/codex/utils/iter.nim b/codex/utils/iter.nim index 9afd6c12..56c0f206 100644 --- a/codex/utils/iter.nim +++ b/codex/utils/iter.nim @@ -1,37 +1,54 @@ +## Nim-Codex +## Copyright (c) 2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + import std/sugar import pkg/questionable import pkg/questionable/results +## Public interface: +## +## Attributes +## - next - allows to set a custom function to be called when the next item is requested +## +## Operations: +## - new - to create a new iterator (Iter) +## - finish - to finish the iterator +## - finished - to check if the iterator is finished +## - next - to get the next item from the iterator +## - items - to iterate over the iterator +## - pairs - to iterate over the iterator and return the index of each item +## - map - to convert one iterator (Iter) to another iterator (Iter) +## - mapFilter - to convert one iterator (Iter) to another iterator (Iter) and apply filtering at the same time +## - filter - to filter an iterator (Iter) and return another iterator (Iter) +## - empty - to create an empty async iterator (AsyncIter) + type - Function*[T, U] = proc(fut: T): U {.raises: [CatchableError], gcsafe, closure.} - IsFinished* = proc(): bool {.raises: [], gcsafe, closure.} - GenNext*[T] = proc(): T {.raises: [CatchableError], gcsafe.} + IterFunction[T, U] = proc(value: T): U {.raises: [CatchableError], gcsafe.} + IterIsFinished = proc(): bool {.raises: [], gcsafe.} + IterGenNext[T] = proc(): T {.raises: [CatchableError], gcsafe.} Iterator[T] = iterator (): T + Iter*[T] = ref object finished: bool - next*: GenNext[T] + next*: IterGenNext[T] -proc finish*[T](self: Iter[T]): void = - self.finished = true - -proc finished*[T](self: Iter[T]): bool = - self.finished - -iterator items*[T](self: Iter[T]): T = - while not self.finished: - yield self.next() - -iterator pairs*[T](self: Iter[T]): tuple[key: int, val: T] {.inline.} = - var i = 0 - while not self.finished: - yield (i, self.next()) - inc(i) +######################################################################## +## Iter public interface methods +######################################################################## proc new*[T]( _: type Iter[T], - genNext: GenNext[T], - isFinished: IsFinished, + genNext: IterGenNext[T], + isFinished: IterIsFinished, finishOnErr: bool = true, ): Iter[T] = ## Creates a new Iter using elements returned by supplier function `genNext`. @@ -121,22 +138,26 @@ proc new*[T](_: type Iter[T], iter: Iterator[T]): Iter[T] = tryNext() Iter[T].new(genNext, isFinished) -proc empty*[T](_: type Iter[T]): Iter[T] = - ## Creates an empty Iter - ## +proc finish*[T](self: Iter[T]): void = + self.finished = true - proc genNext(): T {.raises: [CatchableError].} = - raise newException(CatchableError, "Next item requested from an empty Iter") +proc finished*[T](self: Iter[T]): bool = + self.finished - proc isFinished(): bool = - true +iterator items*[T](self: Iter[T]): T = + while not self.finished: + yield self.next() - Iter[T].new(genNext, isFinished) +iterator pairs*[T](self: Iter[T]): tuple[key: int, val: T] {.inline.} = + var i = 0 + while not self.finished: + yield (i, self.next()) + inc(i) -proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] = +proc map*[T, U](iter: Iter[T], fn: IterFunction[T, U]): Iter[U] = Iter[U].new(genNext = () => fn(iter.next()), isFinished = () => iter.finished) -proc mapFilter*[T, U](iter: Iter[T], mapPredicate: Function[T, Option[U]]): Iter[U] = +proc mapFilter*[T, U](iter: Iter[T], mapPredicate: IterFunction[T, Option[U]]): Iter[U] = var nextUOrErr: Option[?!U] proc tryFetch(): void = @@ -167,7 +188,7 @@ proc mapFilter*[T, U](iter: Iter[T], mapPredicate: Function[T, Option[U]]): Iter tryFetch() Iter[U].new(genNext, isFinished) -proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] = +proc filter*[T](iter: Iter[T], predicate: IterFunction[T, bool]): Iter[T] = proc wrappedPredicate(t: T): Option[T] = if predicate(t): some(t) @@ -175,3 +196,15 @@ proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] = T.none mapFilter[T, T](iter, wrappedPredicate) + +proc empty*[T](_: type Iter[T]): Iter[T] = + ## Creates an empty Iter + ## + + proc genNext(): T {.raises: [CatchableError].} = + raise newException(CatchableError, "Next item requested from an empty Iter") + + proc isFinished(): bool = + true + + Iter[T].new(genNext, isFinished) \ No newline at end of file