This commit is contained in:
Jaremy Creechley 2023-09-28 17:14:27 -07:00
parent 374e5a891f
commit 4002ba9507
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
14 changed files with 367 additions and 268 deletions

View File

@ -7,267 +7,93 @@ import pkg/questionable/results
from pkg/stew/results as stewResults import get, isErr
import pkg/upraises
import ./backend
import ./threads/backend
import ./datastore
export datastore
push: {.upraises: [].}
type
FSDatastore*[K, V] = object
root*: DataBuffer
ignoreProtected: bool
depth: int
SQLiteDatastore* = ref object of Datastore
db: SQLiteBackend[KeyId, DataBuffer]
proc isRootSubdir*(root, path: string): bool =
path.startsWith(root)
proc path*(self: SQLiteDatastore): string =
self.db.path()
proc validDepth*(self: FSDatastore, key: Key): bool =
key.len <= self.depth
proc readOnly*(self: SQLiteDatastore): bool =
self.db.readOnly()
proc findPath*[K,V](self: FSDatastore[K,V], key: K): ?!string =
## Return filename corresponding to the key
## or failure if the key doesn't correspond to a valid filename
##
let root = $self.root
let key = Key.init($key).get()
if not self.validDepth(key):
return failure "Path has invalid depth!"
method has*(self: SQLiteDatastore,
key: Key): Future[?!bool] {.async.} =
return self.db.has(KeyId.new key.id())
var
segments: seq[string]
method delete*(self: SQLiteDatastore,
key: Key): Future[?!void] {.async.} =
return self.db.delete(KeyId.new key.id())
for ns in key:
let basename = ns.value.extractFilename
if basename == "" or not basename.isValidFilename:
return failure "Filename contains invalid chars!"
method delete*(self: SQLiteDatastore,
keys: seq[Key]): Future[?!void] {.async.} =
let dkeys = keys.mapIt(KeyId.new it.id())
return self.db.delete(dkeys)
if ns.field == "":
segments.add(ns.value)
else:
let basename = ns.field.extractFilename
if basename == "" or not basename.isValidFilename:
return failure "Filename contains invalid chars!"
method get*(self: SQLiteDatastore,
key: Key): Future[?!seq[byte]] {.async.} =
self.db.get(KeyId.new key.id()).map() do(d: DataBuffer) -> seq[byte]:
d.toSeq()
# `:` are replaced with `/`
segments.add(ns.field / ns.value)
let
fullname = (root / segments.joinPath())
.absolutePath()
.catch()
.get()
.addFileExt(FileExt)
if not root.isRootSubdir(fullname):
return failure "Path is outside of `root` directory!"
return success fullname
proc has*[K,V](self: FSDatastore[K,V], key: K): ?!bool =
without path =? self.findPath(key), error:
return failure error
success path.fileExists()
proc contains*[K](self: FSDatastore, key: K): bool =
return self.has(key).get()
proc delete*[K,V](self: FSDatastore[K,V], key: K): ?!void =
without path =? self.findPath(key), error:
return failure error
if not path.fileExists():
return success()
try:
removeFile(path)
except OSError as e:
return failure e
return success()
proc delete*[K,V](self: FSDatastore[K,V], keys: openArray[K]): ?!void =
for key in keys:
if err =? self.delete(key).errorOption:
return failure err
return success()
proc readFile[V](self: FSDatastore, path: string): ?!V =
var
file: File
defer:
file.close
if not file.open(path):
return failure "unable to open file! path: " & path
try:
let
size = file.getFileSize().int
when V is seq[byte]:
var bytes = newSeq[byte](size)
elif V is V:
var bytes = V.new(size=size)
else:
{.error: "unhandled result type".}
var
read = 0
# echo "BYTES: ", bytes.repr
while read < size:
read += file.readBytes(bytes.toOpenArray(0, size-1), read, size)
if read < size:
return failure $read & " bytes were read from " & path &
" but " & $size & " bytes were expected"
return success bytes
except CatchableError as e:
return failure e
proc get*[K,V](self: FSDatastore[K,V], key: K): ?!V =
without path =? self.findPath(key), error:
return failure error
if not path.fileExists():
return failure(
newException(DatastoreKeyNotFound, "Key doesn't exist"))
return readFile[V](self, path)
proc put*[K,V](self: FSDatastore[K,V],
key: K,
data: V
): ?!void =
without path =? self.findPath(key), error:
return failure error
try:
var data = data
createDir(parentDir(path))
writeFile(path, data.toOpenArray(0, data.len()-1))
except CatchableError as e:
return failure e
return success()
proc put*[K,V](
self: FSDatastore,
batch: seq[DbBatchEntry[K, V]]): ?!void =
method put*(self: SQLiteDatastore,
key: Key,
data: seq[byte]): Future[?!void] {.async.} =
self.db.put(KeyId.new key.id(), DataBuffer.new data)
method put*(self: SQLiteDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async.} =
var dbatch: seq[tuple[key: KeyId, data: DataBuffer]]
for entry in batch:
if err =? self.put(entry.key, entry.data).errorOption:
return failure err
dbatch.add((KeyId.new entry.key.id(), DataBuffer.new entry.data))
self.db.put(dbatch)
return success()
method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
self.db.close()
iterator dirIter(path: string): string {.gcsafe.} =
try:
for p in path.walkDirRec(yieldFilter = {pcFile}, relative = true):
yield p
except CatchableError as exc:
raise newException(Defect, exc.msg)
method queryIter*(self: SQLiteDatastore,
query: Query
): ?!(iterator(): ?!QueryResponse) =
proc close*[K,V](self: FSDatastore[K,V]): ?!void =
return success()
let dbquery = dbQuery(
key= KeyId.new query.key.id(),
value= query.value,
limit= query.limit,
offset= query.offset,
sort= query.sort,
)
var qhandle = ? self.db.query(dbquery)
type
FsQueryHandle*[K, V] = object
query*: DbQuery[K]
cancel*: bool
closed*: bool
env*: FsQueryEnv[K,V]
FsQueryEnv*[K,V] = object
self: FSDatastore[K,V]
basePath: DataBuffer
proc query*[K,V](
self: FSDatastore[K,V],
query: DbQuery[K],
): Result[FsQueryHandle[K, V], ref CatchableError] =
let key = query.key
without path =? self.findPath(key), error:
return failure error
let basePath =
# it there is a file in the directory
# with the same name then list the contents
# of the directory, otherwise recurse
# into subdirectories
if path.fileExists:
path.parentDir
else:
path.changeFileExt("")
let iter = iterator(): ?!QueryResponse =
for resp in qhandle.queryIter():
without qres =? resp, err:
yield QueryResponse.failure err
let k = qres.key.map() do(k: KeyId) -> Key:
Key.init($k).expect("valid key")
let v: seq[byte] = qres.data.toSeq()
yield success (k, v)
let env = FsQueryEnv[K,V](self: self, basePath: DataBuffer.new(basePath))
success FsQueryHandle[K, V](query: query, env: env)
success iter
proc close*[K,V](handle: var FsQueryHandle[K,V]) =
if not handle.closed:
handle.closed = true
proc new*(
T: type SQLiteDatastore,
path: string,
readOnly = false): ?!SQLiteDatastore =
iterator queryIter*[K, V](
handle: var FsQueryHandle[K, V]
): ?!DbQueryResponse[K, V] =
let root = $(handle.env.self.root)
let basePath = $(handle.env.basePath)
success SQLiteDatastore(
db: ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly))
for path in basePath.dirIter():
if handle.cancel:
break
proc new*(
T: type SQLiteDatastore,
db: SQLiteBackend[KeyId, DataBuffer]): ?!T =
var
basePath = $handle.env.basePath
keyPath = basePath
keyPath.removePrefix(root)
keyPath = keyPath / path.changeFileExt("")
keyPath = keyPath.replace("\\", "/")
let
flres = (basePath / path).absolutePath().catch
if flres.isErr():
yield DbQueryResponse[K,V].failure flres.error()
continue
let
key = K.toKey($Key.init(keyPath).expect("valid key"))
data =
if handle.query.value:
let res = readFile[V](handle.env.self, flres.get)
if res.isErr():
yield DbQueryResponse[K,V].failure res.error()
continue
res.get()
else:
V.new()
yield success (key.some, data)
handle.close()
proc newFSDatastore*[K,V](root: string,
depth = 2,
caseSensitive = true,
ignoreProtected = false
): ?!FSDatastore[K,V] =
let root = ? (
block:
if root.isAbsolute: root
else: getCurrentDir() / root).catch
if not dirExists(root):
return failure "directory does not exist: " & root
success FSDatastore[K,V](
root: DataBuffer.new root,
ignoreProtected: ignoreProtected,
depth: depth)
success T(
db: db,
readOnly: db.readOnly)

