add check for whether future is finished before iterating on next

This commit is contained in:
Jaremy Creechley 2023-09-06 15:49:58 -07:00
parent f542babae3
commit 63b59007e5
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
5 changed files with 40 additions and 12 deletions

View File

@ -27,9 +27,16 @@ type
IterDispose* = proc(): Future[?!void] {.upraises: [], gcsafe.}
QueryIter* = ref object
finished*: bool
readyForNext*: bool
next*: GetNext
dispose*: IterDispose
iterator items*(q: QueryIter): Future[?!QueryResponse] =
while not q.finished:
if not q.readyForNext:
raise newException(FutureDefect, "query iterator not ready for next Future")
yield q.next()
proc waitForAllQueryResults*(qi: ?!QueryIter): Future[?!seq[QueryResponse]] {.async.} =
## for large blocks this would be *expensive*
var res: seq[QueryResponse]

View File

@ -115,7 +115,7 @@ method put*(
return success()
# import pretty
import pretty
method query*(
self: ThreadProxyDatastore,
@ -136,13 +136,16 @@ method query*(
iter[].it = it
var iterWrapper = QueryIter.new()
iterWrapper.readyForNext = true
proc next(): Future[?!QueryResponse] {.async.} =
# print "query:next:start: "
iterWrapper.finished = iter[].it.finished
if not iter[].it.finished:
iterWrapper.readyForNext = false
query(ret, self.tds, iter)
await wait(ret[].signal)
iterWrapper.readyForNext = true
# echo ""
# print "query:post: ", ret[].results
# print "query:post:finished: ", iter[].it.finished

View File

@ -13,6 +13,8 @@ import ./datastore
import ./databuffer
import ./threadresults
# import pretty
export key, query, smartptrs, databuffer
export threadresults
@ -143,7 +145,7 @@ proc delete*(
let bkey = StringBuffer.new(key.id())
tds[].tp.spawn deleteTask(ret, tds, bkey)
import os
# import os
proc queryTask*(
ret: TResult[QueryResponseBuffer],

View File

@ -35,12 +35,10 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} =
(await ds.put(key2, val2)).tryGet
(await ds.put(key3, val3)).tryGet
let
iter = (await ds.query(q)).tryGet
let iter = (await ds.query(q)).tryGet
var res: seq[QueryResponse]
while not iter.finished:
let val = await iter.next()
for item in iter:
let val = await item
let qr = val.tryGet()
if qr.key.isSome:
res.add qr

View File

@ -14,7 +14,7 @@ import pkg/datastore/threadproxyds
import ./dscommontests
import ./querycommontests
import pretty
# import pretty
suite "Test Basic ThreadProxyDatastore":
@ -31,18 +31,19 @@ suite "Test Basic ThreadProxyDatastore":
data = "value for 1".toBytes()
test "check put":
echo "\n\n=== put ==="
# echo "\n\n=== put ==="
let res1 = await sds.put(key1, data)
print "res1: ", res1
check res1.isOk
# print "res1: ", res1
test "check get":
echo "\n\n=== get ==="
# echo "\n\n=== get ==="
let res2 = await sds.get(key1)
check res2.get() == data
var val = ""
for c in res2.get():
val &= char(c)
print "get res2: ", $val
# print "get res2: ", $val
# echo "\n\n=== put cancel ==="
# # let res1 = await sds.put(key1, "value for 1".toBytes())
@ -79,3 +80,20 @@ suite "Test Query":
queryTests(sds, false)
test "query iter fails":
expect FutureDefect:
let q = Query.init(key1)
(await sds.put(key1, val1)).tryGet
(await sds.put(key2, val2)).tryGet
(await sds.put(key3, val3)).tryGet
let
iter = (await sds.query(q)).tryGet
res = (await allFinished(toSeq(iter)))
.mapIt( it.read.tryGet )
.filterIt( it.key.isSome )
check res.len() > 0