166 lines
4.3 KiB
Nim
166 lines
4.3 KiB
Nim
import std/sugar
|
|
|
|
import pkg/questionable
|
|
import pkg/chronos
|
|
|
|
import ./iter
|
|
|
|
export iter
|
|
|
|
## AsyncIter[T] is similar to `Iter[Future[T]]` with addition of methods specific to asynchronous processing
|
|
##
|
|
|
|
type
|
|
AsyncIter*[T] = ref object
|
|
finished: bool
|
|
next*: GenNext[Future[T]]
|
|
|
|
proc finish*[T](self: AsyncIter[T]): void =
|
|
self.finished = true
|
|
|
|
proc finished*[T](self: AsyncIter[T]): bool =
|
|
self.finished
|
|
|
|
iterator items*[T](self: AsyncIter[T]): Future[T] =
|
|
while not self.finished:
|
|
yield self.next()
|
|
|
|
iterator pairs*[T](self: AsyncIter[T]): tuple[key: int, val: Future[T]] {.inline.} =
|
|
var i = 0
|
|
while not self.finished:
|
|
yield (i, self.next())
|
|
inc(i)
|
|
|
|
proc map*[T, U](fut: Future[T], fn: Function[T, U]): Future[U] {.async.} =
|
|
let t = await fut
|
|
fn(t)
|
|
|
|
proc flatMap*[T, U](fut: Future[T], fn: Function[T, Future[U]]): Future[U] {.async.} =
|
|
let t = await fut
|
|
await fn(t)
|
|
|
|
proc new*[T](_: type AsyncIter[T], genNext: GenNext[Future[T]], isFinished: IsFinished, finishOnErr: bool = true): AsyncIter[T] =
|
|
## Creates a new Iter using elements returned by supplier function `genNext`.
|
|
## Iter is finished whenever `isFinished` returns true.
|
|
##
|
|
|
|
var iter = AsyncIter[T]()
|
|
|
|
proc next(): Future[T] {.async.} =
|
|
if not iter.finished:
|
|
var item: T
|
|
try:
|
|
item = await genNext()
|
|
except CancelledError as err:
|
|
iter.finish
|
|
raise err
|
|
except CatchableError as err:
|
|
if finishOnErr or isFinished():
|
|
iter.finish
|
|
raise err
|
|
|
|
if isFinished():
|
|
iter.finish
|
|
return item
|
|
else:
|
|
raise newException(CatchableError, "AsyncIter is finished but next item was requested")
|
|
|
|
if isFinished():
|
|
iter.finish
|
|
|
|
iter.next = next
|
|
return iter
|
|
|
|
proc mapAsync*[T, U](iter: Iter[T], fn: Function[T, Future[U]]): AsyncIter[U] =
|
|
AsyncIter[U].new(
|
|
genNext = () => fn(iter.next()),
|
|
isFinished = () => iter.finished()
|
|
)
|
|
|
|
proc new*[U, V: Ordinal](_: type AsyncIter[U], slice: HSlice[U, V]): AsyncIter[U] =
|
|
## Creates new Iter from a slice
|
|
##
|
|
|
|
let iter = Iter[U].new(slice)
|
|
mapAsync[U, U](iter,
|
|
proc (i: U): Future[U] {.async.} =
|
|
i
|
|
)
|
|
|
|
proc new*[U, V, S: Ordinal](_: type AsyncIter[U], a: U, b: V, step: S = 1): AsyncIter[U] =
|
|
## Creates new Iter in range a..b with specified step (default 1)
|
|
##
|
|
|
|
let iter = Iter[U].new(a, b, step)
|
|
mapAsync[U, U](iter,
|
|
proc (i: U): Future[U] {.async.} =
|
|
i
|
|
)
|
|
|
|
proc empty*[T](_: type AsyncIter[T]): AsyncIter[T] =
|
|
## Creates an empty AsyncIter
|
|
##
|
|
|
|
proc genNext(): Future[T] {.raises: [CatchableError].} =
|
|
raise newException(CatchableError, "Next item requested from an empty AsyncIter")
|
|
proc isFinished(): bool = true
|
|
|
|
AsyncIter[T].new(genNext, isFinished)
|
|
|
|
proc map*[T, U](iter: AsyncIter[T], fn: Function[T, Future[U]]): AsyncIter[U] =
|
|
AsyncIter[U].new(
|
|
genNext = () => iter.next().flatMap(fn),
|
|
isFinished = () => iter.finished
|
|
)
|
|
|
|
proc mapFilter*[T, U](iter: AsyncIter[T], mapPredicate: Function[T, Future[Option[U]]]): Future[AsyncIter[U]] {.async.} =
|
|
var nextFutU: Option[Future[U]]
|
|
|
|
proc tryFetch(): Future[void] {.async.} =
|
|
nextFutU = Future[U].none
|
|
while not iter.finished:
|
|
let futT = iter.next()
|
|
try:
|
|
if u =? await futT.flatMap(mapPredicate):
|
|
let futU = newFuture[U]("AsyncIter.mapFilterAsync")
|
|
futU.complete(u)
|
|
nextFutU = some(futU)
|
|
break
|
|
except CancelledError as err:
|
|
raise err
|
|
except CatchableError as err:
|
|
let errFut = newFuture[U]("AsyncIter.mapFilterAsync")
|
|
errFut.fail(err)
|
|
nextFutU = some(errFut)
|
|
break
|
|
|
|
proc genNext(): Future[U] {.async.} =
|
|
let futU = nextFutU.unsafeGet
|
|
await tryFetch()
|
|
await futU
|
|
|
|
proc isFinished(): bool =
|
|
nextFutU.isNone
|
|
|
|
await tryFetch()
|
|
AsyncIter[U].new(genNext, isFinished)
|
|
|
|
proc filter*[T](iter: AsyncIter[T], predicate: Function[T, Future[bool]]): Future[AsyncIter[T]] {.async.} =
|
|
proc wrappedPredicate(t: T): Future[Option[T]] {.async.} =
|
|
if await predicate(t):
|
|
some(t)
|
|
else:
|
|
T.none
|
|
|
|
await mapFilter[T, T](iter, wrappedPredicate)
|
|
|
|
proc delayBy*[T](iter: AsyncIter[T], d: Duration): AsyncIter[T] =
|
|
## Delays emitting each item by given duration
|
|
##
|
|
|
|
map[T, T](iter,
|
|
proc (t: T): Future[T] {.async.} =
|
|
await sleepAsync(d)
|
|
t
|
|
)
|