2025-06-24 15:16:44 +02:00
## Nim-Codex
## Copyright (c) 2025 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{. push raises : [ ] . }
2023-11-14 13:02:17 +01:00
import std / sugar
import pkg / questionable
import pkg / chronos
2024-06-11 00:47:29 +02:00
import . / iter
2025-06-24 15:16:44 +02:00
## AsyncIter[T] is similar to `Iter[Future[T]]` with
## addition of methods specific to asynchronous processing.
2024-06-11 00:47:29 +02:00
##
2025-06-24 15:16:44 +02:00
## Public interface:
##
## Attributes
## - next - allows setting a custom function that is called when the next item is requested
##
## Operations:
## - new - to create a new async iterator (AsyncIter)
## - finish - to finish the async iterator
## - finished - to check if the async iterator is finished
## - next - to get the next item from the async iterator
## - items - to iterate over the async iterator
## - pairs - to iterate over the async iterator and return the index of each item
## - mapFuture - to convert a (raising) Future[T] to a (raising) Future[U] using a function fn: auto -> Future[U] - we use auto to handle both raising and non-raising futures
## - mapAsync - to convert a regular sync iterator (Iter) to an async iterator (AsyncIter)
## - map - to convert one async iterator (AsyncIter) to another async iterator (AsyncIter)
## - mapFilter - to convert one async iterator (AsyncIter) to another async iterator (AsyncIter) and apply filtering at the same time
## - filter - to filter an async iterator (AsyncIter) and return another async iterator (AsyncIter)
## - delayBy - to delay each item returned by the async iterator by a given duration
## - empty - to create an empty async iterator (AsyncIter)
type
  # Asynchronous transformation: receives a `T` and returns a `Future[U]`.
  AsyncIterFunc[T, U] = proc(fut: T): Future[U] {.async.}

  # Synchronous, non-raising, gcsafe check telling whether the underlying
  # source has run out of items.
  AsyncIterIsFinished = proc(): bool {.raises: [], gcsafe.}

  # Asynchronous supplier of the next item.
  AsyncIterGenNext[T] = proc(): Future[T] {.async.}

  # Asynchronous iterator handle.
  AsyncIter*[T] = ref object
    # Set once the iterator is finished; module-private, read through the
    # `finished` proc and set through `finish`.
    finished: bool
    # Exported supplier invoked to request the next item.
    next*: AsyncIterGenNext[T]
proc flatMap[T, U](fut: Future[T], fn: AsyncIterFunc[T, U]): Future[U] {.async.} =
  ## Awaits `fut`, passes the obtained value to `fn` and awaits the future
  ## returned by `fn`, completing with the resulting `U`.
  let value = await fut
  return await fn(value)
2025-06-24 15:16:44 +02:00
########################################################################
## AsyncIter public interface methods
########################################################################
2024-06-11 00:47:29 +02:00
proc new*[T](
    _: type AsyncIter[T],
    genNext: AsyncIterGenNext[T],
    isFinished: AsyncIterIsFinished,
    finishOnErr: bool = true,
): AsyncIter[T] =
  ## Creates a new AsyncIter whose items are obtained by awaiting the supplier
  ## function `genNext`.
  ##
  ## The iterator is marked as finished whenever `isFinished` returns true;
  ## this is checked once on construction and again after every item that was
  ## produced successfully.
  ##
  ## Errors raised by `genNext` are re-raised to the caller of `next`. In
  ## addition the iterator is finished when the error is a `CancelledError`,
  ## when `finishOnErr` is true, or when `isFinished` returns true at that
  ## point.
  ##
  ## Requesting an item from an iterator that is already finished raises a
  ## `CatchableError`.
  ##
  let instance = AsyncIter[T]()

  proc nextItem(): Future[T] {.async.} =
    # Guard clause: nothing may be requested once the iterator is finished.
    if instance.finished:
      raise newException(
        CatchableError, " AsyncIter is finished but next item was requested "
      )

    var item: T
    try:
      item = await genNext()
    except CancelledError as err:
      # Cancellation always finishes the iterator, regardless of `finishOnErr`.
      instance.finish
      raise err
    except CatchableError as err:
      if finishOnErr or isFinished():
        instance.finish
      raise err

    # The item just produced may have been the last one.
    if isFinished():
      instance.finish
    return item

  # The source may already be exhausted before the first item is requested.
  if isFinished():
    instance.finish

  instance.next = nextItem
  return instance
