diff --git a/datastore/leveldb/leveldbds.nim b/datastore/leveldb/leveldbds.nim index 3bf31fe..d4f4d46 100644 --- a/datastore/leveldb/leveldbds.nim +++ b/datastore/leveldb/leveldbds.nim @@ -5,6 +5,7 @@ import std/tables import std/os import std/strformat import std/strutils +import std/sets import pkg/leveldbstatic import pkg/chronos @@ -19,6 +20,10 @@ type LevelDbDatastore* = ref object of Datastore db: LevelDb locks: TableRef[Key, AsyncLock] + openIterators: HashSet[QueryIter] + +proc hash(iter: QueryIter): Hash = + hash(addr iter) method has*(self: LevelDbDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} = try: @@ -70,6 +75,10 @@ method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.as method close*(self: LevelDbDatastore): Future[?!void] {.async: (raises: [CancelledError]).} = try: + for iter in self.openIterators: + if err =? (await iter.dispose()).errorOption: + return failure(err.msg) + self.openIterators.clear() self.db.close() return success() except LevelDbException as e: @@ -98,6 +107,13 @@ method query*( limit = query.limit ) + proc dispose(): Future[?!void] {.async: (raises: [CancelledError]).} = + dbIter.dispose() + iter.disposed = true + self.openIterators.excl(iter) + + return success() + proc next(): Future[?!QueryResponse] {.async: (raises: [CancelledError]).} = if iter.finished: return failure(newException(QueryEndedError, "Calling next on a finished query!")) @@ -107,6 +123,9 @@ method query*( if dbIter.finished: iter.finished = true + if err =? (await dispose()).errorOption: + return failure(err) + return success (Key.none, EmptyBytes) else: let key = Key.init(keyStr).expect("LevelDbDatastore.query (next) Failed to create key.") @@ -114,12 +133,10 @@ method query*( except LevelDbException as e: return failure("LevelDbDatastore.query -> next exception: " & $e.msg) - proc dispose(): Future[?!void] {.async: (raises: [CancelledError]).} = - dbIter.dispose() - return success() - iter.next = next iter.dispose = dispose + self.openIterators.incl(iter) + return success iter method modifyGet*( diff --git a/datastore/query.nim b/datastore/query.nim index ab1ef5f..eb76482 100644 --- a/datastore/query.nim +++ b/datastore/query.nim @@ -27,6 +27,7 @@ type IterDispose* = proc(): Future[?!void] {.async: (raises: [CancelledError]), gcsafe.} QueryIter* = ref object finished*: bool + disposed*: bool next*: GetNext dispose*: IterDispose diff --git a/tests/datastore/leveldb/testleveldbds.nim b/tests/datastore/leveldb/testleveldbds.nim index 49c175d..9b39943 100644 --- a/tests/datastore/leveldb/testleveldbds.nim +++ b/tests/datastore/leveldb/testleveldbds.nim @@ -67,7 +67,7 @@ suite "Test LevelDB Typed Query": test "Typed Queries": typedDsQueryTests(ds) -suite "LevelDB Query: keys should disregard trailing wildcards": +suite "LevelDB Query": let tempDir = getTempDir() / "testleveldbds" var ds: LevelDbDatastore @@ -97,7 +97,28 @@ suite "LevelDB Query: keys should disregard trailing wildcards": (await ds.close()).tryGet removeDir(tempDir) - test "Forward": + test "should query by prefix": + let + q = Query.init(Key.init("/a").tryGet) + iter = (await ds.query(q)).tryGet + res = (await allFinished(toSeq(iter))) + .mapIt( it.read.tryGet ) + .filterIt( it.key.isSome ) + + check: + res.len == 3 + res[0].key.get == key1 + res[0].data == val1 + + res[1].key.get == key2 + res[1].data == val2 + + res[2].key.get == key3 + res[2].data == val3 + + (await iter.dispose()).tryGet + + test "should disregard forward trailing wildcards in keys": let q = Query.init(Key.init("/a/*").tryGet) iter = (await ds.query(q)).tryGet @@ -116,9 +137,7 @@ suite "LevelDB Query: keys should disregard trailing wildcards": res[2].key.get == key3 res[2].data == val3 - (await iter.dispose()).tryGet - - test "Backwards": + test "should disregard backward trailing wildcards in key": let q = Query.init(Key.init("/a\\*").tryGet) iter = (await ds.query(q)).tryGet @@ -137,4 +156,42 @@ suite "LevelDB Query: keys should disregard trailing wildcards": res[2].key.get == key3 res[2].data == val3 + test "should dispose automatically of iterators when finished": + let + q = Query.init(Key.init("/a/b/c").tryGet) + iter = (await ds.query(q)).tryGet + + let val = (await iter.next()).tryGet() + check val.key.get == key3 + check val.data == val3 + + check iter.finished == false + check iter.disposed == false + + let val2 = (await iter.next()).tryGet() + check val2.key == Key.none + check val2.data == EmptyBytes + + check iter.finished == true + check iter.disposed == true + + test "should dispose automatically of iterators when datastore is closed": + let + q1 = Query.init(Key.init("/a/b/c").tryGet) + q2 = Query.init(Key.init("/a/b").tryGet) + i1 = (await ds.query(q1)).tryGet + i2 = (await ds.query(q2)).tryGet + + check i1.disposed == false + check i2.disposed == false + + (await ds.close()).tryGet + + check i1.disposed == true + check i2.disposed == true + + test "should have idempotent QueryIterator.dispose": + let q = Query.init(Key.init("/a/b/c").tryGet) + let iter = (await ds.query(q)).tryGet + (await iter.dispose()).tryGet (await iter.dispose()).tryGet