diff --git a/datastore.nimble b/datastore.nimble index 7f229a2..4183fed 100644 --- a/datastore.nimble +++ b/datastore.nimble @@ -9,7 +9,7 @@ license = "Apache License 2.0 or MIT" requires "nim >= 1.2.0", "asynctest >= 0.4.3 & < 0.5.0", "chronos#c41599a", # FIXME change to Chronos >= 4.0.0 once it's out - "questionable >= 0.10.3 & < 0.11.0", + "questionable >= 0.10.15 & < 0.11.0", "sqlite3_abi", "leveldbstatic >= 0.1.2", "stew", diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index c764496..b966435 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -10,6 +10,7 @@ import pkg/upraises import ../datastore import ./sqlitedsdb +import ./sqliteutils export datastore, sqlitedsdb diff --git a/datastore/sql/sqlitedsdb.nim b/datastore/sql/sqlitedsdb.nim index 2381ecb..3e34319 100644 --- a/datastore/sql/sqlitedsdb.nim +++ b/datastore/sql/sqlitedsdb.nim @@ -7,8 +7,6 @@ import pkg/upraises import ./sqliteutils -export sqliteutils - type BoundIdCol* = proc (): string {.closure, gcsafe, upraises: [].} BoundVersionCol* = proc (): int64 {.closure, gcsafe, upraises: [].} diff --git a/datastore/typedds.nim b/datastore/typedds.nim new file mode 100644 index 0000000..dca1b9a --- /dev/null +++ b/datastore/typedds.nim @@ -0,0 +1,176 @@ +import std/options +import std/macros + +import pkg/questionable +import pkg/questionable/results +import pkg/chronos +import pkg/chronos/futures + +import ./datastore + +## Wrapper for Datastore with basic functionality of automatically converting +## stored values from some user defined type `T` to `seq[byte]` and vice-versa. +## +## To use this API you need to provide decoder and encoder procs. +## +## Basic usage +## ================== +## .. code-block:: Nim +## import pkg/stew/byteutils +## import pkg/questionable/results +## +## let +## tds = TypeDatastore.init(ds) +## key = Key.init("p").tryGet() +## +## type Person = object +## age: int +## name: string +## +## proc encode(p: Person): seq[byte] = +## ($p.age & ":" & p.name).toBytes() +## proc decode(T: type Person, bytes: seq[byte]): ?!T = +## let values = string.fromBytes(bytes).split(':', maxsplit = 1) +## success(Person(age: parseInt(values[0]), name: values[1])) +## +## let p1 = Person(name: "john", age: 21) +## (await tds.put(key, p1)).tryGet() +## let p2 = (await get[Person](tds, key)).tryGet() +## +## assert p1 == p2 + +type + TypedDatastore* = ref object of RootObj + ds*: Datastore + + Modify*[T] = proc(v: ?T): Future[?T] {.raises: [CatchableError], gcsafe, closure.} + ModifyGet*[T, U] = proc(v: ?T): Future[(?T, U)] {.raises: [CatchableError], gcsafe, closure.} + + QueryResponse*[T] = tuple[key: ?Key, value: ?!T] + GetNext*[T] = proc(): Future[?!QueryResponse[T]] {.raises: [], gcsafe, closure.} + QueryIter*[T] = ref object + finished*: bool + next*: GetNext[T] + dispose*: IterDispose + +# Helpers +template requireDecoder*(T: typedesc): untyped = + when not (compiles do: + let _: ?!T = T.decode(newSeq[byte]())): + {.error: "provide a decoder: `proc decode(T: type " & $T & ", bytes: seq[byte]): ?!T`".} + +template requireEncoder*(T: typedesc): untyped = + when not (compiles do: + let _: seq[byte] = encode(default(T))): + {.error: "provide an encoder: `proc encode(a: " & $T & "): seq[byte]`".} + +# Original Datastore API +proc has*(self: TypedDatastore, key: Key): Future[?!bool] {.async.} = + await self.ds.has(key) + +proc contains*(self: TypedDatastore, key: Key): Future[bool] {.async.} = + return (await self.ds.has(key)) |? false + +proc delete*(self: TypedDatastore, key: Key): Future[?!void] {.async.} = + await self.ds.delete(key) + +proc delete*(self: TypedDatastore, keys: seq[Key]): Future[?!void] {.async.} = + await self.ds.delete(keys) + +proc close*(self: TypedDatastore): Future[?!void] {.async.} = + await self.ds.close() + +# TypedDatastore API +proc init*(T: type TypedDatastore, ds: Datastore): T = + TypedDatastore(ds: ds) + +proc put*[T](self: TypedDatastore, key: Key, t: T): Future[?!void] {.async.} = + requireEncoder(T) + + await self.ds.put(key, t.encode) + +proc get*[T](self: TypedDatastore, key: Key): Future[?!T] {.async.} = + requireDecoder(T) + + without bytes =? await self.ds.get(key), err: + return failure(err) + return T.decode(bytes) + +proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async.} = + requireDecoder(T) + requireEncoder(T) + + proc wrappedFn(maybeBytes: ?seq[byte]): Future[?seq[byte]] {.async.} = + var + maybeNextT: ?T + if bytes =? maybeBytes: + without t =? T.decode(bytes), err: + raise err + maybeNextT = await fn(t.some) + else: + maybeNextT = await fn(T.none) + + if nextT =? maybeNextT: + return nextT.encode().some + else: + return seq[byte].none + + await self.ds.modify(key, wrappedFn) + +proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async.} = + requireDecoder(T) + requireEncoder(T) + requireEncoder(U) + requireDecoder(U) + + proc wrappedFn(maybeBytes: ?seq[byte]): Future[(Option[seq[byte]], seq[byte])] {.async.} = + var + maybeNextT: ?T + aux: U + if bytes =? maybeBytes: + without t =? T.decode(bytes), err: + raise err + + (maybeNextT, aux) = await fn(t.some) + else: + (maybeNextT, aux) = await fn(T.none) + + if nextT =? maybeNextT: + let b: seq[byte] = nextT.encode() + return (b.some, aux.encode()) + else: + return (seq[byte].none, aux.encode()) + + without auxBytes =? await self.ds.modifyGet(key, wrappedFn), err: + return failure(err) + + + return U.decode(auxBytes) + +proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async.} = + requireDecoder(T) + + without dsIter =? await self.ds.query(q), err: + let childErr = newException(CatchableError, "Error executing query with key " & $q.key, parentException = err) + return failure(childErr) + + var iter = QueryIter[T]() + iter.dispose = proc (): Future[?!void] {.async.} = + await dsIter.dispose() + + if dsIter.finished: + iter.finished = true + return success(iter) + + proc getNext: Future[?!QueryResponse[T]] {.async.} = + without pair =? await dsIter.next(), err: + return failure(err) + + if dsIter.finished: + iter.finished = true + + return success((key: pair.key, value: T.decode(pair.data))) + + iter.next = getNext + + return success(iter) diff --git a/tests/datastore/modifycommontests.nim b/tests/datastore/modifycommontests.nim index b7f4871..d512edc 100644 --- a/tests/datastore/modifycommontests.nim +++ b/tests/datastore/modifycommontests.nim @@ -15,7 +15,7 @@ import pkg/datastore proc modifyTests*( ds: Datastore, key: Key, - multiAux: bool = false) = + dsCount = 1) = randomize() @@ -108,17 +108,10 @@ proc modifyTests*( proc returningAux(_: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} = return (seq[byte].none, @[byte 123]) - let res = await ds.modifyGet(key, returningAux) + let aux = (await ds.modifyGet(key, returningAux)).tryGet() - if multiAux: - check: - res.errorOption.map((err) => err.msg) == none(string) - for b in res |? @[]: - check: - b == 123.byte - else: - check: - res == success(@[byte 123]) + check: + aux == (123.byte).repeat(dsCount) test "should propagate exception as failure": proc throwing(a: ?seq[byte]): Future[?seq[byte]] {.async.} = diff --git a/tests/datastore/sql/testsqliteds.nim b/tests/datastore/sql/testsqliteds.nim index b3b8ba2..b48b6fe 100644 --- a/tests/datastore/sql/testsqliteds.nim +++ b/tests/datastore/sql/testsqliteds.nim @@ -13,6 +13,7 @@ import pkg/datastore/sql/sqliteds import ../dscommontests import ../modifycommontests import ../querycommontests +import ../typeddscommontests suite "Test Basic SQLiteDatastore": let @@ -26,6 +27,7 @@ suite "Test Basic SQLiteDatastore": basicStoreTests(ds, key, bytes, otherBytes) modifyTests(ds, key) + typedDsTests(ds, key) suite "Test Read Only SQLiteDatastore": let @@ -92,3 +94,12 @@ suite "Test Query": testLimitsAndOffsets = true, testSortOrder = true ) + +suite "Test Typed Query": + let + ds = SQLiteDatastore.new(Memory).tryGet() + + teardownAll: + (await ds.close()).tryGet + + typedDsQueryTests(ds) diff --git a/tests/datastore/sql/testsqlitedsdb.nim b/tests/datastore/sql/testsqlitedsdb.nim index 7251d31..4dfd3d2 100644 --- a/tests/datastore/sql/testsqlitedsdb.nim +++ b/tests/datastore/sql/testsqlitedsdb.nim @@ -7,6 +7,7 @@ import pkg/stew/byteutils import pkg/sqlite3_abi import pkg/datastore/key import pkg/datastore/sql/sqlitedsdb +import pkg/datastore/sql/sqliteutils import pkg/datastore/sql/sqliteds suite "Test Open SQLite Datastore DB": diff --git a/tests/datastore/testfsds.nim b/tests/datastore/testfsds.nim index 9053cff..5791463 100644 --- a/tests/datastore/testfsds.nim +++ b/tests/datastore/testfsds.nim @@ -13,6 +13,7 @@ import pkg/datastore/fsds import ./dscommontests import ./modifycommontests import ./querycommontests +import ./typeddscommontests suite "Test Basic FSDatastore": let @@ -39,6 +40,7 @@ suite "Test Basic FSDatastore": basicStoreTests(fsStore, key, bytes, otherBytes) modifyTests(fsStore, key) + typedDsTests(fsStore, key) suite "Test Misc FSDatastore": let @@ -137,7 +139,12 @@ suite "Test Query": removeDir(basePathAbs) require(not dirExists(basePathAbs)) + teardownAll: + removeDir(basePathAbs) + require(not dirExists(basePathAbs)) + queryTests(ds, testLimitsAndOffsets = false, testSortOrder = false ) + typedDsQueryTests(ds) diff --git a/tests/datastore/testmountedds.nim b/tests/datastore/testmountedds.nim index 15f91f3..825a473 100644 --- a/tests/datastore/testmountedds.nim +++ b/tests/datastore/testmountedds.nim @@ -13,6 +13,7 @@ import pkg/datastore/fsds import ./dscommontests import ./modifycommontests +import ./typeddscommontests suite "Test Basic Mounted Datastore": let @@ -52,11 +53,13 @@ suite "Test Basic Mounted Datastore": let namespace = Key.init(sqlKey, key).tryGet basicStoreTests(mountedDs, namespace, bytes, otherBytes) modifyTests(mountedDs, namespace) + typedDsTests(mountedDs, namespace) suite "Mounted fs": let namespace = Key.init(fsKey, key).tryGet basicStoreTests(mountedDs, namespace, bytes, otherBytes) modifyTests(mountedDs, namespace) + typedDsTests(mountedDs, namespace) suite "Test Mounted Datastore": diff --git a/tests/datastore/testtieredds.nim b/tests/datastore/testtieredds.nim index 272c852..1e924aa 100644 --- a/tests/datastore/testtieredds.nim +++ b/tests/datastore/testtieredds.nim @@ -12,6 +12,7 @@ import pkg/datastore/tieredds import ./modifycommontests import ./dscommontests +import ./typeddscommontests suite "Test Basic Tired Datastore": let @@ -41,7 +42,8 @@ suite "Test Basic Tired Datastore": require(not dirExists(rootAbs)) basicStoreTests(tiredDs, key, bytes, otherBytes) - modifyTests(tiredDs, key, multiAux = true) + modifyTests(tiredDs, key, dsCount = 2) + typedDsTests(tiredDs, key, dsCount = 2) suite "TieredDatastore": # assumes tests/test_all is run from project root, e.g. with `nimble test` diff --git a/tests/datastore/typeddscommontests.nim b/tests/datastore/typeddscommontests.nim new file mode 100644 index 0000000..ae8c4fd --- /dev/null +++ b/tests/datastore/typeddscommontests.nim @@ -0,0 +1,140 @@ +import std/options +import std/sugar +import std/tables +import std/strutils + +import pkg/asynctest +import pkg/chronos +import pkg/stew/byteutils +import pkg/stew/endians2 +import pkg/questionable +import pkg/questionable/results + +import pkg/datastore/typedds +import pkg/datastore + +proc encode(i: int): seq[byte] = + @(cast[uint64](i).toBytesBE) + +proc decode(T: type int, bytes: seq[byte]): ?!T = + if bytes.len >= sizeof(uint64): + success(cast[int](uint64.fromBytesBE(bytes))) + else: + failure("not enough bytes to decode int") + +proc encode(s: string): seq[byte] = + s.toBytes() + +proc decode(T: type string, bytes: seq[byte]): ?!T = + success(string.fromBytes(bytes)) + +proc typedDsTests*( + ds: Datastore, + key: Key, + dsCount = 1) = + + let tds = TypedDatastore.init(ds) + + test "should put a value": + (await tds.put(key, 11)).tryGet() + + check: + (await tds.has(key)).tryGet() + + test "should get the value": + (await tds.put(key, 22)).tryGet() + + check: + 22 == (await get[int](tds, key)).tryGet() + + test "should insert a value": + proc returningSome(_: ?int): Future[?int] {.async.} = + some(33) + + (await tds.delete(key)).tryGet() + (await tds.modify(key, returningSome)).tryGet() + + check: + (await tds.has(key)).tryGet() + + test "should delete a value": + proc returningNone(_: ?int): Future[?int] {.async.} = + int.none + + (await tds.put(key, 33)).tryGet() + (await tds.modify(key, returningNone)).tryGet() + + check: + not (await tds.has(key)).tryGet() + + test "should update a value": + proc incrementing(maybeI: ?int): Future[?int] {.async.} = + if i =? maybeI: + some(i + 1) + else: + int.none + + (await tds.put(key, 33)).tryGet() + (await tds.modify(key, incrementing)).tryGet() + + check: + 34 == (await get[int](tds, key)).tryGet() + + test "should update a value and get aux": + proc returningAux(maybeI: ?int): Future[(?int, string)] {.async.} = + if i =? maybeI: + (some(i + 1), "foo") + else: + (int.none, "bar") + + (await tds.put(key, 44)).tryGet() + + check: + "foo".repeat(dsCount) == (await tds.modifyGet(key, returningAux)).tryGet() + 45 == (await get[int](tds, key)).tryGet() + + test "should propagate exception as failure": + proc throwing(a: ?int): Future[?int] {.async.} = + raise newException(CatchableError, "some error msg") + + check: + some("some error msg") == (await tds.modify(key, throwing)).errorOption.map((err) => err.msg) + +proc typedDsQueryTests*(ds: Datastore) = + + let tds = TypedDatastore.init(ds) + + test "should query values": + let + source = { + "a": 11, + "b": 22, + "c": 33, + "d": 44 + }.toTable + Root = Key.init("/querytest").tryGet() + + for k, v in source: + let key = (Root / k).tryGet() + (await tds.put(key, v)).tryGet() + + let iter = (await query[int](tds, Query.init(Root))).tryGet() + + var results = initTable[string, int]() + + while not iter.finished: + let + item = (await iter.next()).tryGet() + + without key =? item.key: + continue + + let value = item.value.tryGet() + + check: + key.value notin results + + results[key.value] = value + + check: + results == source