From 50a54cb4eeb22a16330f0d8254b3cc7f6f5f7ed8 Mon Sep 17 00:00:00 2001 From: Sanaz Taheri Boshrooyeh <35961250+staheri14@users.noreply.github.com> Date: Tue, 27 Apr 2021 16:52:24 -0700 Subject: [PATCH] Enabling pubsub topic filter in history queries (#492) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * replaces topics with seq of ContentFilters * update topics to contentFilter * updates the contentFilter structure one content topic per content filter instead of a sequence of topics * updates store json rpc api * renames ContentFilter to HistoryContentFilter * unit test for a query with several content filters * makes shortcut for store api * updates chat2 * clean up * renames topic to contentTopic * adds pubsub topic to the history query updates message store interface to return the pubsub topic updates waku message store implementation updates database schema to hold pubsub topic per waku message * clarifies the use of content topic in store api * clarifies the use of contentTopic in the init method of HistoryContentFilter * simplifies the test and add comments * lowers the field number of pubsub topic in historyQuery protobuf * captures an empty contentFilter case * test pubsub topic filter for the entire history and no message match * removes duplicates * adds TODO * fix a broken comment line * updates waku store codec * swaps the order of pubsub topic and content topic in protobuf * Update waku/v2/protocol/waku_store/waku_store_types.nim Co-authored-by: Oskar Thorén * updates the pubsub topic to the default value * bumps protocol id * moves the comment close to IndexedWakuMessage * adds checks to the store put method * makes table title a constant variable and retitles the table to Message * updates the changelog * minor update * minor * beta2 to beta3 * minor Co-authored-by: Oskar Thorén --- CHANGELOG.md | 2 + tests/v2/test_message_store.nim | 7 +- tests/v2/test_waku_store.nim | 144 ++++++++++++++++++ 
.../v2/node/storage/message/message_store.nim | 9 +- .../storage/message/waku_message_store.nim | 19 ++- waku/v2/protocol/waku_store/waku_store.nim | 67 ++++---- .../protocol/waku_store/waku_store_types.nim | 3 + 7 files changed, 210 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c06b043f9..671fae6d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ - Added persistent peer storage. A node will now attempt to reconnect to `relay` peers after a restart. - Changed `contentTopic` back to a string - Fixed: content filtering now works on any PubSub topic and not just the `waku` default. +- Added the `pubsubTopic` field to the `HistoryQuery`. Now, the message history can be filtered and queried based on the `pubsubTopic`. +- Added a new table of `Message` to the message store db. The new table has an additional column of `pubsubTopic` and will be used instead of the old table `messages`. The message history in the old table `messages` will not be accessed and has to be removed. 
## 2021-01-05 v0.2 diff --git a/tests/v2/test_message_store.nim b/tests/v2/test_message_store.nim index 85ca00136..1360c34f6 100644 --- a/tests/v2/test_message_store.nim +++ b/tests/v2/test_message_store.nim @@ -13,6 +13,7 @@ suite "Message Store": database = SqliteDatabase.init("", inMemory = true)[] store = WakuMessageStore.init(database)[] topic = ContentTopic("/waku/2/default-content/proto") + pubsubTopic = "/waku/2/default-waku/proto" var msgs = @[ WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic), @@ -23,12 +24,14 @@ suite "Message Store": defer: store.close() for msg in msgs: - discard store.put(computeIndex(msg), msg) + let output = store.put(computeIndex(msg), msg, pubsubTopic) + check output.isOk var responseCount = 0 - proc data(timestamp: uint64, msg: WakuMessage) = + proc data(timestamp: uint64, msg: WakuMessage, psTopic: string) = responseCount += 1 check msg in msgs + check psTopic == pubsubTopic let res = store.getAll(data) diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 60ad520df..46ce1046b 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -104,6 +104,150 @@ procSuite "Waku Store": check: (await completionFut.withTimeout(5.seconds)) == true + + asyncTest "handle query with pubsub topic filter": + let + key = PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.init(key) + contentTopic1 = defaultContentTopic + contentTopic2 = ContentTopic("2") + contentTopic3 = ContentTopic("3") + msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic1) + msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic2) + msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic3) + + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() + + var listenSwitch = newStandardSwitch(some(key)) + discard await listenSwitch.start() + + let + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) + pubsubtopic1 = "queried topic" + 
pubsubtopic2 = "non queried topic" + subscription: MessageNotificationSubscription = proto.subscription() + # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) + rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1), HistoryContentFilter(contentTopic: contentTopic3)], pubsubTopic: pubsubTopic1) + + proto.setPeer(listenSwitch.peerInfo) + + var subscriptions = newTable[string, MessageNotificationSubscription]() + subscriptions["test"] = subscription + + listenSwitch.mount(proto) + + # publish messages + await subscriptions.notify(pubsubtopic1, msg1) + await subscriptions.notify(pubsubtopic2, msg2) + await subscriptions.notify(pubsubtopic2, msg3) + + var completionFut = newFuture[bool]() + + proc handler(response: HistoryResponse) {.gcsafe, closure.} = + check: + response.messages.len() == 1 + # msg1 is the only match for the query predicate pubsubtopic1 AND (contentTopic1 OR contentTopic3) + response.messages.anyIt(it == msg1) + completionFut.complete(true) + + await proto.query(rpc, handler) + + check: + (await completionFut.withTimeout(5.seconds)) == true + + asyncTest "handle query with pubsub topic filter with no match": + let + key = PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.init(key) + msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic) + msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic) + msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic) + + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() + + var listenSwitch = newStandardSwitch(some(key)) + discard await listenSwitch.start() + + let + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) + pubsubtopic1 = "queried topic" + pubsubtopic2 = "non queried topic" + subscription: MessageNotificationSubscription = proto.subscription() + # this query targets: pubsubtopic1 + rpc = HistoryQuery(pubsubTopic: pubsubTopic1) + + 
proto.setPeer(listenSwitch.peerInfo) + + var subscriptions = newTable[string, MessageNotificationSubscription]() + subscriptions["test"] = subscription + + listenSwitch.mount(proto) + + # publish messages + await subscriptions.notify(pubsubtopic2, msg1) + await subscriptions.notify(pubsubtopic2, msg2) + await subscriptions.notify(pubsubtopic2, msg3) + + var completionFut = newFuture[bool]() + + proc handler(response: HistoryResponse) {.gcsafe, closure.} = + check: + response.messages.len() == 0 + completionFut.complete(true) + + await proto.query(rpc, handler) + + check: + (await completionFut.withTimeout(5.seconds)) == true + asyncTest "handle query with pubsub topic filter matching the entire stored messages": + let + key = PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.init(key) + msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic) + msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic) + msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic) + + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() + + var listenSwitch = newStandardSwitch(some(key)) + discard await listenSwitch.start() + + let + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) + pubsubtopic = "queried topic" + subscription: MessageNotificationSubscription = proto.subscription() + # this query targets: pubsubtopic + rpc = HistoryQuery(pubsubTopic: pubsubtopic) + + proto.setPeer(listenSwitch.peerInfo) + + var subscriptions = newTable[string, MessageNotificationSubscription]() + subscriptions["test"] = subscription + + listenSwitch.mount(proto) + + # publish messages + await subscriptions.notify(pubsubtopic, msg1) + await subscriptions.notify(pubsubtopic, msg2) + await subscriptions.notify(pubsubtopic, msg3) + + var completionFut = newFuture[bool]() + + proc handler(response: HistoryResponse) {.gcsafe, closure.} = + check: + response.messages.len() == 3 + 
response.messages.anyIt(it == msg1) + response.messages.anyIt(it == msg2) + response.messages.anyIt(it == msg3) + completionFut.complete(true) + + await proto.query(rpc, handler) + + check: + (await completionFut.withTimeout(5.seconds)) == true asyncTest "handle query with store and restarts": let diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim index 6de59d2b3..84fe20eb3 100644 --- a/waku/v2/node/storage/message/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -4,16 +4,17 @@ import ../../../utils/pagination ## This module defines a message store interface. Implementations of -## MessageStore are used by the `WakuStore` protocol to store and re- -## trieve historical messages +## MessageStore are used by the `WakuStore` protocol to store and +## retrieve historical messages type - DataProc* = proc(timestamp: uint64, msg: WakuMessage) {.closure.} + DataProc* = proc(timestamp: uint64, msg: WakuMessage, pubsubTopic: string) {.closure.} MessageStoreResult*[T] = Result[T, string] MessageStore* = ref object of RootObj # MessageStore interface -method put*(db: MessageStore, cursor: Index, message: WakuMessage): MessageStoreResult[void] {.base.} = discard +method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard + diff --git a/waku/v2/node/storage/message/waku_message_store.nim b/waku/v2/node/storage/message/waku_message_store.nim index aaa8925e9..359a55595 100644 --- a/waku/v2/node/storage/message/waku_message_store.nim +++ b/waku/v2/node/storage/message/waku_message_store.nim @@ -13,6 +13,7 @@ import export sqlite +const TABLE_TITLE = "Message" # The code in this file is an adaptation of the Sqlite KV Store found in nim-eth. 
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim # @@ -28,10 +29,11 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T] ## - 4-Byte ContentTopic stored as an Integer ## - Payload stored as a blob let prepare = db.prepareStmt(""" - CREATE TABLE IF NOT EXISTS messages ( + CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ ( id BLOB PRIMARY KEY, timestamp INTEGER NOT NULL, contentTopic BLOB NOT NULL, + pubsubTopic BLOB NOT NULL, payload BLOB ) WITHOUT ROWID; """, NoParams, void) @@ -45,7 +47,7 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T] ok(WakuMessageStore(database: db)) -method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage): MessageStoreResult[void] = +method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] = ## Adds a message to the storage. ## ## **Example:** @@ -56,15 +58,15 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage): MessageS ## echo "error" ## let prepare = db.database.prepareStmt( - "INSERT INTO messages (id, timestamp, contentTopic, payload) VALUES (?, ?, ?, ?);", - (seq[byte], int64, seq[byte], seq[byte]), + "INSERT INTO " & TABLE_TITLE & " (id, timestamp, contentTopic, payload, pubsubTopic) VALUES (?, ?, ?, ?, ?);", + (seq[byte], int64, seq[byte], seq[byte], seq[byte]), void ) if prepare.isErr: return err("failed to prepare") - let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload)) + let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes())) if res.isErr: return err("failed") @@ -91,12 +93,15 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto topicL = sqlite3_column_bytes(s,1) p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2)) l = 
sqlite3_column_bytes(s, 2) + pubsubTopic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3)) + pubsubTopicL = sqlite3_column_bytes(s,3) onData(uint64(timestamp), WakuMessage(contentTopic: ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicL-1)))), - payload: @(toOpenArray(p, 0, l-1)))) + payload: @(toOpenArray(p, 0, l-1))), + string.fromBytes(@(toOpenArray(pubsubTopic, 0, pubsubTopicL-1)))) - let res = db.database.query("SELECT timestamp, contentTopic, payload FROM messages ORDER BY timestamp ASC", msg) + let res = db.database.query("SELECT timestamp, contentTopic, payload, pubsubTopic FROM " & TABLE_TITLE & " ORDER BY timestamp ASC", msg) if res.isErr: return err("failed") diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 02dc1c496..12393bc5c 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -27,7 +27,7 @@ logScope: topics = "wakustore" const - WakuStoreCodec* = "/vac/waku/store/2.0.0-beta2" + WakuStoreCodec* = "/vac/waku/store/2.0.0-beta3" # Error types (metric label values) const @@ -124,24 +124,21 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = var msg = HistoryQuery() let pb = initProtoBuffer(buffer) - # var topics: seq[ContentTopic] + discard ? pb.getField(2, msg.pubsubTopic) - # discard ? pb.getRepeatedField(2, topics) - # msg.topics = topics var buffs: seq[seq[byte]] - discard ? pb.getRepeatedField(2, buffs) + discard ? pb.getRepeatedField(3, buffs) for buf in buffs: msg.contentFilters.add(? HistoryContentFilter.init(buf)) - var pagingInfoBuffer: seq[byte] - discard ? pb.getField(3, pagingInfoBuffer) + discard ? pb.getField(4, pagingInfoBuffer) msg.pagingInfo = ? PagingInfo.init(pagingInfoBuffer) - discard ? pb.getField(4, msg.startTime) - discard ? pb.getField(5, msg.endTime) + discard ? pb.getField(5, msg.startTime) + discard ? 
pb.getField(6, msg.endTime) ok(msg) @@ -186,16 +183,17 @@ proc encode*(filter: HistoryContentFilter): ProtoBuffer = proc encode*(query: HistoryQuery): ProtoBuffer = result = initProtoBuffer() - - # for topic in query.topics: - # result.write(2, topic) - for filter in query.contentFilters: - result.write(2, filter.encode()) - result.write(3, query.pagingInfo.encode()) + result.write(2, query.pubsubTopic) + + for filter in query.contentFilters: + result.write(3, filter.encode()) + + result.write(4, query.pagingInfo.encode()) + + result.write(5, query.startTime) + result.write(6, query.endTime) - result.write(4, query.startTime) - result.write(5, query.endTime) proc encode*(response: HistoryResponse): ProtoBuffer = result = initProtoBuffer() @@ -311,12 +309,24 @@ proc paginateWithoutIndex(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (se proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse = result = HistoryResponse(messages: newSeq[WakuMessage]()) - # data holds IndexedWakuMessage whose topics match the query - var data : seq[IndexedWakuMessage] = @[] - for filter in query.contentFilters: - var matched = w.messages.filterIt(it.msg.contentTopic == filter.contentTopic) - # TODO remove duplicates from data - data.add(matched) + var data : seq[IndexedWakuMessage] = w.messages + + # filter based on content filters + # an empty list of contentFilters means no content filter is requested + if ((query.contentFilters).len != 0): + # matchedMessages holds IndexedWakuMessage whose content topics match the queried Content filters + var matchedMessages : seq[IndexedWakuMessage] = @[] + for filter in query.contentFilters: + var matched = w.messages.filterIt(it.msg.contentTopic == filter.contentTopic) + matchedMessages.add(matched) + # remove duplicates + # duplicates may exist if two content filters target the same content topic, then the matched message gets added more than once + data = matchedMessages.deduplicate() + + # filter based on pubsub topic + # an 
empty pubsub topic means no pubsub topic filter is requested + if ((query.pubsubTopic).len != 0): + data = data.filterIt(it.pubsubTopic == query.pubsubTopic) # temporal filtering # check whether the history query contains a time filter @@ -366,8 +376,9 @@ method init*(ws: WakuStore) = if ws.store.isNil: return - proc onData(timestamp: uint64, msg: WakuMessage) = - ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex())) + proc onData(timestamp: uint64, msg: WakuMessage, pubsubTopic: string) = + # TODO index should not be recalculated + ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(), pubsubTopic: pubsubTopic)) let res = ws.store.getAll(onData) if res.isErr: @@ -399,17 +410,17 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription = proc handle(topic: string, msg: WakuMessage) {.async.} = debug "subscription handle", topic=topic let index = msg.computeIndex() - proto.messages.add(IndexedWakuMessage(msg: msg, index: index)) + proto.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) waku_store_messages.inc(labelValues = ["stored"]) if proto.store.isNil: return - let res = proto.store.put(index, msg) + let res = proto.store.put(index, msg, topic) if res.isErr: warn "failed to store messages", err = res.error waku_store_errors.inc(labelValues = ["store_failure"]) - MessageNotificationSubscription.init(@[], handle) + result = MessageNotificationSubscription.init(@[], handle) proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service. 
diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index 3046de3fb..3d96d5d21 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -23,9 +23,11 @@ type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} IndexedWakuMessage* = object + # TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message ## This type is used to encapsulate a WakuMessage and its Index msg*: WakuMessage index*: Index + pubsubTopic*: string PagingDirection* {.pure.} = enum ## PagingDirection determines the direction of pagination @@ -40,6 +42,7 @@ type HistoryQuery* = object contentFilters*: seq[HistoryContentFilter] + pubsubTopic*: string pagingInfo*: PagingInfo # used for pagination startTime*: float64 # used for time-window query endTime*: float64 # used for time-window query