Very hacky start waku on libp2p

Partial import from research repo
This commit is contained in:
Oskar Thoren 2020-04-29 12:49:27 +08:00
parent 435b785093
commit 745e0ba948
6 changed files with 479 additions and 0 deletions

View File

@ -8,3 +8,6 @@ quicksim: node/v0/quicksim.nim
wakunode: node/v0/wakunode.nim
nim c --threads:on -o:build/wakunode node/v0/wakunode.nim
wakunode2: node/v2/wakunode.nim
nim c --threads:on -o:build/wakunode2 node/v2/wakunode.nim

5
node/v2/README.md Normal file
View File

@ -0,0 +1,5 @@
# Waku node v2
Status: pre-alpha
Right now a hacky PoC for running on libp2p.

154
node/v2/config.nim Normal file
View File

@ -0,0 +1,154 @@
import
confutils/defs, chronicles, chronos,
eth/keys, eth/p2p/rlpx_protocols/waku_protocol
type
Fleet* = enum
none
prod
staging
test
WakuNodeConf* = object
logLevel* {.
desc: "Sets the log level."
defaultValue: LogLevel.INFO
name: "log-level" }: LogLevel
tcpPort* {.
desc: "TCP listening port."
defaultValue: 30303
name: "tcp-port" }: uint16
udpPort* {.
desc: "UDP listening port."
defaultValue: 30303
name: "udp-port" }: uint16
portsShift* {.
desc: "Add a shift to all port numbers."
defaultValue: 0
name: "ports-shift" }: uint16
nat* {.
desc: "Specify method to use for determining public address. " &
"Must be one of: any, none, upnp, pmp, extip:<IP>."
defaultValue: "any" }: string
discovery* {.
desc: "Enable/disable discovery v4."
defaultValue: true
name: "discovery" }: bool
noListen* {.
desc: "Disable listening for incoming peers."
defaultValue: false
name: "no-listen" }: bool
fleet* {.
desc: "Select the fleet to connect to."
defaultValue: Fleet.none
name: "fleet" }: Fleet
bootnodes* {.
desc: "Enode URL to bootstrap P2P discovery with. Argument may be repeated."
name: "bootnode" }: seq[string]
staticnodes* {.
desc: "Enode URL to directly connect with. Argument may be repeated."
name: "staticnode" }: seq[string]
whisper* {.
desc: "Enable the Whisper protocol."
defaultValue: false
name: "whisper" }: bool
whisperBridge* {.
desc: "Enable the Whisper protocol and bridge with Waku protocol."
defaultValue: false
name: "whisper-bridge" }: bool
lightNode* {.
desc: "Run as light node (no message relay).",
defaultValue: false
name: "light-node" }: bool
wakuTopicInterest* {.
desc: "Run as node with a topic-interest",
defaultValue: false
name: "waku-topic-interest" }: bool
wakuPow* {.
desc: "PoW requirement of Waku node.",
defaultValue: 0.002
name: "waku-pow" }: float64
nodekey* {.
desc: "P2P node private key as hex.",
defaultValue: keys.KeyPair.random().tryGet()
name: "nodekey" }: KeyPair
# TODO: Add nodekey file option
bootnodeOnly* {.
desc: "Run only as discovery bootnode."
defaultValue: false
name: "bootnode-only" }: bool
rpc* {.
desc: "Enable Waku RPC server.",
defaultValue: false
name: "rpc" }: bool
rpcAddress* {.
desc: "Listening address of the RPC server.",
defaultValue: parseIpAddress("127.0.0.1")
name: "rpc-address" }: IpAddress
rpcPort* {.
desc: "Listening port of the RPC server.",
defaultValue: 8545
name: "rpc-port" }: uint16
metricsServer* {.
desc: "Enable the metrics server."
defaultValue: false
name: "metrics-server" }: bool
metricsServerAddress* {.
desc: "Listening address of the metrics server."
defaultValue: parseIpAddress("127.0.0.1")
name: "metrics-server-address" }: IpAddress
metricsServerPort* {.
desc: "Listening HTTP port of the metrics server."
defaultValue: 8008
name: "metrics-server-port" }: uint16
logMetrics* {.
desc: "Enable metrics logging."
defaultValue: false
name: "log-metrics" }: bool
# TODO:
# - discv5 + topic register
# - mailserver functionality
proc parseCmdArg*(T: type KeyPair, p: TaintedString): T =
try:
# TODO: add isValidPrivateKey check from Nimbus?
result.seckey = PrivateKey.fromHex(string(p)).tryGet()
result.pubkey = result.seckey.toPublicKey()[]
except CatchableError as e:
raise newException(ConfigurationError, "Invalid private key")
proc completeCmdArg*(T: type KeyPair, val: TaintedString): seq[string] =
return @[]
proc parseCmdArg*(T: type IpAddress, p: TaintedString): T =
try:
result = parseIpAddress(p)
except CatchableError as e:
raise newException(ConfigurationError, "Invalid IP address")
proc completeCmdArg*(T: type IpAddress, val: TaintedString): seq[string] =
return @[]

