Enabling pubsub topic filter in history queries (#492)

* replaces topics with seq of ContentFilters

* update topics to contentFilter

* updates the contentFilter structure

  one content topic per content filter instead of a sequence of topics

* updates store json rpc api

* renames ContentFilter to HistoryContentFilter

* unit test for a query with several content filters

* makes shortcut for store api

* updates chat2

* clean up

* renames topic to contentTopic

* adds pubsub topic to the history query

updates message store interface to return the pubsub topic
updates waku message store implementation
updates database schema to hold pubsub topic per waku message

* clarifies the use of content topic in store api

* clarifies the use of contentTopic in the init method of HistoryContentFilter

* simplifies the test and adds comments

* lowers the field number of pubsub topic in HistoryQuery protobuf

* captures an empty contentFilter case

* tests the pubsub topic filter for the entire history and for the no-match case

* removes duplicates

* adds TODO

* fixes a broken comment line

* updates waku store codec

* swaps the order of pubsub topic and content topic in protobuf

* Update waku/v2/protocol/waku_store/waku_store_types.nim

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>

* updates the pubsub topic to the default value

* bumps protocol id

* moves the comment close to IndexedWakuMessage

* adds checks to the store put method

* makes table title a constant variable and retitles the table to Message

* updates the changelog

* minor update

* minor

* beta2 to beta3

* minor

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>
Sanaz Taheri Boshrooyeh 2021-04-27 16:52:24 -07:00 committed by GitHub
parent 7bb58d089d
commit 50a54cb4ee
7 changed files with 210 additions and 41 deletions
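
The net effect: a history query can now be scoped to a single pubsub topic in addition to content topics. A minimal client-side sketch in Nim, modelled on the tests in this commit (the topic strings and the handler body are illustrative assumptions, and the call is assumed to run inside an async proc with a mounted WakuStore `proto`):

  # Query history for messages published on one pubsub topic whose content
  # topic matches either of two content filters.
  let rpc = HistoryQuery(
    pubsubTopic: "/waku/2/default-waku/proto",
    contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("/waku/2/default-content/proto")),
                      HistoryContentFilter(contentTopic: ContentTopic("/waku/2/other-content/proto"))])

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    # every returned message was published on rpc.pubsubTopic AND matches one of the content filters
    echo "history returned ", response.messages.len, " messages"

  await proto.query(rpc, handler)

An empty `pubsubTopic` or an empty `contentFilters` list disables that part of the filter, as the `findMessages` change below shows.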

View File

@@ -11,6 +11,8 @@
 - Added persistent peer storage. A node will now attempt to reconnect to `relay` peers after a restart.
 - Changed `contentTopic` back to a string
 - Fixed: content filtering now works on any PubSub topic and not just the `waku` default.
+- Added the `pubsubTopic` field to `HistoryQuery`. The message history can now be filtered and queried based on the `pubsubTopic`.
+- Added a new `Message` table to the message store database. The new table has an additional `pubsubTopic` column and is used instead of the old `messages` table. The history in the old `messages` table is no longer accessed and has to be removed.

 ## 2021-01-05 v0.2

View File

@@ -13,6 +13,7 @@ suite "Message Store":
       database = SqliteDatabase.init("", inMemory = true)[]
       store = WakuMessageStore.init(database)[]
       topic = ContentTopic("/waku/2/default-content/proto")
+      pubsubTopic = "/waku/2/default-waku/proto"

     var msgs = @[
       WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic),
@@ -23,12 +24,14 @@
     defer: store.close()

     for msg in msgs:
-      discard store.put(computeIndex(msg), msg)
+      let output = store.put(computeIndex(msg), msg, pubsubTopic)
+      check output.isOk

     var responseCount = 0
-    proc data(timestamp: uint64, msg: WakuMessage) =
+    proc data(timestamp: uint64, msg: WakuMessage, psTopic: string) =
       responseCount += 1
       check msg in msgs
+      check psTopic == pubsubTopic

     let res = store.getAll(data)

View File

@@ -104,6 +104,150 @@ procSuite "Waku Store":
     check:
       (await completionFut.withTimeout(5.seconds)) == true

+  asyncTest "handle query with pubsub topic filter":
+    let
+      key = PrivateKey.random(ECDSA, rng[]).get()
+      peer = PeerInfo.init(key)
+      contentTopic1 = defaultContentTopic
+      contentTopic2 = ContentTopic("2")
+      contentTopic3 = ContentTopic("3")
+      msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic1)
+      msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic2)
+      msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic3)
+
+    var dialSwitch = newStandardSwitch()
+    discard await dialSwitch.start()
+
+    var listenSwitch = newStandardSwitch(some(key))
+    discard await listenSwitch.start()
+
+    let
+      proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
+      pubsubtopic1 = "queried topic"
+      pubsubtopic2 = "non queried topic"
+      subscription: MessageNotificationSubscription = proto.subscription()
+      # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3)
+      rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1), HistoryContentFilter(contentTopic: contentTopic3)], pubsubTopic: pubsubTopic1)
+
+    proto.setPeer(listenSwitch.peerInfo)
+
+    var subscriptions = newTable[string, MessageNotificationSubscription]()
+    subscriptions["test"] = subscription
+    listenSwitch.mount(proto)
+
+    # publish messages
+    await subscriptions.notify(pubsubtopic1, msg1)
+    await subscriptions.notify(pubsubtopic2, msg2)
+    await subscriptions.notify(pubsubtopic2, msg3)
+
+    var completionFut = newFuture[bool]()
+
+    proc handler(response: HistoryResponse) {.gcsafe, closure.} =
+      check:
+        response.messages.len() == 1
+        # msg1 is the only match for the query predicate pubsubtopic1 AND (contentTopic1 OR contentTopic3)
+        response.messages.anyIt(it == msg1)
+      completionFut.complete(true)
+
+    await proto.query(rpc, handler)
+
+    check:
+      (await completionFut.withTimeout(5.seconds)) == true
+
+  asyncTest "handle query with pubsub topic filter with no match":
+    let
+      key = PrivateKey.random(ECDSA, rng[]).get()
+      peer = PeerInfo.init(key)
+      msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
+      msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
+      msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
+
+    var dialSwitch = newStandardSwitch()
+    discard await dialSwitch.start()
+
+    var listenSwitch = newStandardSwitch(some(key))
+    discard await listenSwitch.start()
+
+    let
+      proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
+      pubsubtopic1 = "queried topic"
+      pubsubtopic2 = "non queried topic"
+      subscription: MessageNotificationSubscription = proto.subscription()
+      # this query targets: pubsubtopic1
+      rpc = HistoryQuery(pubsubTopic: pubsubTopic1)
+
+    proto.setPeer(listenSwitch.peerInfo)
+
+    var subscriptions = newTable[string, MessageNotificationSubscription]()
+    subscriptions["test"] = subscription
+    listenSwitch.mount(proto)
+
+    # publish messages
+    await subscriptions.notify(pubsubtopic2, msg1)
+    await subscriptions.notify(pubsubtopic2, msg2)
+    await subscriptions.notify(pubsubtopic2, msg3)
+
+    var completionFut = newFuture[bool]()
+
+    proc handler(response: HistoryResponse) {.gcsafe, closure.} =
+      check:
+        response.messages.len() == 0
+      completionFut.complete(true)
+
+    await proto.query(rpc, handler)
+
+    check:
+      (await completionFut.withTimeout(5.seconds)) == true
+
+  asyncTest "handle query with pubsub topic filter matching the entire stored messages":
+    let
+      key = PrivateKey.random(ECDSA, rng[]).get()
+      peer = PeerInfo.init(key)
+      msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
+      msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
+      msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
+
+    var dialSwitch = newStandardSwitch()
+    discard await dialSwitch.start()
+
+    var listenSwitch = newStandardSwitch(some(key))
+    discard await listenSwitch.start()
+
+    let
+      proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
+      pubsubtopic = "queried topic"
+      subscription: MessageNotificationSubscription = proto.subscription()
+      # this query targets: pubsubtopic
+      rpc = HistoryQuery(pubsubTopic: pubsubtopic)
+
+    proto.setPeer(listenSwitch.peerInfo)
+
+    var subscriptions = newTable[string, MessageNotificationSubscription]()
+    subscriptions["test"] = subscription
+    listenSwitch.mount(proto)
+
+    # publish messages
+    await subscriptions.notify(pubsubtopic, msg1)
+    await subscriptions.notify(pubsubtopic, msg2)
+    await subscriptions.notify(pubsubtopic, msg3)
+
+    var completionFut = newFuture[bool]()
+
+    proc handler(response: HistoryResponse) {.gcsafe, closure.} =
+      check:
+        response.messages.len() == 3
+        response.messages.anyIt(it == msg1)
+        response.messages.anyIt(it == msg2)
+        response.messages.anyIt(it == msg3)
+      completionFut.complete(true)
+
+    await proto.query(rpc, handler)
+
+    check:
+      (await completionFut.withTimeout(5.seconds)) == true
+
   asyncTest "handle query with store and restarts":
     let

View File

@@ -4,16 +4,17 @@ import
   ../../../utils/pagination

 ## This module defines a message store interface. Implementations of
-## MessageStore are used by the `WakuStore` protocol to store and re-
-## trieve historical messages
+## MessageStore are used by the `WakuStore` protocol to store and
+## retrieve historical messages

 type
-  DataProc* = proc(timestamp: uint64, msg: WakuMessage) {.closure.}
+  DataProc* = proc(timestamp: uint64, msg: WakuMessage, pubsubTopic: string) {.closure.}

   MessageStoreResult*[T] = Result[T, string]

   MessageStore* = ref object of RootObj

 # MessageStore interface

-method put*(db: MessageStore, cursor: Index, message: WakuMessage): MessageStoreResult[void] {.base.} = discard
+method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard

 method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard
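
For store implementers, the interface change means every message is written and read back together with the pubsub topic it was published on. A rough in-memory sketch of the updated interface (the `InMemoryStore` type and its fields are hypothetical, for illustration only):

  type
    StoredRow = tuple[cursor: Index, msg: WakuMessage, pubsubTopic: string]
    InMemoryStore = ref object of MessageStore
      rows: seq[StoredRow]

  method put*(db: InMemoryStore, cursor: Index, message: WakuMessage,
              pubsubTopic: string): MessageStoreResult[void] =
    # keep the pubsub topic next to the message, mirroring the new column in the SQLite store
    db.rows.add((cursor: cursor, msg: message, pubsubTopic: pubsubTopic))
    ok()

  method getAll*(db: InMemoryStore, onData: DataProc): MessageStoreResult[bool] =
    # replay every stored row, handing the pubsub topic to the new three-argument callback
    for row in db.rows:
      onData(uint64(row.cursor.receivedTime), row.msg, row.pubsubTopic)
    ok(true)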

View File

@@ -13,6 +13,7 @@ import
 export sqlite

+const TABLE_TITLE = "Message"

 # The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
 # https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
 #
@@ -28,10 +29,11 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]
   ##  - 4-Byte ContentTopic stored as an Integer
   ##  - Payload stored as a blob
   let prepare = db.prepareStmt("""
-    CREATE TABLE IF NOT EXISTS messages (
+    CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ (
         id BLOB PRIMARY KEY,
         timestamp INTEGER NOT NULL,
         contentTopic BLOB NOT NULL,
+        pubsubTopic BLOB NOT NULL,
         payload BLOB
     ) WITHOUT ROWID;
     """, NoParams, void)
@@ -45,7 +47,7 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]

   ok(WakuMessageStore(database: db))

-method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage): MessageStoreResult[void] =
+method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
   ## Adds a message to the storage.
   ##
   ## **Example:**
@@ -56,15 +58,15 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage): MessageS
   ##     echo "error"
   ##
   let prepare = db.database.prepareStmt(
-    "INSERT INTO messages (id, timestamp, contentTopic, payload) VALUES (?, ?, ?, ?);",
-    (seq[byte], int64, seq[byte], seq[byte]),
+    "INSERT INTO " & TABLE_TITLE & " (id, timestamp, contentTopic, payload, pubsubTopic) VALUES (?, ?, ?, ?, ?);",
+    (seq[byte], int64, seq[byte], seq[byte], seq[byte]),
     void
   )

   if prepare.isErr:
     return err("failed to prepare")

-  let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload))
+  let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes()))

   if res.isErr:
     return err("failed")
@@ -91,12 +93,15 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
       topicL = sqlite3_column_bytes(s,1)
       p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2))
       l = sqlite3_column_bytes(s, 2)
+      pubsubTopic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3))
+      pubsubTopicL = sqlite3_column_bytes(s,3)

     onData(uint64(timestamp),
            WakuMessage(contentTopic: ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicL-1)))),
-                       payload: @(toOpenArray(p, 0, l-1))))
+                       payload: @(toOpenArray(p, 0, l-1))),
+           string.fromBytes(@(toOpenArray(pubsubTopic, 0, pubsubTopicL-1))))

-  let res = db.database.query("SELECT timestamp, contentTopic, payload FROM messages ORDER BY timestamp ASC", msg)
+  let res = db.database.query("SELECT timestamp, contentTopic, payload, pubsubTopic FROM " & TABLE_TITLE & " ORDER BY timestamp ASC", msg)
   if res.isErr:
     return err("failed")

View File

@@ -27,7 +27,7 @@ logScope:
   topics = "wakustore"

 const
-  WakuStoreCodec* = "/vac/waku/store/2.0.0-beta2"
+  WakuStoreCodec* = "/vac/waku/store/2.0.0-beta3"

 # Error types (metric label values)
 const
@@ -124,24 +124,21 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
   var msg = HistoryQuery()
   let pb = initProtoBuffer(buffer)

-  # var topics: seq[ContentTopic]
-  # discard ? pb.getRepeatedField(2, topics)
-  # msg.topics = topics
+  discard ? pb.getField(2, msg.pubsubTopic)

   var buffs: seq[seq[byte]]
-  discard ? pb.getRepeatedField(2, buffs)
+  discard ? pb.getRepeatedField(3, buffs)
   for buf in buffs:
     msg.contentFilters.add(? HistoryContentFilter.init(buf))

   var pagingInfoBuffer: seq[byte]
-  discard ? pb.getField(3, pagingInfoBuffer)
+  discard ? pb.getField(4, pagingInfoBuffer)
   msg.pagingInfo = ? PagingInfo.init(pagingInfoBuffer)

-  discard ? pb.getField(4, msg.startTime)
-  discard ? pb.getField(5, msg.endTime)
+  discard ? pb.getField(5, msg.startTime)
+  discard ? pb.getField(6, msg.endTime)

   ok(msg)

@@ -186,16 +183,17 @@ proc encode*(filter: HistoryContentFilter): ProtoBuffer =
 proc encode*(query: HistoryQuery): ProtoBuffer =
   result = initProtoBuffer()

-  # for topic in query.topics:
-  #   result.write(2, topic)
-
-  for filter in query.contentFilters:
-    result.write(2, filter.encode())
+  result.write(2, query.pubsubTopic)

-  result.write(3, query.pagingInfo.encode())
+  for filter in query.contentFilters:
+    result.write(3, filter.encode())

-  result.write(4, query.startTime)
-  result.write(5, query.endTime)
+  result.write(4, query.pagingInfo.encode())
+
+  result.write(5, query.startTime)
+  result.write(6, query.endTime)

 proc encode*(response: HistoryResponse): ProtoBuffer =
   result = initProtoBuffer()
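
After the renumbering, `pubsubTopic` sits in field 2, the repeated content filters move to field 3, and paging info plus the time window shift to fields 4-6. A round-trip sketch using the encode/init procs from this diff (values are placeholders):

  let query = HistoryQuery(
    pubsubTopic: "/waku/2/default-waku/proto",
    contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("/waku/2/default-content/proto"))])

  # encode writes field 2 (pubsubTopic) before the repeated field 3 (contentFilters)
  let decoded = HistoryQuery.init(query.encode().buffer)
  doAssert decoded.isOk()
  doAssert decoded.get().pubsubTopic == query.pubsubTopic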
@@ -311,12 +309,24 @@ proc paginateWithoutIndex(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (se
 proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
   result = HistoryResponse(messages: newSeq[WakuMessage]())
-  # data holds IndexedWakuMessage whose topics match the query
-  var data : seq[IndexedWakuMessage] = @[]
-  for filter in query.contentFilters:
-    var matched = w.messages.filterIt(it.msg.contentTopic == filter.contentTopic)
-    # TODO remove duplicates from data
-    data.add(matched)
+  var data : seq[IndexedWakuMessage] = w.messages
+
+  # filter based on content filters
+  # an empty list of contentFilters means no content filter is requested
+  if ((query.contentFilters).len != 0):
+    # matchedMessages holds IndexedWakuMessage whose content topics match the queried content filters
+    var matchedMessages : seq[IndexedWakuMessage] = @[]
+    for filter in query.contentFilters:
+      var matched = w.messages.filterIt(it.msg.contentTopic == filter.contentTopic)
+      matchedMessages.add(matched)
+    # remove duplicates
+    # duplicates may exist if two content filters target the same content topic; the matched message would then be added more than once
+    data = matchedMessages.deduplicate()
+
+  # filter based on pubsub topic
+  # an empty pubsub topic means no pubsub topic filter is requested
+  if ((query.pubsubTopic).len != 0):
+    data = data.filterIt(it.pubsubTopic == query.pubsubTopic)

   # temporal filtering
   # check whether the history query contains a time filter
@@ -366,8 +376,9 @@ method init*(ws: WakuStore) =
   if ws.store.isNil:
     return

-  proc onData(timestamp: uint64, msg: WakuMessage) =
-    ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex()))
+  proc onData(timestamp: uint64, msg: WakuMessage, pubsubTopic: string) =
+    # TODO index should not be recalculated
+    ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(), pubsubTopic: pubsubTopic))

   let res = ws.store.getAll(onData)
   if res.isErr:
@@ -399,17 +410,17 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription =
   proc handle(topic: string, msg: WakuMessage) {.async.} =
     debug "subscription handle", topic=topic
     let index = msg.computeIndex()
-    proto.messages.add(IndexedWakuMessage(msg: msg, index: index))
+    proto.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
     waku_store_messages.inc(labelValues = ["stored"])
     if proto.store.isNil:
       return

-    let res = proto.store.put(index, msg)
+    let res = proto.store.put(index, msg, topic)
     if res.isErr:
       warn "failed to store messages", err = res.error
       waku_store_errors.inc(labelValues = ["store_failure"])

-  MessageNotificationSubscription.init(@[], handle)
+  result = MessageNotificationSubscription.init(@[], handle)

 proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
   # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service.
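
The resulting matching rule: a stored message is returned when its pubsub topic equals the queried one (if a pubsub topic is given) AND its content topic matches at least one content filter (if any are given). A self-contained illustration of that predicate using the same filterIt/deduplicate approach, with made-up records standing in for IndexedWakuMessage:

  import sequtils

  let
    stored = @[
      (contentTopic: "1", pubsubTopic: "queried topic"),      # matches both parts of the predicate
      (contentTopic: "2", pubsubTopic: "non queried topic"),  # wrong content topic and wrong pubsub topic
      (contentTopic: "3", pubsubTopic: "non queried topic")]  # right content topic, wrong pubsub topic
    queriedContentTopics = @["1", "3"]
    queriedPubsubTopic = "queried topic"

  var data = stored
  if queriedContentTopics.len != 0:
    var matched: seq[tuple[contentTopic: string, pubsubTopic: string]] = @[]
    for ct in queriedContentTopics:
      matched.add(stored.filterIt(it.contentTopic == ct))
    data = matched.deduplicate()   # two filters hitting the same topic would otherwise add a message twice
  if queriedPubsubTopic.len != 0:
    data = data.filterIt(it.pubsubTopic == queriedPubsubTopic)

  doAssert data.len == 1 and data[0].contentTopic == "1"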

View File

@@ -23,9 +23,11 @@ type
   QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}

   IndexedWakuMessage* = object
+    # TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message
     ## This type is used to encapsulate a WakuMessage and its Index
     msg*: WakuMessage
     index*: Index
+    pubsubTopic*: string

   PagingDirection* {.pure.} = enum
     ## PagingDirection determines the direction of pagination
@@ -40,6 +42,7 @@ type
   HistoryQuery* = object
     contentFilters*: seq[HistoryContentFilter]
+    pubsubTopic*: string
     pagingInfo*: PagingInfo # used for pagination
     startTime*: float64 # used for time-window query
     endTime*: float64 # used for time-window query
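
With the two new fields, a stored message records the pubsub topic it arrived on and a query can name the pubsub topic it wants; since `pubsubTopic` is a plain string, leaving it empty means "no pubsub topic filter". A small construction sketch (topic strings are placeholders; `computeIndex` is used as in the protocol code above):

  let msg = WakuMessage(payload: @[byte 1, 2, 3],
                        contentTopic: ContentTopic("/waku/2/default-content/proto"))

  # an indexed message now carries the pubsub topic alongside its index
  let indexed = IndexedWakuMessage(msg: msg,
                                   index: msg.computeIndex(),
                                   pubsubTopic: "/waku/2/default-waku/proto")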