mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-02-20 13:43:13 +00:00
simplifying
This commit is contained in:
parent
2aa8cfa3aa
commit
575d973118
@ -10,6 +10,7 @@ import std/atomics
|
||||
import std/strutils
|
||||
import std/tables
|
||||
import std/sequtils
|
||||
import std/locks
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/chronos/threadsync
|
||||
@ -36,6 +37,8 @@ logScope:
|
||||
topics = "datastore threadproxyds"
|
||||
|
||||
type
|
||||
SqliteDB = SQLiteBackend[KeyId,DataBuffer]
|
||||
|
||||
ThreadBackendKinds* = enum
|
||||
Sqlite
|
||||
# Filesystem
|
||||
@ -49,8 +52,9 @@ type
|
||||
TaskCtx[D; T: ThreadTypes] = object
|
||||
ds: D
|
||||
res: ThreadResult[T]
|
||||
cancelled: bool
|
||||
signal: ThreadSignalPtr
|
||||
running: bool
|
||||
cancelled: bool
|
||||
|
||||
ThreadDatastore* = ref object of Datastore
|
||||
tp: Taskpool
|
||||
@ -58,21 +62,30 @@ type
|
||||
semaphore: AsyncSemaphore # semaphore is used for backpressure \
|
||||
# to avoid exhausting file descriptors
|
||||
|
||||
var finishLock: Lock
|
||||
|
||||
proc dispatchTask[D, T](
|
||||
self: ThreadDatastore,
|
||||
ctx: var TaskCtx[D, T],
|
||||
key: ?KeyId = KeyId.none,
|
||||
runTask: proc
|
||||
): Future[?!T] {.async.} =
|
||||
finishLock.initLock()
|
||||
|
||||
proc setCancelled(ctx: var TaskCtx): bool =
|
||||
withLock(finishLock):
|
||||
if ctx.running:
|
||||
return false
|
||||
else:
|
||||
ctx.cancelled = true
|
||||
return true
|
||||
|
||||
proc dispatchTask[D, T](self: ThreadDatastore,
|
||||
ctx: var TaskCtx[D, T],
|
||||
key: ?KeyId = KeyId.none,
|
||||
runTask: proc(): void
|
||||
): Future[?!T] {.async.} =
|
||||
try:
|
||||
runTask()
|
||||
let fut = wait(ctx.signal)
|
||||
|
||||
await wait(ctx.signal)
|
||||
except CancelledError as exc:
|
||||
trace "Cancelling thread future!", exc = exc.msg
|
||||
ctx.cancelled = true
|
||||
await ctx.signal.fire()
|
||||
while not ctx.setCancelled():
|
||||
await sleepAsync(10.milliseconds)
|
||||
raise exc
|
||||
finally:
|
||||
discard ctx.signal.close()
|
||||
@ -92,27 +105,29 @@ proc hasTask(ctx: ptr TaskCtx, key: KeyId) =
|
||||
|
||||
try:
|
||||
let res = has(ctx.ds, key)
|
||||
ctx.res = res.mapErr() do(e: ref CatchableError) -> ThreadResErr:
|
||||
e.toThreadErr()
|
||||
except CatchableError as exc:
|
||||
trace "Unexpected exception thrown in asyncHasTask", exc = exc.msg
|
||||
raiseAssert exc.msg
|
||||
|
||||
method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
|
||||
var
|
||||
key = KeyId.new key.id()
|
||||
var key = KeyId.new key.id()
|
||||
|
||||
await self.semaphore.acquire()
|
||||
let signal = ? acquireSignal()
|
||||
without signal =? acquireSignal(), err:
|
||||
return bool.failure err
|
||||
|
||||
case self.backend.kind:
|
||||
of Sqlite:
|
||||
var
|
||||
ds = self.backend.sql
|
||||
ctx = TaskCtx[typeof(ds), bool](ds: ds, signal: signal)
|
||||
ctx = TaskCtx[SqliteDB, bool](ds: self.backend.sql, signal: signal)
|
||||
|
||||
proc runTask() =
|
||||
self.tp.spawn hasTask(addr ctx, key)
|
||||
|
||||
return self.dispatchTask(ctx, key.some, runTask)
|
||||
return await self.dispatchTask(ctx, key.some, runTask)
|
||||
|
||||
# proc asyncDelTask(ctx: ptr TaskCtx[void], key: ptr Key) {.async.} =
|
||||
# if ctx.isNil:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user