View File

@ -0,0 +1,3 @@
const wakuVersionStr = "2.0.0-alpha1"
# TODO: Move me to protocol ns

279
node/v2/wakunode.nim Normal file
View File

@ -0,0 +1,279 @@
import
confutils, config, strutils, chronos, json_rpc/rpcserver, metrics,
chronicles/topics_registry, # TODO: What? Need this for setLoglevel, weird.
eth/[keys, p2p, async_utils], eth/common/utils, eth/net/nat,
eth/p2p/[discovery, enode, peer_pool, bootnodes, whispernodes],
eth/p2p/rlpx_protocols/[whisper_protocol, waku_protocol, waku_bridge],
# TODO remove me
../../vendor/nimbus/nimbus/rpc/[wakusim, key_storage],
../../vendor/nim-libp2p/libp2p/standard_setup,
../../vendor/nim-libp2p/libp2p/crypto/crypto,
../../vendor/nim-libp2p/libp2p/protocols/protocol,
../../vendor/nim-libp2p/libp2p/peerinfo,
# TODO: RPC folder
wakurpc2
# TODO: Better aliasing of vendor dirs
# TODO: Something wrong with module imports, "invalid module name" with this
# ../vendor/nim-libp2p/libp2p/[switch, standard_setup, peerinfo, peer, connection,
# multiaddress, multicodec, crypto/crypto, protocols/identify, protocols/protocol]
# key and crypto modules different
type
KeyPair* = crypto.KeyPair
PublicKey* = crypto.PublicKey
PrivateKey* = crypto.PrivateKey
# handler defined in parent object
type WakuProto = ref object of LPProtocol
switch: Switch
conn: Connection
connected: bool
started: bool
const clientId = "Nimbus waku node"
# TODO: We want this to be on top of GossipSub, how does that work?
const WakuCodec = "/vac/waku/2.0.0-alpha0"
let globalListeningAddr = parseIpAddress("0.0.0.0")
proc setBootNodes(nodes: openArray[string]): seq[ENode] =
result = newSeqOfCap[ENode](nodes.len)
for nodeId in nodes:
# TODO: something more user friendly than an expect
result.add(ENode.fromString(nodeId).expect("correct node"))
proc connectToNodes(node: EthereumNode, nodes: openArray[string]) =
for nodeId in nodes:
# TODO: something more user friendly than an assert
let whisperENode = ENode.fromString(nodeId).expect("correct node")
traceAsyncErrors node.peerPool.connectToNode(newNode(whisperENode))
# NOTE: Looks almost identical to beacon_chain/eth2_network.nim
proc setupNat(conf: WakuNodeConf): tuple[ip: IpAddress,
tcpPort: Port,
udpPort: Port] =
# defaults
result.ip = globalListeningAddr
result.tcpPort = Port(conf.tcpPort + conf.portsShift)
result.udpPort = Port(conf.udpPort + conf.portsShift)
var nat: NatStrategy
case conf.nat.toLowerAscii():
of "any":
nat = NatAny
of "none":
nat = NatNone
of "upnp":
nat = NatUpnp
of "pmp":
nat = NatPmp
else:
if conf.nat.startsWith("extip:") and isIpAddress(conf.nat[6..^1]):
# any required port redirection is assumed to be done by hand
result.ip = parseIpAddress(conf.nat[6..^1])
nat = NatNone
else:
error "not a valid NAT mechanism, nor a valid IP address", value = conf.nat
quit(QuitFailure)
if nat != NatNone:
let extIP = getExternalIP(nat)
if extIP.isSome:
result.ip = extIP.get()
let extPorts = redirectPorts(tcpPort = result.tcpPort,
udpPort = result.udpPort,
description = clientId)
if extPorts.isSome:
(result.tcpPort, result.udpPort) = extPorts.get()
proc newWakuProto(switch: Switch): WakuProto =
var wakuproto = WakuProto(switch: switch, codec: WakuCodec)
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
let msg = cast[string](await conn.readLp())
await conn.writeLp("Hello!")
await conn.close()
wakuproto.handler = handle
return wakuproto
proc run(config: WakuNodeConf) =
info "libp2p support NYI"
if config.logLevel != LogLevel.NONE:
setLogLevel(config.logLevel)
let
# External TCP and UDP ports
(ip, tcpPort, udpPort) = setupNat(config)
address = Address(ip: ip, tcpPort: tcpPort, udpPort: udpPort)
port = tcpPort
# Using this for now
DefaultAddr = "/ip4/127.0.0.1/tcp/55505"
hostAddress = MultiAddress.init(DefaultAddr)
# Difference between announced and host address relevant for running behind NAT, however doesn't seem like nim-libp2p supports this. GHI?
#
# TODO: Convert config.nodekey eth.key to libp2p.crypto key. Should work with Secp256k1, just need to ensure representation etc is the same. Using random now for spike.
#nodekey = config.nodekey
#keys = crypto.KeyPair(nodekey)
privKey = PrivateKey.random(Secp256k1)
keys = KeyPair(seckey: privKey, pubkey: privKey.getKey())
peerInfo = PeerInfo.init(privKey)
info "Initializing networking (host address and announced same)", address
peerInfo.addrs.add(Multiaddress.init(DefaultAddr))
# XXX: It isn't clear that it is a good idea use dial/connect as RPC
# But let's try it
# More of a hello world thing, we'd want to mount it on to of gossipsub
if config.rpc:
# What is ta? transport address...right.
let ta = initTAddress(config.rpcAddress,
Port(config.rpcPort + config.portsShift))
var rpcServer = newRpcHttpServer([ta])
let keys = newKeyStorage()
# Ok cool no node here
# TODO: Fix me with node etc
#setupWakuRPC(node, keys, rpcServer)
#setupWakuSimRPC(node, rpcServer)
rpcServer.start()
# TODO: Here setup a libp2p node
# Essentially something like this in nbc/eth2_network:
# proc createEth2Node*(conf: BeaconNodeConf): Future[Eth2Node]
# TODO: Also see beacon_chain/beaconnode, RPC server etc
# Also probably start with floodsub for simplicity
# Slice it up only minimal parts here
# HERE ATM
# config.nodekey = KeyPair.random().tryGet()
# address = set above; ip, tcp and udp port (incl NAT)
# clientId = "Nimbus waku node"
#let network = await createLibP2PNode(conf) # doing in-line
# let rpcServer ...
# Is it a "Standard" Switch? Assume it is for now
var switch = newStandardSwitch(some keys.seckey, hostAddress, triggerSelf = true, gossip = true)
let wakuProto = newWakuProto(switch)
switch.mount(wakuProto)
# TODO: Make context async
#let fut = await switch.start()
discard switch.start()
wakuProto.started = true
let id = peerInfo.peerId.pretty
info "PeerInfo", id = id, addrs = peerInfo.addrs
let listenStr = $peerInfo.addrs[0] & "/ipfs/" & id
info "Listening on", full = listenStr
# Here directchat uses rwLoop for protocol
# What if we dial here? How dial?
# Feels very ghetto, this hookup
# Once we have a switch and started it, we need to connect the them
# Then we can dial a peer on a protocol
# If we start node 2 this should do:
# let conn = switch.dial(peerInfo, WakuCodec)
# (testswitch.nim) Conn write hello
# Aight so we calling here, but where
# This is why nice to have RPC - how does this look in libp2p?
# Is this doable with RPC in DevP2P? Probably. How? For quicksim
# (quicksim.nim) Basically RPC HTTP Client and connect
# Ok, now what? Quick break. Probably RPC etc? Try to dial, 1:1 style
# Also not actually using GossipSub so far
# We want to dial with another node
# Set-up node
# var node = newEthereumNode(config.nodekey, address, 1, nil, clientId,
# addAllCapabilities = false)
# if not config.bootnodeOnly:
# node.addCapability Waku # Always enable Waku protocol
# var topicInterest: Option[seq[waku_protocol.Topic]]
# var bloom: Option[Bloom]
# if config.wakuTopicInterest:
# var topics: seq[waku_protocol.Topic]
# topicInterest = some(topics)
# else:
# bloom = some(fullBloom())
# let wakuConfig = WakuConfig(powRequirement: config.wakuPow,
# bloom: bloom,
# isLightNode: config.lightNode,
# maxMsgSize: waku_protocol.defaultMaxMsgSize,
# topics: topicInterest)
# node.configureWaku(wakuConfig)
# if config.whisper or config.whisperBridge:
# node.addCapability Whisper
# node.protocolState(Whisper).config.powRequirement = 0.002
# if config.whisperBridge:
# node.shareMessageQueue()
#
# # TODO: Status fleet bootnodes are discv5? That will not work.
# let bootnodes = if config.bootnodes.len > 0: setBootNodes(config.bootnodes)
# elif config.fleet == prod: setBootNodes(StatusBootNodes)
# elif config.fleet == staging: setBootNodes(StatusBootNodesStaging)
# elif config.fleet == test : setBootNodes(StatusBootNodesTest)
# else: @[]
#
# traceAsyncErrors node.connectToNetwork(bootnodes, not config.noListen,
# config.discovery)
#
# if not config.bootnodeOnly:
# # Optionally direct connect with a set of nodes
# if config.staticnodes.len > 0: connectToNodes(node, config.staticnodes)
# elif config.fleet == prod: connectToNodes(node, WhisperNodes)
# elif config.fleet == staging: connectToNodes(node, WhisperNodesStaging)
# elif config.fleet == test: connectToNodes(node, WhisperNodesTest)
#
# if config.rpc:
# let ta = initTAddress(config.rpcAddress,
# Port(config.rpcPort + config.portsShift))
# var rpcServer = newRpcHttpServer([ta])
# let keys = newKeyStorage()
# setupWakuRPC(node, keys, rpcServer)
# setupWakuSimRPC(node, rpcServer)
# rpcServer.start()
#
# when defined(insecure):
# if config.metricsServer:
# let
# address = config.metricsServerAddress
# port = config.metricsServerPort + config.portsShift
# info "Starting metrics HTTP server", address, port
# metrics.startHttpServer($address, Port(port))
#
# if config.logMetrics:
# proc logMetrics(udata: pointer) {.closure, gcsafe.} =
# {.gcsafe.}:
# let
# connectedPeers = connected_peers.value
# validEnvelopes = waku_protocol.valid_envelopes.value
# invalidEnvelopes = waku_protocol.dropped_expired_envelopes.value +
# waku_protocol.dropped_from_future_envelopes.value +
# waku_protocol.dropped_low_pow_envelopes.value +
# waku_protocol.dropped_too_large_envelopes.value +
# waku_protocol.dropped_bloom_filter_mismatch_envelopes.value +
# waku_protocol.dropped_topic_mismatch_envelopes.value +
# waku_protocol.dropped_benign_duplicate_envelopes.value +
# waku_protocol.dropped_duplicate_envelopes.value
#
# info "Node metrics", connectedPeers, validEnvelopes, invalidEnvelopes
# addTimer(Moment.fromNow(2.seconds), logMetrics)
# addTimer(Moment.fromNow(2.seconds), logMetrics)
#
runForever()
when isMainModule:
let conf = WakuNodeConf.load()
run(conf)

