From 7c5df3379b42f1e5ddad66e84e152aec6ebdf10d Mon Sep 17 00:00:00 2001 From: Sanaz Taheri Boshrooyeh <35961250+staheri14@users.noreply.github.com> Date: Wed, 5 May 2021 15:51:36 -0700 Subject: [PATCH] Storing waku message version inside the store db (#530) * WIP * stores version in db, edits error messages * fixes a bug * reverts edit messages * converts uint32 storage to int64 * unit tests for various version numbers * minor comments edit * removes debugging messages * updates changelog --- CHANGELOG.md | 1 + tests/v2/test_message_store.nim | 20 +++++++++++++++---- .../storage/message/waku_message_store.nim | 15 ++++++++------ 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 69e76954f..4629f3987 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - Fixed: content filtering now works on any PubSub topic and not just the `waku` default. - Added the `pubsubTopic` field to the `HistoryQuery`. Now, the message history can be filtered and queried based on the `pubsubTopic`. - Added a new table of `Message` to the message store db. The new table has an additional column of `pubsubTopic` and will be used instead of the old table `messages`. The message history in the old table `messages` will not be accessed and have to be removed. +- Added a new column of `version` to the `Message` table of the message store db. - Fix: allow mounting light protocols without `relay` - Add `keep-alive` option to maintain stable connection to `relay` peers on idle topics diff --git a/tests/v2/test_message_store.nim b/tests/v2/test_message_store.nim index 1360c34f6..365646abf 100644 --- a/tests/v2/test_message_store.nim +++ b/tests/v2/test_message_store.nim @@ -16,9 +16,9 @@ suite "Message Store": pubsubTopic = "/waku/2/default-waku/proto" var msgs = @[ - WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic), - WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic), - WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic), + WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0)), + WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1)), + WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32)), ] defer: store.close() @@ -27,14 +27,26 @@ suite "Message Store": let output = store.put(computeIndex(msg), msg, pubsubTopic) check output.isOk + var v0Flag, v1Flag, vMaxFlag: bool = false var responseCount = 0 proc data(timestamp: uint64, msg: WakuMessage, psTopic: string) = responseCount += 1 check msg in msgs check psTopic == pubsubTopic - + # check the correct retrieval of versions + if msg.version == uint32(0): v0Flag = true + if msg.version == uint32(1): v1Flag = true + # high(uint32) is the largest value that fits in uint32, this is to make sure there is no overflow in the storage + if msg.version == high(uint32): vMaxFlag = true + + let res = store.getAll(data) check: res.isErr == false responseCount == 3 + v0Flag == true + v1Flag == true + vMaxFlag == true + + diff --git a/waku/v2/node/storage/message/waku_message_store.nim b/waku/v2/node/storage/message/waku_message_store.nim index 359a55595..78cf0c8fb 100644 --- a/waku/v2/node/storage/message/waku_message_store.nim +++ b/waku/v2/node/storage/message/waku_message_store.nim @@ -34,7 +34,8 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T] timestamp INTEGER NOT NULL, contentTopic BLOB NOT NULL, pubsubTopic BLOB NOT NULL, - payload BLOB + payload BLOB, + version INTEGER NOT NULL ) WITHOUT ROWID; """, NoParams, void) @@ -58,15 +59,15 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop ## echo "error" ## let prepare = db.database.prepareStmt( - "INSERT INTO " & TABLE_TITLE & " (id, timestamp, contentTopic, payload, pubsubTopic) VALUES (?, ?, ?, ?, ?);", - (seq[byte], int64, seq[byte], seq[byte], seq[byte]), + "INSERT INTO " & TABLE_TITLE & " (id, timestamp, contentTopic, payload, pubsubTopic, version) VALUES (?, ?, ?, ?, ?, ?);", + (seq[byte], int64, seq[byte], seq[byte], seq[byte], int64), void ) if prepare.isErr: return err("failed to prepare") - let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes())) + let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version))) if res.isErr: return err("failed") @@ -95,13 +96,15 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto l = sqlite3_column_bytes(s, 2) pubsubTopic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3)) pubsubTopicL = sqlite3_column_bytes(s,3) + version = sqlite3_column_int64(s, 4) + # TODO retrieve the version number onData(uint64(timestamp), WakuMessage(contentTopic: ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicL-1)))), - payload: @(toOpenArray(p, 0, l-1))), + payload: @(toOpenArray(p, 0, l-1)), version: uint32(version)), string.fromBytes(@(toOpenArray(pubsubTopic, 0, pubsubTopicL-1)))) - let res = db.database.query("SELECT timestamp, contentTopic, payload, pubsubTopic FROM " & TABLE_TITLE & " ORDER BY timestamp ASC", msg) + let res = db.database.query("SELECT timestamp, contentTopic, payload, pubsubTopic, version FROM " & TABLE_TITLE & " ORDER BY timestamp ASC", msg) if res.isErr: return err("failed")