WakuNode init without WakuNodeConf (#117)

* WakuNode init without WakuNodeConf

* WakuNode start without WakuNodeConf

* setupNat adjustments for making common version

* Move setupNat to common.nim to be used for v1 and v2
This commit is contained in:
Kim De Mey 2020-09-01 04:09:54 +02:00 committed by GitHub
parent ddcfc3fdd6
commit ea0d62993d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 150 additions and 175 deletions

View File

@ -1,8 +1,9 @@
import import
confutils, chronicles, chronos, stew/byteutils, confutils, chronicles, chronos, stew/byteutils, stew/shims/net as stewNet,
eth/[keys, p2p], eth/[keys, p2p],
../../waku/protocol/v1/waku_protocol, ../../waku/protocol/v1/waku_protocol,
../../waku/node/v1/waku_helpers, ../../waku/node/v1/waku_helpers,
../../waku/node/common,
./config_example ./config_example
## This is a simple Waku v1 example to show the Waku v1 API usage. ## This is a simple Waku v1 example to show the Waku v1 API usage.
@ -15,9 +16,19 @@ let
# Seed the rng. # Seed the rng.
rng = keys.newRng() rng = keys.newRng()
# Set up the address according to NAT information. # Set up the address according to NAT information.
(ip, tcpPort, udpPort) = setupNat(config.nat, clientId, config.tcpPort, (ipExt, tcpPortExt, udpPortExt) = setupNat(config.nat, clientId,
config.udpPort, config.portsShift) Port(config.tcpPort + config.portsShift),
address = Address(ip: ip, tcpPort: tcpPort, udpPort: udpPort) Port(config.udpPort + config.portsShift))
# TODO: EthereumNode should have a better split of binding address and
# external address. Also, can't have different ports as it stands now.
address = if ipExt.isNone():
Address(ip: parseIpAddress("0.0.0.0"),
tcpPort: Port(config.tcpPort + config.portsShift),
udpPort: Port(config.udpPort + config.portsShift))
else:
Address(ip: ipExt.get(),
tcpPort: Port(config.tcpPort + config.portsShift),
udpPort: Port(config.udpPort + config.portsShift))
# Create Ethereum Node # Create Ethereum Node
var node = newEthereumNode(config.nodekey, # Node identifier var node = newEthereumNode(config.nodekey, # Node identifier

View File

@ -1,23 +1,27 @@
{.used.} {.used.}
import import
std/[unittest, os], std/unittest,
confutils, chronicles, chronos, stew/shims/net as stewNet, chronicles, chronos, stew/shims/net as stewNet, stew/byteutils,
json_rpc/[rpcclient, rpcserver],
libp2p/crypto/crypto, libp2p/crypto/crypto,
libp2p/crypto/secp, libp2p/crypto/secp,
eth/keys, eth/keys,
../../waku/node/v2/[config, wakunode2, waku_types], ../../waku/node/v2/[wakunode2, waku_types],
../test_helpers ../test_helpers
procSuite "WakuNode": procSuite "WakuNode":
asyncTest "Message published with content filter is retrievable": asyncTest "Message published with content filter is retrievable":
let conf = WakuNodeConf.load() let
let node = await WakuNode.init(conf) rng = keys.newRng()
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
Port(60000))
let topic = "foobar" await node.start()
let message = cast[seq[byte]]("hello world") let
topic = "foobar"
message = ("hello world").toBytes
node.publish(topic, ContentFilter(contentTopic: topic), message) node.publish(topic, ContentFilter(contentTopic: topic), message)
let response = node.query(HistoryQuery(topics: @[topic])) let response = node.query(HistoryQuery(topics: @[topic]))

46
waku/node/common.nim Normal file
View File

@ -0,0 +1,46 @@
import
std/[strutils, options],
chronicles, stew/shims/net as stewNet,
eth/net/nat
proc setupNat*(natConf, clientId: string, tcpPort, udpPort: Port):
tuple[ip: Option[ValidIpAddress], tcpPort: Option[Port],
udpPort: Option[Port]] {.gcsafe.} =
var nat: NatStrategy
case natConf.toLowerAscii:
of "any":
nat = NatAny
of "none":
nat = NatNone
of "upnp":
nat = NatUpnp
of "pmp":
nat = NatPmp
else:
if natConf.startsWith("extip:"):
try:
# any required port redirection is assumed to be done by hand
result.ip = some(ValidIpAddress.init(natConf[6..^1]))
nat = NatNone
except ValueError:
error "nor a valid IP address", address = natConf[6..^1]
quit QuitFailure
else:
error "not a valid NAT mechanism", value = natConf
quit QuitFailure
if nat != NatNone:
let extIp = getExternalIP(nat)
if extIP.isSome:
result.ip = some(ValidIpAddress.init extIp.get)
# TODO redirectPorts in considered a gcsafety violation
# because it obtains the address of a non-gcsafe proc?
let extPorts = ({.gcsafe.}:
redirectPorts(tcpPort = tcpPort,
udpPort = udpPort,
description = clientId))
if extPorts.isSome:
let (extTcpPort, extUdpPort) = extPorts.get()
result.tcpPort = some(extTcpPort)
result.udpPort = some(extUdpPort)

View File

@ -1,9 +1,6 @@
import import
std/strutils,
chronos, chronos,
eth/net/nat, eth/[p2p, async_utils], eth/p2p/peer_pool eth/[p2p, async_utils], eth/p2p/peer_pool
let globalListeningAddr = parseIpAddress("0.0.0.0")
proc setBootNodes*(nodes: openArray[string]): seq[ENode] = proc setBootNodes*(nodes: openArray[string]): seq[ENode] =
result = newSeqOfCap[ENode](nodes.len) result = newSeqOfCap[ENode](nodes.len)
@ -17,39 +14,3 @@ proc connectToNodes*(node: EthereumNode, nodes: openArray[string]) =
let whisperENode = ENode.fromString(nodeId).expect("correct node") let whisperENode = ENode.fromString(nodeId).expect("correct node")
traceAsyncErrors node.peerPool.connectToNode(newNode(whisperENode)) traceAsyncErrors node.peerPool.connectToNode(newNode(whisperENode))
proc setupNat*(natConf, clientId: string, tcpPort, udpPort, portsShift: uint16):
tuple[ip: IpAddress, tcpPort: Port, udpPort: Port] =
# defaults
result.ip = globalListeningAddr
result.tcpPort = Port(tcpPort + portsShift)
result.udpPort = Port(udpPort + portsShift)
var nat: NatStrategy
case natConf.toLowerAscii():
of "any":
nat = NatAny
of "none":
nat = NatNone
of "upnp":
nat = NatUpnp
of "pmp":
nat = NatPmp
else:
if natConf.startsWith("extip:") and isIpAddress(natConf[6..^1]):
# any required port redirection is assumed to be done by hand
result.ip = parseIpAddress(natConf[6..^1])
nat = NatNone
else:
error "not a valid NAT mechanism, nor a valid IP address", value = natConf
quit(QuitFailure)
if nat != NatNone:
let extIP = getExternalIP(nat)
if extIP.isSome:
result.ip = extIP.get()
let extPorts = redirectPorts(tcpPort = result.tcpPort,
udpPort = result.udpPort,
description = clientId)
if extPorts.isSome:
(result.tcpPort, result.udpPort) = extPorts.get()

View File

@ -1,19 +1,29 @@
import import
std/strutils,
confutils, chronos, json_rpc/rpcserver, metrics, metrics/chronicles_support, confutils, chronos, json_rpc/rpcserver, metrics, metrics/chronicles_support,
stew/shims/net as stewNet,
eth/[keys, p2p], eth/common/utils, eth/[keys, p2p], eth/common/utils,
eth/p2p/[discovery, enode, peer_pool, bootnodes, whispernodes], eth/p2p/[discovery, enode, peer_pool, bootnodes, whispernodes],
eth/p2p/rlpx_protocols/whisper_protocol, eth/p2p/rlpx_protocols/whisper_protocol,
../../protocol/v1/[waku_protocol, waku_bridge], ../../protocol/v1/[waku_protocol, waku_bridge], ../common,
./rpc/[waku, wakusim, key_storage], ./waku_helpers, ./config ./rpc/[waku, wakusim, key_storage], ./waku_helpers, ./config
const clientId = "Nimbus waku node" const clientId = "Nimbus waku node"
proc run(config: WakuNodeConf, rng: ref BrHmacDrbgContext) = proc run(config: WakuNodeConf, rng: ref BrHmacDrbgContext) =
let let
(ip, tcpPort, udpPort) = setupNat(config.nat, clientId, config.tcpPort, (ipExt, tcpPortExt, udpPortExt) = setupNat(config.nat, clientId,
config.udpPort, config.portsShift) Port(config.tcpPort + config.portsShift),
address = Address(ip: ip, tcpPort: tcpPort, udpPort: udpPort) Port(config.udpPort + config.portsShift))
# TODO: EthereumNode should have a better split of binding address and
# external address. Also, can't have different ports as it stands now.
address = if ipExt.isNone():
Address(ip: parseIpAddress("0.0.0.0"),
tcpPort: Port(config.tcpPort + config.portsShift),
udpPort: Port(config.udpPort + config.portsShift))
else:
Address(ip: ipExt.get(),
tcpPort: Port(config.tcpPort + config.portsShift),
udpPort: Port(config.udpPort + config.portsShift))
# Set-up node # Set-up node
var node = newEthereumNode(config.nodekey, address, 1, nil, clientId, var node = newEthereumNode(config.nodekey, address, 1, nil, clientId,

View File

@ -2,7 +2,7 @@ import
std/[strutils, options], std/[strutils, options],
chronos, confutils, json_rpc/rpcserver, metrics, stew/shims/net as stewNet, chronos, confutils, json_rpc/rpcserver, metrics, stew/shims/net as stewNet,
# TODO: Why do we need eth keys? # TODO: Why do we need eth keys?
eth/keys, eth/net/nat, eth/keys,
# eth/[keys, p2p], eth/net/nat, eth/p2p/[discovery, enode], # eth/[keys, p2p], eth/net/nat, eth/p2p/[discovery, enode],
libp2p/multiaddress, libp2p/multiaddress,
libp2p/crypto/crypto, libp2p/crypto/crypto,
@ -10,7 +10,7 @@ import
# NOTE For TopicHandler, solve with exports? # NOTE For TopicHandler, solve with exports?
libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/pubsub,
libp2p/peerinfo, libp2p/peerinfo,
../../protocol/v2/waku_relay, ../../protocol/v2/waku_relay, ../common,
./waku_types, ./config, ./standard_setup, ./rpc/wakurpc ./waku_types, ./config, ./standard_setup, ./rpc/wakurpc
# key and crypto modules different # key and crypto modules different
@ -30,7 +30,7 @@ type
HistoryResponse* = object HistoryResponse* = object
messages*: seq[Message] messages*: seq[Message]
const clientId = "Nimbus waku node" const clientId = "Nimbus Waku v2 node"
# NOTE Any difference here in Waku vs Eth2? # NOTE Any difference here in Waku vs Eth2?
# E.g. Devp2p/Libp2p support, etc. # E.g. Devp2p/Libp2p support, etc.
@ -76,103 +76,58 @@ proc connectToNodes(n: WakuNode, nodes: openArray[string]) =
# let whisperENode = ENode.fromString(nodeId).expect("correct node") # let whisperENode = ENode.fromString(nodeId).expect("correct node")
# traceAsyncErrors node.peerPool.connectToNode(newNode(whisperENode)) # traceAsyncErrors node.peerPool.connectToNode(newNode(whisperENode))
# NOTE Identical with eth2_network, pull out into common? proc startRpc(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port) =
# NOTE Except portsShift let
proc setupNat(conf: WakuNodeConf): tuple[ip: Option[ValidIpAddress], ta = initTAddress(rpcIp, rpcPort)
tcpPort: Port, rpcServer = newRpcHttpServer([ta])
udpPort: Port] {.gcsafe.} = setupWakuRPC(node, rpcServer)
# defaults rpcServer.start()
result.tcpPort = Port(uint16(conf.tcpPort) + conf.portsShift) info "RPC Server started", ta
result.udpPort = Port(uint16(conf.udpPort) + conf.portsShift)
var nat: NatStrategy proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port) =
case conf.nat.toLowerAscii: info "Starting metrics HTTP server", serverIp, serverPort
of "any": metrics.startHttpServer($serverIp, serverPort)
nat = NatAny
of "none":
nat = NatNone
of "upnp":
nat = NatUpnp
of "pmp":
nat = NatPmp
else:
if conf.nat.startsWith("extip:"):
try:
# any required port redirection is assumed to be done by hand
result.ip = some(ValidIpAddress.init(conf.nat[6..^1]))
nat = NatNone
except ValueError:
error "nor a valid IP address", address = conf.nat[6..^1]
quit QuitFailure
else:
error "not a valid NAT mechanism", value = conf.nat
quit QuitFailure
if nat != NatNone: proc startMetricsLog() =
let extIp = getExternalIP(nat) proc logMetrics(udata: pointer) {.closure, gcsafe.} =
if extIP.isSome: {.gcsafe.}:
result.ip = some(ValidIpAddress.init extIp.get) # TODO: libp2p_pubsub_peers is not public, so we need to make this either
# TODO redirectPorts in considered a gcsafety violation # public in libp2p or do our own peer counting after all.
# because it obtains the address of a non-gcsafe proc? let
let extPorts = ({.gcsafe.}: totalMessages = total_messages.value
redirectPorts(tcpPort = result.tcpPort,
udpPort = result.udpPort,
description = clientId))
if extPorts.isSome:
(result.tcpPort, result.udpPort) = extPorts.get()
# TODO Consider removing unused arguments info "Node metrics", totalMessages
proc init*(T: type WakuNode, conf: WakuNodeConf, switch: Switch, discard setTimer(Moment.fromNow(2.seconds), logMetrics)
ip: Option[ValidIpAddress], tcpPort, udpPort: Port, discard setTimer(Moment.fromNow(2.seconds), logMetrics)
privKey: keys.PrivateKey,
peerInfo: PeerInfo): T =
new result
result.switch = switch
result.peerInfo = peerInfo
# TODO Peer pool, discovery, protocol state, etc
proc createWakuNode*(conf: WakuNodeConf): Future[WakuNode] {.async, gcsafe.} = ## Public API
var ##
(extIp, extTcpPort, extUdpPort) = setupNat(conf)
hostAddress = tcpEndPoint(conf.libp2pAddress, Port(uint16(conf.tcpPort) + conf.portsShift))
announcedAddresses = if extIp.isNone(): @[]
else: @[tcpEndPoint(extIp.get(), extTcpPort)]
proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
bindIp: ValidIpAddress, bindPort: Port,
extIp = none[ValidIpAddress](), extPort = none[Port]()): T =
## Creates and starts a Waku node.
let
hostAddress = tcpEndPoint(bindIp, bindPort)
announcedAddresses = if extIp.isNone() or extPort.isNone(): @[]
else: @[tcpEndPoint(extIp.get(), extPort.get())]
peerInfo = PeerInfo.init(nodekey)
info "Initializing networking", hostAddress, info "Initializing networking", hostAddress,
announcedAddresses announcedAddresses
let
nodekey = conf.nodekey
pubkey = nodekey.getKey.get()
keys = KeyPair(seckey: nodekey, pubkey: pubkey)
peerInfo = PeerInfo.init(nodekey)
# XXX: Add this when we create node or start it? # XXX: Add this when we create node or start it?
peerInfo.addrs.add(hostAddress) peerInfo.addrs.add(hostAddress)
var switch = newStandardSwitch(some keys.seckey, hostAddress, triggerSelf = true) var switch = newStandardSwitch(some(nodekey), hostAddress, triggerSelf = true)
# TODO Either persist WakuNode or something here return WakuNode(switch: switch, peerInfo: peerInfo)
# TODO Look over this proc start*(node: WakuNode) {.async.} =
# XXX Consider asEthKey and asLibp2pKey
result = WakuNode.init(conf, switch, extIp, extTcpPort, extUdpPort, keys.seckey.asEthKey, peerInfo)
proc start*(node: WakuNode, conf: WakuNodeConf) {.async.} =
node.libp2pTransportLoops = await node.switch.start() node.libp2pTransportLoops = await node.switch.start()
# NOTE WakuRelay is being instantiated as part of creating switch with PubSub field set # NOTE WakuRelay is being instantiated as part of creating switch with PubSub field set
# #
# TODO Mount Waku Store and Waku Filter here # TODO Mount Waku Store and Waku Filter here
# TODO Move out into separate proc
if conf.rpc:
let ta = initTAddress(conf.rpcAddress, Port(conf.rpcPort + conf.portsShift))
var rpcServer = newRpcHttpServer([ta])
setupWakuRPC(node, rpcServer)
rpcServer.start()
info "rpcServer started", ta=ta
# TODO Get this from WakuNode obj # TODO Get this from WakuNode obj
let peerInfo = node.peerInfo let peerInfo = node.peerInfo
let id = peerInfo.peerId.pretty let id = peerInfo.peerId.pretty
@ -181,40 +136,6 @@ proc start*(node: WakuNode, conf: WakuNodeConf) {.async.} =
## XXX: this should be /ip4..., / stripped? ## XXX: this should be /ip4..., / stripped?
info "Listening on", full = listenStr info "Listening on", full = listenStr
# XXX: So doing this _after_ other setup
# Optionally direct connect with a set of nodes
if conf.staticnodes.len > 0: connectToNodes(node, conf.staticnodes)
# TODO Move out into separate proc
when defined(insecure):
if conf.metricsServer:
let
address = conf.metricsServerAddress
port = conf.metricsServerPort + conf.portsShift
info "Starting metrics HTTP server", address, port
metrics.startHttpServer($address, Port(port))
if conf.logMetrics:
proc logMetrics(udata: pointer) {.closure, gcsafe.} =
{.gcsafe.}:
let
connectedPeers = connected_peers.value
totalMessages = total_messages.value
info "Node metrics", connectedPeers, totalMessages
addTimer(Moment.fromNow(2.seconds), logMetrics)
addTimer(Moment.fromNow(2.seconds), logMetrics)
## Public API
##
proc init*(T: type WakuNode, conf: WakuNodeConf): Future[T] {.async.} =
## Creates and starts a Waku node.
##
let node = await createWakuNode(conf)
await node.start(conf)
return node
# NOTE TopicHandler is defined in pubsub.nim, roughly: # NOTE TopicHandler is defined in pubsub.nim, roughly:
#type TopicHandler* = proc(topic: string, data: seq[byte]) #type TopicHandler* = proc(topic: string, data: seq[byte])
@ -294,6 +215,28 @@ proc query*(w: WakuNode, query: HistoryQuery): HistoryResponse =
result.messages.insert(msg[1]) result.messages.insert(msg[1])
when isMainModule: when isMainModule:
let conf = WakuNodeConf.load() let
discard WakuNode.init(conf) conf = WakuNodeConf.load()
(extIp, extTcpPort, extUdpPort) = setupNat(conf.nat, clientId,
Port(uint16(conf.tcpPort) + conf.portsShift),
Port(uint16(conf.udpPort) + conf.portsShift))
node = WakuNode.init(conf.nodeKey, conf.libp2pAddress,
Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort)
waitFor node.start()
if conf.staticnodes.len > 0:
connectToNodes(node, conf.staticnodes)
if conf.rpc:
startRpc(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift))
if conf.logMetrics:
startMetricsLog()
when defined(insecure):
if conf.metricsServer:
startMetricsServer(conf.metricsServerAddress,
Port(conf.metricsServerPort + conf.portsShift))
runForever() runForever()