refactor(jsonrpc): deep code and tests reorganization

This commit is contained in:
Lorenzo Delgado 2023-02-10 10:43:16 +01:00 committed by GitHub
parent a853bf52f0
commit 2f390ce884
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 1533 additions and 1426 deletions

View File

@ -46,7 +46,7 @@ type
pollPeriod: chronos.Duration
seen: seq[Hash] #FIFO queue
contentTopic: string
MbMessageHandler* = proc (jsonNode: JsonNode) {.gcsafe.}
###################
@ -55,12 +55,12 @@ type
proc containsOrAdd(sequence: var seq[Hash], hash: Hash): bool =
if sequence.contains(hash):
return true
return true
if sequence.len >= DeduplQSize:
trace "Deduplication queue full. Removing oldest item."
sequence.delete 0, 0 # Remove first item in queue
sequence.add(hash)
return false
@ -88,7 +88,7 @@ proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} =
return
trace "Post Matterbridge message to chat2"
chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"])
await cmb.nodev2.publish(DefaultPubsubTopic, msg)
@ -114,7 +114,7 @@ proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises:
let postRes = cmb.mbClient.postMessage(text = string.fromBytes(chat2Msg[].payload),
username = chat2Msg[].nick)
if postRes.isErr() or (postRes[] == false):
chat2_mb_dropped.inc(labelValues = ["duplicate"])
error "Matterbridge host unreachable. Dropping message."
@ -146,10 +146,10 @@ proc new*(T: type Chat2MatterBridge,
contentTopic: string): T
{.raises: [Defect, ValueError, KeyError, TLSStreamProtocolError, IOError, LPError].} =
# Setup Matterbridge
# Setup Matterbridge
let
mbClient = MatterbridgeClient.new(mbHostUri, mbGateway)
# Let's verify the Matterbridge configuration before continuing
let clientHealth = mbClient.isHealthy()
@ -163,7 +163,7 @@ proc new*(T: type Chat2MatterBridge,
nodev2 = WakuNode.new(nodev2Key,
nodev2BindIp, nodev2BindPort,
nodev2ExtIp, nodev2ExtPort)
return Chat2MatterBridge(mbClient: mbClient,
nodev2: nodev2,
running: false,
@ -176,18 +176,18 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
cmb.running = true
debug "Start polling Matterbridge"
# Start Matterbridge polling (@TODO: use streaming interface)
proc mbHandler(jsonNode: JsonNode) {.gcsafe, raises: [Exception].} =
trace "Bridging message from Matterbridge to chat2", jsonNode=jsonNode
waitFor cmb.toChat2(jsonNode)
asyncSpawn cmb.pollMatterbridge(mbHandler)
# Start Waku v2 node
debug "Start listening on Waku v2"
await cmb.nodev2.start()
# Always mount relay for bridge
# `triggerSelf` is false on a `bridge` to avoid duplicates
await cmb.nodev2.mountRelay(triggerSelf = false)
@ -199,12 +199,12 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
if msg.isOk():
trace "Bridging message from Chat2 to Matterbridge", msg=msg[]
cmb.toMatterbridge(msg[])
cmb.nodev2.subscribe(DefaultPubsubTopic, relayHandler)
proc stop*(cmb: Chat2MatterBridge) {.async.} =
info "Stopping Chat2MatterBridge"
cmb.running = false
await cmb.nodev2.stop()
@ -213,32 +213,34 @@ proc stop*(cmb: Chat2MatterBridge) {.async.} =
when isMainModule:
import
../../../waku/common/utils/nat,
../../../waku/v2/node/jsonrpc/[debug_api,
filter_api,
relay_api,
store_api]
../../waku/v2/node/message_cache,
../../waku/v2/node/jsonrpc/debug/handlers as debug_api,
../../waku/v2/node/jsonrpc/filter/handlers as filter_api,
../../waku/v2/node/jsonrpc/relay/handlers as relay_api,
../../waku/v2/node/jsonrpc/store/handlers as store_api
proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: Chat2MatterbridgeConf) {.raises: [Exception].} =
installDebugApiHandlers(node, rpcServer)
# Install enabled API handlers:
if conf.relay:
let topicCache = newTable[string, seq[WakuMessage]]()
let topicCache = relay_api.MessageCache.init(capacity=30)
installRelayApiHandlers(node, rpcServer, topicCache)
if conf.filter:
let messageCache = newTable[ContentTopic, seq[WakuMessage]]()
let messageCache = filter_api.MessageCache.init(capacity=30)
installFilterApiHandlers(node, rpcServer, messageCache)
if conf.store:
installStoreApiHandlers(node, rpcServer)
rpcServer.start()
let
rng = newRng()
conf = Chat2MatterbridgeConf.load()
if conf.logLevel != LogLevel.NONE:
setLogLevel(conf.logLevel)
@ -262,7 +264,7 @@ when isMainModule:
nodev2BindIp = conf.listenAddress, nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
nodev2ExtIp = nodev2ExtIp, nodev2ExtPort = extPort,
contentTopic = conf.contentTopic)
waitFor bridge.start()
# Now load rest of config

View File

@ -27,12 +27,13 @@ import
../../waku/v2/utils/namespacing,
../../waku/v2/utils/time,
../../waku/v2/protocol/waku_message,
../../waku/v2/node/message_cache,
../../waku/v2/node/waku_node,
../../waku/v2/node/peer_manager,
../../waku/v2/node/jsonrpc/[debug_api,
filter_api,
relay_api,
store_api],
../../waku/v2/node/jsonrpc/debug/handlers as debug_api,
../../waku/v2/node/jsonrpc/filter/handlers as filter_api,
../../waku/v2/node/jsonrpc/relay/handlers as relay_api,
../../waku/v2/node/jsonrpc/store/handlers as store_api,
./message_compat,
./config
@ -298,11 +299,11 @@ proc setupV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: WakuBridgeConf)
# Install enabled API handlers:
if conf.relay:
let topicCache = newTable[PubsubTopic, seq[WakuMessage]]()
let topicCache = relay_api.MessageCache.init(capacity=30)
installRelayApiHandlers(node, rpcServer, topicCache)
if conf.filternode != "":
let messageCache = newTable[ContentTopic, seq[WakuMessage]]()
let messageCache = filter_api.MessageCache.init(capacity=30)
installFilterApiHandlers(node, rpcServer, messageCache)
if conf.storenode != "":

View File

@ -10,50 +10,46 @@ import
json_rpc/rpcserver
import
../../waku/v2/protocol/waku_message,
../../waku/v2/node/message_cache,
../../waku/v2/node/waku_node,
../../waku/v2/node/jsonrpc/[admin_api,
debug_api,
filter_api,
relay_api,
store_api,
private_api,
debug_api],
../../waku/v2/node/jsonrpc/admin/handlers as admin_api,
../../waku/v2/node/jsonrpc/debug/handlers as debug_api,
../../waku/v2/node/jsonrpc/filter/handlers as filter_api,
../../waku/v2/node/jsonrpc/relay/handlers as relay_api,
../../waku/v2/node/jsonrpc/store/handlers as store_api,
./config
logScope:
topics = "wakunode jsonrpc"
proc startRpcServer*(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf)
proc startRpcServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf)
{.raises: [CatchableError].} =
let
ta = initTAddress(rpcIp, rpcPort)
rpcServer = newRpcHttpServer([ta])
ta = initTAddress(address, port)
server = newRpcHttpServer([ta])
installDebugApiHandlers(node, rpcServer)
installDebugApiHandlers(node, server)
# TODO: Move to setup protocols proc
if conf.relay:
let topicCache = newTable[PubsubTopic, seq[WakuMessage]]()
installRelayApiHandlers(node, rpcServer, topicCache)
let relayMessageCache = relay_api.MessageCache.init(capacity=30)
installRelayApiHandlers(node, server, relayMessageCache)
if conf.rpcPrivate:
# Private API access allows WakuRelay functionality that
# is backwards compatible with Waku v1.
installPrivateApiHandlers(node, rpcServer, topicCache)
installRelayPrivateApiHandlers(node, server, relayMessageCache)
# TODO: Move to setup protocols proc
if conf.filternode != "":
let messageCache = newTable[ContentTopic, seq[WakuMessage]]()
installFilterApiHandlers(node, rpcServer, messageCache)
let filterMessageCache = filter_api.MessageCache.init(capacity=30)
installFilterApiHandlers(node, server, filterMessageCache)
# TODO: Move to setup protocols proc
if conf.storenode != "":
installStoreApiHandlers(node, rpcServer)
installStoreApiHandlers(node, server)
if conf.rpcAdmin:
installAdminApiHandlers(node, rpcServer)
installAdminApiHandlers(node, server)
rpcServer.start()
server.start()
info "RPC Server started", address=ta

View File

@ -42,7 +42,6 @@ import
./v2/test_peer_store_extended,
./v2/test_utils_peers,
./v2/test_message_cache,
./v2/test_jsonrpc_waku,
./v2/test_rest_serdes,
./v2/test_rest_debug_api_serdes,
./v2/test_rest_debug_api,
@ -68,6 +67,14 @@ import
./v2/test_waku_keystore_keyfile,
./v2/test_waku_keystore
## Wakunode JSON-RPC API test suite
import
./v2/wakunode_jsonrpc/test_jsonrpc_admin,
./v2/wakunode_jsonrpc/test_jsonrpc_debug,
./v2/wakunode_jsonrpc/test_jsonrpc_filter,
./v2/wakunode_jsonrpc/test_jsonrpc_relay,
./v2/wakunode_jsonrpc/test_jsonrpc_store
## Apps

View File

