mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-05 07:03:12 +00:00
add nextSignal using mutex
This commit is contained in:
parent
c51d354f72
commit
30728c1d0f
@ -41,7 +41,7 @@ type
|
||||
signal: ThreadSignalPtr
|
||||
running: bool ## used to mark when a task worker is running
|
||||
cancelled: bool ## used to cancel a task before it's started
|
||||
nextSignal: (Lock, Cond)
|
||||
nextSignal: MutexSignal
|
||||
|
||||
TaskCtx*[T] = SharedPtr[TaskCtxObj[T]]
|
||||
## Task context object.
|
||||
@ -73,13 +73,6 @@ proc setDone[T](ctx: TaskCtx[T]) =
|
||||
# withLock(ctxLock):
|
||||
ctx[].running = false
|
||||
|
||||
proc waitSync*(sig: var (Lock, Cond)) =
|
||||
withLock(sig[0]):
|
||||
wait(sig[1], sig[0])
|
||||
proc fireSync*(sig: var (Lock, Cond)) =
|
||||
withLock(sig[0]):
|
||||
signal(sig[1])
|
||||
|
||||
proc acquireSignal(): ?!ThreadSignalPtr =
|
||||
let signal = ThreadSignalPtr.new()
|
||||
if signal.isErr():
|
||||
@ -254,7 +247,6 @@ method queryTask[DB](
|
||||
ctx: TaskCtx[QResult],
|
||||
ds: DB,
|
||||
query: DbQuery[KeyId],
|
||||
nextSignal: ThreadSignalPtr
|
||||
) {.gcsafe, nimcall.} =
|
||||
## run query command
|
||||
executeTask(ctx):
|
||||
@ -299,8 +291,8 @@ method query*[BT](self: ThreadDatastore[BT],
|
||||
await self.semaphore.acquire()
|
||||
without signal =? acquireSignal(), err:
|
||||
return failure err
|
||||
without nextSignal =? acquireSignal(), err:
|
||||
return failure err
|
||||
let ctx = newTaskCtx(QResult, signal=signal)
|
||||
ctx[].nextSignal.init()
|
||||
|
||||
try:
|
||||
let query = dbQuery(
|
||||
@ -308,9 +300,8 @@ method query*[BT](self: ThreadDatastore[BT],
|
||||
value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)
|
||||
|
||||
# setup initial queryTask
|
||||
let ctx = newTaskCtx(QResult, signal=signal)
|
||||
dispatchTaskWrap(self, signal):
|
||||
self.tp.spawn queryTask(ctx, ds, query, nextSignal)
|
||||
self.tp.spawn queryTask(ctx, ds, query)
|
||||
await nextSignal.fire()
|
||||
|
||||
var
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import std/atomics
|
||||
import std/options
|
||||
import std/locks
|
||||
|
||||
import pkg/questionable/results
|
||||
import pkg/results
|
||||
@ -51,3 +52,24 @@ proc toRes*[T,S](res: ThreadResult[T],
|
||||
result.err res.error().toExc()
|
||||
else:
|
||||
result.ok m(res.get())
|
||||
|
||||
type
|
||||
MutexSignal* = tuple[lock: Lock, cond: Cond, open: bool]
|
||||
|
||||
proc open*(sig: var MutexSignal) =
|
||||
sig.lock.initLock()
|
||||
sig.cond.initCond()
|
||||
sig.open = true
|
||||
|
||||
proc waitSync*(sig: var MutexSignal) =
|
||||
withLock(sig.lock):
|
||||
wait(sig.cond, sig.lock)
|
||||
|
||||
proc fireSync*(sig: var MutexSignal) =
|
||||
withLock(sig.lock):
|
||||
signal(sig.cond)
|
||||
|
||||
proc close*(sig: var MutexSignal) =
|
||||
if sig.open:
|
||||
sig.lock.deinitLock()
|
||||
sig.cond.deinitCond()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user