From 20efdd7255e09f91edb6a05178203a7b08e23888 Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Mon, 28 Feb 2022 17:29:01 +0100 Subject: [PATCH] Store data model updates and fixes (#864) --- tests/v2/test_pagination_utils.nim | 49 +++++++++++++++++-- tests/v2/test_waku_pagination.nim | 3 +- .../storage/message/waku_message_store.nim | 7 ++- .../storage/migration/migration_types.nim | 2 +- .../00003_convertTimestampsToInt64.up.sql | 3 +- .../message/00004_extendPrimaryKey.up.sql | 18 +++++++ waku/v2/protocol/waku_store/waku_store.nim | 25 +++++++--- waku/v2/utils/pagination.nim | 38 +++++++++++--- 8 files changed, 119 insertions(+), 26 deletions(-) create mode 100644 waku/v2/node/storage/migration/migrations_scripts/message/00004_extendPrimaryKey.up.sql diff --git a/tests/v2/test_pagination_utils.nim b/tests/v2/test_pagination_utils.nim index b4360805c..ae0ba23b9 100644 --- a/tests/v2/test_pagination_utils.nim +++ b/tests/v2/test_pagination_utils.nim @@ -45,7 +45,26 @@ procSuite "Pagination utils": eqIndex3 = Index(digest: hashFromStr("0003"), receiverTime: getNanosecondTime(9999), # receiverTime difference should have no effect on comparisons senderTime: getNanosecondTime(54321)) - + diffPsTopic = Index(digest: hashFromStr("1234"), + receiverTime: getNanosecondTime(0), + senderTime: getNanosecondTime(1000), + pubsubTopic: "zzzz") + noSenderTime1 = Index(digest: hashFromStr("1234"), + receiverTime: getNanosecondTime(1100), + senderTime: getNanosecondTime(0), + pubsubTopic: "zzzz") + noSenderTime2 = Index(digest: hashFromStr("1234"), + receiverTime: getNanosecondTime(10000), + senderTime: getNanosecondTime(0), + pubsubTopic: "zzzz") + noSenderTime3 = Index(digest: hashFromStr("1234"), + receiverTime: getNanosecondTime(1200), + senderTime: getNanosecondTime(0), + pubsubTopic: "aaaa") + noSenderTime4 = Index(digest: hashFromStr("0"), + receiverTime: getNanosecondTime(1200), + senderTime: getNanosecondTime(0), + pubsubTopic: "zzzz") ## Test suite asyncTest "Index comparison": @@ -63,16 +82,37 @@ procSuite "Pagination utils": # Index comparison when equal cmp(eqIndex1, eqIndex2) == 0 - # receiverTime difference play no role + # pubsubTopic difference + cmp(smallIndex1, diffPsTopic) < 0 + + # receiverTime diff plays no role when senderTime set cmp(eqIndex1, eqIndex3) == 0 + # receiverTime diff plays no role when digest/pubsubTopic equal + cmp(noSenderTime1, noSenderTime2) == 0 + + # sort on receiverTime with no senderTimestamp and unequal pubsubTopic + cmp(noSenderTime1, noSenderTime3) < 0 + + # sort on receiverTime with no senderTimestamp and unequal digest + cmp(noSenderTime1, noSenderTime4) < 0 + + # sort on receiverTime if no senderTimestamp on only one side + cmp(smallIndex1, noSenderTime1) < 0 + cmp(noSenderTime1, smallIndex1) > 0 # Test symmetry + cmp(noSenderTime2, eqIndex3) < 0 + cmp(eqIndex3, noSenderTime2) > 0 # Test symmetry + asyncTest "Index equality": check: # Exactly equal eqIndex1 == eqIndex2 - # Receiver time plays no role + # Receiver time plays no role, even without sender time eqIndex1 == eqIndex3 + noSenderTime1 == noSenderTime2 # only receiver time differs, indices are equal + noSenderTime1 != noSenderTime3 # pubsubTopics differ + noSenderTime1 != noSenderTime4 # digests differ # Unequal sender time smallIndex1 != largeIndex1 @@ -82,3 +122,6 @@ procSuite "Pagination utils": # Unequal hash and digest smallIndex1 != eqIndex1 + + # Unequal pubsubTopic + smallIndex1 != diffPsTopic diff --git a/tests/v2/test_waku_pagination.nim b/tests/v2/test_waku_pagination.nim index b4c4cb04e..b84ce8965 100644 --- a/tests/v2/test_waku_pagination.nim +++ b/tests/v2/test_waku_pagination.nim @@ -34,7 +34,8 @@ procSuite "pagination": len(index.digest.data) != 0 len(index.digest.data) == 32 # sha2 output length in bytes index.receiverTime != 0 # the receiver timestamp should be a non-zero value - index.senderTime == 2 + index.senderTime == 2 + index.pubsubTopic == DefaultTopic let wm1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("/waku/2/default-content/proto")) diff --git a/waku/v2/node/storage/message/waku_message_store.nim b/waku/v2/node/storage/message/waku_message_store.nim index 3c83e24f0..24fda1d0f 100644 --- a/waku/v2/node/storage/message/waku_message_store.nim +++ b/waku/v2/node/storage/message/waku_message_store.nim @@ -31,13 +31,14 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T] let prepare = db.prepareStmt(""" CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ ( - id BLOB PRIMARY KEY, + id BLOB, receiverTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL, contentTopic BLOB NOT NULL, pubsubTopic BLOB NOT NULL, payload BLOB, version INTEGER NOT NULL, - senderTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL + senderTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL, + CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic) ) WITHOUT ROWID; """, NoParams, void) @@ -133,5 +134,3 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc, limit = non proc close*(db: WakuMessageStore) = ## Closes the database. db.database.close() - - diff --git a/waku/v2/node/storage/migration/migration_types.nim b/waku/v2/node/storage/migration/migration_types.nim index f3145f049..12b958c38 100644 --- a/waku/v2/node/storage/migration/migration_types.nim +++ b/waku/v2/node/storage/migration/migration_types.nim @@ -7,7 +7,7 @@ const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/message" const PEER_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/peer" const ALL_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts" -const USER_VERSION* = 2 # increase this when there is an update in the database schema +const USER_VERSION* = 4 # increase this when there is an update in the database schema type MigrationScriptsResult*[T] = Result[T, string] type diff --git a/waku/v2/node/storage/migration/migrations_scripts/message/00003_convertTimestampsToInt64.up.sql b/waku/v2/node/storage/migration/migrations_scripts/message/00003_convertTimestampsToInt64.up.sql index 55dd292f1..89e025041 100644 --- a/waku/v2/node/storage/migration/migrations_scripts/message/00003_convertTimestampsToInt64.up.sql +++ b/waku/v2/node/storage/migration/migrations_scripts/message/00003_convertTimestampsToInt64.up.sql @@ -22,9 +22,8 @@ CREATE TABLE IF NOT EXISTS Message( senderTimestamp INTEGER NOT NULL ) WITHOUT ROWID; - INSERT INTO Message (id, receiverTimestamp, contentTopic, pubsubTopic, payload, version, senderTimestamp) - SELECT id, FLOOR(receiverTimestamp*1000000000), contentTopic, pubsubTopic, payload, version, FLOOR(senderTimestamp*1000000000) + SELECT id, CAST(receiverTimestamp*1000000000 AS INTEGER), contentTopic, pubsubTopic, payload, version, CAST(senderTimestamp*1000000000 AS INTEGER) FROM Message_backup; DROP TABLE Message_backup; \ No newline at end of file diff --git a/waku/v2/node/storage/migration/migrations_scripts/message/00004_extendPrimaryKey.up.sql b/waku/v2/node/storage/migration/migrations_scripts/message/00004_extendPrimaryKey.up.sql new file mode 100644 index 000000000..88b191f97 --- /dev/null +++ b/waku/v2/node/storage/migration/migrations_scripts/message/00004_extendPrimaryKey.up.sql @@ -0,0 +1,18 @@ +ALTER TABLE Message RENAME TO Message_backup; + +CREATE TABLE IF NOT EXISTS Message( + id BLOB, + receiverTimestamp INTEGER NOT NULL, + contentTopic BLOB NOT NULL, + pubsubTopic BLOB NOT NULL, + payload BLOB, + version INTEGER NOT NULL, + senderTimestamp INTEGER NOT NULL, + CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic) + ) WITHOUT ROWID; + +INSERT INTO Message + SELECT * + FROM Message_backup; + +DROP TABLE Message_backup; \ No newline at end of file diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 06df73313..950d2eed0 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -56,7 +56,9 @@ const # TODO Move serialization function to separate file, too noisy # TODO Move pagination to separate file, self-contained logic -proc computeIndex*(msg: WakuMessage, receivedTime = getNanosecondTime(getTime().toUnixFloat())): Index = +proc computeIndex*(msg: WakuMessage, + receivedTime = getNanosecondTime(getTime().toUnixFloat()), + pubsubTopic = DefaultTopic): Index = ## Takes a WakuMessage with received timestamp and returns its Index. ## Received timestamp will default to system time if not provided. var ctx: sha256 @@ -66,8 +68,13 @@ proc computeIndex*(msg: WakuMessage, receivedTime = getNanosecondTime(getTime(). let digest = ctx.finish() # computes the hash ctx.clear() - let receiverTime = receivedTime - var index = Index(digest:digest, receiverTime: receiverTime, senderTime: msg.timestamp) + let + receiverTime = receivedTime + index = Index(digest:digest, + receiverTime: receiverTime, + senderTime: msg.timestamp, + pubsubTopic: pubsubTopic) + return index proc encode*(index: Index): ProtoBuffer = @@ -81,6 +88,7 @@ proc encode*(index: Index): ProtoBuffer = output.write(1, index.digest.data) output.write(2, zint64(index.receiverTime)) output.write(3, zint64(index.senderTime)) + output.write(4, index.pubsubTopic) return output @@ -121,6 +129,9 @@ proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] = discard ? pb.getField(3, senderTime) index.senderTime = Timestamp(senderTime) + # read the pubsubTopic + discard ? pb.getField(4, index.pubsubTopic) + return ok(index) proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] = @@ -177,7 +188,6 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = discard ? pb.getField(6, endTime) msg.endTime = Timestamp(endTime) - return ok(msg) proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] = @@ -344,7 +354,7 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) = proc onData(receiverTime: Timestamp, msg: WakuMessage, pubsubTopic: string) = # TODO index should not be recalculated - discard ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime), pubsubTopic: pubsubTopic)) + discard ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime, pubsubTopic), pubsubTopic: pubsubTopic)) info "attempting to load messages from persistent storage" @@ -380,12 +390,13 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = # Handle WakuMessage according to store protocol trace "handle message in WakuStore", topic=topic, msg=msg - let index = msg.computeIndex() + let index = msg.computeIndex(pubsubTopic = topic) let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) if addRes.isErr: trace "Attempt to add message with duplicate index to store", msg=msg, index=index waku_store_errors.inc(labelValues = ["duplicate"]) + return # Do not attempt to store in persistent DB waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"]) @@ -557,7 +568,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem # exclude index from the comparison criteria for msg in msgList: - let index = msg.computeIndex() + let index = msg.computeIndex(pubsubTopic = DefaultTopic) # check for duplicate messages # TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic if ws.messages.contains(index): diff --git a/waku/v2/utils/pagination.nim b/waku/v2/utils/pagination.nim index 0086fa445..ba5c3a6a4 100644 --- a/waku/v2/utils/pagination.nim +++ b/waku/v2/utils/pagination.nim @@ -14,26 +14,48 @@ export hash type Index* = object ## This type contains the description of an Index used in the pagination of WakuMessages - digest*: MDigest[256] + digest*: MDigest[256] # calculated over payload and content topic receiverTime*: Timestamp senderTime*: Timestamp # the time at which the message is generated + pubsubTopic*: string proc `==`*(x, y: Index): bool = - ## receiverTime plays no role in index comparison - (x.senderTime == y.senderTime) and (x.digest == y.digest) + ## receiverTime plays no role in index equality + (x.senderTime == y.senderTime) and + (x.digest == y.digest) and + (x.pubsubTopic == y.pubsubTopic) proc cmp*(x, y: Index): int = ## compares x and y ## returns 0 if they are equal ## returns -1 if x < y ## returns 1 if x > y - ## receiverTime plays no role in index comparison + ## + ## Default sorting order priority is: + ## 1. senderTimestamp + ## 2. receiverTimestamp (a fallback only if senderTimestamp unset on either side, and all other fields unequal) + ## 3. message digest + ## 4. pubsubTopic + + if x == y: + # Quick exit ensures receiver time does not affect index equality + return 0 # Timestamp has a higher priority for comparison - let timecmp = cmp(x.senderTime, y.senderTime) + let + # Use receiverTime where senderTime is unset + xTimestamp = if x.senderTime == 0: x.receiverTime + else: x.senderTime + yTimestamp = if y.senderTime == 0: y.receiverTime + else: y.senderTime + + let timecmp = cmp(xTimestamp, yTimestamp) if timecmp != 0: return timecmp - # Only when timestamps are equal - let digestcm = cmp(x.digest.data, y.digest.data) - return digestcm + # Continue only when timestamps are equal + let digestcmp = cmp(x.digest.data, y.digest.data) + if digestcmp != 0: + return digestcmp + + return cmp(x.pubsubTopic, y.pubsubTopic)