diff --git a/datastore/threadbackend.nim b/datastore/threadbackend.nim index 905be10..f65e8cb 100644 --- a/datastore/threadbackend.nim +++ b/datastore/threadbackend.nim @@ -47,7 +47,7 @@ proc success*[T](ret: TResult[T], value: T) = proc success*[T: void](ret: TResult[T]) = ret[].results.ok() -proc failure*[T](ret: TResult[T], exc: ref CatchableError) = +proc failure*[T](ret: TResult[T], exc: ref Exception) = ret[].results.err(exc.toBuffer()) proc convert*[T, S](ret: TResult[T], tp: typedesc[S]): Result[S, ref CatchableError] = @@ -176,39 +176,33 @@ proc delete*( let bkey = StringBuffer.new(key.id()) tds[].tp.spawn deleteTask(ret, tds, bkey) -# proc keyIterator(self: ThreadProxyDatastore, -# queryKey: string -# ): iterator: KeyBuffer {.gcsafe.} = -# return iterator(): KeyBuffer {.closure.} = -# var keys = self.store.keys().toSeq() -# keys.sort(proc (x, y: KeyBuffer): int = cmp(x.toString, y.toString)) -# for key in keys: -# if key.toString().startsWith(queryKey): -# yield key - method queryTask*( ret: TResult[QueryResponseBuffer], tds: ThreadDatastorePtr, - qb: QueryBuffer, + qiter: QueryIter, ) = - let query = qb.toQuery() - without key =? kb.toKey(), err: - ret.failure(err) + try: + without res =? waitFor(qiter.next()), err: + ret.failure(err) - let q = Query.init(key1) + let qrb = res.toBuffer() + ret.success(qrb) - for entry in batch: - if err =? (await self.put(entry.key, entry.data)).errorOption: - return failure err - - iter.next = next - return success iter + except Exception: + echo "failure" proc query*( - ret: TResult[void], + ret: TResult[QueryResponseBuffer], tds: ThreadDatastorePtr, - query: Query, + q: Query, ) = - let bq = query.toBuffer() - tds[].tp.spawn queryTask(ret, tds, bq) + + try: + without iter =? waitFor(tds[].ds.query(q)), err: + ret.failure(err) + + while not iter.finished: + tds[].tp.spawn queryTask(ret, tds, iter) + except Exception as exc: + ret.failure(exc) diff --git a/datastore/threadproxyds.nim b/datastore/threadproxyds.nim index c1c7584..5896c3f 100644 --- a/datastore/threadproxyds.nim +++ b/datastore/threadproxyds.nim @@ -120,6 +120,22 @@ method put*( return success() +method query*( + self: ThreadProxyDatastore, + query: Query +): Future[?!QueryIter] {.async.} = + + without ret =? newThreadResult(QueryResponseBuffer), err: + return failure(err) + + try: + delete(ret, self.tds, key) + await wait(ret[].signal) + finally: + ret[].signal.close() + + return ret.convert(void) + method close*( self: ThreadProxyDatastore ): Future[?!void] {.async.} =