@ -1,699 +0,0 @@
{.used.}
import
std/[options, sets, tables, os, strutils, sequtils, times],
chronicles,
testutils/unittests, stew/shims/net as stewNet,
json_rpc/[rpcserver, rpcclient],
eth/keys, eth/common/eth_types,
libp2p/[builders, switch, multiaddress],
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/rpc/message
import
../../waku/v1/node/rpc/hexstrings,
../../waku/v2/node/peer_manager,
../../waku/v2/node/waku_node,
../../waku/v2/node/jsonrpc/[store_api,
relay_api,
debug_api,
filter_api,
admin_api,
private_api],
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_archive,
../../waku/v2/protocol/waku_archive/driver/queue_driver,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_store/rpc,
../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_filter/rpc,
../../waku/v2/protocol/waku_filter/client,
../../waku/v2/utils/compat,
../../waku/v2/utils/peers,
../../waku/v2/utils/time,
./testlib/common,
../test_helpers
template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]
const sigPath = sourceDir / ParDir / ParDir / "waku" / "v2" / "node" / "jsonrpc" / "jsonrpc_callsigs.nim"
createRpcSigs(RpcHttpClient, sigPath)
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] =
let
digest = waku_archive.computeDigest(message)
receivedTime = if message.timestamp > 0: message.timestamp
else: getNanosecondTime(getTime().toUnixFloat())
store.put(pubsubTopic, message, digest, receivedTime)
procSuite "Waku v2 JSON-RPC API":
let
rng = crypto.newRng()
privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(9000)
node = WakuNode.new(privkey, bindIp, port, some(extIp), some(port))
asyncTest "Debug API: get node info":
await node.start()
await node.mountRelay()
# RPC server setup
let
rpcPort = Port(8546)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installDebugApiHandlers(node, server)
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
let response = await client.get_waku_v2_debug_v1_info()
check:
response.listenAddresses == @[$node.switch.peerInfo.addrs[^1] & "/p2p/" & $node.switch.peerInfo.peerId]
await server.stop()
await server.closeWait()
await node.stop()
asyncTest "Relay API: publish and subscribe/unsubscribe":
await node.start()
await node.mountRelay()
# RPC server setup
let
rpcPort = Port(8547)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installRelayApiHandlers(node, server, newTable[string, seq[WakuMessage]]())
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
check:
# At this stage the node is only subscribed to the default topic
PubSub(node.wakuRelay).topics.len == 1
# Subscribe to new topics
let newTopics = @["1","2","3"]
var response = await client.post_waku_v2_relay_v1_subscriptions(newTopics)
check:
# Node is now subscribed to default + new topics
PubSub(node.wakuRelay).topics.len == 1 + newTopics.len
response == true
# Publish a message on the default topic
response = await client.post_waku_v2_relay_v1_message(DefaultPubsubTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(DefaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
check:
# @TODO poll topic to verify message has been published
response == true
# Unsubscribe from new topics
response = await client.delete_waku_v2_relay_v1_subscriptions(newTopics)
check:
# Node is now unsubscribed from new topics
PubSub(node.wakuRelay).topics.len == 1
response == true
await server.stop()
await server.closeWait()
await node.stop()
asyncTest "Relay API: get latest messages":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, bindIp, Port(60300))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, bindIp, Port(60302))
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, bindIp, Port(60303), some(extIp), some(port))
pubSubTopic = "polling"
contentTopic = DefaultContentTopic
payload1 = @[byte 9]
message1 = WakuMessage(payload: payload1, contentTopic: contentTopic)
payload2 = @[byte 8]
message2 = WakuMessage(payload: payload2, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
# RPC server setup
let
rpcPort = Port(8548)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
# Let's connect to node 3 via the API
installRelayApiHandlers(node3, server, newTable[string, seq[WakuMessage]]())
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
# First see if we can retrieve messages published on the default topic (node is already subscribed)
await node2.publish(DefaultPubsubTopic, message1)
await sleepAsync(100.millis)
var messages = await client.get_waku_v2_relay_v1_messages(DefaultPubsubTopic)
check:
messages.len == 1
messages[0].contentTopic == contentTopic
messages[0].payload == payload1
# Ensure that read messages are cleared from cache
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
check:
messages.len == 0
# Now try to subscribe using API
var response = await client.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
await sleepAsync(100.millis)
check:
# Node is now subscribed to pubSubTopic
response == true
# Now publish a message on node1 and see if we receive it on node3
await node1.publish(pubSubTopic, message2)
await sleepAsync(100.millis)
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
check:
messages.len == 1
messages[0].contentTopic == contentTopic
messages[0].payload == payload2
# Ensure that read messages are cleared from cache
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
check:
messages.len == 0
await server.stop()
await server.closeWait()
await node1.stop()
await node2.stop()
await node3.stop()
asyncTest "Store API: retrieve historical messages":
await node.start()
await node.mountRelay()
# RPC server setup
let
rpcPort = Port(8549)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installStoreApiHandlers(node, server)
server.start()
# WakuStore setup
let
key = crypto.PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.new(key)
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
await node.mountStore()
node.mountStoreClient()
var listenSwitch = newStandardSwitch(some(key))
await listenSwitch.start()
node.setStorePeer(listenSwitch.peerInfo.toRemotePeerInfo())
listenSwitch.mount(node.wakuRelay)
listenSwitch.mount(node.wakuStore)
# Now prime it with some history before tests
let msgList = @[
fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("2"), ts=0),
fakeWakuMessage(@[byte 1], ts=1),
fakeWakuMessage(@[byte 2], ts=2),
fakeWakuMessage(@[byte 3], ts=3),
fakeWakuMessage(@[byte 4], ts=4),
fakeWakuMessage(@[byte 5], ts=5),
fakeWakuMessage(@[byte 6], ts=6),
fakeWakuMessage(@[byte 7], ts=7),
fakeWakuMessage(@[byte 8], ts=8),
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("2"), ts=9)
]
for msg in msgList:
require driver.put(DefaultPubsubTopic, msg).isOk()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
let response = await client.get_waku_v2_store_v1_messages(some(DefaultPubsubTopic), some(@[HistoryContentFilterRPC(contentTopic: DefaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions()))
check:
response.messages.len() == 8
response.pagingOptions.isNone()
await server.stop()
await server.closeWait()
await node.stop()
asyncTest "Filter API: subscribe/unsubscribe":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, bindIp, Port(60390))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, bindIp, Port(60392))
await allFutures(node1.start(), node2.start())
await node1.mountFilter()
await node2.mountFilterClient()
node2.setFilterPeer(node1.peerInfo.toRemotePeerInfo())
# RPC server setup
let
rpcPort = Port(8550)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installFilterApiHandlers(node2, server, newTable[ContentTopic, seq[WakuMessage]]())
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
check:
# Light node has not yet subscribed to any filters
node2.wakuFilterClient.getSubscriptionsCount() == 0
let contentFilters = @[
ContentFilter(contentTopic: DefaultContentTopic),
ContentFilter(contentTopic: ContentTopic("2")),
ContentFilter(contentTopic: ContentTopic("3")),
ContentFilter(contentTopic: ContentTopic("4")),
]
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic))
check:
response == true
# Light node has successfully subscribed to 4 content topics
node2.wakuFilterClient.getSubscriptionsCount() == 4
response = await client.delete_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic))
check:
response == true
# Light node has successfully unsubscribed from all filters
node2.wakuFilterClient.getSubscriptionsCount() == 0
## Cleanup
await server.stop()
await server.closeWait()
await allFutures(node1.stop(), node2.stop())
asyncTest "Admin API: connect to ad-hoc peers":
# Create a couple of nodes
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60600))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60602))
peerInfo2 = node2.switch.peerInfo
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60604))
peerInfo3 = node3.switch.peerInfo
await allFutures([node1.start(), node2.start(), node3.start()])
await node1.mountRelay()
await node2.mountRelay()
await node3.mountRelay()
# RPC server setup
let
rpcPort = Port(8551)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installAdminApiHandlers(node1, server)
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
# Connect to nodes 2 and 3 using the Admin API
let postRes = await client.post_waku_v2_admin_v1_peers(@[constructMultiaddrStr(peerInfo2),
constructMultiaddrStr(peerInfo3)])
check:
postRes
# Verify that newly connected peers are being managed
let getRes = await client.get_waku_v2_admin_v1_peers()
check:
getRes.len == 2
# Check peer 2
getRes.anyIt(it.protocol == WakuRelayCodec and
it.multiaddr == constructMultiaddrStr(peerInfo2))
# Check peer 3
getRes.anyIt(it.protocol == WakuRelayCodec and
it.multiaddr == constructMultiaddrStr(peerInfo3))
# Verify that raises an exception if we can't connect to the peer
let nonExistentPeer = "/ip4/0.0.0.0/tcp/10000/p2p/16Uiu2HAm6HZZr7aToTvEBPpiys4UxajCTU97zj5v7RNR2gbniy1D"
expect(ValueError):
discard await client.post_waku_v2_admin_v1_peers(@[nonExistentPeer])
let malformedPeer = "/malformed/peer"
expect(ValueError):
discard await client.post_waku_v2_admin_v1_peers(@[malformedPeer])
await server.stop()
await server.closeWait()
await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "Admin API: get managed peer information":
# Create 3 nodes and start them with relay
let nodes = toSeq(0..<3).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(60220+it*2)))
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
# Dial nodes 2 and 3 from node1
await nodes[0].connectToNodes(@[constructMultiaddrStr(nodes[1].peerInfo)])
await nodes[0].connectToNodes(@[constructMultiaddrStr(nodes[2].peerInfo)])
# RPC server setup
let
rpcPort = Port(8552)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installAdminApiHandlers(nodes[0], server)
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
let response = await client.get_waku_v2_admin_v1_peers()
check:
response.len == 2
# Check peer 2
response.anyIt(it.protocol == WakuRelayCodec and
it.multiaddr == constructMultiaddrStr(nodes[1].peerInfo))
# Check peer 3
response.anyIt(it.protocol == WakuRelayCodec and
it.multiaddr == constructMultiaddrStr(nodes[2].peerInfo))
# Artificially remove the address from the book
nodes[0].peerManager.peerStore[AddressBook][nodes[1].peerInfo.peerId] = @[]
nodes[0].peerManager.peerStore[AddressBook][nodes[2].peerInfo.peerId] = @[]
# Verify that the returned addresses are empty
let responseEmptyAdd = await client.get_waku_v2_admin_v1_peers()
check:
responseEmptyAdd[0].multiaddr == ""
responseEmptyAdd[1].multiaddr == ""
await server.stop()
await server.closeWait()
await allFutures(nodes.mapIt(it.stop()))
asyncTest "Admin API: get unmanaged peer information":
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60523))
await node.start()
# RPC server setup
let
rpcPort = Port(8553)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installAdminApiHandlers(node, server)
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
await node.mountFilter()
await node.mountFilterClient()
await node.mountSwap()
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
await node.mountStore()
node.mountStoreClient()
# Create and set some peers
let
locationAddr = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
filterKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
filterPeer = PeerInfo.new(filterKey, @[locationAddr])
swapKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
swapPeer = PeerInfo.new(swapKey, @[locationAddr])
storeKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
storePeer = PeerInfo.new(storeKey, @[locationAddr])
node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())
node.setStorePeer(storePeer.toRemotePeerInfo())
node.setFilterPeer(filterPeer.toRemotePeerInfo())
let response = await client.get_waku_v2_admin_v1_peers()
## Then
check:
response.len == 3
# Check filter peer
(response.filterIt(it.protocol == WakuFilterCodec)[0]).multiaddr == constructMultiaddrStr(filterPeer)
# Check swap peer
(response.filterIt(it.protocol == WakuSwapCodec)[0]).multiaddr == constructMultiaddrStr(swapPeer)
# Check store peer
(response.filterIt(it.protocol == WakuStoreCodec)[0]).multiaddr == constructMultiaddrStr(storePeer)
## Cleanup
await server.stop()
await server.closeWait()
await node.stop()
asyncTest "Private API: generate asymmetric keys and encrypt/decrypt communication":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, bindIp, Port(62001))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, bindIp, Port(62002))
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, bindIp, Port(62003), some(extIp), some(port))
pubSubTopic = "polling"
contentTopic = DefaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
topicCache = newTable[string, seq[WakuMessage]]()
await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
# Setup two servers so we can see both sides of encrypted communication
let
rpcPort1 = Port(8554)
ta1 = initTAddress(bindIp, rpcPort1)
server1 = newRpcHttpServer([ta1])
rpcPort3 = Port(8555)
ta3 = initTAddress(bindIp, rpcPort3)
server3 = newRpcHttpServer([ta3])
# Let's connect to nodes 1 and 3 via the API
installPrivateApiHandlers(node1, server1, newTable[string, seq[WakuMessage]]())
installPrivateApiHandlers(node3, server3, topicCache)
installRelayApiHandlers(node3, server3, topicCache)
server1.start()
server3.start()
let client1 = newRpcHttpClient()
await client1.connect("127.0.0.1", rpcPort1, false)
let client3 = newRpcHttpClient()
await client3.connect("127.0.0.1", rpcPort3, false)
# Let's get a keypair for node3
let keypair = await client3.get_waku_v2_private_v1_asymmetric_keypair()
# Now try to subscribe on node3 using API
let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
await sleepAsync(100.millis)
check:
# node3 is now subscribed to pubSubTopic
sub
# Now publish and encrypt a message on node1 using node3's public key
let posted = await client1.post_waku_v2_private_v1_asymmetric_message(pubSubTopic, message, publicKey = (%keypair.pubkey).getStr())
check:
posted
await sleepAsync(100.millis)
# Let's see if we can receive, and decrypt, this message on node3
var messages = await client3.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr())
check:
messages.len == 1
messages[0].contentTopic.get == contentTopic
messages[0].payload == payload
# Ensure that read messages are cleared from cache
messages = await client3.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr())
check:
messages.len == 0
await server1.stop()
await server1.closeWait()
await server3.stop()
await server3.closeWait()
await node1.stop()
await node2.stop()
await node3.stop()
asyncTest "Private API: generate symmetric keys and encrypt/decrypt communication":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, bindIp, Port(62100))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, bindIp, Port(62102))
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, bindIp, Port(62103), some(extIp), some(port))
pubSubTopic = "polling"
contentTopic = DefaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
topicCache = newTable[string, seq[WakuMessage]]()
await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
# Setup two servers so we can see both sides of encrypted communication
let
rpcPort1 = Port(8556)
ta1 = initTAddress(bindIp, rpcPort1)
server1 = newRpcHttpServer([ta1])
rpcPort3 = Port(8557)
ta3 = initTAddress(bindIp, rpcPort3)
server3 = newRpcHttpServer([ta3])
# Let's connect to nodes 1 and 3 via the API
installPrivateApiHandlers(node1, server1, newTable[string, seq[WakuMessage]]())
installPrivateApiHandlers(node3, server3, topicCache)
installRelayApiHandlers(node3, server3, topicCache)
server1.start()
server3.start()
let client1 = newRpcHttpClient()
await client1.connect("127.0.0.1", rpcPort1, false)
let client3 = newRpcHttpClient()
await client3.connect("127.0.0.1", rpcPort3, false)
# Let's get a symkey for node3
let symkey = await client3.get_waku_v2_private_v1_symmetric_key()
# Now try to subscribe on node3 using API
let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
await sleepAsync(100.millis)
check:
# node3 is now subscribed to pubSubTopic
sub
# Now publish and encrypt a message on node1 using node3's symkey
let posted = await client1.post_waku_v2_private_v1_symmetric_message(pubSubTopic, message, symkey = (%symkey).getStr())
check:
posted
await sleepAsync(100.millis)
# Let's see if we can receive, and decrypt, this message on node3
var messages = await client3.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr())
check:
messages.len == 1
messages[0].contentTopic.get == contentTopic
messages[0].payload == payload
# Ensure that read messages are cleared from cache
messages = await client3.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr())
check:
messages.len == 0
await server1.stop()
await server1.closeWait()
await server3.stop()
await server3.closeWait()
await node1.stop()
await node2.stop()
await node3.stop()

