refactor: continue pubsub/content types started in #1352 (#1362)

* refactor: continue gossip/content topic refactor started in #1352

* refactor: enforce using pubsubTopic instead of topic
This commit is contained in:
Alvaro Revuelta 2022-11-09 15:00:11 +01:00 committed by GitHub
parent 4acc611e47
commit 054dc61763
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 54 additions and 74 deletions

View File

@ -10,16 +10,12 @@ import
./testlib/common ./testlib/common
type TestMessageCache = MessageCache[(PubsubTopic, ContentTopic)]
type PubsubTopicString = string
type TestMessageCache = MessageCache[(PubsubTopicString, ContentTopic)]
suite "MessageCache": suite "MessageCache":
test "subscribe to topic": test "subscribe to topic":
## Given ## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let cache = TestMessageCache.init() let cache = TestMessageCache.init()
## When ## When
@ -32,7 +28,7 @@ suite "MessageCache":
test "unsubscribe from topic": test "unsubscribe from topic":
## Given ## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let cache = TestMessageCache.init() let cache = TestMessageCache.init()
# Init cache content # Init cache content
@ -48,7 +44,7 @@ suite "MessageCache":
test "get messages of a subscribed topic": test "get messages of a subscribed topic":
## Given ## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage() let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init() let cache = TestMessageCache.init()
@ -67,7 +63,7 @@ suite "MessageCache":
test "get messages with clean flag shoud clear the messages cache": test "get messages with clean flag shoud clear the messages cache":
## Given ## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage() let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init() let cache = TestMessageCache.init()
@ -89,7 +85,7 @@ suite "MessageCache":
test "get messages of a non-subscribed topic": test "get messages of a non-subscribed topic":
## Given ## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let cache = TestMessageCache.init() let cache = TestMessageCache.init()
## When ## When
@ -103,7 +99,7 @@ suite "MessageCache":
test "add messages to subscribed topic": test "add messages to subscribed topic":
## Given ## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage() let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init() let cache = TestMessageCache.init()
@ -120,7 +116,7 @@ suite "MessageCache":
test "add messages to non-subscribed topic": test "add messages to non-subscribed topic":
## Given ## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage() let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init() let cache = TestMessageCache.init()
@ -136,7 +132,7 @@ suite "MessageCache":
test "add messages beyond the capacity": test "add messages beyond the capacity":
## Given ## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessages = @[ let testMessages = @[
fakeWakuMessage(toBytes("MSG-1")), fakeWakuMessage(toBytes("MSG-1")),
fakeWakuMessage(toBytes("MSG-2")), fakeWakuMessage(toBytes("MSG-2")),

View File

@ -45,9 +45,9 @@ suite "REST API - Relay":
restServer.start() restServer.start()
let pubSubTopics = @[ let pubSubTopics = @[
PubSubTopicString("pubsub-topic-1"), PubSubTopic("pubsub-topic-1"),
PubSubTopicString("pubsub-topic-2"), PubSubTopic("pubsub-topic-2"),
PubSubTopicString("pubsub-topic-3") PubSubTopic("pubsub-topic-3")
] ]
# When # When
@ -94,10 +94,10 @@ suite "REST API - Relay":
restServer.start() restServer.start()
let pubSubTopics = @[ let pubSubTopics = @[
PubSubTopicString("pubsub-topic-1"), PubSubTopic("pubsub-topic-1"),
PubSubTopicString("pubsub-topic-2"), PubSubTopic("pubsub-topic-2"),
PubSubTopicString("pubsub-topic-3"), PubSubTopic("pubsub-topic-3"),
PubSubTopicString("pubsub-topic-y") PubSubTopic("pubsub-topic-y")
] ]
# When # When
@ -190,7 +190,6 @@ suite "REST API - Relay":
restServer.start() restServer.start()
let client = newRestHttpClient(initTAddress(restAddress, restPort)) let client = newRestHttpClient(initTAddress(restAddress, restPort))
const defaultContentTopic = ContentTopic("/waku/2/default-content/proto")
# At this stage the node is only subscribed to the default topic # At this stage the node is only subscribed to the default topic
require(PubSub(node.wakuRelay).topics.len == 1) require(PubSub(node.wakuRelay).topics.len == 1)
@ -198,15 +197,15 @@ suite "REST API - Relay":
# When # When
let newTopics = @[ let newTopics = @[
PubSubTopicString("pubsub-topic-1"), PubSubTopic("pubsub-topic-1"),
PubSubTopicString("pubsub-topic-2"), PubSubTopic("pubsub-topic-2"),
PubSubTopicString("pubsub-topic-3") PubSubTopic("pubsub-topic-3")
] ]
discard await client.relayPostSubscriptionsV1(newTopics) discard await client.relayPostSubscriptionsV1(newTopics)
let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage( let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage(
payload: Base64String.encode("TEST-PAYLOAD"), payload: Base64String.encode("TEST-PAYLOAD"),
contentTopic: some(ContentTopicString(defaultContentTopic)), contentTopic: some(DefaultContentTopic),
timestamp: some(int64(2022)) timestamp: some(int64(2022))
)) ))

