diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index c52665e5d..380b82860 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -11,7 +11,7 @@ import libp2p/protocols/pubsub/rpc/message, ../../waku/v2/waku_types, ../../waku/v2/node/wakunode2, - ../../waku/v2/node/jsonrpc/[jsonrpc_types,store_api,relay_api,debug_api], + ../../waku/v2/node/jsonrpc/[jsonrpc_types,store_api,relay_api,debug_api,filter_api], ../../waku/v2/protocol/message_notifier, ../../waku/v2/protocol/waku_store/waku_store, ../test_helpers @@ -88,6 +88,13 @@ procSuite "Waku v2 JSON-RPC API": PubSub(node.wakuRelay).topics.len == 1 + newTopics.len response == true + # Publish a message on the default topic + response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(ContentTopic(1)))) + + check: + # @TODO poll topic to verify message has been published + response == true + # Unsubscribe from new topics response = await client.delete_waku_v2_relay_v1_subscriptions(newTopics) @@ -160,3 +167,46 @@ procSuite "Waku v2 JSON-RPC API": server.stop() server.close() waitfor node.stop() + + asyncTest "filter_api": + waitFor node.start() + + waitFor node.mountRelay() + + node.mountFilter() + + # RPC server setup + let + rpcPort = Port(8545) + ta = initTAddress(bindIp, rpcPort) + server = newRpcHttpServer([ta]) + + installFilterApiHandlers(node, server) + server.start() + + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort) + + check: + # Light node has not yet subscribed to any filters + node.filters.len() == 0 + + let contentFilters = @[ContentFilter(topics: @[ContentTopic(1), ContentTopic(2)]), + ContentFilter(topics: @[ContentTopic(3), ContentTopic(4)])] + var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic)) + + check: + # Light node has successfully subscribed to a single filter + node.filters.len() == 1 + response == true + + response = await client.delete_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic)) + + check: + # Light node has successfully unsubscribed from all filters + node.filters.len() == 0 + response == true + + server.stop() + server.close() + waitfor node.stop() diff --git a/waku/v2/node/jsonrpc/filter_api.nim b/waku/v2/node/jsonrpc/filter_api.nim new file mode 100644 index 000000000..196aae3a6 --- /dev/null +++ b/waku/v2/node/jsonrpc/filter_api.nim @@ -0,0 +1,44 @@ +import + json_rpc/rpcserver, + eth/[common, rlp, keys, p2p], + ../../waku_types, + ../wakunode2 + +proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = + const futTimeout = 5.seconds + + ## Filter API version 1 definitions + + rpcsrv.rpc("post_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool: + ## Subscribes a node to a list of content filters + debug "post_waku_v2_filter_v1_subscription" + + proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} = + debug "WakuMessage received", msg=msg, topic=topic + # @TODO handle message + + # Construct a filter request + # @TODO use default PubSub topic if undefined + let fReq = if topic.isSome: FilterRequest(topic: topic.get, contentFilters: contentFilters, subscribe: true) else: FilterRequest(contentFilters: contentFilters, subscribe: true) + + if (await node.subscribe(fReq, filterHandler).withTimeout(futTimeout)): + # Successfully subscribed to all content filters + return true + else: + # Failed to subscribe to one or more content filters + raise newException(ValueError, "Failed to subscribe to contentFilters " & repr(fReq)) + + rpcsrv.rpc("delete_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool: + ## Unsubscribes a node from a list of content filters + debug "delete_waku_v2_filter_v1_subscription" + + # Construct a filter request + # @TODO consider using default PubSub topic if undefined + let fReq = if topic.isSome: FilterRequest(topic: topic.get, contentFilters: contentFilters, subscribe: false) else: FilterRequest(contentFilters: contentFilters, subscribe: false) + + if (await node.unsubscribe(fReq).withTimeout(futTimeout)): + # Successfully unsubscribed from all content filters + return true + else: + # Failed to unsubscribe from one or more content filters + raise newException(ValueError, "Failed to unsubscribe from contentFilters " & repr(fReq)) \ No newline at end of file diff --git a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim index 000f8176d..2863cdfad 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim @@ -4,9 +4,15 @@ proc get_waku_v2_debug_v1_info(): WakuInfo # Relay API +proc post_waku_v2_relay_v1_message(topic: string, message: WakuRelayMessage): bool proc post_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool # Store API proc get_waku_v2_store_v1_messages(topics: seq[ContentTopic], pagingOptions: Option[StorePagingOptions]): StoreResponse + +# Filter API + +proc post_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool +proc delete_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool diff --git a/waku/v2/node/jsonrpc/jsonrpc_types.nim b/waku/v2/node/jsonrpc/jsonrpc_types.nim index f1363ebda..9571a75cd 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_types.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_types.nim @@ -12,3 +12,7 @@ type pageSize*: uint64 cursor*: Option[Index] forward*: bool + + WakuRelayMessage* = object + payload*: seq[byte] + contentTopic*: Option[ContentTopic] diff --git a/waku/v2/node/jsonrpc/jsonrpc_utils.nim b/waku/v2/node/jsonrpc/jsonrpc_utils.nim index d950c2442..745382de3 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_utils.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_utils.nim @@ -22,3 +22,10 @@ proc toPagingOptions*(pagingInfo: PagingInfo): StorePagingOptions = proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse = StoreResponse(messages: historyResponse.messages, pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions)) + +proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage = + # @TODO global definition for default content topic + const defaultCT = 0 + WakuMessage(payload: relayMessage.payload, + contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT, + version: version) diff --git a/waku/v2/node/jsonrpc/relay_api.nim b/waku/v2/node/jsonrpc/relay_api.nim index 5295d8350..f335ae066 100644 --- a/waku/v2/node/jsonrpc/relay_api.nim +++ b/waku/v2/node/jsonrpc/relay_api.nim @@ -2,12 +2,21 @@ import json_rpc/rpcserver, eth/[common, rlp, keys, p2p], ../../waku_types, - ../wakunode2 + ../wakunode2, + ./jsonrpc_types, ./jsonrpc_utils proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = const futTimeout = 5.seconds ## Relay API version 1 definitions + + rpcsrv.rpc("post_waku_v2_relay_v1_message") do(topic: string, message: WakuRelayMessage) -> bool: + ## Publishes a WakuMessage to a PubSub topic + debug "post_waku_v2_relay_v1_message" + + node.publish(topic, message.toWakuMessage(version = 0)) + + return true rpcsrv.rpc("post_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool: ## Subscribes a node to a list of PubSub topics