2025-06-24 15:16:44 +02:00
# Forward declaration: `mapAsync` is implemented further down in this file but
# is already called by the slice- and range-based `new` constructors below.
proc mapAsync * [ T , U ] ( iter : Iter [ T ] , fn : AsyncIterFunc [ T , U ] ) : AsyncIter [ U ]
2023-11-14 13:02:17 +01:00
2024-06-11 00:47:29 +02:00
proc new*[U, V: Ordinal](_: type AsyncIter[U], slice: HSlice[U, V]): AsyncIter[U] =
  ## Creates a new AsyncIter from a slice.
  ##
  ## A synchronous `Iter[U]` is built from `slice` and lifted with `mapAsync`
  ## using an async identity function, so each value is returned unchanged.
  ##
  return mapAsync[U, U](
    Iter[U].new(slice),
    proc(value: U): Future[U] {.async.} =
      value,
  )
2023-11-14 13:02:17 +01:00
2024-06-11 00:47:29 +02:00
proc new*[U, V, S: Ordinal](
    _: type AsyncIter[U], a: U, b: V, step: S = 1
): AsyncIter[U] =
  ## Creates a new AsyncIter in range a..b with the specified step (default 1).
  ##
  ## A synchronous `Iter[U]` is built from `a`, `b` and `step` and lifted with
  ## `mapAsync` using an async identity function, so each value is returned
  ## unchanged.
  ##
  return mapAsync[U, U](
    Iter[U].new(a, b, step),
    proc(value: U): Future[U] {.async.} =
      value,
  )
2023-11-14 13:02:17 +01:00
2025-06-24 15:16:44 +02:00
proc finish*[T](self: AsyncIter[T]): void =
  ## Marks the iterator as finished by setting its `finished` flag.
  self.finished = true
2023-11-14 11:52:27 -06:00
2025-06-24 15:16:44 +02:00
proc finished*[T](self: AsyncIter[T]): bool =
  ## Returns the current value of the iterator's `finished` flag.
  return self.finished
2025-01-21 21:54:46 +01:00
2025-06-24 15:16:44 +02:00
iterator items*[T](self: AsyncIter[T]): Future[T] =
  ## Yields the future returned by `next()` for as long as the iterator is not
  ## finished. The futures are yielded as they are; nothing is awaited here.
  while true:
    if self.finished:
      break
    yield self.next()
iterator pairs*[T](self: AsyncIter[T]): tuple[key: int, val: Future[T]] {.inline.} =
  ## Like `items`, but each yielded future is paired with a running index that
  ## starts at 0 and grows by one per yielded element.
  var index = 0
  while true:
    if self.finished:
      break
    yield (key: index, val: self.next())
    index += 1
proc mapFuture*[T, U](fut: Future[T], fn: AsyncIterFunc[T, U]): Future[U] {.async.} =
  ## Converts a `Future[T]` into a `Future[U]`.
  ##
  ## Awaits `fut`, passes the obtained value to `fn` and completes with the
  ## `U` produced by the future that `fn` returns. Errors raised while
  ## awaiting either future propagate through the returned future.
  let t = await fut
  # `fn` returns a `Future[U]`, so it has to be awaited: the body of an async
  # proc must end on a `U`, not on a `Future[U]`.
  await fn(t)
2023-11-14 13:02:17 +01:00
2025-06-24 15:16:44 +02:00
proc mapAsync*[T, U](iter: Iter[T], fn: AsyncIterFunc[T, U]): AsyncIter[U] =
  ## Converts a synchronous `Iter[T]` into an `AsyncIter[U]`.
  ##
  ## Every requested item takes the next value from `iter` and passes it to
  ## `fn`; the resulting iterator is finished whenever `iter` is finished.
  return AsyncIter[U].new(
    genNext = () => fn(iter.next()),
    isFinished = () => iter.finished(),
  )
