From b1a5b9cff8aff27dc8e34acb9c0416496bddaf24 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 29 Aug 2023 19:43:28 -0700 Subject: [PATCH] implementing query type --- datastore/threadbackend.nim | 2 ++ datastore/threadproxyds.nim | 30 +++++++++++++++++++----------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/datastore/threadbackend.nim b/datastore/threadbackend.nim index d36acf6..ebcb25a 100644 --- a/datastore/threadbackend.nim +++ b/datastore/threadbackend.nim @@ -62,6 +62,8 @@ proc convert*[T, S](ret: TResult[T], tp: typedesc[S]): Result[S, ref CatchableEr result.ok(ret[].results.get().toString()) elif S is void: result.ok() + elif S is QueryResponse: + result.ok(ret[].results.get().toQueryResponse()) else: result.ok(ret[].results.get()) else: diff --git a/datastore/threadproxyds.nim b/datastore/threadproxyds.nim index da48275..abac2e3 100644 --- a/datastore/threadproxyds.nim +++ b/datastore/threadproxyds.nim @@ -130,30 +130,38 @@ method query*( without ret =? newThreadResult(QueryResponseBuffer), err: return failure(err) - try: - ## we need to setup the query iter on the main thread - ## to keep it's lifetime associated with this async Future - without it =? await self.tds[].ds.query(query), err: - ret.failure(err) + ## we need to setup the query iter on the main thread + ## to keep it's lifetime associated with this async Future + without it =? await self.tds[].ds.query(query), err: + ret.failure(err) - var iter = newSharedPtr(QueryIterStore) - ## note that bypasses SharedPtr isolation - may need `protect` here? - iter[].it = it + var iter = newSharedPtr(QueryIterStore) + ## note that bypasses SharedPtr isolation - may need `protect` here? + iter[].it = it + var iterWrapper = QueryIter.new() + + proc next(): Future[?!QueryResponse] {.async.} = echo "\n\n=== Query Start === " - while not iter[].it.finished: + if not iter[].it.finished: echo "" query(ret, self.tds, iter) await wait(ret[].signal) print "query:post: ", ret[].results print "query:post: ", " qrb:key: ", ret[].results.get().key.toString() print "query:post: ", " qrb:data: ", ret[].results.get().data.toString() + return ret.convert(QueryResponse) + else: + iterWrapper.finished = true + proc dispose(): Future[?!void] {.async.} = iter[].it = nil # ensure our sharedptr doesn't try and dealloc - finally: ret[].signal.close() + return success() - # return ret.convert(void) + iterWrapper.next = next + iterWrapper.dispose = dispose + return success iterWrapper method close*( self: ThreadProxyDatastore