Fsds query (#32)

* add basic query capabilities to fsds

* rename common tests

* make query tests common

* sanitize key

* check it's possible to query keys at same level
This commit is contained in:
Dmitriy Ryajov 2022-09-20 20:18:33 -04:00 committed by GitHub
parent 02167bb69e
commit 446de6f978
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 343 additions and 191 deletions

View File

@ -1,5 +1,6 @@
import std/os
import std/options
import std/strutils
import pkg/chronos
import pkg/questionable
@ -31,7 +32,7 @@ template path*(self: FSDatastore, key: Key): string =
# `:` are replaced with `/`
segments.add(ns.field / ns.value)
self.root / segments.joinPath()
(self.root / segments.joinPath()).absolutePath()
template validDepth*(self: FSDatastore, key: Key): bool =
key.len <= self.depth
@ -42,7 +43,7 @@ method contains*(self: FSDatastore, key: Key): Future[?!bool] {.async.} =
return failure "Path has invalid depth!"
let
path = self.path(key)
path = self.path(key).addFileExt(FileExt)
return success fileExists(path)
@ -52,7 +53,10 @@ method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =
return failure "Path has invalid depth!"
let
path = self.path(key)
path = self.path(key).addFileExt(FileExt)
if not path.fileExists():
return failure newException(DatastoreKeyNotFound, "Key not found!")
try:
removeFile(path)
@ -61,24 +65,7 @@ method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =
except OSError as e:
return failure e
method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} =
# to support finer control of memory allocation, maybe could/should change
# the signature of `get` so that it has a 3rd parameter
# `bytes: var openArray[byte]` and return type `?!bool`; this variant with
# return type `?!(?seq[byte])` would be a special case (convenience method)
# calling the former after allocating a seq with size automatically
# determined via `getFileSize`
if not self.validDepth(key):
return failure "Path has invalid depth!"
let
path = self.path(key)
if not fileExists(path):
return failure(newException(DatastoreKeyNotFound, "Key doesn't exist"))
proc readFile*(self: FSDatastore, path: string): ?!seq[byte] =
var
file: File
@ -86,7 +73,7 @@ method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} =
file.close
if not file.open(path):
return failure "unable to open file: " & path
return failure newException(DatastoreKeyNotFound, "Key not found!")
try:
let
@ -108,6 +95,26 @@ method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} =
except CatchableError as e:
return failure e
method get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} =
# Reads the value stored under `key`.
#
# Fails when the key is deeper than the datastore's configured depth, or
# with `DatastoreKeyNotFound` when no file exists at the key's path (with
# `FileExt` appended); otherwise returns the result of `readFile` for that
# path.
#
# to support finer control of memory allocation, maybe could/should change
# the signature of `get` so that it has a 3rd parameter
# `bytes: var openArray[byte]` and return type `?!bool`; this variant with
# return type `?!seq[byte]` would be a special case (convenience method)
# calling the former after allocating a seq with size automatically
# determined via `getFileSize`
if not self.validDepth(key):
return failure "Path has invalid depth!"
let
path = self.path(key).addFileExt(FileExt)
if not path.fileExists():
return failure(newException(DatastoreKeyNotFound, "Key doesn't exist"))
return self.readFile(path)
method put*(
self: FSDatastore,
key: Key,
@ -121,17 +128,55 @@ method put*(
try:
createDir(parentDir(path))
writeFile(path, data)
writeFile(path.addFileExt(FileExt), data)
except CatchableError as e:
return failure e
return success()
# method query*(
# self: FSDatastore,
# query: ...): Future[?!(?...)] {.async, locks: "unknown".} =
#
# return success ....some
# Builds a closure iterator that recursively walks `path` and yields every
# file found (directories are filtered out), as paths relative to `path`.
proc dirWalker(path: string): iterator: string {.gcsafe.} =
return iterator(): string =
try:
for p in path.walkDirRec(yieldFilter = {pcFile}, relative = true):
yield p
except CatchableError as exc:
# any catchable error raised while walking is converted into a `Defect`
# carrying the original message
raise newException(Defect, exc.msg)
# Returns a `QueryIter` whose `next` walks the files below the parent
# directory of `query.key`'s path and produces one (key, data) pair per file.
#
# NOTE(review): only `query.key` is read here; no limit/offset/sort handling
# is visible in this method.
# NOTE(review): every file yielded by the walker is read and returned; there
# is no filtering on the key's own name or on the `FileExt` extension, so
# sibling entries of the queried key are presumably included - confirm.
method query*(
self: FSDatastore,
query: Query): Future[?!QueryIter] {.async.} =
var
iter = QueryIter.new()
let
basePath = self.path(query.key).parentDir
walker = dirWalker(basePath)
proc next(): Future[?!QueryResponse] {.async.} =
let
path = walker()
# a closure iterator only reports `finished` after a call that yielded
# nothing, so `path` is meaningless here; signal the end of the results
# with an empty response (no key, no data)
if finished(walker):
iter.finished = true
return success (Key.none, EmptyBytes)
without data =? self.readFile((basePath / path).absolutePath), err:
return failure err
# rebuild the key from the file location: strip the datastore root, append
# the walked path without its file extension, and use `/` as separator
var
keyPath = basePath
keyPath.removePrefix(self.root)
keyPath = keyPath / path.changeFileExt("")
keyPath = keyPath.replace("\\", "/")
let
key = Key.init(keyPath).expect("should not fail")
return success (key.some, data)
iter.next = next
return success iter
proc new*(
T: type FSDatastore,

View File

@ -32,6 +32,12 @@ iterator items*(q: QueryIter): Future[?!QueryResponse] =
while not q.finished:
yield q.next()
# No-op dispose handler: does no work and returns success.
proc defaultDispose(): Future[?!void] {.upraises: [], gcsafe, async.} =
return success()
# Creates a `QueryIter` with `dispose` set to `defaultDispose`; no other
# field is assigned here.
proc new*(T: type QueryIter): T =
QueryIter(dispose: defaultDispose)
proc init*(
T: type Query,
key: Key,

View File

@ -14,9 +14,6 @@ export datastore, sqlitedsdb
push: {.upraises: [].}
const
EmptyBytes = newSeq[byte](0)
type
SQLiteDatastore* = ref object of Datastore
readOnly: bool
@ -71,6 +68,7 @@ method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.
method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
self.db.close()
return success()
method query*(

View File

@ -1,3 +1,8 @@
const
FileExt* = "dsobj"
EmptyBytes* = newSeq[byte](0)
type
DatastoreError* = object of CatchableError
DatastoreKeyNotFound* = object of DatastoreError

View File

@ -6,11 +6,11 @@ import pkg/stew/results
import pkg/datastore
proc basicStoreTests*(
template basicStoreTests*(
ds: Datastore,
key: Key,
bytes: seq[byte],
otherBytes: seq[byte]) =
otherBytes: seq[byte]) {.dirty.} =
test "put":
(await ds.put(key, bytes)).tryGet()

View File

@ -0,0 +1,219 @@
import std/options
import std/sequtils
from std/algorithm import sort, reversed
import pkg/asynctest
import pkg/chronos
import pkg/stew/results
import pkg/stew/byteutils
import pkg/datastore
template queryTests*(ds: Datastore, extended = true) {.dirty.} =
test "Key should query all key and all it's children":
let
key1 = Key.init("/a").tryGet
key2 = Key.init("/a/b").tryGet
key3 = Key.init("/a/b/c").tryGet
val1 = "value for 1".toBytes
val2 = "value for 2".toBytes
val3 = "value for 3".toBytes
q = Query.init(key1)
(await ds.put(key1, val1)).tryGet
(await ds.put(key2, val2)).tryGet
(await ds.put(key3, val3)).tryGet
let
iter = (await ds.query(q)).tryGet
res = (await allFinished(toSeq(iter)))
.mapIt( it.read.tryGet )
.filterIt( it.key.isSome )
check:
res.len == 3
res[0].key.get == key1
res[0].data == val1
res[1].key.get == key2
res[1].data == val2
res[2].key.get == key3
res[2].data == val3
(await iter.dispose()).tryGet
test "Key should not query parent":
let
key1 = Key.init("/a").tryGet
key2 = Key.init("/a/b").tryGet
key3 = Key.init("/a/b/c").tryGet
val1 = "value for 1".toBytes
val2 = "value for 2".toBytes
val3 = "value for 3".toBytes
q = Query.init(key2)
(await ds.put(key1, val1)).tryGet
(await ds.put(key2, val2)).tryGet
(await ds.put(key3, val3)).tryGet
let
iter = (await ds.query(q)).tryGet
res = (await allFinished(toSeq(iter)))
.mapIt( it.read.tryGet )
.filterIt( it.key.isSome )
check:
res.len == 2
res[0].key.get == key2
res[0].data == val2
res[1].key.get == key3
res[1].data == val3
(await iter.dispose()).tryGet
test "Key should all list all keys at the same level":
let
queryKey = Key.init("/a").tryGet
key1 = Key.init("/a/1").tryGet
key2 = Key.init("/a/2").tryGet
key3 = Key.init("/a/3").tryGet
val1 = "value for 1".toBytes
val2 = "value for 2".toBytes
val3 = "value for 3".toBytes
q = Query.init(queryKey)
(await ds.put(key1, val1)).tryGet
(await ds.put(key2, val2)).tryGet
(await ds.put(key3, val3)).tryGet
let
iter = (await ds.query(q)).tryGet
var
res = (await allFinished(toSeq(iter)))
.mapIt( it.read.tryGet )
.filterIt( it.key.isSome )
res.sort do (a, b: QueryResponse) -> int:
cmp(a.key.get.id, b.key.get.id)
check:
res.len == 3
res[0].key.get == key1
res[0].data == val1
res[1].key.get == key2
res[1].data == val2
res[2].key.get == key3
res[2].data == val3
(await iter.dispose()).tryGet
if extended:
test "Should apply limit":
let
key = Key.init("/a").tryGet
q = Query.init(key, limit = 10)
for i in 0..<100:
(await ds.put(Key.init(key, Key.init("/" & $i).tryGet).tryGet, ("val " & $i).toBytes)).tryGet
let
iter = (await ds.query(q)).tryGet
res = (await allFinished(toSeq(iter)))
.mapIt( it.read.tryGet )
.filterIt( it.key.isSome )
check:
res.len == 10
(await iter.dispose()).tryGet
test "Should not apply offset":
let
key = Key.init("/a").tryGet
q = Query.init(key, offset = 90)
for i in 0..<100:
(await ds.put(Key.init(key, Key.init("/" & $i).tryGet).tryGet, ("val " & $i).toBytes)).tryGet
let
iter = (await ds.query(q)).tryGet
res = (await allFinished(toSeq(iter)))
.mapIt( it.read.tryGet )
.filterIt( it.key.isSome )
check:
res.len == 10
(await iter.dispose()).tryGet
test "Should not apply offset and limit":
  # Stores 100 keys under `/a`, queries with offset 95 / limit 5 and expects
  # exactly the entries `/a/95` .. `/a/99` back, in that order.
  let
    key = Key.init("/a").tryGet
    q = Query.init(key, offset = 95, limit = 5)

  for i in 0..<100:
    (await ds.put(Key.init(key, Key.init("/" & $i).tryGet).tryGet, ("val " & $i).toBytes)).tryGet

  let
    iter = (await ds.query(q)).tryGet
    res = (await allFinished(toSeq(iter)))
      .mapIt( it.read.tryGet )
      .filterIt( it.key.isSome )

  check:
    res.len == 5

  # The terminating empty response is already filtered out above, so every
  # element of `res` is a real result and all of them must be verified
  # (`0..<res.high` would silently skip the last one).
  for i in 0..<res.len:
    let
      expectedVal = ("val " & $(i + 95)).toBytes
      expectedKey = Key.init(key, Key.init("/" & $(i + 95)).tryGet).tryGet

    check:
      res[i].key.get == expectedKey
      res[i].data == expectedVal

  (await iter.dispose()).tryGet
test "Should apply sort order - descending":
  # Stores 100 keys under `/a`, queries them in descending order and compares
  # the result against the locally sorted-and-reversed expectation.
  let
    key = Key.init("/a").tryGet
    q = Query.init(key, sort = SortOrder.Descending)

  var kvs: seq[QueryResponse]
  for i in 0..<100:
    let
      k = Key.init(key, Key.init("/" & $i).tryGet).tryGet
      val = ("val " & $i).toBytes

    kvs.add((k.some, val))
    (await ds.put(k, val)).tryGet

  # lexicographic sort, as it comes from the backend
  kvs.sort do (a, b: QueryResponse) -> int:
    cmp(a.key.get.id, b.key.get.id)

  kvs = kvs.reversed

  let
    iter = (await ds.query(q)).tryGet
    res = (await allFinished(toSeq(iter)))
      .mapIt( it.read.tryGet )
      .filterIt( it.key.isSome )

  check:
    res.len == 100

  # The terminating empty response is already filtered out above, so iterate
  # over the whole of `res`; slicing with `res[1..^1]` would leave the last
  # element unchecked.
  for i, r in res:
    check:
      r.key.get == kvs[i].key.get
      r.data == kvs[i].data

  (await iter.dispose()).tryGet

View File

@ -10,7 +10,8 @@ import pkg/stew/byteutils
import pkg/datastore/sql/sqliteds
import ../basictests
import ../dscommontests
import ../querycommontests
suite "Test Basic SQLiteDatastore":
let
@ -19,7 +20,7 @@ suite "Test Basic SQLiteDatastore":
bytes = "some bytes".toBytes
otherBytes = "some other bytes".toBytes
teardown:
teardownAll:
(await ds.close()).tryGet()
basicStoreTests(ds, key, bytes, otherBytes)
@ -82,156 +83,7 @@ suite "Test Query":
setup:
ds = SQLiteDatastore.new(Memory).tryGet()
test "Key should query all key and all it's children":
let
key1 = Key.init("/a").tryGet
key2 = Key.init("/a/b").tryGet
key3 = Key.init("/a/b/c").tryGet
val1 = "value for 1".toBytes
val2 = "value for 2".toBytes
val3 = "value for 3".toBytes
teardown:
(await ds.close()).tryGet
q = Query.init(key1)
(await ds.put(key1, val1)).tryGet
(await ds.put(key2, val2)).tryGet
(await ds.put(key3, val3)).tryGet
let
iter = (await ds.query(q)).tryGet
res = await allFinished(toSeq(iter))
check:
res.len == 4
res[0].read.tryGet.key.get == key1
res[0].read.tryGet.data == val1
res[1].read.tryGet.key.get == key2
res[1].read.tryGet.data == val2
res[2].read.tryGet.key.get == key3
res[2].read.tryGet.data == val3
(await iter.dispose()).tryGet
test "Key should not query parent":
let
key1 = Key.init("/a").tryGet
key2 = Key.init("/a/b").tryGet
key3 = Key.init("/a/b/c").tryGet
val1 = "value for 1".toBytes
val2 = "value for 2".toBytes
val3 = "value for 3".toBytes
q = Query.init(key2)
(await ds.put(key1, val1)).tryGet
(await ds.put(key2, val2)).tryGet
(await ds.put(key3, val3)).tryGet
let
iter = (await ds.query(q)).tryGet
res = await allFinished(toSeq(iter))
check:
res.len == 3
res[0].read.tryGet.key.get == key2
res[0].read.tryGet.data == val2
res[1].read.tryGet.key.get == key3
res[1].read.tryGet.data == val3
(await iter.dispose()).tryGet
test "Should apply limit":
let
key = Key.init("/a").tryGet
q = Query.init(key, limit = 10)
for i in 0..<100:
(await ds.put(Key.init(key, Key.init("/" & $i).tryGet).tryGet, ("val " & $i).toBytes)).tryGet
let
iter = (await ds.query(q)).tryGet
res = await allFinished(toSeq(iter))
check:
res.len == 11
(await iter.dispose()).tryGet
test "Should not apply offset":
let
key = Key.init("/a").tryGet
q = Query.init(key, offset = 90)
for i in 0..<100:
(await ds.put(Key.init(key, Key.init("/" & $i).tryGet).tryGet, ("val " & $i).toBytes)).tryGet
let
iter = (await ds.query(q)).tryGet
res = await allFinished(toSeq(iter))
check:
res.len == 11
(await iter.dispose()).tryGet
test "Should not apply offset and limit":
let
key = Key.init("/a").tryGet
q = Query.init(key, offset = 95, limit = 5)
for i in 0..<100:
(await ds.put(Key.init(key, Key.init("/" & $i).tryGet).tryGet, ("val " & $i).toBytes)).tryGet
let
iter = (await ds.query(q)).tryGet
res = await allFinished(toSeq(iter))
check:
res.len == 6
for i in 0..<res.high:
let
val = ("val " & $(i + 95)).toBytes
key = Key.init(key, Key.init("/" & $(i + 95)).tryGet).tryGet
check:
res[i].read.tryGet.key.get == key
res[i].read.tryGet.data == val
(await iter.dispose()).tryGet
test "Should apply sort order - descending":
let
key = Key.init("/a").tryGet
q = Query.init(key, sort = SortOrder.Descending)
var kvs: seq[QueryResponse]
for i in 0..<100:
let
k = Key.init(key, Key.init("/" & $i).tryGet).tryGet
val = ("val " & $i).toBytes
kvs.add((k.some, val))
(await ds.put(k, val)).tryGet
kvs.sort do (a, b: QueryResponse) -> int:
cmp(a.key.get.id, b.key.get.id)
kvs = kvs.reversed
let
iter = (await ds.query(q)).tryGet
res = await allFinished(toSeq(iter))
check:
res.len == 101
for i, r in res[1..^1]:
check:
res[i].read.tryGet.key.get == kvs[i].key.get
res[i].read.tryGet.data == kvs[i].data
(await iter.dispose()).tryGet
queryTests(ds)

View File

@ -1,5 +1,7 @@
import std/options
import std/sequtils
import std/os
from std/algorithm import sort, reversed
import pkg/asynctest/unittest2
import pkg/chronos
@ -8,7 +10,8 @@ import pkg/stew/byteutils
import pkg/datastore/fsds
import ./basictests
import ./dscommontests
import ./querycommontests
suite "Test Basic FSDatastore":
let
@ -42,12 +45,12 @@ suite "Test Misc FSDatastore":
basePathAbs = path.parentDir / basePath
bytes = "some bytes".toBytes
setupAll:
setup:
removeDir(basePathAbs)
require(not dirExists(basePathAbs))
createDir(basePathAbs)
teardownAll:
teardown:
removeDir(basePathAbs)
require(not dirExists(basePathAbs))
@ -82,3 +85,27 @@ suite "Test Misc FSDatastore":
(await fs.get(key)).isOk
(await fs.delete(key)).isOk
(await fs.contains(key)).isOk
# Runs the shared `queryTests` against an `FSDatastore` rooted in a
# `tests_data` directory next to this file. The directory is removed and
# recreated before every test and removed again afterwards.
suite "Test Query":
let
(path, _, _) = instantiationInfo(-1, fullPaths = true) # get this file's name
basePath = "tests_data"
basePathAbs = path.parentDir / basePath
bytes = "some bytes".toBytes
var
ds: FSDatastore
setup:
removeDir(basePathAbs)
require(not dirExists(basePathAbs))
createDir(basePathAbs)
ds = FSDatastore.new(root = basePathAbs, depth = 5).tryGet()
teardown:
removeDir(basePathAbs)
require(not dirExists(basePathAbs))
# `false` is passed for `queryTests`' `extended` parameter, which skips the
# tests guarded by `if extended:` (limit, offset and sort order)
queryTests(ds, false)

View File

@ -10,7 +10,7 @@ import pkg/datastore/fsds
import pkg/datastore/sql
import pkg/datastore/tieredds
import ./basictests
import ./dscommontests
suite "Test Basic FSDatastore":
let