From 3903f130cb83d47c0c548ec976b510a9b8b66ae9 Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Wed, 14 Feb 2024 17:29:59 +0200 Subject: [PATCH] feat: supporting meta field in WakuMessage (#2384) --- tests/wakunode2/test_validators.nim | 27 ++++++----- tests/wakunode_rest/test_rest_filter.nim | 3 +- tests/wakunode_rest/test_rest_relay.nim | 15 +++--- tests/wakunode_rest/test_rest_store.nim | 60 +++++++++++++++++++++++- waku/waku_api/rest/filter/client.nim | 1 + waku/waku_api/rest/filter/types.nim | 16 +++++-- waku/waku_api/rest/relay/types.nim | 18 +++++-- waku/waku_api/rest/store/types.nim | 15 ++++-- 8 files changed, 125 insertions(+), 30 deletions(-) diff --git a/tests/wakunode2/test_validators.nim b/tests/wakunode2/test_validators.nim index 4f9e7d33d..8fe82bcca 100644 --- a/tests/wakunode2/test_validators.nim +++ b/tests/wakunode2/test_validators.nim @@ -187,23 +187,28 @@ suite "WakuNode2 - Validators": version: 2, timestamp: afterTimestamp, ephemeral: true) discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage) - # Wait for gossip - await sleepAsync(4.seconds) - # Since we have a full mesh with 5 nodes and each one publishes 25+25+25+25+25 msgs # there are 625 messages being sent. # 125 are received ok in the handler (first hop) # 500 are are wrong so rejected (rejected not relayed) + + var msgRejected = 0 + + # Active wait for the messages to be delivered across the mesh + for i in 0..<100: + msgRejected = 0 + for i in 0..<5: + for k, v in nodes[i].wakuRelay.peerStats.mpairs: + msgRejected += v.topicInfos[spamProtectedTopic].invalidMessageDeliveries.int + + if msgReceived == 125 and msgRejected == 500: + break + else: + await sleepAsync(100.milliseconds) + check: msgReceived == 125 - - var msgRejected = 0 - for i in 0..<5: - for k, v in nodes[i].wakuRelay.peerStats.mpairs: - msgRejected += v.topicInfos[spamProtectedTopic].invalidMessageDeliveries.int - - check: - msgRejected == 500 + msgRejected == 500 await allFutures(nodes.mapIt(it.stop())) diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index da1ef1e7e..b86bb8925 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -261,7 +261,8 @@ suite "Waku v2 Rest API - Filter V2": let testMessage = WakuMessage( payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(), contentTopic: "1", - timestamp: int64(2022) + timestamp: int64(2022), + meta: "test-meta".toBytes() ) let postMsgResponse = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1( diff --git a/tests/wakunode_rest/test_rest_relay.nim b/tests/wakunode_rest/test_rest_relay.nim index 8b78c5a8d..5c64faad7 100644 --- a/tests/wakunode_rest/test_rest_relay.nim +++ b/tests/wakunode_rest/test_rest_relay.nim @@ -154,16 +154,19 @@ suite "Waku v2 Rest API - Relay": let pubSubTopic = "/waku/2/default-waku/proto" var messages = @[ - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")) + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"), + meta = toBytes("test-meta") ) ] # Prevent duplicate messages for i in 0..<2: - var msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")) + var msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"), + meta = toBytes("test-meta")) while msg == messages[i]: - msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")) - + msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"), + meta = toBytes("test-meta")) + messages.add(msg) let cache = MessageCache.init() @@ -188,8 +191,8 @@ suite "Waku v2 Rest API - Relay": msg.payload == base64.encode("TEST-1") and msg.contentTopic.get() == "content-topic-x" and msg.version.get() == 2 and - msg.timestamp.get() != Timestamp(0) - + msg.timestamp.get() != Timestamp(0) and + msg.meta.get() == base64.encode("test-meta") check: cache.isPubsubSubscribed(pubSubTopic) diff --git a/tests/wakunode_rest/test_rest_store.nim b/tests/wakunode_rest/test_rest_store.nim index 24cbf48b0..6b617970c 100644 --- a/tests/wakunode_rest/test_rest_store.nim +++ b/tests/wakunode_rest/test_rest_store.nim @@ -24,6 +24,7 @@ import ../../../waku/waku_archive, ../../../waku/waku_archive/driver/queue_driver, ../../../waku/waku_store as waku_store, + ../../../waku/common/base64, ../testlib/common, ../testlib/wakucore, ../testlib/wakunode @@ -560,7 +561,7 @@ procSuite "Waku v2 Rest API - Store": # Now prime it with some history before tests let msgList = @[ - fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("ct1"), ts=0), + fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("ct1"), ts=0, meta=(@[byte 8])), fakeWakuMessage(@[byte 1], ts=1), fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("ct2"), ts=9) ] @@ -574,6 +575,7 @@ procSuite "Waku v2 Rest API - Store": await client.getStoreMessagesV1( none[string](), encodeUrl(DefaultPubsubTopic)) + check: response.status == 200 $response.contentType == $MIMETYPE_JSON @@ -598,3 +600,59 @@ procSuite "Waku v2 Rest API - Store": response.status == 200 $response.contentType == $MIMETYPE_JSON response.data.messages.len == 0 + + asyncTest "correct message fields are returned": + + # Given + let node = testWakuNode() + await node.start() + + var restPort = Port(0) + let restAddress = parseIpAddress("0.0.0.0") + let restServer = RestServerRef.init(restAddress, restPort).tryGet() + + restPort = restServer.server.address.port # update with bound port for client use + + installStoreApiHandlers(restServer.router, node) + restServer.start() + + # WakuStore setup + let driver: ArchiveDriver = QueueDriver.new() + let mountArchiveRes = node.mountArchive(driver) + assert mountArchiveRes.isOk(), mountArchiveRes.error + + await node.mountStore() + + # Now prime it with some history before tests + let msg = fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("ct1"), ts=0, meta=(@[byte 8])) + require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk() + + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + + # Filtering by a known pubsub topic. + var response = + await client.getStoreMessagesV1( + none[string](), + encodeUrl(DefaultPubsubTopic)) + + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.messages.len == 1 + + let storeMessage = response.data.messages[0] + + check: + storeMessage.contentTopic.isSome() + storeMessage.version.isSome() + storeMessage.timestamp.isSome() + storeMessage.ephemeral.isSome() + storeMessage.meta.isSome() + + check: + storeMessage.payload == base64.encode(msg.payload) + storeMessage.contentTopic.get() == msg.contentTopic + storeMessage.version.get() == msg.version + storeMessage.timestamp.get() == msg.timestamp + storeMessage.ephemeral.get() == msg.ephemeral + storeMessage.meta.get() == base64.encode(msg.meta) diff --git a/waku/waku_api/rest/filter/client.nim b/waku/waku_api/rest/filter/client.nim index e16a39ada..7be43153b 100644 --- a/waku/waku_api/rest/filter/client.nim +++ b/waku/waku_api/rest/filter/client.nim @@ -13,6 +13,7 @@ import json_serialization/std/options, presto/[route, client, common] import + ../../../common/base64, ../../../waku_core, ../serdes, ../responses, diff --git a/waku/waku_api/rest/filter/types.nim b/waku/waku_api/rest/filter/types.nim index 6930a2312..8135b5ebe 100644 --- a/waku/waku_api/rest/filter/types.nim +++ b/waku/waku_api/rest/filter/types.nim @@ -22,6 +22,7 @@ type FilterWakuMessage* = object contentTopic*: Option[ContentTopic] version*: Option[Natural] timestamp*: Option[int64] + meta*: Option[Base64String] type FilterGetMessagesResponse* = seq[FilterWakuMessage] @@ -58,7 +59,8 @@ proc toFilterWakuMessage*(msg: WakuMessage): FilterWakuMessage = payload: base64.encode(msg.payload), contentTopic: some(msg.contentTopic), version: some(Natural(msg.version)), - timestamp: some(msg.timestamp) + timestamp: some(msg.timestamp), + meta: if msg.meta.len > 0: some(base64.encode(msg.meta)) else: none(Base64String) ) proc toWakuMessage*(msg: FilterWakuMessage, version = 0): Result[WakuMessage, string] = @@ -67,8 +69,10 @@ proc toWakuMessage*(msg: FilterWakuMessage, version = 0): Result[WakuMessage, st contentTopic = msg.contentTopic.get(DefaultContentTopic) version = uint32(msg.version.get(version)) timestamp = msg.timestamp.get(0) + meta = ?msg.meta.get(Base64String("")).decode() - ok(WakuMessage(payload: payload, contentTopic: contentTopic, version: version, timestamp: timestamp)) + ok(WakuMessage(payload: payload, contentTopic: contentTopic, version: version, + timestamp: timestamp, meta: meta)) #### Serialization and deserialization @@ -82,6 +86,8 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: FilterWakuMessage) writer.writeField("version", value.version.get()) if value.timestamp.isSome(): writer.writeField("timestamp", value.timestamp.get()) + if value.meta.isSome(): + writer.writeField("meta", value.meta.get()) writer.endRecord() proc writeValue*(writer: var JsonWriter, value: FilterLegacySubscribeRequest) @@ -136,6 +142,7 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterWakuMessage) contentTopic = none(ContentTopic) version = none(Natural) timestamp = none(int64) + meta = none(Base64String) var keys = initHashSet[string]() for fieldName in readObjectFields(reader): @@ -154,6 +161,8 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterWakuMessage) version = some(reader.readValue(Natural)) of "timestamp": timestamp = some(reader.readValue(int64)) + of "meta": + meta = some(reader.readValue(Base64String)) else: unrecognizedFieldWarning() @@ -164,7 +173,8 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterWakuMessage) payload: payload.get(), contentTopic: contentTopic, version: version, - timestamp: timestamp + timestamp: timestamp, + meta: meta ) proc readValue*(reader: var JsonReader[RestJson], value: var FilterLegacySubscribeRequest) diff --git a/waku/waku_api/rest/relay/types.nim b/waku/waku_api/rest/relay/types.nim index 6dd440c9b..6be6e9191 100644 --- a/waku/waku_api/rest/relay/types.nim +++ b/waku/waku_api/rest/relay/types.nim @@ -22,7 +22,7 @@ type RelayWakuMessage* = object contentTopic*: Option[ContentTopic] version*: Option[Natural] timestamp*: Option[int64] - + meta*: Option[Base64String] type RelayGetMessagesResponse* = seq[RelayWakuMessage] @@ -35,7 +35,8 @@ proc toRelayWakuMessage*(msg: WakuMessage): RelayWakuMessage = payload: base64.encode(msg.payload), contentTopic: some(msg.contentTopic), version: some(Natural(msg.version)), - timestamp: some(msg.timestamp) + timestamp: some(msg.timestamp), + meta: if msg.meta.len > 0: some(base64.encode(msg.meta)) else: none(Base64String) ) proc toWakuMessage*(msg: RelayWakuMessage, version = 0): Result[WakuMessage, string] = @@ -43,14 +44,15 @@ proc toWakuMessage*(msg: RelayWakuMessage, version = 0): Result[WakuMessage, str payload = ?msg.payload.decode() contentTopic = msg.contentTopic.get(DefaultContentTopic) version = uint32(msg.version.get(version)) + meta = ?msg.meta.get(Base64String("")).decode() var timestamp = msg.timestamp.get(0) if timestamp == 0: timestamp = getNanosecondTime(getTime().toUnixFloat()) - return ok(WakuMessage(payload: payload, contentTopic: contentTopic, version: version, timestamp: timestamp)) - + return ok(WakuMessage(payload: payload, contentTopic: contentTopic, version: version, + timestamp: timestamp, meta: meta)) #### Serialization and deserialization @@ -64,6 +66,8 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage) writer.writeField("version", value.version.get()) if value.timestamp.isSome(): writer.writeField("timestamp", value.timestamp.get()) + if value.meta.isSome(): + writer.writeField("meta", value.meta.get()) writer.endRecord() proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage) @@ -73,6 +77,7 @@ proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage) contentTopic = none(ContentTopic) version = none(Natural) timestamp = none(int64) + meta = none(Base64String) var keys = initHashSet[string]() for fieldName in readObjectFields(reader): @@ -91,6 +96,8 @@ proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage) version = some(reader.readValue(Natural)) of "timestamp": timestamp = some(reader.readValue(int64)) + of "meta": + meta = some(reader.readValue(Base64String)) else: unrecognizedFieldWarning() @@ -104,5 +111,6 @@ proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage) payload: payload.get(), contentTopic: contentTopic, version: version, - timestamp: timestamp + timestamp: timestamp, + meta: meta ) diff --git a/waku/waku_api/rest/store/types.nim b/waku/waku_api/rest/store/types.nim index d2195acb5..b4d3f76b9 100644 --- a/waku/waku_api/rest/store/types.nim +++ b/waku/waku_api/rest/store/types.nim @@ -42,6 +42,7 @@ type version*: Option[uint32] timestamp*: Option[Timestamp] ephemeral*: Option[bool] + meta*: Option[Base64String] StoreResponseRest* = object # inspired by https://rfc.vac.dev/spec/16/#storeresponse @@ -97,7 +98,8 @@ proc toWakuMessage*(message: StoreWakuMessage): WakuMessage = contentTopic: message.contentTopic.get(), version: message.version.get(), timestamp: message.timestamp.get(), - ephemeral: message.ephemeral.get() + ephemeral: message.ephemeral.get(), + meta: message.meta.get(Base64String("")).decode().get() ) # Converts a 'HistoryResponse' object to an 'StoreResponseRest' @@ -110,7 +112,8 @@ proc toStoreResponseRest*(histResp: HistoryResponse): StoreResponseRest = contentTopic: some(message.contentTopic), version: some(message.version), timestamp: some(message.timestamp), - ephemeral: some(message.ephemeral) + ephemeral: some(message.ephemeral), + meta: if message.meta.len > 0: some(base64.encode(message.meta)) else: none(Base64String) ) var storeWakuMsgs: seq[StoreWakuMessage] @@ -146,6 +149,8 @@ proc writeValue*(writer: var JsonWriter, writer.writeField("timestamp", value.timestamp.get()) if value.ephemeral.isSome(): writer.writeField("ephemeral", value.ephemeral.get()) + if value.meta.isSome(): + writer.writeField("meta", value.meta.get()) writer.endRecord() proc readValue*(reader: var JsonReader, @@ -157,6 +162,7 @@ proc readValue*(reader: var JsonReader, version = none(uint32) timestamp = none(Timestamp) ephemeral = none(bool) + meta = none(Base64String) var keys = initHashSet[string]() for fieldName in readObjectFields(reader): @@ -177,6 +183,8 @@ proc readValue*(reader: var JsonReader, timestamp = some(reader.readValue(Timestamp)) of "ephemeral": ephemeral = some(reader.readValue(bool)) + of "meta": + meta = some(reader.readValue(Base64String)) else: reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName)) @@ -188,7 +196,8 @@ proc readValue*(reader: var JsonReader, contentTopic: contentTopic, version: version, timestamp: timestamp, - ephemeral: ephemeral + ephemeral: ephemeral, + meta: meta ) ## End of StoreWakuMessage serde