Node: Separate run into create Wakunode and start (#59)

This commit is contained in:
Oskar Thorén 2020-07-24 09:39:58 +08:00 committed by GitHub
parent fb45502ba7
commit bede5a9358
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 139 additions and 79 deletions

View File

@ -39,10 +39,15 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
verifySignature = libp2p_pubsub_verify, verifySignature = libp2p_pubsub_verify,
sign = libp2p_pubsub_sign, sign = libp2p_pubsub_sign,
transportFlags: set[ServerFlags] = {}, transportFlags: set[ServerFlags] = {},
rng = newRng()): Switch = rng = newRng(),
inTimeout: Duration = 1.minutes,
outTimeout: Duration = 1.minutes): Switch =
info "newStandardSwitch" info "newStandardSwitch"
proc createMplex(conn: Connection): Muxer = proc createMplex(conn: Connection): Muxer =
result = Mplex.init(conn) Mplex.init(
conn,
inTimeout = inTimeout,
outTimeout = outTimeout)
let let
seckey = privKey.get(otherwise = PrivateKey.random(ECDSA, rng[]).tryGet()) seckey = privKey.get(otherwise = PrivateKey.random(ECDSA, rng[]).tryGet())

2
vendor/nim-eth vendored

@ -1 +1 @@
Subproject commit 765883c454be726799f4e724b4dc2ca8fe25bc74 Subproject commit ac5155394f25c2049c847e328dcfc67a01547a52

2
vendor/nim-libp2p vendored

@ -1 +1 @@
Subproject commit 3b088f898045ceb72387effa1ef81938959aa725 Subproject commit 38eb36efaee09551e0cd6c1d4530c9abfe9cb322

View File

