From 054dc61763d34ba7d73f35fa95c1a53d2f1a4bf8 Mon Sep 17 00:00:00 2001 From: Alvaro Revuelta Date: Wed, 9 Nov 2022 15:00:11 +0100 Subject: [PATCH] refactor: continue pubsub/content types started in #1352 (#1362) * refactor: continue gossip/content topic refactor started in #1352 * refactor: enforce using pubsubTopic instead of topic --- tests/v2/test_message_cache.nim | 22 +++++------- tests/v2/test_rest_relay_api.nim | 23 ++++++------ tests/v2/test_rest_relay_api_serdes.nim | 7 ++-- waku/v2/node/jsonrpc/jsonrpc_utils.nim | 10 ++---- waku/v2/node/rest/relay/api_types.nim | 35 +++++++------------ waku/v2/node/rest/relay/relay_api.nim | 3 +- waku/v2/node/rest/relay/topic_cache.nim | 14 ++++---- waku/v2/node/waku_node.nim | 4 +-- waku/v2/protocol/waku_message.nim | 4 +-- waku/v2/protocol/waku_relay.nim | 2 +- .../waku_rln_relay/waku_rln_relay_utils.nim | 4 +-- 11 files changed, 54 insertions(+), 74 deletions(-) diff --git a/tests/v2/test_message_cache.nim b/tests/v2/test_message_cache.nim index fecbcba08..d9b055141 100644 --- a/tests/v2/test_message_cache.nim +++ b/tests/v2/test_message_cache.nim @@ -10,16 +10,12 @@ import ./testlib/common - -type PubsubTopicString = string - -type TestMessageCache = MessageCache[(PubsubTopicString, ContentTopic)] - +type TestMessageCache = MessageCache[(PubsubTopic, ContentTopic)] suite "MessageCache": test "subscribe to topic": ## Given - let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) + let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) let cache = TestMessageCache.init() ## When @@ -32,7 +28,7 @@ suite "MessageCache": test "unsubscribe from topic": ## Given - let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) + let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) let cache = TestMessageCache.init() # Init cache content @@ -48,7 +44,7 @@ suite "MessageCache": test "get messages of a subscribed topic": ## Given - let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) + let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) let testMessage = fakeWakuMessage() let cache = TestMessageCache.init() @@ -67,7 +63,7 @@ suite "MessageCache": test "get messages with clean flag shoud clear the messages cache": ## Given - let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) + let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) let testMessage = fakeWakuMessage() let cache = TestMessageCache.init() @@ -89,7 +85,7 @@ suite "MessageCache": test "get messages of a non-subscribed topic": ## Given - let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) + let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) let cache = TestMessageCache.init() ## When @@ -103,7 +99,7 @@ suite "MessageCache": test "add messages to subscribed topic": ## Given - let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) + let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) let testMessage = fakeWakuMessage() let cache = TestMessageCache.init() @@ -120,7 +116,7 @@ suite "MessageCache": test "add messages to non-subscribed topic": ## Given - let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) + let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) let testMessage = fakeWakuMessage() let cache = TestMessageCache.init() @@ -136,7 +132,7 @@ suite "MessageCache": test "add messages beyond the capacity": ## Given - let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) + let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic")) let testMessages = @[ fakeWakuMessage(toBytes("MSG-1")), fakeWakuMessage(toBytes("MSG-2")), diff --git a/tests/v2/test_rest_relay_api.nim b/tests/v2/test_rest_relay_api.nim index 85af8630c..fd1b460bd 100644 --- a/tests/v2/test_rest_relay_api.nim +++ b/tests/v2/test_rest_relay_api.nim @@ -45,9 +45,9 @@ suite "REST API - Relay": restServer.start() let pubSubTopics = @[ - PubSubTopicString("pubsub-topic-1"), - PubSubTopicString("pubsub-topic-2"), - PubSubTopicString("pubsub-topic-3") + PubSubTopic("pubsub-topic-1"), + PubSubTopic("pubsub-topic-2"), + PubSubTopic("pubsub-topic-3") ] # When @@ -94,10 +94,10 @@ suite "REST API - Relay": restServer.start() let pubSubTopics = @[ - PubSubTopicString("pubsub-topic-1"), - PubSubTopicString("pubsub-topic-2"), - PubSubTopicString("pubsub-topic-3"), - PubSubTopicString("pubsub-topic-y") + PubSubTopic("pubsub-topic-1"), + PubSubTopic("pubsub-topic-2"), + PubSubTopic("pubsub-topic-3"), + PubSubTopic("pubsub-topic-y") ] # When @@ -190,7 +190,6 @@ suite "REST API - Relay": restServer.start() let client = newRestHttpClient(initTAddress(restAddress, restPort)) - const defaultContentTopic = ContentTopic("/waku/2/default-content/proto") # At this stage the node is only subscribed to the default topic require(PubSub(node.wakuRelay).topics.len == 1) @@ -198,15 +197,15 @@ suite "REST API - Relay": # When let newTopics = @[ - PubSubTopicString("pubsub-topic-1"), - PubSubTopicString("pubsub-topic-2"), - PubSubTopicString("pubsub-topic-3") + PubSubTopic("pubsub-topic-1"), + PubSubTopic("pubsub-topic-2"), + PubSubTopic("pubsub-topic-3") ] discard await client.relayPostSubscriptionsV1(newTopics) let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage( payload: Base64String.encode("TEST-PAYLOAD"), - contentTopic: some(ContentTopicString(defaultContentTopic)), + contentTopic: some(DefaultContentTopic), timestamp: some(int64(2022)) )) diff --git a/tests/v2/test_rest_relay_api_serdes.nim b/tests/v2/test_rest_relay_api_serdes.nim index fe3b2a43b..132ac06df 100644 --- a/tests/v2/test_rest_relay_api_serdes.nim +++ b/tests/v2/test_rest_relay_api_serdes.nim @@ -8,7 +8,8 @@ import import ../../waku/v2/node/rest/serdes, ../../waku/v2/node/rest/base64, - ../../waku/v2/node/rest/relay/api_types + ../../waku/v2/node/rest/relay/api_types, + ../../waku/v2/protocol/waku_message suite "Relay API - serialization": @@ -36,8 +37,8 @@ suite "Relay API - serialization": # Given let payload = Base64String.encode("MESSAGE") let data = RelayWakuMessage( - payload: payload, - contentTopic: none(ContentTopicString), + payload: payload, + contentTopic: none(ContentTopic), version: none(Natural), timestamp: none(int64) ) diff --git a/waku/v2/node/jsonrpc/jsonrpc_utils.nim b/waku/v2/node/jsonrpc/jsonrpc_utils.nim index 3105fe2cc..eafe7d06c 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_utils.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_utils.nim @@ -44,7 +44,6 @@ proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse = pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions)) proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage = - const defaultCT = ContentTopic("/waku/2/default-content/proto") var t: Timestamp if relayMessage.timestamp.isSome: t = relayMessage.timestamp.get @@ -52,14 +51,11 @@ proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessag # incoming WakuRelayMessages with no timestamp will get 0 timestamp t = Timestamp(0) WakuMessage(payload: relayMessage.payload, - contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT, + contentTopic: relayMessage.contentTopic.get(DefaultContentTopic), version: version, timestamp: t) proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref HmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage = - # @TODO global definition for default content topic - const defaultCT = ContentTopic("/waku/2/default-content/proto") - let payload = Payload(payload: relayMessage.payload, dst: pubKey, symkey: symkey) @@ -72,13 +68,11 @@ proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref Hm t = Timestamp(0) WakuMessage(payload: payload.encode(version, rng[]).get(), - contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT, + contentTopic: relayMessage.contentTopic.get(DefaultContentTopic), version: version, timestamp: t) proc toWakuRelayMessage*(message: WakuMessage, symkey: Option[SymKey], privateKey: Option[keys.PrivateKey]): WakuRelayMessage = - # @TODO global definition for default content topic - let keyInfo = if symkey.isSome(): KeyInfo(kind: Symmetric, symKey: symkey.get()) elif privateKey.isSome(): KeyInfo(kind: Asymmetric, privKey: privateKey.get()) diff --git a/waku/v2/node/rest/relay/api_types.nim b/waku/v2/node/rest/relay/api_types.nim index a60a03cf8..a57c9ab83 100644 --- a/waku/v2/node/rest/relay/api_types.nim +++ b/waku/v2/node/rest/relay/api_types.nim @@ -17,13 +17,9 @@ import #### Types -type - PubSubTopicString* = distinct string - ContentTopicString* = distinct string - type RelayWakuMessage* = object payload*: Base64String - contentTopic*: Option[ContentTopicString] + contentTopic*: Option[ContentTopic] version*: Option[Natural] timestamp*: Option[int64] @@ -33,8 +29,8 @@ type RelayPostMessagesRequest* = RelayWakuMessage type - RelayPostSubscriptionsRequest* = seq[PubSubTopicString] - RelayDeleteSubscriptionsRequest* = seq[PubSubTopicString] + RelayPostSubscriptionsRequest* = seq[PubSubTopic] + RelayDeleteSubscriptionsRequest* = seq[PubSubTopic] #### Type conversion @@ -42,16 +38,15 @@ type proc toRelayWakuMessage*(msg: WakuMessage): RelayWakuMessage = RelayWakuMessage( payload: base64.encode(Base64String, msg.payload), - contentTopic: some(ContentTopicString(msg.contentTopic)), + contentTopic: some(msg.contentTopic), version: some(Natural(msg.version)), timestamp: some(msg.timestamp) ) proc toWakuMessage*(msg: RelayWakuMessage, version = 0): Result[WakuMessage, cstring] = - const defaultContentTopic = ContentTopicString("/waku/2/default-content/proto") let payload = ?msg.payload.decode() - contentTopic = ContentTopic(msg.contentTopic.get(defaultContentTopic)) + contentTopic = msg.contentTopic.get(DefaultContentTopic) version = uint32(msg.version.get(version)) timestamp = msg.timestamp.get(0) @@ -64,13 +59,9 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: Base64String) {.raises: [IOError, Defect].} = writer.writeValue(string(value)) -proc writeValue*(writer: var JsonWriter[RestJson], value: PubSubTopicString) +proc writeValue*(writer: var JsonWriter[RestJson], topic: PubSubTopic|ContentTopic) {.raises: [IOError, Defect].} = - writer.writeValue(string(value)) - -proc writeValue*(writer: var JsonWriter[RestJson], value: ContentTopicString) - {.raises: [IOError, Defect].} = - writer.writeValue(string(value)) + writer.writeValue(string(topic)) proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage) {.raises: [IOError, Defect].} = @@ -88,19 +79,19 @@ proc readValue*(reader: var JsonReader[RestJson], value: var Base64String) {.raises: [SerializationError, IOError, Defect].} = value = Base64String(reader.readValue(string)) -proc readValue*(reader: var JsonReader[RestJson], value: var PubSubTopicString) +proc readValue*(reader: var JsonReader[RestJson], pubsubTopic: var PubSubTopic) {.raises: [SerializationError, IOError, Defect].} = - value = PubSubTopicString(reader.readValue(string)) + pubsubTopic = PubSubTopic(reader.readValue(string)) -proc readValue*(reader: var JsonReader[RestJson], value: var ContentTopicString) +proc readValue*(reader: var JsonReader[RestJson], contentTopic: var ContentTopic) {.raises: [SerializationError, IOError, Defect].} = - value = ContentTopicString(reader.readValue(string)) + contentTopic = ContentTopic(reader.readValue(string)) proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage) {.raises: [SerializationError, IOError, Defect].} = var payload = none(Base64String) - contentTopic = none(ContentTopicString) + contentTopic = none(ContentTopic) version = none(Natural) timestamp = none(int64) @@ -116,7 +107,7 @@ proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage) of "payload": payload = some(reader.readValue(Base64String)) of "contentTopic": - contentTopic = some(reader.readValue(ContentTopicString)) + contentTopic = some(reader.readValue(ContentTopic)) of "version": version = some(reader.readValue(Natural)) of "timestamp": diff --git a/waku/v2/node/rest/relay/relay_api.nim b/waku/v2/node/rest/relay/relay_api.nim index 312a33a5d..4b9c4374a 100644 --- a/waku/v2/node/rest/relay/relay_api.nim +++ b/waku/v2/node/rest/relay/relay_api.nim @@ -12,6 +12,7 @@ import presto/[route, client, common] import ../../waku_node, + ../../../protocol/waku_message, ../serdes, ../utils, ./api_types, @@ -157,7 +158,7 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, topicCache #### Client -proc encodeBytes*(value: seq[PubSubTopicString], +proc encodeBytes*(value: seq[PubSubTopic], contentType: string): RestResult[seq[byte]] = if MediaType.init(contentType) != MIMETYPE_JSON: error "Unsupported contentType value", contentType = contentType diff --git a/waku/v2/node/rest/relay/topic_cache.nim b/waku/v2/node/rest/relay/topic_cache.nim index aca83626b..bf0c92888 100644 --- a/waku/v2/node/rest/relay/topic_cache.nim +++ b/waku/v2/node/rest/relay/topic_cache.nim @@ -20,11 +20,9 @@ export message_cache ##### TopicCache -type PubSubTopicString = string - type TopicCacheResult*[T] = MessageCacheResult[T] -type TopicCache* = MessageCache[PubSubTopicString] +type TopicCache* = MessageCache[PubSubTopic] ##### Message handler @@ -33,17 +31,17 @@ type TopicCacheMessageHandler* = Topichandler proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler = - let handler = proc(topic: string, data: seq[byte]): Future[void] {.async, closure.} = - trace "Topic handler triggered", topic=topic + let handler = proc(pubsubTopic: string, data: seq[byte]): Future[void] {.async, closure.} = + trace "PubsubTopic handler triggered", pubsubTopic=pubsubTopic # Add message to current cache let msg = WakuMessage.decode(data) if msg.isErr(): - debug "WakuMessage received but failed to decode", msg=msg, topic=topic + debug "WakuMessage received but failed to decode", msg=msg, pubsubTopic=pubsubTopic # TODO: handle message decode failure return - trace "WakuMessage received", msg=msg, topic=topic - cache.addMessage(PubSubTopicString(topic), msg.get()) + trace "WakuMessage received", msg=msg, pubsubTopic=pubsubTopic + cache.addMessage(PubSubTopic(pubsubTopic), msg.get()) handler \ No newline at end of file diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index d3b6868e7..9c879b519 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -347,7 +347,7 @@ proc startRelay*(node: WakuNode) {.async.} = info "starting relay" # PubsubTopic subscriptions - for topic in node.wakuRelay.defaultTopics: + for topic in node.wakuRelay.defaultPubsubTopics: node.subscribe(topic, none(TopicHandler)) # Resume previous relay connections @@ -393,7 +393,7 @@ proc mountRelay*(node: WakuNode, ## The default relay topics is the union of ## all configured topics plus the hard-coded defaultTopic(s) - wakuRelay.defaultTopics = concat(@[DefaultPubsubTopic], topics) + wakuRelay.defaultPubsubTopics = concat(@[DefaultPubsubTopic], topics) ## Add peer exchange handler if peerExchangeHandler.isSome(): diff --git a/waku/v2/protocol/waku_message.nim b/waku/v2/protocol/waku_message.nim index 8d451c65f..6b0e7b9e3 100644 --- a/waku/v2/protocol/waku_message.nim +++ b/waku/v2/protocol/waku_message.nim @@ -27,8 +27,8 @@ type ContentTopic* = string const - DefaultPubsubTopic*: PubsubTopic = "/waku/2/default-waku/proto" - DefaultContentTopic*: ContentTopic = "/waku/2/default-content/proto" + DefaultPubsubTopic*: PubsubTopic = PubsubTopic("/waku/2/default-waku/proto") + DefaultContentTopic*: ContentTopic = ContentTopic("/waku/2/default-content/proto") type WakuMessage* = object diff --git a/waku/v2/protocol/waku_relay.nim b/waku/v2/protocol/waku_relay.nim index c6b11fe28..b3e749072 100644 --- a/waku/v2/protocol/waku_relay.nim +++ b/waku/v2/protocol/waku_relay.nim @@ -25,7 +25,7 @@ const type WakuRelay* = ref object of GossipSub - defaultTopics*: seq[PubsubTopic] # Default configured PubSub topics + defaultPubsubTopics*: seq[PubsubTopic] # Default configured PubSub topics method init*(w: WakuRelay) = debug "init WakuRelay" diff --git a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim index 23e2bc2a3..1705716fb 100644 --- a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim +++ b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim @@ -1115,7 +1115,7 @@ proc mountRlnRelayStatic*(node: WakuNode, if node.wakuRelay.isNil(): return err("WakuRelay protocol is not mounted") # check whether the pubsub topic is supported at the relay level - if pubsubTopic notin node.wakuRelay.defaultTopics: + if pubsubTopic notin node.wakuRelay.defaultPubsubTopics: return err("The relay protocol does not support the configured pubsub topic") debug "rln-relay input validation passed" @@ -1170,7 +1170,7 @@ proc mountRlnRelayDynamic*(node: WakuNode, if node.wakuRelay.isNil: return err("WakuRelay protocol is not mounted.") # check whether the pubsub topic is supported at the relay level - if pubsubTopic notin node.wakuRelay.defaultTopics: + if pubsubTopic notin node.wakuRelay.defaultPubsubTopics: return err("WakuRelay protocol does not support the configured pubsub topic.") debug "rln-relay input validation passed"