nim-eth/eth/db/kvstore_sqlite3.nim
2020-10-16 21:21:00 +03:00

439 lines
13 KiB
Nim

## Implementation of KvStore based on sqlite3
{.push raises: [Defect].}
import
os, strformat,
sqlite3_abi,
./kvstore
export kvstore
type
RawStmtPtr = ptr sqlite3_stmt
AutoDisposed[T: ptr|ref] = object
val: T
# TODO: These should become AutoDisposed
# This is currently considered risky due to the destructor
# problem found in FastStreams (triggered when objects in
# the GC heap have destructors)
Sqlite* = ptr sqlite3
SqliteStmt*[Params; Result] = distinct RawStmtPtr
NoParams* = tuple # this is the empty tuple
ResultHandler*[T] = proc(val: T) {.gcsafe, raises: [Defect].}
KeySpaceStatements = object
getStmt, putStmt, delStmt, containsStmt: RawStmtPtr
SqStoreRef* = ref object of RootObj
env: Sqlite
keyspaces: seq[KeySpaceStatements]
extraStmts: seq[RawStmtPtr]
template dispose(db: Sqlite) =
discard sqlite3_close(db)
template dispose(db: RawStmtPtr) =
discard sqlite3_finalize(db)
proc release[T](x: var AutoDisposed[T]): T =
result = x.val
x.val = nil
proc disposeIfUnreleased[T](x: var AutoDisposed[T]) =
mixin dispose
if x.val != nil:
dispose(x.release)
template checkErr(op, cleanup: untyped) =
if (let v = (op); v != SQLITE_OK):
cleanup
return err($sqlite3_errstr(v))
template checkErr(op) =
checkErr(op): discard
proc prepareStmt*(db: SqStoreRef,
stmt: string,
Params: type,
Res: type): KvResult[SqliteStmt[Params, Res]] =
var s: RawStmtPtr
checkErr sqlite3_prepare_v2(db.env, stmt, stmt.len.cint, addr s, nil)
ok SqliteStmt[Params, Res](s)
proc bindParam(s: RawStmtPtr, n: int, val: auto): cint =
when val is openarray[byte]|seq[byte]:
if val.len > 0:
sqlite3_bind_blob(s, n.cint, unsafeAddr val[0], val.len.cint, nil)
else:
sqlite3_bind_blob(s, n.cint, nil, 0.cint, nil)
elif val is int32:
sqlite3_bind_int(s, n.cint, val)
elif val is int64:
sqlite3_bind_int64(s, n.cint, val)
else:
{.fatal: "Please add support for the '" & $(T) & "' type".}
template bindParams(s: RawStmtPtr, params: auto) =
when params is tuple:
var i = 1
for param in fields(params):
checkErr bindParam(s, i, param)
inc i
else:
checkErr bindParam(s, 1, params)
proc exec*[P](s: SqliteStmt[P, void], params: P): KvResult[void] =
let s = RawStmtPtr s
bindParams(s, params)
let res =
if (let v = sqlite3_step(s); v != SQLITE_DONE):
err($sqlite3_errstr(v))
else:
ok()
# release implict transaction
discard sqlite3_reset(s) # same return information as step
discard sqlite3_clear_bindings(s) # no errors possible
res
template readResult(s: RawStmtPtr, column: cint, T: type): auto =
when T is int32:
sqlite3_column_int(s, column)
elif T is int64:
sqlite3_column_int64(s, column)
elif T is int:
{.fatal: "Please use specify either int32 or int64 precisely".}
elif T is openarray[byte]:
let
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, column))
l = sqlite3_column_bytes(s, column)
toOpenArray(p, 0, l-1)
elif T is seq[byte]:
var res: seq[byte]
let len = sqlite3_column_bytes(s, column)
if len > 0:
res.setLen(len)
copyMem(addr res[0], sqlite3_column_blob(s, column), len)
res
else:
{.fatal: "Please add support for the '" & $(T) & "' type".}
template readResult(s: RawStmtPtr, T: type): auto =
when T is tuple:
var res: T
var i = cint 0
for field in fields(res):
field = readResult(s, i, typeof(field))
inc i
res
else:
readResult(s, 0.cint, T)
proc exec*[Params, Res](s: SqliteStmt[Params, Res],
params: Params,
onData: ResultHandler[Res]): KvResult[bool] =
let s = RawStmtPtr s
bindParams(s, params)
try:
var gotResults = false
while true:
let v = sqlite3_step(s)
case v
of SQLITE_ROW:
onData(readResult(s, Res))
gotResults = true
of SQLITE_DONE:
break
else:
return err($sqlite3_errstr(v))
return ok gotResults
finally:
# release implicit transaction
discard sqlite3_reset(s) # same return information as step
discard sqlite3_clear_bindings(s) # no errors possible
template exec*(s: SqliteStmt[NoParams, void]): KvResult[void] =
exec(s, ())
template exec*[Res](s: SqliteStmt[NoParams, Res],
onData: ResultHandler[Res]): KvResult[bool] =
exec(s, (), onData)
proc exec*(db: SqStoreRef, stmt: string): KvResult[void] =
let stmt = ? db.prepareStmt(stmt, NoParams, void)
result = exec(stmt)
let finalizeStatus = sqlite3_finalize(RawStmtPtr stmt)
if finalizeStatus != SQLITE_OK and result.isOk:
return err($sqlite3_errstr(finalizeStatus))
proc getImpl(db: SqStoreRef,
keyspace: int,
key: openarray[byte],
onData: DataProc): KvResult[bool] =
let getStmt = db.keyspaces[keyspace].getStmt
checkErr bindParam(getStmt, 1, key)
let
v = sqlite3_step(getStmt)
res = case v
of SQLITE_ROW:
let
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(getStmt, 0))
l = sqlite3_column_bytes(getStmt, 0)
onData(toOpenArray(p, 0, l-1))
ok(true)
of SQLITE_DONE:
ok(false)
else:
err($sqlite3_errstr(v))
# release implicit transaction
discard sqlite3_reset(getStmt) # same return information as step
discard sqlite3_clear_bindings(getStmt) # no errors possible
res
proc get*(db: SqStoreRef, key: openarray[byte], onData: DataProc): KvResult[bool] =
getImpl(db, 0, key, onData)
template get*(db: SqStoreRef, keyspace: int, key: openarray[byte], onData: DataProc): KvResult[bool] =
getImpl(db, keyspace, key, onData)
proc putImpl(db: SqStoreRef, keyspace: int, key, value: openarray[byte]): KvResult[void] =
let putStmt = db.keyspaces[keyspace].putStmt
checkErr bindParam(putStmt, 1, key)
checkErr bindParam(putStmt, 2, value)
let res =
if (let v = sqlite3_step(putStmt); v != SQLITE_DONE):
err($sqlite3_errstr(v))
else:
ok()
# release implict transaction
discard sqlite3_reset(putStmt) # same return information as step
discard sqlite3_clear_bindings(putStmt) # no errors possible
res
proc put*(db: SqStoreRef, key, value: openarray[byte]): KvResult[void] =
putImpl(db, 0, key, value)
template put*(db: SqStoreRef, keyspace: int, key, value: openarray[byte]): KvResult[void] =
putImpl(db, keyspace, key, value)
proc containsImpl(db: SqStoreRef, keyspace: int, key: openarray[byte]): KvResult[bool] =
let containsStmt = db.keyspaces[keyspace].containsStmt
checkErr bindParam(containsStmt, 1, key)
let
v = sqlite3_step(containsStmt)
res = case v
of SQLITE_ROW: ok(true)
of SQLITE_DONE: ok(false)
else: err($sqlite3_errstr(v))
# release implicit transaction
discard sqlite3_reset(containsStmt) # same return information as step
discard sqlite3_clear_bindings(containsStmt) # no errors possible
res
proc contains*(db: SqStoreRef, key: openarray[byte]): KvResult[bool] =
containsImpl(db, 0, key)
template contains*(db: SqStoreRef, keyspace: int, key: openarray[byte]): KvResult[bool] =
containsImpl(db, keyspace, key)
proc delImpl(db: SqStoreRef, keyspace: int, key: openarray[byte]): KvResult[void] =
let delStmt = db.keyspaces[keyspace].delStmt
checkErr bindParam(delStmt, 1, key)
let res =
if (let v = sqlite3_step(delStmt); v != SQLITE_DONE):
err($sqlite3_errstr(v))
else:
ok()
# release implict transaction
discard sqlite3_reset(delStmt) # same return information as step
discard sqlite3_clear_bindings(delStmt) # no errors possible
res
proc del*(db: SqStoreRef, key: openarray[byte]): KvResult[void] =
delImpl(db, 0, key)
template del*(db: SqStoreRef, keyspace: int, key: openarray[byte]): KvResult[void] =
delImpl(db, keyspace, key)
proc close*(db: SqStoreRef) =
for keyspace in db.keyspaces:
discard sqlite3_finalize(keyspace.putStmt)
discard sqlite3_finalize(keyspace.getStmt)
discard sqlite3_finalize(keyspace.delStmt)
discard sqlite3_finalize(keyspace.containsStmt)
for stmt in db.extraStmts:
discard sqlite3_finalize(stmt)
discard sqlite3_close(db.env)
db[] = SqStoreRef()[]
proc isClosed*(db: SqStoreRef): bool =
db.env != nil
proc init*(
T: type SqStoreRef,
basePath: string,
name: string,
readOnly = false,
inMemory = false,
keyspaces: openarray[string] = ["kvstore"]): KvResult[T] =
var env: AutoDisposed[ptr sqlite3]
defer: disposeIfUnreleased(env)
let
name =
if inMemory: ":memory:"
else: basepath / name & ".sqlite3"
flags =
if readOnly: SQLITE_OPEN_READONLY
else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
if not inMemory:
try:
createDir(basePath)
except OSError, IOError:
return err("`sqlite: cannot create database directory")
checkErr sqlite3_open_v2(name, addr env.val, flags.cint, nil)
template prepare(q: string, cleanup: untyped): ptr sqlite3_stmt =
var s: ptr sqlite3_stmt
checkErr sqlite3_prepare_v2(env.val, q, q.len.cint, addr s, nil):
cleanup
s
template checkExec(s: ptr sqlite3_stmt) =
if (let x = sqlite3_step(s); x != SQLITE_DONE):
discard sqlite3_finalize(s)
return err($sqlite3_errstr(x))
if (let x = sqlite3_finalize(s); x != SQLITE_OK):
return err($sqlite3_errstr(x))
template checkExec(q: string) =
let s = prepare(q): discard
checkExec(s)
template checkWalPragmaResult(journalModePragma: ptr sqlite3_stmt) =
if (let x = sqlite3_step(journalModePragma); x != SQLITE_ROW):
discard sqlite3_finalize(journalModePragma)
return err($sqlite3_errstr(x))
if (let x = sqlite3_column_type(journalModePragma, 0); x != SQLITE3_TEXT):
discard sqlite3_finalize(journalModePragma)
return err($sqlite3_errstr(x))
if (let x = sqlite3_column_text(journalModePragma, 0);
x != "memory" and x != "wal"):
discard sqlite3_finalize(journalModePragma)
return err("Invalid pragma result: " & $x)
# TODO: check current version and implement schema versioning
checkExec "PRAGMA user_version = 1;"
let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard
checkWalPragmaResult(journalModePragma)
checkExec(journalModePragma)
var keyspaceStatements = newSeq[KeySpaceStatements]()
for keyspace in keyspaces:
checkExec """
CREATE TABLE IF NOT EXISTS """ & keyspace & """ (
key BLOB PRIMARY KEY,
value BLOB
) WITHOUT ROWID;
"""
let
getStmt = prepare("SELECT value FROM " & keyspace & " WHERE key = ?;"):
discard
putStmt = prepare("INSERT OR REPLACE INTO " & keyspace & "(key, value) VALUES (?, ?);"):
discard sqlite3_finalize(getStmt)
delStmt = prepare("DELETE FROM " & keyspace & " WHERE key = ?;"):
discard sqlite3_finalize(getStmt)
discard sqlite3_finalize(putStmt)
containsStmt = prepare("SELECT 1 FROM " & keyspace & " WHERE key = ?;"):
discard sqlite3_finalize(getStmt)
discard sqlite3_finalize(putStmt)
discard sqlite3_finalize(delStmt)
keyspaceStatements.add KeySpaceStatements(
getStmt: getStmt,
putStmt: putStmt,
delStmt: delStmt,
containsStmt: containsStmt)
ok(SqStoreRef(
env: env.release,
keyspaces: keyspaceStatements
))
proc init*(
T: type SqStoreRef,
basePath: string,
name: string,
Keyspaces: type[enum],
readOnly = false,
inMemory = false): KvResult[T] =
var keyspaceNames = newSeq[string]()
for keyspace in Keyspaces:
keyspaceNames.add $keyspace
SqStoreRef.init(basePath, name, readOnly, inMemory, keyspaceNames)
when defined(metrics):
import tables, times,
chronicles, metrics
type Sqlite3Info = ref object of Gauge
proc newSqlite3Info*(name: string, help: string, registry = defaultRegistry): Sqlite3Info {.raises: [Exception].} =
validateName(name)
result = Sqlite3Info(name: name,
help: help,
typ: "gauge",
creationThreadId: getThreadId())
result.register(registry)
var sqlite3Info* {.global.} = newSqlite3Info("sqlite3_info", "SQLite3 info")
method collect*(collector: Sqlite3Info): Metrics =
result = initOrderedTable[Labels, seq[Metric]]()
result[@[]] = @[]
let timestamp = getTime().toMilliseconds()
var currentMem, highwaterMem: int64
if (let res = sqlite3_status64(SQLITE_STATUS_MEMORY_USED, currentMem.addr, highwaterMem.addr, 0); res != SQLITE_OK):
error "SQLite3 error", msg = sqlite3_errstr(res)
else:
result[@[]] = @[
Metric(
name: "sqlite3_memory_used_bytes",
value: currentMem.float64,
timestamp: timestamp,
),
]