diff --git a/datastore/query.nim b/datastore/query.nim index dc0f238..bc6435a 100644 --- a/datastore/query.nim +++ b/datastore/query.nim @@ -29,9 +29,11 @@ type next*: GetNext dispose*: IterDispose -iterator items*(q: QueryIter): Future[?!QueryResponse] = +proc collectAllQueries*(q: QueryIter) = + var qr: Future[?!QueryResponse] while not q.finished: - yield q.next() + qr = q.next() + yield qr proc defaultDispose(): Future[?!void] {.upraises: [], gcsafe, async.} = return success() diff --git a/datastore/threadbackend.nim b/datastore/threadbackend.nim index ebcb25a..087dc48 100644 --- a/datastore/threadbackend.nim +++ b/datastore/threadbackend.nim @@ -184,6 +184,8 @@ proc delete*( let bkey = StringBuffer.new(key.id()) tds[].tp.spawn deleteTask(ret, tds, bkey) +import os + proc queryTask*( ret: TResult[QueryResponseBuffer], tds: ThreadDatastorePtr, @@ -191,11 +193,12 @@ proc queryTask*( ) = try: + os.sleep(100) without res =? waitFor(qiter[].it.next()), err: ret.failure(err) let qrb = res.toBuffer() - print "queryTask: ", " res: ", res + # print "queryTask: ", " res: ", res ret.success(qrb) print "queryTask: ", " qrb:key: ", ret[].results.get().key.toString() diff --git a/datastore/threadproxyds.nim b/datastore/threadproxyds.nim index abac2e3..4410f10 100644 --- a/datastore/threadproxyds.nim +++ b/datastore/threadproxyds.nim @@ -130,6 +130,8 @@ method query*( without ret =? newThreadResult(QueryResponseBuffer), err: return failure(err) + echo "\n\n=== Query Start === " + ## we need to setup the query iter on the main thread ## to keep it's lifetime associated with this async Future without it =? await self.tds[].ds.query(query), err: @@ -142,17 +144,19 @@ method query*( var iterWrapper = QueryIter.new() proc next(): Future[?!QueryResponse] {.async.} = - echo "\n\n=== Query Start === " + print "query:next:start: " + iterWrapper.finished = iter[].it.finished if not iter[].it.finished: - echo "" query(ret, self.tds, iter) await wait(ret[].signal) + echo "" print "query:post: ", ret[].results + print "query:post:finished: ", iter[].it.finished print "query:post: ", " qrb:key: ", ret[].results.get().key.toString() print "query:post: ", " qrb:data: ", ret[].results.get().data.toString() - return ret.convert(QueryResponse) + result = ret.convert(QueryResponse) else: - iterWrapper.finished = true + result = success (Key.none, EmptyBytes) proc dispose(): Future[?!void] {.async.} = iter[].it = nil # ensure our sharedptr doesn't try and dealloc diff --git a/tests/datastore/querycommontests.nim b/tests/datastore/querycommontests.nim index 4f0a15d..86092fc 100644 --- a/tests/datastore/querycommontests.nim +++ b/tests/datastore/querycommontests.nim @@ -9,6 +9,8 @@ import pkg/stew/byteutils import pkg/datastore +import pretty + template queryTests*(ds: Datastore, extended = true) {.dirty.} = var key1: Key @@ -36,9 +38,13 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = let iter = (await ds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt( it.read.tryGet ) - .filterIt( it.key.isSome ) + + var res: seq[QueryResponse] + while not iter.finished: + let val = await iter.next() + let qr = val.tryGet() + if qr.key.isSome: + res.add qr check: res.len == 3