refactor(databases): Creation of the databases folder to keep the logic for sqlite and postgres (#1811)

* Refactoring in sqlite and postgres. Creation of the databases folder.
This commit is contained in:
Ivan Folgueira Bande 2023-06-22 11:27:40 +02:00 committed by GitHub
parent a4da87bb8c
commit a44d4bfbcd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 281 additions and 244 deletions

View File

@ -19,7 +19,7 @@ import
metrics/chronos_httpserver metrics/chronos_httpserver
import import
../../waku/common/utils/nat, ../../waku/common/utils/nat,
../../waku/common/sqlite, ../../waku/common/databases/db_sqlite,
../../waku/v2/waku_core, ../../waku/v2/waku_core,
../../waku/v2/waku_node, ../../waku/v2/waku_node,
../../waku/v2/node/waku_metrics, ../../waku/v2/node/waku_metrics,

View File

@ -1,16 +0,0 @@
{.used.}
import
std/[strutils, os],
stew/results,
testutils/unittests,
chronos
import
../../waku/common/postgres/asyncpool,
../../waku/common/postgres/pg_asyncpool_opts
suite "Async pool":
asyncTest "Create connection pool":
## TODO: extend unit tests
var pgOpts = PgAsyncPoolOptions.init()

View File

@ -5,8 +5,7 @@ import
stew/results, stew/results,
testutils/unittests testutils/unittests
import import
../../waku/common/sqlite/database, ../../waku/common/databases/db_sqlite {.all.}
../../waku/common/sqlite/migrations {.all.}
proc newTestDatabase(): SqliteDatabase = proc newTestDatabase(): SqliteDatabase =

View File

@ -17,7 +17,7 @@ import
libp2p/protocols/pubsub/rpc/message, libp2p/protocols/pubsub/rpc/message,
libp2p/peerid libp2p/peerid
import import
../../waku/common/sqlite, ../../waku/common/databases/db_sqlite,
../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/node/peer_manager/peer_store/waku_peer_storage, ../../waku/v2/node/peer_manager/peer_store/waku_peer_storage,
../../waku/v2/waku_node, ../../waku/v2/waku_node,

View File

@ -4,7 +4,7 @@ import
testutils/unittests, testutils/unittests,
libp2p/crypto/crypto libp2p/crypto/crypto
import import
../../waku/common/sqlite, ../../waku/common/databases/db_sqlite,
../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/node/peer_manager/peer_store/waku_peer_storage, ../../waku/v2/node/peer_manager/peer_store/waku_peer_storage,
./testlib/wakucore ./testlib/wakucore

View File

@ -5,7 +5,7 @@ import
testutils/unittests, testutils/unittests,
chronos chronos
import import
../../../waku/common/sqlite, ../../../waku/common/databases/db_sqlite,
../../../waku/v2/waku_archive, ../../../waku/v2/waku_archive,
../../../waku/v2/waku_archive/driver/sqlite_driver, ../../../waku/v2/waku_archive/driver/sqlite_driver,
../../../waku/v2/waku_core, ../../../waku/v2/waku_core,

View File

@ -6,7 +6,7 @@ import
chronos, chronos,
chronicles chronicles
import import
../../../waku/common/sqlite, ../../../waku/common/databases/db_sqlite,
../../../waku/v2/waku_archive, ../../../waku/v2/waku_archive,
../../../waku/v2/waku_archive/driver/sqlite_driver, ../../../waku/v2/waku_archive/driver/sqlite_driver,
../../../waku/v2/waku_core, ../../../waku/v2/waku_core,

View File

@ -6,7 +6,7 @@ import
testutils/unittests, testutils/unittests,
chronos chronos
import import
../../../waku/common/sqlite, ../../../waku/common/databases/db_sqlite,
../../../waku/v2/waku_core, ../../../waku/v2/waku_core,
../../../waku/v2/waku_archive, ../../../waku/v2/waku_archive,
../../../waku/v2/waku_archive/driver/sqlite_driver, ../../../waku/v2/waku_archive/driver/sqlite_driver,

View File

@ -7,7 +7,7 @@ import
chronos, chronos,
libp2p/crypto/crypto libp2p/crypto/crypto
import import
../../../waku/common/sqlite, ../../../waku/common/databases/db_sqlite,
../../../waku/v2/waku_core, ../../../waku/v2/waku_core,
../../../waku/v2/waku_archive/driver/sqlite_driver, ../../../waku/v2/waku_archive/driver/sqlite_driver,
../../../waku/v2/waku_archive, ../../../waku/v2/waku_archive,

View File

@ -7,7 +7,7 @@ import
chronicles, chronicles,
libp2p/crypto/crypto libp2p/crypto/crypto
import import
../../waku/common/sqlite, ../../waku/common/databases/db_sqlite,
../../waku/v2/node/message_store/sqlite_store, ../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/node/peer_manager, ../../waku/v2/node/peer_manager,
../../waku/v2/waku_core, ../../waku/v2/waku_core,

View File

@ -13,7 +13,7 @@ import
libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/gossipsub libp2p/protocols/pubsub/gossipsub
import import
../../../waku/common/sqlite, ../../../waku/common/databases/db_sqlite,
../../../waku/v2/waku_core, ../../../waku/v2/waku_core,
../../../waku/v2/node/peer_manager, ../../../waku/v2/node/peer_manager,
../../../waku/v2/waku_archive, ../../../waku/v2/waku_archive,

View File

@ -0,0 +1,6 @@
import
stew/results
type
DatabaseResult*[T] = Result[T, string]

View File

@ -0,0 +1,8 @@
import
./common,
./db_postgres/pgasyncpool
export
common,
pgasyncpool

View File

@ -6,13 +6,13 @@ else:
{.push raises: [].} {.push raises: [].}
import import
std/sequtils, std/[sequtils,nre, strformat],
stew/results, stew/results,
chronicles, chronicles,
chronos chronos
import import
../../driver, ./dbconn,
./connection ../common
logScope: logScope:
topics = "postgres asyncpool" topics = "postgres asyncpool"
@ -39,8 +39,25 @@ type
conns: seq[PgDbConn] conns: seq[PgDbConn]
proc new*(T: type PgAsyncPool, proc new*(T: type PgAsyncPool,
connString: string, dbUrl: string,
maxConnections: int): T = maxConnections: int):
DatabaseResult[T] =
var connString: string
try:
let regex = re("""^postgres:\/\/([^:]+):([^@]+)@([^:]+):(\d+)\/(.+)$""")
let matches = find(dbUrl,regex).get.captures
let user = matches[0]
let password = matches[1]
let host = matches[2]
let port = matches[3]
let dbName = matches[4]
connString =
fmt"user={user} host={host} port={port} dbname={dbName} password={password}"
except KeyError,InvalidUnicodeError, RegexInternalError, ValueError,
StudyError, SyntaxError:
return err("could not parse postgres string: " & getCurrentExceptionMsg())
let pool = PgAsyncPool( let pool = PgAsyncPool(
connString: connString, connString: connString,
@ -49,7 +66,7 @@ proc new*(T: type PgAsyncPool,
conns: newSeq[PgDbConn](0) conns: newSeq[PgDbConn](0)
) )
return pool return ok(pool)
func isLive(pool: PgAsyncPool): bool = func isLive(pool: PgAsyncPool): bool =
pool.state == PgAsyncPoolState.Live pool.state == PgAsyncPoolState.Live
@ -70,17 +87,16 @@ proc close*(pool: PgAsyncPool):
# wait for the connections to be released and close them, without # wait for the connections to be released and close them, without
# blocking the async runtime # blocking the async runtime
if pool.conns.anyIt(it.busy): while pool.conns.anyIt(it.busy):
while pool.conns.anyIt(it.busy): await sleepAsync(0.milliseconds)
await sleepAsync(0.milliseconds)
for i in 0..<pool.conns.len: for i in 0..<pool.conns.len:
if pool.conns[i].busy: if pool.conns[i].busy:
continue continue
pool.conns[i].dbConn.close() pool.conns[i].dbConn.close()
pool.conns[i].busy = false pool.conns[i].busy = false
pool.conns[i].open = false pool.conns[i].open = false
for i in 0..<pool.conns.len: for i in 0..<pool.conns.len:
if pool.conns[i].open: if pool.conns[i].open:
@ -92,7 +108,7 @@ proc close*(pool: PgAsyncPool):
return ok() return ok()
proc getConnIndex(pool: PgAsyncPool): proc getConnIndex(pool: PgAsyncPool):
Future[Result[int, string]] {.async.} = Future[DatabaseResult[int]] {.async.} =
## Waits for a free connection or create if max connections limits have not been reached. ## Waits for a free connection or create if max connections limits have not been reached.
## Returns the index of the free connection ## Returns the index of the free connection
@ -101,7 +117,7 @@ proc getConnIndex(pool: PgAsyncPool):
# stablish new connections if we are under the limit # stablish new connections if we are under the limit
if pool.isBusy() and pool.conns.len < pool.maxConnections: if pool.isBusy() and pool.conns.len < pool.maxConnections:
let connRes = connection.open(pool.connString) let connRes = dbconn.open(pool.connString)
if connRes.isOk(): if connRes.isOk():
let conn = connRes.get() let conn = connRes.get()
pool.conns.add(PgDbConn(dbConn: conn, busy: true, open: true)) pool.conns.add(PgDbConn(dbConn: conn, busy: true, open: true))
@ -129,7 +145,7 @@ proc releaseConn(pool: PgAsyncPool, conn: DbConn) =
proc query*(pool: PgAsyncPool, proc query*(pool: PgAsyncPool,
query: string, query: string,
args: seq[string] = newSeq[string](0)): args: seq[string] = newSeq[string](0)):
Future[Result[seq[Row], string]] {.async.} = Future[DatabaseResult[seq[Row]]] {.async.} =
## Runs the SQL query getting results. ## Runs the SQL query getting results.
## Retrieves info from the database. ## Retrieves info from the database.
@ -149,7 +165,7 @@ proc query*(pool: PgAsyncPool,
proc exec*(pool: PgAsyncPool, proc exec*(pool: PgAsyncPool,
query: string, query: string,
args: seq[string] = newSeq[string](0)): args: seq[string] = newSeq[string](0)):
Future[ArchiveDriverResult[void]] {.async.} = Future[DatabaseResult[void]] {.async.} =
## Runs the SQL query without results. ## Runs the SQL query without results.
## Alters the database state. ## Alters the database state.
@ -169,14 +185,14 @@ proc exec*(pool: PgAsyncPool,
proc runStmt*(pool: PgAsyncPool, proc runStmt*(pool: PgAsyncPool,
baseStmt: string, baseStmt: string,
args: seq[string]): args: seq[string]):
Future[ArchiveDriverResult[void]] {.async.} = Future[DatabaseResult[void]] {.async.} =
# Runs a stored statement, for performance purposes. # Runs a stored statement, for performance purposes.
# In the current implementation, this is aimed # In the current implementation, this is aimed
# to run the 'insertRow' stored statement aimed to add a new Waku message. # to run the 'insertRow' stored statement aimed to add a new Waku message.
let connIndexRes = await pool.getConnIndex() let connIndexRes = await pool.getConnIndex()
if connIndexRes.isErr(): if connIndexRes.isErr():
return ArchiveDriverResult[void].err(connIndexRes.error()) return err(connIndexRes.error())
let conn = pool.conns[connIndexRes.value].dbConn let conn = pool.conns[connIndexRes.value].dbConn
defer: pool.releaseConn(conn) defer: pool.releaseConn(conn)

View File

@ -5,11 +5,12 @@
# Most of it is a direct copy, the only unique functions being `get` and `put`. # Most of it is a direct copy, the only unique functions being `get` and `put`.
import import
std/os, std/[os, strutils, sequtils, algorithm],
stew/results, stew/results,
chronicles, chronicles,
sqlite3_abi sqlite3_abi
import
./common
logScope: logScope:
topics = "sqlite" topics = "sqlite"
@ -24,7 +25,6 @@ type
AutoDisposed[T: ptr|ref] = object AutoDisposed[T: ptr|ref] = object
val: T val: T
template dispose(db: Sqlite) = template dispose(db: Sqlite) =
discard sqlite3_close(db) discard sqlite3_close(db)
@ -51,20 +51,19 @@ template checkErr*(op, cleanup: untyped) =
template checkErr*(op) = template checkErr*(op) =
checkErr(op): discard checkErr(op): discard
type type
DatabaseResult*[T] = Result[T, string]
SqliteDatabase* = ref object of RootObj SqliteDatabase* = ref object of RootObj
env*: Sqlite env*: Sqlite
type DataProc* = proc(s: RawStmtPtr) {.closure.} # the nim-eth definition is different; one more indirection type DataProc* = proc(s: RawStmtPtr) {.closure.} # the nim-eth definition is different; one more indirection
const NoopRowHandler* = proc(s: RawStmtPtr) {.closure.} = discard const NoopRowHandler* = proc(s: RawStmtPtr) {.closure.} = discard
proc new*(T: type SqliteDatabase,
path: string,
readOnly=false):
DatabaseResult[T] =
proc new*(T: type SqliteDatabase, path: string, readOnly=false): DatabaseResult[T] =
var env: AutoDisposed[ptr sqlite3] var env: AutoDisposed[ptr sqlite3]
defer: disposeIfUnreleased(env) defer: disposeIfUnreleased(env)
@ -111,14 +110,12 @@ proc new*(T: type SqliteDatabase, path: string, readOnly=false): DatabaseResult[
discard sqlite3_finalize(journalModePragma) discard sqlite3_finalize(journalModePragma)
return err("Invalid pragma result: " & $x) return err("Invalid pragma result: " & $x)
let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard
checkWalPragmaResult(journalModePragma) checkWalPragmaResult(journalModePragma)
checkExec(journalModePragma) checkExec(journalModePragma)
ok(SqliteDatabase(env: env.release)) ok(SqliteDatabase(env: env.release))
template prepare*(env: Sqlite, q: string, cleanup: untyped): ptr sqlite3_stmt = template prepare*(env: Sqlite, q: string, cleanup: untyped): ptr sqlite3_stmt =
var s: ptr sqlite3_stmt var s: ptr sqlite3_stmt
checkErr sqlite3_prepare_v2(env, q, q.len.cint, addr s, nil): checkErr sqlite3_prepare_v2(env, q, q.len.cint, addr s, nil):
@ -219,8 +216,8 @@ proc exec*[Params, Res](s: SqliteStmt[Params, Res],
discard sqlite3_reset(s) # same return information as step discard sqlite3_reset(s) # same return information as step
discard sqlite3_clear_bindings(s) # no errors possible discard sqlite3_clear_bindings(s) # no errors possible
proc query*(db: SqliteDatabase, query: string, onData: DataProc):
proc query*(db: SqliteDatabase, query: string, onData: DataProc): DatabaseResult[bool] = DatabaseResult[bool] =
var s = prepare(db.env, query): discard var s = prepare(db.env, query): discard
try: try:
@ -257,7 +254,6 @@ proc close*(db: SqliteDatabase) =
db[] = SqliteDatabase()[] db[] = SqliteDatabase()[]
## Maintenance procedures ## Maintenance procedures
# TODO: Cache this value in the SqliteDatabase object. # TODO: Cache this value in the SqliteDatabase object.
@ -273,8 +269,7 @@ proc getPageSize*(db: SqliteDatabase): DatabaseResult[int64] =
if res.isErr(): if res.isErr():
return err("failed to get page_size") return err("failed to get page_size")
ok(size) return ok(size)
proc getFreelistCount*(db: SqliteDatabase): DatabaseResult[int64] = proc getFreelistCount*(db: SqliteDatabase): DatabaseResult[int64] =
## Return the number of unused pages in the database file. ## Return the number of unused pages in the database file.
@ -286,8 +281,7 @@ proc getFreelistCount*(db: SqliteDatabase): DatabaseResult[int64] =
if res.isErr(): if res.isErr():
return err("failed to get freelist_count") return err("failed to get freelist_count")
ok(count) return ok(count)
proc getPageCount*(db: SqliteDatabase): DatabaseResult[int64] = proc getPageCount*(db: SqliteDatabase): DatabaseResult[int64] =
## Return the total number of pages in the database file. ## Return the total number of pages in the database file.
@ -299,8 +293,16 @@ proc getPageCount*(db: SqliteDatabase): DatabaseResult[int64] =
if res.isErr(): if res.isErr():
return err("failed to get page_count") return err("failed to get page_count")
ok(count) return ok(count)
proc gatherSqlitePageStats*(db: SqliteDatabase):
DatabaseResult[(int64, int64, int64)] =
let
pageSize = ?db.getPageSize()
pageCount = ?db.getPageCount()
freelistCount = ?db.getFreelistCount()
return ok((pageSize, pageCount, freelistCount))
proc vacuum*(db: SqliteDatabase): DatabaseResult[void] = proc vacuum*(db: SqliteDatabase): DatabaseResult[void] =
## The VACUUM command rebuilds the database file, repacking it into a minimal amount of disk space. ## The VACUUM command rebuilds the database file, repacking it into a minimal amount of disk space.
@ -308,8 +310,7 @@ proc vacuum*(db: SqliteDatabase): DatabaseResult[void] =
if res.isErr(): if res.isErr():
return err("vacuum failed") return err("vacuum failed")
ok() return ok()
## Database scheme versioning ## Database scheme versioning
@ -331,7 +332,8 @@ proc getUserVersion*(database: SqliteDatabase): DatabaseResult[int64] =
ok(version) ok(version)
proc setUserVersion*(database: SqliteDatabase, version: int64): DatabaseResult[void] = proc setUserVersion*(database: SqliteDatabase, version: int64):
DatabaseResult[void] =
## Set the value of the user-version integer. ## Set the value of the user-version integer.
## ##
## The user-version is an integer that is available to applications to use however they want. ## The user-version is an integer that is available to applications to use however they want.
@ -345,3 +347,141 @@ proc setUserVersion*(database: SqliteDatabase, version: int64): DatabaseResult[v
return err("failed to set user_version") return err("failed to set user_version")
ok() ok()
## Migration scripts
proc getMigrationScriptVersion(path: string): DatabaseResult[int64] =
let name = extractFilename(path)
let parts = name.split("_", 1)
try:
let version = parseInt(parts[0])
return ok(version)
except ValueError:
return err("failed to parse file version: " & name)
proc isSqlScript(path: string): bool =
path.toLower().endsWith(".sql")
proc listSqlScripts(path: string): DatabaseResult[seq[string]] =
var scripts = newSeq[string]()
try:
for scriptPath in walkDirRec(path):
if isSqlScript(scriptPath):
scripts.add(scriptPath)
else:
debug "invalid migration script", file=scriptPath
except OSError:
return err("failed to list migration scripts: " & getCurrentExceptionMsg())
ok(scripts)
proc filterMigrationScripts(paths: seq[string],
lowVersion, highVersion: int64,
direction: string = "up"):
seq[string] =
## Returns migration scripts whose version fall between lowVersion and highVersion (inclusive)
let filterPredicate = proc(script: string): bool =
if not isSqlScript(script):
return false
if direction != "" and not script.toLower().endsWith("." & direction & ".sql"):
return false
let scriptVersionRes = getMigrationScriptVersion(script)
if scriptVersionRes.isErr():
return false
let scriptVersion = scriptVersionRes.value
return lowVersion < scriptVersion and scriptVersion <= highVersion
paths.filter(filterPredicate)
proc sortMigrationScripts(paths: seq[string]): seq[string] =
## Sort migration scripts paths alphabetically
paths.sorted(system.cmp[string])
proc loadMigrationScripts(paths: seq[string]): DatabaseResult[seq[string]] =
var loadedScripts = newSeq[string]()
for script in paths:
try:
loadedScripts.add(readFile(script))
except OSError, IOError:
return err("failed to load script '" & script & "': " & getCurrentExceptionMsg())
ok(loadedScripts)
proc breakIntoStatements(script: string): seq[string] =
var statements = newSeq[string]()
for chunk in script.split(';'):
if chunk.strip().isEmptyOrWhitespace():
continue
let statement = chunk.strip() & ";"
statements.add(statement)
statements
proc migrate*(db: SqliteDatabase,
targetVersion: int64,
migrationsScriptsDir: string):
DatabaseResult[void] =
## Compares the `user_version` of the sqlite database with the provided `targetVersion`, then
## it runs migration scripts if the `user_version` is outdated. The `migrationScriptsDir` path
## points to the directory holding the migrations scripts once the db is updated, it sets the
## `user_version` to the `tragetVersion`.
##
## NOTE: Down migration it is not currently supported
let userVersion = ?db.getUserVersion()
if userVersion == targetVersion:
debug "database schema is up to date", userVersion=userVersion, targetVersion=targetVersion
return ok()
info "database schema is outdated", userVersion=userVersion, targetVersion=targetVersion
# Load migration scripts
var migrationScriptsPaths = ?listSqlScripts(migrationsScriptsDir)
migrationScriptsPaths = filterMigrationScripts(migrationScriptsPaths, lowVersion=userVersion, highVersion=targetVersion, direction="up")
migrationScriptsPaths = sortMigrationScripts(migrationScriptsPaths)
if migrationScriptsPaths.len <= 0:
debug "no scripts to be run"
return ok()
let scripts = ?loadMigrationScripts(migrationScriptsPaths)
# Run the migration scripts
for script in scripts:
for statement in script.breakIntoStatements():
debug "executing migration statement", statement=statement
let execRes = db.query(statement, NoopRowHandler)
if execRes.isErr():
error "failed to execute migration statement", statement=statement, error=execRes.error
return err("failed to execute migration statement")
debug "migration statement executed succesfully", statement=statement
# Update user_version
?db.setUserVersion(targetVersion)
debug "database user_version updated", userVersion=targetVersion
ok()
proc performSqliteVacuum*(db: SqliteDatabase): DatabaseResult[void] =
## SQLite database vacuuming
# TODO: Run vacuuming conditionally based on database page stats
# if (pageCount > 0 and freelistCount > 0):
debug "starting sqlite database vacuuming"
let resVacuum = db.vacuum()
if resVacuum.isErr():
return err("failed to execute vacuum: " & resVacuum.error)
debug "finished sqlite database vacuuming"

View File

@ -0,0 +1,33 @@
import
std/strutils,
regex,
stew/results
proc validateDbUrl*(dbUrl: string): Result[string, string] =
## dbUrl mimics SQLAlchemy Database URL schema
## See: https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls
let regex = re"^[\w\+]+:\/\/[\w\/\\\.\:\@]+$"
let dbUrl = dbUrl.strip()
if dbUrl == "" or dbUrl == "none" or dbUrl.match(regex):
return ok(dbUrl)
else:
return err("invalid 'db url' option format: " & dbUrl)
proc getDbEngine*(dbUrl: string): Result[string, string] =
let dbUrlParts = dbUrl.split("://", 1)
if dbUrlParts.len != 2:
return err("Incorrect dbUrl : " & dbUrl)
let engine = dbUrlParts[0]
return ok(engine)
proc getDbPath*(dbUrl: string): Result[string, string] =
let dbUrlParts = dbUrl.split("://", 1)
if dbUrlParts.len != 2:
return err("Incorrect dbUrl : " & dbUrl)
let path = dbUrlParts[1]
return ok(path)

View File

@ -1,7 +0,0 @@
{.push raises: [].}
import
./sqlite/database
export
database

View File

@ -1,138 +0,0 @@
{.push raises: [].}
import
std/[strutils, sequtils, os, algorithm],
stew/results,
chronicles
import
../sqlite
logScope:
topics = "sqlite"
## Migration scripts
proc getMigrationScriptVersion(path: string): DatabaseResult[int64] =
let name = extractFilename(path)
let parts = name.split("_", 1)
try:
let version = parseInt(parts[0])
return ok(version)
except ValueError:
return err("failed to parse file version: " & name)
proc isSqlScript(path: string): bool =
path.toLower().endsWith(".sql")
proc listSqlScripts(path: string): DatabaseResult[seq[string]] =
var scripts = newSeq[string]()
try:
for scriptPath in walkDirRec(path):
if isSqlScript(scriptPath):
scripts.add(scriptPath)
else:
debug "invalid migration script", file=scriptPath
except OSError:
return err("failed to list migration scripts: " & getCurrentExceptionMsg())
ok(scripts)
proc filterMigrationScripts(paths: seq[string], lowVersion, highVersion: int64, direction: string = "up"): seq[string] =
## Returns migration scripts whose version fall between lowVersion and highVersion (inclusive)
let filterPredicate = proc(script: string): bool =
if not isSqlScript(script):
return false
if direction != "" and not script.toLower().endsWith("." & direction & ".sql"):
return false
let scriptVersionRes = getMigrationScriptVersion(script)
if scriptVersionRes.isErr():
return false
let scriptVersion = scriptVersionRes.value
return lowVersion < scriptVersion and scriptVersion <= highVersion
paths.filter(filterPredicate)
proc sortMigrationScripts(paths: seq[string]): seq[string] =
## Sort migration scripts paths alphabetically
paths.sorted(system.cmp[string])
proc loadMigrationScripts(paths: seq[string]): DatabaseResult[seq[string]] =
var loadedScripts = newSeq[string]()
for script in paths:
try:
loadedScripts.add(readFile(script))
except OSError, IOError:
return err("failed to load script '" & script & "': " & getCurrentExceptionMsg())
ok(loadedScripts)
proc breakIntoStatements(script: string): seq[string] =
var statements = newSeq[string]()
for chunk in script.split(';'):
if chunk.strip().isEmptyOrWhitespace():
continue
let statement = chunk.strip() & ";"
statements.add(statement)
statements
proc migrate*(db: SqliteDatabase, targetVersion: int64, migrationsScriptsDir: string): DatabaseResult[void] =
## Compares the `user_version` of the sqlite database with the provided `targetVersion`, then
## it runs migration scripts if the `user_version` is outdated. The `migrationScriptsDir` path
## points to the directory holding the migrations scripts once the db is updated, it sets the
## `user_version` to the `tragetVersion`.
##
## NOTE: Down migration it is not currently supported
let userVersion = ?db.getUserVersion()
if userVersion == targetVersion:
debug "database schema is up to date", userVersion=userVersion, targetVersion=targetVersion
return ok()
info "database schema is outdated", userVersion=userVersion, targetVersion=targetVersion
# Load migration scripts
var migrationScriptsPaths = ?listSqlScripts(migrationsScriptsDir)
migrationScriptsPaths = filterMigrationScripts(migrationScriptsPaths, lowVersion=userVersion, highVersion=targetVersion, direction="up")
migrationScriptsPaths = sortMigrationScripts(migrationScriptsPaths)
if migrationScriptsPaths.len <= 0:
debug "no scripts to be run"
return ok()
let scripts = ?loadMigrationScripts(migrationScriptsPaths)
# Run the migration scripts
for script in scripts:
for statement in script.breakIntoStatements():
debug "executing migration statement", statement=statement
let execRes = db.query(statement, NoopRowHandler)
if execRes.isErr():
error "failed to execute migration statement", statement=statement, error=execRes.error
return err("failed to execute migration statement")
debug "migration statement executed succesfully", statement=statement
# Update user_version
?db.setUserVersion(targetVersion)
debug "database user_version updated", userVersion=targetVersion
ok()

View File

@ -9,8 +9,8 @@ import
stew/results, stew/results,
chronicles chronicles
import import
../../../../common/sqlite, ../../../../common/databases/db_sqlite,
../../../../common/sqlite/migrations ../../../../common/databases/common
logScope: logScope:
@ -23,7 +23,7 @@ template projectRoot: string = currentSourcePath.rsplit(DirSep, 1)[0] / ".." / "
const PeerStoreMigrationPath: string = projectRoot / "migrations" / "peer_store" const PeerStoreMigrationPath: string = projectRoot / "migrations" / "peer_store"
proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult[void] = proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult[void] =
## Compares the `user_version` of the sqlite database with the provided `targetVersion`, then ## Compares the `user_version` of the sqlite database with the provided `targetVersion`, then
## it runs migration scripts if the `user_version` is outdated. The `migrationScriptsDir` path ## it runs migration scripts if the `user_version` is outdated. The `migrationScriptsDir` path
## points to the directory holding the migrations scripts once the db is updated, it sets the ## points to the directory holding the migrations scripts once the db is updated, it sets the

View File

@ -10,12 +10,13 @@ import
sqlite3_abi, sqlite3_abi,
libp2p/protobuf/minprotobuf libp2p/protobuf/minprotobuf
import import
../../../../common/sqlite, ../../../../common/databases/db_sqlite,
../../../../common/databases/common,
../../../waku_core, ../../../waku_core,
../waku_peer_store, ../waku_peer_store,
./peer_storage ./peer_storage
export sqlite export db_sqlite
type type
WakuPeerStorage* = ref object of PeerStorage WakuPeerStorage* = ref object of PeerStorage

View File

@ -12,7 +12,7 @@ import
../../../waku_core, ../../../waku_core,
../../common, ../../common,
../../driver, ../../driver,
asyncpool ../../../../common/databases/db_postgres as waku_postgres
export postgres_driver export postgres_driver
@ -46,24 +46,11 @@ proc new*(T: type PostgresDriver,
maxConnections: int = DefaultMaxConnections): maxConnections: int = DefaultMaxConnections):
ArchiveDriverResult[T] = ArchiveDriverResult[T] =
var connPool: PgAsyncPool let connPoolRes = PgAsyncPool.new(dbUrl, maxConnections)
if connPoolRes.isErr():
return err("error creating PgAsyncPool: " & connPoolRes.error)
try: return ok(PostgresDriver(connPool: connPoolRes.get()))
let regex = re("""^postgres:\/\/([^:]+):([^@]+)@([^:]+):(\d+)\/(.+)$""")
let matches = find(dbUrl,regex).get.captures
let user = matches[0]
let password = matches[1]
let host = matches[2]
let port = matches[3]
let dbName = matches[4]
let connectionString = fmt"user={user} host={host} port={port} dbname={dbName} password={password}"
connPool = PgAsyncPool.new(connectionString, maxConnections)
except KeyError,InvalidUnicodeError, RegexInternalError, ValueError, StudyError, SyntaxError:
return err("could not parse postgres string")
return ok(PostgresDriver(connPool: connPool))
proc createMessageTable(s: PostgresDriver): proc createMessageTable(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} = Future[ArchiveDriverResult[void]] {.async.} =

View File

@ -5,8 +5,8 @@ import
stew/results, stew/results,
chronicles chronicles
import import
../../../../common/sqlite, ../../../../common/databases/db_sqlite,
../../../../common/sqlite/migrations ../../../../common/databases/common
logScope: logScope:
@ -30,7 +30,7 @@ proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult
## NOTE: Down migration it is not currently supported ## NOTE: Down migration it is not currently supported
debug "starting message store's sqlite database migration" debug "starting message store's sqlite database migration"
let migrationRes = migrations.migrate(db, targetVersion, migrationsScriptsDir=MessageStoreMigrationPath) let migrationRes = migrate(db, targetVersion, migrationsScriptsDir=MessageStoreMigrationPath)
if migrationRes.isErr(): if migrationRes.isErr():
return err("failed to execute migration scripts: " & migrationRes.error) return err("failed to execute migration scripts: " & migrationRes.error)

View File

@ -9,7 +9,8 @@ import
stew/[results, byteutils], stew/[results, byteutils],
sqlite3_abi sqlite3_abi
import import
../../../../common/sqlite, ../../../../common/databases/db_sqlite,
../../../../common/databases/common,
../../../waku_core, ../../../waku_core,
./cursor ./cursor
@ -201,7 +202,10 @@ proc selectAllMessagesQuery(table: string): SqlQueryStr =
" FROM " & table & " FROM " & table &
" ORDER BY storedAt ASC" " ORDER BY storedAt ASC"
proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]] = proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic,
WakuMessage,
seq[byte],
Timestamp)]] =
## Retrieve all messages from the store. ## Retrieve all messages from the store.
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]
proc queryRowCallback(s: ptr sqlite3_stmt) = proc queryRowCallback(s: ptr sqlite3_stmt) =
@ -355,7 +359,11 @@ proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase,
startTime: Option[Timestamp], startTime: Option[Timestamp],
endTime: Option[Timestamp], endTime: Option[Timestamp],
limit: uint, limit: uint,
ascending: bool): DatabaseResult[seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]] = ascending: bool):
DatabaseResult[seq[(PubsubTopic,
WakuMessage,
seq[byte],
Timestamp)]] =
var messages: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] = @[] var messages: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] = @[]

View File

@ -11,7 +11,7 @@ import
chronicles, chronicles,
chronos chronos
import import
../../../../common/sqlite, ../../../../common/databases/db_sqlite,
../../../waku_core, ../../../waku_core,
../../common, ../../common,
../../driver, ../../driver,