35
node/v2/wakurpc2.nim Normal file
View File

@ -0,0 +1,35 @@
import
json_rpc/rpcserver, tables, options,
eth/[common, rlp, keys, p2p],
#DevP2P impl
#eth/p2p/rlpx_protocols/waku_protocol,
waku2_protocol,
nimcrypto/[sysrand, hmac, sha2, pbkdf2],
../../vendor/nimbus/nimbus/rpc/rpc_types,
../../vendor/nimbus/nimbus/rpc/hexstrings,
../../vendor/nimbus/nimbus/rpc/key_storage
from stew/byteutils import hexToSeqByte, hexToByteArray
# Instead of using rlpx waku_protocol here, lets do mock waku2_protocol
# This should wrap GossipSub, not use EthereumNode here
# Blatant copy of Whisper RPC but for the Waku protocol
# XXX: Wrong, also what is wakuVersionStr?
# We also have rlpx protocol here waku_protocol
proc setupWakuRPC*(node: EthereumNode, keys: KeyStorage, rpcsrv: RpcServer) =
# Seems easy enough, lets try to get this first
rpcsrv.rpc("waku_version") do() -> string:
## Returns string of the current whisper protocol version.
# TODO: Should read from waku2_protocol I think
#result = wakuVersionStr
result = "2.0.0-alpha0x"
# TODO: Dial/Connect
# XXX: Though wrong layer for that - wait how does this work in devp2p sim?
# We connect to nodes there, should be very similar here
# We can also do this from nim-waku repo