View File

@ -8,7 +8,8 @@ import
import import
../../waku/v2/node/rest/serdes, ../../waku/v2/node/rest/serdes,
../../waku/v2/node/rest/base64, ../../waku/v2/node/rest/base64,
../../waku/v2/node/rest/relay/api_types ../../waku/v2/node/rest/relay/api_types,
../../waku/v2/protocol/waku_message
suite "Relay API - serialization": suite "Relay API - serialization":
@ -36,8 +37,8 @@ suite "Relay API - serialization":
# Given # Given
let payload = Base64String.encode("MESSAGE") let payload = Base64String.encode("MESSAGE")
let data = RelayWakuMessage( let data = RelayWakuMessage(
payload: payload, payload: payload,
contentTopic: none(ContentTopicString), contentTopic: none(ContentTopic),
version: none(Natural), version: none(Natural),
timestamp: none(int64) timestamp: none(int64)
) )

View File

@ -44,7 +44,6 @@ proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse =
pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions)) pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions))
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage = proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage =
const defaultCT = ContentTopic("/waku/2/default-content/proto")
var t: Timestamp var t: Timestamp
if relayMessage.timestamp.isSome: if relayMessage.timestamp.isSome:
t = relayMessage.timestamp.get t = relayMessage.timestamp.get
@ -52,14 +51,11 @@ proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessag
# incoming WakuRelayMessages with no timestamp will get 0 timestamp # incoming WakuRelayMessages with no timestamp will get 0 timestamp
t = Timestamp(0) t = Timestamp(0)
WakuMessage(payload: relayMessage.payload, WakuMessage(payload: relayMessage.payload,
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT, contentTopic: relayMessage.contentTopic.get(DefaultContentTopic),
version: version, version: version,
timestamp: t) timestamp: t)
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref HmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage = proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref HmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage =
# @TODO global definition for default content topic
const defaultCT = ContentTopic("/waku/2/default-content/proto")
let payload = Payload(payload: relayMessage.payload, let payload = Payload(payload: relayMessage.payload,
dst: pubKey, dst: pubKey,
symkey: symkey) symkey: symkey)
@ -72,13 +68,11 @@ proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref Hm
t = Timestamp(0) t = Timestamp(0)
WakuMessage(payload: payload.encode(version, rng[]).get(), WakuMessage(payload: payload.encode(version, rng[]).get(),
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT, contentTopic: relayMessage.contentTopic.get(DefaultContentTopic),
version: version, version: version,
timestamp: t) timestamp: t)
proc toWakuRelayMessage*(message: WakuMessage, symkey: Option[SymKey], privateKey: Option[keys.PrivateKey]): WakuRelayMessage = proc toWakuRelayMessage*(message: WakuMessage, symkey: Option[SymKey], privateKey: Option[keys.PrivateKey]): WakuRelayMessage =
# @TODO global definition for default content topic
let let
keyInfo = if symkey.isSome(): KeyInfo(kind: Symmetric, symKey: symkey.get()) keyInfo = if symkey.isSome(): KeyInfo(kind: Symmetric, symKey: symkey.get())
elif privateKey.isSome(): KeyInfo(kind: Asymmetric, privKey: privateKey.get()) elif privateKey.isSome(): KeyInfo(kind: Asymmetric, privKey: privateKey.get())

View File

