diff --git a/CHANGELOG.md b/CHANGELOG.md
index c06b043f9..671fae6d9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,8 @@
 - Added persistent peer storage. A node will now attempt to reconnect to `relay` peers after a restart.
 - Changed `contentTopic` back to a string
 - Fixed: content filtering now works on any PubSub topic and not just the `waku` default.
+- Added the `pubsubTopic` field to `HistoryQuery`. The message history can now be filtered and queried based on the `pubsubTopic`.
+- Added a new `Message` table to the message store database. The new table has an additional `pubsubTopic` column and is used instead of the old `messages` table. The message history in the old `messages` table is no longer accessed and has to be removed.
 
 ## 2021-01-05 v0.2
 
diff --git a/tests/v2/test_message_store.nim b/tests/v2/test_message_store.nim
index 85ca00136..1360c34f6 100644
--- a/tests/v2/test_message_store.nim
+++ b/tests/v2/test_message_store.nim
@@ -13,6 +13,7 @@ suite "Message Store":
       database = SqliteDatabase.init("", inMemory = true)[]
       store = WakuMessageStore.init(database)[]
       topic = ContentTopic("/waku/2/default-content/proto")
+      pubsubTopic = "/waku/2/default-waku/proto"
 
     var msgs = @[
       WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic),
@@ -23,12 +24,14 @@ suite "Message Store":
     defer: store.close()
 
     for msg in msgs:
-      discard store.put(computeIndex(msg), msg)
+      let output = store.put(computeIndex(msg), msg, pubsubTopic)
+      check output.isOk
 
     var responseCount = 0
-    proc data(timestamp: uint64, msg: WakuMessage) =
+    proc data(timestamp: uint64, msg: WakuMessage, psTopic: string) =
       responseCount += 1
       check msg in msgs
+      check psTopic == pubsubTopic
 
     let res = store.getAll(data)
 
diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim
index 60ad520df..46ce1046b 100644
--- a/tests/v2/test_waku_store.nim
+++ b/tests/v2/test_waku_store.nim
@@ -104,6 +104,150 @@ procSuite "Waku Store":
 
     check:
       (await completionFut.withTimeout(5.seconds)) == true
+
+  asyncTest "handle query with pubsub topic filter":
+    let
+      key = PrivateKey.random(ECDSA, rng[]).get()
+      peer = PeerInfo.init(key)
+      contentTopic1 = defaultContentTopic
+      contentTopic2 = ContentTopic("2")
+      contentTopic3 = ContentTopic("3")
+      msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic1)
+      msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic2)
+      msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic3)
+
+    var dialSwitch = newStandardSwitch()
+    discard await dialSwitch.start()
+
+    var listenSwitch = newStandardSwitch(some(key))
+    discard await listenSwitch.start()
+
+    let
+      proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
+      pubsubtopic1 = "queried topic"
+      pubsubtopic2 = "non queried topic"
+      subscription: MessageNotificationSubscription = proto.subscription()
+      # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3)
+      rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1), HistoryContentFilter(contentTopic: contentTopic3)], pubsubTopic: pubsubTopic1)
+
+    proto.setPeer(listenSwitch.peerInfo)
+
+    var subscriptions = newTable[string, MessageNotificationSubscription]()
+    subscriptions["test"] = subscription
+
+    listenSwitch.mount(proto)
+
+    # publish messages
+    await subscriptions.notify(pubsubtopic1, msg1)
+    await subscriptions.notify(pubsubtopic2, msg2)
+    await subscriptions.notify(pubsubtopic2, msg3)
+
+    var completionFut = newFuture[bool]()
+
+    proc handler(response: HistoryResponse) {.gcsafe, closure.} =
+      check:
+        response.messages.len() == 1
+        # msg1 is the only match for the query predicate pubsubtopic1 AND (contentTopic1 OR contentTopic3)
+        response.messages.anyIt(it == msg1)
+      completionFut.complete(true)
+
+    await proto.query(rpc, handler)
+
+    check:
+      (await completionFut.withTimeout(5.seconds)) == true
+
+  asyncTest "handle query with pubsub topic filter with no match":
+    let
+      key = PrivateKey.random(ECDSA, rng[]).get()
+      peer = PeerInfo.init(key)
+      msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
+      msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
+      msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
+
+    var dialSwitch = newStandardSwitch()
+    discard await dialSwitch.start()
+
+    var listenSwitch = newStandardSwitch(some(key))
+    discard await listenSwitch.start()
+
+    let
+      proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
+      pubsubtopic1 = "queried topic"
+      pubsubtopic2 = "non queried topic"
+      subscription: MessageNotificationSubscription = proto.subscription()
+      # this query targets: pubsubtopic1
+      rpc = HistoryQuery(pubsubTopic: pubsubTopic1)
+
+    proto.setPeer(listenSwitch.peerInfo)
+
+    var subscriptions = newTable[string, MessageNotificationSubscription]()
+    subscriptions["test"] = subscription
+
+    listenSwitch.mount(proto)
+
+    # publish messages
+    await subscriptions.notify(pubsubtopic2, msg1)
+    await subscriptions.notify(pubsubtopic2, msg2)
+    await subscriptions.notify(pubsubtopic2, msg3)
+
+    var completionFut = newFuture[bool]()
+
+    proc handler(response: HistoryResponse) {.gcsafe, closure.} =
+      check:
+        response.messages.len() == 0
+      completionFut.complete(true)
+
+    await proto.query(rpc, handler)
+
+    check:
+      (await completionFut.withTimeout(5.seconds)) == true
+  asyncTest "handle query with pubsub topic filter matching the entire stored messages":
+    let
+      key = PrivateKey.random(ECDSA, rng[]).get()
+      peer = PeerInfo.init(key)
+      msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
+      msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
+      msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
+
+    var dialSwitch = newStandardSwitch()
+    discard await dialSwitch.start()
+
+    var listenSwitch = newStandardSwitch(some(key))
+    discard await listenSwitch.start()
+
+    let
+      proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
+      pubsubtopic = "queried topic"
+      subscription: MessageNotificationSubscription = proto.subscription()
+      # this query targets: pubsubtopic
+      rpc = HistoryQuery(pubsubTopic: pubsubtopic)
+
+    proto.setPeer(listenSwitch.peerInfo)
+
+    var subscriptions = newTable[string, MessageNotificationSubscription]()
+    subscriptions["test"] = subscription
+
+    listenSwitch.mount(proto)
+
+    # publish messages
+    await subscriptions.notify(pubsubtopic, msg1)
+    await subscriptions.notify(pubsubtopic, msg2)
+    await subscriptions.notify(pubsubtopic, msg3)
+
+    var completionFut = newFuture[bool]()
+
+    proc handler(response: HistoryResponse) {.gcsafe, closure.} =
+      check:
+        response.messages.len() == 3
+        response.messages.anyIt(it == msg1)
+        response.messages.anyIt(it == msg2)
+        response.messages.anyIt(it == msg3)
+      completionFut.complete(true)
+
+    await proto.query(rpc, handler)
+
+    check:
+      (await completionFut.withTimeout(5.seconds)) == true
 
   asyncTest "handle query with store and restarts":
     let
diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim
index 6de59d2b3..84fe20eb3 100644
--- a/waku/v2/node/storage/message/message_store.nim
+++ b/waku/v2/node/storage/message/message_store.nim
@@ -4,16 +4,17 @@ import
   ../../../utils/pagination
 
 ## This module defines a message store interface. Implementations of
-## MessageStore are used by the `WakuStore` protocol to store and re-
-## trieve historical messages
+## MessageStore are used by the `WakuStore` protocol to store and
+## retrieve historical messages
 
 type
-  DataProc* = proc(timestamp: uint64, msg: WakuMessage) {.closure.}
+  DataProc* = proc(timestamp: uint64, msg: WakuMessage, pubsubTopic: string) {.closure.}
 
   MessageStoreResult*[T] = Result[T, string]
 
   MessageStore* = ref object of RootObj
 
 # MessageStore interface
-method put*(db: MessageStore, cursor: Index, message: WakuMessage): MessageStoreResult[void] {.base.} = discard
+method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard
 
 method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard
+
diff --git a/waku/v2/node/storage/message/waku_message_store.nim b/waku/v2/node/storage/message/waku_message_store.nim
index aaa8925e9..359a55595 100644
--- a/waku/v2/node/storage/message/waku_message_store.nim
+++ b/waku/v2/node/storage/message/waku_message_store.nim
@@ -13,6 +13,7 @@ import
 
 export sqlite
 
+const TABLE_TITLE = "Message"
 # The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
 # https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
 #
@@ -28,10 +29,11 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]
   ## - 4-Byte ContentTopic stored as an Integer
   ## - Payload stored as a blob
   let prepare = db.prepareStmt("""
-    CREATE TABLE IF NOT EXISTS messages (
+    CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ (
         id BLOB PRIMARY KEY,
         timestamp INTEGER NOT NULL,
         contentTopic BLOB NOT NULL,
+        pubsubTopic BLOB NOT NULL,
         payload BLOB
     ) WITHOUT ROWID;
     """, NoParams, void)
@@ -45,7 +47,7 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]
 
   ok(WakuMessageStore(database: db))
 
-method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage): MessageStoreResult[void] =
+method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
   ## Adds a message to the storage.
   ##
   ## **Example:**
@@ -56,15 +58,15 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage): MessageS
   ##     echo "error"
   ##
   let prepare = db.database.prepareStmt(
-    "INSERT INTO messages (id, timestamp, contentTopic, payload) VALUES (?, ?, ?, ?);",
-    (seq[byte], int64, seq[byte], seq[byte]),
+    "INSERT INTO " & TABLE_TITLE & " (id, timestamp, contentTopic, payload, pubsubTopic) VALUES (?, ?, ?, ?, ?);",
+    (seq[byte], int64, seq[byte], seq[byte], seq[byte]),
     void
   )
 
   if prepare.isErr:
     return err("failed to prepare")
 
-  let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload))
+  let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes()))
 
   if res.isErr:
     return err("failed")
@@ -91,12 +93,15 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
       topicL = sqlite3_column_bytes(s,1)
       p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2))
       l = sqlite3_column_bytes(s, 2)
+      pubsubTopic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3))
+      pubsubTopicL = sqlite3_column_bytes(s,3)
 
     onData(uint64(timestamp),
            WakuMessage(contentTopic: ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicL-1)))),
-                       payload: @(toOpenArray(p, 0, l-1))))
+                       payload: @(toOpenArray(p, 0, l-1))),
+           string.fromBytes(@(toOpenArray(pubsubTopic, 0, pubsubTopicL-1))))
 
-  let res = db.database.query("SELECT timestamp, contentTopic, payload FROM messages ORDER BY timestamp ASC", msg)
+  let res = db.database.query("SELECT timestamp, contentTopic, payload, pubsubTopic FROM " & TABLE_TITLE & " ORDER BY timestamp ASC", msg)
   if res.isErr:
     return err("failed")
 
diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim
index 02dc1c496..12393bc5c 100644
--- a/waku/v2/protocol/waku_store/waku_store.nim
+++ b/waku/v2/protocol/waku_store/waku_store.nim
@@ -27,7 +27,7 @@ logScope:
   topics = "wakustore"
 
 const
-  WakuStoreCodec* = "/vac/waku/store/2.0.0-beta2"
+  WakuStoreCodec* = "/vac/waku/store/2.0.0-beta3"
 
 # Error types (metric label values)
 const
@@ -124,24 +124,21 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
   var msg = HistoryQuery()
   let pb = initProtoBuffer(buffer)
 
-  # var topics: seq[ContentTopic]
+  discard ? pb.getField(2, msg.pubsubTopic)
 
-  # discard ? pb.getRepeatedField(2, topics)
-  # msg.topics = topics
   var buffs: seq[seq[byte]]
-  discard ? pb.getRepeatedField(2, buffs)
+  discard ? pb.getRepeatedField(3, buffs)
   for buf in buffs:
     msg.contentFilters.add(? HistoryContentFilter.init(buf))
 
-
   var pagingInfoBuffer: seq[byte]
-  discard ? pb.getField(3, pagingInfoBuffer)
+  discard ? pb.getField(4, pagingInfoBuffer)
 
   msg.pagingInfo = ? PagingInfo.init(pagingInfoBuffer)
 
-  discard ? pb.getField(4, msg.startTime)
-  discard ? pb.getField(5, msg.endTime)
+  discard ? pb.getField(5, msg.startTime)
+  discard ? pb.getField(6, msg.endTime)
 
   ok(msg)
 
@@ -186,16 +183,17 @@ proc encode*(filter: HistoryContentFilter): ProtoBuffer =
 
 proc encode*(query: HistoryQuery): ProtoBuffer =
   result = initProtoBuffer()
-
-  # for topic in query.topics:
-  #   result.write(2, topic)
-  for filter in query.contentFilters:
-    result.write(2, filter.encode())
-
-  result.write(3, query.pagingInfo.encode())
+  result.write(2, query.pubsubTopic)
+
+  for filter in query.contentFilters:
+    result.write(3, filter.encode())
+
+  result.write(4, query.pagingInfo.encode())
+
+  result.write(5, query.startTime)
+  result.write(6, query.endTime)
 
-  result.write(4, query.startTime)
-  result.write(5, query.endTime)
 
 proc encode*(response: HistoryResponse): ProtoBuffer =
   result = initProtoBuffer()
@@ -311,12 +309,24 @@ proc paginateWithoutIndex(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (se
 
 proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
   result = HistoryResponse(messages: newSeq[WakuMessage]())
-  # data holds IndexedWakuMessage whose topics match the query
-  var data : seq[IndexedWakuMessage] = @[]
-  for filter in query.contentFilters:
-    var matched = w.messages.filterIt(it.msg.contentTopic == filter.contentTopic)
-    # TODO remove duplicates from data
-    data.add(matched)
+  var data : seq[IndexedWakuMessage] = w.messages
+
+  # filter based on content filters
+  # an empty list of contentFilters means no content filter is requested
+  if ((query.contentFilters).len != 0):
+    # matchedMessages holds the IndexedWakuMessages whose content topics match the queried content filters
+    var matchedMessages : seq[IndexedWakuMessage] = @[]
+    for filter in query.contentFilters:
+      var matched = w.messages.filterIt(it.msg.contentTopic == filter.contentTopic)
+      matchedMessages.add(matched)
+    # remove duplicates
+    # duplicates may exist if two content filters target the same content topic; in that case the matched message is added more than once
+    data = matchedMessages.deduplicate()
+
+  # filter based on pubsub topic
+  # an empty pubsub topic means no pubsub topic filter is requested
+  if ((query.pubsubTopic).len != 0):
+    data = data.filterIt(it.pubsubTopic == query.pubsubTopic)
 
   # temporal filtering
   # check whether the history query contains a time filter
@@ -366,8 +376,9 @@ method init*(ws: WakuStore) =
   if ws.store.isNil:
     return
 
-  proc onData(timestamp: uint64, msg: WakuMessage) =
-    ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex()))
+  proc onData(timestamp: uint64, msg: WakuMessage, pubsubTopic: string) =
+    # TODO index should not be recalculated
+    ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(), pubsubTopic: pubsubTopic))
 
   let res = ws.store.getAll(onData)
   if res.isErr:
@@ -399,17 +410,17 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription =
   proc handle(topic: string, msg: WakuMessage) {.async.} =
     debug "subscription handle", topic=topic
     let index = msg.computeIndex()
-    proto.messages.add(IndexedWakuMessage(msg: msg, index: index))
+    proto.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
     waku_store_messages.inc(labelValues = ["stored"])
     if proto.store.isNil:
       return
 
-    let res = proto.store.put(index, msg)
+    let res = proto.store.put(index, msg, topic)
     if res.isErr:
      warn "failed to store messages", err = res.error
       waku_store_errors.inc(labelValues = ["store_failure"])
 
-  MessageNotificationSubscription.init(@[], handle)
+  result = MessageNotificationSubscription.init(@[], handle)
 
 proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
   # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service.
diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim
index 3046de3fb..3d96d5d21 100644
--- a/waku/v2/protocol/waku_store/waku_store_types.nim
+++ b/waku/v2/protocol/waku_store/waku_store_types.nim
@@ -23,9 +23,11 @@ type
   QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
 
   IndexedWakuMessage* = object
+    # TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message
    ## This type is used to encapsulate a WakuMessage and its Index
     msg*: WakuMessage
     index*: Index
+    pubsubTopic*: string
 
   PagingDirection* {.pure.} = enum
     ## PagingDirection determines the direction of pagination
@@ -40,6 +42,7 @@ type
 
   HistoryQuery* = object
     contentFilters*: seq[HistoryContentFilter]
+    pubsubTopic*: string
     pagingInfo*: PagingInfo # used for pagination
     startTime*: float64 # used for time-window query
     endTime*: float64 # used for time-window query
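
Note (reviewer sketch, not part of the diff): the fragment below illustrates how a client could use the new `pubsubTopic` field of `HistoryQuery` introduced in this PR. It reuses only calls that appear in the tests above (`WakuStore.init`, `setPeer`, `query`, and `HistoryQuery`/`HistoryContentFilter` construction); the import paths, the `queryByPubsubTopic` proc name, and the `storePeer` parameter are illustrative assumptions, not part of the change set.

# Reviewer sketch, not part of this PR. Import paths are assumed to match the
# modules touched by this diff and may differ in the actual repository.
import chronos
import libp2p/crypto/crypto, libp2p/peerinfo, libp2p/switch
import waku/v2/node/peer_manager,
       waku/v2/protocol/waku_store/waku_store,
       waku/v2/protocol/waku_store/waku_store_types

proc queryByPubsubTopic*(dialSwitch: Switch, storePeer: PeerInfo) {.async.} =
  ## Hypothetical helper: query a store node for historical messages published
  ## on a single pubsub topic, optionally narrowed further by content topic.
  let store = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
  store.setPeer(storePeer)

  # With this PR a HistoryQuery carries a pubsubTopic (protobuf field 2) next to
  # the content filters (field 3). Leaving pubsubTopic empty ("") skips the
  # pubsub-topic filter and preserves the previous behaviour.
  let rpc = HistoryQuery(
    contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("/waku/2/default-content/proto"))],
    pubsubTopic: "/waku/2/default-waku/proto")

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    # Only messages stored with pubsubTopic "/waku/2/default-waku/proto" and one
    # of the requested content topics are expected in the response.
    echo "history returned ", response.messages.len, " message(s)"

  await store.query(rpc, handler)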