View File

@ -6,7 +6,7 @@ import pkg/questionable/results
import ./key
import ./types
import ./backend
import ./threads/backend
export types
export options, SortOrder

View File

@ -10,10 +10,10 @@ import pkg/upraises
import std/sequtils
import ../datastore
import ./backend
import ./sql/sqliteds
import ./threads/backend
import ./threads/sqlbackend
export datastore, sqliteds
export datastore, sqlbackend
push: {.upraises: [].}

View File

@ -4,7 +4,7 @@ import pkg/questionable
import pkg/questionable/results
import pkg/upraises
import ../backend
import ../threads/backend
import ./sqliteutils
export sqliteutils

View File

@ -3,7 +3,7 @@ import pkg/questionable/results
import pkg/sqlite3_abi
import pkg/upraises
import ../backend
import ../threads/backend
export sqlite3_abi

View File

@ -3,8 +3,8 @@ import std/options
import pkg/questionable/results
import ./threads/databuffer
import ./types
import ./databuffer
import ../types
export databuffer, types, SortOrder

View File

@ -0,0 +1,273 @@
import std/os
import std/options
import std/strutils
import pkg/questionable
import pkg/questionable/results
from pkg/stew/results as stewResults import get, isErr
import pkg/upraises
import ./backend
import ./datastore
export datastore
push: {.upraises: [].}
type
FSDatastore*[K, V] = object
root*: DataBuffer
ignoreProtected: bool
depth: int
proc isRootSubdir*(root, path: string): bool =
path.startsWith(root)
proc validDepth*(self: FSDatastore, key: Key): bool =
key.len <= self.depth
proc findPath*[K,V](self: FSDatastore[K,V], key: K): ?!string =
## Return filename corresponding to the key
## or failure if the key doesn't correspond to a valid filename
##
let root = $self.root
let key = Key.init($key).get()
if not self.validDepth(key):
return failure "Path has invalid depth!"
var
segments: seq[string]
for ns in key:
let basename = ns.value.extractFilename
if basename == "" or not basename.isValidFilename:
return failure "Filename contains invalid chars!"
if ns.field == "":
segments.add(ns.value)
else:
let basename = ns.field.extractFilename
if basename == "" or not basename.isValidFilename:
return failure "Filename contains invalid chars!"
# `:` are replaced with `/`
segments.add(ns.field / ns.value)
let
fullname = (root / segments.joinPath())
.absolutePath()
.catch()
.get()
.addFileExt(FileExt)
if not root.isRootSubdir(fullname):
return failure "Path is outside of `root` directory!"
return success fullname
proc has*[K,V](self: FSDatastore[K,V], key: K): ?!bool =
without path =? self.findPath(key), error:
return failure error
success path.fileExists()
proc contains*[K](self: FSDatastore, key: K): bool =
return self.has(key).get()
proc delete*[K,V](self: FSDatastore[K,V], key: K): ?!void =
without path =? self.findPath(key), error:
return failure error
if not path.fileExists():
return success()
try:
removeFile(path)
except OSError as e:
return failure e
return success()
proc delete*[K,V](self: FSDatastore[K,V], keys: openArray[K]): ?!void =
for key in keys:
if err =? self.delete(key).errorOption:
return failure err
return success()
proc readFile[V](self: FSDatastore, path: string): ?!V =
var
file: File
defer:
file.close
if not file.open(path):
return failure "unable to open file! path: " & path
try:
let
size = file.getFileSize().int
when V is seq[byte]:
var bytes = newSeq[byte](size)
elif V is V:
var bytes = V.new(size=size)
else:
{.error: "unhandled result type".}
var
read = 0
# echo "BYTES: ", bytes.repr
while read < size:
read += file.readBytes(bytes.toOpenArray(0, size-1), read, size)
if read < size:
return failure $read & " bytes were read from " & path &
" but " & $size & " bytes were expected"
return success bytes
except CatchableError as e:
return failure e
proc get*[K,V](self: FSDatastore[K,V], key: K): ?!V =
without path =? self.findPath(key), error:
return failure error
if not path.fileExists():
return failure(
newException(DatastoreKeyNotFound, "Key doesn't exist"))
return readFile[V](self, path)
proc put*[K,V](self: FSDatastore[K,V],
key: K,
data: V
): ?!void =
without path =? self.findPath(key), error:
return failure error
try:
var data = data
createDir(parentDir(path))
writeFile(path, data.toOpenArray(0, data.len()-1))
except CatchableError as e:
return failure e
return success()
proc put*[K,V](
self: FSDatastore,
batch: seq[DbBatchEntry[K, V]]): ?!void =
for entry in batch:
if err =? self.put(entry.key, entry.data).errorOption:
return failure err
return success()
iterator dirIter(path: string): string {.gcsafe.} =
try:
for p in path.walkDirRec(yieldFilter = {pcFile}, relative = true):
yield p
except CatchableError as exc:
raise newException(Defect, exc.msg)
proc close*[K,V](self: FSDatastore[K,V]): ?!void =
return success()
type
FsQueryHandle*[K, V] = object
query*: DbQuery[K]
cancel*: bool
closed*: bool
env*: FsQueryEnv[K,V]
FsQueryEnv*[K,V] = object
self: FSDatastore[K,V]
basePath: DataBuffer
proc query*[K,V](
self: FSDatastore[K,V],
query: DbQuery[K],
): Result[FsQueryHandle[K, V], ref CatchableError] =
let key = query.key
without path =? self.findPath(key), error:
return failure error
let basePath =
# it there is a file in the directory
# with the same name then list the contents
# of the directory, otherwise recurse
# into subdirectories
if path.fileExists:
path.parentDir
else:
path.changeFileExt("")
let env = FsQueryEnv[K,V](self: self, basePath: DataBuffer.new(basePath))
success FsQueryHandle[K, V](query: query, env: env)
proc close*[K,V](handle: var FsQueryHandle[K,V]) =
if not handle.closed:
handle.closed = true
iterator queryIter*[K, V](
handle: var FsQueryHandle[K, V]
): ?!DbQueryResponse[K, V] =
let root = $(handle.env.self.root)
let basePath = $(handle.env.basePath)
for path in basePath.dirIter():
if handle.cancel:
break
var
basePath = $handle.env.basePath
keyPath = basePath
keyPath.removePrefix(root)
keyPath = keyPath / path.changeFileExt("")
keyPath = keyPath.replace("\\", "/")
let
flres = (basePath / path).absolutePath().catch
if flres.isErr():
yield DbQueryResponse[K,V].failure flres.error()
continue
let
key = K.toKey($Key.init(keyPath).expect("valid key"))
data =
if handle.query.value:
let res = readFile[V](handle.env.self, flres.get)
if res.isErr():
yield DbQueryResponse[K,V].failure res.error()
continue
res.get()
else:
V.new()
yield success (key.some, data)
handle.close()
proc newFSDatastore*[K,V](root: string,
depth = 2,
caseSensitive = true,
ignoreProtected = false
): ?!FSDatastore[K,V] =
let root = ? (
block:
if root.isAbsolute: root
else: getCurrentDir() / root).catch
if not dirExists(root):
return failure "directory does not exist: " & root
success FSDatastore[K,V](
root: DataBuffer.new root,
ignoreProtected: ignoreProtected,
depth: depth)

