implementing query type

This commit is contained in:
Jaremy Creechley 2023-08-29 15:42:53 -07:00
parent da98e66389
commit e4ace8a759
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
2 changed files with 36 additions and 26 deletions

View File

@ -47,7 +47,7 @@ proc success*[T](ret: TResult[T], value: T) =
proc success*[T: void](ret: TResult[T]) = proc success*[T: void](ret: TResult[T]) =
ret[].results.ok() ret[].results.ok()
proc failure*[T](ret: TResult[T], exc: ref CatchableError) = proc failure*[T](ret: TResult[T], exc: ref Exception) =
ret[].results.err(exc.toBuffer()) ret[].results.err(exc.toBuffer())
proc convert*[T, S](ret: TResult[T], tp: typedesc[S]): Result[S, ref CatchableError] = proc convert*[T, S](ret: TResult[T], tp: typedesc[S]): Result[S, ref CatchableError] =
@ -176,39 +176,33 @@ proc delete*(
let bkey = StringBuffer.new(key.id()) let bkey = StringBuffer.new(key.id())
tds[].tp.spawn deleteTask(ret, tds, bkey) tds[].tp.spawn deleteTask(ret, tds, bkey)
# proc keyIterator(self: ThreadProxyDatastore,
# queryKey: string
# ): iterator: KeyBuffer {.gcsafe.} =
# return iterator(): KeyBuffer {.closure.} =
# var keys = self.store.keys().toSeq()
# keys.sort(proc (x, y: KeyBuffer): int = cmp(x.toString, y.toString))
# for key in keys:
# if key.toString().startsWith(queryKey):
# yield key
method queryTask*( method queryTask*(
ret: TResult[QueryResponseBuffer], ret: TResult[QueryResponseBuffer],
tds: ThreadDatastorePtr, tds: ThreadDatastorePtr,
qb: QueryBuffer, qiter: QueryIter,
) = ) =
let query = qb.toQuery() try:
without key =? kb.toKey(), err: without res =? waitFor(qiter.next()), err:
ret.failure(err) ret.failure(err)
let q = Query.init(key1) let qrb = res.toBuffer()
ret.success(qrb)
for entry in batch: except Exception:
if err =? (await self.put(entry.key, entry.data)).errorOption: echo "failure"
return failure err
iter.next = next
return success iter
proc query*( proc query*(
ret: TResult[void], ret: TResult[QueryResponseBuffer],
tds: ThreadDatastorePtr, tds: ThreadDatastorePtr,
query: Query, q: Query,
) = ) =
let bq = query.toBuffer()
tds[].tp.spawn queryTask(ret, tds, bq) try:
without iter =? waitFor(tds[].ds.query(q)), err:
ret.failure(err)
while not iter.finished:
tds[].tp.spawn queryTask(ret, tds, iter)
except Exception as exc:
ret.failure(exc)

View File

@ -120,6 +120,22 @@ method put*(
return success() return success()
method query*(
self: ThreadProxyDatastore,
query: Query
): Future[?!QueryIter] {.async.} =
without ret =? newThreadResult(QueryResponseBuffer), err:
return failure(err)
try:
delete(ret, self.tds, key)
await wait(ret[].signal)
finally:
ret[].signal.close()
return ret.convert(void)
method close*( method close*(
self: ThreadProxyDatastore self: ThreadProxyDatastore
): Future[?!void] {.async.} = ): Future[?!void] {.async.} =