Fix Relay API: handle messages on previously subscribed topics (#413)

This commit is contained in:
Hanno Cornelius 2021-03-11 09:48:59 +02:00 committed by GitHub
parent e6b26cc059
commit a575c44934
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 39 additions and 7 deletions

View File

@ -153,6 +153,23 @@ procSuite "Waku v2 JSON-RPC API":
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort)
# First see if we can retrieve messages published on the default topic (node is already subscribed)
await node2.publish(defaultTopic, message)
await sleepAsync(2000.millis)
var messages = await client.get_waku_v2_relay_v1_messages(defaultTopic)
check:
messages.len == 1
messages[0].contentTopic == contentTopic
messages[0].payload == payload
# Ensure that read messages are cleared from cache
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
check:
messages.len == 0
# Now try to subscribe using API
var response = await client.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
@ -168,7 +185,7 @@ procSuite "Waku v2 JSON-RPC API":
await sleepAsync(2000.millis)
var messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
check:
messages.len == 1

View File

@ -17,7 +17,7 @@ const maxCache* = 100 # Max number of messages cached per topic @TODO make this
proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: TopicCache) =
proc topicHandler(topic: string, data: seq[byte]) {.async.} =
trace "Topic handler triggered"
trace "Topic handler triggered", topic=topic
let msg = WakuMessage.init(data)
if msg.isOk():
# Add message to current cache
@ -38,6 +38,16 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: Top
else:
debug "WakuMessage received but failed to decode", msg=msg, topic=topic
# @TODO handle message decode failure
## Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
for topic in PubSub(node.wakuRelay).topics.keys:
debug "Adding API topic handler for existing subscription", topic=topic
node.subscribe(topic, topicHandler)
# Create message cache for this topic
debug "MessageCache for topic", topic=topic
topicCache[topic] = @[]
## Relay API version 1 definitions
@ -73,10 +83,12 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: Top
# Subscribe to all requested topics
for topic in topics:
node.subscribe(topic, topicHandler)
# Create message cache for this topic
debug "MessageCache for topic", topic=topic
topicCache[topic] = @[]
# Only subscribe to topics for which we have no subscribed topic handlers yet
if not topicCache.hasKey(topic):
node.subscribe(topic, topicHandler)
# Create message cache for this topic
trace "MessageCache for topic", topic=topic
topicCache[topic] = @[]
# Successfully subscribed to all requested topics
return true

View File

@ -30,6 +30,9 @@ logScope:
# Default clientId
const clientId* = "Nimbus Waku v2 node"
# Default topic
const defaultTopic = "/waku/2/default-waku/proto"
# key and crypto modules different
type
KeyPair* = crypto.KeyPair
@ -361,7 +364,7 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
await node.subscriptions.notify(topic, msg.value())
waku_node_messages.inc(labelValues = ["relay"])
node.wakuRelay.subscribe("/waku/2/default-waku/proto", relayHandler)
node.wakuRelay.subscribe(defaultTopic, relayHandler)
for topic in topics:
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =