mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-06 23:53:09 +00:00
simplifying
This commit is contained in:
parent
37dbd1c234
commit
2aa8cfa3aa
@ -50,7 +50,6 @@ type
|
|||||||
ds: D
|
ds: D
|
||||||
res: ThreadResult[T]
|
res: ThreadResult[T]
|
||||||
cancelled: bool
|
cancelled: bool
|
||||||
semaphore: AsyncSemaphore
|
|
||||||
signal: ThreadSignalPtr
|
signal: ThreadSignalPtr
|
||||||
|
|
||||||
ThreadDatastore* = ref object of Datastore
|
ThreadDatastore* = ref object of Datastore
|
||||||
@ -58,59 +57,18 @@ type
|
|||||||
backend: ThreadBackend
|
backend: ThreadBackend
|
||||||
semaphore: AsyncSemaphore # semaphore is used for backpressure \
|
semaphore: AsyncSemaphore # semaphore is used for backpressure \
|
||||||
# to avoid exhausting file descriptors
|
# to avoid exhausting file descriptors
|
||||||
withLocks: bool
|
|
||||||
tasks: Table[KeyId, Future[void]]
|
|
||||||
queryLock: AsyncLock # global query lock, this is only really \
|
|
||||||
# needed for the fsds, but it is expensive!
|
|
||||||
|
|
||||||
template withLocks(
|
|
||||||
self: ThreadDatastore,
|
proc dispatchTask[D, T](
|
||||||
ctx: TaskCtx,
|
self: ThreadDatastore,
|
||||||
key: ?KeyId = KeyId.none,
|
ctx: var TaskCtx[D, T],
|
||||||
body: untyped): untyped =
|
key: ?KeyId = KeyId.none,
|
||||||
|
runTask: proc
|
||||||
|
): Future[?!T] {.async.} =
|
||||||
try:
|
try:
|
||||||
if key.isSome and key.get in self.tasks:
|
runTask()
|
||||||
if self.withLocks:
|
let fut = wait(ctx.signal)
|
||||||
await self.tasks[key.get]
|
|
||||||
|
|
||||||
if self.withLocks:
|
|
||||||
await self.queryLock.acquire() # only lock if it's required (fsds)
|
|
||||||
|
|
||||||
block:
|
|
||||||
body
|
|
||||||
finally:
|
|
||||||
if self.withLocks:
|
|
||||||
if key.isSome and key.get in self.tasks:
|
|
||||||
self.tasks.del(key.get)
|
|
||||||
if self.queryLock.locked:
|
|
||||||
self.queryLock.release()
|
|
||||||
|
|
||||||
# TODO: needs rework, we can't use `result` with async
|
|
||||||
template dispatchTask[D, T](
|
|
||||||
self: ThreadDatastore,
|
|
||||||
ctx: TaskCtx[D, T],
|
|
||||||
key: ?KeyId = KeyId.none,
|
|
||||||
runTask: proc): auto =
|
|
||||||
try:
|
|
||||||
await self.semaphore.acquire()
|
|
||||||
let signal = ThreadSignalPtr.new()
|
|
||||||
if signal.isErr:
|
|
||||||
failure(signal.error)
|
|
||||||
else:
|
|
||||||
ctx.signal = signal.get()
|
|
||||||
let
|
|
||||||
fut = wait(ctx.signal)
|
|
||||||
|
|
||||||
withLocks(self, ctx, key):
|
|
||||||
runTask()
|
|
||||||
await fut
|
|
||||||
if ctx.res.isErr:
|
|
||||||
failure ctx.res.error
|
|
||||||
else:
|
|
||||||
when result.T isnot void:
|
|
||||||
success result.T(ctx.res.get)
|
|
||||||
else:
|
|
||||||
success()
|
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
trace "Cancelling thread future!", exc = exc.msg
|
trace "Cancelling thread future!", exc = exc.msg
|
||||||
ctx.cancelled = true
|
ctx.cancelled = true
|
||||||
@ -120,29 +78,12 @@ template dispatchTask[D, T](
|
|||||||
discard ctx.signal.close()
|
discard ctx.signal.close()
|
||||||
self.semaphore.release()
|
self.semaphore.release()
|
||||||
|
|
||||||
proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} =
|
proc acquireSignal(): ?!ThreadSignalPtr =
|
||||||
## Monitor the signal and cancel the future if
|
let signal = ThreadSignalPtr.new()
|
||||||
## the cancellation flag is set
|
if signal.isErr():
|
||||||
##
|
failure (ref CatchableError)(msg: "failed to aquire ThreadSignalPtr: " & signal.error())
|
||||||
|
else:
|
||||||
if ctx.isNil:
|
success signal.get()
|
||||||
trace "ctx is nil"
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
await ctx[].signal.wait()
|
|
||||||
trace "Received signal"
|
|
||||||
|
|
||||||
if ctx[].cancelled: # there could eventually be other flags
|
|
||||||
trace "Cancelling future"
|
|
||||||
if not fut.finished:
|
|
||||||
await fut.cancelAndWait() # cancel the `has` future
|
|
||||||
|
|
||||||
discard ctx[].signal.fireSync()
|
|
||||||
except CatchableError as exc:
|
|
||||||
trace "Exception in thread signal monitor", exc = exc.msg
|
|
||||||
ctx[].res[].err(exc)
|
|
||||||
discard ctx[].signal.fireSync()
|
|
||||||
|
|
||||||
proc hasTask(ctx: ptr TaskCtx, key: KeyId) =
|
proc hasTask(ctx: ptr TaskCtx, key: KeyId) =
|
||||||
defer:
|
defer:
|
||||||
@ -159,11 +100,14 @@ method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
|
|||||||
var
|
var
|
||||||
key = KeyId.new key.id()
|
key = KeyId.new key.id()
|
||||||
|
|
||||||
|
await self.semaphore.acquire()
|
||||||
|
let signal = ? acquireSignal()
|
||||||
|
|
||||||
case self.backend.kind:
|
case self.backend.kind:
|
||||||
of Sqlite:
|
of Sqlite:
|
||||||
var
|
var
|
||||||
ds = self.backend.sql
|
ds = self.backend.sql
|
||||||
ctx = TaskCtx[typeof(ds), bool](ds: ds)
|
ctx = TaskCtx[typeof(ds), bool](ds: ds, signal: signal)
|
||||||
|
|
||||||
proc runTask() =
|
proc runTask() =
|
||||||
self.tp.spawn hasTask(addr ctx, key)
|
self.tp.spawn hasTask(addr ctx, key)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user