mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-03 14:13:09 +00:00
setup query
This commit is contained in:
parent
8595878244
commit
ca143227c6
@ -65,6 +65,15 @@ proc setCancelled(ctx: var TaskCtx): bool =
|
||||
ctx.cancelled = true
|
||||
return true
|
||||
|
||||
proc setRunning[T](ctx: ptr TaskCtx[T]): bool =
|
||||
withLock(ctxLock):
|
||||
if ctx.cancelled:
|
||||
return
|
||||
ctx.running = true
|
||||
proc setDone[T](ctx: ptr TaskCtx[T]) =
|
||||
withLock(ctxLock):
|
||||
ctx.running = false
|
||||
|
||||
proc acquireSignal(): ?!ThreadSignalPtr =
|
||||
let signal = ThreadSignalPtr.new()
|
||||
if signal.isErr():
|
||||
@ -74,10 +83,8 @@ proc acquireSignal(): ?!ThreadSignalPtr =
|
||||
|
||||
template executeTask[T](ctx: ptr TaskCtx[T], blk: untyped) =
|
||||
try:
|
||||
withLock(ctxLock):
|
||||
if ctx.cancelled:
|
||||
return
|
||||
ctx.running = true
|
||||
if not ctx.setRunning():
|
||||
return
|
||||
|
||||
## run backend command
|
||||
let res = `blk`
|
||||
@ -88,13 +95,11 @@ template executeTask[T](ctx: ptr TaskCtx[T], blk: untyped) =
|
||||
ctx.res.ok(res.get())
|
||||
else:
|
||||
ctx.res.err res.error().toThreadErr()
|
||||
|
||||
withLock(ctxLock):
|
||||
ctx.running = false
|
||||
except CatchableError as exc:
|
||||
trace "Unexpected exception thrown in asyncHasTask", exc = exc.msg
|
||||
raiseAssert exc.msg
|
||||
trace "Unexpected exception thrown in async task", exc = exc.msg
|
||||
ctx[].res.err exc.toThreadErr()
|
||||
finally:
|
||||
ctx.setDone()
|
||||
discard ctx[].signal.fireSync()
|
||||
|
||||
template dispatchTask[T](self: ThreadDatastore,
|
||||
@ -214,13 +219,27 @@ method close*(self: ThreadDatastore): Future[?!void] {.async.} =
|
||||
of Sqlite:
|
||||
self.backend.sql.close()
|
||||
|
||||
proc queryTask[T, DB](ctx: ptr TaskCtx[T], ds: DB;
|
||||
query: DbQuery[KeyId]) {.gcsafe, nimcall.} =
|
||||
## run backend command
|
||||
var handle = ds.query(query)
|
||||
executeTask(ctx):
|
||||
handle
|
||||
|
||||
type
|
||||
QResult = DbQueryResponse[KeyId, DataBuffer]
|
||||
|
||||
proc queryTask[DB](
|
||||
ctx: ptr TaskCtx[QResult],
|
||||
ds: DB,
|
||||
dq: DbQuery[KeyId]
|
||||
) {.gcsafe, nimcall.} =
|
||||
## run query command
|
||||
if not ctx.setRunning():
|
||||
return
|
||||
|
||||
var qh = ds.query(dq)
|
||||
if qh .isOk():
|
||||
(?!QResult).ok(default(QResult))
|
||||
else:
|
||||
(?!QResult).err(qh.error())
|
||||
|
||||
var handle = qh.get()
|
||||
|
||||
for item in
|
||||
executeTask(ctx):
|
||||
query(ds, key)
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user