diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 1da13d3..130d642 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -44,7 +44,6 @@ type sql*: SQLiteBackend[KeyId,DataBuffer] TaskCtx[D; T: ThreadTypes] = object - key: KeyId res: ThreadResult[T] signal: ThreadSignalPtr running: bool @@ -76,22 +75,24 @@ proc acquireSignal(): ?!ThreadSignalPtr = template dispatchTask(self: ThreadDatastore, signal: ThreadSignalPtr, - fn: untyped): auto = + blk: untyped + ): auto = var - ctx = TaskCtx[SqliteDB, bool](signal: signal) + ctx {.inject.} = TaskCtx[SqliteDB, bool](signal: signal) try: case self.backend.kind: of Sqlite: - var ds = self.backend.sql + var ds {.inject.} = self.backend.sql proc runTask() = - self.tp.spawn `fn`(addr ctx, ds) + # self.tp.spawn `fn`(addr ctx, ds, args) + `blk` runTask() await wait(ctx.signal) except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg while not ctx.setCancelled(): - warn "waiting to cancel thread future!", fn = astToStr(fn), key = $ctx.key + warn "waiting to cancel thread future!", fn = astToStr(fn) await sleepAsync(10.milliseconds) raise exc finally: @@ -107,7 +108,6 @@ proc runTask[D, T](ctx: ptr TaskCtx, ds: D, cb: proc(ctx: ptr TaskCtx): T {.gcsa ## run backend command let res = cb(ctx) - # let res = has(ds, ctx.key) withLock(ctxLock): ctx.running = false @@ -119,19 +119,19 @@ proc runTask[D, T](ctx: ptr TaskCtx, ds: D, cb: proc(ctx: ptr TaskCtx): T {.gcsa finally: discard ctx[].signal.fireSync() -proc hasTask[D](ctx: ptr TaskCtx, ds: D) {.gcsafe.} = +proc hasTask[DB](ctx: ptr TaskCtx, ds: DB, key: KeyId) {.gcsafe.} = ## run backend command runTask(ctx, ds) do(ctx: ptr TaskCtx) -> ?!bool: - has(ds, ctx.key) + has(ds, key) method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = - var key = KeyId.new key.id() - await self.semaphore.acquire() without signal =? acquireSignal(), err: return bool.failure err - dispatchTask(self, signal, hasTask) + let key = KeyId.new key.id() + dispatchTask(self, signal): + self.tp.spawn hasTask(addr ctx, ds, key) # proc asyncDelTask(ctx: ptr TaskCtx[void], key: ptr Key) {.async.} = # if ctx.isNil: