nwaku/waku/v2/node/jsonrpc/relay_api.nim

112 lines
3.9 KiB
Nim

{.push raises: [Defect].}
import
std/[tables,sequtils],
chronicles,
json_rpc/rpcserver,
libp2p/protocols/pubsub/pubsub,
../../protocol/waku_message,
../wakunode2,
./jsonrpc_types,
./jsonrpc_utils
export jsonrpc_types
logScope:
topics = "relay api"
const futTimeout* = 5.seconds # Max time to wait for futures
const maxCache* = 30 # Max number of messages cached per topic @TODO make this configurable
proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: TopicCache) =
proc topicHandler(topic: string, data: seq[byte]) {.async, raises: [Defect].} =
trace "Topic handler triggered", topic=topic
let msg = WakuMessage.init(data)
if msg.isOk():
# Add message to current cache
trace "WakuMessage received", msg=msg, topic=topic
# Make a copy of msgs for this topic to modify
var msgs = topicCache.getOrDefault(topic, @[])
if msgs.len >= maxCache:
# Message cache on this topic exceeds maximum. Delete oldest.
# @TODO this may become a bottle neck if called as the norm rather than exception when adding messages. Performance profile needed.
msgs.delete(0,0)
msgs.add(msg[])
# Replace indexed entry with copy
# @TODO max number of topics could be limited in node
topicCache[topic] = msgs
else:
debug "WakuMessage received but failed to decode", msg=msg, topic=topic
# @TODO handle message decode failure
## Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
for topic in PubSub(node.wakuRelay).topics.keys:
debug "Adding API topic handler for existing subscription", topic=topic
node.subscribe(topic, topicHandler)
# Create message cache for this topic
debug "MessageCache for topic", topic=topic
topicCache[topic] = @[]
## Relay API version 1 definitions
rpcsrv.rpc("post_waku_v2_relay_v1_message") do(topic: string, message: WakuRelayMessage) -> bool:
## Publishes a WakuMessage to a PubSub topic
debug "post_waku_v2_relay_v1_message"
if (await node.publish(topic, message.toWakuMessage(version = 0)).withTimeout(futTimeout)):
# Successfully published message
return true
else:
# Failed to publish message to topic
raise newException(ValueError, "Failed to publish to topic " & topic)
rpcsrv.rpc("get_waku_v2_relay_v1_messages") do(topic: string) -> seq[WakuMessage]:
## Returns all WakuMessages received on a PubSub topic since the
## last time this method was called
## @TODO ability to specify a return message limit
debug "get_waku_v2_relay_v1_messages", topic=topic
if topicCache.hasKey(topic):
let msgs = topicCache[topic]
# Clear cache before next call
topicCache[topic] = @[]
return msgs
else:
# Not subscribed to this topic
raise newException(ValueError, "Not subscribed to topic: " & topic)
rpcsrv.rpc("post_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool:
## Subscribes a node to a list of PubSub topics
debug "post_waku_v2_relay_v1_subscriptions"
# Subscribe to all requested topics
for topic in topics:
# Only subscribe to topics for which we have no subscribed topic handlers yet
if not topicCache.hasKey(topic):
node.subscribe(topic, topicHandler)
# Create message cache for this topic
trace "MessageCache for topic", topic=topic
topicCache[topic] = @[]
# Successfully subscribed to all requested topics
return true
rpcsrv.rpc("delete_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool:
## Unsubscribes a node from a list of PubSub topics
debug "delete_waku_v2_relay_v1_subscriptions"
# Unsubscribe all handlers from requested topics
for topic in topics:
node.unsubscribeAll(topic)
# Remove message cache for topic
topicCache.del(topic)
# Successfully unsubscribed from all requested topics
return true