nim-codex/codex/stores/queryiterhelper.nim
Slava 15ff87a8bb
Merge latest master into release (#842)
* fix: createReservation lock (#825)

* fix: createReservation lock

* fix: additional locking places

* fix: acquire lock

* chore: feedback

Co-authored-by: markspanbroek <mark@spanbroek.net>
Signed-off-by: Adam Uhlíř <adam@uhlir.dev>

* feat: withLock template and fixed tests

* fix: use proc for MockReservations constructor

* chore: feedback

Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
Signed-off-by: Adam Uhlíř <adam@uhlir.dev>

* chore: feedback implementation

---------

Signed-off-by: Adam Uhlíř <adam@uhlir.dev>
Co-authored-by: markspanbroek <mark@spanbroek.net>
Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>

* Block deletion with ref count & repostore refactor (#631)

* Fix StoreStream so it doesn't return parity bytes  (#838)

* fix storestream so it doesn't return parity bits for protected/verifiable manifests

* use Cid.example instead of creating a mock manually

* Fix verifiable manifest initialization (#839)

* fix verifiable manifest initialization

* fix linearstrategy, use verifiableStrategy to select blocks for slots

* check for both strategies in attribute inheritance test

* ci: add verify_circuit=true to the releases (#840)

* provisional fix so EC errors do not crash the node on download (#841)

---------

Signed-off-by: Adam Uhlíř <adam@uhlir.dev>
Co-authored-by: Adam Uhlíř <adam@uhlir.dev>
Co-authored-by: markspanbroek <mark@spanbroek.net>
Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
Co-authored-by: Tomasz Bekas <tomasz.bekas@gmail.com>
Co-authored-by: Giuliano Mega <giuliano.mega@gmail.com>
2024-06-26 05:38:04 +03:00

67 lines
2.0 KiB
Nim

import pkg/questionable
import pkg/questionable/results
import pkg/chronos
import pkg/chronicles
import pkg/datastore/typedds
import ../utils/asynciter
type
  KeyVal*[T] = tuple[key: Key, value: T]
    ## A `Key` paired with a value of type `T`; this is the item type
    ## produced by `filterSuccess`.
proc toAsyncIter*[T](
  queryIter: QueryIter[T],
  finishOnErr: bool = true
): Future[?!AsyncIter[?!QueryResponse[T]]] {.async.} =
  ## Adapts a `QueryIter[T]` into an `AsyncIter[?!QueryResponse[T]]`.
  ##
  ## `queryIter.dispose()` is invoked automatically:
  ## * immediately, when `queryIter` is already finished on entry (an empty
  ##   iterator is returned in that case);
  ## * after a call to `next()` that leaves `queryIter` finished;
  ## * after a call to `next()` that yields an error, but only when
  ##   `finishOnErr` is `true`.
  ##
  ## A failure reported by `dispose()` takes the place of the value that
  ## would otherwise have been returned (from this proc for the already
  ## finished case, or from the item being produced otherwise).
  ##

  # Nothing left to iterate: release the underlying iterator right away.
  if queryIter.finished:
    trace "Disposing iterator"
    if err =? (await queryIter.dispose()).errorOption:
      return failure(err)
    return success(AsyncIter[?!QueryResponse[T]].empty())

  # Set once `next()` has produced an error; shared by both closures below.
  var sawFailure = false

  proc atEnd(): bool =
    # The iteration ends when the source is drained or, if requested, as
    # soon as an error has been observed.
    queryIter.finished or (sawFailure and finishOnErr)

  proc fetchNext(): Future[?!QueryResponse[T]] {.async.} =
    let item = await queryIter.next()

    if item.isErr:
      sawFailure = true

    let shouldDispose = queryIter.finished or (sawFailure and finishOnErr)
    if shouldDispose:
      trace "Disposing iterator"
      if err =? (await queryIter.dispose()).errorOption:
        return failure(err)

    return item

  return success(AsyncIter[?!QueryResponse[T]].new(fetchNext, atEnd))
proc filterSuccess*[T](
  iter: AsyncIter[?!QueryResponse[T]]
): Future[AsyncIter[tuple[key: Key, value: T]]] {.async.} =
  ## Turns an iterator of `?!QueryResponse[T]` into an iterator of
  ## `(key, value)` pairs, keeping only fully successful responses.
  ##
  ## A response is mapped to `none` (and a message is logged) when:
  ## * the response itself is a failure,
  ## * the response carries no key,
  ## * the response's value is a failure.
  ##
  ## The per-item mapping is handed to `mapFilter`, which is presumably what
  ## drops the `none` entries (defined outside this file - confirm there).

  proc keepValid(item: ?!QueryResponse[T]): Future[?KeyVal[T]] {.async.} =
    without response =? item, err:
      error "Error occurred when getting QueryResponse", msg = err.msg
      return none(KeyVal[T])

    without k =? response.key:
      warn "No key for a QueryResponse"
      return none(KeyVal[T])

    without v =? response.value, err:
      error "Error occurred when getting a value from QueryResponse",
        msg = err.msg
      return none(KeyVal[T])

    return (key: k, value: v).some

  return await mapFilter[?!QueryResponse[T], KeyVal[T]](iter, keepValid)