View File

@ -0,0 +1,196 @@
{.used.}
import
std/[options, sequtils],
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
chronos,
eth/keys,
libp2p/crypto/crypto,
json_rpc/[rpcserver, rpcclient]
import
../../../waku/v2/node/peer_manager,
../../../waku/v2/node/waku_node,
../../../waku/v2/node/jsonrpc/admin/handlers as admin_api,
../../../waku/v2/node/jsonrpc/admin/client as admin_api_client,
../../../waku/v2/protocol/waku_relay,
../../../waku/v2/protocol/waku_archive,
../../../waku/v2/protocol/waku_archive/driver/queue_driver,
../../../waku/v2/protocol/waku_store,
../../../waku/v2/protocol/waku_filter,
../../../waku/v2/utils/peers,
../../test_helpers
procSuite "Waku v2 JSON-RPC API - Admin":
let
rng = crypto.newRng()
bindIp = ValidIpAddress.init("0.0.0.0")
asyncTest "connect to ad-hoc peers":
# Create a couple of nodes
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60600))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60602))
peerInfo2 = node2.switch.peerInfo
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60604))
peerInfo3 = node3.switch.peerInfo
await allFutures([node1.start(), node2.start(), node3.start()])
await node1.mountRelay()
await node2.mountRelay()
await node3.mountRelay()
# RPC server setup
let
rpcPort = Port(8551)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installAdminApiHandlers(node1, server)
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
# Connect to nodes 2 and 3 using the Admin API
let postRes = await client.post_waku_v2_admin_v1_peers(@[constructMultiaddrStr(peerInfo2),
constructMultiaddrStr(peerInfo3)])
check:
postRes
# Verify that newly connected peers are being managed
let getRes = await client.get_waku_v2_admin_v1_peers()
check:
getRes.len == 2
# Check peer 2
getRes.anyIt(it.protocol == WakuRelayCodec and
it.multiaddr == constructMultiaddrStr(peerInfo2))
# Check peer 3
getRes.anyIt(it.protocol == WakuRelayCodec and
it.multiaddr == constructMultiaddrStr(peerInfo3))
# Verify that raises an exception if we can't connect to the peer
let nonExistentPeer = "/ip4/0.0.0.0/tcp/10000/p2p/16Uiu2HAm6HZZr7aToTvEBPpiys4UxajCTU97zj5v7RNR2gbniy1D"
expect(ValueError):
discard await client.post_waku_v2_admin_v1_peers(@[nonExistentPeer])
let malformedPeer = "/malformed/peer"
expect(ValueError):
discard await client.post_waku_v2_admin_v1_peers(@[malformedPeer])
await server.stop()
await server.closeWait()
await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "get managed peer information":
# Create 3 nodes and start them with relay
let nodes = toSeq(0..<3).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(60220+it*2)))
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
# Dial nodes 2 and 3 from node1
await nodes[0].connectToNodes(@[constructMultiaddrStr(nodes[1].peerInfo)])
await nodes[0].connectToNodes(@[constructMultiaddrStr(nodes[2].peerInfo)])
# RPC server setup
let
rpcPort = Port(8552)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installAdminApiHandlers(nodes[0], server)
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
let response = await client.get_waku_v2_admin_v1_peers()
check:
response.len == 2
# Check peer 2
response.anyIt(it.protocol == WakuRelayCodec and
it.multiaddr == constructMultiaddrStr(nodes[1].peerInfo))
# Check peer 3
response.anyIt(it.protocol == WakuRelayCodec and
it.multiaddr == constructMultiaddrStr(nodes[2].peerInfo))
# Artificially remove the address from the book
nodes[0].peerManager.peerStore[AddressBook][nodes[1].peerInfo.peerId] = @[]
nodes[0].peerManager.peerStore[AddressBook][nodes[2].peerInfo.peerId] = @[]
# Verify that the returned addresses are empty
let responseEmptyAdd = await client.get_waku_v2_admin_v1_peers()
check:
responseEmptyAdd[0].multiaddr == ""
responseEmptyAdd[1].multiaddr == ""
await server.stop()
await server.closeWait()
await allFutures(nodes.mapIt(it.stop()))
asyncTest "get unmanaged peer information":
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60523))
await node.start()
# RPC server setup
let
rpcPort = Port(8553)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installAdminApiHandlers(node, server)
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
await node.mountFilter()
await node.mountFilterClient()
await node.mountSwap()
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
await node.mountStore()
node.mountStoreClient()
# Create and set some peers
let
locationAddr = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
filterKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
filterPeer = PeerInfo.new(filterKey, @[locationAddr])
storeKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
storePeer = PeerInfo.new(storeKey, @[locationAddr])
node.setStorePeer(storePeer.toRemotePeerInfo())
node.setFilterPeer(filterPeer.toRemotePeerInfo())
let response = await client.get_waku_v2_admin_v1_peers()
## Then
check:
response.len == 2
# Check filter peer
(response.filterIt(it.protocol == WakuFilterCodec)[0]).multiaddr == constructMultiaddrStr(filterPeer)
# Check store peer
(response.filterIt(it.protocol == WakuStoreCodec)[0]).multiaddr == constructMultiaddrStr(storePeer)
## Cleanup
await server.stop()
await server.closeWait()
await node.stop()

View File

@ -0,0 +1,53 @@
{.used.}
import
std/options,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
chronos,
eth/keys,
libp2p/crypto/crypto,
json_rpc/[rpcserver, rpcclient]
import
../../../waku/v2/node/peer_manager,
../../../waku/v2/node/waku_node,
../../../waku/v2/node/jsonrpc/debug/handlers as debug_api,
../../../waku/v2/node/jsonrpc/debug/client as debug_api_client
procSuite "Waku v2 JSON-RPC API - Debug":
let
rng = crypto.newRng()
privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(9000)
node = WakuNode.new(privkey, bindIp, port, some(extIp), some(port))
asyncTest "get node info":
await node.start()
await node.mountRelay()
# RPC server setup
let
rpcPort = Port(8546)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installDebugApiHandlers(node, server)
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
let response = await client.get_waku_v2_debug_v1_info()
check:
response.listenAddresses == @[$node.switch.peerInfo.addrs[^1] & "/p2p/" & $node.switch.peerInfo.peerId]
await server.stop()
await server.closeWait()
await node.stop()

View File

