Rework AsyncIter

This commit is contained in:
Tomasz Bekas 2024-05-15 17:59:14 +02:00
parent c4fb2c33f6
commit bf84aa29ef
No known key found for this signature in database
GPG Key ID: 4854E04C98824959
14 changed files with 496 additions and 149 deletions

View File

@ -120,7 +120,7 @@ proc getPendingBlocks(
CatchableError,
"Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & $index)
Iter.new(genNext, isFinished)
newAsyncIter[(?!bt.Block, int)](genNext, isFinished)
proc prepareEncodingData(
self: Erasure,
@ -440,8 +440,7 @@ proc decode*(
if treeCid != encoded.originalTreeCid:
return failure("Original tree root differs from the tree root computed out of recovered data")
let idxIter = Iter
.fromItems(recoveredIndices)
let idxIter = newIter(recoveredIndices)
.filter((i: Natural) => i < tree.leavesCount)
if err =? (await self.store.putSomeProofs(tree, idxIter)).errorOption:

View File

@ -39,22 +39,11 @@ func checkIteration(self: IndexingStrategy, iteration: int): void {.raises: [Ind
IndexingError,
"Indexing iteration can't be greater than or equal to iterations.")
proc getIter(first, last, step: int): Iter[int] =
var
finish = false
cur = first
proc getIterSE(first, last, step: int): Iter[int] =
newIter(first, last, step)
func get(): int =
result = cur
cur += step
if cur > last:
finish = true
func isFinished(): bool =
finish
Iter.new(get, isFinished)
func getIter(first, last, step: int): Iter[int] =
(cast[proc (f, l, s: int): Iter[int] {.nimcall, noSideEffect.}](getIterSE)(first, last, step))
func getLinearIndicies(
self: IndexingStrategy,

View File

@ -157,10 +157,8 @@ proc updateExpiry*(
try:
let
ensuringFutures = Iter
.fromSlice(0..<manifest.blocksCount)
.mapIt(
self.networkStore.localStore.ensureExpiry( manifest.treeCid, it, expiry ))
ensuringFutures = newIter(0..<manifest.blocksCount)
.mapIt(self.networkStore.localStore.ensureExpiry( manifest.treeCid, it, expiry ))
await allFuturesThrowing(ensuringFutures)
except CancelledError as exc:
raise exc
@ -209,7 +207,7 @@ proc fetchBatched*(
trace "Fetching blocks in batches of", size = batchSize
let iter = Iter.fromSlice(0..<manifest.blocksCount)
let iter = newIter(0..<manifest.blocksCount)
self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch)
proc streamSingleBlock(

View File

@ -130,51 +130,35 @@ method listBlocks*(
## Get the list of blocks in the BlockStore. This is an intensive operation
##
var
iter = AsyncIter[?Cid]()
let
cids = self.cids()
proc next(): Future[?Cid] {.async.} =
await idleAsync()
proc isFinished(): bool =
return finished(cids)
var cid: Cid
while true:
if iter.finished:
return Cid.none
proc genNext(): Future[Cid] {.async.} =
cids()
cid = cids()
let iter = await (newAsyncIter[Cid](genNext, isFinished)
.filter(
proc (cid: Cid): Future[bool] {.async.} =
without isManifest =? cid.isManifest, err:
trace "Error checking if cid is a manifest", err = err.msg
return false
if finished(cids):
iter.finish
return Cid.none
case blockType:
of BlockType.Both:
return true
of BlockType.Manifest:
return isManifest
of BlockType.Block:
return not isManifest
))
without isManifest =? cid.isManifest, err:
trace "Error checking if cid is a manifest", err = err.msg
return Cid.none
case blockType:
of BlockType.Manifest:
if not isManifest:
trace "Cid is not manifest, skipping", cid
continue
break
of BlockType.Block:
if isManifest:
trace "Cid is a manifest, skipping", cid
continue
break
of BlockType.Both:
break
return cid.some
iter.next = next
return success iter
return success(map[Cid, ?Cid](iter,
proc (cid: Cid): Future[?Cid] {.async.} =
some(cid)
))
func putBlockSync(self: CacheStore, blk: Block): bool =

View File

@ -0,0 +1,59 @@
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
import pkg/chronicles
# import pkg/datastore
import pkg/datastore/typedds
import ../utils/asynciter
# Named pair of a datastore `Key` and its decoded value of type `T`.
type KeyVal[T] = tuple[key: Key, value: T]
proc toAsyncIter*[T](queryIter: QueryIter[T], finishOnErr: bool = true): Future[?!AsyncIter[?!QueryResponse[T]]] {.async.} =
  ## Wraps `queryIter` in an `AsyncIter[?!QueryResponse[T]]`.
  ##
  ## `queryIter.dispose()` is awaited once `queryIter` reports finished, or -
  ## when `finishOnErr` is true - once `next()` has produced an error result.
  ## If `queryIter` is already finished on entry, it is disposed immediately
  ## and an empty iterator is returned (or the dispose error as a failure).
  ##

  if queryIter.finished:
    trace "Disposing iterator"
    if err =? (await queryIter.dispose()).errorOption:
      return failure(err)
    return success(emptyAsyncIter[?!QueryResponse[T]]())

  var errOccurred = false

  proc isFinished(): bool =
    queryIter.finished or (errOccurred and finishOnErr)

  proc genNext: Future[?!QueryResponse[T]] {.async.} =
    let response = await queryIter.next()

    if response.isErr:
      errOccurred = true

    # Same condition the wrapping iterator uses to decide it is exhausted.
    if isFinished():
      trace "Disposing iterator"
      if err =? (await queryIter.dispose()).errorOption:
        # A failed dispose replaces the item that was just fetched.
        return failure(err)

    return response

  newAsyncIter[?!QueryResponse[T]](genNext, isFinished).success
proc filterSuccess*[T](iter: AsyncIter[?!QueryResponse[T]]): Future[AsyncIter[tuple[key: Key, value: T]]] {.async.} =
  ## Keeps only the responses that are successful and carry both a key and a
  ## readable value, yielding them as `(key, value)` tuples. Any other
  ## response is logged and dropped.
  ##

  proc toKeyVal(response: ?!QueryResponse[T]): Future[?KeyVal[T]] {.async.} =
    without queryRes =? response, err:
      error "Error occurred when getting QueryResponse", msg = err.msg
      return none(KeyVal[T])

    without k =? queryRes.key:
      warn "No key for a QueryResponse"
      return none(KeyVal[T])

    without v =? queryRes.value, err:
      error "Error occurred when getting a value from QueryResponse", msg = err.msg
      return none(KeyVal[T])

    some((key: k, value: v))

  await mapFilter[?!QueryResponse[T], KeyVal[T]](iter, toKeyVal)

View File

@ -47,4 +47,4 @@ proc putSomeProofs*(store: BlockStore, tree: CodexTree, iter: Iter[Natural]): Fu
store.putSomeProofs(tree, iter.map((i: Natural) => i.ord))
proc putAllProofs*(store: BlockStore, tree: CodexTree): Future[?!void] =
store.putSomeProofs(tree, Iter.fromSlice(0..<tree.leavesCount))
store.putSomeProofs(tree, newIter(0..<tree.leavesCount))

View File

@ -3,26 +3,29 @@ import std/sugar
import pkg/questionable
import pkg/chronos
type
Function*[T, U] = proc(fut: T): U {.raises: [CatchableError], gcsafe, noSideEffect.}
IsFinished* = proc(): bool {.raises: [], gcsafe, noSideEffect.}
GenNext*[T] = proc(): T {.raises: [CatchableError], gcsafe.}
Iter*[T] = ref object
finished: bool
next*: GenNext[T]
AsyncIter*[T] = Iter[Future[T]]
import ./iter
proc finish*[T](self: Iter[T]): void =
export iter
## AsyncIter[T] is similar to `Iter[Future[T]]` with addition of methods specific to asynchronous processing
##
type
AsyncIter*[T] = ref object
finished: bool
next*: GenNext[Future[T]]
proc finish*[T](self: AsyncIter[T]): void =
self.finished = true
proc finished*[T](self: Iter[T]): bool =
proc finished*[T](self: AsyncIter[T]): bool =
self.finished
iterator items*[T](self: Iter[T]): T =
iterator items*[T](self: AsyncIter[T]): Future[T] =
while not self.finished:
yield self.next()
iterator pairs*[T](self: Iter[T]): tuple[key: int, val: T] {.inline.} =
iterator pairs*[T](self: AsyncIter[T]): tuple[key: int, val: Future[T]] {.inline.} =
var i = 0
while not self.finished:
yield (i, self.next())
@ -32,14 +35,18 @@ proc map*[T, U](fut: Future[T], fn: Function[T, U]): Future[U] {.async.} =
let t = await fut
fn(t)
proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOnErr: bool = true): Iter[T] =
var iter = Iter[T]()
proc flatMap*[T, U](fut: Future[T], fn: Function[T, Future[U]]): Future[U] {.async.} =
let t = await fut
await fn(t)
proc next(): T {.raises: [CatchableError].} =
proc newAsyncIter*[T](genNext: GenNext[Future[T]], isFinished: IsFinished, finishOnErr: bool = true): AsyncIter[T] =
var iter = AsyncIter[T]()
proc next(): Future[T] {.async.} =
if not iter.finished:
var item: T
try:
item = genNext()
item = await genNext()
except CatchableError as err:
if finishOnErr or isFinished():
iter.finish
@ -49,7 +56,7 @@ proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOn
iter.finish
return item
else:
raise newException(CatchableError, "Iterator is finished but next item was requested")
raise newException(CatchableError, "AsyncIter is finished but next item was requested")
if isFinished():
iter.finish
@ -57,90 +64,63 @@ proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOn
iter.next = next
return iter
proc fromItems*[T](_: type Iter, items: seq[T]): Iter[T] =
## Create new iterator from items
proc emptyAsyncIter*[T](): AsyncIter[T] =
## Creates an empty AsyncIter
##
Iter.fromSlice(0..<items.len)
.map((i: int) => items[i])
proc genNext(): Future[T] {.raises: [CatchableError].} =
raise newException(CatchableError, "Next item requested from an empty AsyncIter")
proc isFinished(): bool = true
proc fromSlice*[U, V: Ordinal](_: type Iter, slice: HSlice[U, V]): Iter[U] =
## Creates new iterator from slice
##
newAsyncIter[T](genNext, isFinished)
Iter.fromRange(slice.a.int, slice.b.int, 1)
proc fromRange*[U, V, S: Ordinal](_: type Iter, a: U, b: V, step: S = 1): Iter[U] =
## Creates new iterator in range a..b with specified step (default 1)
##
var i = a
proc genNext(): U =
let u = i
inc(i, step)
u
proc isFinished(): bool =
(step > 0 and i > b) or
(step < 0 and i < b)
Iter.new(genNext, isFinished)
proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] =
Iter.new(
genNext = () => fn(iter.next()),
proc map*[T, U](iter: AsyncIter[T], fn: Function[T, Future[U]]): AsyncIter[U] =
newAsyncIter[U](
genNext = () => iter.next().flatMap(fn),
isFinished = () => iter.finished
)
proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] =
var nextT: Option[T]
proc mapFilter*[T, U](iter: AsyncIter[T], mapPredicate: Function[T, Future[Option[U]]]): Future[AsyncIter[U]] {.async.} =
var nextFutU: Option[Future[U]]
proc tryFetch(): void =
nextT = T.none
proc tryFetch(): Future[void] {.async.} =
nextFutU = Future[U].none
while not iter.finished:
let t = iter.next()
if predicate(t):
nextT = some(t)
let futT = iter.next()
try:
if u =? await futT.flatMap(mapPredicate):
let futU = newFuture[U]("AsyncIter.mapFilterAsync")
futU.complete(u)
nextFutU = some(futU)
break
except CatchableError as err:
let errFut = newFuture[U]("AsyncIter.mapFilterAsync")
errFut.fail(err)
nextFutU = some(errFut)
break
proc genNext(): T =
let t = nextT.unsafeGet
tryFetch()
return t
proc genNext(): Future[U] {.async.} =
let futU = nextFutU.unsafeGet
await tryFetch()
await futU
proc isFinished(): bool =
nextT.isNone
nextFutU.isNone
tryFetch()
Iter.new(genNext, isFinished)
await tryFetch()
newAsyncIter[U](genNext, isFinished)
proc prefetch*[T](iter: Iter[T], n: Positive): Iter[T] =
var ringBuf = newSeq[T](n)
var iterLen = int.high
var i = 0
proc tryFetch(j: int): void =
if not iter.finished:
let item = iter.next()
ringBuf[j mod n] = item
if iter.finished:
iterLen = min(j + 1, iterLen)
proc filter*[T](iter: AsyncIter[T], predicate: Function[T, Future[bool]]): Future[AsyncIter[T]] {.async.} =
proc wrappedPredicate(t: T): Future[Option[T]] {.async.} =
if await predicate(t):
some(t)
else:
if j == 0:
iterLen = 0
T.none
proc genNext(): T =
let item = ringBuf[i mod n]
tryFetch(i + n)
inc i
return item
await mapFilter[T, T](iter, wrappedPredicate)
proc isFinished(): bool =
i >= iterLen
# initialize ringBuf with n prefetched values
for j in 0..<n:
tryFetch(j)
Iter.new(genNext, isFinished)
proc mapAsync*[T, U](iter: Iter[T], fn: Function[T, Future[U]]): AsyncIter[U] =
  ## Lifts a synchronous `Iter[T]` into an `AsyncIter[U]` by applying the
  ## future-returning `fn` to every item pulled from `iter`.
  ##
  ## The result is finished whenever `iter` is finished.
  ##

  # `fn` produces `Future[U]`, so the iterator must be instantiated with `U`.
  # Instantiating it with `T` only type-checks when `T` and `U` coincide.
  newAsyncIter[U](
    genNext = () => fn(iter.next()),
    isFinished = () => iter.finished()
  )

129
codex/utils/iter.nim Normal file
View File

@ -0,0 +1,129 @@
import std/sugar
import pkg/questionable
type
# Closure taking a `T` and returning a `U`; allowed to raise `CatchableError`.
Function*[T, U] = proc(fut: T): U {.raises: [CatchableError], gcsafe, closure.}
# Nullary closure returning `bool`; declared as never raising.
IsFinished* = proc(): bool {.raises: [], gcsafe, closure.}
# Nullary proc producing a `T`; allowed to raise `CatchableError`.
GenNext*[T] = proc(): T {.raises: [CatchableError], gcsafe.}
# Iterator handle: a module-private `finished` flag plus an exported `next` producer.
Iter*[T] = ref object
finished: bool
next*: GenNext[T]
proc finish*[T](self: Iter[T]): void =
  ## Flags `self` as finished.
  ##
  self.finished = true
proc finished*[T](self: Iter[T]): bool =
  ## Reports whether `self` has been flagged as finished.
  ##
  return self.finished
iterator items*[T](self: Iter[T]): T =
  ## Yields `self.next()` repeatedly until `self` is flagged as finished.
  ##
  while not self.finished:
    let item = self.next()
    yield item
iterator pairs*[T](self: Iter[T]): tuple[key: int, val: T] {.inline.} =
  ## Yields `(index, item)` tuples, with the index counting up from 0, until
  ## `self` is flagged as finished.
  ##
  var idx = 0
  while not self.finished:
    yield (idx, self.next())
    idx += 1
proc newIter*[T](genNext: GenNext[T], isFinished: IsFinished, finishOnErr: bool = true): Iter[T] =
  ## Builds an `Iter[T]` whose items come from `genNext` and whose exhaustion
  ## is decided by `isFinished`.
  ##
  ## `isFinished` is consulted once at construction and again after every
  ## produced item. When `genNext` raises, the iterator is flagged as finished
  ## if `finishOnErr` is set or `isFinished()` holds, and the error is
  ## re-raised. Requesting an item from a finished iterator raises
  ## `CatchableError`.
  ##

  let res = Iter[T]()

  proc next(): T {.raises: [CatchableError].} =
    # Guard: no further items once the iterator has been flagged as finished.
    if res.finished:
      raise newException(CatchableError, "Iter is finished but next item was requested")

    try:
      result = genNext()
    except CatchableError as err:
      if finishOnErr or isFinished():
        res.finish
      raise err

    if isFinished():
      res.finish

  if isFinished():
    res.finish

  res.next = next
  res
proc newIter*[U, V, S: Ordinal](a: U, b: V, step: S = 1): Iter[U] =
  ## Creates new Iter in range a..b with specified step (default 1)
  ##
  ## With a positive `step` the iterator ends once the cursor exceeds `b`;
  ## with a negative `step`, once it drops below `b`. A `step` of 0 never
  ## satisfies either condition.
  ##

  var cursor = a

  proc genNext(): U =
    # Hand out the current position, then advance.
    result = cursor
    inc(cursor, step)

  proc isFinished(): bool =
    if step > 0:
      cursor > b
    elif step < 0:
      cursor < b
    else:
      false

  newIter(genNext, isFinished)
proc newIter*[U, V: Ordinal](slice: HSlice[U, V]): Iter[U] =
  ## Creates new Iter from slice
  ##
  ## Both bounds are converted to `int` and walked with a step of 1.
  ##

  let
    first = slice.a.int
    last = slice.b.int

  newIter(first, last, 1)
proc newIter*[T](items: seq[T]): Iter[T] =
  ## Creates new Iter from items
  ##
  ## Implemented as an index iterator over `0..<items.len` mapped to
  ## `items[i]`.
  ##

  let indices = newIter(0..<items.len)
  indices.map((i: int) => items[i])
proc emptyIter*[T](): Iter[T] =
  ## Creates an empty Iter
  ##
  ## The returned iterator is finished from the start; requesting an item
  ## from it raises `CatchableError`.
  ##

  # Uses the built-in `raises` pragma (as `GenNext` does); this module does
  # not import the `upraises` package, so `upraises` is not available here.
  proc genNext(): T {.raises: [CatchableError].} =
    raise newException(CatchableError, "Next item requested from an empty Iter")

  proc isFinished(): bool = true

  newIter[T](genNext, isFinished)
proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] =
  ## Returns an iterator that yields `fn(item)` for each item of `iter` and
  ## is finished whenever `iter` is finished.
  ##

  proc genNext(): U =
    fn(iter.next())

  proc isFinished(): bool =
    iter.finished

  newIter(genNext, isFinished)
proc mapFilter*[T, U](iter: Iter[T], mapPredicate: Function[T, Option[U]]): Iter[U] =
  ## Applies `mapPredicate` to each item of `iter` and yields the unwrapped
  ## value of every `some` result; `none` results are skipped.
  ##
  ## One matching item is always fetched ahead (the first during
  ## construction), so the result can report `finished` without a further
  ## call to `next`.
  ##

  var pending: Option[U]

  proc advance(): void =
    # Scan forward to the next item that maps to `some`, if any is left.
    pending = U.none
    while not iter.finished:
      let mapped = mapPredicate(iter.next())
      if mapped.isSome:
        pending = mapped
        break

  proc genNext(): U =
    result = pending.unsafeGet
    advance()

  proc isFinished(): bool =
    pending.isNone

  advance()
  newIter(genNext, isFinished)
proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] =
  ## Returns an iterator over the items of `iter` for which `predicate`
  ## returns true. Built on `mapFilter`.
  ##

  proc keepIfMatching(t: T): Option[T] =
    if predicate(t): some(t) else: T.none

  mapFilter[T, T](iter, keepIfMatching)

View File

@ -0,0 +1,64 @@
import std/sugar
import pkg/questionable
import pkg/chronos
import pkg/datastore
import pkg/datastore/typedds as t
import pkg/codex/utils/asynciter
import pkg/codex/stores/queryiterhelper
import ../../asynctest
import ../helpers
proc encode(s: string): seq[byte] =
  ## Encodes a string as its bytes.
  result = s.toBytes()
proc decode(T: type string, bytes: seq[byte]): ?!T =
  ## Decodes bytes back into a string; always succeeds.
  string.fromBytes(bytes).success
asyncchecksuite "Test QueryIter helper":
  var
    tds: t.TypedDatastore

  setupAll:
    tds = t.TypedDatastore.init(SQLiteDatastore.new(Memory).tryGet())

  teardownAll:
    (await tds.close()).tryGet

  test "Should auto-dispose when QueryIter finishes":
    let
      Root = Key.init("/queryitertest").tryGet()
      source = {
        "a": "11",
        "b": "22"
      }.toTable

    # Store every entry under the common root key.
    for name, content in source:
      let entryKey = (Root / name).tryGet()
      (await tds.put(entryKey, content)).tryGet()

    var
      disposed = false
      queryIter = (await query[string](tds, Query.init(Root))).tryGet()

    # Wrap the original dispose so the test can observe that it was called.
    let iterDispose: IterDispose = queryIter.dispose
    queryIter.dispose = () => (disposed = true; iterDispose())

    let
      iter1 = (await toAsyncIter[string](queryIter)).tryGet()
      iter2 = await filterSuccess[string](iter1)

    var items = initTable[string, string]()

    for pending in iter2:
      let entry = await pending
      items[entry.key.value] = entry.value

    check:
      items == source
      disposed == true
      queryIter.finished == true
      iter1.finished == true
      iter2.finished == true

View File

@ -1,5 +1,6 @@
import ./stores/testcachestore
import ./stores/testrepostore
import ./stores/testmaintenance
import ./stores/testqueryiterhelper
{.warning[UnusedImport]: off.}

View File

@ -1,6 +1,7 @@
import ./utils/testoptions
import ./utils/testkeyutils
import ./utils/testasyncstatemachine
import ./utils/testasynciter
import ./utils/testtimer
import ./utils/testthen
import ./utils/testtrackedfutures

View File

@ -0,0 +1,77 @@
import std/sugar
import pkg/questionable
import pkg/chronos
import pkg/codex/utils/asynciter
import ../../asynctest
import ../helpers
asyncchecksuite "Test AsyncIter":

  test "Should be finished":
    let empty = emptyAsyncIter[int]()

    check:
      empty.finished == true

  test "Should multiply each item by 2 using `map`":
    let
      numbers = newIter(1..<5)
      doubled = mapAsync[int, int](numbers,
        proc (i: int): Future[int] {.async.} =
          i * 2
      )

    var collected: seq[int]

    for pending in doubled:
      collected.add(await pending)

    check:
      collected == @[2, 4, 6, 8]

  test "Should leave only odd items using `filter`":
    let
      numbers = newIter(0..<5)
      # Each item completes after a delay proportional to its value.
      delayed = mapAsync[int, int](numbers,
        proc (i: int): Future[int] {.async.} =
          await sleepAsync((i * 10).millis)
          i
      )
      odds = await filter[int](delayed,
        proc (i: int): Future[bool] {.async.} =
          (i mod 2) == 1
      )

    var collected: seq[int]

    for pending in odds:
      collected.add(await pending)

    check:
      collected == @[1, 3]

  test "Should leave only odd items using `mapFilter`":
    let
      numbers = newIter(0..<5)
      # Each item completes after a delay proportional to its value.
      delayed = mapAsync[int, int](numbers,
        proc (i: int): Future[int] {.async.} =
          await sleepAsync((i * 10).millis)
          i
      )
      oddStrings = await mapFilter[int, string](delayed,
        proc (i: int): Future[?string] {.async.} =
          if (i mod 2) == 1:
            some($i)
          else:
            string.none
      )

    var collected: seq[string]

    for pending in oddStrings:
      collected.add(await pending)

    check:
      collected == @["1", "3"]

View File

@ -0,0 +1,66 @@
import std/sugar
import pkg/questionable
import pkg/chronos
import pkg/codex/utils/iter
import ../../asynctest
import ../helpers
checksuite "Test Iter":

  test "Should be finished":
    let empty = emptyIter[int]()

    check:
      empty.finished == true

  test "Should be iterable with `items`":
    let numbers = newIter(1..<5)

    let collected =
      collect:
        for value in numbers:
          value

    check:
      collected == @[1, 2, 3, 4]

  test "Should be iterable with `pairs`":
    let numbers = newIter(1..<5)

    let indexed =
      collect:
        for idx, value in numbers:
          (idx, value)

    check:
      indexed == @[(0, 1), (1, 2), (2, 3), (3, 4)]

  test "Should multiply each item by 2 using `map`":
    let doubled = newIter(1..<5)
      .map((i: int) => i * 2)

    check:
      doubled.toSeq() == @[2, 4, 6, 8]

  test "Should leave only odd items using `filter`":
    let odds = newIter(0..<5)
      .filter((i: int) => (i mod 2) == 1)

    check:
      odds.toSeq() == @[1, 3]

  test "Should leave only odd items using `mapFilter`":
    let
      numbers = newIter(0..<5)
      oddStrings = mapFilter[int, string](numbers,
        proc(i: int): ?string =
          if (i mod 2) == 1:
            some($i)
          else:
            string.none
      )

    check:
      oddStrings.toSeq() == @["1", "3"]

@ -1 +1 @@
Subproject commit 0ee88a2a9b41308792e6ba4dfadf80d3df1c0ae3
Subproject commit 3d921ae0855ca7df25d9775dda72fefac3426c94