From 63b59007e5f0c4af2cde20167d253e5a715716a8 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 6 Sep 2023 15:49:58 -0700 Subject: [PATCH] add check for whether future is finished before iterating on next --- datastore/query.nim | 7 +++++++ datastore/threadproxyds.nim | 5 ++++- datastore/threads/threadbackend.nim | 4 +++- tests/datastore/querycommontests.nim | 8 +++----- tests/datastore/testthreadproxyds.nim | 28 ++++++++++++++++++++++----- 5 files changed, 40 insertions(+), 12 deletions(-) diff --git a/datastore/query.nim b/datastore/query.nim index a4f694d..7f4555f 100644 --- a/datastore/query.nim +++ b/datastore/query.nim @@ -27,9 +27,16 @@ type IterDispose* = proc(): Future[?!void] {.upraises: [], gcsafe.} QueryIter* = ref object finished*: bool + readyForNext*: bool next*: GetNext dispose*: IterDispose +iterator items*(q: QueryIter): Future[?!QueryResponse] = + while not q.finished: + if not q.readyForNext: + raise newException(FutureDefect, "query iterator not ready for next Future") + yield q.next() + proc waitForAllQueryResults*(qi: ?!QueryIter): Future[?!seq[QueryResponse]] {.async.} = ## for large blocks this would be *expensive* var res: seq[QueryResponse] diff --git a/datastore/threadproxyds.nim b/datastore/threadproxyds.nim index 8fa2bee..0f488b9 100644 --- a/datastore/threadproxyds.nim +++ b/datastore/threadproxyds.nim @@ -115,7 +115,7 @@ method put*( return success() -# import pretty +import pretty method query*( self: ThreadProxyDatastore, @@ -136,13 +136,16 @@ method query*( iter[].it = it var iterWrapper = QueryIter.new() + iterWrapper.readyForNext = true proc next(): Future[?!QueryResponse] {.async.} = # print "query:next:start: " iterWrapper.finished = iter[].it.finished if not iter[].it.finished: + iterWrapper.readyForNext = false query(ret, self.tds, iter) await wait(ret[].signal) + iterWrapper.readyForNext = true # echo "" # print "query:post: ", ret[].results # print "query:post:finished: ", iter[].it.finished diff --git a/datastore/threads/threadbackend.nim b/datastore/threads/threadbackend.nim index f570131..355513a 100644 --- a/datastore/threads/threadbackend.nim +++ b/datastore/threads/threadbackend.nim @@ -13,6 +13,8 @@ import ./datastore import ./databuffer import ./threadresults +# import pretty + export key, query, smartptrs, databuffer export threadresults @@ -143,7 +145,7 @@ proc delete*( let bkey = StringBuffer.new(key.id()) tds[].tp.spawn deleteTask(ret, tds, bkey) -import os +# import os proc queryTask*( ret: TResult[QueryResponseBuffer], diff --git a/tests/datastore/querycommontests.nim b/tests/datastore/querycommontests.nim index fe11142..b08f366 100644 --- a/tests/datastore/querycommontests.nim +++ b/tests/datastore/querycommontests.nim @@ -35,12 +35,10 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} = (await ds.put(key2, val2)).tryGet (await ds.put(key3, val3)).tryGet - let - iter = (await ds.query(q)).tryGet - + let iter = (await ds.query(q)).tryGet var res: seq[QueryResponse] - while not iter.finished: - let val = await iter.next() + for item in iter: + let val = await item let qr = val.tryGet() if qr.key.isSome: res.add qr diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 30b91fc..b61239d 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -14,7 +14,7 @@ import pkg/datastore/threadproxyds import ./dscommontests import ./querycommontests -import pretty +# import pretty suite "Test Basic ThreadProxyDatastore": @@ -31,18 +31,19 @@ suite "Test Basic ThreadProxyDatastore": data = "value for 1".toBytes() test "check put": - echo "\n\n=== put ===" + # echo "\n\n=== put ===" let res1 = await sds.put(key1, data) - print "res1: ", res1 + check res1.isOk + # print "res1: ", res1 test "check get": - echo "\n\n=== get ===" + # echo "\n\n=== get ===" let res2 = await sds.get(key1) check res2.get() == data var val = "" for c in res2.get(): val &= char(c) - print "get res2: ", $val + # print "get res2: ", $val # echo "\n\n=== put cancel ===" # # let res1 = await sds.put(key1, "value for 1".toBytes()) @@ -79,3 +80,20 @@ suite "Test Query": queryTests(sds, false) + test "query iter fails": + + expect FutureDefect: + let q = Query.init(key1) + + (await sds.put(key1, val1)).tryGet + (await sds.put(key2, val2)).tryGet + (await sds.put(key3, val3)).tryGet + + let + iter = (await sds.query(q)).tryGet + res = (await allFinished(toSeq(iter))) + .mapIt( it.read.tryGet ) + .filterIt( it.key.isSome ) + + check res.len() > 0 +