diff --git a/datastore/fsds.nim b/datastore/fsds.nim index e3a6464..178de6e 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -7,267 +7,93 @@ import pkg/questionable/results from pkg/stew/results as stewResults import get, isErr import pkg/upraises -import ./backend +import ./threads/backend import ./datastore export datastore push: {.upraises: [].} + type - FSDatastore*[K, V] = object - root*: DataBuffer - ignoreProtected: bool - depth: int + SQLiteDatastore* = ref object of Datastore + db: SQLiteBackend[KeyId, DataBuffer] -proc isRootSubdir*(root, path: string): bool = - path.startsWith(root) +proc path*(self: SQLiteDatastore): string = + self.db.path() -proc validDepth*(self: FSDatastore, key: Key): bool = - key.len <= self.depth +proc readOnly*(self: SQLiteDatastore): bool = + self.db.readOnly() -proc findPath*[K,V](self: FSDatastore[K,V], key: K): ?!string = - ## Return filename corresponding to the key - ## or failure if the key doesn't correspond to a valid filename - ## - let root = $self.root - let key = Key.init($key).get() - if not self.validDepth(key): - return failure "Path has invalid depth!" +method has*(self: SQLiteDatastore, + key: Key): Future[?!bool] {.async.} = + return self.db.has(KeyId.new key.id()) - var - segments: seq[string] +method delete*(self: SQLiteDatastore, + key: Key): Future[?!void] {.async.} = + return self.db.delete(KeyId.new key.id()) - for ns in key: - let basename = ns.value.extractFilename - if basename == "" or not basename.isValidFilename: - return failure "Filename contains invalid chars!" +method delete*(self: SQLiteDatastore, + keys: seq[Key]): Future[?!void] {.async.} = + let dkeys = keys.mapIt(KeyId.new it.id()) + return self.db.delete(dkeys) - if ns.field == "": - segments.add(ns.value) - else: - let basename = ns.field.extractFilename - if basename == "" or not basename.isValidFilename: - return failure "Filename contains invalid chars!" +method get*(self: SQLiteDatastore, + key: Key): Future[?!seq[byte]] {.async.} = + self.db.get(KeyId.new key.id()).map() do(d: DataBuffer) -> seq[byte]: + d.toSeq() - # `:` are replaced with `/` - segments.add(ns.field / ns.value) - - let - fullname = (root / segments.joinPath()) - .absolutePath() - .catch() - .get() - .addFileExt(FileExt) - - if not root.isRootSubdir(fullname): - return failure "Path is outside of `root` directory!" - - return success fullname - -proc has*[K,V](self: FSDatastore[K,V], key: K): ?!bool = - without path =? self.findPath(key), error: - return failure error - success path.fileExists() - -proc contains*[K](self: FSDatastore, key: K): bool = - return self.has(key).get() - -proc delete*[K,V](self: FSDatastore[K,V], key: K): ?!void = - without path =? self.findPath(key), error: - return failure error - - if not path.fileExists(): - return success() - - try: - removeFile(path) - except OSError as e: - return failure e - - return success() - -proc delete*[K,V](self: FSDatastore[K,V], keys: openArray[K]): ?!void = - for key in keys: - if err =? self.delete(key).errorOption: - return failure err - - return success() - -proc readFile[V](self: FSDatastore, path: string): ?!V = - var - file: File - - defer: - file.close - - if not file.open(path): - return failure "unable to open file! path: " & path - - try: - let - size = file.getFileSize().int - - when V is seq[byte]: - var bytes = newSeq[byte](size) - elif V is V: - var bytes = V.new(size=size) - else: - {.error: "unhandled result type".} - var - read = 0 - - # echo "BYTES: ", bytes.repr - while read < size: - read += file.readBytes(bytes.toOpenArray(0, size-1), read, size) - - if read < size: - return failure $read & " bytes were read from " & path & - " but " & $size & " bytes were expected" - - return success bytes - - except CatchableError as e: - return failure e - -proc get*[K,V](self: FSDatastore[K,V], key: K): ?!V = - without path =? self.findPath(key), error: - return failure error - - if not path.fileExists(): - return failure( - newException(DatastoreKeyNotFound, "Key doesn't exist")) - - return readFile[V](self, path) - -proc put*[K,V](self: FSDatastore[K,V], - key: K, - data: V - ): ?!void = - - without path =? self.findPath(key), error: - return failure error - - try: - var data = data - createDir(parentDir(path)) - writeFile(path, data.toOpenArray(0, data.len()-1)) - except CatchableError as e: - return failure e - - return success() - -proc put*[K,V]( - self: FSDatastore, - batch: seq[DbBatchEntry[K, V]]): ?!void = +method put*(self: SQLiteDatastore, + key: Key, + data: seq[byte]): Future[?!void] {.async.} = + self.db.put(KeyId.new key.id(), DataBuffer.new data) +method put*(self: SQLiteDatastore, + batch: seq[BatchEntry]): Future[?!void] {.async.} = + var dbatch: seq[tuple[key: KeyId, data: DataBuffer]] for entry in batch: - if err =? self.put(entry.key, entry.data).errorOption: - return failure err + dbatch.add((KeyId.new entry.key.id(), DataBuffer.new entry.data)) + self.db.put(dbatch) - return success() +method close*(self: SQLiteDatastore): Future[?!void] {.async.} = + self.db.close() -iterator dirIter(path: string): string {.gcsafe.} = - try: - for p in path.walkDirRec(yieldFilter = {pcFile}, relative = true): - yield p - except CatchableError as exc: - raise newException(Defect, exc.msg) +method queryIter*(self: SQLiteDatastore, + query: Query + ): ?!(iterator(): ?!QueryResponse) = -proc close*[K,V](self: FSDatastore[K,V]): ?!void = - return success() + let dbquery = dbQuery( + key= KeyId.new query.key.id(), + value= query.value, + limit= query.limit, + offset= query.offset, + sort= query.sort, + ) + var qhandle = ? self.db.query(dbquery) -type - FsQueryHandle*[K, V] = object - query*: DbQuery[K] - cancel*: bool - closed*: bool - env*: FsQueryEnv[K,V] - - FsQueryEnv*[K,V] = object - self: FSDatastore[K,V] - basePath: DataBuffer - -proc query*[K,V]( - self: FSDatastore[K,V], - query: DbQuery[K], -): Result[FsQueryHandle[K, V], ref CatchableError] = - - let key = query.key - without path =? self.findPath(key), error: - return failure error - - let basePath = - # it there is a file in the directory - # with the same name then list the contents - # of the directory, otherwise recurse - # into subdirectories - if path.fileExists: - path.parentDir - else: - path.changeFileExt("") + let iter = iterator(): ?!QueryResponse = + for resp in qhandle.queryIter(): + without qres =? resp, err: + yield QueryResponse.failure err + let k = qres.key.map() do(k: KeyId) -> Key: + Key.init($k).expect("valid key") + let v: seq[byte] = qres.data.toSeq() + yield success (k, v) - let env = FsQueryEnv[K,V](self: self, basePath: DataBuffer.new(basePath)) - success FsQueryHandle[K, V](query: query, env: env) + success iter -proc close*[K,V](handle: var FsQueryHandle[K,V]) = - if not handle.closed: - handle.closed = true +proc new*( + T: type SQLiteDatastore, + path: string, + readOnly = false): ?!SQLiteDatastore = -iterator queryIter*[K, V]( - handle: var FsQueryHandle[K, V] -): ?!DbQueryResponse[K, V] = - let root = $(handle.env.self.root) - let basePath = $(handle.env.basePath) + success SQLiteDatastore( + db: ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly)) - for path in basePath.dirIter(): - if handle.cancel: - break +proc new*( + T: type SQLiteDatastore, + db: SQLiteBackend[KeyId, DataBuffer]): ?!T = - var - basePath = $handle.env.basePath - keyPath = basePath - - keyPath.removePrefix(root) - keyPath = keyPath / path.changeFileExt("") - keyPath = keyPath.replace("\\", "/") - - let - flres = (basePath / path).absolutePath().catch - if flres.isErr(): - yield DbQueryResponse[K,V].failure flres.error() - continue - - let - key = K.toKey($Key.init(keyPath).expect("valid key")) - data = - if handle.query.value: - let res = readFile[V](handle.env.self, flres.get) - if res.isErr(): - yield DbQueryResponse[K,V].failure res.error() - continue - res.get() - else: - V.new() - - yield success (key.some, data) - handle.close() - -proc newFSDatastore*[K,V](root: string, - depth = 2, - caseSensitive = true, - ignoreProtected = false - ): ?!FSDatastore[K,V] = - - let root = ? ( - block: - if root.isAbsolute: root - else: getCurrentDir() / root).catch - - if not dirExists(root): - return failure "directory does not exist: " & root - - success FSDatastore[K,V]( - root: DataBuffer.new root, - ignoreProtected: ignoreProtected, - depth: depth) + success T( + db: db, + readOnly: db.readOnly) diff --git a/datastore/query.nim b/datastore/query.nim index 256ad97..f10376f 100644 --- a/datastore/query.nim +++ b/datastore/query.nim @@ -6,7 +6,7 @@ import pkg/questionable/results import ./key import ./types -import ./backend +import ./threads/backend export types export options, SortOrder diff --git a/datastore/sql.nim b/datastore/sql.nim index cfe67ac..712bd9a 100644 --- a/datastore/sql.nim +++ b/datastore/sql.nim @@ -10,10 +10,10 @@ import pkg/upraises import std/sequtils import ../datastore -import ./backend -import ./sql/sqliteds +import ./threads/backend +import ./threads/sqlbackend -export datastore, sqliteds +export datastore, sqlbackend push: {.upraises: [].} diff --git a/datastore/sql/sqlitedsdb.nim b/datastore/sql/sqlitedsdb.nim index ae381e1..06e1dcf 100644 --- a/datastore/sql/sqlitedsdb.nim +++ b/datastore/sql/sqlitedsdb.nim @@ -4,7 +4,7 @@ import pkg/questionable import pkg/questionable/results import pkg/upraises -import ../backend +import ../threads/backend import ./sqliteutils export sqliteutils diff --git a/datastore/sql/sqliteutils.nim b/datastore/sql/sqliteutils.nim index 884527b..ea9bbcb 100644 --- a/datastore/sql/sqliteutils.nim +++ b/datastore/sql/sqliteutils.nim @@ -3,7 +3,7 @@ import pkg/questionable/results import pkg/sqlite3_abi import pkg/upraises -import ../backend +import ../threads/backend export sqlite3_abi diff --git a/datastore/backend.nim b/datastore/threads/backend.nim similarity index 97% rename from datastore/backend.nim rename to datastore/threads/backend.nim index 410f5b1..f6e3c81 100644 --- a/datastore/backend.nim +++ b/datastore/threads/backend.nim @@ -3,8 +3,8 @@ import std/options import pkg/questionable/results -import ./threads/databuffer -import ./types +import ./databuffer +import ../types export databuffer, types, SortOrder diff --git a/datastore/threads/fsbackend.nim b/datastore/threads/fsbackend.nim new file mode 100644 index 0000000..e3a6464 --- /dev/null +++ b/datastore/threads/fsbackend.nim @@ -0,0 +1,273 @@ +import std/os +import std/options +import std/strutils + +import pkg/questionable +import pkg/questionable/results +from pkg/stew/results as stewResults import get, isErr +import pkg/upraises + +import ./backend +import ./datastore + +export datastore + +push: {.upraises: [].} + +type + FSDatastore*[K, V] = object + root*: DataBuffer + ignoreProtected: bool + depth: int + +proc isRootSubdir*(root, path: string): bool = + path.startsWith(root) + +proc validDepth*(self: FSDatastore, key: Key): bool = + key.len <= self.depth + +proc findPath*[K,V](self: FSDatastore[K,V], key: K): ?!string = + ## Return filename corresponding to the key + ## or failure if the key doesn't correspond to a valid filename + ## + let root = $self.root + let key = Key.init($key).get() + if not self.validDepth(key): + return failure "Path has invalid depth!" + + var + segments: seq[string] + + for ns in key: + let basename = ns.value.extractFilename + if basename == "" or not basename.isValidFilename: + return failure "Filename contains invalid chars!" + + if ns.field == "": + segments.add(ns.value) + else: + let basename = ns.field.extractFilename + if basename == "" or not basename.isValidFilename: + return failure "Filename contains invalid chars!" + + # `:` are replaced with `/` + segments.add(ns.field / ns.value) + + let + fullname = (root / segments.joinPath()) + .absolutePath() + .catch() + .get() + .addFileExt(FileExt) + + if not root.isRootSubdir(fullname): + return failure "Path is outside of `root` directory!" + + return success fullname + +proc has*[K,V](self: FSDatastore[K,V], key: K): ?!bool = + without path =? self.findPath(key), error: + return failure error + success path.fileExists() + +proc contains*[K](self: FSDatastore, key: K): bool = + return self.has(key).get() + +proc delete*[K,V](self: FSDatastore[K,V], key: K): ?!void = + without path =? self.findPath(key), error: + return failure error + + if not path.fileExists(): + return success() + + try: + removeFile(path) + except OSError as e: + return failure e + + return success() + +proc delete*[K,V](self: FSDatastore[K,V], keys: openArray[K]): ?!void = + for key in keys: + if err =? self.delete(key).errorOption: + return failure err + + return success() + +proc readFile[V](self: FSDatastore, path: string): ?!V = + var + file: File + + defer: + file.close + + if not file.open(path): + return failure "unable to open file! path: " & path + + try: + let + size = file.getFileSize().int + + when V is seq[byte]: + var bytes = newSeq[byte](size) + elif V is V: + var bytes = V.new(size=size) + else: + {.error: "unhandled result type".} + var + read = 0 + + # echo "BYTES: ", bytes.repr + while read < size: + read += file.readBytes(bytes.toOpenArray(0, size-1), read, size) + + if read < size: + return failure $read & " bytes were read from " & path & + " but " & $size & " bytes were expected" + + return success bytes + + except CatchableError as e: + return failure e + +proc get*[K,V](self: FSDatastore[K,V], key: K): ?!V = + without path =? self.findPath(key), error: + return failure error + + if not path.fileExists(): + return failure( + newException(DatastoreKeyNotFound, "Key doesn't exist")) + + return readFile[V](self, path) + +proc put*[K,V](self: FSDatastore[K,V], + key: K, + data: V + ): ?!void = + + without path =? self.findPath(key), error: + return failure error + + try: + var data = data + createDir(parentDir(path)) + writeFile(path, data.toOpenArray(0, data.len()-1)) + except CatchableError as e: + return failure e + + return success() + +proc put*[K,V]( + self: FSDatastore, + batch: seq[DbBatchEntry[K, V]]): ?!void = + + for entry in batch: + if err =? self.put(entry.key, entry.data).errorOption: + return failure err + + return success() + +iterator dirIter(path: string): string {.gcsafe.} = + try: + for p in path.walkDirRec(yieldFilter = {pcFile}, relative = true): + yield p + except CatchableError as exc: + raise newException(Defect, exc.msg) + +proc close*[K,V](self: FSDatastore[K,V]): ?!void = + return success() + +type + FsQueryHandle*[K, V] = object + query*: DbQuery[K] + cancel*: bool + closed*: bool + env*: FsQueryEnv[K,V] + + FsQueryEnv*[K,V] = object + self: FSDatastore[K,V] + basePath: DataBuffer + +proc query*[K,V]( + self: FSDatastore[K,V], + query: DbQuery[K], +): Result[FsQueryHandle[K, V], ref CatchableError] = + + let key = query.key + without path =? self.findPath(key), error: + return failure error + + let basePath = + # it there is a file in the directory + # with the same name then list the contents + # of the directory, otherwise recurse + # into subdirectories + if path.fileExists: + path.parentDir + else: + path.changeFileExt("") + + let env = FsQueryEnv[K,V](self: self, basePath: DataBuffer.new(basePath)) + success FsQueryHandle[K, V](query: query, env: env) + +proc close*[K,V](handle: var FsQueryHandle[K,V]) = + if not handle.closed: + handle.closed = true + +iterator queryIter*[K, V]( + handle: var FsQueryHandle[K, V] +): ?!DbQueryResponse[K, V] = + let root = $(handle.env.self.root) + let basePath = $(handle.env.basePath) + + for path in basePath.dirIter(): + if handle.cancel: + break + + var + basePath = $handle.env.basePath + keyPath = basePath + + keyPath.removePrefix(root) + keyPath = keyPath / path.changeFileExt("") + keyPath = keyPath.replace("\\", "/") + + let + flres = (basePath / path).absolutePath().catch + if flres.isErr(): + yield DbQueryResponse[K,V].failure flres.error() + continue + + let + key = K.toKey($Key.init(keyPath).expect("valid key")) + data = + if handle.query.value: + let res = readFile[V](handle.env.self, flres.get) + if res.isErr(): + yield DbQueryResponse[K,V].failure res.error() + continue + res.get() + else: + V.new() + + yield success (key.some, data) + handle.close() + +proc newFSDatastore*[K,V](root: string, + depth = 2, + caseSensitive = true, + ignoreProtected = false + ): ?!FSDatastore[K,V] = + + let root = ? ( + block: + if root.isAbsolute: root + else: getCurrentDir() / root).catch + + if not dirExists(root): + return failure "directory does not exist: " & root + + success FSDatastore[K,V]( + root: DataBuffer.new root, + ignoreProtected: ignoreProtected, + depth: depth) diff --git a/datastore/sql/sqliteds.nim b/datastore/threads/sqlbackend.nim similarity index 99% rename from datastore/sql/sqliteds.nim rename to datastore/threads/sqlbackend.nim index 79d4a31..fdbce97 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/threads/sqlbackend.nim @@ -7,8 +7,8 @@ import pkg/sqlite3_abi from pkg/stew/results as stewResults import isErr import pkg/upraises -import ../backend -import ./sqlitedsdb +import ./backend +import ../sql/sqlitedsdb export backend, sqlitedsdb diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index e1a7571..7c7dc03 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -22,9 +22,9 @@ import pkg/threading/smartptrs import ../key import ../query import ../datastore -import ../backend -import ../fsds -import ../sql/sqliteds +import ./backend +import ./fsbackend +import ./sqlbackend import ./asyncsemaphore import ./databuffer @@ -140,7 +140,7 @@ proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} = executeTask(ctx): has(ds, key) -method has*[BT](self: ThreadDatastore[BT], +proc has*[BT](self: ThreadDatastore[BT], key: Key): Future[?!bool] {.async.} = await self.semaphore.acquire() let signal = acquireSignal().get() @@ -159,7 +159,7 @@ proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB; executeTask(ctx): delete(ds, key) -method delete*[BT](self: ThreadDatastore[BT], +proc delete*[BT](self: ThreadDatastore[BT], key: Key): Future[?!void] {.async.} = ## delete key await self.semaphore.acquire() @@ -174,7 +174,7 @@ method delete*[BT](self: ThreadDatastore[BT], return ctx[].res.toRes() -method delete*[BT](self: ThreadDatastore[BT], +proc delete*[BT](self: ThreadDatastore[BT], keys: seq[Key]): Future[?!void] {.async.} = ## delete batch for key in keys: @@ -190,7 +190,7 @@ proc putTask[T, DB](ctx: TaskCtx[T], ds: DB; executeTask(ctx): put(ds, key, data) -method put*[BT](self: ThreadDatastore[BT], +proc put*[BT](self: ThreadDatastore[BT], key: Key, data: seq[byte]): Future[?!void] {.async.} = ## put key with data @@ -206,8 +206,8 @@ method put*[BT](self: ThreadDatastore[BT], self.tp.spawn putTask(ctx, ds, key, data) return ctx[].res.toRes() - -method put*[DB]( + +proc put*[DB]( self: ThreadDatastore[DB], batch: seq[BatchEntry]): Future[?!void] {.async.} = ## put batch data @@ -225,7 +225,7 @@ proc getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB; let res = get(ds, key) res -method get*[BT](self: ThreadDatastore[BT], +proc get*[BT](self: ThreadDatastore[BT], key: Key, ): Future[?!seq[byte]] {.async.} = await self.semaphore.acquire() @@ -240,7 +240,7 @@ method get*[BT](self: ThreadDatastore[BT], return ctx[].res.toRes(v => v.toSeq()) -method close*[BT](self: ThreadDatastore[BT]): Future[?!void] {.async.} = +proc close*[BT](self: ThreadDatastore[BT]): Future[?!void] {.async.} = await self.semaphore.closeAll() self.backend.close() @@ -291,7 +291,7 @@ proc queryTask[DB]( # set final result (?!QResult).ok((KeyId.none, DataBuffer())) -method query*[BT](self: ThreadDatastore[BT], +proc query*[BT](self: ThreadDatastore[BT], q: Query ): Future[?!QueryIter] {.async.} = ## performs async query diff --git a/datastore/threads/threadresult.nim b/datastore/threads/threadresult.nim index 3508e9e..6a6cb0a 100644 --- a/datastore/threads/threadresult.nim +++ b/datastore/threads/threadresult.nim @@ -9,7 +9,7 @@ import ../types import ../query import ../key -import ../backend +import ./backend import ./databuffer type diff --git a/tests/datastore/sql/testsqliteds.nim b/tests/datastore/sql/testsqliteds.nim index e52d90b..290f4fa 100644 --- a/tests/datastore/sql/testsqliteds.nim +++ b/tests/datastore/sql/testsqliteds.nim @@ -8,7 +8,7 @@ import pkg/chronos import pkg/stew/results import pkg/stew/byteutils -import pkg/datastore/sql/sqliteds +import pkg/datastore/threads/sqlbackend import pkg/datastore/key import ../backendCommonTests diff --git a/tests/datastore/sql/testsqlitedsdb.nim b/tests/datastore/sql/testsqlitedsdb.nim index 03fd98f..2438d36 100644 --- a/tests/datastore/sql/testsqlitedsdb.nim +++ b/tests/datastore/sql/testsqlitedsdb.nim @@ -7,7 +7,7 @@ import pkg/stew/byteutils import pkg/sqlite3_abi import pkg/datastore/key import pkg/datastore/sql/sqlitedsdb -import pkg/datastore/sql/sqliteds +import pkg/datastore/threads/sqlbackend suite "Test Open SQLite Datastore DB": let diff --git a/tests/datastore/testfsds.nim b/tests/datastore/testfsds.nim index 1d95414..f55348d 100644 --- a/tests/datastore/testfsds.nim +++ b/tests/datastore/testfsds.nim @@ -10,7 +10,7 @@ import pkg/stew/byteutils import pkg/datastore/fsds import pkg/datastore/key -import pkg/datastore/backend +import pkg/datastore/threads/backend import ./backendCommonTests diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index db20c81..d6a2b83 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -16,9 +16,9 @@ import pkg/chronicles import pkg/threading/smartptrs import pkg/threading/atomics -import pkg/datastore/fsds -import pkg/datastore/sql/sqliteds -import pkg/datastore/threads/threadproxyds {.all.} +import pkg/datastore/threads/fsbackend +import pkg/datastore/threads/sqlbackend +import pkg/datastore/threads/threadproxyds import ./dscommontests import ./querycommontests