From 1b0e0655b0587f2c5fa74a67de6b31bccd2e3f85 Mon Sep 17 00:00:00 2001
From: ABresting
Date: Wed, 25 Oct 2023 18:48:41 +0200
Subject: [PATCH] feat: messageHash attribute added in SQLite + migration script ready + testcase

---
 .../00008_updatePrimaryKey_rm_id.up.sql | 18 ++++++++
 tests/waku_archive/test_driver_postgres.nim | 6 +--
 .../test_driver_postgres_query.nim | 2 +-
 tests/waku_archive/test_driver_queue.nim | 2 +-
 .../waku_archive/test_driver_queue_index.nim | 30 ++++++------
 .../test_driver_queue_pagination.nim | 6 +--
 .../waku_archive/test_driver_queue_query.nim | 2 +-
 tests/waku_archive/test_driver_sqlite.nim | 2 +-
 .../waku_archive/test_driver_sqlite_query.nim | 46 ++++++++++++++++++-
 tests/waku_archive/test_retention_policy.nim | 2 +-
 tests/waku_archive/test_waku_archive.nim | 2 +-
 tests/waku_store/test_resume.nim | 8 ++--
 tests/waku_store/test_wakunode_store.nim | 6 +--
 tests/wakunode_rest/test_rest_store.nim | 10 ++--
 waku/node/waku_node.nim | 4 +-
 waku/waku_api/rest/store/client.nim | 2 +-
 waku/waku_api/rest/store/handlers.nim | 14 +++---
 waku/waku_api/rest/store/types.nim | 22 ++++-----
 waku/waku_archive/archive.nim | 10 ++--
 waku/waku_archive/common.nim | 2 +-
 .../postgres_driver/postgres_driver.nim | 12 ++---
 .../driver/queue_driver/index.nim | 14 +++---
 .../driver/queue_driver/queue_driver.nim | 6 +--
 .../driver/sqlite_driver/cursor.nim | 2 +-
 .../driver/sqlite_driver/migrations.nim | 12 ++---
 .../driver/sqlite_driver/queries.nim | 36 +++++++--------
 .../driver/sqlite_driver/sqlite_driver.nim | 2 +-
 waku/waku_store/client.nim | 4 +-
 waku/waku_store/common.nim | 8 ++--
 waku/waku_store/rpc.nim | 12 ++---
 waku/waku_store/rpc_codec.nim | 10 ++--
 31 files changed, 189 insertions(+), 125 deletions(-)
 create mode 100644 migrations/message_store/00008_updatePrimaryKey_rm_id.up.sql

diff --git a/migrations/message_store/00008_updatePrimaryKey_rm_id.up.sql b/migrations/message_store/00008_updatePrimaryKey_rm_id.up.sql
new file mode 100644
index 000000000..fd8c4aded
--- /dev/null
+++ b/migrations/message_store/00008_updatePrimaryKey_rm_id.up.sql
@@ -0,0 +1,18 @@
+ALTER TABLE message RENAME TO message_backup;
+
+CREATE TABLE IF NOT EXISTS message(
+    pubsubTopic BLOB NOT NULL,
+    contentTopic BLOB NOT NULL,
+    payload BLOB,
+    version INTEGER NOT NULL,
+    timestamp INTEGER NOT NULL,
+    messageHash BLOB,
+    storedAt INTEGER NOT NULL,
+    CONSTRAINT messageIndex PRIMARY KEY (storedAt, messageHash)
+) WITHOUT ROWID;
+
+INSERT OR IGNORE INTO message(pubsubTopic, contentTopic, payload, version, timestamp, messageHash, storedAt)
+    SELECT pubsubTopic, contentTopic, payload, version, timestamp, id, storedAt
+    FROM message_backup;
+
+DROP TABLE message_backup;
\ No newline at end of file
diff --git a/tests/waku_archive/test_driver_postgres.nim b/tests/waku_archive/test_driver_postgres.nim
index 149c43aa0..8f90dc86e 100644
--- a/tests/waku_archive/test_driver_postgres.nim
+++ b/tests/waku_archive/test_driver_postgres.nim
@@ -19,7 +19,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic,
     pubsubTopic: pubsubTopic,
     senderTime: message.timestamp,
     storeTime: message.timestamp,
-    digest: computeDigest(message, pubsubTopic)
+    messageHash: computeDigest(message, pubsubTopic)
   )
 
 suite "Postgres driver":
@@ -87,10 +87,10 @@ suite "Postgres driver":
     require:
      storedMsg.len == 1
      storedMsg.all do (item: auto) -> bool:
-       let (pubsubTopic, actualMsg, digest, storeTimestamp) = item
+       let (pubsubTopic, actualMsg, messageHash, storeTimestamp) = item
        actualMsg.contentTopic == contentTopic and pubsubTopic ==
DefaultPubsubTopic and - toHex(computedDigest.data) == toHex(digest) and + toHex(computedDigest.data) == toHex(messageHash) and toHex(actualMsg.payload) == toHex(msg.payload) (await driver.close()).expect("driver to close") diff --git a/tests/waku_archive/test_driver_postgres_query.nim b/tests/waku_archive/test_driver_postgres_query.nim index ea174eab0..ea043815e 100644 --- a/tests/waku_archive/test_driver_postgres_query.nim +++ b/tests/waku_archive/test_driver_postgres_query.nim @@ -37,7 +37,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC pubsubTopic: pubsubTopic, senderTime: message.timestamp, storeTime: message.timestamp, - digest: computeDigest(message, pubsubTopic) + messageHash: computeDigest(message, pubsubTopic) ) suite "Postgres driver - query by content topic": diff --git a/tests/waku_archive/test_driver_queue.nim b/tests/waku_archive/test_driver_queue.nim index 852697ea0..ca2560559 100644 --- a/tests/waku_archive/test_driver_queue.nim +++ b/tests/waku_archive/test_driver_queue.nim @@ -23,7 +23,7 @@ proc genIndexedWakuMessage(i: int8): IndexedWakuMessage = cursor = Index( receiverTime: Timestamp(i), senderTime: Timestamp(i), - digest: MessageDigest(data: data), + messageHash: MessageDigest(data: data), pubsubTopic: "test-pubsub-topic" ) diff --git a/tests/waku_archive/test_driver_queue_index.nim b/tests/waku_archive/test_driver_queue_index.nim index d5b113646..3fcee3190 100644 --- a/tests/waku_archive/test_driver_queue_index.nim +++ b/tests/waku_archive/test_driver_queue_index.nim @@ -31,44 +31,44 @@ suite "Queue Driver - index": ## Test vars let - smallIndex1 = Index(digest: hashFromStr("1234"), + smallIndex1 = Index(messageHash: hashFromStr("1234"), receiverTime: getNanosecondTime(0), senderTime: getNanosecondTime(1000)) - smallIndex2 = Index(digest: hashFromStr("1234567"), # digest is less significant than senderTime + smallIndex2 = Index(messageHash: hashFromStr("1234567"), # messageHash is less significant than senderTime receiverTime: getNanosecondTime(0), senderTime: getNanosecondTime(1000)) - largeIndex1 = Index(digest: hashFromStr("1234"), + largeIndex1 = Index(messageHash: hashFromStr("1234"), receiverTime: getNanosecondTime(0), senderTime: getNanosecondTime(9000)) # only senderTime differ from smallIndex1 - largeIndex2 = Index(digest: hashFromStr("12345"), # only digest differs from smallIndex1 + largeIndex2 = Index(messageHash: hashFromStr("12345"), # only messageHash differs from smallIndex1 receiverTime: getNanosecondTime(0), senderTime: getNanosecondTime(1000)) - eqIndex1 = Index(digest: hashFromStr("0003"), + eqIndex1 = Index(messageHash: hashFromStr("0003"), receiverTime: getNanosecondTime(0), senderTime: getNanosecondTime(54321)) - eqIndex2 = Index(digest: hashFromStr("0003"), + eqIndex2 = Index(messageHash: hashFromStr("0003"), receiverTime: getNanosecondTime(0), senderTime: getNanosecondTime(54321)) - eqIndex3 = Index(digest: hashFromStr("0003"), + eqIndex3 = Index(messageHash: hashFromStr("0003"), receiverTime: getNanosecondTime(9999), # receiverTime difference should have no effect on comparisons senderTime: getNanosecondTime(54321)) - diffPsTopic = Index(digest: hashFromStr("1234"), + diffPsTopic = Index(messageHash: hashFromStr("1234"), receiverTime: getNanosecondTime(0), senderTime: getNanosecondTime(1000), pubsubTopic: "zzzz") - noSenderTime1 = Index(digest: hashFromStr("1234"), + noSenderTime1 = Index(messageHash: hashFromStr("1234"), receiverTime: getNanosecondTime(1100), senderTime: getNanosecondTime(0), 
pubsubTopic: "zzzz") - noSenderTime2 = Index(digest: hashFromStr("1234"), + noSenderTime2 = Index(messageHash: hashFromStr("1234"), receiverTime: getNanosecondTime(10000), senderTime: getNanosecondTime(0), pubsubTopic: "zzzz") - noSenderTime3 = Index(digest: hashFromStr("1234"), + noSenderTime3 = Index(messageHash: hashFromStr("1234"), receiverTime: getNanosecondTime(1200), senderTime: getNanosecondTime(0), pubsubTopic: "aaaa") - noSenderTime4 = Index(digest: hashFromStr("0"), + noSenderTime4 = Index(messageHash: hashFromStr("0"), receiverTime: getNanosecondTime(1200), senderTime: getNanosecondTime(0), pubsubTopic: "zzzz") @@ -156,8 +156,8 @@ suite "Queue Driver - index": ## Then check: - index.digest.data.len != 0 - index.digest.data.len == 32 # sha2 output length in bytes + index.messageHash.data.len != 0 + index.messageHash.data.len == 32 # sha2 output length in bytes index.receiverTime == ts2 # the receiver timestamp should be a non-zero value index.senderTime == ts index.pubsubTopic == DefaultContentTopic @@ -177,4 +177,4 @@ suite "Queue Driver - index": ## Then check: - index1.digest == index2.digest + index1.messageHash == index2.messageHash diff --git a/tests/waku_archive/test_driver_queue_pagination.nim b/tests/waku_archive/test_driver_queue_pagination.nim index e073c1f45..05bf70236 100644 --- a/tests/waku_archive/test_driver_queue_pagination.nim +++ b/tests/waku_archive/test_driver_queue_pagination.nim @@ -25,7 +25,7 @@ proc getTestQueueDriver(numMessages: int): QueueDriver = index: Index( receiverTime: Timestamp(i), senderTime: Timestamp(i), - digest: MessageDigest(data: data) + messageHash: MessageDigest(data: data) ) ) discard testQueueDriver.add(msg) @@ -156,7 +156,7 @@ procSuite "Queue driver - pagination": pubsubTopic: DefaultPubsubTopic, senderTime: msg.timestamp, storeTime: msg.timestamp, - digest: computeDigest(msg, DefaultPubsubTopic) + messageHash: computeDigest(msg, DefaultPubsubTopic) ).toIndex() let @@ -337,7 +337,7 @@ procSuite "Queue driver - pagination": pubsubTopic: DefaultPubsubTopic, senderTime: msg.timestamp, storeTime: msg.timestamp, - digest: computeDigest(msg, DefaultPubsubTopic) + messageHash: computeDigest(msg, DefaultPubsubTopic) ).toIndex() let diff --git a/tests/waku_archive/test_driver_queue_query.nim b/tests/waku_archive/test_driver_queue_query.nim index 1d7294c2f..89c40d605 100644 --- a/tests/waku_archive/test_driver_queue_query.nim +++ b/tests/waku_archive/test_driver_queue_query.nim @@ -29,7 +29,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC pubsubTopic: pubsubTopic, senderTime: message.timestamp, storeTime: message.timestamp, - digest: computeDigest(message, pubsubTopic) + messageHash: computeDigest(message, pubsubTopic) ) diff --git a/tests/waku_archive/test_driver_sqlite.nim b/tests/waku_archive/test_driver_sqlite.nim index 0fae560e0..d60fdc5d4 100644 --- a/tests/waku_archive/test_driver_sqlite.nim +++ b/tests/waku_archive/test_driver_sqlite.nim @@ -60,7 +60,7 @@ suite "SQLite driver": check: storedMsg.len == 1 storedMsg.all do (item: auto) -> bool: - let (pubsubTopic, msg, digest, storeTimestamp) = item + let (pubsubTopic, msg, messageHash, storeTimestamp) = item msg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic diff --git a/tests/waku_archive/test_driver_sqlite_query.nim b/tests/waku_archive/test_driver_sqlite_query.nim index 2ed20f0fa..c7e2bc383 100644 --- a/tests/waku_archive/test_driver_sqlite_query.nim +++ b/tests/waku_archive/test_driver_sqlite_query.nim @@ -33,7 +33,7 @@ proc 
computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor =
     pubsubTopic: pubsubTopic,
     senderTime: message.timestamp,
     storeTime: message.timestamp,
-    digest: computeDigest(message, pubsubTopic)
+    messageHash: computeDigest(message, pubsubTopic)
   )
 
 
@@ -423,6 +423,50 @@ suite "SQLite driver - query by pubsub topic":
 
     ## Cleanup
     (await driver.close()).expect("driver to close")
+
+  asyncTest "pubSubTopic messageHash match":
+    ## Given
+    const pubsubTopic1 = "test-pubsub-topic1"
+    const pubsubTopic2 = "test-pubsub-topic2"
+    # two variables to hold the message hashes
+    var msgHash1: seq[byte]
+    var msgHash2: seq[byte]
+
+    let driver = newTestSqliteDriver()
+    var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
+
+    let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic, ts=Timestamp(1))
+    putFutures.add(driver.put(pubsubTopic1, msg1, computeDigest(msg1, pubsubTopic1), msg1.timestamp))
+
+    let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic, ts=Timestamp(2))
+    putFutures.add(driver.put(pubsubTopic2, msg2, computeDigest(msg2, pubsubTopic2), msg2.timestamp))
+
+    discard waitFor allFinished(putFutures)
+
+    # get the messages from the database
+    let storedMsg = (waitFor driver.getAllMessages()).tryGet()
+
+    check:
+      # there must be exactly two messages
+      storedMsg.len > 0
+      storedMsg.len == 2
+
+      # get the individual messages and message hash values
+      @[storedMsg[0]].all do (item1: auto) -> bool:
+        let (gotPubsubTopic1, gotMsg1, messageHash1, timestamp1) = item1
+        msgHash1 = messageHash1
+        true
+
+      @[storedMsg[1]].all do (item2: auto) -> bool:
+        let (gotPubsubTopic2, gotMsg2, messageHash2, timestamp2) = item2
+        msgHash2 = messageHash2
+        true
+
+      # compare the message hashes; with different pubsub topics they should differ
+      msgHash1 != msgHash2
+
+    ## Cleanup
+    (await driver.close()).expect("driver to close")
 
 
 suite "SQLite driver - query by cursor":
 
diff --git a/tests/waku_archive/test_retention_policy.nim b/tests/waku_archive/test_retention_policy.nim
index 1e6bab461..834137a80 100644
--- a/tests/waku_archive/test_retention_policy.nim
+++ b/tests/waku_archive/test_retention_policy.nim
@@ -147,7 +147,7 @@ suite "Waku Archive - Retention policy":
     check:
      storedMsg.len == capacity
      storedMsg.all do (item: auto) -> bool:
-       let (pubsubTopic, msg, digest, storeTimestamp) = item
+       let (pubsubTopic, msg, messageHash, storeTimestamp) = item
        msg.contentTopic == contentTopic and
        pubsubTopic == DefaultPubsubTopic
 
diff --git a/tests/waku_archive/test_waku_archive.nim b/tests/waku_archive/test_waku_archive.nim
index 681c93c06..ac671303d 100644
--- a/tests/waku_archive/test_waku_archive.nim
+++ b/tests/waku_archive/test_waku_archive.nim
@@ -30,7 +30,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
     pubsubTopic: pubsubTopic,
     senderTime: message.timestamp,
     storeTime: message.timestamp,
-    digest: computeDigest(message, pubsubTopic)
+    messageHash: computeDigest(message, pubsubTopic)
   )
 
 
diff --git a/tests/waku_store/test_resume.nim b/tests/waku_store/test_resume.nim
index ea918ce3c..1a4bfbef5 100644
--- a/tests/waku_store/test_resume.nim
+++ b/tests/waku_store/test_resume.nim
@@ -11,7 +11,7 @@ import
   ../../waku/node/message_store/sqlite_store,
   ../../waku/node/peer_manager,
   ../../waku/waku_core,
-  ../../waku/waku_store,
+  ../../waku/waku_store,
   ./testlib/common,
   ./testlib/switch
 
@@ -58,7 +58,7 @@ procSuite "Waku Store - resume store":
   ]
 
   for msg in msgList:
-    require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
+    require
store.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp).isOk() store @@ -76,7 +76,7 @@ procSuite "Waku Store - resume store": ] for msg in msgList2: - require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp).isOk() store @@ -272,7 +272,7 @@ suite "WakuNode - waku store": # Insert the same message in both node's store let receivedTime3 = now() + getNanosecondTime(10) - digest3 = computeDigest(msg3) + digest3 = computeDigest(msg3, DefaultPubsubTopic) require server.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk() require client.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk() diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index 9fbe234bd..9bfd0c66e 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -34,7 +34,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): HistoryC pubsubTopic: pubsubTopic, senderTime: message.timestamp, storeTime: message.timestamp, - digest: waku_archive.computeDigest(message, pubsubTopic) + messageHash: waku_archive.computeDigest(message, pubsubTopic) ) procSuite "WakuNode - Store": @@ -57,8 +57,8 @@ procSuite "WakuNode - Store": let driver = newTestArchiveDriver() for msg in msgListA: - let msg_digest = waku_archive.computeDigest(msg, DefaultPubsubTopic) - require (waitFor driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp)).isOk() + let msg_hash = waku_archive.computeDigest(msg, DefaultPubsubTopic) + require (waitFor driver.put(DefaultPubsubTopic, msg, msg_hash, msg.timestamp)).isOk() driver diff --git a/tests/wakunode_rest/test_rest_store.nim b/tests/wakunode_rest/test_rest_store.nim index cf1109ff9..994f871d8 100644 --- a/tests/wakunode_rest/test_rest_store.nim +++ b/tests/wakunode_rest/test_rest_store.nim @@ -32,11 +32,11 @@ logScope: proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Future[Result[void, string]] = let - digest = waku_archive.computeDigest(message, pubsubTopic) + messageHash = waku_archive.computeDigest(message, pubsubTopic) receivedTime = if message.timestamp > 0: message.timestamp else: getNanosecondTime(getTime().toUnixFloat()) - store.put(pubsubTopic, message, digest, receivedTime) + store.put(pubsubTopic, message, messageHash, receivedTime) # Creates a new WakuNode proc testWakuNode(): WakuNode = @@ -60,7 +60,7 @@ procSuite "Waku v2 Rest API - Store": payload: @[byte('H'), byte('i'), byte('!')] ) - let messageDigest = waku_store.computeDigest(wakuMsg) + let messageDigest = waku_store.computeDigest(wakuMsg, DefaultPubsubTopic) let restMsgDigest = some(messageDigest.toRestStringMessageDigest()) let parsedMsgDigest = restMsgDigest.parseMsgDigest().value @@ -129,7 +129,7 @@ procSuite "Waku v2 Rest API - Store": "6", # end time "", # sender time "", # store time - "", # base64-encoded digest + "", # base64-encoded messageHash "", # empty implies default page size "true" # ascending ) @@ -224,7 +224,7 @@ procSuite "Waku v2 Rest API - Store": # populate the cursor for next page if response.data.cursor.isSome(): reqPubsubTopic = response.data.cursor.get().pubsubTopic - reqDigest = response.data.cursor.get().digest + reqDigest = response.data.cursor.get().messageHash reqSenderTime = response.data.cursor.get().senderTime reqStoreTime = response.data.cursor.get().storeTime 
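A quick sanity check of the schema produced by 00008_updatePrimaryKey_rm_id.up.sql, using the stock sqlite3 CLI against the migrated message store database (illustrative only, not itself part of the diff; the expected user_version of 8 assumes the SchemaVersion bump in migrations.nim further below has been applied):

  -- Columns should now be pubsubTopic, contentTopic, payload, version, timestamp, messageHash, storedAt,
  -- with (storedAt, messageHash) as the composite primary key and no id column left.
  PRAGMA table_info(message);

  -- The migration code tracks its schema version in SQLite's user_version pragma; expect 8 after migrating.
  PRAGMA user_version;

  -- Every migrated row carries its old id value as messageHash, so this should return 0.
  SELECT count(*) FROM message WHERE messageHash IS NULL;

Note that INSERT OR IGNORE in the migration matters because the new primary key (storedAt, messageHash) is narrower than the old (storedAt, id, pubsubTopic): rows that previously differed only by pubsubTopic now collide and all but one are dropped during the copy.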
diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 83207024e..71f864cbe 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -769,7 +769,7 @@ proc toArchiveQuery(request: HistoryQuery): ArchiveQuery = ArchiveQuery( pubsubTopic: request.pubsubTopic, contentTopics: request.contentTopics, - cursor: request.cursor.map(proc(cursor: HistoryCursor): ArchiveCursor = ArchiveCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, digest: cursor.digest)), + cursor: request.cursor.map(proc(cursor: HistoryCursor): ArchiveCursor = ArchiveCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, messageHash: cursor.messageHash)), startTime: request.startTime, endTime: request.endTime, pageSize: request.pageSize.uint, @@ -793,7 +793,7 @@ proc toHistoryResult*(res: ArchiveResult): HistoryResult = let response = res.get() ok(HistoryResponse( messages: response.messages, - cursor: response.cursor.map(proc(cursor: ArchiveCursor): HistoryCursor = HistoryCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, digest: cursor.digest)), + cursor: response.cursor.map(proc(cursor: ArchiveCursor): HistoryCursor = HistoryCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, messageHash: cursor.messageHash)), )) proc mountStore*(node: WakuNode) {.async, raises: [Defect, LPError].} = diff --git a/waku/waku_api/rest/store/client.nim b/waku/waku_api/rest/store/client.nim index 71babc8e1..24a3dc2f3 100644 --- a/waku/waku_api/rest/store/client.nim +++ b/waku/waku_api/rest/store/client.nim @@ -60,7 +60,7 @@ proc getStoreMessagesV1*( # Optional cursor fields senderTime: string = "", storeTime: string = "", - digest: string = "", # base64-encoded digest + messageHash: string = "", # base64-encoded digest pageSize: string = "", ascending: string = "" diff --git a/waku/waku_api/rest/store/handlers.nim b/waku/waku_api/rest/store/handlers.nim index 58b31f4fb..8eaa3f1c8 100644 --- a/waku/waku_api/rest/store/handlers.nim +++ b/waku/waku_api/rest/store/handlers.nim @@ -76,7 +76,7 @@ proc parseTime(input: Option[string]): proc parseCursor(parsedPubsubTopic: Option[string], senderTime: Option[string], storeTime: Option[string], - digest: Option[string]): + messageHash: Option[string]): Result[Option[HistoryCursor], string] = # Parse sender time @@ -90,7 +90,7 @@ proc parseCursor(parsedPubsubTopic: Option[string], return err(parsedStoreTime.error) # Parse message digest - let parsedMsgDigest = parseMsgDigest(digest) + let parsedMsgDigest = parseMsgDigest(messageHash) if not parsedMsgDigest.isOk(): return err(parsedMsgDigest.error) @@ -105,7 +105,7 @@ proc parseCursor(parsedPubsubTopic: Option[string], pubsubTopic: parsedPubsubTopic.get(), senderTime: parsedSenderTime.value.get(), storeTime: parsedStoreTime.value.get(), - digest: parsedMsgDigest.value.get()) + messageHash: parsedMsgDigest.value.get()) )) else: return ok(none(HistoryCursor)) @@ -115,7 +115,7 @@ proc createHistoryQuery(pubsubTopic: Option[string], contentTopics: Option[string], senderTime: Option[string], storeTime: Option[string], - digest: Option[string], + messageHash: Option[string], startTime: Option[string], endTime: Option[string], pageSize: Option[string], @@ -142,7 +142,7 @@ proc createHistoryQuery(pubsubTopic: Option[string], let parsedCursor = ? 
parseCursor(parsedPubsubTopic, senderTime, storeTime, - digest) + messageHash) # Parse page size field var parsedPagedSize = DefaultPageSize @@ -195,7 +195,7 @@ proc installStoreV1Handler(router: var RestRouter, contentTopics: Option[string], senderTime: Option[string], storeTime: Option[string], - digest: Option[string], + messageHash: Option[string], startTime: Option[string], endTime: Option[string], pageSize: Option[string], @@ -228,7 +228,7 @@ proc installStoreV1Handler(router: var RestRouter, contentTopics.toOpt(), senderTime.toOpt(), storeTime.toOpt(), - digest.toOpt(), + messageHash.toOpt(), startTime.toOpt(), endTime.toOpt(), pageSize.toOpt(), diff --git a/waku/waku_api/rest/store/types.nim b/waku/waku_api/rest/store/types.nim index 92da532a9..78b5a3d94 100644 --- a/waku/waku_api/rest/store/types.nim +++ b/waku/waku_api/rest/store/types.nim @@ -24,7 +24,7 @@ type pubsubTopic*: PubsubTopic senderTime*: Timestamp storeTime*: Timestamp - digest*: MessageDigest + messageHash*: MessageDigest StoreRequestRest* = object # inspired by https://github.com/waku-org/nwaku/blob/f95147f5b7edfd45f914586f2d41cd18fb0e0d18/waku/v2//waku_store/common.nim#L52 @@ -119,7 +119,7 @@ proc toStoreResponseRest*(histResp: HistoryResponse): StoreResponseRest = pubsubTopic: histResp.cursor.get().pubsubTopic, senderTime: histResp.cursor.get().senderTime, storeTime: histResp.cursor.get().storeTime, - digest: histResp.cursor.get().digest + messageHash: histResp.cursor.get().messageHash )) StoreResponseRest( @@ -247,7 +247,7 @@ proc writeValue*(writer: var JsonWriter[RestJson], writer.writeField("pubsub_topic", value.pubsubTopic) writer.writeField("sender_time", value.senderTime) writer.writeField("store_time", value.storeTime) - writer.writeField("digest", value.digest) + writer.writeField("messageHash", value.messageHash) writer.endRecord() proc readValue*(reader: var JsonReader[RestJson], @@ -257,7 +257,7 @@ proc readValue*(reader: var JsonReader[RestJson], pubsubTopic = none(PubsubTopic) senderTime = none(Timestamp) storeTime = none(Timestamp) - digest = none(MessageDigest) + messageHash = none(MessageDigest) for fieldName in readObjectFields(reader): case fieldName @@ -273,10 +273,10 @@ proc readValue*(reader: var JsonReader[RestJson], if storeTime.isSome(): reader.raiseUnexpectedField("Multiple `store_time` fields found", "HistoryCursorRest") storeTime = some(reader.readValue(Timestamp)) - of "digest": - if digest.isSome(): - reader.raiseUnexpectedField("Multiple `digest` fields found", "HistoryCursorRest") - digest = some(reader.readValue(MessageDigest)) + of "messageHash": + if messageHash.isSome(): + reader.raiseUnexpectedField("Multiple `messageHash` fields found", "HistoryCursorRest") + messageHash = some(reader.readValue(MessageDigest)) else: reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName)) @@ -289,14 +289,14 @@ proc readValue*(reader: var JsonReader[RestJson], if storeTime.isNone(): reader.raiseUnexpectedValue("Field `store_time` is missing") - if digest.isNone(): - reader.raiseUnexpectedValue("Field `digest` is missing") + if messageHash.isNone(): + reader.raiseUnexpectedValue("Field `messageHash` is missing") value = HistoryCursorRest( pubsubTopic: pubsubTopic.get(), senderTime: senderTime.get(), storeTime: storeTime.get(), - digest: digest.get() + messageHash: messageHash.get() ) ## End of HistoryCursorRest serde diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 8d236caf7..3513376f8 100644 --- a/waku/waku_archive/archive.nim +++ 
b/waku/waku_archive/archive.nim @@ -104,7 +104,7 @@ proc handleMessage*(w: WakuArchive, msgReceivedTime = if msg.timestamp > 0: msg.timestamp else: getNanosecondTime(getTime().toUnixFloat()) - trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest + trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, messageHash=msgDigest let putRes = await w.driver.put(pubsubTopic, msg, msgDigest, msgReceivedTime) if putRes.isErr(): @@ -163,13 +163,13 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] { ## Build last message cursor ## The cursor is built from the last message INCLUDED in the response ## (i.e. the second last message in the rows list) - let (pubsubTopic, message, digest, storeTimestamp) = rows[^2] + let (pubsubTopic, message, messageHash, storeTimestamp) = rows[^2] # TODO: Improve coherence of MessageDigest type let messageDigest = block: var data: array[32, byte] - for i in 0.." else: "<" statements.add("(storedAt, id) " & comp & " (?,?)") args.add($cursor.get().storeTime) - args.add(toHex(cursor.get().digest.data)) + args.add(toHex(cursor.get().messageHash.data)) if startTime.isSome(): statements.add("storedAt >= ?") diff --git a/waku/waku_archive/driver/queue_driver/index.nim b/waku/waku_archive/driver/queue_driver/index.nim index e04e0246d..badc8e54c 100644 --- a/waku/waku_archive/driver/queue_driver/index.nim +++ b/waku/waku_archive/driver/queue_driver/index.nim @@ -16,19 +16,19 @@ type Index* = object pubsubTopic*: string senderTime*: Timestamp # the time at which the message is generated receiverTime*: Timestamp - digest*: MessageDigest # calculated over payload and content topic + messageHash*: MessageDigest # calculated over payload and content topic proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T = ## Takes a WakuMessage with received timestamp and returns its Index. 
let - digest = computeDigest(msg, pubsubTopic) + messageHash = computeDigest(msg, pubsubTopic) senderTime = msg.timestamp Index( pubsubTopic: pubsubTopic, senderTime: senderTime, receiverTime: receivedTime, - digest: digest + messageHash: messageHash ) @@ -37,7 +37,7 @@ proc tohistoryCursor*(index: Index): ArchiveCursor = pubsubTopic: index.pubsubTopic, senderTime: index.senderTime, storeTime: index.receiverTime, - digest: index.digest + messageHash: index.messageHash ) proc toIndex*(index: ArchiveCursor): Index = @@ -45,14 +45,14 @@ proc toIndex*(index: ArchiveCursor): Index = pubsubTopic: index.pubsubTopic, senderTime: index.senderTime, receiverTime: index.storeTime, - digest: index.digest + messageHash: index.messageHash ) proc `==`*(x, y: Index): bool = ## receiverTime plays no role in index equality (x.senderTime == y.senderTime) and - (x.digest == y.digest) and + (x.messageHash == y.messageHash) and (x.pubsubTopic == y.pubsubTopic) proc cmp*(x, y: Index): int = @@ -84,7 +84,7 @@ proc cmp*(x, y: Index): int = return timecmp # Continue only when timestamps are equal - let digestcmp = cmp(x.digest.data, y.digest.data) + let digestcmp = cmp(x.messageHash.data, y.messageHash.data) if digestcmp != 0: return digestcmp diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index cce7d895b..2e3be9829 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -138,7 +138,7 @@ proc getPage(driver: QueueDriver, numberOfItems += 1 - outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), key.receiverTime)) + outSeq.add((key.pubsubTopic, data.msg, @(key.messageHash.data), key.receiverTime)) currentEntry = if forward: w.next() else: w.prev() @@ -227,10 +227,10 @@ proc add*(driver: QueueDriver, msg: IndexedWakuMessage): ArchiveDriverResult[voi method put*(driver: QueueDriver, pubsubTopic: PubsubTopic, message: WakuMessage, - digest: MessageDigest, + messageHash: MessageDigest, receivedTime: Timestamp): Future[ArchiveDriverResult[void]] {.async.} = - let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest) + let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, messageHash: messageHash) let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic) return driver.add(message) diff --git a/waku/waku_archive/driver/sqlite_driver/cursor.nim b/waku/waku_archive/driver/sqlite_driver/cursor.nim index 9b4d00fd9..a5258f22b 100644 --- a/waku/waku_archive/driver/sqlite_driver/cursor.nim +++ b/waku/waku_archive/driver/sqlite_driver/cursor.nim @@ -10,4 +10,4 @@ import type DbCursor* = (Timestamp, seq[byte], PubsubTopic) -proc toDbCursor*(c: ArchiveCursor): DbCursor = (c.storeTime, @(c.digest.data), c.pubsubTopic) +proc toDbCursor*(c: ArchiveCursor): DbCursor = (c.storeTime, @(c.messageHash.data), c.pubsubTopic) diff --git a/waku/waku_archive/driver/sqlite_driver/migrations.nim b/waku/waku_archive/driver/sqlite_driver/migrations.nim index 0aa925fda..8d64c7908 100644 --- a/waku/waku_archive/driver/sqlite_driver/migrations.nim +++ b/waku/waku_archive/driver/sqlite_driver/migrations.nim @@ -14,12 +14,12 @@ logScope: topics = "waku archive migration" -const SchemaVersion* = 7 # increase this when there is an update in the database schema +const SchemaVersion* = 8 # increase this when there is an update in the database schema template 
projectRoot: string = currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".." const MessageStoreMigrationPath: string = projectRoot / "migrations" / "message_store" -proc isSchemaVersion7*(db: SqliteDatabase): DatabaseResult[bool] = +proc isSchemaVersion8*(db: SqliteDatabase): DatabaseResult[bool] = ## Temporary proc created to analyse when the table actually belongs to the SchemaVersion 7. ## ## During many nwaku versions, 0.14.0 until 0.18.0, the SchemaVersion wasn't set or checked. @@ -48,7 +48,7 @@ proc isSchemaVersion7*(db: SqliteDatabase): DatabaseResult[bool] = return ok(true) else: - info "Not considered schema version 7" + info "Not considered schema version 8" ok(false) proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult[void] = @@ -63,12 +63,12 @@ proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult debug "starting message store's sqlite database migration" let userVersion = ? db.getUserVersion() - let isSchemaVersion7 = ? db.isSchemaVersion7() + let isSchemaVersion8 = ? db.isSchemaVersion8() - if userVersion == 0'i64 and isSchemaVersion7: + if userVersion == 0'i64 and isSchemaVersion8: info "We found user_version 0 but the database schema reflects the user_version 7" ## Force the correct schema version - ? db.setUserVersion( 7 ) + ? db.setUserVersion( 8 ) let migrationRes = migrate(db, targetVersion, migrationsScriptsDir=MessageStoreMigrationPath) if migrationRes.isErr(): diff --git a/waku/waku_archive/driver/sqlite_driver/queries.nim b/waku/waku_archive/driver/sqlite_driver/queries.nim index 27ab4de61..5e3c893e1 100644 --- a/waku/waku_archive/driver/sqlite_driver/queries.nim +++ b/waku/waku_archive/driver/sqlite_driver/queries.nim @@ -70,9 +70,9 @@ proc createTableQuery(table: string): SqlQueryStr = " payload BLOB," & " version INTEGER NOT NULL," & " timestamp INTEGER NOT NULL," & - " id BLOB," & + " messageHash BLOB," & " storedAt INTEGER NOT NULL," & - " CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" & + " CONSTRAINT messageIndex PRIMARY KEY (storedAt, messageHash)" & ") WITHOUT ROWID;" proc createTable*(db: SqliteDatabase): DatabaseResult[void] = @@ -93,7 +93,7 @@ proc createOldestMessageTimestampIndex*(db: SqliteDatabase): proc createHistoryQueryIndexQuery(table: string): SqlQueryStr = - "CREATE INDEX IF NOT EXISTS i_query ON " & table & " (contentTopic, pubsubTopic, storedAt, id);" + "CREATE INDEX IF NOT EXISTS i_query ON " & table & " (contentTopic, pubsubTopic, storedAt, messageHash);" proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] = let query = createHistoryQueryIndexQuery(DbTable) @@ -105,7 +105,7 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] = type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp) proc insertMessageQuery(table: string): SqlQueryStr = - "INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" & + "INSERT INTO " & table & "(messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" & " VALUES (?, ?, ?, ?, ?, ?, ?);" proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] = @@ -181,9 +181,9 @@ proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): ## Delete oldest messages not within limit proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQueryStr = - "DELETE FROM " & table & " WHERE (storedAt, id, pubsubTopic) NOT IN (" & - " SELECT 
storedAt, id, pubsubTopic FROM " & table & - " ORDER BY storedAt DESC, id DESC" & + "DELETE FROM " & table & " WHERE (storedAt, messageHash, pubsubTopic) NOT IN (" & + " SELECT storedAt, messageHash, pubsubTopic FROM " & table & + " ORDER BY storedAt DESC, messageHash DESC" & " LIMIT " & $limit & ");" @@ -197,7 +197,7 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): ## Select all messages proc selectAllMessagesQuery(table: string): SqlQueryStr = - "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" & + "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, messageHash" & " FROM " & table & " ORDER BY storedAt ASC" @@ -211,10 +211,10 @@ proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic, let pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) - digest = queryRowDigestCallback(s, digestCol=6) + messageHash = queryRowDigestCallback(s, digestCol=6) storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0) - rows.add((pubsubTopic, wakuMessage, digest, storedAt)) + rows.add((pubsubTopic, wakuMessage, messageHash, storedAt)) let query = selectAllMessagesQuery(DbTable) let res = db.query(query, queryRowCallback) @@ -246,7 +246,7 @@ proc whereClause(cursor: Option[DbCursor], none(string) else: let comp = if ascending: ">" else: "<" - some("(storedAt, id) " & comp & " (?, ?)") + some("(storedAt, messageHash) " & comp & " (?, ?)") let pubsubTopicClause = if pubsubTopic.isNone(): none(string) @@ -280,13 +280,13 @@ proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: u var query: string - query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" + query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, messageHash" query &= " FROM " & table if where.isSome(): query &= " WHERE " & where.get() - query &= " ORDER BY storedAt " & order & ", id " & order + query &= " ORDER BY storedAt " & order & ", messageHash " & order query &= " LIMIT " & $limit & ";" query @@ -308,11 +308,11 @@ proc execSelectMessagesWithLimitStmt(s: SqliteStmt, # Bind params var paramIndex = 1 - if cursor.isSome(): # cursor = storedAt, id, pubsubTopic - let (storedAt, id, _) = cursor.get() + if cursor.isSome(): # cursor = storedAt, messageHash, pubsubTopic + let (storedAt, messageHash, _) = cursor.get() checkErr bindParam(s, paramIndex, storedAt) paramIndex += 1 - checkErr bindParam(s, paramIndex, id) + checkErr bindParam(s, paramIndex, messageHash) paramIndex += 1 if pubsubTopic.isSome(): @@ -369,10 +369,10 @@ proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase, let pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) - digest = queryRowDigestCallback(s, digestCol=6) + messageHash = queryRowDigestCallback(s, digestCol=6) storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0) - messages.add((pubsubTopic, message, digest, storedAt)) + messages.add((pubsubTopic, message, messageHash, storedAt)) let query = block: let where = whereClause(cursor, pubsubTopic, contentTopic, startTime, endTime, ascending) diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 53da379b1..c519941f5 100644 --- 
a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -65,7 +65,7 @@ method put*(s: SqliteDriver, Future[ArchiveDriverResult[void]] {.async.} = ## Inserts a message into the store let res = s.insertStmt.exec(( - @(digest.data), # id + @(digest.data), # messageHash receivedTime, # storedAt toBytes(message.contentTopic), # contentTopic message.payload, # payload diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index 1c3ec5a41..da9deec34 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -153,11 +153,11 @@ when defined(waku_exp_store_resume): proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] = let - digest = waku_archive.computeDigest(message, pubsubTopic) + messageHash = waku_archive.computeDigest(message, pubsubTopic) receivedTime = if message.timestamp > 0: message.timestamp else: getNanosecondTime(getTime().toUnixFloat()) - store.put(pubsubTopic, message, digest, receivedTime) + store.put(pubsubTopic, message, messageHash, receivedTime) proc resume*(w: WakuStoreClient, peerList = none(seq[RemotePeerInfo]), diff --git a/waku/waku_store/common.nim b/waku/waku_store/common.nim index edd86ea4b..64eb935a3 100644 --- a/waku/waku_store/common.nim +++ b/waku/waku_store/common.nim @@ -27,13 +27,15 @@ type WakuStoreResult*[T] = Result[T, string] type MessageDigest* = MDigest[256] -proc computeDigest*(msg: WakuMessage): MessageDigest = +proc computeDigest*(msg: WakuMessage, pubSubTopic: string): MessageDigest = var ctx: sha256 ctx.init() defer: ctx.clear() - ctx.update(msg.contentTopic.toBytes()) + ctx.update(pubSubTopic.toBytes()) ctx.update(msg.payload) + ctx.update(msg.contentTopic.toBytes()) + ctx.update(msg.meta) # Computes the hash return ctx.finish() @@ -46,7 +48,7 @@ type pubsubTopic*: PubsubTopic senderTime*: Timestamp storeTime*: Timestamp - digest*: MessageDigest + messageHash*: MessageDigest HistoryQuery* = object pubsubTopic*: Option[PubsubTopic] diff --git a/waku/waku_store/rpc.nim b/waku/waku_store/rpc.nim index c0b105c7b..0d54c6cbe 100644 --- a/waku/waku_store/rpc.nim +++ b/waku/waku_store/rpc.nim @@ -18,25 +18,25 @@ type PagingIndexRPC* = object pubsubTopic*: PubsubTopic senderTime*: Timestamp # the time at which the message is generated receiverTime*: Timestamp - digest*: MessageDigest # calculated over payload and content topic + messageHash*: MessageDigest # calculated over payload and content topic proc `==`*(x, y: PagingIndexRPC): bool = ## receiverTime plays no role in index equality (x.senderTime == y.senderTime) and - (x.digest == y.digest) and + (x.messageHash == y.messageHash) and (x.pubsubTopic == y.pubsubTopic) proc compute*(T: type PagingIndexRPC, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T = ## Takes a WakuMessage with received timestamp and returns its Index. 
let - digest = computeDigest(msg) + digest = computeDigest(msg, pubsubTopic) senderTime = msg.timestamp PagingIndexRPC( pubsubTopic: pubsubTopic, senderTime: senderTime, receiverTime: receivedTime, - digest: digest + messageHash: digest ) @@ -98,7 +98,7 @@ proc toRPC*(cursor: HistoryCursor): PagingIndexRPC {.gcsafe.}= pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, receiverTime: cursor.storeTime, - digest: cursor.digest + messageHash: cursor.messageHash ) proc toAPI*(rpc: PagingIndexRPC): HistoryCursor = @@ -106,7 +106,7 @@ proc toAPI*(rpc: PagingIndexRPC): HistoryCursor = pubsubTopic: rpc.pubsubTopic, senderTime: rpc.senderTime, storeTime: rpc.receiverTime, - digest: rpc.digest + messageHash: rpc.messageHash ) diff --git a/waku/waku_store/rpc_codec.nim b/waku/waku_store/rpc_codec.nim index 3223fb7ec..20611027c 100644 --- a/waku/waku_store/rpc_codec.nim +++ b/waku/waku_store/rpc_codec.nim @@ -23,7 +23,7 @@ proc encode*(index: PagingIndexRPC): ProtoBuffer = ## returns the resultant ProtoBuffer var pb = initProtoBuffer() - pb.write3(1, index.digest.data) + pb.write3(1, index.messageHash.data) pb.write3(2, zint64(index.receiverTime)) pb.write3(3, zint64(index.senderTime)) pb.write3(4, index.pubsubTopic) @@ -38,13 +38,13 @@ proc decode*(T: type PagingIndexRPC, buffer: seq[byte]): ProtobufResult[T] = var data: seq[byte] if not ?pb.getField(1, data): - return err(ProtobufError.missingRequiredField("digest")) + return err(ProtobufError.missingRequiredField("messageHash")) else: - var digest = MessageDigest() + var messageHash = MessageDigest() for count, b in data: - digest.data[count] = b + messageHash.data[count] = b - rpc.digest = digest + rpc.messageHash = messageHash var receiverTime: zint64 if not ?pb.getField(2, receiverTime):