mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-12 15:06:38 +00:00
deploy: a575c44934b9588e2e9d30f9488446e25c95163c
This commit is contained in:
parent
cb49212a1d
commit
81c630d397
@ -1 +1 @@
|
||||
1615446339
|
||||
1615448939
|
@ -153,6 +153,23 @@ procSuite "Waku v2 JSON-RPC API":
|
||||
let client = newRpcHttpClient()
|
||||
await client.connect("127.0.0.1", rpcPort)
|
||||
|
||||
# First see if we can retrieve messages published on the default topic (node is already subscribed)
|
||||
await node2.publish(defaultTopic, message)
|
||||
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
var messages = await client.get_waku_v2_relay_v1_messages(defaultTopic)
|
||||
|
||||
check:
|
||||
messages.len == 1
|
||||
messages[0].contentTopic == contentTopic
|
||||
messages[0].payload == payload
|
||||
|
||||
# Ensure that read messages are cleared from cache
|
||||
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
|
||||
check:
|
||||
messages.len == 0
|
||||
|
||||
# Now try to subscribe using API
|
||||
|
||||
var response = await client.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
|
||||
@ -168,7 +185,7 @@ procSuite "Waku v2 JSON-RPC API":
|
||||
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
var messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
|
||||
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
|
||||
|
||||
check:
|
||||
messages.len == 1
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
# libtool - Provide generalized library-building support services.
|
||||
# Generated automatically by config.status (libbacktrace) version-unused
|
||||
# Libtool was configured on host fv-az187-337:
|
||||
# Libtool was configured on host fv-az132-894:
|
||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||
#
|
||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||
|
@ -17,7 +17,7 @@ const maxCache* = 100 # Max number of messages cached per topic @TODO make this
|
||||
proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: TopicCache) =
|
||||
|
||||
proc topicHandler(topic: string, data: seq[byte]) {.async.} =
|
||||
trace "Topic handler triggered"
|
||||
trace "Topic handler triggered", topic=topic
|
||||
let msg = WakuMessage.init(data)
|
||||
if msg.isOk():
|
||||
# Add message to current cache
|
||||
@ -38,6 +38,16 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: Top
|
||||
else:
|
||||
debug "WakuMessage received but failed to decode", msg=msg, topic=topic
|
||||
# @TODO handle message decode failure
|
||||
|
||||
## Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
|
||||
for topic in PubSub(node.wakuRelay).topics.keys:
|
||||
debug "Adding API topic handler for existing subscription", topic=topic
|
||||
|
||||
node.subscribe(topic, topicHandler)
|
||||
|
||||
# Create message cache for this topic
|
||||
debug "MessageCache for topic", topic=topic
|
||||
topicCache[topic] = @[]
|
||||
|
||||
## Relay API version 1 definitions
|
||||
|
||||
@ -73,10 +83,12 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: Top
|
||||
|
||||
# Subscribe to all requested topics
|
||||
for topic in topics:
|
||||
node.subscribe(topic, topicHandler)
|
||||
# Create message cache for this topic
|
||||
debug "MessageCache for topic", topic=topic
|
||||
topicCache[topic] = @[]
|
||||
# Only subscribe to topics for which we have no subscribed topic handlers yet
|
||||
if not topicCache.hasKey(topic):
|
||||
node.subscribe(topic, topicHandler)
|
||||
# Create message cache for this topic
|
||||
trace "MessageCache for topic", topic=topic
|
||||
topicCache[topic] = @[]
|
||||
|
||||
# Successfully subscribed to all requested topics
|
||||
return true
|
||||
|
@ -30,6 +30,9 @@ logScope:
|
||||
# Default clientId
|
||||
const clientId* = "Nimbus Waku v2 node"
|
||||
|
||||
# Default topic
|
||||
const defaultTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
# key and crypto modules different
|
||||
type
|
||||
KeyPair* = crypto.KeyPair
|
||||
@ -361,7 +364,7 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
|
||||
await node.subscriptions.notify(topic, msg.value())
|
||||
waku_node_messages.inc(labelValues = ["relay"])
|
||||
|
||||
node.wakuRelay.subscribe("/waku/2/default-waku/proto", relayHandler)
|
||||
node.wakuRelay.subscribe(defaultTopic, relayHandler)
|
||||
|
||||
for topic in topics:
|
||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
|
Loading…
x
Reference in New Issue
Block a user