Compare commits

...

11 Commits

Author SHA1 Message Date
Giuliano Mega
1d22c62325
fix: use proper object reference in hashing (#85)
Uses `addr obj[]` instead of `addr obj`, fix tests.

Signed-off-by: Giuliano Mega <giuliano.mega@gmail.com>
2026-02-09 10:01:13 -03:00
Jacek Sieka
c2c1f55a22
chore: bump leveldb (#81)
..to include fix for newer cmake version

Co-authored-by: Giuliano Mega <giuliano.mega@gmail.com>
2026-02-06 16:27:25 -03:00
Giuliano Mega
92af582d97
fix: close pending iterators before closing LevelDB store (#83)
This adds tracking of open iterators to leveldbds so that when one attempts to close it, iterators are disposed of first. It also adds automatic disposal if iterators are completely consumed.

fixes #82
2026-02-06 16:24:11 -03:00
Arnaud
a7ee4b170a Use short syntax with nim-results 2025-12-11 08:38:39 -03:00
Arnaud
950990720a Use await instead of fut.read 2025-12-11 08:38:39 -03:00
Arnaud
d583647c5b Propagate CancelledError 2025-12-11 08:38:39 -03:00
Arnaud
92f9533c7d Add more raises in async pragma 2025-12-11 08:38:39 -03:00
Arnaud
9586f95ccd Do not propagate AsyncLockError 2025-12-11 08:38:39 -03:00
Arnaud
42e4f530bf Add errors to raises async pragam 2025-12-11 08:38:39 -03:00
Arnaud
13c0a0cdb1 Define raises for async pragma 2025-12-11 08:38:39 -03:00
Chrysostomos Nanakos
ea917be824
chore: update stew upper bound version (#80)
Bump package version to 0.2.1.

Signed-off-by: Chrysostomos Nanakos <chris@include.gr>
2025-12-11 10:12:45 +01:00
12 changed files with 210 additions and 108 deletions

View File

@ -23,16 +23,22 @@ jobs:
os: windows,
shell: msys2
}
nim: [2.0.14]
nim: [binary:2.2.4]
name: ${{ matrix.platform.icon }} ${{ matrix.platform.label }} - Nim v${{ matrix.nim }}
runs-on: ${{ matrix.platform.os }}-latest
steps:
- uses: actions/checkout@v4
with:
submodules: true
- uses: iffy/install-nim@v4
- uses: iffy/install-nim@v5
with:
version: ${{ matrix.nim }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Setup CMake
uses: jwlawson/actions-setup-cmake@v2
with:
cmake-version: '3.x'
- name: Build
run: nimble install -y
- name: Test

View File

@ -1,7 +1,7 @@
mode = ScriptMode.Verbose
packageName = "datastore"
version = "0.2.0"
version = "0.2.3"
author = "Status Research & Development GmbH"
description = "Simple, unified API for multiple data stores"
license = "Apache License 2.0 or MIT"
@ -11,8 +11,8 @@ requires "nim >= 2.0.14",
"chronos >= 4.0.4 & < 5.0.0",
"questionable >= 0.10.15 & < 0.11.0",
"sqlite3_abi >= 3.47.0.0 & < 4.0.0.0",
"leveldbstatic >= 0.2.0 & < 0.3.0",
"stew >= 0.2.0 & < 0.3.0",
"leveldbstatic >= 0.2.1 & < 0.3.0",
"stew >= 0.4.0 & < 0.5.0",
"unittest2 >= 0.2.3 & < 0.3.0"
task coverage, "generates code coverage report":

View File

@ -16,37 +16,37 @@ type
Modify* = Function[?seq[byte], Future[?seq[byte]]]
ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]]
method has*(self: Datastore, key: Key): Future[?!bool] {.base, gcsafe, locks: "unknown".} =
method has*(self: Datastore, key: Key): Future[?!bool] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
raiseAssert("Not implemented!")
method delete*(self: Datastore, key: Key): Future[?!void] {.base, gcsafe, locks: "unknown".} =
method delete*(self: Datastore, key: Key): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
raiseAssert("Not implemented!")
method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, gcsafe, locks: "unknown".} =
method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
raiseAssert("Not implemented!")
method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, gcsafe, locks: "unknown".} =
method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
raiseAssert("Not implemented!")
method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, gcsafe, locks: "unknown".} =
method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
raiseAssert("Not implemented!")
method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, gcsafe, locks: "unknown".} =
method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
raiseAssert("Not implemented!")
method close*(self: Datastore): Future[?!void] {.base, async, locks: "unknown".} =
method close*(self: Datastore): Future[?!void] {.base, async: (raises: [CancelledError]), locks: "unknown".} =
raiseAssert("Not implemented!")
method query*(
self: Datastore,
query: Query): Future[?!QueryIter] {.base, gcsafe.} =
query: Query): Future[?!QueryIter] {.base, gcsafe, async: (raises: [CancelledError]).} =
raiseAssert("Not implemented!")
proc contains*(self: Datastore, key: Key): Future[bool] {.async.} =
proc contains*(self: Datastore, key: Key): Future[bool] {.async: (raises: [CancelledError]).} =
return (await self.has(key)) |? false
method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gcsafe, locks: "unknown".} =
method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
## Concurrently safe way of modifying the value associated with the `key`.
##
## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value.
@ -54,7 +54,7 @@ method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gc
raiseAssert("Not implemented!")
method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, gcsafe, locks: "unknown".} =
method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on
## successful update.
##

