From 173d42631a082fb84fce91dadc21cbe37f68c176 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 26 Sep 2023 14:53:12 -0700 Subject: [PATCH] setup query end --- datastore/threads/threadproxyds.nim | 67 +++++++++++++++-------------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 8fc0d1e..4117d00 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -15,6 +15,7 @@ import pkg/questionable import pkg/questionable/results import pkg/taskpools import pkg/chronicles +import pkg/threading/smartptrs import ../key import ../query @@ -42,12 +43,14 @@ type of Sqlite: sql*: SQLiteBackend[KeyId,DataBuffer] - TaskCtx[T: ThreadTypes] = object + TaskCtxObj[T: ThreadTypes] = object res: ThreadResult[T] signal: ThreadSignalPtr running: bool cancelled: bool + TaskCtx[T] = SharedPtr[TaskCtxObj[T]] + ThreadDatastore* = ref object of Datastore tp: Taskpool backend: ThreadBackend @@ -57,22 +60,22 @@ type var ctxLock: Lock ctxLock.initLock() -proc setCancelled(ctx: var TaskCtx): bool = +proc setCancelled[T](ctx: TaskCtx[T]): bool = withLock(ctxLock): - if ctx.running: + if ctx[].running: return false else: - ctx.cancelled = true + ctx[].cancelled = true return true -proc setRunning[T](ctx: ptr TaskCtx[T]): bool = +proc setRunning[T](ctx: TaskCtx[T]): bool = withLock(ctxLock): - if ctx.cancelled: + if ctx[].cancelled: return - ctx.running = true -proc setDone[T](ctx: ptr TaskCtx[T]) = + ctx[].running = true +proc setDone[T](ctx: TaskCtx[T]) = withLock(ctxLock): - ctx.running = false + ctx[].running = false proc acquireSignal(): ?!ThreadSignalPtr = let signal = ThreadSignalPtr.new() @@ -81,7 +84,7 @@ proc acquireSignal(): ?!ThreadSignalPtr = else: success signal.get() -template executeTask[T](ctx: ptr TaskCtx[T], blk: untyped) = +template executeTask[T](ctx: TaskCtx[T], blk: untyped) = try: if not ctx.setRunning(): return @@ -90,11 +93,11 @@ template executeTask[T](ctx: ptr TaskCtx[T], blk: untyped) = let res = `blk` if res.isOk(): when T is void: - ctx.res.ok() + ctx[].res.ok() else: - ctx.res.ok(res.get()) + ctx[].res.ok(res.get()) else: - ctx.res.err res.error().toThreadErr() + ctx[].res.err res.error().toThreadErr() except CatchableError as exc: trace "Unexpected exception thrown in async task", exc = exc.msg ctx[].res.err exc.toThreadErr() @@ -110,7 +113,7 @@ template dispatchTask[T](self: ThreadDatastore, blk: untyped ): auto = var - ctx {.inject.} = TaskCtx[T](signal: signal) + ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal)) try: case self.backend.kind: of Sqlite: @@ -119,7 +122,7 @@ template dispatchTask[T](self: ThreadDatastore, `blk` runTask() - await wait(ctx.signal) + await wait(ctx[].signal) except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg while not ctx.setCancelled(): @@ -127,10 +130,10 @@ template dispatchTask[T](self: ThreadDatastore, await sleepAsync(10.milliseconds) raise exc finally: - discard ctx.signal.close() + discard ctx[].signal.close() self.semaphore.release() -proc hasTask[T, DB](ctx: ptr TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} = +proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} = ## run backend command executeTask(ctx): has(ds, key) @@ -143,9 +146,9 @@ method has*(self: ThreadDatastore, let key = KeyId.new key.id() dispatchTask[bool](self, signal): - self.tp.spawn hasTask(addr ctx, ds, key) + self.tp.spawn hasTask(ctx, ds, key) -proc deleteTask[T, DB](ctx: ptr TaskCtx[T], ds: DB; +proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB; key: KeyId) {.gcsafe.} = ## run backend command executeTask(ctx): @@ -159,7 +162,7 @@ method delete*(self: ThreadDatastore, let key = KeyId.new key.id() dispatchTask[void](self, signal): - self.tp.spawn deleteTask(addr ctx, ds, key) + self.tp.spawn deleteTask(ctx, ds, key) method delete*(self: ThreadDatastore, keys: seq[Key]): Future[?!void] {.async.} = @@ -170,7 +173,7 @@ method delete*(self: ThreadDatastore, return success() -proc putTask[T, DB](ctx: ptr TaskCtx[T], ds: DB; +proc putTask[T, DB](ctx: TaskCtx[T], ds: DB; key: KeyId, data: DataBuffer) {.gcsafe, nimcall.} = ## run backend command @@ -187,7 +190,7 @@ method put*(self: ThreadDatastore, let key = KeyId.new key.id() let data = DataBuffer.new data dispatchTask[void](self, signal): - self.tp.spawn putTask(addr ctx, ds, key, data) + self.tp.spawn putTask(ctx, ds, key, data) method put*( self: ThreadDatastore, @@ -199,7 +202,7 @@ method put*( return success() -proc getTask[T, DB](ctx: ptr TaskCtx[T], ds: DB; +proc getTask[T, DB](ctx: TaskCtx[T], ds: DB; key: KeyId) {.gcsafe, nimcall.} = ## run backend command executeTask(ctx): @@ -214,7 +217,7 @@ method get*(self: ThreadDatastore, let key = KeyId.new key.id() dispatchTask[void](self, signal): - self.tp.spawn getTask(addr ctx, ds, key) + self.tp.spawn getTask(ctx, ds, key) method close*(self: ThreadDatastore): Future[?!void] {.async.} = await self.semaphore.closeAll() @@ -226,7 +229,7 @@ type QResult = DbQueryResponse[KeyId, DataBuffer] proc queryTask[DB]( - ctx: ptr TaskCtx[QResult], + ctx: TaskCtx[QResult], ds: DB, dq: DbQuery[KeyId] ) {.gcsafe, nimcall.} = @@ -266,7 +269,7 @@ method query*( ) dispatchTask[DbQueryResponse[KeyId, DataBuffer]](self, signal): - self.tp.spawn queryTask(addr ctx, ds, dq) + self.tp.spawn queryTask(ctx, ds, dq) var lock = newAsyncLock() # serialize querying under threads @@ -286,16 +289,16 @@ method query*( await lock.acquire() dispatchTask[void](self, signal): - discard ctx.signal.fireSync() + discard ctx[].signal.fireSync() - if ctx.res.isErr() and ctx.res.error()[0] == ErrorEnum.QueryEndedErr: + if ctx[].res.isErr() and ctx[].res.error()[0] == ErrorEnum.QueryEndedErr: iter.finished = true return - elif ctx.res.isErr(): - return err(ctx.res.error()) + elif ctx[].res.isErr(): + return err(ctx[].res.error()) else: - let qres = ctx.res.get() - return ok(default) + # let qres = ctx[].res.get() + return (?!QueryResponse).ok(default(QueryResponse)) iter.next = next return success iter