Add proper sqlite query support (#30)

* Add query support for sqlite backend

* basic tests with in memory ds

* remove `close` default implementation
This commit is contained in:
Dmitriy Ryajov 2022-09-20 16:41:54 -04:00 committed by GitHub
parent 8775b2d49a
commit 02167bb69e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 277 additions and 327 deletions

View File

@ -5,18 +5,12 @@ import pkg/upraises
import ./key
import ./query
import ./types
export key, query
export key, query, types
push: {.upraises: [].}
type
DatastoreError* = object of CatchableError
DatastoreKeyNotFound* = object of DatastoreError
CodexResult*[T] = Result[T, ref DatastoreError]
Datastore* = ref object of RootObj
method contains*(self: Datastore, key: Key): Future[?!bool] {.base, locks: "unknown".} =
raiseAssert("Not implemented!")
@ -30,10 +24,10 @@ method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base,
raiseAssert("Not implemented!")
method close*(self: Datastore): Future[?!void] {.base, async, locks: "unknown".} =
return success()
raiseAssert("Not implemented!")
method query*(
self: Datastore,
query: Query): Future[QueryIter] {.gcsafe.} =
query: Query): Future[?!QueryIter] {.gcsafe.} =
raiseAssert("Not implemented!")

View File

@ -58,10 +58,6 @@ method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =
removeFile(path)
return success()
# removing an empty directory might lead to surprising behavior depending
# on what the user specified as the `root` of the FSDatastore, so
# until further consideration, empty directories will be left in place
except OSError as e:
return failure e

View File

@ -96,18 +96,25 @@ func `$`*(namespace: Namespace): string =
func init*(T: type Key, namespaces: varargs[Namespace]): ?!T =
  ## Creates a key from one or more already-constructed namespaces.
  ##
  ## Returns a failure when `namespaces` is empty; otherwise the key holds
  ## the namespaces in the order they were passed.
  if namespaces.len > 0:
    success T(namespaces: @namespaces)
  else:
    failure "namespaces must contain at least one Namespace"
func init*(T: type Key, namespaces: varargs[string]): ?!T =
  ## Creates a key from one or more namespace id strings.
  ##
  ## Each string is converted with `Namespace.init`; the first conversion
  ## that fails is propagated to the caller through the `?` operator.
  ## Returns a failure when `namespaces` is empty.
  if namespaces.len == 0:
    return failure "namespaces must contain at least one Namespace id string"

  let parsed = namespaces.mapIt(?Namespace.init(it))

  success T(namespaces: parsed)
func init*(T: type Key, keys: varargs[Key]): ?!T =
  ## Creates a key by joining the namespaces of every key in `keys`,
  ## preserving argument order.
  ##
  ## Returns a failure when `keys` is empty.
  if keys.len == 0:
    return failure "No keys provided"

  var joined: seq[Namespace]
  for k in keys:
    joined.add k.namespaces

  success T(namespaces: joined)
func init*(T: type Key, id: string): ?!T =
if id == "":

View File

@ -1,28 +1,34 @@
import pkg/upraises
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import ./key
import ./types
type
SortOrder* {.pure.} = enum
Assending,
Descensing
Descending
Query* = object
key*: Key
value*: bool
limit*: int
skip*: int
offset*: int
sort*: SortOrder
QueryResponse* = tuple[key: Key, data: seq[byte]]
QueryResponse* = tuple[key: ?Key, data: seq[byte]]
QueryEndedError* = object of DatastoreError
GetNext* = proc(): Future[QueryResponse] {.upraises: [], gcsafe, closure.}
QueryIter* = object
finished: bool
GetNext* = proc(): Future[?!QueryResponse] {.upraises: [], gcsafe, closure.}
IterDispose* = proc(): Future[?!void] {.upraises: [], gcsafe.}
QueryIter* = ref object
finished*: bool
next*: GetNext
dispose*: IterDispose
iterator items*(q: QueryIter): Future[QueryResponse] =
iterator items*(q: QueryIter): Future[?!QueryResponse] =
while not q.finished:
yield q.next()
@ -30,13 +36,13 @@ proc init*(
T: type Query,
key: Key,
value = false,
sort = SortOrder.Descensing,
skip = 0,
limit = 0): T =
sort = SortOrder.Assending,
offset = 0,
limit = -1): T =
T(
key: key,
value: value,
sort: sort,
skip: skip,
offset: offset,
limit: limit)

View File

