mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-02-18 12:53:09 +00:00
deploy: 6c425f76e9d625d05a333841a893f64f5334f0d6
This commit is contained in:
parent
c13d837e53
commit
bb77b97bab
@ -11,6 +11,8 @@
|
||||
- Added persistent peer storage. A node will now attempt to reconnect to `relay` peers after a restart.
|
||||
- Changed `contentTopic` back to a string
|
||||
- Fixed: content filtering now works on any PubSub topic and not just the `waku` default.
|
||||
- Added the `pubsubTopic` field to the `HistoryQuery`. Now, the message history can be filtered and queried based on the `pubsubTopic`.
|
||||
- Added a new table of `Message` to the message store db. The new table has an additional column of `pubsubTopic` and will be used instead of the old table `messages`. The message history in the old table `messages` will not be accessed and have to be removed.
|
||||
|
||||
## 2021-01-05 v0.2
|
||||
|
||||
|
||||
@ -13,6 +13,7 @@ suite "Message Store":
|
||||
database = SqliteDatabase.init("", inMemory = true)[]
|
||||
store = WakuMessageStore.init(database)[]
|
||||
topic = ContentTopic("/waku/2/default-content/proto")
|
||||
pubsubTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
var msgs = @[
|
||||
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic),
|
||||
@ -23,12 +24,14 @@ suite "Message Store":
|
||||
defer: store.close()
|
||||
|
||||
for msg in msgs:
|
||||
discard store.put(computeIndex(msg), msg)
|
||||
let output = store.put(computeIndex(msg), msg, pubsubTopic)
|
||||
check output.isOk
|
||||
|
||||
var responseCount = 0
|
||||
proc data(timestamp: uint64, msg: WakuMessage) =
|
||||
proc data(timestamp: uint64, msg: WakuMessage, psTopic: string) =
|
||||
responseCount += 1
|
||||
check msg in msgs
|
||||
check psTopic == pubsubTopic
|
||||
|
||||
let res = store.getAll(data)
|
||||
|
||||
|
||||
@ -104,6 +104,150 @@ procSuite "Waku Store":
|
||||
|
||||
check:
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
|
||||
asyncTest "handle query with pubsub topic filter":
|
||||
let
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
contentTopic1 = defaultContentTopic
|
||||
contentTopic2 = ContentTopic("2")
|
||||
contentTopic3 = ContentTopic("3")
|
||||
msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic1)
|
||||
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic2)
|
||||
msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic3)
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
||||
var listenSwitch = newStandardSwitch(some(key))
|
||||
discard await listenSwitch.start()
|
||||
|
||||
let
|
||||
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
|
||||
pubsubtopic1 = "queried topic"
|
||||
pubsubtopic2 = "non queried topic"
|
||||
subscription: MessageNotificationSubscription = proto.subscription()
|
||||
# this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3)
|
||||
rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1), HistoryContentFilter(contentTopic: contentTopic3)], pubsubTopic: pubsubTopic1)
|
||||
|
||||
proto.setPeer(listenSwitch.peerInfo)
|
||||
|
||||
var subscriptions = newTable[string, MessageNotificationSubscription]()
|
||||
subscriptions["test"] = subscription
|
||||
|
||||
listenSwitch.mount(proto)
|
||||
|
||||
# publish messages
|
||||
await subscriptions.notify(pubsubtopic1, msg1)
|
||||
await subscriptions.notify(pubsubtopic2, msg2)
|
||||
await subscriptions.notify(pubsubtopic2, msg3)
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
|
||||
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
|
||||
check:
|
||||
response.messages.len() == 1
|
||||
# msg1 is the only match for the query predicate pubsubtopic1 AND (contentTopic1 OR contentTopic3)
|
||||
response.messages.anyIt(it == msg1)
|
||||
completionFut.complete(true)
|
||||
|
||||
await proto.query(rpc, handler)
|
||||
|
||||
check:
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
|
||||
asyncTest "handle query with pubsub topic filter with no match":
|
||||
let
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
|
||||
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
|
||||
msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
||||
var listenSwitch = newStandardSwitch(some(key))
|
||||
discard await listenSwitch.start()
|
||||
|
||||
let
|
||||
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
|
||||
pubsubtopic1 = "queried topic"
|
||||
pubsubtopic2 = "non queried topic"
|
||||
subscription: MessageNotificationSubscription = proto.subscription()
|
||||
# this query targets: pubsubtopic1
|
||||
rpc = HistoryQuery(pubsubTopic: pubsubTopic1)
|
||||
|
||||
proto.setPeer(listenSwitch.peerInfo)
|
||||
|
||||
var subscriptions = newTable[string, MessageNotificationSubscription]()
|
||||
subscriptions["test"] = subscription
|
||||
|
||||
listenSwitch.mount(proto)
|
||||
|
||||
# publish messages
|
||||
await subscriptions.notify(pubsubtopic2, msg1)
|
||||
await subscriptions.notify(pubsubtopic2, msg2)
|
||||
await subscriptions.notify(pubsubtopic2, msg3)
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
|
||||
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
|
||||
check:
|
||||
response.messages.len() == 0
|
||||
completionFut.complete(true)
|
||||
|
||||
await proto.query(rpc, handler)
|
||||
|
||||
check:
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
asyncTest "handle query with pubsub topic filter matching the entire stored messages":
|
||||
let
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
|
||||
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
|
||||
msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
||||
var listenSwitch = newStandardSwitch(some(key))
|
||||
discard await listenSwitch.start()
|
||||
|
||||
let
|
||||
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
|
||||
pubsubtopic = "queried topic"
|
||||
subscription: MessageNotificationSubscription = proto.subscription()
|
||||
# this query targets: pubsubtopic
|
||||
rpc = HistoryQuery(pubsubTopic: pubsubtopic)
|
||||
|
||||
proto.setPeer(listenSwitch.peerInfo)
|
||||
|
||||
var subscriptions = newTable[string, MessageNotificationSubscription]()
|
||||
subscriptions["test"] = subscription
|
||||
|
||||
listenSwitch.mount(proto)
|
||||
|
||||
# publish messages
|
||||
await subscriptions.notify(pubsubtopic, msg1)
|
||||
await subscriptions.notify(pubsubtopic, msg2)
|
||||
await subscriptions.notify(pubsubtopic, msg3)
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
|
||||
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
|
||||
check:
|
||||
response.messages.len() == 3
|
||||
response.messages.anyIt(it == msg1)
|
||||
response.messages.anyIt(it == msg2)
|
||||
response.messages.anyIt(it == msg3)
|
||||
completionFut.complete(true)
|
||||
|
||||
await proto.query(rpc, handler)
|
||||
|
||||
check:
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
|
||||
asyncTest "handle query with store and restarts":
|
||||
let
|
||||
|
||||
@ -4,16 +4,17 @@ import
|
||||
../../../utils/pagination
|
||||
|
||||
## This module defines a message store interface. Implementations of
|
||||
## MessageStore are used by the `WakuStore` protocol to store and re-
|
||||
## trieve historical messages
|
||||
## MessageStore are used by the `WakuStore` protocol to store and
|
||||
## retrieve historical messages
|
||||
|
||||
type
|
||||
DataProc* = proc(timestamp: uint64, msg: WakuMessage) {.closure.}
|
||||
DataProc* = proc(timestamp: uint64, msg: WakuMessage, pubsubTopic: string) {.closure.}
|
||||
|
||||
MessageStoreResult*[T] = Result[T, string]
|
||||
|
||||
MessageStore* = ref object of RootObj
|
||||
|
||||
# MessageStore interface
|
||||
method put*(db: MessageStore, cursor: Index, message: WakuMessage): MessageStoreResult[void] {.base.} = discard
|
||||
method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard
|
||||
method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard
|
||||
|
||||
|
||||
@ -13,6 +13,7 @@ import
|
||||
|
||||
export sqlite
|
||||
|
||||
const TABLE_TITLE = "Message"
|
||||
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
||||
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
||||
#
|
||||
@ -28,10 +29,11 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]
|
||||
## - 4-Byte ContentTopic stored as an Integer
|
||||
## - Payload stored as a blob
|
||||
let prepare = db.prepareStmt("""
|
||||
CREATE TABLE IF NOT EXISTS messages (
|
||||
CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ (
|
||||
id BLOB PRIMARY KEY,
|
||||
timestamp INTEGER NOT NULL,
|
||||
contentTopic BLOB NOT NULL,
|
||||
pubsubTopic BLOB NOT NULL,
|
||||
payload BLOB
|
||||
) WITHOUT ROWID;
|
||||
""", NoParams, void)
|
||||
@ -45,7 +47,7 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]
|
||||
|
||||
ok(WakuMessageStore(database: db))
|
||||
|
||||
method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage): MessageStoreResult[void] =
|
||||
method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
|
||||
## Adds a message to the storage.
|
||||
##
|
||||
## **Example:**
|
||||
@ -56,15 +58,15 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage): MessageS
|
||||
## echo "error"
|
||||
##
|
||||
let prepare = db.database.prepareStmt(
|
||||
"INSERT INTO messages (id, timestamp, contentTopic, payload) VALUES (?, ?, ?, ?);",
|
||||
(seq[byte], int64, seq[byte], seq[byte]),
|
||||
"INSERT INTO " & TABLE_TITLE & " (id, timestamp, contentTopic, payload, pubsubTopic) VALUES (?, ?, ?, ?, ?);",
|
||||
(seq[byte], int64, seq[byte], seq[byte], seq[byte]),
|
||||
void
|
||||
)
|
||||
|
||||
if prepare.isErr:
|
||||
return err("failed to prepare")
|
||||
|
||||
let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload))
|
||||
let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes()))
|
||||
if res.isErr:
|
||||
return err("failed")
|
||||
|
||||
@ -91,12 +93,15 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
|
||||
topicL = sqlite3_column_bytes(s,1)
|
||||
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2))
|
||||
l = sqlite3_column_bytes(s, 2)
|
||||
pubsubTopic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3))
|
||||
pubsubTopicL = sqlite3_column_bytes(s,3)
|
||||
|
||||
onData(uint64(timestamp),
|
||||
WakuMessage(contentTopic: ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicL-1)))),
|
||||
payload: @(toOpenArray(p, 0, l-1))))
|
||||
payload: @(toOpenArray(p, 0, l-1))),
|
||||
string.fromBytes(@(toOpenArray(pubsubTopic, 0, pubsubTopicL-1))))
|
||||
|
||||
let res = db.database.query("SELECT timestamp, contentTopic, payload FROM messages ORDER BY timestamp ASC", msg)
|
||||
let res = db.database.query("SELECT timestamp, contentTopic, payload, pubsubTopic FROM " & TABLE_TITLE & " ORDER BY timestamp ASC", msg)
|
||||
if res.isErr:
|
||||
return err("failed")
|
||||
|
||||
|
||||
@ -27,7 +27,7 @@ logScope:
|
||||
topics = "wakustore"
|
||||
|
||||
const
|
||||
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta2"
|
||||
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta3"
|
||||
|
||||
# Error types (metric label values)
|
||||
const
|
||||
@ -124,24 +124,21 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
|
||||
var msg = HistoryQuery()
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
# var topics: seq[ContentTopic]
|
||||
discard ? pb.getField(2, msg.pubsubTopic)
|
||||
|
||||
# discard ? pb.getRepeatedField(2, topics)
|
||||
# msg.topics = topics
|
||||
var buffs: seq[seq[byte]]
|
||||
discard ? pb.getRepeatedField(2, buffs)
|
||||
discard ? pb.getRepeatedField(3, buffs)
|
||||
|
||||
for buf in buffs:
|
||||
msg.contentFilters.add(? HistoryContentFilter.init(buf))
|
||||
|
||||
|
||||
var pagingInfoBuffer: seq[byte]
|
||||
discard ? pb.getField(3, pagingInfoBuffer)
|
||||
discard ? pb.getField(4, pagingInfoBuffer)
|
||||
|
||||
msg.pagingInfo = ? PagingInfo.init(pagingInfoBuffer)
|
||||
|
||||
discard ? pb.getField(4, msg.startTime)
|
||||
discard ? pb.getField(5, msg.endTime)
|
||||
discard ? pb.getField(5, msg.startTime)
|
||||
discard ? pb.getField(6, msg.endTime)
|
||||
|
||||
|
||||
ok(msg)
|
||||
@ -186,16 +183,17 @@ proc encode*(filter: HistoryContentFilter): ProtoBuffer =
|
||||
|
||||
proc encode*(query: HistoryQuery): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
# for topic in query.topics:
|
||||
# result.write(2, topic)
|
||||
for filter in query.contentFilters:
|
||||
result.write(2, filter.encode())
|
||||
|
||||
result.write(3, query.pagingInfo.encode())
|
||||
result.write(2, query.pubsubTopic)
|
||||
|
||||
for filter in query.contentFilters:
|
||||
result.write(3, filter.encode())
|
||||
|
||||
result.write(4, query.pagingInfo.encode())
|
||||
|
||||
result.write(5, query.startTime)
|
||||
result.write(6, query.endTime)
|
||||
|
||||
result.write(4, query.startTime)
|
||||
result.write(5, query.endTime)
|
||||
|
||||
proc encode*(response: HistoryResponse): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
@ -311,12 +309,24 @@ proc paginateWithoutIndex(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (se
|
||||
|
||||
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
|
||||
result = HistoryResponse(messages: newSeq[WakuMessage]())
|
||||
# data holds IndexedWakuMessage whose topics match the query
|
||||
var data : seq[IndexedWakuMessage] = @[]
|
||||
for filter in query.contentFilters:
|
||||
var matched = w.messages.filterIt(it.msg.contentTopic == filter.contentTopic)
|
||||
# TODO remove duplicates from data
|
||||
data.add(matched)
|
||||
var data : seq[IndexedWakuMessage] = w.messages
|
||||
|
||||
# filter based on content filters
|
||||
# an empty list of contentFilters means no content filter is requested
|
||||
if ((query.contentFilters).len != 0):
|
||||
# matchedMessages holds IndexedWakuMessage whose content topics match the queried Content filters
|
||||
var matchedMessages : seq[IndexedWakuMessage] = @[]
|
||||
for filter in query.contentFilters:
|
||||
var matched = w.messages.filterIt(it.msg.contentTopic == filter.contentTopic)
|
||||
matchedMessages.add(matched)
|
||||
# remove duplicates
|
||||
# duplicates may exist if two content filters target the same content topic, then the matched message gets added more than once
|
||||
data = matchedMessages.deduplicate()
|
||||
|
||||
# filter based on pubsub topic
|
||||
# an empty pubsub topic means no pubsub topic filter is requested
|
||||
if ((query.pubsubTopic).len != 0):
|
||||
data = data.filterIt(it.pubsubTopic == query.pubsubTopic)
|
||||
|
||||
# temporal filtering
|
||||
# check whether the history query contains a time filter
|
||||
@ -366,8 +376,9 @@ method init*(ws: WakuStore) =
|
||||
if ws.store.isNil:
|
||||
return
|
||||
|
||||
proc onData(timestamp: uint64, msg: WakuMessage) =
|
||||
ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex()))
|
||||
proc onData(timestamp: uint64, msg: WakuMessage, pubsubTopic: string) =
|
||||
# TODO index should not be recalculated
|
||||
ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(), pubsubTopic: pubsubTopic))
|
||||
|
||||
let res = ws.store.getAll(onData)
|
||||
if res.isErr:
|
||||
@ -399,17 +410,17 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription =
|
||||
proc handle(topic: string, msg: WakuMessage) {.async.} =
|
||||
debug "subscription handle", topic=topic
|
||||
let index = msg.computeIndex()
|
||||
proto.messages.add(IndexedWakuMessage(msg: msg, index: index))
|
||||
proto.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
|
||||
waku_store_messages.inc(labelValues = ["stored"])
|
||||
if proto.store.isNil:
|
||||
return
|
||||
|
||||
let res = proto.store.put(index, msg)
|
||||
let res = proto.store.put(index, msg, topic)
|
||||
if res.isErr:
|
||||
warn "failed to store messages", err = res.error
|
||||
waku_store_errors.inc(labelValues = ["store_failure"])
|
||||
|
||||
MessageNotificationSubscription.init(@[], handle)
|
||||
result = MessageNotificationSubscription.init(@[], handle)
|
||||
|
||||
proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
|
||||
# @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service.
|
||||
|
||||
@ -23,9 +23,11 @@ type
|
||||
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
|
||||
|
||||
IndexedWakuMessage* = object
|
||||
# TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message
|
||||
## This type is used to encapsulate a WakuMessage and its Index
|
||||
msg*: WakuMessage
|
||||
index*: Index
|
||||
pubsubTopic*: string
|
||||
|
||||
PagingDirection* {.pure.} = enum
|
||||
## PagingDirection determines the direction of pagination
|
||||
@ -40,6 +42,7 @@ type
|
||||
|
||||
HistoryQuery* = object
|
||||
contentFilters*: seq[HistoryContentFilter]
|
||||
pubsubTopic*: string
|
||||
pagingInfo*: PagingInfo # used for pagination
|
||||
startTime*: float64 # used for time-window query
|
||||
endTime*: float64 # used for time-window query
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user