2025-05-21 23:17:04 +02:00
## Nim-Codex
## Copyright (c) 2025 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{. push raises : [ ] . }
import std / sugar
import pkg / questionable
import pkg / questionable / results
import pkg / chronos
import . / iter
2025-06-24 15:24:55 +02:00
## AsyncResultIter[T] is similar to `AsyncIterator[Future[T]]`
2025-05-21 23:17:04 +02:00
## but does not throw exceptions other than CancelledError.
2025-06-24 12:58:01 +02:00
##
## Instead of throwing exceptions, it uses Result to communicate errors (
2025-06-24 15:24:55 +02:00
## thus the name AsyncResultIter).
2025-05-21 23:17:04 +02:00
##
## Public interface:
##
## Attributes
## - next - allows setting a custom function that is called when the next item is requested
##
## Operations:
2025-06-24 15:24:55 +02:00
## - new - to create a new async iterator (AsyncResultIter)
2025-05-21 23:17:04 +02:00
## - finish - to finish the async iterator
## - finished - to check if the async iterator is finished
## - next - to get the next item from the async iterator
## - items - to iterate over the async iterator
## - pairs - to iterate over the async iterator and return the index of each item
2025-06-17 02:27:48 +02:00
## - mapFuture - to convert a (raising) Future[T] to a (raising) Future[U] using a function fn: auto -> Future[U] - we use auto to handle both raising and non-raising futures
2025-06-24 15:24:55 +02:00
## - mapAsync - to convert a regular sync iterator (Iter) to an async iterator (AsyncResultIter)
## - map - to convert one async iterator (AsyncResultIter) to another async iterator (AsyncResultIter)
## - mapFilter - to convert one async iterator (AsyncResultIter) to another async iterator (AsyncResultIter) and apply filtering at the same time
## - filter - to filter an async iterator (AsyncResultIter) and return another async iterator (AsyncResultIter)
## - delayBy - to delay each item returned by async iterator by a given duration
## - empty - to create an empty async iterator (AsyncResultIter)
2025-05-21 23:17:04 +02:00
type
  # Async transformation of a value of type T into a value of type U; the
  # returned future is only allowed to raise CancelledError.
  AsyncResultIterFunc[T, U] =
    proc(fut: T): Future[U] {.async: (raises: [CancelledError]).}

  # Synchronous, non-raising check used to decide whether iteration is over.
  AsyncResultIterIsFinished = proc(): bool {.raises: [], gcsafe.}

  # Async supplier of the next element; the returned future is only allowed
  # to raise CancelledError.
  AsyncResultIterGenNext[T] = proc(): Future[T] {.async: (raises: [CancelledError]).}

  AsyncResultIter*[T] = ref object
    ## Async iterator whose `next` returns a `Future[?!T]`.
    finished: bool ## true once no further items should be requested
    next*: AsyncResultIterGenNext[?!T] ## called to obtain the next item
2025-05-21 23:17:04 +02:00
proc flatMap[T, U](
    fut: auto, fn: AsyncResultIterFunc[?!T, ?!U]
): Future[?!U] {.async: (raises: [CancelledError]).} =
  ## Awaits `fut` and feeds the value it completed with into `fn`,
  ## returning whatever `fn` produces.
  ##
  let resolved = await fut
  return await fn(resolved)
proc flatMap[T, U](
    fut: auto, fn: AsyncResultIterFunc[?!T, Option[?!U]]
): Future[Option[?!U]] {.async: (raises: [CancelledError]).} =
  ## Overload for functions returning `Option[?!U]`: awaits `fut` and feeds
  ## the value it completed with into `fn`, returning whatever `fn` produces.
  ##
  let resolved = await fut
  return await fn(resolved)
