feat: supporting meta field in WakuMessage (#2384)

This commit is contained in:
gabrielmer 2024-02-14 17:29:59 +02:00 committed by GitHub
parent 731dfcbdf6
commit 3903f130cb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 125 additions and 30 deletions

View File

@ -187,23 +187,28 @@ suite "WakuNode2 - Validators":
version: 2, timestamp: afterTimestamp, ephemeral: true)
discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
# Wait for gossip
await sleepAsync(4.seconds)
# Since we have a full mesh with 5 nodes and each one publishes 25+25+25+25+25 msgs
# there are 625 messages being sent.
# 125 are received ok in the handler (first hop)
# 500 are are wrong so rejected (rejected not relayed)
var msgRejected = 0
# Active wait for the messages to be delivered across the mesh
for i in 0..<100:
msgRejected = 0
for i in 0..<5:
for k, v in nodes[i].wakuRelay.peerStats.mpairs:
msgRejected += v.topicInfos[spamProtectedTopic].invalidMessageDeliveries.int
if msgReceived == 125 and msgRejected == 500:
break
else:
await sleepAsync(100.milliseconds)
check:
msgReceived == 125
var msgRejected = 0
for i in 0..<5:
for k, v in nodes[i].wakuRelay.peerStats.mpairs:
msgRejected += v.topicInfos[spamProtectedTopic].invalidMessageDeliveries.int
check:
msgRejected == 500
msgRejected == 500
await allFutures(nodes.mapIt(it.stop()))

View File

@ -261,7 +261,8 @@ suite "Waku v2 Rest API - Filter V2":
let testMessage = WakuMessage(
payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(),
contentTopic: "1",
timestamp: int64(2022)
timestamp: int64(2022),
meta: "test-meta".toBytes()
)
let postMsgResponse = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(

View File

@ -154,16 +154,19 @@ suite "Waku v2 Rest API - Relay":
let pubSubTopic = "/waku/2/default-waku/proto"
var messages = @[
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"),
meta = toBytes("test-meta") )
]
# Prevent duplicate messages
for i in 0..<2:
var msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))
var msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"),
meta = toBytes("test-meta"))
while msg == messages[i]:
msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))
msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"),
meta = toBytes("test-meta"))
messages.add(msg)
let cache = MessageCache.init()
@ -188,8 +191,8 @@ suite "Waku v2 Rest API - Relay":
msg.payload == base64.encode("TEST-1") and
msg.contentTopic.get() == "content-topic-x" and
msg.version.get() == 2 and
msg.timestamp.get() != Timestamp(0)
msg.timestamp.get() != Timestamp(0) and
msg.meta.get() == base64.encode("test-meta")
check:
cache.isPubsubSubscribed(pubSubTopic)

View File

@ -24,6 +24,7 @@ import
../../../waku/waku_archive,
../../../waku/waku_archive/driver/queue_driver,
../../../waku/waku_store as waku_store,
../../../waku/common/base64,
../testlib/common,
../testlib/wakucore,
../testlib/wakunode
@ -560,7 +561,7 @@ procSuite "Waku v2 Rest API - Store":
# Now prime it with some history before tests
let msgList = @[
fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("ct1"), ts=0),
fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("ct1"), ts=0, meta=(@[byte 8])),
fakeWakuMessage(@[byte 1], ts=1),
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("ct2"), ts=9)
]
@ -574,6 +575,7 @@ procSuite "Waku v2 Rest API - Store":
await client.getStoreMessagesV1(
none[string](),
encodeUrl(DefaultPubsubTopic))
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
@ -598,3 +600,59 @@ procSuite "Waku v2 Rest API - Store":
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.messages.len == 0
asyncTest "correct message fields are returned":
# Given
let node = testWakuNode()
await node.start()
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
restPort = restServer.server.address.port # update with bound port for client use
installStoreApiHandlers(restServer.router, node)
restServer.start()
# WakuStore setup
let driver: ArchiveDriver = QueueDriver.new()
let mountArchiveRes = node.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error
await node.mountStore()
# Now prime it with some history before tests
let msg = fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("ct1"), ts=0, meta=(@[byte 8]))
require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk()
let client = newRestHttpClient(initTAddress(restAddress, restPort))
# Filtering by a known pubsub topic.
var response =
await client.getStoreMessagesV1(
none[string](),
encodeUrl(DefaultPubsubTopic))
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.messages.len == 1
let storeMessage = response.data.messages[0]
check:
storeMessage.contentTopic.isSome()
storeMessage.version.isSome()
storeMessage.timestamp.isSome()
storeMessage.ephemeral.isSome()
storeMessage.meta.isSome()
check:
storeMessage.payload == base64.encode(msg.payload)
storeMessage.contentTopic.get() == msg.contentTopic
storeMessage.version.get() == msg.version
storeMessage.timestamp.get() == msg.timestamp
storeMessage.ephemeral.get() == msg.ephemeral
storeMessage.meta.get() == base64.encode(msg.meta)

