From 92af582d9783991fef8e871ee77f3b6cc23ed257 Mon Sep 17 00:00:00 2001 From: Giuliano Mega Date: Fri, 6 Feb 2026 16:24:11 -0300 Subject: [PATCH] fix: close pending iterators before closing LevelDB store (#83) This adds tracking of open iterators to leveldbds so that when one attempts to close it, iterators are disposed of first. It also adds automatic disposal if iterators are completely consumed. fixes #82 --- datastore/leveldb/leveldbds.nim | 25 +++++++-- datastore/query.nim | 1 + tests/datastore/leveldb/testleveldbds.nim | 67 +++++++++++++++++++++-- 3 files changed, 84 insertions(+), 9 deletions(-) diff --git a/datastore/leveldb/leveldbds.nim b/datastore/leveldb/leveldbds.nim index 3bf31fe..d4f4d46 100644 --- a/datastore/leveldb/leveldbds.nim +++ b/datastore/leveldb/leveldbds.nim @@ -5,6 +5,7 @@ import std/tables import std/os import std/strformat import std/strutils +import std/sets import pkg/leveldbstatic import pkg/chronos @@ -19,6 +20,10 @@ type LevelDbDatastore* = ref object of Datastore db: LevelDb locks: TableRef[Key, AsyncLock] + openIterators: HashSet[QueryIter] + +proc hash(iter: QueryIter): Hash = + hash(addr iter) method has*(self: LevelDbDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} = try: @@ -70,6 +75,10 @@ method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.as method close*(self: LevelDbDatastore): Future[?!void] {.async: (raises: [CancelledError]).} = try: + for iter in self.openIterators: + if err =? (await iter.dispose()).errorOption: + return failure(err.msg) + self.openIterators.clear() self.db.close() return success() except LevelDbException as e: @@ -98,6 +107,13 @@ method query*( limit = query.limit ) + proc dispose(): Future[?!void] {.async: (raises: [CancelledError]).} = + dbIter.dispose() + iter.disposed = true + self.openIterators.excl(iter) + + return success() + proc next(): Future[?!QueryResponse] {.async: (raises: [CancelledError]).} = if iter.finished: return failure(newException(QueryEndedError, "Calling next on a finished query!")) @@ -107,6 +123,9 @@ method query*( if dbIter.finished: iter.finished = true + if err =? (await dispose()).errorOption: + return failure(err) + return success (Key.none, EmptyBytes) else: let key = Key.init(keyStr).expect("LevelDbDatastore.query (next) Failed to create key.") @@ -114,12 +133,10 @@ method query*( except LevelDbException as e: return failure("LevelDbDatastore.query -> next exception: " & $e.msg) - proc dispose(): Future[?!void] {.async: (raises: [CancelledError]).} = - dbIter.dispose() - return success() - iter.next = next iter.dispose = dispose + self.openIterators.incl(iter) + return success iter method modifyGet*( diff --git a/datastore/query.nim b/datastore/query.nim index ab1ef5f..eb76482 100644 --- a/datastore/query.nim +++ b/datastore/query.nim @@ -27,6 +27,7 @@ type IterDispose* = proc(): Future[?!void] {.async: (raises: [CancelledError]), gcsafe.} QueryIter* = ref object finished*: bool + disposed*: bool next*: GetNext dispose*: IterDispose diff --git a/tests/datastore/leveldb/testleveldbds.nim b/tests/datastore/leveldb/testleveldbds.nim index 49c175d..9b39943 100644 --- a/tests/datastore/leveldb/testleveldbds.nim +++ b/tests/datastore/leveldb/testleveldbds.nim @@ -67,7 +67,7 @@ suite "Test LevelDB Typed Query": test "Typed Queries": typedDsQueryTests(ds) -suite "LevelDB Query: keys should disregard trailing wildcards": +suite "LevelDB Query": let tempDir = getTempDir() / "testleveldbds" var ds: LevelDbDatastore @@ -97,7 +97,28 @@ suite "LevelDB Query: keys should disregard trailing wildcards": (await ds.close()).tryGet removeDir(tempDir) - test "Forward": + test "should query by prefix": + let + q = Query.init(Key.init("/a").tryGet) + iter = (await ds.query(q)).tryGet + res = (await allFinished(toSeq(iter))) + .mapIt( it.read.tryGet ) + .filterIt( it.key.isSome ) + + check: + res.len == 3 + res[0].key.get == key1 + res[0].data == val1 + + res[1].key.get == key2 + res[1].data == val2 + + res[2].key.get == key3 + res[2].data == val3 + + (await iter.dispose()).tryGet + + test "should disregard forward trailing wildcards in keys": let q = Query.init(Key.init("/a/*").tryGet) iter = (await ds.query(q)).tryGet @@ -116,9 +137,7 @@ suite "LevelDB Query: keys should disregard trailing wildcards": res[2].key.get == key3 res[2].data == val3 - (await iter.dispose()).tryGet - - test "Backwards": + test "should disregard backward trailing wildcards in key": let q = Query.init(Key.init("/a\\*").tryGet) iter = (await ds.query(q)).tryGet @@ -137,4 +156,42 @@ suite "LevelDB Query: keys should disregard trailing wildcards": res[2].key.get == key3 res[2].data == val3 + test "should dispose automatically of iterators when finished": + let + q = Query.init(Key.init("/a/b/c").tryGet) + iter = (await ds.query(q)).tryGet + + let val = (await iter.next()).tryGet() + check val.key.get == key3 + check val.data == val3 + + check iter.finished == false + check iter.disposed == false + + let val2 = (await iter.next()).tryGet() + check val2.key == Key.none + check val2.data == EmptyBytes + + check iter.finished == true + check iter.disposed == true + + test "should dispose automatically of iterators when datastore is closed": + let + q1 = Query.init(Key.init("/a/b/c").tryGet) + q2 = Query.init(Key.init("/a/b").tryGet) + i1 = (await ds.query(q1)).tryGet + i2 = (await ds.query(q2)).tryGet + + check i1.disposed == false + check i2.disposed == false + + (await ds.close()).tryGet + + check i1.disposed == true + check i2.disposed == true + + test "should have idempotent QueryIterator.dispose": + let q = Query.init(Key.init("/a/b/c").tryGet) + let iter = (await ds.query(q)).tryGet + (await iter.dispose()).tryGet (await iter.dispose()).tryGet