From c31f1890b2ec7d9fbd038aaf30601e82f35dc6bd Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 26 Sep 2023 16:38:46 -0700 Subject: [PATCH] testing --- datastore/threads/threadproxyds.nim | 30 ++++++++++++++++++++----- datastore/threads/threadresult.nim | 4 ++++ tests/datastore/testthreadproxyds.nim | 32 +++++++++++++-------------- 3 files changed, 44 insertions(+), 22 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index fd0d384..0387781 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -67,8 +67,9 @@ proc setCancelled[T](ctx: TaskCtx[T]) = proc setRunning[T](ctx: TaskCtx[T]): bool = withLock(ctxLock): if ctx[].cancelled: - return + return false ctx[].running = true + return true proc setDone[T](ctx: TaskCtx[T]) = withLock(ctxLock): ctx[].running = false @@ -82,18 +83,26 @@ proc acquireSignal(): ?!ThreadSignalPtr = template executeTask[T](ctx: TaskCtx[T], blk: untyped) = try: + echo "executeTask:start:" if not ctx.setRunning(): + echo "executeTask:notRunning!" return ## run backend command + echo "executeTask:run:" let res = `blk` if res.isOk(): + echo "executeTask:run:ok" when T is void: ctx[].res.ok() else: ctx[].res.ok(res.get()) else: + echo "executeTask:run:err" ctx[].res.err res.error().toThreadErr() + echo "executeTask:run:done: ", ctx[].res.repr + echo "" + except CatchableError as exc: trace "Unexpected exception thrown in async task", exc = exc.msg ctx[].res.err exc.toThreadErr() @@ -101,8 +110,10 @@ template executeTask[T](ctx: TaskCtx[T], blk: untyped) = trace "Unexpected defect thrown in async task", exc = exc.msg ctx[].res.err exc.toThreadErr() finally: + echo "executeTask:finally:setDone" ctx.setDone() discard ctx[].signal.fireSync() + echo "executeTask:finally:done\n" template dispatchTaskWrap[T](self: ThreadDatastore, signal: ThreadSignalPtr, @@ -111,12 +122,15 @@ template dispatchTaskWrap[T](self: ThreadDatastore, try: case self.backend.kind: of Sqlite: - var ds {.inject.} = self.backend.sql + echo "dispatchTask:sql:" + var ds {.used, inject.} = self.backend.sql proc runTask() = `blk` runTask() - + echo "dispatchTask:wait:start" await wait(ctx[].signal) + echo "dispatchTask:wait:done" + except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg ctx.setCancelled() @@ -172,16 +186,19 @@ method delete*(self: ThreadDatastore, return success() + proc putTask[T, DB](ctx: TaskCtx[T], ds: DB; - key: KeyId, - data: DataBuffer) {.gcsafe, nimcall.} = + key: KeyId, + data: DataBuffer) {.gcsafe, nimcall.} = ## run backend command + echo "\n\nputTask:start " executeTask(ctx): echo "putTask:key: ", key echo "putTask:data: ", data echo "putTask:ctx: ", ctx.repr() echo "" put(ds, key, data) + echo "putTask:done" method put*(self: ThreadDatastore, key: Key, @@ -200,6 +217,8 @@ method put*(self: ThreadDatastore, echo "" self.tp.spawn putTask(ctx, ds, key, data) + return ctx[].res + method put*( self: ThreadDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} = @@ -208,7 +227,6 @@ method put*( if err =? (await self.put(entry.key, entry.data)).errorOption: return failure err - return success() proc getTask[T, DB](ctx: TaskCtx[T], ds: DB; key: KeyId) {.gcsafe, nimcall.} = diff --git a/datastore/threads/threadresult.nim b/datastore/threads/threadresult.nim index b1694f4..f0a226b 100644 --- a/datastore/threads/threadresult.nim +++ b/datastore/threads/threadresult.nim @@ -39,3 +39,7 @@ converter toExc*(e: ThreadResErr): ref CatchableError = of ErrorEnum.DatastoreErr: (ref DatastoreError)(msg: $e[1]) of ErrorEnum.CatchableErr: (ref CatchableError)(msg: $e[1]) of ErrorEnum.DefectErr: (ref CatchableError)(msg: "defect: " & $e[1]) + +converter toExcRes*[T](res: ThreadResult[T]): ?!T = + res.mapErr() do(exc: ThreadResErr) -> ref CatchableError: + exc.toExc() diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 046773d..93eb84b 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -170,26 +170,26 @@ suite "Test Basic ThreadProxyDatastore": # queryTests(ds, false) -suite "Test ThreadDatastore cancelations": - var - sqlStore: SQLiteBackend[KeyId,DataBuffer] - ds: ThreadDatastore - taskPool: Taskpool +# suite "Test ThreadDatastore cancelations": +# var +# sqlStore: SQLiteBackend[KeyId,DataBuffer] +# ds: ThreadDatastore +# taskPool: Taskpool - privateAccess(ThreadDatastore) # expose private fields - privateAccess(TaskCtx) # expose private fields +# privateAccess(ThreadDatastore) # expose private fields +# privateAccess(TaskCtx) # expose private fields - setupAll: - sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - taskPool = Taskpool.new(NumThreads) - ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() +# setupAll: +# sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() +# taskPool = Taskpool.new(NumThreads) +# ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - teardown: - GC_fullCollect() # run full collect after each test +# teardown: +# GC_fullCollect() # run full collect after each test - teardownAll: - (await ds.close()).tryGet() - taskPool.shutdown() +# teardownAll: +# (await ds.close()).tryGet() +# taskPool.shutdown() # test "Should monitor signal and cancel": # var