########################################################################
2025-06-24 15:24:55 +02:00
## AsyncResultIter public interface methods
2025-05-21 23:17:04 +02:00
########################################################################
proc new*[T](
    _: type AsyncResultIter[T],
    genNext: AsyncResultIterGenNext[?!T],
    isFinished: AsyncResultIterIsFinished,
    finishOnErr: bool = true,
): AsyncResultIter[T] =
  ## Creates a new AsyncResultIter whose elements are produced by the
  ## supplier function `genNext`.
  ##
  ## The iterator becomes finished when:
  ## - `isFinished` returns true (checked once on creation and again after
  ##   every item obtained from `genNext`),
  ## - `finishOnErr` is true and `genNext` produced a failure, or
  ## - requesting the next item was cancelled.
  ##
  ## Requesting an item from a finished iterator yields a failure instead of
  ## calling `genNext`.
  ##
  var iter = AsyncResultIter[T]()

  proc next(): Future[?!T] {.async: (raises: [CancelledError]).} =
    try:
      # Guard: never call the supplier once the iterator is finished.
      if iter.finished:
        return failure("AsyncResultIter is finished but next item was requested")

      let produced = await genNext()

      if finishOnErr and err =? produced.errorOption:
        iter.finished = true
        return failure(err)

      if isFinished():
        iter.finished = true

      return produced
    except CancelledError as err:
      # A cancelled request leaves the iterator in the finished state.
      iter.finished = true
      raise err

  # The source may already be exhausted before the first item is requested.
  if isFinished():
    iter.finished = true

  iter.next = next
  iter
# Forward declaration: the slice/range `new` overloads below call `mapAsync`
# before its implementation appears further down in this file.
proc mapAsync*[T, U](
  iter: Iter[T], fn: AsyncResultIterFunc[T, ?!U], finishOnErr: bool = true
): AsyncResultIter[U]
2025-05-21 23:17:04 +02:00
proc new*[U, V: Ordinal](
    _: type AsyncResultIter[U], slice: HSlice[U, V], finishOnErr: bool = true
): AsyncResultIter[U] =
  ## Creates a new AsyncResultIter from a slice. Every element of the
  ## underlying sync `Iter` is wrapped into a successful result.
  ##
  proc asSuccess(x: U): Future[?!U] {.async: (raises: [CancelledError]).} =
    success[U](x)

  mapAsync[U, U](Iter[U].new(slice), asSuccess, finishOnErr = finishOnErr)
proc new*[U, V, S: Ordinal](
    _: type AsyncResultIter[U], a: U, b: V, step: S = 1, finishOnErr: bool = true
): AsyncResultIter[U] =
  ## Creates a new AsyncResultIter over the range a..b with the given step
  ## (default 1). Every element of the underlying sync `Iter` is wrapped
  ## into a successful result.
  ##
  mapAsync[U, U](
    Iter[U].new(a, b, step),
    proc(n: U): Future[?!U] {.async: (raises: [CancelledError]).} =
      U.success(n),
    finishOnErr = finishOnErr,
  )
2025-06-24 15:24:55 +02:00
proc finish*[T](self: AsyncResultIter[T]): void =
  ## Marks the iterator as finished.
  ##
  self.finished = true
2025-06-24 15:24:55 +02:00
proc finished*[T](self: AsyncResultIter[T]): bool =
  ## Returns true when the iterator has been marked as finished.
  ##
  return self.finished
2025-06-24 15:24:55 +02:00
iterator items*[T](self: AsyncResultIter[T]): auto {.inline.} =
  ## Yields the future returned by `self.next()` for as long as the iterator
  ## is not finished. The futures are not awaited here.
  ##
  while true:
    if self.finished:
      break
    yield self.next()
2025-06-24 15:24:55 +02:00
iterator pairs*[T](self: AsyncResultIter[T]): auto {.inline.} =
  ## Yields `(index, future)` tuples for as long as the iterator is not
  ## finished; the index starts at 0 and grows by one per yielded item.
  ## The futures are not awaited here.
  ##
  var index = 0
  while not self.finished:
    yield (index, self.next())
    index += 1
2025-06-17 02:27:48 +02:00
proc mapFuture*[T, U](
    fut: auto, fn: AsyncResultIterFunc[T, U]
): Future[U] {.async: (raises: [CancelledError]).} =
  ## Awaits `fut` and passes the value it completed with to `fn`, returning
  ## the value produced by `fn`. `fut` is typed as `auto` so that different
  ## kinds of futures can be passed in.
  ##
  let resolved = await fut
  return await fn(resolved)
2025-05-21 23:17:04 +02:00
proc mapAsync*[T, U](
    iter: Iter[T], fn: AsyncResultIterFunc[T, ?!U], finishOnErr: bool = true
): AsyncResultIter[U] =
  ## Converts the sync iterator `iter` into an AsyncResultIter by applying
  ## `fn` to every element taken from `iter`.
  ##
  ## The new iterator reports finished whenever `iter` does; `finishOnErr`
  ## is forwarded unchanged to `AsyncResultIter.new`.
  ##
  AsyncResultIter[U].new(
    finishOnErr = finishOnErr,
    isFinished = () => iter.finished(),
    genNext = () => fn(iter.next()),
  )
