diff --git a/CHANGELOG.md b/CHANGELOG.md index 92e158c23..f6d1f89b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - Calls to `publish` a message on `wakunode2` now `await` instead of `discard` dispatched [`WakuRelay`](https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md) procedures - Added JSON-RPC Admin API to retrieve information about peers registered on the `wakunode2` - `StrictNoSign` enabled. +- Added JSON-RPC Private API to enable using symmetric or asymmetric cryptography to encrypt/decrypt message payloads ## 2020-11-30 v0.1 diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 0be838a08..48e194a28 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -2,15 +2,22 @@ import std/[unittest, options, sets, tables, os, strutils, sequtils], stew/shims/net as stewNet, json_rpc/[rpcserver, rpcclient], + eth/[keys, rlp], eth/common/eth_types, libp2p/[standard_setup, switch, multiaddress], libp2p/protobuf/minprotobuf, libp2p/stream/[bufferstream, connection], libp2p/crypto/crypto, libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/rpc/message, + ../../waku/v1/node/rpc/hexstrings, ../../waku/v2/waku_types, ../../waku/v2/node/wakunode2, - ../../waku/v2/node/jsonrpc/[jsonrpc_types,store_api,relay_api,debug_api,filter_api,admin_api], + ../../waku/v2/node/jsonrpc/[store_api, + relay_api, + debug_api, + filter_api, + admin_api, + private_api], ../../waku/v2/protocol/message_notifier, ../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_store/waku_store, @@ -70,7 +77,7 @@ procSuite "Waku v2 JSON-RPC API": ta = initTAddress(bindIp, rpcPort) server = newRpcHttpServer([ta]) - installRelayApiHandlers(node, server) + installRelayApiHandlers(node, server, newTable[string, seq[WakuMessage]]()) server.start() let client = newRpcHttpClient() @@ -140,7 +147,7 @@ procSuite "Waku v2 JSON-RPC API": server = newRpcHttpServer([ta]) # Let's connect to node 3 via the API - installRelayApiHandlers(node3, server) + installRelayApiHandlers(node3, server, newTable[string, seq[WakuMessage]]()) server.start() let client = newRpcHttpClient() @@ -253,7 +260,7 @@ procSuite "Waku v2 JSON-RPC API": ta = initTAddress(bindIp, rpcPort) server = newRpcHttpServer([ta]) - installFilterApiHandlers(node, server) + installFilterApiHandlers(node, server, newTable[ContentTopic, seq[WakuMessage]]()) server.start() let client = newRpcHttpClient() @@ -294,7 +301,7 @@ procSuite "Waku v2 JSON-RPC API": ta = initTAddress(bindIp, rpcPort) server = newRpcHttpServer([ta]) - installFilterApiHandlers(node, server) + installFilterApiHandlers(node, server, newTable[ContentTopic, seq[WakuMessage]]()) server.start() node.mountFilter() @@ -409,5 +416,186 @@ procSuite "Waku v2 JSON-RPC API": # Check store peer (response.filterIt(it.protocol == WakuStoreCodec)[0]).multiaddr == constructMultiaddrStr(storePeer) + server.stop() server.close() waitfor node.stop() + + asyncTest "Private API: generate asymmetric keys and encrypt/decrypt communication": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, bindIp, Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, bindIp, Port(60002)) + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port)) + pubSubTopic = "polling" + contentTopic = ContentTopic(1) + payload = @[byte 9] + message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic)) + topicCache = newTable[string, seq[WakuMessage]]() + + await node1.start() + await node1.mountRelay(@[pubSubTopic]) + + await node2.start() + await node2.mountRelay(@[pubSubTopic]) + + await node3.start() + await node3.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.peerInfo]) + await node3.connectToNodes(@[node2.peerInfo]) + + # Setup two servers so we can see both sides of encrypted communication + let + rpcPort1 = Port(8545) + ta1 = initTAddress(bindIp, rpcPort1) + server1 = newRpcHttpServer([ta1]) + rpcPort3 = Port(8546) + ta3 = initTAddress(bindIp, rpcPort3) + server3 = newRpcHttpServer([ta3]) + + # Let's connect to nodes 1 and 3 via the API + installPrivateApiHandlers(node1, server1, rng, newTable[string, seq[WakuMessage]]()) + installPrivateApiHandlers(node3, server3, rng, topicCache) + installRelayApiHandlers(node3, server3, topicCache) + server1.start() + server3.start() + + let client1 = newRpcHttpClient() + await client1.connect("127.0.0.1", rpcPort1) + + let client3 = newRpcHttpClient() + await client3.connect("127.0.0.1", rpcPort3) + + # Let's get a keypair for node3 + + let keypair = await client3.get_waku_v2_private_v1_asymmetric_keypair() + + # Now try to subscribe on node3 using API + + let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic]) + + await sleepAsync(2000.millis) + + check: + # node3 is now subscribed to pubSubTopic + sub + + # Now publish and encrypt a message on node1 using node3's public key + let posted = await client1.post_waku_v2_private_v1_asymmetric_message(pubSubTopic, message, publicKey = (%keypair.pubkey).getStr()) + check: + posted + + await sleepAsync(2000.millis) + + # Let's see if we can receive, and decrypt, this message on node3 + var messages = await client3.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr()) + + check: + messages.len == 1 + messages[0].contentTopic.get == contentTopic + messages[0].payload == payload + + # Ensure that read messages are cleared from cache + messages = await client3.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr()) + check: + messages.len == 0 + + server1.stop() + server1.close() + server3.stop() + server3.close() + await node1.stop() + await node2.stop() + await node3.stop() + + asyncTest "Private API: generate symmetric keys and encrypt/decrypt communication": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, bindIp, Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, bindIp, Port(60002)) + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port)) + pubSubTopic = "polling" + contentTopic = ContentTopic(1) + payload = @[byte 9] + message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic)) + topicCache = newTable[string, seq[WakuMessage]]() + + await node1.start() + await node1.mountRelay(@[pubSubTopic]) + + await node2.start() + await node2.mountRelay(@[pubSubTopic]) + + await node3.start() + await node3.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.peerInfo]) + await node3.connectToNodes(@[node2.peerInfo]) + + # Setup two servers so we can see both sides of encrypted communication + let + rpcPort1 = Port(8545) + ta1 = initTAddress(bindIp, rpcPort1) + server1 = newRpcHttpServer([ta1]) + rpcPort3 = Port(8546) + ta3 = initTAddress(bindIp, rpcPort3) + server3 = newRpcHttpServer([ta3]) + + # Let's connect to nodes 1 and 3 via the API + installPrivateApiHandlers(node1, server1, rng, newTable[string, seq[WakuMessage]]()) + installPrivateApiHandlers(node3, server3, rng, topicCache) + installRelayApiHandlers(node3, server3, topicCache) + server1.start() + server3.start() + + let client1 = newRpcHttpClient() + await client1.connect("127.0.0.1", rpcPort1) + + let client3 = newRpcHttpClient() + await client3.connect("127.0.0.1", rpcPort3) + + # Let's get a symkey for node3 + + let symkey = await client3.get_waku_v2_private_v1_symmetric_key() + + # Now try to subscribe on node3 using API + + let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic]) + + await sleepAsync(2000.millis) + + check: + # node3 is now subscribed to pubSubTopic + sub + + # Now publish and encrypt a message on node1 using node3's symkey + let posted = await client1.post_waku_v2_private_v1_symmetric_message(pubSubTopic, message, symkey = (%symkey).getStr()) + check: + posted + + await sleepAsync(2000.millis) + + # Let's see if we can receive, and decrypt, this message on node3 + var messages = await client3.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr()) + + check: + messages.len == 1 + messages[0].contentTopic.get == contentTopic + messages[0].payload == payload + + # Ensure that read messages are cleared from cache + messages = await client3.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr()) + check: + messages.len == 0 + + server1.stop() + server1.close() + server3.stop() + server3.close() + await node1.stop() + await node2.stop() + await node3.stop() diff --git a/waku/v2/node/jsonrpc/admin_api.nim b/waku/v2/node/jsonrpc/admin_api.nim index 770e7a9a1..c69b359af 100644 --- a/waku/v2/node/jsonrpc/admin_api.nim +++ b/waku/v2/node/jsonrpc/admin_api.nim @@ -11,6 +11,8 @@ import ../wakunode2, ./jsonrpc_types +export jsonrpc_types + proc constructMultiaddrStr*(peerInfo: PeerInfo): string = # Constructs a multiaddress with both location address and p2p identity $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId diff --git a/waku/v2/node/jsonrpc/filter_api.nim b/waku/v2/node/jsonrpc/filter_api.nim index 65b36269d..ac3cce858 100644 --- a/waku/v2/node/jsonrpc/filter_api.nim +++ b/waku/v2/node/jsonrpc/filter_api.nim @@ -5,19 +5,15 @@ import json_rpc/rpcserver, eth/[common, rlp, keys, p2p], ../../waku_types, - ../wakunode2 + ../wakunode2, + ./jsonrpc_types + +export jsonrpc_types const futTimeout* = 5.seconds # Max time to wait for futures const maxCache* = 100 # Max number of messages cached per topic @TODO make this configurable -type - MessageCache* = Table[ContentTopic, seq[WakuMessage]] - -proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = - ## Create a message cache indexed on content topic - ## @TODO consider moving message cache elsewhere. Perhaps to node? - var - messageCache: MessageCache +proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache: MessageCache) = proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} = # Add message to current cache diff --git a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim index 75b9bf7d7..98b36b158 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim @@ -22,3 +22,13 @@ proc get_waku_v2_store_v1_messages(topics: seq[ContentTopic], pagingOptions: Opt proc get_waku_v2_filter_v1_messages(contentTopic: ContentTopic): seq[WakuMessage] proc post_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool proc delete_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool + +# Private API +# Symmetric +proc get_waku_v2_private_v1_symmetric_key(): SymKey +proc post_waku_v2_private_v1_symmetric_message(topic: string, message: WakuRelayMessage, symkey: string): bool +proc get_waku_v2_private_v1_symmetric_messages(topic: string, symkey: string): seq[WakuRelayMessage] +# Asymmetric +proc get_waku_v2_private_v1_asymmetric_keypair(): WakuKeyPair +proc post_waku_v2_private_v1_asymmetric_message(topic: string, message: WakuRelayMessage, publicKey: string): bool +proc get_waku_v2_private_v1_asymmetric_messages(topic: string, privateKey: string): seq[WakuRelayMessage] diff --git a/waku/v2/node/jsonrpc/jsonrpc_types.nim b/waku/v2/node/jsonrpc/jsonrpc_types.nim index 9d8604046..c80349068 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_types.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_types.nim @@ -1,6 +1,7 @@ import + eth/keys, ../../waku_types, - std/options + std/[options,tables] type StoreResponse* = object @@ -21,3 +22,11 @@ type multiaddr*: string protocol*: string connected*: bool + + WakuKeyPair* = object + seckey*: PrivateKey + pubkey*: PublicKey + + TopicCache* = TableRef[string, seq[WakuMessage]] + + MessageCache* = TableRef[ContentTopic, seq[WakuMessage]] diff --git a/waku/v2/node/jsonrpc/jsonrpc_utils.nim b/waku/v2/node/jsonrpc/jsonrpc_utils.nim index 745382de3..caca98f0e 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_utils.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_utils.nim @@ -1,10 +1,14 @@ import std/options, + eth/keys, + ../../../v1/node/rpc/hexstrings, ../../waku_types, ../../protocol/waku_store/waku_store_types, - ../wakunode2, + ../wakunode2, ../waku_payload, ./jsonrpc_types +export hexstrings + ## Conversion tools ## Since the Waku v2 JSON-RPC API has its own defined types, ## we need to convert between these and the types for the Nim API @@ -29,3 +33,28 @@ proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessag WakuMessage(payload: relayMessage.payload, contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT, version: version) + +proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref BrHmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage = + # @TODO global definition for default content topic + const defaultCT = 0 + + let payload = Payload(payload: relayMessage.payload, + dst: pubKey, + symkey: symkey) + + WakuMessage(payload: payload.encode(version, rng[]).get(), + contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT, + version: version) + +proc toWakuRelayMessage*(message: WakuMessage, symkey: Option[SymKey], privateKey: Option[keys.PrivateKey]): WakuRelayMessage = + # @TODO global definition for default content topic + + let + keyInfo = if symkey.isSome(): KeyInfo(kind: Symmetric, symKey: symkey.get()) + elif privateKey.isSome(): KeyInfo(kind: Asymmetric, privKey: privateKey.get()) + else: KeyInfo(kind: None) + decoded = decodePayload(message, keyInfo) + + WakuRelayMessage(payload: decoded.get().payload, + contentTopic: some(message.contentTopic)) + diff --git a/waku/v2/node/jsonrpc/private_api.nim b/waku/v2/node/jsonrpc/private_api.nim new file mode 100644 index 000000000..fb0c6be80 --- /dev/null +++ b/waku/v2/node/jsonrpc/private_api.nim @@ -0,0 +1,107 @@ +{.push raises: [Exception, Defect].} + +import + std/[tables,sequtils], + json_rpc/rpcserver, + nimcrypto/sysrand, + eth/[common, rlp, keys, p2p], + ../../waku_types, + ../wakunode2, ../waku_payload, + ./jsonrpc_types, ./jsonrpc_utils + +export waku_payload, jsonrpc_types + +const futTimeout* = 5.seconds # Max time to wait for futures + +proc installPrivateApiHandlers*(node: WakuNode, rpcsrv: RpcServer, rng: ref BrHmacDrbgContext, topicCache: TopicCache) = + + ## Private API version 1 definitions + + ## Definitions for symmetric cryptography + + rpcsrv.rpc("get_waku_v2_private_v1_symmetric_key") do() -> SymKey: + ## Generates and returns a symmetric key for message encryption and decryption + debug "get_waku_v2_private_v1_symmetric_key" + + var key: SymKey + if randomBytes(key) != key.len: + raise newException(ValueError, "Failed generating key") + + return key + + rpcsrv.rpc("post_waku_v2_private_v1_symmetric_message") do(topic: string, message: WakuRelayMessage, symkey: string) -> bool: + ## Publishes and encrypts a message to be relayed on a PubSub topic + debug "post_waku_v2_private_v1_symmetric_message" + + let msg = message.toWakuMessage(version = 1, + rng = rng, + pubKey = none(keys.PublicKey), + symkey = some(symkey.toSymKey())) + + if (await node.publish(topic, msg).withTimeout(futTimeout)): + # Successfully published message + return true + else: + # Failed to publish message to topic + raise newException(ValueError, "Failed to publish to topic " & topic) + + rpcsrv.rpc("get_waku_v2_private_v1_symmetric_messages") do(topic: string, symkey: string) -> seq[WakuRelayMessage]: + ## Returns all WakuMessages received on a PubSub topic since the + ## last time this method was called. Decrypts the message payloads + ## before returning. + ## + ## @TODO ability to specify a return message limit + debug "get_waku_v2_private_v1_symmetric_messages", topic=topic + + if topicCache.hasKey(topic): + let msgs = topicCache[topic] + # Clear cache before next call + topicCache[topic] = @[] + return msgs.mapIt(it.toWakuRelayMessage(symkey = some(symkey.toSymKey()), + privateKey = none(keys.PrivateKey))) + else: + # Not subscribed to this topic + raise newException(ValueError, "Not subscribed to topic: " & topic) + + ## Definitions for asymmetric cryptography + + rpcsrv.rpc("get_waku_v2_private_v1_asymmetric_keypair") do() -> WakuKeyPair: + ## Generates and returns a public/private key pair for asymmetric message encryption and decryption. + debug "get_waku_v2_private_v1_asymmetric_keypair" + + let privKey = keys.PrivateKey.random(rng[]) + + return WakuKeyPair(seckey: privKey, pubkey: privKey.toPublicKey()) + + rpcsrv.rpc("post_waku_v2_private_v1_asymmetric_message") do(topic: string, message: WakuRelayMessage, publicKey: string) -> bool: + ## Publishes and encrypts a message to be relayed on a PubSub topic + debug "post_waku_v2_private_v1_asymmetric_message" + + let msg = message.toWakuMessage(version = 1, + rng = rng, + symkey = none(SymKey), + pubKey = some(publicKey.toPublicKey())) + + if (await node.publish(topic, msg).withTimeout(futTimeout)): + # Successfully published message + return true + else: + # Failed to publish message to topic + raise newException(ValueError, "Failed to publish to topic " & topic) + + rpcsrv.rpc("get_waku_v2_private_v1_asymmetric_messages") do(topic: string, privateKey: string) -> seq[WakuRelayMessage]: + ## Returns all WakuMessages received on a PubSub topic since the + ## last time this method was called. Decrypts the message payloads + ## before returning. + ## + ## @TODO ability to specify a return message limit + debug "get_waku_v2_private_v1_asymmetric_messages", topic=topic + + if topicCache.hasKey(topic): + let msgs = topicCache[topic] + # Clear cache before next call + topicCache[topic] = @[] + return msgs.mapIt(it.toWakuRelayMessage(symkey = none(SymKey), privateKey = some(privateKey.toPrivateKey()))) + else: + # Not subscribed to this topic + raise newException(ValueError, "Not subscribed to topic: " & topic) diff --git a/waku/v2/node/jsonrpc/relay_api.nim b/waku/v2/node/jsonrpc/relay_api.nim index 3c04ee178..8959cfb5e 100644 --- a/waku/v2/node/jsonrpc/relay_api.nim +++ b/waku/v2/node/jsonrpc/relay_api.nim @@ -9,16 +9,12 @@ import ../wakunode2, ./jsonrpc_types, ./jsonrpc_utils +export jsonrpc_types + const futTimeout* = 5.seconds # Max time to wait for futures const maxCache* = 100 # Max number of messages cached per topic @TODO make this configurable -type - TopicCache* = Table[string, seq[WakuMessage]] - -proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = - ## Create a per-topic message cache - var - topicCache: TopicCache +proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: TopicCache) = proc topicHandler(topic: string, data: seq[byte]) {.async.} = trace "Topic handler triggered" diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim index 15b76df4d..b579264cd 100644 --- a/waku/v2/node/jsonrpc/store_api.nim +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -8,6 +8,8 @@ import ../wakunode2, ./jsonrpc_types, ./jsonrpc_utils +export jsonrpc_types + proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = const futTimeout = 5.seconds