test task cancel

This commit is contained in:
Jaremy Creechley 2023-09-28 14:51:27 -07:00
parent 2481746301
commit 43520c3608
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
2 changed files with 32 additions and 10 deletions

View File

@ -128,8 +128,10 @@ template dispatchTask*[BT](self: ThreadDatastore[BT],
trace "Cancelling thread future!", exc = exc.msg trace "Cancelling thread future!", exc = exc.msg
ctx.setCancelled() ctx.setCancelled()
raise exc raise exc
except CatchableError as exc:
ctx.setCancelled()
raise exc
finally: finally:
# echo "signal:CLOSE!"
discard ctx[].signal.close() discard ctx[].signal.close()
self.semaphore.release() self.semaphore.release()
@ -359,6 +361,7 @@ method query*[BT](self: ThreadDatastore[BT],
return success iter return success iter
except CancelledError as exc: except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg trace "Cancelling thread future!", exc = exc.msg
ctx.setCancelled()
await iterDispose() await iterDispose()
raise exc raise exc

View File

@ -14,6 +14,7 @@ import pkg/taskpools
import pkg/questionable/results import pkg/questionable/results
import pkg/chronicles import pkg/chronicles
import pkg/threading/smartptrs import pkg/threading/smartptrs
import pkg/threading/atomics
import pkg/datastore/fsds import pkg/datastore/fsds
import pkg/datastore/sql/sqliteds import pkg/datastore/sql/sqliteds
@ -176,7 +177,8 @@ suite "Test ThreadDatastore cancelations":
var var
signal = ThreadSignalPtr.new().tryGet() signal = ThreadSignalPtr.new().tryGet()
ms {.global.}: MutexSignal ms {.global.}: MutexSignal
flag {.global.}: int = 0 flag {.global.}: Atomic[bool]
ready {.global.}: Atomic[bool]
ms.init() ms.init()
@ -186,12 +188,25 @@ suite "Test ThreadDatastore cancelations":
proc `=destroy`(obj: var TestValue) = proc `=destroy`(obj: var TestValue) =
echo "destroy TestObj!" echo "destroy TestObj!"
flag = 10 flag.store(true)
proc wait(flag: var Atomic[bool]) =
echo "wait for task to be ready..."
defer: echo ""
for i in 1..100:
stdout.write(".")
if flag.load() == true:
return
os.sleep(10)
raise newException(Defect, "timeout")
proc errorTestTask(ctx: TaskCtx[ThreadTestInt]) {.gcsafe, nimcall.} = proc errorTestTask(ctx: TaskCtx[ThreadTestInt]) {.gcsafe, nimcall.} =
executeTask(ctx): executeTask(ctx):
echo "task:exec"
discard ctx[].signal.fireSync() discard ctx[].signal.fireSync()
ready.store(true)
ms.wait() ms.wait()
echo "ctx:task: ", ctx[]
(?!ThreadTestInt).ok(default(ThreadTestInt)) (?!ThreadTestInt).ok(default(ThreadTestInt))
proc runTestTask() {.async.} = proc runTestTask() {.async.} =
@ -199,17 +214,21 @@ suite "Test ThreadDatastore cancelations":
let ctx = newTaskCtx(ThreadTestInt, signal=signal) let ctx = newTaskCtx(ThreadTestInt, signal=signal)
dispatchTask(sds, signal): dispatchTask(sds, signal):
sds.tp.spawn errorTestTask(ctx) sds.tp.spawn errorTestTask(ctx)
ready.wait()
echo "raise error" echo "raise error"
raise newException(ValueError, "fake error") raise newException(ValueError, "fake error")
try: try:
block:
await runTestTask() await runTestTask()
except CatchableError as exc: except CatchableError as exc:
echo "caught: ", $exc echo "caught: ", $exc
finally: finally:
echo "finish" echo "finish"
ms.fire() check ready.load() == true
os.sleep(10)
check flag == 10 ms.fire()
GC_fullCollect()
flag.wait()
check flag.load() == true