diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 2877d7c..ee18c56 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -178,6 +178,12 @@ proc close*[K,V](self: FSDatastore[K,V]): ?!void = return success() type + FsQueryHandle*[K, V] = object + query*: DbQuery[K] + cancel*: bool + closed*: bool + env*: FsQueryEnv[K,V] + FsQueryEnv*[K,V] = object self: FSDatastore[K,V] basePath: DataBuffer @@ -185,7 +191,7 @@ type proc query*[K,V]( self: FSDatastore[K,V], query: DbQuery[K], -): Result[DbQueryHandle[K, V, FsQueryEnv[K,V]], ref CatchableError] = +): Result[FsQueryHandle[K, V], ref CatchableError] = let key = query.key without path =? self.findPath(key), error: @@ -202,14 +208,15 @@ proc query*[K,V]( path.changeFileExt("") let env = FsQueryEnv[K,V](self: self, basePath: DataBuffer.new(basePath)) - success DbQueryHandle[KeyId, V, FsQueryEnv[K,V]](query: query, env: env) + success FsQueryHandle[K, V](query: query, env: env) proc close*[K,V](handle: var DbQueryHandle[K,V,FsQueryEnv[K,V]]) = if not handle.closed: handle.closed = true -iterator iter*[K, V](handle: var DbQueryHandle[K, V, FsQueryEnv[K,V]] - ): ?!DbQueryResponse[K, V] = +iterator queryIter*[K, V]( + handle: var FsQueryHandle[K, V] +): ?!DbQueryResponse[K, V] = let root = $(handle.env.self.root) let basePath = $(handle.env.basePath) diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index e86fb90..e642b07 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -103,10 +103,17 @@ proc close*[K,V](self: SQLiteBackend[K,V]): ?!void = return success() +type + SqQueryHandle*[K, V] = object + query*: DbQuery[K] + cancel*: bool + closed*: bool + env*: RawStmtPtr + proc query*[K,V]( self: SQLiteBackend[K,V], query: DbQuery[K] -): Result[DbQueryHandle[K,V,RawStmtPtr], ref CatchableError] = +): Result[SqQueryHandle[K,V], ref CatchableError] = var queryStr = if query.value: @@ -151,16 +158,18 @@ proc query*[K,V]( if not (v == SQLITE_OK): return failure newException(DatastoreError, $sqlite3_errstr(v)) - success DbQueryHandle[K,V,RawStmtPtr](query: query, env: s) + success SqQueryHandle[K,V](query: query, env: s) -proc close*[K,V](handle: var DbQueryHandle[K,V,RawStmtPtr]) = +proc close*[K,V](handle: var SqQueryHandle[K,V]) = if not handle.closed: handle.closed = true discard sqlite3_reset(handle.env) discard sqlite3_clear_bindings(handle.env) handle.env.dispose() -iterator iter*[K, V](handle: var DbQueryHandle[K, V, RawStmtPtr]): ?!DbQueryResponse[K, V] = +iterator queyIter*[K, V]( + handle: var SqQueryHandle[K, V] +): ?!DbQueryResponse[K, V] = while not handle.cancel: let v = sqlite3_step(handle.env) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index e85038c..1f4d679 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -23,6 +23,7 @@ import ../key import ../query import ../datastore import ../backend +import ../fsds import ../sql/sqliteds import ./asyncsemaphore @@ -251,10 +252,13 @@ method queryTask[DB]( query: DbQuery[KeyId], ) {.gcsafe, nimcall.} = ## run query command + mixin queryIter executeTask(ctx): # we execute this all inside `executeTask` # so we need to return a final result - let handleRes = ds.query(query) + let handleRes = query(ds, query) + static: + echo "HANDLE_RES: ", typeof(handleRes) if handleRes.isErr(): # set error and exit executeTask, which will fire final signal (?!QResult).err(handleRes.error()) @@ -266,7 +270,9 @@ method queryTask[DB]( raise newException(DeadThreadDefect, "queryTask timed out") var handle = handleRes.get() - for item in handle.iter(): + static: + echo "HANDLE: ", typeof(handle) + for item in handle.queyIter(): # wait for next request from async thread if ctx[].cancelled: diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index d59992c..801f987 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -15,8 +15,8 @@ import pkg/questionable/results import pkg/chronicles import pkg/threading/smartptrs -import pkg/datastore/sql/sqliteds import pkg/datastore/fsds +import pkg/datastore/sql/sqliteds import pkg/datastore/threads/threadproxyds {.all.} import ./dscommontests @@ -82,40 +82,39 @@ for i in 1..N: queryTests(ds, true) GC_fullCollect() -# suite "Test Basic ThreadDatastore with fsds": -# let -# path = currentSourcePath() # get this file's name -# basePath = "tests_data" -# basePathAbs = path.parentDir / basePath -# key = Key.init("/a/b").tryGet() -# bytes = "some bytes".toBytes -# otherBytes = "some other bytes".toBytes +suite "Test Basic ThreadDatastore with fsds": + let + path = currentSourcePath() # get this file's name + basePath = "tests_data" + basePathAbs = path.parentDir / basePath + key = Key.init("/a/b").tryGet() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes -# var -# fsStore: FSDatastore -# ds: ThreadDatastore -# taskPool: Taskpool + var + fsStore: FSDatastore[KeyId, DataBuffer] + ds: ThreadDatastore[FSDatastore[KeyId, DataBuffer]] + taskPool: Taskpool -# setupAll: -# removeDir(basePathAbs) -# require(not dirExists(basePathAbs)) -# createDir(basePathAbs) + setupAll: + removeDir(basePathAbs) + require(not dirExists(basePathAbs)) + createDir(basePathAbs) -# fsStore = FSDatastore.new(root = basePathAbs, depth = 3).tryGet() -# taskPool = Taskpool.new(NumThreads) -# ds = ThreadDatastore.new(fsStore, withLocks = true, tp = taskPool).tryGet() + fsStore = newFSDatastore[KeyId, DataBuffer](root = basePathAbs, depth = 3).tryGet() + ds = ThreadDatastore.new(fsStore, tp = taskPool).tryGet() -# teardown: -# GC_fullCollect() + teardown: + GC_fullCollect() -# teardownAll: -# (await ds.close()).tryGet() -# taskPool.shutdown() + teardownAll: + (await ds.close()).tryGet() -# removeDir(basePathAbs) -# require(not dirExists(basePathAbs)) + removeDir(basePathAbs) + require(not dirExists(basePathAbs)) + + basicStoreTests(ds, key, bytes, otherBytes) -# basicStoreTests(fsStore, key, bytes, otherBytes) # suite "Test Query ThreadDatastore with fsds": # let