From a575c44934b9588e2e9d30f9488446e25c95163c Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Thu, 11 Mar 2021 09:48:59 +0200 Subject: [PATCH] Fix Relay API: handle messages on previously subscribed topics (#413) --- tests/v2/test_jsonrpc_waku.nim | 19 ++++++++++++++++++- waku/v2/node/jsonrpc/relay_api.nim | 22 +++++++++++++++++----- waku/v2/node/wakunode2.nim | 5 ++++- 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 15bd9a4e8..ead70da12 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -153,6 +153,23 @@ procSuite "Waku v2 JSON-RPC API": let client = newRpcHttpClient() await client.connect("127.0.0.1", rpcPort) + # First see if we can retrieve messages published on the default topic (node is already subscribed) + await node2.publish(defaultTopic, message) + + await sleepAsync(2000.millis) + + var messages = await client.get_waku_v2_relay_v1_messages(defaultTopic) + + check: + messages.len == 1 + messages[0].contentTopic == contentTopic + messages[0].payload == payload + + # Ensure that read messages are cleared from cache + messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic) + check: + messages.len == 0 + # Now try to subscribe using API var response = await client.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic]) @@ -168,7 +185,7 @@ procSuite "Waku v2 JSON-RPC API": await sleepAsync(2000.millis) - var messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic) + messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic) check: messages.len == 1 diff --git a/waku/v2/node/jsonrpc/relay_api.nim b/waku/v2/node/jsonrpc/relay_api.nim index a1a6cfec4..aaa0cf80b 100644 --- a/waku/v2/node/jsonrpc/relay_api.nim +++ b/waku/v2/node/jsonrpc/relay_api.nim @@ -17,7 +17,7 @@ const maxCache* = 100 # Max number of messages cached per topic @TODO make this proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: TopicCache) = proc topicHandler(topic: string, data: seq[byte]) {.async.} = - trace "Topic handler triggered" + trace "Topic handler triggered", topic=topic let msg = WakuMessage.init(data) if msg.isOk(): # Add message to current cache @@ -38,6 +38,16 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: Top else: debug "WakuMessage received but failed to decode", msg=msg, topic=topic # @TODO handle message decode failure + + ## Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these + for topic in PubSub(node.wakuRelay).topics.keys: + debug "Adding API topic handler for existing subscription", topic=topic + + node.subscribe(topic, topicHandler) + + # Create message cache for this topic + debug "MessageCache for topic", topic=topic + topicCache[topic] = @[] ## Relay API version 1 definitions @@ -73,10 +83,12 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: Top # Subscribe to all requested topics for topic in topics: - node.subscribe(topic, topicHandler) - # Create message cache for this topic - debug "MessageCache for topic", topic=topic - topicCache[topic] = @[] + # Only subscribe to topics for which we have no subscribed topic handlers yet + if not topicCache.hasKey(topic): + node.subscribe(topic, topicHandler) + # Create message cache for this topic + trace "MessageCache for topic", topic=topic + topicCache[topic] = @[] # Successfully subscribed to all requested topics return true diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index e045b8b10..e29568e78 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -30,6 +30,9 @@ logScope: # Default clientId const clientId* = "Nimbus Waku v2 node" +# Default topic +const defaultTopic = "/waku/2/default-waku/proto" + # key and crypto modules different type KeyPair* = crypto.KeyPair @@ -361,7 +364,7 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela await node.subscriptions.notify(topic, msg.value()) waku_node_messages.inc(labelValues = ["relay"]) - node.wakuRelay.subscribe("/waku/2/default-waku/proto", relayHandler) + node.wakuRelay.subscribe(defaultTopic, relayHandler) for topic in topics: proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =