diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index b169dc406..ce33b15de 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -19,7 +19,7 @@ import metrics/chronos_httpserver import ../../waku/common/utils/nat, - ../../waku/common/sqlite, + ../../waku/common/databases/db_sqlite, ../../waku/v2/waku_core, ../../waku/v2/waku_node, ../../waku/v2/node/waku_metrics, diff --git a/tests/common/test_postgresql_asyncpool.nim b/tests/common/test_postgresql_asyncpool.nim deleted file mode 100644 index 42a3af047..000000000 --- a/tests/common/test_postgresql_asyncpool.nim +++ /dev/null @@ -1,16 +0,0 @@ -{.used.} - -import - std/[strutils, os], - stew/results, - testutils/unittests, - chronos -import - ../../waku/common/postgres/asyncpool, - ../../waku/common/postgres/pg_asyncpool_opts - -suite "Async pool": - - asyncTest "Create connection pool": - ## TODO: extend unit tests - var pgOpts = PgAsyncPoolOptions.init() diff --git a/tests/common/test_sqlite_migrations.nim b/tests/common/test_sqlite_migrations.nim index 1c0969708..5b165bf05 100644 --- a/tests/common/test_sqlite_migrations.nim +++ b/tests/common/test_sqlite_migrations.nim @@ -5,8 +5,7 @@ import stew/results, testutils/unittests import - ../../waku/common/sqlite/database, - ../../waku/common/sqlite/migrations {.all.} + ../../waku/common/databases/db_sqlite {.all.} proc newTestDatabase(): SqliteDatabase = diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index 0151cca5a..60a55d466 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -17,7 +17,7 @@ import libp2p/protocols/pubsub/rpc/message, libp2p/peerid import - ../../waku/common/sqlite, + ../../waku/common/databases/db_sqlite, ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_store/waku_peer_storage, ../../waku/v2/waku_node, diff --git a/tests/v2/test_peer_storage.nim b/tests/v2/test_peer_storage.nim index f89544b43..0cd2e20a2 100644 --- a/tests/v2/test_peer_storage.nim +++ b/tests/v2/test_peer_storage.nim @@ -4,7 +4,7 @@ import testutils/unittests, libp2p/crypto/crypto import - ../../waku/common/sqlite, + ../../waku/common/databases/db_sqlite, ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_store/waku_peer_storage, ./testlib/wakucore diff --git a/tests/v2/waku_archive/test_driver_sqlite.nim b/tests/v2/waku_archive/test_driver_sqlite.nim index 4a918247e..d31e12d31 100644 --- a/tests/v2/waku_archive/test_driver_sqlite.nim +++ b/tests/v2/waku_archive/test_driver_sqlite.nim @@ -5,7 +5,7 @@ import testutils/unittests, chronos import - ../../../waku/common/sqlite, + ../../../waku/common/databases/db_sqlite, ../../../waku/v2/waku_archive, ../../../waku/v2/waku_archive/driver/sqlite_driver, ../../../waku/v2/waku_core, diff --git a/tests/v2/waku_archive/test_driver_sqlite_query.nim b/tests/v2/waku_archive/test_driver_sqlite_query.nim index c0cd03c39..6e25e5fd2 100644 --- a/tests/v2/waku_archive/test_driver_sqlite_query.nim +++ b/tests/v2/waku_archive/test_driver_sqlite_query.nim @@ -6,7 +6,7 @@ import chronos, chronicles import - ../../../waku/common/sqlite, + ../../../waku/common/databases/db_sqlite, ../../../waku/v2/waku_archive, ../../../waku/v2/waku_archive/driver/sqlite_driver, ../../../waku/v2/waku_core, diff --git a/tests/v2/waku_archive/test_retention_policy.nim b/tests/v2/waku_archive/test_retention_policy.nim index cddf259a1..9a45d7e9f 100644 --- a/tests/v2/waku_archive/test_retention_policy.nim +++ b/tests/v2/waku_archive/test_retention_policy.nim @@ -6,7 +6,7 @@ import testutils/unittests, chronos import - ../../../waku/common/sqlite, + ../../../waku/common/databases/db_sqlite, ../../../waku/v2/waku_core, ../../../waku/v2/waku_archive, ../../../waku/v2/waku_archive/driver/sqlite_driver, diff --git a/tests/v2/waku_archive/test_waku_archive.nim b/tests/v2/waku_archive/test_waku_archive.nim index 4e3ae9e04..21baa5a83 100644 --- a/tests/v2/waku_archive/test_waku_archive.nim +++ b/tests/v2/waku_archive/test_waku_archive.nim @@ -7,7 +7,7 @@ import chronos, libp2p/crypto/crypto import - ../../../waku/common/sqlite, + ../../../waku/common/databases/db_sqlite, ../../../waku/v2/waku_core, ../../../waku/v2/waku_archive/driver/sqlite_driver, ../../../waku/v2/waku_archive, diff --git a/tests/v2/waku_store/test_resume.nim b/tests/v2/waku_store/test_resume.nim index 28b41d407..762877e09 100644 --- a/tests/v2/waku_store/test_resume.nim +++ b/tests/v2/waku_store/test_resume.nim @@ -7,7 +7,7 @@ import chronicles, libp2p/crypto/crypto import - ../../waku/common/sqlite, + ../../waku/common/databases/db_sqlite, ../../waku/v2/node/message_store/sqlite_store, ../../waku/v2/node/peer_manager, ../../waku/v2/waku_core, diff --git a/tests/v2/waku_store/test_wakunode_store.nim b/tests/v2/waku_store/test_wakunode_store.nim index 568caf705..d27f2c498 100644 --- a/tests/v2/waku_store/test_wakunode_store.nim +++ b/tests/v2/waku_store/test_wakunode_store.nim @@ -13,7 +13,7 @@ import libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/gossipsub import - ../../../waku/common/sqlite, + ../../../waku/common/databases/db_sqlite, ../../../waku/v2/waku_core, ../../../waku/v2/node/peer_manager, ../../../waku/v2/waku_archive, diff --git a/waku/common/databases/common.nim b/waku/common/databases/common.nim new file mode 100644 index 000000000..4d4629e5c --- /dev/null +++ b/waku/common/databases/common.nim @@ -0,0 +1,6 @@ + +import + stew/results + +type + DatabaseResult*[T] = Result[T, string] diff --git a/waku/common/databases/db_postgres.nim b/waku/common/databases/db_postgres.nim new file mode 100644 index 000000000..4d685f47d --- /dev/null +++ b/waku/common/databases/db_postgres.nim @@ -0,0 +1,8 @@ + +import + ./common, + ./db_postgres/pgasyncpool + +export + common, + pgasyncpool diff --git a/waku/v2/waku_archive/driver/postgres_driver/connection.nim b/waku/common/databases/db_postgres/dbconn.nim similarity index 100% rename from waku/v2/waku_archive/driver/postgres_driver/connection.nim rename to waku/common/databases/db_postgres/dbconn.nim diff --git a/waku/v2/waku_archive/driver/postgres_driver/asyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim similarity index 80% rename from waku/v2/waku_archive/driver/postgres_driver/asyncpool.nim rename to waku/common/databases/db_postgres/pgasyncpool.nim index e26655d04..dc1104b3c 100644 --- a/waku/v2/waku_archive/driver/postgres_driver/asyncpool.nim +++ b/waku/common/databases/db_postgres/pgasyncpool.nim @@ -6,13 +6,13 @@ else: {.push raises: [].} import - std/sequtils, + std/[sequtils,nre, strformat], stew/results, chronicles, chronos import - ../../driver, - ./connection + ./dbconn, + ../common logScope: topics = "postgres asyncpool" @@ -39,8 +39,25 @@ type conns: seq[PgDbConn] proc new*(T: type PgAsyncPool, - connString: string, - maxConnections: int): T = + dbUrl: string, + maxConnections: int): + DatabaseResult[T] = + + var connString: string + + try: + let regex = re("""^postgres:\/\/([^:]+):([^@]+)@([^:]+):(\d+)\/(.+)$""") + let matches = find(dbUrl,regex).get.captures + let user = matches[0] + let password = matches[1] + let host = matches[2] + let port = matches[3] + let dbName = matches[4] + connString = + fmt"user={user} host={host} port={port} dbname={dbName} password={password}" + except KeyError,InvalidUnicodeError, RegexInternalError, ValueError, + StudyError, SyntaxError: + return err("could not parse postgres string: " & getCurrentExceptionMsg()) let pool = PgAsyncPool( connString: connString, @@ -49,7 +66,7 @@ proc new*(T: type PgAsyncPool, conns: newSeq[PgDbConn](0) ) - return pool + return ok(pool) func isLive(pool: PgAsyncPool): bool = pool.state == PgAsyncPoolState.Live @@ -70,17 +87,16 @@ proc close*(pool: PgAsyncPool): # wait for the connections to be released and close them, without # blocking the async runtime - if pool.conns.anyIt(it.busy): - while pool.conns.anyIt(it.busy): - await sleepAsync(0.milliseconds) + while pool.conns.anyIt(it.busy): + await sleepAsync(0.milliseconds) - for i in 0.. 0 and freelistCount > 0): + + debug "starting sqlite database vacuuming" + + let resVacuum = db.vacuum() + if resVacuum.isErr(): + return err("failed to execute vacuum: " & resVacuum.error) + + debug "finished sqlite database vacuuming" diff --git a/waku/common/databases/dburl.nim b/waku/common/databases/dburl.nim new file mode 100644 index 000000000..affdc04b6 --- /dev/null +++ b/waku/common/databases/dburl.nim @@ -0,0 +1,33 @@ + +import + std/strutils, + regex, + stew/results + +proc validateDbUrl*(dbUrl: string): Result[string, string] = + ## dbUrl mimics SQLAlchemy Database URL schema + ## See: https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls + let regex = re"^[\w\+]+:\/\/[\w\/\\\.\:\@]+$" + let dbUrl = dbUrl.strip() + if dbUrl == "" or dbUrl == "none" or dbUrl.match(regex): + return ok(dbUrl) + else: + return err("invalid 'db url' option format: " & dbUrl) + +proc getDbEngine*(dbUrl: string): Result[string, string] = + let dbUrlParts = dbUrl.split("://", 1) + + if dbUrlParts.len != 2: + return err("Incorrect dbUrl : " & dbUrl) + + let engine = dbUrlParts[0] + return ok(engine) + +proc getDbPath*(dbUrl: string): Result[string, string] = + let dbUrlParts = dbUrl.split("://", 1) + + if dbUrlParts.len != 2: + return err("Incorrect dbUrl : " & dbUrl) + + let path = dbUrlParts[1] + return ok(path) diff --git a/waku/common/sqlite.nim b/waku/common/sqlite.nim deleted file mode 100644 index 2f68f7534..000000000 --- a/waku/common/sqlite.nim +++ /dev/null @@ -1,7 +0,0 @@ -{.push raises: [].} - -import - ./sqlite/database - -export - database \ No newline at end of file diff --git a/waku/common/sqlite/migrations.nim b/waku/common/sqlite/migrations.nim deleted file mode 100644 index f04e7eac3..000000000 --- a/waku/common/sqlite/migrations.nim +++ /dev/null @@ -1,138 +0,0 @@ -{.push raises: [].} - -import - std/[strutils, sequtils, os, algorithm], - stew/results, - chronicles -import - ../sqlite - - -logScope: - topics = "sqlite" - - -## Migration scripts - -proc getMigrationScriptVersion(path: string): DatabaseResult[int64] = - let name = extractFilename(path) - let parts = name.split("_", 1) - - try: - let version = parseInt(parts[0]) - return ok(version) - except ValueError: - return err("failed to parse file version: " & name) - -proc isSqlScript(path: string): bool = - path.toLower().endsWith(".sql") - - -proc listSqlScripts(path: string): DatabaseResult[seq[string]] = - var scripts = newSeq[string]() - - try: - for scriptPath in walkDirRec(path): - if isSqlScript(scriptPath): - scripts.add(scriptPath) - else: - debug "invalid migration script", file=scriptPath - except OSError: - return err("failed to list migration scripts: " & getCurrentExceptionMsg()) - - ok(scripts) - - -proc filterMigrationScripts(paths: seq[string], lowVersion, highVersion: int64, direction: string = "up"): seq[string] = - ## Returns migration scripts whose version fall between lowVersion and highVersion (inclusive) - let filterPredicate = proc(script: string): bool = - if not isSqlScript(script): - return false - - if direction != "" and not script.toLower().endsWith("." & direction & ".sql"): - return false - - let scriptVersionRes = getMigrationScriptVersion(script) - if scriptVersionRes.isErr(): - return false - - let scriptVersion = scriptVersionRes.value - return lowVersion < scriptVersion and scriptVersion <= highVersion - - paths.filter(filterPredicate) - - -proc sortMigrationScripts(paths: seq[string]): seq[string] = - ## Sort migration scripts paths alphabetically - paths.sorted(system.cmp[string]) - - -proc loadMigrationScripts(paths: seq[string]): DatabaseResult[seq[string]] = - var loadedScripts = newSeq[string]() - - for script in paths: - try: - loadedScripts.add(readFile(script)) - except OSError, IOError: - return err("failed to load script '" & script & "': " & getCurrentExceptionMsg()) - - ok(loadedScripts) - - -proc breakIntoStatements(script: string): seq[string] = - var statements = newSeq[string]() - - for chunk in script.split(';'): - if chunk.strip().isEmptyOrWhitespace(): - continue - - let statement = chunk.strip() & ";" - statements.add(statement) - - statements - - -proc migrate*(db: SqliteDatabase, targetVersion: int64, migrationsScriptsDir: string): DatabaseResult[void] = - ## Compares the `user_version` of the sqlite database with the provided `targetVersion`, then - ## it runs migration scripts if the `user_version` is outdated. The `migrationScriptsDir` path - ## points to the directory holding the migrations scripts once the db is updated, it sets the - ## `user_version` to the `tragetVersion`. - ## - ## NOTE: Down migration it is not currently supported - let userVersion = ?db.getUserVersion() - - if userVersion == targetVersion: - debug "database schema is up to date", userVersion=userVersion, targetVersion=targetVersion - return ok() - - info "database schema is outdated", userVersion=userVersion, targetVersion=targetVersion - - # Load migration scripts - var migrationScriptsPaths = ?listSqlScripts(migrationsScriptsDir) - migrationScriptsPaths = filterMigrationScripts(migrationScriptsPaths, lowVersion=userVersion, highVersion=targetVersion, direction="up") - migrationScriptsPaths = sortMigrationScripts(migrationScriptsPaths) - - if migrationScriptsPaths.len <= 0: - debug "no scripts to be run" - return ok() - - let scripts = ?loadMigrationScripts(migrationScriptsPaths) - - # Run the migration scripts - for script in scripts: - - for statement in script.breakIntoStatements(): - debug "executing migration statement", statement=statement - - let execRes = db.query(statement, NoopRowHandler) - if execRes.isErr(): - error "failed to execute migration statement", statement=statement, error=execRes.error - return err("failed to execute migration statement") - - debug "migration statement executed succesfully", statement=statement - - # Update user_version - ?db.setUserVersion(targetVersion) - - debug "database user_version updated", userVersion=targetVersion - ok() diff --git a/waku/v2/node/peer_manager/peer_store/migrations.nim b/waku/v2/node/peer_manager/peer_store/migrations.nim index ae88e867e..a008bb222 100644 --- a/waku/v2/node/peer_manager/peer_store/migrations.nim +++ b/waku/v2/node/peer_manager/peer_store/migrations.nim @@ -9,8 +9,8 @@ import stew/results, chronicles import - ../../../../common/sqlite, - ../../../../common/sqlite/migrations + ../../../../common/databases/db_sqlite, + ../../../../common/databases/common logScope: @@ -23,7 +23,7 @@ template projectRoot: string = currentSourcePath.rsplit(DirSep, 1)[0] / ".." / " const PeerStoreMigrationPath: string = projectRoot / "migrations" / "peer_store" -proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult[void] = +proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult[void] = ## Compares the `user_version` of the sqlite database with the provided `targetVersion`, then ## it runs migration scripts if the `user_version` is outdated. The `migrationScriptsDir` path ## points to the directory holding the migrations scripts once the db is updated, it sets the diff --git a/waku/v2/node/peer_manager/peer_store/waku_peer_storage.nim b/waku/v2/node/peer_manager/peer_store/waku_peer_storage.nim index e2fb8200d..858363234 100644 --- a/waku/v2/node/peer_manager/peer_store/waku_peer_storage.nim +++ b/waku/v2/node/peer_manager/peer_store/waku_peer_storage.nim @@ -10,12 +10,13 @@ import sqlite3_abi, libp2p/protobuf/minprotobuf import - ../../../../common/sqlite, + ../../../../common/databases/db_sqlite, + ../../../../common/databases/common, ../../../waku_core, ../waku_peer_store, ./peer_storage -export sqlite +export db_sqlite type WakuPeerStorage* = ref object of PeerStorage diff --git a/waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim index 20124690e..9bd3cf7c8 100644 --- a/waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -12,7 +12,7 @@ import ../../../waku_core, ../../common, ../../driver, - asyncpool + ../../../../common/databases/db_postgres as waku_postgres export postgres_driver @@ -46,24 +46,11 @@ proc new*(T: type PostgresDriver, maxConnections: int = DefaultMaxConnections): ArchiveDriverResult[T] = - var connPool: PgAsyncPool + let connPoolRes = PgAsyncPool.new(dbUrl, maxConnections) + if connPoolRes.isErr(): + return err("error creating PgAsyncPool: " & connPoolRes.error) - try: - let regex = re("""^postgres:\/\/([^:]+):([^@]+)@([^:]+):(\d+)\/(.+)$""") - let matches = find(dbUrl,regex).get.captures - let user = matches[0] - let password = matches[1] - let host = matches[2] - let port = matches[3] - let dbName = matches[4] - let connectionString = fmt"user={user} host={host} port={port} dbname={dbName} password={password}" - - connPool = PgAsyncPool.new(connectionString, maxConnections) - - except KeyError,InvalidUnicodeError, RegexInternalError, ValueError, StudyError, SyntaxError: - return err("could not parse postgres string") - - return ok(PostgresDriver(connPool: connPool)) + return ok(PostgresDriver(connPool: connPoolRes.get())) proc createMessageTable(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = diff --git a/waku/v2/waku_archive/driver/sqlite_driver/migrations.nim b/waku/v2/waku_archive/driver/sqlite_driver/migrations.nim index fe73834e9..3d63a8f32 100644 --- a/waku/v2/waku_archive/driver/sqlite_driver/migrations.nim +++ b/waku/v2/waku_archive/driver/sqlite_driver/migrations.nim @@ -5,8 +5,8 @@ import stew/results, chronicles import - ../../../../common/sqlite, - ../../../../common/sqlite/migrations + ../../../../common/databases/db_sqlite, + ../../../../common/databases/common logScope: @@ -30,7 +30,7 @@ proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult ## NOTE: Down migration it is not currently supported debug "starting message store's sqlite database migration" - let migrationRes = migrations.migrate(db, targetVersion, migrationsScriptsDir=MessageStoreMigrationPath) + let migrationRes = migrate(db, targetVersion, migrationsScriptsDir=MessageStoreMigrationPath) if migrationRes.isErr(): return err("failed to execute migration scripts: " & migrationRes.error) diff --git a/waku/v2/waku_archive/driver/sqlite_driver/queries.nim b/waku/v2/waku_archive/driver/sqlite_driver/queries.nim index 8145d0b3d..24afcc722 100644 --- a/waku/v2/waku_archive/driver/sqlite_driver/queries.nim +++ b/waku/v2/waku_archive/driver/sqlite_driver/queries.nim @@ -9,7 +9,8 @@ import stew/[results, byteutils], sqlite3_abi import - ../../../../common/sqlite, + ../../../../common/databases/db_sqlite, + ../../../../common/databases/common, ../../../waku_core, ./cursor @@ -201,7 +202,10 @@ proc selectAllMessagesQuery(table: string): SqlQueryStr = " FROM " & table & " ORDER BY storedAt ASC" -proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]] = +proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic, + WakuMessage, + seq[byte], + Timestamp)]] = ## Retrieve all messages from the store. var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] proc queryRowCallback(s: ptr sqlite3_stmt) = @@ -355,7 +359,11 @@ proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase, startTime: Option[Timestamp], endTime: Option[Timestamp], limit: uint, - ascending: bool): DatabaseResult[seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]] = + ascending: bool): + DatabaseResult[seq[(PubsubTopic, + WakuMessage, + seq[byte], + Timestamp)]] = var messages: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] = @[] diff --git a/waku/v2/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/v2/waku_archive/driver/sqlite_driver/sqlite_driver.nim index aaa208be9..705b82eb0 100644 --- a/waku/v2/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/v2/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -11,7 +11,7 @@ import chronicles, chronos import - ../../../../common/sqlite, + ../../../../common/databases/db_sqlite, ../../../waku_core, ../../common, ../../driver,