mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-26 05:45:40 +00:00
Added some basic debug and relay json-rpc calls (#309)
This commit is contained in:
parent
08f1c62924
commit
51b35c59c6
@ -11,7 +11,7 @@ import
|
||||
libp2p/protocols/pubsub/rpc/message,
|
||||
../../waku/v2/waku_types,
|
||||
../../waku/v2/node/wakunode2,
|
||||
../../waku/v2/node/jsonrpc/[jsonrpc_types,store_api,relay_api,debug_api],
|
||||
../../waku/v2/node/jsonrpc/[jsonrpc_types,store_api,relay_api,debug_api,filter_api],
|
||||
../../waku/v2/protocol/message_notifier,
|
||||
../../waku/v2/protocol/waku_store/waku_store,
|
||||
../test_helpers
|
||||
@ -88,6 +88,13 @@ procSuite "Waku v2 JSON-RPC API":
|
||||
PubSub(node.wakuRelay).topics.len == 1 + newTopics.len
|
||||
response == true
|
||||
|
||||
# Publish a message on the default topic
|
||||
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(ContentTopic(1))))
|
||||
|
||||
check:
|
||||
# @TODO poll topic to verify message has been published
|
||||
response == true
|
||||
|
||||
# Unsubscribe from new topics
|
||||
response = await client.delete_waku_v2_relay_v1_subscriptions(newTopics)
|
||||
|
||||
@ -160,3 +167,46 @@ procSuite "Waku v2 JSON-RPC API":
|
||||
server.stop()
|
||||
server.close()
|
||||
waitfor node.stop()
|
||||
|
||||
asyncTest "filter_api":
|
||||
waitFor node.start()
|
||||
|
||||
waitFor node.mountRelay()
|
||||
|
||||
node.mountFilter()
|
||||
|
||||
# RPC server setup
|
||||
let
|
||||
rpcPort = Port(8545)
|
||||
ta = initTAddress(bindIp, rpcPort)
|
||||
server = newRpcHttpServer([ta])
|
||||
|
||||
installFilterApiHandlers(node, server)
|
||||
server.start()
|
||||
|
||||
let client = newRpcHttpClient()
|
||||
await client.connect("127.0.0.1", rpcPort)
|
||||
|
||||
check:
|
||||
# Light node has not yet subscribed to any filters
|
||||
node.filters.len() == 0
|
||||
|
||||
let contentFilters = @[ContentFilter(topics: @[ContentTopic(1), ContentTopic(2)]),
|
||||
ContentFilter(topics: @[ContentTopic(3), ContentTopic(4)])]
|
||||
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic))
|
||||
|
||||
check:
|
||||
# Light node has successfully subscribed to a single filter
|
||||
node.filters.len() == 1
|
||||
response == true
|
||||
|
||||
response = await client.delete_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic))
|
||||
|
||||
check:
|
||||
# Light node has successfully unsubscribed from all filters
|
||||
node.filters.len() == 0
|
||||
response == true
|
||||
|
||||
server.stop()
|
||||
server.close()
|
||||
waitfor node.stop()
|
||||
|
44
waku/v2/node/jsonrpc/filter_api.nim
Normal file
44
waku/v2/node/jsonrpc/filter_api.nim
Normal file
@ -0,0 +1,44 @@
|
||||
import
|
||||
json_rpc/rpcserver,
|
||||
eth/[common, rlp, keys, p2p],
|
||||
../../waku_types,
|
||||
../wakunode2
|
||||
|
||||
proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
||||
const futTimeout = 5.seconds
|
||||
|
||||
## Filter API version 1 definitions
|
||||
|
||||
rpcsrv.rpc("post_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool:
|
||||
## Subscribes a node to a list of content filters
|
||||
debug "post_waku_v2_filter_v1_subscription"
|
||||
|
||||
proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} =
|
||||
debug "WakuMessage received", msg=msg, topic=topic
|
||||
# @TODO handle message
|
||||
|
||||
# Construct a filter request
|
||||
# @TODO use default PubSub topic if undefined
|
||||
let fReq = if topic.isSome: FilterRequest(topic: topic.get, contentFilters: contentFilters, subscribe: true) else: FilterRequest(contentFilters: contentFilters, subscribe: true)
|
||||
|
||||
if (await node.subscribe(fReq, filterHandler).withTimeout(futTimeout)):
|
||||
# Successfully subscribed to all content filters
|
||||
return true
|
||||
else:
|
||||
# Failed to subscribe to one or more content filters
|
||||
raise newException(ValueError, "Failed to subscribe to contentFilters " & repr(fReq))
|
||||
|
||||
rpcsrv.rpc("delete_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool:
|
||||
## Unsubscribes a node from a list of content filters
|
||||
debug "delete_waku_v2_filter_v1_subscription"
|
||||
|
||||
# Construct a filter request
|
||||
# @TODO consider using default PubSub topic if undefined
|
||||
let fReq = if topic.isSome: FilterRequest(topic: topic.get, contentFilters: contentFilters, subscribe: false) else: FilterRequest(contentFilters: contentFilters, subscribe: false)
|
||||
|
||||
if (await node.unsubscribe(fReq).withTimeout(futTimeout)):
|
||||
# Successfully unsubscribed from all content filters
|
||||
return true
|
||||
else:
|
||||
# Failed to unsubscribe from one or more content filters
|
||||
raise newException(ValueError, "Failed to unsubscribe from contentFilters " & repr(fReq))
|
@ -4,9 +4,15 @@ proc get_waku_v2_debug_v1_info(): WakuInfo
|
||||
|
||||
# Relay API
|
||||
|
||||
proc post_waku_v2_relay_v1_message(topic: string, message: WakuRelayMessage): bool
|
||||
proc post_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
|
||||
proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
|
||||
|
||||
# Store API
|
||||
|
||||
proc get_waku_v2_store_v1_messages(topics: seq[ContentTopic], pagingOptions: Option[StorePagingOptions]): StoreResponse
|
||||
|
||||
# Filter API
|
||||
|
||||
proc post_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool
|
||||
proc delete_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool
|
||||
|
@ -12,3 +12,7 @@ type
|
||||
pageSize*: uint64
|
||||
cursor*: Option[Index]
|
||||
forward*: bool
|
||||
|
||||
WakuRelayMessage* = object
|
||||
payload*: seq[byte]
|
||||
contentTopic*: Option[ContentTopic]
|
||||
|
@ -22,3 +22,10 @@ proc toPagingOptions*(pagingInfo: PagingInfo): StorePagingOptions =
|
||||
proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse =
|
||||
StoreResponse(messages: historyResponse.messages,
|
||||
pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions))
|
||||
|
||||
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage =
|
||||
# @TODO global definition for default content topic
|
||||
const defaultCT = 0
|
||||
WakuMessage(payload: relayMessage.payload,
|
||||
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,
|
||||
version: version)
|
||||
|
@ -2,12 +2,21 @@ import
|
||||
json_rpc/rpcserver,
|
||||
eth/[common, rlp, keys, p2p],
|
||||
../../waku_types,
|
||||
../wakunode2
|
||||
../wakunode2,
|
||||
./jsonrpc_types, ./jsonrpc_utils
|
||||
|
||||
proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
||||
const futTimeout = 5.seconds
|
||||
|
||||
## Relay API version 1 definitions
|
||||
|
||||
rpcsrv.rpc("post_waku_v2_relay_v1_message") do(topic: string, message: WakuRelayMessage) -> bool:
|
||||
## Publishes a WakuMessage to a PubSub topic
|
||||
debug "post_waku_v2_relay_v1_message"
|
||||
|
||||
node.publish(topic, message.toWakuMessage(version = 0))
|
||||
|
||||
return true
|
||||
|
||||
rpcsrv.rpc("post_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool:
|
||||
## Subscribes a node to a list of PubSub topics
|
||||
|
Loading…
x
Reference in New Issue
Block a user