proc map*[T, U](
    iter: AsyncResultIter[T],
    fn: AsyncResultIterFunc[?!T, ?!U],
    finishOnErr: bool = true,
): AsyncResultIter[U] =
  ## Builds a new AsyncResultIter in which every item of `iter` is passed
  ## through `fn`. `fn` receives the item as `?!T`, so it also sees failures
  ## produced by `iter`.
  ##
  ## The new iterator reports finished whenever `iter` does; `finishOnErr`
  ## is forwarded unchanged to `AsyncResultIter.new`.
  ##
  AsyncResultIter[U].new(
    genNext = () => flatMap(iter.next(), fn),
    isFinished = () => iter.finished,
    finishOnErr = finishOnErr,
  )
proc mapFilter*[T, U](
    iter: AsyncResultIter[T],
    mapPredicate: AsyncResultIterFunc[?!T, Option[?!U]],
    finishOnErr: bool = true,
): Future[AsyncResultIter[U]] {.async: (raises: [CancelledError]).} =
  ## Maps and filters `iter` in one pass: `mapPredicate` is applied to every
  ## item of `iter`; items for which it returns `some` are emitted, items for
  ## which it returns `none` are skipped.
  ##
  ## One item is always looked up ahead of time, which is why this proc is
  ## async: the first lookup happens before the new iterator is returned.
  ##
  var lookahead: Option[?!U]

  proc prefetch(): Future[void] {.async: (raises: [CancelledError]).} =
    # Pull from `iter` until `mapPredicate` keeps an item or `iter` finishes;
    # `lookahead` stays `none` when nothing more was found.
    lookahead = none(?!U)
    while not iter.finished:
      let candidate = await flatMap(iter.next(), mapPredicate)
      if kept =? candidate:
        lookahead = some(kept)
        break

  proc genNext(): Future[?!U] {.async: (raises: [CancelledError]).} =
    # Take the buffered item first, then look up the following one so that
    # `isFinished` is up to date by the time this item is handed out.
    let current = lookahead.unsafeGet
    await prefetch()
    return current

  proc isFinished(): bool =
    lookahead.isNone

  await prefetch()
  return AsyncResultIter[U].new(genNext, isFinished, finishOnErr = finishOnErr)
2025-05-21 23:17:04 +02:00
proc filter*[T](
    iter: AsyncResultIter[T],
    predicate: AsyncResultIterFunc[?!T, bool],
    finishOnErr: bool = true,
): Future[AsyncResultIter[T]] {.async: (raises: [CancelledError]).} =
  ## Returns a new AsyncResultIter that only emits the items of `iter` for
  ## which `predicate` returns true. `predicate` receives each item as `?!T`.
  ##
  ## Implemented on top of `mapFilter`; `finishOnErr` is forwarded unchanged.
  ##
  proc keepIfAccepted(
      item: ?!T
  ): Future[Option[?!T]] {.async: (raises: [CancelledError]).} =
    let accepted = await predicate(item)
    if accepted:
      return some(item)
    return none(?!T)

  return await mapFilter[T, T](iter, keepIfAccepted, finishOnErr = finishOnErr)
proc delayBy*[T](
    iter: AsyncResultIter[T], d: Duration, finishOnErr: bool = true
): AsyncResultIter[T] =
  ## Returns a new AsyncResultIter that emits the items of `iter` unchanged,
  ## sleeping for duration `d` before handing out each one.
  ##
  proc delayed(item: ?!T): Future[?!T] {.async: (raises: [CancelledError]).} =
    await sleepAsync(d)
    return item

  map[T, T](iter, delayed, finishOnErr = finishOnErr)
2025-06-24 15:24:55 +02:00
proc empty*[T](_: type AsyncResultIter[T]): AsyncResultIter[T] =
  ## Creates an empty AsyncResultIter: its `isFinished` callback always
  ## returns true, and its supplier only ever produces a failure.
  ##
  proc failOnRequest(): Future[?!T] {.async: (raises: [CancelledError]).} =
    return T.failure("Next item requested from an empty AsyncResultIter")

  proc alwaysFinished(): bool =
    return true

  AsyncResultIter[T].new(failOnRequest, alwaysFinished)