@ -1,5 +1,7 @@
import import
confutils/defs, chronicles, chronos, strutils,
confutils, confutils/defs, confutils/std/net,
chronicles, chronos,
libp2p/crypto/crypto, libp2p/crypto/crypto,
libp2p/crypto/secp, libp2p/crypto/secp,
nimcrypto/utils, nimcrypto/utils,
@ -18,15 +20,20 @@ type
defaultValue: LogLevel.INFO defaultValue: LogLevel.INFO
name: "log-level" }: LogLevel name: "log-level" }: LogLevel
libp2pAddress* {.
defaultValue: defaultListenAddress(config)
desc: "Listening address for the LibP2P traffic."
name: "listen-address"}: ValidIpAddress
tcpPort* {. tcpPort* {.
desc: "TCP listening port." desc: "TCP listening port."
defaultValue: 60000 defaultValue: 60000
name: "tcp-port" }: uint16 name: "tcp-port" }: Port
udpPort* {. udpPort* {.
desc: "UDP listening port." desc: "UDP listening port."
defaultValue: 60000 defaultValue: 60000
name: "udp-port" }: uint16 name: "udp-port" }: Port
portsShift* {. portsShift* {.
desc: "Add a shift to all port numbers." desc: "Add a shift to all port numbers."
@ -105,8 +112,8 @@ type
rpcAddress* {. rpcAddress* {.
desc: "Listening address of the RPC server.", desc: "Listening address of the RPC server.",
defaultValue: parseIpAddress("127.0.0.1") defaultValue: ValidIpAddress.init("127.0.0.1")
name: "rpc-address" }: IpAddress name: "rpc-address" }: ValidIpAddress
rpcPort* {. rpcPort* {.
desc: "Listening port of the RPC server.", desc: "Listening port of the RPC server.",
@ -120,8 +127,8 @@ type
metricsServerAddress* {. metricsServerAddress* {.
desc: "Listening address of the metrics server." desc: "Listening address of the metrics server."
defaultValue: parseIpAddress("127.0.0.1") defaultValue: ValidIpAddress.init("127.0.0.1")
name: "metrics-server-address" }: IpAddress name: "metrics-server-address" }: ValidIpAddress
metricsServerPort* {. metricsServerPort* {.
desc: "Listening HTTP port of the metrics server." desc: "Listening HTTP port of the metrics server."
@ -149,11 +156,25 @@ proc parseCmdArg*(T: type crypto.PrivateKey, p: TaintedString): T =
proc completeCmdArg*(T: type crypto.PrivateKey, val: TaintedString): seq[string] = proc completeCmdArg*(T: type crypto.PrivateKey, val: TaintedString): seq[string] =
return @[] return @[]
proc parseCmdArg*(T: type IpAddress, p: TaintedString): T = proc parseCmdArg*(T: type ValidIpAddress, p: TaintedString): T =
try: try:
result = parseIpAddress(p) result = ValidIpAddress.init(p)
except CatchableError as e: except CatchableError as e:
raise newException(ConfigurationError, "Invalid IP address") raise newException(ConfigurationError, "Invalid IP address")
proc completeCmdArg*(T: type IpAddress, val: TaintedString): seq[string] = proc completeCmdArg*(T: type ValidIpAddress, val: TaintedString): seq[string] =
return @[] return @[]
proc parseCmdArg*(T: type Port, p: TaintedString): T =
try:
result = Port(parseInt(p))
except CatchableError as e:
raise newException(ConfigurationError, "Invalid Port number")
proc completeCmdArg*(T: type Port, val: TaintedString): seq[string] =
return @[]
func defaultListenAddress*(conf: WakuNodeConf): ValidIpAddress =
# TODO: How should we select between IPv4 and IPv6
# Maybe there should be a config option for this.
(static ValidIpAddress.init("0.0.0.0"))

View File

@ -7,6 +7,7 @@ import
libp2p/crypto/crypto, libp2p/crypto/crypto,
libp2p/protocols/protocol, libp2p/protocols/protocol,
libp2p/peerinfo, libp2p/peerinfo,
stew/shims/net as stewNet,
rpc/wakurpc, rpc/wakurpc,
../../protocol/v2/waku_protocol2, ../../protocol/v2/waku_protocol2,
# TODO: Pull out standard switch from tests # TODO: Pull out standard switch from tests
@ -19,9 +20,14 @@ type
PublicKey* = crypto.PublicKey PublicKey* = crypto.PublicKey
PrivateKey* = crypto.PrivateKey PrivateKey* = crypto.PrivateKey
const clientId = "Nimbus waku node" # NOTE: based on Eth2Node in NBC eth2_network.nim
WakuNode* = ref object of RootObj
switch*: Switch
# XXX: Unclear if we need this
peerInfo*: PeerInfo
libp2pTransportLoops*: seq[Future[void]]
let globalListeningAddr = parseIpAddress("0.0.0.0") const clientId = "Nimbus waku node"
proc setBootNodes(nodes: openArray[string]): seq[ENode] = proc setBootNodes(nodes: openArray[string]): seq[ENode] =
result = newSeqOfCap[ENode](nodes.len) result = newSeqOfCap[ENode](nodes.len)
@ -29,6 +35,14 @@ proc setBootNodes(nodes: openArray[string]): seq[ENode] =
# TODO: something more user friendly than an expect # TODO: something more user friendly than an expect
result.add(ENode.fromString(nodeId).expect("correct node")) result.add(ENode.fromString(nodeId).expect("correct node"))
# NOTE Any difference here in Waku vs Eth2?
# E.g. Devp2p/Libp2p support, etc.
#func asLibp2pKey*(key: keys.PublicKey): PublicKey =
# PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key))
func asEthKey*(key: PrivateKey): keys.PrivateKey =
keys.PrivateKey(key.skkey)
proc initAddress(T: type MultiAddress, str: string): T = proc initAddress(T: type MultiAddress, str: string): T =
let address = MultiAddress.init(str).tryGet() let address = MultiAddress.init(str).tryGet()
if IPFS.match(address) and matchPartial(multiaddress.TCP, address): if IPFS.match(address) and matchPartial(multiaddress.TCP, address):
@ -37,6 +51,9 @@ proc initAddress(T: type MultiAddress, str: string): T =
raise newException(ValueError, raise newException(ValueError,
"Invalid bootstrap node multi-address") "Invalid bootstrap node multi-address")
template tcpEndPoint(address, port): auto =
MultiAddress.init(address, tcpProtocol, port)
proc dialPeer(p: WakuProto, address: string) {.async.} = proc dialPeer(p: WakuProto, address: string) {.async.} =
info "dialPeer", address = address info "dialPeer", address = address
# XXX: This turns ipfs into p2p, not quite sure why # XXX: This turns ipfs into p2p, not quite sure why
@ -60,17 +77,17 @@ proc connectToNodes(p: WakuProto, nodes: openArray[string]) =
# let whisperENode = ENode.fromString(nodeId).expect("correct node") # let whisperENode = ENode.fromString(nodeId).expect("correct node")
# traceAsyncErrors node.peerPool.connectToNode(newNode(whisperENode)) # traceAsyncErrors node.peerPool.connectToNode(newNode(whisperENode))
# NOTE: Looks almost identical to beacon_chain/eth2_network.nim # NOTE Identical with eth2_network, pull out into common?
proc setupNat(conf: WakuNodeConf): tuple[ip: IpAddress, # NOTE Except portsShift
proc setupNat(conf: WakuNodeConf): tuple[ip: Option[ValidIpAddress],
tcpPort: Port, tcpPort: Port,
udpPort: Port] {.gcsafe.} = udpPort: Port] {.gcsafe.} =
# defaults # defaults
result.ip = globalListeningAddr result.tcpPort = Port(uint16(conf.tcpPort) + conf.portsShift)
result.tcpPort = Port(conf.tcpPort + conf.portsShift) result.udpPort = Port(uint16(conf.udpPort) + conf.portsShift)
result.udpPort = Port(conf.udpPort + conf.portsShift)
var nat: NatStrategy var nat: NatStrategy
case conf.nat.toLowerAscii(): case conf.nat.toLowerAscii:
of "any": of "any":
nat = NatAny nat = NatAny
of "none": of "none":
@ -80,19 +97,24 @@ proc setupNat(conf: WakuNodeConf): tuple[ip: IpAddress,
of "pmp": of "pmp":
nat = NatPmp nat = NatPmp
else: else:
if conf.nat.startsWith("extip:") and isIpAddress(conf.nat[6..^1]): if conf.nat.startsWith("extip:"):
# any required port redirection is assumed to be done by hand try:
result.ip = parseIpAddress(conf.nat[6..^1]) # any required port redirection is assumed to be done by hand
nat = NatNone result.ip = some(ValidIpAddress.init(conf.nat[6..^1]))
nat = NatNone
except ValueError:
error "nor a valid IP address", address = conf.nat[6..^1]
quit QuitFailure
else: else:
error "not a valid NAT mechanism, nor a valid IP address", value = conf.nat error "not a valid NAT mechanism", value = conf.nat
quit(QuitFailure) quit QuitFailure
if nat != NatNone: if nat != NatNone:
let extIP = getExternalIP(nat) let extIp = getExternalIP(nat)
if extIP.isSome: if extIP.isSome:
result.ip = extIP.get() result.ip = some(ValidIpAddress.init extIp.get)
# XXX: GC safety danger zone! See NBC eth2_network.nim # TODO redirectPorts in considered a gcsafety violation
# because it obtains the address of a non-gcsafe proc?
let extPorts = ({.gcsafe.}: let extPorts = ({.gcsafe.}:
redirectPorts(tcpPort = result.tcpPort, redirectPorts(tcpPort = result.tcpPort,
udpPort = result.udpPort, udpPort = result.udpPort,
@ -111,77 +133,81 @@ proc newWakuProto(switch: Switch): WakuProto =
wakuproto.handler = handle wakuproto.handler = handle
return wakuproto return wakuproto
proc run*(config: WakuNodeConf) = # TODO Consider removing unused arguments
proc init*(T: type WakuNode, conf: WakuNodeConf, switch: Switch,
ip: Option[ValidIpAddress], tcpPort, udpPort: Port,
privKey: keys.PrivateKey,
peerInfo: PeerInfo): T =
new result
result.switch = switch
result.peerInfo = peerInfo
# TODO Peer pool, discovery, protocol state, etc
info "libp2p support WIP" proc createWakuNode*(conf: WakuNodeConf): Future[WakuNode] {.async, gcsafe.} =
var
(extIp, extTcpPort, extUdpPort) = setupNat(conf)
hostAddress = tcpEndPoint(conf.libp2pAddress, Port(uint16(conf.tcpPort) + conf.portsShift))
announcedAddresses = if extIp.isNone(): @[]
else: @[tcpEndPoint(extIp.get(), extTcpPort)]
if config.logLevel != LogLevel.NONE: info "Initializing networking", hostAddress,
setLogLevel(config.logLevel) announcedAddresses
# TODO Clean up host and announced IP a la eth2_network.nim
let let
# External TCP and UDP ports nodekey = conf.nodekey
(ip, tcpPort, udpPort) = setupNat(config) pubkey = nodekey.getKey.get()
nat_address = Address(ip: ip, tcpPort: tcpPort, udpPort: udpPort) keys = KeyPair(seckey: nodekey, pubkey: pubkey)
#port = 60000 + tcpPort
#DefaultAddr = "/ip4/127.0.0.1/tcp/55505"
address = "/ip4/127.0.0.1/tcp/" & $tcpPort
hostAddress = MultiAddress.init(address).tryGet()
# XXX: Address and hostAddress usage needs more clarity
# Difference between announced and host address relevant for running behind NAT, however doesn't seem like nim-libp2p supports this. GHI?
# NOTE: This is a privatekey
nodekey = config.nodekey
seckey = nodekey
pubkey = seckey.getKey.get()
keys = crypto.KeyPair(seckey: seckey, pubkey: pubkey)
peerInfo = PeerInfo.init(nodekey) peerInfo = PeerInfo.init(nodekey)
#INF 2020-05-28 11:15:50+08:00 Initializing networking (host address and announced same) tid=15555 address=192.168.1.101:30305:30305 # XXX: Add this when we create node or start it?
info "Initializing networking (nat address unused)", nat_address, address peerInfo.addrs.add(hostAddress)
peerInfo.addrs.add(Multiaddress.init(address).tryGet())
# switch.pubsub = wakusub, plus all the peer info etc
# And it has wakuProto lets use wakuProto maybe, cause it has switch
var switch = newStandardSwitch(some keys.seckey, hostAddress, triggerSelf = true) var switch = newStandardSwitch(some keys.seckey, hostAddress, triggerSelf = true)
let wakuProto = newWakuProto(switch)
switch.mount(wakuProto)
if config.rpc: # TODO Either persist WakuNode or something here
let ta = initTAddress(config.rpcAddress,
Port(config.rpcPort + config.portsShift)) # TODO Look over this
# XXX Consider asEthKey and asLibp2pKey
result = WakuNode.init(conf, switch, extIp, extTcpPort, extUdpPort, keys.seckey.asEthKey, peerInfo)
proc start*(node: WakuNode, conf: WakuNodeConf) {.async.} =
node.libp2pTransportLoops = await node.switch.start()
let wakuProto = newWakuProto(node.switch)
node.switch.mount(wakuProto)
wakuProto.started = true
# TODO Move out into separate proc
if conf.rpc:
let ta = initTAddress(conf.rpcAddress,
Port(conf.rpcPort + conf.portsShift))
var rpcServer = newRpcHttpServer([ta]) var rpcServer = newRpcHttpServer([ta])
setupWakuRPC(wakuProto, rpcServer) setupWakuRPC(wakuProto, rpcServer)
rpcServer.start() rpcServer.start()
info "rpcServer started", ta=ta info "rpcServer started", ta=ta
# TODO: Make context async # TODO Get this from WakuNode obj
#let fut = await switch.start() let peerInfo = node.peerInfo
discard switch.start()
wakuProto.started = true
let id = peerInfo.peerId.pretty let id = peerInfo.peerId.pretty
info "PeerInfo", id = id, addrs = peerInfo.addrs info "PeerInfo", id = id, addrs = peerInfo.addrs
# Try p2p instead
let listenStr = $peerInfo.addrs[0] & "/p2p/" & id let listenStr = $peerInfo.addrs[0] & "/p2p/" & id
#let listenStr = $peerInfo.addrs[0] & "/ipfs/" & id ## XXX: this should be /ip4..., / stripped?
# XXX: this should be /ip4..., / stripped?
info "Listening on", full = listenStr info "Listening on", full = listenStr
# XXX: So doing this _after_ other setup # XXX: So doing this _after_ other setup
# Optionally direct connect with a set of nodes # Optionally direct connect with a set of nodes
if config.staticnodes.len > 0: connectToNodes(wakuProto, config.staticnodes) if conf.staticnodes.len > 0: connectToNodes(wakuProto, conf.staticnodes)
# TODO Move out into separate proc
when defined(insecure): when defined(insecure):
if config.metricsServer: if conf.metricsServer:
let let
address = config.metricsServerAddress address = conf.metricsServerAddress
port = config.metricsServerPort + config.portsShift port = conf.metricsServerPort + conf.portsShift
info "Starting metrics HTTP server", address, port info "Starting metrics HTTP server", address, port
metrics.startHttpServer($address, Port(port)) metrics.startHttpServer($address, Port(port))
if config.logMetrics: if conf.logMetrics:
proc logMetrics(udata: pointer) {.closure, gcsafe.} = proc logMetrics(udata: pointer) {.closure, gcsafe.} =
{.gcsafe.}: {.gcsafe.}:
let let
@ -192,8 +218,16 @@ proc run*(config: WakuNodeConf) =
addTimer(Moment.fromNow(2.seconds), logMetrics) addTimer(Moment.fromNow(2.seconds), logMetrics)
addTimer(Moment.fromNow(2.seconds), logMetrics) addTimer(Moment.fromNow(2.seconds), logMetrics)
# TODO Get rid of this
# runForever()
#proc run(conf: WakuNodeConf) {.async, gcsafe.} =
proc init*() {.async.} =
let conf = WakuNodeConf.load()
let network = await createWakuNode(conf)
waitFor network.start(conf)
runForever() runForever()
when isMainModule: when isMainModule:
let conf = WakuNodeConf.load() discard init()
run(conf)

View File

@ -56,8 +56,8 @@ method initPubSub*(w: WakuSub) =
w.text = "Foobar" w.text = "Foobar"
debug "w.text", text = w.text debug "w.text", text = w.text
# Using GossipSub # Not using GossipSub
w.gossip_enabled = true w.gossip_enabled = false
if w.gossip_enabled: if w.gossip_enabled:
procCall GossipSub(w).initPubSub() procCall GossipSub(w).initPubSub()