From d24d3e5f2d1c324aa285d6146520c06b05e421ae Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Thu, 28 Sep 2023 18:13:37 -0700 Subject: [PATCH] reorg --- datastore/fsds.nim | 2 +- datastore/sql.nim | 2 +- .../{threadproxyds.nim => threadproxy.nim} | 0 tests/datastore/threads/testthreadproxyds.nim | 236 ++++++++++++++++++ 4 files changed, 238 insertions(+), 2 deletions(-) rename datastore/threads/{threadproxyds.nim => threadproxy.nim} (100%) create mode 100644 tests/datastore/threads/testthreadproxyds.nim diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 44b87b0..333e653 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -10,7 +10,7 @@ import pkg/chronos import pkg/taskpools import ./threads/fsbackend -import ./threads/threadproxyds +import ./threads/threadproxy import ./datastore export datastore, Taskpool diff --git a/datastore/sql.nim b/datastore/sql.nim index 66e9ce5..0a860b6 100644 --- a/datastore/sql.nim +++ b/datastore/sql.nim @@ -11,7 +11,7 @@ import pkg/chronos import pkg/taskpools import ./threads/sqlbackend -import ./threads/threadproxyds +import ./threads/threadproxy import ./datastore export datastore, keys, query, Taskpool, Memory diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxy.nim similarity index 100% rename from datastore/threads/threadproxyds.nim rename to datastore/threads/threadproxy.nim diff --git a/tests/datastore/threads/testthreadproxyds.nim b/tests/datastore/threads/testthreadproxyds.nim new file mode 100644 index 0000000..000a3e1 --- /dev/null +++ b/tests/datastore/threads/testthreadproxyds.nim @@ -0,0 +1,236 @@ +import std/options +import std/sequtils +import std/os +import std/cpuinfo +import std/algorithm +import std/importutils + +import pkg/asynctest +import pkg/chronos +import pkg/chronos/threadsync +import pkg/stew/results +import pkg/stew/byteutils +import pkg/taskpools +import pkg/questionable/results +import pkg/threading/atomics + +import pkg/datastore/sql +import pkg/datastore/fsds +import pkg/datastore/threads/threadproxy + +import ../dscommontests +import ../querycommontests + +const + NumThreads = 20 # IO threads aren't attached to CPU count + ThreadTestLoops {.intdefine.} = 1 + N = ThreadTestLoops + ThreadTestInnerLoops {.intdefine.} = 1 + M = ThreadTestInnerLoops + +var + taskPool: Taskpool = Taskpool.new(NumThreads) + +for i in 1..N: + suite "Test Basic ThreadDatastore with SQLite " & $i: + + var + ds: SQLiteDatastore + key = Key.init("/a/b").tryGet() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes + + setupAll: + ds = SQLiteDatastore.new(Memory, tp=taskPool).tryGet() + + teardown: + GC_fullCollect() + + teardownAll: + (await ds.close()).tryGet() + + for i in 1..M: + basicStoreTests(ds, key, bytes, otherBytes) + GC_fullCollect() + + +for i in 1..N: + suite "Test Query ThreadDatastore with SQLite " & $i: + + var + ds: SQLiteDatastore + + setup: + ds = SQLiteDatastore.new(Memory, tp = taskPool).tryGet() + + teardown: + GC_fullCollect() + + (await ds.close()).tryGet() + + for i in 1..M: + queryTests(ds, true) + GC_fullCollect() + +suite "Test Basic ThreadDatastore with fsds": + let + path = currentSourcePath() # get this file's name + basePath = "tests_data" + basePathAbs = path.parentDir / basePath + key = Key.init("/a/b").tryGet() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes + + var + ds: FSDatastore + + setupAll: + removeDir(basePathAbs) + require(not dirExists(basePathAbs)) + createDir(basePathAbs) + + ds = FSDatastore.new(root=basePathAbs, tp=taskPool, depth=5).tryGet() + + teardown: + GC_fullCollect() + + teardownAll: + (await ds.close()).tryGet() + + removeDir(basePathAbs) + require(not dirExists(basePathAbs)) + + basicStoreTests(ds, key, bytes, otherBytes) + +suite "Test Query ThreadDatastore with fsds": + let + path = currentSourcePath() # get this file's name + basePath = "tests_data" + basePathAbs = path.parentDir / basePath + + var + ds: FSDatastore + + setup: + removeDir(basePathAbs) + require(not dirExists(basePathAbs)) + createDir(basePathAbs) + + ds = FSDatastore.new(root=basePathAbs, tp = taskPool, depth=5).tryGet() + + teardown: + GC_fullCollect() + (await ds.close()).tryGet() + + removeDir(basePathAbs) + require(not dirExists(basePathAbs)) + + queryTests(ds, false) + +for i in 1..N: + suite "Test ThreadDatastore cancelations": + + privateAccess(SQLiteDatastore) # expose private fields + privateAccess(ThreadDatastore) # expose private fields + # privateAccess(TaskCtx) # expose private fields + + var sds: SQLiteDatastore + + setupAll: + sds = SQLiteDatastore.new(Memory, tp = taskPool).tryGet() + + teardown: + GC_fullCollect() # run full collect after each test + + test "Should monitor signal and cancel": + var + signal = ThreadSignalPtr.new().tryGet() + + proc cancelTestTask(ctx: TaskCtx[bool]) {.gcsafe.} = + executeTask(ctx): + (?!bool).ok(true) + + let ctx = newTaskCtx(bool, signal=signal) + ctx[].cancelled = true + dispatchTask(sds.db, signal): + sds.db.tp.spawn cancelTestTask(ctx) + + check: + ctx[].res.isErr == true + ctx[].cancelled == true + ctx[].running == false + + test "Should cancel future": + + var + signal = ThreadSignalPtr.new().tryGet() + ms {.global.}: MutexSignal + flag {.global.}: Atomic[bool] + futFreed {.global.}: Atomic[bool] + ready {.global.}: Atomic[bool] + + ms.init() + + type + FutTestObj = object + val: int + TestValue = object + ThreadTestInt = (TestValue, ) + + proc `=destroy`(obj: var TestValue) = + # echo "destroy TestObj!" + flag.store(true) + + proc `=destroy`(obj: var FutTestObj) = + # echo "destroy FutTestObj!" + futFreed.store(true) + + proc wait(flag: var Atomic[bool], name = "task") = + # echo "wait for " & name & " to be ready..." + # defer: echo "" + for i in 1..100: + # stdout.write(".") + if flag.load() == true: + return + os.sleep(10) + raise newException(Defect, "timeout") + + proc errorTestTask(ctx: TaskCtx[ThreadTestInt]) {.gcsafe, nimcall.} = + executeTask(ctx): + # echo "task:exec" + discard ctx[].signal.fireSync() + ready.store(true) + ms.wait() + echo "task context memory: ", ctx[] + (?!ThreadTestInt).ok(default(ThreadTestInt)) + + proc runTestTask() {.async.} = + let obj = FutTestObj(val: 42) + await sleepAsync(1.milliseconds) + try: + let ctx = newTaskCtx(ThreadTestInt, signal=signal) + dispatchTask(sds.db, signal): + sds.db.tp.spawn errorTestTask(ctx) + ready.wait() + # echo "raise error" + raise newException(ValueError, "fake error") + finally: + # echo "fut FutTestObj: ", obj + assert obj.val == 42 # need to force future to keep ref here + try: + block: + await runTestTask() + except CatchableError as exc: + # echo "caught: ", $exc + discard + finally: + # echo "finish" + check ready.load() == true + GC_fullCollect() + futFreed.wait("futFreed") + echo "future freed it's mem!" + check futFreed.load() == true + + ms.fire() + flag.wait("flag") + check flag.load() == true