@ -17,13 +17,9 @@ import
#### Types #### Types
type
PubSubTopicString* = distinct string
ContentTopicString* = distinct string
type RelayWakuMessage* = object type RelayWakuMessage* = object
payload*: Base64String payload*: Base64String
contentTopic*: Option[ContentTopicString] contentTopic*: Option[ContentTopic]
version*: Option[Natural] version*: Option[Natural]
timestamp*: Option[int64] timestamp*: Option[int64]
@ -33,8 +29,8 @@ type
RelayPostMessagesRequest* = RelayWakuMessage RelayPostMessagesRequest* = RelayWakuMessage
type type
RelayPostSubscriptionsRequest* = seq[PubSubTopicString] RelayPostSubscriptionsRequest* = seq[PubSubTopic]
RelayDeleteSubscriptionsRequest* = seq[PubSubTopicString] RelayDeleteSubscriptionsRequest* = seq[PubSubTopic]
#### Type conversion #### Type conversion
@ -42,16 +38,15 @@ type
proc toRelayWakuMessage*(msg: WakuMessage): RelayWakuMessage = proc toRelayWakuMessage*(msg: WakuMessage): RelayWakuMessage =
RelayWakuMessage( RelayWakuMessage(
payload: base64.encode(Base64String, msg.payload), payload: base64.encode(Base64String, msg.payload),
contentTopic: some(ContentTopicString(msg.contentTopic)), contentTopic: some(msg.contentTopic),
version: some(Natural(msg.version)), version: some(Natural(msg.version)),
timestamp: some(msg.timestamp) timestamp: some(msg.timestamp)
) )
proc toWakuMessage*(msg: RelayWakuMessage, version = 0): Result[WakuMessage, cstring] = proc toWakuMessage*(msg: RelayWakuMessage, version = 0): Result[WakuMessage, cstring] =
const defaultContentTopic = ContentTopicString("/waku/2/default-content/proto")
let let
payload = ?msg.payload.decode() payload = ?msg.payload.decode()
contentTopic = ContentTopic(msg.contentTopic.get(defaultContentTopic)) contentTopic = msg.contentTopic.get(DefaultContentTopic)
version = uint32(msg.version.get(version)) version = uint32(msg.version.get(version))
timestamp = msg.timestamp.get(0) timestamp = msg.timestamp.get(0)
@ -64,13 +59,9 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: Base64String)
{.raises: [IOError, Defect].} = {.raises: [IOError, Defect].} =
writer.writeValue(string(value)) writer.writeValue(string(value))
proc writeValue*(writer: var JsonWriter[RestJson], value: PubSubTopicString) proc writeValue*(writer: var JsonWriter[RestJson], topic: PubSubTopic|ContentTopic)
{.raises: [IOError, Defect].} = {.raises: [IOError, Defect].} =
writer.writeValue(string(value)) writer.writeValue(string(topic))
proc writeValue*(writer: var JsonWriter[RestJson], value: ContentTopicString)
{.raises: [IOError, Defect].} =
writer.writeValue(string(value))
proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage) proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage)
{.raises: [IOError, Defect].} = {.raises: [IOError, Defect].} =
@ -88,19 +79,19 @@ proc readValue*(reader: var JsonReader[RestJson], value: var Base64String)
{.raises: [SerializationError, IOError, Defect].} = {.raises: [SerializationError, IOError, Defect].} =
value = Base64String(reader.readValue(string)) value = Base64String(reader.readValue(string))
proc readValue*(reader: var JsonReader[RestJson], value: var PubSubTopicString) proc readValue*(reader: var JsonReader[RestJson], pubsubTopic: var PubSubTopic)
{.raises: [SerializationError, IOError, Defect].} = {.raises: [SerializationError, IOError, Defect].} =
value = PubSubTopicString(reader.readValue(string)) pubsubTopic = PubSubTopic(reader.readValue(string))
proc readValue*(reader: var JsonReader[RestJson], value: var ContentTopicString) proc readValue*(reader: var JsonReader[RestJson], contentTopic: var ContentTopic)
{.raises: [SerializationError, IOError, Defect].} = {.raises: [SerializationError, IOError, Defect].} =
value = ContentTopicString(reader.readValue(string)) contentTopic = ContentTopic(reader.readValue(string))
proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage) proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
{.raises: [SerializationError, IOError, Defect].} = {.raises: [SerializationError, IOError, Defect].} =
var var
payload = none(Base64String) payload = none(Base64String)
contentTopic = none(ContentTopicString) contentTopic = none(ContentTopic)
version = none(Natural) version = none(Natural)
timestamp = none(int64) timestamp = none(int64)
@ -116,7 +107,7 @@ proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
of "payload": of "payload":
payload = some(reader.readValue(Base64String)) payload = some(reader.readValue(Base64String))
of "contentTopic": of "contentTopic":
contentTopic = some(reader.readValue(ContentTopicString)) contentTopic = some(reader.readValue(ContentTopic))
of "version": of "version":
version = some(reader.readValue(Natural)) version = some(reader.readValue(Natural))
of "timestamp": of "timestamp":

View File

