Adds LevelDB Datastore (#63)
* set up leveldb-datastore * Adds test for leveldb * pulls in nim-leveldb by url and hash * Fixes basic tests for leveldbds * Implements query iterator * Fixes include path for defaultImpl * Review comments by Tomasz
This commit is contained in:
parent
7b3fdb854c
commit
0ee88a2a9b
|
@ -1,7 +1,8 @@
|
|||
import ./datastore/datastore
|
||||
import ./datastore/fsds
|
||||
import ./datastore/sql
|
||||
import ./datastore/leveldb
|
||||
import ./datastore/mountedds
|
||||
import ./datastore/tieredds
|
||||
|
||||
export datastore, fsds, mountedds, tieredds, sql
|
||||
export datastore, fsds, mountedds, tieredds, sql, leveldb
|
||||
|
|
|
@ -11,6 +11,7 @@ requires "nim >= 1.2.0",
|
|||
"chronos#c41599a", # FIXME change to Chronos >= 4.0.0 once it's out
|
||||
"questionable >= 0.10.15 & < 0.11.0",
|
||||
"sqlite3_abi",
|
||||
"leveldbstatic >= 0.1.2",
|
||||
"stew",
|
||||
"unittest2",
|
||||
"upraises >= 0.1.0 & < 0.2.0"
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
import ./leveldb/leveldbds
|
||||
|
||||
export leveldbds
|
|
@ -0,0 +1,149 @@
|
|||
import std/options
|
||||
import std/tables
|
||||
import std/os
|
||||
import std/strformat
|
||||
|
||||
import pkg/leveldbstatic
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/stew/byteutils
|
||||
from pkg/stew/results as stewResults import isErr
|
||||
import pkg/upraises
|
||||
|
||||
import ../datastore
|
||||
import ../defaultimpl
|
||||
|
||||
push: {.upraises: [].}
|
||||
|
||||
type
|
||||
LevelDbDatastore* = ref object of Datastore
|
||||
db: LevelDb
|
||||
locks: TableRef[Key, AsyncLock]
|
||||
|
||||
method has*(self: LevelDbDatastore, key: Key): Future[?!bool] {.async.} =
|
||||
try:
|
||||
let str = self.db.get($key)
|
||||
return success(str.isSome)
|
||||
except LevelDbException as e:
|
||||
return failure("LevelDbDatastore.has exception: " & e.msg)
|
||||
|
||||
method delete*(self: LevelDbDatastore, key: Key): Future[?!void] {.async.} =
|
||||
try:
|
||||
self.db.delete($key, sync = true)
|
||||
return success()
|
||||
except LevelDbException as e:
|
||||
return failure("LevelDbDatastore.delete exception: " & e.msg)
|
||||
|
||||
method delete*(self: LevelDbDatastore, keys: seq[Key]): Future[?!void] {.async.} =
|
||||
for key in keys:
|
||||
if err =? (await self.delete(key)).errorOption:
|
||||
return failure(err.msg)
|
||||
return success()
|
||||
|
||||
method get*(self: LevelDbDatastore, key: Key): Future[?!seq[byte]] {.async.} =
|
||||
try:
|
||||
let str = self.db.get($key)
|
||||
if not str.isSome:
|
||||
return failure(newException(DatastoreKeyNotFound, "LevelDbDatastore.get: key not found " & $key))
|
||||
let bytes = str.get().toBytes()
|
||||
return success(bytes)
|
||||
except LevelDbException as e:
|
||||
return failure("LevelDbDatastore.get exception: " & $e.msg)
|
||||
|
||||
method put*(self: LevelDbDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
|
||||
try:
|
||||
let str = string.fromBytes(data)
|
||||
self.db.put($key, str)
|
||||
return success()
|
||||
except LevelDbException as e:
|
||||
return failure("LevelDbDatastore.put exception: " & $e.msg)
|
||||
|
||||
method put*(self: LevelDbDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} =
|
||||
for entry in batch:
|
||||
if err =? (await self.put(entry.key, entry.data)).errorOption:
|
||||
return failure(err.msg)
|
||||
return success()
|
||||
|
||||
method close*(self: LevelDbDatastore): Future[?!void] {.async.} =
|
||||
try:
|
||||
self.db.close()
|
||||
return success()
|
||||
except LevelDbException as e:
|
||||
return failure("LevelDbDatastore.close exception: " & $e.msg)
|
||||
|
||||
method query*(
|
||||
self: LevelDbDatastore,
|
||||
query: Query): Future[?!QueryIter] {.async, gcsafe.} =
|
||||
|
||||
if not (query.sort == SortOrder.Assending):
|
||||
return failure("LevelDbDatastore.query: query.sort is not SortOrder.Ascending. Unsupported.")
|
||||
|
||||
var
|
||||
iter = QueryIter()
|
||||
dbIter = self.db.queryIter(
|
||||
prefix = $(query.key),
|
||||
keysOnly = not query.value,
|
||||
skip = query.offset,
|
||||
limit = query.limit
|
||||
)
|
||||
|
||||
proc next(): Future[?!QueryResponse] {.async.} =
|
||||
if iter.finished:
|
||||
return failure(newException(QueryEndedError, "Calling next on a finished query!"))
|
||||
|
||||
try:
|
||||
let (keyStr, valueStr) = dbIter.next()
|
||||
|
||||
if dbIter.finished:
|
||||
iter.finished = true
|
||||
return success (Key.none, EmptyBytes)
|
||||
else:
|
||||
let key = Key.init(keyStr).expect("LevelDbDatastore.query (next) Failed to create key.")
|
||||
return success (key.some, valueStr.toBytes())
|
||||
except LevelDbException as e:
|
||||
return failure("LevelDbDatastore.query -> next exception: " & $e.msg)
|
||||
except Exception as e:
|
||||
return failure("Unknown exception in LevelDbDatastore.query -> next: " & $e.msg)
|
||||
|
||||
iter.next = next
|
||||
iter.dispose = proc(): Future[?!void] {.async.} =
|
||||
return success()
|
||||
|
||||
return success iter
|
||||
|
||||
method modifyGet*(
|
||||
self: LevelDbDatastore,
|
||||
key: Key,
|
||||
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
||||
var lock: AsyncLock
|
||||
try:
|
||||
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
||||
return await defaultModifyGetImpl(self, lock, key, fn)
|
||||
finally:
|
||||
if not lock.locked:
|
||||
self.locks.del(key)
|
||||
|
||||
method modify*(
|
||||
self: LevelDbDatastore,
|
||||
key: Key,
|
||||
fn: Modify): Future[?!void] {.async.} =
|
||||
var lock: AsyncLock
|
||||
try:
|
||||
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
||||
return await defaultModifyImpl(self, lock, key, fn)
|
||||
finally:
|
||||
if not lock.locked:
|
||||
self.locks.del(key)
|
||||
|
||||
proc new*(
|
||||
T: type LevelDbDatastore, dbName: string): ?!T =
|
||||
try:
|
||||
let db = leveldbstatic.open(dbName)
|
||||
|
||||
success T(
|
||||
db: db,
|
||||
locks: newTable[Key, AsyncLock]()
|
||||
)
|
||||
except LevelDbException as e:
|
||||
return failure("LevelDbDatastore.new exception: " & $e.msg)
|
|
@ -0,0 +1,63 @@
|
|||
import std/options
|
||||
import std/os
|
||||
import std/sequtils
|
||||
from std/algorithm import sort, reversed
|
||||
|
||||
import pkg/asynctest
|
||||
import pkg/chronos
|
||||
import pkg/stew/results
|
||||
import pkg/stew/byteutils
|
||||
|
||||
import pkg/datastore
|
||||
import pkg/datastore/key
|
||||
import pkg/datastore/leveldb/leveldbds
|
||||
|
||||
import ../dscommontests
|
||||
import ../modifycommontests
|
||||
import ../querycommontests
|
||||
import ../typeddscommontests
|
||||
|
||||
suite "Test Basic LevelDbDatastore":
|
||||
let
|
||||
tempDir = getTempDir() / "testleveldbds"
|
||||
ds = LevelDbDatastore.new(tempDir).tryGet()
|
||||
key = Key.init("a:b/c/d:e").tryGet()
|
||||
bytes = "some bytes".toBytes
|
||||
otherBytes = "some other bytes".toBytes
|
||||
|
||||
setupAll:
|
||||
createdir(tempDir)
|
||||
|
||||
teardownAll:
|
||||
(await ds.close()).tryGet()
|
||||
removeDir(tempDir)
|
||||
|
||||
basicStoreTests(ds, key, bytes, otherBytes)
|
||||
modifyTests(ds, key)
|
||||
typedDsTests(ds, key)
|
||||
|
||||
suite "Test LevelDB Query":
|
||||
let tempDir = getTempDir() / "testleveldbds"
|
||||
var ds: LevelDbDatastore
|
||||
|
||||
setup:
|
||||
createdir(tempDir)
|
||||
ds = LevelDbDatastore.new(tempDir).tryGet()
|
||||
|
||||
teardown:
|
||||
(await ds.close()).tryGet
|
||||
removeDir(tempDir)
|
||||
|
||||
queryTests(ds,
|
||||
testLimitsAndOffsets = true,
|
||||
testSortOrder = false
|
||||
)
|
||||
|
||||
suite "Test LevelDB Typed Query":
|
||||
let
|
||||
ds = SQLiteDatastore.new(Memory).tryGet()
|
||||
|
||||
teardownAll:
|
||||
(await ds.close()).tryGet
|
||||
|
||||
typedDsQueryTests(ds)
|
|
@ -9,7 +9,7 @@ import pkg/stew/byteutils
|
|||
|
||||
import pkg/datastore
|
||||
|
||||
template queryTests*(ds: Datastore, extended = true) {.dirty.} =
|
||||
template queryTests*(ds: Datastore, testLimitsAndOffsets = true, testSortOrder = true) {.dirty.} =
|
||||
var
|
||||
key1: Key
|
||||
key2: Key
|
||||
|
@ -137,7 +137,7 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} =
|
|||
|
||||
(await iter.dispose()).tryGet
|
||||
|
||||
if extended:
|
||||
if testLimitsAndOffsets:
|
||||
test "Should apply limit":
|
||||
let
|
||||
key = Key.init("/a").tryGet
|
||||
|
@ -216,6 +216,7 @@ template queryTests*(ds: Datastore, extended = true) {.dirty.} =
|
|||
|
||||
(await iter.dispose()).tryGet
|
||||
|
||||
if testSortOrder:
|
||||
test "Should apply sort order - descending":
|
||||
let
|
||||
key = Key.init("/a").tryGet
|
||||
|
|
|
@ -90,7 +90,10 @@ suite "Test Query":
|
|||
teardown:
|
||||
(await ds.close()).tryGet
|
||||
|
||||
queryTests(ds)
|
||||
queryTests(ds,
|
||||
testLimitsAndOffsets = true,
|
||||
testSortOrder = true
|
||||
)
|
||||
|
||||
suite "Test Typed Query":
|
||||
let
|
||||
|
|
|
@ -143,5 +143,8 @@ suite "Test Query":
|
|||
removeDir(basePathAbs)
|
||||
require(not dirExists(basePathAbs))
|
||||
|
||||
queryTests(ds, false)
|
||||
queryTests(ds,
|
||||
testLimitsAndOffsets = false,
|
||||
testSortOrder = false
|
||||
)
|
||||
typedDsQueryTests(ds)
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
import ./leveldb/testleveldbds
|
||||
|
||||
{.warning[UnusedImport]: off.}
|
|
@ -3,6 +3,7 @@ import
|
|||
./datastore/testdatastore,
|
||||
./datastore/testfsds,
|
||||
./datastore/testsql,
|
||||
./datastore/testleveldb,
|
||||
./datastore/testtieredds,
|
||||
./datastore/testmountedds
|
||||
|
||||
|
|
Loading…
Reference in New Issue