From 1995afb87e14bfb2d3fc2f3474e4d30e10c927a7 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 17 May 2021 15:55:57 +0200 Subject: [PATCH] kvstore fixes (#350) Storing large blobs in a "WITHOUT ROWID" table turns out to be extremely slow when the tree must be rebalanced. * Split out keystore capability into separate interface, making each keystore a separate instance * Disable "WITHOUT ROWID" optimization by default * Implement prefix lookup that allows iterating over all values with a certain prefix in their key --- eth/db/kvstore.nim | 30 ++++ eth/db/kvstore_rocksdb.nim | 3 + eth/db/kvstore_sqlite3.nim | 289 ++++++++++++++++++------------ tests/db/test_kvstore.nim | 31 +++- tests/db/test_kvstore_rocksdb.nim | 2 +- tests/db/test_kvstore_sqlite3.nim | 4 +- 6 files changed, 240 insertions(+), 119 deletions(-) diff --git a/eth/db/kvstore.nim b/eth/db/kvstore.nim index 7f28f23..b72776c 100644 --- a/eth/db/kvstore.nim +++ b/eth/db/kvstore.nim @@ -26,9 +26,11 @@ type KvResult*[T] = Result[T, string] DataProc* = proc(val: openArray[byte]) {.gcsafe, raises: [Defect].} + KeyValueProc* = proc(key, val: openArray[byte]) {.gcsafe, raises: [Defect].} PutProc = proc (db: RootRef, key, val: openArray[byte]): KvResult[void] {.nimcall, gcsafe, raises: [Defect].} GetProc = proc (db: RootRef, key: openArray[byte], onData: DataProc): KvResult[bool] {.nimcall, gcsafe, raises: [Defect].} + FindProc = proc (db: RootRef, prefix: openArray[byte], onFind: KeyValueProc): KvResult[int] {.nimcall, gcsafe, raises: [Defect].} DelProc = proc (db: RootRef, key: openArray[byte]): KvResult[void] {.nimcall, gcsafe, raises: [Defect].} ContainsProc = proc (db: RootRef, key: openArray[byte]): KvResult[bool] {.nimcall, gcsafe, raises: [Defect].} CloseProc = proc (db: RootRef): KvResult[void] {.nimcall, gcsafe, raises: [Defect].} @@ -38,6 +40,7 @@ type obj: RootRef putProc: PutProc getProc: GetProc + findProc: FindProc delProc: DelProc containsProc: ContainsProc closeProc: CloseProc @@ -55,6 +58,16 @@ template get*(dbParam: KvStoreRef, key: openArray[byte], onData: untyped): KvRes let db = dbParam db.getProc(db.obj, key, onData) +template find*( + dbParam: KvStoreRef, prefix: openArray[byte], onFind: untyped): KvResult[int] = + ## Perform a prefix find, returning all data starting with the given prefix. + ## An empty prefix returns all rows in the store. + ## The data is valid for the duration of the callback. + ## ``onFind``: ``proc(key, value: openArray[byte])`` + ## returns the number of rows found + let db = dbParam + db.findProc(db.obj, prefix, onFind) + template del*(dbParam: KvStoreRef, key: openArray[byte]): KvResult[void] = ## Remove value at ``key`` from store - do nothing if the value is not present let db = dbParam @@ -78,6 +91,10 @@ proc getImpl[T](db: RootRef, key: openArray[byte], onData: DataProc): KvResult[b mixin get get(T(db), key, onData) +proc findImpl[T](db: RootRef, key: openArray[byte], onFind: KeyValueProc): KvResult[int] = + mixin get + find(T(db), key, onFind) + proc delImpl[T](db: RootRef, key: openArray[byte]): KvResult[void] = mixin del del(T(db), key) @@ -97,6 +114,7 @@ func kvStore*[T: RootRef](x: T): KvStoreRef = obj: x, putProc: putImpl[T], getProc: getImpl[T], + findProc: findImpl[T], delProc: delImpl[T], containsProc: containsImpl[T], closeProc: closeImpl[T] @@ -109,6 +127,18 @@ proc get*(db: MemStoreRef, key: openArray[byte], onData: DataProc): KvResult[boo ok(false) +proc find*( + db: MemStoreRef, prefix: openArray[byte], + onFind: KeyValueProc): KvResult[int] = + var total = 0 + # Should use lower/upper bounds instead + for k, v in db.records: + if k.len() >= prefix.len and k.toOpenArray(0, prefix.len() - 1) == prefix: + onFind(k, v) + total += 1 + + ok(total) + proc del*(db: MemStoreRef, key: openArray[byte]): KvResult[void] = db.records.del(@key) ok() diff --git a/eth/db/kvstore_rocksdb.nim b/eth/db/kvstore_rocksdb.nim index e1b0c67..e62cf63 100644 --- a/eth/db/kvstore_rocksdb.nim +++ b/eth/db/kvstore_rocksdb.nim @@ -16,6 +16,9 @@ type proc get*(db: RocksStoreRef, key: openarray[byte], onData: kvstore.DataProc): KvResult[bool] = db.store.get(key, onData) +proc find*(db: RocksStoreRef, prefix: openarray[byte], onFind: kvstore.KeyValueProc): KvResult[int] = + raiseAssert "Unimplemented" + proc put*(db: RocksStoreRef, key, value: openarray[byte]): KvResult[void] = db.store.put(key, value) diff --git a/eth/db/kvstore_sqlite3.nim b/eth/db/kvstore_sqlite3.nim index af99a17..e836c5d 100644 --- a/eth/db/kvstore_sqlite3.nim +++ b/eth/db/kvstore_sqlite3.nim @@ -3,7 +3,7 @@ {.push raises: [Defect].} import - std/[os, strformat], + std/[os, options, strformat], sqlite3_abi, ./kvstore @@ -24,17 +24,23 @@ type NoParams* = tuple # this is the empty tuple ResultHandler*[T] = proc(val: T) {.gcsafe, raises: [Defect].} - KeySpaceStatements = object - getStmt, putStmt, delStmt, containsStmt: RawStmtPtr - - SqStoreRef* = ref object of RootObj + SqStoreRef* = ref object + # Handle for a single database - from here, keyspaces and statements + # can be created env: Sqlite - keyspaces: seq[KeySpaceStatements] managedStmts: seq[RawStmtPtr] SqStoreCheckpointKind* {.pure.} = enum passive, full, restart, truncate + SqKeyspace* = object of RootObj + # A Keyspace is a single key-value table - it is generally efficient to + # create separate keyspaces for each type of data stored + getStmt, putStmt, delStmt, containsStmt, + findStmt0, findStmt1, findStmt2: RawStmtPtr + + SqKeyspaceRef* = ref SqKeyspace + template dispose(db: Sqlite) = discard sqlite3_close(db) @@ -72,7 +78,7 @@ proc prepareStmt*(db: SqStoreRef, ok SqliteStmt[Params, Res](s) proc bindParam(s: RawStmtPtr, n: int, val: auto): cint = - when val is openarray[byte]|seq[byte]: + when val is openArray[byte]|seq[byte]: if val.len > 0: sqlite3_bind_blob(s, n.cint, unsafeAddr val[0], val.len.cint, nil) else: @@ -80,7 +86,7 @@ proc bindParam(s: RawStmtPtr, n: int, val: auto): cint = elif val is array: when val.items.typeof is byte: # Prior to Nim 1.4 and view types array[N, byte] in tuples - # don't match with openarray[byte] + # don't match with openArray[byte] if val.len > 0: sqlite3_bind_blob(s, n.cint, unsafeAddr val[0], val.len.cint, nil) else: @@ -126,7 +132,7 @@ template readResult(s: RawStmtPtr, column: cint, T: type): auto = sqlite3_column_int64(s, column) elif T is int: {.fatal: "Please use specify either int32 or int64 precisely".} - elif T is openarray[byte]: + elif T is openArray[byte]: let p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, column)) l = sqlite3_column_bytes(s, column) @@ -209,11 +215,11 @@ proc exec*[Params: tuple](db: SqStoreRef, template exec*(db: SqStoreRef, stmt: string): KvResult[void] = exec(db, stmt, ()) -proc getImpl(db: SqStoreRef, - keyspace: int, - key: openarray[byte], - onData: DataProc): KvResult[bool] = - let getStmt = db.keyspaces[keyspace].getStmt +proc get*(db: SqKeyspaceRef, + key: openArray[byte], + onData: DataProc): KvResult[bool] = + if db.getStmt == nil: return err("sqlite: database closed") + let getStmt = db.getStmt checkErr bindParam(getStmt, 1, key) let @@ -236,15 +242,77 @@ proc getImpl(db: SqStoreRef, res -proc get*(db: SqStoreRef, key: openarray[byte], onData: DataProc): KvResult[bool] = - getImpl(db, 0, key, onData) +func nextPrefix(prefix: openArray[byte], next: var seq[byte]): bool = + # Return a seq that is greater than all strings starting with `prefix` when + # doing a lexicographical compare - we're looking for the string that + # increments the last byte by 1, removing any bytes from the back that + # cannot be incremented (0xff) -template get*(db: SqStoreRef, keyspace: int, key: openarray[byte], onData: DataProc): KvResult[bool] = - getImpl(db, keyspace, key, onData) + for i in 0..= comparison + checkErr bindParam(db.findStmt1, 1, prefix) + db.findStmt1 + else: + checkErr bindParam(db.findStmt2, 1, prefix) + checkErr bindParam(db.findStmt2, 2, next) + db.findStmt2 + + if findStmt == nil: return err("sqlite: database closed") + + var + total = 0 + while true: + let + v = sqlite3_step(findStmt) + case v + of SQLITE_ROW: + let + kp = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(findStmt, 0)) + kl = sqlite3_column_bytes(findStmt, 0) + vp = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(findStmt, 1)) + vl = sqlite3_column_bytes(findStmt, 1) + onFind(kp.toOpenArray(0, kl - 1), vp.toOpenArray(0, vl - 1)) + total += 1 + of SQLITE_DONE: + break + else: + # release implicit transaction (could use a defer, but it's slow) + discard sqlite3_reset(findStmt) # same return information as step + discard sqlite3_clear_bindings(findStmt) # no errors possible + + return err($sqlite3_errstr(v)) + + # release implicit transaction + discard sqlite3_reset(findStmt) # same return information as step + discard sqlite3_clear_bindings(findStmt) # no errors possible + + ok(total) + +proc put*(db: SqKeyspaceRef, key, value: openArray[byte]): KvResult[void] = + let putStmt = db.putStmt + if putStmt == nil: return err("sqlite: database closed") checkErr bindParam(putStmt, 1, key) checkErr bindParam(putStmt, 2, value) @@ -260,14 +328,9 @@ proc putImpl(db: SqStoreRef, keyspace: int, key, value: openarray[byte]): KvResu res -proc put*(db: SqStoreRef, key, value: openarray[byte]): KvResult[void] = - putImpl(db, 0, key, value) - -template put*(db: SqStoreRef, keyspace: int, key, value: openarray[byte]): KvResult[void] = - putImpl(db, keyspace, key, value) - -proc containsImpl(db: SqStoreRef, keyspace: int, key: openarray[byte]): KvResult[bool] = - let containsStmt = db.keyspaces[keyspace].containsStmt +proc contains*(db: SqKeyspaceRef, key: openArray[byte]): KvResult[bool] = + let containsStmt = db.containsStmt + if containsStmt == nil: return err("sqlite: database closed") checkErr bindParam(containsStmt, 1, key) let @@ -283,14 +346,9 @@ proc containsImpl(db: SqStoreRef, keyspace: int, key: openarray[byte]): KvResult res -proc contains*(db: SqStoreRef, key: openarray[byte]): KvResult[bool] = - containsImpl(db, 0, key) - -template contains*(db: SqStoreRef, keyspace: int, key: openarray[byte]): KvResult[bool] = - containsImpl(db, keyspace, key) - -proc delImpl(db: SqStoreRef, keyspace: int, key: openarray[byte]): KvResult[void] = - let delStmt = db.keyspaces[keyspace].delStmt +proc del*(db: SqKeyspaceRef, key: openArray[byte]): KvResult[void] = + let delStmt = db.delStmt + if delStmt == nil: return err("sqlite: database closed") checkErr bindParam(delStmt, 1, key) let res = @@ -305,23 +363,26 @@ proc delImpl(db: SqStoreRef, keyspace: int, key: openarray[byte]): KvResult[void res -proc del*(db: SqStoreRef, key: openarray[byte]): KvResult[void] = - delImpl(db, 0, key) +proc close*(db: var SqKeyspace) = + # Calling with null stmt is harmless + discard sqlite3_finalize(db.putStmt) + discard sqlite3_finalize(db.getStmt) + discard sqlite3_finalize(db.delStmt) + discard sqlite3_finalize(db.containsStmt) + discard sqlite3_finalize(db.findStmt0) + discard sqlite3_finalize(db.findStmt1) + discard sqlite3_finalize(db.findStmt2) + db = SqKeyspace() -template del*(db: SqStoreRef, keyspace: int, key: openarray[byte]): KvResult[void] = - delImpl(db, keyspace, key) +proc close*(db: SqKeyspaceRef) = + close(db[]) proc close*(db: SqStoreRef) = - for keyspace in db.keyspaces: - discard sqlite3_finalize(keyspace.putStmt) - discard sqlite3_finalize(keyspace.getStmt) - discard sqlite3_finalize(keyspace.delStmt) - discard sqlite3_finalize(keyspace.containsStmt) - for stmt in db.managedStmts: discard sqlite3_finalize(stmt) - discard sqlite3_close(db.env) + # Lazy-v2-close allows closing the keyspaces in any order + discard sqlite3_close_v2(db.env) db[] = SqStoreRef()[] @@ -333,6 +394,32 @@ proc checkpoint*(db: SqStoreRef, kind = SqStoreCheckpointKind.passive) = of SqStoreCheckpointKind.truncate: SQLITE_CHECKPOINT_TRUNCATE discard sqlite3_wal_checkpoint_v2(db.env, nil, mode, nil, nil) +template prepare(env: ptr sqlite3, q: string): ptr sqlite3_stmt = + block: + var s: ptr sqlite3_stmt + checkErr sqlite3_prepare_v2(env, q, q.len.cint, addr s, nil): + discard + s + +template prepare(env: ptr sqlite3, q: string, cleanup: untyped): ptr sqlite3_stmt = + block: + var s: ptr sqlite3_stmt + checkErr sqlite3_prepare_v2(env, q, q.len.cint, addr s, nil) + s + +template checkExec(s: ptr sqlite3_stmt) = + if (let x = sqlite3_step(s); x != SQLITE_DONE): + discard sqlite3_finalize(s) + return err($sqlite3_errstr(x)) + + if (let x = sqlite3_finalize(s); x != SQLITE_OK): + return err($sqlite3_errstr(x)) + +template checkExec(env: ptr sqlite3, q: string) = + block: + let s = prepare(env, q): discard + checkExec(s) + proc isClosed*(db: SqStoreRef): bool = db.env != nil @@ -342,8 +429,7 @@ proc init*( name: string, readOnly = false, inMemory = false, - manualCheckpoint = false, - keyspaces: openarray[string] = ["kvstore"]): KvResult[T] = + manualCheckpoint = false): KvResult[T] = var env: AutoDisposed[ptr sqlite3] defer: disposeIfUnreleased(env) @@ -359,28 +445,10 @@ proc init*( try: createDir(basePath) except OSError, IOError: - return err("`sqlite: cannot create database directory") + return err("sqlite: cannot create database directory") checkErr sqlite3_open_v2(name, addr env.val, flags.cint, nil) - template prepare(q: string, cleanup: untyped): ptr sqlite3_stmt = - var s: ptr sqlite3_stmt - checkErr sqlite3_prepare_v2(env.val, q, q.len.cint, addr s, nil): - cleanup - s - - template checkExec(s: ptr sqlite3_stmt) = - if (let x = sqlite3_step(s); x != SQLITE_DONE): - discard sqlite3_finalize(s) - return err($sqlite3_errstr(x)) - - if (let x = sqlite3_finalize(s); x != SQLITE_OK): - return err($sqlite3_errstr(x)) - - template checkExec(q: string) = - let s = prepare(q): discard - checkExec(s) - template checkWalPragmaResult(journalModePragma: ptr sqlite3_stmt) = if (let x = sqlite3_step(journalModePragma); x != SQLITE_ROW): discard sqlite3_finalize(journalModePragma) @@ -396,12 +464,11 @@ proc init*( return err("Invalid pragma result: " & $x) # TODO: check current version and implement schema versioning - checkExec "PRAGMA user_version = 2;" + checkExec env.val, "PRAGMA user_version = 3;" - let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard + let journalModePragma = prepare(env.val, "PRAGMA journal_mode = WAL;") checkWalPragmaResult(journalModePragma) - checkExec(journalModePragma) - + checkExec journalModePragma if manualCheckpoint: checkErr sqlite3_wal_autocheckpoint(env.val, 0) @@ -409,55 +476,49 @@ proc init*( # this is safe in WAL mode leaving us with a consistent database at all # times, though potentially losing any data written between checkpoints. # http://www3.sqlite.org/wal.html#performance_considerations - checkExec("PRAGMA synchronous = NORMAL;") - - var keyspaceStatements = newSeq[KeySpaceStatements]() - for keyspace in keyspaces: - checkExec """ - CREATE TABLE IF NOT EXISTS """ & keyspace & """ ( - key BLOB PRIMARY KEY, - value BLOB - ); - """ - - let - getStmt = prepare("SELECT value FROM " & keyspace & " WHERE key = ?;"): - discard - putStmt = prepare("INSERT OR REPLACE INTO " & keyspace & "(key, value) VALUES (?, ?);"): - discard sqlite3_finalize(getStmt) - delStmt = prepare("DELETE FROM " & keyspace & " WHERE key = ?;"): - discard sqlite3_finalize(getStmt) - discard sqlite3_finalize(putStmt) - containsStmt = prepare("SELECT 1 FROM " & keyspace & " WHERE key = ?;"): - discard sqlite3_finalize(getStmt) - discard sqlite3_finalize(putStmt) - discard sqlite3_finalize(delStmt) - - keyspaceStatements.add KeySpaceStatements( - getStmt: getStmt, - putStmt: putStmt, - delStmt: delStmt, - containsStmt: containsStmt) + checkExec env.val, "PRAGMA synchronous = NORMAL;" ok(SqStoreRef( env: env.release, - keyspaces: keyspaceStatements )) -proc init*( - T: type SqStoreRef, - basePath: string, - name: string, - Keyspaces: type[enum], - readOnly = false, - inMemory = false, - manualCheckpoint = false): KvResult[T] = +proc openKvStore*(db: SqStoreRef, name = "kvstore", withoutRowid = false): KvResult[SqKeyspaceRef] = + ## Open a new Key-Value store in the SQLite database + ## + ## withoutRowid: Create the table without rowid - this is more efficient when + ## rows are small (<200 bytes) but very inefficient with larger + ## rows (the row being the sum of key and value) - see + ## https://www.sqlite.org/withoutrowid.html + ## + let + createSql = """ + CREATE TABLE IF NOT EXISTS """ & name & """ ( + key BLOB PRIMARY KEY, + value BLOB + )""" - var keyspaceNames = newSeq[string]() - for keyspace in Keyspaces: - keyspaceNames.add $keyspace + checkExec db.env, + if withoutRowid: createSql & " WITHOUT ROWID;" else: createSql & ";" - SqStoreRef.init(basePath, name, readOnly, inMemory, manualCheckpoint, keyspaceNames) + var + tmp: SqKeyspace + defer: + # We'll "move" ownership to the return value, effectively disabling "close" + close(tmp) + + tmp.getStmt = prepare(db.env, "SELECT value FROM " & name & " WHERE key = ?;") + tmp.putStmt = + prepare(db.env, "INSERT OR REPLACE INTO " & name & "(key, value) VALUES (?, ?);") + tmp.delStmt = prepare(db.env, "DELETE FROM " & name & " WHERE key = ?;") + tmp.containsStmt = prepare(db.env, "SELECT 1 FROM " & name & " WHERE key = ?;") + tmp.findStmt0 = prepare(db.env, "SELECT key, value FROM " & name & ";") + tmp.findStmt1 = prepare(db.env, "SELECT key, value FROM " & name & " WHERE key >= ?;") + tmp.findStmt2 = prepare(db.env, "SELECT key, value FROM " & name & " WHERE key >= ? and key < ?;") + + var res = SqKeyspaceRef() + res[] = tmp + tmp = SqKeyspace() # make close harmless + ok res when defined(metrics): import tables, times, diff --git a/tests/db/test_kvstore.nim b/tests/db/test_kvstore.nim index b0556fb..1457794 100644 --- a/tests/db/test_kvstore.nim +++ b/tests/db/test_kvstore.nim @@ -8,8 +8,9 @@ const key = [0'u8, 1, 2, 3] value = [3'u8, 2, 1, 0] value2 = [5'u8, 2, 1, 0] + key2 = [255'u8, 255] -proc testKvStore*(db: KvStoreRef) = +proc testKvStore*(db: KvStoreRef, supportsFind: bool) = check: db != nil @@ -20,9 +21,12 @@ proc testKvStore*(db: KvStoreRef) = db.put(key, value)[] - var v: seq[byte] + var k, v: seq[byte] proc grab(data: openArray[byte]) = v = @data + proc grab2(key, value: openArray[byte]) = + k = @key + v = @value check: db.contains(key)[] @@ -42,6 +46,27 @@ proc testKvStore*(db: KvStoreRef) = db.del(key)[] # does nothing + if supportsFind: + check: + db.find([], proc(key, value: openArray[byte]) = discard).get() == 0 + + db.put(key, value)[] + + check: + db.find([], grab2).get() == 1 + db.find(key, grab2).get() == 1 + k == key + v == value + + db.put(key2, value2)[] + check: + db.find([], grab2).get() == 2 + db.find([byte 255], grab2).get() == 1 + db.find([byte 255, 255], grab2).get() == 1 + db.find([byte 255, 255, 0], grab2).get() == 0 + db.find([byte 255, 255, 255], grab2).get() == 0 + db.find([byte 255, 0], grab2).get() == 0 + suite "MemoryStoreRef": test "KvStore interface": - testKvStore(kvStore MemStoreRef.init()) + testKvStore(kvStore MemStoreRef.init(), true) diff --git a/tests/db/test_kvstore_rocksdb.nim b/tests/db/test_kvstore_rocksdb.nim index 0e434d4..088ecf6 100644 --- a/tests/db/test_kvstore_rocksdb.nim +++ b/tests/db/test_kvstore_rocksdb.nim @@ -14,4 +14,4 @@ suite "RocksStoreRef": let db = RocksStoreRef.init(tmp, "test")[] defer: db.close() - testKvStore(kvStore db) + testKvStore(kvStore db, false) diff --git a/tests/db/test_kvstore_sqlite3.nim b/tests/db/test_kvstore_sqlite3.nim index 4f3f6ee..29a73b6 100644 --- a/tests/db/test_kvstore_sqlite3.nim +++ b/tests/db/test_kvstore_sqlite3.nim @@ -10,8 +10,10 @@ procSuite "SqStoreRef": test "KvStore interface": let db = SqStoreRef.init("", "test", inMemory = true)[] defer: db.close() + let kv = db.openKvStore() + defer: kv.get()[].close() - testKvStore(kvStore db) + testKvStore(kvStore kv.get(), true) test "Prepare and execute statements": let db = SqStoreRef.init("", "test", inMemory = true)[]