mirror of https://github.com/waku-org/nwaku.git
Added some basic debug and relay json-rpc calls (#305)
This commit is contained in:
parent
35f9e52d49
commit
dc3b3b87be
|
@ -7,10 +7,11 @@ import
|
||||||
libp2p/protobuf/minprotobuf,
|
libp2p/protobuf/minprotobuf,
|
||||||
libp2p/stream/[bufferstream, connection],
|
libp2p/stream/[bufferstream, connection],
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
|
libp2p/protocols/pubsub/pubsub,
|
||||||
libp2p/protocols/pubsub/rpc/message,
|
libp2p/protocols/pubsub/rpc/message,
|
||||||
../../waku/v2/waku_types,
|
../../waku/v2/waku_types,
|
||||||
../../waku/v2/node/wakunode2,
|
../../waku/v2/node/wakunode2,
|
||||||
../../waku/v2/node/jsonrpc/[jsonrpc_types,store_api],
|
../../waku/v2/node/jsonrpc/[jsonrpc_types,store_api,relay_api,debug_api],
|
||||||
../../waku/v2/protocol/message_notifier,
|
../../waku/v2/protocol/message_notifier,
|
||||||
../../waku/v2/protocol/waku_store/waku_store,
|
../../waku/v2/protocol/waku_store/waku_store,
|
||||||
../test_helpers
|
../test_helpers
|
||||||
|
@ -19,13 +20,10 @@ template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
||||||
const sigPath = sourceDir / ParDir / ParDir / "waku" / "v2" / "node" / "jsonrpc" / "jsonrpc_callsigs.nim"
|
const sigPath = sourceDir / ParDir / ParDir / "waku" / "v2" / "node" / "jsonrpc" / "jsonrpc_callsigs.nim"
|
||||||
createRpcSigs(RpcHttpClient, sigPath)
|
createRpcSigs(RpcHttpClient, sigPath)
|
||||||
|
|
||||||
suite "Waku v2 JSON-RPC API":
|
procSuite "Waku v2 JSON-RPC API":
|
||||||
|
|
||||||
asyncTest "get_waku_v2_store_v1_messages":
|
|
||||||
const defaultTopic = "/waku/2/default-waku/proto"
|
const defaultTopic = "/waku/2/default-waku/proto"
|
||||||
const testCodec = "/waku/2/default-waku/codec"
|
const testCodec = "/waku/2/default-waku/codec"
|
||||||
|
|
||||||
# WakuNode setup
|
|
||||||
let
|
let
|
||||||
rng = crypto.newRng()
|
rng = crypto.newRng()
|
||||||
privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
|
privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
|
||||||
|
@ -34,6 +32,75 @@ suite "Waku v2 JSON-RPC API":
|
||||||
port = Port(9000)
|
port = Port(9000)
|
||||||
node = WakuNode.init(privkey, bindIp, port, some(extIp), some(port))
|
node = WakuNode.init(privkey, bindIp, port, some(extIp), some(port))
|
||||||
|
|
||||||
|
asyncTest "debug_api":
|
||||||
|
waitFor node.start()
|
||||||
|
|
||||||
|
waitFor node.mountRelay()
|
||||||
|
|
||||||
|
# RPC server setup
|
||||||
|
let
|
||||||
|
rpcPort = Port(8545)
|
||||||
|
ta = initTAddress(bindIp, rpcPort)
|
||||||
|
server = newRpcHttpServer([ta])
|
||||||
|
|
||||||
|
installDebugApiHandlers(node, server)
|
||||||
|
server.start()
|
||||||
|
|
||||||
|
let client = newRpcHttpClient()
|
||||||
|
await client.connect("127.0.0.1", rpcPort)
|
||||||
|
|
||||||
|
let response = await client.get_waku_v2_debug_v1_info()
|
||||||
|
|
||||||
|
check:
|
||||||
|
response.listenStr == $node.peerInfo.addrs[0] & "/p2p/" & $node.peerInfo.peerId
|
||||||
|
|
||||||
|
server.stop()
|
||||||
|
server.close()
|
||||||
|
waitfor node.stop()
|
||||||
|
|
||||||
|
asyncTest "relay_api":
|
||||||
|
waitFor node.start()
|
||||||
|
|
||||||
|
waitFor node.mountRelay()
|
||||||
|
|
||||||
|
# RPC server setup
|
||||||
|
let
|
||||||
|
rpcPort = Port(8545)
|
||||||
|
ta = initTAddress(bindIp, rpcPort)
|
||||||
|
server = newRpcHttpServer([ta])
|
||||||
|
|
||||||
|
installRelayApiHandlers(node, server)
|
||||||
|
server.start()
|
||||||
|
|
||||||
|
let client = newRpcHttpClient()
|
||||||
|
await client.connect("127.0.0.1", rpcPort)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# At this stage the node is only subscribed to the default topic
|
||||||
|
PubSub(node.wakuRelay).topics.len == 1
|
||||||
|
|
||||||
|
# Subscribe to new topics
|
||||||
|
let newTopics = @["1","2","3"]
|
||||||
|
var response = await client.post_waku_v2_relay_v1_subscriptions(newTopics)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Node is now subscribed to default + new topics
|
||||||
|
PubSub(node.wakuRelay).topics.len == 1 + newTopics.len
|
||||||
|
response == true
|
||||||
|
|
||||||
|
# Unsubscribe from new topics
|
||||||
|
response = await client.delete_waku_v2_relay_v1_subscriptions(newTopics)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Node is now unsubscribed from new topics
|
||||||
|
PubSub(node.wakuRelay).topics.len == 1
|
||||||
|
response == true
|
||||||
|
|
||||||
|
server.stop()
|
||||||
|
server.close()
|
||||||
|
waitfor node.stop()
|
||||||
|
|
||||||
|
asyncTest "store_api":
|
||||||
waitFor node.start()
|
waitFor node.start()
|
||||||
|
|
||||||
waitFor node.mountRelay(@[defaultTopic])
|
waitFor node.mountRelay(@[defaultTopic])
|
||||||
|
@ -44,7 +111,7 @@ suite "Waku v2 JSON-RPC API":
|
||||||
ta = initTAddress(bindIp, rpcPort)
|
ta = initTAddress(bindIp, rpcPort)
|
||||||
server = newRpcHttpServer([ta])
|
server = newRpcHttpServer([ta])
|
||||||
|
|
||||||
setupWakuJSONRPC(node, server)
|
installStoreApiHandlers(node, server)
|
||||||
server.start()
|
server.start()
|
||||||
|
|
||||||
# WakuStore setup
|
# WakuStore setup
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
import
|
||||||
|
json_rpc/rpcserver,
|
||||||
|
../../waku_types,
|
||||||
|
../wakunode2
|
||||||
|
|
||||||
|
proc installDebugApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
||||||
|
|
||||||
|
## Debug API version 1 definitions
|
||||||
|
|
||||||
|
rpcsrv.rpc("get_waku_v2_debug_v1_info") do() -> WakuInfo:
|
||||||
|
## Returns information about WakuNode
|
||||||
|
debug "get_waku_v2_debug_v1_info"
|
||||||
|
|
||||||
|
return node.info()
|
|
@ -1 +1,12 @@
|
||||||
|
# Debug API
|
||||||
|
|
||||||
|
proc get_waku_v2_debug_v1_info(): WakuInfo
|
||||||
|
|
||||||
|
# Relay API
|
||||||
|
|
||||||
|
proc post_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
|
||||||
|
proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
|
||||||
|
|
||||||
|
# Store API
|
||||||
|
|
||||||
proc get_waku_v2_store_v1_messages(topics: seq[ContentTopic], pagingOptions: Option[StorePagingOptions]): StoreResponse
|
proc get_waku_v2_store_v1_messages(topics: seq[ContentTopic], pagingOptions: Option[StorePagingOptions]): StoreResponse
|
||||||
|
|
|
@ -0,0 +1,58 @@
|
||||||
|
import
|
||||||
|
json_rpc/rpcserver,
|
||||||
|
eth/[common, rlp, keys, p2p],
|
||||||
|
../../waku_types,
|
||||||
|
../wakunode2
|
||||||
|
|
||||||
|
proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
||||||
|
const futTimeout = 5.seconds
|
||||||
|
|
||||||
|
## Relay API version 1 definitions
|
||||||
|
|
||||||
|
rpcsrv.rpc("post_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool:
|
||||||
|
## Subscribes a node to a list of PubSub topics
|
||||||
|
debug "post_waku_v2_relay_v1_subscriptions"
|
||||||
|
|
||||||
|
proc topicHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
|
let msg = WakuMessage.init(data)
|
||||||
|
if msg.isOk():
|
||||||
|
debug "WakuMessage received", msg=msg, topic=topic
|
||||||
|
# @TODO handle message
|
||||||
|
else:
|
||||||
|
debug "WakuMessage received but failed to decode", msg=msg, topic=topic
|
||||||
|
# @TODO handle message decode failure
|
||||||
|
|
||||||
|
var failedTopics: seq[string]
|
||||||
|
|
||||||
|
# Subscribe to all requested topics
|
||||||
|
for topic in topics:
|
||||||
|
# If any topic fails to subscribe, add to list of failedTopics
|
||||||
|
if not(await node.subscribe(topic, topicHandler).withTimeout(futTimeout)):
|
||||||
|
failedTopics.add(topic)
|
||||||
|
|
||||||
|
if (failedTopics.len() == 0):
|
||||||
|
# Successfully subscribed to all requested topics
|
||||||
|
return true
|
||||||
|
else:
|
||||||
|
# Failed to subscribe to one or more topics
|
||||||
|
raise newException(ValueError, "Failed to subscribe to topics " & repr(failedTopics))
|
||||||
|
|
||||||
|
rpcsrv.rpc("delete_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool:
|
||||||
|
## Unsubscribes a node from a list of PubSub topics
|
||||||
|
debug "delete_waku_v2_relay_v1_subscriptions"
|
||||||
|
|
||||||
|
var failedTopics: seq[string]
|
||||||
|
|
||||||
|
# Unsubscribe all handlers from requested topics
|
||||||
|
for topic in topics:
|
||||||
|
# If any topic fails to unsubscribe, add to list of failedTopics
|
||||||
|
if not(await node.unsubscribeAll(topic).withTimeout(futTimeout)):
|
||||||
|
failedTopics.add(topic)
|
||||||
|
|
||||||
|
if (failedTopics.len() == 0):
|
||||||
|
# Successfully unsubscribed from all requested topics
|
||||||
|
return true
|
||||||
|
else:
|
||||||
|
# Failed to unsubscribe from one or more topics
|
||||||
|
raise newException(ValueError, "Failed to unsubscribe from topics " & repr(failedTopics))
|
||||||
|
|
|
@ -6,7 +6,7 @@ import
|
||||||
../wakunode2,
|
../wakunode2,
|
||||||
./jsonrpc_types, ./jsonrpc_utils
|
./jsonrpc_types, ./jsonrpc_utils
|
||||||
|
|
||||||
proc setupWakuJSONRPC*(node: WakuNode, rpcsrv: RpcServer) =
|
proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
||||||
const futTimeout = 5.seconds
|
const futTimeout = 5.seconds
|
||||||
|
|
||||||
## Store API version 1 definitions
|
## Store API version 1 definitions
|
||||||
|
|
Loading…
Reference in New Issue