Mirror of https://github.com/logos-storage/nim-datastore.git (synced 2026-01-03 22:23:10 +00:00)
Adding batching capabilities (#37)
* adding batch entry
* add batched put and delete
* add batched tests
* adding batching capabilities to remaining stores
* open db in readonly mode
* make naming consistent
* release prepared statements
* don't use deprecated add
* check file exists on test exit
* remove unused var
This commit is contained in:
parent 308b5c08be
commit 6c06a3b095
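For context, a minimal usage sketch of the batch API this commit introduces, mirroring the added "put batch"/"delete batch" tests below. It is not part of the commit: the store construction, temp path and key names are illustrative, and the imports are assumed to match what the test helpers already pull in.

import std/os
import std/sequtils
import pkg/chronos
import pkg/stew/results
import pkg/datastore

proc batchExample() {.async.} =
  # Any concrete store works; FSDatastore mirrors the FS tests in this commit.
  let root = getTempDir() / "batch-example"
  createDir(root)
  let ds = FSDatastore.new(root = root, depth = 3).tryGet()

  # BatchEntry is tuple[key: Key, data: seq[byte]] (see the base module diff).
  var batch: seq[BatchEntry]
  for i in 0 ..< 10:
    batch.add((Key.init("example", $i).tryGet, @[i.byte]))

  # One call stores every entry; the SQLite store wraps the same call in a transaction.
  (await ds.put(batch)).tryGet

  # Batched delete takes only the keys.
  (await ds.delete(batch.mapIt(it.key))).tryGet

waitFor batchExample()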
@@ -10,18 +10,27 @@ export key, query, types

push: {.upraises: [].}

type
  BatchEntry* = tuple[key: Key, data: seq[byte]]

method contains*(self: Datastore, key: Key): Future[?!bool] {.base, locks: "unknown".} =
  raiseAssert("Not implemented!")

method delete*(self: Datastore, key: Key): Future[?!void] {.base, locks: "unknown".} =
  raiseAssert("Not implemented!")

method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, locks: "unknown".} =
  raiseAssert("Not implemented!")

method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, locks: "unknown".} =
  raiseAssert("Not implemented!")

method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, locks: "unknown".} =
  raiseAssert("Not implemented!")

method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, locks: "unknown".} =
  raiseAssert("Not implemented!")

method close*(self: Datastore): Future[?!void] {.base, async, locks: "unknown".} =
  raiseAssert("Not implemented!")
@@ -75,6 +75,13 @@ method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =

  except OSError as e:
    return failure e

method delete*(self: FSDatastore, keys: seq[Key]): Future[?!void] {.async.} =
  for key in keys:
    if err =? (await self.delete(key)).errorOption:
      return failure err

  return success()

proc readFile*(self: FSDatastore, path: string): ?!seq[byte] =
  var
    file: File

@@ -130,6 +137,16 @@ method put*(

  return success()

method put*(
  self: FSDatastore,
  batch: seq[BatchEntry]): Future[?!void] {.async, locks: "unknown".} =

  for entry in batch:
    if err =? (await self.put(entry.key, entry.data)).errorOption:
      return failure err

  return success()

proc dirWalker(path: string): iterator: string {.gcsafe.} =
  return iterator(): string =
    try:
@@ -80,6 +80,16 @@ method delete*(

  return (await mounted.store.store.delete(mounted.relative))

method delete*(
  self: MountedDatastore,
  keys: seq[Key]): Future[?!void] {.async.} =

  for key in keys:
    if err =? (await self.delete(key)).errorOption:
      return failure err

  return success()

method get*(
  self: MountedDatastore,
  key: Key): Future[?!seq[byte]] {.async.} =

@@ -99,12 +109,22 @@ method put*(

  return (await mounted.store.store.put(mounted.relative, data))

method put*(
  self: MountedDatastore,
  batch: seq[BatchEntry]): Future[?!void] {.async, locks: "unknown".} =

  for entry in batch:
    if err =? (await self.put(entry.key, entry.data)).errorOption:
      return failure err

  return success()

func new*(
  T: type MountedDatastore,
  stores: Table[Key, Datastore] = initTable[Key, Datastore]()): ?!T =

  var self = T()
  for (k, v) in stores.pairs:
    self.stores.add(?k.path, MountedStore(store: v, key: k))
    self.stores[?k.path] = MountedStore(store: v, key: k)

  success self
@@ -45,6 +45,22 @@ method contains*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} =

method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} =
  return self.db.deleteStmt.exec((key.id))

method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} =
  if err =? self.db.beginStmt.exec().errorOption:
    return failure err.msg

  for key in keys:
    if err =? self.db.deleteStmt.exec((key.id)).errorOption:
      if err =? self.db.rollbackStmt.exec().errorOption:
        return failure err.msg

      return failure err.msg

  if err =? self.db.endStmt.exec().errorOption:
    return failure err.msg

  return success()

method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} =
  # see comment in ./filesystem_datastore re: finer control of memory
  # allocation in `method get`, could apply here as well if bytes were read

@@ -66,6 +82,22 @@ method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} =

method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
  return self.db.putStmt.exec((key.id, data, timestamp()))

method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} =
  if err =? self.db.beginStmt.exec().errorOption:
    return failure err

  for entry in batch:
    if err =? self.db.putStmt.exec((entry.key.id, entry.data, timestamp())).errorOption:
      if err =? self.db.rollbackStmt.exec().errorOption:
        return failure err

      return failure err

  if err =? self.db.endStmt.exec().errorOption:
    return failure err

  return success()

method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
  self.db.close()
@@ -10,23 +10,20 @@ export sqliteutils

type
  BoundIdCol* = proc (): string {.closure, gcsafe, upraises: [].}

  BoundDataCol* = proc (): seq[byte] {.closure, gcsafe, upraises: [].}

  BoundTimestampCol* = proc (): int64 {.closure, gcsafe, upraises: [].}

  # feels odd to use `void` for prepared statements corresponding to SELECT
  # queries but it fits with the rest of the SQLite wrapper adapted from
  # status-im/nwaku, at least in its current form in ./sqlite
  ContainsStmt* = SQLiteStmt[(string), void]

  DeleteStmt* = SQLiteStmt[(string), void]

  GetStmt* = SQLiteStmt[(string), void]

  PutStmt* = SQLiteStmt[(string, seq[byte], int64), void]

  QueryStmt* = SQLiteStmt[(string), void]
  BeginStmt* = NoParamsStmt
  EndStmt* = NoParamsStmt
  RollbackStmt* = NoParamsStmt

  SQLiteDsDb* = object
    readOnly*: bool

@@ -37,6 +34,9 @@ type
    getDataCol*: BoundDataCol
    getStmt*: GetStmt
    putStmt*: PutStmt
    beginStmt*: BeginStmt
    endStmt*: EndStmt
    rollbackStmt*: RollbackStmt

const
  DbExt* = ".sqlite3"

@@ -114,6 +114,18 @@ const
    ORDER BY """ & IdColName & """ DESC
  """

  BeginTransactionStr* = """
    BEGIN;
  """

  EndTransactionStr* = """
    END;
  """

  RollbackTransactionStr* = """
    ROLLBACK;
  """

  QueryStmtIdCol* = 0
  QueryStmtDataCol* = 1

@@ -197,6 +209,9 @@ proc getDBFilePath*(path: string): ?!string =

proc close*(self: SQLiteDsDb) =
  self.containsStmt.dispose
  self.getStmt.dispose
  self.beginStmt.dispose
  self.endStmt.dispose
  self.rollbackStmt.dispose

  if not RawStmtPtr(self.deleteStmt).isNil:
    self.deleteStmt.dispose

@@ -246,6 +261,9 @@ proc open*(
    deleteStmt: DeleteStmt
    getStmt: GetStmt
    putStmt: PutStmt
    beginStmt: BeginStmt
    endStmt: EndStmt
    rollbackStmt: RollbackStmt

  if not readOnly:
    checkExec(env.val, CreateStmtStr)

@@ -256,6 +274,15 @@ proc open*(
  putStmt = ? PutStmt.prepare(
    env.val, PutStmtStr, SQLITE_PREPARE_PERSISTENT)

  beginStmt = ? BeginStmt.prepare(
    env.val, BeginTransactionStr, SQLITE_PREPARE_PERSISTENT)

  endStmt = ? EndStmt.prepare(
    env.val, EndTransactionStr, SQLITE_PREPARE_PERSISTENT)

  rollbackStmt = ? RollbackStmt.prepare(
    env.val, RollbackTransactionStr, SQLITE_PREPARE_PERSISTENT)

  containsStmt = ? ContainsStmt.prepare(
    env.val, ContainsStmtStr, SQLITE_PREPARE_PERSISTENT)

@@ -277,4 +304,7 @@ proc open*(
    env: env.release,
    getStmt: getStmt,
    getDataCol: getDataCol,
    putStmt: putStmt)
    putStmt: putStmt,
    beginStmt: beginStmt,
    endStmt: endStmt,
    rollbackStmt: rollbackStmt)
@@ -132,9 +132,7 @@ proc disposeIfUnreleased*[T](x: var AutoDisposed[T]) =
  mixin dispose
  if x.val != nil: dispose(x.release)

proc exec*[P](
  s: SQLiteStmt[P, void],
  params: P): ?!void =
proc exec*[P](s: SQLiteStmt[P, void], params: P = ()): ?!void =

  let
    s = RawStmtPtr(s)

@@ -148,7 +146,7 @@ proc exec*[P](
    else:
      success()

  # release implict transaction
  # release implicit transaction
  discard sqlite3_reset(s) # same return information as step
  discard sqlite3_clear_bindings(s) # no errors possible
@@ -53,6 +53,19 @@ method delete*(

  return success()

method delete*(
  self: TieredDatastore,
  keys: seq[Key]): Future[?!void] {.async, locks: "unknown".} =

  for key in keys:
    let
      pending = await allFinished(self.stores.mapIt(it.delete(key)))

    for fut in pending:
      if fut.read().isErr: return fut.read()

  return success()

method get*(
  self: TieredDatastore,
  key: Key): Future[?!seq[byte]] {.async, locks: "unknown".} =

@@ -90,6 +103,19 @@ method put*(

  return success()

method put*(
  self: TieredDatastore,
  batch: seq[BatchEntry]): Future[?!void] {.async, locks: "unknown".} =

  for entry in batch:
    let
      pending = await allFinished(self.stores.mapIt(it.put(entry.key, entry.data)))

    for fut in pending:
      if fut.read().isErr: return fut.read()

  return success()

# method query*(
#   self: TieredDatastore,
#   query: ...): Future[?!(?...)] {.async, locks: "unknown".} =
@@ -6,11 +6,11 @@ import pkg/stew/results

import pkg/datastore

template basicStoreTests*(
proc basicStoreTests*(
  ds: Datastore,
  key: Key,
  bytes: seq[byte],
  otherBytes: seq[byte]) {.dirty.} =
  otherBytes: seq[byte]) =

  test "put":
    (await ds.put(key, bytes)).tryGet()

@@ -32,3 +32,27 @@ template basicStoreTests*(
  test "contains":
    check:
      not (await ds.contains(key)).tryGet()

  test "put batch":
    var
      batch: seq[BatchEntry]

    for k in 0..<100:
      batch.add((Key.init(key.id, $k).tryGet, @[k.byte]))

    (await ds.put(batch)).tryGet

    for k in batch:
      check: (await ds.contains(k.key)).tryGet

  test "delete batch":
    var
      batch: seq[Key]

    for k in 0..<100:
      batch.add(Key.init(key.id, $k).tryGet)

    (await ds.delete(batch)).tryGet

    for k in batch:
      check: not (await ds.contains(k)).tryGet
@@ -35,8 +35,8 @@ suite "Test Open SQLite Datastore DB":
    defer:
      dsDb.close()

    check:
      fileExists(dbPathAbs)
    check:
      fileExists(dbPathAbs)

  test "Should open existing DB":
    let

@@ -47,8 +47,8 @@ suite "Test Open SQLite Datastore DB":
    defer:
      dsDb.close()

    check:
      fileExists(dbPathAbs)
    check:
      fileExists(dbPathAbs)

  test "Should open existing DB in read only mode":
    check:

@@ -57,15 +57,12 @@ suite "Test Open SQLite Datastore DB":
    let
      dsDb = SQLiteDsDb.open(
        path = dbPathAbs,
        flags = SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE).tryGet()
        flags = SQLITE_OPEN_READONLY).tryGet()

    defer:
      dsDb.close()

    check:
      fileExists(dbPathAbs)

  test "Should fail open existing DB in read only mode":
  test "Should fail open non existent DB in read only mode":
    removeDir(basePathAbs)
    check:
      not fileExists(dbPathAbs)
@@ -30,7 +30,7 @@ suite "Test Basic FSDatastore":
    require(not dirExists(basePathAbs))
    createDir(basePathAbs)

    fsStore = FSDatastore.new(root = basePathAbs).tryGet()
    fsStore = FSDatastore.new(root = basePathAbs, depth = 3).tryGet()

  teardownAll:
    removeDir(basePathAbs)

@@ -119,7 +119,6 @@ suite "Test Query":
    (path, _, _) = instantiationInfo(-1, fullPaths = true) # get this file's name
    basePath = "tests_data"
    basePathAbs = path.parentDir / basePath
    bytes = "some bytes".toBytes

  var
    ds: FSDatastore
@@ -48,10 +48,12 @@ suite "Test Basic Mounted Datastore":
    require(not dirExists(rootAbs))

  suite "Mounted sql":
    basicStoreTests(mountedDs, Key.init(sqlKey, key).tryGet, bytes, otherBytes)
    let namespace = Key.init(sqlKey, key).tryGet
    basicStoreTests(mountedDs, namespace, bytes, otherBytes)

  suite "Mounted fs":
    basicStoreTests(mountedDs, Key.init(fsKey, key).tryGet, bytes, otherBytes)
    let namespace = Key.init(fsKey, key).tryGet
    basicStoreTests(mountedDs, namespace, bytes, otherBytes)

suite "Test Mounted Datastore":
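For reference, a hedged sketch (not part of this commit) of how another Datastore backend could satisfy the new batch overloads by delegating to its single-key methods, the same pattern FSDatastore and MountedDatastore use above. The MemoryDatastore type and its field names are purely illustrative.

import std/tables
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/datastore

type
  MemoryDatastore* = ref object of Datastore
    entries: Table[string, seq[byte]]  # illustrative in-memory backing store

method put*(self: MemoryDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
  self.entries[key.id] = data
  return success()

method delete*(self: MemoryDatastore, key: Key): Future[?!void] {.async.} =
  self.entries.del(key.id)
  return success()

method put*(self: MemoryDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} =
  # Delegate each entry to the single-key put, stopping at the first error.
  for entry in batch:
    if err =? (await self.put(entry.key, entry.data)).errorOption:
      return failure err

  return success()

method delete*(self: MemoryDatastore, keys: seq[Key]): Future[?!void] {.async.} =
  for key in keys:
    if err =? (await self.delete(key)).errorOption:
      return failure err

  return success()

Backends with native transactions, like the SQLite store in this commit, can instead wrap the loop in BEGIN/END with a ROLLBACK on failure so the whole batch is applied atomically.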