Compare commits

..

10 Commits

Author SHA1 Message Date
Giuliano Mega
1d22c62325
fix: use proper object reference in hashing (#85)
Uses `addr obj[]` instead of `addr obj`, fix tests.

Signed-off-by: Giuliano Mega <giuliano.mega@gmail.com>
2026-02-09 10:01:13 -03:00
Jacek Sieka
c2c1f55a22
chore: bump leveldb (#81)
..to include fix for newer cmake version

Co-authored-by: Giuliano Mega <giuliano.mega@gmail.com>
2026-02-06 16:27:25 -03:00
Giuliano Mega
92af582d97
fix: close pending iterators before closing LevelDB store (#83)
This adds tracking of open iterators to leveldbds so that when one attempts to close it, iterators are disposed of first. It also adds automatic disposal if iterators are completely consumed.

fixes #82
2026-02-06 16:24:11 -03:00
Arnaud
a7ee4b170a Use short syntax with nim-results 2025-12-11 08:38:39 -03:00
Arnaud
950990720a Use await instead of fut.read 2025-12-11 08:38:39 -03:00
Arnaud
d583647c5b Propagate CancelledError 2025-12-11 08:38:39 -03:00
Arnaud
92f9533c7d Add more raises in async pragma 2025-12-11 08:38:39 -03:00
Arnaud
9586f95ccd Do not propagate AsyncLockError 2025-12-11 08:38:39 -03:00
Arnaud
42e4f530bf Add errors to raises async pragam 2025-12-11 08:38:39 -03:00
Arnaud
13c0a0cdb1 Define raises for async pragma 2025-12-11 08:38:39 -03:00
11 changed files with 201 additions and 105 deletions

View File

@ -1,7 +1,7 @@
mode = ScriptMode.Verbose mode = ScriptMode.Verbose
packageName = "datastore" packageName = "datastore"
version = "0.2.1" version = "0.2.3"
author = "Status Research & Development GmbH" author = "Status Research & Development GmbH"
description = "Simple, unified API for multiple data stores" description = "Simple, unified API for multiple data stores"
license = "Apache License 2.0 or MIT" license = "Apache License 2.0 or MIT"
@ -11,7 +11,7 @@ requires "nim >= 2.0.14",
"chronos >= 4.0.4 & < 5.0.0", "chronos >= 4.0.4 & < 5.0.0",
"questionable >= 0.10.15 & < 0.11.0", "questionable >= 0.10.15 & < 0.11.0",
"sqlite3_abi >= 3.47.0.0 & < 4.0.0.0", "sqlite3_abi >= 3.47.0.0 & < 4.0.0.0",
"leveldbstatic >= 0.2.0 & < 0.3.0", "leveldbstatic >= 0.2.1 & < 0.3.0",
"stew >= 0.4.0 & < 0.5.0", "stew >= 0.4.0 & < 0.5.0",
"unittest2 >= 0.2.3 & < 0.3.0" "unittest2 >= 0.2.3 & < 0.3.0"

View File

@ -16,37 +16,37 @@ type
Modify* = Function[?seq[byte], Future[?seq[byte]]] Modify* = Function[?seq[byte], Future[?seq[byte]]]
ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]] ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]]
method has*(self: Datastore, key: Key): Future[?!bool] {.base, gcsafe, locks: "unknown".} = method has*(self: Datastore, key: Key): Future[?!bool] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
raiseAssert("Not implemented!") raiseAssert("Not implemented!")
method delete*(self: Datastore, key: Key): Future[?!void] {.base, gcsafe, locks: "unknown".} = method delete*(self: Datastore, key: Key): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
raiseAssert("Not implemented!") raiseAssert("Not implemented!")
method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, gcsafe, locks: "unknown".} = method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
raiseAssert("Not implemented!") raiseAssert("Not implemented!")
method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, gcsafe, locks: "unknown".} = method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
raiseAssert("Not implemented!") raiseAssert("Not implemented!")
method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, gcsafe, locks: "unknown".} = method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
raiseAssert("Not implemented!") raiseAssert("Not implemented!")
method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, gcsafe, locks: "unknown".} = method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
raiseAssert("Not implemented!") raiseAssert("Not implemented!")
method close*(self: Datastore): Future[?!void] {.base, async, locks: "unknown".} = method close*(self: Datastore): Future[?!void] {.base, async: (raises: [CancelledError]), locks: "unknown".} =
raiseAssert("Not implemented!") raiseAssert("Not implemented!")
method query*( method query*(
self: Datastore, self: Datastore,
query: Query): Future[?!QueryIter] {.base, gcsafe.} = query: Query): Future[?!QueryIter] {.base, gcsafe, async: (raises: [CancelledError]).} =
raiseAssert("Not implemented!") raiseAssert("Not implemented!")
proc contains*(self: Datastore, key: Key): Future[bool] {.async.} = proc contains*(self: Datastore, key: Key): Future[bool] {.async: (raises: [CancelledError]).} =
return (await self.has(key)) |? false return (await self.has(key)) |? false
method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gcsafe, locks: "unknown".} = method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
## Concurrently safe way of modifying the value associated with the `key`. ## Concurrently safe way of modifying the value associated with the `key`.
## ##
## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value. ## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value.
@ -54,7 +54,7 @@ method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, gc
raiseAssert("Not implemented!") raiseAssert("Not implemented!")
method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, gcsafe, locks: "unknown".} = method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, gcsafe, async: (raises: [CancelledError]), locks: "unknown".} =
## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on ## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on
## successful update. ## successful update.
## ##

