diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 8a20fd4..021e23f 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -139,109 +139,112 @@ suite "Test Query ThreadDatastore with fsds": queryTests(ds, false) -suite "Test ThreadDatastore cancelations": - var - sqlStore: SQLiteBackend[KeyId,DataBuffer] - sds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]] - - privateAccess(ThreadDatastore) # expose private fields - privateAccess(TaskCtx) # expose private fields - - setupAll: - sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - sds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - - teardown: - GC_fullCollect() # run full collect after each test - - test "Should monitor signal and cancel": +for i in 1..N: + suite "Test ThreadDatastore cancelations": var - signal = ThreadSignalPtr.new().tryGet() + sqlStore: SQLiteBackend[KeyId,DataBuffer] + sds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]] - proc cancelTestTask(ctx: TaskCtx[bool]) {.gcsafe.} = - executeTask(ctx): - (?!bool).ok(true) + privateAccess(ThreadDatastore) # expose private fields + privateAccess(TaskCtx) # expose private fields - let ctx = newTaskCtx(bool, signal=signal) - ctx[].cancelled = true - dispatchTask(sds, signal): - sds.tp.spawn cancelTestTask(ctx) + setupAll: + sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() + sds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - check: - ctx[].res.isErr == true - ctx[].cancelled == true - ctx[].running == false + teardown: + GC_fullCollect() # run full collect after each test - test "Should cancel future": + test "Should monitor signal and cancel": + var + signal = ThreadSignalPtr.new().tryGet() - var - signal = ThreadSignalPtr.new().tryGet() - ms {.global.}: MutexSignal - flag {.global.}: Atomic[bool] - futFreed {.global.}: Atomic[bool] - ready {.global.}: Atomic[bool] + proc cancelTestTask(ctx: TaskCtx[bool]) {.gcsafe.} = + executeTask(ctx): + (?!bool).ok(true) - ms.init() - - type - FutTestObj = object - TestValue = object - ThreadTestInt = (TestValue, ) - - proc `=destroy`(obj: var TestValue) = - echo "destroy TestObj!" - flag.store(true) - - proc `=destroy`(obj: var FutTestObj) = - echo "destroy FutTestObj!" - futFreed.store(true) - - proc wait(flag: var Atomic[bool]) = - echo "wait for task to be ready..." - defer: echo "" - for i in 1..100: - stdout.write(".") - if flag.load() == true: - return - os.sleep(10) - raise newException(Defect, "timeout") - - proc errorTestTask(ctx: TaskCtx[ThreadTestInt]) {.gcsafe, nimcall.} = - executeTask(ctx): - echo "task:exec" - discard ctx[].signal.fireSync() - ready.store(true) - ms.wait() - echo "ctx:task: ", ctx[] - (?!ThreadTestInt).ok(default(ThreadTestInt)) - - proc runTestTask() {.async.} = - - let obj = FutTestObj() - await sleepAsync(1.milliseconds) - defer: echo "fut FutTestObj: ", obj - - let ctx = newTaskCtx(ThreadTestInt, signal=signal) + let ctx = newTaskCtx(bool, signal=signal) + ctx[].cancelled = true dispatchTask(sds, signal): - sds.tp.spawn errorTestTask(ctx) - ready.wait() - echo "raise error" - raise newException(ValueError, "fake error") + sds.tp.spawn cancelTestTask(ctx) - try: - block: - await runTestTask() - except CatchableError as exc: - echo "caught: ", $exc - finally: - echo "finish" - check ready.load() == true - GC_fullCollect() - futFreed.wait() - echo "future freed it's mem!" - check futFreed.load() == true + check: + ctx[].res.isErr == true + ctx[].cancelled == true + ctx[].running == false - ms.fire() - flag.wait() - check flag.load() == true + test "Should cancel future": + + var + signal = ThreadSignalPtr.new().tryGet() + ms {.global.}: MutexSignal + flag {.global.}: Atomic[bool] + futFreed {.global.}: Atomic[bool] + ready {.global.}: Atomic[bool] + + ms.init() + + type + FutTestObj = object + val: int + TestValue = object + ThreadTestInt = (TestValue, ) + + proc `=destroy`(obj: var TestValue) = + # echo "destroy TestObj!" + flag.store(true) + + proc `=destroy`(obj: var FutTestObj) = + # echo "destroy FutTestObj!" + futFreed.store(true) + + proc wait(flag: var Atomic[bool], name = "task") = + # echo "wait for " & name & " to be ready..." + defer: echo "" + for i in 1..100: + # stdout.write(".") + if flag.load() == true: + return + os.sleep(10) + raise newException(Defect, "timeout") + + proc errorTestTask(ctx: TaskCtx[ThreadTestInt]) {.gcsafe, nimcall.} = + executeTask(ctx): + # echo "task:exec" + discard ctx[].signal.fireSync() + ready.store(true) + ms.wait() + # echo "ctx:task: ", ctx[] + (?!ThreadTestInt).ok(default(ThreadTestInt)) + + proc runTestTask() {.async.} = + let obj = FutTestObj(val: 42) + await sleepAsync(1.milliseconds) + try: + let ctx = newTaskCtx(ThreadTestInt, signal=signal) + dispatchTask(sds, signal): + sds.tp.spawn errorTestTask(ctx) + ready.wait() + # echo "raise error" + raise newException(ValueError, "fake error") + finally: + # echo "fut FutTestObj: ", obj + assert obj.val == 42 # need to force future to keep ref here + try: + block: + await runTestTask() + except CatchableError as exc: + # echo "caught: ", $exc + discard + finally: + # echo "finish" + check ready.load() == true + GC_fullCollect() + futFreed.wait("futFreed") + echo "future freed it's mem!" + check futFreed.load() == true + + ms.fire() + flag.wait("flag") + check flag.load() == true