This commit is contained in:
Jaremy Creechley 2023-09-26 16:38:46 -07:00
parent 54a04127a8
commit c31f1890b2
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
3 changed files with 44 additions and 22 deletions

View File

@ -67,8 +67,9 @@ proc setCancelled[T](ctx: TaskCtx[T]) =
proc setRunning[T](ctx: TaskCtx[T]): bool =
withLock(ctxLock):
if ctx[].cancelled:
return
return false
ctx[].running = true
return true
proc setDone[T](ctx: TaskCtx[T]) =
withLock(ctxLock):
ctx[].running = false
@ -82,18 +83,26 @@ proc acquireSignal(): ?!ThreadSignalPtr =
template executeTask[T](ctx: TaskCtx[T], blk: untyped) =
try:
echo "executeTask:start:"
if not ctx.setRunning():
echo "executeTask:notRunning!"
return
## run backend command
echo "executeTask:run:"
let res = `blk`
if res.isOk():
echo "executeTask:run:ok"
when T is void:
ctx[].res.ok()
else:
ctx[].res.ok(res.get())
else:
echo "executeTask:run:err"
ctx[].res.err res.error().toThreadErr()
echo "executeTask:run:done: ", ctx[].res.repr
echo ""
except CatchableError as exc:
trace "Unexpected exception thrown in async task", exc = exc.msg
ctx[].res.err exc.toThreadErr()
@ -101,8 +110,10 @@ template executeTask[T](ctx: TaskCtx[T], blk: untyped) =
trace "Unexpected defect thrown in async task", exc = exc.msg
ctx[].res.err exc.toThreadErr()
finally:
echo "executeTask:finally:setDone"
ctx.setDone()
discard ctx[].signal.fireSync()
echo "executeTask:finally:done\n"
template dispatchTaskWrap[T](self: ThreadDatastore,
signal: ThreadSignalPtr,
@ -111,12 +122,15 @@ template dispatchTaskWrap[T](self: ThreadDatastore,
try:
case self.backend.kind:
of Sqlite:
var ds {.inject.} = self.backend.sql
echo "dispatchTask:sql:"
var ds {.used, inject.} = self.backend.sql
proc runTask() =
`blk`
runTask()
echo "dispatchTask:wait:start"
await wait(ctx[].signal)
echo "dispatchTask:wait:done"
except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg
ctx.setCancelled()
@ -172,16 +186,19 @@ method delete*(self: ThreadDatastore,
return success()
proc putTask[T, DB](ctx: TaskCtx[T], ds: DB;
key: KeyId,
data: DataBuffer) {.gcsafe, nimcall.} =
key: KeyId,
data: DataBuffer) {.gcsafe, nimcall.} =
## run backend command
echo "\n\nputTask:start "
executeTask(ctx):
echo "putTask:key: ", key
echo "putTask:data: ", data
echo "putTask:ctx: ", ctx.repr()
echo ""
put(ds, key, data)
echo "putTask:done"
method put*(self: ThreadDatastore,
key: Key,
@ -200,6 +217,8 @@ method put*(self: ThreadDatastore,
echo ""
self.tp.spawn putTask(ctx, ds, key, data)
return ctx[].res
method put*(
self: ThreadDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async.} =
@ -208,7 +227,6 @@ method put*(
if err =? (await self.put(entry.key, entry.data)).errorOption:
return failure err
return success()
proc getTask[T, DB](ctx: TaskCtx[T], ds: DB;
key: KeyId) {.gcsafe, nimcall.} =

View File

@ -39,3 +39,7 @@ converter toExc*(e: ThreadResErr): ref CatchableError =
of ErrorEnum.DatastoreErr: (ref DatastoreError)(msg: $e[1])
of ErrorEnum.CatchableErr: (ref CatchableError)(msg: $e[1])
of ErrorEnum.DefectErr: (ref CatchableError)(msg: "defect: " & $e[1])
converter toExcRes*[T](res: ThreadResult[T]): ?!T =
res.mapErr() do(exc: ThreadResErr) -> ref CatchableError:
exc.toExc()

View File

@ -170,26 +170,26 @@ suite "Test Basic ThreadProxyDatastore":
# queryTests(ds, false)
suite "Test ThreadDatastore cancelations":
var
sqlStore: SQLiteBackend[KeyId,DataBuffer]
ds: ThreadDatastore
taskPool: Taskpool
# suite "Test ThreadDatastore cancelations":
# var
# sqlStore: SQLiteBackend[KeyId,DataBuffer]
# ds: ThreadDatastore
# taskPool: Taskpool
privateAccess(ThreadDatastore) # expose private fields
privateAccess(TaskCtx) # expose private fields
# privateAccess(ThreadDatastore) # expose private fields
# privateAccess(TaskCtx) # expose private fields
setupAll:
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
# setupAll:
# sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
# taskPool = Taskpool.new(NumThreads)
# ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
teardown:
GC_fullCollect() # run full collect after each test
# teardown:
# GC_fullCollect() # run full collect after each test
teardownAll:
(await ds.close()).tryGet()
taskPool.shutdown()
# teardownAll:
# (await ds.close()).tryGet()
# taskPool.shutdown()
# test "Should monitor signal and cancel":
# var