mirror of https://github.com/waku-org/nwaku.git
Persisting Waku message timestamp & implementing DB migration & convert receiver timestamp data type to float64 (#607)
* adds timestamp to waku message store impl * stores timestamp as int64 * adds untitest * stores timestamp as seq of bytes * minor * re-orders unittest * changes receiver timestamp to float64 * unit test for receiver timestamps * adds comments * reorder a few lines * updates changelog * more updates on changelog * WIP * WIP * adds migration * more debug messages * passes the path to the migration scripts from message store module * adds migration result type * replaces migrationScripts with migrationScriptsResult * adds path calculation to the message store * removes some tests binary file * removes redundant imports * comments out user_version assignment in sqlite init * more descriptive err messages * clean up test file * more info logs * minor code format * removes a todo * minor updates * remove a binary file * unit tests for migration utils * adds split script * integrates split query to handle scripts with multiple commands * updates migration script for v1 * updates the v1 migration script * update user version * updates script * fixes a few bugs on the splitScript * more debug logs * adds float64 parameter support to sqlite3 * change in timestamp type in the script * deletes float64 toBytes utils * enables storage of timestamp as a real number in the sqlite db * bump up script index * comment edits * removes migrate unit test * adds todo and docstring * updates changelog * removes an unused item in .gitignore * minor * updates changelog * organizes imports * cleans up imports * WIP * updates script fixes a few bugs on the splitScript more debug logs adds float64 parameter support to sqlite3 change in timestamp type in the script deletes float64 toBytes utils * edits migration util test * remove an empty test file * includes migration utils tests in * deletes unused codes * tides up imports * adds range based filter to the filterMigrationScripts * renames procs: removes Migration * tides up imports * edits docstring * edits docstring * edits docstring * removes unused imports * more clean up * groups std imports * updates changelog * adds docstring for setUserVersion * adds unittest for the migrate * Update waku/v2/node/storage/message/waku_message_store.nim Co-authored-by: RichΛrd <info@richardramos.me> * Update waku/v2/node/storage/sqlite.nim Co-authored-by: RichΛrd <info@richardramos.me> * Update waku/v2/node/storage/sqlite.nim Co-authored-by: RichΛrd <info@richardramos.me> * removes split scripts * fixes a naming issue * fixes a bug * fixes a typo * adds a log re updated user_version * fixes a proc naming mismatch * fixes naming mismatch * more descriptive var names * adds migration script of the first user version * moves migration to after persistMessages flag is checked * deletes unused comment * fixes a bug * brings back split script * adds unit tests for split scripts * runs scripts one command at a time * deletes a commented line * relocates the migrate proc to sqlite.nim * adds unit test for filter scripts * adds filterScripts unittest testing varying zero-prefixed user versions * minor Co-authored-by: RichΛrd <info@richardramos.me>
This commit is contained in:
parent
a044c6a82c
commit
ddf93814fe
18
CHANGELOG.md
18
CHANGELOG.md
|
@ -1,4 +1,22 @@
|
|||
# Changelog
|
||||
## Next
|
||||
|
||||
This release contains the following:
|
||||
|
||||
### Features
|
||||
|
||||
### Changes
|
||||
- Enables db migration for the message store.
|
||||
#### General refactoring
|
||||
#### Docs
|
||||
#### Schema
|
||||
- Updates the `Message` table of the persistent message store:
|
||||
- Adds `senderTimestamp` column.
|
||||
- Renames the `timestamp` column to `receiverTimestamp` and changes its type to `REAL`.
|
||||
#### API
|
||||
|
||||
### Fixes
|
||||
|
||||
|
||||
## 2021-06-03 v0.4
|
||||
|
||||
|
|
|
@ -14,7 +14,8 @@ import
|
|||
./v2/test_web3, # TODO remove it when rln-relay tests get finalized
|
||||
./v2/test_waku_bridge,
|
||||
./v2/test_peer_storage,
|
||||
./v2/test_waku_keepalive
|
||||
./v2/test_waku_keepalive,
|
||||
./v2/test_migration_utils
|
||||
|
||||
when defined(rln):
|
||||
import ./v2/test_waku_rln_relay
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
CREATE TABLE IF NOT EXISTS Message_backup (
|
||||
id BLOB PRIMARY KEY,
|
||||
timestamp INTEGER NOT NULL,
|
||||
contentTopic BLOB NOT NULL,
|
||||
pubsubTopic BLOB NOT NULL,
|
||||
payload BLOB,
|
||||
version INTEGER NOT NULL
|
||||
) WITHOUT ROWID;
|
|
@ -1,9 +1,10 @@
|
|||
{.used.}
|
||||
|
||||
import
|
||||
std/[unittest, options, tables, sets],
|
||||
chronos, chronicles,
|
||||
std/[unittest, options, tables, sets, times, os, strutils],
|
||||
chronos,
|
||||
../../waku/v2/node/storage/message/waku_message_store,
|
||||
../../waku/v2/node/storage/sqlite,
|
||||
../../waku/v2/protocol/waku_store/waku_store,
|
||||
./utils
|
||||
|
||||
|
@ -15,38 +16,99 @@ suite "Message Store":
|
|||
topic = ContentTopic("/waku/2/default-content/proto")
|
||||
pubsubTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
t1 = epochTime()
|
||||
t2 = epochTime()
|
||||
t3 = high(float64)
|
||||
var msgs = @[
|
||||
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0)),
|
||||
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1)),
|
||||
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32)),
|
||||
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1),
|
||||
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2),
|
||||
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32), timestamp: t3),
|
||||
]
|
||||
|
||||
defer: store.close()
|
||||
|
||||
var indexes: seq[Index] = @[]
|
||||
for msg in msgs:
|
||||
let output = store.put(computeIndex(msg), msg, pubsubTopic)
|
||||
var index = computeIndex(msg)
|
||||
let output = store.put(index, msg, pubsubTopic)
|
||||
check output.isOk
|
||||
indexes.add(index)
|
||||
|
||||
|
||||
# flags for version
|
||||
var v0Flag, v1Flag, vMaxFlag: bool = false
|
||||
# flags for sender timestamp
|
||||
var t1Flag, t2Flag, t3Flag: bool = false
|
||||
# flags for receiver timestamp
|
||||
var rt1Flag, rt2Flag, rt3Flag: bool = false
|
||||
|
||||
var responseCount = 0
|
||||
proc data(timestamp: uint64, msg: WakuMessage, psTopic: string) =
|
||||
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) =
|
||||
responseCount += 1
|
||||
check msg in msgs
|
||||
check psTopic == pubsubTopic
|
||||
|
||||
# check the correct retrieval of versions
|
||||
if msg.version == uint32(0): v0Flag = true
|
||||
if msg.version == uint32(1): v1Flag = true
|
||||
# high(uint32) is the largest value that fits in uint32, this is to make sure there is no overflow in the storage
|
||||
if msg.version == high(uint32): vMaxFlag = true
|
||||
|
||||
# check correct retrieval of sender timestamps
|
||||
if msg.timestamp == t1: t1Flag = true
|
||||
if msg.timestamp == t2: t2Flag = true
|
||||
if msg.timestamp == t3: t3Flag = true
|
||||
|
||||
# check correct retrieval of receiver timestamps
|
||||
if receiverTimestamp == indexes[0].receivedTime: rt1Flag = true
|
||||
if receiverTimestamp == indexes[1].receivedTime: rt2Flag = true
|
||||
if receiverTimestamp == indexes[2].receivedTime: rt3Flag = true
|
||||
|
||||
|
||||
let res = store.getAll(data)
|
||||
|
||||
check:
|
||||
res.isErr == false
|
||||
responseCount == 3
|
||||
# check version
|
||||
v0Flag == true
|
||||
v1Flag == true
|
||||
vMaxFlag == true
|
||||
# check sender timestamp
|
||||
t1Flag == true
|
||||
t2Flag == true
|
||||
t3Flag == true
|
||||
# check receiver timestamp
|
||||
rt1Flag == true
|
||||
rt2Flag == true
|
||||
rt3Flag == true
|
||||
test "set and get user version":
|
||||
let
|
||||
database = SqliteDatabase.init("", inMemory = true)[]
|
||||
store = WakuMessageStore.init(database)[]
|
||||
defer: store.close()
|
||||
|
||||
let res = database.setUserVersion(5)
|
||||
check res.isErr == false
|
||||
|
||||
let ver = database.getUserVersion()
|
||||
check:
|
||||
ver.isErr == false
|
||||
ver.value == 5
|
||||
test "migration":
|
||||
let
|
||||
database = SqliteDatabase.init("", inMemory = true)[]
|
||||
store = WakuMessageStore.init(database)[]
|
||||
defer: store.close()
|
||||
|
||||
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
||||
let migrationPath = sourceDir
|
||||
|
||||
let res = database.migrate(migrationPath, 10)
|
||||
check:
|
||||
res.isErr == false
|
||||
|
||||
let ver = database.getUserVersion()
|
||||
check:
|
||||
ver.isErr == false
|
||||
ver.value == 10
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
{.used.}
|
||||
|
||||
import
|
||||
std/[unittest, tables, strutils, os, sequtils],
|
||||
chronicles,
|
||||
stew/results,
|
||||
../../waku/v2/node/storage/migration/[migration_types, migration_utils]
|
||||
|
||||
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
||||
const MIGRATION_PATH = sourceDir / "../../waku/v2/node/storage/migration/migrations_scripts/message"
|
||||
|
||||
suite "Migration utils":
|
||||
test "read migration scripts":
|
||||
let migrationScriptsRes = getScripts(MIGRATION_PATH)
|
||||
check:
|
||||
migrationScriptsRes.isErr == false
|
||||
|
||||
test "filter migration scripts":
|
||||
let migrationUp = [("0001_init", "script1"), ("0001_add", "script1"), ("0002_init", "script2"), ("0003_init", "script3")].toOrderedTable()
|
||||
let migrationScripts = MigrationScripts(migrationUp: migrationUp)
|
||||
let scriptsRes = filterScripts(migrationScripts, 1, 3)
|
||||
check:
|
||||
scriptsRes.isErr == false
|
||||
scriptsRes.value.len == 2
|
||||
scriptsRes.value[0] == "script2"
|
||||
scriptsRes.value[1] == "script3"
|
||||
|
||||
test "filter migration scripts with varying zero-prefixed user versions":
|
||||
let migrationUp = [("0001_init", "script1"), ("1_add", "script1"), ("000002_init", "script2"), ("003_init", "script3")].toOrderedTable()
|
||||
let migrationScripts = MigrationScripts(migrationUp: migrationUp)
|
||||
let scriptsRes = filterScripts(migrationScripts, 1, 3)
|
||||
check:
|
||||
scriptsRes.isErr == false
|
||||
scriptsRes.value.len == 2
|
||||
scriptsRes.value[0] == "script2"
|
||||
scriptsRes.value[1] == "script3"
|
||||
|
||||
test "split scripts with no queries":
|
||||
let script = "; ;"
|
||||
let queries = splitScript(script)
|
||||
check queries.len == 0
|
||||
|
||||
test "split scripts with multiple queries":
|
||||
let q1 = """CREATE TABLE contacts2 (
|
||||
contact_id INTEGER PRIMARY KEY,
|
||||
first_name TEXT NOT NULL,
|
||||
last_name TEXT NOT NULL,
|
||||
email TEXT NOT NULL UNIQUE,
|
||||
phone TEXT NOT NULL UNIQUE
|
||||
);"""
|
||||
let q2 = """CREATE TABLE contacts2 (
|
||||
contact_id INTEGER PRIMARY KEY,
|
||||
first_name TEXT NOT NULL,
|
||||
last_name TEXT NOT NULL,
|
||||
email TEXT NOT NULL UNIQUE,
|
||||
phone TEXT NOT NULL UNIQUE
|
||||
);"""
|
||||
let script = q1 & q2
|
||||
let queries = splitScript(script)
|
||||
check:
|
||||
queries.len == 2
|
||||
queries[0] == q1
|
||||
queries[1] == q2
|
|
@ -8,7 +8,7 @@ import
|
|||
## retrieve historical messages
|
||||
|
||||
type
|
||||
DataProc* = proc(timestamp: uint64, msg: WakuMessage, pubsubTopic: string) {.closure.}
|
||||
DataProc* = proc(timestamp: float64, msg: WakuMessage, pubsubTopic: string) {.closure.}
|
||||
|
||||
MessageStoreResult*[T] = Result[T, string]
|
||||
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/[os, algorithm, tables, strutils],
|
||||
chronos, metrics, chronicles,
|
||||
sqlite3_abi,
|
||||
chronos, metrics,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/protocol,
|
||||
libp2p/protobuf/minprotobuf,
|
||||
|
@ -11,11 +12,13 @@ import
|
|||
./message_store,
|
||||
../sqlite,
|
||||
../../../protocol/waku_message,
|
||||
../../../utils/pagination
|
||||
|
||||
../../../utils/pagination
|
||||
export sqlite
|
||||
|
||||
const TABLE_TITLE = "Message"
|
||||
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
||||
const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "../migration/migrations_scripts/message"
|
||||
|
||||
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
||||
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
||||
#
|
||||
|
@ -30,14 +33,16 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]
|
|||
## It contains:
|
||||
## - 4-Byte ContentTopic stored as an Integer
|
||||
## - Payload stored as a blob
|
||||
|
||||
let prepare = db.prepareStmt("""
|
||||
CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ (
|
||||
id BLOB PRIMARY KEY,
|
||||
timestamp INTEGER NOT NULL,
|
||||
receiverTimestamp REAL NOT NULL,
|
||||
contentTopic BLOB NOT NULL,
|
||||
pubsubTopic BLOB NOT NULL,
|
||||
payload BLOB,
|
||||
version INTEGER NOT NULL
|
||||
version INTEGER NOT NULL,
|
||||
senderTimestamp REAL NOT NULL
|
||||
) WITHOUT ROWID;
|
||||
""", NoParams, void)
|
||||
|
||||
|
@ -61,15 +66,15 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop
|
|||
## echo "error"
|
||||
##
|
||||
let prepare = db.database.prepareStmt(
|
||||
"INSERT INTO " & TABLE_TITLE & " (id, timestamp, contentTopic, payload, pubsubTopic, version) VALUES (?, ?, ?, ?, ?, ?);",
|
||||
(seq[byte], int64, seq[byte], seq[byte], seq[byte], int64),
|
||||
"INSERT INTO " & TABLE_TITLE & " (id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp) VALUES (?, ?, ?, ?, ?, ?, ?);",
|
||||
(seq[byte], float64, seq[byte], seq[byte], seq[byte], int64, float64),
|
||||
void
|
||||
)
|
||||
|
||||
if prepare.isErr:
|
||||
return err("failed to prepare")
|
||||
|
||||
let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version)))
|
||||
let res = prepare.value.exec((@(cursor.digest.data), cursor.receivedTime, message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version), message.timestamp))
|
||||
if res.isErr:
|
||||
return err("failed")
|
||||
|
||||
|
@ -91,22 +96,31 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
|
|||
proc msg(s: ptr sqlite3_stmt) {.raises: [Defect, Exception].} =
|
||||
gotMessages = true
|
||||
let
|
||||
timestamp = sqlite3_column_int64(s, 0)
|
||||
receiverTimestamp = sqlite3_column_double(s, 0)
|
||||
|
||||
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1))
|
||||
topicL = sqlite3_column_bytes(s,1)
|
||||
topicLength = sqlite3_column_bytes(s,1)
|
||||
contentTopic = ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicLength-1))))
|
||||
|
||||
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2))
|
||||
l = sqlite3_column_bytes(s, 2)
|
||||
pubsubTopic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3))
|
||||
pubsubTopicL = sqlite3_column_bytes(s,3)
|
||||
length = sqlite3_column_bytes(s, 2)
|
||||
payload = @(toOpenArray(p, 0, length-1))
|
||||
|
||||
pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3))
|
||||
pubsubTopicLength = sqlite3_column_bytes(s,3)
|
||||
pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1)))
|
||||
|
||||
version = sqlite3_column_int64(s, 4)
|
||||
|
||||
# TODO retrieve the version number
|
||||
onData(uint64(timestamp),
|
||||
WakuMessage(contentTopic: ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicL-1)))),
|
||||
payload: @(toOpenArray(p, 0, l-1)), version: uint32(version)),
|
||||
string.fromBytes(@(toOpenArray(pubsubTopic, 0, pubsubTopicL-1))))
|
||||
senderTimestamp = sqlite3_column_double(s, 5)
|
||||
|
||||
let res = db.database.query("SELECT timestamp, contentTopic, payload, pubsubTopic, version FROM " & TABLE_TITLE & " ORDER BY timestamp ASC", msg)
|
||||
|
||||
# TODO retrieve the version number
|
||||
onData(receiverTimestamp.float64,
|
||||
WakuMessage(contentTopic: contentTopic, payload: payload , version: uint32(version), timestamp: senderTimestamp.float64),
|
||||
pubsubTopic)
|
||||
|
||||
let res = db.database.query("SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp FROM " & TABLE_TITLE & " ORDER BY receiverTimestamp ASC", msg)
|
||||
if res.isErr:
|
||||
return err("failed")
|
||||
|
||||
|
@ -115,3 +129,5 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
|
|||
proc close*(db: WakuMessageStore) =
|
||||
## Closes the database.
|
||||
db.database.close()
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
import tables, stew/results
|
||||
|
||||
type MigrationScriptsResult*[T] = Result[T, string]
|
||||
type
|
||||
MigrationScripts* = ref object of RootObj
|
||||
migrationUp*:OrderedTable[string, string]
|
||||
migrationDown*:OrderedTable[string, string]
|
|
@ -0,0 +1,63 @@
|
|||
import
|
||||
std/[os, algorithm, tables, strutils],
|
||||
chronicles,
|
||||
stew/results,
|
||||
migration_types
|
||||
|
||||
proc getScripts*(migrationPath: string): MigrationScriptsResult[MigrationScripts] =
|
||||
## the code in this procedure is an adaptation of https://github.com/status-im/nim-status/blob/21aebe41be03cb6450ea261793b800ed7d3e6cda/nim_status/migrations/sql_generate.nim#L4
|
||||
var migrationScripts = MigrationScripts(migrationUp:initOrderedTable[string, string](), migrationDown:initOrderedTable[string, string]())
|
||||
try:
|
||||
for kind, path in walkDir(migrationPath):
|
||||
let (_, name, ext) = splitFile(path)
|
||||
if ext != ".sql": continue
|
||||
|
||||
let parts = name.split(".")
|
||||
if parts.len < 2:
|
||||
continue
|
||||
let script = parts[0]
|
||||
let direction = parts[1]
|
||||
|
||||
debug "name", script=script
|
||||
case direction:
|
||||
of "up":
|
||||
migrationScripts.migrationUp[script] = readFile(path)
|
||||
debug "up script", readScript=migrationScripts.migrationUp[script]
|
||||
of "down":
|
||||
migrationScripts.migrationDown[script] = readFile(path)
|
||||
debug "down script", readScript=migrationScripts.migrationDown[script]
|
||||
else:
|
||||
debug "Invalid script: ", name
|
||||
|
||||
migrationScripts.migrationUp.sort(system.cmp)
|
||||
migrationScripts.migrationDown.sort(system.cmp)
|
||||
|
||||
ok(migrationScripts)
|
||||
|
||||
except OSError, IOError:
|
||||
return err("failed to load the migration scripts")
|
||||
|
||||
|
||||
proc filterScripts*(migrationScripts: MigrationScripts, s: int64, e: int64 ): Result[seq[string], string] =
|
||||
## returns migration scripts whose version fall between s and e (e is inclusive)
|
||||
var scripts: seq[string]
|
||||
try:
|
||||
for name, script in migrationScripts.migrationUp:
|
||||
let parts = name.split("_")
|
||||
#TODO this should be int64
|
||||
let ver = parseInt(parts[0])
|
||||
# filter scripts based on their version
|
||||
if s < ver and ver <= e:
|
||||
scripts.add(script)
|
||||
ok(scripts)
|
||||
except ValueError:
|
||||
return err("failed to filter scripts")
|
||||
|
||||
proc splitScript*(script: string): seq[string] =
|
||||
## parses the script into its individual sql commands and returns them
|
||||
var queries: seq[string] = @[]
|
||||
for q in script.split(';'):
|
||||
if isEmptyOrWhitespace(q): continue
|
||||
let query = q.strip() & ";"
|
||||
queries.add(query)
|
||||
return queries
|
|
@ -0,0 +1,8 @@
|
|||
CREATE TABLE IF NOT EXISTS Message(
|
||||
id BLOB PRIMARY KEY,
|
||||
timestamp INTEGER NOT NULL,
|
||||
contentTopic BLOB NOT NULL,
|
||||
pubsubTopic BLOB NOT NULL,
|
||||
payload BLOB,
|
||||
version INTEGER NOT NULL
|
||||
) WITHOUT ROWID;
|
|
@ -0,0 +1,27 @@
|
|||
CREATE TABLE IF NOT EXISTS Message_backup (
|
||||
id BLOB PRIMARY KEY,
|
||||
timestamp INTEGER NOT NULL,
|
||||
contentTopic BLOB NOT NULL,
|
||||
pubsubTopic BLOB NOT NULL,
|
||||
payload BLOB,
|
||||
version INTEGER NOT NULL
|
||||
) WITHOUT ROWID;
|
||||
|
||||
INSERT INTO Message_backup SELECT id, timestamp, contentTopic, pubsubTopic, payload, version FROM Message;
|
||||
|
||||
DROP TABLE Message;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS Message(
|
||||
id BLOB PRIMARY KEY,
|
||||
receiverTimestamp REAL NOT NULL,
|
||||
contentTopic BLOB NOT NULL,
|
||||
pubsubTopic BLOB NOT NULL,
|
||||
payload BLOB,
|
||||
version INTEGER NOT NULL,
|
||||
senderTimestamp REAL NOT NULL
|
||||
) WITHOUT ROWID;
|
||||
|
||||
|
||||
INSERT INTO Message SELECT id, timestamp, contentTopic, pubsubTopic, payload, version, 0 FROM Message_backup;
|
||||
|
||||
DROP TABLE Message_backup;
|
|
@ -3,13 +3,13 @@
|
|||
import
|
||||
os,
|
||||
sqlite3_abi,
|
||||
chronos, chronicles, metrics, stew/results,
|
||||
chronos, chronicles, metrics,
|
||||
stew/results,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/protocol,
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/connection,
|
||||
stew/results, metrics
|
||||
|
||||
migration/[migration_types,migration_utils]
|
||||
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
||||
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
||||
#
|
||||
|
@ -30,6 +30,8 @@ type
|
|||
SqliteDatabase* = ref object of RootObj
|
||||
env*: Sqlite
|
||||
|
||||
const USER_VERSION = 2 # increase this when there is a breaking change in the table schema
|
||||
|
||||
template dispose(db: Sqlite) =
|
||||
discard sqlite3_close(db)
|
||||
|
||||
|
@ -110,8 +112,6 @@ proc init*(
|
|||
discard sqlite3_finalize(journalModePragma)
|
||||
return err("Invalid pragma result: " & $x)
|
||||
|
||||
# TODO: check current version and implement schema versioning
|
||||
checkExec "PRAGMA user_version = 1;"
|
||||
|
||||
let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard
|
||||
checkWalPragmaResult(journalModePragma)
|
||||
|
@ -139,6 +139,8 @@ proc bindParam*(s: RawStmtPtr, n: int, val: auto): cint =
|
|||
sqlite3_bind_int(s, int(n).cint, int(val).cint)
|
||||
elif val is int64:
|
||||
sqlite3_bind_int64(s, n.cint, val)
|
||||
elif val is float64:
|
||||
sqlite3_bind_double(s, n.cint, val)
|
||||
# Note: bind_text not yet supported in sqlite3_abi wrapper
|
||||
# elif val is string:
|
||||
# sqlite3_bind_text(s, n.cint, val.cstring, -1, nil) # `-1` implies string length is the number of bytes up to the first null-terminator
|
||||
|
@ -208,3 +210,84 @@ proc close*(db: SqliteDatabase) =
|
|||
discard sqlite3_close(db.env)
|
||||
|
||||
db[] = SqliteDatabase()[]
|
||||
|
||||
proc getUserVersion*(database: SqliteDatabase): DatabaseResult[int64] =
|
||||
var version: int64
|
||||
proc handler(s: ptr sqlite3_stmt) =
|
||||
version = sqlite3_column_int64(s, 0)
|
||||
let res = database.query("PRAGMA user_version;", handler)
|
||||
if res.isErr:
|
||||
return err("failed to get user_version")
|
||||
ok(version)
|
||||
|
||||
|
||||
proc setUserVersion*(database: SqliteDatabase, version: int64): DatabaseResult[bool] =
|
||||
## sets the value of the user-version integer at offset 60 in the database header.
|
||||
## some context borrowed from https://www.sqlite.org/pragma.html#pragma_user_version
|
||||
## The user-version is an integer that is available to applications to use however they want.
|
||||
## SQLite makes no use of the user-version itself
|
||||
proc handler(s: ptr sqlite3_stmt) =
|
||||
discard
|
||||
let query = "PRAGMA user_version=" & $version & ";"
|
||||
let res = database.query(query, handler)
|
||||
if res.isErr:
|
||||
return err("failed to set user_version")
|
||||
ok(true)
|
||||
|
||||
|
||||
proc migrate*(db: SqliteDatabase, path: string, targetVersion: int64 = USER_VERSION): DatabaseResult[bool] =
|
||||
## compares the user_version of the db with the targetVersion
|
||||
## runs migration scripts if the user_version is outdated (does not support down migration)
|
||||
## path points to the directory holding the migrations scripts
|
||||
## once the db is updated, it sets the user_version to the tragetVersion
|
||||
|
||||
# read database version
|
||||
let userVersion = db.getUserVersion()
|
||||
debug "current db user_version", userVersion=userVersion
|
||||
if userVersion.value == targetVersion:
|
||||
# already up to date
|
||||
info "database is up to date"
|
||||
ok(true)
|
||||
|
||||
else:
|
||||
# TODO check for the down migrations i.e., userVersion.value > tragetVersion
|
||||
# fetch migration scripts
|
||||
let migrationScriptsRes = getScripts(path)
|
||||
if migrationScriptsRes.isErr:
|
||||
return err("failed to load migration scripts")
|
||||
let migrationScripts = migrationScriptsRes.value
|
||||
|
||||
# filter scripts based on their versions
|
||||
let scriptsRes = migrationScripts.filterScripts(userVersion.value, targetVersion)
|
||||
if scriptsRes.isErr:
|
||||
return err("failed to filter migration scripts")
|
||||
|
||||
let scripts = scriptsRes.value
|
||||
debug "scripts to be run", scripts=scripts
|
||||
|
||||
|
||||
proc handler(s: ptr sqlite3_stmt) =
|
||||
discard
|
||||
|
||||
# run the scripts
|
||||
for script in scripts:
|
||||
debug "script", script=script
|
||||
# a script may contain multiple queries
|
||||
let queries = script.splitScript()
|
||||
# TODO queries of the same script should be executed in an atomic manner
|
||||
for query in queries:
|
||||
let res = db.query(query, handler)
|
||||
if res.isErr:
|
||||
debug "failed to run the query", query=query
|
||||
return err("failed to run the script")
|
||||
else:
|
||||
debug "query is executed", query=query
|
||||
|
||||
|
||||
# bump the user version
|
||||
let res = db.setUserVersion(targetVersion)
|
||||
if res.isErr:
|
||||
return err("failed to set the new user_version")
|
||||
|
||||
debug "user_version is set to", targetVersion=targetVersion
|
||||
ok(true)
|
|
@ -1,5 +1,5 @@
|
|||
import
|
||||
std/[options, tables, strutils, sequtils],
|
||||
std/[options, tables, strutils, sequtils, os],
|
||||
chronos, chronicles, metrics,
|
||||
metrics/chronos_httpserver,
|
||||
stew/shims/net as stewNet,
|
||||
|
@ -22,6 +22,7 @@ import
|
|||
../protocol/waku_lightpush/waku_lightpush,
|
||||
../protocol/waku_rln_relay/waku_rln_relay_types,
|
||||
../utils/peers,
|
||||
./storage/sqlite,
|
||||
./storage/message/message_store,
|
||||
./storage/peer/peer_storage,
|
||||
../utils/requests,
|
||||
|
@ -710,7 +711,7 @@ when isMainModule:
|
|||
waku_node_errors.inc(labelValues = ["init_db_failure"])
|
||||
else:
|
||||
sqliteDatabase = dbRes.value
|
||||
|
||||
|
||||
var pStorage: WakuPeerStorage
|
||||
|
||||
if conf.persistPeers and not sqliteDatabase.isNil:
|
||||
|
@ -745,8 +746,16 @@ when isMainModule:
|
|||
# Store setup
|
||||
if (conf.storenode != "") or (conf.store):
|
||||
var store: WakuMessageStore
|
||||
|
||||
if (not sqliteDatabase.isNil) and conf.persistMessages:
|
||||
|
||||
# run migration
|
||||
info "running migration ... "
|
||||
let migrationResult = sqliteDatabase.migrate(MESSAGE_STORE_MIGRATION_PATH)
|
||||
if migrationResult.isErr:
|
||||
warn "migration failed"
|
||||
else:
|
||||
info "migration is done"
|
||||
|
||||
let res = WakuMessageStore.init(sqliteDatabase)
|
||||
if res.isErr:
|
||||
warn "failed to init WakuMessageStore", err = res.error
|
||||
|
|
|
@ -377,7 +377,7 @@ proc init*(ws: WakuStore) {.raises: [Defect, Exception]} =
|
|||
if ws.store.isNil:
|
||||
return
|
||||
|
||||
proc onData(timestamp: uint64, msg: WakuMessage, pubsubTopic: string) =
|
||||
proc onData(timestamp: float64, msg: WakuMessage, pubsubTopic: string) =
|
||||
# TODO index should not be recalculated
|
||||
ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(), pubsubTopic: pubsubTopic))
|
||||
|
||||
|
|
Loading…
Reference in New Issue