@ -14,6 +14,9 @@ export datastore, sqlitedsdb
push: {.upraises: [].}
const
EmptyBytes = newSeq[byte](0)
type
SQLiteDatastore* = ref object of Datastore
readOnly: bool
@ -64,73 +67,109 @@ method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} =
return success bytes
method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
return self.db.putStmt.exec((key.id, @data, timestamp()))
return self.db.putStmt.exec((key.id, data, timestamp()))
method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
self.db.close()
return success()
method query*(
  self: SQLiteDatastore,
  query: Query): Future[?!QueryIter] {.async.} =
  ## Runs `query` against the SQLite store and returns an iterator over the
  ## matching rows.
  ##
  ## Rows are selected with `GLOB` on `query.key.id & "*"` and ordered by id
  ## according to `query.sort`. A `query.limit` of `0` means "no limit
  ## requested"; a `query.offset` of `0` means "no offset requested".
  ##
  ## Each call to the iterator's `next` steps the statement once:
  ## * a row yields `success (key.some, data)`,
  ## * the end of the result set yields `success (Key.none, EmptyBytes)` and
  ##   marks the iterator finished,
  ## * an SQLite error yields a `DatastoreError` failure and marks the
  ##   iterator finished,
  ## * calling `next` on a finished iterator yields a `QueryEndedError`.
  ##
  ## The caller must call the iterator's `dispose` exactly once when done;
  ## it finalizes the underlying statement.

  var
    iter = QueryIter()
    queryStr = QueryStmtStr

  if query.sort == SortOrder.Descending:
    queryStr &= QueryStmtOrderDescending
  else:
    queryStr &= QueryStmtOrderAscending

  # SQLite only accepts OFFSET as part of a LIMIT clause, so an offset forces
  # a LIMIT clause to be emitted as well. This also keeps the placeholder
  # positions fixed: 1 = glob pattern, 2 = limit, 3 = offset.
  let
    hasOffset = query.offset != 0
    hasLimit = query.limit != 0 or hasOffset

  if hasLimit:
    queryStr &= QueryStmtLimit

  if hasOffset:
    queryStr &= QueryStmtOffset

  let
    queryStmt = QueryStmt.prepare(
      self.db.env, queryStr).expect("should not fail")

    s = RawStmtPtr(queryStmt)

  var
    v = sqlite3_bind_text(
      s, 1.cint, (query.key.id & "*").cstring, -1.cint, SQLITE_TRANSIENT_GCSAFE)

  if v == SQLITE_OK and hasLimit:
    # a limit of 0 means "not set"; SQLite treats a negative LIMIT as
    # "no upper bound"
    let limit = if query.limit != 0: query.limit else: -1
    v = sqlite3_bind_int(s, 2.cint, limit.cint)

  if v == SQLITE_OK and hasOffset:
    v = sqlite3_bind_int(s, 3.cint, query.offset.cint)

  if not (v == SQLITE_OK):
    # the statement never reaches the caller, so release it here
    s.dispose
    return failure newException(DatastoreError, $sqlite3_errstr(v))

  proc next(): Future[?!QueryResponse] {.async.} =
    if iter.finished:
      return failure(newException(QueryEndedError, "Calling next on a finished query!"))

    let
      v = sqlite3_step(s)

    case v
    of SQLITE_ROW:
      let
        key = Key.init(
          $sqlite3_column_text_not_null(s, QueryStmtIdCol))
          .expect("should not fail")

        blob = sqlite3_column_blob(s, QueryStmtDataCol)

      # detect out-of-memory error
      # see the conversion table and final paragraph of:
      # https://www.sqlite.org/c3ref/column_blob.html
      # see also https://www.sqlite.org/rescode.html

      # the "data" column can be NULL so in order to detect an out-of-memory
      # error it is necessary to check that the result is a null pointer and
      # that the result code is an error code
      if blob.isNil:
        let
          errCode = sqlite3_errcode(sqlite3_db_handle(s))

        if not (errCode in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]):
          iter.finished = true
          return failure newException(DatastoreError, $sqlite3_errstr(errCode))

      let
        dataLen = sqlite3_column_bytes(s, QueryStmtDataCol)
        dataBytes = cast[ptr UncheckedArray[byte]](blob)
        data = @(toOpenArray(dataBytes, 0, dataLen - 1))

      return success (key.some, data)
    of SQLITE_DONE:
      iter.finished = true
      return success (Key.none, EmptyBytes)
    else:
      iter.finished = true
      return failure newException(DatastoreError, $sqlite3_errstr(v))

  iter.dispose = proc(): Future[?!void] {.async.} =
    # mark the iterator finished first so that a later `next` reports
    # QueryEndedError instead of stepping a finalized statement
    iter.finished = true
    discard sqlite3_reset(s)
    discard sqlite3_clear_bindings(s)
    s.dispose
    return success()

  iter.next = next
  return success iter