View File

@ -11,10 +11,9 @@ proc defaultModifyGetImpl*(
lock: AsyncLock, lock: AsyncLock,
key: Key, key: Key,
fn: ModifyGet fn: ModifyGet
): Future[?!seq[byte]] {.async.} = ): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
# Default implementation, serializes all modify operations using provided lock # Default implementation, serializes all modify operations using provided lock
# #
await lock.acquire() await lock.acquire()
try: try:
@ -34,6 +33,8 @@ proc defaultModifyGetImpl*(
try: try:
(maybeNewData, aux) = await fn(maybeCurrentData) (maybeNewData, aux) = await fn(maybeCurrentData)
except CancelledError as err:
raise err
except CatchableError as err: except CatchableError as err:
return failure(err) return failure(err)
@ -46,14 +47,17 @@ proc defaultModifyGetImpl*(
return aux.success return aux.success
finally: finally:
try:
lock.release() lock.release()
except AsyncLockError as err:
return failure(err)
proc defaultModifyImpl*( proc defaultModifyImpl*(
self: Datastore, self: Datastore,
lock: AsyncLock, lock: AsyncLock,
key: Key, key: Key,
fn: Modify fn: Modify
): Future[?!void] {.async.} = ): Future[?!void] {.async: (raises: [CancelledError]).} =
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
let res = await fn(maybeValue) let res = await fn(maybeValue)
let ignoredAux = newSeq[byte]() let ignoredAux = newSeq[byte]()

View File

@ -65,10 +65,10 @@ proc path*(self: FSDatastore, key: Key): ?!string =
return success fullname return success fullname
method has*(self: FSDatastore, key: Key): Future[?!bool] {.async.} = method has*(self: FSDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
return self.path(key).?fileExists() return self.path(key).?fileExists()
method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} = method delete*(self: FSDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
without path =? self.path(key), error: without path =? self.path(key), error:
return failure error return failure error
@ -82,7 +82,7 @@ method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =
return success() return success()
method delete*(self: FSDatastore, keys: seq[Key]): Future[?!void] {.async.} = method delete*(self: FSDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
for key in keys: for key in keys:
if err =? (await self.delete(key)).errorOption: if err =? (await self.delete(key)).errorOption:
return failure err return failure err
@ -119,7 +119,7 @@ proc readFile*(self: FSDatastore, path: string): ?!seq[byte] =
except CatchableError as e: except CatchableError as e:
return failure e return failure e
method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} = method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
without path =? self.path(key), error: without path =? self.path(key), error:
return failure error return failure error
@ -132,7 +132,7 @@ method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} =
method put*( method put*(
self: FSDatastore, self: FSDatastore,
key: Key, key: Key,
data: seq[byte]): Future[?!void] {.async.} = data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
without path =? self.path(key), error: without path =? self.path(key), error:
return failure error return failure error
@ -147,7 +147,7 @@ method put*(
method put*( method put*(
self: FSDatastore, self: FSDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async.} = batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
for entry in batch: for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption: if err =? (await self.put(entry.key, entry.data)).errorOption:
@ -163,12 +163,12 @@ proc dirWalker(path: string): (iterator: string {.raises: [Defect], gcsafe.}) =
except CatchableError as exc: except CatchableError as exc:
raise newException(Defect, exc.msg) raise newException(Defect, exc.msg)
method close*(self: FSDatastore): Future[?!void] {.async.} = method close*(self: FSDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
return success() return success()
method query*( method query*(
self: FSDatastore, self: FSDatastore,
query: Query): Future[?!QueryIter] {.async.} = query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]).} =
without path =? self.path(query.key), error: without path =? self.path(query.key), error:
return failure error return failure error
@ -189,7 +189,7 @@ method query*(
var var
iter = QueryIter.new() iter = QueryIter.new()
proc next(): Future[?!QueryResponse] {.async.} = proc next(): Future[?!QueryResponse] {.async: (raises: [CancelledError]).} =
let let
path = walker() path = walker()
@ -208,8 +208,13 @@ method query*(
key = Key.init(keyPath).expect("should not fail") key = Key.init(keyPath).expect("should not fail")
data = data =
if query.value: if query.value:
try:
self.readFile((basePath / path).absolutePath) self.readFile((basePath / path).absolutePath)
.expect("Should read file") .expect("Should read file")
except ValueError as err:
return failure err
except OSError as err:
return failure err
else: else:
@[] @[]
@ -221,7 +226,7 @@ method query*(
method modifyGet*( method modifyGet*(
self: FSDatastore, self: FSDatastore,
key: Key, key: Key,
fn: ModifyGet): Future[?!seq[byte]] {.async.} = fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
var lock: AsyncLock var lock: AsyncLock
try: try:
lock = self.locks.mgetOrPut(key, newAsyncLock()) lock = self.locks.mgetOrPut(key, newAsyncLock())
@ -233,8 +238,9 @@ method modifyGet*(
method modify*( method modify*(
self: FSDatastore, self: FSDatastore,
key: Key, key: Key,
fn: Modify): Future[?!void] {.async.} = fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
var lock: AsyncLock var lock: AsyncLock
try: try:
lock = self.locks.mgetOrPut(key, newAsyncLock()) lock = self.locks.mgetOrPut(key, newAsyncLock())
return await defaultModifyImpl(self, lock, key, fn) return await defaultModifyImpl(self, lock, key, fn)

View File

@ -5,6 +5,8 @@ import std/tables
import std/os import std/os
import std/strformat import std/strformat
import std/strutils import std/strutils
import std/sets
import std/sequtils
import pkg/leveldbstatic import pkg/leveldbstatic
import pkg/chronos import pkg/chronos
@ -19,28 +21,32 @@ type
LevelDbDatastore* = ref object of Datastore LevelDbDatastore* = ref object of Datastore
db: LevelDb db: LevelDb
locks: TableRef[Key, AsyncLock] locks: TableRef[Key, AsyncLock]
openIterators: HashSet[QueryIter]
method has*(self: LevelDbDatastore, key: Key): Future[?!bool] {.async.} = proc hash(iter: QueryIter): Hash =
hash(addr iter[])
method has*(self: LevelDbDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
try: try:
let str = self.db.get($key) let str = self.db.get($key)
return success(str.isSome) return success(str.isSome)
except LevelDbException as e: except LevelDbException as e:
return failure("LevelDbDatastore.has exception: " & e.msg) return failure("LevelDbDatastore.has exception: " & e.msg)
method delete*(self: LevelDbDatastore, key: Key): Future[?!void] {.async.} = method delete*(self: LevelDbDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
try: try:
self.db.delete($key, sync = true) self.db.delete($key, sync = true)
return success() return success()
except LevelDbException as e: except LevelDbException as e:
return failure("LevelDbDatastore.delete exception: " & e.msg) return failure("LevelDbDatastore.delete exception: " & e.msg)
method delete*(self: LevelDbDatastore, keys: seq[Key]): Future[?!void] {.async.} = method delete*(self: LevelDbDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
for key in keys: for key in keys:
if err =? (await self.delete(key)).errorOption: if err =? (await self.delete(key)).errorOption:
return failure(err.msg) return failure(err.msg)
return success() return success()
method get*(self: LevelDbDatastore, key: Key): Future[?!seq[byte]] {.async.} = method get*(self: LevelDbDatastore, key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
try: try:
let str = self.db.get($key) let str = self.db.get($key)
if not str.isSome: if not str.isSome:
@ -50,7 +56,7 @@ method get*(self: LevelDbDatastore, key: Key): Future[?!seq[byte]] {.async.} =
except LevelDbException as e: except LevelDbException as e:
return failure("LevelDbDatastore.get exception: " & $e.msg) return failure("LevelDbDatastore.get exception: " & $e.msg)
method put*(self: LevelDbDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = method put*(self: LevelDbDatastore, key: Key, data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
try: try:
let str = string.fromBytes(data) let str = string.fromBytes(data)
self.db.put($key, str) self.db.put($key, str)
@ -58,7 +64,7 @@ method put*(self: LevelDbDatastore, key: Key, data: seq[byte]): Future[?!void] {
except LevelDbException as e: except LevelDbException as e:
return failure("LevelDbDatastore.put exception: " & $e.msg) return failure("LevelDbDatastore.put exception: " & $e.msg)
method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} = method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
try: try:
let b = newBatch() let b = newBatch()
for entry in batch: for entry in batch:
@ -68,8 +74,12 @@ method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.as
except LevelDbException as e: except LevelDbException as e:
return failure("LevelDbDatastore.put (batch) exception: " & $e.msg) return failure("LevelDbDatastore.put (batch) exception: " & $e.msg)
method close*(self: LevelDbDatastore): Future[?!void] {.async.} = method close*(self: LevelDbDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
try: try:
for iter in self.openIterators.toSeq:
if err =? (await iter.dispose()).errorOption:
return failure(err.msg)
self.openIterators.clear()
self.db.close() self.db.close()
return success() return success()
except LevelDbException as e: except LevelDbException as e:
@ -84,7 +94,7 @@ proc getQueryString(query: Query): string =
method query*( method query*(
self: LevelDbDatastore, self: LevelDbDatastore,
query: Query): Future[?!QueryIter] {.async, gcsafe.} = query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]), gcsafe.} =
if not (query.sort == SortOrder.Assending): if not (query.sort == SortOrder.Assending):
return failure("LevelDbDatastore.query: query.sort is not SortOrder.Ascending. Unsupported.") return failure("LevelDbDatastore.query: query.sort is not SortOrder.Ascending. Unsupported.")
@ -98,7 +108,14 @@ method query*(
limit = query.limit limit = query.limit
) )
proc next(): Future[?!QueryResponse] {.async.} = proc dispose(): Future[?!void] {.async: (raises: [CancelledError]).} =
dbIter.dispose()
iter.disposed = true
self.openIterators.excl(iter)
return success()
proc next(): Future[?!QueryResponse] {.async: (raises: [CancelledError]).} =
if iter.finished: if iter.finished:
return failure(newException(QueryEndedError, "Calling next on a finished query!")) return failure(newException(QueryEndedError, "Calling next on a finished query!"))
@ -107,6 +124,9 @@ method query*(
if dbIter.finished: if dbIter.finished:
iter.finished = true iter.finished = true
if err =? (await dispose()).errorOption:
return failure(err)
return success (Key.none, EmptyBytes) return success (Key.none, EmptyBytes)
else: else:
let key = Key.init(keyStr).expect("LevelDbDatastore.query (next) Failed to create key.") let key = Key.init(keyStr).expect("LevelDbDatastore.query (next) Failed to create key.")
@ -114,18 +134,16 @@ method query*(
except LevelDbException as e: except LevelDbException as e:
return failure("LevelDbDatastore.query -> next exception: " & $e.msg) return failure("LevelDbDatastore.query -> next exception: " & $e.msg)
proc dispose(): Future[?!void] {.async.} =
dbIter.dispose()
return success()
iter.next = next iter.next = next
iter.dispose = dispose iter.dispose = dispose
self.openIterators.incl(iter)
return success iter return success iter
method modifyGet*( method modifyGet*(
self: LevelDbDatastore, self: LevelDbDatastore,
key: Key, key: Key,
fn: ModifyGet): Future[?!seq[byte]] {.async.} = fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
var lock: AsyncLock var lock: AsyncLock
try: try:
lock = self.locks.mgetOrPut(key, newAsyncLock()) lock = self.locks.mgetOrPut(key, newAsyncLock())
@ -137,7 +155,7 @@ method modifyGet*(
method modify*( method modify*(
self: LevelDbDatastore, self: LevelDbDatastore,
key: Key, key: Key,
fn: Modify): Future[?!void] {.async.} = fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
var lock: AsyncLock var lock: AsyncLock
try: try:
lock = self.locks.mgetOrPut(key, newAsyncLock()) lock = self.locks.mgetOrPut(key, newAsyncLock())
@ -146,6 +164,9 @@ method modify*(
if not lock.locked: if not lock.locked:
self.locks.del(key) self.locks.del(key)
proc openIteratorCount*(self: LevelDbDatastore): int =
self.openIterators.len
proc new*( proc new*(
T: type LevelDbDatastore, dbName: string): ?!T = T: type LevelDbDatastore, dbName: string): ?!T =
try: try:

View File

@ -63,7 +63,7 @@ proc dispatch(
method has*( method has*(
self: MountedDatastore, self: MountedDatastore,
key: Key): Future[?!bool] {.async.} = key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
without mounted =? self.dispatch(key): without mounted =? self.dispatch(key):
return failure "No mounted datastore found" return failure "No mounted datastore found"
@ -72,7 +72,7 @@ method has*(
method delete*( method delete*(
self: MountedDatastore, self: MountedDatastore,
key: Key): Future[?!void] {.async.} = key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
without mounted =? self.dispatch(key), error: without mounted =? self.dispatch(key), error:
return failure(error) return failure(error)
@ -81,7 +81,7 @@ method delete*(
method delete*( method delete*(
self: MountedDatastore, self: MountedDatastore,
keys: seq[Key]): Future[?!void] {.async.} = keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
for key in keys: for key in keys:
if err =? (await self.delete(key)).errorOption: if err =? (await self.delete(key)).errorOption:
@ -91,7 +91,7 @@ method delete*(
method get*( method get*(
self: MountedDatastore, self: MountedDatastore,
key: Key): Future[?!seq[byte]] {.async.} = key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
without mounted =? self.dispatch(key), error: without mounted =? self.dispatch(key), error:
return failure(error) return failure(error)
@ -101,7 +101,7 @@ method get*(
method put*( method put*(
self: MountedDatastore, self: MountedDatastore,
key: Key, key: Key,
data: seq[byte]): Future[?!void] {.async.} = data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
without mounted =? self.dispatch(key), error: without mounted =? self.dispatch(key), error:
return failure(error) return failure(error)
@ -110,7 +110,7 @@ method put*(
method put*( method put*(
self: MountedDatastore, self: MountedDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async.} = batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
for entry in batch: for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption: if err =? (await self.put(entry.key, entry.data)).errorOption:
@ -121,7 +121,7 @@ method put*(
method modifyGet*( method modifyGet*(
self: MountedDatastore, self: MountedDatastore,
key: Key, key: Key,
fn: ModifyGet): Future[?!seq[byte]] {.async.} = fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
without mounted =? self.dispatch(key), error: without mounted =? self.dispatch(key), error:
return failure(error) return failure(error)
@ -131,14 +131,14 @@ method modifyGet*(
method modify*( method modify*(
self: MountedDatastore, self: MountedDatastore,
key: Key, key: Key,
fn: Modify): Future[?!void] {.async.} = fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
without mounted =? self.dispatch(key), error: without mounted =? self.dispatch(key), error:
return failure(error) return failure(error)
return await mounted.store.store.modify(mounted.relative, fn) return await mounted.store.store.modify(mounted.relative, fn)
method close*(self: MountedDatastore): Future[?!void] {.async.} = method close*(self: MountedDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
for s in self.stores.values: for s in self.stores.values:
discard await s.store.close() discard await s.store.close()

View File

@ -23,10 +23,11 @@ type
QueryResponse* = tuple[key: ?Key, data: seq[byte]] QueryResponse* = tuple[key: ?Key, data: seq[byte]]
QueryEndedError* = object of DatastoreError QueryEndedError* = object of DatastoreError
GetNext* = proc(): Future[?!QueryResponse] {.raises: [], gcsafe, closure.} GetNext* = proc(): Future[?!QueryResponse] {.async: (raises: [CancelledError]), gcsafe, closure.}
IterDispose* = proc(): Future[?!void] {.raises: [], gcsafe.} IterDispose* = proc(): Future[?!void] {.async: (raises: [CancelledError]), gcsafe.}
QueryIter* = ref object QueryIter* = ref object
finished*: bool finished*: bool
disposed*: bool
next*: GetNext next*: GetNext
dispose*: IterDispose dispose*: IterDispose
@ -34,7 +35,7 @@ iterator items*(q: QueryIter): Future[?!QueryResponse] =
while not q.finished: while not q.finished:
yield q.next() yield q.next()
proc defaultDispose(): Future[?!void] {.gcsafe, async.} = proc defaultDispose(): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).} =
return success() return success()
proc new*(T: type QueryIter, dispose = defaultDispose): T = proc new*(T: type QueryIter, dispose = defaultDispose): T =

View File

@ -40,7 +40,7 @@ proc newRollbackError(rbErr: ref CatchableError, opErrMsg: string): ref Rollback
proc newRollbackError(rbErr: ref CatchableError, opErr: ref CatchableError): ref RollbackError = proc newRollbackError(rbErr: ref CatchableError, opErr: ref CatchableError): ref RollbackError =
return newRollbackError(rbErr, opErr) return newRollbackError(rbErr, opErr)
method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async.} = method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
var var
retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop
aux: seq[byte] aux: seq[byte]
@ -62,6 +62,8 @@ method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[
try: try:
(maybeNewData, aux) = await fn(maybeCurrentData) (maybeNewData, aux) = await fn(maybeCurrentData)
except CancelledError as err:
raise err
except CatchableError as err: except CatchableError as err:
return failure(err) return failure(err)
@ -135,7 +137,7 @@ method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[
return success(aux) return success(aux)
method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} = method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
let res = await fn(maybeValue) let res = await fn(maybeValue)
let ignoredAux = newSeq[byte]() let ignoredAux = newSeq[byte]()
@ -146,7 +148,7 @@ method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.as
else: else:
return success() return success()
method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} = method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
var var
exists = false exists = false
@ -158,10 +160,10 @@ method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} =
return success exists return success exists
method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} = method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
return self.db.deleteStmt.exec((key.id)) return self.db.deleteStmt.exec((key.id))
method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} = method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
if err =? self.db.beginStmt.exec().errorOption: if err =? self.db.beginStmt.exec().errorOption:
return failure(err) return failure(err)
@ -177,7 +179,7 @@ method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.}
return success() return success()
method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} = method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
# see comment in ./filesystem_datastore re: finer control of memory # see comment in ./filesystem_datastore re: finer control of memory
# allocation in `method get`, could apply here as well if bytes were read # allocation in `method get`, could apply here as well if bytes were read
# incrementally with `sqlite3_blob_read` # incrementally with `sqlite3_blob_read`
@ -197,10 +199,10 @@ method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} =
return success bytes return success bytes
method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
return self.db.putStmt.exec((key.id, data, initVersion, timestamp())) return self.db.putStmt.exec((key.id, data, initVersion, timestamp()))
method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} = method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
if err =? self.db.beginStmt.exec().errorOption: if err =? self.db.beginStmt.exec().errorOption:
return failure err return failure err
@ -216,14 +218,14 @@ method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.asy
return success() return success()
method close*(self: SQLiteDatastore): Future[?!void] {.async.} = method close*(self: SQLiteDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
self.db.close() self.db.close()
return success() return success()
method query*( method query*(
self: SQLiteDatastore, self: SQLiteDatastore,
query: Query): Future[?!QueryIter] {.async.} = query: Query): Future[?!QueryIter] {.async: (raises: [CancelledError]).} =
var var
iter = QueryIter() iter = QueryIter()
@ -267,7 +269,7 @@ method query*(
if not (v == SQLITE_OK): if not (v == SQLITE_OK):
return failure newException(DatastoreError, $sqlite3_errstr(v)) return failure newException(DatastoreError, $sqlite3_errstr(v))
proc next(): Future[?!QueryResponse] {.async.} = proc next(): Future[?!QueryResponse] {.async: (raises: [CancelledError]).} =
if iter.finished: if iter.finished:
return failure(newException(QueryEndedError, "Calling next on a finished query!")) return failure(newException(QueryEndedError, "Calling next on a finished query!"))
@ -321,7 +323,7 @@ method query*(
iter.finished = true iter.finished = true
return failure newException(DatastoreError, $sqlite3_errstr(v)) return failure newException(DatastoreError, $sqlite3_errstr(v))
iter.dispose = proc(): Future[?!void] {.async.} = iter.dispose = proc(): Future[?!void] {.async: (raises: [CancelledError]).} =
discard sqlite3_reset(s) discard sqlite3_reset(s)
discard sqlite3_clear_bindings(s) discard sqlite3_clear_bindings(s)
iter.next = nil iter.next = nil

View File

@ -28,7 +28,7 @@ proc stores*(self: TieredDatastore): seq[Datastore] =
method has*( method has*(
self: TieredDatastore, self: TieredDatastore,
key: Key): Future[?!bool] {.async.} = key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
for store in self.stores: for store in self.stores:
without res =? (await store.has(key)), err: without res =? (await store.has(key)), err:
@ -41,32 +41,32 @@ method has*(
method delete*( method delete*(
self: TieredDatastore, self: TieredDatastore,
key: Key): Future[?!void] {.async.} = key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
let let
pending = await allFinished(self.stores.mapIt(it.delete(key))) pending = await allFinished(self.stores.mapIt(it.delete(key)))
for fut in pending: for fut in pending:
if fut.read().isErr: return fut.read() ? await fut
return success() return success()
method delete*( method delete*(
self: TieredDatastore, self: TieredDatastore,
keys: seq[Key]): Future[?!void] {.async.} = keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
for key in keys: for key in keys:
let let
pending = await allFinished(self.stores.mapIt(it.delete(key))) pending = await allFinished(self.stores.mapIt(it.delete(key)))
for fut in pending: for fut in pending:
if fut.read().isErr: return fut.read() ? await fut
return success() return success()
method get*( method get*(
self: TieredDatastore, self: TieredDatastore,
key: Key): Future[?!seq[byte]] {.async.} = key: Key): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
var var
bytes: seq[byte] bytes: seq[byte]
@ -91,33 +91,33 @@ method get*(
method put*( method put*(
self: TieredDatastore, self: TieredDatastore,
key: Key, key: Key,
data: seq[byte]): Future[?!void] {.async.} = data: seq[byte]): Future[?!void] {.async: (raises: [CancelledError]).} =
let let
pending = await allFinished(self.stores.mapIt(it.put(key, data))) pending = await allFinished(self.stores.mapIt(it.put(key, data)))
for fut in pending: for fut in pending:
if fut.read().isErr: return fut.read() ? await fut
return success() return success()
method put*( method put*(
self: TieredDatastore, self: TieredDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async.} = batch: seq[BatchEntry]): Future[?!void] {.async: (raises: [CancelledError]).} =
for entry in batch: for entry in batch:
let let
pending = await allFinished(self.stores.mapIt(it.put(entry.key, entry.data))) pending = await allFinished(self.stores.mapIt(it.put(entry.key, entry.data)))
for fut in pending: for fut in pending:
if fut.read().isErr: return fut.read() ? await fut
return success() return success()
method modifyGet*( method modifyGet*(
self: TieredDatastore, self: TieredDatastore,
key: Key, key: Key,
fn: ModifyGet): Future[?!seq[byte]] {.async.} = fn: ModifyGet): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
let let
pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn))) pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn)))
@ -125,23 +125,21 @@ method modifyGet*(
var aux = newSeq[byte]() var aux = newSeq[byte]()
for fut in pending: for fut in pending:
if fut.read().isErr: let res = ? await fut
return fut.read() aux.add(res)
else:
aux.add(fut.read().get)
return success(aux) return success(aux)
method modify*( method modify*(
self: TieredDatastore, self: TieredDatastore,
key: Key, key: Key,
fn: Modify): Future[?!void] {.async.} = fn: Modify): Future[?!void] {.async: (raises: [CancelledError]).} =
let let
pending = await allFinished(self.stores.mapIt(it.modify(key, fn))) pending = await allFinished(self.stores.mapIt(it.modify(key, fn)))
for fut in pending: for fut in pending:
if fut.read().isErr: return fut.read() ? await fut
return success() return success()

View File

@ -47,11 +47,11 @@ type
TypedDatastore* = ref object of RootObj TypedDatastore* = ref object of RootObj
ds*: Datastore ds*: Datastore
Modify*[T] = proc(v: ?T): Future[?T] {.raises: [CatchableError], gcsafe, closure.} Modify*[T] = proc(v: ?T): Future[?T] {.raises: [CancelledError], gcsafe, closure.}
ModifyGet*[T, U] = proc(v: ?T): Future[(?T, U)] {.raises: [CatchableError], gcsafe, closure.} ModifyGet*[T, U] = proc(v: ?T): Future[(?T, U)] {.raises: [CancelledError], gcsafe, closure.}
QueryResponse*[T] = tuple[key: ?Key, value: ?!T] QueryResponse*[T] = tuple[key: ?Key, value: ?!T]
GetNext*[T] = proc(): Future[?!QueryResponse[T]] {.raises: [], gcsafe, closure.} GetNext*[T] = proc(): Future[?!QueryResponse[T]] {.async: (raises: [CancelledError]), gcsafe, closure.}
QueryIter*[T] = ref object QueryIter*[T] = ref object
finished*: bool finished*: bool
next*: GetNext[T] next*: GetNext[T]
@ -71,16 +71,16 @@ template requireEncoder*(T: typedesc): untyped =
{.error: "provide an encoder: `proc encode(a: " & $T & "): seq[byte]`".} {.error: "provide an encoder: `proc encode(a: " & $T & "): seq[byte]`".}
# Original Datastore API # Original Datastore API
proc has*(self: TypedDatastore, key: Key): Future[?!bool] {.async.} = proc has*(self: TypedDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
await self.ds.has(key) await self.ds.has(key)
proc contains*(self: TypedDatastore, key: Key): Future[bool] {.async.} = proc contains*(self: TypedDatastore, key: Key): Future[bool] {.async: (raises: [CancelledError]).} =
return (await self.ds.has(key)) |? false return (await self.ds.has(key)) |? false
proc delete*(self: TypedDatastore, key: Key): Future[?!void] {.async.} = proc delete*(self: TypedDatastore, key: Key): Future[?!void] {.async: (raises: [CancelledError]).} =
await self.ds.delete(key) await self.ds.delete(key)
proc delete*(self: TypedDatastore, keys: seq[Key]): Future[?!void] {.async.} = proc delete*(self: TypedDatastore, keys: seq[Key]): Future[?!void] {.async: (raises: [CancelledError]).} =
await self.ds.delete(keys) await self.ds.delete(keys)
proc close*(self: TypedDatastore): Future[?!void] {.async.} = proc close*(self: TypedDatastore): Future[?!void] {.async.} =
@ -90,19 +90,19 @@ proc close*(self: TypedDatastore): Future[?!void] {.async.} =
proc init*(T: type TypedDatastore, ds: Datastore): T = proc init*(T: type TypedDatastore, ds: Datastore): T =
TypedDatastore(ds: ds) TypedDatastore(ds: ds)
proc put*[T](self: TypedDatastore, key: Key, t: T): Future[?!void] {.async.} = proc put*[T](self: TypedDatastore, key: Key, t: T): Future[?!void] {.async: (raises: [CancelledError]).} =
requireEncoder(T) requireEncoder(T)
await self.ds.put(key, t.encode) await self.ds.put(key, t.encode)
proc get*[T](self: TypedDatastore, key: Key): Future[?!T] {.async.} = proc get*[T](self: TypedDatastore, key: Key): Future[?!T] {.async: (raises: [CancelledError]).} =
requireDecoder(T) requireDecoder(T)
without bytes =? await self.ds.get(key), error: without bytes =? await self.ds.get(key), error:
return failure(error) return failure(error)
return T.decode(bytes) return T.decode(bytes)
proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async.} = proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async: (raises: [CancelledError]).} =
requireDecoder(T) requireDecoder(T)
requireEncoder(T) requireEncoder(T)
@ -123,7 +123,7 @@ proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {
await self.ds.modify(key, wrappedFn) await self.ds.modify(key, wrappedFn)
proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async.} = proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async: (raises: [CancelledError]).} =
requireDecoder(T) requireDecoder(T)
requireEncoder(T) requireEncoder(T)
requireEncoder(U) requireEncoder(U)
@ -153,7 +153,7 @@ proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Futu
return U.decode(auxBytes) return U.decode(auxBytes)
proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async.} = proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async: (raises: [CancelledError]).} =
requireDecoder(T) requireDecoder(T)
without dsIter =? await self.ds.query(q), error: without dsIter =? await self.ds.query(q), error:
@ -161,14 +161,14 @@ proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async.}
return failure(childErr) return failure(childErr)
var iter = QueryIter[T]() var iter = QueryIter[T]()
iter.dispose = proc (): Future[?!void] {.async.} = iter.dispose = proc (): Future[?!void] {.async: (raises: [CancelledError]).} =
await dsIter.dispose() await dsIter.dispose()
if dsIter.finished: if dsIter.finished:
iter.finished = true iter.finished = true
return success(iter) return success(iter)
proc getNext: Future[?!QueryResponse[T]] {.async.} = proc getNext: Future[?!QueryResponse[T]] {.async: (raises: [CancelledError]).} =
without pair =? await dsIter.next(), error: without pair =? await dsIter.next(), error:
return failure(error) return failure(error)

View File

@ -67,7 +67,7 @@ suite "Test LevelDB Typed Query":
test "Typed Queries": test "Typed Queries":
typedDsQueryTests(ds) typedDsQueryTests(ds)
suite "LevelDB Query: keys should disregard trailing wildcards": suite "LevelDB Query":
let tempDir = getTempDir() / "testleveldbds" let tempDir = getTempDir() / "testleveldbds"
var var
ds: LevelDbDatastore ds: LevelDbDatastore
@ -97,7 +97,28 @@ suite "LevelDB Query: keys should disregard trailing wildcards":
(await ds.close()).tryGet (await ds.close()).tryGet
removeDir(tempDir) removeDir(tempDir)
test "Forward": test "should query by prefix":
let
q = Query.init(Key.init("/a").tryGet)
iter = (await ds.query(q)).tryGet
res = (await allFinished(toSeq(iter)))
.mapIt( it.read.tryGet )
.filterIt( it.key.isSome )
check:
res.len == 3
res[0].key.get == key1
res[0].data == val1
res[1].key.get == key2
res[1].data == val2
res[2].key.get == key3
res[2].data == val3
(await iter.dispose()).tryGet
test "should disregard forward trailing wildcards in keys":
let let
q = Query.init(Key.init("/a/*").tryGet) q = Query.init(Key.init("/a/*").tryGet)
iter = (await ds.query(q)).tryGet iter = (await ds.query(q)).tryGet
@ -116,9 +137,7 @@ suite "LevelDB Query: keys should disregard trailing wildcards":
res[2].key.get == key3 res[2].key.get == key3
res[2].data == val3 res[2].data == val3
(await iter.dispose()).tryGet test "should disregard backward trailing wildcards in key":
test "Backwards":
let let
q = Query.init(Key.init("/a\\*").tryGet) q = Query.init(Key.init("/a\\*").tryGet)
iter = (await ds.query(q)).tryGet iter = (await ds.query(q)).tryGet
@ -137,4 +156,49 @@ suite "LevelDB Query: keys should disregard trailing wildcards":
res[2].key.get == key3 res[2].key.get == key3
res[2].data == val3 res[2].data == val3
test "should dispose automatically of iterators when finished":
let
q = Query.init(Key.init("/a/b/c").tryGet)
iter = (await ds.query(q)).tryGet
let val = (await iter.next()).tryGet()
check val.key.get == key3
check val.data == val3
check iter.finished == false
check iter.disposed == false
let val2 = (await iter.next()).tryGet()
check val2.key == Key.none
check val2.data == EmptyBytes
check iter.finished == true
check iter.disposed == true
test "should dispose automatically of iterators when datastore is closed":
let
q1 = Query.init(Key.init("/a/b/c").tryGet)
q2 = Query.init(Key.init("/a/b").tryGet)
i1 = (await ds.query(q1)).tryGet
i2 = (await ds.query(q2)).tryGet
check i1.disposed == false
check i2.disposed == false
(await ds.close()).tryGet
check i1.disposed == true
check i2.disposed == true
test "should have idempotent QueryIterator.dispose":
let q = Query.init(Key.init("/a/b/c").tryGet)
let iter = (await ds.query(q)).tryGet
(await iter.dispose()).tryGet (await iter.dispose()).tryGet
(await iter.dispose()).tryGet
test "should stop tracking iterator objects once those are disposed":
let q = Query.init(Key.init("/a/b/c").tryGet)
let iter = (await ds.query(q)).tryGet
check ds.openIteratorCount == 1
(await iter.dispose()).tryGet
check ds.openIteratorCount == 0