From bede5a9358c086efa1f04b435abd0b5c2b1fafcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Oskar=20Thor=C3=A9n?= Date: Fri, 24 Jul 2020 09:39:58 +0800 Subject: [PATCH] Node: Separate run into create Wakunode and start (#59) --- tests/v2/standard_setup.nim | 9 +- vendor/nim-eth | 2 +- vendor/nim-libp2p | 2 +- waku/node/v2/config.nim | 41 +++++-- waku/node/v2/wakunode2.nim | 160 +++++++++++++++++----------- waku/protocol/v2/waku_protocol2.nim | 4 +- 6 files changed, 139 insertions(+), 79 deletions(-) diff --git a/tests/v2/standard_setup.nim b/tests/v2/standard_setup.nim index dc1861e6b..2d9b1dbb5 100644 --- a/tests/v2/standard_setup.nim +++ b/tests/v2/standard_setup.nim @@ -39,10 +39,15 @@ proc newStandardSwitch*(privKey = none(PrivateKey), verifySignature = libp2p_pubsub_verify, sign = libp2p_pubsub_sign, transportFlags: set[ServerFlags] = {}, - rng = newRng()): Switch = + rng = newRng(), + inTimeout: Duration = 1.minutes, + outTimeout: Duration = 1.minutes): Switch = info "newStandardSwitch" proc createMplex(conn: Connection): Muxer = - result = Mplex.init(conn) + Mplex.init( + conn, + inTimeout = inTimeout, + outTimeout = outTimeout) let seckey = privKey.get(otherwise = PrivateKey.random(ECDSA, rng[]).tryGet()) diff --git a/vendor/nim-eth b/vendor/nim-eth index 765883c45..ac5155394 160000 --- a/vendor/nim-eth +++ b/vendor/nim-eth @@ -1 +1 @@ -Subproject commit 765883c454be726799f4e724b4dc2ca8fe25bc74 +Subproject commit ac5155394f25c2049c847e328dcfc67a01547a52 diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index 3b088f898..38eb36efa 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit 3b088f898045ceb72387effa1ef81938959aa725 +Subproject commit 38eb36efaee09551e0cd6c1d4530c9abfe9cb322 diff --git a/waku/node/v2/config.nim b/waku/node/v2/config.nim index dd749162d..d8d74504a 100644 --- a/waku/node/v2/config.nim +++ b/waku/node/v2/config.nim @@ -1,5 +1,7 @@ import - confutils/defs, chronicles, chronos, + strutils, + confutils, confutils/defs, confutils/std/net, + chronicles, chronos, libp2p/crypto/crypto, libp2p/crypto/secp, nimcrypto/utils, @@ -18,15 +20,20 @@ type defaultValue: LogLevel.INFO name: "log-level" }: LogLevel + libp2pAddress* {. + defaultValue: defaultListenAddress(config) + desc: "Listening address for the LibP2P traffic." + name: "listen-address"}: ValidIpAddress + tcpPort* {. desc: "TCP listening port." defaultValue: 60000 - name: "tcp-port" }: uint16 + name: "tcp-port" }: Port udpPort* {. desc: "UDP listening port." defaultValue: 60000 - name: "udp-port" }: uint16 + name: "udp-port" }: Port portsShift* {. desc: "Add a shift to all port numbers." @@ -105,8 +112,8 @@ type rpcAddress* {. desc: "Listening address of the RPC server.", - defaultValue: parseIpAddress("127.0.0.1") - name: "rpc-address" }: IpAddress + defaultValue: ValidIpAddress.init("127.0.0.1") + name: "rpc-address" }: ValidIpAddress rpcPort* {. desc: "Listening port of the RPC server.", @@ -120,8 +127,8 @@ type metricsServerAddress* {. desc: "Listening address of the metrics server." - defaultValue: parseIpAddress("127.0.0.1") - name: "metrics-server-address" }: IpAddress + defaultValue: ValidIpAddress.init("127.0.0.1") + name: "metrics-server-address" }: ValidIpAddress metricsServerPort* {. desc: "Listening HTTP port of the metrics server." @@ -149,11 +156,25 @@ proc parseCmdArg*(T: type crypto.PrivateKey, p: TaintedString): T = proc completeCmdArg*(T: type crypto.PrivateKey, val: TaintedString): seq[string] = return @[] -proc parseCmdArg*(T: type IpAddress, p: TaintedString): T = +proc parseCmdArg*(T: type ValidIpAddress, p: TaintedString): T = try: - result = parseIpAddress(p) + result = ValidIpAddress.init(p) except CatchableError as e: raise newException(ConfigurationError, "Invalid IP address") -proc completeCmdArg*(T: type IpAddress, val: TaintedString): seq[string] = +proc completeCmdArg*(T: type ValidIpAddress, val: TaintedString): seq[string] = return @[] + +proc parseCmdArg*(T: type Port, p: TaintedString): T = + try: + result = Port(parseInt(p)) + except CatchableError as e: + raise newException(ConfigurationError, "Invalid Port number") + +proc completeCmdArg*(T: type Port, val: TaintedString): seq[string] = + return @[] + +func defaultListenAddress*(conf: WakuNodeConf): ValidIpAddress = + # TODO: How should we select between IPv4 and IPv6 + # Maybe there should be a config option for this. + (static ValidIpAddress.init("0.0.0.0")) diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index 5ace51d28..bc76b421c 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -7,6 +7,7 @@ import libp2p/crypto/crypto, libp2p/protocols/protocol, libp2p/peerinfo, + stew/shims/net as stewNet, rpc/wakurpc, ../../protocol/v2/waku_protocol2, # TODO: Pull out standard switch from tests @@ -19,9 +20,14 @@ type PublicKey* = crypto.PublicKey PrivateKey* = crypto.PrivateKey -const clientId = "Nimbus waku node" + # NOTE: based on Eth2Node in NBC eth2_network.nim + WakuNode* = ref object of RootObj + switch*: Switch + # XXX: Unclear if we need this + peerInfo*: PeerInfo + libp2pTransportLoops*: seq[Future[void]] -let globalListeningAddr = parseIpAddress("0.0.0.0") +const clientId = "Nimbus waku node" proc setBootNodes(nodes: openArray[string]): seq[ENode] = result = newSeqOfCap[ENode](nodes.len) @@ -29,6 +35,14 @@ proc setBootNodes(nodes: openArray[string]): seq[ENode] = # TODO: something more user friendly than an expect result.add(ENode.fromString(nodeId).expect("correct node")) +# NOTE Any difference here in Waku vs Eth2? +# E.g. Devp2p/Libp2p support, etc. +#func asLibp2pKey*(key: keys.PublicKey): PublicKey = +# PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key)) + +func asEthKey*(key: PrivateKey): keys.PrivateKey = + keys.PrivateKey(key.skkey) + proc initAddress(T: type MultiAddress, str: string): T = let address = MultiAddress.init(str).tryGet() if IPFS.match(address) and matchPartial(multiaddress.TCP, address): @@ -37,6 +51,9 @@ proc initAddress(T: type MultiAddress, str: string): T = raise newException(ValueError, "Invalid bootstrap node multi-address") +template tcpEndPoint(address, port): auto = + MultiAddress.init(address, tcpProtocol, port) + proc dialPeer(p: WakuProto, address: string) {.async.} = info "dialPeer", address = address # XXX: This turns ipfs into p2p, not quite sure why @@ -60,17 +77,17 @@ proc connectToNodes(p: WakuProto, nodes: openArray[string]) = # let whisperENode = ENode.fromString(nodeId).expect("correct node") # traceAsyncErrors node.peerPool.connectToNode(newNode(whisperENode)) -# NOTE: Looks almost identical to beacon_chain/eth2_network.nim -proc setupNat(conf: WakuNodeConf): tuple[ip: IpAddress, +# NOTE Identical with eth2_network, pull out into common? +# NOTE Except portsShift +proc setupNat(conf: WakuNodeConf): tuple[ip: Option[ValidIpAddress], tcpPort: Port, udpPort: Port] {.gcsafe.} = # defaults - result.ip = globalListeningAddr - result.tcpPort = Port(conf.tcpPort + conf.portsShift) - result.udpPort = Port(conf.udpPort + conf.portsShift) + result.tcpPort = Port(uint16(conf.tcpPort) + conf.portsShift) + result.udpPort = Port(uint16(conf.udpPort) + conf.portsShift) var nat: NatStrategy - case conf.nat.toLowerAscii(): + case conf.nat.toLowerAscii: of "any": nat = NatAny of "none": @@ -80,19 +97,24 @@ proc setupNat(conf: WakuNodeConf): tuple[ip: IpAddress, of "pmp": nat = NatPmp else: - if conf.nat.startsWith("extip:") and isIpAddress(conf.nat[6..^1]): - # any required port redirection is assumed to be done by hand - result.ip = parseIpAddress(conf.nat[6..^1]) - nat = NatNone + if conf.nat.startsWith("extip:"): + try: + # any required port redirection is assumed to be done by hand + result.ip = some(ValidIpAddress.init(conf.nat[6..^1])) + nat = NatNone + except ValueError: + error "nor a valid IP address", address = conf.nat[6..^1] + quit QuitFailure else: - error "not a valid NAT mechanism, nor a valid IP address", value = conf.nat - quit(QuitFailure) + error "not a valid NAT mechanism", value = conf.nat + quit QuitFailure if nat != NatNone: - let extIP = getExternalIP(nat) + let extIp = getExternalIP(nat) if extIP.isSome: - result.ip = extIP.get() - # XXX: GC safety danger zone! See NBC eth2_network.nim + result.ip = some(ValidIpAddress.init extIp.get) + # TODO redirectPorts in considered a gcsafety violation + # because it obtains the address of a non-gcsafe proc? let extPorts = ({.gcsafe.}: redirectPorts(tcpPort = result.tcpPort, udpPort = result.udpPort, @@ -111,77 +133,81 @@ proc newWakuProto(switch: Switch): WakuProto = wakuproto.handler = handle return wakuproto -proc run*(config: WakuNodeConf) = +# TODO Consider removing unused arguments +proc init*(T: type WakuNode, conf: WakuNodeConf, switch: Switch, + ip: Option[ValidIpAddress], tcpPort, udpPort: Port, + privKey: keys.PrivateKey, + peerInfo: PeerInfo): T = + new result + result.switch = switch + result.peerInfo = peerInfo + # TODO Peer pool, discovery, protocol state, etc - info "libp2p support WIP" +proc createWakuNode*(conf: WakuNodeConf): Future[WakuNode] {.async, gcsafe.} = + var + (extIp, extTcpPort, extUdpPort) = setupNat(conf) + hostAddress = tcpEndPoint(conf.libp2pAddress, Port(uint16(conf.tcpPort) + conf.portsShift)) + announcedAddresses = if extIp.isNone(): @[] + else: @[tcpEndPoint(extIp.get(), extTcpPort)] - if config.logLevel != LogLevel.NONE: - setLogLevel(config.logLevel) + info "Initializing networking", hostAddress, + announcedAddresses - # TODO Clean up host and announced IP a la eth2_network.nim let - # External TCP and UDP ports - (ip, tcpPort, udpPort) = setupNat(config) - nat_address = Address(ip: ip, tcpPort: tcpPort, udpPort: udpPort) - #port = 60000 + tcpPort - #DefaultAddr = "/ip4/127.0.0.1/tcp/55505" - address = "/ip4/127.0.0.1/tcp/" & $tcpPort - hostAddress = MultiAddress.init(address).tryGet() - - # XXX: Address and hostAddress usage needs more clarity - # Difference between announced and host address relevant for running behind NAT, however doesn't seem like nim-libp2p supports this. GHI? - # NOTE: This is a privatekey - nodekey = config.nodekey - seckey = nodekey - pubkey = seckey.getKey.get() - keys = crypto.KeyPair(seckey: seckey, pubkey: pubkey) - + nodekey = conf.nodekey + pubkey = nodekey.getKey.get() + keys = KeyPair(seckey: nodekey, pubkey: pubkey) peerInfo = PeerInfo.init(nodekey) - #INF 2020-05-28 11:15:50+08:00 Initializing networking (host address and announced same) tid=15555 address=192.168.1.101:30305:30305 - info "Initializing networking (nat address unused)", nat_address, address - peerInfo.addrs.add(Multiaddress.init(address).tryGet()) + # XXX: Add this when we create node or start it? + peerInfo.addrs.add(hostAddress) - # switch.pubsub = wakusub, plus all the peer info etc - # And it has wakuProto lets use wakuProto maybe, cause it has switch var switch = newStandardSwitch(some keys.seckey, hostAddress, triggerSelf = true) - let wakuProto = newWakuProto(switch) - switch.mount(wakuProto) - if config.rpc: - let ta = initTAddress(config.rpcAddress, - Port(config.rpcPort + config.portsShift)) + # TODO Either persist WakuNode or something here + + # TODO Look over this + # XXX Consider asEthKey and asLibp2pKey + result = WakuNode.init(conf, switch, extIp, extTcpPort, extUdpPort, keys.seckey.asEthKey, peerInfo) + +proc start*(node: WakuNode, conf: WakuNodeConf) {.async.} = + node.libp2pTransportLoops = await node.switch.start() + + let wakuProto = newWakuProto(node.switch) + node.switch.mount(wakuProto) + wakuProto.started = true + + # TODO Move out into separate proc + if conf.rpc: + let ta = initTAddress(conf.rpcAddress, + Port(conf.rpcPort + conf.portsShift)) var rpcServer = newRpcHttpServer([ta]) setupWakuRPC(wakuProto, rpcServer) rpcServer.start() info "rpcServer started", ta=ta - # TODO: Make context async - #let fut = await switch.start() - discard switch.start() - wakuProto.started = true - + # TODO Get this from WakuNode obj + let peerInfo = node.peerInfo let id = peerInfo.peerId.pretty info "PeerInfo", id = id, addrs = peerInfo.addrs - # Try p2p instead let listenStr = $peerInfo.addrs[0] & "/p2p/" & id - #let listenStr = $peerInfo.addrs[0] & "/ipfs/" & id - # XXX: this should be /ip4..., / stripped? + ## XXX: this should be /ip4..., / stripped? info "Listening on", full = listenStr # XXX: So doing this _after_ other setup # Optionally direct connect with a set of nodes - if config.staticnodes.len > 0: connectToNodes(wakuProto, config.staticnodes) + if conf.staticnodes.len > 0: connectToNodes(wakuProto, conf.staticnodes) + # TODO Move out into separate proc when defined(insecure): - if config.metricsServer: + if conf.metricsServer: let - address = config.metricsServerAddress - port = config.metricsServerPort + config.portsShift + address = conf.metricsServerAddress + port = conf.metricsServerPort + conf.portsShift info "Starting metrics HTTP server", address, port metrics.startHttpServer($address, Port(port)) - if config.logMetrics: + if conf.logMetrics: proc logMetrics(udata: pointer) {.closure, gcsafe.} = {.gcsafe.}: let @@ -192,8 +218,16 @@ proc run*(config: WakuNodeConf) = addTimer(Moment.fromNow(2.seconds), logMetrics) addTimer(Moment.fromNow(2.seconds), logMetrics) +# TODO Get rid of this +# runForever() + +#proc run(conf: WakuNodeConf) {.async, gcsafe.} = + +proc init*() {.async.} = + let conf = WakuNodeConf.load() + let network = await createWakuNode(conf) + waitFor network.start(conf) runForever() when isMainModule: - let conf = WakuNodeConf.load() - run(conf) + discard init() diff --git a/waku/protocol/v2/waku_protocol2.nim b/waku/protocol/v2/waku_protocol2.nim index 115939148..a32291c3c 100644 --- a/waku/protocol/v2/waku_protocol2.nim +++ b/waku/protocol/v2/waku_protocol2.nim @@ -56,8 +56,8 @@ method initPubSub*(w: WakuSub) = w.text = "Foobar" debug "w.text", text = w.text - # Using GossipSub - w.gossip_enabled = true + # Not using GossipSub + w.gossip_enabled = false if w.gossip_enabled: procCall GossipSub(w).initPubSub()