mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-04 06:33:11 +00:00
Merge branch 'master' into tests/concurrent-queries
This commit is contained in:
commit
10c115f40b
@ -98,8 +98,8 @@ proc put*[T](self: TypedDatastore, key: Key, t: T): Future[?!void] {.async.} =
|
|||||||
proc get*[T](self: TypedDatastore, key: Key): Future[?!T] {.async.} =
|
proc get*[T](self: TypedDatastore, key: Key): Future[?!T] {.async.} =
|
||||||
requireDecoder(T)
|
requireDecoder(T)
|
||||||
|
|
||||||
without bytes =? await self.ds.get(key), err:
|
without bytes =? await self.ds.get(key), error:
|
||||||
return failure(err)
|
return failure(error)
|
||||||
return T.decode(bytes)
|
return T.decode(bytes)
|
||||||
|
|
||||||
proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async.} =
|
proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async.} =
|
||||||
@ -110,8 +110,8 @@ proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {
|
|||||||
var
|
var
|
||||||
maybeNextT: ?T
|
maybeNextT: ?T
|
||||||
if bytes =? maybeBytes:
|
if bytes =? maybeBytes:
|
||||||
without t =? T.decode(bytes), err:
|
without t =? T.decode(bytes), error:
|
||||||
raise err
|
raise error
|
||||||
maybeNextT = await fn(t.some)
|
maybeNextT = await fn(t.some)
|
||||||
else:
|
else:
|
||||||
maybeNextT = await fn(T.none)
|
maybeNextT = await fn(T.none)
|
||||||
@ -134,8 +134,8 @@ proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Futu
|
|||||||
maybeNextT: ?T
|
maybeNextT: ?T
|
||||||
aux: U
|
aux: U
|
||||||
if bytes =? maybeBytes:
|
if bytes =? maybeBytes:
|
||||||
without t =? T.decode(bytes), err:
|
without t =? T.decode(bytes), error:
|
||||||
raise err
|
raise error
|
||||||
|
|
||||||
(maybeNextT, aux) = await fn(t.some)
|
(maybeNextT, aux) = await fn(t.some)
|
||||||
else:
|
else:
|
||||||
@ -147,8 +147,8 @@ proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Futu
|
|||||||
else:
|
else:
|
||||||
return (seq[byte].none, aux.encode())
|
return (seq[byte].none, aux.encode())
|
||||||
|
|
||||||
without auxBytes =? await self.ds.modifyGet(key, wrappedFn), err:
|
without auxBytes =? await self.ds.modifyGet(key, wrappedFn), error:
|
||||||
return failure(err)
|
return failure(error)
|
||||||
|
|
||||||
|
|
||||||
return U.decode(auxBytes)
|
return U.decode(auxBytes)
|
||||||
@ -156,8 +156,8 @@ proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Futu
|
|||||||
proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async.} =
|
proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async.} =
|
||||||
requireDecoder(T)
|
requireDecoder(T)
|
||||||
|
|
||||||
without dsIter =? await self.ds.query(q), err:
|
without dsIter =? await self.ds.query(q), error:
|
||||||
let childErr = newException(CatchableError, "Error executing query with key " & $q.key, parentException = err)
|
let childErr = newException(CatchableError, "Error executing query with key " & $q.key, parentException = error)
|
||||||
return failure(childErr)
|
return failure(childErr)
|
||||||
|
|
||||||
var iter = QueryIter[T]()
|
var iter = QueryIter[T]()
|
||||||
@ -169,8 +169,8 @@ proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async.}
|
|||||||
return success(iter)
|
return success(iter)
|
||||||
|
|
||||||
proc getNext: Future[?!QueryResponse[T]] {.async.} =
|
proc getNext: Future[?!QueryResponse[T]] {.async.} =
|
||||||
without pair =? await dsIter.next(), err:
|
without pair =? await dsIter.next(), error:
|
||||||
return failure(err)
|
return failure(error)
|
||||||
|
|
||||||
if dsIter.finished:
|
if dsIter.finished:
|
||||||
iter.finished = true
|
iter.finished = true
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user