From fedc85c2f5ef3004ce40edf42acd0e12982c8fe8 Mon Sep 17 00:00:00 2001 From: Sanaz Taheri Boshrooyeh <35961250+staheri14@users.noreply.github.com> Date: Wed, 16 Jun 2021 13:23:55 -0700 Subject: [PATCH] Persisting Waku message timestamp & implementing DB migration & convert receiver timestamp data type to float64 (#607) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * adds timestamp to waku message store impl * stores timestamp as int64 * adds untitest * stores timestamp as seq of bytes * minor * re-orders unittest * changes receiver timestamp to float64 * unit test for receiver timestamps * adds comments * reorder a few lines * updates changelog * more updates on changelog * WIP * WIP * adds migration * more debug messages * passes the path to the migration scripts from message store module * adds migration result type * replaces migrationScripts with migrationScriptsResult * adds path calculation to the message store * removes some tests binary file * removes redundant imports * comments out user_version assignment in sqlite init * more descriptive err messages * clean up test file * more info logs * minor code format * removes a todo * minor updates * remove a binary file * unit tests for migration utils * adds split script * integrates split query to handle scripts with multiple commands * updates migration script for v1 * updates the v1 migration script * update user version * updates script * fixes a few bugs on the splitScript * more debug logs * adds float64 parameter support to sqlite3 * change in timestamp type in the script * deletes float64 toBytes utils * enables storage of timestamp as a real number in the sqlite db * bump up script index * comment edits * removes migrate unit test * adds todo and docstring * updates changelog * removes an unused item in .gitignore * minor * updates changelog * organizes imports * cleans up imports * WIP * updates script fixes a few bugs on the splitScript more debug logs adds float64 parameter support to sqlite3 change in timestamp type in the script deletes float64 toBytes utils * edits migration util test * remove an empty test file * includes migration utils tests in * deletes unused codes * tides up imports * adds range based filter to the filterMigrationScripts * renames procs: removes Migration * tides up imports * edits docstring * edits docstring * edits docstring * removes unused imports * more clean up * groups std imports * updates changelog * adds docstring for setUserVersion * adds unittest for the migrate * Update waku/v2/node/storage/message/waku_message_store.nim Co-authored-by: RichΛrd * Update waku/v2/node/storage/sqlite.nim Co-authored-by: RichΛrd * Update waku/v2/node/storage/sqlite.nim Co-authored-by: RichΛrd * removes split scripts * fixes a naming issue * fixes a bug * fixes a typo * adds a log re updated user_version * fixes a proc naming mismatch * fixes naming mismatch * more descriptive var names * adds migration script of the first user version * moves migration to after persistMessages flag is checked * deletes unused comment * fixes a bug * brings back split script * adds unit tests for split scripts * runs scripts one command at a time * deletes a commented line * relocates the migrate proc to sqlite.nim * adds unit test for filter scripts * adds filterScripts unittest testing varying zero-prefixed user versions * minor Co-authored-by: RichΛrd --- CHANGELOG.md | 18 ++++ tests/all_tests_v2.nim | 3 +- tests/v2/00002_test.up.sql | 8 ++ tests/v2/test_message_store.nim | 76 +++++++++++++-- tests/v2/test_migration_utils.nim | 63 +++++++++++++ .../v2/node/storage/message/message_store.nim | 2 +- .../storage/message/waku_message_store.nim | 54 +++++++---- .../storage/migration/migration_types.nim | 7 ++ .../storage/migration/migration_utils.nim | 63 +++++++++++++ .../message/00001_basicMessageTable.up.sql | 8 ++ .../message/00002_addSenderTimeStamp.up.sql | 27 ++++++ waku/v2/node/storage/sqlite.nim | 93 ++++++++++++++++++- waku/v2/node/wakunode2.nim | 15 ++- waku/v2/protocol/waku_store/waku_store.nim | 2 +- 14 files changed, 402 insertions(+), 37 deletions(-) create mode 100644 tests/v2/00002_test.up.sql create mode 100644 tests/v2/test_migration_utils.nim create mode 100644 waku/v2/node/storage/migration/migration_types.nim create mode 100644 waku/v2/node/storage/migration/migration_utils.nim create mode 100644 waku/v2/node/storage/migration/migrations_scripts/message/00001_basicMessageTable.up.sql create mode 100644 waku/v2/node/storage/migration/migrations_scripts/message/00002_addSenderTimeStamp.up.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index 326a11307..67087a4d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,22 @@ # Changelog +## Next + +This release contains the following: + +### Features + +### Changes +- Enables db migration for the message store. +#### General refactoring +#### Docs +#### Schema +- Updates the `Message` table of the persistent message store: + - Adds `senderTimestamp` column. + - Renames the `timestamp` column to `receiverTimestamp` and changes its type to `REAL`. +#### API + +### Fixes + ## 2021-06-03 v0.4 diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 5ccb64e95..2469a6380 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -14,7 +14,8 @@ import ./v2/test_web3, # TODO remove it when rln-relay tests get finalized ./v2/test_waku_bridge, ./v2/test_peer_storage, - ./v2/test_waku_keepalive + ./v2/test_waku_keepalive, + ./v2/test_migration_utils when defined(rln): import ./v2/test_waku_rln_relay diff --git a/tests/v2/00002_test.up.sql b/tests/v2/00002_test.up.sql new file mode 100644 index 000000000..3465caa37 --- /dev/null +++ b/tests/v2/00002_test.up.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS Message_backup ( + id BLOB PRIMARY KEY, + timestamp INTEGER NOT NULL, + contentTopic BLOB NOT NULL, + pubsubTopic BLOB NOT NULL, + payload BLOB, + version INTEGER NOT NULL + ) WITHOUT ROWID; \ No newline at end of file diff --git a/tests/v2/test_message_store.nim b/tests/v2/test_message_store.nim index 365646abf..7f41d1900 100644 --- a/tests/v2/test_message_store.nim +++ b/tests/v2/test_message_store.nim @@ -1,9 +1,10 @@ {.used.} import - std/[unittest, options, tables, sets], - chronos, chronicles, + std/[unittest, options, tables, sets, times, os, strutils], + chronos, ../../waku/v2/node/storage/message/waku_message_store, + ../../waku/v2/node/storage/sqlite, ../../waku/v2/protocol/waku_store/waku_store, ./utils @@ -15,38 +16,99 @@ suite "Message Store": topic = ContentTopic("/waku/2/default-content/proto") pubsubTopic = "/waku/2/default-waku/proto" + t1 = epochTime() + t2 = epochTime() + t3 = high(float64) var msgs = @[ - WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0)), - WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1)), - WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32)), + WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1), + WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2), + WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32), timestamp: t3), ] defer: store.close() + var indexes: seq[Index] = @[] for msg in msgs: - let output = store.put(computeIndex(msg), msg, pubsubTopic) + var index = computeIndex(msg) + let output = store.put(index, msg, pubsubTopic) check output.isOk + indexes.add(index) + + # flags for version var v0Flag, v1Flag, vMaxFlag: bool = false + # flags for sender timestamp + var t1Flag, t2Flag, t3Flag: bool = false + # flags for receiver timestamp + var rt1Flag, rt2Flag, rt3Flag: bool = false + var responseCount = 0 - proc data(timestamp: uint64, msg: WakuMessage, psTopic: string) = + proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) = responseCount += 1 check msg in msgs check psTopic == pubsubTopic + # check the correct retrieval of versions if msg.version == uint32(0): v0Flag = true if msg.version == uint32(1): v1Flag = true # high(uint32) is the largest value that fits in uint32, this is to make sure there is no overflow in the storage if msg.version == high(uint32): vMaxFlag = true + # check correct retrieval of sender timestamps + if msg.timestamp == t1: t1Flag = true + if msg.timestamp == t2: t2Flag = true + if msg.timestamp == t3: t3Flag = true + + # check correct retrieval of receiver timestamps + if receiverTimestamp == indexes[0].receivedTime: rt1Flag = true + if receiverTimestamp == indexes[1].receivedTime: rt2Flag = true + if receiverTimestamp == indexes[2].receivedTime: rt3Flag = true + let res = store.getAll(data) check: res.isErr == false responseCount == 3 + # check version v0Flag == true v1Flag == true vMaxFlag == true + # check sender timestamp + t1Flag == true + t2Flag == true + t3Flag == true + # check receiver timestamp + rt1Flag == true + rt2Flag == true + rt3Flag == true + test "set and get user version": + let + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database)[] + defer: store.close() + let res = database.setUserVersion(5) + check res.isErr == false + let ver = database.getUserVersion() + check: + ver.isErr == false + ver.value == 5 + test "migration": + let + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database)[] + defer: store.close() + + template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] + let migrationPath = sourceDir + + let res = database.migrate(migrationPath, 10) + check: + res.isErr == false + + let ver = database.getUserVersion() + check: + ver.isErr == false + ver.value == 10 diff --git a/tests/v2/test_migration_utils.nim b/tests/v2/test_migration_utils.nim new file mode 100644 index 000000000..a30667ad5 --- /dev/null +++ b/tests/v2/test_migration_utils.nim @@ -0,0 +1,63 @@ +{.used.} + +import + std/[unittest, tables, strutils, os, sequtils], + chronicles, + stew/results, + ../../waku/v2/node/storage/migration/[migration_types, migration_utils] + +template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] +const MIGRATION_PATH = sourceDir / "../../waku/v2/node/storage/migration/migrations_scripts/message" + +suite "Migration utils": + test "read migration scripts": + let migrationScriptsRes = getScripts(MIGRATION_PATH) + check: + migrationScriptsRes.isErr == false + + test "filter migration scripts": + let migrationUp = [("0001_init", "script1"), ("0001_add", "script1"), ("0002_init", "script2"), ("0003_init", "script3")].toOrderedTable() + let migrationScripts = MigrationScripts(migrationUp: migrationUp) + let scriptsRes = filterScripts(migrationScripts, 1, 3) + check: + scriptsRes.isErr == false + scriptsRes.value.len == 2 + scriptsRes.value[0] == "script2" + scriptsRes.value[1] == "script3" + + test "filter migration scripts with varying zero-prefixed user versions": + let migrationUp = [("0001_init", "script1"), ("1_add", "script1"), ("000002_init", "script2"), ("003_init", "script3")].toOrderedTable() + let migrationScripts = MigrationScripts(migrationUp: migrationUp) + let scriptsRes = filterScripts(migrationScripts, 1, 3) + check: + scriptsRes.isErr == false + scriptsRes.value.len == 2 + scriptsRes.value[0] == "script2" + scriptsRes.value[1] == "script3" + + test "split scripts with no queries": + let script = "; ;" + let queries = splitScript(script) + check queries.len == 0 + + test "split scripts with multiple queries": + let q1 = """CREATE TABLE contacts2 ( + contact_id INTEGER PRIMARY KEY, + first_name TEXT NOT NULL, + last_name TEXT NOT NULL, + email TEXT NOT NULL UNIQUE, + phone TEXT NOT NULL UNIQUE + );""" + let q2 = """CREATE TABLE contacts2 ( + contact_id INTEGER PRIMARY KEY, + first_name TEXT NOT NULL, + last_name TEXT NOT NULL, + email TEXT NOT NULL UNIQUE, + phone TEXT NOT NULL UNIQUE + );""" + let script = q1 & q2 + let queries = splitScript(script) + check: + queries.len == 2 + queries[0] == q1 + queries[1] == q2 \ No newline at end of file diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim index 33c8c351c..f5d9ae97d 100644 --- a/waku/v2/node/storage/message/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -8,7 +8,7 @@ import ## retrieve historical messages type - DataProc* = proc(timestamp: uint64, msg: WakuMessage, pubsubTopic: string) {.closure.} + DataProc* = proc(timestamp: float64, msg: WakuMessage, pubsubTopic: string) {.closure.} MessageStoreResult*[T] = Result[T, string] diff --git a/waku/v2/node/storage/message/waku_message_store.nim b/waku/v2/node/storage/message/waku_message_store.nim index 505a84b38..3951026fa 100644 --- a/waku/v2/node/storage/message/waku_message_store.nim +++ b/waku/v2/node/storage/message/waku_message_store.nim @@ -1,8 +1,9 @@ {.push raises: [Defect].} import + std/[os, algorithm, tables, strutils], + chronos, metrics, chronicles, sqlite3_abi, - chronos, metrics, libp2p/crypto/crypto, libp2p/protocols/protocol, libp2p/protobuf/minprotobuf, @@ -11,11 +12,13 @@ import ./message_store, ../sqlite, ../../../protocol/waku_message, - ../../../utils/pagination - + ../../../utils/pagination export sqlite const TABLE_TITLE = "Message" +template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] +const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "../migration/migrations_scripts/message" + # The code in this file is an adaptation of the Sqlite KV Store found in nim-eth. # https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim # @@ -30,14 +33,16 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T] ## It contains: ## - 4-Byte ContentTopic stored as an Integer ## - Payload stored as a blob + let prepare = db.prepareStmt(""" CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ ( id BLOB PRIMARY KEY, - timestamp INTEGER NOT NULL, + receiverTimestamp REAL NOT NULL, contentTopic BLOB NOT NULL, pubsubTopic BLOB NOT NULL, payload BLOB, - version INTEGER NOT NULL + version INTEGER NOT NULL, + senderTimestamp REAL NOT NULL ) WITHOUT ROWID; """, NoParams, void) @@ -61,15 +66,15 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop ## echo "error" ## let prepare = db.database.prepareStmt( - "INSERT INTO " & TABLE_TITLE & " (id, timestamp, contentTopic, payload, pubsubTopic, version) VALUES (?, ?, ?, ?, ?, ?);", - (seq[byte], int64, seq[byte], seq[byte], seq[byte], int64), + "INSERT INTO " & TABLE_TITLE & " (id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp) VALUES (?, ?, ?, ?, ?, ?, ?);", + (seq[byte], float64, seq[byte], seq[byte], seq[byte], int64, float64), void ) if prepare.isErr: return err("failed to prepare") - let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version))) + let res = prepare.value.exec((@(cursor.digest.data), cursor.receivedTime, message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version), message.timestamp)) if res.isErr: return err("failed") @@ -91,22 +96,31 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto proc msg(s: ptr sqlite3_stmt) {.raises: [Defect, Exception].} = gotMessages = true let - timestamp = sqlite3_column_int64(s, 0) + receiverTimestamp = sqlite3_column_double(s, 0) + topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1)) - topicL = sqlite3_column_bytes(s,1) + topicLength = sqlite3_column_bytes(s,1) + contentTopic = ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicLength-1)))) + p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2)) - l = sqlite3_column_bytes(s, 2) - pubsubTopic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3)) - pubsubTopicL = sqlite3_column_bytes(s,3) + length = sqlite3_column_bytes(s, 2) + payload = @(toOpenArray(p, 0, length-1)) + + pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3)) + pubsubTopicLength = sqlite3_column_bytes(s,3) + pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1))) + version = sqlite3_column_int64(s, 4) - # TODO retrieve the version number - onData(uint64(timestamp), - WakuMessage(contentTopic: ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicL-1)))), - payload: @(toOpenArray(p, 0, l-1)), version: uint32(version)), - string.fromBytes(@(toOpenArray(pubsubTopic, 0, pubsubTopicL-1)))) + senderTimestamp = sqlite3_column_double(s, 5) - let res = db.database.query("SELECT timestamp, contentTopic, payload, pubsubTopic, version FROM " & TABLE_TITLE & " ORDER BY timestamp ASC", msg) + + # TODO retrieve the version number + onData(receiverTimestamp.float64, + WakuMessage(contentTopic: contentTopic, payload: payload , version: uint32(version), timestamp: senderTimestamp.float64), + pubsubTopic) + + let res = db.database.query("SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp FROM " & TABLE_TITLE & " ORDER BY receiverTimestamp ASC", msg) if res.isErr: return err("failed") @@ -115,3 +129,5 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto proc close*(db: WakuMessageStore) = ## Closes the database. db.database.close() + + diff --git a/waku/v2/node/storage/migration/migration_types.nim b/waku/v2/node/storage/migration/migration_types.nim new file mode 100644 index 000000000..fa01a1318 --- /dev/null +++ b/waku/v2/node/storage/migration/migration_types.nim @@ -0,0 +1,7 @@ +import tables, stew/results + +type MigrationScriptsResult*[T] = Result[T, string] +type + MigrationScripts* = ref object of RootObj + migrationUp*:OrderedTable[string, string] + migrationDown*:OrderedTable[string, string] \ No newline at end of file diff --git a/waku/v2/node/storage/migration/migration_utils.nim b/waku/v2/node/storage/migration/migration_utils.nim new file mode 100644 index 000000000..4aa3bc055 --- /dev/null +++ b/waku/v2/node/storage/migration/migration_utils.nim @@ -0,0 +1,63 @@ +import + std/[os, algorithm, tables, strutils], + chronicles, + stew/results, + migration_types + +proc getScripts*(migrationPath: string): MigrationScriptsResult[MigrationScripts] = + ## the code in this procedure is an adaptation of https://github.com/status-im/nim-status/blob/21aebe41be03cb6450ea261793b800ed7d3e6cda/nim_status/migrations/sql_generate.nim#L4 + var migrationScripts = MigrationScripts(migrationUp:initOrderedTable[string, string](), migrationDown:initOrderedTable[string, string]()) + try: + for kind, path in walkDir(migrationPath): + let (_, name, ext) = splitFile(path) + if ext != ".sql": continue + + let parts = name.split(".") + if parts.len < 2: + continue + let script = parts[0] + let direction = parts[1] + + debug "name", script=script + case direction: + of "up": + migrationScripts.migrationUp[script] = readFile(path) + debug "up script", readScript=migrationScripts.migrationUp[script] + of "down": + migrationScripts.migrationDown[script] = readFile(path) + debug "down script", readScript=migrationScripts.migrationDown[script] + else: + debug "Invalid script: ", name + + migrationScripts.migrationUp.sort(system.cmp) + migrationScripts.migrationDown.sort(system.cmp) + + ok(migrationScripts) + + except OSError, IOError: + return err("failed to load the migration scripts") + + +proc filterScripts*(migrationScripts: MigrationScripts, s: int64, e: int64 ): Result[seq[string], string] = + ## returns migration scripts whose version fall between s and e (e is inclusive) + var scripts: seq[string] + try: + for name, script in migrationScripts.migrationUp: + let parts = name.split("_") + #TODO this should be int64 + let ver = parseInt(parts[0]) + # filter scripts based on their version + if s < ver and ver <= e: + scripts.add(script) + ok(scripts) + except ValueError: + return err("failed to filter scripts") + +proc splitScript*(script: string): seq[string] = + ## parses the script into its individual sql commands and returns them + var queries: seq[string] = @[] + for q in script.split(';'): + if isEmptyOrWhitespace(q): continue + let query = q.strip() & ";" + queries.add(query) + return queries diff --git a/waku/v2/node/storage/migration/migrations_scripts/message/00001_basicMessageTable.up.sql b/waku/v2/node/storage/migration/migrations_scripts/message/00001_basicMessageTable.up.sql new file mode 100644 index 000000000..75d8d7eea --- /dev/null +++ b/waku/v2/node/storage/migration/migrations_scripts/message/00001_basicMessageTable.up.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS Message( + id BLOB PRIMARY KEY, + timestamp INTEGER NOT NULL, + contentTopic BLOB NOT NULL, + pubsubTopic BLOB NOT NULL, + payload BLOB, + version INTEGER NOT NULL + ) WITHOUT ROWID; \ No newline at end of file diff --git a/waku/v2/node/storage/migration/migrations_scripts/message/00002_addSenderTimeStamp.up.sql b/waku/v2/node/storage/migration/migrations_scripts/message/00002_addSenderTimeStamp.up.sql new file mode 100644 index 000000000..b3597aeec --- /dev/null +++ b/waku/v2/node/storage/migration/migrations_scripts/message/00002_addSenderTimeStamp.up.sql @@ -0,0 +1,27 @@ +CREATE TABLE IF NOT EXISTS Message_backup ( + id BLOB PRIMARY KEY, + timestamp INTEGER NOT NULL, + contentTopic BLOB NOT NULL, + pubsubTopic BLOB NOT NULL, + payload BLOB, + version INTEGER NOT NULL + ) WITHOUT ROWID; + +INSERT INTO Message_backup SELECT id, timestamp, contentTopic, pubsubTopic, payload, version FROM Message; + +DROP TABLE Message; + +CREATE TABLE IF NOT EXISTS Message( + id BLOB PRIMARY KEY, + receiverTimestamp REAL NOT NULL, + contentTopic BLOB NOT NULL, + pubsubTopic BLOB NOT NULL, + payload BLOB, + version INTEGER NOT NULL, + senderTimestamp REAL NOT NULL + ) WITHOUT ROWID; + + +INSERT INTO Message SELECT id, timestamp, contentTopic, pubsubTopic, payload, version, 0 FROM Message_backup; + +DROP TABLE Message_backup; \ No newline at end of file diff --git a/waku/v2/node/storage/sqlite.nim b/waku/v2/node/storage/sqlite.nim index 32d46cdad..53e436ca6 100644 --- a/waku/v2/node/storage/sqlite.nim +++ b/waku/v2/node/storage/sqlite.nim @@ -3,13 +3,13 @@ import os, sqlite3_abi, - chronos, chronicles, metrics, stew/results, + chronos, chronicles, metrics, + stew/results, libp2p/crypto/crypto, libp2p/protocols/protocol, libp2p/protobuf/minprotobuf, libp2p/stream/connection, - stew/results, metrics - + migration/[migration_types,migration_utils] # The code in this file is an adaptation of the Sqlite KV Store found in nim-eth. # https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim # @@ -30,6 +30,8 @@ type SqliteDatabase* = ref object of RootObj env*: Sqlite +const USER_VERSION = 2 # increase this when there is a breaking change in the table schema + template dispose(db: Sqlite) = discard sqlite3_close(db) @@ -110,8 +112,6 @@ proc init*( discard sqlite3_finalize(journalModePragma) return err("Invalid pragma result: " & $x) - # TODO: check current version and implement schema versioning - checkExec "PRAGMA user_version = 1;" let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard checkWalPragmaResult(journalModePragma) @@ -139,6 +139,8 @@ proc bindParam*(s: RawStmtPtr, n: int, val: auto): cint = sqlite3_bind_int(s, int(n).cint, int(val).cint) elif val is int64: sqlite3_bind_int64(s, n.cint, val) + elif val is float64: + sqlite3_bind_double(s, n.cint, val) # Note: bind_text not yet supported in sqlite3_abi wrapper # elif val is string: # sqlite3_bind_text(s, n.cint, val.cstring, -1, nil) # `-1` implies string length is the number of bytes up to the first null-terminator @@ -208,3 +210,84 @@ proc close*(db: SqliteDatabase) = discard sqlite3_close(db.env) db[] = SqliteDatabase()[] + +proc getUserVersion*(database: SqliteDatabase): DatabaseResult[int64] = + var version: int64 + proc handler(s: ptr sqlite3_stmt) = + version = sqlite3_column_int64(s, 0) + let res = database.query("PRAGMA user_version;", handler) + if res.isErr: + return err("failed to get user_version") + ok(version) + + +proc setUserVersion*(database: SqliteDatabase, version: int64): DatabaseResult[bool] = + ## sets the value of the user-version integer at offset 60 in the database header. + ## some context borrowed from https://www.sqlite.org/pragma.html#pragma_user_version + ## The user-version is an integer that is available to applications to use however they want. + ## SQLite makes no use of the user-version itself + proc handler(s: ptr sqlite3_stmt) = + discard + let query = "PRAGMA user_version=" & $version & ";" + let res = database.query(query, handler) + if res.isErr: + return err("failed to set user_version") + ok(true) + + +proc migrate*(db: SqliteDatabase, path: string, targetVersion: int64 = USER_VERSION): DatabaseResult[bool] = + ## compares the user_version of the db with the targetVersion + ## runs migration scripts if the user_version is outdated (does not support down migration) + ## path points to the directory holding the migrations scripts + ## once the db is updated, it sets the user_version to the tragetVersion + + # read database version + let userVersion = db.getUserVersion() + debug "current db user_version", userVersion=userVersion + if userVersion.value == targetVersion: + # already up to date + info "database is up to date" + ok(true) + + else: + # TODO check for the down migrations i.e., userVersion.value > tragetVersion + # fetch migration scripts + let migrationScriptsRes = getScripts(path) + if migrationScriptsRes.isErr: + return err("failed to load migration scripts") + let migrationScripts = migrationScriptsRes.value + + # filter scripts based on their versions + let scriptsRes = migrationScripts.filterScripts(userVersion.value, targetVersion) + if scriptsRes.isErr: + return err("failed to filter migration scripts") + + let scripts = scriptsRes.value + debug "scripts to be run", scripts=scripts + + + proc handler(s: ptr sqlite3_stmt) = + discard + + # run the scripts + for script in scripts: + debug "script", script=script + # a script may contain multiple queries + let queries = script.splitScript() + # TODO queries of the same script should be executed in an atomic manner + for query in queries: + let res = db.query(query, handler) + if res.isErr: + debug "failed to run the query", query=query + return err("failed to run the script") + else: + debug "query is executed", query=query + + + # bump the user version + let res = db.setUserVersion(targetVersion) + if res.isErr: + return err("failed to set the new user_version") + + debug "user_version is set to", targetVersion=targetVersion + ok(true) \ No newline at end of file diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index a42d0fdd2..09949b8eb 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -1,5 +1,5 @@ import - std/[options, tables, strutils, sequtils], + std/[options, tables, strutils, sequtils, os], chronos, chronicles, metrics, metrics/chronos_httpserver, stew/shims/net as stewNet, @@ -22,6 +22,7 @@ import ../protocol/waku_lightpush/waku_lightpush, ../protocol/waku_rln_relay/waku_rln_relay_types, ../utils/peers, + ./storage/sqlite, ./storage/message/message_store, ./storage/peer/peer_storage, ../utils/requests, @@ -710,7 +711,7 @@ when isMainModule: waku_node_errors.inc(labelValues = ["init_db_failure"]) else: sqliteDatabase = dbRes.value - + var pStorage: WakuPeerStorage if conf.persistPeers and not sqliteDatabase.isNil: @@ -745,8 +746,16 @@ when isMainModule: # Store setup if (conf.storenode != "") or (conf.store): var store: WakuMessageStore - if (not sqliteDatabase.isNil) and conf.persistMessages: + + # run migration + info "running migration ... " + let migrationResult = sqliteDatabase.migrate(MESSAGE_STORE_MIGRATION_PATH) + if migrationResult.isErr: + warn "migration failed" + else: + info "migration is done" + let res = WakuMessageStore.init(sqliteDatabase) if res.isErr: warn "failed to init WakuMessageStore", err = res.error diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 0415f1ce9..3e9ddad46 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -377,7 +377,7 @@ proc init*(ws: WakuStore) {.raises: [Defect, Exception]} = if ws.store.isNil: return - proc onData(timestamp: uint64, msg: WakuMessage, pubsubTopic: string) = + proc onData(timestamp: float64, msg: WakuMessage, pubsubTopic: string) = # TODO index should not be recalculated ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(), pubsubTopic: pubsubTopic))