From 6c06a3b095d1935aaf5eec66295862c9c3b4bac5 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Fri, 30 Sep 2022 10:24:26 -0400 Subject: [PATCH] Adding batching capabilities (#37) * adding batch entry * add batched put and delete * add batched tests * adding batching capabilities to remaining stores * open db in readonly mode * make naming consistent * release prepared statements * don't use deprecated add * check file exists on test exit * remove unused var --- datastore/datastore.nim | 9 ++++++ datastore/fsds.nim | 17 ++++++++++ datastore/mountedds.nim | 22 ++++++++++++- datastore/sql/sqliteds.nim | 32 +++++++++++++++++++ datastore/sql/sqlitedsdb.nim | 44 ++++++++++++++++++++++---- datastore/sql/sqliteutils.nim | 6 ++-- datastore/tieredds.nim | 26 +++++++++++++++ tests/datastore/dscommontests.nim | 28 ++++++++++++++-- tests/datastore/sql/testsqlitedsdb.nim | 15 ++++----- tests/datastore/testfsds.nim | 3 +- tests/datastore/testmountedds.nim | 6 ++-- 11 files changed, 181 insertions(+), 27 deletions(-) diff --git a/datastore/datastore.nim b/datastore/datastore.nim index 2c027d5..f8679f7 100644 --- a/datastore/datastore.nim +++ b/datastore/datastore.nim @@ -10,18 +10,27 @@ export key, query, types push: {.upraises: [].} +type + BatchEntry* = tuple[key: Key, data: seq[byte]] + method contains*(self: Datastore, key: Key): Future[?!bool] {.base, locks: "unknown".} = raiseAssert("Not implemented!") method delete*(self: Datastore, key: Key): Future[?!void] {.base, locks: "unknown".} = raiseAssert("Not implemented!") +method delete*(self: Datastore, keys: seq[Key]): Future[?!void] {.base, locks: "unknown".} = + raiseAssert("Not implemented!") + method get*(self: Datastore, key: Key): Future[?!seq[byte]] {.base, locks: "unknown".} = raiseAssert("Not implemented!") method put*(self: Datastore, key: Key, data: seq[byte]): Future[?!void] {.base, locks: "unknown".} = raiseAssert("Not implemented!") +method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, locks: "unknown".} = + raiseAssert("Not implemented!") + method close*(self: Datastore): Future[?!void] {.base, async, locks: "unknown".} = raiseAssert("Not implemented!") diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 66e54ef..849bd35 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -75,6 +75,13 @@ method delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} = except OSError as e: return failure e +method delete*(self: FSDatastore, keys: seq[Key]): Future[?!void] {.async.} = + for key in keys: + if err =? (await self.delete(key)).errorOption: + return failure err + + return success() + proc readFile*(self: FSDatastore, path: string): ?!seq[byte] = var file: File @@ -130,6 +137,16 @@ method put*( return success() +method put*( + self: FSDatastore, + batch: seq[BatchEntry]): Future[?!void] {.async, locks: "unknown".} = + + for entry in batch: + if err =? (await self.put(entry.key, entry.data)).errorOption: + return failure err + + return success() + proc dirWalker(path: string): iterator: string {.gcsafe.} = return iterator(): string = try: diff --git a/datastore/mountedds.nim b/datastore/mountedds.nim index c502731..1a2eb66 100644 --- a/datastore/mountedds.nim +++ b/datastore/mountedds.nim @@ -80,6 +80,16 @@ method delete*( return (await mounted.store.store.delete(mounted.relative)) +method delete*( + self: MountedDatastore, + keys: seq[Key]): Future[?!void] {.async.} = + + for key in keys: + if err =? (await self.delete(key)).errorOption: + return failure err + + return success() + method get*( self: MountedDatastore, key: Key): Future[?!seq[byte]] {.async.} = @@ -99,12 +109,22 @@ method put*( return (await mounted.store.store.put(mounted.relative, data)) +method put*( + self: MountedDatastore, + batch: seq[BatchEntry]): Future[?!void] {.async, locks: "unknown".} = + + for entry in batch: + if err =? (await self.put(entry.key, entry.data)).errorOption: + return failure err + + return success() + func new*( T: type MountedDatastore, stores: Table[Key, Datastore] = initTable[Key, Datastore]()): ?!T = var self = T() for (k, v) in stores.pairs: - self.stores.add(?k.path, MountedStore(store: v, key: k)) + self.stores[?k.path] = MountedStore(store: v, key: k) success self diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index aae12e5..00a739b 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -45,6 +45,22 @@ method contains*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} = method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} = return self.db.deleteStmt.exec((key.id)) +method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} = + if err =? self.db.beginStmt.exec().errorOption: + return failure err.msg + + for key in keys: + if err =? self.db.deleteStmt.exec((key.id)).errorOption: + if err =? self.db.rollbackStmt.exec().errorOption: + return failure err.msg + + return failure err.msg + + if err =? self.db.endStmt.exec().errorOption: + return failure err.msg + + return success() + method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} = # see comment in ./filesystem_datastore re: finer control of memory # allocation in `method get`, could apply here as well if bytes were read @@ -66,6 +82,22 @@ method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} = method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = return self.db.putStmt.exec((key.id, data, timestamp())) +method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} = + if err =? self.db.beginStmt.exec().errorOption: + return failure err + + for entry in batch: + if err =? self.db.putStmt.exec((entry.key.id, entry.data, timestamp())).errorOption: + if err =? self.db.rollbackStmt.exec().errorOption: + return failure err + + return failure err + + if err =? self.db.endStmt.exec().errorOption: + return failure err + + return success() + method close*(self: SQLiteDatastore): Future[?!void] {.async.} = self.db.close() diff --git a/datastore/sql/sqlitedsdb.nim b/datastore/sql/sqlitedsdb.nim index e7e6861..bc00b2f 100644 --- a/datastore/sql/sqlitedsdb.nim +++ b/datastore/sql/sqlitedsdb.nim @@ -10,23 +10,20 @@ export sqliteutils type BoundIdCol* = proc (): string {.closure, gcsafe, upraises: [].} - BoundDataCol* = proc (): seq[byte] {.closure, gcsafe, upraises: [].} - BoundTimestampCol* = proc (): int64 {.closure, gcsafe, upraises: [].} # feels odd to use `void` for prepared statements corresponding to SELECT # queries but it fits with the rest of the SQLite wrapper adapted from # status-im/nwaku, at least in its current form in ./sqlite ContainsStmt* = SQLiteStmt[(string), void] - DeleteStmt* = SQLiteStmt[(string), void] - GetStmt* = SQLiteStmt[(string), void] - PutStmt* = SQLiteStmt[(string, seq[byte], int64), void] - QueryStmt* = SQLiteStmt[(string), void] + BeginStmt* = NoParamsStmt + EndStmt* = NoParamsStmt + RollbackStmt* = NoParamsStmt SQLiteDsDb* = object readOnly*: bool @@ -37,6 +34,9 @@ type getDataCol*: BoundDataCol getStmt*: GetStmt putStmt*: PutStmt + beginStmt*: BeginStmt + endStmt*: EndStmt + rollbackStmt*: RollbackStmt const DbExt* = ".sqlite3" @@ -114,6 +114,18 @@ const ORDER BY """ & IdColName & """ DESC """ + BeginTransactionStr* = """ + BEGIN; + """ + + EndTransactionStr* = """ + END; + """ + + RollbackTransactionStr* = """ + ROLLBACK; + """ + QueryStmtIdCol* = 0 QueryStmtDataCol* = 1 @@ -197,6 +209,9 @@ proc getDBFilePath*(path: string): ?!string = proc close*(self: SQLiteDsDb) = self.containsStmt.dispose self.getStmt.dispose + self.beginStmt.dispose + self.endStmt.dispose + self.rollbackStmt.dispose if not RawStmtPtr(self.deleteStmt).isNil: self.deleteStmt.dispose @@ -246,6 +261,9 @@ proc open*( deleteStmt: DeleteStmt getStmt: GetStmt putStmt: PutStmt + beginStmt: BeginStmt + endStmt: EndStmt + rollbackStmt: RollbackStmt if not readOnly: checkExec(env.val, CreateStmtStr) @@ -256,6 +274,15 @@ proc open*( putStmt = ? PutStmt.prepare( env.val, PutStmtStr, SQLITE_PREPARE_PERSISTENT) + beginStmt = ? BeginStmt.prepare( + env.val, BeginTransactionStr, SQLITE_PREPARE_PERSISTENT) + + endStmt = ? EndStmt.prepare( + env.val, EndTransactionStr, SQLITE_PREPARE_PERSISTENT) + + rollbackStmt = ? RollbackStmt.prepare( + env.val, RollbackTransactionStr, SQLITE_PREPARE_PERSISTENT) + containsStmt = ? ContainsStmt.prepare( env.val, ContainsStmtStr, SQLITE_PREPARE_PERSISTENT) @@ -277,4 +304,7 @@ proc open*( env: env.release, getStmt: getStmt, getDataCol: getDataCol, - putStmt: putStmt) + putStmt: putStmt, + beginStmt: beginStmt, + endStmt: endStmt, + rollbackStmt: rollbackStmt) diff --git a/datastore/sql/sqliteutils.nim b/datastore/sql/sqliteutils.nim index 93cc81f..267f548 100644 --- a/datastore/sql/sqliteutils.nim +++ b/datastore/sql/sqliteutils.nim @@ -132,9 +132,7 @@ proc disposeIfUnreleased*[T](x: var AutoDisposed[T]) = mixin dispose if x.val != nil: dispose(x.release) -proc exec*[P]( - s: SQLiteStmt[P, void], - params: P): ?!void = +proc exec*[P](s: SQLiteStmt[P, void], params: P = ()): ?!void = let s = RawStmtPtr(s) @@ -148,7 +146,7 @@ proc exec*[P]( else: success() - # release implict transaction + # release implicit transaction discard sqlite3_reset(s) # same return information as step discard sqlite3_clear_bindings(s) # no errors possible diff --git a/datastore/tieredds.nim b/datastore/tieredds.nim index 9bdb69f..6655a2d 100644 --- a/datastore/tieredds.nim +++ b/datastore/tieredds.nim @@ -53,6 +53,19 @@ method delete*( return success() +method delete*( + self: TieredDatastore, + keys: seq[Key]): Future[?!void] {.async, locks: "unknown".} = + + for key in keys: + let + pending = await allFinished(self.stores.mapIt(it.delete(key))) + + for fut in pending: + if fut.read().isErr: return fut.read() + + return success() + method get*( self: TieredDatastore, key: Key): Future[?!seq[byte]] {.async, locks: "unknown".} = @@ -90,6 +103,19 @@ method put*( return success() +method put*( + self: TieredDatastore, + batch: seq[BatchEntry]): Future[?!void] {.async, locks: "unknown".} = + + for entry in batch: + let + pending = await allFinished(self.stores.mapIt(it.put(entry.key, entry.data))) + + for fut in pending: + if fut.read().isErr: return fut.read() + + return success() + # method query*( # self: TieredDatastore, # query: ...): Future[?!(?...)] {.async, locks: "unknown".} = diff --git a/tests/datastore/dscommontests.nim b/tests/datastore/dscommontests.nim index 9ee7d2e..e8418d0 100644 --- a/tests/datastore/dscommontests.nim +++ b/tests/datastore/dscommontests.nim @@ -6,11 +6,11 @@ import pkg/stew/results import pkg/datastore -template basicStoreTests*( +proc basicStoreTests*( ds: Datastore, key: Key, bytes: seq[byte], - otherBytes: seq[byte]) {.dirty.} = + otherBytes: seq[byte]) = test "put": (await ds.put(key, bytes)).tryGet() @@ -32,3 +32,27 @@ template basicStoreTests*( test "contains": check: not (await ds.contains(key)).tryGet() + + test "put batch": + var + batch: seq[BatchEntry] + + for k in 0..<100: + batch.add((Key.init(key.id, $k).tryGet, @[k.byte])) + + (await ds.put(batch)).tryGet + + for k in batch: + check: (await ds.contains(k.key)).tryGet + + test "delete batch": + var + batch: seq[Key] + + for k in 0..<100: + batch.add(Key.init(key.id, $k).tryGet) + + (await ds.delete(batch)).tryGet + + for k in batch: + check: not (await ds.contains(k)).tryGet diff --git a/tests/datastore/sql/testsqlitedsdb.nim b/tests/datastore/sql/testsqlitedsdb.nim index 0ef737f..da6e864 100644 --- a/tests/datastore/sql/testsqlitedsdb.nim +++ b/tests/datastore/sql/testsqlitedsdb.nim @@ -35,8 +35,8 @@ suite "Test Open SQLite Datastore DB": defer: dsDb.close() - check: - fileExists(dbPathAbs) + check: + fileExists(dbPathAbs) test "Should open existing DB": let @@ -47,8 +47,8 @@ suite "Test Open SQLite Datastore DB": defer: dsDb.close() - check: - fileExists(dbPathAbs) + check: + fileExists(dbPathAbs) test "Should open existing DB in read only mode": check: @@ -57,15 +57,12 @@ suite "Test Open SQLite Datastore DB": let dsDb = SQLiteDsDb.open( path = dbPathAbs, - flags = SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE).tryGet() + flags = SQLITE_OPEN_READONLY).tryGet() defer: dsDb.close() - check: - fileExists(dbPathAbs) - - test "Should fail open existing DB in read only mode": + test "Should fail open non existent DB in read only mode": removeDir(basePathAbs) check: not fileExists(dbPathAbs) diff --git a/tests/datastore/testfsds.nim b/tests/datastore/testfsds.nim index c544bfc..cd81291 100644 --- a/tests/datastore/testfsds.nim +++ b/tests/datastore/testfsds.nim @@ -30,7 +30,7 @@ suite "Test Basic FSDatastore": require(not dirExists(basePathAbs)) createDir(basePathAbs) - fsStore = FSDatastore.new(root = basePathAbs).tryGet() + fsStore = FSDatastore.new(root = basePathAbs, depth = 3).tryGet() teardownAll: removeDir(basePathAbs) @@ -119,7 +119,6 @@ suite "Test Query": (path, _, _) = instantiationInfo(-1, fullPaths = true) # get this file's name basePath = "tests_data" basePathAbs = path.parentDir / basePath - bytes = "some bytes".toBytes var ds: FSDatastore diff --git a/tests/datastore/testmountedds.nim b/tests/datastore/testmountedds.nim index 421f786..d58b910 100644 --- a/tests/datastore/testmountedds.nim +++ b/tests/datastore/testmountedds.nim @@ -48,10 +48,12 @@ suite "Test Basic Mounted Datastore": require(not dirExists(rootAbs)) suite "Mounted sql": - basicStoreTests(mountedDs, Key.init(sqlKey, key).tryGet, bytes, otherBytes) + let namespace = Key.init(sqlKey, key).tryGet + basicStoreTests(mountedDs, namespace, bytes, otherBytes) suite "Mounted fs": - basicStoreTests(mountedDs, Key.init(fsKey, key).tryGet, bytes, otherBytes) + let namespace = Key.init(fsKey, key).tryGet + basicStoreTests(mountedDs, namespace, bytes, otherBytes) suite "Test Mounted Datastore":