From 14c39478e9624151745526d2009ec4373275b04e Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 29 Aug 2023 15:55:42 -0700 Subject: [PATCH] implementing query type --- datastore/threadbackend.nim | 29 +++++++++-------------------- datastore/threadproxyds.nim | 19 ++++++++++++++++--- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/datastore/threadbackend.nim b/datastore/threadbackend.nim index f65e8cb..8e10d08 100644 --- a/datastore/threadbackend.nim +++ b/datastore/threadbackend.nim @@ -17,8 +17,8 @@ export key, query, smartptrs, databuffer push: {.upraises: [].} type - - ThreadResult*[T: DataBuffer | void | bool | ThreadDatastorePtr] = object + ThreadSafeTypes* = DataBuffer | void | bool | ThreadDatastorePtr | QueryResponseBuffer + ThreadResult*[T: ThreadSafeTypes] = object signal*: ThreadSignalPtr results*: Result[T, CatchableErrorBuffer] @@ -30,6 +30,10 @@ type ThreadDatastorePtr* = SharedPtr[ThreadDatastore] + QueryIterStore* = object + it*: QueryIter + QueryIterPtr* = SharedPtr[QueryIterStore] + proc newThreadResult*[T]( tp: typedesc[T] ): Result[TResult[T], ref CatchableError] = @@ -176,14 +180,14 @@ proc delete*( let bkey = StringBuffer.new(key.id()) tds[].tp.spawn deleteTask(ret, tds, bkey) -method queryTask*( +proc queryTask*( ret: TResult[QueryResponseBuffer], tds: ThreadDatastorePtr, - qiter: QueryIter, + qiter: QueryIterPtr, ) = try: - without res =? waitFor(qiter.next()), err: + without res =? waitFor(qiter[].it.next()), err: ret.failure(err) let qrb = res.toBuffer() @@ -191,18 +195,3 @@ method queryTask*( except Exception: echo "failure" - -proc query*( - ret: TResult[QueryResponseBuffer], - tds: ThreadDatastorePtr, - q: Query, -) = - - try: - without iter =? waitFor(tds[].ds.query(q)), err: - ret.failure(err) - - while not iter.finished: - tds[].tp.spawn queryTask(ret, tds, iter) - except Exception as exc: - ret.failure(exc) diff --git a/datastore/threadproxyds.nim b/datastore/threadproxyds.nim index 5896c3f..115bd5e 100644 --- a/datastore/threadproxyds.nim +++ b/datastore/threadproxyds.nim @@ -129,12 +129,24 @@ method query*( return failure(err) try: - delete(ret, self.tds, key) - await wait(ret[].signal) + ## we need to setup the query iter on the main thread + ## to keep it's lifetime associated with this async Future + without it =? await self.tds[].ds.query(query), err: + ret.failure(err) + + var iter = newSharedPtr(QueryIterStore()) + ## note that bypasses SharedPtr isolation - may need `protect` here? + iter[].it = it + + while not iter[].it.finished: + self.tds[].tp.spawn queryTask(ret, self.tds, iter) + await wait(ret[].signal) + + iter[].it = nil # ensure our sharedptr doesn't try and dealloc finally: ret[].signal.close() - return ret.convert(void) + # return ret.convert(void) method close*( self: ThreadProxyDatastore @@ -150,6 +162,7 @@ method close*( ## this can block... how to handle? maybe just leak? self.tds[].tp.shutdown() + self[].tds[].ds = nil # ensure our sharedptr doesn't try and dealloc proc newThreadProxyDatastore*( ds: Datastore,