View File

@ -13,6 +13,7 @@ import
json_serialization/std/options,
presto/[route, client, common]
import
../../../common/base64,
../../../waku_core,
../serdes,
../responses,

View File

@ -22,6 +22,7 @@ type FilterWakuMessage* = object
contentTopic*: Option[ContentTopic]
version*: Option[Natural]
timestamp*: Option[int64]
meta*: Option[Base64String]
type FilterGetMessagesResponse* = seq[FilterWakuMessage]
@ -58,7 +59,8 @@ proc toFilterWakuMessage*(msg: WakuMessage): FilterWakuMessage =
payload: base64.encode(msg.payload),
contentTopic: some(msg.contentTopic),
version: some(Natural(msg.version)),
timestamp: some(msg.timestamp)
timestamp: some(msg.timestamp),
meta: if msg.meta.len > 0: some(base64.encode(msg.meta)) else: none(Base64String)
)
proc toWakuMessage*(msg: FilterWakuMessage, version = 0): Result[WakuMessage, string] =
@ -67,8 +69,10 @@ proc toWakuMessage*(msg: FilterWakuMessage, version = 0): Result[WakuMessage, st
contentTopic = msg.contentTopic.get(DefaultContentTopic)
version = uint32(msg.version.get(version))
timestamp = msg.timestamp.get(0)
meta = ?msg.meta.get(Base64String("")).decode()
ok(WakuMessage(payload: payload, contentTopic: contentTopic, version: version, timestamp: timestamp))
ok(WakuMessage(payload: payload, contentTopic: contentTopic, version: version,
timestamp: timestamp, meta: meta))
#### Serialization and deserialization
@ -82,6 +86,8 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: FilterWakuMessage)
writer.writeField("version", value.version.get())
if value.timestamp.isSome():
writer.writeField("timestamp", value.timestamp.get())
if value.meta.isSome():
writer.writeField("meta", value.meta.get())
writer.endRecord()
proc writeValue*(writer: var JsonWriter, value: FilterLegacySubscribeRequest)
@ -136,6 +142,7 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterWakuMessage)
contentTopic = none(ContentTopic)
version = none(Natural)
timestamp = none(int64)
meta = none(Base64String)
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
@ -154,6 +161,8 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterWakuMessage)
version = some(reader.readValue(Natural))
of "timestamp":
timestamp = some(reader.readValue(int64))
of "meta":
meta = some(reader.readValue(Base64String))
else:
unrecognizedFieldWarning()
@ -164,7 +173,8 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterWakuMessage)
payload: payload.get(),
contentTopic: contentTopic,
version: version,
timestamp: timestamp
timestamp: timestamp,
meta: meta
)
proc readValue*(reader: var JsonReader[RestJson], value: var FilterLegacySubscribeRequest)

View File

