From 31464559a1f8f552aee3265eda6a1cc756ff4998 Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Tue, 1 Dec 2020 11:57:54 +0200 Subject: [PATCH] Feature/jsonrpc basic impl (#311) * Added some basic debug and relay json-rpc calls * Basic relay polling --- tests/v2/test_jsonrpc_waku.nim | 79 +++++++++++++++++++++-- waku/v2/node/jsonrpc/filter_api.nim | 2 + waku/v2/node/jsonrpc/jsonrpc_callsigs.nim | 1 + waku/v2/node/jsonrpc/relay_api.nim | 59 +++++++++++++---- waku/v2/node/jsonrpc/store_api.nim | 2 + 5 files changed, 126 insertions(+), 17 deletions(-) diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 380b82860..001aab3ed 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -32,7 +32,7 @@ procSuite "Waku v2 JSON-RPC API": port = Port(9000) node = WakuNode.init(privkey, bindIp, port, some(extIp), some(port)) - asyncTest "debug_api": + asyncTest "Debug API: get node info": waitFor node.start() waitFor node.mountRelay() @@ -58,7 +58,7 @@ procSuite "Waku v2 JSON-RPC API": server.close() waitfor node.stop() - asyncTest "relay_api": + asyncTest "Relay API: publish and subscribe/unsubscribe": waitFor node.start() waitFor node.mountRelay() @@ -106,8 +106,79 @@ procSuite "Waku v2 JSON-RPC API": server.stop() server.close() waitfor node.stop() + + asyncTest "Relay API: get latest messages": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, bindIp, Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, bindIp, Port(60002)) + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port)) + pubSubTopic = "polling" + contentTopic = ContentTopic(1) + payload = @[byte 9] + message = WakuMessage(payload: payload, contentTopic: contentTopic) - asyncTest "store_api": + await node1.start() + await node1.mountRelay(@[pubSubTopic]) + + await node2.start() + await node2.mountRelay(@[pubSubTopic]) + + await node3.start() + await node3.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.peerInfo]) + await node3.connectToNodes(@[node2.peerInfo]) + + # RPC server setup + let + rpcPort = Port(8545) + ta = initTAddress(bindIp, rpcPort) + server = newRpcHttpServer([ta]) + + # Let's connect to node 3 via the API + installRelayApiHandlers(node3, server) + server.start() + + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort) + + # Now try to subscribe using API + + var response = await client.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic]) + + await sleepAsync(2000.millis) + + check: + # Node is now subscribed to pubSubTopic + response == true + + # Now publish a message on node1 and see if we receive it on node3 + node1.publish(pubSubTopic, message) + + await sleepAsync(2000.millis) + + var messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic) + + check: + messages.len == 1 + messages[0].contentTopic == contentTopic + messages[0].payload == payload + + # Ensure that read messages are cleared from cache + messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic) + check: + messages.len == 0 + + server.stop() + server.close() + await node1.stop() + await node2.stop() + await node3.stop() + + asyncTest "Store API: retrieve historical messages": waitFor node.start() waitFor node.mountRelay(@[defaultTopic]) @@ -168,7 +239,7 @@ procSuite "Waku v2 JSON-RPC API": server.close() waitfor node.stop() - asyncTest "filter_api": + asyncTest "Filter API: subscribe/unsubscribe": waitFor node.start() waitFor node.mountRelay() diff --git a/waku/v2/node/jsonrpc/filter_api.nim b/waku/v2/node/jsonrpc/filter_api.nim index 196aae3a6..6cb6bba05 100644 --- a/waku/v2/node/jsonrpc/filter_api.nim +++ b/waku/v2/node/jsonrpc/filter_api.nim @@ -1,3 +1,5 @@ +{.push raises: [Exception, Defect].} + import json_rpc/rpcserver, eth/[common, rlp, keys, p2p], diff --git a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim index 2863cdfad..882745efd 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim @@ -5,6 +5,7 @@ proc get_waku_v2_debug_v1_info(): WakuInfo # Relay API proc post_waku_v2_relay_v1_message(topic: string, message: WakuRelayMessage): bool +proc get_waku_v2_relay_v1_messages(topic: string): seq[WakuMessage] proc post_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool diff --git a/waku/v2/node/jsonrpc/relay_api.nim b/waku/v2/node/jsonrpc/relay_api.nim index f335ae066..d0b75c1f1 100644 --- a/waku/v2/node/jsonrpc/relay_api.nim +++ b/waku/v2/node/jsonrpc/relay_api.nim @@ -1,12 +1,32 @@ +{.push raises: [Exception, Defect].} + import + std/[tables,sequtils], json_rpc/rpcserver, + libp2p/protocols/pubsub/pubsub, eth/[common, rlp, keys, p2p], ../../waku_types, ../wakunode2, ./jsonrpc_types, ./jsonrpc_utils +const futTimeout = 5.seconds + proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = - const futTimeout = 5.seconds + ## Create a per-topic message cache + var + topicCache = initTable[string, seq[WakuMessage]]() + + proc topicHandler(topic: string, data: seq[byte]) {.async.} = + debug "Topic handler triggered" + let msg = WakuMessage.init(data) + if msg.isOk(): + debug "WakuMessage received", msg=msg, topic=topic + # Add message to current cache + # @TODO limit max topics and messages + topicCache.mgetOrPut(topic, @[]).add(msg[]) + else: + debug "WakuMessage received but failed to decode", msg=msg, topic=topic + # @TODO handle message decode failure ## Relay API version 1 definitions @@ -18,26 +38,36 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = return true + rpcsrv.rpc("get_waku_v2_relay_v1_messages") do(topic: string) -> seq[WakuMessage]: + ## Returns all WakuMessages received on a PubSub topic since the + ## last time this method was called + ## @TODO ability to specify a return message limit + debug "get_waku_v2_relay_v1_messages", topic=topic + + if topicCache.hasKey(topic): + let msgs = topicCache[topic] + # Clear cache before next call + topicCache[topic] = @[] + return msgs + else: + # Not subscribed to this topic + raise newException(ValueError, "Not subscribed to topic: " & topic) + rpcsrv.rpc("post_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool: ## Subscribes a node to a list of PubSub topics debug "post_waku_v2_relay_v1_subscriptions" - - proc topicHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) - if msg.isOk(): - debug "WakuMessage received", msg=msg, topic=topic - # @TODO handle message - else: - debug "WakuMessage received but failed to decode", msg=msg, topic=topic - # @TODO handle message decode failure - + var failedTopics: seq[string] # Subscribe to all requested topics for topic in topics: - # If any topic fails to subscribe, add to list of failedTopics if not(await node.subscribe(topic, topicHandler).withTimeout(futTimeout)): + # If any topic fails to subscribe, add to list of failedTopics failedTopics.add(topic) + else: + # Create message cache for this topic + debug "MessageCache for topic", topic=topic + topicCache[topic] = @[] if (failedTopics.len() == 0): # Successfully subscribed to all requested topics @@ -54,9 +84,12 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = # Unsubscribe all handlers from requested topics for topic in topics: - # If any topic fails to unsubscribe, add to list of failedTopics if not(await node.unsubscribeAll(topic).withTimeout(futTimeout)): + # If any topic fails to unsubscribe, add to list of failedTopics failedTopics.add(topic) + else: + # Remove message cache for topic + topicCache.del(topic) if (failedTopics.len() == 0): # Successfully unsubscribed from all requested topics diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim index f37bc4ecb..15b76df4d 100644 --- a/waku/v2/node/jsonrpc/store_api.nim +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -1,3 +1,5 @@ +{.push raises: [Exception, Defect].} + import std/options, json_rpc/rpcserver,