query iterator using items is breaks when the DS isn't blocking

This commit is contained in:
Jaremy Creechley 2023-08-29 20:15:54 -07:00 committed by Dmitriy Ryajov
parent b1a5b9cff8
commit 52286b8b39
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
4 changed files with 25 additions and 10 deletions

View File

@ -29,9 +29,11 @@ type
next*: GetNext next*: GetNext
dispose*: IterDispose dispose*: IterDispose
iterator items*(q: QueryIter): Future[?!QueryResponse] = proc collectAllQueries*(q: QueryIter) =
var qr: Future[?!QueryResponse]
while not q.finished: while not q.finished:
yield q.next() qr = q.next()
yield qr
proc defaultDispose(): Future[?!void] {.upraises: [], gcsafe, async.} = proc defaultDispose(): Future[?!void] {.upraises: [], gcsafe, async.} =
return success() return success()

View File

@ -184,6 +184,8 @@ proc delete*(
let bkey = StringBuffer.new(key.id()) let bkey = StringBuffer.new(key.id())
tds[].tp.spawn deleteTask(ret, tds, bkey) tds[].tp.spawn deleteTask(ret, tds, bkey)
import os
proc queryTask*( proc queryTask*(
ret: TResult[QueryResponseBuffer], ret: TResult[QueryResponseBuffer],
tds: ThreadDatastorePtr, tds: ThreadDatastorePtr,
@ -191,11 +193,12 @@ proc queryTask*(
) = ) =
try: try:
os.sleep(100)
without res =? waitFor(qiter[].it.next()), err: without res =? waitFor(qiter[].it.next()), err:
ret.failure(err) ret.failure(err)
let qrb = res.toBuffer() let qrb = res.toBuffer()
print "queryTask: ", " res: ", res # print "queryTask: ", " res: ", res
ret.success(qrb) ret.success(qrb)
print "queryTask: ", " qrb:key: ", ret[].results.get().key.toString() print "queryTask: ", " qrb:key: ", ret[].results.get().key.toString()

View File

@ -130,6 +130,8 @@ method query*(
without ret =? newThreadResult(QueryResponseBuffer), err: without ret =? newThreadResult(QueryResponseBuffer), err:
return failure(err) return failure(err)
echo "\n\n=== Query Start === "
## we need to setup the query iter on the main thread ## we need to setup the query iter on the main thread
## to keep it's lifetime associated with this async Future ## to keep it's lifetime associated with this async Future
without it =? await self.tds[].ds.query(query), err: without it =? await self.tds[].ds.query(query), err:
@ -142,17 +144,19 @@ method query*(
var iterWrapper = QueryIter.new() var iterWrapper = QueryIter.new()
proc next(): Future[?!QueryResponse] {.async.} = proc next(): Future[?!QueryResponse] {.async.} =
echo "\n\n=== Query Start === " print "query:next:start: "
iterWrapper.finished = iter[].it.finished
if not iter[].it.finished: if not iter[].it.finished:
echo ""
query(ret, self.tds, iter) query(ret, self.tds, iter)
await wait(ret[].signal) await wait(ret[].signal)
echo ""
print "query:post: ", ret[].results print "query:post: ", ret[].results
print "query:post:finished: ", iter[].it.finished
print "query:post: ", " qrb:key: ", ret[].results.get().key.toString() print "query:post: ", " qrb:key: ", ret[].results.get().key.toString()
print "query:post: ", " qrb:data: ", ret[].results.get().data.toString() print "query:post: ", " qrb:data: ", ret[].results.get().data.toString()
return ret.convert(QueryResponse) result = ret.convert(QueryResponse)
else: else:
iterWrapper.finished = true result = success (Key.none, EmptyBytes)
proc dispose(): Future[?!void] {.async.} = proc dispose(): Future[?!void] {.async.} =
iter[].it = nil # ensure our sharedptr doesn't try and dealloc iter[].it = nil # ensure our sharedptr doesn't try and dealloc

View File

@ -9,6 +9,8 @@ import pkg/stew/byteutils
import pkg/datastore import pkg/datastore
import pretty
template queryTests*(ds: Datastore, extended = true) {.dirty.} = template queryTests*(ds: Datastore, extended = true) {.dirty.} =
var var
key1: Key key1: Key
@ -36,9 +38,13 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} =
let let
iter = (await ds.query(q)).tryGet iter = (await ds.query(q)).tryGet
res = (await allFinished(toSeq(iter)))
.mapIt( it.read.tryGet ) var res: seq[QueryResponse]
.filterIt( it.key.isSome ) while not iter.finished:
let val = await iter.next()
let qr = val.tryGet()
if qr.key.isSome:
res.add qr
check: check:
res.len == 3 res.len == 3