View File

@ -11,10 +11,9 @@ proc defaultModifyGetImpl*(
lock: AsyncLock,
key: Key,
fn: ModifyGet
): Future[?!seq[byte]] {.async.} =
): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
# Default implementation, serializes all modify operations using provided lock
#
await lock.acquire()
try:
@ -34,6 +33,8 @@ proc defaultModifyGetImpl*(
try:
(maybeNewData, aux) = await fn(maybeCurrentData)
except CancelledError as err:
raise err
except CatchableError as err:
return failure(err)
@ -46,14 +47,17 @@ proc defaultModifyGetImpl*(
return aux.success
finally:
lock.release()
try:
lock.release()
except AsyncLockError as err:
return failure(err)
proc defaultModifyImpl*(
self: Datastore,
lock: AsyncLock,
key: Key,
fn: Modify
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
let res = await fn(maybeValue)
let ignoredAux = newSeq[byte]()

View File

@ -65,10 +65,10 @@ proc path*(self: FSDatastore, key: Key): ?!string =
return success fullname
method has*(self: FSDatastore, key: Key): Future[?!bool] {.async.} =
method has*(self: FSDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
return self.path(key).?fileExists()
method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =
method delete*(self: FSDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
without path =? self.path(key), error:
return failure error
@ -82,7 +82,7 @@ method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =
return success()
method delete*(self: FSDatastore, keys: seq[Key]): Future[?!void] {.async.} =
method delete*(self: FSDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
for key in keys:
if err =? (await self.delete(key)).errorOption:
return failure err
@ -119,7 +119,7 @@ proc readFile*(self: FSDatastore, path: string): ?!seq[byte] =
except CatchableError as e:
return failure e
method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} =
method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
without path =? self.path(key), error:
return failure error
@ -132,7 +132,7 @@ method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} =
method put*(
self: FSDatastore,
key: Key,
data: seq[byte]): Future[?!void] {.async.} =
data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
without path =? self.path(key), error:
return failure error
@ -147,7 +147,7 @@ method put*(
method put*(
self: FSDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async.} =
batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption:
@ -163,12 +163,12 @@ proc dirWalker(path: string): (iterator: string {.raises: [Defect], gcsafe.}) =
except CatchableError as exc:
raise newException(Defect, exc.msg)
method close*(self: FSDatastore): Future[?!void] {.async.} =
method close*(self: FSDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
return success()
method query*(
self: FSDatastore,
query: Query): Future[?!QueryIter] {.async.} =
query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]).} =
without path =? self.path(query.key), error:
return failure error
@ -189,7 +189,7 @@ method query*(
var
iter = QueryIter.new()
proc next(): Future[?!QueryResponse] {.async.} =
proc next(): Future[?!QueryResponse] {.async: (raises: [CancelledError]).} =
let
path = walker()
@ -208,8 +208,13 @@ method query*(
key = Key.init(keyPath).expect("should not fail")
data =
if query.value:
self.readFile((basePath / path).absolutePath)
.expect("Should read file")
try:
self.readFile((basePath / path).absolutePath)
.expect("Should read file")
except ValueError as err:
return failure err
except OSError as err:
return failure err
else:
@[]
@ -221,7 +226,7 @@ method query*(
method modifyGet*(
self: FSDatastore,
key: Key,
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
var lock: AsyncLock
try:
lock = self.locks.mgetOrPut(key, newAsyncLock())
@ -233,8 +238,9 @@ method modifyGet*(
method modify*(
self: FSDatastore,
key: Key,
fn: Modify): Future[?!void] {.async.} =
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
var lock: AsyncLock
try:
lock = self.locks.mgetOrPut(key, newAsyncLock())
return await defaultModifyImpl(self, lock, key, fn)

View File

@ -5,6 +5,8 @@ import std/tables
import std/os
import std/strformat
import std/strutils
import std/sets
import std/sequtils
import pkg/leveldbstatic
import pkg/chronos
@ -19,28 +21,32 @@ type
LevelDbDatastore* = ref object of Datastore
db: LevelDb
locks: TableRef[Key, AsyncLock]
openIterators: HashSet[QueryIter]
method has*(self: LevelDbDatastore, key: Key): Future[?!bool] {.async.} =
proc hash(iter: QueryIter): Hash =
hash(addr iter[])
method has*(self: LevelDbDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
try:
let str = self.db.get($key)
return success(str.isSome)
except LevelDbException as e:
return failure("LevelDbDatastore.has exception: " & e.msg)
method delete*(self: LevelDbDatastore, key: Key): Future[?!void] {.async.} =
method delete*(self: LevelDbDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
try:
self.db.delete($key, sync = true)
return success()
except LevelDbException as e:
return failure("LevelDbDatastore.delete exception: " & e.msg)
method delete*(self: LevelDbDatastore, keys: seq[Key]): Future[?!void] {.async.} =
method delete*(self: LevelDbDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
for key in keys:
if err =? (await self.delete(key)).errorOption:
return failure(err.msg)
return success()
method get*(self: LevelDbDatastore, key: Key): Future[?!seq[byte]] {.async.} =
method get*(self: LevelDbDatastore, key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
try:
let str = self.db.get($key)
if not str.isSome:
@ -50,7 +56,7 @@ method get*(self: LevelDbDatastore, key: Key): Future[?!seq[byte]] {.async.} =
except LevelDbException as e:
return failure("LevelDbDatastore.get exception: " & $e.msg)
method put*(self: LevelDbDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
method put*(self: LevelDbDatastore, key: Key, data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
try:
let str = string.fromBytes(data)
self.db.put($key, str)
@ -58,7 +64,7 @@ method put*(self: LevelDbDatastore, key: Key, data: seq[byte]): Future[?!void] {
except LevelDbException as e:
return failure("LevelDbDatastore.put exception: " & $e.msg)
method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} =
method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
try:
let b = newBatch()
for entry in batch:
@ -68,8 +74,12 @@ method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.as
except LevelDbException as e:
return failure("LevelDbDatastore.put (batch) exception: " & $e.msg)
method close*(self: LevelDbDatastore): Future[?!void] {.async.} =
method close*(self: LevelDbDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
try:
for iter in self.openIterators.toSeq:
if err =? (await iter.dispose()).errorOption:
return failure(err.msg)
self.openIterators.clear()
self.db.close()
return success()
except LevelDbException as e:
@ -84,7 +94,7 @@ proc getQueryString(query: Query): string =
method query*(
self: LevelDbDatastore,
query: Query): Future[?!QueryIter] {.async, gcsafe.} =
query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]), gcsafe.} =
if not (query.sort == SortOrder.Assending):
return failure("LevelDbDatastore.query: query.sort is not SortOrder.Ascending. Unsupported.")
@ -98,7 +108,14 @@ method query*(
limit = query.limit
)
proc next(): Future[?!QueryResponse] {.async.} =
proc dispose(): Future[?!void] {.async: (raises: [CancelledError]).} =
dbIter.dispose()
iter.disposed = true
self.openIterators.excl(iter)
return success()
proc next(): Future[?!QueryResponse] {.async: (raises: [CancelledError]).} =
if iter.finished:
return failure(newException(QueryEndedError, "Calling next on a finished query!"))
@ -107,6 +124,9 @@ method query*(
if dbIter.finished:
iter.finished = true
if err =? (await dispose()).errorOption:
return failure(err)
return success (Key.none, EmptyBytes)
else:
let key = Key.init(keyStr).expect("LevelDbDatastore.query (next) Failed to create key.")
@ -114,18 +134,16 @@ method query*(
except LevelDbException as e:
return failure("LevelDbDatastore.query -> next exception: " & $e.msg)
proc dispose(): Future[?!void] {.async.} =
dbIter.dispose()
return success()
iter.next = next
iter.dispose = dispose
self.openIterators.incl(iter)
return success iter
method modifyGet*(
self: LevelDbDatastore,
key: Key,
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
var lock: AsyncLock
try:
lock = self.locks.mgetOrPut(key, newAsyncLock())
@ -137,7 +155,7 @@ method modifyGet*(
method modify*(
self: LevelDbDatastore,
key: Key,
fn: Modify): Future[?!void] {.async.} =
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
var lock: AsyncLock
try:
lock = self.locks.mgetOrPut(key, newAsyncLock())
@ -146,6 +164,9 @@ method modify*(
if not lock.locked:
self.locks.del(key)
proc openIteratorCount*(self: LevelDbDatastore): int =
self.openIterators.len
proc new*(
T: type LevelDbDatastore, dbName: string): ?!T =
try:

View File

@ -63,7 +63,7 @@ proc dispatch(
method has*(
self: MountedDatastore,
key: Key): Future[?!bool] {.async.} =
key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
without mounted =? self.dispatch(key):
return failure "No mounted datastore found"
@ -72,7 +72,7 @@ method has*(
method delete*(
self: MountedDatastore,
key: Key): Future[?!void] {.async.} =
key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
without mounted =? self.dispatch(key), error:
return failure(error)
@ -81,7 +81,7 @@ method delete*(
method delete*(
self: MountedDatastore,
keys: seq[Key]): Future[?!void] {.async.} =
keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
for key in keys:
if err =? (await self.delete(key)).errorOption:
@ -91,7 +91,7 @@ method delete*(
method get*(
self: MountedDatastore,
key: Key): Future[?!seq[byte]] {.async.} =
key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
without mounted =? self.dispatch(key), error:
return failure(error)
@ -101,7 +101,7 @@ method get*(
method put*(
self: MountedDatastore,
key: Key,
data: seq[byte]): Future[?!void] {.async.} =
data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
without mounted =? self.dispatch(key), error:
return failure(error)
@ -110,7 +110,7 @@ method put*(
method put*(
self: MountedDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async.} =
batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption:
@ -121,7 +121,7 @@ method put*(
method modifyGet*(
self: MountedDatastore,
key: Key,
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
without mounted =? self.dispatch(key), error:
return failure(error)
@ -131,14 +131,14 @@ method modifyGet*(
method modify*(
self: MountedDatastore,
key: Key,
fn: Modify): Future[?!void] {.async.} =
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
without mounted =? self.dispatch(key), error:
return failure(error)
return await mounted.store.store.modify(mounted.relative, fn)
method close*(self: MountedDatastore): Future[?!void] {.async.} =
method close*(self: MountedDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
for s in self.stores.values:
discard await s.store.close()

View File

@ -23,10 +23,11 @@ type
QueryResponse* = tuple[key: ?Key, data: seq[byte]]
QueryEndedError* = object of DatastoreError
GetNext* = proc(): Future[?!QueryResponse] {.raises: [], gcsafe, closure.}
IterDispose* = proc(): Future[?!void] {.raises: [], gcsafe.}
GetNext* = proc(): Future[?!QueryResponse] {.async: (raises: [CancelledError]), gcsafe, closure.}
IterDispose* = proc(): Future[?!void] {.async: (raises: [CancelledError]), gcsafe.}
QueryIter* = ref object
finished*: bool
disposed*: bool
next*: GetNext
dispose*: IterDispose
@ -34,7 +35,7 @@ iterator items*(q: QueryIter): Future[?!QueryResponse] =
while not q.finished:
yield q.next()
proc defaultDispose(): Future[?!void] {.gcsafe, async.} =
proc defaultDispose(): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).} =
return success()
proc new*(T: type QueryIter, dispose = defaultDispose): T =

View File

@ -40,7 +40,7 @@ proc newRollbackError(rbErr: ref CatchableError, opErrMsg: string): ref Rollback
proc newRollbackError(rbErr: ref CatchableError, opErr: ref CatchableError): ref RollbackError =
return newRollbackError(rbErr, opErr)
method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async.} =
method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
var
retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop
aux: seq[byte]
@ -62,6 +62,8 @@ method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[
try:
(maybeNewData, aux) = await fn(maybeCurrentData)
except CancelledError as err:
raise err
except CatchableError as err:
return failure(err)
@ -135,7 +137,7 @@ method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[
return success(aux)
method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} =
method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
let res = await fn(maybeValue)
let ignoredAux = newSeq[byte]()
@ -146,7 +148,7 @@ method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.as
else:
return success()
method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} =
method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
var
exists = false
@ -158,10 +160,10 @@ method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} =
return success exists
method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} =
method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
return self.db.deleteStmt.exec((key.id))
method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} =
method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
if err =? self.db.beginStmt.exec().errorOption:
return failure(err)
@ -177,7 +179,7 @@ method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.}
return success()
method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} =
method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
# see comment in ./filesystem_datastore re: finer control of memory
# allocation in `method get`, could apply here as well if bytes were read
# incrementally with `sqlite3_blob_read`
@ -197,10 +199,10 @@ method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} =
return success bytes
method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
return self.db.putStmt.exec((key.id, data, initVersion, timestamp()))
method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} =
method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
if err =? self.db.beginStmt.exec().errorOption:
return failure err
@ -216,14 +218,14 @@ method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.asy
return success()
method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
method close*(self: SQLiteDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
self.db.close()
return success()
method query*(
self: SQLiteDatastore,
query: Query): Future[?!QueryIter] {.async.} =
query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]).} =
var
iter = QueryIter()
@ -267,7 +269,7 @@ method query*(
if not (v == SQLITE_OK):
return failure newException(DatastoreError, $sqlite3_errstr(v))
proc next(): Future[?!QueryResponse] {.async.} =
proc next(): Future[?!QueryResponse] {.async: (raises: [CancelledError]).} =
if iter.finished:
return failure(newException(QueryEndedError, "Calling next on a finished query!"))
@ -321,7 +323,7 @@ method query*(
iter.finished = true
return failure newException(DatastoreError, $sqlite3_errstr(v))
iter.dispose = proc(): Future[?!void] {.async.} =
iter.dispose = proc(): Future[?!void] {.async: (raises: [CancelledError]).} =
discard sqlite3_reset(s)
discard sqlite3_clear_bindings(s)
iter.next = nil

View File

@ -28,7 +28,7 @@ proc stores*(self: TieredDatastore): seq[Datastore] =
method has*(
self: TieredDatastore,
key: Key): Future[?!bool] {.async.} =
key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
for store in self.stores:
without res =? (await store.has(key)), err:
@ -41,32 +41,32 @@ method has*(
method delete*(
self: TieredDatastore,
key: Key): Future[?!void] {.async.} =
key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
let
pending = await allFinished(self.stores.mapIt(it.delete(key)))
for fut in pending:
if fut.read().isErr: return fut.read()
? await fut
return success()
method delete*(
self: TieredDatastore,
keys: seq[Key]): Future[?!void] {.async.} =
keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
for key in keys:
let
pending = await allFinished(self.stores.mapIt(it.delete(key)))
for fut in pending:
if fut.read().isErr: return fut.read()
? await fut
return success()
method get*(
self: TieredDatastore,
key: Key): Future[?!seq[byte]] {.async.} =
key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
var
bytes: seq[byte]
@ -91,33 +91,33 @@ method get*(
method put*(
self: TieredDatastore,
key: Key,
data: seq[byte]): Future[?!void] {.async.} =
data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
let
pending = await allFinished(self.stores.mapIt(it.put(key, data)))
for fut in pending:
if fut.read().isErr: return fut.read()
? await fut
return success()
method put*(
self: TieredDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async.} =
batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
for entry in batch:
let
pending = await allFinished(self.stores.mapIt(it.put(entry.key, entry.data)))
for fut in pending:
if fut.read().isErr: return fut.read()
? await fut
return success()
method modifyGet*(
self: TieredDatastore,
key: Key,
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
let
pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn)))
@ -125,23 +125,21 @@ method modifyGet*(
var aux = newSeq[byte]()
for fut in pending:
if fut.read().isErr:
return fut.read()
else:
aux.add(fut.read().get)
let res = ? await fut
aux.add(res)
return success(aux)
method modify*(
self: TieredDatastore,
key: Key,
fn: Modify): Future[?!void] {.async.} =
fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
let
pending = await allFinished(self.stores.mapIt(it.modify(key, fn)))
for fut in pending:
if fut.read().isErr: return fut.read()
? await fut
return success()

View File

@ -47,11 +47,11 @@ type
TypedDatastore* = ref object of RootObj
ds*: Datastore
Modify*[T] = proc(v: ?T): Future[?T] {.raises: [CatchableError], gcsafe, closure.}
ModifyGet*[T, U] = proc(v: ?T): Future[(?T, U)] {.raises: [CatchableError], gcsafe, closure.}
Modify*[T] = proc(v: ?T): Future[?T] {.raises: [CancelledError], gcsafe, closure.}
ModifyGet*[T, U] = proc(v: ?T): Future[(?T, U)] {.raises: [CancelledError], gcsafe, closure.}
QueryResponse*[T] = tuple[key: ?Key, value: ?!T]
GetNext*[T] = proc(): Future[?!QueryResponse[T]] {.raises: [], gcsafe, closure.}
GetNext*[T] = proc(): Future[?!QueryResponse[T]] {.async: (raises: [CancelledError]), gcsafe, closure.}
QueryIter*[T] = ref object
finished*: bool
next*: GetNext[T]
@ -71,16 +71,16 @@ template requireEncoder*(T: typedesc): untyped =
{.error: "provide an encoder: `proc encode(a: " & $T & "): seq[byte]`".}
# Original Datastore API
proc has*(self: TypedDatastore, key: Key): Future[?!bool] {.async.} =
proc has*(self: TypedDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
await self.ds.has(key)
proc contains*(self: TypedDatastore, key: Key): Future[bool] {.async.} =
proc contains*(self: TypedDatastore, key: Key): Future[bool] {.async: (raises: [CancelledError]).} =
return (await self.ds.has(key)) |? false
proc delete*(self: TypedDatastore, key: Key): Future[?!void] {.async.} =
proc delete*(self: TypedDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
await self.ds.delete(key)
proc delete*(self: TypedDatastore, keys: seq[Key]): Future[?!void] {.async.} =
proc delete*(self: TypedDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
await self.ds.delete(keys)
proc close*(self: TypedDatastore): Future[?!void] {.async.} =
@ -90,19 +90,19 @@ proc close*(self: TypedDatastore): Future[?!void] {.async.} =
proc init*(T: type TypedDatastore, ds: Datastore): T =
TypedDatastore(ds: ds)
proc put*[T](self: TypedDatastore, key: Key, t: T): Future[?!void] {.async.} =
proc put*[T](self: TypedDatastore, key: Key, t: T): Future[?!void] {.async: (raises: [CancelledError]).} =
requireEncoder(T)
await self.ds.put(key, t.encode)
proc get*[T](self: TypedDatastore, key: Key): Future[?!T] {.async.} =
proc get*[T](self: TypedDatastore, key: Key): Future[?!T] {.async: (raises: [CancelledError]).} =
requireDecoder(T)
without bytes =? await self.ds.get(key), error:
return failure(error)
return T.decode(bytes)
proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async.} =
proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async: (raises: [CancelledError]).} =
requireDecoder(T)
requireEncoder(T)
@ -123,7 +123,7 @@ proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {
await self.ds.modify(key, wrappedFn)
proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async.} =
proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async: (raises: [CancelledError]).} =
requireDecoder(T)
requireEncoder(T)
requireEncoder(U)
@ -153,7 +153,7 @@ proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Futu
return U.decode(auxBytes)
proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async.} =
proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async: (raises: [CancelledError]).} =
requireDecoder(T)
without dsIter =? await self.ds.query(q), error:
@ -161,14 +161,14 @@ proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async.}
return failure(childErr)
var iter = QueryIter[T]()
iter.dispose = proc (): Future[?!void] {.async.} =
iter.dispose = proc (): Future[?!void] {.async: (raises: [CancelledError]).} =
await dsIter.dispose()
if dsIter.finished:
iter.finished = true
return success(iter)
proc getNext: Future[?!QueryResponse[T]] {.async.} =
proc getNext: Future[?!QueryResponse[T]] {.async: (raises: [CancelledError]).} =
without pair =? await dsIter.next(), error:
return failure(error)

View File

@ -67,7 +67,7 @@ suite "Test LevelDB Typed Query":
test "Typed Queries":
typedDsQueryTests(ds)
suite "LevelDB Query: keys should disregard trailing wildcards":
suite "LevelDB Query":
let tempDir = getTempDir() / "testleveldbds"
var
ds: LevelDbDatastore
@ -97,7 +97,28 @@ suite "LevelDB Query: keys should disregard trailing wildcards":
(await ds.close()).tryGet
removeDir(tempDir)
test "Forward":
test "should query by prefix":
let
q = Query.init(Key.init("/a").tryGet)
iter = (await ds.query(q)).tryGet
res = (await allFinished(toSeq(iter)))
.mapIt( it.read.tryGet )
.filterIt( it.key.isSome )
check:
res.len == 3
res[0].key.get == key1
res[0].data == val1
res[1].key.get == key2
res[1].data == val2
res[2].key.get == key3
res[2].data == val3
(await iter.dispose()).tryGet
test "should disregard forward trailing wildcards in keys":
let
q = Query.init(Key.init("/a/*").tryGet)
iter = (await ds.query(q)).tryGet
@ -116,9 +137,7 @@ suite "LevelDB Query: keys should disregard trailing wildcards":
res[2].key.get == key3
res[2].data == val3
(await iter.dispose()).tryGet
test "Backwards":
test "should disregard backward trailing wildcards in key":
let
q = Query.init(Key.init("/a\\*").tryGet)
iter = (await ds.query(q)).tryGet
@ -137,4 +156,49 @@ suite "LevelDB Query: keys should disregard trailing wildcards":
res[2].key.get == key3
res[2].data == val3
test "should dispose automatically of iterators when finished":
let
q = Query.init(Key.init("/a/b/c").tryGet)
iter = (await ds.query(q)).tryGet
let val = (await iter.next()).tryGet()
check val.key.get == key3
check val.data == val3
check iter.finished == false
check iter.disposed == false
let val2 = (await iter.next()).tryGet()
check val2.key == Key.none
check val2.data == EmptyBytes
check iter.finished == true
check iter.disposed == true
test "should dispose automatically of iterators when datastore is closed":
let
q1 = Query.init(Key.init("/a/b/c").tryGet)
q2 = Query.init(Key.init("/a/b").tryGet)
i1 = (await ds.query(q1)).tryGet
i2 = (await ds.query(q2)).tryGet
check i1.disposed == false
check i2.disposed == false
(await ds.close()).tryGet
check i1.disposed == true
check i2.disposed == true
test "should have idempotent QueryIterator.dispose":
let q = Query.init(Key.init("/a/b/c").tryGet)
let iter = (await ds.query(q)).tryGet
(await iter.dispose()).tryGet
(await iter.dispose()).tryGet
test "should stop tracking iterator objects once those are disposed":
let q = Query.init(Key.init("/a/b/c").tryGet)
let iter = (await ds.query(q)).tryGet
check ds.openIteratorCount == 1
(await iter.dispose()).tryGet
check ds.openIteratorCount == 0