diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index cbe5e45..19b5def 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -305,11 +305,9 @@ method query*[BT](self: ThreadDatastore[BT], let ctx = newTaskCtx(QResult, signal=signal, nextSignal=nextSignal) proc iterDispose() {.async.} = - # echo "signal:CLOSE!" ctx.setCancelled() await ctx[].nextSignal.fire() discard ctx[].signal.close() - # echo "nextSignal:CLOSE!" discard ctx[].nextSignal.close() self.semaphore.release() diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 45ac6ad..9799411 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -44,7 +44,6 @@ for i in 1..N: setupAll: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - # taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() teardown: @@ -52,7 +51,6 @@ for i in 1..N: teardownAll: (await ds.close()).tryGet() - # taskPool.shutdown() for i in 1..M: basicStoreTests(ds, key, bytes, otherBytes) @@ -76,7 +74,6 @@ for i in 1..N: GC_fullCollect() (await ds.close()).tryGet() - # taskPool.shutdown() for i in 1..M: queryTests(ds, true) @@ -159,9 +156,8 @@ suite "Test ThreadDatastore cancelations": test "Should monitor signal and cancel": var signal = ThreadSignalPtr.new().tryGet() - res = ThreadResult[void]() - proc cancelTestTask[T](ctx: TaskCtx[T]) {.gcsafe.} = + proc cancelTestTask(ctx: TaskCtx[bool]) {.gcsafe.} = executeTask(ctx): (?!bool).ok(true) @@ -170,40 +166,50 @@ suite "Test ThreadDatastore cancelations": dispatchTask(sds, signal): sds.tp.spawn cancelTestTask(ctx) - echo "ctx: ", ctx[] check: ctx[].res.isErr == true ctx[].cancelled == true ctx[].running == false - # test "Should monitor and not cancel": - # var - # signal = ThreadSignalPtr.new().tryGet() - # res = ThreadResult[void]() - # ctx = TaskCtx[void]( - # ds: sqlStore, - # res: addr res, - # signal: signal) - # fut = newFuture[void]("signalMonitor") - # threadArgs = (addr ctx, addr fut) - # thread: Thread[type threadArgs] + test "Should cancel future": - # proc threadTask(args: type threadArgs) = - # var (ctx, fut) = args - # proc asyncTask() {.async.} = - # let - # monitor = signalMonitor(ctx, fut[]) + var + signal = ThreadSignalPtr.new().tryGet() + ms {.global.}: MutexSignal + flag {.global.}: int = 0 - # await monitor + ms.init() - # waitFor asyncTask() + type + TestValue = object + ThreadTestInt = (TestValue, ) - # createThread(thread, threadTask, threadArgs) - # ctx.cancelled = false - # check: ctx.signal.fireSync.tryGet + proc `=destroy`(obj: var TestValue) = + echo "destroy TestObj!" + flag = 10 - # joinThreads(thread) + proc errorTestTask(ctx: TaskCtx[ThreadTestInt]) {.gcsafe, nimcall.} = + executeTask(ctx): + discard ctx[].signal.fireSync() + ms.wait() + (?!ThreadTestInt).ok(default(ThreadTestInt)) + + proc runTestTask() {.async.} = + + let ctx = newTaskCtx(ThreadTestInt, signal=signal) + dispatchTask(sds, signal): + sds.tp.spawn errorTestTask(ctx) + + echo "raise error" + raise newException(ValueError, "fake error") + + try: + await runTestTask() + except CatchableError as exc: + echo "caught: ", $exc + finally: + echo "finish" + ms.fire() + os.sleep(10) + check flag == 10 - # check: not fut.cancelled - # check: ctx.signal.close().isOk - # fut = nil