From d34cbd7df5ca1a834c7f9a8d85e9a6aff4c6e164 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 26 Sep 2023 12:51:28 -0700 Subject: [PATCH] integrate setups --- datastore/threads/threadproxyds.nim | 62 ++++++++++++++--------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 35237bc..33f0d81 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -72,31 +72,6 @@ proc acquireSignal(): ?!ThreadSignalPtr = else: success signal.get() -template dispatchTask[T](self: ThreadDatastore, - signal: ThreadSignalPtr, - blk: untyped - ): auto = - var - ctx {.inject.} = TaskCtx[T](signal: signal) - try: - case self.backend.kind: - of Sqlite: - var ds {.inject.} = self.backend.sql - proc runTask() = - `blk` - runTask() - - await wait(ctx.signal) - except CancelledError as exc: - trace "Cancelling thread future!", exc = exc.msg - while not ctx.setCancelled(): - warn "waiting to cancel thread future!", fn = astToStr(fn) - await sleepAsync(10.milliseconds) - raise exc - finally: - discard ctx.signal.close() - self.semaphore.release() - template executeTask[T](ctx: ptr TaskCtx[T], blk: untyped) = try: withLock(ctxLock): @@ -122,6 +97,31 @@ template executeTask[T](ctx: ptr TaskCtx[T], blk: untyped) = finally: discard ctx[].signal.fireSync() +template dispatchTask[T](self: ThreadDatastore, + signal: ThreadSignalPtr, + blk: untyped + ): auto = + var + ctx {.inject.} = TaskCtx[T](signal: signal) + try: + case self.backend.kind: + of Sqlite: + var ds {.inject.} = self.backend.sql + proc runTask() = + `blk` + runTask() + + await wait(ctx.signal) + except CancelledError as exc: + trace "Cancelling thread future!", exc = exc.msg + while not ctx.setCancelled(): + warn "waiting to cancel thread future!", fn = astToStr(fn) + await sleepAsync(10.milliseconds) + raise exc + finally: + discard ctx.signal.close() + self.semaphore.release() + proc hasTask[T, DB](ctx: ptr TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} = ## run backend command executeTask(ctx): @@ -214,15 +214,18 @@ method close*(self: ThreadDatastore): Future[?!void] {.async.} = of Sqlite: self.backend.sql.close() +proc queryTask[DB](ctx: ptr TaskCtx, ds: DB; + key: DbQuery[KeyId]) {.gcsafe, nimcall.} = + ## run backend command + executeTask(ctx): + let handle = ds.query(q) + query(ds, key) method query*( self: ThreadDatastore, query: Query): Future[?!QueryIter] {.async.} = - without var childIter =? await self.ds.query(query), error: - return failure error var - iter = QueryIter.new() lock = newAsyncLock() # serialize querying under threads proc next(): Future[?!QueryResponse] {.async.} = @@ -242,9 +245,6 @@ method query*( iter.finished = childIter.finished var res = ThreadResult[QueryResponse]() - ctx = TaskCtx[QueryResponse]( - ds: self.ds, - res: addr res) proc runTask() = self.tp.spawn queryTask(addr ctx, addr childIter)