mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-04 06:53:12 +00:00
Persisting Waku message timestamp & implementing DB migration & convert receiver timestamp data type to float64 (#607)
* adds timestamp to waku message store impl * stores timestamp as int64 * adds untitest * stores timestamp as seq of bytes * minor * re-orders unittest * changes receiver timestamp to float64 * unit test for receiver timestamps * adds comments * reorder a few lines * updates changelog * more updates on changelog * WIP * WIP * adds migration * more debug messages * passes the path to the migration scripts from message store module * adds migration result type * replaces migrationScripts with migrationScriptsResult * adds path calculation to the message store * removes some tests binary file * removes redundant imports * comments out user_version assignment in sqlite init * more descriptive err messages * clean up test file * more info logs * minor code format * removes a todo * minor updates * remove a binary file * unit tests for migration utils * adds split script * integrates split query to handle scripts with multiple commands * updates migration script for v1 * updates the v1 migration script * update user version * updates script * fixes a few bugs on the splitScript * more debug logs * adds float64 parameter support to sqlite3 * change in timestamp type in the script * deletes float64 toBytes utils * enables storage of timestamp as a real number in the sqlite db * bump up script index * comment edits * removes migrate unit test * adds todo and docstring * updates changelog * removes an unused item in .gitignore * minor * updates changelog * organizes imports * cleans up imports * WIP * updates script fixes a few bugs on the splitScript more debug logs adds float64 parameter support to sqlite3 change in timestamp type in the script deletes float64 toBytes utils * edits migration util test * remove an empty test file * includes migration utils tests in * deletes unused codes * tides up imports * adds range based filter to the filterMigrationScripts * renames procs: removes Migration * tides up imports * edits docstring * edits docstring * edits docstring * removes unused imports * more clean up * groups std imports * updates changelog * adds docstring for setUserVersion * adds unittest for the migrate * Update waku/v2/node/storage/message/waku_message_store.nim Co-authored-by: RichΛrd <info@richardramos.me> * Update waku/v2/node/storage/sqlite.nim Co-authored-by: RichΛrd <info@richardramos.me> * Update waku/v2/node/storage/sqlite.nim Co-authored-by: RichΛrd <info@richardramos.me> * removes split scripts * fixes a naming issue * fixes a bug * fixes a typo * adds a log re updated user_version * fixes a proc naming mismatch * fixes naming mismatch * more descriptive var names * adds migration script of the first user version * moves migration to after persistMessages flag is checked * deletes unused comment * fixes a bug * brings back split script * adds unit tests for split scripts * runs scripts one command at a time * deletes a commented line * relocates the migrate proc to sqlite.nim * adds unit test for filter scripts * adds filterScripts unittest testing varying zero-prefixed user versions * minor Co-authored-by: RichΛrd <info@richardramos.me>
This commit is contained in:
parent
2d43b30926
commit
fedc85c2f5
18
CHANGELOG.md
18
CHANGELOG.md
@ -1,4 +1,22 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
## Next
|
||||||
|
|
||||||
|
This release contains the following:
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
### Changes
|
||||||
|
- Enables db migration for the message store.
|
||||||
|
#### General refactoring
|
||||||
|
#### Docs
|
||||||
|
#### Schema
|
||||||
|
- Updates the `Message` table of the persistent message store:
|
||||||
|
- Adds `senderTimestamp` column.
|
||||||
|
- Renames the `timestamp` column to `receiverTimestamp` and changes its type to `REAL`.
|
||||||
|
#### API
|
||||||
|
|
||||||
|
### Fixes
|
||||||
|
|
||||||
|
|
||||||
## 2021-06-03 v0.4
|
## 2021-06-03 v0.4
|
||||||
|
|
||||||
|
|||||||
@ -14,7 +14,8 @@ import
|
|||||||
./v2/test_web3, # TODO remove it when rln-relay tests get finalized
|
./v2/test_web3, # TODO remove it when rln-relay tests get finalized
|
||||||
./v2/test_waku_bridge,
|
./v2/test_waku_bridge,
|
||||||
./v2/test_peer_storage,
|
./v2/test_peer_storage,
|
||||||
./v2/test_waku_keepalive
|
./v2/test_waku_keepalive,
|
||||||
|
./v2/test_migration_utils
|
||||||
|
|
||||||
when defined(rln):
|
when defined(rln):
|
||||||
import ./v2/test_waku_rln_relay
|
import ./v2/test_waku_rln_relay
|
||||||
|
|||||||
8
tests/v2/00002_test.up.sql
Normal file
8
tests/v2/00002_test.up.sql
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
CREATE TABLE IF NOT EXISTS Message_backup (
|
||||||
|
id BLOB PRIMARY KEY,
|
||||||
|
timestamp INTEGER NOT NULL,
|
||||||
|
contentTopic BLOB NOT NULL,
|
||||||
|
pubsubTopic BLOB NOT NULL,
|
||||||
|
payload BLOB,
|
||||||
|
version INTEGER NOT NULL
|
||||||
|
) WITHOUT ROWID;
|
||||||
@ -1,9 +1,10 @@
|
|||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[unittest, options, tables, sets],
|
std/[unittest, options, tables, sets, times, os, strutils],
|
||||||
chronos, chronicles,
|
chronos,
|
||||||
../../waku/v2/node/storage/message/waku_message_store,
|
../../waku/v2/node/storage/message/waku_message_store,
|
||||||
|
../../waku/v2/node/storage/sqlite,
|
||||||
../../waku/v2/protocol/waku_store/waku_store,
|
../../waku/v2/protocol/waku_store/waku_store,
|
||||||
./utils
|
./utils
|
||||||
|
|
||||||
@ -15,38 +16,99 @@ suite "Message Store":
|
|||||||
topic = ContentTopic("/waku/2/default-content/proto")
|
topic = ContentTopic("/waku/2/default-content/proto")
|
||||||
pubsubTopic = "/waku/2/default-waku/proto"
|
pubsubTopic = "/waku/2/default-waku/proto"
|
||||||
|
|
||||||
|
t1 = epochTime()
|
||||||
|
t2 = epochTime()
|
||||||
|
t3 = high(float64)
|
||||||
var msgs = @[
|
var msgs = @[
|
||||||
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0)),
|
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1),
|
||||||
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1)),
|
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2),
|
||||||
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32)),
|
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32), timestamp: t3),
|
||||||
]
|
]
|
||||||
|
|
||||||
defer: store.close()
|
defer: store.close()
|
||||||
|
|
||||||
|
var indexes: seq[Index] = @[]
|
||||||
for msg in msgs:
|
for msg in msgs:
|
||||||
let output = store.put(computeIndex(msg), msg, pubsubTopic)
|
var index = computeIndex(msg)
|
||||||
|
let output = store.put(index, msg, pubsubTopic)
|
||||||
check output.isOk
|
check output.isOk
|
||||||
|
indexes.add(index)
|
||||||
|
|
||||||
|
|
||||||
|
# flags for version
|
||||||
var v0Flag, v1Flag, vMaxFlag: bool = false
|
var v0Flag, v1Flag, vMaxFlag: bool = false
|
||||||
|
# flags for sender timestamp
|
||||||
|
var t1Flag, t2Flag, t3Flag: bool = false
|
||||||
|
# flags for receiver timestamp
|
||||||
|
var rt1Flag, rt2Flag, rt3Flag: bool = false
|
||||||
|
|
||||||
var responseCount = 0
|
var responseCount = 0
|
||||||
proc data(timestamp: uint64, msg: WakuMessage, psTopic: string) =
|
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) =
|
||||||
responseCount += 1
|
responseCount += 1
|
||||||
check msg in msgs
|
check msg in msgs
|
||||||
check psTopic == pubsubTopic
|
check psTopic == pubsubTopic
|
||||||
|
|
||||||
# check the correct retrieval of versions
|
# check the correct retrieval of versions
|
||||||
if msg.version == uint32(0): v0Flag = true
|
if msg.version == uint32(0): v0Flag = true
|
||||||
if msg.version == uint32(1): v1Flag = true
|
if msg.version == uint32(1): v1Flag = true
|
||||||
# high(uint32) is the largest value that fits in uint32, this is to make sure there is no overflow in the storage
|
# high(uint32) is the largest value that fits in uint32, this is to make sure there is no overflow in the storage
|
||||||
if msg.version == high(uint32): vMaxFlag = true
|
if msg.version == high(uint32): vMaxFlag = true
|
||||||
|
|
||||||
|
# check correct retrieval of sender timestamps
|
||||||
|
if msg.timestamp == t1: t1Flag = true
|
||||||
|
if msg.timestamp == t2: t2Flag = true
|
||||||
|
if msg.timestamp == t3: t3Flag = true
|
||||||
|
|
||||||
|
# check correct retrieval of receiver timestamps
|
||||||
|
if receiverTimestamp == indexes[0].receivedTime: rt1Flag = true
|
||||||
|
if receiverTimestamp == indexes[1].receivedTime: rt2Flag = true
|
||||||
|
if receiverTimestamp == indexes[2].receivedTime: rt3Flag = true
|
||||||
|
|
||||||
|
|
||||||
let res = store.getAll(data)
|
let res = store.getAll(data)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
res.isErr == false
|
res.isErr == false
|
||||||
responseCount == 3
|
responseCount == 3
|
||||||
|
# check version
|
||||||
v0Flag == true
|
v0Flag == true
|
||||||
v1Flag == true
|
v1Flag == true
|
||||||
vMaxFlag == true
|
vMaxFlag == true
|
||||||
|
# check sender timestamp
|
||||||
|
t1Flag == true
|
||||||
|
t2Flag == true
|
||||||
|
t3Flag == true
|
||||||
|
# check receiver timestamp
|
||||||
|
rt1Flag == true
|
||||||
|
rt2Flag == true
|
||||||
|
rt3Flag == true
|
||||||
|
test "set and get user version":
|
||||||
|
let
|
||||||
|
database = SqliteDatabase.init("", inMemory = true)[]
|
||||||
|
store = WakuMessageStore.init(database)[]
|
||||||
|
defer: store.close()
|
||||||
|
|
||||||
|
let res = database.setUserVersion(5)
|
||||||
|
check res.isErr == false
|
||||||
|
|
||||||
|
let ver = database.getUserVersion()
|
||||||
|
check:
|
||||||
|
ver.isErr == false
|
||||||
|
ver.value == 5
|
||||||
|
test "migration":
|
||||||
|
let
|
||||||
|
database = SqliteDatabase.init("", inMemory = true)[]
|
||||||
|
store = WakuMessageStore.init(database)[]
|
||||||
|
defer: store.close()
|
||||||
|
|
||||||
|
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
||||||
|
let migrationPath = sourceDir
|
||||||
|
|
||||||
|
let res = database.migrate(migrationPath, 10)
|
||||||
|
check:
|
||||||
|
res.isErr == false
|
||||||
|
|
||||||
|
let ver = database.getUserVersion()
|
||||||
|
check:
|
||||||
|
ver.isErr == false
|
||||||
|
ver.value == 10
|
||||||
|
|||||||
63
tests/v2/test_migration_utils.nim
Normal file
63
tests/v2/test_migration_utils.nim
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
{.used.}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/[unittest, tables, strutils, os, sequtils],
|
||||||
|
chronicles,
|
||||||
|
stew/results,
|
||||||
|
../../waku/v2/node/storage/migration/[migration_types, migration_utils]
|
||||||
|
|
||||||
|
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
||||||
|
const MIGRATION_PATH = sourceDir / "../../waku/v2/node/storage/migration/migrations_scripts/message"
|
||||||
|
|
||||||
|
suite "Migration utils":
|
||||||
|
test "read migration scripts":
|
||||||
|
let migrationScriptsRes = getScripts(MIGRATION_PATH)
|
||||||
|
check:
|
||||||
|
migrationScriptsRes.isErr == false
|
||||||
|
|
||||||
|
test "filter migration scripts":
|
||||||
|
let migrationUp = [("0001_init", "script1"), ("0001_add", "script1"), ("0002_init", "script2"), ("0003_init", "script3")].toOrderedTable()
|
||||||
|
let migrationScripts = MigrationScripts(migrationUp: migrationUp)
|
||||||
|
let scriptsRes = filterScripts(migrationScripts, 1, 3)
|
||||||
|
check:
|
||||||
|
scriptsRes.isErr == false
|
||||||
|
scriptsRes.value.len == 2
|
||||||
|
scriptsRes.value[0] == "script2"
|
||||||
|
scriptsRes.value[1] == "script3"
|
||||||
|
|
||||||
|
test "filter migration scripts with varying zero-prefixed user versions":
|
||||||
|
let migrationUp = [("0001_init", "script1"), ("1_add", "script1"), ("000002_init", "script2"), ("003_init", "script3")].toOrderedTable()
|
||||||
|
let migrationScripts = MigrationScripts(migrationUp: migrationUp)
|
||||||
|
let scriptsRes = filterScripts(migrationScripts, 1, 3)
|
||||||
|
check:
|
||||||
|
scriptsRes.isErr == false
|
||||||
|
scriptsRes.value.len == 2
|
||||||
|
scriptsRes.value[0] == "script2"
|
||||||
|
scriptsRes.value[1] == "script3"
|
||||||
|
|
||||||
|
test "split scripts with no queries":
|
||||||
|
let script = "; ;"
|
||||||
|
let queries = splitScript(script)
|
||||||
|
check queries.len == 0
|
||||||
|
|
||||||
|
test "split scripts with multiple queries":
|
||||||
|
let q1 = """CREATE TABLE contacts2 (
|
||||||
|
contact_id INTEGER PRIMARY KEY,
|
||||||
|
first_name TEXT NOT NULL,
|
||||||
|
last_name TEXT NOT NULL,
|
||||||
|
email TEXT NOT NULL UNIQUE,
|
||||||
|
phone TEXT NOT NULL UNIQUE
|
||||||
|
);"""
|
||||||
|
let q2 = """CREATE TABLE contacts2 (
|
||||||
|
contact_id INTEGER PRIMARY KEY,
|
||||||
|
first_name TEXT NOT NULL,
|
||||||
|
last_name TEXT NOT NULL,
|
||||||
|
email TEXT NOT NULL UNIQUE,
|
||||||
|
phone TEXT NOT NULL UNIQUE
|
||||||
|
);"""
|
||||||
|
let script = q1 & q2
|
||||||
|
let queries = splitScript(script)
|
||||||
|
check:
|
||||||
|
queries.len == 2
|
||||||
|
queries[0] == q1
|
||||||
|
queries[1] == q2
|
||||||
@ -8,7 +8,7 @@ import
|
|||||||
## retrieve historical messages
|
## retrieve historical messages
|
||||||
|
|
||||||
type
|
type
|
||||||
DataProc* = proc(timestamp: uint64, msg: WakuMessage, pubsubTopic: string) {.closure.}
|
DataProc* = proc(timestamp: float64, msg: WakuMessage, pubsubTopic: string) {.closure.}
|
||||||
|
|
||||||
MessageStoreResult*[T] = Result[T, string]
|
MessageStoreResult*[T] = Result[T, string]
|
||||||
|
|
||||||
|
|||||||
@ -1,8 +1,9 @@
|
|||||||
{.push raises: [Defect].}
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
import
|
import
|
||||||
|
std/[os, algorithm, tables, strutils],
|
||||||
|
chronos, metrics, chronicles,
|
||||||
sqlite3_abi,
|
sqlite3_abi,
|
||||||
chronos, metrics,
|
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
libp2p/protocols/protocol,
|
libp2p/protocols/protocol,
|
||||||
libp2p/protobuf/minprotobuf,
|
libp2p/protobuf/minprotobuf,
|
||||||
@ -11,11 +12,13 @@ import
|
|||||||
./message_store,
|
./message_store,
|
||||||
../sqlite,
|
../sqlite,
|
||||||
../../../protocol/waku_message,
|
../../../protocol/waku_message,
|
||||||
../../../utils/pagination
|
../../../utils/pagination
|
||||||
|
|
||||||
export sqlite
|
export sqlite
|
||||||
|
|
||||||
const TABLE_TITLE = "Message"
|
const TABLE_TITLE = "Message"
|
||||||
|
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
||||||
|
const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "../migration/migrations_scripts/message"
|
||||||
|
|
||||||
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
||||||
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
||||||
#
|
#
|
||||||
@ -30,14 +33,16 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]
|
|||||||
## It contains:
|
## It contains:
|
||||||
## - 4-Byte ContentTopic stored as an Integer
|
## - 4-Byte ContentTopic stored as an Integer
|
||||||
## - Payload stored as a blob
|
## - Payload stored as a blob
|
||||||
|
|
||||||
let prepare = db.prepareStmt("""
|
let prepare = db.prepareStmt("""
|
||||||
CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ (
|
CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ (
|
||||||
id BLOB PRIMARY KEY,
|
id BLOB PRIMARY KEY,
|
||||||
timestamp INTEGER NOT NULL,
|
receiverTimestamp REAL NOT NULL,
|
||||||
contentTopic BLOB NOT NULL,
|
contentTopic BLOB NOT NULL,
|
||||||
pubsubTopic BLOB NOT NULL,
|
pubsubTopic BLOB NOT NULL,
|
||||||
payload BLOB,
|
payload BLOB,
|
||||||
version INTEGER NOT NULL
|
version INTEGER NOT NULL,
|
||||||
|
senderTimestamp REAL NOT NULL
|
||||||
) WITHOUT ROWID;
|
) WITHOUT ROWID;
|
||||||
""", NoParams, void)
|
""", NoParams, void)
|
||||||
|
|
||||||
@ -61,15 +66,15 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop
|
|||||||
## echo "error"
|
## echo "error"
|
||||||
##
|
##
|
||||||
let prepare = db.database.prepareStmt(
|
let prepare = db.database.prepareStmt(
|
||||||
"INSERT INTO " & TABLE_TITLE & " (id, timestamp, contentTopic, payload, pubsubTopic, version) VALUES (?, ?, ?, ?, ?, ?);",
|
"INSERT INTO " & TABLE_TITLE & " (id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp) VALUES (?, ?, ?, ?, ?, ?, ?);",
|
||||||
(seq[byte], int64, seq[byte], seq[byte], seq[byte], int64),
|
(seq[byte], float64, seq[byte], seq[byte], seq[byte], int64, float64),
|
||||||
void
|
void
|
||||||
)
|
)
|
||||||
|
|
||||||
if prepare.isErr:
|
if prepare.isErr:
|
||||||
return err("failed to prepare")
|
return err("failed to prepare")
|
||||||
|
|
||||||
let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version)))
|
let res = prepare.value.exec((@(cursor.digest.data), cursor.receivedTime, message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version), message.timestamp))
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
return err("failed")
|
return err("failed")
|
||||||
|
|
||||||
@ -91,22 +96,31 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
|
|||||||
proc msg(s: ptr sqlite3_stmt) {.raises: [Defect, Exception].} =
|
proc msg(s: ptr sqlite3_stmt) {.raises: [Defect, Exception].} =
|
||||||
gotMessages = true
|
gotMessages = true
|
||||||
let
|
let
|
||||||
timestamp = sqlite3_column_int64(s, 0)
|
receiverTimestamp = sqlite3_column_double(s, 0)
|
||||||
|
|
||||||
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1))
|
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1))
|
||||||
topicL = sqlite3_column_bytes(s,1)
|
topicLength = sqlite3_column_bytes(s,1)
|
||||||
|
contentTopic = ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicLength-1))))
|
||||||
|
|
||||||
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2))
|
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2))
|
||||||
l = sqlite3_column_bytes(s, 2)
|
length = sqlite3_column_bytes(s, 2)
|
||||||
pubsubTopic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3))
|
payload = @(toOpenArray(p, 0, length-1))
|
||||||
pubsubTopicL = sqlite3_column_bytes(s,3)
|
|
||||||
|
pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3))
|
||||||
|
pubsubTopicLength = sqlite3_column_bytes(s,3)
|
||||||
|
pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1)))
|
||||||
|
|
||||||
version = sqlite3_column_int64(s, 4)
|
version = sqlite3_column_int64(s, 4)
|
||||||
|
|
||||||
# TODO retrieve the version number
|
senderTimestamp = sqlite3_column_double(s, 5)
|
||||||
onData(uint64(timestamp),
|
|
||||||
WakuMessage(contentTopic: ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicL-1)))),
|
|
||||||
payload: @(toOpenArray(p, 0, l-1)), version: uint32(version)),
|
|
||||||
string.fromBytes(@(toOpenArray(pubsubTopic, 0, pubsubTopicL-1))))
|
|
||||||
|
|
||||||
let res = db.database.query("SELECT timestamp, contentTopic, payload, pubsubTopic, version FROM " & TABLE_TITLE & " ORDER BY timestamp ASC", msg)
|
|
||||||
|
# TODO retrieve the version number
|
||||||
|
onData(receiverTimestamp.float64,
|
||||||
|
WakuMessage(contentTopic: contentTopic, payload: payload , version: uint32(version), timestamp: senderTimestamp.float64),
|
||||||
|
pubsubTopic)
|
||||||
|
|
||||||
|
let res = db.database.query("SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp FROM " & TABLE_TITLE & " ORDER BY receiverTimestamp ASC", msg)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
return err("failed")
|
return err("failed")
|
||||||
|
|
||||||
@ -115,3 +129,5 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
|
|||||||
proc close*(db: WakuMessageStore) =
|
proc close*(db: WakuMessageStore) =
|
||||||
## Closes the database.
|
## Closes the database.
|
||||||
db.database.close()
|
db.database.close()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
7
waku/v2/node/storage/migration/migration_types.nim
Normal file
7
waku/v2/node/storage/migration/migration_types.nim
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
import tables, stew/results
|
||||||
|
|
||||||
|
type MigrationScriptsResult*[T] = Result[T, string]
|
||||||
|
type
|
||||||
|
MigrationScripts* = ref object of RootObj
|
||||||
|
migrationUp*:OrderedTable[string, string]
|
||||||
|
migrationDown*:OrderedTable[string, string]
|
||||||
63
waku/v2/node/storage/migration/migration_utils.nim
Normal file
63
waku/v2/node/storage/migration/migration_utils.nim
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
import
|
||||||
|
std/[os, algorithm, tables, strutils],
|
||||||
|
chronicles,
|
||||||
|
stew/results,
|
||||||
|
migration_types
|
||||||
|
|
||||||
|
proc getScripts*(migrationPath: string): MigrationScriptsResult[MigrationScripts] =
|
||||||
|
## the code in this procedure is an adaptation of https://github.com/status-im/nim-status/blob/21aebe41be03cb6450ea261793b800ed7d3e6cda/nim_status/migrations/sql_generate.nim#L4
|
||||||
|
var migrationScripts = MigrationScripts(migrationUp:initOrderedTable[string, string](), migrationDown:initOrderedTable[string, string]())
|
||||||
|
try:
|
||||||
|
for kind, path in walkDir(migrationPath):
|
||||||
|
let (_, name, ext) = splitFile(path)
|
||||||
|
if ext != ".sql": continue
|
||||||
|
|
||||||
|
let parts = name.split(".")
|
||||||
|
if parts.len < 2:
|
||||||
|
continue
|
||||||
|
let script = parts[0]
|
||||||
|
let direction = parts[1]
|
||||||
|
|
||||||
|
debug "name", script=script
|
||||||
|
case direction:
|
||||||
|
of "up":
|
||||||
|
migrationScripts.migrationUp[script] = readFile(path)
|
||||||
|
debug "up script", readScript=migrationScripts.migrationUp[script]
|
||||||
|
of "down":
|
||||||
|
migrationScripts.migrationDown[script] = readFile(path)
|
||||||
|
debug "down script", readScript=migrationScripts.migrationDown[script]
|
||||||
|
else:
|
||||||
|
debug "Invalid script: ", name
|
||||||
|
|
||||||
|
migrationScripts.migrationUp.sort(system.cmp)
|
||||||
|
migrationScripts.migrationDown.sort(system.cmp)
|
||||||
|
|
||||||
|
ok(migrationScripts)
|
||||||
|
|
||||||
|
except OSError, IOError:
|
||||||
|
return err("failed to load the migration scripts")
|
||||||
|
|
||||||
|
|
||||||
|
proc filterScripts*(migrationScripts: MigrationScripts, s: int64, e: int64 ): Result[seq[string], string] =
|
||||||
|
## returns migration scripts whose version fall between s and e (e is inclusive)
|
||||||
|
var scripts: seq[string]
|
||||||
|
try:
|
||||||
|
for name, script in migrationScripts.migrationUp:
|
||||||
|
let parts = name.split("_")
|
||||||
|
#TODO this should be int64
|
||||||
|
let ver = parseInt(parts[0])
|
||||||
|
# filter scripts based on their version
|
||||||
|
if s < ver and ver <= e:
|
||||||
|
scripts.add(script)
|
||||||
|
ok(scripts)
|
||||||
|
except ValueError:
|
||||||
|
return err("failed to filter scripts")
|
||||||
|
|
||||||
|
proc splitScript*(script: string): seq[string] =
|
||||||
|
## parses the script into its individual sql commands and returns them
|
||||||
|
var queries: seq[string] = @[]
|
||||||
|
for q in script.split(';'):
|
||||||
|
if isEmptyOrWhitespace(q): continue
|
||||||
|
let query = q.strip() & ";"
|
||||||
|
queries.add(query)
|
||||||
|
return queries
|
||||||
@ -0,0 +1,8 @@
|
|||||||
|
CREATE TABLE IF NOT EXISTS Message(
|
||||||
|
id BLOB PRIMARY KEY,
|
||||||
|
timestamp INTEGER NOT NULL,
|
||||||
|
contentTopic BLOB NOT NULL,
|
||||||
|
pubsubTopic BLOB NOT NULL,
|
||||||
|
payload BLOB,
|
||||||
|
version INTEGER NOT NULL
|
||||||
|
) WITHOUT ROWID;
|
||||||
@ -0,0 +1,27 @@
|
|||||||
|
CREATE TABLE IF NOT EXISTS Message_backup (
|
||||||
|
id BLOB PRIMARY KEY,
|
||||||
|
timestamp INTEGER NOT NULL,
|
||||||
|
contentTopic BLOB NOT NULL,
|
||||||
|
pubsubTopic BLOB NOT NULL,
|
||||||
|
payload BLOB,
|
||||||
|
version INTEGER NOT NULL
|
||||||
|
) WITHOUT ROWID;
|
||||||
|
|
||||||
|
INSERT INTO Message_backup SELECT id, timestamp, contentTopic, pubsubTopic, payload, version FROM Message;
|
||||||
|
|
||||||
|
DROP TABLE Message;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS Message(
|
||||||
|
id BLOB PRIMARY KEY,
|
||||||
|
receiverTimestamp REAL NOT NULL,
|
||||||
|
contentTopic BLOB NOT NULL,
|
||||||
|
pubsubTopic BLOB NOT NULL,
|
||||||
|
payload BLOB,
|
||||||
|
version INTEGER NOT NULL,
|
||||||
|
senderTimestamp REAL NOT NULL
|
||||||
|
) WITHOUT ROWID;
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO Message SELECT id, timestamp, contentTopic, pubsubTopic, payload, version, 0 FROM Message_backup;
|
||||||
|
|
||||||
|
DROP TABLE Message_backup;
|
||||||
@ -3,13 +3,13 @@
|
|||||||
import
|
import
|
||||||
os,
|
os,
|
||||||
sqlite3_abi,
|
sqlite3_abi,
|
||||||
chronos, chronicles, metrics, stew/results,
|
chronos, chronicles, metrics,
|
||||||
|
stew/results,
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
libp2p/protocols/protocol,
|
libp2p/protocols/protocol,
|
||||||
libp2p/protobuf/minprotobuf,
|
libp2p/protobuf/minprotobuf,
|
||||||
libp2p/stream/connection,
|
libp2p/stream/connection,
|
||||||
stew/results, metrics
|
migration/[migration_types,migration_utils]
|
||||||
|
|
||||||
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
||||||
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
||||||
#
|
#
|
||||||
@ -30,6 +30,8 @@ type
|
|||||||
SqliteDatabase* = ref object of RootObj
|
SqliteDatabase* = ref object of RootObj
|
||||||
env*: Sqlite
|
env*: Sqlite
|
||||||
|
|
||||||
|
const USER_VERSION = 2 # increase this when there is a breaking change in the table schema
|
||||||
|
|
||||||
template dispose(db: Sqlite) =
|
template dispose(db: Sqlite) =
|
||||||
discard sqlite3_close(db)
|
discard sqlite3_close(db)
|
||||||
|
|
||||||
@ -110,8 +112,6 @@ proc init*(
|
|||||||
discard sqlite3_finalize(journalModePragma)
|
discard sqlite3_finalize(journalModePragma)
|
||||||
return err("Invalid pragma result: " & $x)
|
return err("Invalid pragma result: " & $x)
|
||||||
|
|
||||||
# TODO: check current version and implement schema versioning
|
|
||||||
checkExec "PRAGMA user_version = 1;"
|
|
||||||
|
|
||||||
let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard
|
let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard
|
||||||
checkWalPragmaResult(journalModePragma)
|
checkWalPragmaResult(journalModePragma)
|
||||||
@ -139,6 +139,8 @@ proc bindParam*(s: RawStmtPtr, n: int, val: auto): cint =
|
|||||||
sqlite3_bind_int(s, int(n).cint, int(val).cint)
|
sqlite3_bind_int(s, int(n).cint, int(val).cint)
|
||||||
elif val is int64:
|
elif val is int64:
|
||||||
sqlite3_bind_int64(s, n.cint, val)
|
sqlite3_bind_int64(s, n.cint, val)
|
||||||
|
elif val is float64:
|
||||||
|
sqlite3_bind_double(s, n.cint, val)
|
||||||
# Note: bind_text not yet supported in sqlite3_abi wrapper
|
# Note: bind_text not yet supported in sqlite3_abi wrapper
|
||||||
# elif val is string:
|
# elif val is string:
|
||||||
# sqlite3_bind_text(s, n.cint, val.cstring, -1, nil) # `-1` implies string length is the number of bytes up to the first null-terminator
|
# sqlite3_bind_text(s, n.cint, val.cstring, -1, nil) # `-1` implies string length is the number of bytes up to the first null-terminator
|
||||||
@ -208,3 +210,84 @@ proc close*(db: SqliteDatabase) =
|
|||||||
discard sqlite3_close(db.env)
|
discard sqlite3_close(db.env)
|
||||||
|
|
||||||
db[] = SqliteDatabase()[]
|
db[] = SqliteDatabase()[]
|
||||||
|
|
||||||
|
proc getUserVersion*(database: SqliteDatabase): DatabaseResult[int64] =
|
||||||
|
var version: int64
|
||||||
|
proc handler(s: ptr sqlite3_stmt) =
|
||||||
|
version = sqlite3_column_int64(s, 0)
|
||||||
|
let res = database.query("PRAGMA user_version;", handler)
|
||||||
|
if res.isErr:
|
||||||
|
return err("failed to get user_version")
|
||||||
|
ok(version)
|
||||||
|
|
||||||
|
|
||||||
|
proc setUserVersion*(database: SqliteDatabase, version: int64): DatabaseResult[bool] =
|
||||||
|
## sets the value of the user-version integer at offset 60 in the database header.
|
||||||
|
## some context borrowed from https://www.sqlite.org/pragma.html#pragma_user_version
|
||||||
|
## The user-version is an integer that is available to applications to use however they want.
|
||||||
|
## SQLite makes no use of the user-version itself
|
||||||
|
proc handler(s: ptr sqlite3_stmt) =
|
||||||
|
discard
|
||||||
|
let query = "PRAGMA user_version=" & $version & ";"
|
||||||
|
let res = database.query(query, handler)
|
||||||
|
if res.isErr:
|
||||||
|
return err("failed to set user_version")
|
||||||
|
ok(true)
|
||||||
|
|
||||||
|
|
||||||
|
proc migrate*(db: SqliteDatabase, path: string, targetVersion: int64 = USER_VERSION): DatabaseResult[bool] =
|
||||||
|
## compares the user_version of the db with the targetVersion
|
||||||
|
## runs migration scripts if the user_version is outdated (does not support down migration)
|
||||||
|
## path points to the directory holding the migrations scripts
|
||||||
|
## once the db is updated, it sets the user_version to the tragetVersion
|
||||||
|
|
||||||
|
# read database version
|
||||||
|
let userVersion = db.getUserVersion()
|
||||||
|
debug "current db user_version", userVersion=userVersion
|
||||||
|
if userVersion.value == targetVersion:
|
||||||
|
# already up to date
|
||||||
|
info "database is up to date"
|
||||||
|
ok(true)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# TODO check for the down migrations i.e., userVersion.value > tragetVersion
|
||||||
|
# fetch migration scripts
|
||||||
|
let migrationScriptsRes = getScripts(path)
|
||||||
|
if migrationScriptsRes.isErr:
|
||||||
|
return err("failed to load migration scripts")
|
||||||
|
let migrationScripts = migrationScriptsRes.value
|
||||||
|
|
||||||
|
# filter scripts based on their versions
|
||||||
|
let scriptsRes = migrationScripts.filterScripts(userVersion.value, targetVersion)
|
||||||
|
if scriptsRes.isErr:
|
||||||
|
return err("failed to filter migration scripts")
|
||||||
|
|
||||||
|
let scripts = scriptsRes.value
|
||||||
|
debug "scripts to be run", scripts=scripts
|
||||||
|
|
||||||
|
|
||||||
|
proc handler(s: ptr sqlite3_stmt) =
|
||||||
|
discard
|
||||||
|
|
||||||
|
# run the scripts
|
||||||
|
for script in scripts:
|
||||||
|
debug "script", script=script
|
||||||
|
# a script may contain multiple queries
|
||||||
|
let queries = script.splitScript()
|
||||||
|
# TODO queries of the same script should be executed in an atomic manner
|
||||||
|
for query in queries:
|
||||||
|
let res = db.query(query, handler)
|
||||||
|
if res.isErr:
|
||||||
|
debug "failed to run the query", query=query
|
||||||
|
return err("failed to run the script")
|
||||||
|
else:
|
||||||
|
debug "query is executed", query=query
|
||||||
|
|
||||||
|
|
||||||
|
# bump the user version
|
||||||
|
let res = db.setUserVersion(targetVersion)
|
||||||
|
if res.isErr:
|
||||||
|
return err("failed to set the new user_version")
|
||||||
|
|
||||||
|
debug "user_version is set to", targetVersion=targetVersion
|
||||||
|
ok(true)
|
||||||
@ -1,5 +1,5 @@
|
|||||||
import
|
import
|
||||||
std/[options, tables, strutils, sequtils],
|
std/[options, tables, strutils, sequtils, os],
|
||||||
chronos, chronicles, metrics,
|
chronos, chronicles, metrics,
|
||||||
metrics/chronos_httpserver,
|
metrics/chronos_httpserver,
|
||||||
stew/shims/net as stewNet,
|
stew/shims/net as stewNet,
|
||||||
@ -22,6 +22,7 @@ import
|
|||||||
../protocol/waku_lightpush/waku_lightpush,
|
../protocol/waku_lightpush/waku_lightpush,
|
||||||
../protocol/waku_rln_relay/waku_rln_relay_types,
|
../protocol/waku_rln_relay/waku_rln_relay_types,
|
||||||
../utils/peers,
|
../utils/peers,
|
||||||
|
./storage/sqlite,
|
||||||
./storage/message/message_store,
|
./storage/message/message_store,
|
||||||
./storage/peer/peer_storage,
|
./storage/peer/peer_storage,
|
||||||
../utils/requests,
|
../utils/requests,
|
||||||
@ -710,7 +711,7 @@ when isMainModule:
|
|||||||
waku_node_errors.inc(labelValues = ["init_db_failure"])
|
waku_node_errors.inc(labelValues = ["init_db_failure"])
|
||||||
else:
|
else:
|
||||||
sqliteDatabase = dbRes.value
|
sqliteDatabase = dbRes.value
|
||||||
|
|
||||||
var pStorage: WakuPeerStorage
|
var pStorage: WakuPeerStorage
|
||||||
|
|
||||||
if conf.persistPeers and not sqliteDatabase.isNil:
|
if conf.persistPeers and not sqliteDatabase.isNil:
|
||||||
@ -745,8 +746,16 @@ when isMainModule:
|
|||||||
# Store setup
|
# Store setup
|
||||||
if (conf.storenode != "") or (conf.store):
|
if (conf.storenode != "") or (conf.store):
|
||||||
var store: WakuMessageStore
|
var store: WakuMessageStore
|
||||||
|
|
||||||
if (not sqliteDatabase.isNil) and conf.persistMessages:
|
if (not sqliteDatabase.isNil) and conf.persistMessages:
|
||||||
|
|
||||||
|
# run migration
|
||||||
|
info "running migration ... "
|
||||||
|
let migrationResult = sqliteDatabase.migrate(MESSAGE_STORE_MIGRATION_PATH)
|
||||||
|
if migrationResult.isErr:
|
||||||
|
warn "migration failed"
|
||||||
|
else:
|
||||||
|
info "migration is done"
|
||||||
|
|
||||||
let res = WakuMessageStore.init(sqliteDatabase)
|
let res = WakuMessageStore.init(sqliteDatabase)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
warn "failed to init WakuMessageStore", err = res.error
|
warn "failed to init WakuMessageStore", err = res.error
|
||||||
|
|||||||
@ -377,7 +377,7 @@ proc init*(ws: WakuStore) {.raises: [Defect, Exception]} =
|
|||||||
if ws.store.isNil:
|
if ws.store.isNil:
|
||||||
return
|
return
|
||||||
|
|
||||||
proc onData(timestamp: uint64, msg: WakuMessage, pubsubTopic: string) =
|
proc onData(timestamp: float64, msg: WakuMessage, pubsubTopic: string) =
|
||||||
# TODO index should not be recalculated
|
# TODO index should not be recalculated
|
||||||
ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(), pubsubTopic: pubsubTopic))
|
ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(), pubsubTopic: pubsubTopic))
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user