@ -0,0 +1,84 @@
{.used.}
import
std/options,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
eth/keys,
libp2p/crypto/crypto,
json_rpc/[rpcserver, rpcclient]
import
../../../waku/v2/node/peer_manager,
../../../waku/v2/node/waku_node,
../../../waku/v2/node/message_cache,
../../../waku/v2/node/jsonrpc/filter/handlers as filter_api,
../../../waku/v2/node/jsonrpc/filter/client as filter_api_client,
../../../waku/v2/protocol/waku_message,
../../../waku/v2/protocol/waku_filter/rpc,
../../../waku/v2/protocol/waku_filter/client,
../../../waku/v2/utils/peers
proc newTestMessageCache(): filter_api.MessageCache =
filter_api.MessageCache.init(capacity=30)
procSuite "Waku v2 JSON-RPC API - Filter":
let
rng = crypto.newRng()
bindIp = ValidIpAddress.init("0.0.0.0")
asyncTest "subscribe and unsubscribe":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, bindIp, Port(60390))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, bindIp, Port(60392))
await allFutures(node1.start(), node2.start())
await node1.mountFilter()
await node2.mountFilterClient()
node2.setFilterPeer(node1.peerInfo.toRemotePeerInfo())
# RPC server setup
let
rpcPort = Port(8550)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installFilterApiHandlers(node2, server, newTestMessageCache())
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
check:
# Light node has not yet subscribed to any filters
node2.wakuFilterClient.getSubscriptionsCount() == 0
let contentFilters = @[
ContentFilter(contentTopic: DefaultContentTopic),
ContentFilter(contentTopic: ContentTopic("2")),
ContentFilter(contentTopic: ContentTopic("3")),
ContentFilter(contentTopic: ContentTopic("4")),
]
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic))
check:
response == true
# Light node has successfully subscribed to 4 content topics
node2.wakuFilterClient.getSubscriptionsCount() == 4
response = await client.delete_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic))
check:
response == true
# Light node has successfully unsubscribed from all filters
node2.wakuFilterClient.getSubscriptionsCount() == 0
## Cleanup
await server.stop()
await server.closeWait()
await allFutures(node1.stop(), node2.stop())

View File

@ -0,0 +1,359 @@
{.used.}
import
std/[options, sequtils, times],
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
eth/keys,
libp2p/crypto/crypto,
json_rpc/[rpcserver, rpcclient]
import
../../../waku/v1/node/rpc/hexstrings,
../../../waku/v2/node/peer_manager,
../../../waku/v2/node/message_cache,
../../../waku/v2/node/waku_node,
../../../waku/v2/node/jsonrpc/relay/handlers as relay_api,
../../../waku/v2/node/jsonrpc/relay/client as relay_api_client,
../../../waku/v2/protocol/waku_message,
../../../waku/v2/protocol/waku_relay,
../../../waku/v2/utils/compat,
../../../waku/v2/utils/peers,
../../../waku/v2/utils/time
proc newTestMessageCache(): relay_api.MessageCache =
relay_api.MessageCache.init(capacity=30)
procSuite "Waku v2 JSON-RPC API - Relay":
let
rng = crypto.newRng()
privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(9000)
node = WakuNode.new(privkey, bindIp, port, some(extIp), some(port))
asyncTest "subscribe, unsubscribe and publish":
await node.start()
await node.mountRelay()
# RPC server setup
let
rpcPort = Port(8547)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installRelayApiHandlers(node, server, newTestMessageCache())
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
check:
# At this stage the node is only subscribed to the default topic
node.wakuRelay.subscribedTopics.toSeq().len == 1
# Subscribe to new topics
let newTopics = @["1","2","3"]
var response = await client.post_waku_v2_relay_v1_subscriptions(newTopics)
check:
# Node is now subscribed to default + new topics
node.wakuRelay.subscribedTopics.toSeq().len == 1 + newTopics.len
response == true
# Publish a message on the default topic
response = await client.post_waku_v2_relay_v1_message(DefaultPubsubTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(DefaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
check:
# @TODO poll topic to verify message has been published
response == true
# Unsubscribe from new topics
response = await client.delete_waku_v2_relay_v1_subscriptions(newTopics)
check:
# Node is now unsubscribed from new topics
node.wakuRelay.subscribedTopics.toSeq().len == 1
response == true
await server.stop()
await server.closeWait()
await node.stop()
asyncTest "get latest messages":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, bindIp, Port(60300))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, bindIp, Port(60302))
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, bindIp, Port(60303), some(extIp), some(port))
pubSubTopic = "polling"
contentTopic = DefaultContentTopic
payload1 = @[byte 9]
message1 = WakuMessage(payload: payload1, contentTopic: contentTopic)
payload2 = @[byte 8]
message2 = WakuMessage(payload: payload2, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
# RPC server setup
let
rpcPort = Port(8548)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
# Let's connect to node 3 via the API
installRelayApiHandlers(node3, server, newTestMessageCache())
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
# First see if we can retrieve messages published on the default topic (node is already subscribed)
await node2.publish(DefaultPubsubTopic, message1)
await sleepAsync(100.millis)
var messages = await client.get_waku_v2_relay_v1_messages(DefaultPubsubTopic)
check:
messages.len == 1
messages[0].contentTopic == contentTopic
messages[0].payload == payload1
# Ensure that read messages are cleared from cache
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
check:
messages.len == 0
# Now try to subscribe using API
var response = await client.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
await sleepAsync(100.millis)
check:
# Node is now subscribed to pubSubTopic
response == true
# Now publish a message on node1 and see if we receive it on node3
await node1.publish(pubSubTopic, message2)
await sleepAsync(100.millis)
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
check:
messages.len == 1
messages[0].contentTopic == contentTopic
messages[0].payload == payload2
# Ensure that read messages are cleared from cache
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
check:
messages.len == 0
await server.stop()
await server.closeWait()
await node1.stop()
await node2.stop()
await node3.stop()
asyncTest "generate asymmetric keys and encrypt/decrypt communication":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, bindIp, Port(62001))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, bindIp, Port(62002))
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, bindIp, Port(62003), some(extIp), some(port))
pubSubTopic = "polling"
contentTopic = DefaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
topicCache = newTestMessageCache()
await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
# Setup two servers so we can see both sides of encrypted communication
let
rpcPort1 = Port(8554)
ta1 = initTAddress(bindIp, rpcPort1)
server1 = newRpcHttpServer([ta1])
rpcPort3 = Port(8555)
ta3 = initTAddress(bindIp, rpcPort3)
server3 = newRpcHttpServer([ta3])
# Let's connect to nodes 1 and 3 via the API
installRelayPrivateApiHandlers(node1, server1, newTestMessageCache())
installRelayPrivateApiHandlers(node3, server3, topicCache)
installRelayApiHandlers(node3, server3, topicCache)
server1.start()
server3.start()
let client1 = newRpcHttpClient()
await client1.connect("127.0.0.1", rpcPort1, false)
let client3 = newRpcHttpClient()
await client3.connect("127.0.0.1", rpcPort3, false)
# Let's get a keypair for node3
let keypair = await client3.get_waku_v2_private_v1_asymmetric_keypair()
# Now try to subscribe on node3 using API
let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
await sleepAsync(100.millis)
check:
# node3 is now subscribed to pubSubTopic
sub
# Now publish and encrypt a message on node1 using node3's public key
let posted = await client1.post_waku_v2_private_v1_asymmetric_message(pubSubTopic, message, publicKey = (%keypair.pubkey).getStr())
check:
posted
await sleepAsync(100.millis)
# Let's see if we can receive, and decrypt, this message on node3
var messages = await client3.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr())
check:
messages.len == 1
messages[0].contentTopic.get == contentTopic
messages[0].payload == payload
# Ensure that read messages are cleared from cache
messages = await client3.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr())
check:
messages.len == 0
await server1.stop()
await server1.closeWait()
await server3.stop()
await server3.closeWait()
await node1.stop()
await node2.stop()
await node3.stop()
asyncTest "generate symmetric keys and encrypt/decrypt communication":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, bindIp, Port(62100))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, bindIp, Port(62102))
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, bindIp, Port(62103), some(extIp), some(port))
pubSubTopic = "polling"
contentTopic = DefaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
topicCache = newTestMessageCache()
await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
# Setup two servers so we can see both sides of encrypted communication
let
rpcPort1 = Port(8556)
ta1 = initTAddress(bindIp, rpcPort1)
server1 = newRpcHttpServer([ta1])
rpcPort3 = Port(8557)
ta3 = initTAddress(bindIp, rpcPort3)
server3 = newRpcHttpServer([ta3])
# Let's connect to nodes 1 and 3 via the API
installRelayPrivateApiHandlers(node1, server1, newTestMessageCache())
installRelayPrivateApiHandlers(node3, server3, topicCache)
installRelayApiHandlers(node3, server3, topicCache)
server1.start()
server3.start()
let client1 = newRpcHttpClient()
await client1.connect("127.0.0.1", rpcPort1, false)
let client3 = newRpcHttpClient()
await client3.connect("127.0.0.1", rpcPort3, false)
# Let's get a symkey for node3
let symkey = await client3.get_waku_v2_private_v1_symmetric_key()
# Now try to subscribe on node3 using API
let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic])
await sleepAsync(100.millis)
check:
# node3 is now subscribed to pubSubTopic
sub
# Now publish and encrypt a message on node1 using node3's symkey
let posted = await client1.post_waku_v2_private_v1_symmetric_message(pubSubTopic, message, symkey = (%symkey).getStr())
check:
posted
await sleepAsync(100.millis)
# Let's see if we can receive, and decrypt, this message on node3
var messages = await client3.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr())
check:
messages.len == 1
messages[0].contentTopic.get == contentTopic
messages[0].payload == payload
# Ensure that read messages are cleared from cache
messages = await client3.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr())
check:
messages.len == 0
await server1.stop()
await server1.closeWait()
await server3.stop()
await server3.closeWait()
await node1.stop()
await node2.stop()
await node3.stop()

View File

