mirror of https://github.com/waku-org/nwaku.git
fix(jsonrpc): encode waku messages payload in base64 (#1139)
This commit is contained in:
parent
416bc80092
commit
9258914057
|
@ -17,7 +17,7 @@ import
|
|||
./testlib/waku2
|
||||
|
||||
|
||||
procSuite "WakuNode - Lightpush":
|
||||
suite "WakuNode - Lightpush":
|
||||
asyncTest "Lightpush message return success":
|
||||
## Setup
|
||||
let
|
||||
|
|
|
@ -1,14 +1,14 @@
|
|||
{.used.}
|
||||
|
||||
import
|
||||
std/[options, sequtils, times],
|
||||
std/[options, sequtils],
|
||||
stew/shims/net as stewNet,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
libp2p/crypto/crypto,
|
||||
json_rpc/[rpcserver, rpcclient]
|
||||
import
|
||||
../../../waku/v1/node/rpc/hexstrings,
|
||||
../../../waku/common/base64,
|
||||
../../../waku/v2/node/peer_manager,
|
||||
../../../waku/v2/node/message_cache,
|
||||
../../../waku/v2/node/waku_node,
|
||||
|
@ -18,7 +18,7 @@ import
|
|||
../../../waku/v2/protocol/waku_relay,
|
||||
../../../waku/v2/utils/compat,
|
||||
../../../waku/v2/utils/peers,
|
||||
../../../waku/v2/utils/time,
|
||||
../testlib/common,
|
||||
../testlib/waku2
|
||||
|
||||
|
||||
|
@ -26,333 +26,370 @@ proc newTestMessageCache(): relay_api.MessageCache =
|
|||
relay_api.MessageCache.init(capacity=30)
|
||||
|
||||
|
||||
procSuite "Waku v2 JSON-RPC API - Relay":
|
||||
let
|
||||
privkey = generateSecp256k1Key()
|
||||
bindIp = ValidIpAddress.init("0.0.0.0")
|
||||
extIp = ValidIpAddress.init("127.0.0.1")
|
||||
port = Port(0)
|
||||
node = WakuNode.new(privkey, bindIp, port, some(extIp), some(port))
|
||||
suite "Waku v2 JSON-RPC API - Relay":
|
||||
|
||||
asyncTest "subscribe and unsubscribe from topics":
|
||||
## Setup
|
||||
let node = WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
|
||||
asyncTest "subscribe, unsubscribe and publish":
|
||||
await node.start()
|
||||
|
||||
await node.mountRelay(topics = @[DefaultPubsubTopic])
|
||||
|
||||
# RPC server setup
|
||||
# JSON-RPC server
|
||||
let
|
||||
rpcPort = Port(8547)
|
||||
ta = initTAddress(bindIp, rpcPort)
|
||||
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
|
||||
server = newRpcHttpServer([ta])
|
||||
|
||||
installRelayApiHandlers(node, server, newTestMessageCache())
|
||||
server.start()
|
||||
|
||||
# JSON-RPC client
|
||||
let client = newRpcHttpClient()
|
||||
await client.connect("127.0.0.1", rpcPort, false)
|
||||
|
||||
check:
|
||||
# At this stage the node is only subscribed to the default topic
|
||||
node.wakuRelay.subscribedTopics.toSeq().len == 1
|
||||
## Given
|
||||
let newTopics = @["test-topic1","test-topic2","test-topic3"]
|
||||
|
||||
## When
|
||||
# Subscribe to new topics
|
||||
let newTopics = @["1","2","3"]
|
||||
var response = await client.post_waku_v2_relay_v1_subscriptions(newTopics)
|
||||
let subResp = await client.post_waku_v2_relay_v1_subscriptions(newTopics)
|
||||
|
||||
check:
|
||||
# Node is now subscribed to default + new topics
|
||||
node.wakuRelay.subscribedTopics.toSeq().len == 1 + newTopics.len
|
||||
response == true
|
||||
|
||||
# Publish a message on the default topic
|
||||
response = await client.post_waku_v2_relay_v1_message(DefaultPubsubTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(DefaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
|
||||
|
||||
check:
|
||||
# @TODO poll topic to verify message has been published
|
||||
response == true
|
||||
let subTopics = node.wakuRelay.subscribedTopics.toSeq()
|
||||
|
||||
# Unsubscribe from new topics
|
||||
response = await client.delete_waku_v2_relay_v1_subscriptions(newTopics)
|
||||
let unsubResp = await client.delete_waku_v2_relay_v1_subscriptions(newTopics)
|
||||
|
||||
let unsubTopics = node.wakuRelay.subscribedTopics.toSeq()
|
||||
|
||||
## Then
|
||||
check:
|
||||
subResp == true
|
||||
check:
|
||||
# Node is now subscribed to default + new topics
|
||||
subTopics.len == 1 + newTopics.len
|
||||
DefaultPubsubTopic in subTopics
|
||||
newTopics.allIt(it in subTopics)
|
||||
|
||||
check:
|
||||
unsubResp == true
|
||||
check:
|
||||
# Node is now unsubscribed from new topics
|
||||
node.wakuRelay.subscribedTopics.toSeq().len == 1
|
||||
response == true
|
||||
unsubTopics.len == 1
|
||||
DefaultPubsubTopic in unsubTopics
|
||||
newTopics.allIt(it notin unsubTopics)
|
||||
|
||||
await server.stop()
|
||||
await server.closeWait()
|
||||
|
||||
await node.stop()
|
||||
|
||||
asyncTest "get latest messages":
|
||||
asyncTest "publish message to topic":
|
||||
## Setup
|
||||
let
|
||||
nodeKey1 = generateSecp256k1Key()
|
||||
node1 = WakuNode.new(nodeKey1, bindIp, Port(0))
|
||||
nodeKey2 = generateSecp256k1Key()
|
||||
node2 = WakuNode.new(nodeKey2, bindIp, Port(0))
|
||||
nodeKey3 = generateSecp256k1Key()
|
||||
node3 = WakuNode.new(nodeKey3, bindIp, Port(0), some(extIp), some(port))
|
||||
pubSubTopic = "polling"
|
||||
contentTopic = DefaultContentTopic
|
||||
payload1 = @[byte 9]
|
||||
message1 = WakuMessage(payload: payload1, contentTopic: contentTopic)
|
||||
payload2 = @[byte 8]
|
||||
message2 = WakuMessage(payload: payload2, contentTopic: contentTopic)
|
||||
pubSubTopic = "test-jsonrpc-pubsub-topic"
|
||||
contentTopic = "test-jsonrpc-content-topic"
|
||||
|
||||
await node1.start()
|
||||
await node1.mountRelay(@[DefaultPubsubTopic, pubSubTopic])
|
||||
# Relay nodes setup
|
||||
let
|
||||
srcNode = WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
dstNode = WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
|
||||
await node2.start()
|
||||
await node2.mountRelay(@[DefaultPubsubTopic, pubSubTopic])
|
||||
await allFutures(srcNode.start(), dstNode.start())
|
||||
|
||||
await node3.start()
|
||||
await node3.mountRelay(@[DefaultPubsubTopic, pubSubTopic])
|
||||
await srcNode.mountRelay(@[pubSubTopic])
|
||||
await dstNode.mountRelay(@[pubSubTopic])
|
||||
|
||||
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
||||
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
||||
await srcNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()])
|
||||
|
||||
# RPC server setup
|
||||
|
||||
# RPC server (source node)
|
||||
let
|
||||
rpcPort = Port(8548)
|
||||
ta = initTAddress(bindIp, rpcPort)
|
||||
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
|
||||
server = newRpcHttpServer([ta])
|
||||
|
||||
# Let's connect to node 3 via the API
|
||||
installRelayApiHandlers(node3, server, newTestMessageCache())
|
||||
installRelayApiHandlers(srcNode, server, newTestMessageCache())
|
||||
server.start()
|
||||
|
||||
# JSON-RPC client
|
||||
let client = newRpcHttpClient()
|
||||
await client.connect("127.0.0.1", rpcPort, false)
|
||||
|
||||
# First see if we can retrieve messages published on the default topic (node is already subscribed)
|
||||
await node2.publish(DefaultPubsubTopic, message1)
|
||||
## Given
|
||||
let message = fakeWakuMessage( payload= @[byte 72], contentTopic=contentTopic)
|
||||
|
||||
await sleepAsync(100.millis)
|
||||
let dstHandlerFut = newFuture[(PubsubTopic, WakuMessage)]()
|
||||
proc dstHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
dstHandlerFut.complete((topic, msg))
|
||||
|
||||
var messages = await client.get_waku_v2_relay_v1_messages(DefaultPubsubTopic)
|
||||
dstNode.subscribe(pubSubTopic, dstHandler)
|
||||
|
||||
check:
|
||||
messages.len == 1
|
||||
messages[0].contentTopic == contentTopic
|
||||
messages[0].payload == payload1
|
||||
## When
|
||||
let rpcMessage = WakuMessageRPC(
|
||||
payload: Base64String.encode(message.payload),
|
||||
contentTopic: some(message.contentTopic),
|
||||
timestamp: some(message.timestamp),
|
||||
version: some(message.version)
|
||||
)
|
||||
let response = await client.post_waku_v2_relay_v1_message(pubSubTopic, rpcMessage)
|
||||
|
||||
# Ensure that read messages are cleared from cache
|
||||
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
|
||||
check:
|
||||
messages.len == 0
|
||||
|
||||
# Now try to subscribe using API
|
||||
|
||||
var response = await client.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
|
||||
|
||||
await sleepAsync(100.millis)
|
||||
|
||||
check:
|
||||
# Node is now subscribed to pubSubTopic
|
||||
## Then
|
||||
require:
|
||||
response == true
|
||||
await dstHandlerFut.withTimeout(chronos.seconds(5))
|
||||
|
||||
# Now publish a message on node1 and see if we receive it on node3
|
||||
await node1.publish(pubSubTopic, message2)
|
||||
|
||||
await sleepAsync(100.millis)
|
||||
|
||||
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
|
||||
|
||||
let (topic, msg) = dstHandlerFut.read()
|
||||
check:
|
||||
messages.len == 1
|
||||
messages[0].contentTopic == contentTopic
|
||||
messages[0].payload == payload2
|
||||
topic == pubSubTopic
|
||||
msg == message
|
||||
|
||||
# Ensure that read messages are cleared from cache
|
||||
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
|
||||
check:
|
||||
messages.len == 0
|
||||
|
||||
## Cleanup
|
||||
await server.stop()
|
||||
await server.closeWait()
|
||||
await allFutures(srcNode.stop(), dstNode.stop())
|
||||
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
await node3.stop()
|
||||
|
||||
asyncTest "generate asymmetric keys and encrypt/decrypt communication":
|
||||
asyncTest "get latest messages received from topics cache":
|
||||
## Setup
|
||||
let
|
||||
nodeKey1 = generateSecp256k1Key()
|
||||
node1 = WakuNode.new(nodeKey1, bindIp, Port(0))
|
||||
nodeKey2 = generateSecp256k1Key()
|
||||
node2 = WakuNode.new(nodeKey2, bindIp, Port(0))
|
||||
nodeKey3 = generateSecp256k1Key()
|
||||
node3 = WakuNode.new(nodeKey3, bindIp, Port(0), some(extIp), some(port))
|
||||
pubSubTopic = "polling"
|
||||
contentTopic = DefaultContentTopic
|
||||
payload = @[byte 9]
|
||||
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
|
||||
topicCache = newTestMessageCache()
|
||||
pubSubTopic = "test-jsonrpc-pubsub-topic"
|
||||
contentTopic = "test-jsonrpc-content-topic"
|
||||
|
||||
await node1.start()
|
||||
await node1.mountRelay(@[DefaultPubsubTopic, pubSubTopic])
|
||||
|
||||
await node2.start()
|
||||
await node2.mountRelay(@[DefaultPubsubTopic, pubSubTopic])
|
||||
|
||||
await node3.start()
|
||||
await node3.mountRelay(@[DefaultPubsubTopic, pubSubTopic])
|
||||
|
||||
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
||||
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
||||
|
||||
# Setup two servers so we can see both sides of encrypted communication
|
||||
# Relay nodes setup
|
||||
let
|
||||
rpcPort1 = Port(8554)
|
||||
ta1 = initTAddress(bindIp, rpcPort1)
|
||||
server1 = newRpcHttpServer([ta1])
|
||||
rpcPort3 = Port(8555)
|
||||
ta3 = initTAddress(bindIp, rpcPort3)
|
||||
server3 = newRpcHttpServer([ta3])
|
||||
srcNode = WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
dstNode = WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
|
||||
# Let's connect to nodes 1 and 3 via the API
|
||||
installRelayPrivateApiHandlers(node1, server1, newTestMessageCache())
|
||||
installRelayPrivateApiHandlers(node3, server3, topicCache)
|
||||
installRelayApiHandlers(node3, server3, topicCache)
|
||||
server1.start()
|
||||
server3.start()
|
||||
await allFutures(srcNode.start(), dstNode.start())
|
||||
|
||||
let client1 = newRpcHttpClient()
|
||||
await client1.connect("127.0.0.1", rpcPort1, false)
|
||||
await srcNode.mountRelay(@[pubSubTopic])
|
||||
await dstNode.mountRelay(@[pubSubTopic])
|
||||
|
||||
let client3 = newRpcHttpClient()
|
||||
await client3.connect("127.0.0.1", rpcPort3, false)
|
||||
await srcNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()])
|
||||
|
||||
# Let's get a keypair for node3
|
||||
|
||||
let keypair = await client3.get_waku_v2_private_v1_asymmetric_keypair()
|
||||
# RPC server (destination node)
|
||||
let
|
||||
rpcPort = Port(8548)
|
||||
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
|
||||
server = newRpcHttpServer([ta])
|
||||
|
||||
# Now try to subscribe on node3 using API
|
||||
installRelayApiHandlers(dstNode, server, newTestMessageCache())
|
||||
server.start()
|
||||
|
||||
let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
|
||||
# JSON-RPC client
|
||||
let client = newRpcHttpClient()
|
||||
await client.connect("127.0.0.1", rpcPort, false)
|
||||
|
||||
await sleepAsync(100.millis)
|
||||
## Given
|
||||
let messages = @[
|
||||
fakeWakuMessage(payload= @[byte 70], contentTopic=contentTopic),
|
||||
fakeWakuMessage(payload= @[byte 71], contentTopic=contentTopic),
|
||||
fakeWakuMessage(payload= @[byte 72], contentTopic=contentTopic),
|
||||
fakeWakuMessage(payload= @[byte 73], contentTopic=contentTopic)
|
||||
]
|
||||
|
||||
## When
|
||||
for msg in messages:
|
||||
await srcNode.publish(pubSubTopic, msg)
|
||||
|
||||
await sleepAsync(200.millis)
|
||||
|
||||
let dstMessages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
|
||||
|
||||
## Then
|
||||
check:
|
||||
# node3 is now subscribed to pubSubTopic
|
||||
sub
|
||||
dstMessages.len == 4
|
||||
dstMessages[2].payload == Base64String.encode(messages[2].payload)
|
||||
dstMessages[2].contentTopic.get() == messages[2].contentTopic
|
||||
dstMessages[2].timestamp.get() == messages[2].timestamp
|
||||
dstMessages[2].version.get() == messages[2].version
|
||||
|
||||
# Now publish and encrypt a message on node1 using node3's public key
|
||||
let posted = await client1.post_waku_v2_private_v1_asymmetric_message(pubSubTopic, message, publicKey = (%keypair.pubkey).getStr())
|
||||
check:
|
||||
posted
|
||||
## Cleanup
|
||||
await server.stop()
|
||||
await server.closeWait()
|
||||
await allFutures(srcNode.stop(), dstNode.stop())
|
||||
|
||||
await sleepAsync(100.millis)
|
||||
|
||||
# Let's see if we can receive, and decrypt, this message on node3
|
||||
var messages = await client3.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr())
|
||||
|
||||
check:
|
||||
messages.len == 1
|
||||
messages[0].contentTopic.get == contentTopic
|
||||
messages[0].payload == payload
|
||||
|
||||
# Ensure that read messages are cleared from cache
|
||||
messages = await client3.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr())
|
||||
check:
|
||||
messages.len == 0
|
||||
|
||||
await server1.stop()
|
||||
await server1.closeWait()
|
||||
await server3.stop()
|
||||
await server3.closeWait()
|
||||
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
await node3.stop()
|
||||
suite "Waku v2 JSON-RPC API - Relay (Private)":
|
||||
|
||||
asyncTest "generate symmetric keys and encrypt/decrypt communication":
|
||||
let
|
||||
nodeKey1 = generateSecp256k1Key()
|
||||
node1 = WakuNode.new(nodeKey1, bindIp, Port(0))
|
||||
nodeKey2 = generateSecp256k1Key()
|
||||
node2 = WakuNode.new(nodeKey2, bindIp, Port(0))
|
||||
nodeKey3 = generateSecp256k1Key()
|
||||
node3 = WakuNode.new(nodeKey3, bindIp, Port(0), some(extIp), some(port))
|
||||
pubSubTopic = "polling"
|
||||
contentTopic = DefaultContentTopic
|
||||
payload = @[byte 9]
|
||||
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
|
||||
topicCache = newTestMessageCache()
|
||||
pubSubTopic = "test-relay-pubsub-topic"
|
||||
contentTopic = "test-relay-content-topic"
|
||||
|
||||
await node1.start()
|
||||
await node1.mountRelay(@[DefaultPubsubTopic, pubSubTopic])
|
||||
let
|
||||
srcNode = WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(0))
|
||||
relNode = WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(0))
|
||||
dstNode = WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(0))
|
||||
|
||||
await node2.start()
|
||||
await node2.mountRelay(@[DefaultPubsubTopic, pubSubTopic])
|
||||
await allFutures(srcNode.start(), relNode.start(), dstNode.start())
|
||||
|
||||
await node3.start()
|
||||
await node3.mountRelay(@[DefaultPubsubTopic, pubSubTopic])
|
||||
await srcNode.mountRelay(@[pubSubTopic])
|
||||
await relNode.mountRelay(@[pubSubTopic])
|
||||
await dstNode.mountRelay(@[pubSubTopic])
|
||||
|
||||
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
||||
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
||||
await srcNode.connectToNodes(@[relNode.peerInfo.toRemotePeerInfo()])
|
||||
await relNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()])
|
||||
|
||||
# Setup two servers so we can see both sides of encrypted communication
|
||||
let
|
||||
rpcPort1 = Port(8556)
|
||||
ta1 = initTAddress(bindIp, rpcPort1)
|
||||
server1 = newRpcHttpServer([ta1])
|
||||
rpcPort3 = Port(8557)
|
||||
ta3 = initTAddress(bindIp, rpcPort3)
|
||||
server3 = newRpcHttpServer([ta3])
|
||||
srcRpcPort = Port(8554)
|
||||
srcTa = initTAddress(ValidIpAddress.init("127.0.0.1"), srcRpcPort)
|
||||
srcServer = newRpcHttpServer([srcTa])
|
||||
|
||||
# Let's connect to nodes 1 and 3 via the API
|
||||
installRelayPrivateApiHandlers(node1, server1, newTestMessageCache())
|
||||
installRelayPrivateApiHandlers(node3, server3, topicCache)
|
||||
installRelayApiHandlers(node3, server3, topicCache)
|
||||
server1.start()
|
||||
server3.start()
|
||||
let srcMessageCache = newTestMessageCache()
|
||||
installRelayApiHandlers(srcNode, srcServer, srcMessageCache)
|
||||
installRelayPrivateApiHandlers(srcNode, srcServer, srcMessageCache)
|
||||
srcServer.start()
|
||||
|
||||
let client1 = newRpcHttpClient()
|
||||
await client1.connect("127.0.0.1", rpcPort1, false)
|
||||
let
|
||||
dstRpcPort = Port(8555)
|
||||
dstTa = initTAddress(ValidIpAddress.init("127.0.0.1"), dstRpcPort)
|
||||
dstServer = newRpcHttpServer([dstTa])
|
||||
|
||||
let client3 = newRpcHttpClient()
|
||||
await client3.connect("127.0.0.1", rpcPort3, false)
|
||||
let dstMessageCache = newTestMessageCache()
|
||||
installRelayApiHandlers(dstNode, dstServer, dstMessageCache)
|
||||
installRelayPrivateApiHandlers(dstNode, dstServer, dstMessageCache)
|
||||
dstServer.start()
|
||||
|
||||
# Let's get a symkey for node3
|
||||
|
||||
let symkey = await client3.get_waku_v2_private_v1_symmetric_key()
|
||||
let srcClient = newRpcHttpClient()
|
||||
await srcClient.connect("127.0.0.1", srcRpcPort, false)
|
||||
|
||||
# Now try to subscribe on node3 using API
|
||||
let dstClient = newRpcHttpClient()
|
||||
await dstClient.connect("127.0.0.1", dstRpcPort, false)
|
||||
|
||||
let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
|
||||
## Given
|
||||
let
|
||||
payload = @[byte 38]
|
||||
payloadBase64 = Base64String.encode(payload)
|
||||
|
||||
await sleepAsync(100.millis)
|
||||
let message = WakuMessageRPC(
|
||||
payload: payloadBase64,
|
||||
contentTopic: some(contentTopic),
|
||||
timestamp: some(now()),
|
||||
)
|
||||
|
||||
check:
|
||||
# node3 is now subscribed to pubSubTopic
|
||||
sub
|
||||
## When
|
||||
let symkey = await dstClient.get_waku_v2_private_v1_symmetric_key()
|
||||
|
||||
# Now publish and encrypt a message on node1 using node3's symkey
|
||||
let posted = await client1.post_waku_v2_private_v1_symmetric_message(pubSubTopic, message, symkey = (%symkey).getStr())
|
||||
check:
|
||||
let posted = await srcCLient.post_waku_v2_private_v1_symmetric_message(pubSubTopic, message, symkey = (%symkey).getStr())
|
||||
require:
|
||||
posted
|
||||
|
||||
await sleepAsync(100.millis)
|
||||
|
||||
# Let's see if we can receive, and decrypt, this message on node3
|
||||
var messages = await client3.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr())
|
||||
|
||||
# Let's see if we can receive, and decrypt, this message on dstNode
|
||||
var messages = await dstClient.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr())
|
||||
check:
|
||||
messages.len == 1
|
||||
messages[0].contentTopic.get == contentTopic
|
||||
messages[0].payload == payload
|
||||
messages[0].payload == payloadBase64
|
||||
messages[0].contentTopic == message.contentTopic
|
||||
messages[0].timestamp == message.timestamp
|
||||
messages[0].version.get() == 1'u32
|
||||
|
||||
# Ensure that read messages are cleared from cache
|
||||
messages = await client3.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr())
|
||||
messages = await dstClient.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr())
|
||||
check:
|
||||
messages.len == 0
|
||||
|
||||
await server1.stop()
|
||||
await server1.closeWait()
|
||||
await server3.stop()
|
||||
await server3.closeWait()
|
||||
## Cleanup
|
||||
await srcServer.stop()
|
||||
await srcServer.closeWait()
|
||||
await dstServer.stop()
|
||||
await dstServer.closeWait()
|
||||
|
||||
await allFutures(srcNode.stop(), relNode.stop(), dstNode.stop())
|
||||
|
||||
asyncTest "generate asymmetric keys and encrypt/decrypt communication":
|
||||
let
|
||||
pubSubTopic = "test-relay-pubsub-topic"
|
||||
contentTopic = "test-relay-content-topic"
|
||||
|
||||
let
|
||||
srcNode = WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(0))
|
||||
relNode = WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(0))
|
||||
dstNode = WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(0))
|
||||
|
||||
await allFutures(srcNode.start(), relNode.start(), dstNode.start())
|
||||
|
||||
await srcNode.mountRelay(@[pubSubTopic])
|
||||
await relNode.mountRelay(@[pubSubTopic])
|
||||
await dstNode.mountRelay(@[pubSubTopic])
|
||||
|
||||
await srcNode.connectToNodes(@[relNode.peerInfo.toRemotePeerInfo()])
|
||||
await relNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()])
|
||||
|
||||
# Setup two servers so we can see both sides of encrypted communication
|
||||
let
|
||||
srcRpcPort = Port(8554)
|
||||
srcTa = initTAddress(ValidIpAddress.init("127.0.0.1"), srcRpcPort)
|
||||
srcServer = newRpcHttpServer([srcTa])
|
||||
|
||||
let srcMessageCache = newTestMessageCache()
|
||||
installRelayApiHandlers(srcNode, srcServer, srcMessageCache)
|
||||
installRelayPrivateApiHandlers(srcNode, srcServer, srcMessageCache)
|
||||
srcServer.start()
|
||||
|
||||
let
|
||||
dstRpcPort = Port(8555)
|
||||
dstTa = initTAddress(ValidIpAddress.init("127.0.0.1"), dstRpcPort)
|
||||
dstServer = newRpcHttpServer([dstTa])
|
||||
|
||||
let dstMessageCache = newTestMessageCache()
|
||||
installRelayApiHandlers(dstNode, dstServer, dstMessageCache)
|
||||
installRelayPrivateApiHandlers(dstNode, dstServer, dstMessageCache)
|
||||
dstServer.start()
|
||||
|
||||
|
||||
let srcClient = newRpcHttpClient()
|
||||
await srcClient.connect("127.0.0.1", srcRpcPort, false)
|
||||
|
||||
let dstClient = newRpcHttpClient()
|
||||
await dstClient.connect("127.0.0.1", dstRpcPort, false)
|
||||
|
||||
## Given
|
||||
let
|
||||
payload = @[byte 38]
|
||||
payloadBase64 = Base64String.encode(payload)
|
||||
|
||||
let message = WakuMessageRPC(
|
||||
payload: payloadBase64,
|
||||
contentTopic: some(contentTopic),
|
||||
timestamp: some(now()),
|
||||
)
|
||||
|
||||
## When
|
||||
let keypair = await dstClient.get_waku_v2_private_v1_asymmetric_keypair()
|
||||
|
||||
# Now publish and encrypt a message on srcNode using dstNode's public key
|
||||
let posted = await srcClient.post_waku_v2_private_v1_asymmetric_message(pubSubTopic, message, publicKey = (%keypair.pubkey).getStr())
|
||||
require:
|
||||
posted
|
||||
|
||||
await sleepAsync(100.millis)
|
||||
|
||||
# Let's see if we can receive, and decrypt, this message on dstNode
|
||||
var messages = await dstClient.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr())
|
||||
check:
|
||||
messages.len == 1
|
||||
messages[0].payload == payloadBase64
|
||||
messages[0].contentTopic == message.contentTopic
|
||||
messages[0].timestamp == message.timestamp
|
||||
messages[0].version.get() == 1'u32
|
||||
|
||||
# Ensure that read messages are cleared from cache
|
||||
messages = await dstClient.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr())
|
||||
check:
|
||||
messages.len == 0
|
||||
|
||||
## Cleanup
|
||||
await srcServer.stop()
|
||||
await srcServer.closeWait()
|
||||
await dstServer.stop()
|
||||
await dstServer.closeWait()
|
||||
|
||||
await allFutures(srcNode.stop(), relNode.stop(), dstNode.stop())
|
||||
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
await node3.stop()
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import stew/[results, byteutils, base64]
|
||||
|
||||
|
||||
type Base64String* = distinct string
|
||||
|
||||
|
||||
proc encode*(t: type Base64String, value: string|seq[byte]): Base64String =
|
||||
let val = block:
|
||||
when value is string:
|
||||
toBytes(value)
|
||||
else:
|
||||
value
|
||||
Base64String(base64.encode(Base64, val))
|
||||
|
||||
proc decode*(t: Base64String): Result[seq[byte], string] =
|
||||
try:
|
||||
ok(base64.decode(Base64, string(t)))
|
||||
except:
|
||||
err("decoding failed: " & getCurrentExceptionMsg())
|
||||
|
||||
|
||||
proc `$`*(t: Base64String): string {.inline.}=
|
||||
string(t)
|
||||
|
||||
proc `==`*(lhs: Base64String|string, rhs: Base64String|string): bool {.inline.}=
|
||||
string(lhs) == string(rhs)
|
|
@ -0,0 +1,69 @@
|
|||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
|
||||
type
|
||||
HexDataStr* = distinct string
|
||||
Identifier* = distinct string # 32 bytes, no 0x prefix!
|
||||
HexStrings* = HexDataStr | Identifier
|
||||
|
||||
|
||||
# Validation
|
||||
|
||||
template hasHexHeader(value: string): bool =
|
||||
if value.len >= 2 and value[0] == '0' and value[1] in {'x', 'X'}: true
|
||||
else: false
|
||||
|
||||
template isHexChar(c: char): bool =
|
||||
if c notin {'0'..'9'} and
|
||||
c notin {'a'..'f'} and
|
||||
c notin {'A'..'F'}: false
|
||||
else: true
|
||||
|
||||
func isValidHexQuantity*(value: string): bool =
|
||||
if not hasHexHeader(value):
|
||||
return false
|
||||
|
||||
# No leading zeros (but allow 0x0)
|
||||
if value.len < 3 or (value.len > 3 and value[2] == '0'):
|
||||
return false
|
||||
|
||||
for i in 2..<value.len:
|
||||
let c = value[i]
|
||||
if not isHexChar(c):
|
||||
return false
|
||||
|
||||
return true
|
||||
|
||||
func isValidHexData*(value: string, header = true): bool =
|
||||
if header and not hasHexHeader(value):
|
||||
return false
|
||||
|
||||
# Must be even number of digits
|
||||
if value.len mod 2 != 0:
|
||||
return false
|
||||
|
||||
# Leading zeros are allowed
|
||||
for i in 2..<value.len:
|
||||
let c = value[i]
|
||||
if not isHexChar(c):
|
||||
return false
|
||||
|
||||
return true
|
||||
|
||||
|
||||
template isValidHexData*(value: string, hexLen: int, header = true): bool =
|
||||
value.len == hexLen and value.isValidHexData(header)
|
||||
|
||||
proc validateHexData*(value: string) {.inline, raises: [ValueError].} =
|
||||
if unlikely(not isValidHexData(value)):
|
||||
raise newException(ValueError, "Invalid hex data format: " & value)
|
||||
|
||||
|
||||
# Initialisation
|
||||
|
||||
proc hexDataStr*(value: string): HexDataStr {.inline, raises: [ValueError].} =
|
||||
validateHexData(value)
|
||||
HexDataStr(value)
|
|
@ -1,208 +1,6 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2018 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
# at your option.
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
{.deprecated: "import 'waku/common/hexstrings' instead".}
|
||||
|
||||
## This module implements the Ethereum hexadecimal string formats for JSON
|
||||
## See: https://github.com/ethereum/wiki/wiki/JSON-RPC#hex-value-encoding
|
||||
import ../../../common/hexstrings
|
||||
|
||||
#[
|
||||
Note:
|
||||
The following types are converted to hex strings when marshalled to JSON:
|
||||
* Hash256
|
||||
* UInt256
|
||||
* seq[byte]
|
||||
* openArray[seq]
|
||||
* PublicKey
|
||||
* PrivateKey
|
||||
* SymKey
|
||||
* Bytes
|
||||
]#
|
||||
export hexstrings
|
||||
|
||||
import
|
||||
stew/byteutils,
|
||||
eth/keys,
|
||||
eth/common/eth_types,
|
||||
stint
|
||||
import
|
||||
../../waku/whisper/whisper_types
|
||||
|
||||
type
|
||||
HexDataStr* = distinct string
|
||||
Identifier* = distinct string # 32 bytes, no 0x prefix!
|
||||
HexStrings = HexDataStr | Identifier
|
||||
|
||||
# Hex validation
|
||||
|
||||
template hasHexHeader(value: string): bool =
|
||||
if value.len >= 2 and value[0] == '0' and value[1] in {'x', 'X'}: true
|
||||
else: false
|
||||
|
||||
template isHexChar(c: char): bool =
|
||||
if c notin {'0'..'9'} and
|
||||
c notin {'a'..'f'} and
|
||||
c notin {'A'..'F'}: false
|
||||
else: true
|
||||
|
||||
func isValidHexQuantity*(value: string): bool =
|
||||
if not value.hasHexHeader:
|
||||
return false
|
||||
# No leading zeros (but allow 0x0)
|
||||
if value.len < 3 or (value.len > 3 and value[2] == '0'): return false
|
||||
for i in 2 ..< value.len:
|
||||
let c = value[i]
|
||||
if not c.isHexChar:
|
||||
return false
|
||||
return true
|
||||
|
||||
func isValidHexData*(value: string, header = true): bool =
|
||||
if header and not value.hasHexHeader:
|
||||
return false
|
||||
# Must be even number of digits
|
||||
if value.len mod 2 != 0: return false
|
||||
# Leading zeros are allowed
|
||||
for i in 2 ..< value.len:
|
||||
let c = value[i]
|
||||
if not c.isHexChar:
|
||||
return false
|
||||
return true
|
||||
|
||||
template isValidHexData(value: string, hexLen: int, header = true): bool =
|
||||
value.len == hexLen and value.isValidHexData(header)
|
||||
|
||||
func isValidIdentifier*(value: string): bool =
|
||||
# 32 bytes for Whisper ID, no 0x prefix
|
||||
result = value.isValidHexData(64, false)
|
||||
|
||||
func isValidPublicKey*(value: string): bool =
|
||||
# 65 bytes for Public Key plus 1 byte for 0x prefix
|
||||
result = value.isValidHexData(132)
|
||||
|
||||
func isValidPrivateKey*(value: string): bool =
|
||||
# 32 bytes for Private Key plus 1 byte for 0x prefix
|
||||
result = value.isValidHexData(66)
|
||||
|
||||
func isValidSymKey*(value: string): bool =
|
||||
# 32 bytes for Private Key plus 1 byte for 0x prefix
|
||||
result = value.isValidHexData(66)
|
||||
|
||||
func isValidHash256*(value: string): bool =
|
||||
# 32 bytes for Hash256 plus 1 byte for 0x prefix
|
||||
result = value.isValidHexData(66)
|
||||
|
||||
const
|
||||
SInvalidData = "Invalid hex data format for Ethereum"
|
||||
|
||||
proc validateHexData*(value: string) {.inline.} =
|
||||
if unlikely(not value.isValidHexData):
|
||||
raise newException(ValueError, SInvalidData & ": " & value)
|
||||
|
||||
# Initialisation
|
||||
|
||||
proc hexDataStr*(value: string): HexDataStr {.inline.} =
|
||||
value.validateHexData
|
||||
result = value.HexDataStr
|
||||
|
||||
# Converters for use in RPC
|
||||
|
||||
import json
|
||||
from json_rpc/rpcserver import expect
|
||||
|
||||
proc `%`*(value: HexStrings): JsonNode =
|
||||
result = %(value.string)
|
||||
|
||||
# Overloads to support expected representation of hex data
|
||||
|
||||
proc `%`*(value: Hash256): JsonNode =
|
||||
#result = %("0x" & $value) # More clean but no lowercase :(
|
||||
result = %("0x" & value.data.toHex)
|
||||
|
||||
proc `%`*(value: UInt256): JsonNode =
|
||||
result = %("0x" & value.toString(16))
|
||||
|
||||
proc `%`*(value: PublicKey): JsonNode =
|
||||
result = %("0x04" & $value)
|
||||
|
||||
proc `%`*(value: PrivateKey): JsonNode =
|
||||
result = %("0x" & $value)
|
||||
|
||||
proc `%`*(value: SymKey): JsonNode =
|
||||
result = %("0x" & value.toHex)
|
||||
|
||||
proc `%`*(value: seq[byte]): JsonNode =
|
||||
if value.len > 0:
|
||||
result = %("0x" & value.toHex)
|
||||
else:
|
||||
result = newJArray()
|
||||
|
||||
# Helpers for the fromJson procs
|
||||
|
||||
proc toPublicKey*(key: string): PublicKey {.inline.} =
|
||||
result = PublicKey.fromHex(key[4 .. ^1]).tryGet()
|
||||
|
||||
proc toPrivateKey*(key: string): PrivateKey {.inline.} =
|
||||
result = PrivateKey.fromHex(key[2 .. ^1]).tryGet()
|
||||
|
||||
proc toSymKey*(key: string): SymKey {.inline.} =
|
||||
hexToByteArray(key[2 .. ^1], result)
|
||||
|
||||
# Marshalling from JSON to Nim types that includes format checking
|
||||
|
||||
func invalidMsg(name: string): string = "When marshalling from JSON, parameter \"" & name & "\" is not valid"
|
||||
|
||||
proc fromJson*(n: JsonNode, argName: string, result: var HexDataStr) =
|
||||
n.kind.expect(JString, argName)
|
||||
let hexStr = n.getStr()
|
||||
if not hexStr.isValidHexData:
|
||||
raise newException(ValueError, invalidMsg(argName) & " as Ethereum data \"" & hexStr & "\"")
|
||||
result = hexStr.hexDataStr
|
||||
|
||||
proc fromJson*(n: JsonNode, argName: string, result: var Identifier) =
|
||||
n.kind.expect(JString, argName)
|
||||
let hexStr = n.getStr()
|
||||
if not hexStr.isValidIdentifier:
|
||||
raise newException(ValueError, invalidMsg(argName) & " as a identifier \"" & hexStr & "\"")
|
||||
result = hexStr.Identifier
|
||||
|
||||
proc fromJson*(n: JsonNode, argName: string, result: var UInt256) =
|
||||
n.kind.expect(JString, argName)
|
||||
let hexStr = n.getStr()
|
||||
if not (hexStr.len <= 66 and hexStr.isValidHexQuantity):
|
||||
raise newException(ValueError, invalidMsg(argName) & " as a UInt256 \"" & hexStr & "\"")
|
||||
result = readUintBE[256](hexToPaddedByteArray[32](hexStr))
|
||||
|
||||
proc fromJson*(n: JsonNode, argName: string, result: var PublicKey) =
|
||||
n.kind.expect(JString, argName)
|
||||
let hexStr = n.getStr()
|
||||
if not hexStr.isValidPublicKey:
|
||||
raise newException(ValueError, invalidMsg(argName) & " as a public key \"" & hexStr & "\"")
|
||||
result = hexStr.toPublicKey
|
||||
|
||||
proc fromJson*(n: JsonNode, argName: string, result: var PrivateKey) =
|
||||
n.kind.expect(JString, argName)
|
||||
let hexStr = n.getStr()
|
||||
if not hexStr.isValidPrivateKey:
|
||||
raise newException(ValueError, invalidMsg(argName) & " as a private key \"" & hexStr & "\"")
|
||||
result = hexStr.toPrivateKey
|
||||
|
||||
proc fromJson*(n: JsonNode, argName: string, result: var SymKey) =
|
||||
n.kind.expect(JString, argName)
|
||||
let hexStr = n.getStr()
|
||||
if not hexStr.isValidSymKey:
|
||||
raise newException(ValueError, invalidMsg(argName) & " as a symmetric key \"" & hexStr & "\"")
|
||||
result = toSymKey(hexStr)
|
||||
|
||||
# Following procs currently required only for testing, the `createRpcSigs` macro
|
||||
# requires it as it will convert the JSON results back to the original Nim
|
||||
# types, but it needs the `fromJson` calls for those specific Nim types to do so
|
||||
|
||||
proc fromJson*(n: JsonNode, argName: string, result: var Hash256) =
|
||||
n.kind.expect(JString, argName)
|
||||
let hexStr = n.getStr()
|
||||
if not hexStr.isValidHash256:
|
||||
raise newException(ValueError, invalidMsg(argName) & " as a Hash256 \"" & hexStr & "\"")
|
||||
hexToByteArray(hexStr, result.data)
|
||||
|
|
|
@ -1,22 +1,19 @@
|
|||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
json
|
||||
import
|
||||
../../../waku/v2/protocol/waku_message,
|
||||
./hexstrings
|
||||
stew/byteutils,
|
||||
json,
|
||||
json_rpc/rpcserver
|
||||
|
||||
export hexstrings
|
||||
|
||||
## Json marshalling
|
||||
func invalidMsg*(name: string): string =
|
||||
"When marshalling from JSON, parameter \"" & name & "\" is not valid"
|
||||
|
||||
proc `%`*(value: WakuMessage): JsonNode =
|
||||
## This ensures that seq[byte] fields are marshalled to hex-format JStrings
|
||||
## (as defined in `hexstrings.nim`) rather than the default JArray[JInt]
|
||||
let jObj = newJObject()
|
||||
for k, v in value.fieldPairs:
|
||||
jObj[k] = %v
|
||||
return jObj
|
||||
|
||||
## JSON marshalling
|
||||
|
||||
# seq[byte]
|
||||
|
||||
proc `%`*(value: seq[byte]): JsonNode =
|
||||
if value.len > 0:
|
||||
%("0x" & value.toHex())
|
||||
else:
|
||||
newJArray()
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
import
|
||||
std/options,
|
||||
json,
|
||||
json_rpc/rpcserver
|
||||
import
|
||||
../../../waku/common/base64,
|
||||
../../../waku/v2/protocol/waku_message,
|
||||
../../../waku/v2/utils/time
|
||||
|
||||
|
||||
type
|
||||
WakuMessageRPC* = object
|
||||
payload*: Base64String
|
||||
contentTopic*: Option[ContentTopic]
|
||||
version*: Option[uint32]
|
||||
timestamp*: Option[Timestamp]
|
||||
ephemeral*: Option[bool]
|
||||
|
||||
|
||||
## Type mappings
|
||||
|
||||
func toWakuMessageRPC*(msg: WakuMessage): WakuMessageRPC =
|
||||
WakuMessageRPC(
|
||||
payload: Base64String.encode(msg.payload),
|
||||
contentTopic: some(msg.contentTopic),
|
||||
version: some(msg.version),
|
||||
timestamp: some(msg.timestamp),
|
||||
ephemeral: some(msg.ephemeral)
|
||||
)
|
||||
|
||||
|
||||
## JSON-RPC type marshalling
|
||||
|
||||
# Base64String
|
||||
|
||||
proc `%`*(value: Base64String): JsonNode =
|
||||
%(value.string)
|
||||
|
||||
proc fromJson*(n: JsonNode, argName: string, value: var Base64String) =
|
||||
n.kind.expect(JString, argName)
|
||||
|
||||
value = Base64String(n.getStr())
|
||||
|
||||
# WakuMessageRpc (WakuMessage)
|
||||
|
||||
proc `%`*(value: WakuMessageRpc): JsonNode =
|
||||
let jObj = newJObject()
|
||||
for k, v in value.fieldPairs:
|
||||
jObj[k] = %v
|
||||
return jObj
|
|
@ -1,17 +1,16 @@
|
|||
# Relay API
|
||||
|
||||
proc post_waku_v2_relay_v1_message(topic: string, message: WakuRelayMessage): bool
|
||||
proc get_waku_v2_relay_v1_messages(topic: string): seq[WakuMessage]
|
||||
proc post_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
|
||||
proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
|
||||
proc post_waku_v2_relay_v1_subscriptions(topics: seq[PubsubTopic]): bool
|
||||
proc delete_waku_v2_relay_v1_subscriptions(topics: seq[PubsubTopic]): bool
|
||||
proc post_waku_v2_relay_v1_message(topic: PubsubTopic, message: WakuMessageRPC): bool
|
||||
proc get_waku_v2_relay_v1_messages(topic: PubsubTopic): seq[WakuMessageRPC]
|
||||
|
||||
|
||||
# Relay Private API
|
||||
# Symmetric
|
||||
proc get_waku_v2_private_v1_symmetric_key(): SymKey
|
||||
proc post_waku_v2_private_v1_symmetric_message(topic: string, message: WakuRelayMessage, symkey: string): bool
|
||||
proc get_waku_v2_private_v1_symmetric_messages(topic: string, symkey: string): seq[WakuRelayMessage]
|
||||
proc post_waku_v2_private_v1_symmetric_message(topic: string, message: WakuMessageRPC, symkey: string): bool
|
||||
proc get_waku_v2_private_v1_symmetric_messages(topic: string, symkey: string): seq[WakuMessageRPC]
|
||||
# Asymmetric
|
||||
proc get_waku_v2_private_v1_asymmetric_keypair(): WakuKeyPair
|
||||
proc post_waku_v2_private_v1_asymmetric_message(topic: string, message: WakuRelayMessage, publicKey: string): bool
|
||||
proc get_waku_v2_private_v1_asymmetric_messages(topic: string, privateKey: string): seq[WakuRelayMessage]
|
||||
proc post_waku_v2_private_v1_asymmetric_message(topic: string, message: WakuMessageRPC, publicKey: string): bool
|
||||
proc get_waku_v2_private_v1_asymmetric_messages(topic: string, privateKey: string): seq[WakuMessageRPC]
|
||||
|
|
|
@ -9,7 +9,6 @@ import
|
|||
import
|
||||
../../../../waku/v2/protocol/waku_message,
|
||||
../../../../waku/v2/utils/compat,
|
||||
../marshalling,
|
||||
./types
|
||||
|
||||
export types
|
||||
|
|
|
@ -10,13 +10,13 @@ import
|
|||
eth/keys,
|
||||
nimcrypto/sysrand
|
||||
import
|
||||
../../../../waku/common/base64,
|
||||
../../../../waku/v2/protocol/waku_message,
|
||||
../../../../waku/v2/protocol/waku_relay,
|
||||
../../../../waku/v2/node/waku_node,
|
||||
../../../../waku/v2/node/message_cache,
|
||||
../../../../waku/v2/utils/compat,
|
||||
../../../../waku/v2/utils/time,
|
||||
../hexstrings,
|
||||
./types
|
||||
|
||||
|
||||
|
@ -47,7 +47,7 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
|
|||
cache.subscribe(topic)
|
||||
|
||||
|
||||
server.rpc("post_waku_v2_relay_v1_subscriptions") do (topics: seq[string]) -> bool:
|
||||
server.rpc("post_waku_v2_relay_v1_subscriptions") do (topics: seq[PubsubTopic]) -> bool:
|
||||
## Subscribes a node to a list of PubSub topics
|
||||
debug "post_waku_v2_relay_v1_subscriptions"
|
||||
|
||||
|
@ -61,7 +61,7 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
|
|||
|
||||
return true
|
||||
|
||||
server.rpc("delete_waku_v2_relay_v1_subscriptions") do (topics: seq[string]) -> bool:
|
||||
server.rpc("delete_waku_v2_relay_v1_subscriptions") do (topics: seq[PubsubTopic]) -> bool:
|
||||
## Unsubscribes a node from a list of PubSub topics
|
||||
debug "delete_waku_v2_relay_v1_subscriptions"
|
||||
|
||||
|
@ -72,18 +72,22 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
|
|||
|
||||
return true
|
||||
|
||||
server.rpc("post_waku_v2_relay_v1_message") do (topic: string, msg: WakuRelayMessage) -> bool:
|
||||
server.rpc("post_waku_v2_relay_v1_message") do (topic: PubsubTopic, msg: WakuMessageRPC) -> bool:
|
||||
## Publishes a WakuMessage to a PubSub topic
|
||||
debug "post_waku_v2_relay_v1_message"
|
||||
|
||||
let message = block:
|
||||
WakuMessage(
|
||||
payload: msg.payload,
|
||||
# TODO: Fail if the message doesn't have a content topic
|
||||
contentTopic: msg.contentTopic.get(DefaultContentTopic),
|
||||
version: 0,
|
||||
timestamp: msg.timestamp.get(Timestamp(0))
|
||||
)
|
||||
let payloadRes = base64.decode(msg.payload)
|
||||
if payloadRes.isErr():
|
||||
raise newException(ValueError, "invalid payload format: " & payloadRes.error)
|
||||
|
||||
let message = WakuMessage(
|
||||
payload: payloadRes.value,
|
||||
# TODO: Fail if the message doesn't have a content topic
|
||||
contentTopic: msg.contentTopic.get(DefaultContentTopic),
|
||||
version: msg.version.get(0'u32),
|
||||
timestamp: msg.timestamp.get(Timestamp(0)),
|
||||
ephemeral: msg.ephemeral.get(false)
|
||||
)
|
||||
|
||||
let publishFut = node.publish(topic, message)
|
||||
|
||||
|
@ -92,7 +96,7 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
|
|||
|
||||
return true
|
||||
|
||||
server.rpc("get_waku_v2_relay_v1_messages") do (topic: string) -> seq[WakuMessage]:
|
||||
server.rpc("get_waku_v2_relay_v1_messages") do (topic: PubsubTopic) -> seq[WakuMessageRPC]:
|
||||
## Returns all WakuMessages received on a PubSub topic since the
|
||||
## last time this method was called
|
||||
debug "get_waku_v2_relay_v1_messages", topic=topic
|
||||
|
@ -104,37 +108,29 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
|
|||
if msgRes.isErr():
|
||||
raise newException(ValueError, "Not subscribed to topic: " & topic)
|
||||
|
||||
return msgRes.value
|
||||
return msgRes.value.map(toWakuMessageRPC)
|
||||
|
||||
|
||||
## Waku Relay Private JSON-RPC API (Whisper/Waku v1 compatibility)
|
||||
|
||||
proc toWakuMessage(relayMessage: WakuRelayMessage, version: uint32, rng: ref HmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage =
|
||||
let payload = Payload(payload: relayMessage.payload,
|
||||
dst: pubKey,
|
||||
symkey: symkey)
|
||||
|
||||
var t: Timestamp
|
||||
if relayMessage.timestamp.isSome:
|
||||
t = relayMessage.timestamp.get
|
||||
func keyInfo(symkey: Option[SymKey], privateKey: Option[PrivateKey]): KeyInfo =
|
||||
if symkey.isSome():
|
||||
KeyInfo(kind: Symmetric, symKey: symkey.get())
|
||||
elif privateKey.isSome():
|
||||
KeyInfo(kind: Asymmetric, privKey: privateKey.get())
|
||||
else:
|
||||
# incoming WakuRelayMessages with no timestamp will get 0 timestamp
|
||||
t = Timestamp(0)
|
||||
KeyInfo(kind: KeyKind.None)
|
||||
|
||||
WakuMessage(payload: payload.encode(version, rng[]).get(),
|
||||
contentTopic: relayMessage.contentTopic.get(DefaultContentTopic),
|
||||
version: version,
|
||||
timestamp: t)
|
||||
|
||||
proc toWakuRelayMessage(message: WakuMessage, symkey: Option[SymKey], privateKey: Option[keys.PrivateKey]): WakuRelayMessage =
|
||||
proc toWakuMessageRPC(message: WakuMessage,
|
||||
symkey = none(SymKey),
|
||||
privateKey = none(PrivateKey)): WakuMessageRPC =
|
||||
let
|
||||
keyInfo = if symkey.isSome(): KeyInfo(kind: Symmetric, symKey: symkey.get())
|
||||
elif privateKey.isSome(): KeyInfo(kind: Asymmetric, privKey: privateKey.get())
|
||||
else: KeyInfo(kind: KeyKind.None)
|
||||
keyInfo = keyInfo(symkey, privateKey)
|
||||
decoded = decodePayload(message, keyInfo)
|
||||
|
||||
WakuRelayMessage(payload: decoded.get().payload,
|
||||
WakuMessageRPC(payload: Base64String.encode(decoded.get().payload),
|
||||
contentTopic: some(message.contentTopic),
|
||||
version: some(message.version),
|
||||
timestamp: some(message.timestamp))
|
||||
|
||||
|
||||
|
@ -150,39 +146,57 @@ proc installRelayPrivateApiHandlers*(node: WakuNode, server: RpcServer, cache: M
|
|||
|
||||
return key
|
||||
|
||||
server.rpc("post_waku_v2_private_v1_symmetric_message") do (topic: string, message: WakuRelayMessage, symkey: string) -> bool:
|
||||
server.rpc("post_waku_v2_private_v1_symmetric_message") do (topic: string, msg: WakuMessageRPC, symkey: string) -> bool:
|
||||
## Publishes and encrypts a message to be relayed on a PubSub topic
|
||||
debug "post_waku_v2_private_v1_symmetric_message"
|
||||
|
||||
let msg = message.toWakuMessage(version = 1,
|
||||
rng = node.rng,
|
||||
pubKey = none(keys.PublicKey),
|
||||
symkey = some(symkey.toSymKey()))
|
||||
let payloadRes = base64.decode(msg.payload)
|
||||
if payloadRes.isErr():
|
||||
raise newException(ValueError, "invalid payload format: " & payloadRes.error)
|
||||
|
||||
if (await node.publish(topic, msg).withTimeout(futTimeout)):
|
||||
# Successfully published message
|
||||
return true
|
||||
else:
|
||||
# Failed to publish message to topic
|
||||
raise newException(ValueError, "Failed to publish to topic " & topic)
|
||||
let payloadV1 = Payload(
|
||||
payload: payloadRes.value,
|
||||
dst: none(keys.PublicKey),
|
||||
symkey: some(symkey.toSymKey())
|
||||
)
|
||||
|
||||
server.rpc("get_waku_v2_private_v1_symmetric_messages") do (topic: string, symkey: string) -> seq[WakuRelayMessage]:
|
||||
let encryptedPayloadRes = payloadV1.encode(1, node.rng[])
|
||||
if encryptedPayloadRes.isErr():
|
||||
raise newException(ValueError, "payload encryption failed: " & $encryptedPayloadRes.error)
|
||||
|
||||
let message = WakuMessage(
|
||||
payload: encryptedPayloadRes.value,
|
||||
# TODO: Fail if the message doesn't have a content topic
|
||||
contentTopic: msg.contentTopic.get(DefaultContentTopic),
|
||||
version: 1,
|
||||
timestamp: msg.timestamp.get(Timestamp(0)),
|
||||
ephemeral: msg.ephemeral.get(false)
|
||||
)
|
||||
|
||||
let publishFut = node.publish(topic, message)
|
||||
if not await publishFut.withTimeout(futTimeout):
|
||||
raise newException(ValueError, "publish to topic timed out")
|
||||
|
||||
# Successfully published message
|
||||
return true
|
||||
|
||||
server.rpc("get_waku_v2_private_v1_symmetric_messages") do (topic: string, symkey: string) -> seq[WakuMessageRPC]:
|
||||
## Returns all WakuMessages received on a PubSub topic since the
|
||||
## last time this method was called. Decrypts the message payloads
|
||||
## before returning.
|
||||
debug "get_waku_v2_private_v1_symmetric_messages", topic=topic
|
||||
|
||||
if not cache.isSubscribed(topic):
|
||||
raise newException(ValueError, "Not subscribed to topic: " & topic)
|
||||
raise newException(ValueError, "not subscribed to topic: " & topic)
|
||||
|
||||
let msgRes = cache.getMessages(topic, clear=true)
|
||||
if msgRes.isErr():
|
||||
raise newException(ValueError, "Not subscribed to topic: " & topic)
|
||||
raise newException(ValueError, "not subscribed to topic: " & topic)
|
||||
|
||||
let msgs = msgRes.get()
|
||||
|
||||
return msgs.mapIt(it.toWakuRelayMessage(symkey=some(symkey.toSymKey()),
|
||||
privateKey=none(keys.PrivateKey)))
|
||||
let key = some(symkey.toSymKey())
|
||||
return msgs.mapIt(it.toWakuMessageRPC(symkey=key))
|
||||
|
||||
server.rpc("get_waku_v2_private_v1_asymmetric_keypair") do () -> WakuKeyPair:
|
||||
## Generates and returns a public/private key pair for asymmetric message encryption and decryption.
|
||||
|
@ -192,34 +206,54 @@ proc installRelayPrivateApiHandlers*(node: WakuNode, server: RpcServer, cache: M
|
|||
|
||||
return WakuKeyPair(seckey: privKey, pubkey: privKey.toPublicKey())
|
||||
|
||||
server.rpc("post_waku_v2_private_v1_asymmetric_message") do (topic: string, message: WakuRelayMessage, publicKey: string) -> bool:
|
||||
server.rpc("post_waku_v2_private_v1_asymmetric_message") do (topic: string, msg: WakuMessageRPC, publicKey: string) -> bool:
|
||||
## Publishes and encrypts a message to be relayed on a PubSub topic
|
||||
debug "post_waku_v2_private_v1_asymmetric_message"
|
||||
|
||||
let msg = message.toWakuMessage(version = 1,
|
||||
rng = node.rng,
|
||||
symkey = none(SymKey),
|
||||
pubKey = some(publicKey.toPublicKey()))
|
||||
let payloadRes = base64.decode(msg.payload)
|
||||
if payloadRes.isErr():
|
||||
raise newException(ValueError, "invalid payload format: " & payloadRes.error)
|
||||
|
||||
let publishFut = node.publish(topic, msg)
|
||||
let payloadV1 = Payload(
|
||||
payload: payloadRes.value,
|
||||
dst: some(publicKey.toPublicKey()),
|
||||
symkey: none(SymKey)
|
||||
)
|
||||
|
||||
let encryptedPayloadRes = payloadV1.encode(1, node.rng[])
|
||||
if encryptedPayloadRes.isErr():
|
||||
raise newException(ValueError, "payload encryption failed: " & $encryptedPayloadRes.error)
|
||||
|
||||
let message = WakuMessage(
|
||||
payload: encryptedPayloadRes.value,
|
||||
# TODO: Fail if the message doesn't have a content topic
|
||||
contentTopic: msg.contentTopic.get(DefaultContentTopic),
|
||||
version: 1,
|
||||
timestamp: msg.timestamp.get(Timestamp(0)),
|
||||
ephemeral: msg.ephemeral.get(false)
|
||||
)
|
||||
|
||||
let publishFut = node.publish(topic, message)
|
||||
if not await publishFut.withTimeout(futTimeout):
|
||||
raise newException(ValueError, "Failed to publish to topic " & topic)
|
||||
raise newException(ValueError, "publish to topic timed out")
|
||||
|
||||
# Successfully published message
|
||||
return true
|
||||
|
||||
server.rpc("get_waku_v2_private_v1_asymmetric_messages") do (topic: string, privateKey: string) -> seq[WakuRelayMessage]:
|
||||
server.rpc("get_waku_v2_private_v1_asymmetric_messages") do (topic: string, privateKey: string) -> seq[WakuMessageRPC]:
|
||||
## Returns all WakuMessages received on a PubSub topic since the
|
||||
## last time this method was called. Decrypts the message payloads
|
||||
## before returning.
|
||||
debug "get_waku_v2_private_v1_asymmetric_messages", topic=topic
|
||||
|
||||
if not cache.isSubscribed(topic):
|
||||
raise newException(ValueError, "Not subscribed to topic: " & topic)
|
||||
raise newException(ValueError, "not subscribed to topic: " & topic)
|
||||
|
||||
let msgRes = cache.getMessages(topic, clear=true)
|
||||
if msgRes.isErr():
|
||||
raise newException(ValueError, "Not subscribed to topic: " & topic)
|
||||
raise newException(ValueError, "not subscribed to topic: " & topic)
|
||||
|
||||
let msgs = msgRes.get()
|
||||
return msgs.mapIt(it.toWakuRelayMessage(symkey=none(SymKey),
|
||||
privateKey=some(privateKey.toPrivateKey())))
|
||||
|
||||
let key = some(privateKey.toPrivateKey())
|
||||
return msgs.mapIt(it.toWakuMessageRPC(privateKey=key))
|
||||
|
|
|
@ -1,22 +1,84 @@
|
|||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
import
|
||||
stew/[results, byteutils],
|
||||
eth/keys,
|
||||
json,
|
||||
json_rpc/rpcserver
|
||||
import
|
||||
../../waku/whisper/whisper_types,
|
||||
../../waku/common/hexstrings,
|
||||
../marshalling,
|
||||
../message
|
||||
|
||||
export message
|
||||
|
||||
import
|
||||
std/options,
|
||||
eth/keys
|
||||
import
|
||||
../../../../waku/v2/protocol/waku_message,
|
||||
../../../../waku/v2/utils/time
|
||||
|
||||
type
|
||||
WakuRelayMessage* = object
|
||||
payload*: seq[byte]
|
||||
contentTopic*: Option[ContentTopic]
|
||||
timestamp*: Option[Timestamp]
|
||||
|
||||
WakuKeyPair* = object
|
||||
seckey*: keys.PrivateKey
|
||||
pubkey*: keys.PublicKey
|
||||
|
||||
|
||||
## JSON-RPC type marshalling
|
||||
|
||||
# SymKey
|
||||
|
||||
proc `%`*(value: SymKey): JsonNode =
|
||||
%("0x" & value.toHex())
|
||||
|
||||
func isValidSymKey*(value: string): bool =
|
||||
# 32 bytes for Private Key plus 1 byte for 0x prefix
|
||||
value.isValidHexData(66)
|
||||
|
||||
proc toSymKey*(key: string): SymKey {.inline.} =
|
||||
hexToByteArray(key[2 .. ^1], result)
|
||||
|
||||
proc fromJson*(n: JsonNode, argName: string, value: var SymKey) =
|
||||
n.kind.expect(JString, argName)
|
||||
|
||||
let hexStr = n.getStr()
|
||||
if not isValidSymKey(hexStr):
|
||||
raise newException(ValueError, invalidMsg(argName) & " as a symmetric key \"" & hexStr & "\"")
|
||||
|
||||
value = hexStr.toSymKey()
|
||||
|
||||
# PublicKey
|
||||
|
||||
proc `%`*(value: PublicKey): JsonNode =
|
||||
%("0x04" & $value)
|
||||
|
||||
func isValidPublicKey*(value: string): bool =
|
||||
# 65 bytes for Public Key plus 1 byte for 0x prefix
|
||||
value.isValidHexData(132)
|
||||
|
||||
proc toPublicKey*(key: string): PublicKey {.inline.} =
|
||||
PublicKey.fromHex(key[4 .. ^1]).tryGet()
|
||||
|
||||
proc fromJson*(n: JsonNode, argName: string, value: var PublicKey) =
|
||||
n.kind.expect(JString, argName)
|
||||
|
||||
let hexStr = n.getStr()
|
||||
if not isValidPublicKey(hexStr):
|
||||
raise newException(ValueError, invalidMsg(argName) & " as a public key \"" & hexStr & "\"")
|
||||
|
||||
value = hexStr.toPublicKey()
|
||||
|
||||
# PrivateKey
|
||||
|
||||
proc `%`*(value: PrivateKey): JsonNode =
|
||||
%("0x" & $value)
|
||||
|
||||
func isValidPrivateKey*(value: string): bool =
|
||||
# 32 bytes for Private Key plus 1 byte for 0x prefix
|
||||
value.isValidHexData(66)
|
||||
|
||||
proc toPrivateKey*(key: string): PrivateKey {.inline.} =
|
||||
PrivateKey.fromHex(key[2 .. ^1]).tryGet()
|
||||
|
||||
proc fromJson*(n: JsonNode, argName: string, value: var PrivateKey) =
|
||||
n.kind.expect(JString, argName)
|
||||
|
||||
let hexStr = n.getStr()
|
||||
if not isValidPrivateKey(hexStr):
|
||||
raise newException(ValueError, invalidMsg(argName) & " as a private key \"" & hexStr & "\"")
|
||||
|
||||
value = hexStr.toPrivateKey()
|
||||
|
|
|
@ -41,7 +41,7 @@ proc toPagingOptions*(pagingInfo: PagingInfoRPC): StorePagingOptions =
|
|||
|
||||
proc toJsonRPCStoreResponse*(response: HistoryResponse): StoreResponse =
|
||||
StoreResponse(
|
||||
messages: response.messages,
|
||||
messages: response.messages.map(toWakuMessageRPC),
|
||||
pagingOptions: if response.cursor.isNone(): none(StorePagingOptions)
|
||||
else: some(StorePagingOptions(
|
||||
pageSize: uint64(response.messages.len), # This field will be deprecated soon
|
||||
|
|
|
@ -6,13 +6,15 @@ else:
|
|||
import
|
||||
std/options
|
||||
import
|
||||
../../../../waku/v2/protocol/waku_message,
|
||||
../../../../waku/v2/protocol/waku_store/rpc
|
||||
../../../../waku/v2/protocol/waku_store/rpc,
|
||||
../message
|
||||
|
||||
export message
|
||||
|
||||
|
||||
type
|
||||
StoreResponse* = object
|
||||
messages*: seq[WakuMessage]
|
||||
messages*: seq[WakuMessageRPC]
|
||||
pagingOptions*: Option[StorePagingOptions]
|
||||
|
||||
StorePagingOptions* = object
|
||||
|
|
|
@ -1,30 +1,5 @@
|
|||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
{.deprecated: "import 'waku/common/base64' instead".}
|
||||
|
||||
import stew/[results, byteutils, base64]
|
||||
import ../../../common/base64
|
||||
|
||||
|
||||
type Base64String* = distinct string
|
||||
|
||||
|
||||
proc encode*(t: type Base64String, value: string|seq[byte]): Base64String =
|
||||
let val = block:
|
||||
when value is string:
|
||||
toBytes(value)
|
||||
else:
|
||||
value
|
||||
Base64String(base64.encode(Base64, val))
|
||||
|
||||
proc decode*(t: Base64String): Result[seq[byte], cstring] =
|
||||
try:
|
||||
ok(base64.decode(Base64, string(t)))
|
||||
except:
|
||||
err("failed to decode base64 string")
|
||||
|
||||
proc `$`*(t: Base64String): string {.inline.}=
|
||||
string(t)
|
||||
|
||||
proc `==`*(lhs: Base64String|string, rhs: Base64String|string): bool {.inline.}=
|
||||
string(lhs) == string(rhs)
|
||||
export base64
|
||||
|
|
|
@ -43,7 +43,7 @@ proc toRelayWakuMessage*(msg: WakuMessage): RelayWakuMessage =
|
|||
timestamp: some(msg.timestamp)
|
||||
)
|
||||
|
||||
proc toWakuMessage*(msg: RelayWakuMessage, version = 0): Result[WakuMessage, cstring] =
|
||||
proc toWakuMessage*(msg: RelayWakuMessage, version = 0): Result[WakuMessage, string] =
|
||||
let
|
||||
payload = ?msg.payload.decode()
|
||||
contentTopic = msg.contentTopic.get(DefaultContentTopic)
|
||||
|
|
Loading…
Reference in New Issue