diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 4117d00..055e5d0 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -237,10 +237,8 @@ proc queryTask[DB]( var qh: typeof(ds.query(dq)) executeTask(ctx): qh = ds.query(dq) - if qh.isOk(): - (?!QResult).ok(default(QResult)) - else: - (?!QResult).err(qh.error()) + if qh.isOk(): (?!QResult).ok(default(QResult)) + else: (?!QResult).err(qh.error()) if qh.isErr(): return @@ -254,19 +252,15 @@ proc queryTask[DB]( method query*( self: ThreadDatastore, - query: Query): Future[?!QueryIter] {.async.} = + q: Query): Future[?!QueryIter] {.async.} = await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err let dq = dbQuery( - key= KeyId.new query.key.id(), - value=query.value, - limit=query.limit, - offset=query.offset, - sort=query.sort, - ) + key= KeyId.new q.key.id(), + value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) dispatchTask[DbQueryResponse[KeyId, DataBuffer]](self, signal): self.tp.spawn queryTask(ctx, ds, dq) @@ -288,17 +282,19 @@ method query*( await lock.acquire() - dispatchTask[void](self, signal): - discard ctx[].signal.fireSync() + discard ctx[].signal.fireSync() + await ctx[].signal.wait() if ctx[].res.isErr() and ctx[].res.error()[0] == ErrorEnum.QueryEndedErr: iter.finished = true - return + return success (key: Key.none, data: EmptyBytes) elif ctx[].res.isErr(): return err(ctx[].res.error()) else: - # let qres = ctx[].res.get() - return (?!QueryResponse).ok(default(QueryResponse)) + let qres = ctx[].res.get() + let key = qres.key.map(proc (k: KeyId): Key = k.toKey()) + let data = qres.data.toSeq() + return (?!QueryResponse).ok((key: key, data: data)) iter.next = next return success iter