@ -12,6 +12,7 @@ import
presto/[route, client, common] presto/[route, client, common]
import import
../../waku_node, ../../waku_node,
../../../protocol/waku_message,
../serdes, ../serdes,
../utils, ../utils,
./api_types, ./api_types,
@ -157,7 +158,7 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, topicCache
#### Client #### Client
proc encodeBytes*(value: seq[PubSubTopicString], proc encodeBytes*(value: seq[PubSubTopic],
contentType: string): RestResult[seq[byte]] = contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON: if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType error "Unsupported contentType value", contentType = contentType

View File

@ -20,11 +20,9 @@ export message_cache
##### TopicCache ##### TopicCache
type PubSubTopicString = string
type TopicCacheResult*[T] = MessageCacheResult[T] type TopicCacheResult*[T] = MessageCacheResult[T]
type TopicCache* = MessageCache[PubSubTopicString] type TopicCache* = MessageCache[PubSubTopic]
##### Message handler ##### Message handler
@ -33,17 +31,17 @@ type TopicCacheMessageHandler* = Topichandler
proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler = proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler =
let handler = proc(topic: string, data: seq[byte]): Future[void] {.async, closure.} = let handler = proc(pubsubTopic: string, data: seq[byte]): Future[void] {.async, closure.} =
trace "Topic handler triggered", topic=topic trace "PubsubTopic handler triggered", pubsubTopic=pubsubTopic
# Add message to current cache # Add message to current cache
let msg = WakuMessage.decode(data) let msg = WakuMessage.decode(data)
if msg.isErr(): if msg.isErr():
debug "WakuMessage received but failed to decode", msg=msg, topic=topic debug "WakuMessage received but failed to decode", msg=msg, pubsubTopic=pubsubTopic
# TODO: handle message decode failure # TODO: handle message decode failure
return return
trace "WakuMessage received", msg=msg, topic=topic trace "WakuMessage received", msg=msg, pubsubTopic=pubsubTopic
cache.addMessage(PubSubTopicString(topic), msg.get()) cache.addMessage(PubSubTopic(pubsubTopic), msg.get())
handler handler

View File

@ -347,7 +347,7 @@ proc startRelay*(node: WakuNode) {.async.} =
info "starting relay" info "starting relay"
# PubsubTopic subscriptions # PubsubTopic subscriptions
for topic in node.wakuRelay.defaultTopics: for topic in node.wakuRelay.defaultPubsubTopics:
node.subscribe(topic, none(TopicHandler)) node.subscribe(topic, none(TopicHandler))
# Resume previous relay connections # Resume previous relay connections
@ -393,7 +393,7 @@ proc mountRelay*(node: WakuNode,
## The default relay topics is the union of ## The default relay topics is the union of
## all configured topics plus the hard-coded defaultTopic(s) ## all configured topics plus the hard-coded defaultTopic(s)
wakuRelay.defaultTopics = concat(@[DefaultPubsubTopic], topics) wakuRelay.defaultPubsubTopics = concat(@[DefaultPubsubTopic], topics)
## Add peer exchange handler ## Add peer exchange handler
if peerExchangeHandler.isSome(): if peerExchangeHandler.isSome():

View File

@ -27,8 +27,8 @@ type
ContentTopic* = string ContentTopic* = string
const const
DefaultPubsubTopic*: PubsubTopic = "/waku/2/default-waku/proto" DefaultPubsubTopic*: PubsubTopic = PubsubTopic("/waku/2/default-waku/proto")
DefaultContentTopic*: ContentTopic = "/waku/2/default-content/proto" DefaultContentTopic*: ContentTopic = ContentTopic("/waku/2/default-content/proto")
type WakuMessage* = object type WakuMessage* = object

View File

@ -25,7 +25,7 @@ const
type type
WakuRelay* = ref object of GossipSub WakuRelay* = ref object of GossipSub
defaultTopics*: seq[PubsubTopic] # Default configured PubSub topics defaultPubsubTopics*: seq[PubsubTopic] # Default configured PubSub topics
method init*(w: WakuRelay) = method init*(w: WakuRelay) =
debug "init WakuRelay" debug "init WakuRelay"

View File

@ -1115,7 +1115,7 @@ proc mountRlnRelayStatic*(node: WakuNode,
if node.wakuRelay.isNil(): if node.wakuRelay.isNil():
return err("WakuRelay protocol is not mounted") return err("WakuRelay protocol is not mounted")
# check whether the pubsub topic is supported at the relay level # check whether the pubsub topic is supported at the relay level
if pubsubTopic notin node.wakuRelay.defaultTopics: if pubsubTopic notin node.wakuRelay.defaultPubsubTopics:
return err("The relay protocol does not support the configured pubsub topic") return err("The relay protocol does not support the configured pubsub topic")
debug "rln-relay input validation passed" debug "rln-relay input validation passed"
@ -1170,7 +1170,7 @@ proc mountRlnRelayDynamic*(node: WakuNode,
if node.wakuRelay.isNil: if node.wakuRelay.isNil:
return err("WakuRelay protocol is not mounted.") return err("WakuRelay protocol is not mounted.")
# check whether the pubsub topic is supported at the relay level # check whether the pubsub topic is supported at the relay level
if pubsubTopic notin node.wakuRelay.defaultTopics: if pubsubTopic notin node.wakuRelay.defaultPubsubTopics:
return err("WakuRelay protocol does not support the configured pubsub topic.") return err("WakuRelay protocol does not support the configured pubsub topic.")
debug "rln-relay input validation passed" debug "rln-relay input validation passed"