Feature/jsonrpc basic impl (#311)

* Added some basic debug and relay json-rpc calls

* Basic relay polling
This commit is contained in:
Hanno Cornelius 2020-12-01 11:57:54 +02:00 committed by GitHub
parent 51b35c59c6
commit d3c5840a79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 126 additions and 17 deletions

View File

@ -32,7 +32,7 @@ procSuite "Waku v2 JSON-RPC API":
port = Port(9000)
node = WakuNode.init(privkey, bindIp, port, some(extIp), some(port))
asyncTest "debug_api":
asyncTest "Debug API: get node info":
waitFor node.start()
waitFor node.mountRelay()
@ -58,7 +58,7 @@ procSuite "Waku v2 JSON-RPC API":
server.close()
waitfor node.stop()
asyncTest "relay_api":
asyncTest "Relay API: publish and subscribe/unsubscribe":
waitFor node.start()
waitFor node.mountRelay()
@ -106,8 +106,79 @@ procSuite "Waku v2 JSON-RPC API":
server.stop()
server.close()
waitfor node.stop()
asyncTest "Relay API: get latest messages":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.init(nodeKey1, bindIp, Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.init(nodeKey2, bindIp, Port(60002))
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
pubSubTopic = "polling"
contentTopic = ContentTopic(1)
payload = @[byte 9]
message = WakuMessage(payload: payload, contentTopic: contentTopic)
asyncTest "store_api":
await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo])
# RPC server setup
let
rpcPort = Port(8545)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
# Let's connect to node 3 via the API
installRelayApiHandlers(node3, server)
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort)
# Now try to subscribe using API
var response = await client.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
await sleepAsync(2000.millis)
check:
# Node is now subscribed to pubSubTopic
response == true
# Now publish a message on node1 and see if we receive it on node3
node1.publish(pubSubTopic, message)
await sleepAsync(2000.millis)
var messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
check:
messages.len == 1
messages[0].contentTopic == contentTopic
messages[0].payload == payload
# Ensure that read messages are cleared from cache
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
check:
messages.len == 0
server.stop()
server.close()
await node1.stop()
await node2.stop()
await node3.stop()
asyncTest "Store API: retrieve historical messages":
waitFor node.start()
waitFor node.mountRelay(@[defaultTopic])
@ -168,7 +239,7 @@ procSuite "Waku v2 JSON-RPC API":
server.close()
waitfor node.stop()
asyncTest "filter_api":
asyncTest "Filter API: subscribe/unsubscribe":
waitFor node.start()
waitFor node.mountRelay()

View File

@ -1,3 +1,5 @@
{.push raises: [Exception, Defect].}
import
json_rpc/rpcserver,
eth/[common, rlp, keys, p2p],

View File

@ -5,6 +5,7 @@ proc get_waku_v2_debug_v1_info(): WakuInfo
# Relay API
proc post_waku_v2_relay_v1_message(topic: string, message: WakuRelayMessage): bool
proc get_waku_v2_relay_v1_messages(topic: string): seq[WakuMessage]
proc post_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool

View File

@ -1,12 +1,32 @@
{.push raises: [Exception, Defect].}
import
std/[tables,sequtils],
json_rpc/rpcserver,
libp2p/protocols/pubsub/pubsub,
eth/[common, rlp, keys, p2p],
../../waku_types,
../wakunode2,
./jsonrpc_types, ./jsonrpc_utils
const futTimeout = 5.seconds
proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
const futTimeout = 5.seconds
## Create a per-topic message cache
var
topicCache = initTable[string, seq[WakuMessage]]()
proc topicHandler(topic: string, data: seq[byte]) {.async.} =
debug "Topic handler triggered"
let msg = WakuMessage.init(data)
if msg.isOk():
debug "WakuMessage received", msg=msg, topic=topic
# Add message to current cache
# @TODO limit max topics and messages
topicCache.mgetOrPut(topic, @[]).add(msg[])
else:
debug "WakuMessage received but failed to decode", msg=msg, topic=topic
# @TODO handle message decode failure
## Relay API version 1 definitions
@ -18,26 +38,36 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
return true
rpcsrv.rpc("get_waku_v2_relay_v1_messages") do(topic: string) -> seq[WakuMessage]:
## Returns all WakuMessages received on a PubSub topic since the
## last time this method was called
## @TODO ability to specify a return message limit
debug "get_waku_v2_relay_v1_messages", topic=topic
if topicCache.hasKey(topic):
let msgs = topicCache[topic]
# Clear cache before next call
topicCache[topic] = @[]
return msgs
else:
# Not subscribed to this topic
raise newException(ValueError, "Not subscribed to topic: " & topic)
rpcsrv.rpc("post_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool:
## Subscribes a node to a list of PubSub topics
debug "post_waku_v2_relay_v1_subscriptions"
proc topicHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
debug "WakuMessage received", msg=msg, topic=topic
# @TODO handle message
else:
debug "WakuMessage received but failed to decode", msg=msg, topic=topic
# @TODO handle message decode failure
var failedTopics: seq[string]
# Subscribe to all requested topics
for topic in topics:
# If any topic fails to subscribe, add to list of failedTopics
if not(await node.subscribe(topic, topicHandler).withTimeout(futTimeout)):
# If any topic fails to subscribe, add to list of failedTopics
failedTopics.add(topic)
else:
# Create message cache for this topic
debug "MessageCache for topic", topic=topic
topicCache[topic] = @[]
if (failedTopics.len() == 0):
# Successfully subscribed to all requested topics
@ -54,9 +84,12 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
# Unsubscribe all handlers from requested topics
for topic in topics:
# If any topic fails to unsubscribe, add to list of failedTopics
if not(await node.unsubscribeAll(topic).withTimeout(futTimeout)):
# If any topic fails to unsubscribe, add to list of failedTopics
failedTopics.add(topic)
else:
# Remove message cache for topic
topicCache.del(topic)
if (failedTopics.len() == 0):
# Successfully unsubscribed from all requested topics

View File

@ -1,3 +1,5 @@
{.push raises: [Exception, Defect].}
import
std/options,
json_rpc/rpcserver,