implementing query type

This commit is contained in:
Jaremy Creechley 2023-08-29 15:42:53 -07:00 committed by Dmitriy Ryajov
parent 3f8c471aed
commit 120f770d1e
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
2 changed files with 36 additions and 26 deletions

View File

@ -47,7 +47,7 @@ proc success*[T](ret: TResult[T], value: T) =
proc success*[T: void](ret: TResult[T]) =
ret[].results.ok()
proc failure*[T](ret: TResult[T], exc: ref CatchableError) =
proc failure*[T](ret: TResult[T], exc: ref Exception) =
ret[].results.err(exc.toBuffer())
proc convert*[T, S](ret: TResult[T], tp: typedesc[S]): Result[S, ref CatchableError] =
@ -176,39 +176,33 @@ proc delete*(
let bkey = StringBuffer.new(key.id())
tds[].tp.spawn deleteTask(ret, tds, bkey)
# proc keyIterator(self: ThreadProxyDatastore,
# queryKey: string
# ): iterator: KeyBuffer {.gcsafe.} =
# return iterator(): KeyBuffer {.closure.} =
# var keys = self.store.keys().toSeq()
# keys.sort(proc (x, y: KeyBuffer): int = cmp(x.toString, y.toString))
# for key in keys:
# if key.toString().startsWith(queryKey):
# yield key
method queryTask*(
ret: TResult[QueryResponseBuffer],
tds: ThreadDatastorePtr,
qb: QueryBuffer,
qiter: QueryIter,
) =
let query = qb.toQuery()
without key =? kb.toKey(), err:
ret.failure(err)
try:
without res =? waitFor(qiter.next()), err:
ret.failure(err)
let q = Query.init(key1)
let qrb = res.toBuffer()
ret.success(qrb)
for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption:
return failure err
iter.next = next
return success iter
except Exception:
echo "failure"
proc query*(
ret: TResult[void],
ret: TResult[QueryResponseBuffer],
tds: ThreadDatastorePtr,
query: Query,
q: Query,
) =
let bq = query.toBuffer()
tds[].tp.spawn queryTask(ret, tds, bq)
try:
without iter =? waitFor(tds[].ds.query(q)), err:
ret.failure(err)
while not iter.finished:
tds[].tp.spawn queryTask(ret, tds, iter)
except Exception as exc:
ret.failure(exc)

View File

@ -120,6 +120,22 @@ method put*(
return success()
method query*(
self: ThreadProxyDatastore,
query: Query
): Future[?!QueryIter] {.async.} =
without ret =? newThreadResult(QueryResponseBuffer), err:
return failure(err)
try:
delete(ret, self.tds, key)
await wait(ret[].signal)
finally:
ret[].signal.close()
return ret.convert(void)
method close*(
self: ThreadProxyDatastore
): Future[?!void] {.async.} =