From b2de461c41f55ba91233c38bf6ac3ec586090be9 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 26 Sep 2023 13:01:53 -0700 Subject: [PATCH] setup query --- datastore/threads/threadproxyds.nim | 32 +++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 33f0d81..a752d63 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -162,7 +162,7 @@ method delete*(self: ThreadDatastore, return success() -proc putTask[DB](ctx: ptr TaskCtx, ds: DB; +proc putTask[T, DB](ctx: ptr TaskCtx[T], ds: DB; key: KeyId, data: DataBuffer) {.gcsafe, nimcall.} = ## run backend command @@ -191,7 +191,7 @@ method put*( return success() -proc getTask[DB](ctx: ptr TaskCtx, ds: DB; +proc getTask[T, DB](ctx: ptr TaskCtx[T], ds: DB; key: KeyId) {.gcsafe, nimcall.} = ## run backend command executeTask(ctx): @@ -214,17 +214,35 @@ method close*(self: ThreadDatastore): Future[?!void] {.async.} = of Sqlite: self.backend.sql.close() -proc queryTask[DB](ctx: ptr TaskCtx, ds: DB; - key: DbQuery[KeyId]) {.gcsafe, nimcall.} = +proc queryTask[T, DB](ctx: ptr TaskCtx[T], ds: DB; + query: DbQuery[KeyId]) {.gcsafe, nimcall.} = ## run backend command + var handle = ds.query(query) + executeTask(ctx): + handle + executeTask(ctx): - let handle = ds.query(q) query(ds, key) method query*( self: ThreadDatastore, query: Query): Future[?!QueryIter] {.async.} = + await self.semaphore.acquire() + without signal =? acquireSignal(), err: + return failure err + + let dq = dbQuery( + key=query.key, + value=query.value, + limit=query.limit, + offset=query.offset, + sort=query.sort, + ) + + dispatchTask[DbQueryResponse[KeyId, DataBuffer]](self, signal): + self.tp.spawn deleteTask(addr ctx, ds, dq) + var lock = newAsyncLock() # serialize querying under threads @@ -246,10 +264,6 @@ method query*( var res = ThreadResult[QueryResponse]() - proc runTask() = - self.tp.spawn queryTask(addr ctx, addr childIter) - - return self.dispatchTask(ctx, Key.none, runTask) iter.next = next return success iter