From f51ef528b15bcfd04af6921267ceb8f5d4678573 Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Tue, 11 Jun 2024 00:47:29 +0200 Subject: [PATCH] Rework AsyncIter (#811) * Rework AsyncIter * Add tests for finishing iter on error * Improved error handling for and additional tests * Use new style of constructors * Handle future cancellation * Docs for constructors --- codex/erasure/erasure.nim | 5 +- codex/indexingstrategy.nim | 19 +-- codex/node.nim | 8 +- codex/stores/cachestore.nim | 60 +++---- codex/stores/queryiterhelper.nim | 66 ++++++++ codex/stores/treehelper.nim | 2 +- codex/utils/asynciter.nim | 177 ++++++++++++--------- codex/utils/iter.nim | 140 ++++++++++++++++ tests/codex/stores/testqueryiterhelper.nim | 65 ++++++++ tests/codex/teststores.nim | 1 + tests/codex/testutils.nim | 1 + tests/codex/utils/testasynciter.nim | 160 +++++++++++++++++++ tests/codex/utils/testiter.nim | 129 +++++++++++++++ vendor/nim-datastore | 2 +- 14 files changed, 692 insertions(+), 143 deletions(-) create mode 100644 codex/stores/queryiterhelper.nim create mode 100644 codex/utils/iter.nim create mode 100644 tests/codex/stores/testqueryiterhelper.nim create mode 100644 tests/codex/utils/testasynciter.nim create mode 100644 tests/codex/utils/testiter.nim diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 358a2812..0c921776 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -120,7 +120,7 @@ proc getPendingBlocks( CatchableError, "Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & $index) - Iter.new(genNext, isFinished) + AsyncIter[(?!bt.Block, int)].new(genNext, isFinished) proc prepareEncodingData( self: Erasure, @@ -440,8 +440,7 @@ proc decode*( if treeCid != encoded.originalTreeCid: return failure("Original tree root differs from the tree root computed out of recovered data") - let idxIter = Iter - .fromItems(recoveredIndices) + let idxIter = Iter[Natural].new(recoveredIndices) .filter((i: Natural) => i < tree.leavesCount) if err =? (await self.store.putSomeProofs(tree, idxIter)).errorOption: diff --git a/codex/indexingstrategy.nim b/codex/indexingstrategy.nim index fa71e1a3..27444522 100644 --- a/codex/indexingstrategy.nim +++ b/codex/indexingstrategy.nim @@ -39,22 +39,9 @@ func checkIteration(self: IndexingStrategy, iteration: int): void {.raises: [Ind IndexingError, "Indexing iteration can't be greater than or equal to iterations.") -proc getIter(first, last, step: int): Iter[int] = - var - finish = false - cur = first - - func get(): int = - result = cur - cur += step - - if cur > last: - finish = true - - func isFinished(): bool = - finish - - Iter.new(get, isFinished) +func getIter(first, last, step: int): Iter[int] = + {.cast(noSideEffect).}: + Iter[int].new(first, last, step) func getLinearIndicies( self: IndexingStrategy, diff --git a/codex/node.nim b/codex/node.nim index effaff38..dd76a717 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -157,10 +157,8 @@ proc updateExpiry*( try: let - ensuringFutures = Iter - .fromSlice(0.. i.ord)) proc putAllProofs*(store: BlockStore, tree: CodexTree): Future[?!void] = - store.putSomeProofs(tree, Iter.fromSlice(0.. fn(iter.next()), + isFinished = () => iter.finished() + ) + +proc new*[U, V: Ordinal](_: type AsyncIter[U], slice: HSlice[U, V]): AsyncIter[U] = + ## Creates new Iter from a slice ## - Iter.fromSlice(0.. items[i]) + let iter = Iter[U].new(slice) + mapAsync[U, U](iter, + proc (i: U): Future[U] {.async.} = + i + ) -proc fromSlice*[U, V: Ordinal](_: type Iter, slice: HSlice[U, V]): Iter[U] = - ## Creates new iterator from slice +proc new*[U, V, S: Ordinal](_: type AsyncIter[U], a: U, b: V, step: S = 1): AsyncIter[U] = + ## Creates new Iter in range a..b with specified step (default 1) ## - Iter.fromRange(slice.a.int, slice.b.int, 1) + let iter = Iter[U].new(a, b, step) + mapAsync[U, U](iter, + proc (i: U): Future[U] {.async.} = + i + ) -proc fromRange*[U, V, S: Ordinal](_: type Iter, a: U, b: V, step: S = 1): Iter[U] = - ## Creates new iterator in range a..b with specified step (default 1) +proc empty*[T](_: type AsyncIter[T]): AsyncIter[T] = + ## Creates an empty AsyncIter ## - var i = a + proc genNext(): Future[T] {.raises: [CatchableError].} = + raise newException(CatchableError, "Next item requested from an empty AsyncIter") + proc isFinished(): bool = true - proc genNext(): U = - let u = i - inc(i, step) - u + AsyncIter[T].new(genNext, isFinished) - proc isFinished(): bool = - (step > 0 and i > b) or - (step < 0 and i < b) - - Iter.new(genNext, isFinished) - -proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] = - Iter.new( - genNext = () => fn(iter.next()), +proc map*[T, U](iter: AsyncIter[T], fn: Function[T, Future[U]]): AsyncIter[U] = + AsyncIter[U].new( + genNext = () => iter.next().flatMap(fn), isFinished = () => iter.finished ) -proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] = - var nextT: Option[T] +proc mapFilter*[T, U](iter: AsyncIter[T], mapPredicate: Function[T, Future[Option[U]]]): Future[AsyncIter[U]] {.async.} = + var nextFutU: Option[Future[U]] - proc tryFetch(): void = - nextT = T.none + proc tryFetch(): Future[void] {.async.} = + nextFutU = Future[U].none while not iter.finished: - let t = iter.next() - if predicate(t): - nextT = some(t) + let futT = iter.next() + try: + if u =? await futT.flatMap(mapPredicate): + let futU = newFuture[U]("AsyncIter.mapFilterAsync") + futU.complete(u) + nextFutU = some(futU) + break + except CancelledError as err: + raise err + except CatchableError as err: + let errFut = newFuture[U]("AsyncIter.mapFilterAsync") + errFut.fail(err) + nextFutU = some(errFut) break - proc genNext(): T = - let t = nextT.unsafeGet - tryFetch() - return t + proc genNext(): Future[U] {.async.} = + let futU = nextFutU.unsafeGet + await tryFetch() + await futU proc isFinished(): bool = - nextT.isNone + nextFutU.isNone - tryFetch() - Iter.new(genNext, isFinished) + await tryFetch() + AsyncIter[U].new(genNext, isFinished) -proc prefetch*[T](iter: Iter[T], n: Positive): Iter[T] = - var ringBuf = newSeq[T](n) - var iterLen = int.high - var i = 0 - - proc tryFetch(j: int): void = - if not iter.finished: - let item = iter.next() - ringBuf[j mod n] = item - if iter.finished: - iterLen = min(j + 1, iterLen) +proc filter*[T](iter: AsyncIter[T], predicate: Function[T, Future[bool]]): Future[AsyncIter[T]] {.async.} = + proc wrappedPredicate(t: T): Future[Option[T]] {.async.} = + if await predicate(t): + some(t) else: - if j == 0: - iterLen = 0 + T.none - proc genNext(): T = - let item = ringBuf[i mod n] - tryFetch(i + n) - inc i - return item + await mapFilter[T, T](iter, wrappedPredicate) - proc isFinished(): bool = - i >= iterLen +proc delayBy*[T](iter: AsyncIter[T], d: Duration): AsyncIter[T] = + ## Delays emitting each item by given duration + ## - # initialize ringBuf with n prefetched values - for j in 0.. 0 and i > b) or + (step < 0 and i < b) + + Iter[U].new(genNext, isFinished) + +proc new*[U, V: Ordinal](_: type Iter[U], slice: HSlice[U, V]): Iter[U] = + ## Creates a new Iter from a slice + ## + + Iter[U].new(slice.a.int, slice.b.int, 1) + +proc new*[T](_: type Iter[T], items: seq[T]): Iter[T] = + ## Creates a new Iter from a sequence + ## + + Iter[int].new(0.. items[i]) + +proc empty*[T](_: type Iter[T]): Iter[T] = + ## Creates an empty Iter + ## + + proc genNext(): T {.raises: [CatchableError].} = + raise newException(CatchableError, "Next item requested from an empty Iter") + proc isFinished(): bool = true + + Iter[T].new(genNext, isFinished) + +proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] = + Iter[U].new( + genNext = () => fn(iter.next()), + isFinished = () => iter.finished + ) + +proc mapFilter*[T, U](iter: Iter[T], mapPredicate: Function[T, Option[U]]): Iter[U] = + var nextUOrErr: Option[Result[U, ref CatchableError]] + + proc tryFetch(): void = + nextUOrErr = Result[U, ref CatchableError].none + while not iter.finished: + try: + let t = iter.next() + if u =? mapPredicate(t): + nextUOrErr = some(success(u)) + break + except CatchableError as err: + nextUOrErr = some(U.failure(err)) + + proc genNext(): U {.raises: [CatchableError].} = + # at this point nextUOrErr should always be some(..) + without u =? nextUOrErr.unsafeGet, err: + raise err + + tryFetch() + return u + + proc isFinished(): bool = + nextUOrErr.isNone + + tryFetch() + Iter[U].new(genNext, isFinished) + +proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] = + proc wrappedPredicate(t: T): Option[T] = + if predicate(t): + some(t) + else: + T.none + + mapFilter[T, T](iter, wrappedPredicate) diff --git a/tests/codex/stores/testqueryiterhelper.nim b/tests/codex/stores/testqueryiterhelper.nim new file mode 100644 index 00000000..ddc769c8 --- /dev/null +++ b/tests/codex/stores/testqueryiterhelper.nim @@ -0,0 +1,65 @@ +import std/sugar + +import pkg/stew/results +import pkg/questionable +import pkg/chronos +import pkg/datastore/typedds +import pkg/datastore/sql/sqliteds +import pkg/codex/stores/queryiterhelper +import pkg/codex/utils/asynciter + +import ../../asynctest +import ../helpers + +proc encode(s: string): seq[byte] = + s.toBytes() + +proc decode(T: type string, bytes: seq[byte]): ?!T = + success(string.fromBytes(bytes)) + +asyncchecksuite "Test QueryIter helper": + var + tds: TypedDatastore + + setupAll: + tds = TypedDatastore.init(SQLiteDatastore.new(Memory).tryGet()) + + teardownAll: + (await tds.close()).tryGet + + test "Should auto-dispose when QueryIter finishes": + let + source = { + "a": "11", + "b": "22" + }.toTable + Root = Key.init("/queryitertest").tryGet() + + for k, v in source: + let key = (Root / k).tryGet() + (await tds.put(key, v)).tryGet() + + var + disposed = false + queryIter = (await query[string](tds, Query.init(Root))).tryGet() + + let iterDispose: IterDispose = queryIter.dispose + queryIter.dispose = () => (disposed = true; iterDispose()) + + let + iter1 = (await toAsyncIter[string](queryIter)).tryGet() + iter2 = await filterSuccess[string](iter1) + + var items = initTable[string, string]() + + for fut in iter2: + let item = await fut + + items[item.key.value] = item.value + + check: + items == source + disposed == true + queryIter.finished == true + iter1.finished == true + iter2.finished == true diff --git a/tests/codex/teststores.nim b/tests/codex/teststores.nim index 3aad3ef3..470cff79 100644 --- a/tests/codex/teststores.nim +++ b/tests/codex/teststores.nim @@ -1,5 +1,6 @@ import ./stores/testcachestore import ./stores/testrepostore import ./stores/testmaintenance +import ./stores/testqueryiterhelper {.warning[UnusedImport]: off.} diff --git a/tests/codex/testutils.nim b/tests/codex/testutils.nim index 82b5ecad..1a4a9469 100644 --- a/tests/codex/testutils.nim +++ b/tests/codex/testutils.nim @@ -1,6 +1,7 @@ import ./utils/testoptions import ./utils/testkeyutils import ./utils/testasyncstatemachine +import ./utils/testasynciter import ./utils/testtimer import ./utils/testthen import ./utils/testtrackedfutures diff --git a/tests/codex/utils/testasynciter.nim b/tests/codex/utils/testasynciter.nim new file mode 100644 index 00000000..2a7e2b8c --- /dev/null +++ b/tests/codex/utils/testasynciter.nim @@ -0,0 +1,160 @@ +import std/sugar + +import pkg/questionable +import pkg/chronos +import pkg/codex/utils/asynciter + +import ../../asynctest +import ../helpers + +asyncchecksuite "Test AsyncIter": + + test "Should be finished": + let iter = AsyncIter[int].empty() + + check: + iter.finished == true + + test "Should map each item using `map`": + let + iter1 = AsyncIter[int].new(0..<5).delayBy(10.millis) + iter2 = map[int, string](iter1, + proc (i: int): Future[string] {.async.} = + $i + ) + + var collected: seq[string] + + for fut in iter2: + collected.add(await fut) + + check: + collected == @["0", "1", "2", "3", "4"] + + test "Should leave only odd items using `filter`": + let + iter1 = AsyncIter[int].new(0..<5).delayBy(10.millis) + iter2 = await filter[int](iter1, + proc (i: int): Future[bool] {.async.} = + (i mod 2) == 1 + ) + + var collected: seq[int] + + for fut in iter2: + collected.add(await fut) + + check: + collected == @[1, 3] + + test "Should leave only odd items using `mapFilter`": + let + iter1 = AsyncIter[int].new(0..<5).delayBy(10.millis) + iter2 = await mapFilter[int, string](iter1, + proc (i: int): Future[?string] {.async.} = + if (i mod 2) == 1: + some($i) + else: + string.none + ) + + var collected: seq[string] + + for fut in iter2: + collected.add(await fut) + + check: + collected == @["1", "3"] + + test "Should yield all items before err using `map`": + let + iter1 = AsyncIter[int].new(0..<5).delayBy(10.millis) + iter2 = map[int, string](iter1, + proc (i: int): Future[string] {.async.} = + if i < 3: + return $i + else: + raise newException(CatchableError, "Some error") + ) + + var collected: seq[string] + + expect CatchableError: + for fut in iter2: + collected.add(await fut) + + check: + collected == @["0", "1", "2"] + iter2.finished + + test "Should yield all items before err using `filter`": + let + iter1 = AsyncIter[int].new(0..<5).delayBy(10.millis) + iter2 = await filter[int](iter1, + proc (i: int): Future[bool] {.async.} = + if i < 3: + return true + else: + raise newException(CatchableError, "Some error") + ) + + var collected: seq[int] + + expect CatchableError: + for fut in iter2: + collected.add(await fut) + + check: + collected == @[0, 1, 2] + iter2.finished + + test "Should yield all items before err using `mapFilter`": + let + iter1 = AsyncIter[int].new(0..<5).delayBy(10.millis) + iter2 = await mapFilter[int, string](iter1, + proc (i: int): Future[?string] {.async.} = + if i < 3: + return some($i) + else: + raise newException(CatchableError, "Some error") + ) + + var collected: seq[string] + + expect CatchableError: + for fut in iter2: + collected.add(await fut) + + check: + collected == @["0", "1", "2"] + iter2.finished + + test "Should propagate cancellation error immediately": + let + fut = newFuture[?string]("testasynciter") + + let + iter1 = AsyncIter[int].new(0..<5).delayBy(10.millis) + iter2 = await mapFilter[int, string](iter1, + proc (i: int): Future[?string] {.async.} = + if i < 3: + return some($i) + else: + return await fut + ) + + proc cancelFut(): Future[void] {.async.} = + await sleepAsync(100.millis) + await fut.cancelAndWait() + + asyncSpawn(cancelFut()) + + var collected: seq[string] + + expect CancelledError: + for fut in iter2: + collected.add(await fut) + + check: + collected == @["0", "1"] + iter2.finished diff --git a/tests/codex/utils/testiter.nim b/tests/codex/utils/testiter.nim new file mode 100644 index 00000000..e2806b5a --- /dev/null +++ b/tests/codex/utils/testiter.nim @@ -0,0 +1,129 @@ +import std/sugar + +import pkg/questionable +import pkg/chronos +import pkg/codex/utils/iter + +import ../../asynctest +import ../helpers + +checksuite "Test Iter": + + test "Should be finished": + let iter = Iter[int].empty() + + check: + iter.finished == true + + test "Should be iterable with `items`": + let iter = Iter.new(0..<5) + + let items = + collect: + for v in iter: + v + + check: + items == @[0, 1, 2, 3, 4] + + test "Should be iterable with `pairs`": + let iter = Iter.new(0..<5) + + let pairs = + collect: + for i, v in iter: + (i, v) + + check: + pairs == @[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)] + + test "Should map each item using `map`": + let iter = Iter.new(0..<5) + .map((i: int) => $i) + + check: + iter.toSeq() == @["0", "1", "2", "3", "4"] + + test "Should leave only odd items using `filter`": + let iter = Iter.new(0..<5) + .filter((i: int) => (i mod 2) == 1) + + check: + iter.toSeq() == @[1, 3] + + test "Should leave only odd items using `mapFilter`": + let + iter1 = Iter.new(0..<5) + iter2 = mapFilter[int, string](iter1, + proc(i: int): ?string = + if (i mod 2) == 1: + some($i) + else: + string.none + ) + + check: + iter2.toSeq() == @["1", "3"] + + test "Should yield all items before err using `map`": + let + iter = Iter.new(0..<5) + .map( + proc (i: int): string = + if i < 3: + return $i + else: + raise newException(CatchableError, "Some error") + ) + + var collected: seq[string] + + expect CatchableError: + for i in iter: + collected.add(i) + + check: + collected == @["0", "1", "2"] + iter.finished + + test "Should yield all items before err using `filter`": + let + iter = Iter.new(0..<5) + .filter( + proc (i: int): bool = + if i < 3: + return true + else: + raise newException(CatchableError, "Some error") + ) + + var collected: seq[int] + + expect CatchableError: + for i in iter: + collected.add(i) + + check: + collected == @[0, 1, 2] + iter.finished + + test "Should yield all items before err using `mapFilter`": + let + iter1 = Iter.new(0..<5) + iter2 = mapFilter[int, string](iter1, + proc (i: int): ?string = + if i < 3: + return some($i) + else: + raise newException(CatchableError, "Some error") + ) + + var collected: seq[string] + + expect CatchableError: + for i in iter2: + collected.add(i) + + check: + collected == @["0", "1", "2"] + iter2.finished diff --git a/vendor/nim-datastore b/vendor/nim-datastore index f4989fcc..3ab6b84a 160000 --- a/vendor/nim-datastore +++ b/vendor/nim-datastore @@ -1 +1 @@ -Subproject commit f4989fcce5d74a648e7e2598a72a7b21948f4a85 +Subproject commit 3ab6b84a634a7b2ee8c0144f050bf5893cd47c17