diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 8f9c781..9ef5bce 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -55,12 +55,15 @@ method query*(self: FSDatastore, proc new*( T: type FSDatastore, - path: string, - readOnly = false, + root: string, tp: Taskpool, + depth = 2, + caseSensitive = true, + ignoreProtected = false ): ?!FSDatastore = let - backend = ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly) + backend = ? newFSBackend[KeyId, DataBuffer]( + root=root, depth=depth, caseSensitive=caseSensitive, ignoreProtected=ignoreProtected) db = ? ThreadDatastore.new(backend, tp = tp) success FSDatastore(db: db) diff --git a/datastore/sql.nim b/datastore/sql.nim index db6baed..66e9ce5 100644 --- a/datastore/sql.nim +++ b/datastore/sql.nim @@ -14,7 +14,7 @@ import ./threads/sqlbackend import ./threads/threadproxyds import ./datastore -export datastore, keys, query, Taskpool +export datastore, keys, query, Taskpool, Memory push: {.upraises: [].} @@ -63,15 +63,11 @@ method query*(self: SQLiteDatastore, proc new*( T: type SQLiteDatastore, path: string, + tp: Taskpool, readOnly = false): ?!SQLiteDatastore = - success SQLiteDatastore( - db: ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly)) + let + backend = ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly) + db = ? ThreadDatastore.new(backend, tp = tp) + success SQLiteDatastore(db: db) -proc new*( - T: type SQLiteDatastore, - db: SQLiteBackend[KeyId, DataBuffer]): ?!T = - - success T( - db: db, - readOnly: db.readOnly) diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim deleted file mode 100644 index dd88395..0000000 --- a/tests/datastore/testthreadproxyds.nim +++ /dev/null @@ -1,242 +0,0 @@ -import std/options -import std/sequtils -import std/os -import std/cpuinfo -import std/algorithm -import std/importutils - -import pkg/asynctest -import pkg/chronos -import pkg/chronos/threadsync -import pkg/stew/results -import pkg/stew/byteutils -import pkg/taskpools -import pkg/questionable/results -import pkg/chronicles -import pkg/threading/smartptrs -import pkg/threading/atomics - -import pkg/datastore/threads/fsbackend -import pkg/datastore/threads/sqlbackend -import pkg/datastore/threads/threadproxyds - -import ./dscommontests -import ./querycommontests - -const - NumThreads = 20 # IO threads aren't attached to CPU count - ThreadTestLoops {.intdefine.} = 1 - N = ThreadTestLoops - ThreadTestInnerLoops {.intdefine.} = 1 - M = ThreadTestInnerLoops - -var - taskPool: Taskpool = Taskpool.new(NumThreads) - -for i in 1..N: - suite "Test Basic ThreadDatastore with SQLite " & $i: - - var - ds: SQLiteDatastore - key = Key.init("/a/b").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes - - setupAll: - ds = SQLiteDatastore.new(Memory, tp = taskPool).tryGet() - - teardown: - GC_fullCollect() - - teardownAll: - (await ds.close()).tryGet() - - for i in 1..M: - basicStoreTests(ds, key, bytes, otherBytes) - GC_fullCollect() - - -for i in 1..N: - suite "Test Query ThreadDatastore with SQLite " & $i: - - var - ds: SQLiteDatastore - - setup: - ds = SQLiteDatastore.new(Memory, tp = taskPool).tryGet() - - teardown: - GC_fullCollect() - - (await ds.close()).tryGet() - - for i in 1..M: - queryTests(ds, true) - GC_fullCollect() - -suite "Test Basic ThreadDatastore with fsds": - let - path = currentSourcePath() # get this file's name - basePath = "tests_data" - basePathAbs = path.parentDir / basePath - key = Key.init("/a/b").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes - - var - ds: SQLiteDatastore - - setupAll: - removeDir(basePathAbs) - require(not dirExists(basePathAbs)) - createDir(basePathAbs) - - fsStore = newFSDatastore[KeyId, DataBuffer](root = basePathAbs, depth = 3).tryGet() - ds = ThreadDatastore.new(fsStore, tp = taskPool).tryGet() - - teardown: - GC_fullCollect() - - teardownAll: - (await ds.close()).tryGet() - - removeDir(basePathAbs) - require(not dirExists(basePathAbs)) - - basicStoreTests(ds, key, bytes, otherBytes) - -suite "Test Query ThreadDatastore with fsds": - let - path = currentSourcePath() # get this file's name - basePath = "tests_data" - basePathAbs = path.parentDir / basePath - - var - fsStore: FSDatastore[KeyId, DataBuffer] - ds: ThreadDatastore[FSDatastore[KeyId, DataBuffer]] - - setup: - removeDir(basePathAbs) - require(not dirExists(basePathAbs)) - createDir(basePathAbs) - - fsStore = newFSDatastore[KeyId, DataBuffer](root = basePathAbs, depth = 5).tryGet() - ds = ThreadDatastore.new(fsStore, tp = taskPool).tryGet() - - teardown: - GC_fullCollect() - (await ds.close()).tryGet() - - removeDir(basePathAbs) - require(not dirExists(basePathAbs)) - - queryTests(ds, false) - -for i in 1..N: - suite "Test ThreadDatastore cancelations": - var - sqlStore: SQLiteBackend[KeyId,DataBuffer] - sds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]] - - privateAccess(ThreadDatastore) # expose private fields - privateAccess(TaskCtx) # expose private fields - - setupAll: - sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - sds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - - teardown: - GC_fullCollect() # run full collect after each test - - test "Should monitor signal and cancel": - var - signal = ThreadSignalPtr.new().tryGet() - - proc cancelTestTask(ctx: TaskCtx[bool]) {.gcsafe.} = - executeTask(ctx): - (?!bool).ok(true) - - let ctx = newTaskCtx(bool, signal=signal) - ctx[].cancelled = true - dispatchTask(sds, signal): - sds.tp.spawn cancelTestTask(ctx) - - check: - ctx[].res.isErr == true - ctx[].cancelled == true - ctx[].running == false - - test "Should cancel future": - - var - signal = ThreadSignalPtr.new().tryGet() - ms {.global.}: MutexSignal - flag {.global.}: Atomic[bool] - futFreed {.global.}: Atomic[bool] - ready {.global.}: Atomic[bool] - - ms.init() - - type - FutTestObj = object - val: int - TestValue = object - ThreadTestInt = (TestValue, ) - - proc `=destroy`(obj: var TestValue) = - # echo "destroy TestObj!" - flag.store(true) - - proc `=destroy`(obj: var FutTestObj) = - # echo "destroy FutTestObj!" - futFreed.store(true) - - proc wait(flag: var Atomic[bool], name = "task") = - # echo "wait for " & name & " to be ready..." - # defer: echo "" - for i in 1..100: - # stdout.write(".") - if flag.load() == true: - return - os.sleep(10) - raise newException(Defect, "timeout") - - proc errorTestTask(ctx: TaskCtx[ThreadTestInt]) {.gcsafe, nimcall.} = - executeTask(ctx): - # echo "task:exec" - discard ctx[].signal.fireSync() - ready.store(true) - ms.wait() - echo "task context memory: ", ctx[] - (?!ThreadTestInt).ok(default(ThreadTestInt)) - - proc runTestTask() {.async.} = - let obj = FutTestObj(val: 42) - await sleepAsync(1.milliseconds) - try: - let ctx = newTaskCtx(ThreadTestInt, signal=signal) - dispatchTask(sds, signal): - sds.tp.spawn errorTestTask(ctx) - ready.wait() - # echo "raise error" - raise newException(ValueError, "fake error") - finally: - # echo "fut FutTestObj: ", obj - assert obj.val == 42 # need to force future to keep ref here - try: - block: - await runTestTask() - except CatchableError as exc: - # echo "caught: ", $exc - discard - finally: - # echo "finish" - check ready.load() == true - GC_fullCollect() - futFreed.wait("futFreed") - echo "future freed it's mem!" - check futFreed.load() == true - - ms.fire() - flag.wait("flag") - check flag.load() == true