logos-storage-nim/codex/stores/queryiterhelper.nim

72 lines
2.2 KiB
Nim

import pkg/questionable
import pkg/questionable/results
import pkg/chronos
import pkg/chronicles
import pkg/datastore/typedds
import ../utils/asyncresultiter
{.push raises: [].}
type KeyVal*[T] = tuple[key: Key, value: T]
proc toAsyncResultIter*[T](
queryIter: QueryIter[T], finishOnErr: bool = true
): Future[?!AsyncResultIter[QueryResponse[T]]] {.async: (raises: [CancelledError]).} =
## Converts `QueryIter[T]` to `AsyncResultIter[QueryResponse[T]]` and automatically
## runs dispose whenever `QueryIter` finishes or whenever an error occurs (only
## if the flag finishOnErr is set to true)
##
if queryIter.finished:
trace "Disposing iterator"
if error =? (await queryIter.dispose()).errorOption:
return failure(error)
return success(AsyncResultIter[QueryResponse[T]].empty())
var errOccurred = false
proc genNext(): Future[?!QueryResponse[T]] {.async: (raises: [CancelledError]).} =
let queryResOrErr = await queryIter.next()
if queryResOrErr.isErr:
errOccurred = true
if queryIter.finished or (errOccurred and finishOnErr):
trace "Disposing iterator"
if error =? (await queryIter.dispose()).errorOption:
return failure(error)
return queryResOrErr
proc isFinished(): bool =
queryIter.finished
AsyncResultIter[QueryResponse[T]].new(genNext, isFinished).success
proc filterSuccess*[T](
iter: AsyncResultIter[QueryResponse[T]]
): Future[AsyncResultIter[tuple[key: Key, value: T]]] {.
async: (raises: [CancelledError])
.} =
## Filters out any items that are not success
proc mapping(
resOrErr: ?!QueryResponse[T]
): Future[Option[?!KeyVal[T]]] {.async: (raises: [CancelledError]).} =
without res =? resOrErr, error:
error "Error occurred when getting QueryResponse", msg = error.msg
return Result[KeyVal[T], ref CatchableError].none
without key =? res.key:
warn "No key for a QueryResponse"
return Result[KeyVal[T], ref CatchableError].none
without value =? res.value, error:
error "Error occurred when getting a value from QueryResponse", msg = error.msg
return Result[KeyVal[T], ref CatchableError].none
some(success((key: key, value: value)))
await mapFilter[QueryResponse[T], KeyVal[T]](iter, mapping)