mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-05-05 02:53:08 +00:00
removes async iter from queryiterhelper (and the related tests)
This commit is contained in:
parent
293adcc6f6
commit
c9fba17169
@ -4,47 +4,12 @@ import pkg/chronos
|
||||
import pkg/chronicles
|
||||
import pkg/datastore/typedds
|
||||
|
||||
import ../utils/asynciter
|
||||
import ../utils/safeasynciter
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
type KeyVal*[T] = tuple[key: Key, value: T]
|
||||
|
||||
proc toAsyncIter*[T](
|
||||
queryIter: QueryIter[T], finishOnErr: bool = true
|
||||
): Future[?!AsyncIter[?!QueryResponse[T]]] {.async: (raises: [CancelledError]).} =
|
||||
## Converts `QueryIter[T]` to `AsyncIter[?!QueryResponse[T]]` and automatically
|
||||
## runs dispose whenever `QueryIter` finishes or whenever an error occurs (only
|
||||
## if the flag finishOnErr is set to true)
|
||||
##
|
||||
|
||||
if queryIter.finished:
|
||||
trace "Disposing iterator"
|
||||
if error =? (await queryIter.dispose()).errorOption:
|
||||
return failure(error)
|
||||
return success(AsyncIter[?!QueryResponse[T]].empty())
|
||||
|
||||
var errOccurred = false
|
||||
|
||||
proc genNext(): Future[?!QueryResponse[T]] {.async.} =
|
||||
let queryResOrErr = await queryIter.next()
|
||||
|
||||
if queryResOrErr.isErr:
|
||||
errOccurred = true
|
||||
|
||||
if queryIter.finished or (errOccurred and finishOnErr):
|
||||
trace "Disposing iterator"
|
||||
if error =? (await queryIter.dispose()).errorOption:
|
||||
return failure(error)
|
||||
|
||||
return queryResOrErr
|
||||
|
||||
proc isFinished(): bool =
|
||||
queryIter.finished or (errOccurred and finishOnErr)
|
||||
|
||||
AsyncIter[?!QueryResponse[T]].new(genNext, isFinished).success
|
||||
|
||||
proc toSafeAsyncIter*[T](
|
||||
queryIter: QueryIter[T], finishOnErr: bool = true
|
||||
): Future[?!SafeAsyncIter[QueryResponse[T]]] {.async: (raises: [CancelledError]).} =
|
||||
@ -79,28 +44,6 @@ proc toSafeAsyncIter*[T](
|
||||
|
||||
SafeAsyncIter[QueryResponse[T]].new(genNext, isFinished).success
|
||||
|
||||
proc filterSuccess*[T](
|
||||
iter: AsyncIter[?!QueryResponse[T]]
|
||||
): Future[AsyncIter[tuple[key: Key, value: T]]] {.async: (raises: [CancelledError]).} =
|
||||
## Filters out any items that are not success
|
||||
|
||||
proc mapping(resOrErr: ?!QueryResponse[T]): Future[?KeyVal[T]] {.async.} =
|
||||
without res =? resOrErr, error:
|
||||
error "Error occurred when getting QueryResponse", msg = error.msg
|
||||
return KeyVal[T].none
|
||||
|
||||
without key =? res.key:
|
||||
warn "No key for a QueryResponse"
|
||||
return KeyVal[T].none
|
||||
|
||||
without value =? res.value, error:
|
||||
error "Error occurred when getting a value from QueryResponse", msg = error.msg
|
||||
return KeyVal[T].none
|
||||
|
||||
(key: key, value: value).some
|
||||
|
||||
await mapFilter[?!QueryResponse[T], KeyVal[T]](iter, mapping)
|
||||
|
||||
proc filterSuccess*[T](
|
||||
iter: SafeAsyncIter[QueryResponse[T]]
|
||||
): Future[SafeAsyncIter[tuple[key: Key, value: T]]] {.
|
||||
|
||||
@ -6,7 +6,7 @@ import pkg/chronos
|
||||
import pkg/datastore/typedds
|
||||
import pkg/datastore/sql/sqliteds
|
||||
import pkg/codex/stores/queryiterhelper
|
||||
import pkg/codex/utils/asynciter
|
||||
import pkg/codex/utils/safeasynciter
|
||||
|
||||
import ../../asynctest
|
||||
import ../helpers
|
||||
@ -43,15 +43,14 @@ asyncchecksuite "Test QueryIter helper":
|
||||
queryIter.dispose = () => (disposed = true; iterDispose())
|
||||
|
||||
let
|
||||
iter1 = (await toAsyncIter[string](queryIter)).tryGet()
|
||||
iter1 = (await toSafeAsyncIter[string](queryIter)).tryGet()
|
||||
iter2 = await filterSuccess[string](iter1)
|
||||
|
||||
var items = initTable[string, string]()
|
||||
|
||||
for fut in iter2:
|
||||
let item = await fut
|
||||
|
||||
items[item.key.value] = item.value
|
||||
if item =? (await fut):
|
||||
items[item.key.value] = item.value
|
||||
|
||||
check:
|
||||
items == source
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user