From ea0d62993d435603d6eedf7a459137f6696f47b8 Mon Sep 17 00:00:00 2001 From: Kim De Mey Date: Tue, 1 Sep 2020 04:09:54 +0200 Subject: [PATCH] WakuNode init without WakuNodeConf (#117) * WakuNode init without WakuNodeConf * WakuNode start without WakuNodeConf * setupNat adjustments for making common version * Move setupNat to common.nim to be used for v1 and v2 --- examples/v1/example.nim | 19 +++- tests/v2/test_wakunode.nim | 20 ++-- waku/node/common.nim | 46 +++++++++ waku/node/v1/waku_helpers.nim | 41 +------- waku/node/v1/wakunode.nim | 20 +++- waku/node/v2/wakunode2.nim | 179 ++++++++++++---------------------- 6 files changed, 150 insertions(+), 175 deletions(-) create mode 100644 waku/node/common.nim diff --git a/examples/v1/example.nim b/examples/v1/example.nim index b3f78e918..dfe45d2d5 100644 --- a/examples/v1/example.nim +++ b/examples/v1/example.nim @@ -1,8 +1,9 @@ import - confutils, chronicles, chronos, stew/byteutils, + confutils, chronicles, chronos, stew/byteutils, stew/shims/net as stewNet, eth/[keys, p2p], ../../waku/protocol/v1/waku_protocol, ../../waku/node/v1/waku_helpers, + ../../waku/node/common, ./config_example ## This is a simple Waku v1 example to show the Waku v1 API usage. @@ -15,9 +16,19 @@ let # Seed the rng. rng = keys.newRng() # Set up the address according to NAT information. - (ip, tcpPort, udpPort) = setupNat(config.nat, clientId, config.tcpPort, - config.udpPort, config.portsShift) - address = Address(ip: ip, tcpPort: tcpPort, udpPort: udpPort) + (ipExt, tcpPortExt, udpPortExt) = setupNat(config.nat, clientId, + Port(config.tcpPort + config.portsShift), + Port(config.udpPort + config.portsShift)) + # TODO: EthereumNode should have a better split of binding address and + # external address. Also, can't have different ports as it stands now. + address = if ipExt.isNone(): + Address(ip: parseIpAddress("0.0.0.0"), + tcpPort: Port(config.tcpPort + config.portsShift), + udpPort: Port(config.udpPort + config.portsShift)) + else: + Address(ip: ipExt.get(), + tcpPort: Port(config.tcpPort + config.portsShift), + udpPort: Port(config.udpPort + config.portsShift)) # Create Ethereum Node var node = newEthereumNode(config.nodekey, # Node identifier diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index fb206ba48..fdf45e307 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -1,23 +1,27 @@ {.used.} import - std/[unittest, os], - confutils, chronicles, chronos, stew/shims/net as stewNet, - json_rpc/[rpcclient, rpcserver], + std/unittest, + chronicles, chronos, stew/shims/net as stewNet, stew/byteutils, libp2p/crypto/crypto, libp2p/crypto/secp, eth/keys, - ../../waku/node/v2/[config, wakunode2, waku_types], + ../../waku/node/v2/[wakunode2, waku_types], ../test_helpers procSuite "WakuNode": asyncTest "Message published with content filter is retrievable": - let conf = WakuNodeConf.load() - let node = await WakuNode.init(conf) + let + rng = keys.newRng() + nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"), + Port(60000)) - let topic = "foobar" + await node.start() - let message = cast[seq[byte]]("hello world") + let + topic = "foobar" + message = ("hello world").toBytes node.publish(topic, ContentFilter(contentTopic: topic), message) let response = node.query(HistoryQuery(topics: @[topic])) diff --git a/waku/node/common.nim b/waku/node/common.nim new file mode 100644 index 000000000..289d741d2 --- /dev/null +++ b/waku/node/common.nim @@ -0,0 +1,46 @@ +import + std/[strutils, options], + chronicles, stew/shims/net as stewNet, + eth/net/nat + +proc setupNat*(natConf, clientId: string, tcpPort, udpPort: Port): + tuple[ip: Option[ValidIpAddress], tcpPort: Option[Port], + udpPort: Option[Port]] {.gcsafe.} = + + var nat: NatStrategy + case natConf.toLowerAscii: + of "any": + nat = NatAny + of "none": + nat = NatNone + of "upnp": + nat = NatUpnp + of "pmp": + nat = NatPmp + else: + if natConf.startsWith("extip:"): + try: + # any required port redirection is assumed to be done by hand + result.ip = some(ValidIpAddress.init(natConf[6..^1])) + nat = NatNone + except ValueError: + error "nor a valid IP address", address = natConf[6..^1] + quit QuitFailure + else: + error "not a valid NAT mechanism", value = natConf + quit QuitFailure + + if nat != NatNone: + let extIp = getExternalIP(nat) + if extIP.isSome: + result.ip = some(ValidIpAddress.init extIp.get) + # TODO redirectPorts in considered a gcsafety violation + # because it obtains the address of a non-gcsafe proc? + let extPorts = ({.gcsafe.}: + redirectPorts(tcpPort = tcpPort, + udpPort = udpPort, + description = clientId)) + if extPorts.isSome: + let (extTcpPort, extUdpPort) = extPorts.get() + result.tcpPort = some(extTcpPort) + result.udpPort = some(extUdpPort) diff --git a/waku/node/v1/waku_helpers.nim b/waku/node/v1/waku_helpers.nim index aa23c3d81..b3e111975 100644 --- a/waku/node/v1/waku_helpers.nim +++ b/waku/node/v1/waku_helpers.nim @@ -1,9 +1,6 @@ import - std/strutils, chronos, - eth/net/nat, eth/[p2p, async_utils], eth/p2p/peer_pool - -let globalListeningAddr = parseIpAddress("0.0.0.0") + eth/[p2p, async_utils], eth/p2p/peer_pool proc setBootNodes*(nodes: openArray[string]): seq[ENode] = result = newSeqOfCap[ENode](nodes.len) @@ -17,39 +14,3 @@ proc connectToNodes*(node: EthereumNode, nodes: openArray[string]) = let whisperENode = ENode.fromString(nodeId).expect("correct node") traceAsyncErrors node.peerPool.connectToNode(newNode(whisperENode)) - -proc setupNat*(natConf, clientId: string, tcpPort, udpPort, portsShift: uint16): - tuple[ip: IpAddress, tcpPort: Port, udpPort: Port] = - # defaults - result.ip = globalListeningAddr - result.tcpPort = Port(tcpPort + portsShift) - result.udpPort = Port(udpPort + portsShift) - - var nat: NatStrategy - case natConf.toLowerAscii(): - of "any": - nat = NatAny - of "none": - nat = NatNone - of "upnp": - nat = NatUpnp - of "pmp": - nat = NatPmp - else: - if natConf.startsWith("extip:") and isIpAddress(natConf[6..^1]): - # any required port redirection is assumed to be done by hand - result.ip = parseIpAddress(natConf[6..^1]) - nat = NatNone - else: - error "not a valid NAT mechanism, nor a valid IP address", value = natConf - quit(QuitFailure) - - if nat != NatNone: - let extIP = getExternalIP(nat) - if extIP.isSome: - result.ip = extIP.get() - let extPorts = redirectPorts(tcpPort = result.tcpPort, - udpPort = result.udpPort, - description = clientId) - if extPorts.isSome: - (result.tcpPort, result.udpPort) = extPorts.get() diff --git a/waku/node/v1/wakunode.nim b/waku/node/v1/wakunode.nim index b584ac9b1..a16dc5559 100644 --- a/waku/node/v1/wakunode.nim +++ b/waku/node/v1/wakunode.nim @@ -1,19 +1,29 @@ import - std/strutils, confutils, chronos, json_rpc/rpcserver, metrics, metrics/chronicles_support, + stew/shims/net as stewNet, eth/[keys, p2p], eth/common/utils, eth/p2p/[discovery, enode, peer_pool, bootnodes, whispernodes], eth/p2p/rlpx_protocols/whisper_protocol, - ../../protocol/v1/[waku_protocol, waku_bridge], + ../../protocol/v1/[waku_protocol, waku_bridge], ../common, ./rpc/[waku, wakusim, key_storage], ./waku_helpers, ./config const clientId = "Nimbus waku node" proc run(config: WakuNodeConf, rng: ref BrHmacDrbgContext) = let - (ip, tcpPort, udpPort) = setupNat(config.nat, clientId, config.tcpPort, - config.udpPort, config.portsShift) - address = Address(ip: ip, tcpPort: tcpPort, udpPort: udpPort) + (ipExt, tcpPortExt, udpPortExt) = setupNat(config.nat, clientId, + Port(config.tcpPort + config.portsShift), + Port(config.udpPort + config.portsShift)) + # TODO: EthereumNode should have a better split of binding address and + # external address. Also, can't have different ports as it stands now. + address = if ipExt.isNone(): + Address(ip: parseIpAddress("0.0.0.0"), + tcpPort: Port(config.tcpPort + config.portsShift), + udpPort: Port(config.udpPort + config.portsShift)) + else: + Address(ip: ipExt.get(), + tcpPort: Port(config.tcpPort + config.portsShift), + udpPort: Port(config.udpPort + config.portsShift)) # Set-up node var node = newEthereumNode(config.nodekey, address, 1, nil, clientId, diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index 141b19a14..31d73ec2d 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -2,7 +2,7 @@ import std/[strutils, options], chronos, confutils, json_rpc/rpcserver, metrics, stew/shims/net as stewNet, # TODO: Why do we need eth keys? - eth/keys, eth/net/nat, + eth/keys, # eth/[keys, p2p], eth/net/nat, eth/p2p/[discovery, enode], libp2p/multiaddress, libp2p/crypto/crypto, @@ -10,7 +10,7 @@ import # NOTE For TopicHandler, solve with exports? libp2p/protocols/pubsub/pubsub, libp2p/peerinfo, - ../../protocol/v2/waku_relay, + ../../protocol/v2/waku_relay, ../common, ./waku_types, ./config, ./standard_setup, ./rpc/wakurpc # key and crypto modules different @@ -30,7 +30,7 @@ type HistoryResponse* = object messages*: seq[Message] -const clientId = "Nimbus waku node" +const clientId = "Nimbus Waku v2 node" # NOTE Any difference here in Waku vs Eth2? # E.g. Devp2p/Libp2p support, etc. @@ -76,103 +76,58 @@ proc connectToNodes(n: WakuNode, nodes: openArray[string]) = # let whisperENode = ENode.fromString(nodeId).expect("correct node") # traceAsyncErrors node.peerPool.connectToNode(newNode(whisperENode)) -# NOTE Identical with eth2_network, pull out into common? -# NOTE Except portsShift -proc setupNat(conf: WakuNodeConf): tuple[ip: Option[ValidIpAddress], - tcpPort: Port, - udpPort: Port] {.gcsafe.} = - # defaults - result.tcpPort = Port(uint16(conf.tcpPort) + conf.portsShift) - result.udpPort = Port(uint16(conf.udpPort) + conf.portsShift) +proc startRpc(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port) = + let + ta = initTAddress(rpcIp, rpcPort) + rpcServer = newRpcHttpServer([ta]) + setupWakuRPC(node, rpcServer) + rpcServer.start() + info "RPC Server started", ta - var nat: NatStrategy - case conf.nat.toLowerAscii: - of "any": - nat = NatAny - of "none": - nat = NatNone - of "upnp": - nat = NatUpnp - of "pmp": - nat = NatPmp - else: - if conf.nat.startsWith("extip:"): - try: - # any required port redirection is assumed to be done by hand - result.ip = some(ValidIpAddress.init(conf.nat[6..^1])) - nat = NatNone - except ValueError: - error "nor a valid IP address", address = conf.nat[6..^1] - quit QuitFailure - else: - error "not a valid NAT mechanism", value = conf.nat - quit QuitFailure +proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port) = + info "Starting metrics HTTP server", serverIp, serverPort + metrics.startHttpServer($serverIp, serverPort) - if nat != NatNone: - let extIp = getExternalIP(nat) - if extIP.isSome: - result.ip = some(ValidIpAddress.init extIp.get) - # TODO redirectPorts in considered a gcsafety violation - # because it obtains the address of a non-gcsafe proc? - let extPorts = ({.gcsafe.}: - redirectPorts(tcpPort = result.tcpPort, - udpPort = result.udpPort, - description = clientId)) - if extPorts.isSome: - (result.tcpPort, result.udpPort) = extPorts.get() +proc startMetricsLog() = + proc logMetrics(udata: pointer) {.closure, gcsafe.} = + {.gcsafe.}: + # TODO: libp2p_pubsub_peers is not public, so we need to make this either + # public in libp2p or do our own peer counting after all. + let + totalMessages = total_messages.value -# TODO Consider removing unused arguments -proc init*(T: type WakuNode, conf: WakuNodeConf, switch: Switch, - ip: Option[ValidIpAddress], tcpPort, udpPort: Port, - privKey: keys.PrivateKey, - peerInfo: PeerInfo): T = - new result - result.switch = switch - result.peerInfo = peerInfo - # TODO Peer pool, discovery, protocol state, etc + info "Node metrics", totalMessages + discard setTimer(Moment.fromNow(2.seconds), logMetrics) + discard setTimer(Moment.fromNow(2.seconds), logMetrics) -proc createWakuNode*(conf: WakuNodeConf): Future[WakuNode] {.async, gcsafe.} = - var - (extIp, extTcpPort, extUdpPort) = setupNat(conf) - hostAddress = tcpEndPoint(conf.libp2pAddress, Port(uint16(conf.tcpPort) + conf.portsShift)) - announcedAddresses = if extIp.isNone(): @[] - else: @[tcpEndPoint(extIp.get(), extTcpPort)] +## Public API +## +proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey, + bindIp: ValidIpAddress, bindPort: Port, + extIp = none[ValidIpAddress](), extPort = none[Port]()): T = + ## Creates and starts a Waku node. + let + hostAddress = tcpEndPoint(bindIp, bindPort) + announcedAddresses = if extIp.isNone() or extPort.isNone(): @[] + else: @[tcpEndPoint(extIp.get(), extPort.get())] + peerInfo = PeerInfo.init(nodekey) info "Initializing networking", hostAddress, announcedAddresses - - let - nodekey = conf.nodekey - pubkey = nodekey.getKey.get() - keys = KeyPair(seckey: nodekey, pubkey: pubkey) - peerInfo = PeerInfo.init(nodekey) - # XXX: Add this when we create node or start it? peerInfo.addrs.add(hostAddress) - var switch = newStandardSwitch(some keys.seckey, hostAddress, triggerSelf = true) + var switch = newStandardSwitch(some(nodekey), hostAddress, triggerSelf = true) - # TODO Either persist WakuNode or something here + return WakuNode(switch: switch, peerInfo: peerInfo) - # TODO Look over this - # XXX Consider asEthKey and asLibp2pKey - result = WakuNode.init(conf, switch, extIp, extTcpPort, extUdpPort, keys.seckey.asEthKey, peerInfo) - -proc start*(node: WakuNode, conf: WakuNodeConf) {.async.} = +proc start*(node: WakuNode) {.async.} = node.libp2pTransportLoops = await node.switch.start() # NOTE WakuRelay is being instantiated as part of creating switch with PubSub field set # # TODO Mount Waku Store and Waku Filter here - # TODO Move out into separate proc - if conf.rpc: - let ta = initTAddress(conf.rpcAddress, Port(conf.rpcPort + conf.portsShift)) - var rpcServer = newRpcHttpServer([ta]) - setupWakuRPC(node, rpcServer) - rpcServer.start() - info "rpcServer started", ta=ta - # TODO Get this from WakuNode obj let peerInfo = node.peerInfo let id = peerInfo.peerId.pretty @@ -181,40 +136,6 @@ proc start*(node: WakuNode, conf: WakuNodeConf) {.async.} = ## XXX: this should be /ip4..., / stripped? info "Listening on", full = listenStr - # XXX: So doing this _after_ other setup - # Optionally direct connect with a set of nodes - if conf.staticnodes.len > 0: connectToNodes(node, conf.staticnodes) - - # TODO Move out into separate proc - when defined(insecure): - if conf.metricsServer: - let - address = conf.metricsServerAddress - port = conf.metricsServerPort + conf.portsShift - info "Starting metrics HTTP server", address, port - metrics.startHttpServer($address, Port(port)) - - if conf.logMetrics: - proc logMetrics(udata: pointer) {.closure, gcsafe.} = - {.gcsafe.}: - let - connectedPeers = connected_peers.value - totalMessages = total_messages.value - - info "Node metrics", connectedPeers, totalMessages - addTimer(Moment.fromNow(2.seconds), logMetrics) - addTimer(Moment.fromNow(2.seconds), logMetrics) - -## Public API -## - -proc init*(T: type WakuNode, conf: WakuNodeConf): Future[T] {.async.} = - ## Creates and starts a Waku node. - ## - let node = await createWakuNode(conf) - await node.start(conf) - return node - # NOTE TopicHandler is defined in pubsub.nim, roughly: #type TopicHandler* = proc(topic: string, data: seq[byte]) @@ -294,6 +215,28 @@ proc query*(w: WakuNode, query: HistoryQuery): HistoryResponse = result.messages.insert(msg[1]) when isMainModule: - let conf = WakuNodeConf.load() - discard WakuNode.init(conf) + let + conf = WakuNodeConf.load() + (extIp, extTcpPort, extUdpPort) = setupNat(conf.nat, clientId, + Port(uint16(conf.tcpPort) + conf.portsShift), + Port(uint16(conf.udpPort) + conf.portsShift)) + node = WakuNode.init(conf.nodeKey, conf.libp2pAddress, + Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort) + + waitFor node.start() + + if conf.staticnodes.len > 0: + connectToNodes(node, conf.staticnodes) + + if conf.rpc: + startRpc(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift)) + + if conf.logMetrics: + startMetricsLog() + + when defined(insecure): + if conf.metricsServer: + startMetricsServer(conf.metricsServerAddress, + Port(conf.metricsServerPort + conf.portsShift)) + runForever()