mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-08 17:03:09 +00:00
Added some basic debug and relay json-rpc calls (#309)
This commit is contained in:
parent
51286232f0
commit
fe23bafbce
@ -11,7 +11,7 @@ import
|
|||||||
libp2p/protocols/pubsub/rpc/message,
|
libp2p/protocols/pubsub/rpc/message,
|
||||||
../../waku/v2/waku_types,
|
../../waku/v2/waku_types,
|
||||||
../../waku/v2/node/wakunode2,
|
../../waku/v2/node/wakunode2,
|
||||||
../../waku/v2/node/jsonrpc/[jsonrpc_types,store_api,relay_api,debug_api],
|
../../waku/v2/node/jsonrpc/[jsonrpc_types,store_api,relay_api,debug_api,filter_api],
|
||||||
../../waku/v2/protocol/message_notifier,
|
../../waku/v2/protocol/message_notifier,
|
||||||
../../waku/v2/protocol/waku_store/waku_store,
|
../../waku/v2/protocol/waku_store/waku_store,
|
||||||
../test_helpers
|
../test_helpers
|
||||||
@ -88,6 +88,13 @@ procSuite "Waku v2 JSON-RPC API":
|
|||||||
PubSub(node.wakuRelay).topics.len == 1 + newTopics.len
|
PubSub(node.wakuRelay).topics.len == 1 + newTopics.len
|
||||||
response == true
|
response == true
|
||||||
|
|
||||||
|
# Publish a message on the default topic
|
||||||
|
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(ContentTopic(1))))
|
||||||
|
|
||||||
|
check:
|
||||||
|
# @TODO poll topic to verify message has been published
|
||||||
|
response == true
|
||||||
|
|
||||||
# Unsubscribe from new topics
|
# Unsubscribe from new topics
|
||||||
response = await client.delete_waku_v2_relay_v1_subscriptions(newTopics)
|
response = await client.delete_waku_v2_relay_v1_subscriptions(newTopics)
|
||||||
|
|
||||||
@ -160,3 +167,46 @@ procSuite "Waku v2 JSON-RPC API":
|
|||||||
server.stop()
|
server.stop()
|
||||||
server.close()
|
server.close()
|
||||||
waitfor node.stop()
|
waitfor node.stop()
|
||||||
|
|
||||||
|
asyncTest "filter_api":
|
||||||
|
waitFor node.start()
|
||||||
|
|
||||||
|
waitFor node.mountRelay()
|
||||||
|
|
||||||
|
node.mountFilter()
|
||||||
|
|
||||||
|
# RPC server setup
|
||||||
|
let
|
||||||
|
rpcPort = Port(8545)
|
||||||
|
ta = initTAddress(bindIp, rpcPort)
|
||||||
|
server = newRpcHttpServer([ta])
|
||||||
|
|
||||||
|
installFilterApiHandlers(node, server)
|
||||||
|
server.start()
|
||||||
|
|
||||||
|
let client = newRpcHttpClient()
|
||||||
|
await client.connect("127.0.0.1", rpcPort)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Light node has not yet subscribed to any filters
|
||||||
|
node.filters.len() == 0
|
||||||
|
|
||||||
|
let contentFilters = @[ContentFilter(topics: @[ContentTopic(1), ContentTopic(2)]),
|
||||||
|
ContentFilter(topics: @[ContentTopic(3), ContentTopic(4)])]
|
||||||
|
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic))
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Light node has successfully subscribed to a single filter
|
||||||
|
node.filters.len() == 1
|
||||||
|
response == true
|
||||||
|
|
||||||
|
response = await client.delete_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic))
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Light node has successfully unsubscribed from all filters
|
||||||
|
node.filters.len() == 0
|
||||||
|
response == true
|
||||||
|
|
||||||
|
server.stop()
|
||||||
|
server.close()
|
||||||
|
waitfor node.stop()
|
||||||
|
|||||||
44
waku/v2/node/jsonrpc/filter_api.nim
Normal file
44
waku/v2/node/jsonrpc/filter_api.nim
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
import
|
||||||
|
json_rpc/rpcserver,
|
||||||
|
eth/[common, rlp, keys, p2p],
|
||||||
|
../../waku_types,
|
||||||
|
../wakunode2
|
||||||
|
|
||||||
|
proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
||||||
|
const futTimeout = 5.seconds
|
||||||
|
|
||||||
|
## Filter API version 1 definitions
|
||||||
|
|
||||||
|
rpcsrv.rpc("post_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool:
|
||||||
|
## Subscribes a node to a list of content filters
|
||||||
|
debug "post_waku_v2_filter_v1_subscription"
|
||||||
|
|
||||||
|
proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} =
|
||||||
|
debug "WakuMessage received", msg=msg, topic=topic
|
||||||
|
# @TODO handle message
|
||||||
|
|
||||||
|
# Construct a filter request
|
||||||
|
# @TODO use default PubSub topic if undefined
|
||||||
|
let fReq = if topic.isSome: FilterRequest(topic: topic.get, contentFilters: contentFilters, subscribe: true) else: FilterRequest(contentFilters: contentFilters, subscribe: true)
|
||||||
|
|
||||||
|
if (await node.subscribe(fReq, filterHandler).withTimeout(futTimeout)):
|
||||||
|
# Successfully subscribed to all content filters
|
||||||
|
return true
|
||||||
|
else:
|
||||||
|
# Failed to subscribe to one or more content filters
|
||||||
|
raise newException(ValueError, "Failed to subscribe to contentFilters " & repr(fReq))
|
||||||
|
|
||||||
|
rpcsrv.rpc("delete_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool:
|
||||||
|
## Unsubscribes a node from a list of content filters
|
||||||
|
debug "delete_waku_v2_filter_v1_subscription"
|
||||||
|
|
||||||
|
# Construct a filter request
|
||||||
|
# @TODO consider using default PubSub topic if undefined
|
||||||
|
let fReq = if topic.isSome: FilterRequest(topic: topic.get, contentFilters: contentFilters, subscribe: false) else: FilterRequest(contentFilters: contentFilters, subscribe: false)
|
||||||
|
|
||||||
|
if (await node.unsubscribe(fReq).withTimeout(futTimeout)):
|
||||||
|
# Successfully unsubscribed from all content filters
|
||||||
|
return true
|
||||||
|
else:
|
||||||
|
# Failed to unsubscribe from one or more content filters
|
||||||
|
raise newException(ValueError, "Failed to unsubscribe from contentFilters " & repr(fReq))
|
||||||
@ -4,9 +4,15 @@ proc get_waku_v2_debug_v1_info(): WakuInfo
|
|||||||
|
|
||||||
# Relay API
|
# Relay API
|
||||||
|
|
||||||
|
proc post_waku_v2_relay_v1_message(topic: string, message: WakuRelayMessage): bool
|
||||||
proc post_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
|
proc post_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
|
||||||
proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
|
proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
|
||||||
|
|
||||||
# Store API
|
# Store API
|
||||||
|
|
||||||
proc get_waku_v2_store_v1_messages(topics: seq[ContentTopic], pagingOptions: Option[StorePagingOptions]): StoreResponse
|
proc get_waku_v2_store_v1_messages(topics: seq[ContentTopic], pagingOptions: Option[StorePagingOptions]): StoreResponse
|
||||||
|
|
||||||
|
# Filter API
|
||||||
|
|
||||||
|
proc post_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool
|
||||||
|
proc delete_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool
|
||||||
|
|||||||
@ -12,3 +12,7 @@ type
|
|||||||
pageSize*: uint64
|
pageSize*: uint64
|
||||||
cursor*: Option[Index]
|
cursor*: Option[Index]
|
||||||
forward*: bool
|
forward*: bool
|
||||||
|
|
||||||
|
WakuRelayMessage* = object
|
||||||
|
payload*: seq[byte]
|
||||||
|
contentTopic*: Option[ContentTopic]
|
||||||
|
|||||||
@ -22,3 +22,10 @@ proc toPagingOptions*(pagingInfo: PagingInfo): StorePagingOptions =
|
|||||||
proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse =
|
proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse =
|
||||||
StoreResponse(messages: historyResponse.messages,
|
StoreResponse(messages: historyResponse.messages,
|
||||||
pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions))
|
pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions))
|
||||||
|
|
||||||
|
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage =
|
||||||
|
# @TODO global definition for default content topic
|
||||||
|
const defaultCT = 0
|
||||||
|
WakuMessage(payload: relayMessage.payload,
|
||||||
|
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,
|
||||||
|
version: version)
|
||||||
|
|||||||
@ -2,12 +2,21 @@ import
|
|||||||
json_rpc/rpcserver,
|
json_rpc/rpcserver,
|
||||||
eth/[common, rlp, keys, p2p],
|
eth/[common, rlp, keys, p2p],
|
||||||
../../waku_types,
|
../../waku_types,
|
||||||
../wakunode2
|
../wakunode2,
|
||||||
|
./jsonrpc_types, ./jsonrpc_utils
|
||||||
|
|
||||||
proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
||||||
const futTimeout = 5.seconds
|
const futTimeout = 5.seconds
|
||||||
|
|
||||||
## Relay API version 1 definitions
|
## Relay API version 1 definitions
|
||||||
|
|
||||||
|
rpcsrv.rpc("post_waku_v2_relay_v1_message") do(topic: string, message: WakuRelayMessage) -> bool:
|
||||||
|
## Publishes a WakuMessage to a PubSub topic
|
||||||
|
debug "post_waku_v2_relay_v1_message"
|
||||||
|
|
||||||
|
node.publish(topic, message.toWakuMessage(version = 0))
|
||||||
|
|
||||||
|
return true
|
||||||
|
|
||||||
rpcsrv.rpc("post_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool:
|
rpcsrv.rpc("post_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool:
|
||||||
## Subscribes a node to a list of PubSub topics
|
## Subscribes a node to a list of PubSub topics
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user