mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-06 23:53:09 +00:00
query iterator using items is breaks when the DS isn't blocking
This commit is contained in:
parent
057493c809
commit
d02a416814
@ -29,11 +29,9 @@ type
|
|||||||
next*: GetNext
|
next*: GetNext
|
||||||
dispose*: IterDispose
|
dispose*: IterDispose
|
||||||
|
|
||||||
proc waitForAllQueryResults*(qi: Future[?!QueryIter]): Future[?!seq[QueryResponse]] {.async.} =
|
proc waitForAllQueryResults*(iter: QueryIter): Future[?!seq[QueryResponse]] {.async.} =
|
||||||
## for large blocks this would be *expensive*
|
## for large blocks this would be *expensive*
|
||||||
var res: seq[QueryResponse]
|
var res: seq[QueryResponse]
|
||||||
without iter =? (await qi), err:
|
|
||||||
return failure err
|
|
||||||
|
|
||||||
while not iter.finished:
|
while not iter.finished:
|
||||||
let val = await iter.next()
|
let val = await iter.next()
|
||||||
@ -47,6 +45,12 @@ proc waitForAllQueryResults*(qi: Future[?!QueryIter]): Future[?!seq[QueryRespons
|
|||||||
await iter.dispose()
|
await iter.dispose()
|
||||||
return success res
|
return success res
|
||||||
|
|
||||||
|
proc waitForAllQueryResults*(qi: Future[?!QueryIter]): Future[?!seq[QueryResponse]] {.async.} =
|
||||||
|
without iter =? (await qi), err:
|
||||||
|
return failure err
|
||||||
|
await waitForAllQueryResults(iter)
|
||||||
|
|
||||||
|
|
||||||
proc defaultDispose(): Future[?!void] {.upraises: [], gcsafe, async.} =
|
proc defaultDispose(): Future[?!void] {.upraises: [], gcsafe, async.} =
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
|
|||||||
@ -91,10 +91,8 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} =
|
|||||||
(await ds.put(key3, val3)).tryGet
|
(await ds.put(key3, val3)).tryGet
|
||||||
|
|
||||||
let
|
let
|
||||||
iter = (await ds.query(q)).tryGet
|
iter = tryGet(await ds.query(q))
|
||||||
res = (await allFinished(toSeq(iter)))
|
res = tryGet(await iter.waitForAllQueryResults())
|
||||||
.mapIt( it.read.tryGet )
|
|
||||||
.filterIt( it.key.isSome )
|
|
||||||
|
|
||||||
check:
|
check:
|
||||||
res.len == 2
|
res.len == 2
|
||||||
@ -119,9 +117,7 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} =
|
|||||||
iter = (await ds.query(q)).tryGet
|
iter = (await ds.query(q)).tryGet
|
||||||
|
|
||||||
var
|
var
|
||||||
res = (await allFinished(toSeq(iter)))
|
res = tryGet(await ds.query(q).waitForAllQueryResults())
|
||||||
.mapIt( it.read.tryGet )
|
|
||||||
.filterIt( it.key.isSome )
|
|
||||||
|
|
||||||
res.sort do (a, b: QueryResponse) -> int:
|
res.sort do (a, b: QueryResponse) -> int:
|
||||||
cmp(a.key.get.id, b.key.get.id)
|
cmp(a.key.get.id, b.key.get.id)
|
||||||
@ -153,16 +149,11 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} =
|
|||||||
(await ds.put(key, val)).tryGet
|
(await ds.put(key, val)).tryGet
|
||||||
|
|
||||||
let
|
let
|
||||||
iter = (await ds.query(q)).tryGet
|
res = tryGet(await ds.query(q).waitForAllQueryResults())
|
||||||
res = (await allFinished(toSeq(iter)))
|
|
||||||
.mapIt( it.read.tryGet )
|
|
||||||
.filterIt( it.key.isSome )
|
|
||||||
|
|
||||||
check:
|
check:
|
||||||
res.len == 10
|
res.len == 10
|
||||||
|
|
||||||
(await iter.dispose()).tryGet
|
|
||||||
|
|
||||||
test "Should not apply offset":
|
test "Should not apply offset":
|
||||||
let
|
let
|
||||||
key = Key.init("/a").tryGet
|
key = Key.init("/a").tryGet
|
||||||
@ -176,16 +167,11 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} =
|
|||||||
(await ds.put(key, val)).tryGet
|
(await ds.put(key, val)).tryGet
|
||||||
|
|
||||||
let
|
let
|
||||||
iter = (await ds.query(q)).tryGet
|
res = tryGet(await ds.query(q).waitForAllQueryResults())
|
||||||
res = (await allFinished(toSeq(iter)))
|
|
||||||
.mapIt( it.read.tryGet )
|
|
||||||
.filterIt( it.key.isSome )
|
|
||||||
|
|
||||||
check:
|
check:
|
||||||
res.len == 10
|
res.len == 10
|
||||||
|
|
||||||
(await iter.dispose()).tryGet
|
|
||||||
|
|
||||||
test "Should not apply offset and limit":
|
test "Should not apply offset and limit":
|
||||||
let
|
let
|
||||||
key = Key.init("/a").tryGet
|
key = Key.init("/a").tryGet
|
||||||
@ -199,10 +185,8 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} =
|
|||||||
(await ds.put(key, val)).tryGet
|
(await ds.put(key, val)).tryGet
|
||||||
|
|
||||||
let
|
let
|
||||||
iter = (await ds.query(q)).tryGet
|
iter = tryGet(await ds.query(q))
|
||||||
res = (await allFinished(toSeq(iter)))
|
res = tryGet(await iter.waitForAllQueryResults())
|
||||||
.mapIt( it.read.tryGet )
|
|
||||||
.filterIt( it.key.isSome )
|
|
||||||
|
|
||||||
check:
|
check:
|
||||||
res.len == 5
|
res.len == 5
|
||||||
@ -238,10 +222,8 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} =
|
|||||||
|
|
||||||
kvs = kvs.reversed
|
kvs = kvs.reversed
|
||||||
let
|
let
|
||||||
iter = (await ds.query(q)).tryGet
|
iter = tryGet(await ds.query(q))
|
||||||
res = (await allFinished(toSeq(iter)))
|
res = tryGet(await iter.waitForAllQueryResults())
|
||||||
.mapIt( it.read.tryGet )
|
|
||||||
.filterIt( it.key.isSome )
|
|
||||||
|
|
||||||
check:
|
check:
|
||||||
res.len == 100
|
res.len == 100
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user