mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 08:23:08 +00:00
deploy: 353fb342b84646e080595876475435a05aa0a6b8
This commit is contained in:
parent
0873fbd6b4
commit
7a9eaab47d
@ -10,16 +10,12 @@ import
|
|||||||
./testlib/common
|
./testlib/common
|
||||||
|
|
||||||
|
|
||||||
|
type TestMessageCache = MessageCache[(PubsubTopic, ContentTopic)]
|
||||||
type PubsubTopicString = string
|
|
||||||
|
|
||||||
type TestMessageCache = MessageCache[(PubsubTopicString, ContentTopic)]
|
|
||||||
|
|
||||||
|
|
||||||
suite "MessageCache":
|
suite "MessageCache":
|
||||||
test "subscribe to topic":
|
test "subscribe to topic":
|
||||||
## Given
|
## Given
|
||||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
|
||||||
let cache = TestMessageCache.init()
|
let cache = TestMessageCache.init()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
@ -32,7 +28,7 @@ suite "MessageCache":
|
|||||||
|
|
||||||
test "unsubscribe from topic":
|
test "unsubscribe from topic":
|
||||||
## Given
|
## Given
|
||||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
|
||||||
let cache = TestMessageCache.init()
|
let cache = TestMessageCache.init()
|
||||||
|
|
||||||
# Init cache content
|
# Init cache content
|
||||||
@ -48,7 +44,7 @@ suite "MessageCache":
|
|||||||
|
|
||||||
test "get messages of a subscribed topic":
|
test "get messages of a subscribed topic":
|
||||||
## Given
|
## Given
|
||||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
|
||||||
let testMessage = fakeWakuMessage()
|
let testMessage = fakeWakuMessage()
|
||||||
let cache = TestMessageCache.init()
|
let cache = TestMessageCache.init()
|
||||||
|
|
||||||
@ -67,7 +63,7 @@ suite "MessageCache":
|
|||||||
|
|
||||||
test "get messages with clean flag shoud clear the messages cache":
|
test "get messages with clean flag shoud clear the messages cache":
|
||||||
## Given
|
## Given
|
||||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
|
||||||
let testMessage = fakeWakuMessage()
|
let testMessage = fakeWakuMessage()
|
||||||
let cache = TestMessageCache.init()
|
let cache = TestMessageCache.init()
|
||||||
|
|
||||||
@ -89,7 +85,7 @@ suite "MessageCache":
|
|||||||
|
|
||||||
test "get messages of a non-subscribed topic":
|
test "get messages of a non-subscribed topic":
|
||||||
## Given
|
## Given
|
||||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
|
||||||
let cache = TestMessageCache.init()
|
let cache = TestMessageCache.init()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
@ -103,7 +99,7 @@ suite "MessageCache":
|
|||||||
|
|
||||||
test "add messages to subscribed topic":
|
test "add messages to subscribed topic":
|
||||||
## Given
|
## Given
|
||||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
|
||||||
let testMessage = fakeWakuMessage()
|
let testMessage = fakeWakuMessage()
|
||||||
let cache = TestMessageCache.init()
|
let cache = TestMessageCache.init()
|
||||||
|
|
||||||
@ -120,7 +116,7 @@ suite "MessageCache":
|
|||||||
|
|
||||||
test "add messages to non-subscribed topic":
|
test "add messages to non-subscribed topic":
|
||||||
## Given
|
## Given
|
||||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
|
||||||
let testMessage = fakeWakuMessage()
|
let testMessage = fakeWakuMessage()
|
||||||
let cache = TestMessageCache.init()
|
let cache = TestMessageCache.init()
|
||||||
|
|
||||||
@ -136,7 +132,7 @@ suite "MessageCache":
|
|||||||
|
|
||||||
test "add messages beyond the capacity":
|
test "add messages beyond the capacity":
|
||||||
## Given
|
## Given
|
||||||
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
|
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
|
||||||
let testMessages = @[
|
let testMessages = @[
|
||||||
fakeWakuMessage(toBytes("MSG-1")),
|
fakeWakuMessage(toBytes("MSG-1")),
|
||||||
fakeWakuMessage(toBytes("MSG-2")),
|
fakeWakuMessage(toBytes("MSG-2")),
|
||||||
|
|||||||
@ -45,9 +45,9 @@ suite "REST API - Relay":
|
|||||||
restServer.start()
|
restServer.start()
|
||||||
|
|
||||||
let pubSubTopics = @[
|
let pubSubTopics = @[
|
||||||
PubSubTopicString("pubsub-topic-1"),
|
PubSubTopic("pubsub-topic-1"),
|
||||||
PubSubTopicString("pubsub-topic-2"),
|
PubSubTopic("pubsub-topic-2"),
|
||||||
PubSubTopicString("pubsub-topic-3")
|
PubSubTopic("pubsub-topic-3")
|
||||||
]
|
]
|
||||||
|
|
||||||
# When
|
# When
|
||||||
@ -94,10 +94,10 @@ suite "REST API - Relay":
|
|||||||
restServer.start()
|
restServer.start()
|
||||||
|
|
||||||
let pubSubTopics = @[
|
let pubSubTopics = @[
|
||||||
PubSubTopicString("pubsub-topic-1"),
|
PubSubTopic("pubsub-topic-1"),
|
||||||
PubSubTopicString("pubsub-topic-2"),
|
PubSubTopic("pubsub-topic-2"),
|
||||||
PubSubTopicString("pubsub-topic-3"),
|
PubSubTopic("pubsub-topic-3"),
|
||||||
PubSubTopicString("pubsub-topic-y")
|
PubSubTopic("pubsub-topic-y")
|
||||||
]
|
]
|
||||||
|
|
||||||
# When
|
# When
|
||||||
@ -190,7 +190,6 @@ suite "REST API - Relay":
|
|||||||
restServer.start()
|
restServer.start()
|
||||||
|
|
||||||
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||||
const defaultContentTopic = ContentTopic("/waku/2/default-content/proto")
|
|
||||||
|
|
||||||
# At this stage the node is only subscribed to the default topic
|
# At this stage the node is only subscribed to the default topic
|
||||||
require(PubSub(node.wakuRelay).topics.len == 1)
|
require(PubSub(node.wakuRelay).topics.len == 1)
|
||||||
@ -198,15 +197,15 @@ suite "REST API - Relay":
|
|||||||
|
|
||||||
# When
|
# When
|
||||||
let newTopics = @[
|
let newTopics = @[
|
||||||
PubSubTopicString("pubsub-topic-1"),
|
PubSubTopic("pubsub-topic-1"),
|
||||||
PubSubTopicString("pubsub-topic-2"),
|
PubSubTopic("pubsub-topic-2"),
|
||||||
PubSubTopicString("pubsub-topic-3")
|
PubSubTopic("pubsub-topic-3")
|
||||||
]
|
]
|
||||||
discard await client.relayPostSubscriptionsV1(newTopics)
|
discard await client.relayPostSubscriptionsV1(newTopics)
|
||||||
|
|
||||||
let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage(
|
let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage(
|
||||||
payload: Base64String.encode("TEST-PAYLOAD"),
|
payload: Base64String.encode("TEST-PAYLOAD"),
|
||||||
contentTopic: some(ContentTopicString(defaultContentTopic)),
|
contentTopic: some(DefaultContentTopic),
|
||||||
timestamp: some(int64(2022))
|
timestamp: some(int64(2022))
|
||||||
))
|
))
|
||||||
|
|
||||||
|
|||||||
@ -8,7 +8,8 @@ import
|
|||||||
import
|
import
|
||||||
../../waku/v2/node/rest/serdes,
|
../../waku/v2/node/rest/serdes,
|
||||||
../../waku/v2/node/rest/base64,
|
../../waku/v2/node/rest/base64,
|
||||||
../../waku/v2/node/rest/relay/api_types
|
../../waku/v2/node/rest/relay/api_types,
|
||||||
|
../../waku/v2/protocol/waku_message
|
||||||
|
|
||||||
|
|
||||||
suite "Relay API - serialization":
|
suite "Relay API - serialization":
|
||||||
@ -36,8 +37,8 @@ suite "Relay API - serialization":
|
|||||||
# Given
|
# Given
|
||||||
let payload = Base64String.encode("MESSAGE")
|
let payload = Base64String.encode("MESSAGE")
|
||||||
let data = RelayWakuMessage(
|
let data = RelayWakuMessage(
|
||||||
payload: payload,
|
payload: payload,
|
||||||
contentTopic: none(ContentTopicString),
|
contentTopic: none(ContentTopic),
|
||||||
version: none(Natural),
|
version: none(Natural),
|
||||||
timestamp: none(int64)
|
timestamp: none(int64)
|
||||||
)
|
)
|
||||||
|
|||||||
@ -44,7 +44,6 @@ proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse =
|
|||||||
pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions))
|
pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions))
|
||||||
|
|
||||||
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage =
|
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage =
|
||||||
const defaultCT = ContentTopic("/waku/2/default-content/proto")
|
|
||||||
var t: Timestamp
|
var t: Timestamp
|
||||||
if relayMessage.timestamp.isSome:
|
if relayMessage.timestamp.isSome:
|
||||||
t = relayMessage.timestamp.get
|
t = relayMessage.timestamp.get
|
||||||
@ -52,14 +51,11 @@ proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessag
|
|||||||
# incoming WakuRelayMessages with no timestamp will get 0 timestamp
|
# incoming WakuRelayMessages with no timestamp will get 0 timestamp
|
||||||
t = Timestamp(0)
|
t = Timestamp(0)
|
||||||
WakuMessage(payload: relayMessage.payload,
|
WakuMessage(payload: relayMessage.payload,
|
||||||
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,
|
contentTopic: relayMessage.contentTopic.get(DefaultContentTopic),
|
||||||
version: version,
|
version: version,
|
||||||
timestamp: t)
|
timestamp: t)
|
||||||
|
|
||||||
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref HmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage =
|
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref HmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage =
|
||||||
# @TODO global definition for default content topic
|
|
||||||
const defaultCT = ContentTopic("/waku/2/default-content/proto")
|
|
||||||
|
|
||||||
let payload = Payload(payload: relayMessage.payload,
|
let payload = Payload(payload: relayMessage.payload,
|
||||||
dst: pubKey,
|
dst: pubKey,
|
||||||
symkey: symkey)
|
symkey: symkey)
|
||||||
@ -72,13 +68,11 @@ proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref Hm
|
|||||||
t = Timestamp(0)
|
t = Timestamp(0)
|
||||||
|
|
||||||
WakuMessage(payload: payload.encode(version, rng[]).get(),
|
WakuMessage(payload: payload.encode(version, rng[]).get(),
|
||||||
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,
|
contentTopic: relayMessage.contentTopic.get(DefaultContentTopic),
|
||||||
version: version,
|
version: version,
|
||||||
timestamp: t)
|
timestamp: t)
|
||||||
|
|
||||||
proc toWakuRelayMessage*(message: WakuMessage, symkey: Option[SymKey], privateKey: Option[keys.PrivateKey]): WakuRelayMessage =
|
proc toWakuRelayMessage*(message: WakuMessage, symkey: Option[SymKey], privateKey: Option[keys.PrivateKey]): WakuRelayMessage =
|
||||||
# @TODO global definition for default content topic
|
|
||||||
|
|
||||||
let
|
let
|
||||||
keyInfo = if symkey.isSome(): KeyInfo(kind: Symmetric, symKey: symkey.get())
|
keyInfo = if symkey.isSome(): KeyInfo(kind: Symmetric, symKey: symkey.get())
|
||||||
elif privateKey.isSome(): KeyInfo(kind: Asymmetric, privKey: privateKey.get())
|
elif privateKey.isSome(): KeyInfo(kind: Asymmetric, privKey: privateKey.get())
|
||||||
|
|||||||
@ -17,13 +17,9 @@ import
|
|||||||
|
|
||||||
#### Types
|
#### Types
|
||||||
|
|
||||||
type
|
|
||||||
PubSubTopicString* = distinct string
|
|
||||||
ContentTopicString* = distinct string
|
|
||||||
|
|
||||||
type RelayWakuMessage* = object
|
type RelayWakuMessage* = object
|
||||||
payload*: Base64String
|
payload*: Base64String
|
||||||
contentTopic*: Option[ContentTopicString]
|
contentTopic*: Option[ContentTopic]
|
||||||
version*: Option[Natural]
|
version*: Option[Natural]
|
||||||
timestamp*: Option[int64]
|
timestamp*: Option[int64]
|
||||||
|
|
||||||
@ -33,8 +29,8 @@ type
|
|||||||
RelayPostMessagesRequest* = RelayWakuMessage
|
RelayPostMessagesRequest* = RelayWakuMessage
|
||||||
|
|
||||||
type
|
type
|
||||||
RelayPostSubscriptionsRequest* = seq[PubSubTopicString]
|
RelayPostSubscriptionsRequest* = seq[PubSubTopic]
|
||||||
RelayDeleteSubscriptionsRequest* = seq[PubSubTopicString]
|
RelayDeleteSubscriptionsRequest* = seq[PubSubTopic]
|
||||||
|
|
||||||
|
|
||||||
#### Type conversion
|
#### Type conversion
|
||||||
@ -42,16 +38,15 @@ type
|
|||||||
proc toRelayWakuMessage*(msg: WakuMessage): RelayWakuMessage =
|
proc toRelayWakuMessage*(msg: WakuMessage): RelayWakuMessage =
|
||||||
RelayWakuMessage(
|
RelayWakuMessage(
|
||||||
payload: base64.encode(Base64String, msg.payload),
|
payload: base64.encode(Base64String, msg.payload),
|
||||||
contentTopic: some(ContentTopicString(msg.contentTopic)),
|
contentTopic: some(msg.contentTopic),
|
||||||
version: some(Natural(msg.version)),
|
version: some(Natural(msg.version)),
|
||||||
timestamp: some(msg.timestamp)
|
timestamp: some(msg.timestamp)
|
||||||
)
|
)
|
||||||
|
|
||||||
proc toWakuMessage*(msg: RelayWakuMessage, version = 0): Result[WakuMessage, cstring] =
|
proc toWakuMessage*(msg: RelayWakuMessage, version = 0): Result[WakuMessage, cstring] =
|
||||||
const defaultContentTopic = ContentTopicString("/waku/2/default-content/proto")
|
|
||||||
let
|
let
|
||||||
payload = ?msg.payload.decode()
|
payload = ?msg.payload.decode()
|
||||||
contentTopic = ContentTopic(msg.contentTopic.get(defaultContentTopic))
|
contentTopic = msg.contentTopic.get(DefaultContentTopic)
|
||||||
version = uint32(msg.version.get(version))
|
version = uint32(msg.version.get(version))
|
||||||
timestamp = msg.timestamp.get(0)
|
timestamp = msg.timestamp.get(0)
|
||||||
|
|
||||||
@ -64,13 +59,9 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: Base64String)
|
|||||||
{.raises: [IOError, Defect].} =
|
{.raises: [IOError, Defect].} =
|
||||||
writer.writeValue(string(value))
|
writer.writeValue(string(value))
|
||||||
|
|
||||||
proc writeValue*(writer: var JsonWriter[RestJson], value: PubSubTopicString)
|
proc writeValue*(writer: var JsonWriter[RestJson], topic: PubSubTopic|ContentTopic)
|
||||||
{.raises: [IOError, Defect].} =
|
{.raises: [IOError, Defect].} =
|
||||||
writer.writeValue(string(value))
|
writer.writeValue(string(topic))
|
||||||
|
|
||||||
proc writeValue*(writer: var JsonWriter[RestJson], value: ContentTopicString)
|
|
||||||
{.raises: [IOError, Defect].} =
|
|
||||||
writer.writeValue(string(value))
|
|
||||||
|
|
||||||
proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage)
|
proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage)
|
||||||
{.raises: [IOError, Defect].} =
|
{.raises: [IOError, Defect].} =
|
||||||
@ -88,19 +79,19 @@ proc readValue*(reader: var JsonReader[RestJson], value: var Base64String)
|
|||||||
{.raises: [SerializationError, IOError, Defect].} =
|
{.raises: [SerializationError, IOError, Defect].} =
|
||||||
value = Base64String(reader.readValue(string))
|
value = Base64String(reader.readValue(string))
|
||||||
|
|
||||||
proc readValue*(reader: var JsonReader[RestJson], value: var PubSubTopicString)
|
proc readValue*(reader: var JsonReader[RestJson], pubsubTopic: var PubSubTopic)
|
||||||
{.raises: [SerializationError, IOError, Defect].} =
|
{.raises: [SerializationError, IOError, Defect].} =
|
||||||
value = PubSubTopicString(reader.readValue(string))
|
pubsubTopic = PubSubTopic(reader.readValue(string))
|
||||||
|
|
||||||
proc readValue*(reader: var JsonReader[RestJson], value: var ContentTopicString)
|
proc readValue*(reader: var JsonReader[RestJson], contentTopic: var ContentTopic)
|
||||||
{.raises: [SerializationError, IOError, Defect].} =
|
{.raises: [SerializationError, IOError, Defect].} =
|
||||||
value = ContentTopicString(reader.readValue(string))
|
contentTopic = ContentTopic(reader.readValue(string))
|
||||||
|
|
||||||
proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
|
proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
|
||||||
{.raises: [SerializationError, IOError, Defect].} =
|
{.raises: [SerializationError, IOError, Defect].} =
|
||||||
var
|
var
|
||||||
payload = none(Base64String)
|
payload = none(Base64String)
|
||||||
contentTopic = none(ContentTopicString)
|
contentTopic = none(ContentTopic)
|
||||||
version = none(Natural)
|
version = none(Natural)
|
||||||
timestamp = none(int64)
|
timestamp = none(int64)
|
||||||
|
|
||||||
@ -116,7 +107,7 @@ proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
|
|||||||
of "payload":
|
of "payload":
|
||||||
payload = some(reader.readValue(Base64String))
|
payload = some(reader.readValue(Base64String))
|
||||||
of "contentTopic":
|
of "contentTopic":
|
||||||
contentTopic = some(reader.readValue(ContentTopicString))
|
contentTopic = some(reader.readValue(ContentTopic))
|
||||||
of "version":
|
of "version":
|
||||||
version = some(reader.readValue(Natural))
|
version = some(reader.readValue(Natural))
|
||||||
of "timestamp":
|
of "timestamp":
|
||||||
|
|||||||
@ -12,6 +12,7 @@ import
|
|||||||
presto/[route, client, common]
|
presto/[route, client, common]
|
||||||
import
|
import
|
||||||
../../waku_node,
|
../../waku_node,
|
||||||
|
../../../protocol/waku_message,
|
||||||
../serdes,
|
../serdes,
|
||||||
../utils,
|
../utils,
|
||||||
./api_types,
|
./api_types,
|
||||||
@ -157,7 +158,7 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, topicCache
|
|||||||
|
|
||||||
#### Client
|
#### Client
|
||||||
|
|
||||||
proc encodeBytes*(value: seq[PubSubTopicString],
|
proc encodeBytes*(value: seq[PubSubTopic],
|
||||||
contentType: string): RestResult[seq[byte]] =
|
contentType: string): RestResult[seq[byte]] =
|
||||||
if MediaType.init(contentType) != MIMETYPE_JSON:
|
if MediaType.init(contentType) != MIMETYPE_JSON:
|
||||||
error "Unsupported contentType value", contentType = contentType
|
error "Unsupported contentType value", contentType = contentType
|
||||||
|
|||||||
@ -20,11 +20,9 @@ export message_cache
|
|||||||
|
|
||||||
##### TopicCache
|
##### TopicCache
|
||||||
|
|
||||||
type PubSubTopicString = string
|
|
||||||
|
|
||||||
type TopicCacheResult*[T] = MessageCacheResult[T]
|
type TopicCacheResult*[T] = MessageCacheResult[T]
|
||||||
|
|
||||||
type TopicCache* = MessageCache[PubSubTopicString]
|
type TopicCache* = MessageCache[PubSubTopic]
|
||||||
|
|
||||||
|
|
||||||
##### Message handler
|
##### Message handler
|
||||||
@ -33,17 +31,17 @@ type TopicCacheMessageHandler* = Topichandler
|
|||||||
|
|
||||||
proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler =
|
proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler =
|
||||||
|
|
||||||
let handler = proc(topic: string, data: seq[byte]): Future[void] {.async, closure.} =
|
let handler = proc(pubsubTopic: string, data: seq[byte]): Future[void] {.async, closure.} =
|
||||||
trace "Topic handler triggered", topic=topic
|
trace "PubsubTopic handler triggered", pubsubTopic=pubsubTopic
|
||||||
|
|
||||||
# Add message to current cache
|
# Add message to current cache
|
||||||
let msg = WakuMessage.decode(data)
|
let msg = WakuMessage.decode(data)
|
||||||
if msg.isErr():
|
if msg.isErr():
|
||||||
debug "WakuMessage received but failed to decode", msg=msg, topic=topic
|
debug "WakuMessage received but failed to decode", msg=msg, pubsubTopic=pubsubTopic
|
||||||
# TODO: handle message decode failure
|
# TODO: handle message decode failure
|
||||||
return
|
return
|
||||||
|
|
||||||
trace "WakuMessage received", msg=msg, topic=topic
|
trace "WakuMessage received", msg=msg, pubsubTopic=pubsubTopic
|
||||||
cache.addMessage(PubSubTopicString(topic), msg.get())
|
cache.addMessage(PubSubTopic(pubsubTopic), msg.get())
|
||||||
|
|
||||||
handler
|
handler
|
||||||
@ -347,7 +347,7 @@ proc startRelay*(node: WakuNode) {.async.} =
|
|||||||
info "starting relay"
|
info "starting relay"
|
||||||
|
|
||||||
# PubsubTopic subscriptions
|
# PubsubTopic subscriptions
|
||||||
for topic in node.wakuRelay.defaultTopics:
|
for topic in node.wakuRelay.defaultPubsubTopics:
|
||||||
node.subscribe(topic, none(TopicHandler))
|
node.subscribe(topic, none(TopicHandler))
|
||||||
|
|
||||||
# Resume previous relay connections
|
# Resume previous relay connections
|
||||||
@ -393,7 +393,7 @@ proc mountRelay*(node: WakuNode,
|
|||||||
|
|
||||||
## The default relay topics is the union of
|
## The default relay topics is the union of
|
||||||
## all configured topics plus the hard-coded defaultTopic(s)
|
## all configured topics plus the hard-coded defaultTopic(s)
|
||||||
wakuRelay.defaultTopics = concat(@[DefaultPubsubTopic], topics)
|
wakuRelay.defaultPubsubTopics = concat(@[DefaultPubsubTopic], topics)
|
||||||
|
|
||||||
## Add peer exchange handler
|
## Add peer exchange handler
|
||||||
if peerExchangeHandler.isSome():
|
if peerExchangeHandler.isSome():
|
||||||
|
|||||||
@ -27,8 +27,8 @@ type
|
|||||||
ContentTopic* = string
|
ContentTopic* = string
|
||||||
|
|
||||||
const
|
const
|
||||||
DefaultPubsubTopic*: PubsubTopic = "/waku/2/default-waku/proto"
|
DefaultPubsubTopic*: PubsubTopic = PubsubTopic("/waku/2/default-waku/proto")
|
||||||
DefaultContentTopic*: ContentTopic = "/waku/2/default-content/proto"
|
DefaultContentTopic*: ContentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
|
|
||||||
|
|
||||||
type WakuMessage* = object
|
type WakuMessage* = object
|
||||||
|
|||||||
@ -25,7 +25,7 @@ const
|
|||||||
|
|
||||||
type
|
type
|
||||||
WakuRelay* = ref object of GossipSub
|
WakuRelay* = ref object of GossipSub
|
||||||
defaultTopics*: seq[PubsubTopic] # Default configured PubSub topics
|
defaultPubsubTopics*: seq[PubsubTopic] # Default configured PubSub topics
|
||||||
|
|
||||||
method init*(w: WakuRelay) =
|
method init*(w: WakuRelay) =
|
||||||
debug "init WakuRelay"
|
debug "init WakuRelay"
|
||||||
|
|||||||
@ -1115,7 +1115,7 @@ proc mountRlnRelayStatic*(node: WakuNode,
|
|||||||
if node.wakuRelay.isNil():
|
if node.wakuRelay.isNil():
|
||||||
return err("WakuRelay protocol is not mounted")
|
return err("WakuRelay protocol is not mounted")
|
||||||
# check whether the pubsub topic is supported at the relay level
|
# check whether the pubsub topic is supported at the relay level
|
||||||
if pubsubTopic notin node.wakuRelay.defaultTopics:
|
if pubsubTopic notin node.wakuRelay.defaultPubsubTopics:
|
||||||
return err("The relay protocol does not support the configured pubsub topic")
|
return err("The relay protocol does not support the configured pubsub topic")
|
||||||
|
|
||||||
debug "rln-relay input validation passed"
|
debug "rln-relay input validation passed"
|
||||||
@ -1170,7 +1170,7 @@ proc mountRlnRelayDynamic*(node: WakuNode,
|
|||||||
if node.wakuRelay.isNil:
|
if node.wakuRelay.isNil:
|
||||||
return err("WakuRelay protocol is not mounted.")
|
return err("WakuRelay protocol is not mounted.")
|
||||||
# check whether the pubsub topic is supported at the relay level
|
# check whether the pubsub topic is supported at the relay level
|
||||||
if pubsubTopic notin node.wakuRelay.defaultTopics:
|
if pubsubTopic notin node.wakuRelay.defaultPubsubTopics:
|
||||||
return err("WakuRelay protocol does not support the configured pubsub topic.")
|
return err("WakuRelay protocol does not support the configured pubsub topic.")
|
||||||
debug "rln-relay input validation passed"
|
debug "rln-relay input validation passed"
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user