setup query

This commit is contained in:
Jaremy Creechley 2023-09-26 13:01:53 -07:00
parent d34cbd7df5
commit b2de461c41
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300

View File

@ -162,7 +162,7 @@ method delete*(self: ThreadDatastore,
return success()
proc putTask[DB](ctx: ptr TaskCtx, ds: DB;
proc putTask[T, DB](ctx: ptr TaskCtx[T], ds: DB;
key: KeyId,
data: DataBuffer) {.gcsafe, nimcall.} =
## run backend command
@ -191,7 +191,7 @@ method put*(
return success()
proc getTask[DB](ctx: ptr TaskCtx, ds: DB;
proc getTask[T, DB](ctx: ptr TaskCtx[T], ds: DB;
key: KeyId) {.gcsafe, nimcall.} =
## run backend command
executeTask(ctx):
@ -214,17 +214,35 @@ method close*(self: ThreadDatastore): Future[?!void] {.async.} =
of Sqlite:
self.backend.sql.close()
proc queryTask[DB](ctx: ptr TaskCtx, ds: DB;
key: DbQuery[KeyId]) {.gcsafe, nimcall.} =
proc queryTask[T, DB](ctx: ptr TaskCtx[T], ds: DB;
query: DbQuery[KeyId]) {.gcsafe, nimcall.} =
## run backend command
var handle = ds.query(query)
executeTask(ctx):
handle
executeTask(ctx):
let handle = ds.query(q)
query(ds, key)
method query*(
self: ThreadDatastore,
query: Query): Future[?!QueryIter] {.async.} =
await self.semaphore.acquire()
without signal =? acquireSignal(), err:
return failure err
let dq = dbQuery(
key=query.key,
value=query.value,
limit=query.limit,
offset=query.offset,
sort=query.sort,
)
dispatchTask[DbQueryResponse[KeyId, DataBuffer]](self, signal):
self.tp.spawn deleteTask(addr ctx, ds, dq)
var
lock = newAsyncLock() # serialize querying under threads
@ -246,10 +264,6 @@ method query*(
var
res = ThreadResult[QueryResponse]()
proc runTask() =
self.tp.spawn queryTask(addr ctx, addr childIter)
return self.dispatchTask(ctx, Key.none, runTask)
iter.next = next
return success iter