test task cancel

This commit is contained in:
Jaremy Creechley 2023-09-28 13:47:48 -07:00
parent 46f22dc10e
commit bd79cb7357
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
2 changed files with 32 additions and 47 deletions

View File

@ -38,10 +38,10 @@ logScope:
type
TaskCtxObj*[T: ThreadTypes] = object
res: ThreadResult[T]
res*: ThreadResult[T]
signal: ThreadSignalPtr
running: bool ## used to mark when a task worker is running
cancelled: bool ## used to cancel a task before it's started
running*: bool ## used to mark when a task worker is running
cancelled*: bool ## used to cancel a task before it's started
nextSignal: ThreadSignalPtr
TaskCtx*[T] = SharedPtr[TaskCtxObj[T]]
@ -78,7 +78,7 @@ proc acquireSignal(): ?!ThreadSignalPtr =
else:
success signal.get()
template executeTask[T](ctx: TaskCtx[T], blk: untyped) =
template executeTask*[T](ctx: TaskCtx[T], blk: untyped) =
## executes a task on a thread work and handles cleanup after cancels/errors
##
try:
@ -115,7 +115,7 @@ template dispatchTaskWrap[BT](self: ThreadDatastore[BT],
runTask()
await wait(ctx[].signal)
template dispatchTask[BT](self: ThreadDatastore[BT],
template dispatchTask*[BT](self: ThreadDatastore[BT],
signal: ThreadSignalPtr,
blk: untyped
): auto =

View File

@ -141,55 +141,40 @@ suite "Test Query ThreadDatastore with fsds":
queryTests(ds, false)
# suite "Test ThreadDatastore cancelations":
# var
# sqlStore: SQLiteBackend[KeyId,DataBuffer]
# ds: ThreadDatastore
# taskPool: Taskpool
suite "Test ThreadDatastore cancelations":
var
sqlStore: SQLiteBackend[KeyId,DataBuffer]
sds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]]
# privateAccess(ThreadDatastore) # expose private fields
# privateAccess(TaskCtx) # expose private fields
privateAccess(ThreadDatastore) # expose private fields
privateAccess(TaskCtx) # expose private fields
# setupAll:
# sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
# taskPool = Taskpool.new(NumThreads)
# ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
setupAll:
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
sds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
# teardown:
# GC_fullCollect() # run full collect after each test
teardown:
GC_fullCollect() # run full collect after each test
# teardownAll:
# (await ds.close()).tryGet()
# taskPool.shutdown()
test "Should monitor signal and cancel":
var
signal = ThreadSignalPtr.new().tryGet()
res = ThreadResult[void]()
# test "Should monitor signal and cancel":
# var
# signal = ThreadSignalPtr.new().tryGet()
# res = ThreadResult[void]()
# ctx = newSharedPtr(TaskCtxObj[void](signal: signal))
# fut = newFuture[void]("signalMonitor")
# threadArgs = (addr ctx, addr fut)
# thread: Thread[type threadArgs]
proc cancelTestTask[T](ctx: TaskCtx[T]) {.gcsafe.} =
executeTask(ctx):
(?!bool).ok(true)
# proc threadTask(args: type threadArgs) =
# var (ctx, fut) = args
# proc asyncTask() {.async.} =
# let
# monitor = signalMonitor(ctx, fut[])
let ctx = newTaskCtx(bool, signal=signal)
ctx[].cancelled = true
dispatchTask(sds, signal):
sds.tp.spawn cancelTestTask(ctx)
# await monitor
# waitFor asyncTask()
# createThread(thread, threadTask, threadArgs)
# ctx.cancelled = true
# check: ctx.signal.fireSync.tryGet
# joinThreads(thread)
# check: fut.cancelled
# check: ctx.signal.close().isOk
# fut = nil
echo "ctx: ", ctx[]
check:
ctx[].res.isErr == true
ctx[].cancelled == true
ctx[].running == false
# test "Should monitor and not cancel":
# var