diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index e273e06..1c5528b 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -41,7 +41,7 @@ type signal: ThreadSignalPtr running: bool ## used to mark when a task worker is running cancelled: bool ## used to cancel a task before it's started - nextSignal: (Lock, Cond) + nextSignal: MutexSignal TaskCtx*[T] = SharedPtr[TaskCtxObj[T]] ## Task context object. @@ -73,13 +73,6 @@ proc setDone[T](ctx: TaskCtx[T]) = # withLock(ctxLock): ctx[].running = false -proc waitSync*(sig: var (Lock, Cond)) = - withLock(sig[0]): - wait(sig[1], sig[0]) -proc fireSync*(sig: var (Lock, Cond)) = - withLock(sig[0]): - signal(sig[1]) - proc acquireSignal(): ?!ThreadSignalPtr = let signal = ThreadSignalPtr.new() if signal.isErr(): @@ -254,7 +247,6 @@ method queryTask[DB]( ctx: TaskCtx[QResult], ds: DB, query: DbQuery[KeyId], - nextSignal: ThreadSignalPtr ) {.gcsafe, nimcall.} = ## run query command executeTask(ctx): @@ -299,8 +291,8 @@ method query*[BT](self: ThreadDatastore[BT], await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err - without nextSignal =? acquireSignal(), err: - return failure err + let ctx = newTaskCtx(QResult, signal=signal) + ctx[].nextSignal.init() try: let query = dbQuery( @@ -308,9 +300,8 @@ method query*[BT](self: ThreadDatastore[BT], value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) # setup initial queryTask - let ctx = newTaskCtx(QResult, signal=signal) dispatchTaskWrap(self, signal): - self.tp.spawn queryTask(ctx, ds, query, nextSignal) + self.tp.spawn queryTask(ctx, ds, query) await nextSignal.fire() var diff --git a/datastore/threads/threadresult.nim b/datastore/threads/threadresult.nim index da0df6d..9caa70c 100644 --- a/datastore/threads/threadresult.nim +++ b/datastore/threads/threadresult.nim @@ -1,5 +1,6 @@ import std/atomics import std/options +import std/locks import pkg/questionable/results import pkg/results @@ -51,3 +52,24 @@ proc toRes*[T,S](res: ThreadResult[T], result.err res.error().toExc() else: result.ok m(res.get()) + +type + MutexSignal* = tuple[lock: Lock, cond: Cond, open: bool] + +proc open*(sig: var MutexSignal) = + sig.lock.initLock() + sig.cond.initCond() + sig.open = true + +proc waitSync*(sig: var MutexSignal) = + withLock(sig.lock): + wait(sig.cond, sig.lock) + +proc fireSync*(sig: var MutexSignal) = + withLock(sig.lock): + signal(sig.cond) + +proc close*(sig: var MutexSignal) = + if sig.open: + sig.lock.deinitLock() + sig.cond.deinitCond()