@ -0,0 +1,102 @@
{.used.}
import
std/[options, times],
stew/shims/net as stewNet,
chronicles,
testutils/unittests,
eth/keys,
libp2p/crypto/crypto,
json_rpc/[rpcserver, rpcclient]
import
../../../waku/v2/node/peer_manager,
../../../waku/v2/node/waku_node,
../../../waku/v2/node/jsonrpc/store/handlers as store_api,
../../../waku/v2/node/jsonrpc/store/client as store_api_client,
../../../waku/v2/protocol/waku_message,
../../../waku/v2/protocol/waku_archive,
../../../waku/v2/protocol/waku_archive/driver/queue_driver,
../../../waku/v2/protocol/waku_store/rpc,
../../../waku/v2/utils/peers,
../../../waku/v2/utils/time,
../../v2/testlib/common
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] =
let
digest = waku_archive.computeDigest(message)
receivedTime = if message.timestamp > 0: message.timestamp
else: getNanosecondTime(getTime().toUnixFloat())
store.put(pubsubTopic, message, digest, receivedTime)
procSuite "Waku v2 JSON-RPC API - Store":
let
rng = crypto.newRng()
privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(9000)
node = WakuNode.new(privkey, bindIp, port, some(extIp), some(port))
asyncTest "query a node and retrieve historical messages":
await node.start()
await node.mountRelay()
# RPC server setup
let
rpcPort = Port(8549)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installStoreApiHandlers(node, server)
server.start()
# WakuStore setup
let
key = crypto.PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.new(key)
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
await node.mountStore()
node.mountStoreClient()
var listenSwitch = newStandardSwitch(some(key))
await listenSwitch.start()
node.setStorePeer(listenSwitch.peerInfo.toRemotePeerInfo())
listenSwitch.mount(node.wakuRelay)
listenSwitch.mount(node.wakuStore)
# Now prime it with some history before tests
let msgList = @[
fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("2"), ts=0),
fakeWakuMessage(@[byte 1], ts=1),
fakeWakuMessage(@[byte 2], ts=2),
fakeWakuMessage(@[byte 3], ts=3),
fakeWakuMessage(@[byte 4], ts=4),
fakeWakuMessage(@[byte 5], ts=5),
fakeWakuMessage(@[byte 6], ts=6),
fakeWakuMessage(@[byte 7], ts=7),
fakeWakuMessage(@[byte 8], ts=8),
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("2"), ts=9)
]
for msg in msgList:
require driver.put(DefaultPubsubTopic, msg).isOk()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
let response = await client.get_waku_v2_store_v1_messages(some(DefaultPubsubTopic), some(@[HistoryContentFilterRPC(contentTopic: DefaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions()))
check:
response.messages.len() == 8
response.pagingOptions.isNone()
await server.stop()
await server.closeWait()
await node.stop()

View File

@ -0,0 +1,4 @@
# Admin API
proc get_waku_v2_admin_v1_peers(): seq[WakuPeer]
proc post_waku_v2_admin_v1_peers(peers: seq[string]): bool

View File

@ -0,0 +1,14 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[os, strutils],
json_rpc/rpcclient
import
./types
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
createRpcSigs(RpcHttpClient, sourceDir / "callsigs.nim")

View File

@ -0,0 +1,91 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/sequtils,
chronicles,
json_rpc/rpcserver,
libp2p/[peerinfo, switch]
import
../../../../waku/v2/protocol/waku_store,
../../../../waku/v2/protocol/waku_filter,
../../../../waku/v2/protocol/waku_relay,
../../../../waku/v2/node/peer_manager,
../../../../waku/v2/node/waku_node,
./types
logScope:
topics = "waku node jsonrpc admin_api"
proc constructMultiaddrStr*(wireaddr: MultiAddress, peerId: PeerId): string =
# Constructs a multiaddress with both wire address and p2p identity
$wireaddr & "/p2p/" & $peerId
proc constructMultiaddrStr*(peerInfo: PeerInfo): string =
# Constructs a multiaddress with both location (wire) address and p2p identity
if peerInfo.listenAddrs.len == 0:
return ""
constructMultiaddrStr(peerInfo.listenAddrs[0], peerInfo.peerId)
proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string =
# Constructs a multiaddress with both location (wire) address and p2p identity
if remotePeerInfo.addrs.len == 0:
return ""
constructMultiaddrStr(remotePeerInfo.addrs[0], remotePeerInfo.peerId)
proc constructMultiaddrStr*(storedInfo: StoredInfo): string =
# Constructs a multiaddress with both location (wire) address and p2p identity
if storedInfo.addrs.len == 0:
return ""
constructMultiaddrStr(storedInfo.addrs[0], storedInfo.peerId)
proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
rpcsrv.rpc("post_waku_v2_admin_v1_peers") do (peers: seq[string]) -> bool:
## Connect to a list of peers
debug "post_waku_v2_admin_v1_peers"
for i, peer in peers:
let conn = await node.peerManager.dialPeer(parseRemotePeerInfo(peer), WakuRelayCodec, source="rpc")
if conn.isNone():
raise newException(ValueError, "Failed to connect to peer at index: " & $i & " " & $peer)
return true
rpcsrv.rpc("get_waku_v2_admin_v1_peers") do () -> seq[WakuPeer]:
## Returns a list of peers registered for this node
debug "get_waku_v2_admin_v1_peers"
var peers = newSeq[WakuPeer]()
if not node.wakuRelay.isNil():
# Map managed peers to WakuPeers and add to return list
let relayPeers = node.peerManager.peerStore.peers(WakuRelayCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it),
protocol: WakuRelayCodec,
connected: it.connectedness == Connectedness.Connected))
peers.add(relayPeers)
if not node.wakuFilter.isNil():
# Map WakuFilter peers to WakuPeers and add to return list
let filterPeers = node.peerManager.peerStore.peers(WakuFilterCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it),
protocol: WakuFilterCodec,
connected: it.connectedness == Connectedness.Connected))
peers.add(filterPeers)
if not node.wakuStore.isNil():
# Map WakuStore peers to WakuPeers and add to return list
let storePeers = node.peerManager.peerStore.peers(WakuStoreCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it),
protocol: WakuStoreCodec,
connected: it.connectedness == Connectedness.Connected))
peers.add(storePeers)
return peers

View File

@ -0,0 +1,10 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
type WakuPeer* = object
multiaddr*: string
protocol*: string
connected*: bool

View File

@ -1,111 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/sequtils,
chronicles,
json_rpc/rpcserver,
libp2p/[peerinfo, switch]
import
../../protocol/waku_store,
../../protocol/waku_filter,
../../protocol/waku_relay,
../../protocol/waku_swap/waku_swap,
../peer_manager,
../waku_node,
./jsonrpc_types
export jsonrpc_types
logScope:
topics = "waku node jsonrpc admin_api"
const futTimeout* = 30.seconds # Max time to wait for futures
proc constructMultiaddrStr*(wireaddr: MultiAddress, peerId: PeerId): string =
# Constructs a multiaddress with both wire address and p2p identity
$wireaddr & "/p2p/" & $peerId
proc constructMultiaddrStr*(peerInfo: PeerInfo): string =
# Constructs a multiaddress with both location (wire) address and p2p identity
if peerInfo.listenAddrs.len == 0:
return ""
constructMultiaddrStr(peerInfo.listenAddrs[0], peerInfo.peerId)
proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string =
# Constructs a multiaddress with both location (wire) address and p2p identity
if remotePeerInfo.addrs.len == 0:
return ""
constructMultiaddrStr(remotePeerInfo.addrs[0], remotePeerInfo.peerId)
proc constructMultiaddrStr*(storedInfo: StoredInfo): string =
# Constructs a multiaddress with both location (wire) address and p2p identity
if storedInfo.addrs.len == 0:
return ""
constructMultiaddrStr(storedInfo.addrs[0], storedInfo.peerId)
proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
## Admin API version 1 definitions
rpcsrv.rpc("post_waku_v2_admin_v1_peers") do(peers: seq[string]) -> bool:
## Connect to a list of peers
debug "post_waku_v2_admin_v1_peers"
for i, peer in peers:
let conn = await node.peerManager.dialPeer(parseRemotePeerInfo(peer), WakuRelayCodec, source="rpc")
if conn.isNone():
raise newException(ValueError, "Failed to connect to peer at index: " & $i & " " & $peer)
return true
rpcsrv.rpc("get_waku_v2_admin_v1_peers") do() -> seq[WakuPeer]:
## Returns a list of peers registered for this node
debug "get_waku_v2_admin_v1_peers"
# Create a single list of peers from mounted protocols.
# @TODO since the switch does not expose its connections, retrieving the connected peers requires a peer store/peer management
var wPeers: seq[WakuPeer] = @[]
## Managed peers
if not node.wakuRelay.isNil:
# Map managed peers to WakuPeers and add to return list
wPeers.insert(node.peerManager.peerStore
.peers(WakuRelayCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it),
protocol: WakuRelayCodec,
connected: it.connectedness == Connectedness.Connected)),
wPeers.len) # Append to the end of the sequence
if not node.wakuFilter.isNil:
# Map WakuFilter peers to WakuPeers and add to return list
wPeers.insert(node.peerManager.peerStore
.peers(WakuFilterCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it),
protocol: WakuFilterCodec,
connected: it.connectedness == Connectedness.Connected)),
wPeers.len) # Append to the end of the sequence
if not node.wakuSwap.isNil:
# Map WakuSwap peers to WakuPeers and add to return list
wPeers.insert(node.peerManager.peerStore
.peers(WakuSwapCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it),
protocol: WakuSwapCodec,
connected: it.connectedness == Connectedness.Connected)),
wPeers.len) # Append to the end of the sequence
if not node.wakuStore.isNil:
# Map WakuStore peers to WakuPeers and add to return list
wPeers.insert(node.peerManager.peerStore
.peers(WakuStoreCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it),
protocol: WakuStoreCodec,
connected: it.connectedness == Connectedness.Connected)),
wPeers.len) # Append to the end of the sequence
# @TODO filter output on protocol/connected-status
return wPeers

View File

@ -0,0 +1,3 @@
# Debug API
proc get_waku_v2_debug_v1_info(): WakuInfo

View File

@ -0,0 +1,14 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[os, strutils],
json_rpc/rpcclient
import
../../../../waku/v2/node/waku_node
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
createRpcSigs(RpcHttpClient, sourceDir / "callsigs.nim")

View File

@ -5,23 +5,24 @@ else:
import
chronicles,
json_rpc/rpcserver,
../waku_node
json_rpc/rpcserver
import
../../../../waku/v2/node/waku_node
logScope:
topics = "waku node jsonrpc debug_api"
proc installDebugApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
proc installDebugApiHandlers*(node: WakuNode, server: RpcServer) =
## Debug API version 1 definitions
rpcsrv.rpc("get_waku_v2_debug_v1_info") do() -> WakuInfo:
server.rpc("get_waku_v2_debug_v1_info") do () -> WakuInfo:
## Returns information about WakuNode
debug "get_waku_v2_debug_v1_info"
return node.info()
rpcsrv.rpc("get_waku_v2_debug_v1_version") do() -> string:
server.rpc("get_waku_v2_debug_v1_version") do () -> string:
## Returns information about WakuNode
debug "get_waku_v2_debug_v1_version"

View File

@ -0,0 +1,5 @@
# Filter API
proc get_waku_v2_filter_v1_messages(contentTopic: ContentTopic): seq[WakuMessage]
proc post_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool
proc delete_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool

View File

@ -0,0 +1,15 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[os, strutils],
json_rpc/rpcclient
import
../../../../waku/v2/protocol/waku_message,
../../../../waku/v2/protocol/waku_filter/rpc
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
createRpcSigs(RpcHttpClient, sourceDir / "callsigs.nim")

View File

