diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 13e5d95e7..76c2b5c51 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -1,5 +1,5 @@ import - std/[unittest, options, sets, tables, os, strutils], + std/[unittest, options, sets, tables, os, strutils, sequtils], stew/shims/net as stewNet, json_rpc/[rpcserver, rpcclient], libp2p/standard_setup, @@ -13,6 +13,7 @@ import ../../waku/v2/node/wakunode2, ../../waku/v2/node/jsonrpc/[jsonrpc_types,store_api,relay_api,debug_api,filter_api], ../../waku/v2/protocol/message_notifier, + ../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_store/waku_store, ../test_helpers @@ -281,3 +282,64 @@ procSuite "Waku v2 JSON-RPC API": server.stop() server.close() waitfor node.stop() + + asyncTest "Filter API: get latest messages": + const cTopic = ContentTopic(1) + + waitFor node.start() + + # RPC server setup + let + rpcPort = Port(8545) + ta = initTAddress(bindIp, rpcPort) + server = newRpcHttpServer([ta]) + + installFilterApiHandlers(node, server) + server.start() + + node.mountFilter() + + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort) + + # First ensure subscription exists + + let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(topics: @[cTopic])], topic = some(defaultTopic)) + check: + sub + + # Now prime the node with some messages before tests + var + msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)), + WakuMessage(payload: @[byte 1], contentTopic: cTopic), + WakuMessage(payload: @[byte 2], contentTopic: cTopic), + WakuMessage(payload: @[byte 3], contentTopic: cTopic), + WakuMessage(payload: @[byte 4], contentTopic: cTopic), + WakuMessage(payload: @[byte 5], contentTopic: cTopic), + WakuMessage(payload: @[byte 6], contentTopic: cTopic), + WakuMessage(payload: @[byte 7], contentTopic: cTopic), + WakuMessage(payload: @[byte 8], contentTopic: cTopic), + WakuMessage(payload: @[byte 9], contentTopic: ContentTopic(2))] + + let + filters = node.filters + requestId = toSeq(Table(filters).keys)[0] + + for wakuMsg in msgList: + filters.notify(wakuMsg, requestId) + + var response = await client.get_waku_v2_filter_v1_messages(cTopic) + check: + response.len() == 8 + response.allIt(it.contentTopic == cTopic) + + # No new messages + + response = await client.get_waku_v2_filter_v1_messages(cTopic) + + check: + response.len() == 0 + + server.stop() + server.close() + waitfor node.stop() \ No newline at end of file diff --git a/waku/v2/node/jsonrpc/filter_api.nim b/waku/v2/node/jsonrpc/filter_api.nim index 6cb6bba05..90ba0db2b 100644 --- a/waku/v2/node/jsonrpc/filter_api.nim +++ b/waku/v2/node/jsonrpc/filter_api.nim @@ -1,30 +1,58 @@ {.push raises: [Exception, Defect].} import + std/[tables,sequtils], json_rpc/rpcserver, eth/[common, rlp, keys, p2p], ../../waku_types, ../wakunode2 +const futTimeout = 5.seconds + proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = - const futTimeout = 5.seconds + ## Create a message cache indexed on content topic + ## @TODO consider moving message cache elsewhere. Perhaps to node? + var + messageCache = initTable[ContentTopic, seq[WakuMessage]]() + + proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} = + debug "WakuMessage received", msg=msg + # Add message to current cache + # @TODO limit max content topics and messages + messageCache.mgetOrPut(msg.contentTopic, @[]).add(msg) ## Filter API version 1 definitions + rpcsrv.rpc("get_waku_v2_filter_v1_messages") do(contentTopic: ContentTopic) -> seq[WakuMessage]: + ## Returns all WakuMessages received on a content topic since the + ## last time this method was called + ## @TODO ability to specify a return message limit + debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic + + if messageCache.hasKey(contentTopic): + let msgs = messageCache[contentTopic] + # Clear cache before next call + messageCache[contentTopic] = @[] + return msgs + else: + # Not subscribed to this content topic + raise newException(ValueError, "Not subscribed to content topic: " & $contentTopic) + rpcsrv.rpc("post_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool: ## Subscribes a node to a list of content filters debug "post_waku_v2_filter_v1_subscription" - proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} = - debug "WakuMessage received", msg=msg, topic=topic - # @TODO handle message - # Construct a filter request # @TODO use default PubSub topic if undefined let fReq = if topic.isSome: FilterRequest(topic: topic.get, contentFilters: contentFilters, subscribe: true) else: FilterRequest(contentFilters: contentFilters, subscribe: true) if (await node.subscribe(fReq, filterHandler).withTimeout(futTimeout)): # Successfully subscribed to all content filters + + for cTopic in concat(contentFilters.mapIt(it.topics)): + # Create message cache for each subscribed content topic + messageCache[cTopic] = @[] + return true else: # Failed to subscribe to one or more content filters @@ -40,6 +68,11 @@ proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = if (await node.unsubscribe(fReq).withTimeout(futTimeout)): # Successfully unsubscribed from all content filters + + for cTopic in concat(contentFilters.mapIt(it.topics)): + # Remove message cache for each unsubscribed content topic + messageCache.del(cTopic) + return true else: # Failed to unsubscribe from one or more content filters diff --git a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim index 882745efd..35dfff228 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim @@ -15,5 +15,6 @@ proc get_waku_v2_store_v1_messages(topics: seq[ContentTopic], pagingOptions: Opt # Filter API +proc get_waku_v2_filter_v1_messages(contentTopic: ContentTopic): seq[WakuMessage] proc post_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool proc delete_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool