From 62b824c387b970748fd00c41c06a232aa0e7c49c Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Mon, 22 Mar 2021 11:44:45 +0200 Subject: [PATCH] Add WakuBridge and test (#426) * Add WakuBridge and test * Fix test --- tests/all_tests_v2.nim | 3 +- tests/v2/test_waku_bridge.nim | 89 ++++++++++++ tests/v2/test_waku_rln_relay.nim | 2 + waku/common/wakubridge.nim | 234 +++++++++++++++++++------------ 4 files changed, 235 insertions(+), 93 deletions(-) create mode 100644 tests/v2/test_waku_bridge.nim diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index a4cdaa379..3363ace93 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -12,7 +12,8 @@ import ./v2/test_jsonrpc_waku, ./v2/test_peer_manager, ./v2/test_web3, # TODO remove it when rln-relay tests get finalized - ./v2/test_waku_rln_relay + ./v2/test_waku_rln_relay, + ./v2/test_waku_bridge # TODO Only enable this once swap module is integrated more nicely as a dependency, i.e. as submodule with CI etc # For PoC execute it manually and run separate module here: https://github.com/vacp2p/swap-contracts-module diff --git a/tests/v2/test_waku_bridge.nim b/tests/v2/test_waku_bridge.nim new file mode 100644 index 000000000..e79c616e2 --- /dev/null +++ b/tests/v2/test_waku_bridge.nim @@ -0,0 +1,89 @@ +{.used.} + +import + std/unittest, + chronicles, chronos, stew/shims/net as stewNet, stew/byteutils, + libp2p/crypto/crypto, + libp2p/crypto/secp, + libp2p/peerid, + libp2p/multiaddress, + libp2p/switch, + libp2p/protocols/pubsub/rpc/messages, + libp2p/protocols/pubsub/pubsub, + eth/p2p, + eth/keys, + ../../waku/common/wakubridge, + ../../waku/v1/protocol/waku_protocol, + ../../waku/v2/protocol/[waku_message, message_notifier], + ../../waku/v2/protocol/waku_store/waku_store, + ../../waku/v2/protocol/waku_filter/waku_filter, + ../../waku/v2/node/wakunode2, + ../test_helpers + +procSuite "WakuBridge": + let rng = keys.newRng() + + asyncTest "Messages are bridged between Waku v1 and Waku v2": + let + # Bridge + nodev1Key = keys.KeyPair.random(rng[]) + nodev2Key = crypto.PrivateKey.random(Secp256k1, rng[])[] + bridge = WakuBridge.new( + nodev1Key= nodev1Key, + nodev1Address = localAddress(30303), + powRequirement = 0.002, + rng = rng, + nodev2Key = nodev2Key, + nodev2BindIp = ValidIpAddress.init("0.0.0.0"), nodev2BindPort= Port(60000)) + + # Waku v1 node + v1Node = setupTestNode(rng, Waku) + + # Waku v2 node + v2NodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + v2Node = WakuNode.init(v2NodeKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) + + topic = [byte 0x00, 0, 0, byte 0x01] + contentTopic = ContentTopic(1) + payloadV1 = "hello from V1".toBytes() + payloadV2 = "hello from V2".toBytes() + message = WakuMessage(payload: payloadV2, contentTopic: contentTopic) + + await bridge.start() + + await v2Node.start() + v2Node.mountRelay(@[defaultBridgeTopic]) + + discard await v1Node.rlpxConnect(newNode(bridge.nodev1.toENode())) + await v2Node.connectToNodes(@[bridge.nodev2.peerInfo]) + + var completionFut = newFuture[bool]() + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk() and msg.value().version == 1: + completionFut.complete(true) + + v2Node.subscribe(defaultBridgeTopic, relayHandler) + + await sleepAsync(2000.millis) + + # Test bridging from V2 to V1 + await v2Node.publish(defaultBridgeTopic, message) + + await sleepAsync(2000.millis) + + check: + # v1Node received message published by v2Node + v1Node.protocolState(Waku).queue.items.len == 1 + + # Test bridging from V1 to V2 + check: + v1Node.postMessage(ttl = 5, + topic = topic, + payload = payloadV1) == true + + # v2Node received payload published by v1Node + await completionFut.withTimeout(5.seconds) + + await bridge.stop() + \ No newline at end of file diff --git a/tests/v2/test_waku_rln_relay.nim b/tests/v2/test_waku_rln_relay.nim index 992d2cfb9..6dfadb43f 100644 --- a/tests/v2/test_waku_rln_relay.nim +++ b/tests/v2/test_waku_rln_relay.nim @@ -215,6 +215,8 @@ procSuite "Waku rln relay": # start rln-relay await node.mountRlnRelay(ethClientAddress = some(EthClient), ethAccountAddress = some(ethAccountAddress), membershipContractAddress = some(membershipContractAddress)) + await node.stop() + suite "Waku rln relay": test "Keygen Nim Wrappers": var diff --git a/waku/common/wakubridge.nim b/waku/common/wakubridge.nim index 037239e7e..1807ef70d 100644 --- a/waku/common/wakubridge.nim +++ b/waku/common/wakubridge.nim @@ -23,63 +23,91 @@ import # Common cli config ./config_bridge +logScope: + topics = "wakubridge" + +################## +# Default values # +################## + const + defaultBridgeTopic* = "/waku/2/default-bridge/proto" clientIdV1 = "nim-waku v1 node" - defaultBridgeTopic = "/waku/2/default-bridge/proto" defaultTTL = 5'u32 -proc toWakuMessage(env: Envelope): WakuMessage = +######### +# Types # +######### + +type + WakuBridge* = ref object of RootObj + nodev1*: EthereumNode + nodev2*: WakuNode + +################### +# Helper funtions # +################### + +func toWakuMessage(env: Envelope): WakuMessage = # Translate a Waku v1 envelope to a Waku v2 message WakuMessage(payload: env.data, contentTopic: ContentTopic(uint32.fromBytes(env.topic)), version: 1) -proc toWakuV2(env: Envelope, nodev2: WakuNode) {.async.} = - await nodev2.publish(defaultBridgeTopic, env.toWakuMessage()) +proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} = + await bridge.nodev2.publish(defaultBridgeTopic, env.toWakuMessage()) -proc toWakuV1(msg: WakuMessage, nodev1: EthereumNode) {.gcsafe.} = - discard nodev1.postMessage(ttl = defaultTTL, - topic = msg.contentTopic.toBytes(), - payload = msg.payload) +proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe.} = + discard bridge.nodev1.postMessage(ttl = defaultTTL, + topic = msg.contentTopic.toBytes(), + payload = msg.payload) -proc startWakuV1(config: WakuNodeConf, rng: ref BrHmacDrbgContext): - EthereumNode = - let - (ipExt, _, _) = setupNat(config.nat, clientIdV1, - Port(config.devp2pTcpPort + config.portsShift), - Port(config.udpPort + config.portsShift)) - # TODO: EthereumNode should have a better split of binding address and - # external address. Also, can't have different ports as it stands now. - address = if ipExt.isNone(): - Address(ip: parseIpAddress("0.0.0.0"), - tcpPort: Port(config.devp2pTcpPort + config.portsShift), - udpPort: Port(config.udpPort + config.portsShift)) - else: - Address(ip: ipExt.get(), - tcpPort: Port(config.devp2pTcpPort + config.portsShift), - udpPort: Port(config.udpPort + config.portsShift)) +############## +# Public API # +############## +proc new*(T: type WakuBridge, + # NodeV1 initialisation + nodev1Key: keys.KeyPair, + nodev1Address: Address, + powRequirement = 0.002, + rng: ref BrHmacDrbgContext, + # NodeV2 initialisation + nodev2Key: crypto.PrivateKey, + nodev2BindIp: ValidIpAddress, nodev2BindPort: Port, + nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port]()): T = - # Set-up node - var node = newEthereumNode(config.nodekeyv1, address, NetworkId(1), nil, clientIdV1, - addAllCapabilities = false, rng = rng) - node.addCapability Waku # Always enable Waku protocol - # Set up the Waku configuration. + # Setup Waku v1 node + var + nodev1 = newEthereumNode(keys = nodev1Key, address = nodev1Address, + networkId = NetworkId(1), chain = nil, clientId = clientIdV1, + addAllCapabilities = false, rng = rng) + + nodev1.addCapability Waku # Always enable Waku protocol + + # Setup the Waku configuration. # This node is being set up as a bridge so it gets configured as a node with # a full bloom filter so that it will receive and forward all messages. # TODO: What is the PoW setting now? - let wakuConfig = WakuConfig(powRequirement: config.wakuPow, - bloom: some(fullBloom()), isLightNode: false, - maxMsgSize: waku_protocol.defaultMaxMsgSize, - topics: none(seq[waku_protocol.Topic])) - node.configureWaku(wakuConfig) + let wakuConfig = WakuConfig(powRequirement: powRequirement, + bloom: some(fullBloom()), isLightNode: false, + maxMsgSize: waku_protocol.defaultMaxMsgSize, + topics: none(seq[waku_protocol.Topic])) + nodev1.configureWaku(wakuConfig) - # Optionally direct connect with a set of nodes - if config.staticnodesv1.len > 0: connectToNodes(node, config.staticnodesv1) - elif config.fleetv1 == prod: connectToNodes(node, WhisperNodes) - elif config.fleetv1 == staging: connectToNodes(node, WhisperNodesStaging) - elif config.fleetv1 == test: connectToNodes(node, WhisperNodesTest) + # Setup Waku v2 node + let + nodev2 = WakuNode.init(nodev2Key, + nodev2BindIp, nodev2BindPort, + nodev2ExtIp, nodev2ExtPort) + + return WakuBridge(nodev1: nodev1, nodev2: nodev2) - let connectedFut = node.connectToNetwork(@[], +proc start*(bridge: WakuBridge) {.async.} = + info "Starting WakuBridge" + + debug "Start listening on Waku v1" + # Start listening on Waku v1 node + let connectedFut = bridge.nodev1.connectToNetwork(@[], true, # Always enable listening false # Disable discovery (only discovery v4 is currently supported) ) @@ -88,38 +116,32 @@ proc startWakuV1(config: WakuNodeConf, rng: ref BrHmacDrbgContext): if connectedFut.failed: fatal "connectToNetwork failed", msg = connectedFut.readError.msg quit(1) + + # Start Waku v2 node + debug "Start listening on Waku v2" + await bridge.nodev2.start() + + bridge.nodev2.mountRelay() # Always mount relay for bridge - return node + # Bridging + # Handle messages on Waku v1 and bridge to Waku v2 + proc handleEnvReceived(envelope: Envelope) {.gcsafe.} = + trace "Bridging envelope from V1 to V2", envelope=envelope + waitFor bridge.toWakuV2(envelope) -proc startWakuV2(config: WakuNodeConf): Future[WakuNode] {.async.} = - let - (extIp, extTcpPort, _) = setupNat(config.nat, clientId, - Port(uint16(config.libp2pTcpPort) + config.portsShift), - Port(uint16(config.udpPort) + config.portsShift)) - node = WakuNode.init(config.nodeKeyv2, config.listenAddress, - Port(uint16(config.libp2pTcpPort) + config.portsShift), extIp, extTcpPort) + bridge.nodev1.registerEnvReceivedHandler(handleEnvReceived) - await node.start() + # Handle messages on Waku v2 and bridge to Waku v1 + proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + trace "Bridging message from V2 to V1", msg=msg[] + bridge.toWakuV1(msg[]) + + bridge.nodev2.subscribe(defaultBridgeTopic, relayHandler) - if config.store: - mountStore(node) - - if config.filter: - mountFilter(node) - - if config.relay: - mountRelay(node, config.topics.split(" ")) - - if config.staticnodesv2.len > 0: - waitFor connectToNodes(node, config.staticnodesv2) - - if config.storenode != "": - setStorePeer(node, config.storenode) - - if config.filternode != "": - setFilterPeer(node, config.filternode) - - return node +proc stop*(bridge: WakuBridge) {.async.} = + await bridge.nodev2.stop() when isMainModule: proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: WakuNodeConf) = @@ -141,35 +163,63 @@ when isMainModule: let rng = keys.newRng() - let conf = WakuNodeConf.load() - + conf = WakuNodeConf.load() + if conf.logLevel != LogLevel.NONE: setLogLevel(conf.logLevel) + + # Load address configuration + let + (nodev1ExtIp, _, _) = setupNat(conf.nat, clientIdV1, + Port(conf.devp2pTcpPort + conf.portsShift), + Port(conf.udpPort + conf.portsShift)) + # TODO: EthereumNode should have a better split of binding address and + # external address. Also, can't have different ports as it stands now. + nodev1Address = if nodev1ExtIp.isNone(): + Address(ip: parseIpAddress("0.0.0.0"), + tcpPort: Port(conf.devp2pTcpPort + conf.portsShift), + udpPort: Port(conf.udpPort + conf.portsShift)) + else: + Address(ip: nodev1ExtIp.get(), + tcpPort: Port(conf.devp2pTcpPort + conf.portsShift), + udpPort: Port(conf.udpPort + conf.portsShift)) + (nodev2ExtIp, nodev2ExtPort, _) = setupNat(conf.nat, clientId, + Port(uint16(conf.libp2pTcpPort) + conf.portsShift), + Port(uint16(conf.udpPort) + conf.portsShift)) + + let + bridge = WakuBridge.new(nodev1Key = conf.nodekeyv1, + nodev1Address = nodev1Address, + powRequirement = conf.wakuPow, + rng = rng, + nodev2Key = conf.nodeKeyv2, + nodev2BindIp = conf.listenAddress, nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift), + nodev2ExtIp = nodev2ExtIp, nodev2ExtPort = nodev2ExtPort) - var - nodev1 {.threadvar.}: EthereumNode - nodev2 {.threadvar.}: WakuNode + waitFor bridge.start() - nodev1 = startWakuV1(conf, rng) - nodev2 = waitFor startWakuV2(conf) + # Now load rest of config + # Optionally direct connect nodev1 with a set of nodes + if conf.staticnodesv1.len > 0: connectToNodes(bridge.nodev1, conf.staticnodesv1) + elif conf.fleetv1 == prod: connectToNodes(bridge.nodev1, WhisperNodes) + elif conf.fleetv1 == staging: connectToNodes(bridge.nodev1, WhisperNodesStaging) + elif conf.fleetv1 == test: connectToNodes(bridge.nodev1, WhisperNodesTest) - - # Handle messages on Waku v1 and bridge to Waku v2 - proc handleEnvReceived(envelope: Envelope) {.gcsafe.} = - debug "Bridging envelope from V1 to V2", envelope=envelope - waitFor envelope.toWakuV2(nodev2) + # Mount configured Waku v2 protocols + if conf.store: + mountStore(bridge.nodev2) - nodev1.registerEnvReceivedHandler(handleEnvReceived) + if conf.filter: + mountFilter(bridge.nodev2) - # Handle messages on Waku v2 and bridge to Waku v1 - proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) - if msg.isOk(): - debug "Bridging message from V2 to V1", msg=msg[] - msg[].toWakuV1(nodev1) - - nodev2.subscribe(defaultBridgeTopic, relayHandler) + if conf.staticnodesv2.len > 0: + waitFor connectToNodes(bridge.nodev2, conf.staticnodesv2) + if conf.storenode != "": + setStorePeer(bridge.nodev2, conf.storenode) + + if conf.filternode != "": + setFilterPeer(bridge.nodev2, conf.filternode) if conf.rpc: let ta = initTAddress(conf.rpcAddress, @@ -177,10 +227,10 @@ when isMainModule: var rpcServer = newRpcHttpServer([ta]) # Waku v1 RPC let keys = newKeyStorage() - setupWakuRPC(nodev1, keys, rpcServer, rng) - setupWakuSimRPC(nodev1, rpcServer) + setupWakuRPC(bridge.nodev1, keys, rpcServer, rng) + setupWakuSimRPC(bridge.nodev1, rpcServer) # Waku v2 rpc - startV2Rpc(nodev2, rpcServer, conf) + startV2Rpc(bridge.nodev2, rpcServer, conf) rpcServer.start()