@ -0,0 +1,89 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[tables, sequtils],
chronicles,
json_rpc/rpcserver
import
../../../../waku/v2/protocol/waku_message,
../../../../waku/v2/protocol/waku_filter,
../../../../waku/v2/protocol/waku_filter/rpc,
../../../../waku/v2/protocol/waku_filter/client,
../../../../waku/v2/node/message_cache,
../../../../waku/v2/node/peer_manager,
../../../../waku/v2/node/waku_node
logScope:
topics = "waku node jsonrpc filter_api"
const DefaultPubsubTopic: PubsubTopic = "/waku/2/default-waku/proto"
const futTimeout* = 5.seconds # Max time to wait for futures
type
MessageCache* = message_cache.MessageCache[ContentTopic]
proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache) =
server.rpc("post_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], topic: Option[PubsubTopic]) -> bool:
## Subscribes a node to a list of content filters
debug "post_waku_v2_filter_v1_subscription"
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
raise newException(ValueError, "no suitable remote filter peers")
let
pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic)
contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe, closure.} =
cache.addMessage(msg.contentTopic, msg)
let subFut = node.filterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get())
if not await subFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to subscribe to contentFilters")
# Successfully subscribed to all content filters
for cTopic in contentTopics:
cache.subscribe(cTopic)
return true
server.rpc("delete_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], topic: Option[PubsubTopic]) -> bool:
## Unsubscribes a node from a list of content filters
debug "delete_waku_v2_filter_v1_subscription"
let
pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic)
contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
let unsubFut = node.unsubscribe(pubsubTopic, contentTopics)
if not await unsubFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to unsubscribe from contentFilters")
for cTopic in contentTopics:
cache.unsubscribe(cTopic)
return true
server.rpc("get_waku_v2_filter_v1_messages") do (contentTopic: ContentTopic) -> seq[WakuMessage]:
## Returns all WakuMessages received on a content topic since the
## last time this method was called
debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic
if not cache.isSubscribed(contentTopic):
raise newException(ValueError, "Not subscribed to topic: " & contentTopic)
let msgRes = cache.getMessages(contentTopic, clear=true)
if msgRes.isErr():
raise newException(ValueError, "Not subscribed to topic: " & contentTopic)
return msgRes.value

View File

@ -1,104 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[tables, sequtils],
chronicles,
json_rpc/rpcserver
import
../../protocol/waku_message,
../../protocol/waku_filter,
../../protocol/waku_filter/rpc,
../../protocol/waku_filter/client,
../waku_node,
./jsonrpc_types
export jsonrpc_types
logScope:
topics = "waku node jsonrpc filter_api"
const DefaultPubsubTopic: PubsubTopic = "/waku/2/default-waku/proto"
const futTimeout* = 5.seconds # Max time to wait for futures
const maxCache* = 30 # Max number of messages cached per topic TODO: make this configurable
proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache: MessageCache) =
## Filter API version 1 definitions
rpcsrv.rpc("get_waku_v2_filter_v1_messages") do (contentTopic: ContentTopic) -> seq[WakuMessage]:
## Returns all WakuMessages received on a content topic since the
## last time this method was called
## TODO: ability to specify a return message limit
debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic
if not messageCache.hasKey(contentTopic):
raise newException(ValueError, "Not subscribed to content topic: " & $contentTopic)
let msgs = messageCache[contentTopic]
# Clear cache before next call
messageCache[contentTopic] = @[]
return msgs
rpcsrv.rpc("post_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], topic: Option[string]) -> bool:
## Subscribes a node to a list of content filters
debug "post_waku_v2_filter_v1_subscription"
let
pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic)
contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
let pushHandler:FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe, closure.} =
# Add message to current cache
trace "WakuMessage received", msg=msg
# Make a copy of msgs for this topic to modify
var msgs = messageCache.getOrDefault(msg.contentTopic, @[])
if msgs.len >= maxCache:
# Message cache on this topic exceeds maximum. Delete oldest.
# TODO: this may become a bottle neck if called as the norm rather than exception when adding messages. Performance profile needed.
msgs.delete(0,0)
msgs.add(msg)
# Replace indexed entry with copy
# TODO: max number of content topics could be limited in node
messageCache[msg.contentTopic] = msgs
let subFut = node.subscribe(pubsubTopic, contentTopics, pushHandler)
if not await subFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to subscribe to contentFilters")
# Successfully subscribed to all content filters
for cTopic in contentTopics:
# Create message cache for each subscribed content topic
messageCache[cTopic] = @[]
return true
rpcsrv.rpc("delete_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool:
## Unsubscribes a node from a list of content filters
debug "delete_waku_v2_filter_v1_subscription"
let
pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic)
contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
let unsubFut = node.unsubscribe(pubsubTopic, contentTopics)
if not await unsubFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to unsubscribe from contentFilters")
# Successfully unsubscribed from all content filters
for cTopic in contentTopics:
# Remove message cache for each unsubscribed content topic
messageCache.del(cTopic)
return true

View File

@ -24,8 +24,12 @@
]#
import
stint, stew/byteutils, eth/keys, eth/common/eth_types,
../../../whisper/whisper_types
stew/byteutils,
eth/keys,
eth/common/eth_types,
stint
import
../../waku/whisper/whisper_types
type
HexDataStr* = distinct string

View File

@ -1,41 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[options,tables],
eth/keys,
../../protocol/waku_message,
../../protocol/waku_store/rpc,
../../utils/time
type
StoreResponse* = object
messages*: seq[WakuMessage]
pagingOptions*: Option[StorePagingOptions]
StorePagingOptions* = object
## This type holds some options for pagination
pageSize*: uint64
cursor*: Option[PagingIndexRPC]
forward*: bool
WakuRelayMessage* = object
payload*: seq[byte]
contentTopic*: Option[ContentTopic]
# sender generated timestamp
timestamp*: Option[Timestamp]
WakuPeer* = object
multiaddr*: string
protocol*: string
connected*: bool
WakuKeyPair* = object
seckey*: keys.PrivateKey
pubkey*: keys.PublicKey
TopicCache* = TableRef[string, seq[WakuMessage]]
MessageCache* = TableRef[ContentTopic, seq[WakuMessage]]

View File

@ -1,99 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[options, json],
eth/keys,
../../protocol/waku_message,
../../protocol/waku_store,
../../protocol/waku_store/rpc,
../../utils/compat,
../../utils/time,
./hexstrings,
./jsonrpc_types
export hexstrings
## Json marshalling
proc `%`*(value: WakuMessage): JsonNode =
## This ensures that seq[byte] fields are marshalled to hex-format JStrings
## (as defined in `hexstrings.nim`) rather than the default JArray[JInt]
let jObj = newJObject()
for k, v in value.fieldPairs:
jObj[k] = %v
return jObj
## Conversion tools
## Since the Waku v2 JSON-RPC API has its own defined types,
## we need to convert between these and the types for the Nim API
proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfoRPC =
PagingInfoRPC(
pageSize: some(pagingOptions.pageSize),
cursor: pagingOptions.cursor,
direction: if pagingOptions.forward: some(PagingDirectionRPC.FORWARD)
else: some(PagingDirectionRPC.BACKWARD)
)
proc toPagingOptions*(pagingInfo: PagingInfoRPC): StorePagingOptions =
StorePagingOptions(
pageSize: pagingInfo.pageSize.get(0'u64),
cursor: pagingInfo.cursor,
forward: if pagingInfo.direction.isNone(): true
else: pagingInfo.direction.get() == PagingDirectionRPC.FORWARD
)
proc toJsonRPCStoreResponse*(response: HistoryResponse): StoreResponse =
StoreResponse(
messages: response.messages,
pagingOptions: if response.cursor.isNone(): none(StorePagingOptions)
else: some(StorePagingOptions(
pageSize: uint64(response.messages.len), # This field will be deprecated soon
forward: true, # Hardcoded. This field will be deprecated soon
cursor: response.cursor.map(toRPC)
))
)
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage =
var t: Timestamp
if relayMessage.timestamp.isSome:
t = relayMessage.timestamp.get
else:
# incoming WakuRelayMessages with no timestamp will get 0 timestamp
t = Timestamp(0)
WakuMessage(payload: relayMessage.payload,
contentTopic: relayMessage.contentTopic.get(DefaultContentTopic),
version: version,
timestamp: t)
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref HmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage =
let payload = Payload(payload: relayMessage.payload,
dst: pubKey,
symkey: symkey)
var t: Timestamp
if relayMessage.timestamp.isSome:
t = relayMessage.timestamp.get
else:
# incoming WakuRelayMessages with no timestamp will get 0 timestamp
t = Timestamp(0)
WakuMessage(payload: payload.encode(version, rng[]).get(),
contentTopic: relayMessage.contentTopic.get(DefaultContentTopic),
version: version,
timestamp: t)
proc toWakuRelayMessage*(message: WakuMessage, symkey: Option[SymKey], privateKey: Option[keys.PrivateKey]): WakuRelayMessage =
let
keyInfo = if symkey.isSome(): KeyInfo(kind: Symmetric, symKey: symkey.get())
elif privateKey.isSome(): KeyInfo(kind: Asymmetric, privKey: privateKey.get())
else: KeyInfo(kind: KeyKind.None)
decoded = decodePayload(message, keyInfo)
WakuRelayMessage(payload: decoded.get().payload,
contentTopic: some(message.contentTopic),
timestamp: some(message.timestamp))

View File

@ -0,0 +1,22 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
json
import
../../../waku/v2/protocol/waku_message,
./hexstrings
export hexstrings
## Json marshalling
proc `%`*(value: WakuMessage): JsonNode =
## This ensures that seq[byte] fields are marshalled to hex-format JStrings
## (as defined in `hexstrings.nim`) rather than the default JArray[JInt]
let jObj = newJObject()
for k, v in value.fieldPairs:
jObj[k] = %v
return jObj

View File

@ -1,115 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[tables, sequtils],
chronicles,
eth/keys,
json_rpc/rpcserver,
nimcrypto/sysrand
import
../../utils/compat,
../waku_node,
./jsonrpc_types,
./jsonrpc_utils
export compat, jsonrpc_types
logScope:
topics = "waku node jsonrpc private_api"
const futTimeout* = 5.seconds # Max time to wait for futures
proc installPrivateApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: TopicCache) =
## Private API version 1 definitions
## Definitions for symmetric cryptography
rpcsrv.rpc("get_waku_v2_private_v1_symmetric_key") do() -> SymKey:
## Generates and returns a symmetric key for message encryption and decryption
debug "get_waku_v2_private_v1_symmetric_key"
var key: SymKey
if randomBytes(key) != key.len:
raise newException(ValueError, "Failed generating key")
return key
rpcsrv.rpc("post_waku_v2_private_v1_symmetric_message") do(topic: string, message: WakuRelayMessage, symkey: string) -> bool:
## Publishes and encrypts a message to be relayed on a PubSub topic
debug "post_waku_v2_private_v1_symmetric_message"
let msg = message.toWakuMessage(version = 1,
rng = node.rng,
pubKey = none(compat.PublicKey),
symkey = some(symkey.toSymKey()))
if (await node.publish(topic, msg).withTimeout(futTimeout)):
# Successfully published message
return true
else:
# Failed to publish message to topic
raise newException(ValueError, "Failed to publish to topic " & topic)
rpcsrv.rpc("get_waku_v2_private_v1_symmetric_messages") do(topic: string, symkey: string) -> seq[WakuRelayMessage]:
## Returns all WakuMessages received on a PubSub topic since the
## last time this method was called. Decrypts the message payloads
## before returning.
##
## @TODO ability to specify a return message limit
debug "get_waku_v2_private_v1_symmetric_messages", topic=topic
if topicCache.hasKey(topic):
let msgs = topicCache[topic]
# Clear cache before next call
topicCache[topic] = @[]
return msgs.mapIt(it.toWakuRelayMessage(symkey = some(symkey.toSymKey()),
privateKey = none(compat.PrivateKey)))
else:
# Not subscribed to this topic
raise newException(ValueError, "Not subscribed to topic: " & topic)
## Definitions for asymmetric cryptography
rpcsrv.rpc("get_waku_v2_private_v1_asymmetric_keypair") do() -> WakuKeyPair:
## Generates and returns a public/private key pair for asymmetric message encryption and decryption.
debug "get_waku_v2_private_v1_asymmetric_keypair"
let privKey = compat.PrivateKey.random(node.rng[])
return WakuKeyPair(seckey: privKey, pubkey: privKey.toPublicKey())
rpcsrv.rpc("post_waku_v2_private_v1_asymmetric_message") do(topic: string, message: WakuRelayMessage, publicKey: string) -> bool:
## Publishes and encrypts a message to be relayed on a PubSub topic
debug "post_waku_v2_private_v1_asymmetric_message"
let msg = message.toWakuMessage(version = 1,
rng = node.rng,
symkey = none(SymKey),
pubKey = some(publicKey.toPublicKey()))
if (await node.publish(topic, msg).withTimeout(futTimeout)):
# Successfully published message
return true
else:
# Failed to publish message to topic
raise newException(ValueError, "Failed to publish to topic " & topic)
rpcsrv.rpc("get_waku_v2_private_v1_asymmetric_messages") do(topic: string, privateKey: string) -> seq[WakuRelayMessage]:
## Returns all WakuMessages received on a PubSub topic since the
## last time this method was called. Decrypts the message payloads
## before returning.
##
## @TODO ability to specify a return message limit
debug "get_waku_v2_private_v1_asymmetric_messages", topic=topic
if topicCache.hasKey(topic):
let msgs = topicCache[topic]
# Clear cache before next call
topicCache[topic] = @[]
return msgs.mapIt(it.toWakuRelayMessage(symkey = none(SymKey), privateKey = some(privateKey.toPrivateKey())))
else:
# Not subscribed to this topic
raise newException(ValueError, "Not subscribed to topic: " & topic)

