mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-08 08:33:10 +00:00
integrate setups
This commit is contained in:
parent
d8d6ad771a
commit
0926eaf245
@ -44,7 +44,6 @@ type
|
||||
sql*: SQLiteBackend[KeyId,DataBuffer]
|
||||
|
||||
TaskCtx[D; T: ThreadTypes] = object
|
||||
key: KeyId
|
||||
res: ThreadResult[T]
|
||||
signal: ThreadSignalPtr
|
||||
running: bool
|
||||
@ -76,22 +75,24 @@ proc acquireSignal(): ?!ThreadSignalPtr =
|
||||
|
||||
template dispatchTask(self: ThreadDatastore,
|
||||
signal: ThreadSignalPtr,
|
||||
fn: untyped): auto =
|
||||
blk: untyped
|
||||
): auto =
|
||||
var
|
||||
ctx = TaskCtx[SqliteDB, bool](signal: signal)
|
||||
ctx {.inject.} = TaskCtx[SqliteDB, bool](signal: signal)
|
||||
try:
|
||||
case self.backend.kind:
|
||||
of Sqlite:
|
||||
var ds = self.backend.sql
|
||||
var ds {.inject.} = self.backend.sql
|
||||
proc runTask() =
|
||||
self.tp.spawn `fn`(addr ctx, ds)
|
||||
# self.tp.spawn `fn`(addr ctx, ds, args)
|
||||
`blk`
|
||||
runTask()
|
||||
|
||||
await wait(ctx.signal)
|
||||
except CancelledError as exc:
|
||||
trace "Cancelling thread future!", exc = exc.msg
|
||||
while not ctx.setCancelled():
|
||||
warn "waiting to cancel thread future!", fn = astToStr(fn), key = $ctx.key
|
||||
warn "waiting to cancel thread future!", fn = astToStr(fn)
|
||||
await sleepAsync(10.milliseconds)
|
||||
raise exc
|
||||
finally:
|
||||
@ -107,7 +108,6 @@ proc runTask[D, T](ctx: ptr TaskCtx, ds: D, cb: proc(ctx: ptr TaskCtx): T {.gcsa
|
||||
|
||||
## run backend command
|
||||
let res = cb(ctx)
|
||||
# let res = has(ds, ctx.key)
|
||||
|
||||
withLock(ctxLock):
|
||||
ctx.running = false
|
||||
@ -119,19 +119,19 @@ proc runTask[D, T](ctx: ptr TaskCtx, ds: D, cb: proc(ctx: ptr TaskCtx): T {.gcsa
|
||||
finally:
|
||||
discard ctx[].signal.fireSync()
|
||||
|
||||
proc hasTask[D](ctx: ptr TaskCtx, ds: D) {.gcsafe.} =
|
||||
proc hasTask[DB](ctx: ptr TaskCtx, ds: DB, key: KeyId) {.gcsafe.} =
|
||||
## run backend command
|
||||
runTask(ctx, ds) do(ctx: ptr TaskCtx) -> ?!bool:
|
||||
has(ds, ctx.key)
|
||||
has(ds, key)
|
||||
|
||||
method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
|
||||
var key = KeyId.new key.id()
|
||||
|
||||
await self.semaphore.acquire()
|
||||
without signal =? acquireSignal(), err:
|
||||
return bool.failure err
|
||||
|
||||
dispatchTask(self, signal, hasTask)
|
||||
let key = KeyId.new key.id()
|
||||
dispatchTask(self, signal):
|
||||
self.tp.spawn hasTask(addr ctx, ds, key)
|
||||
|
||||
# proc asyncDelTask(ctx: ptr TaskCtx[void], key: ptr Key) {.async.} =
|
||||
# if ctx.isNil:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user