mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-13 19:13:13 +00:00
Implements query iterator
This commit is contained in:
parent
57487f81ca
commit
1fbd55e910
@ -11,7 +11,7 @@ requires "nim >= 1.2.0",
|
||||
"chronos#c41599a", # FIXME change to Chronos >= 4.0.0 once it's out
|
||||
"questionable >= 0.10.3 & < 0.11.0",
|
||||
"sqlite3_abi",
|
||||
"leveldbstatic >= 0.1.0",
|
||||
"leveldbstatic >= 0.1.2",
|
||||
"stew",
|
||||
"unittest2",
|
||||
"upraises >= 0.1.0 & < 0.2.0"
|
||||
|
||||
@ -80,24 +80,6 @@ method close*(self: LevelDbDatastore): Future[?!void] {.async, locks: "unknown".
|
||||
except LevelDbException as e:
|
||||
return failure("LevelDbDatastore.close exception: " & $e.msg)
|
||||
|
||||
proc iterateKeyPrefixToQueue(self: LevelDbDatastore, query: Query, queue: AsyncQueue[(string, string)]): Future[void] {.async.} =
|
||||
var
|
||||
itemsLeft = query.limit
|
||||
skip = query.offset
|
||||
|
||||
for keyStr, valueStr in self.db.iterPrefix(prefix = $(query.key)):
|
||||
if skip > 0:
|
||||
dec skip
|
||||
else:
|
||||
await queue.put((keyStr, valueStr))
|
||||
if query.offset > 0:
|
||||
dec itemsLeft
|
||||
if itemsLeft < 1:
|
||||
break
|
||||
|
||||
# Signal to the iterator loop that we're finished.
|
||||
await queue.put(("", ""))
|
||||
|
||||
method query*(
|
||||
self: LevelDbDatastore,
|
||||
query: Query): Future[?!QueryIter] {.async, gcsafe.} =
|
||||
@ -105,32 +87,37 @@ method query*(
|
||||
if not (query.sort == SortOrder.Assending):
|
||||
return failure("LevelDbDatastore.query: query.sort is not SortOrder.Ascending. Unsupported.")
|
||||
|
||||
if not query.value:
|
||||
return failure("LevelDbDatastore.query: query.value is not true. Unsupported.")
|
||||
|
||||
var
|
||||
iter = QueryIter()
|
||||
queue = newAsyncQueue[(string, string)](1)
|
||||
dbIter = self.db.queryIter(
|
||||
prefix = $(query.key),
|
||||
keysOnly = not query.value,
|
||||
skip = query.offset,
|
||||
limit = query.limit
|
||||
)
|
||||
|
||||
proc next(): Future[?!QueryResponse] {.async.} =
|
||||
if iter.finished:
|
||||
return failure(newException(QueryEndedError, "Calling next on a finished query!"))
|
||||
|
||||
let (keyStr, valueStr) = await queue.get()
|
||||
try:
|
||||
let (keyStr, valueStr) = dbIter.next()
|
||||
|
||||
if keyStr == "":
|
||||
iter.finished = true
|
||||
return success (Key.none, EmptyBytes)
|
||||
else:
|
||||
let key = Key.init(keyStr).expect("LevelDbDatastore.query (next) Failed to create key.")
|
||||
return success (key.some, valueStr.toByteSeq())
|
||||
if dbIter.finished:
|
||||
iter.finished = true
|
||||
return success (Key.none, EmptyBytes)
|
||||
else:
|
||||
let key = Key.init(keyStr).expect("LevelDbDatastore.query (next) Failed to create key.")
|
||||
return success (key.some, valueStr.toByteSeq())
|
||||
except LevelDbException as e:
|
||||
return failure("LevelDbDatastore.query -> next exception: " & $e.msg)
|
||||
except Exception as e:
|
||||
return failure("Unknown exception in LevelDbDatastore.query -> next: " & $e.msg)
|
||||
|
||||
iter.next = next
|
||||
iter.dispose = proc(): Future[?!void] {.async.} =
|
||||
return success()
|
||||
|
||||
asyncSpawn self.iterateKeyPrefixToQueue(query, queue)
|
||||
|
||||
return success iter
|
||||
|
||||
method modifyGet*(
|
||||
|
||||
@ -38,16 +38,15 @@ suite "Test LevelDB Query":
|
||||
let tempDir = getTempDir() / "testleveldbds"
|
||||
var ds: LevelDbDatastore
|
||||
|
||||
setupAll:
|
||||
createdir(tempDir)
|
||||
|
||||
teardownAll:
|
||||
removeDir(tempDir)
|
||||
|
||||
setup:
|
||||
createdir(tempDir)
|
||||
ds = LevelDbDatastore.new(tempDir).tryGet()
|
||||
|
||||
teardown:
|
||||
(await ds.close()).tryGet
|
||||
removeDir(tempDir)
|
||||
|
||||
queryTests(ds)
|
||||
queryTests(ds,
|
||||
testLimitsAndOffsets = true,
|
||||
testSortOrder = false
|
||||
)
|
||||
|
||||
@ -9,7 +9,7 @@ import pkg/stew/byteutils
|
||||
|
||||
import pkg/datastore
|
||||
|
||||
template queryTests*(ds: Datastore, extended = true) {.dirty.} =
|
||||
template queryTests*(ds: Datastore, testLimitsAndOffsets = true, testSortOrder = true) {.dirty.} =
|
||||
var
|
||||
key1: Key
|
||||
key2: Key
|
||||
@ -137,7 +137,7 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} =
|
||||
|
||||
(await iter.dispose()).tryGet
|
||||
|
||||
if extended:
|
||||
if testLimitsAndOffsets:
|
||||
test "Should apply limit":
|
||||
let
|
||||
key = Key.init("/a").tryGet
|
||||
@ -216,6 +216,7 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} =
|
||||
|
||||
(await iter.dispose()).tryGet
|
||||
|
||||
if testSortOrder:
|
||||
test "Should apply sort order - descending":
|
||||
let
|
||||
key = Key.init("/a").tryGet
|
||||
|
||||
@ -88,4 +88,7 @@ suite "Test Query":
|
||||
teardown:
|
||||
(await ds.close()).tryGet
|
||||
|
||||
queryTests(ds)
|
||||
queryTests(ds,
|
||||
testLimitsAndOffsets = true,
|
||||
testSortOrder = true
|
||||
)
|
||||
|
||||
@ -137,4 +137,7 @@ suite "Test Query":
|
||||
removeDir(basePathAbs)
|
||||
require(not dirExists(basePathAbs))
|
||||
|
||||
queryTests(ds, false)
|
||||
queryTests(ds,
|
||||
testLimitsAndOffsets = false,
|
||||
testSortOrder = false
|
||||
)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user