mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-27 06:20:54 +00:00
refactor(sqlite): move migrations logic to sqlite common module
This commit is contained in:
parent
0e1dae5956
commit
1d3943febb
@ -36,7 +36,7 @@ import
|
||||
./v2/test_waku_bridge,
|
||||
./v2/test_peer_storage,
|
||||
./v2/test_waku_keepalive,
|
||||
./v2/test_migration_utils,
|
||||
./v2/test_sqlite_migrations,
|
||||
./v2/test_namespacing_utils,
|
||||
./v2/test_waku_dnsdisc,
|
||||
./v2/test_waku_discv5,
|
||||
|
@ -1,63 +0,0 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[unittest, tables, strutils, os],
|
||||
chronicles,
|
||||
stew/results,
|
||||
../../waku/v2/node/storage/migration/migration_utils
|
||||
|
||||
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
||||
const MIGRATION_PATH = sourceDir / "../../waku/v2/node/storage/migration/migrations_scripts/message"
|
||||
|
||||
suite "Migration utils":
|
||||
test "read migration scripts":
|
||||
let migrationScriptsRes = getScripts(MIGRATION_PATH)
|
||||
check:
|
||||
migrationScriptsRes.isErr == false
|
||||
|
||||
test "filter migration scripts":
|
||||
let migrationUp = [("0001_init", "script1"), ("0001_add", "script1"), ("0002_init", "script2"), ("0003_init", "script3")].toOrderedTable()
|
||||
let migrationScripts = MigrationScripts(migrationUp: migrationUp)
|
||||
let scriptsRes = filterScripts(migrationScripts, 1, 3)
|
||||
check:
|
||||
scriptsRes.isErr == false
|
||||
scriptsRes.value.len == 2
|
||||
scriptsRes.value[0] == "script2"
|
||||
scriptsRes.value[1] == "script3"
|
||||
|
||||
test "filter migration scripts with varying zero-prefixed user versions":
|
||||
let migrationUp = [("0001_init", "script1"), ("1_add", "script1"), ("000002_init", "script2"), ("003_init", "script3")].toOrderedTable()
|
||||
let migrationScripts = MigrationScripts(migrationUp: migrationUp)
|
||||
let scriptsRes = filterScripts(migrationScripts, 1, 3)
|
||||
check:
|
||||
scriptsRes.isErr == false
|
||||
scriptsRes.value.len == 2
|
||||
scriptsRes.value[0] == "script2"
|
||||
scriptsRes.value[1] == "script3"
|
||||
|
||||
test "split scripts with no queries":
|
||||
let script = "; ;"
|
||||
let queries = splitScript(script)
|
||||
check queries.len == 0
|
||||
|
||||
test "split scripts with multiple queries":
|
||||
let q1 = """CREATE TABLE contacts2 (
|
||||
contact_id INTEGER PRIMARY KEY,
|
||||
first_name TEXT NOT NULL,
|
||||
last_name TEXT NOT NULL,
|
||||
email TEXT NOT NULL UNIQUE,
|
||||
phone TEXT NOT NULL UNIQUE
|
||||
);"""
|
||||
let q2 = """CREATE TABLE contacts2 (
|
||||
contact_id INTEGER PRIMARY KEY,
|
||||
first_name TEXT NOT NULL,
|
||||
last_name TEXT NOT NULL,
|
||||
email TEXT NOT NULL UNIQUE,
|
||||
phone TEXT NOT NULL UNIQUE
|
||||
);"""
|
||||
let script = q1 & q2
|
||||
let queries = splitScript(script)
|
||||
check:
|
||||
queries.len == 2
|
||||
queries[0] == q1
|
||||
queries[1] == q2
|
95
tests/v2/test_sqlite_migrations.nim
Normal file
95
tests/v2/test_sqlite_migrations.nim
Normal file
@ -0,0 +1,95 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[strutils, os],
|
||||
stew/results,
|
||||
testutils/unittests
|
||||
import
|
||||
../../waku/common/sqlite/migrations {.all.}
|
||||
|
||||
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
||||
|
||||
|
||||
suite "SQLite - migrations":
|
||||
|
||||
test "filter and order migration script file paths":
|
||||
## Given
|
||||
let paths = @[
|
||||
sourceDir / "00001_valid.up.sql",
|
||||
sourceDir / "00002_alsoValidWithUpperCaseExtension.UP.SQL",
|
||||
sourceDir / "00007_unorderedValid.up.sql",
|
||||
sourceDir / "00003_validRepeated.up.sql",
|
||||
sourceDir / "00003_validRepeated.up.sql",
|
||||
sourceDir / "00666_noMigrationScript.bmp",
|
||||
sourceDir / "00X00_invalidVersion.down.sql",
|
||||
sourceDir / "00008_notWithinVersionRange.up.sql",
|
||||
]
|
||||
|
||||
let
|
||||
lowerVersion = 0
|
||||
highVersion = 7
|
||||
|
||||
## When
|
||||
var migrationSciptPaths: seq[string]
|
||||
migrationSciptPaths = filterMigrationScripts(paths, lowerVersion, highVersion, direction="up")
|
||||
migrationSciptPaths = sortMigrationScripts(migrationSciptPaths)
|
||||
|
||||
## Then
|
||||
check:
|
||||
migrationSciptPaths == @[
|
||||
sourceDir / "00001_valid.up.sql",
|
||||
sourceDir / "00002_alsoValidWithUpperCaseExtension.UP.SQL",
|
||||
sourceDir / "00003_validRepeated.up.sql",
|
||||
sourceDir / "00003_validRepeated.up.sql",
|
||||
sourceDir / "00007_unorderedValid.up.sql"
|
||||
]
|
||||
|
||||
test "break migration scripts into queries":
|
||||
## Given
|
||||
let statement1 = """CREATE TABLE contacts1 (
|
||||
contact_id INTEGER PRIMARY KEY,
|
||||
first_name TEXT NOT NULL,
|
||||
last_name TEXT NOT NULL,
|
||||
email TEXT NOT NULL UNIQUE,
|
||||
phone TEXT NOT NULL UNIQUE
|
||||
);"""
|
||||
let statement2 = """CREATE TABLE contacts2 (
|
||||
contact_id INTEGER PRIMARY KEY,
|
||||
first_name TEXT NOT NULL,
|
||||
last_name TEXT NOT NULL,
|
||||
email TEXT NOT NULL UNIQUE,
|
||||
phone TEXT NOT NULL UNIQUE
|
||||
);"""
|
||||
let script = statement1 & statement2
|
||||
|
||||
## When
|
||||
let statements = script.breakIntoStatements()
|
||||
|
||||
## Then
|
||||
check:
|
||||
statements == @[statement1, statement2]
|
||||
|
||||
test "break statements script into queries - empty statements":
|
||||
## Given
|
||||
let statement1 = """CREATE TABLE contacts1 (
|
||||
contact_id INTEGER PRIMARY KEY,
|
||||
first_name TEXT NOT NULL,
|
||||
last_name TEXT NOT NULL,
|
||||
email TEXT NOT NULL UNIQUE,
|
||||
phone TEXT NOT NULL UNIQUE
|
||||
);"""
|
||||
let statement2 = """CREATE TABLE contacts2 (
|
||||
contact_id INTEGER PRIMARY KEY,
|
||||
first_name TEXT NOT NULL,
|
||||
last_name TEXT NOT NULL,
|
||||
email TEXT NOT NULL UNIQUE,
|
||||
phone TEXT NOT NULL UNIQUE
|
||||
);"""
|
||||
let script = statement1 & "; ;" & statement2
|
||||
|
||||
## When
|
||||
let statements = script.breakIntoStatements()
|
||||
|
||||
## Then
|
||||
check:
|
||||
statements == @[statement1, statement2]
|
@ -1,334 +1,7 @@
|
||||
{.push raises: [Defect].}
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/os,
|
||||
stew/results,
|
||||
chronicles,
|
||||
sqlite3_abi
|
||||
|
||||
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
||||
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
||||
#
|
||||
# Most of it is a direct copy, the only unique functions being `get` and `put`.
|
||||
|
||||
logScope:
|
||||
topics = "sqlite"
|
||||
|
||||
type
|
||||
Sqlite = ptr sqlite3
|
||||
|
||||
NoParams* = tuple
|
||||
RawStmtPtr* = ptr sqlite3_stmt
|
||||
SqliteStmt*[Params; Result] = distinct RawStmtPtr
|
||||
|
||||
AutoDisposed[T: ptr|ref] = object
|
||||
val: T
|
||||
|
||||
|
||||
template dispose(db: Sqlite) =
|
||||
discard sqlite3_close(db)
|
||||
|
||||
template dispose(rawStmt: RawStmtPtr) =
|
||||
discard sqlite3_finalize(rawStmt)
|
||||
|
||||
template dispose*(sqliteStmt: SqliteStmt) =
|
||||
discard sqlite3_finalize(RawStmtPtr sqliteStmt)
|
||||
|
||||
proc release[T](x: var AutoDisposed[T]): T =
|
||||
result = x.val
|
||||
x.val = nil
|
||||
|
||||
proc disposeIfUnreleased[T](x: var AutoDisposed[T]) =
|
||||
mixin dispose
|
||||
if x.val != nil:
|
||||
dispose(x.release)
|
||||
|
||||
template checkErr*(op, cleanup: untyped) =
|
||||
if (let v = (op); v != SQLITE_OK):
|
||||
cleanup
|
||||
return err($sqlite3_errstr(v))
|
||||
|
||||
template checkErr*(op) =
|
||||
checkErr(op): discard
|
||||
|
||||
|
||||
type
|
||||
DatabaseResult*[T] = Result[T, string]
|
||||
|
||||
SqliteDatabase* = ref object of RootObj
|
||||
env*: Sqlite
|
||||
|
||||
|
||||
type DataProc* = proc(s: RawStmtPtr) {.closure.} # the nim-eth definition is different; one more indirection
|
||||
|
||||
const NoopRowHandler* = proc(s: RawStmtPtr) {.closure.} = discard
|
||||
|
||||
|
||||
proc new*(T: type SqliteDatabase, path: string, readOnly=false): DatabaseResult[T] =
|
||||
var env: AutoDisposed[ptr sqlite3]
|
||||
defer: disposeIfUnreleased(env)
|
||||
|
||||
let flags = if readOnly: SQLITE_OPEN_READONLY
|
||||
else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
|
||||
|
||||
if path != ":memory:":
|
||||
try:
|
||||
createDir(parentDir(path))
|
||||
except OSError, IOError:
|
||||
return err("sqlite: cannot create database directory")
|
||||
|
||||
checkErr sqlite3_open_v2(path, addr env.val, flags.cint, nil)
|
||||
|
||||
template prepare(q: string, cleanup: untyped): ptr sqlite3_stmt =
|
||||
var s: ptr sqlite3_stmt
|
||||
checkErr sqlite3_prepare_v2(env.val, q, q.len.cint, addr s, nil):
|
||||
cleanup
|
||||
s
|
||||
|
||||
template checkExec(s: ptr sqlite3_stmt) =
|
||||
if (let x = sqlite3_step(s); x != SQLITE_DONE):
|
||||
discard sqlite3_finalize(s)
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
if (let x = sqlite3_finalize(s); x != SQLITE_OK):
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
template checkExec(q: string) =
|
||||
let s = prepare(q): discard
|
||||
checkExec(s)
|
||||
|
||||
template checkWalPragmaResult(journalModePragma: ptr sqlite3_stmt) =
|
||||
if (let x = sqlite3_step(journalModePragma); x != SQLITE_ROW):
|
||||
discard sqlite3_finalize(journalModePragma)
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
if (let x = sqlite3_column_type(journalModePragma, 0); x != SQLITE3_TEXT):
|
||||
discard sqlite3_finalize(journalModePragma)
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
if (let x = sqlite3_column_text(journalModePragma, 0);
|
||||
x != "memory" and x != "wal"):
|
||||
discard sqlite3_finalize(journalModePragma)
|
||||
return err("Invalid pragma result: " & $x)
|
||||
|
||||
|
||||
let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard
|
||||
checkWalPragmaResult(journalModePragma)
|
||||
checkExec(journalModePragma)
|
||||
|
||||
ok(SqliteDatabase(env: env.release))
|
||||
|
||||
|
||||
template prepare*(env: Sqlite, q: string, cleanup: untyped): ptr sqlite3_stmt =
|
||||
var s: ptr sqlite3_stmt
|
||||
checkErr sqlite3_prepare_v2(env, q, q.len.cint, addr s, nil):
|
||||
cleanup
|
||||
s
|
||||
|
||||
proc bindParam*(s: RawStmtPtr, n: int, val: auto): cint =
|
||||
when val is openarray[byte]|seq[byte]:
|
||||
if val.len > 0:
|
||||
sqlite3_bind_blob(s, n.cint, unsafeAddr val[0], val.len.cint, nil)
|
||||
else:
|
||||
sqlite3_bind_blob(s, n.cint, nil, 0.cint, nil)
|
||||
elif val is int32:
|
||||
sqlite3_bind_int(s, n.cint, val)
|
||||
elif val is uint32:
|
||||
sqlite3_bind_int64(s, n.cint, val)
|
||||
elif val is int64:
|
||||
sqlite3_bind_int64(s, n.cint, val)
|
||||
elif val is float64:
|
||||
sqlite3_bind_double(s, n.cint, val)
|
||||
# Note: bind_text not yet supported in sqlite3_abi wrapper
|
||||
# elif val is string:
|
||||
# sqlite3_bind_text(s, n.cint, val.cstring, -1, nil) # `-1` implies string length is the number of bytes up to the first null-terminator
|
||||
else:
|
||||
{.fatal: "Please add support for the '" & $typeof(val) & "' type".}
|
||||
|
||||
template bindParams(s: RawStmtPtr, params: auto) =
|
||||
when params is tuple:
|
||||
var i = 1
|
||||
for param in fields(params):
|
||||
checkErr bindParam(s, i, param)
|
||||
inc i
|
||||
else:
|
||||
checkErr bindParam(s, 1, params)
|
||||
|
||||
proc exec*[P](s: SqliteStmt[P, void], params: P): DatabaseResult[void] =
|
||||
let s = RawStmtPtr s
|
||||
bindParams(s, params)
|
||||
|
||||
let res =
|
||||
if (let v = sqlite3_step(s); v != SQLITE_DONE):
|
||||
err($sqlite3_errstr(v))
|
||||
else:
|
||||
ok()
|
||||
|
||||
# release implict transaction
|
||||
discard sqlite3_reset(s) # same return information as step
|
||||
discard sqlite3_clear_bindings(s) # no errors possible
|
||||
|
||||
res
|
||||
|
||||
template readResult(s: RawStmtPtr, column: cint, T: type): auto =
|
||||
when T is Option:
|
||||
if sqlite3_column_type(s, column) == SQLITE_NULL:
|
||||
none(typeof(default(T).get()))
|
||||
else:
|
||||
some(readSimpleResult(s, column, typeof(default(T).get())))
|
||||
else:
|
||||
readSimpleResult(s, column, T)
|
||||
|
||||
template readResult(s: RawStmtPtr, T: type): auto =
|
||||
when T is tuple:
|
||||
var res: T
|
||||
var i = cint 0
|
||||
for field in fields(res):
|
||||
field = readResult(s, i, typeof(field))
|
||||
inc i
|
||||
res
|
||||
else:
|
||||
readResult(s, 0.cint, T)
|
||||
|
||||
proc exec*[Params, Res](s: SqliteStmt[Params, Res],
|
||||
params: Params,
|
||||
onData: DataProc): DatabaseResult[bool] =
|
||||
let s = RawStmtPtr s
|
||||
bindParams(s, params)
|
||||
|
||||
try:
|
||||
var gotResults = false
|
||||
while true:
|
||||
let v = sqlite3_step(s)
|
||||
case v
|
||||
of SQLITE_ROW:
|
||||
onData(s)
|
||||
gotResults = true
|
||||
of SQLITE_DONE:
|
||||
break
|
||||
else:
|
||||
return err($sqlite3_errstr(v))
|
||||
return ok gotResults
|
||||
finally:
|
||||
# release implicit transaction
|
||||
discard sqlite3_reset(s) # same return information as step
|
||||
discard sqlite3_clear_bindings(s) # no errors possible
|
||||
|
||||
|
||||
proc query*(db: SqliteDatabase, query: string, onData: DataProc): DatabaseResult[bool] =
|
||||
var s = prepare(db.env, query): discard
|
||||
|
||||
try:
|
||||
var gotResults = false
|
||||
while true:
|
||||
let v = sqlite3_step(s)
|
||||
case v
|
||||
of SQLITE_ROW:
|
||||
onData(s)
|
||||
gotResults = true
|
||||
of SQLITE_DONE:
|
||||
break
|
||||
else:
|
||||
return err($sqlite3_errstr(v))
|
||||
return ok gotResults
|
||||
finally:
|
||||
# release implicit transaction
|
||||
discard sqlite3_reset(s) # same return information as step
|
||||
discard sqlite3_clear_bindings(s) # no errors possible
|
||||
discard sqlite3_finalize(s) # NB: dispose of the prepared query statement and free associated memory
|
||||
|
||||
proc prepareStmt*(
|
||||
db: SqliteDatabase,
|
||||
stmt: string,
|
||||
Params: type,
|
||||
Res: type
|
||||
): DatabaseResult[SqliteStmt[Params, Res]] =
|
||||
var s: RawStmtPtr
|
||||
checkErr sqlite3_prepare_v2(db.env, stmt, stmt.len.cint, addr s, nil)
|
||||
ok SqliteStmt[Params, Res](s)
|
||||
|
||||
proc close*(db: SqliteDatabase) =
|
||||
discard sqlite3_close(db.env)
|
||||
|
||||
db[] = SqliteDatabase()[]
|
||||
|
||||
|
||||
## Maintenance procedures
|
||||
|
||||
# TODO: Cache this value in the SqliteDatabase object.
|
||||
# Page size should not change during the node execution time
|
||||
proc getPageSize*(db: SqliteDatabase): DatabaseResult[int64] =
|
||||
## Query or set the page size of the database. The page size must be a power of
|
||||
## two between 512 and 65536 inclusive.
|
||||
var count: int64
|
||||
proc handler(s: RawStmtPtr) =
|
||||
count = sqlite3_column_int64(s, 0)
|
||||
|
||||
let res = db.query("PRAGMA page_size;", handler)
|
||||
if res.isErr():
|
||||
return err("failed to get page_size")
|
||||
|
||||
ok(count)
|
||||
|
||||
|
||||
proc getFreelistCount*(db: SqliteDatabase): DatabaseResult[int64] =
|
||||
## Return the number of unused pages in the database file.
|
||||
var count: int64
|
||||
proc handler(s: RawStmtPtr) =
|
||||
count = sqlite3_column_int64(s, 0)
|
||||
|
||||
let res = db.query("PRAGMA freelist_count;", handler)
|
||||
if res.isErr():
|
||||
return err("failed to get freelist_count")
|
||||
|
||||
ok(count)
|
||||
|
||||
|
||||
proc getPageCount*(db: SqliteDatabase): DatabaseResult[int64] =
|
||||
## Return the total number of pages in the database file.
|
||||
var count: int64
|
||||
proc handler(s: RawStmtPtr) =
|
||||
count = sqlite3_column_int64(s, 0)
|
||||
|
||||
let res = db.query("PRAGMA page_count;", handler)
|
||||
if res.isErr():
|
||||
return err("failed to get page_count")
|
||||
|
||||
ok(count)
|
||||
|
||||
|
||||
proc vacuum*(db: SqliteDatabase): DatabaseResult[void] =
|
||||
## The VACUUM command rebuilds the database file, repacking it into a minimal amount of disk space.
|
||||
let res = db.query("VACUUM;", NoopRowHandler)
|
||||
if res.isErr():
|
||||
return err("vacuum failed")
|
||||
|
||||
ok()
|
||||
|
||||
|
||||
## Migration procedures
|
||||
|
||||
proc getUserVersion*(database: SqliteDatabase): DatabaseResult[int64] =
|
||||
var version: int64
|
||||
proc handler(s: ptr sqlite3_stmt) =
|
||||
version = sqlite3_column_int64(s, 0)
|
||||
|
||||
let res = database.query("PRAGMA user_version;", handler)
|
||||
if res.isErr():
|
||||
return err("failed to get user_version")
|
||||
|
||||
ok(version)
|
||||
|
||||
|
||||
proc setUserVersion*(database: SqliteDatabase, version: int64): DatabaseResult[void] =
|
||||
## sets the value of the user-version integer at offset 60 in the database header.
|
||||
## some context borrowed from https://www.sqlite.org/pragma.html#pragma_user_version
|
||||
## The user-version is an integer that is available to applications to use however they want.
|
||||
## SQLite makes no use of the user-version itself
|
||||
let query = "PRAGMA user_version=" & $version & ";"
|
||||
let res = database.query(query, NoopRowHandler)
|
||||
if res.isErr():
|
||||
return err("failed to set user_version")
|
||||
|
||||
ok()
|
||||
./sqlite/database
|
||||
|
||||
export
|
||||
database
|
342
waku/common/sqlite/database.nim
Normal file
342
waku/common/sqlite/database.nim
Normal file
@ -0,0 +1,342 @@
|
||||
{.push raises: [].}
|
||||
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
||||
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
||||
#
|
||||
# Most of it is a direct copy, the only unique functions being `get` and `put`.
|
||||
|
||||
import
|
||||
std/os,
|
||||
stew/results,
|
||||
chronicles,
|
||||
sqlite3_abi
|
||||
|
||||
|
||||
logScope:
|
||||
topics = "sqlite"
|
||||
|
||||
type
|
||||
Sqlite = ptr sqlite3
|
||||
|
||||
NoParams* = tuple
|
||||
RawStmtPtr* = ptr sqlite3_stmt
|
||||
SqliteStmt*[Params; Result] = distinct RawStmtPtr
|
||||
|
||||
AutoDisposed[T: ptr|ref] = object
|
||||
val: T
|
||||
|
||||
|
||||
template dispose(db: Sqlite) =
|
||||
discard sqlite3_close(db)
|
||||
|
||||
template dispose(rawStmt: RawStmtPtr) =
|
||||
discard sqlite3_finalize(rawStmt)
|
||||
|
||||
template dispose*(sqliteStmt: SqliteStmt) =
|
||||
discard sqlite3_finalize(RawStmtPtr sqliteStmt)
|
||||
|
||||
proc release[T](x: var AutoDisposed[T]): T =
|
||||
result = x.val
|
||||
x.val = nil
|
||||
|
||||
proc disposeIfUnreleased[T](x: var AutoDisposed[T]) =
|
||||
mixin dispose
|
||||
if x.val != nil:
|
||||
dispose(x.release)
|
||||
|
||||
template checkErr*(op, cleanup: untyped) =
|
||||
if (let v = (op); v != SQLITE_OK):
|
||||
cleanup
|
||||
return err($sqlite3_errstr(v))
|
||||
|
||||
template checkErr*(op) =
|
||||
checkErr(op): discard
|
||||
|
||||
|
||||
type
|
||||
DatabaseResult*[T] = Result[T, string]
|
||||
|
||||
SqliteDatabase* = ref object of RootObj
|
||||
env*: Sqlite
|
||||
|
||||
|
||||
type DataProc* = proc(s: RawStmtPtr) {.closure.} # the nim-eth definition is different; one more indirection
|
||||
|
||||
const NoopRowHandler* = proc(s: RawStmtPtr) {.closure.} = discard
|
||||
|
||||
|
||||
proc new*(T: type SqliteDatabase, path: string, readOnly=false): DatabaseResult[T] =
|
||||
var env: AutoDisposed[ptr sqlite3]
|
||||
defer: disposeIfUnreleased(env)
|
||||
|
||||
let flags = if readOnly: SQLITE_OPEN_READONLY
|
||||
else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
|
||||
|
||||
if path != ":memory:":
|
||||
try:
|
||||
createDir(parentDir(path))
|
||||
except OSError, IOError:
|
||||
return err("sqlite: cannot create database directory")
|
||||
|
||||
checkErr sqlite3_open_v2(path, addr env.val, flags.cint, nil)
|
||||
|
||||
template prepare(q: string, cleanup: untyped): ptr sqlite3_stmt =
|
||||
var s: ptr sqlite3_stmt
|
||||
checkErr sqlite3_prepare_v2(env.val, q, q.len.cint, addr s, nil):
|
||||
cleanup
|
||||
s
|
||||
|
||||
template checkExec(s: ptr sqlite3_stmt) =
|
||||
if (let x = sqlite3_step(s); x != SQLITE_DONE):
|
||||
discard sqlite3_finalize(s)
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
if (let x = sqlite3_finalize(s); x != SQLITE_OK):
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
template checkExec(q: string) =
|
||||
let s = prepare(q): discard
|
||||
checkExec(s)
|
||||
|
||||
template checkWalPragmaResult(journalModePragma: ptr sqlite3_stmt) =
|
||||
if (let x = sqlite3_step(journalModePragma); x != SQLITE_ROW):
|
||||
discard sqlite3_finalize(journalModePragma)
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
if (let x = sqlite3_column_type(journalModePragma, 0); x != SQLITE3_TEXT):
|
||||
discard sqlite3_finalize(journalModePragma)
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
if (let x = sqlite3_column_text(journalModePragma, 0);
|
||||
x != "memory" and x != "wal"):
|
||||
discard sqlite3_finalize(journalModePragma)
|
||||
return err("Invalid pragma result: " & $x)
|
||||
|
||||
|
||||
let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard
|
||||
checkWalPragmaResult(journalModePragma)
|
||||
checkExec(journalModePragma)
|
||||
|
||||
ok(SqliteDatabase(env: env.release))
|
||||
|
||||
|
||||
template prepare*(env: Sqlite, q: string, cleanup: untyped): ptr sqlite3_stmt =
|
||||
var s: ptr sqlite3_stmt
|
||||
checkErr sqlite3_prepare_v2(env, q, q.len.cint, addr s, nil):
|
||||
cleanup
|
||||
s
|
||||
|
||||
proc bindParam*(s: RawStmtPtr, n: int, val: auto): cint =
|
||||
when val is openarray[byte]|seq[byte]:
|
||||
if val.len > 0:
|
||||
sqlite3_bind_blob(s, n.cint, unsafeAddr val[0], val.len.cint, nil)
|
||||
else:
|
||||
sqlite3_bind_blob(s, n.cint, nil, 0.cint, nil)
|
||||
elif val is int32:
|
||||
sqlite3_bind_int(s, n.cint, val)
|
||||
elif val is uint32:
|
||||
sqlite3_bind_int64(s, n.cint, val)
|
||||
elif val is int64:
|
||||
sqlite3_bind_int64(s, n.cint, val)
|
||||
elif val is float64:
|
||||
sqlite3_bind_double(s, n.cint, val)
|
||||
# Note: bind_text not yet supported in sqlite3_abi wrapper
|
||||
# elif val is string:
|
||||
# sqlite3_bind_text(s, n.cint, val.cstring, -1, nil) # `-1` implies string length is the number of bytes up to the first null-terminator
|
||||
else:
|
||||
{.fatal: "Please add support for the '" & $typeof(val) & "' type".}
|
||||
|
||||
template bindParams(s: RawStmtPtr, params: auto) =
|
||||
when params is tuple:
|
||||
var i = 1
|
||||
for param in fields(params):
|
||||
checkErr bindParam(s, i, param)
|
||||
inc i
|
||||
else:
|
||||
checkErr bindParam(s, 1, params)
|
||||
|
||||
proc exec*[P](s: SqliteStmt[P, void], params: P): DatabaseResult[void] =
|
||||
let s = RawStmtPtr s
|
||||
bindParams(s, params)
|
||||
|
||||
let res =
|
||||
if (let v = sqlite3_step(s); v != SQLITE_DONE):
|
||||
err($sqlite3_errstr(v))
|
||||
else:
|
||||
ok()
|
||||
|
||||
# release implict transaction
|
||||
discard sqlite3_reset(s) # same return information as step
|
||||
discard sqlite3_clear_bindings(s) # no errors possible
|
||||
|
||||
res
|
||||
|
||||
template readResult(s: RawStmtPtr, column: cint, T: type): auto =
|
||||
when T is Option:
|
||||
if sqlite3_column_type(s, column) == SQLITE_NULL:
|
||||
none(typeof(default(T).get()))
|
||||
else:
|
||||
some(readSimpleResult(s, column, typeof(default(T).get())))
|
||||
else:
|
||||
readSimpleResult(s, column, T)
|
||||
|
||||
template readResult(s: RawStmtPtr, T: type): auto =
|
||||
when T is tuple:
|
||||
var res: T
|
||||
var i = cint 0
|
||||
for field in fields(res):
|
||||
field = readResult(s, i, typeof(field))
|
||||
inc i
|
||||
res
|
||||
else:
|
||||
readResult(s, 0.cint, T)
|
||||
|
||||
proc exec*[Params, Res](s: SqliteStmt[Params, Res],
|
||||
params: Params,
|
||||
onData: DataProc): DatabaseResult[bool] =
|
||||
let s = RawStmtPtr s
|
||||
bindParams(s, params)
|
||||
|
||||
try:
|
||||
var gotResults = false
|
||||
while true:
|
||||
let v = sqlite3_step(s)
|
||||
case v
|
||||
of SQLITE_ROW:
|
||||
onData(s)
|
||||
gotResults = true
|
||||
of SQLITE_DONE:
|
||||
break
|
||||
else:
|
||||
return err($sqlite3_errstr(v))
|
||||
return ok gotResults
|
||||
finally:
|
||||
# release implicit transaction
|
||||
discard sqlite3_reset(s) # same return information as step
|
||||
discard sqlite3_clear_bindings(s) # no errors possible
|
||||
|
||||
|
||||
proc query*(db: SqliteDatabase, query: string, onData: DataProc): DatabaseResult[bool] =
|
||||
var s = prepare(db.env, query): discard
|
||||
|
||||
try:
|
||||
var gotResults = false
|
||||
while true:
|
||||
let v = sqlite3_step(s)
|
||||
case v
|
||||
of SQLITE_ROW:
|
||||
onData(s)
|
||||
gotResults = true
|
||||
of SQLITE_DONE:
|
||||
break
|
||||
else:
|
||||
return err($sqlite3_errstr(v))
|
||||
return ok gotResults
|
||||
finally:
|
||||
# release implicit transaction
|
||||
discard sqlite3_reset(s) # same return information as step
|
||||
discard sqlite3_clear_bindings(s) # no errors possible
|
||||
discard sqlite3_finalize(s) # NB: dispose of the prepared query statement and free associated memory
|
||||
|
||||
proc prepareStmt*(
|
||||
db: SqliteDatabase,
|
||||
stmt: string,
|
||||
Params: type,
|
||||
Res: type
|
||||
): DatabaseResult[SqliteStmt[Params, Res]] =
|
||||
var s: RawStmtPtr
|
||||
checkErr sqlite3_prepare_v2(db.env, stmt, stmt.len.cint, addr s, nil)
|
||||
ok SqliteStmt[Params, Res](s)
|
||||
|
||||
proc close*(db: SqliteDatabase) =
|
||||
discard sqlite3_close(db.env)
|
||||
|
||||
db[] = SqliteDatabase()[]
|
||||
|
||||
|
||||
## Maintenance procedures
|
||||
|
||||
# TODO: Cache this value in the SqliteDatabase object.
|
||||
# Page size should not change during the node execution time
|
||||
proc getPageSize*(db: SqliteDatabase): DatabaseResult[int64] =
|
||||
## Query or set the page size of the database. The page size must be a power of
|
||||
## two between 512 and 65536 inclusive.
|
||||
var size: int64
|
||||
proc handler(s: RawStmtPtr) =
|
||||
size = sqlite3_column_int64(s, 0)
|
||||
|
||||
let res = db.query("PRAGMA page_size;", handler)
|
||||
if res.isErr():
|
||||
return err("failed to get page_size")
|
||||
|
||||
ok(size)
|
||||
|
||||
|
||||
proc getFreelistCount*(db: SqliteDatabase): DatabaseResult[int64] =
|
||||
## Return the number of unused pages in the database file.
|
||||
var count: int64
|
||||
proc handler(s: RawStmtPtr) =
|
||||
count = sqlite3_column_int64(s, 0)
|
||||
|
||||
let res = db.query("PRAGMA freelist_count;", handler)
|
||||
if res.isErr():
|
||||
return err("failed to get freelist_count")
|
||||
|
||||
ok(count)
|
||||
|
||||
|
||||
proc getPageCount*(db: SqliteDatabase): DatabaseResult[int64] =
|
||||
## Return the total number of pages in the database file.
|
||||
var count: int64
|
||||
proc handler(s: RawStmtPtr) =
|
||||
count = sqlite3_column_int64(s, 0)
|
||||
|
||||
let res = db.query("PRAGMA page_count;", handler)
|
||||
if res.isErr():
|
||||
return err("failed to get page_count")
|
||||
|
||||
ok(count)
|
||||
|
||||
|
||||
proc vacuum*(db: SqliteDatabase): DatabaseResult[void] =
|
||||
## The VACUUM command rebuilds the database file, repacking it into a minimal amount of disk space.
|
||||
let res = db.query("VACUUM;", NoopRowHandler)
|
||||
if res.isErr():
|
||||
return err("vacuum failed")
|
||||
|
||||
ok()
|
||||
|
||||
|
||||
## Database scheme versioning
|
||||
|
||||
proc getUserVersion*(database: SqliteDatabase): DatabaseResult[int64] =
|
||||
## Get the value of the user-version integer.
|
||||
##
|
||||
## The user-version is an integer that is available to applications to use however they want.
|
||||
## SQLite makes no use of the user-version itself. This integer is stored at offset 60 in
|
||||
## the database header.
|
||||
##
|
||||
## For more info check: https://www.sqlite.org/pragma.html#pragma_user_version
|
||||
var version: int64
|
||||
proc handler(s: ptr sqlite3_stmt) =
|
||||
version = sqlite3_column_int64(s, 0)
|
||||
|
||||
let res = database.query("PRAGMA user_version;", handler)
|
||||
if res.isErr():
|
||||
return err("failed to get user_version")
|
||||
|
||||
ok(version)
|
||||
|
||||
proc setUserVersion*(database: SqliteDatabase, version: int64): DatabaseResult[void] =
|
||||
## Set the value of the user-version integer.
|
||||
##
|
||||
## The user-version is an integer that is available to applications to use however they want.
|
||||
## SQLite makes no use of the user-version itself. This integer is stored at offset 60 in
|
||||
## the database header.
|
||||
##
|
||||
## For more info check: https://www.sqlite.org/pragma.html#pragma_user_version
|
||||
let query = "PRAGMA user_version=" & $version & ";"
|
||||
let res = database.query(query, NoopRowHandler)
|
||||
if res.isErr():
|
||||
return err("failed to set user_version")
|
||||
|
||||
ok()
|
138
waku/common/sqlite/migrations.nim
Normal file
138
waku/common/sqlite/migrations.nim
Normal file
@ -0,0 +1,138 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[strutils, sequtils, os, algorithm],
|
||||
stew/results,
|
||||
chronicles
|
||||
import
|
||||
../sqlite
|
||||
|
||||
|
||||
logScope:
|
||||
topics = "sqlite"
|
||||
|
||||
|
||||
## Migration scripts
|
||||
|
||||
proc getMigrationScriptVersion(path: string): DatabaseResult[int64] =
|
||||
let name = extractFilename(path)
|
||||
let parts = name.split("_", 1)
|
||||
|
||||
try:
|
||||
let version = parseInt(parts[0])
|
||||
return ok(version)
|
||||
except ValueError:
|
||||
return err("failed to parse file version: " & name)
|
||||
|
||||
proc isSqlScript(path: string): bool =
|
||||
path.toLower().endsWith(".sql")
|
||||
|
||||
|
||||
proc listSqlScripts(path: string): DatabaseResult[seq[string]] =
|
||||
var scripts = newSeq[string]()
|
||||
|
||||
try:
|
||||
for scriptPath in walkDirRec(path):
|
||||
if isSqlScript(scriptPath):
|
||||
scripts.add(scriptPath)
|
||||
else:
|
||||
debug "invalid migration script", file=scriptPath
|
||||
except OSError:
|
||||
return err("failed to list migration scripts: " & getCurrentExceptionMsg())
|
||||
|
||||
ok(scripts)
|
||||
|
||||
|
||||
proc filterMigrationScripts(paths: seq[string], lowVersion, highVersion: int64, direction: string = "up"): seq[string] =
|
||||
## Returns migration scripts whose version fall between lowVersion and highVersion (inclusive)
|
||||
let filterPredicate = proc(script: string): bool =
|
||||
if not isSqlScript(script):
|
||||
return false
|
||||
|
||||
if direction != "" and not script.toLower().endsWith("." & direction & ".sql"):
|
||||
return false
|
||||
|
||||
let scriptVersionRes = getMigrationScriptVersion(script)
|
||||
if scriptVersionRes.isErr():
|
||||
return false
|
||||
|
||||
let scriptVersion = scriptVersionRes.value
|
||||
return lowVersion < scriptVersion and scriptVersion <= highVersion
|
||||
|
||||
paths.filter(filterPredicate)
|
||||
|
||||
|
||||
proc sortMigrationScripts(paths: seq[string]): seq[string] =
|
||||
## Sort migration scripts paths alphabetically
|
||||
paths.sorted(system.cmp[string])
|
||||
|
||||
|
||||
proc loadMigrationScripts(paths: seq[string]): DatabaseResult[seq[string]] =
|
||||
var loadedScripts = newSeq[string]()
|
||||
|
||||
for script in paths:
|
||||
try:
|
||||
loadedScripts.add(readFile(script))
|
||||
except OSError, IOError:
|
||||
return err("failed to load script '" & script & "': " & getCurrentExceptionMsg())
|
||||
|
||||
ok(loadedScripts)
|
||||
|
||||
|
||||
proc breakIntoStatements(script: string): seq[string] =
|
||||
var statements = newSeq[string]()
|
||||
|
||||
for chunk in script.split(';'):
|
||||
if chunk.strip().isEmptyOrWhitespace():
|
||||
continue
|
||||
|
||||
let statement = chunk.strip() & ";"
|
||||
statements.add(statement)
|
||||
|
||||
statements
|
||||
|
||||
|
||||
proc migrate*(db: SqliteDatabase, targetVersion: int64, migrationsScriptsDir: string): DatabaseResult[void] =
|
||||
## Compares the `user_version` of the sqlite database with the provided `targetVersion`, then
|
||||
## it runs migration scripts if the `user_version` is outdated. The `migrationScriptsDir` path
|
||||
## points to the directory holding the migrations scripts once the db is updated, it sets the
|
||||
## `user_version` to the `tragetVersion`.
|
||||
##
|
||||
## NOTE: Down migration it is not currently supported
|
||||
let userVersion = ?db.getUserVersion()
|
||||
|
||||
if userVersion == targetVersion:
|
||||
debug "database schema is up to date", userVersion=userVersion, targetVersion=targetVersion
|
||||
return ok()
|
||||
|
||||
info "database schema is outdated", userVersion=userVersion, targetVersion=targetVersion
|
||||
|
||||
# Load migration scripts
|
||||
var migrationScriptsPaths = ?listSqlScripts(migrationsScriptsDir)
|
||||
migrationScriptsPaths = filterMigrationScripts(migrationScriptsPaths, lowVersion=userVersion, highVersion=targetVersion, direction="up")
|
||||
migrationScriptsPaths = sortMigrationScripts(migrationScriptsPaths)
|
||||
|
||||
if migrationScriptsPaths.len <= 0:
|
||||
debug "no scripts to be run"
|
||||
return ok()
|
||||
|
||||
let scripts = ?loadMigrationScripts(migrationScriptsPaths)
|
||||
|
||||
# Run the migration scripts
|
||||
for script in scripts:
|
||||
|
||||
for statement in script.breakIntoStatements():
|
||||
debug "executing migration statement", statement=statement
|
||||
|
||||
let execRes = db.query(statement, NoopRowHandler)
|
||||
if execRes.isErr():
|
||||
error "failed to execute migration statement", statement=statement, error=execRes.error
|
||||
return err("failed to execute migration statement")
|
||||
|
||||
debug "migration statement executed succesfully", statement=statement
|
||||
|
||||
# Update user_version
|
||||
?db.setUserVersion(targetVersion)
|
||||
|
||||
debug "database user_version updated", userVersion=targetVersion
|
||||
ok()
|
Loading…
x
Reference in New Issue
Block a user