TypedDatastore implementation
This commit is contained in:
parent
d02f2e7380
commit
50b480270f
|
@ -9,7 +9,7 @@ license = "Apache License 2.0 or MIT"
|
|||
requires "nim >= 1.2.0",
|
||||
"asynctest >= 0.4.3 & < 0.5.0",
|
||||
"chronos#c41599a", # FIXME change to Chronos >= 4.0.0 once it's out
|
||||
"questionable >= 0.10.3 & < 0.11.0",
|
||||
"questionable >= 0.10.15 & < 0.11.0",
|
||||
"sqlite3_abi",
|
||||
"stew",
|
||||
"unittest2",
|
||||
|
|
|
@ -10,6 +10,7 @@ import pkg/upraises
|
|||
|
||||
import ../datastore
|
||||
import ./sqlitedsdb
|
||||
import ./sqliteutils
|
||||
|
||||
export datastore, sqlitedsdb
|
||||
|
||||
|
|
|
@ -7,8 +7,6 @@ import pkg/upraises
|
|||
|
||||
import ./sqliteutils
|
||||
|
||||
export sqliteutils
|
||||
|
||||
type
|
||||
BoundIdCol* = proc (): string {.closure, gcsafe, upraises: [].}
|
||||
BoundVersionCol* = proc (): int64 {.closure, gcsafe, upraises: [].}
|
||||
|
|
|
@ -0,0 +1,180 @@
|
|||
import std/options
|
||||
import std/macros
|
||||
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/chronos
|
||||
import pkg/chronos/futures
|
||||
|
||||
import ./datastore
|
||||
|
||||
## Wrapper for Datastore with basic functionality of automatically converting
|
||||
## stored values from some user defined type `T` to `seq[byte]` and vice-versa.
|
||||
##
|
||||
## To use this API you need to provide decoder and encoder procs.
|
||||
##
|
||||
## Basic usage
|
||||
## ==================
|
||||
## .. code-block:: Nim
|
||||
## import pkg/stew/byteutils
|
||||
## import pkg/questionable
|
||||
##
|
||||
## let
|
||||
## tds = ds.asTypedDs()
|
||||
## key = Key.init("p").tryGet()
|
||||
##
|
||||
## type Person = object
|
||||
## name: string
|
||||
## proc encode(t: Person): seq[byte] =
|
||||
## t.name.toBytes()
|
||||
## proc decode(T: type Person, bytes: seq[byte]): ?!T =
|
||||
## success(Person(name: string.fromBytes(bytes)))
|
||||
##
|
||||
## discard await tds.put(key, Person(name: "John"))
|
||||
##
|
||||
## let result = (await get[Person](tds, key)).tryGet()
|
||||
##
|
||||
## assert:
|
||||
## result == Person(name: "John")
|
||||
|
||||
|
||||
type
|
||||
TypedDatastore* = ref object of RootObj
|
||||
ds*: Datastore
|
||||
|
||||
Modify*[T] = proc(v: ?T): Future[?T] {.raises: [CatchableError], gcsafe, closure.}
|
||||
ModifyGet*[T, U] = proc(v: ?T): Future[(?T, U)] {.raises: [CatchableError], gcsafe, closure.}
|
||||
|
||||
QueryResponse*[T] = tuple[key: ?Key, value: ?!T]
|
||||
GetNext*[T] = proc(): Future[?!QueryResponse[T]] {.raises: [], gcsafe, closure.}
|
||||
QueryIter*[T] = ref object
|
||||
finished*: bool
|
||||
next*: GetNext[T]
|
||||
dispose*: IterDispose
|
||||
|
||||
# Helpers
|
||||
template requireDecoder*(T: typedesc): untyped =
|
||||
when (compiles do:
|
||||
let _: ?!T = T.decode(newSeq[byte]())):
|
||||
discard
|
||||
else:
|
||||
{.error: "provide a decoder: `proc decode(T: type " & $T & ", bytes: seq[byte]): ?!T`".}
|
||||
|
||||
template requireEncoder*(T: typedesc): untyped =
|
||||
when (compiles do:
|
||||
let _: seq[byte] = encode(default(T))):
|
||||
discard
|
||||
else:
|
||||
{.error: "provide an encoder: `proc encode(a: " & $T & "): seq[byte]`".}
|
||||
|
||||
# Original Datastore API
|
||||
proc has*(self: TypedDatastore, key: Key): Future[?!bool] {.async.} =
|
||||
await self.ds.has(key)
|
||||
|
||||
proc contains*(self: TypedDatastore, key: Key): Future[bool] {.async.} =
|
||||
return (await self.ds.has(key)) |? false
|
||||
|
||||
proc delete*(self: TypedDatastore, key: Key): Future[?!void] {.async.} =
|
||||
await self.ds.delete(key)
|
||||
|
||||
proc delete*(self: TypedDatastore, keys: seq[Key]): Future[?!void] {.async.} =
|
||||
await self.ds.delete(keys)
|
||||
|
||||
proc close*(self: TypedDatastore): Future[?!void] {.async.} =
|
||||
await self.ds.close()
|
||||
|
||||
# Conversion Datastore -> TypeDatastore
|
||||
proc asTypedDs*(ds: Datastore): TypedDatastore =
|
||||
TypedDatastore(ds: ds)
|
||||
|
||||
# TypedDatastore API
|
||||
proc put*[T](self: TypedDatastore, key: Key, t: T): Future[?!void] {.async.} =
|
||||
requireEncoder(T)
|
||||
|
||||
await self.ds.put(key, t.encode)
|
||||
|
||||
proc get*[T](self: TypedDatastore, key: Key): Future[?!T] {.async.} =
|
||||
requireDecoder(T)
|
||||
|
||||
without bytes =? await self.ds.get(key), err:
|
||||
return failure(err)
|
||||
return T.decode(bytes)
|
||||
|
||||
proc modify*[T](self: TypedDatastore, key: Key, fn: Modify[T]): Future[?!void] {.async.} =
|
||||
requireDecoder(T)
|
||||
requireEncoder(T)
|
||||
|
||||
proc wrappedFn(maybeBytes: ?seq[byte]): Future[?seq[byte]] {.async.} =
|
||||
var
|
||||
maybeNextT: ?T
|
||||
if bytes =? maybeBytes:
|
||||
without t =? T.decode(bytes), err:
|
||||
raise err
|
||||
maybeNextT = await fn(t.some)
|
||||
else:
|
||||
maybeNextT = await fn(T.none)
|
||||
|
||||
if nextT =? maybeNextT:
|
||||
return nextT.encode().some
|
||||
else:
|
||||
return seq[byte].none
|
||||
|
||||
await self.ds.modify(key, wrappedFn)
|
||||
|
||||
proc modifyGet*[T, U](self: TypedDatastore, key: Key, fn: ModifyGet[T, U]): Future[?!U] {.async.} =
|
||||
requireDecoder(T)
|
||||
requireEncoder(T)
|
||||
requireEncoder(U)
|
||||
requireDecoder(U)
|
||||
|
||||
proc wrappedFn(maybeBytes: ?seq[byte]): Future[(Option[seq[byte]], seq[byte])] {.async.} =
|
||||
var
|
||||
maybeNextT: ?T
|
||||
aux: U
|
||||
if bytes =? maybeBytes:
|
||||
without t =? T.decode(bytes), err:
|
||||
raise err
|
||||
|
||||
(maybeNextT, aux) = await fn(t.some)
|
||||
else:
|
||||
(maybeNextT, aux) = await fn(T.none)
|
||||
|
||||
if nextT =? maybeNextT:
|
||||
let b: seq[byte] = nextT.encode()
|
||||
return (b.some, aux.encode())
|
||||
else:
|
||||
return (seq[byte].none, aux.encode())
|
||||
|
||||
without auxBytes =? await self.ds.modifyGet(key, wrappedFn), err:
|
||||
return failure(err)
|
||||
|
||||
|
||||
return U.decode(auxBytes)
|
||||
|
||||
proc query*[T](self: TypedDatastore, q: Query): Future[?!QueryIter[T]] {.async.} =
|
||||
requireDecoder(T)
|
||||
|
||||
without dsIter =? await self.ds.query(q), err:
|
||||
let childErr = newException(CatchableError, "Error executing query with key " & $q.key, parentException = err)
|
||||
return failure(childErr)
|
||||
|
||||
var iter = QueryIter[T]()
|
||||
iter.dispose = proc (): Future[?!void] {.async.} =
|
||||
await dsIter.dispose()
|
||||
|
||||
if dsIter.finished:
|
||||
iter.finished = true
|
||||
return success(iter)
|
||||
|
||||
proc getNext: Future[?!QueryResponse[T]] {.async.} =
|
||||
without pair =? await dsIter.next(), err:
|
||||
return failure(err)
|
||||
|
||||
if dsIter.finished:
|
||||
iter.finished = true
|
||||
|
||||
return success((key: pair.key, value: T.decode(pair.data)))
|
||||
|
||||
iter.next = getNext
|
||||
|
||||
return success(iter)
|
|
@ -15,7 +15,7 @@ import pkg/datastore
|
|||
proc modifyTests*(
|
||||
ds: Datastore,
|
||||
key: Key,
|
||||
multiAux: bool = false) =
|
||||
dsCount = 1) =
|
||||
|
||||
randomize()
|
||||
|
||||
|
@ -108,17 +108,10 @@ proc modifyTests*(
|
|||
proc returningAux(_: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
||||
return (seq[byte].none, @[byte 123])
|
||||
|
||||
let res = await ds.modifyGet(key, returningAux)
|
||||
let aux = (await ds.modifyGet(key, returningAux)).tryGet()
|
||||
|
||||
if multiAux:
|
||||
check:
|
||||
res.errorOption.map((err) => err.msg) == none(string)
|
||||
for b in res |? @[]:
|
||||
check:
|
||||
b == 123.byte
|
||||
else:
|
||||
check:
|
||||
res == success(@[byte 123])
|
||||
check:
|
||||
aux == (123.byte).repeat(dsCount)
|
||||
|
||||
test "should propagate exception as failure":
|
||||
proc throwing(a: ?seq[byte]): Future[?seq[byte]] {.async.} =
|
||||
|
|
|
@ -13,6 +13,7 @@ import pkg/datastore/sql/sqliteds
|
|||
import ../dscommontests
|
||||
import ../modifycommontests
|
||||
import ../querycommontests
|
||||
import ../typeddscommontests
|
||||
|
||||
suite "Test Basic SQLiteDatastore":
|
||||
let
|
||||
|
@ -26,6 +27,7 @@ suite "Test Basic SQLiteDatastore":
|
|||
|
||||
basicStoreTests(ds, key, bytes, otherBytes)
|
||||
modifyTests(ds, key)
|
||||
typedDsTests(ds, key)
|
||||
|
||||
suite "Test Read Only SQLiteDatastore":
|
||||
let
|
||||
|
@ -89,3 +91,12 @@ suite "Test Query":
|
|||
(await ds.close()).tryGet
|
||||
|
||||
queryTests(ds)
|
||||
|
||||
suite "Test Typed Query":
|
||||
let
|
||||
ds = SQLiteDatastore.new(Memory).tryGet()
|
||||
|
||||
teardownAll:
|
||||
(await ds.close()).tryGet
|
||||
|
||||
typedDsQueryTests(ds)
|
||||
|
|
|
@ -7,6 +7,7 @@ import pkg/stew/byteutils
|
|||
import pkg/sqlite3_abi
|
||||
import pkg/datastore/key
|
||||
import pkg/datastore/sql/sqlitedsdb
|
||||
import pkg/datastore/sql/sqliteutils
|
||||
import pkg/datastore/sql/sqliteds
|
||||
|
||||
suite "Test Open SQLite Datastore DB":
|
||||
|
|
|
@ -13,6 +13,7 @@ import pkg/datastore/fsds
|
|||
import ./dscommontests
|
||||
import ./modifycommontests
|
||||
import ./querycommontests
|
||||
import ./typeddscommontests
|
||||
|
||||
suite "Test Basic FSDatastore":
|
||||
let
|
||||
|
@ -39,6 +40,7 @@ suite "Test Basic FSDatastore":
|
|||
|
||||
basicStoreTests(fsStore, key, bytes, otherBytes)
|
||||
modifyTests(fsStore, key)
|
||||
typedDsTests(fsStore, key)
|
||||
|
||||
suite "Test Misc FSDatastore":
|
||||
let
|
||||
|
@ -137,4 +139,9 @@ suite "Test Query":
|
|||
removeDir(basePathAbs)
|
||||
require(not dirExists(basePathAbs))
|
||||
|
||||
teardownAll:
|
||||
removeDir(basePathAbs)
|
||||
require(not dirExists(basePathAbs))
|
||||
|
||||
queryTests(ds, false)
|
||||
typedDsQueryTests(ds)
|
||||
|
|
|
@ -13,6 +13,7 @@ import pkg/datastore/fsds
|
|||
|
||||
import ./dscommontests
|
||||
import ./modifycommontests
|
||||
import ./typeddscommontests
|
||||
|
||||
suite "Test Basic Mounted Datastore":
|
||||
let
|
||||
|
@ -52,11 +53,13 @@ suite "Test Basic Mounted Datastore":
|
|||
let namespace = Key.init(sqlKey, key).tryGet
|
||||
basicStoreTests(mountedDs, namespace, bytes, otherBytes)
|
||||
modifyTests(mountedDs, namespace)
|
||||
typedDsTests(mountedDs, namespace)
|
||||
|
||||
suite "Mounted fs":
|
||||
let namespace = Key.init(fsKey, key).tryGet
|
||||
basicStoreTests(mountedDs, namespace, bytes, otherBytes)
|
||||
modifyTests(mountedDs, namespace)
|
||||
typedDsTests(mountedDs, namespace)
|
||||
|
||||
suite "Test Mounted Datastore":
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ import pkg/datastore/tieredds
|
|||
|
||||
import ./modifycommontests
|
||||
import ./dscommontests
|
||||
import ./typeddscommontests
|
||||
|
||||
suite "Test Basic Tired Datastore":
|
||||
let
|
||||
|
@ -41,7 +42,8 @@ suite "Test Basic Tired Datastore":
|
|||
require(not dirExists(rootAbs))
|
||||
|
||||
basicStoreTests(tiredDs, key, bytes, otherBytes)
|
||||
modifyTests(tiredDs, key, multiAux = true)
|
||||
modifyTests(tiredDs, key, dsCount = 2)
|
||||
typedDsTests(tiredDs, key, dsCount = 2)
|
||||
|
||||
suite "TieredDatastore":
|
||||
# assumes tests/test_all is run from project root, e.g. with `nimble test`
|
||||
|
|
|
@ -0,0 +1,140 @@
|
|||
import std/options
|
||||
import std/sugar
|
||||
import std/tables
|
||||
import std/strutils
|
||||
|
||||
import pkg/asynctest
|
||||
import pkg/chronos
|
||||
import pkg/stew/byteutils
|
||||
import pkg/stew/endians2
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import pkg/datastore/typedds
|
||||
import pkg/datastore
|
||||
|
||||
proc encode(i: int): seq[byte] =
|
||||
@(cast[uint64](i).toBytesBE)
|
||||
|
||||
proc decode(T: type int, bytes: seq[byte]): ?!T =
|
||||
if bytes.len >= sizeof(uint64):
|
||||
success(cast[int](uint64.fromBytesBE(bytes)))
|
||||
else:
|
||||
failure("not enough bytes to decode int")
|
||||
|
||||
proc encode(s: string): seq[byte] =
|
||||
s.toBytes()
|
||||
|
||||
proc decode(T: type string, bytes: seq[byte]): ?!T =
|
||||
success(string.fromBytes(bytes))
|
||||
|
||||
proc typedDsTests*(
|
||||
ds: Datastore,
|
||||
key: Key,
|
||||
dsCount = 1) =
|
||||
|
||||
let tds: TypedDatastore = ds.asTypedDs()
|
||||
|
||||
test "should put a value":
|
||||
(await tds.put(key, 11)).tryGet()
|
||||
|
||||
check:
|
||||
(await tds.has(key)).tryGet()
|
||||
|
||||
test "should get the value":
|
||||
(await tds.put(key, 22)).tryGet()
|
||||
|
||||
check:
|
||||
22 == (await get[int](tds, key)).tryGet()
|
||||
|
||||
test "should insert a value":
|
||||
proc returningSome(_: ?int): Future[?int] {.async.} =
|
||||
some(33)
|
||||
|
||||
(await tds.delete(key)).tryGet()
|
||||
(await tds.modify(key, returningSome)).tryGet()
|
||||
|
||||
check:
|
||||
(await tds.has(key)).tryGet()
|
||||
|
||||
test "should delete a value":
|
||||
proc returningNone(_: ?int): Future[?int] {.async.} =
|
||||
int.none
|
||||
|
||||
(await tds.put(key, 33)).tryGet()
|
||||
(await tds.modify(key, returningNone)).tryGet()
|
||||
|
||||
check:
|
||||
not (await tds.has(key)).tryGet()
|
||||
|
||||
test "should update a value":
|
||||
proc incrementing(maybeI: ?int): Future[?int] {.async.} =
|
||||
if i =? maybeI:
|
||||
some(i + 1)
|
||||
else:
|
||||
int.none
|
||||
|
||||
(await tds.put(key, 33)).tryGet()
|
||||
(await tds.modify(key, incrementing)).tryGet()
|
||||
|
||||
check:
|
||||
34 == (await get[int](tds, key)).tryGet()
|
||||
|
||||
test "should update a value and get aux":
|
||||
proc returningAux(maybeI: ?int): Future[(?int, string)] {.async.} =
|
||||
if i =? maybeI:
|
||||
(some(i + 1), "foo")
|
||||
else:
|
||||
(int.none, "bar")
|
||||
|
||||
(await tds.put(key, 44)).tryGet()
|
||||
|
||||
check:
|
||||
"foo".repeat(dsCount) == (await tds.modifyGet(key, returningAux)).tryGet()
|
||||
45 == (await get[int](tds, key)).tryGet()
|
||||
|
||||
test "should propagate exception as failure":
|
||||
proc throwing(a: ?int): Future[?int] {.async.} =
|
||||
raise newException(CatchableError, "some error msg")
|
||||
|
||||
check:
|
||||
some("some error msg") == (await tds.modify(key, throwing)).errorOption.map((err) => err.msg)
|
||||
|
||||
proc typedDsQueryTests*(ds: Datastore) =
|
||||
|
||||
let tds = ds.asTypedDs()
|
||||
|
||||
test "should query values":
|
||||
let
|
||||
source = {
|
||||
"a": 11,
|
||||
"b": 22,
|
||||
"c": 33,
|
||||
"d": 44
|
||||
}.toTable
|
||||
Root = Key.init("/querytest").tryGet()
|
||||
|
||||
for k, v in source:
|
||||
let key = (Root / k).tryGet()
|
||||
(await tds.put(key, v)).tryGet()
|
||||
|
||||
let iter = (await query[int](tds, Query.init(Root))).tryGet()
|
||||
|
||||
var results = initTable[string, int]()
|
||||
|
||||
while not iter.finished:
|
||||
let
|
||||
item = (await iter.next()).tryGet()
|
||||
|
||||
without key =? item.key:
|
||||
continue
|
||||
|
||||
let value = item.value.tryGet()
|
||||
|
||||
check:
|
||||
key.value notin results
|
||||
|
||||
results[key.value] = value
|
||||
|
||||
check:
|
||||
results == source
|
Loading…
Reference in New Issue