From 2f390ce8848d4e2761520fea3bccea0fdf5b5f85 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Fri, 10 Feb 2023 10:43:16 +0100 Subject: [PATCH] refactor(jsonrpc): deep code and tests reorganization --- apps/chat2bridge/chat2bridge.nim | 54 +- apps/wakubridge/wakubridge.nim | 13 +- apps/wakunode2/wakunode2_setup_rpc.nim | 40 +- tests/all_tests_v2.nim | 9 +- tests/v2/test_jsonrpc_waku.nim | 699 ------------------ .../wakunode_jsonrpc/test_jsonrpc_admin.nim | 196 +++++ .../wakunode_jsonrpc/test_jsonrpc_debug.nim | 53 ++ .../wakunode_jsonrpc/test_jsonrpc_filter.nim | 84 +++ .../wakunode_jsonrpc/test_jsonrpc_relay.nim | 359 +++++++++ .../wakunode_jsonrpc/test_jsonrpc_store.nim | 102 +++ waku/v2/node/jsonrpc/admin/callsigs.nim | 4 + waku/v2/node/jsonrpc/admin/client.nim | 14 + waku/v2/node/jsonrpc/admin/handlers.nim | 91 +++ waku/v2/node/jsonrpc/admin/types.nim | 10 + waku/v2/node/jsonrpc/admin_api.nim | 111 --- waku/v2/node/jsonrpc/debug/callsigs.nim | 3 + waku/v2/node/jsonrpc/debug/client.nim | 14 + .../{debug_api.nim => debug/handlers.nim} | 11 +- waku/v2/node/jsonrpc/filter/callsigs.nim | 5 + waku/v2/node/jsonrpc/filter/client.nim | 15 + waku/v2/node/jsonrpc/filter/handlers.nim | 89 +++ waku/v2/node/jsonrpc/filter_api.nim | 104 --- waku/v2/node/jsonrpc/hexstrings.nim | 8 +- waku/v2/node/jsonrpc/jsonrpc_types.nim | 41 - waku/v2/node/jsonrpc/jsonrpc_utils.nim | 99 --- waku/v2/node/jsonrpc/marshalling.nim | 22 + waku/v2/node/jsonrpc/private_api.nim | 115 --- .../callsigs.nim} | 22 +- waku/v2/node/jsonrpc/relay/client.nim | 20 + waku/v2/node/jsonrpc/relay/handlers.nim | 225 ++++++ waku/v2/node/jsonrpc/relay/types.nim | 22 + waku/v2/node/jsonrpc/relay_api.nim | 114 --- waku/v2/node/jsonrpc/store/callsigs.nim | 4 + waku/v2/node/jsonrpc/store/client.nim | 19 + waku/v2/node/jsonrpc/store/handlers.nim | 85 +++ waku/v2/node/jsonrpc/store/types.nim | 22 + waku/v2/node/jsonrpc/store_api.nim | 61 -- 37 files changed, 1533 insertions(+), 1426 deletions(-) delete mode 100644 tests/v2/test_jsonrpc_waku.nim create mode 100644 tests/v2/wakunode_jsonrpc/test_jsonrpc_admin.nim create mode 100644 tests/v2/wakunode_jsonrpc/test_jsonrpc_debug.nim create mode 100644 tests/v2/wakunode_jsonrpc/test_jsonrpc_filter.nim create mode 100644 tests/v2/wakunode_jsonrpc/test_jsonrpc_relay.nim create mode 100644 tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim create mode 100644 waku/v2/node/jsonrpc/admin/callsigs.nim create mode 100644 waku/v2/node/jsonrpc/admin/client.nim create mode 100644 waku/v2/node/jsonrpc/admin/handlers.nim create mode 100644 waku/v2/node/jsonrpc/admin/types.nim delete mode 100644 waku/v2/node/jsonrpc/admin_api.nim create mode 100644 waku/v2/node/jsonrpc/debug/callsigs.nim create mode 100644 waku/v2/node/jsonrpc/debug/client.nim rename waku/v2/node/jsonrpc/{debug_api.nim => debug/handlers.nim} (62%) create mode 100644 waku/v2/node/jsonrpc/filter/callsigs.nim create mode 100644 waku/v2/node/jsonrpc/filter/client.nim create mode 100644 waku/v2/node/jsonrpc/filter/handlers.nim delete mode 100644 waku/v2/node/jsonrpc/filter_api.nim delete mode 100644 waku/v2/node/jsonrpc/jsonrpc_types.nim delete mode 100644 waku/v2/node/jsonrpc/jsonrpc_utils.nim create mode 100644 waku/v2/node/jsonrpc/marshalling.nim delete mode 100644 waku/v2/node/jsonrpc/private_api.nim rename waku/v2/node/jsonrpc/{jsonrpc_callsigs.nim => relay/callsigs.nim} (50%) create mode 100644 waku/v2/node/jsonrpc/relay/client.nim create mode 100644 waku/v2/node/jsonrpc/relay/handlers.nim create mode 100644 waku/v2/node/jsonrpc/relay/types.nim delete mode 100644 waku/v2/node/jsonrpc/relay_api.nim create mode 100644 waku/v2/node/jsonrpc/store/callsigs.nim create mode 100644 waku/v2/node/jsonrpc/store/client.nim create mode 100644 waku/v2/node/jsonrpc/store/handlers.nim create mode 100644 waku/v2/node/jsonrpc/store/types.nim delete mode 100644 waku/v2/node/jsonrpc/store_api.nim diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index 8cabd16e3..2dfa2bdf9 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -46,7 +46,7 @@ type pollPeriod: chronos.Duration seen: seq[Hash] #FIFO queue contentTopic: string - + MbMessageHandler* = proc (jsonNode: JsonNode) {.gcsafe.} ################### @@ -55,12 +55,12 @@ type proc containsOrAdd(sequence: var seq[Hash], hash: Hash): bool = if sequence.contains(hash): - return true + return true if sequence.len >= DeduplQSize: trace "Deduplication queue full. Removing oldest item." sequence.delete 0, 0 # Remove first item in queue - + sequence.add(hash) return false @@ -88,7 +88,7 @@ proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} = return trace "Post Matterbridge message to chat2" - + chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"]) await cmb.nodev2.publish(DefaultPubsubTopic, msg) @@ -114,7 +114,7 @@ proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: let postRes = cmb.mbClient.postMessage(text = string.fromBytes(chat2Msg[].payload), username = chat2Msg[].nick) - + if postRes.isErr() or (postRes[] == false): chat2_mb_dropped.inc(labelValues = ["duplicate"]) error "Matterbridge host unreachable. Dropping message." @@ -146,10 +146,10 @@ proc new*(T: type Chat2MatterBridge, contentTopic: string): T {.raises: [Defect, ValueError, KeyError, TLSStreamProtocolError, IOError, LPError].} = - # Setup Matterbridge + # Setup Matterbridge let mbClient = MatterbridgeClient.new(mbHostUri, mbGateway) - + # Let's verify the Matterbridge configuration before continuing let clientHealth = mbClient.isHealthy() @@ -163,7 +163,7 @@ proc new*(T: type Chat2MatterBridge, nodev2 = WakuNode.new(nodev2Key, nodev2BindIp, nodev2BindPort, nodev2ExtIp, nodev2ExtPort) - + return Chat2MatterBridge(mbClient: mbClient, nodev2: nodev2, running: false, @@ -176,18 +176,18 @@ proc start*(cmb: Chat2MatterBridge) {.async.} = cmb.running = true debug "Start polling Matterbridge" - + # Start Matterbridge polling (@TODO: use streaming interface) proc mbHandler(jsonNode: JsonNode) {.gcsafe, raises: [Exception].} = trace "Bridging message from Matterbridge to chat2", jsonNode=jsonNode waitFor cmb.toChat2(jsonNode) - + asyncSpawn cmb.pollMatterbridge(mbHandler) - + # Start Waku v2 node debug "Start listening on Waku v2" await cmb.nodev2.start() - + # Always mount relay for bridge # `triggerSelf` is false on a `bridge` to avoid duplicates await cmb.nodev2.mountRelay(triggerSelf = false) @@ -199,12 +199,12 @@ proc start*(cmb: Chat2MatterBridge) {.async.} = if msg.isOk(): trace "Bridging message from Chat2 to Matterbridge", msg=msg[] cmb.toMatterbridge(msg[]) - + cmb.nodev2.subscribe(DefaultPubsubTopic, relayHandler) proc stop*(cmb: Chat2MatterBridge) {.async.} = info "Stopping Chat2MatterBridge" - + cmb.running = false await cmb.nodev2.stop() @@ -213,32 +213,34 @@ proc stop*(cmb: Chat2MatterBridge) {.async.} = when isMainModule: import ../../../waku/common/utils/nat, - ../../../waku/v2/node/jsonrpc/[debug_api, - filter_api, - relay_api, - store_api] + ../../waku/v2/node/message_cache, + ../../waku/v2/node/jsonrpc/debug/handlers as debug_api, + ../../waku/v2/node/jsonrpc/filter/handlers as filter_api, + ../../waku/v2/node/jsonrpc/relay/handlers as relay_api, + ../../waku/v2/node/jsonrpc/store/handlers as store_api + proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: Chat2MatterbridgeConf) {.raises: [Exception].} = installDebugApiHandlers(node, rpcServer) # Install enabled API handlers: if conf.relay: - let topicCache = newTable[string, seq[WakuMessage]]() + let topicCache = relay_api.MessageCache.init(capacity=30) installRelayApiHandlers(node, rpcServer, topicCache) - + if conf.filter: - let messageCache = newTable[ContentTopic, seq[WakuMessage]]() + let messageCache = filter_api.MessageCache.init(capacity=30) installFilterApiHandlers(node, rpcServer, messageCache) - + if conf.store: installStoreApiHandlers(node, rpcServer) - + rpcServer.start() - + let rng = newRng() conf = Chat2MatterbridgeConf.load() - + if conf.logLevel != LogLevel.NONE: setLogLevel(conf.logLevel) @@ -262,7 +264,7 @@ when isMainModule: nodev2BindIp = conf.listenAddress, nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift), nodev2ExtIp = nodev2ExtIp, nodev2ExtPort = extPort, contentTopic = conf.contentTopic) - + waitFor bridge.start() # Now load rest of config diff --git a/apps/wakubridge/wakubridge.nim b/apps/wakubridge/wakubridge.nim index 7ffa20015..5fcebb937 100644 --- a/apps/wakubridge/wakubridge.nim +++ b/apps/wakubridge/wakubridge.nim @@ -27,12 +27,13 @@ import ../../waku/v2/utils/namespacing, ../../waku/v2/utils/time, ../../waku/v2/protocol/waku_message, + ../../waku/v2/node/message_cache, ../../waku/v2/node/waku_node, ../../waku/v2/node/peer_manager, - ../../waku/v2/node/jsonrpc/[debug_api, - filter_api, - relay_api, - store_api], + ../../waku/v2/node/jsonrpc/debug/handlers as debug_api, + ../../waku/v2/node/jsonrpc/filter/handlers as filter_api, + ../../waku/v2/node/jsonrpc/relay/handlers as relay_api, + ../../waku/v2/node/jsonrpc/store/handlers as store_api, ./message_compat, ./config @@ -298,11 +299,11 @@ proc setupV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: WakuBridgeConf) # Install enabled API handlers: if conf.relay: - let topicCache = newTable[PubsubTopic, seq[WakuMessage]]() + let topicCache = relay_api.MessageCache.init(capacity=30) installRelayApiHandlers(node, rpcServer, topicCache) if conf.filternode != "": - let messageCache = newTable[ContentTopic, seq[WakuMessage]]() + let messageCache = filter_api.MessageCache.init(capacity=30) installFilterApiHandlers(node, rpcServer, messageCache) if conf.storenode != "": diff --git a/apps/wakunode2/wakunode2_setup_rpc.nim b/apps/wakunode2/wakunode2_setup_rpc.nim index 99ed2e14b..1749fbb7d 100644 --- a/apps/wakunode2/wakunode2_setup_rpc.nim +++ b/apps/wakunode2/wakunode2_setup_rpc.nim @@ -10,50 +10,46 @@ import json_rpc/rpcserver import ../../waku/v2/protocol/waku_message, + ../../waku/v2/node/message_cache, ../../waku/v2/node/waku_node, - ../../waku/v2/node/jsonrpc/[admin_api, - debug_api, - filter_api, - relay_api, - store_api, - private_api, - debug_api], + ../../waku/v2/node/jsonrpc/admin/handlers as admin_api, + ../../waku/v2/node/jsonrpc/debug/handlers as debug_api, + ../../waku/v2/node/jsonrpc/filter/handlers as filter_api, + ../../waku/v2/node/jsonrpc/relay/handlers as relay_api, + ../../waku/v2/node/jsonrpc/store/handlers as store_api, ./config logScope: topics = "wakunode jsonrpc" -proc startRpcServer*(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) +proc startRpcServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf) {.raises: [CatchableError].} = let - ta = initTAddress(rpcIp, rpcPort) - rpcServer = newRpcHttpServer([ta]) + ta = initTAddress(address, port) + server = newRpcHttpServer([ta]) - installDebugApiHandlers(node, rpcServer) + installDebugApiHandlers(node, server) # TODO: Move to setup protocols proc if conf.relay: - let topicCache = newTable[PubsubTopic, seq[WakuMessage]]() - installRelayApiHandlers(node, rpcServer, topicCache) - + let relayMessageCache = relay_api.MessageCache.init(capacity=30) + installRelayApiHandlers(node, server, relayMessageCache) if conf.rpcPrivate: - # Private API access allows WakuRelay functionality that - # is backwards compatible with Waku v1. - installPrivateApiHandlers(node, rpcServer, topicCache) + installRelayPrivateApiHandlers(node, server, relayMessageCache) # TODO: Move to setup protocols proc if conf.filternode != "": - let messageCache = newTable[ContentTopic, seq[WakuMessage]]() - installFilterApiHandlers(node, rpcServer, messageCache) + let filterMessageCache = filter_api.MessageCache.init(capacity=30) + installFilterApiHandlers(node, server, filterMessageCache) # TODO: Move to setup protocols proc if conf.storenode != "": - installStoreApiHandlers(node, rpcServer) + installStoreApiHandlers(node, server) if conf.rpcAdmin: - installAdminApiHandlers(node, rpcServer) + installAdminApiHandlers(node, server) - rpcServer.start() + server.start() info "RPC Server started", address=ta diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index e86cc9adf..58f864144 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -42,7 +42,6 @@ import ./v2/test_peer_store_extended, ./v2/test_utils_peers, ./v2/test_message_cache, - ./v2/test_jsonrpc_waku, ./v2/test_rest_serdes, ./v2/test_rest_debug_api_serdes, ./v2/test_rest_debug_api, @@ -68,6 +67,14 @@ import ./v2/test_waku_keystore_keyfile, ./v2/test_waku_keystore +## Wakunode JSON-RPC API test suite +import + ./v2/wakunode_jsonrpc/test_jsonrpc_admin, + ./v2/wakunode_jsonrpc/test_jsonrpc_debug, + ./v2/wakunode_jsonrpc/test_jsonrpc_filter, + ./v2/wakunode_jsonrpc/test_jsonrpc_relay, + ./v2/wakunode_jsonrpc/test_jsonrpc_store + ## Apps diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim deleted file mode 100644 index 5f7dc28dc..000000000 --- a/tests/v2/test_jsonrpc_waku.nim +++ /dev/null @@ -1,699 +0,0 @@ -{.used.} - -import - std/[options, sets, tables, os, strutils, sequtils, times], - chronicles, - testutils/unittests, stew/shims/net as stewNet, - json_rpc/[rpcserver, rpcclient], - eth/keys, eth/common/eth_types, - libp2p/[builders, switch, multiaddress], - libp2p/protobuf/minprotobuf, - libp2p/stream/[bufferstream, connection], - libp2p/crypto/crypto, - libp2p/protocols/pubsub/pubsub, - libp2p/protocols/pubsub/rpc/message -import - ../../waku/v1/node/rpc/hexstrings, - ../../waku/v2/node/peer_manager, - ../../waku/v2/node/waku_node, - ../../waku/v2/node/jsonrpc/[store_api, - relay_api, - debug_api, - filter_api, - admin_api, - private_api], - ../../waku/v2/protocol/waku_message, - ../../waku/v2/protocol/waku_relay, - ../../waku/v2/protocol/waku_archive, - ../../waku/v2/protocol/waku_archive/driver/queue_driver, - ../../waku/v2/protocol/waku_store, - ../../waku/v2/protocol/waku_store/rpc, - ../../waku/v2/protocol/waku_swap/waku_swap, - ../../waku/v2/protocol/waku_filter, - ../../waku/v2/protocol/waku_filter/rpc, - ../../waku/v2/protocol/waku_filter/client, - ../../waku/v2/utils/compat, - ../../waku/v2/utils/peers, - ../../waku/v2/utils/time, - ./testlib/common, - ../test_helpers - -template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0] -const sigPath = sourceDir / ParDir / ParDir / "waku" / "v2" / "node" / "jsonrpc" / "jsonrpc_callsigs.nim" -createRpcSigs(RpcHttpClient, sigPath) - -proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] = - let - digest = waku_archive.computeDigest(message) - receivedTime = if message.timestamp > 0: message.timestamp - else: getNanosecondTime(getTime().toUnixFloat()) - - store.put(pubsubTopic, message, digest, receivedTime) - -procSuite "Waku v2 JSON-RPC API": - let - rng = crypto.newRng() - privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() - bindIp = ValidIpAddress.init("0.0.0.0") - extIp = ValidIpAddress.init("127.0.0.1") - port = Port(9000) - node = WakuNode.new(privkey, bindIp, port, some(extIp), some(port)) - - asyncTest "Debug API: get node info": - await node.start() - - await node.mountRelay() - - # RPC server setup - let - rpcPort = Port(8546) - ta = initTAddress(bindIp, rpcPort) - server = newRpcHttpServer([ta]) - - installDebugApiHandlers(node, server) - server.start() - - let client = newRpcHttpClient() - await client.connect("127.0.0.1", rpcPort, false) - - let response = await client.get_waku_v2_debug_v1_info() - - check: - response.listenAddresses == @[$node.switch.peerInfo.addrs[^1] & "/p2p/" & $node.switch.peerInfo.peerId] - - await server.stop() - await server.closeWait() - - await node.stop() - - asyncTest "Relay API: publish and subscribe/unsubscribe": - await node.start() - - await node.mountRelay() - - # RPC server setup - let - rpcPort = Port(8547) - ta = initTAddress(bindIp, rpcPort) - server = newRpcHttpServer([ta]) - - installRelayApiHandlers(node, server, newTable[string, seq[WakuMessage]]()) - server.start() - - let client = newRpcHttpClient() - await client.connect("127.0.0.1", rpcPort, false) - - check: - # At this stage the node is only subscribed to the default topic - PubSub(node.wakuRelay).topics.len == 1 - - # Subscribe to new topics - let newTopics = @["1","2","3"] - var response = await client.post_waku_v2_relay_v1_subscriptions(newTopics) - - check: - # Node is now subscribed to default + new topics - PubSub(node.wakuRelay).topics.len == 1 + newTopics.len - response == true - - # Publish a message on the default topic - response = await client.post_waku_v2_relay_v1_message(DefaultPubsubTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(DefaultContentTopic), timestamp: some(getNanosecondTime(epochTime())))) - - check: - # @TODO poll topic to verify message has been published - response == true - - # Unsubscribe from new topics - response = await client.delete_waku_v2_relay_v1_subscriptions(newTopics) - - check: - # Node is now unsubscribed from new topics - PubSub(node.wakuRelay).topics.len == 1 - response == true - - await server.stop() - await server.closeWait() - - await node.stop() - - asyncTest "Relay API: get latest messages": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, bindIp, Port(60300)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, bindIp, Port(60302)) - nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node3 = WakuNode.new(nodeKey3, bindIp, Port(60303), some(extIp), some(port)) - pubSubTopic = "polling" - contentTopic = DefaultContentTopic - payload1 = @[byte 9] - message1 = WakuMessage(payload: payload1, contentTopic: contentTopic) - payload2 = @[byte 8] - message2 = WakuMessage(payload: payload2, contentTopic: contentTopic) - - await node1.start() - await node1.mountRelay(@[pubSubTopic]) - - await node2.start() - await node2.mountRelay(@[pubSubTopic]) - - await node3.start() - await node3.mountRelay(@[pubSubTopic]) - - await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - - # RPC server setup - let - rpcPort = Port(8548) - ta = initTAddress(bindIp, rpcPort) - server = newRpcHttpServer([ta]) - - # Let's connect to node 3 via the API - installRelayApiHandlers(node3, server, newTable[string, seq[WakuMessage]]()) - server.start() - - let client = newRpcHttpClient() - await client.connect("127.0.0.1", rpcPort, false) - - # First see if we can retrieve messages published on the default topic (node is already subscribed) - await node2.publish(DefaultPubsubTopic, message1) - - await sleepAsync(100.millis) - - var messages = await client.get_waku_v2_relay_v1_messages(DefaultPubsubTopic) - - check: - messages.len == 1 - messages[0].contentTopic == contentTopic - messages[0].payload == payload1 - - # Ensure that read messages are cleared from cache - messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic) - check: - messages.len == 0 - - # Now try to subscribe using API - - var response = await client.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic]) - - await sleepAsync(100.millis) - - check: - # Node is now subscribed to pubSubTopic - response == true - - # Now publish a message on node1 and see if we receive it on node3 - await node1.publish(pubSubTopic, message2) - - await sleepAsync(100.millis) - - messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic) - - check: - messages.len == 1 - messages[0].contentTopic == contentTopic - messages[0].payload == payload2 - - # Ensure that read messages are cleared from cache - messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic) - check: - messages.len == 0 - - await server.stop() - await server.closeWait() - - await node1.stop() - await node2.stop() - await node3.stop() - - asyncTest "Store API: retrieve historical messages": - await node.start() - - await node.mountRelay() - - # RPC server setup - let - rpcPort = Port(8549) - ta = initTAddress(bindIp, rpcPort) - server = newRpcHttpServer([ta]) - - installStoreApiHandlers(node, server) - server.start() - - # WakuStore setup - let - key = crypto.PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.new(key) - - let driver: ArchiveDriver = QueueDriver.new() - node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) - await node.mountStore() - node.mountStoreClient() - - var listenSwitch = newStandardSwitch(some(key)) - await listenSwitch.start() - - node.setStorePeer(listenSwitch.peerInfo.toRemotePeerInfo()) - - listenSwitch.mount(node.wakuRelay) - listenSwitch.mount(node.wakuStore) - - # Now prime it with some history before tests - let msgList = @[ - fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("2"), ts=0), - fakeWakuMessage(@[byte 1], ts=1), - fakeWakuMessage(@[byte 2], ts=2), - fakeWakuMessage(@[byte 3], ts=3), - fakeWakuMessage(@[byte 4], ts=4), - fakeWakuMessage(@[byte 5], ts=5), - fakeWakuMessage(@[byte 6], ts=6), - fakeWakuMessage(@[byte 7], ts=7), - fakeWakuMessage(@[byte 8], ts=8), - fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("2"), ts=9) - ] - - for msg in msgList: - require driver.put(DefaultPubsubTopic, msg).isOk() - - let client = newRpcHttpClient() - await client.connect("127.0.0.1", rpcPort, false) - - let response = await client.get_waku_v2_store_v1_messages(some(DefaultPubsubTopic), some(@[HistoryContentFilterRPC(contentTopic: DefaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions())) - check: - response.messages.len() == 8 - response.pagingOptions.isNone() - - await server.stop() - await server.closeWait() - - await node.stop() - - asyncTest "Filter API: subscribe/unsubscribe": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, bindIp, Port(60390)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, bindIp, Port(60392)) - - await allFutures(node1.start(), node2.start()) - - await node1.mountFilter() - await node2.mountFilterClient() - - node2.setFilterPeer(node1.peerInfo.toRemotePeerInfo()) - - # RPC server setup - let - rpcPort = Port(8550) - ta = initTAddress(bindIp, rpcPort) - server = newRpcHttpServer([ta]) - - installFilterApiHandlers(node2, server, newTable[ContentTopic, seq[WakuMessage]]()) - server.start() - - let client = newRpcHttpClient() - await client.connect("127.0.0.1", rpcPort, false) - - check: - # Light node has not yet subscribed to any filters - node2.wakuFilterClient.getSubscriptionsCount() == 0 - - let contentFilters = @[ - ContentFilter(contentTopic: DefaultContentTopic), - ContentFilter(contentTopic: ContentTopic("2")), - ContentFilter(contentTopic: ContentTopic("3")), - ContentFilter(contentTopic: ContentTopic("4")), - ] - var response = await client.post_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic)) - check: - response == true - # Light node has successfully subscribed to 4 content topics - node2.wakuFilterClient.getSubscriptionsCount() == 4 - - response = await client.delete_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic)) - check: - response == true - # Light node has successfully unsubscribed from all filters - node2.wakuFilterClient.getSubscriptionsCount() == 0 - - ## Cleanup - await server.stop() - await server.closeWait() - - await allFutures(node1.stop(), node2.stop()) - - asyncTest "Admin API: connect to ad-hoc peers": - # Create a couple of nodes - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60600)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60602)) - peerInfo2 = node2.switch.peerInfo - nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60604)) - peerInfo3 = node3.switch.peerInfo - - await allFutures([node1.start(), node2.start(), node3.start()]) - - await node1.mountRelay() - await node2.mountRelay() - await node3.mountRelay() - - # RPC server setup - let - rpcPort = Port(8551) - ta = initTAddress(bindIp, rpcPort) - server = newRpcHttpServer([ta]) - - installAdminApiHandlers(node1, server) - server.start() - - let client = newRpcHttpClient() - await client.connect("127.0.0.1", rpcPort, false) - - # Connect to nodes 2 and 3 using the Admin API - let postRes = await client.post_waku_v2_admin_v1_peers(@[constructMultiaddrStr(peerInfo2), - constructMultiaddrStr(peerInfo3)]) - - check: - postRes - - # Verify that newly connected peers are being managed - let getRes = await client.get_waku_v2_admin_v1_peers() - - check: - getRes.len == 2 - # Check peer 2 - getRes.anyIt(it.protocol == WakuRelayCodec and - it.multiaddr == constructMultiaddrStr(peerInfo2)) - # Check peer 3 - getRes.anyIt(it.protocol == WakuRelayCodec and - it.multiaddr == constructMultiaddrStr(peerInfo3)) - - # Verify that raises an exception if we can't connect to the peer - let nonExistentPeer = "/ip4/0.0.0.0/tcp/10000/p2p/16Uiu2HAm6HZZr7aToTvEBPpiys4UxajCTU97zj5v7RNR2gbniy1D" - expect(ValueError): - discard await client.post_waku_v2_admin_v1_peers(@[nonExistentPeer]) - - let malformedPeer = "/malformed/peer" - expect(ValueError): - discard await client.post_waku_v2_admin_v1_peers(@[malformedPeer]) - - await server.stop() - await server.closeWait() - - await allFutures([node1.stop(), node2.stop(), node3.stop()]) - - asyncTest "Admin API: get managed peer information": - # Create 3 nodes and start them with relay - let nodes = toSeq(0..<3).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(60220+it*2))) - await allFutures(nodes.mapIt(it.start())) - await allFutures(nodes.mapIt(it.mountRelay())) - - # Dial nodes 2 and 3 from node1 - await nodes[0].connectToNodes(@[constructMultiaddrStr(nodes[1].peerInfo)]) - await nodes[0].connectToNodes(@[constructMultiaddrStr(nodes[2].peerInfo)]) - - # RPC server setup - let - rpcPort = Port(8552) - ta = initTAddress(bindIp, rpcPort) - server = newRpcHttpServer([ta]) - - installAdminApiHandlers(nodes[0], server) - server.start() - - let client = newRpcHttpClient() - await client.connect("127.0.0.1", rpcPort, false) - - let response = await client.get_waku_v2_admin_v1_peers() - - check: - response.len == 2 - # Check peer 2 - response.anyIt(it.protocol == WakuRelayCodec and - it.multiaddr == constructMultiaddrStr(nodes[1].peerInfo)) - # Check peer 3 - response.anyIt(it.protocol == WakuRelayCodec and - it.multiaddr == constructMultiaddrStr(nodes[2].peerInfo)) - - # Artificially remove the address from the book - nodes[0].peerManager.peerStore[AddressBook][nodes[1].peerInfo.peerId] = @[] - nodes[0].peerManager.peerStore[AddressBook][nodes[2].peerInfo.peerId] = @[] - - # Verify that the returned addresses are empty - let responseEmptyAdd = await client.get_waku_v2_admin_v1_peers() - check: - responseEmptyAdd[0].multiaddr == "" - responseEmptyAdd[1].multiaddr == "" - - await server.stop() - await server.closeWait() - - await allFutures(nodes.mapIt(it.stop())) - - asyncTest "Admin API: get unmanaged peer information": - let - nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60523)) - - await node.start() - - # RPC server setup - let - rpcPort = Port(8553) - ta = initTAddress(bindIp, rpcPort) - server = newRpcHttpServer([ta]) - - installAdminApiHandlers(node, server) - server.start() - - let client = newRpcHttpClient() - await client.connect("127.0.0.1", rpcPort, false) - - await node.mountFilter() - await node.mountFilterClient() - await node.mountSwap() - let driver: ArchiveDriver = QueueDriver.new() - node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) - await node.mountStore() - node.mountStoreClient() - - # Create and set some peers - let - locationAddr = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet() - - filterKey = crypto.PrivateKey.random(ECDSA, rng[]).get() - filterPeer = PeerInfo.new(filterKey, @[locationAddr]) - - swapKey = crypto.PrivateKey.random(ECDSA, rng[]).get() - swapPeer = PeerInfo.new(swapKey, @[locationAddr]) - - storeKey = crypto.PrivateKey.random(ECDSA, rng[]).get() - storePeer = PeerInfo.new(storeKey, @[locationAddr]) - - node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo()) - node.setStorePeer(storePeer.toRemotePeerInfo()) - node.setFilterPeer(filterPeer.toRemotePeerInfo()) - - let response = await client.get_waku_v2_admin_v1_peers() - - ## Then - check: - response.len == 3 - # Check filter peer - (response.filterIt(it.protocol == WakuFilterCodec)[0]).multiaddr == constructMultiaddrStr(filterPeer) - # Check swap peer - (response.filterIt(it.protocol == WakuSwapCodec)[0]).multiaddr == constructMultiaddrStr(swapPeer) - # Check store peer - (response.filterIt(it.protocol == WakuStoreCodec)[0]).multiaddr == constructMultiaddrStr(storePeer) - - ## Cleanup - await server.stop() - await server.closeWait() - - await node.stop() - - asyncTest "Private API: generate asymmetric keys and encrypt/decrypt communication": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, bindIp, Port(62001)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, bindIp, Port(62002)) - nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node3 = WakuNode.new(nodeKey3, bindIp, Port(62003), some(extIp), some(port)) - pubSubTopic = "polling" - contentTopic = DefaultContentTopic - payload = @[byte 9] - message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime()))) - topicCache = newTable[string, seq[WakuMessage]]() - - await node1.start() - await node1.mountRelay(@[pubSubTopic]) - - await node2.start() - await node2.mountRelay(@[pubSubTopic]) - - await node3.start() - await node3.mountRelay(@[pubSubTopic]) - - await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - - # Setup two servers so we can see both sides of encrypted communication - let - rpcPort1 = Port(8554) - ta1 = initTAddress(bindIp, rpcPort1) - server1 = newRpcHttpServer([ta1]) - rpcPort3 = Port(8555) - ta3 = initTAddress(bindIp, rpcPort3) - server3 = newRpcHttpServer([ta3]) - - # Let's connect to nodes 1 and 3 via the API - installPrivateApiHandlers(node1, server1, newTable[string, seq[WakuMessage]]()) - installPrivateApiHandlers(node3, server3, topicCache) - installRelayApiHandlers(node3, server3, topicCache) - server1.start() - server3.start() - - let client1 = newRpcHttpClient() - await client1.connect("127.0.0.1", rpcPort1, false) - - let client3 = newRpcHttpClient() - await client3.connect("127.0.0.1", rpcPort3, false) - - # Let's get a keypair for node3 - - let keypair = await client3.get_waku_v2_private_v1_asymmetric_keypair() - - # Now try to subscribe on node3 using API - - let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic]) - - await sleepAsync(100.millis) - - check: - # node3 is now subscribed to pubSubTopic - sub - - # Now publish and encrypt a message on node1 using node3's public key - let posted = await client1.post_waku_v2_private_v1_asymmetric_message(pubSubTopic, message, publicKey = (%keypair.pubkey).getStr()) - check: - posted - - await sleepAsync(100.millis) - - # Let's see if we can receive, and decrypt, this message on node3 - var messages = await client3.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr()) - - check: - messages.len == 1 - messages[0].contentTopic.get == contentTopic - messages[0].payload == payload - - # Ensure that read messages are cleared from cache - messages = await client3.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr()) - check: - messages.len == 0 - - await server1.stop() - await server1.closeWait() - await server3.stop() - await server3.closeWait() - - await node1.stop() - await node2.stop() - await node3.stop() - - asyncTest "Private API: generate symmetric keys and encrypt/decrypt communication": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, bindIp, Port(62100)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, bindIp, Port(62102)) - nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node3 = WakuNode.new(nodeKey3, bindIp, Port(62103), some(extIp), some(port)) - pubSubTopic = "polling" - contentTopic = DefaultContentTopic - payload = @[byte 9] - message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime()))) - topicCache = newTable[string, seq[WakuMessage]]() - - await node1.start() - await node1.mountRelay(@[pubSubTopic]) - - await node2.start() - await node2.mountRelay(@[pubSubTopic]) - - await node3.start() - await node3.mountRelay(@[pubSubTopic]) - - await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - - # Setup two servers so we can see both sides of encrypted communication - let - rpcPort1 = Port(8556) - ta1 = initTAddress(bindIp, rpcPort1) - server1 = newRpcHttpServer([ta1]) - rpcPort3 = Port(8557) - ta3 = initTAddress(bindIp, rpcPort3) - server3 = newRpcHttpServer([ta3]) - - # Let's connect to nodes 1 and 3 via the API - installPrivateApiHandlers(node1, server1, newTable[string, seq[WakuMessage]]()) - installPrivateApiHandlers(node3, server3, topicCache) - installRelayApiHandlers(node3, server3, topicCache) - server1.start() - server3.start() - - let client1 = newRpcHttpClient() - await client1.connect("127.0.0.1", rpcPort1, false) - - let client3 = newRpcHttpClient() - await client3.connect("127.0.0.1", rpcPort3, false) - - # Let's get a symkey for node3 - - let symkey = await client3.get_waku_v2_private_v1_symmetric_key() - - # Now try to subscribe on node3 using API - - let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic]) - - await sleepAsync(100.millis) - - check: - # node3 is now subscribed to pubSubTopic - sub - - # Now publish and encrypt a message on node1 using node3's symkey - let posted = await client1.post_waku_v2_private_v1_symmetric_message(pubSubTopic, message, symkey = (%symkey).getStr()) - check: - posted - - await sleepAsync(100.millis) - - # Let's see if we can receive, and decrypt, this message on node3 - var messages = await client3.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr()) - - check: - messages.len == 1 - messages[0].contentTopic.get == contentTopic - messages[0].payload == payload - - # Ensure that read messages are cleared from cache - messages = await client3.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr()) - check: - messages.len == 0 - - await server1.stop() - await server1.closeWait() - await server3.stop() - await server3.closeWait() - - await node1.stop() - await node2.stop() - await node3.stop() diff --git a/tests/v2/wakunode_jsonrpc/test_jsonrpc_admin.nim b/tests/v2/wakunode_jsonrpc/test_jsonrpc_admin.nim new file mode 100644 index 000000000..77ea5e4cc --- /dev/null +++ b/tests/v2/wakunode_jsonrpc/test_jsonrpc_admin.nim @@ -0,0 +1,196 @@ +{.used.} + +import + std/[options, sequtils], + stew/shims/net as stewNet, + testutils/unittests, + chronicles, + chronos, + eth/keys, + libp2p/crypto/crypto, + json_rpc/[rpcserver, rpcclient] +import + ../../../waku/v2/node/peer_manager, + ../../../waku/v2/node/waku_node, + ../../../waku/v2/node/jsonrpc/admin/handlers as admin_api, + ../../../waku/v2/node/jsonrpc/admin/client as admin_api_client, + ../../../waku/v2/protocol/waku_relay, + ../../../waku/v2/protocol/waku_archive, + ../../../waku/v2/protocol/waku_archive/driver/queue_driver, + ../../../waku/v2/protocol/waku_store, + ../../../waku/v2/protocol/waku_filter, + ../../../waku/v2/utils/peers, + ../../test_helpers + + +procSuite "Waku v2 JSON-RPC API - Admin": + let + rng = crypto.newRng() + bindIp = ValidIpAddress.init("0.0.0.0") + + asyncTest "connect to ad-hoc peers": + # Create a couple of nodes + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60600)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60602)) + peerInfo2 = node2.switch.peerInfo + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60604)) + peerInfo3 = node3.switch.peerInfo + + await allFutures([node1.start(), node2.start(), node3.start()]) + + await node1.mountRelay() + await node2.mountRelay() + await node3.mountRelay() + + # RPC server setup + let + rpcPort = Port(8551) + ta = initTAddress(bindIp, rpcPort) + server = newRpcHttpServer([ta]) + + installAdminApiHandlers(node1, server) + server.start() + + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort, false) + + # Connect to nodes 2 and 3 using the Admin API + let postRes = await client.post_waku_v2_admin_v1_peers(@[constructMultiaddrStr(peerInfo2), + constructMultiaddrStr(peerInfo3)]) + + check: + postRes + + # Verify that newly connected peers are being managed + let getRes = await client.get_waku_v2_admin_v1_peers() + + check: + getRes.len == 2 + # Check peer 2 + getRes.anyIt(it.protocol == WakuRelayCodec and + it.multiaddr == constructMultiaddrStr(peerInfo2)) + # Check peer 3 + getRes.anyIt(it.protocol == WakuRelayCodec and + it.multiaddr == constructMultiaddrStr(peerInfo3)) + + # Verify that raises an exception if we can't connect to the peer + let nonExistentPeer = "/ip4/0.0.0.0/tcp/10000/p2p/16Uiu2HAm6HZZr7aToTvEBPpiys4UxajCTU97zj5v7RNR2gbniy1D" + expect(ValueError): + discard await client.post_waku_v2_admin_v1_peers(@[nonExistentPeer]) + + let malformedPeer = "/malformed/peer" + expect(ValueError): + discard await client.post_waku_v2_admin_v1_peers(@[malformedPeer]) + + await server.stop() + await server.closeWait() + + await allFutures([node1.stop(), node2.stop(), node3.stop()]) + + asyncTest "get managed peer information": + # Create 3 nodes and start them with relay + let nodes = toSeq(0..<3).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(60220+it*2))) + await allFutures(nodes.mapIt(it.start())) + await allFutures(nodes.mapIt(it.mountRelay())) + + # Dial nodes 2 and 3 from node1 + await nodes[0].connectToNodes(@[constructMultiaddrStr(nodes[1].peerInfo)]) + await nodes[0].connectToNodes(@[constructMultiaddrStr(nodes[2].peerInfo)]) + + # RPC server setup + let + rpcPort = Port(8552) + ta = initTAddress(bindIp, rpcPort) + server = newRpcHttpServer([ta]) + + installAdminApiHandlers(nodes[0], server) + server.start() + + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort, false) + + let response = await client.get_waku_v2_admin_v1_peers() + + check: + response.len == 2 + # Check peer 2 + response.anyIt(it.protocol == WakuRelayCodec and + it.multiaddr == constructMultiaddrStr(nodes[1].peerInfo)) + # Check peer 3 + response.anyIt(it.protocol == WakuRelayCodec and + it.multiaddr == constructMultiaddrStr(nodes[2].peerInfo)) + + # Artificially remove the address from the book + nodes[0].peerManager.peerStore[AddressBook][nodes[1].peerInfo.peerId] = @[] + nodes[0].peerManager.peerStore[AddressBook][nodes[2].peerInfo.peerId] = @[] + + # Verify that the returned addresses are empty + let responseEmptyAdd = await client.get_waku_v2_admin_v1_peers() + check: + responseEmptyAdd[0].multiaddr == "" + responseEmptyAdd[1].multiaddr == "" + + await server.stop() + await server.closeWait() + + await allFutures(nodes.mapIt(it.stop())) + + asyncTest "get unmanaged peer information": + let + nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60523)) + + await node.start() + + # RPC server setup + let + rpcPort = Port(8553) + ta = initTAddress(bindIp, rpcPort) + server = newRpcHttpServer([ta]) + + installAdminApiHandlers(node, server) + server.start() + + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort, false) + + await node.mountFilter() + await node.mountFilterClient() + await node.mountSwap() + let driver: ArchiveDriver = QueueDriver.new() + node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + await node.mountStore() + node.mountStoreClient() + + # Create and set some peers + let + locationAddr = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet() + + filterKey = crypto.PrivateKey.random(ECDSA, rng[]).get() + filterPeer = PeerInfo.new(filterKey, @[locationAddr]) + + storeKey = crypto.PrivateKey.random(ECDSA, rng[]).get() + storePeer = PeerInfo.new(storeKey, @[locationAddr]) + + node.setStorePeer(storePeer.toRemotePeerInfo()) + node.setFilterPeer(filterPeer.toRemotePeerInfo()) + + let response = await client.get_waku_v2_admin_v1_peers() + + ## Then + check: + response.len == 2 + # Check filter peer + (response.filterIt(it.protocol == WakuFilterCodec)[0]).multiaddr == constructMultiaddrStr(filterPeer) + # Check store peer + (response.filterIt(it.protocol == WakuStoreCodec)[0]).multiaddr == constructMultiaddrStr(storePeer) + + ## Cleanup + await server.stop() + await server.closeWait() + + await node.stop() diff --git a/tests/v2/wakunode_jsonrpc/test_jsonrpc_debug.nim b/tests/v2/wakunode_jsonrpc/test_jsonrpc_debug.nim new file mode 100644 index 000000000..38ba60071 --- /dev/null +++ b/tests/v2/wakunode_jsonrpc/test_jsonrpc_debug.nim @@ -0,0 +1,53 @@ +{.used.} + +import + std/options, + stew/shims/net as stewNet, + testutils/unittests, + chronicles, + chronos, + eth/keys, + libp2p/crypto/crypto, + json_rpc/[rpcserver, rpcclient] +import + ../../../waku/v2/node/peer_manager, + ../../../waku/v2/node/waku_node, + ../../../waku/v2/node/jsonrpc/debug/handlers as debug_api, + ../../../waku/v2/node/jsonrpc/debug/client as debug_api_client + + +procSuite "Waku v2 JSON-RPC API - Debug": + let + rng = crypto.newRng() + privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") + port = Port(9000) + node = WakuNode.new(privkey, bindIp, port, some(extIp), some(port)) + + asyncTest "get node info": + await node.start() + + await node.mountRelay() + + # RPC server setup + let + rpcPort = Port(8546) + ta = initTAddress(bindIp, rpcPort) + server = newRpcHttpServer([ta]) + + installDebugApiHandlers(node, server) + server.start() + + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort, false) + + let response = await client.get_waku_v2_debug_v1_info() + + check: + response.listenAddresses == @[$node.switch.peerInfo.addrs[^1] & "/p2p/" & $node.switch.peerInfo.peerId] + + await server.stop() + await server.closeWait() + + await node.stop() diff --git a/tests/v2/wakunode_jsonrpc/test_jsonrpc_filter.nim b/tests/v2/wakunode_jsonrpc/test_jsonrpc_filter.nim new file mode 100644 index 000000000..f98f52bd6 --- /dev/null +++ b/tests/v2/wakunode_jsonrpc/test_jsonrpc_filter.nim @@ -0,0 +1,84 @@ +{.used.} + +import + std/options, + stew/shims/net as stewNet, + testutils/unittests, + chronicles, + eth/keys, + libp2p/crypto/crypto, + json_rpc/[rpcserver, rpcclient] +import + ../../../waku/v2/node/peer_manager, + ../../../waku/v2/node/waku_node, + ../../../waku/v2/node/message_cache, + ../../../waku/v2/node/jsonrpc/filter/handlers as filter_api, + ../../../waku/v2/node/jsonrpc/filter/client as filter_api_client, + ../../../waku/v2/protocol/waku_message, + ../../../waku/v2/protocol/waku_filter/rpc, + ../../../waku/v2/protocol/waku_filter/client, + ../../../waku/v2/utils/peers + + +proc newTestMessageCache(): filter_api.MessageCache = + filter_api.MessageCache.init(capacity=30) + + +procSuite "Waku v2 JSON-RPC API - Filter": + let + rng = crypto.newRng() + bindIp = ValidIpAddress.init("0.0.0.0") + + asyncTest "subscribe and unsubscribe": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, bindIp, Port(60390)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, bindIp, Port(60392)) + + await allFutures(node1.start(), node2.start()) + + await node1.mountFilter() + await node2.mountFilterClient() + + node2.setFilterPeer(node1.peerInfo.toRemotePeerInfo()) + + # RPC server setup + let + rpcPort = Port(8550) + ta = initTAddress(bindIp, rpcPort) + server = newRpcHttpServer([ta]) + + installFilterApiHandlers(node2, server, newTestMessageCache()) + server.start() + + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort, false) + + check: + # Light node has not yet subscribed to any filters + node2.wakuFilterClient.getSubscriptionsCount() == 0 + + let contentFilters = @[ + ContentFilter(contentTopic: DefaultContentTopic), + ContentFilter(contentTopic: ContentTopic("2")), + ContentFilter(contentTopic: ContentTopic("3")), + ContentFilter(contentTopic: ContentTopic("4")), + ] + var response = await client.post_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic)) + check: + response == true + # Light node has successfully subscribed to 4 content topics + node2.wakuFilterClient.getSubscriptionsCount() == 4 + + response = await client.delete_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic)) + check: + response == true + # Light node has successfully unsubscribed from all filters + node2.wakuFilterClient.getSubscriptionsCount() == 0 + + ## Cleanup + await server.stop() + await server.closeWait() + + await allFutures(node1.stop(), node2.stop()) diff --git a/tests/v2/wakunode_jsonrpc/test_jsonrpc_relay.nim b/tests/v2/wakunode_jsonrpc/test_jsonrpc_relay.nim new file mode 100644 index 000000000..e623835a3 --- /dev/null +++ b/tests/v2/wakunode_jsonrpc/test_jsonrpc_relay.nim @@ -0,0 +1,359 @@ +{.used.} + +import + std/[options, sequtils, times], + stew/shims/net as stewNet, + testutils/unittests, + chronicles, + eth/keys, + libp2p/crypto/crypto, + json_rpc/[rpcserver, rpcclient] +import + ../../../waku/v1/node/rpc/hexstrings, + ../../../waku/v2/node/peer_manager, + ../../../waku/v2/node/message_cache, + ../../../waku/v2/node/waku_node, + ../../../waku/v2/node/jsonrpc/relay/handlers as relay_api, + ../../../waku/v2/node/jsonrpc/relay/client as relay_api_client, + ../../../waku/v2/protocol/waku_message, + ../../../waku/v2/protocol/waku_relay, + ../../../waku/v2/utils/compat, + ../../../waku/v2/utils/peers, + ../../../waku/v2/utils/time + + +proc newTestMessageCache(): relay_api.MessageCache = + relay_api.MessageCache.init(capacity=30) + + +procSuite "Waku v2 JSON-RPC API - Relay": + let + rng = crypto.newRng() + privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") + port = Port(9000) + node = WakuNode.new(privkey, bindIp, port, some(extIp), some(port)) + + asyncTest "subscribe, unsubscribe and publish": + await node.start() + + await node.mountRelay() + + # RPC server setup + let + rpcPort = Port(8547) + ta = initTAddress(bindIp, rpcPort) + server = newRpcHttpServer([ta]) + + installRelayApiHandlers(node, server, newTestMessageCache()) + server.start() + + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort, false) + + check: + # At this stage the node is only subscribed to the default topic + node.wakuRelay.subscribedTopics.toSeq().len == 1 + + # Subscribe to new topics + let newTopics = @["1","2","3"] + var response = await client.post_waku_v2_relay_v1_subscriptions(newTopics) + + check: + # Node is now subscribed to default + new topics + node.wakuRelay.subscribedTopics.toSeq().len == 1 + newTopics.len + response == true + + # Publish a message on the default topic + response = await client.post_waku_v2_relay_v1_message(DefaultPubsubTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(DefaultContentTopic), timestamp: some(getNanosecondTime(epochTime())))) + + check: + # @TODO poll topic to verify message has been published + response == true + + # Unsubscribe from new topics + response = await client.delete_waku_v2_relay_v1_subscriptions(newTopics) + + check: + # Node is now unsubscribed from new topics + node.wakuRelay.subscribedTopics.toSeq().len == 1 + response == true + + await server.stop() + await server.closeWait() + + await node.stop() + + asyncTest "get latest messages": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, bindIp, Port(60300)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, bindIp, Port(60302)) + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node3 = WakuNode.new(nodeKey3, bindIp, Port(60303), some(extIp), some(port)) + pubSubTopic = "polling" + contentTopic = DefaultContentTopic + payload1 = @[byte 9] + message1 = WakuMessage(payload: payload1, contentTopic: contentTopic) + payload2 = @[byte 8] + message2 = WakuMessage(payload: payload2, contentTopic: contentTopic) + + await node1.start() + await node1.mountRelay(@[pubSubTopic]) + + await node2.start() + await node2.mountRelay(@[pubSubTopic]) + + await node3.start() + await node3.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + + # RPC server setup + let + rpcPort = Port(8548) + ta = initTAddress(bindIp, rpcPort) + server = newRpcHttpServer([ta]) + + # Let's connect to node 3 via the API + installRelayApiHandlers(node3, server, newTestMessageCache()) + server.start() + + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort, false) + + # First see if we can retrieve messages published on the default topic (node is already subscribed) + await node2.publish(DefaultPubsubTopic, message1) + + await sleepAsync(100.millis) + + var messages = await client.get_waku_v2_relay_v1_messages(DefaultPubsubTopic) + + check: + messages.len == 1 + messages[0].contentTopic == contentTopic + messages[0].payload == payload1 + + # Ensure that read messages are cleared from cache + messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic) + check: + messages.len == 0 + + # Now try to subscribe using API + + var response = await client.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic]) + + await sleepAsync(100.millis) + + check: + # Node is now subscribed to pubSubTopic + response == true + + # Now publish a message on node1 and see if we receive it on node3 + await node1.publish(pubSubTopic, message2) + + await sleepAsync(100.millis) + + messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic) + + check: + messages.len == 1 + messages[0].contentTopic == contentTopic + messages[0].payload == payload2 + + # Ensure that read messages are cleared from cache + messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic) + check: + messages.len == 0 + + await server.stop() + await server.closeWait() + + await node1.stop() + await node2.stop() + await node3.stop() + + asyncTest "generate asymmetric keys and encrypt/decrypt communication": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, bindIp, Port(62001)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, bindIp, Port(62002)) + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node3 = WakuNode.new(nodeKey3, bindIp, Port(62003), some(extIp), some(port)) + pubSubTopic = "polling" + contentTopic = DefaultContentTopic + payload = @[byte 9] + message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime()))) + topicCache = newTestMessageCache() + + await node1.start() + await node1.mountRelay(@[pubSubTopic]) + + await node2.start() + await node2.mountRelay(@[pubSubTopic]) + + await node3.start() + await node3.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + + # Setup two servers so we can see both sides of encrypted communication + let + rpcPort1 = Port(8554) + ta1 = initTAddress(bindIp, rpcPort1) + server1 = newRpcHttpServer([ta1]) + rpcPort3 = Port(8555) + ta3 = initTAddress(bindIp, rpcPort3) + server3 = newRpcHttpServer([ta3]) + + # Let's connect to nodes 1 and 3 via the API + installRelayPrivateApiHandlers(node1, server1, newTestMessageCache()) + installRelayPrivateApiHandlers(node3, server3, topicCache) + installRelayApiHandlers(node3, server3, topicCache) + server1.start() + server3.start() + + let client1 = newRpcHttpClient() + await client1.connect("127.0.0.1", rpcPort1, false) + + let client3 = newRpcHttpClient() + await client3.connect("127.0.0.1", rpcPort3, false) + + # Let's get a keypair for node3 + + let keypair = await client3.get_waku_v2_private_v1_asymmetric_keypair() + + # Now try to subscribe on node3 using API + + let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic]) + + await sleepAsync(100.millis) + + check: + # node3 is now subscribed to pubSubTopic + sub + + # Now publish and encrypt a message on node1 using node3's public key + let posted = await client1.post_waku_v2_private_v1_asymmetric_message(pubSubTopic, message, publicKey = (%keypair.pubkey).getStr()) + check: + posted + + await sleepAsync(100.millis) + + # Let's see if we can receive, and decrypt, this message on node3 + var messages = await client3.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr()) + + check: + messages.len == 1 + messages[0].contentTopic.get == contentTopic + messages[0].payload == payload + + # Ensure that read messages are cleared from cache + messages = await client3.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr()) + check: + messages.len == 0 + + await server1.stop() + await server1.closeWait() + await server3.stop() + await server3.closeWait() + + await node1.stop() + await node2.stop() + await node3.stop() + + asyncTest "generate symmetric keys and encrypt/decrypt communication": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, bindIp, Port(62100)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, bindIp, Port(62102)) + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node3 = WakuNode.new(nodeKey3, bindIp, Port(62103), some(extIp), some(port)) + pubSubTopic = "polling" + contentTopic = DefaultContentTopic + payload = @[byte 9] + message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime()))) + topicCache = newTestMessageCache() + + await node1.start() + await node1.mountRelay(@[pubSubTopic]) + + await node2.start() + await node2.mountRelay(@[pubSubTopic]) + + await node3.start() + await node3.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + + # Setup two servers so we can see both sides of encrypted communication + let + rpcPort1 = Port(8556) + ta1 = initTAddress(bindIp, rpcPort1) + server1 = newRpcHttpServer([ta1]) + rpcPort3 = Port(8557) + ta3 = initTAddress(bindIp, rpcPort3) + server3 = newRpcHttpServer([ta3]) + + # Let's connect to nodes 1 and 3 via the API + installRelayPrivateApiHandlers(node1, server1, newTestMessageCache()) + installRelayPrivateApiHandlers(node3, server3, topicCache) + installRelayApiHandlers(node3, server3, topicCache) + server1.start() + server3.start() + + let client1 = newRpcHttpClient() + await client1.connect("127.0.0.1", rpcPort1, false) + + let client3 = newRpcHttpClient() + await client3.connect("127.0.0.1", rpcPort3, false) + + # Let's get a symkey for node3 + + let symkey = await client3.get_waku_v2_private_v1_symmetric_key() + + # Now try to subscribe on node3 using API + + let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic]) + + await sleepAsync(100.millis) + + check: + # node3 is now subscribed to pubSubTopic + sub + + # Now publish and encrypt a message on node1 using node3's symkey + let posted = await client1.post_waku_v2_private_v1_symmetric_message(pubSubTopic, message, symkey = (%symkey).getStr()) + check: + posted + + await sleepAsync(100.millis) + + # Let's see if we can receive, and decrypt, this message on node3 + var messages = await client3.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr()) + + check: + messages.len == 1 + messages[0].contentTopic.get == contentTopic + messages[0].payload == payload + + # Ensure that read messages are cleared from cache + messages = await client3.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr()) + check: + messages.len == 0 + + await server1.stop() + await server1.closeWait() + await server3.stop() + await server3.closeWait() + + await node1.stop() + await node2.stop() + await node3.stop() diff --git a/tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim b/tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim new file mode 100644 index 000000000..a4faf2f18 --- /dev/null +++ b/tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim @@ -0,0 +1,102 @@ +{.used.} + +import + std/[options, times], + stew/shims/net as stewNet, + chronicles, + testutils/unittests, + eth/keys, + libp2p/crypto/crypto, + json_rpc/[rpcserver, rpcclient] +import + ../../../waku/v2/node/peer_manager, + ../../../waku/v2/node/waku_node, + ../../../waku/v2/node/jsonrpc/store/handlers as store_api, + ../../../waku/v2/node/jsonrpc/store/client as store_api_client, + ../../../waku/v2/protocol/waku_message, + ../../../waku/v2/protocol/waku_archive, + ../../../waku/v2/protocol/waku_archive/driver/queue_driver, + ../../../waku/v2/protocol/waku_store/rpc, + ../../../waku/v2/utils/peers, + ../../../waku/v2/utils/time, + ../../v2/testlib/common + + +proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] = + let + digest = waku_archive.computeDigest(message) + receivedTime = if message.timestamp > 0: message.timestamp + else: getNanosecondTime(getTime().toUnixFloat()) + + store.put(pubsubTopic, message, digest, receivedTime) + +procSuite "Waku v2 JSON-RPC API - Store": + let + rng = crypto.newRng() + privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") + port = Port(9000) + node = WakuNode.new(privkey, bindIp, port, some(extIp), some(port)) + + asyncTest "query a node and retrieve historical messages": + await node.start() + + await node.mountRelay() + + # RPC server setup + let + rpcPort = Port(8549) + ta = initTAddress(bindIp, rpcPort) + server = newRpcHttpServer([ta]) + + installStoreApiHandlers(node, server) + server.start() + + # WakuStore setup + let + key = crypto.PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.new(key) + + let driver: ArchiveDriver = QueueDriver.new() + node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + await node.mountStore() + node.mountStoreClient() + + var listenSwitch = newStandardSwitch(some(key)) + await listenSwitch.start() + + node.setStorePeer(listenSwitch.peerInfo.toRemotePeerInfo()) + + listenSwitch.mount(node.wakuRelay) + listenSwitch.mount(node.wakuStore) + + # Now prime it with some history before tests + let msgList = @[ + fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("2"), ts=0), + fakeWakuMessage(@[byte 1], ts=1), + fakeWakuMessage(@[byte 2], ts=2), + fakeWakuMessage(@[byte 3], ts=3), + fakeWakuMessage(@[byte 4], ts=4), + fakeWakuMessage(@[byte 5], ts=5), + fakeWakuMessage(@[byte 6], ts=6), + fakeWakuMessage(@[byte 7], ts=7), + fakeWakuMessage(@[byte 8], ts=8), + fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("2"), ts=9) + ] + + for msg in msgList: + require driver.put(DefaultPubsubTopic, msg).isOk() + + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort, false) + + let response = await client.get_waku_v2_store_v1_messages(some(DefaultPubsubTopic), some(@[HistoryContentFilterRPC(contentTopic: DefaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions())) + check: + response.messages.len() == 8 + response.pagingOptions.isNone() + + await server.stop() + await server.closeWait() + + await node.stop() diff --git a/waku/v2/node/jsonrpc/admin/callsigs.nim b/waku/v2/node/jsonrpc/admin/callsigs.nim new file mode 100644 index 000000000..97e770165 --- /dev/null +++ b/waku/v2/node/jsonrpc/admin/callsigs.nim @@ -0,0 +1,4 @@ +# Admin API + +proc get_waku_v2_admin_v1_peers(): seq[WakuPeer] +proc post_waku_v2_admin_v1_peers(peers: seq[string]): bool diff --git a/waku/v2/node/jsonrpc/admin/client.nim b/waku/v2/node/jsonrpc/admin/client.nim new file mode 100644 index 000000000..10f69330b --- /dev/null +++ b/waku/v2/node/jsonrpc/admin/client.nim @@ -0,0 +1,14 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[os, strutils], + json_rpc/rpcclient +import + ./types + +template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] + +createRpcSigs(RpcHttpClient, sourceDir / "callsigs.nim") diff --git a/waku/v2/node/jsonrpc/admin/handlers.nim b/waku/v2/node/jsonrpc/admin/handlers.nim new file mode 100644 index 000000000..973de045b --- /dev/null +++ b/waku/v2/node/jsonrpc/admin/handlers.nim @@ -0,0 +1,91 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/sequtils, + chronicles, + json_rpc/rpcserver, + libp2p/[peerinfo, switch] +import + ../../../../waku/v2/protocol/waku_store, + ../../../../waku/v2/protocol/waku_filter, + ../../../../waku/v2/protocol/waku_relay, + ../../../../waku/v2/node/peer_manager, + ../../../../waku/v2/node/waku_node, + ./types + + +logScope: + topics = "waku node jsonrpc admin_api" + + + +proc constructMultiaddrStr*(wireaddr: MultiAddress, peerId: PeerId): string = + # Constructs a multiaddress with both wire address and p2p identity + $wireaddr & "/p2p/" & $peerId + +proc constructMultiaddrStr*(peerInfo: PeerInfo): string = + # Constructs a multiaddress with both location (wire) address and p2p identity + if peerInfo.listenAddrs.len == 0: + return "" + constructMultiaddrStr(peerInfo.listenAddrs[0], peerInfo.peerId) + +proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string = + # Constructs a multiaddress with both location (wire) address and p2p identity + if remotePeerInfo.addrs.len == 0: + return "" + constructMultiaddrStr(remotePeerInfo.addrs[0], remotePeerInfo.peerId) + +proc constructMultiaddrStr*(storedInfo: StoredInfo): string = + # Constructs a multiaddress with both location (wire) address and p2p identity + if storedInfo.addrs.len == 0: + return "" + constructMultiaddrStr(storedInfo.addrs[0], storedInfo.peerId) + + +proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = + + rpcsrv.rpc("post_waku_v2_admin_v1_peers") do (peers: seq[string]) -> bool: + ## Connect to a list of peers + debug "post_waku_v2_admin_v1_peers" + + for i, peer in peers: + let conn = await node.peerManager.dialPeer(parseRemotePeerInfo(peer), WakuRelayCodec, source="rpc") + if conn.isNone(): + raise newException(ValueError, "Failed to connect to peer at index: " & $i & " " & $peer) + + return true + + rpcsrv.rpc("get_waku_v2_admin_v1_peers") do () -> seq[WakuPeer]: + ## Returns a list of peers registered for this node + debug "get_waku_v2_admin_v1_peers" + + var peers = newSeq[WakuPeer]() + + if not node.wakuRelay.isNil(): + # Map managed peers to WakuPeers and add to return list + let relayPeers = node.peerManager.peerStore.peers(WakuRelayCodec) + .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it), + protocol: WakuRelayCodec, + connected: it.connectedness == Connectedness.Connected)) + peers.add(relayPeers) + + if not node.wakuFilter.isNil(): + # Map WakuFilter peers to WakuPeers and add to return list + let filterPeers = node.peerManager.peerStore.peers(WakuFilterCodec) + .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it), + protocol: WakuFilterCodec, + connected: it.connectedness == Connectedness.Connected)) + peers.add(filterPeers) + + if not node.wakuStore.isNil(): + # Map WakuStore peers to WakuPeers and add to return list + let storePeers = node.peerManager.peerStore.peers(WakuStoreCodec) + .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it), + protocol: WakuStoreCodec, + connected: it.connectedness == Connectedness.Connected)) + peers.add(storePeers) + + return peers diff --git a/waku/v2/node/jsonrpc/admin/types.nim b/waku/v2/node/jsonrpc/admin/types.nim new file mode 100644 index 000000000..c9e7ea0b1 --- /dev/null +++ b/waku/v2/node/jsonrpc/admin/types.nim @@ -0,0 +1,10 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + + +type WakuPeer* = object + multiaddr*: string + protocol*: string + connected*: bool diff --git a/waku/v2/node/jsonrpc/admin_api.nim b/waku/v2/node/jsonrpc/admin_api.nim deleted file mode 100644 index a387209df..000000000 --- a/waku/v2/node/jsonrpc/admin_api.nim +++ /dev/null @@ -1,111 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - std/sequtils, - chronicles, - json_rpc/rpcserver, - libp2p/[peerinfo, switch] -import - ../../protocol/waku_store, - ../../protocol/waku_filter, - ../../protocol/waku_relay, - ../../protocol/waku_swap/waku_swap, - ../peer_manager, - ../waku_node, - ./jsonrpc_types - -export jsonrpc_types - -logScope: - topics = "waku node jsonrpc admin_api" - -const futTimeout* = 30.seconds # Max time to wait for futures - -proc constructMultiaddrStr*(wireaddr: MultiAddress, peerId: PeerId): string = - # Constructs a multiaddress with both wire address and p2p identity - $wireaddr & "/p2p/" & $peerId - -proc constructMultiaddrStr*(peerInfo: PeerInfo): string = - # Constructs a multiaddress with both location (wire) address and p2p identity - if peerInfo.listenAddrs.len == 0: - return "" - constructMultiaddrStr(peerInfo.listenAddrs[0], peerInfo.peerId) - -proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string = - # Constructs a multiaddress with both location (wire) address and p2p identity - if remotePeerInfo.addrs.len == 0: - return "" - constructMultiaddrStr(remotePeerInfo.addrs[0], remotePeerInfo.peerId) - -proc constructMultiaddrStr*(storedInfo: StoredInfo): string = - # Constructs a multiaddress with both location (wire) address and p2p identity - if storedInfo.addrs.len == 0: - return "" - constructMultiaddrStr(storedInfo.addrs[0], storedInfo.peerId) - -proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = - - ## Admin API version 1 definitions - - rpcsrv.rpc("post_waku_v2_admin_v1_peers") do(peers: seq[string]) -> bool: - ## Connect to a list of peers - debug "post_waku_v2_admin_v1_peers" - - for i, peer in peers: - let conn = await node.peerManager.dialPeer(parseRemotePeerInfo(peer), WakuRelayCodec, source="rpc") - if conn.isNone(): - raise newException(ValueError, "Failed to connect to peer at index: " & $i & " " & $peer) - return true - - rpcsrv.rpc("get_waku_v2_admin_v1_peers") do() -> seq[WakuPeer]: - ## Returns a list of peers registered for this node - debug "get_waku_v2_admin_v1_peers" - - # Create a single list of peers from mounted protocols. - # @TODO since the switch does not expose its connections, retrieving the connected peers requires a peer store/peer management - - var wPeers: seq[WakuPeer] = @[] - - ## Managed peers - - if not node.wakuRelay.isNil: - # Map managed peers to WakuPeers and add to return list - wPeers.insert(node.peerManager.peerStore - .peers(WakuRelayCodec) - .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it), - protocol: WakuRelayCodec, - connected: it.connectedness == Connectedness.Connected)), - wPeers.len) # Append to the end of the sequence - - if not node.wakuFilter.isNil: - # Map WakuFilter peers to WakuPeers and add to return list - wPeers.insert(node.peerManager.peerStore - .peers(WakuFilterCodec) - .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it), - protocol: WakuFilterCodec, - connected: it.connectedness == Connectedness.Connected)), - wPeers.len) # Append to the end of the sequence - - if not node.wakuSwap.isNil: - # Map WakuSwap peers to WakuPeers and add to return list - wPeers.insert(node.peerManager.peerStore - .peers(WakuSwapCodec) - .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it), - protocol: WakuSwapCodec, - connected: it.connectedness == Connectedness.Connected)), - wPeers.len) # Append to the end of the sequence - - if not node.wakuStore.isNil: - # Map WakuStore peers to WakuPeers and add to return list - wPeers.insert(node.peerManager.peerStore - .peers(WakuStoreCodec) - .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it), - protocol: WakuStoreCodec, - connected: it.connectedness == Connectedness.Connected)), - wPeers.len) # Append to the end of the sequence - - # @TODO filter output on protocol/connected-status - return wPeers diff --git a/waku/v2/node/jsonrpc/debug/callsigs.nim b/waku/v2/node/jsonrpc/debug/callsigs.nim new file mode 100644 index 000000000..1d87a5789 --- /dev/null +++ b/waku/v2/node/jsonrpc/debug/callsigs.nim @@ -0,0 +1,3 @@ +# Debug API + +proc get_waku_v2_debug_v1_info(): WakuInfo diff --git a/waku/v2/node/jsonrpc/debug/client.nim b/waku/v2/node/jsonrpc/debug/client.nim new file mode 100644 index 000000000..a562a16b0 --- /dev/null +++ b/waku/v2/node/jsonrpc/debug/client.nim @@ -0,0 +1,14 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[os, strutils], + json_rpc/rpcclient +import + ../../../../waku/v2/node/waku_node + +template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] + +createRpcSigs(RpcHttpClient, sourceDir / "callsigs.nim") diff --git a/waku/v2/node/jsonrpc/debug_api.nim b/waku/v2/node/jsonrpc/debug/handlers.nim similarity index 62% rename from waku/v2/node/jsonrpc/debug_api.nim rename to waku/v2/node/jsonrpc/debug/handlers.nim index f03693683..e0d801e17 100644 --- a/waku/v2/node/jsonrpc/debug_api.nim +++ b/waku/v2/node/jsonrpc/debug/handlers.nim @@ -5,23 +5,24 @@ else: import chronicles, - json_rpc/rpcserver, - ../waku_node + json_rpc/rpcserver +import + ../../../../waku/v2/node/waku_node logScope: topics = "waku node jsonrpc debug_api" -proc installDebugApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = +proc installDebugApiHandlers*(node: WakuNode, server: RpcServer) = ## Debug API version 1 definitions - rpcsrv.rpc("get_waku_v2_debug_v1_info") do() -> WakuInfo: + server.rpc("get_waku_v2_debug_v1_info") do () -> WakuInfo: ## Returns information about WakuNode debug "get_waku_v2_debug_v1_info" return node.info() - rpcsrv.rpc("get_waku_v2_debug_v1_version") do() -> string: + server.rpc("get_waku_v2_debug_v1_version") do () -> string: ## Returns information about WakuNode debug "get_waku_v2_debug_v1_version" diff --git a/waku/v2/node/jsonrpc/filter/callsigs.nim b/waku/v2/node/jsonrpc/filter/callsigs.nim new file mode 100644 index 000000000..8af15f9fa --- /dev/null +++ b/waku/v2/node/jsonrpc/filter/callsigs.nim @@ -0,0 +1,5 @@ +# Filter API + +proc get_waku_v2_filter_v1_messages(contentTopic: ContentTopic): seq[WakuMessage] +proc post_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool +proc delete_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool diff --git a/waku/v2/node/jsonrpc/filter/client.nim b/waku/v2/node/jsonrpc/filter/client.nim new file mode 100644 index 000000000..c9c4db7ad --- /dev/null +++ b/waku/v2/node/jsonrpc/filter/client.nim @@ -0,0 +1,15 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[os, strutils], + json_rpc/rpcclient +import + ../../../../waku/v2/protocol/waku_message, + ../../../../waku/v2/protocol/waku_filter/rpc + +template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] + +createRpcSigs(RpcHttpClient, sourceDir / "callsigs.nim") diff --git a/waku/v2/node/jsonrpc/filter/handlers.nim b/waku/v2/node/jsonrpc/filter/handlers.nim new file mode 100644 index 000000000..f2d197749 --- /dev/null +++ b/waku/v2/node/jsonrpc/filter/handlers.nim @@ -0,0 +1,89 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[tables, sequtils], + chronicles, + json_rpc/rpcserver +import + ../../../../waku/v2/protocol/waku_message, + ../../../../waku/v2/protocol/waku_filter, + ../../../../waku/v2/protocol/waku_filter/rpc, + ../../../../waku/v2/protocol/waku_filter/client, + ../../../../waku/v2/node/message_cache, + ../../../../waku/v2/node/peer_manager, + ../../../../waku/v2/node/waku_node + + +logScope: + topics = "waku node jsonrpc filter_api" + + +const DefaultPubsubTopic: PubsubTopic = "/waku/2/default-waku/proto" + +const futTimeout* = 5.seconds # Max time to wait for futures + + +type + MessageCache* = message_cache.MessageCache[ContentTopic] + + +proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache) = + + server.rpc("post_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], topic: Option[PubsubTopic]) -> bool: + ## Subscribes a node to a list of content filters + debug "post_waku_v2_filter_v1_subscription" + + let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + if peerOpt.isNone(): + raise newException(ValueError, "no suitable remote filter peers") + + let + pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic) + contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) + + let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe, closure.} = + cache.addMessage(msg.contentTopic, msg) + + let subFut = node.filterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get()) + if not await subFut.withTimeout(futTimeout): + raise newException(ValueError, "Failed to subscribe to contentFilters") + + # Successfully subscribed to all content filters + for cTopic in contentTopics: + cache.subscribe(cTopic) + + return true + + server.rpc("delete_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], topic: Option[PubsubTopic]) -> bool: + ## Unsubscribes a node from a list of content filters + debug "delete_waku_v2_filter_v1_subscription" + + let + pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic) + contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) + + let unsubFut = node.unsubscribe(pubsubTopic, contentTopics) + if not await unsubFut.withTimeout(futTimeout): + raise newException(ValueError, "Failed to unsubscribe from contentFilters") + + for cTopic in contentTopics: + cache.unsubscribe(cTopic) + + return true + + server.rpc("get_waku_v2_filter_v1_messages") do (contentTopic: ContentTopic) -> seq[WakuMessage]: + ## Returns all WakuMessages received on a content topic since the + ## last time this method was called + debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic + + if not cache.isSubscribed(contentTopic): + raise newException(ValueError, "Not subscribed to topic: " & contentTopic) + + let msgRes = cache.getMessages(contentTopic, clear=true) + if msgRes.isErr(): + raise newException(ValueError, "Not subscribed to topic: " & contentTopic) + + return msgRes.value diff --git a/waku/v2/node/jsonrpc/filter_api.nim b/waku/v2/node/jsonrpc/filter_api.nim deleted file mode 100644 index bc8429bf0..000000000 --- a/waku/v2/node/jsonrpc/filter_api.nim +++ /dev/null @@ -1,104 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - std/[tables, sequtils], - chronicles, - json_rpc/rpcserver -import - ../../protocol/waku_message, - ../../protocol/waku_filter, - ../../protocol/waku_filter/rpc, - ../../protocol/waku_filter/client, - ../waku_node, - ./jsonrpc_types - -export jsonrpc_types - -logScope: - topics = "waku node jsonrpc filter_api" - - -const DefaultPubsubTopic: PubsubTopic = "/waku/2/default-waku/proto" -const futTimeout* = 5.seconds # Max time to wait for futures -const maxCache* = 30 # Max number of messages cached per topic TODO: make this configurable - - -proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache: MessageCache) = - ## Filter API version 1 definitions - - rpcsrv.rpc("get_waku_v2_filter_v1_messages") do (contentTopic: ContentTopic) -> seq[WakuMessage]: - ## Returns all WakuMessages received on a content topic since the - ## last time this method was called - ## TODO: ability to specify a return message limit - debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic - - if not messageCache.hasKey(contentTopic): - raise newException(ValueError, "Not subscribed to content topic: " & $contentTopic) - - let msgs = messageCache[contentTopic] - # Clear cache before next call - messageCache[contentTopic] = @[] - return msgs - - - rpcsrv.rpc("post_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], topic: Option[string]) -> bool: - ## Subscribes a node to a list of content filters - debug "post_waku_v2_filter_v1_subscription" - - let - pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic) - contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) - - let pushHandler:FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe, closure.} = - # Add message to current cache - trace "WakuMessage received", msg=msg - - # Make a copy of msgs for this topic to modify - var msgs = messageCache.getOrDefault(msg.contentTopic, @[]) - - if msgs.len >= maxCache: - # Message cache on this topic exceeds maximum. Delete oldest. - # TODO: this may become a bottle neck if called as the norm rather than exception when adding messages. Performance profile needed. - msgs.delete(0,0) - - msgs.add(msg) - - # Replace indexed entry with copy - # TODO: max number of content topics could be limited in node - messageCache[msg.contentTopic] = msgs - - let subFut = node.subscribe(pubsubTopic, contentTopics, pushHandler) - - if not await subFut.withTimeout(futTimeout): - raise newException(ValueError, "Failed to subscribe to contentFilters") - - # Successfully subscribed to all content filters - for cTopic in contentTopics: - # Create message cache for each subscribed content topic - messageCache[cTopic] = @[] - - return true - - - rpcsrv.rpc("delete_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool: - ## Unsubscribes a node from a list of content filters - debug "delete_waku_v2_filter_v1_subscription" - - let - pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic) - contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) - - let unsubFut = node.unsubscribe(pubsubTopic, contentTopics) - - if not await unsubFut.withTimeout(futTimeout): - raise newException(ValueError, "Failed to unsubscribe from contentFilters") - - # Successfully unsubscribed from all content filters - for cTopic in contentTopics: - # Remove message cache for each unsubscribed content topic - messageCache.del(cTopic) - - return true diff --git a/waku/v2/node/jsonrpc/hexstrings.nim b/waku/v2/node/jsonrpc/hexstrings.nim index b595f9a82..ee2f48f32 100644 --- a/waku/v2/node/jsonrpc/hexstrings.nim +++ b/waku/v2/node/jsonrpc/hexstrings.nim @@ -24,8 +24,12 @@ ]# import - stint, stew/byteutils, eth/keys, eth/common/eth_types, - ../../../whisper/whisper_types + stew/byteutils, + eth/keys, + eth/common/eth_types, + stint +import + ../../waku/whisper/whisper_types type HexDataStr* = distinct string diff --git a/waku/v2/node/jsonrpc/jsonrpc_types.nim b/waku/v2/node/jsonrpc/jsonrpc_types.nim deleted file mode 100644 index 08a57b84a..000000000 --- a/waku/v2/node/jsonrpc/jsonrpc_types.nim +++ /dev/null @@ -1,41 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - std/[options,tables], - eth/keys, - ../../protocol/waku_message, - ../../protocol/waku_store/rpc, - ../../utils/time - -type - StoreResponse* = object - messages*: seq[WakuMessage] - pagingOptions*: Option[StorePagingOptions] - - StorePagingOptions* = object - ## This type holds some options for pagination - pageSize*: uint64 - cursor*: Option[PagingIndexRPC] - forward*: bool - - WakuRelayMessage* = object - payload*: seq[byte] - contentTopic*: Option[ContentTopic] - # sender generated timestamp - timestamp*: Option[Timestamp] - - WakuPeer* = object - multiaddr*: string - protocol*: string - connected*: bool - - WakuKeyPair* = object - seckey*: keys.PrivateKey - pubkey*: keys.PublicKey - - TopicCache* = TableRef[string, seq[WakuMessage]] - - MessageCache* = TableRef[ContentTopic, seq[WakuMessage]] diff --git a/waku/v2/node/jsonrpc/jsonrpc_utils.nim b/waku/v2/node/jsonrpc/jsonrpc_utils.nim deleted file mode 100644 index 033830ea3..000000000 --- a/waku/v2/node/jsonrpc/jsonrpc_utils.nim +++ /dev/null @@ -1,99 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - std/[options, json], - eth/keys, - ../../protocol/waku_message, - ../../protocol/waku_store, - ../../protocol/waku_store/rpc, - ../../utils/compat, - ../../utils/time, - ./hexstrings, - ./jsonrpc_types - -export hexstrings - -## Json marshalling - -proc `%`*(value: WakuMessage): JsonNode = - ## This ensures that seq[byte] fields are marshalled to hex-format JStrings - ## (as defined in `hexstrings.nim`) rather than the default JArray[JInt] - let jObj = newJObject() - for k, v in value.fieldPairs: - jObj[k] = %v - return jObj - -## Conversion tools -## Since the Waku v2 JSON-RPC API has its own defined types, -## we need to convert between these and the types for the Nim API - -proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfoRPC = - PagingInfoRPC( - pageSize: some(pagingOptions.pageSize), - cursor: pagingOptions.cursor, - direction: if pagingOptions.forward: some(PagingDirectionRPC.FORWARD) - else: some(PagingDirectionRPC.BACKWARD) - ) - -proc toPagingOptions*(pagingInfo: PagingInfoRPC): StorePagingOptions = - StorePagingOptions( - pageSize: pagingInfo.pageSize.get(0'u64), - cursor: pagingInfo.cursor, - forward: if pagingInfo.direction.isNone(): true - else: pagingInfo.direction.get() == PagingDirectionRPC.FORWARD - ) - -proc toJsonRPCStoreResponse*(response: HistoryResponse): StoreResponse = - StoreResponse( - messages: response.messages, - pagingOptions: if response.cursor.isNone(): none(StorePagingOptions) - else: some(StorePagingOptions( - pageSize: uint64(response.messages.len), # This field will be deprecated soon - forward: true, # Hardcoded. This field will be deprecated soon - cursor: response.cursor.map(toRPC) - )) - ) - -proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage = - var t: Timestamp - if relayMessage.timestamp.isSome: - t = relayMessage.timestamp.get - else: - # incoming WakuRelayMessages with no timestamp will get 0 timestamp - t = Timestamp(0) - WakuMessage(payload: relayMessage.payload, - contentTopic: relayMessage.contentTopic.get(DefaultContentTopic), - version: version, - timestamp: t) - -proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref HmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage = - let payload = Payload(payload: relayMessage.payload, - dst: pubKey, - symkey: symkey) - - var t: Timestamp - if relayMessage.timestamp.isSome: - t = relayMessage.timestamp.get - else: - # incoming WakuRelayMessages with no timestamp will get 0 timestamp - t = Timestamp(0) - - WakuMessage(payload: payload.encode(version, rng[]).get(), - contentTopic: relayMessage.contentTopic.get(DefaultContentTopic), - version: version, - timestamp: t) - -proc toWakuRelayMessage*(message: WakuMessage, symkey: Option[SymKey], privateKey: Option[keys.PrivateKey]): WakuRelayMessage = - let - keyInfo = if symkey.isSome(): KeyInfo(kind: Symmetric, symKey: symkey.get()) - elif privateKey.isSome(): KeyInfo(kind: Asymmetric, privKey: privateKey.get()) - else: KeyInfo(kind: KeyKind.None) - decoded = decodePayload(message, keyInfo) - - WakuRelayMessage(payload: decoded.get().payload, - contentTopic: some(message.contentTopic), - timestamp: some(message.timestamp)) - diff --git a/waku/v2/node/jsonrpc/marshalling.nim b/waku/v2/node/jsonrpc/marshalling.nim new file mode 100644 index 000000000..4c3e70b11 --- /dev/null +++ b/waku/v2/node/jsonrpc/marshalling.nim @@ -0,0 +1,22 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + json +import + ../../../waku/v2/protocol/waku_message, + ./hexstrings + +export hexstrings + +## Json marshalling + +proc `%`*(value: WakuMessage): JsonNode = + ## This ensures that seq[byte] fields are marshalled to hex-format JStrings + ## (as defined in `hexstrings.nim`) rather than the default JArray[JInt] + let jObj = newJObject() + for k, v in value.fieldPairs: + jObj[k] = %v + return jObj diff --git a/waku/v2/node/jsonrpc/private_api.nim b/waku/v2/node/jsonrpc/private_api.nim deleted file mode 100644 index 6f8fb68e6..000000000 --- a/waku/v2/node/jsonrpc/private_api.nim +++ /dev/null @@ -1,115 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - std/[tables, sequtils], - chronicles, - eth/keys, - json_rpc/rpcserver, - nimcrypto/sysrand -import - ../../utils/compat, - ../waku_node, - ./jsonrpc_types, - ./jsonrpc_utils - -export compat, jsonrpc_types - -logScope: - topics = "waku node jsonrpc private_api" - -const futTimeout* = 5.seconds # Max time to wait for futures - -proc installPrivateApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: TopicCache) = - ## Private API version 1 definitions - - ## Definitions for symmetric cryptography - - rpcsrv.rpc("get_waku_v2_private_v1_symmetric_key") do() -> SymKey: - ## Generates and returns a symmetric key for message encryption and decryption - debug "get_waku_v2_private_v1_symmetric_key" - - var key: SymKey - if randomBytes(key) != key.len: - raise newException(ValueError, "Failed generating key") - - return key - - rpcsrv.rpc("post_waku_v2_private_v1_symmetric_message") do(topic: string, message: WakuRelayMessage, symkey: string) -> bool: - ## Publishes and encrypts a message to be relayed on a PubSub topic - debug "post_waku_v2_private_v1_symmetric_message" - - let msg = message.toWakuMessage(version = 1, - rng = node.rng, - pubKey = none(compat.PublicKey), - symkey = some(symkey.toSymKey())) - - if (await node.publish(topic, msg).withTimeout(futTimeout)): - # Successfully published message - return true - else: - # Failed to publish message to topic - raise newException(ValueError, "Failed to publish to topic " & topic) - - rpcsrv.rpc("get_waku_v2_private_v1_symmetric_messages") do(topic: string, symkey: string) -> seq[WakuRelayMessage]: - ## Returns all WakuMessages received on a PubSub topic since the - ## last time this method was called. Decrypts the message payloads - ## before returning. - ## - ## @TODO ability to specify a return message limit - debug "get_waku_v2_private_v1_symmetric_messages", topic=topic - - if topicCache.hasKey(topic): - let msgs = topicCache[topic] - # Clear cache before next call - topicCache[topic] = @[] - return msgs.mapIt(it.toWakuRelayMessage(symkey = some(symkey.toSymKey()), - privateKey = none(compat.PrivateKey))) - else: - # Not subscribed to this topic - raise newException(ValueError, "Not subscribed to topic: " & topic) - - ## Definitions for asymmetric cryptography - - rpcsrv.rpc("get_waku_v2_private_v1_asymmetric_keypair") do() -> WakuKeyPair: - ## Generates and returns a public/private key pair for asymmetric message encryption and decryption. - debug "get_waku_v2_private_v1_asymmetric_keypair" - - let privKey = compat.PrivateKey.random(node.rng[]) - - return WakuKeyPair(seckey: privKey, pubkey: privKey.toPublicKey()) - - rpcsrv.rpc("post_waku_v2_private_v1_asymmetric_message") do(topic: string, message: WakuRelayMessage, publicKey: string) -> bool: - ## Publishes and encrypts a message to be relayed on a PubSub topic - debug "post_waku_v2_private_v1_asymmetric_message" - - let msg = message.toWakuMessage(version = 1, - rng = node.rng, - symkey = none(SymKey), - pubKey = some(publicKey.toPublicKey())) - - if (await node.publish(topic, msg).withTimeout(futTimeout)): - # Successfully published message - return true - else: - # Failed to publish message to topic - raise newException(ValueError, "Failed to publish to topic " & topic) - - rpcsrv.rpc("get_waku_v2_private_v1_asymmetric_messages") do(topic: string, privateKey: string) -> seq[WakuRelayMessage]: - ## Returns all WakuMessages received on a PubSub topic since the - ## last time this method was called. Decrypts the message payloads - ## before returning. - ## - ## @TODO ability to specify a return message limit - debug "get_waku_v2_private_v1_asymmetric_messages", topic=topic - - if topicCache.hasKey(topic): - let msgs = topicCache[topic] - # Clear cache before next call - topicCache[topic] = @[] - return msgs.mapIt(it.toWakuRelayMessage(symkey = none(SymKey), privateKey = some(privateKey.toPrivateKey()))) - else: - # Not subscribed to this topic - raise newException(ValueError, "Not subscribed to topic: " & topic) diff --git a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim b/waku/v2/node/jsonrpc/relay/callsigs.nim similarity index 50% rename from waku/v2/node/jsonrpc/jsonrpc_callsigs.nim rename to waku/v2/node/jsonrpc/relay/callsigs.nim index 6011c4040..a0792357f 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim +++ b/waku/v2/node/jsonrpc/relay/callsigs.nim @@ -1,12 +1,3 @@ -# Admin API - -proc get_waku_v2_admin_v1_peers(): seq[WakuPeer] -proc post_waku_v2_admin_v1_peers(peers: seq[string]): bool - -# Debug API - -proc get_waku_v2_debug_v1_info(): WakuInfo - # Relay API proc post_waku_v2_relay_v1_message(topic: string, message: WakuRelayMessage): bool @@ -14,17 +5,8 @@ proc get_waku_v2_relay_v1_messages(topic: string): seq[WakuMessage] proc post_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool -# Store API -proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilterRPC]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]): StoreResponse - -# Filter API - -proc get_waku_v2_filter_v1_messages(contentTopic: ContentTopic): seq[WakuMessage] -proc post_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool -proc delete_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool - -# Private API +# Relay Private API # Symmetric proc get_waku_v2_private_v1_symmetric_key(): SymKey proc post_waku_v2_private_v1_symmetric_message(topic: string, message: WakuRelayMessage, symkey: string): bool @@ -32,4 +14,4 @@ proc get_waku_v2_private_v1_symmetric_messages(topic: string, symkey: string): s # Asymmetric proc get_waku_v2_private_v1_asymmetric_keypair(): WakuKeyPair proc post_waku_v2_private_v1_asymmetric_message(topic: string, message: WakuRelayMessage, publicKey: string): bool -proc get_waku_v2_private_v1_asymmetric_messages(topic: string, privateKey: string): seq[WakuRelayMessage] \ No newline at end of file +proc get_waku_v2_private_v1_asymmetric_messages(topic: string, privateKey: string): seq[WakuRelayMessage] diff --git a/waku/v2/node/jsonrpc/relay/client.nim b/waku/v2/node/jsonrpc/relay/client.nim new file mode 100644 index 000000000..f4f9b14c4 --- /dev/null +++ b/waku/v2/node/jsonrpc/relay/client.nim @@ -0,0 +1,20 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[os, strutils], + json_rpc/rpcclient +import + ../../../../waku/v2/protocol/waku_message, + ../../../../waku/v2/utils/compat, + ../marshalling, + ./types + +export types + + +template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] + +createRpcSigs(RpcHttpClient, sourceDir / "callsigs.nim") diff --git a/waku/v2/node/jsonrpc/relay/handlers.nim b/waku/v2/node/jsonrpc/relay/handlers.nim new file mode 100644 index 000000000..6c19c79eb --- /dev/null +++ b/waku/v2/node/jsonrpc/relay/handlers.nim @@ -0,0 +1,225 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/sequtils, + chronicles, + json_rpc/rpcserver, + eth/keys, + nimcrypto/sysrand +import + ../../../../waku/v2/protocol/waku_message, + ../../../../waku/v2/protocol/waku_relay, + ../../../../waku/v2/node/waku_node, + ../../../../waku/v2/node/message_cache, + ../../../../waku/v2/utils/compat, + ../../../../waku/v2/utils/time, + ../hexstrings, + ./types + + +logScope: + topics = "waku node jsonrpc relay_api" + + +const futTimeout* = 5.seconds # Max time to wait for futures + +type + MessageCache* = message_cache.MessageCache[PubsubTopic] + + +## Waku Relay JSON-RPC API + +proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache) = + if node.wakuRelay.isNil(): + debug "waku relay protocol is nil. skipping json rpc api handlers installation" + return + + let topicHandler = proc(topic: PubsubTopic, message: WakuMessage) {.async.} = + cache.addMessage(topic, message) + + # The node may already be subscribed to some topics when Relay API handlers + # are installed + for topic in node.wakuRelay.subscribedTopics: + node.subscribe(topic, topicHandler) + cache.subscribe(topic) + + + server.rpc("post_waku_v2_relay_v1_subscriptions") do (topics: seq[string]) -> bool: + ## Subscribes a node to a list of PubSub topics + debug "post_waku_v2_relay_v1_subscriptions" + + # Subscribe to all requested topics + for topic in topics: + if cache.isSubscribed(topic): + continue + + cache.subscribe(topic) + node.subscribe(topic, topicHandler) + + return true + + server.rpc("delete_waku_v2_relay_v1_subscriptions") do (topics: seq[string]) -> bool: + ## Unsubscribes a node from a list of PubSub topics + debug "delete_waku_v2_relay_v1_subscriptions" + + # Unsubscribe all handlers from requested topics + for topic in topics: + node.unsubscribeAll(topic) + cache.unsubscribe(topic) + + return true + + server.rpc("post_waku_v2_relay_v1_message") do (topic: string, msg: WakuRelayMessage) -> bool: + ## Publishes a WakuMessage to a PubSub topic + debug "post_waku_v2_relay_v1_message" + + let message = block: + WakuMessage( + payload: msg.payload, + # TODO: Fail if the message doesn't have a content topic + contentTopic: msg.contentTopic.get(DefaultContentTopic), + version: 0, + timestamp: msg.timestamp.get(Timestamp(0)) + ) + + let publishFut = node.publish(topic, message) + + if not await publishFut.withTimeout(futTimeout): + raise newException(ValueError, "Failed to publish to topic " & topic) + + return true + + server.rpc("get_waku_v2_relay_v1_messages") do (topic: string) -> seq[WakuMessage]: + ## Returns all WakuMessages received on a PubSub topic since the + ## last time this method was called + debug "get_waku_v2_relay_v1_messages", topic=topic + + if not cache.isSubscribed(topic): + raise newException(ValueError, "Not subscribed to topic: " & topic) + + let msgRes = cache.getMessages(topic, clear=true) + if msgRes.isErr(): + raise newException(ValueError, "Not subscribed to topic: " & topic) + + return msgRes.value + + +## Waku Relay Private JSON-RPC API (Whisper/Waku v1 compatibility) + +proc toWakuMessage(relayMessage: WakuRelayMessage, version: uint32, rng: ref HmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage = + let payload = Payload(payload: relayMessage.payload, + dst: pubKey, + symkey: symkey) + + var t: Timestamp + if relayMessage.timestamp.isSome: + t = relayMessage.timestamp.get + else: + # incoming WakuRelayMessages with no timestamp will get 0 timestamp + t = Timestamp(0) + + WakuMessage(payload: payload.encode(version, rng[]).get(), + contentTopic: relayMessage.contentTopic.get(DefaultContentTopic), + version: version, + timestamp: t) + +proc toWakuRelayMessage(message: WakuMessage, symkey: Option[SymKey], privateKey: Option[keys.PrivateKey]): WakuRelayMessage = + let + keyInfo = if symkey.isSome(): KeyInfo(kind: Symmetric, symKey: symkey.get()) + elif privateKey.isSome(): KeyInfo(kind: Asymmetric, privKey: privateKey.get()) + else: KeyInfo(kind: KeyKind.None) + decoded = decodePayload(message, keyInfo) + + WakuRelayMessage(payload: decoded.get().payload, + contentTopic: some(message.contentTopic), + timestamp: some(message.timestamp)) + + +proc installRelayPrivateApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache) = + + server.rpc("get_waku_v2_private_v1_symmetric_key") do () -> SymKey: + ## Generates and returns a symmetric key for message encryption and decryption + debug "get_waku_v2_private_v1_symmetric_key" + + var key: SymKey + if randomBytes(key) != key.len: + raise newException(ValueError, "Failed generating key") + + return key + + server.rpc("post_waku_v2_private_v1_symmetric_message") do (topic: string, message: WakuRelayMessage, symkey: string) -> bool: + ## Publishes and encrypts a message to be relayed on a PubSub topic + debug "post_waku_v2_private_v1_symmetric_message" + + let msg = message.toWakuMessage(version = 1, + rng = node.rng, + pubKey = none(keys.PublicKey), + symkey = some(symkey.toSymKey())) + + if (await node.publish(topic, msg).withTimeout(futTimeout)): + # Successfully published message + return true + else: + # Failed to publish message to topic + raise newException(ValueError, "Failed to publish to topic " & topic) + + server.rpc("get_waku_v2_private_v1_symmetric_messages") do (topic: string, symkey: string) -> seq[WakuRelayMessage]: + ## Returns all WakuMessages received on a PubSub topic since the + ## last time this method was called. Decrypts the message payloads + ## before returning. + debug "get_waku_v2_private_v1_symmetric_messages", topic=topic + + if not cache.isSubscribed(topic): + raise newException(ValueError, "Not subscribed to topic: " & topic) + + let msgRes = cache.getMessages(topic, clear=true) + if msgRes.isErr(): + raise newException(ValueError, "Not subscribed to topic: " & topic) + + let msgs = msgRes.get() + + return msgs.mapIt(it.toWakuRelayMessage(symkey=some(symkey.toSymKey()), + privateKey=none(keys.PrivateKey))) + + server.rpc("get_waku_v2_private_v1_asymmetric_keypair") do () -> WakuKeyPair: + ## Generates and returns a public/private key pair for asymmetric message encryption and decryption. + debug "get_waku_v2_private_v1_asymmetric_keypair" + + let privKey = keys.PrivateKey.random(node.rng[]) + + return WakuKeyPair(seckey: privKey, pubkey: privKey.toPublicKey()) + + server.rpc("post_waku_v2_private_v1_asymmetric_message") do (topic: string, message: WakuRelayMessage, publicKey: string) -> bool: + ## Publishes and encrypts a message to be relayed on a PubSub topic + debug "post_waku_v2_private_v1_asymmetric_message" + + let msg = message.toWakuMessage(version = 1, + rng = node.rng, + symkey = none(SymKey), + pubKey = some(publicKey.toPublicKey())) + + let publishFut = node.publish(topic, msg) + if not await publishFut.withTimeout(futTimeout): + raise newException(ValueError, "Failed to publish to topic " & topic) + + return true + + server.rpc("get_waku_v2_private_v1_asymmetric_messages") do (topic: string, privateKey: string) -> seq[WakuRelayMessage]: + ## Returns all WakuMessages received on a PubSub topic since the + ## last time this method was called. Decrypts the message payloads + ## before returning. + debug "get_waku_v2_private_v1_asymmetric_messages", topic=topic + + if not cache.isSubscribed(topic): + raise newException(ValueError, "Not subscribed to topic: " & topic) + + let msgRes = cache.getMessages(topic, clear=true) + if msgRes.isErr(): + raise newException(ValueError, "Not subscribed to topic: " & topic) + + let msgs = msgRes.get() + return msgs.mapIt(it.toWakuRelayMessage(symkey=none(SymKey), + privateKey=some(privateKey.toPrivateKey()))) diff --git a/waku/v2/node/jsonrpc/relay/types.nim b/waku/v2/node/jsonrpc/relay/types.nim new file mode 100644 index 000000000..66f7baf33 --- /dev/null +++ b/waku/v2/node/jsonrpc/relay/types.nim @@ -0,0 +1,22 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/options, + eth/keys +import + ../../../../waku/v2/protocol/waku_message, + ../../../../waku/v2/utils/time + +type + WakuRelayMessage* = object + payload*: seq[byte] + contentTopic*: Option[ContentTopic] + timestamp*: Option[Timestamp] + + WakuKeyPair* = object + seckey*: keys.PrivateKey + pubkey*: keys.PublicKey + diff --git a/waku/v2/node/jsonrpc/relay_api.nim b/waku/v2/node/jsonrpc/relay_api.nim deleted file mode 100644 index 17fbcbd4b..000000000 --- a/waku/v2/node/jsonrpc/relay_api.nim +++ /dev/null @@ -1,114 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - std/[tables,sequtils], - chronicles, - json_rpc/rpcserver, - libp2p/protocols/pubsub/pubsub, - ../../protocol/waku_message, - ../waku_node, - ./jsonrpc_types, - ./jsonrpc_utils - -export jsonrpc_types - -logScope: - topics = "waku node jsonrpc relay_api" - -const futTimeout* = 5.seconds # Max time to wait for futures -const maxCache* = 30 # Max number of messages cached per topic @TODO make this configurable - -proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: TopicCache) = - - proc topicHandler(topic: string, data: seq[byte]) {.async, raises: [Defect].} = - trace "Topic handler triggered", topic=topic - let msg = WakuMessage.decode(data) - if msg.isOk(): - # Add message to current cache - trace "WakuMessage received", msg=msg, topic=topic - - # Make a copy of msgs for this topic to modify - var msgs = topicCache.getOrDefault(topic, @[]) - - if msgs.len >= maxCache: - # Message cache on this topic exceeds maximum. Delete oldest. - # @TODO this may become a bottle neck if called as the norm rather than exception when adding messages. Performance profile needed. - msgs.delete(0,0) - msgs.add(msg[]) - - # Replace indexed entry with copy - # @TODO max number of topics could be limited in node - topicCache[topic] = msgs - else: - debug "WakuMessage received but failed to decode", msg=msg, topic=topic - # @TODO handle message decode failure - - ## Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these - for topic in PubSub(node.wakuRelay).topics.keys: - debug "Adding API topic handler for existing subscription", topic=topic - - node.subscribe(topic, topicHandler) - - # Create message cache for this topic - debug "MessageCache for topic", topic=topic - topicCache[topic] = @[] - - ## Relay API version 1 definitions - - rpcsrv.rpc("post_waku_v2_relay_v1_message") do(topic: string, message: WakuRelayMessage) -> bool: - ## Publishes a WakuMessage to a PubSub topic - debug "post_waku_v2_relay_v1_message" - - if (await node.publish(topic, message.toWakuMessage(version = 0)).withTimeout(futTimeout)): - # Successfully published message - return true - else: - # Failed to publish message to topic - raise newException(ValueError, "Failed to publish to topic " & topic) - - rpcsrv.rpc("get_waku_v2_relay_v1_messages") do(topic: string) -> seq[WakuMessage]: - ## Returns all WakuMessages received on a PubSub topic since the - ## last time this method was called - ## @TODO ability to specify a return message limit - debug "get_waku_v2_relay_v1_messages", topic=topic - - if topicCache.hasKey(topic): - let msgs = topicCache[topic] - # Clear cache before next call - topicCache[topic] = @[] - return msgs - else: - # Not subscribed to this topic - raise newException(ValueError, "Not subscribed to topic: " & topic) - - rpcsrv.rpc("post_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool: - ## Subscribes a node to a list of PubSub topics - debug "post_waku_v2_relay_v1_subscriptions" - - # Subscribe to all requested topics - for topic in topics: - # Only subscribe to topics for which we have no subscribed topic handlers yet - if not topicCache.hasKey(topic): - node.subscribe(topic, topicHandler) - # Create message cache for this topic - trace "MessageCache for topic", topic=topic - topicCache[topic] = @[] - - # Successfully subscribed to all requested topics - return true - - rpcsrv.rpc("delete_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool: - ## Unsubscribes a node from a list of PubSub topics - debug "delete_waku_v2_relay_v1_subscriptions" - - # Unsubscribe all handlers from requested topics - for topic in topics: - node.unsubscribeAll(topic) - # Remove message cache for topic - topicCache.del(topic) - - # Successfully unsubscribed from all requested topics - return true diff --git a/waku/v2/node/jsonrpc/store/callsigs.nim b/waku/v2/node/jsonrpc/store/callsigs.nim new file mode 100644 index 000000000..d576a4f6f --- /dev/null +++ b/waku/v2/node/jsonrpc/store/callsigs.nim @@ -0,0 +1,4 @@ +# Store API + +proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilterRPC]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]): StoreResponse + diff --git a/waku/v2/node/jsonrpc/store/client.nim b/waku/v2/node/jsonrpc/store/client.nim new file mode 100644 index 000000000..f04645923 --- /dev/null +++ b/waku/v2/node/jsonrpc/store/client.nim @@ -0,0 +1,19 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[os, strutils], + json_rpc/rpcclient +import + ../../../../waku/v2/protocol/waku_store/rpc, + ../../../../waku/v2/utils/time, + ./types + +export types + + +template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] + +createRpcSigs(RpcHttpClient, sourceDir / "callsigs.nim") diff --git a/waku/v2/node/jsonrpc/store/handlers.nim b/waku/v2/node/jsonrpc/store/handlers.nim new file mode 100644 index 000000000..ac3303384 --- /dev/null +++ b/waku/v2/node/jsonrpc/store/handlers.nim @@ -0,0 +1,85 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[options, sequtils], + chronicles, + json_rpc/rpcserver +import + ../../../../../waku/v2/protocol/waku_message, + ../../../../../waku/v2/protocol/waku_store, + ../../../../../waku/v2/protocol/waku_store/rpc, + ../../../../../waku/v2/utils/time, + ../../../../waku/v2/node/waku_node, + ../../../../waku/v2/node/peer_manager, + ./types + + +logScope: + topics = "waku node jsonrpc store_api" + + +const futTimeout = 5.seconds + + +proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfoRPC = + PagingInfoRPC( + pageSize: some(pagingOptions.pageSize), + cursor: pagingOptions.cursor, + direction: if pagingOptions.forward: some(PagingDirectionRPC.FORWARD) + else: some(PagingDirectionRPC.BACKWARD) + ) + +proc toPagingOptions*(pagingInfo: PagingInfoRPC): StorePagingOptions = + StorePagingOptions( + pageSize: pagingInfo.pageSize.get(0'u64), + cursor: pagingInfo.cursor, + forward: if pagingInfo.direction.isNone(): true + else: pagingInfo.direction.get() == PagingDirectionRPC.FORWARD + ) + +proc toJsonRPCStoreResponse*(response: HistoryResponse): StoreResponse = + StoreResponse( + messages: response.messages, + pagingOptions: if response.cursor.isNone(): none(StorePagingOptions) + else: some(StorePagingOptions( + pageSize: uint64(response.messages.len), # This field will be deprecated soon + forward: true, # Hardcoded. This field will be deprecated soon + cursor: response.cursor.map(toRPC) + )) + ) + +proc installStoreApiHandlers*(node: WakuNode, server: RpcServer) = + + server.rpc("get_waku_v2_store_v1_messages") do (pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilterRPC]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: + ## Returns history for a list of content topics with optional paging + debug "get_waku_v2_store_v1_messages" + + let peerOpt = node.peerManager.selectPeer(WakuStoreCodec) + if peerOpt.isNone(): + raise newException(ValueError, "no suitable remote store peers") + + let req = HistoryQuery( + pubsubTopic: pubsubTopicOption, + contentTopics: contentFiltersOption.get(@[]).mapIt(it.contentTopic), + startTime: startTime, + endTime: endTime, + ascending: if pagingOptions.isNone(): true + else: pagingOptions.get().forward, + pageSize: if pagingOptions.isNone(): DefaultPageSize + else: min(pagingOptions.get().pageSize, MaxPageSize), + cursor: if pagingOptions.isNone(): none(HistoryCursor) + else: pagingOptions.get().cursor.map(toAPI) + ) + + let queryFut = node.query(req, peerOpt.get()) + if not await queryFut.withTimeout(futTimeout): + raise newException(ValueError, "No history response received (timeout)") + + let res = queryFut.read() + if res.isErr(): + raise newException(ValueError, $res.error) + + return res.value.toJsonRPCStoreResponse() diff --git a/waku/v2/node/jsonrpc/store/types.nim b/waku/v2/node/jsonrpc/store/types.nim new file mode 100644 index 000000000..d7e6f9b41 --- /dev/null +++ b/waku/v2/node/jsonrpc/store/types.nim @@ -0,0 +1,22 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/options +import + ../../../../waku/v2/protocol/waku_message, + ../../../../waku/v2/protocol/waku_store/rpc + + +type + StoreResponse* = object + messages*: seq[WakuMessage] + pagingOptions*: Option[StorePagingOptions] + + StorePagingOptions* = object + ## This type holds some options for pagination + pageSize*: uint64 + cursor*: Option[PagingIndexRPC] + forward*: bool diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim deleted file mode 100644 index d95f40951..000000000 --- a/waku/v2/node/jsonrpc/store_api.nim +++ /dev/null @@ -1,61 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - std/[options, sequtils], - chronicles, - json_rpc/rpcserver -import - ../../protocol/waku_store, - ../../protocol/waku_store/rpc, - ../../utils/time, - ../waku_node, - ../peer_manager, - ./jsonrpc_types, - ./jsonrpc_utils - -export jsonrpc_types - -logScope: - topics = "waku node jsonrpc store_api" - -proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = - const futTimeout = 5.seconds - - ## Store API version 1 definitions - - rpcsrv.rpc("get_waku_v2_store_v1_messages") do (pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilterRPC]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: - ## Returns history for a list of content topics with optional paging - debug "get_waku_v2_store_v1_messages" - - let peerOpt = node.peerManager.selectPeer(WakuStoreCodec) - if peerOpt.isNone(): - raise newException(ValueError, "no suitable remote store peers") - - let req = HistoryQuery( - pubsubTopic: pubsubTopicOption, - contentTopics: if contentFiltersOption.isNone(): @[] - else: contentFiltersOption.get().mapIt(it.contentTopic), - startTime: startTime, - endTime: endTime, - ascending: if pagingOptions.isNone(): true - else: pagingOptions.get().forward, - pageSize: if pagingOptions.isNone(): DefaultPageSize - else: min(pagingOptions.get().pageSize, MaxPageSize), - cursor: if pagingOptions.isNone(): none(HistoryCursor) - else: pagingOptions.get().cursor.map(toAPI) - ) - - let queryFut = node.query(req, peerOpt.get()) - - if not await queryFut.withTimeout(futTimeout): - raise newException(ValueError, "No history response received (timeout)") - - let res = queryFut.read() - if res.isErr(): - raise newException(ValueError, $res.error) - - debug "get_waku_v2_store_v1_messages response" - return res.value.toJsonRPCStoreResponse()