mirror of https://github.com/waku-org/nwaku.git
Storing waku message version inside the store db (#530)
* WIP * stores version in db, edits error messages * fixes a bug * reverts edit messages * converts uint32 storage to int64 * unit tests for various version numbers * minor comments edit * removes debugging messages * updates changelog
This commit is contained in:
parent
a6bb3f65e1
commit
7c5df3379b
|
@ -14,6 +14,7 @@
|
|||
- Fixed: content filtering now works on any PubSub topic and not just the `waku` default.
|
||||
- Added the `pubsubTopic` field to the `HistoryQuery`. Now, the message history can be filtered and queried based on the `pubsubTopic`.
|
||||
- Added a new table of `Message` to the message store db. The new table has an additional column of `pubsubTopic` and will be used instead of the old table `messages`. The message history in the old table `messages` will not be accessed and have to be removed.
|
||||
- Added a new column of `version` to the `Message` table of the message store db.
|
||||
- Fix: allow mounting light protocols without `relay`
|
||||
- Add `keep-alive` option to maintain stable connection to `relay` peers on idle topics
|
||||
|
||||
|
|
|
@ -16,9 +16,9 @@ suite "Message Store":
|
|||
pubsubTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
var msgs = @[
|
||||
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic),
|
||||
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic),
|
||||
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic),
|
||||
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0)),
|
||||
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1)),
|
||||
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32)),
|
||||
]
|
||||
|
||||
defer: store.close()
|
||||
|
@ -27,14 +27,26 @@ suite "Message Store":
|
|||
let output = store.put(computeIndex(msg), msg, pubsubTopic)
|
||||
check output.isOk
|
||||
|
||||
var v0Flag, v1Flag, vMaxFlag: bool = false
|
||||
var responseCount = 0
|
||||
proc data(timestamp: uint64, msg: WakuMessage, psTopic: string) =
|
||||
responseCount += 1
|
||||
check msg in msgs
|
||||
check psTopic == pubsubTopic
|
||||
|
||||
# check the correct retrieval of versions
|
||||
if msg.version == uint32(0): v0Flag = true
|
||||
if msg.version == uint32(1): v1Flag = true
|
||||
# high(uint32) is the largest value that fits in uint32, this is to make sure there is no overflow in the storage
|
||||
if msg.version == high(uint32): vMaxFlag = true
|
||||
|
||||
|
||||
let res = store.getAll(data)
|
||||
|
||||
check:
|
||||
res.isErr == false
|
||||
responseCount == 3
|
||||
v0Flag == true
|
||||
v1Flag == true
|
||||
vMaxFlag == true
|
||||
|
||||
|
||||
|
|
|
@ -34,7 +34,8 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]
|
|||
timestamp INTEGER NOT NULL,
|
||||
contentTopic BLOB NOT NULL,
|
||||
pubsubTopic BLOB NOT NULL,
|
||||
payload BLOB
|
||||
payload BLOB,
|
||||
version INTEGER NOT NULL
|
||||
) WITHOUT ROWID;
|
||||
""", NoParams, void)
|
||||
|
||||
|
@ -58,15 +59,15 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop
|
|||
## echo "error"
|
||||
##
|
||||
let prepare = db.database.prepareStmt(
|
||||
"INSERT INTO " & TABLE_TITLE & " (id, timestamp, contentTopic, payload, pubsubTopic) VALUES (?, ?, ?, ?, ?);",
|
||||
(seq[byte], int64, seq[byte], seq[byte], seq[byte]),
|
||||
"INSERT INTO " & TABLE_TITLE & " (id, timestamp, contentTopic, payload, pubsubTopic, version) VALUES (?, ?, ?, ?, ?, ?);",
|
||||
(seq[byte], int64, seq[byte], seq[byte], seq[byte], int64),
|
||||
void
|
||||
)
|
||||
|
||||
if prepare.isErr:
|
||||
return err("failed to prepare")
|
||||
|
||||
let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes()))
|
||||
let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version)))
|
||||
if res.isErr:
|
||||
return err("failed")
|
||||
|
||||
|
@ -95,13 +96,15 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
|
|||
l = sqlite3_column_bytes(s, 2)
|
||||
pubsubTopic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3))
|
||||
pubsubTopicL = sqlite3_column_bytes(s,3)
|
||||
version = sqlite3_column_int64(s, 4)
|
||||
|
||||
# TODO retrieve the version number
|
||||
onData(uint64(timestamp),
|
||||
WakuMessage(contentTopic: ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicL-1)))),
|
||||
payload: @(toOpenArray(p, 0, l-1))),
|
||||
payload: @(toOpenArray(p, 0, l-1)), version: uint32(version)),
|
||||
string.fromBytes(@(toOpenArray(pubsubTopic, 0, pubsubTopicL-1))))
|
||||
|
||||
let res = db.database.query("SELECT timestamp, contentTopic, payload, pubsubTopic FROM " & TABLE_TITLE & " ORDER BY timestamp ASC", msg)
|
||||
let res = db.database.query("SELECT timestamp, contentTopic, payload, pubsubTopic, version FROM " & TABLE_TITLE & " ORDER BY timestamp ASC", msg)
|
||||
if res.isErr:
|
||||
return err("failed")
|
||||
|
||||
|
|
Loading…
Reference in New Issue