mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-02-07 23:33:31 +00:00
fix: close pending iterators before closing LevelDB store (#83)
This adds tracking of open iterators to leveldbds so that when one attempts to close it, iterators are disposed of first. It also adds automatic disposal if iterators are completely consumed. fixes #82
This commit is contained in:
parent
a7ee4b170a
commit
92af582d97
@ -5,6 +5,7 @@ import std/tables
|
||||
import std/os
|
||||
import std/strformat
|
||||
import std/strutils
|
||||
import std/sets
|
||||
|
||||
import pkg/leveldbstatic
|
||||
import pkg/chronos
|
||||
@ -19,6 +20,10 @@ type
|
||||
LevelDbDatastore* = ref object of Datastore
|
||||
db: LevelDb
|
||||
locks: TableRef[Key, AsyncLock]
|
||||
openIterators: HashSet[QueryIter]
|
||||
|
||||
proc hash(iter: QueryIter): Hash =
|
||||
hash(addr iter)
|
||||
|
||||
method has*(self: LevelDbDatastore, key: Key): Future[?!bool] {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
@ -70,6 +75,10 @@ method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.as
|
||||
|
||||
method close*(self: LevelDbDatastore): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
for iter in self.openIterators:
|
||||
if err =? (await iter.dispose()).errorOption:
|
||||
return failure(err.msg)
|
||||
self.openIterators.clear()
|
||||
self.db.close()
|
||||
return success()
|
||||
except LevelDbException as e:
|
||||
@ -98,6 +107,13 @@ method query*(
|
||||
limit = query.limit
|
||||
)
|
||||
|
||||
proc dispose(): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
dbIter.dispose()
|
||||
iter.disposed = true
|
||||
self.openIterators.excl(iter)
|
||||
|
||||
return success()
|
||||
|
||||
proc next(): Future[?!QueryResponse] {.async: (raises: [CancelledError]).} =
|
||||
if iter.finished:
|
||||
return failure(newException(QueryEndedError, "Calling next on a finished query!"))
|
||||
@ -107,6 +123,9 @@ method query*(
|
||||
|
||||
if dbIter.finished:
|
||||
iter.finished = true
|
||||
if err =? (await dispose()).errorOption:
|
||||
return failure(err)
|
||||
|
||||
return success (Key.none, EmptyBytes)
|
||||
else:
|
||||
let key = Key.init(keyStr).expect("LevelDbDatastore.query (next) Failed to create key.")
|
||||
@ -114,12 +133,10 @@ method query*(
|
||||
except LevelDbException as e:
|
||||
return failure("LevelDbDatastore.query -> next exception: " & $e.msg)
|
||||
|
||||
proc dispose(): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
dbIter.dispose()
|
||||
return success()
|
||||
|
||||
iter.next = next
|
||||
iter.dispose = dispose
|
||||
self.openIterators.incl(iter)
|
||||
|
||||
return success iter
|
||||
|
||||
method modifyGet*(
|
||||
|
||||
@ -27,6 +27,7 @@ type
|
||||
IterDispose* = proc(): Future[?!void] {.async: (raises: [CancelledError]), gcsafe.}
|
||||
QueryIter* = ref object
|
||||
finished*: bool
|
||||
disposed*: bool
|
||||
next*: GetNext
|
||||
dispose*: IterDispose
|
||||
|
||||
|
||||
@ -67,7 +67,7 @@ suite "Test LevelDB Typed Query":
|
||||
test "Typed Queries":
|
||||
typedDsQueryTests(ds)
|
||||
|
||||
suite "LevelDB Query: keys should disregard trailing wildcards":
|
||||
suite "LevelDB Query":
|
||||
let tempDir = getTempDir() / "testleveldbds"
|
||||
var
|
||||
ds: LevelDbDatastore
|
||||
@ -97,7 +97,28 @@ suite "LevelDB Query: keys should disregard trailing wildcards":
|
||||
(await ds.close()).tryGet
|
||||
removeDir(tempDir)
|
||||
|
||||
test "Forward":
|
||||
test "should query by prefix":
|
||||
let
|
||||
q = Query.init(Key.init("/a").tryGet)
|
||||
iter = (await ds.query(q)).tryGet
|
||||
res = (await allFinished(toSeq(iter)))
|
||||
.mapIt( it.read.tryGet )
|
||||
.filterIt( it.key.isSome )
|
||||
|
||||
check:
|
||||
res.len == 3
|
||||
res[0].key.get == key1
|
||||
res[0].data == val1
|
||||
|
||||
res[1].key.get == key2
|
||||
res[1].data == val2
|
||||
|
||||
res[2].key.get == key3
|
||||
res[2].data == val3
|
||||
|
||||
(await iter.dispose()).tryGet
|
||||
|
||||
test "should disregard forward trailing wildcards in keys":
|
||||
let
|
||||
q = Query.init(Key.init("/a/*").tryGet)
|
||||
iter = (await ds.query(q)).tryGet
|
||||
@ -116,9 +137,7 @@ suite "LevelDB Query: keys should disregard trailing wildcards":
|
||||
res[2].key.get == key3
|
||||
res[2].data == val3
|
||||
|
||||
(await iter.dispose()).tryGet
|
||||
|
||||
test "Backwards":
|
||||
test "should disregard backward trailing wildcards in key":
|
||||
let
|
||||
q = Query.init(Key.init("/a\\*").tryGet)
|
||||
iter = (await ds.query(q)).tryGet
|
||||
@ -137,4 +156,42 @@ suite "LevelDB Query: keys should disregard trailing wildcards":
|
||||
res[2].key.get == key3
|
||||
res[2].data == val3
|
||||
|
||||
test "should dispose automatically of iterators when finished":
|
||||
let
|
||||
q = Query.init(Key.init("/a/b/c").tryGet)
|
||||
iter = (await ds.query(q)).tryGet
|
||||
|
||||
let val = (await iter.next()).tryGet()
|
||||
check val.key.get == key3
|
||||
check val.data == val3
|
||||
|
||||
check iter.finished == false
|
||||
check iter.disposed == false
|
||||
|
||||
let val2 = (await iter.next()).tryGet()
|
||||
check val2.key == Key.none
|
||||
check val2.data == EmptyBytes
|
||||
|
||||
check iter.finished == true
|
||||
check iter.disposed == true
|
||||
|
||||
test "should dispose automatically of iterators when datastore is closed":
|
||||
let
|
||||
q1 = Query.init(Key.init("/a/b/c").tryGet)
|
||||
q2 = Query.init(Key.init("/a/b").tryGet)
|
||||
i1 = (await ds.query(q1)).tryGet
|
||||
i2 = (await ds.query(q2)).tryGet
|
||||
|
||||
check i1.disposed == false
|
||||
check i2.disposed == false
|
||||
|
||||
(await ds.close()).tryGet
|
||||
|
||||
check i1.disposed == true
|
||||
check i2.disposed == true
|
||||
|
||||
test "should have idempotent QueryIterator.dispose":
|
||||
let q = Query.init(Key.init("/a/b/c").tryGet)
|
||||
let iter = (await ds.query(q)).tryGet
|
||||
(await iter.dispose()).tryGet
|
||||
(await iter.dispose()).tryGet
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user