2023-11-14 13:02:17 +01:00
2025-06-24 15:16:44 +02:00
proc map*[T, U](iter: AsyncIter[T], fn: AsyncIterFunc[T, U]): AsyncIter[U] =
  ## Converts an `AsyncIter[T]` into an `AsyncIter[U]`.
  ##
  ## Every requested item asks `iter` for its next future and chains `fn`
  ## onto it via `flatMap`; the resulting iterator is finished whenever `iter`
  ## is finished.
  return AsyncIter[U].new(
    genNext = () => flatMap(iter.next(), fn),
    isFinished = () => iter.finished,
  )
2024-06-11 00:47:29 +02:00
proc mapFilter*[T, U](
    iter: AsyncIter[T], mapPredicate: AsyncIterFunc[T, Option[U]]
): Future[AsyncIter[U]] {.async: (raises: [CancelledError]).} =
  ## Maps and filters `iter` in one pass.
  ##
  ## `mapPredicate` is applied to the items of `iter`: when it yields a value
  ## that value becomes an item of the resulting iterator, otherwise the item
  ## is skipped. An error raised while obtaining or mapping an item is not
  ## raised by the lookup itself; it is stored in a failed future and
  ## surfaces when that position is requested from the resulting iterator.
  ## Cancellation is re-raised immediately.
  ##
  ## The resulting iterator is always one item ahead of its consumer, which is
  ## why the first lookup is awaited here before the iterator is returned.

  # Future for the item handed out by the next `genNext` call; `none` once
  # `iter` finished without producing another accepted item.
  var lookahead: Option[Future[U]]

  proc tryFetch(): Future[void] {.async: (raises: [CancelledError]).} =
    # Advances `iter` until an item is accepted, an error occurs, or `iter` is
    # finished, and records the outcome in `lookahead`.
    lookahead = Future[U].none
    while not iter.finished:
      let sourceFut = iter.next()
      try:
        if mapped =? await sourceFut.flatMap(mapPredicate):
          let ready = newFuture[U](" AsyncIter.mapFilterAsync ")
          ready.complete(mapped)
          lookahead = some(ready)
          break
      except CancelledError as err:
        raise err
      except CatchableError as err:
        # Keep the failure so that it is delivered in place of the item.
        let failed = newFuture[U](" AsyncIter.mapFilterAsync ")
        failed.fail(err)
        lookahead = some(failed)
        break

  proc genNext(): Future[U] {.async.} =
    # Take the prefetched future first, then look up the following item so
    # that `isFinished` is up to date by the time this item is delivered.
    let current = lookahead.unsafeGet
    await tryFetch()
    return await current

  proc isFinished(): bool =
    return lookahead.isNone

  await tryFetch()
  return AsyncIter[U].new(genNext, isFinished)
2023-11-14 13:02:17 +01:00
2024-06-11 00:47:29 +02:00
proc filter*[T](
    iter: AsyncIter[T], predicate: AsyncIterFunc[T, bool]
): Future[AsyncIter[T]] {.async: (raises: [CancelledError]).} =
  ## Returns an AsyncIter containing only those items of `iter` for which
  ## `predicate` completes with true.
  ##
  ## Implemented on top of `mapFilter`: the predicate is wrapped so that an
  ## accepted item becomes `some(item)` and a rejected one becomes `none`.

  proc keepIfAccepted(item: T): Future[Option[T]] {.async.} =
    let accepted = await predicate(item)
    if accepted:
      return some(item)
    return T.none

  return await mapFilter[T, T](iter, keepIfAccepted)
2023-11-14 13:02:17 +01:00
2024-06-11 00:47:29 +02:00
proc delayBy*[T](iter: AsyncIter[T], d: Duration): AsyncIter[T] =
  ## Delays emitting each item by the given duration.
  ##
  ## Implemented with `map`: the mapping function sleeps for `d` and then
  ## returns the item unchanged.
  ##
  return map[T, T](
    iter,
    proc(item: T): Future[T] {.async.} =
      await sleepAsync(d)
      item,
  )
2025-06-24 15:16:44 +02:00
proc empty*[T](_: type AsyncIter[T]): AsyncIter[T] =
  ## Creates an empty AsyncIter.
  ##
  ## The finished-check always returns true and the supplier raises a
  ## `CatchableError` if it is ever invoked.
  ##
  proc failingNext(): Future[T] {.async.} =
    raise newException(CatchableError, " Next item requested from an empty AsyncIter ")

  proc alwaysFinished(): bool =
    return true

  return AsyncIter[T].new(genNext = failingNext, isFinished = alwaysFinished)