View File

@ -1,12 +1,3 @@
# Admin API
proc get_waku_v2_admin_v1_peers(): seq[WakuPeer]
proc post_waku_v2_admin_v1_peers(peers: seq[string]): bool
# Debug API
proc get_waku_v2_debug_v1_info(): WakuInfo
# Relay API
proc post_waku_v2_relay_v1_message(topic: string, message: WakuRelayMessage): bool
@ -14,17 +5,8 @@ proc get_waku_v2_relay_v1_messages(topic: string): seq[WakuMessage]
proc post_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
# Store API
proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilterRPC]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]): StoreResponse
# Filter API
proc get_waku_v2_filter_v1_messages(contentTopic: ContentTopic): seq[WakuMessage]
proc post_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool
proc delete_waku_v2_filter_v1_subscription(contentFilters: seq[ContentFilter], topic: Option[string]): bool
# Private API
# Relay Private API
# Symmetric
proc get_waku_v2_private_v1_symmetric_key(): SymKey
proc post_waku_v2_private_v1_symmetric_message(topic: string, message: WakuRelayMessage, symkey: string): bool
@ -32,4 +14,4 @@ proc get_waku_v2_private_v1_symmetric_messages(topic: string, symkey: string): s
# Asymmetric
proc get_waku_v2_private_v1_asymmetric_keypair(): WakuKeyPair
proc post_waku_v2_private_v1_asymmetric_message(topic: string, message: WakuRelayMessage, publicKey: string): bool
proc get_waku_v2_private_v1_asymmetric_messages(topic: string, privateKey: string): seq[WakuRelayMessage]
proc get_waku_v2_private_v1_asymmetric_messages(topic: string, privateKey: string): seq[WakuRelayMessage]

View File

@ -0,0 +1,20 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[os, strutils],
json_rpc/rpcclient
import
../../../../waku/v2/protocol/waku_message,
../../../../waku/v2/utils/compat,
../marshalling,
./types
export types
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
createRpcSigs(RpcHttpClient, sourceDir / "callsigs.nim")

View File

@ -0,0 +1,225 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/sequtils,
chronicles,
json_rpc/rpcserver,
eth/keys,
nimcrypto/sysrand
import
../../../../waku/v2/protocol/waku_message,
../../../../waku/v2/protocol/waku_relay,
../../../../waku/v2/node/waku_node,
../../../../waku/v2/node/message_cache,
../../../../waku/v2/utils/compat,
../../../../waku/v2/utils/time,
../hexstrings,
./types
logScope:
topics = "waku node jsonrpc relay_api"
const futTimeout* = 5.seconds # Max time to wait for futures
type
MessageCache* = message_cache.MessageCache[PubsubTopic]
## Waku Relay JSON-RPC API
proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache) =
if node.wakuRelay.isNil():
debug "waku relay protocol is nil. skipping json rpc api handlers installation"
return
let topicHandler = proc(topic: PubsubTopic, message: WakuMessage) {.async.} =
cache.addMessage(topic, message)
# The node may already be subscribed to some topics when Relay API handlers
# are installed
for topic in node.wakuRelay.subscribedTopics:
node.subscribe(topic, topicHandler)
cache.subscribe(topic)
server.rpc("post_waku_v2_relay_v1_subscriptions") do (topics: seq[string]) -> bool:
## Subscribes a node to a list of PubSub topics
debug "post_waku_v2_relay_v1_subscriptions"
# Subscribe to all requested topics
for topic in topics:
if cache.isSubscribed(topic):
continue
cache.subscribe(topic)
node.subscribe(topic, topicHandler)
return true
server.rpc("delete_waku_v2_relay_v1_subscriptions") do (topics: seq[string]) -> bool:
## Unsubscribes a node from a list of PubSub topics
debug "delete_waku_v2_relay_v1_subscriptions"
# Unsubscribe all handlers from requested topics
for topic in topics:
node.unsubscribeAll(topic)
cache.unsubscribe(topic)
return true
server.rpc("post_waku_v2_relay_v1_message") do (topic: string, msg: WakuRelayMessage) -> bool:
## Publishes a WakuMessage to a PubSub topic
debug "post_waku_v2_relay_v1_message"
let message = block:
WakuMessage(
payload: msg.payload,
# TODO: Fail if the message doesn't have a content topic
contentTopic: msg.contentTopic.get(DefaultContentTopic),
version: 0,
timestamp: msg.timestamp.get(Timestamp(0))
)
let publishFut = node.publish(topic, message)
if not await publishFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to publish to topic " & topic)
return true
server.rpc("get_waku_v2_relay_v1_messages") do (topic: string) -> seq[WakuMessage]:
## Returns all WakuMessages received on a PubSub topic since the
## last time this method was called
debug "get_waku_v2_relay_v1_messages", topic=topic
if not cache.isSubscribed(topic):
raise newException(ValueError, "Not subscribed to topic: " & topic)
let msgRes = cache.getMessages(topic, clear=true)
if msgRes.isErr():
raise newException(ValueError, "Not subscribed to topic: " & topic)
return msgRes.value
## Waku Relay Private JSON-RPC API (Whisper/Waku v1 compatibility)
proc toWakuMessage(relayMessage: WakuRelayMessage, version: uint32, rng: ref HmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage =
let payload = Payload(payload: relayMessage.payload,
dst: pubKey,
symkey: symkey)
var t: Timestamp
if relayMessage.timestamp.isSome:
t = relayMessage.timestamp.get
else:
# incoming WakuRelayMessages with no timestamp will get 0 timestamp
t = Timestamp(0)
WakuMessage(payload: payload.encode(version, rng[]).get(),
contentTopic: relayMessage.contentTopic.get(DefaultContentTopic),
version: version,
timestamp: t)
proc toWakuRelayMessage(message: WakuMessage, symkey: Option[SymKey], privateKey: Option[keys.PrivateKey]): WakuRelayMessage =
let
keyInfo = if symkey.isSome(): KeyInfo(kind: Symmetric, symKey: symkey.get())
elif privateKey.isSome(): KeyInfo(kind: Asymmetric, privKey: privateKey.get())
else: KeyInfo(kind: KeyKind.None)
decoded = decodePayload(message, keyInfo)
WakuRelayMessage(payload: decoded.get().payload,
contentTopic: some(message.contentTopic),
timestamp: some(message.timestamp))
proc installRelayPrivateApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache) =
server.rpc("get_waku_v2_private_v1_symmetric_key") do () -> SymKey:
## Generates and returns a symmetric key for message encryption and decryption
debug "get_waku_v2_private_v1_symmetric_key"
var key: SymKey
if randomBytes(key) != key.len:
raise newException(ValueError, "Failed generating key")
return key
server.rpc("post_waku_v2_private_v1_symmetric_message") do (topic: string, message: WakuRelayMessage, symkey: string) -> bool:
## Publishes and encrypts a message to be relayed on a PubSub topic
debug "post_waku_v2_private_v1_symmetric_message"
let msg = message.toWakuMessage(version = 1,
rng = node.rng,
pubKey = none(keys.PublicKey),
symkey = some(symkey.toSymKey()))
if (await node.publish(topic, msg).withTimeout(futTimeout)):
# Successfully published message
return true
else:
# Failed to publish message to topic
raise newException(ValueError, "Failed to publish to topic " & topic)
server.rpc("get_waku_v2_private_v1_symmetric_messages") do (topic: string, symkey: string) -> seq[WakuRelayMessage]:
## Returns all WakuMessages received on a PubSub topic since the
## last time this method was called. Decrypts the message payloads
## before returning.
debug "get_waku_v2_private_v1_symmetric_messages", topic=topic
if not cache.isSubscribed(topic):
raise newException(ValueError, "Not subscribed to topic: " & topic)
let msgRes = cache.getMessages(topic, clear=true)
if msgRes.isErr():
raise newException(ValueError, "Not subscribed to topic: " & topic)
let msgs = msgRes.get()
return msgs.mapIt(it.toWakuRelayMessage(symkey=some(symkey.toSymKey()),
privateKey=none(keys.PrivateKey)))
server.rpc("get_waku_v2_private_v1_asymmetric_keypair") do () -> WakuKeyPair:
## Generates and returns a public/private key pair for asymmetric message encryption and decryption.
debug "get_waku_v2_private_v1_asymmetric_keypair"
let privKey = keys.PrivateKey.random(node.rng[])
return WakuKeyPair(seckey: privKey, pubkey: privKey.toPublicKey())
server.rpc("post_waku_v2_private_v1_asymmetric_message") do (topic: string, message: WakuRelayMessage, publicKey: string) -> bool:
## Publishes and encrypts a message to be relayed on a PubSub topic
debug "post_waku_v2_private_v1_asymmetric_message"
let msg = message.toWakuMessage(version = 1,
rng = node.rng,
symkey = none(SymKey),
pubKey = some(publicKey.toPublicKey()))
let publishFut = node.publish(topic, msg)
if not await publishFut.withTimeout(futTimeout):
raise newException(ValueError, "Failed to publish to topic " & topic)
return true
server.rpc("get_waku_v2_private_v1_asymmetric_messages") do (topic: string, privateKey: string) -> seq[WakuRelayMessage]:
## Returns all WakuMessages received on a PubSub topic since the
## last time this method was called. Decrypts the message payloads
## before returning.
debug "get_waku_v2_private_v1_asymmetric_messages", topic=topic
if not cache.isSubscribed(topic):
raise newException(ValueError, "Not subscribed to topic: " & topic)
let msgRes = cache.getMessages(topic, clear=true)
if msgRes.isErr():
raise newException(ValueError, "Not subscribed to topic: " & topic)
let msgs = msgRes.get()
return msgs.mapIt(it.toWakuRelayMessage(symkey=none(SymKey),
privateKey=some(privateKey.toPrivateKey())))