@ -22,7 +22,7 @@ type RelayWakuMessage* = object
contentTopic*: Option[ContentTopic]
version*: Option[Natural]
timestamp*: Option[int64]
meta*: Option[Base64String]
type
RelayGetMessagesResponse* = seq[RelayWakuMessage]
@ -35,7 +35,8 @@ proc toRelayWakuMessage*(msg: WakuMessage): RelayWakuMessage =
payload: base64.encode(msg.payload),
contentTopic: some(msg.contentTopic),
version: some(Natural(msg.version)),
timestamp: some(msg.timestamp)
timestamp: some(msg.timestamp),
meta: if msg.meta.len > 0: some(base64.encode(msg.meta)) else: none(Base64String)
)
proc toWakuMessage*(msg: RelayWakuMessage, version = 0): Result[WakuMessage, string] =
@ -43,14 +44,15 @@ proc toWakuMessage*(msg: RelayWakuMessage, version = 0): Result[WakuMessage, str
payload = ?msg.payload.decode()
contentTopic = msg.contentTopic.get(DefaultContentTopic)
version = uint32(msg.version.get(version))
meta = ?msg.meta.get(Base64String("")).decode()
var timestamp = msg.timestamp.get(0)
if timestamp == 0:
timestamp = getNanosecondTime(getTime().toUnixFloat())
return ok(WakuMessage(payload: payload, contentTopic: contentTopic, version: version, timestamp: timestamp))
return ok(WakuMessage(payload: payload, contentTopic: contentTopic, version: version,
timestamp: timestamp, meta: meta))
#### Serialization and deserialization
@ -64,6 +66,8 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage)
writer.writeField("version", value.version.get())
if value.timestamp.isSome():
writer.writeField("timestamp", value.timestamp.get())
if value.meta.isSome():
writer.writeField("meta", value.meta.get())
writer.endRecord()
proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
@ -73,6 +77,7 @@ proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
contentTopic = none(ContentTopic)
version = none(Natural)
timestamp = none(int64)
meta = none(Base64String)
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
@ -91,6 +96,8 @@ proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
version = some(reader.readValue(Natural))
of "timestamp":
timestamp = some(reader.readValue(int64))
of "meta":
meta = some(reader.readValue(Base64String))
else:
unrecognizedFieldWarning()
@ -104,5 +111,6 @@ proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
payload: payload.get(),
contentTopic: contentTopic,
version: version,
timestamp: timestamp
timestamp: timestamp,
meta: meta
)

View File

@ -42,6 +42,7 @@ type
version*: Option[uint32]
timestamp*: Option[Timestamp]
ephemeral*: Option[bool]
meta*: Option[Base64String]
StoreResponseRest* = object
# inspired by https://rfc.vac.dev/spec/16/#storeresponse
@ -97,7 +98,8 @@ proc toWakuMessage*(message: StoreWakuMessage): WakuMessage =
contentTopic: message.contentTopic.get(),
version: message.version.get(),
timestamp: message.timestamp.get(),
ephemeral: message.ephemeral.get()
ephemeral: message.ephemeral.get(),
meta: message.meta.get(Base64String("")).decode().get()
)
# Converts a 'HistoryResponse' object to an 'StoreResponseRest'
@ -110,7 +112,8 @@ proc toStoreResponseRest*(histResp: HistoryResponse): StoreResponseRest =
contentTopic: some(message.contentTopic),
version: some(message.version),
timestamp: some(message.timestamp),
ephemeral: some(message.ephemeral)
ephemeral: some(message.ephemeral),
meta: if message.meta.len > 0: some(base64.encode(message.meta)) else: none(Base64String)
)
var storeWakuMsgs: seq[StoreWakuMessage]
@ -146,6 +149,8 @@ proc writeValue*(writer: var JsonWriter,
writer.writeField("timestamp", value.timestamp.get())
if value.ephemeral.isSome():
writer.writeField("ephemeral", value.ephemeral.get())
if value.meta.isSome():
writer.writeField("meta", value.meta.get())
writer.endRecord()
proc readValue*(reader: var JsonReader,
@ -157,6 +162,7 @@ proc readValue*(reader: var JsonReader,
version = none(uint32)
timestamp = none(Timestamp)
ephemeral = none(bool)
meta = none(Base64String)
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
@ -177,6 +183,8 @@ proc readValue*(reader: var JsonReader,
timestamp = some(reader.readValue(Timestamp))
of "ephemeral":
ephemeral = some(reader.readValue(bool))
of "meta":
meta = some(reader.readValue(Base64String))
else:
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))
@ -188,7 +196,8 @@ proc readValue*(reader: var JsonReader,
contentTopic: contentTopic,
version: version,
timestamp: timestamp,
ephemeral: ephemeral
ephemeral: ephemeral,
meta: meta
)
## End of StoreWakuMessage serde