nim-eth/eth/db/kvstore_sqlite3.nim

199 lines
5.4 KiB
Nim

## Implementation of KvStore based on sqlite3
{.push raises: [Defect].}
import
os,
sqlite3_abi,
./kvstore
export kvstore
type
SqStoreRef* = ref object of RootObj
env: ptr sqlite3
getStmt, putStmt, delStmt, containsStmt: ptr sqlite3_stmt
template checkErr(op, cleanup: untyped) =
if (let v = (op); v != SQLITE_OK):
cleanup
return err($sqlite3_errstr(v))
template checkErr(op) =
checkErr(op): discard
proc bindBlob(s: ptr sqlite3_stmt, n: int, blob: openarray[byte]): cint =
sqlite3_bind_blob(s, n.cint, unsafeAddr blob[0], blob.len.cint, nil)
proc get*(db: SqStoreRef, key: openarray[byte], onData: DataProc): KvResult[bool] =
checkErr sqlite3_reset(db.getStmt)
checkErr sqlite3_clear_bindings(db.getStmt)
checkErr bindBlob(db.getStmt, 1, key)
let v = sqlite3_step(db.getStmt)
case v
of SQLITE_ROW:
let
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(db.getStmt, 0))
l = sqlite3_column_bytes(db.getStmt, 0)
onData(toOpenArray(p, 0, l-1))
ok(true)
of SQLITE_DONE:
ok(false)
else:
err($sqlite3_errstr(v))
proc put*(db: SqStoreRef, key, value: openarray[byte]): KvResult[void] =
checkErr sqlite3_reset(db.putStmt)
checkErr sqlite3_clear_bindings(db.putStmt)
checkErr bindBlob(db.putStmt, 1, key)
checkErr bindBlob(db.putStmt, 2, value)
if (let v = sqlite3_step(db.putStmt); v != SQLITE_DONE):
err($sqlite3_errstr(v))
else:
ok()
proc contains*(db: SqStoreRef, key: openarray[byte]): KvResult[bool] =
checkErr sqlite3_reset(db.containsStmt)
checkErr sqlite3_clear_bindings(db.containsStmt)
checkErr bindBlob(db.containsStmt, 1, key)
let v = sqlite3_step(db.containsStmt)
case v
of SQLITE_ROW: ok(true)
of SQLITE_DONE: ok(false)
else: err($sqlite3_errstr(v))
proc del*(db: SqStoreRef, key: openarray[byte]): KvResult[void] =
checkErr sqlite3_reset(db.delStmt)
checkErr sqlite3_clear_bindings(db.delStmt)
checkErr bindBlob(db.delStmt, 1, key)
if (let v = sqlite3_step(db.delStmt); v != SQLITE_DONE):
err($sqlite3_errstr(v))
else:
ok()
proc close*(db: SqStoreRef) =
discard sqlite3_finalize(db.putStmt)
discard sqlite3_finalize(db.getStmt)
discard sqlite3_finalize(db.delStmt)
discard sqlite3_finalize(db.containsStmt)
discard sqlite3_close(db.env)
db[] = SqStoreRef()[]
proc init*(
T: type SqStoreRef,
basePath: string,
name: string,
readOnly = false,
inMemory = false): KvResult[T] =
var
env: ptr sqlite3
let
name =
if inMemory: ":memory:"
else: basepath / name & ".sqlite3"
flags =
if readOnly: SQLITE_OPEN_READONLY
else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
if not inMemory:
try:
createDir(basePath)
except OSError, IOError:
return err("`sqlite: cannot create database directory")
checkErr sqlite3_open_v2(name, addr env, flags.cint, nil)
template prepare(q: string, cleanup: untyped): ptr sqlite3_stmt =
var s: ptr sqlite3_stmt
checkErr sqlite3_prepare_v2(env, q, q.len.cint, addr s, nil):
cleanup
discard sqlite3_close(env)
s
template checkExec(q: string) =
let s = prepare(q): discard
if (let x = sqlite3_step(s); x != SQLITE_DONE):
discard sqlite3_finalize(s)
discard sqlite3_close(env)
return err($sqlite3_errstr(x))
if (let x = sqlite3_finalize(s); x != SQLITE_OK):
discard sqlite3_close(env)
return err($sqlite3_errstr(x))
# TODO: check current version and implement schema versioning
checkExec "PRAGMA user_version = 1;"
checkExec """
CREATE TABLE IF NOT EXISTS kvstore(
key BLOB PRIMARY KEY,
value BLOB
) WITHOUT ROWID;
"""
let
getStmt = prepare "SELECT value FROM kvstore WHERE key = ?;":
discard
putStmt = prepare "INSERT OR REPLACE INTO kvstore(key, value) VALUES (?, ?);":
discard sqlite3_finalize(getStmt)
delStmt = prepare "DELETE FROM kvstore WHERE key = ?;":
discard sqlite3_finalize(getStmt)
discard sqlite3_finalize(putStmt)
containsStmt = prepare "SELECT 1 FROM kvstore WHERE key = ?;":
discard sqlite3_finalize(getStmt)
discard sqlite3_finalize(putStmt)
discard sqlite3_finalize(delStmt)
ok(SqStoreRef(
env: env,
getStmt: getStmt,
putStmt: putStmt,
delStmt: delStmt,
containsStmt: containsStmt
))
when defined(metrics):
import tables, times,
chronicles, metrics
type Sqlite3Info = ref object of Gauge
proc newSqlite3Info*(name: string, help: string, registry = defaultRegistry): Sqlite3Info {.raises: [Exception].} =
validateName(name)
result = Sqlite3Info(name: name,
help: help,
typ: "gauge",
creationThreadId: getThreadId())
result.register(registry)
var sqlite3Info* {.global.} = newSqlite3Info("sqlite3_info", "SQLite3 info")
method collect*(collector: Sqlite3Info): Metrics =
result = initOrderedTable[Labels, seq[Metric]]()
result[@[]] = @[]
let timestamp = getTime().toMilliseconds()
var currentMem, highwaterMem: int64
if (let res = sqlite3_status64(SQLITE_STATUS_MEMORY_USED, currentMem.addr, highwaterMem.addr, 0); res != SQLITE_OK):
error "SQLite3 error", msg = sqlite3_errstr(res)
else:
result[@[]] = @[
Metric(
name: "sqlite3_memory_used_bytes",
value: currentMem.float64,
timestamp: timestamp,
),
]