View File

@ -0,0 +1,22 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/options,
eth/keys
import
../../../../waku/v2/protocol/waku_message,
../../../../waku/v2/utils/time
type
WakuRelayMessage* = object
payload*: seq[byte]
contentTopic*: Option[ContentTopic]
timestamp*: Option[Timestamp]
WakuKeyPair* = object
seckey*: keys.PrivateKey
pubkey*: keys.PublicKey

View File

@ -1,114 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[tables,sequtils],
chronicles,
json_rpc/rpcserver,
libp2p/protocols/pubsub/pubsub,
../../protocol/waku_message,
../waku_node,
./jsonrpc_types,
./jsonrpc_utils
export jsonrpc_types
logScope:
topics = "waku node jsonrpc relay_api"
const futTimeout* = 5.seconds # Max time to wait for futures
const maxCache* = 30 # Max number of messages cached per topic @TODO make this configurable
proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: TopicCache) =
proc topicHandler(topic: string, data: seq[byte]) {.async, raises: [Defect].} =
trace "Topic handler triggered", topic=topic
let msg = WakuMessage.decode(data)
if msg.isOk():
# Add message to current cache
trace "WakuMessage received", msg=msg, topic=topic
# Make a copy of msgs for this topic to modify
var msgs = topicCache.getOrDefault(topic, @[])
if msgs.len >= maxCache:
# Message cache on this topic exceeds maximum. Delete oldest.
# @TODO this may become a bottle neck if called as the norm rather than exception when adding messages. Performance profile needed.
msgs.delete(0,0)
msgs.add(msg[])
# Replace indexed entry with copy
# @TODO max number of topics could be limited in node
topicCache[topic] = msgs
else:
debug "WakuMessage received but failed to decode", msg=msg, topic=topic
# @TODO handle message decode failure
## Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
for topic in PubSub(node.wakuRelay).topics.keys:
debug "Adding API topic handler for existing subscription", topic=topic
node.subscribe(topic, topicHandler)
# Create message cache for this topic
debug "MessageCache for topic", topic=topic
topicCache[topic] = @[]
## Relay API version 1 definitions
rpcsrv.rpc("post_waku_v2_relay_v1_message") do(topic: string, message: WakuRelayMessage) -> bool:
## Publishes a WakuMessage to a PubSub topic
debug "post_waku_v2_relay_v1_message"
if (await node.publish(topic, message.toWakuMessage(version = 0)).withTimeout(futTimeout)):
# Successfully published message
return true
else:
# Failed to publish message to topic
raise newException(ValueError, "Failed to publish to topic " & topic)
rpcsrv.rpc("get_waku_v2_relay_v1_messages") do(topic: string) -> seq[WakuMessage]:
## Returns all WakuMessages received on a PubSub topic since the
## last time this method was called
## @TODO ability to specify a return message limit
debug "get_waku_v2_relay_v1_messages", topic=topic
if topicCache.hasKey(topic):
let msgs = topicCache[topic]
# Clear cache before next call
topicCache[topic] = @[]
return msgs
else:
# Not subscribed to this topic
raise newException(ValueError, "Not subscribed to topic: " & topic)
rpcsrv.rpc("post_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool:
## Subscribes a node to a list of PubSub topics
debug "post_waku_v2_relay_v1_subscriptions"
# Subscribe to all requested topics
for topic in topics:
# Only subscribe to topics for which we have no subscribed topic handlers yet
if not topicCache.hasKey(topic):
node.subscribe(topic, topicHandler)
# Create message cache for this topic
trace "MessageCache for topic", topic=topic
topicCache[topic] = @[]
# Successfully subscribed to all requested topics
return true
rpcsrv.rpc("delete_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool:
## Unsubscribes a node from a list of PubSub topics
debug "delete_waku_v2_relay_v1_subscriptions"
# Unsubscribe all handlers from requested topics
for topic in topics:
node.unsubscribeAll(topic)
# Remove message cache for topic
topicCache.del(topic)
# Successfully unsubscribed from all requested topics
return true

View File

@ -0,0 +1,4 @@
# Store API
proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilterRPC]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]): StoreResponse

View File

@ -0,0 +1,19 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[os, strutils],
json_rpc/rpcclient
import
../../../../waku/v2/protocol/waku_store/rpc,
../../../../waku/v2/utils/time,
./types
export types
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
createRpcSigs(RpcHttpClient, sourceDir / "callsigs.nim")

View File

@ -0,0 +1,85 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[options, sequtils],
chronicles,
json_rpc/rpcserver
import
../../../../../waku/v2/protocol/waku_message,
../../../../../waku/v2/protocol/waku_store,
../../../../../waku/v2/protocol/waku_store/rpc,
../../../../../waku/v2/utils/time,
../../../../waku/v2/node/waku_node,
../../../../waku/v2/node/peer_manager,
./types
logScope:
topics = "waku node jsonrpc store_api"
const futTimeout = 5.seconds
proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfoRPC =
PagingInfoRPC(
pageSize: some(pagingOptions.pageSize),
cursor: pagingOptions.cursor,
direction: if pagingOptions.forward: some(PagingDirectionRPC.FORWARD)
else: some(PagingDirectionRPC.BACKWARD)
)
proc toPagingOptions*(pagingInfo: PagingInfoRPC): StorePagingOptions =
StorePagingOptions(
pageSize: pagingInfo.pageSize.get(0'u64),
cursor: pagingInfo.cursor,
forward: if pagingInfo.direction.isNone(): true
else: pagingInfo.direction.get() == PagingDirectionRPC.FORWARD
)
proc toJsonRPCStoreResponse*(response: HistoryResponse): StoreResponse =
StoreResponse(
messages: response.messages,
pagingOptions: if response.cursor.isNone(): none(StorePagingOptions)
else: some(StorePagingOptions(
pageSize: uint64(response.messages.len), # This field will be deprecated soon
forward: true, # Hardcoded. This field will be deprecated soon
cursor: response.cursor.map(toRPC)
))
)
proc installStoreApiHandlers*(node: WakuNode, server: RpcServer) =
server.rpc("get_waku_v2_store_v1_messages") do (pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilterRPC]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]) -> StoreResponse:
## Returns history for a list of content topics with optional paging
debug "get_waku_v2_store_v1_messages"
let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
raise newException(ValueError, "no suitable remote store peers")
let req = HistoryQuery(
pubsubTopic: pubsubTopicOption,
contentTopics: contentFiltersOption.get(@[]).mapIt(it.contentTopic),
startTime: startTime,
endTime: endTime,
ascending: if pagingOptions.isNone(): true
else: pagingOptions.get().forward,
pageSize: if pagingOptions.isNone(): DefaultPageSize
else: min(pagingOptions.get().pageSize, MaxPageSize),
cursor: if pagingOptions.isNone(): none(HistoryCursor)
else: pagingOptions.get().cursor.map(toAPI)
)
let queryFut = node.query(req, peerOpt.get())
if not await queryFut.withTimeout(futTimeout):
raise newException(ValueError, "No history response received (timeout)")
let res = queryFut.read()
if res.isErr():
raise newException(ValueError, $res.error)
return res.value.toJsonRPCStoreResponse()

View File

@ -0,0 +1,22 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/options
import
../../../../waku/v2/protocol/waku_message,
../../../../waku/v2/protocol/waku_store/rpc
type
StoreResponse* = object
messages*: seq[WakuMessage]
pagingOptions*: Option[StorePagingOptions]
StorePagingOptions* = object
## This type holds some options for pagination
pageSize*: uint64
cursor*: Option[PagingIndexRPC]
forward*: bool

View File

@ -1,61 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[options, sequtils],
chronicles,
json_rpc/rpcserver
import
../../protocol/waku_store,
../../protocol/waku_store/rpc,
../../utils/time,
../waku_node,
../peer_manager,
./jsonrpc_types,
./jsonrpc_utils
export jsonrpc_types
logScope:
topics = "waku node jsonrpc store_api"
proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
const futTimeout = 5.seconds
## Store API version 1 definitions
rpcsrv.rpc("get_waku_v2_store_v1_messages") do (pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilterRPC]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]) -> StoreResponse:
## Returns history for a list of content topics with optional paging
debug "get_waku_v2_store_v1_messages"
let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
raise newException(ValueError, "no suitable remote store peers")
let req = HistoryQuery(
pubsubTopic: pubsubTopicOption,
contentTopics: if contentFiltersOption.isNone(): @[]
else: contentFiltersOption.get().mapIt(it.contentTopic),
startTime: startTime,
endTime: endTime,
ascending: if pagingOptions.isNone(): true
else: pagingOptions.get().forward,
pageSize: if pagingOptions.isNone(): DefaultPageSize
else: min(pagingOptions.get().pageSize, MaxPageSize),
cursor: if pagingOptions.isNone(): none(HistoryCursor)
else: pagingOptions.get().cursor.map(toAPI)
)
let queryFut = node.query(req, peerOpt.get())
if not await queryFut.withTimeout(futTimeout):
raise newException(ValueError, "No history response received (timeout)")
let res = queryFut.read()
if res.isErr():
raise newException(ValueError, $res.error)
debug "get_waku_v2_store_v1_messages response"
return res.value.toJsonRPCStoreResponse()