2025-05-21 23:17:04 +02:00
## Nim-Codex
## Copyright (c) 2025 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{. push raises : [ ] . }
import std / sugar
import pkg / questionable
import pkg / questionable / results
import pkg / chronos
import . / iter
2025-06-24 12:58:01 +02:00
## AsyncResultIterator[T] is similar to `AsyncIterator[Future[T]]`
2025-05-21 23:17:04 +02:00
## but does not throw exceptions others than CancelledError.
2025-06-24 12:58:01 +02:00
##
## Instead of throwing exception, it uses Result to communicate errors (
## thus the name AsyncResultIterator).
2025-05-21 23:17:04 +02:00
##
## Public interface:
##
## Attributes
## - next - allows to set a custom function to be called when the next item is requested
##
## Operations:
2025-06-24 12:48:05 +02:00
## - new - to create a new async iterator (AsyncResultIterator)
2025-05-21 23:17:04 +02:00
## - finish - to finish the async iterator
## - finished - to check if the async iterator is finished
## - next - to get the next item from the async iterator
## - items - to iterate over the async iterator
## - pairs - to iterate over the async iterator and return the index of each item
2025-06-17 02:27:48 +02:00
## - mapFuture - to convert a (raising) Future[T] to a (raising) Future[U] using a function fn: auto -> Future[U] - we use auto to handle both raising and non-raising futures
2025-06-24 12:48:05 +02:00
## - mapAsync - to convert a regular sync iterator (Iter) to an async iter (AsyncResultIterator)
## - map - to convert one async iterator (AsyncResultIterator) to another async iter (AsyncResultIterator)
## - mapFilter - to convert one async iterator (AsyncResultIterator) to another async iter (AsyncResultIterator) and apply filtering at the same time
## - filter - to filter an async iterator (AsyncResultIterator) returning another async iterator (AsyncResultIterator)
2025-05-21 23:17:04 +02:00
## - delayBy - to delay each item returned by async iter by a given duration
2025-06-24 12:48:05 +02:00
## - empty - to create an empty async iterator (AsyncResultIterator)
2025-05-21 23:17:04 +02:00
type
2025-06-24 12:58:01 +02:00
AsyncResultIteratorFunc [ T , U ] =
2025-05-21 23:17:04 +02:00
proc ( fut : T ) : Future [ U ] {. async : ( raises : [ CancelledError ] ) , gcsafe , closure . }
2025-06-24 12:58:01 +02:00
AsyncResultIteratorIsFinished = proc ( ) : bool {. raises : [ ] , gcsafe , closure . }
AsyncResultIteratorGenNext [ T ] =
proc ( ) : Future [ T ] {. async : ( raises : [ CancelledError ] ) , gcsafe . }
2025-05-21 23:17:04 +02:00
2025-06-24 12:48:05 +02:00
AsyncResultIterator * [ T ] = ref object
2025-05-21 23:17:04 +02:00
finished : bool
2025-06-24 12:58:01 +02:00
next * : AsyncResultIteratorGenNext [ ? ! T ]
2025-05-21 23:17:04 +02:00
proc flatMap [ T , U ] (
2025-06-24 12:58:01 +02:00
fut : auto , fn : AsyncResultIteratorFunc [ ? ! T , ? ! U ]
2025-05-21 23:17:04 +02:00
) : Future [ ? ! U ] {. async : ( raises : [ CancelledError ] ) . } =
let t = await fut
await fn ( t )
proc flatMap [ T , U ] (
2025-06-24 12:58:01 +02:00
fut : auto , fn : AsyncResultIteratorFunc [ ? ! T , Option [ ? ! U ] ]
2025-05-21 23:17:04 +02:00
) : Future [ Option [ ? ! U ] ] {. async : ( raises : [ CancelledError ] ) . } =
let t = await fut
await fn ( t )
########################################################################
2025-06-24 12:48:05 +02:00
## AsyncResultIterator public interface methods
2025-05-21 23:17:04 +02:00
########################################################################
proc new * [ T ] (
2025-06-24 12:48:05 +02:00
_ : type AsyncResultIterator [ T ] ,
2025-06-24 12:58:01 +02:00
genNext : AsyncResultIteratorGenNext [ ? ! T ] ,
2025-05-21 23:17:04 +02:00
isFinished : IsFinished ,
finishOnErr : bool = true ,
2025-06-24 12:48:05 +02:00
) : AsyncResultIterator [ T ] =
2025-05-21 23:17:04 +02:00
## Creates a new Iter using elements returned by supplier function `genNext`.
## Iter is finished whenever `isFinished` returns true.
##
2025-06-24 12:48:05 +02:00
var iter = AsyncResultIterator [ T ] ( )
2025-05-21 23:17:04 +02:00
proc next ( ) : Future [ ? ! T ] {. async : ( raises : [ CancelledError ] ) . } =
try :
if not iter . finished :
let item = await genNext ( )
if finishOnErr and err = ? item . errorOption :
iter . finished = true
return failure ( err )
if isFinished ( ) :
iter . finished = true
return item
else :
2025-06-24 12:48:05 +02:00
return failure ( " AsyncResultIterator is finished but next item was requested " )
2025-05-21 23:17:04 +02:00
except CancelledError as err :
iter . finished = true
raise err
if isFinished ( ) :
iter . finished = true
iter . next = next
return iter
# forward declaration
proc mapAsync * [ T , U ] (
2025-06-24 12:58:01 +02:00
iter : Iter [ T ] , fn : AsyncResultIteratorFunc [ T , ? ! U ] , finishOnErr : bool = true
2025-06-24 12:48:05 +02:00
) : AsyncResultIterator [ U ]
2025-05-21 23:17:04 +02:00
proc new * [ U , V : Ordinal ] (
2025-06-24 12:48:05 +02:00
_ : type AsyncResultIterator [ U ] , slice : HSlice [ U , V ] , finishOnErr : bool = true
) : AsyncResultIterator [ U ] =
2025-05-21 23:17:04 +02:00
## Creates new Iter from a slice
##
let iter = Iter [ U ] . new ( slice )
mapAsync [ U , U ] (
iter ,
proc ( i : U ) : Future [ ? ! U ] {. async : ( raises : [ CancelledError ] ) . } =
success [ U ] ( i ) ,
finishOnErr = finishOnErr ,
)
proc new * [ U , V , S : Ordinal ] (
2025-06-24 12:48:05 +02:00
_ : type AsyncResultIterator [ U ] , a : U , b : V , step : S = 1 , finishOnErr : bool = true
) : AsyncResultIterator [ U ] =
2025-05-21 23:17:04 +02:00
## Creates new Iter in range a..b with specified step (default 1)
##
let iter = Iter [ U ] . new ( a , b , step )
mapAsync [ U , U ] (
iter ,
proc ( i : U ) : Future [ ? ! U ] {. async : ( raises : [ CancelledError ] ) . } =
U . success ( i ) ,
finishOnErr = finishOnErr ,
)
2025-06-24 12:48:05 +02:00
proc finish * [ T ] ( self : AsyncResultIterator [ T ] ) : void =
2025-05-21 23:17:04 +02:00
self . finished = true
2025-06-24 12:48:05 +02:00
proc finished * [ T ] ( self : AsyncResultIterator [ T ] ) : bool =
2025-05-21 23:17:04 +02:00
self . finished
2025-06-24 12:48:05 +02:00
iterator items * [ T ] ( self : AsyncResultIterator [ T ] ) : auto {. inline . } =
2025-05-21 23:17:04 +02:00
while not self . finished :
yield self . next ( )
2025-06-24 12:48:05 +02:00
iterator pairs * [ T ] ( self : AsyncResultIterator [ T ] ) : auto {. inline . } =
2025-05-21 23:17:04 +02:00
var i = 0
while not self . finished :
yield ( i , self . next ( ) )
inc ( i )
2025-06-17 02:27:48 +02:00
proc mapFuture * [ T , U ] (
2025-06-24 12:58:01 +02:00
fut : auto , fn : AsyncResultIteratorFunc [ T , U ]
2025-06-17 02:27:48 +02:00
) : Future [ U ] {. async : ( raises : [ CancelledError ] ) . } =
let t = await fut
await fn ( t )
2025-05-21 23:17:04 +02:00
proc mapAsync * [ T , U ] (
2025-06-24 12:58:01 +02:00
iter : Iter [ T ] , fn : AsyncResultIteratorFunc [ T , ? ! U ] , finishOnErr : bool = true
2025-06-24 12:48:05 +02:00
) : AsyncResultIterator [ U ] =
AsyncResultIterator [ U ] . new (
2025-05-21 23:17:04 +02:00
genNext = ( ) = > fn ( iter . next ( ) ) ,
isFinished = ( ) = > iter . finished ( ) ,
finishOnErr = finishOnErr ,
)
proc map * [ T , U ] (
2025-06-24 12:58:01 +02:00
iter : AsyncResultIterator [ T ] ,
fn : AsyncResultIteratorFunc [ ? ! T , ? ! U ] ,
finishOnErr : bool = true ,
2025-06-24 12:48:05 +02:00
) : AsyncResultIterator [ U ] =
AsyncResultIterator [ U ] . new (
2025-05-21 23:17:04 +02:00
genNext = ( ) = > iter . next ( ) . flatMap ( fn ) ,
isFinished = ( ) = > iter . finished ,
finishOnErr = finishOnErr ,
)
proc mapFilter * [ T , U ] (
2025-06-24 12:48:05 +02:00
iter : AsyncResultIterator [ T ] ,
2025-06-24 12:58:01 +02:00
mapPredicate : AsyncResultIteratorFunc [ ? ! T , Option [ ? ! U ] ] ,
2025-05-21 23:17:04 +02:00
finishOnErr : bool = true ,
2025-06-24 12:48:05 +02:00
) : Future [ AsyncResultIterator [ U ] ] {. async : ( raises : [ CancelledError ] ) . } =
2025-05-21 23:17:04 +02:00
var nextU : Option [ ? ! U ]
proc filter ( ) : Future [ void ] {. async : ( raises : [ CancelledError ] ) . } =
nextU = none ( ? ! U )
while not iter . finished :
let futT = iter . next ( )
if mappedValue = ? await futT . flatMap ( mapPredicate ) :
nextU = some ( mappedValue )
break
proc genNext ( ) : Future [ ? ! U ] {. async : ( raises : [ CancelledError ] ) . } =
let u = nextU . unsafeGet
await filter ( )
u
proc isFinished ( ) : bool =
nextU . isNone
await filter ( )
2025-06-24 12:48:05 +02:00
AsyncResultIterator [ U ] . new ( genNext , isFinished , finishOnErr = finishOnErr )
2025-05-21 23:17:04 +02:00
proc filter * [ T ] (
2025-06-24 12:58:01 +02:00
iter : AsyncResultIterator [ T ] ,
predicate : AsyncResultIteratorFunc [ ? ! T , bool ] ,
finishOnErr : bool = true ,
2025-06-24 12:48:05 +02:00
) : Future [ AsyncResultIterator [ T ] ] {. async : ( raises : [ CancelledError ] ) . } =
2025-05-21 23:17:04 +02:00
proc wrappedPredicate (
t : ? ! T
) : Future [ Option [ ? ! T ] ] {. async : ( raises : [ CancelledError ] ) . } =
if await predicate ( t ) :
some ( t )
else :
none ( ? ! T )
await mapFilter [ T , T ] ( iter , wrappedPredicate , finishOnErr = finishOnErr )
proc delayBy * [ T ] (
2025-06-24 12:48:05 +02:00
iter : AsyncResultIterator [ T ] , d : Duration , finishOnErr : bool = true
) : AsyncResultIterator [ T ] =
2025-05-21 23:17:04 +02:00
## Delays emitting each item by given duration
##
map [ T , T ] (
iter ,
proc ( t : ? ! T ) : Future [ ? ! T ] {. async : ( raises : [ CancelledError ] ) . } =
await sleepAsync ( d )
return t ,
finishOnErr = finishOnErr ,
)
2025-06-24 12:48:05 +02:00
proc empty * [ T ] ( _ : type AsyncResultIterator [ T ] ) : AsyncResultIterator [ T ] =
## Creates an empty AsyncResultIterator
2025-05-21 23:17:04 +02:00
##
proc genNext ( ) : Future [ ? ! T ] {. async : ( raises : [ CancelledError ] ) . } =
2025-06-24 12:48:05 +02:00
T . failure ( " Next item requested from an empty AsyncResultIterator " )
2025-05-21 23:17:04 +02:00
proc isFinished ( ) : bool =
true
2025-06-24 12:48:05 +02:00
AsyncResultIterator [ T ] . new ( genNext , isFinished )