From 732230cfe297dc1a48af4c26323de74648013fdf Mon Sep 17 00:00:00 2001 From: Kim De Mey Date: Wed, 21 Oct 2020 11:54:29 +0200 Subject: [PATCH] Wakuv1v2 process (#238) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add start of wakubridge with only waku v1 node for now * Add waku v2 node to wakubridge * Add bridge target to makefile * Keep waku v1 PoW configurable * Fix for latest WakuNode API * Fix Makefile target all * Rename to config_bridge and at brief docs on bridge Co-authored-by: Oskar Thorén --- Makefile | 7 +- docs/tutorial/dingpu.md | 21 ++++- waku.nimble | 3 + waku/node/config_bridge.nim | 170 ++++++++++++++++++++++++++++++++++++ waku/node/v2/wakunode2.nim | 84 +++++++++--------- waku/node/wakubridge.nim | 133 ++++++++++++++++++++++++++++ 6 files changed, 372 insertions(+), 46 deletions(-) create mode 100644 waku/node/config_bridge.nim create mode 100644 waku/node/wakubridge.nim diff --git a/Makefile b/Makefile index 7d1089b3a..1efd03a18 100644 --- a/Makefile +++ b/Makefile @@ -26,6 +26,7 @@ DOCKER_IMAGE_NIM_PARAMS ?= -d:chronicles_colors:none -d:insecure wakunode2 \ example1 \ example2 \ + bridge \ test \ clean \ libbacktrace @@ -46,7 +47,7 @@ GIT_SUBMODULE_UPDATE := git submodule update --init --recursive else # "variables.mk" was included. Business as usual until the end of this file. # default target, because it's the first one that doesn't start with '.' -all: | wakunode1 sim1 example1 wakunode2 sim2 example2 chat2 +all: | wakunode1 sim1 example1 wakunode2 sim2 example2 chat2 bridge # must be included after the default target -include $(BUILD_SYSTEM_DIR)/makefiles/targets.mk @@ -112,6 +113,10 @@ chat2: | build deps echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim chat2 $(NIM_PARAMS) waku.nims +bridge: | build deps + echo -e $(BUILD_MSG) "build/$@" && \ + $(ENV_SCRIPT) nim bridge $(NIM_PARAMS) waku.nims + # Builds and run the test suite (Waku v1 + v2) test: | test1 test2 diff --git a/docs/tutorial/dingpu.md b/docs/tutorial/dingpu.md index 04c546c40..3731b8dbf 100644 --- a/docs/tutorial/dingpu.md +++ b/docs/tutorial/dingpu.md @@ -15,11 +15,28 @@ By specifying `staticnode` it connects to that node subscribes to the `waku` top Then type messages to publish. +## Interactively add a node + +There is also an interactive mode. Type `/connect` then paste address of other node. However, this currently has some timing issues with mesh not being updated, so it is adviced not to use this until this has been addressed. See https://github.com/status-im/nim-waku/issues/231 + ## Dingpu cluster node ``` /ip4/134.209.139.210/tcp/60000/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS ``` -## Interactively add a node -There is also an interactive mode. Type `/connect` then paste address of other node. However, this currently has some timing issues with mesh not being updated, so it is adviced not to use this until this has been addressed. See https://github.com/status-im/nim-waku/issues/231 +## Run a node + +To just run a node and not interact on the chat it is enough to run `wakunode2`: +``` +./build/wakunode2 --staticnode: +``` + +You can also run the `wakubridge` process, which currently runs both a Waku v1 +and Waku v2 node. Currently, it has the same effect as running a `wakunode` and +`wakunode2` process separately, but bridging functionality will be added later +to this application. + +``` +./build/wakubridge --staticnodev2: --fleetv1:test +``` diff --git a/waku.nimble b/waku.nimble index 951b83a72..c20c4e012 100644 --- a/waku.nimble +++ b/waku.nimble @@ -82,3 +82,6 @@ task chat2, "Build example Waku v2 chat usage": # output to STDOUT. Can be fixed by redirecting logs to file (e.g.) #buildBinary name, "examples/v2/", "-d:chronicles_log_level=WARN" buildBinary name, "examples/v2/", "-d:chronicles_log_level=DEBUG" + +task bridge, "Build Waku v1 - v2 bridge": + buildBinary "wakubridge", "waku/node/", "-d:chronicles_log_level=DEBUG" diff --git a/waku/node/config_bridge.nim b/waku/node/config_bridge.nim new file mode 100644 index 000000000..ad5d042fb --- /dev/null +++ b/waku/node/config_bridge.nim @@ -0,0 +1,170 @@ +import + confutils, confutils/defs, confutils/std/net, chronicles, chronos, + libp2p/crypto/[crypto, secp], + eth/keys + +type + FleetV1* = enum + none + prod + staging + test + + WakuNodeConf* = object + logLevel* {. + desc: "Sets the log level" + defaultValue: LogLevel.INFO + name: "log-level" .}: LogLevel + + listenAddress* {. + defaultValue: defaultListenAddress(config) + desc: "Listening address for the LibP2P traffic" + name: "listen-address"}: ValidIpAddress + + libp2pTcpPort* {. + desc: "Libp2p TCP listening port (for Waku v2)" + defaultValue: 9000 + name: "libp2p-tcp-port" .}: uint16 + + devp2pTcpPort* {. + desc: "Devp2p TCP listening port (for Waku v1)" + defaultValue: 30303 + name: "devp2p-tcp-port" .}: uint16 + + udpPort* {. + desc: "UDP listening port" + defaultValue: 9000 + name: "udp-port" .}: uint16 + + portsShift* {. + desc: "Add a shift to all default port numbers" + defaultValue: 0 + name: "ports-shift" .}: uint16 + + nat* {. + desc: "Specify method to use for determining public address. " & + "Must be one of: any, none, upnp, pmp, extip:" + defaultValue: "any" .}: string + + rpc* {. + desc: "Enable Waku RPC server" + defaultValue: false + name: "rpc" .}: bool + + rpcAddress* {. + desc: "Listening address of the RPC server", + defaultValue: ValidIpAddress.init("127.0.0.1") + name: "rpc-address" }: ValidIpAddress + + rpcPort* {. + desc: "Listening port of the RPC server" + defaultValue: 8545 + name: "rpc-port" .}: uint16 + + metricsServer* {. + desc: "Enable the metrics server" + defaultValue: false + name: "metrics-server" .}: bool + + metricsServerAddress* {. + desc: "Listening address of the metrics server" + defaultValue: ValidIpAddress.init("127.0.0.1") + name: "metrics-server-address" }: ValidIpAddress + + metricsServerPort* {. + desc: "Listening HTTP port of the metrics server" + defaultValue: 8008 + name: "metrics-server-port" .}: uint16 + + ### Waku v1 options + fleetv1* {. + desc: "Select the Waku v1 fleet to connect to" + defaultValue: FleetV1.none + name: "fleetv1" .}: FleetV1 + + staticnodesv1* {. + desc: "Enode URL to directly connect with. Argument may be repeated" + name: "staticnodev1" .}: seq[string] + + nodekeyv1* {. + desc: "DevP2P node private key as hex", + # TODO: can the rng be passed in somehow via Load? + defaultValue: keys.KeyPair.random(keys.newRng()[]) + name: "nodekeyv1" .}: keys.KeyPair + + wakuPow* {. + desc: "PoW requirement of Waku node.", + defaultValue: 0.002 + name: "waku-pow" .}: float64 + + ### Waku v2 options + staticnodesv2* {. + desc: "Multiaddr of peer to directly connect with. Argument may be repeated" + name: "staticnodev2" }: seq[string] + + nodekeyv2* {. + desc: "P2P node private key as hex" + defaultValue: crypto.PrivateKey.random(Secp256k1, keys.newRng()[]).tryGet() + name: "nodekeyv2" }: crypto.PrivateKey + + topics* {. + desc: "Default topics to subscribe to (space seperated list)" + defaultValue: "waku" + name: "topics" .}: string + + store* {. + desc: "Flag whether to start store protocol", + defaultValue: false + name: "store" }: bool + + filter* {. + desc: "Flag whether to start filter protocol", + defaultValue: false + name: "filter" }: bool + + relay* {. + desc: "Flag whether to start relay protocol", + defaultValue: true + name: "relay" }: bool + + storenode* {. + desc: "Multiaddr of peer to connect with for waku store protocol" + defaultValue: "" + name: "storenode" }: string + + filternode* {. + desc: "Multiaddr of peer to connect with for waku filter protocol" + defaultValue: "" + name: "filternode" }: string + +proc parseCmdArg*(T: type keys.KeyPair, p: TaintedString): T = + try: + let privkey = keys.PrivateKey.fromHex(string(p)).tryGet() + result = privkey.toKeyPair() + except CatchableError: + raise newException(ConfigurationError, "Invalid private key") + +proc completeCmdArg*(T: type keys.KeyPair, val: TaintedString): seq[string] = + return @[] + +proc parseCmdArg*(T: type crypto.PrivateKey, p: TaintedString): T = + let key = SkPrivateKey.init(p) + if key.isOk(): + crypto.PrivateKey(scheme: Secp256k1, skkey: key.get()) + else: + raise newException(ConfigurationError, "Invalid private key") + +proc completeCmdArg*(T: type crypto.PrivateKey, val: TaintedString): seq[string] = + return @[] + +proc parseCmdArg*(T: type ValidIpAddress, p: TaintedString): T = + try: + result = ValidIpAddress.init(p) + except CatchableError: + raise newException(ConfigurationError, "Invalid IP address") + +proc completeCmdArg*(T: type ValidIpAddress, val: TaintedString): seq[string] = + return @[] + +func defaultListenAddress*(conf: WakuNodeConf): ValidIpAddress = + (static ValidIpAddress.init("0.0.0.0")) diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index 431de9832..a0532a243 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -1,5 +1,5 @@ import - std/[options, tables], + std/[options, tables, strutils], chronos, chronicles, stew/shims/net as stewNet, # TODO: Why do we need eth keys? eth/keys, @@ -13,6 +13,8 @@ import ../../protocol/v2/[waku_relay, waku_store, waku_filter, message_notifier], ./waku_types +export waku_types + logScope: topics = "wakunode" @@ -230,54 +232,50 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string]()) {.async # Can also move this to the start proc, possibly wiser? discard node.subscribe(topic, handler) +## Helpers +proc parsePeerInfo(address: string): PeerInfo = + let multiAddr = MultiAddress.initAddress(address) + let parts = address.split("/") + return PeerInfo.init(parts[^1], [multiAddr]) + +proc dialPeer*(n: WakuNode, address: string) {.async.} = + info "dialPeer", address = address + # XXX: This turns ipfs into p2p, not quite sure why + let remotePeer = parsePeerInfo(address) + + info "Dialing peer", ma = remotePeer.addrs[0] + # NOTE This is dialing on WakuRelay protocol specifically + # TODO Keep track of conn and connected state somewhere (WakuRelay?) + #p.conn = await p.switch.dial(remotePeer, WakuRelayCodec) + #p.connected = true + discard n.switch.dial(remotePeer, WakuRelayCodec) + info "Post switch dial" + +proc setStorePeer*(n: WakuNode, address: string) = + info "dialPeer", address = address + + let remotePeer = parsePeerInfo(address) + + n.wakuStore.setPeer(remotePeer) + +proc setFilterPeer*(n: WakuNode, address: string) = + info "dialPeer", address = address + + let remotePeer = parsePeerInfo(address) + + n.wakuFilter.setPeer(remotePeer) + +proc connectToNodes*(n: WakuNode, nodes: openArray[string]) = + for nodeId in nodes: + info "connectToNodes", node = nodeId + # XXX: This seems...brittle + discard dialPeer(n, nodeId) when isMainModule: import - std/strutils, confutils, json_rpc/rpcserver, metrics, ./config, ./rpc/wakurpc, ../common - proc parsePeerInfo(address: string): PeerInfo = - let multiAddr = MultiAddress.initAddress(address) - let parts = address.split("/") - return PeerInfo.init(parts[^1], [multiAddr]) - - proc dialPeer(n: WakuNode, address: string) {.async.} = - info "dialPeer", address = address - # XXX: This turns ipfs into p2p, not quite sure why - let remotePeer = parsePeerInfo(address) - - info "Dialing peer", ma = remotePeer.addrs[0] - # NOTE This is dialing on WakuRelay protocol specifically - # TODO Keep track of conn and connected state somewhere (WakuRelay?) - #p.conn = await p.switch.dial(remotePeer, WakuRelayCodec) - #p.connected = true - discard n.switch.dial(remotePeer, WakuRelayCodec) - info "Post switch dial" - - proc setStorePeer(n: WakuNode, address: string) = - info "dialPeer", address = address - - let remotePeer = parsePeerInfo(address) - - n.wakuStore.setPeer(remotePeer) - - proc setFilterPeer(n: WakuNode, address: string) = - info "dialPeer", address = address - - let remotePeer = parsePeerInfo(address) - - n.wakuFilter.setPeer(remotePeer) - - proc connectToNodes(n: WakuNode, nodes: openArray[string]) = - for nodeId in nodes: - info "connectToNodes", node = nodeId - # XXX: This seems...brittle - discard dialPeer(n, nodeId) - # Waku 1 - # let whisperENode = ENode.fromString(nodeId).expect("correct node") - # traceAsyncErrors node.peerPool.connectToNode(newNode(whisperENode)) - proc startRpc(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port) = let ta = initTAddress(rpcIp, rpcPort) diff --git a/waku/node/wakubridge.nim b/waku/node/wakubridge.nim new file mode 100644 index 000000000..ec7db0865 --- /dev/null +++ b/waku/node/wakubridge.nim @@ -0,0 +1,133 @@ +import + std/strutils, + chronos, confutils, chronicles, chronicles/topics_registry, metrics, + stew/shims/net as stewNet, json_rpc/rpcserver, + # Waku v1 imports + eth/[keys, p2p], eth/common/utils, + eth/p2p/[enode, whispernodes], + ../protocol/v1/waku_protocol, ./common, + ./v1/rpc/[waku, wakusim, key_storage], ./v1/waku_helpers, + # Waku v2 imports + libp2p/crypto/crypto, + ./v2/wakunode2, + ./v2/rpc/wakurpc, + # Common cli config + ./config_bridge + +const clientIdV1 = "nim-waku v1 node" + +proc startWakuV1(config: WakuNodeConf, rng: ref BrHmacDrbgContext): + EthereumNode = + let + (ipExt, _, _) = setupNat(config.nat, clientIdV1, + Port(config.devp2pTcpPort + config.portsShift), + Port(config.udpPort + config.portsShift)) + # TODO: EthereumNode should have a better split of binding address and + # external address. Also, can't have different ports as it stands now. + address = if ipExt.isNone(): + Address(ip: parseIpAddress("0.0.0.0"), + tcpPort: Port(config.devp2pTcpPort + config.portsShift), + udpPort: Port(config.udpPort + config.portsShift)) + else: + Address(ip: ipExt.get(), + tcpPort: Port(config.devp2pTcpPort + config.portsShift), + udpPort: Port(config.udpPort + config.portsShift)) + + # Set-up node + var node = newEthereumNode(config.nodekeyv1, address, 1, nil, clientIdV1, + addAllCapabilities = false, rng = rng) + node.addCapability Waku # Always enable Waku protocol + # Set up the Waku configuration. + # This node is being set up as a bridge so it gets configured as a node with + # a full bloom filter so that it will receive and forward all messages. + # TODO: What is the PoW setting now? + let wakuConfig = WakuConfig(powRequirement: config.wakuPow, + bloom: some(fullBloom()), isLightNode: false, + maxMsgSize: waku_protocol.defaultMaxMsgSize, + topics: none(seq[waku_protocol.Topic])) + node.configureWaku(wakuConfig) + + # Optionally direct connect with a set of nodes + if config.staticnodesv1.len > 0: connectToNodes(node, config.staticnodesv1) + elif config.fleetv1 == prod: connectToNodes(node, WhisperNodes) + elif config.fleetv1 == staging: connectToNodes(node, WhisperNodesStaging) + elif config.fleetv1 == test: connectToNodes(node, WhisperNodesTest) + + let connectedFut = node.connectToNetwork(@[], + true, # Always enable listening + false # Disable discovery (only discovery v4 is currently supported) + ) + connectedFut.callback = proc(data: pointer) {.gcsafe.} = + {.gcsafe.}: + if connectedFut.failed: + fatal "connectToNetwork failed", msg = connectedFut.readError.msg + quit(1) + + return node + +proc startWakuV2(config: WakuNodeConf): Future[WakuNode] {.async.} = + let + (extIp, extTcpPort, _) = setupNat(config.nat, clientId, + Port(uint16(config.libp2pTcpPort) + config.portsShift), + Port(uint16(config.udpPort) + config.portsShift)) + node = WakuNode.init(config.nodeKeyv2, config.listenAddress, + Port(uint16(config.libp2pTcpPort) + config.portsShift), extIp, extTcpPort) + + await node.start() + + if config.store: + mountStore(node) + + if config.filter: + mountFilter(node) + + if config.relay: + waitFor mountRelay(node, config.topics.split(" ")) + + if config.staticnodesv2.len > 0: + connectToNodes(node, config.staticnodesv2) + + if config.storenode != "": + setStorePeer(node, config.storenode) + + if config.filternode != "": + setFilterPeer(node, config.filternode) + + return node + +when isMainModule: + let + rng = keys.newRng() + let conf = WakuNodeConf.load() + + if conf.logLevel != LogLevel.NONE: + setLogLevel(conf.logLevel) + + let + nodev1 = startWakuV1(conf, rng) + nodev2 = waitFor startWakuV2(conf) + + if conf.rpc: + let ta = initTAddress(conf.rpcAddress, + Port(conf.rpcPort + conf.portsShift)) + var rpcServer = newRpcHttpServer([ta]) + # Waku v1 RPC + # TODO: Commented out the Waku v1 RPC calls as there is a conflict because + # of exact same named rpc calls between v1 and v2 + # let keys = newKeyStorage() + # setupWakuRPC(nodev1, keys, rpcServer, rng) + setupWakuSimRPC(nodev1, rpcServer) + # Waku v2 rpc + setupWakuRPC(nodev2, rpcServer) + + rpcServer.start() + + when defined(insecure): + if conf.metricsServer: + let + address = conf.metricsServerAddress + port = conf.metricsServerPort + conf.portsShift + info "Starting metrics HTTP server", address, port + metrics.startHttpServer($address, Port(port)) + + runForever()