proc new*(
T: type SQLiteDatastore,

View File

@ -60,7 +60,7 @@ const
SELECT EXISTS(
SELECT 1 FROM """ & TableName & """
WHERE """ & IdColName & """ = ?
);
)
"""
ContainsStmtExistsCol* = 0
@ -75,12 +75,12 @@ const
DeleteStmtStr* = """
DELETE FROM """ & TableName & """
WHERE """ & IdColName & """ = ?;
WHERE """ & IdColName & """ = ?
"""
GetStmtStr* = """
SELECT """ & DataColName & """ FROM """ & TableName & """
WHERE """ & IdColName & """ = ?;
WHERE """ & IdColName & """ = ?
"""
GetStmtDataCol* = 0
@ -90,12 +90,28 @@ const
""" & IdColName & """,
""" & DataColName & """,
""" & TimestampColName & """
) VALUES (?, ?, ?);
) VALUES (?, ?, ?)
"""
QueryStmtStr* = """
SELECT """ & IdColName & """, """ & DataColName & """ FROM """ & TableName &
""" WHERE """ & IdColName & """ GLOB ?;
""" WHERE """ & IdColName & """ GLOB ?
"""
# Optional clause fragments that the SQLite `query` method appends to
# `QueryStmtStr` when building the final statement text.

# OFFSET clause; appended when the query's offset is non-zero. Its `?`
# placeholder is bound to the offset value.
QueryStmtOffset* = """
OFFSET ?
"""
# LIMIT clause; appended when the query's limit is non-zero. Its `?`
# placeholder is bound to the limit value.
QueryStmtLimit* = """
LIMIT ?
"""
# ORDER BY clause sorting rows by the id column, ascending; used for every
# sort order other than `SortOrder.Descending`.
QueryStmtOrderAscending* = """
ORDER BY """ & IdColName & """ ASC
"""
# ORDER BY clause sorting rows by the id column, descending; used when the
# query's sort order is `SortOrder.Descending`.
QueryStmtOrderDescending* = """
ORDER BY """ & IdColName & """ DESC
"""
QueryStmtIdCol* = 0

5
datastore/types.nim Normal file
View File

@ -0,0 +1,5 @@
# Core types of the datastore package, kept in their own module so that
# other modules (e.g. the query module) can import them.
type
# Base type of the package's error hierarchy; derives from CatchableError,
# so callers are expected to handle it rather than treat it as a defect.
DatastoreError* = object of CatchableError
# Specialised DatastoreError; presumably reported when a requested key does
# not exist in a store -- confirm against the backends that use it.
DatastoreKeyNotFound* = object of DatastoreError
# Root of the datastore hierarchy. It declares no fields of its own;
# concrete backends such as SQLiteDatastore inherit from it.
Datastore* = ref object of RootObj

View File

@ -1,5 +1,7 @@
import std/options
import std/os
import std/sequtils
from std/algorithm import sort, reversed
import pkg/asynctest/unittest2
import pkg/chronos
@ -12,32 +14,15 @@ import ../basictests
suite "Test Basic SQLiteDatastore":
let
(path, _, _) = instantiationInfo(-1, fullPaths = true) # get this file's name
basePath = "tests_data"
basePathAbs = path.parentDir / basePath
filename = "test_store" & DbExt
dbPathAbs = basePathAbs / filename
ds = SQLiteDatastore.new(Memory).tryGet()
key = Key.init("a:b/c/d:e").tryGet()
bytes = "some bytes".toBytes
otherBytes = "some other bytes".toBytes
var
dsDb: SQLiteDatastore
teardown:
(await ds.close()).tryGet()
setupAll:
removeDir(basePathAbs)
require(not dirExists(basePathAbs))
createDir(basePathAbs)
dsDb = SQLiteDatastore.new(path = dbPathAbs).tryGet()
teardownAll:
(await dsDb.close()).tryGet()
removeDir(basePathAbs)
require(not dirExists(basePathAbs))
basicStoreTests(dsDb, key, bytes, otherBytes)
basicStoreTests(ds, key, bytes, otherBytes)
suite "Test Read Only SQLiteDatastore":
let
@ -90,261 +75,163 @@ suite "Test Read Only SQLiteDatastore":
not (await readOnlyDb.contains(key)).tryGet()
not (await dsDb.contains(key)).tryGet()
# test "query":
# ds = SQLiteDatastore.new(basePathAbs, filename).get
suite "Test Query":
  ## Query tests against an in-memory SQLite datastore.
  ##
  ## Every query result sequence ends with one extra terminating entry
  ## (empty key, empty data) produced when the statement reports it is done,
  ## which is why the expected lengths are "number of rows + 1".

  var
    ds: SQLiteDatastore

  setup:
    # a fresh in-memory store per test keeps the tests independent
    ds = SQLiteDatastore.new(Memory).tryGet()

  teardown:
    # release the per-test store created in `setup`
    (await ds.close()).tryGet

  test "Key should query all key and all it's children":
    let
      key1 = Key.init("/a").tryGet
      key2 = Key.init("/a/b").tryGet
      key3 = Key.init("/a/b/c").tryGet
      val1 = "value for 1".toBytes
      val2 = "value for 2".toBytes
      val3 = "value for 3".toBytes

      q = Query.init(key1)

    (await ds.put(key1, val1)).tryGet
    (await ds.put(key2, val2)).tryGet
    (await ds.put(key3, val3)).tryGet

    let
      iter = (await ds.query(q)).tryGet
      res = await allFinished(toSeq(iter))

    check:
      res.len == 4
      res[0].read.tryGet.key.get == key1
      res[0].read.tryGet.data == val1

      res[1].read.tryGet.key.get == key2
      res[1].read.tryGet.data == val2

      res[2].read.tryGet.key.get == key3
      res[2].read.tryGet.data == val3

    (await iter.dispose()).tryGet

  test "Key should not query parent":
    let
      key1 = Key.init("/a").tryGet
      key2 = Key.init("/a/b").tryGet
      key3 = Key.init("/a/b/c").tryGet
      val1 = "value for 1".toBytes
      val2 = "value for 2".toBytes
      val3 = "value for 3".toBytes

      q = Query.init(key2)

    (await ds.put(key1, val1)).tryGet
    (await ds.put(key2, val2)).tryGet
    (await ds.put(key3, val3)).tryGet

    let
      iter = (await ds.query(q)).tryGet
      res = await allFinished(toSeq(iter))

    check:
      res.len == 3
      res[0].read.tryGet.key.get == key2
      res[0].read.tryGet.data == val2

      res[1].read.tryGet.key.get == key3
      res[1].read.tryGet.data == val3

    (await iter.dispose()).tryGet

  test "Should apply limit":
    let
      key = Key.init("/a").tryGet
      q = Query.init(key, limit = 10)

    for i in 0..<100:
      (await ds.put(Key.init(key, Key.init("/" & $i).tryGet).tryGet, ("val " & $i).toBytes)).tryGet

    let
      iter = (await ds.query(q)).tryGet
      res = await allFinished(toSeq(iter))

    check:
      res.len == 11

    (await iter.dispose()).tryGet

  test "Should apply offset":
    let
      key = Key.init("/a").tryGet
      q = Query.init(key, offset = 90)

    for i in 0..<100:
      (await ds.put(Key.init(key, Key.init("/" & $i).tryGet).tryGet, ("val " & $i).toBytes)).tryGet

    let
      iter = (await ds.query(q)).tryGet
      res = await allFinished(toSeq(iter))

    check:
      res.len == 11

    (await iter.dispose()).tryGet

  test "Should apply offset and limit":
    let
      key = Key.init("/a").tryGet
      q = Query.init(key, offset = 95, limit = 5)

    for i in 0..<100:
      (await ds.put(Key.init(key, Key.init("/" & $i).tryGet).tryGet, ("val " & $i).toBytes)).tryGet

    let
      iter = (await ds.query(q)).tryGet
      res = await allFinished(toSeq(iter))

    check:
      res.len == 6

    # the last entry is the terminating one, so only compare the rows
    for i in 0..<res.high:
      let
        val = ("val " & $(i + 95)).toBytes
        expectedKey = Key.init(key, Key.init("/" & $(i + 95)).tryGet).tryGet

      check:
        res[i].read.tryGet.key.get == expectedKey
        res[i].read.tryGet.data == val

    (await iter.dispose()).tryGet

  test "Should apply sort order - descending":
    let
      key = Key.init("/a").tryGet
      q = Query.init(key, sort = SortOrder.Descending)

    var kvs: seq[QueryResponse]
    for i in 0..<100:
      let
        k = Key.init(key, Key.init("/" & $i).tryGet).tryGet
        val = ("val " & $i).toBytes

      kvs.add((k.some, val))
      (await ds.put(k, val)).tryGet

    # build the expected order: ids sorted as strings, then reversed
    kvs.sort do (a, b: QueryResponse) -> int:
      cmp(a.key.get.id, b.key.get.id)

    kvs = kvs.reversed

    let
      iter = (await ds.query(q)).tryGet
      res = await allFinished(toSeq(iter))

    check:
      res.len == 101

    # the last entry is the terminating one, so only compare the rows
    for i in 0..<res.high:
      check:
        res[i].read.tryGet.key.get == kvs[i].key.get
        res[i].read.tryGet.data == kvs[i].data

    (await iter.dispose()).tryGet

View File

@ -4,7 +4,7 @@ import pkg/asynctest/unittest2
import pkg/chronos
import pkg/stew/results
import ../../datastore
import pkg/datastore
suite "Datastore (base)":
let
@ -25,5 +25,5 @@ suite "Datastore (base)":
test "query":
expect Defect:
let iter = await ds.query(Query.init(key))
let iter = (await ds.query(Query.init(key))).tryGet
for n in iter: discard