View File

@ -7,8 +7,8 @@ import pkg/sqlite3_abi
from pkg/stew/results as stewResults import isErr
import pkg/upraises
import ../backend
import ./sqlitedsdb
import ./backend
import ../sql/sqlitedsdb
export backend, sqlitedsdb

View File

@ -22,9 +22,9 @@ import pkg/threading/smartptrs
import ../key
import ../query
import ../datastore
import ../backend
import ../fsds
import ../sql/sqliteds
import ./backend
import ./fsbackend
import ./sqlbackend
import ./asyncsemaphore
import ./databuffer
@ -140,7 +140,7 @@ proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} =
executeTask(ctx):
has(ds, key)
method has*[BT](self: ThreadDatastore[BT],
proc has*[BT](self: ThreadDatastore[BT],
key: Key): Future[?!bool] {.async.} =
await self.semaphore.acquire()
let signal = acquireSignal().get()
@ -159,7 +159,7 @@ proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB;
executeTask(ctx):
delete(ds, key)
method delete*[BT](self: ThreadDatastore[BT],
proc delete*[BT](self: ThreadDatastore[BT],
key: Key): Future[?!void] {.async.} =
## delete key
await self.semaphore.acquire()
@ -174,7 +174,7 @@ method delete*[BT](self: ThreadDatastore[BT],
return ctx[].res.toRes()
method delete*[BT](self: ThreadDatastore[BT],
proc delete*[BT](self: ThreadDatastore[BT],
keys: seq[Key]): Future[?!void] {.async.} =
## delete batch
for key in keys:
@ -190,7 +190,7 @@ proc putTask[T, DB](ctx: TaskCtx[T], ds: DB;
executeTask(ctx):
put(ds, key, data)
method put*[BT](self: ThreadDatastore[BT],
proc put*[BT](self: ThreadDatastore[BT],
key: Key,
data: seq[byte]): Future[?!void] {.async.} =
## put key with data
@ -206,8 +206,8 @@ method put*[BT](self: ThreadDatastore[BT],
self.tp.spawn putTask(ctx, ds, key, data)
return ctx[].res.toRes()
method put*[DB](
proc put*[DB](
self: ThreadDatastore[DB],
batch: seq[BatchEntry]): Future[?!void] {.async.} =
## put batch data
@ -225,7 +225,7 @@ proc getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB;
let res = get(ds, key)
res
method get*[BT](self: ThreadDatastore[BT],
proc get*[BT](self: ThreadDatastore[BT],
key: Key,
): Future[?!seq[byte]] {.async.} =
await self.semaphore.acquire()
@ -240,7 +240,7 @@ method get*[BT](self: ThreadDatastore[BT],
return ctx[].res.toRes(v => v.toSeq())
method close*[BT](self: ThreadDatastore[BT]): Future[?!void] {.async.} =
proc close*[BT](self: ThreadDatastore[BT]): Future[?!void] {.async.} =
await self.semaphore.closeAll()
self.backend.close()
@ -291,7 +291,7 @@ proc queryTask[DB](
# set final result
(?!QResult).ok((KeyId.none, DataBuffer()))
method query*[BT](self: ThreadDatastore[BT],
proc query*[BT](self: ThreadDatastore[BT],
q: Query
): Future[?!QueryIter] {.async.} =
## performs async query

View File

@ -9,7 +9,7 @@ import ../types
import ../query
import ../key
import ../backend
import ./backend
import ./databuffer
type

View File

@ -8,7 +8,7 @@ import pkg/chronos
import pkg/stew/results
import pkg/stew/byteutils
import pkg/datastore/sql/sqliteds
import pkg/datastore/threads/sqlbackend
import pkg/datastore/key
import ../backendCommonTests

View File

@ -7,7 +7,7 @@ import pkg/stew/byteutils
import pkg/sqlite3_abi
import pkg/datastore/key
import pkg/datastore/sql/sqlitedsdb
import pkg/datastore/sql/sqliteds
import pkg/datastore/threads/sqlbackend
suite "Test Open SQLite Datastore DB":
let

View File

@ -10,7 +10,7 @@ import pkg/stew/byteutils
import pkg/datastore/fsds
import pkg/datastore/key
import pkg/datastore/backend
import pkg/datastore/threads/backend
import ./backendCommonTests

View File

@ -16,9 +16,9 @@ import pkg/chronicles
import pkg/threading/smartptrs
import pkg/threading/atomics
import pkg/datastore/fsds
import pkg/datastore/sql/sqliteds
import pkg/datastore/threads/threadproxyds {.all.}
import pkg/datastore/threads/fsbackend
import pkg/datastore/threads/sqlbackend
import pkg/datastore/threads/threadproxyds
import ./dscommontests
import ./querycommontests