From 969d5e48486bd91d9b2f0c6cbd924a3d318bc22a Mon Sep 17 00:00:00 2001 From: Raycho Mukelov Date: Sun, 3 Sep 2023 07:03:09 +0300 Subject: [PATCH] Added MDBX DB Store wrapper in order to start writing the persistance part --- db/abstract_db_transaction.nim | 53 ++++++++ db/kvstore_mdbx.nim | 228 +++++++++++++++++++++++++++++++++ raft.nimble | 2 + 3 files changed, 283 insertions(+) create mode 100644 db/abstract_db_transaction.nim create mode 100644 db/kvstore_mdbx.nim diff --git a/db/abstract_db_transaction.nim b/db/abstract_db_transaction.nim new file mode 100644 index 0000000..5742745 --- /dev/null +++ b/db/abstract_db_transaction.nim @@ -0,0 +1,53 @@ +import stew/results + +type + ADbTResult*[T] = Result[T, string] + + AbstractDbTransaction* = ref object + obj: RootRef + commitProc: AbstractDbTransactionCommitProc + rollbackProc: AbstractDbTransactionRollbackProc + putProc: AbstractDbTransactionPutProc + delProc: AbstractDbTransactionDelProc + + AbstractDbTransactionCommitProc = proc (t: RootRef): ADbTResult[void] {.nimcall, gcsafe, raises: [Defect].} + AbstractDbTransactionRollbackProc = proc (t: RootRef): ADbTResult[void] {.nimcall, gcsafe, raises: [Defect].} + AbstractDbTransactionPutProc = proc (db: RootRef, key, val: openArray[byte]): ADbTResult[void] {.nimcall, gcsafe, raises: [Defect].} + AbstractDbTransactionDelProc = proc (db: RootRef, key: openArray[byte]): ADbTResult[void] {.nimcall, gcsafe, raises: [Defect].} + +proc abstractTransactionCommitImpl[T](x: RootRef): ADbTResult[void] = + mixin commit + commit(T(x)) + +proc abstractTransactionRollbackImpl[T](x: RootRef): ADbTResult[void] = + mixin rollback + rollback(T(x)) + +proc abstractTransactionPutImpl[T](x: RootRef, key, val: openArray[byte]): ADbTResult[void] = + mixin put + put(T(x), key, val) + +proc abstractTransactionDelImpl[T](x: RootRef, key: openArray[byte]): ADbTResult[void] = + mixin del + del(T(x), key) + +proc init*[T: RootRef](_:type AbstractDbTransaction, x: T): AbstractDbTransaction = + mixin commit, rollback, put, del + new result + result.obj = x + result.commitProc = abstractTransactionCommitImpl[T] + result.rollbackProc = abstractTransactionRollbackImpl[T] + result.putProc = abstractTransactionPutImpl[T] + result.delProc = abstractTransactionDelImpl[T] + +proc commit*(t: AbstractDbTransaction): ADbTResult[void] = + t.commitProc(t.obj) + +proc rollback*(t: AbstractDbTransaction): ADbTResult[void] = + t.rollbackProc(t.obj) + +proc put*(t: AbstractDbTransaction, key, val: openArray[byte]): ADbTResult[void] = + t.putProc(t.obj, key, val) + +proc del*(t: AbstractDbTransaction, key: openArray[byte]): ADbTResult[void] = + t.delProc(t.obj, key) diff --git a/db/kvstore_mdbx.nim b/db/kvstore_mdbx.nim new file mode 100644 index 0000000..d0bbeb2 --- /dev/null +++ b/db/kvstore_mdbx.nim @@ -0,0 +1,228 @@ +{.push raises: [].} + +import std/os +import stew/results +import ./abstract_db_transaction + +import nimdbx/[Database, CRUD, Collection, Transaction, Cursor, Error, Index, Collatable, Data] + +export Database, CRUD, Collection, Transaction, Cursor, Error, Index, Collatable, Data + +type + MDBXStoreRef* = ref object of RootObj + database* {.requiresInit.}: Database + chains* {.requiresInit.}: Collection + + MDBXTransaction* = ref object of RootObj + transaction: CollectionTransaction + +const + MaxFileSize = 1024 * 1024 * 1024 * 1024 # 1 TB (MDBX default is 400 MB) + +# ---------------------------------------------------------------------------------------- +# MDBX exception handling helper templates +# ---------------------------------------------------------------------------------------- + +template handleEx(body: untyped) = + ## Handle and convert MDBX exceptions to Result + try: + body + except MDBXError as e: + return err(e.msg) + except OSError as e: + return err(e.msg) + except CatchableError as e: + return err(e.msg) + +template handleExEx(body: untyped) = + ## Handle and convert MDBX exceptions to Result + try: + body + except MDBXError as e: + return err(e.msg) + except OSError as e: + return err(e.msg) + except CatchableError as e: + return err(e.msg) + except Exception as e: + return err(e.msg) + +# ---------------------------------------------------------------------------------------- +# MDBX transactions methods +# ---------------------------------------------------------------------------------------- + +proc commit*(t: MDBXTransaction): ADbTResult[void] = + handleEx(): + t.transaction.commit() + ok() + +proc rollback*(t: MDBXTransaction): ADbTResult[void] = + handleEx(): + t.transaction.abort() + ok() + +proc beginDbTransaction*(db: MDBXStoreRef): ADbTResult[MDBXTransaction] = + if db.chains != nil: + handleEx(): + ok(MDBXTransaction(transaction: db.chains.beginTransaction())) + else: + err("MDBXStoreRef.chains is nil") + +proc put*(t: MDBXTransaction, key, value: openArray[byte]): ADbTResult[void] = + handleExEx(): + t.transaction.put(asData(key), asData(value)) + ok() + +proc del*(t: MDBXTransaction, key: openArray[byte]): ADbTResult[void] = + handleExEx(): + t.transaction.del(asData(key)) + ok() + +# ---------------------------------------------------------------------------------------- +# MDBX transactions convenience templates +# ---------------------------------------------------------------------------------------- + +template checkDbChainsNotNil(db: MDBXStoreRef, body: untyped) = + ## Check if db.chains is not nil and execute the body + ## if it is not nil. Otherwise, raise an assert. + ## + if db.chains != nil: + body + else: + raiseAssert "MDBXStoreRef.chains is nil" + +template withDbSnapshot*(db: MDBXStoreRef, body: untyped) = + ## Create an MDBX snapshot and execute the body providing + ## a snapshot variable cs for the body statements to operate on. + ## Finish the snapshot after the body is executed. + ## + ## + checkDbChainsNotNil(db): + handleEx(): + let cs {.inject.} = db.chains.beginSnapshot() + defer: cs.finish() + body + +template withDbTransaction*(db: MDBXStoreRef, body: untyped) = + ## Create an MDBX transaction and execute the body and inject + ## a transaction variable dbTransaction in the body statements. + ## Handle MDBX errors and abort the transaction on error. + ## + checkDbChainsNotNil(db): + handleEx(): + var dbTransaction {.inject.} = db.chains.beginTransaction() + defer: dbTransaction.commit() + try: + body + except MDBXError as e: + dbTransaction.abort() + return err(e.msg) + except OSError as e: + dbTransaction.abort() + return err(e.msg) + except Exception as e: + dbTransaction.abort() + return err(e.msg) + +# ------------------------------------------------------------------------------------------ +# MDBX KvStore interface implementation +# ------------------------------------------------------------------------------------------ + +proc get*(db: MDBXStoreRef, key: openArray[byte], onData: kvstore.DataProc): Result[bool] = + if key.len <= 0: + return err("mdbx: key cannot be empty on get") + + withDbSnapshot(db): + let mdbxData = asByteSeq(cs.get(asData(key))) + if mdbxData.len > 0: + onData(mdbxData) + return ok(true) + else: + return ok(false) + +proc find*(db: MDBXStoreRef, prefix: openArray[byte], onFind: kvstore.KeyValueProc): Result[int] = + raiseAssert "Unimplemented" + +proc put*(db: MDBXStoreRef, key, value: openArray[byte]): Result[void] = + if key.len <= 0: + return err("mdbx: key cannot be empty on get") + + withDbTransaction(db): + dbTransaction.put(asData(key), asData(value)) + ok() + +proc contains*(db: MDBXStoreRef, key: openArray[byte]): Result[bool] = + if key.len <= 0: + return err("mdbx: key cannot be empty on get") + + withDbSnapshot(db): + let mdbxData = asByteSeq(cs.get(asData(key))) + if mdbxData.len > 0: + return ok(true) + else: + return ok(false) + +proc del*(db: MDBXStoreRef, key: openArray[byte]): Result[bool] = + if key.len <= 0: + return err("mdbx: key cannot be empty on del") + + withDbTransaction(db): + let mdbxData = asByteSeq(dbTransaction.get(asData(key))) + if mdbxData.len > 0: + dbTransaction.del(asData(key)) + return ok(true) + else: + return ok(false) + +proc clear*(db: MDBXStoreRef): Result[bool] = + raiseAssert "Unimplemented" + +proc close*(db: MDBXStoreRef) = + try: + db.database.close() + except MDBXError as e: + raiseAssert e.msg + except OSError as e: + raiseAssert e.msg + +# ------------------------------------------------------------------------------------------ +# .End. MDBX KvStore interface implementation +# ------------------------------------------------------------------------------------------ + +proc bulkPutSortedData*[KT: ByteArray32 | ByteArray33](db: MDBXStoreRef, keys: openArray[KT], vals: openArray[seq[byte]]): KvResult[int64] = + if keys.len <= 0: + return err("mdbx: keys cannot be empty on bulkPutSortedData") + + if keys.len != vals.len: + return err("mdbx: keys and vals must have the same length") + + withDbTransaction(db): + for i in 0 ..< keys.len: + dbTransaction.put(asData(keys[i]), asData(vals[i])) + return ok(0) + +proc init*( + T: type MDBXStoreRef, basePath: string, name: string, + readOnly = false): KvResult[T] = + let + dataDir = basePath / name / "data" + backupsDir = basePath / name / "backups" # Do we need that in case of MDBX? Should discuss this with @zah + + try: + createDir(dataDir) + createDir(backupsDir) + except OSError as e: + return err(e.msg) + except IOError as e: + return err(e.msg) + + var + mdbxFlags = {Exclusive, SafeNoSync} + if readOnly: + mdbxFlags.incl(ReadOnly) + + handleEx(): + let + db = openDatabase(dataDir, flags=mdbxFlags, maxFileSize=MaxFileSize) + chains = createCollection(db, "chains", StringKeys, BlobValues) + ok(T(database: db, chains: chains)) diff --git a/raft.nimble b/raft.nimble index 828f868..c6a2ccf 100644 --- a/raft.nimble +++ b/raft.nimble @@ -22,6 +22,8 @@ requires "unittest2 >= 0.0.4" requires "uuids >= 0.1.11" requires "chronicles >= 0.10.3" requires "chronos >= 3.0.11" +requires "nimdbx >= 0.4.1" +requires "nimterop >= 0.6.13" proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") = if not dirExists "build":