nim-codex/codex/utils/asynciter.nim

147 lines
3.3 KiB
Nim
Raw Normal View History

import std/sugar
import pkg/questionable
import pkg/chronos
type
Function*[T, U] = proc(fut: T): U {.raises: [CatchableError], gcsafe, noSideEffect.}
IsFinished* = proc(): bool {.raises: [], gcsafe, noSideEffect.}
GenNext*[T] = proc(): T {.raises: [CatchableError], gcsafe.}
Iter*[T] = ref object
finished: bool
next*: GenNext[T]
AsyncIter*[T] = Iter[Future[T]]
proc finish*[T](self: Iter[T]): void =
self.finished = true
proc finished*[T](self: Iter[T]): bool =
self.finished
iterator items*[T](self: Iter[T]): T =
while not self.finished:
yield self.next()
iterator pairs*[T](self: Iter[T]): tuple[key: int, val: T] {.inline.} =
var i = 0
while not self.finished:
yield (i, self.next())
inc(i)
proc map*[T, U](fut: Future[T], fn: Function[T, U]): Future[U] {.async.} =
let t = await fut
fn(t)
proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOnErr: bool = true): Iter[T] =
var iter = Iter[T]()
Wire sampler (#676) * Setting up testfixture for proof datasampler * Sets up calculating number of cells in a slot * Sets up tests for bitwise modulo * Implements cell index collection * setting up slot blocks module * Implements getting treeCID from slot * implements getting slot blocks by index * Implements out-of-range check for slot index * cleanup * Sets up getting sample from block * Implements selecting a cell sample from a block * Implements building a minitree for block cells * Adds method to get dataset block index from slot block index * It's running * splits up indexing * almost there * Fixes test. Implementation is now functional * Refactoring to object-oriented * Cleanup * Lining up output type with updated reference code. * setting up * Updates expected samples * Updates proof checking test to match new format * move builder to own dir * move sampler to own dir * fix paths * various changes to add support for the sampler * wip sampler implementation * don't use upraises * wip sampler integration * misc * move tests around * Various fixes to select correct slot and block index * removing old tests * cleanup * misc fix tests that work with correct cell indices * remove unused file * fixup logging * add logscope * truncate entropy to 31 bytes, otherwise it might be > than mod * forwar getCidAndProof to local store * misc * Adds missing test for initial-proving state * reverting back to correct slot/block indexing * fix tests for revert * misc * misc --------- Co-authored-by: benbierens <thatbenbierens@gmail.com>
2024-01-17 13:24:34 -06:00
proc next(): T {.raises: [CatchableError].} =
if not iter.finished:
var item: T
try:
item = genNext()
except CatchableError as err:
if finishOnErr or isFinished():
iter.finish
raise err
if isFinished():
iter.finish
return item
else:
raise newException(CatchableError, "Iterator is finished but next item was requested")
if isFinished():
iter.finish
iter.next = next
return iter
proc fromItems*[T](_: type Iter, items: seq[T]): Iter[T] =
## Create new iterator from items
##
Iter.fromSlice(0..<items.len)
.map((i: int) => items[i])
proc fromSlice*[U, V: Ordinal](_: type Iter, slice: HSlice[U, V]): Iter[U] =
## Creates new iterator from slice
##
Iter.fromRange(slice.a.int, slice.b.int, 1)
proc fromRange*[U, V, S: Ordinal](_: type Iter, a: U, b: V, step: S = 1): Iter[U] =
## Creates new iterator in range a..b with specified step (default 1)
##
var i = a
proc genNext(): U =
let u = i
inc(i, step)
u
proc isFinished(): bool =
(step > 0 and i > b) or
(step < 0 and i < b)
Iter.new(genNext, isFinished)
proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] =
Iter.new(
genNext = () => fn(iter.next()),
isFinished = () => iter.finished
)
proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] =
var nextT: Option[T]
proc tryFetch(): void =
nextT = T.none
while not iter.finished:
let t = iter.next()
if predicate(t):
nextT = some(t)
break
proc genNext(): T =
let t = nextT.unsafeGet
tryFetch()
return t
proc isFinished(): bool =
nextT.isNone
tryFetch()
Iter.new(genNext, isFinished)
proc prefetch*[T](iter: Iter[T], n: Positive): Iter[T] =
var ringBuf = newSeq[T](n)
var iterLen = int.high
var i = 0
proc tryFetch(j: int): void =
if not iter.finished:
let item = iter.next()
ringBuf[j mod n] = item
if iter.finished:
iterLen = min(j + 1, iterLen)
else:
if j == 0:
iterLen = 0
proc genNext(): T =
let item = ringBuf[i mod n]
tryFetch(i + n)
inc i
return item
proc isFinished(): bool =
i >= iterLen
# initialize ringBuf with n prefetched values
for j in 0..<n:
tryFetch(j)
Iter.new(genNext, isFinished)