mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
Wakuv1v2 process (#238)
* Add start of wakubridge with only waku v1 node for now * Add waku v2 node to wakubridge * Add bridge target to makefile * Keep waku v1 PoW configurable * Fix for latest WakuNode API * Fix Makefile target all * Rename to config_bridge and at brief docs on bridge Co-authored-by: Oskar Thorén <ot@oskarthoren.com>
This commit is contained in:
parent
5ddd8701c9
commit
dcbefbf4d6
7
Makefile
7
Makefile
@ -26,6 +26,7 @@ DOCKER_IMAGE_NIM_PARAMS ?= -d:chronicles_colors:none -d:insecure
|
||||
wakunode2 \
|
||||
example1 \
|
||||
example2 \
|
||||
bridge \
|
||||
test \
|
||||
clean \
|
||||
libbacktrace
|
||||
@ -46,7 +47,7 @@ GIT_SUBMODULE_UPDATE := git submodule update --init --recursive
|
||||
else # "variables.mk" was included. Business as usual until the end of this file.
|
||||
|
||||
# default target, because it's the first one that doesn't start with '.'
|
||||
all: | wakunode1 sim1 example1 wakunode2 sim2 example2 chat2
|
||||
all: | wakunode1 sim1 example1 wakunode2 sim2 example2 chat2 bridge
|
||||
|
||||
# must be included after the default target
|
||||
-include $(BUILD_SYSTEM_DIR)/makefiles/targets.mk
|
||||
@ -112,6 +113,10 @@ chat2: | build deps
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
$(ENV_SCRIPT) nim chat2 $(NIM_PARAMS) waku.nims
|
||||
|
||||
bridge: | build deps
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
$(ENV_SCRIPT) nim bridge $(NIM_PARAMS) waku.nims
|
||||
|
||||
# Builds and run the test suite (Waku v1 + v2)
|
||||
test: | test1 test2
|
||||
|
||||
|
||||
@ -15,11 +15,28 @@ By specifying `staticnode` it connects to that node subscribes to the `waku` top
|
||||
|
||||
Then type messages to publish.
|
||||
|
||||
## Interactively add a node
|
||||
|
||||
There is also an interactive mode. Type `/connect` then paste address of other node. However, this currently has some timing issues with mesh not being updated, so it is adviced not to use this until this has been addressed. See https://github.com/status-im/nim-waku/issues/231
|
||||
|
||||
## Dingpu cluster node
|
||||
|
||||
```
|
||||
/ip4/134.209.139.210/tcp/60000/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS
|
||||
```
|
||||
## Interactively add a node
|
||||
|
||||
There is also an interactive mode. Type `/connect` then paste address of other node. However, this currently has some timing issues with mesh not being updated, so it is adviced not to use this until this has been addressed. See https://github.com/status-im/nim-waku/issues/231
|
||||
## Run a node
|
||||
|
||||
To just run a node and not interact on the chat it is enough to run `wakunode2`:
|
||||
```
|
||||
./build/wakunode2 --staticnode:<multiaddr>
|
||||
```
|
||||
|
||||
You can also run the `wakubridge` process, which currently runs both a Waku v1
|
||||
and Waku v2 node. Currently, it has the same effect as running a `wakunode` and
|
||||
`wakunode2` process separately, but bridging functionality will be added later
|
||||
to this application.
|
||||
|
||||
```
|
||||
./build/wakubridge --staticnodev2:<multiaddr> --fleetv1:test
|
||||
```
|
||||
|
||||
@ -82,3 +82,6 @@ task chat2, "Build example Waku v2 chat usage":
|
||||
# output to STDOUT. Can be fixed by redirecting logs to file (e.g.)
|
||||
#buildBinary name, "examples/v2/", "-d:chronicles_log_level=WARN"
|
||||
buildBinary name, "examples/v2/", "-d:chronicles_log_level=DEBUG"
|
||||
|
||||
task bridge, "Build Waku v1 - v2 bridge":
|
||||
buildBinary "wakubridge", "waku/node/", "-d:chronicles_log_level=DEBUG"
|
||||
|
||||
170
waku/node/config_bridge.nim
Normal file
170
waku/node/config_bridge.nim
Normal file
@ -0,0 +1,170 @@
|
||||
import
|
||||
confutils, confutils/defs, confutils/std/net, chronicles, chronos,
|
||||
libp2p/crypto/[crypto, secp],
|
||||
eth/keys
|
||||
|
||||
type
|
||||
FleetV1* = enum
|
||||
none
|
||||
prod
|
||||
staging
|
||||
test
|
||||
|
||||
WakuNodeConf* = object
|
||||
logLevel* {.
|
||||
desc: "Sets the log level"
|
||||
defaultValue: LogLevel.INFO
|
||||
name: "log-level" .}: LogLevel
|
||||
|
||||
listenAddress* {.
|
||||
defaultValue: defaultListenAddress(config)
|
||||
desc: "Listening address for the LibP2P traffic"
|
||||
name: "listen-address"}: ValidIpAddress
|
||||
|
||||
libp2pTcpPort* {.
|
||||
desc: "Libp2p TCP listening port (for Waku v2)"
|
||||
defaultValue: 9000
|
||||
name: "libp2p-tcp-port" .}: uint16
|
||||
|
||||
devp2pTcpPort* {.
|
||||
desc: "Devp2p TCP listening port (for Waku v1)"
|
||||
defaultValue: 30303
|
||||
name: "devp2p-tcp-port" .}: uint16
|
||||
|
||||
udpPort* {.
|
||||
desc: "UDP listening port"
|
||||
defaultValue: 9000
|
||||
name: "udp-port" .}: uint16
|
||||
|
||||
portsShift* {.
|
||||
desc: "Add a shift to all default port numbers"
|
||||
defaultValue: 0
|
||||
name: "ports-shift" .}: uint16
|
||||
|
||||
nat* {.
|
||||
desc: "Specify method to use for determining public address. " &
|
||||
"Must be one of: any, none, upnp, pmp, extip:<IP>"
|
||||
defaultValue: "any" .}: string
|
||||
|
||||
rpc* {.
|
||||
desc: "Enable Waku RPC server"
|
||||
defaultValue: false
|
||||
name: "rpc" .}: bool
|
||||
|
||||
rpcAddress* {.
|
||||
desc: "Listening address of the RPC server",
|
||||
defaultValue: ValidIpAddress.init("127.0.0.1")
|
||||
name: "rpc-address" }: ValidIpAddress
|
||||
|
||||
rpcPort* {.
|
||||
desc: "Listening port of the RPC server"
|
||||
defaultValue: 8545
|
||||
name: "rpc-port" .}: uint16
|
||||
|
||||
metricsServer* {.
|
||||
desc: "Enable the metrics server"
|
||||
defaultValue: false
|
||||
name: "metrics-server" .}: bool
|
||||
|
||||
metricsServerAddress* {.
|
||||
desc: "Listening address of the metrics server"
|
||||
defaultValue: ValidIpAddress.init("127.0.0.1")
|
||||
name: "metrics-server-address" }: ValidIpAddress
|
||||
|
||||
metricsServerPort* {.
|
||||
desc: "Listening HTTP port of the metrics server"
|
||||
defaultValue: 8008
|
||||
name: "metrics-server-port" .}: uint16
|
||||
|
||||
### Waku v1 options
|
||||
fleetv1* {.
|
||||
desc: "Select the Waku v1 fleet to connect to"
|
||||
defaultValue: FleetV1.none
|
||||
name: "fleetv1" .}: FleetV1
|
||||
|
||||
staticnodesv1* {.
|
||||
desc: "Enode URL to directly connect with. Argument may be repeated"
|
||||
name: "staticnodev1" .}: seq[string]
|
||||
|
||||
nodekeyv1* {.
|
||||
desc: "DevP2P node private key as hex",
|
||||
# TODO: can the rng be passed in somehow via Load?
|
||||
defaultValue: keys.KeyPair.random(keys.newRng()[])
|
||||
name: "nodekeyv1" .}: keys.KeyPair
|
||||
|
||||
wakuPow* {.
|
||||
desc: "PoW requirement of Waku node.",
|
||||
defaultValue: 0.002
|
||||
name: "waku-pow" .}: float64
|
||||
|
||||
### Waku v2 options
|
||||
staticnodesv2* {.
|
||||
desc: "Multiaddr of peer to directly connect with. Argument may be repeated"
|
||||
name: "staticnodev2" }: seq[string]
|
||||
|
||||
nodekeyv2* {.
|
||||
desc: "P2P node private key as hex"
|
||||
defaultValue: crypto.PrivateKey.random(Secp256k1, keys.newRng()[]).tryGet()
|
||||
name: "nodekeyv2" }: crypto.PrivateKey
|
||||
|
||||
topics* {.
|
||||
desc: "Default topics to subscribe to (space seperated list)"
|
||||
defaultValue: "waku"
|
||||
name: "topics" .}: string
|
||||
|
||||
store* {.
|
||||
desc: "Flag whether to start store protocol",
|
||||
defaultValue: false
|
||||
name: "store" }: bool
|
||||
|
||||
filter* {.
|
||||
desc: "Flag whether to start filter protocol",
|
||||
defaultValue: false
|
||||
name: "filter" }: bool
|
||||
|
||||
relay* {.
|
||||
desc: "Flag whether to start relay protocol",
|
||||
defaultValue: true
|
||||
name: "relay" }: bool
|
||||
|
||||
storenode* {.
|
||||
desc: "Multiaddr of peer to connect with for waku store protocol"
|
||||
defaultValue: ""
|
||||
name: "storenode" }: string
|
||||
|
||||
filternode* {.
|
||||
desc: "Multiaddr of peer to connect with for waku filter protocol"
|
||||
defaultValue: ""
|
||||
name: "filternode" }: string
|
||||
|
||||
proc parseCmdArg*(T: type keys.KeyPair, p: TaintedString): T =
|
||||
try:
|
||||
let privkey = keys.PrivateKey.fromHex(string(p)).tryGet()
|
||||
result = privkey.toKeyPair()
|
||||
except CatchableError:
|
||||
raise newException(ConfigurationError, "Invalid private key")
|
||||
|
||||
proc completeCmdArg*(T: type keys.KeyPair, val: TaintedString): seq[string] =
|
||||
return @[]
|
||||
|
||||
proc parseCmdArg*(T: type crypto.PrivateKey, p: TaintedString): T =
|
||||
let key = SkPrivateKey.init(p)
|
||||
if key.isOk():
|
||||
crypto.PrivateKey(scheme: Secp256k1, skkey: key.get())
|
||||
else:
|
||||
raise newException(ConfigurationError, "Invalid private key")
|
||||
|
||||
proc completeCmdArg*(T: type crypto.PrivateKey, val: TaintedString): seq[string] =
|
||||
return @[]
|
||||
|
||||
proc parseCmdArg*(T: type ValidIpAddress, p: TaintedString): T =
|
||||
try:
|
||||
result = ValidIpAddress.init(p)
|
||||
except CatchableError:
|
||||
raise newException(ConfigurationError, "Invalid IP address")
|
||||
|
||||
proc completeCmdArg*(T: type ValidIpAddress, val: TaintedString): seq[string] =
|
||||
return @[]
|
||||
|
||||
func defaultListenAddress*(conf: WakuNodeConf): ValidIpAddress =
|
||||
(static ValidIpAddress.init("0.0.0.0"))
|
||||
@ -1,5 +1,5 @@
|
||||
import
|
||||
std/[options, tables],
|
||||
std/[options, tables, strutils],
|
||||
chronos, chronicles, stew/shims/net as stewNet,
|
||||
# TODO: Why do we need eth keys?
|
||||
eth/keys,
|
||||
@ -13,6 +13,8 @@ import
|
||||
../../protocol/v2/[waku_relay, waku_store, waku_filter, message_notifier],
|
||||
./waku_types
|
||||
|
||||
export waku_types
|
||||
|
||||
logScope:
|
||||
topics = "wakunode"
|
||||
|
||||
@ -230,54 +232,50 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string]()) {.async
|
||||
# Can also move this to the start proc, possibly wiser?
|
||||
discard node.subscribe(topic, handler)
|
||||
|
||||
## Helpers
|
||||
proc parsePeerInfo(address: string): PeerInfo =
|
||||
let multiAddr = MultiAddress.initAddress(address)
|
||||
let parts = address.split("/")
|
||||
return PeerInfo.init(parts[^1], [multiAddr])
|
||||
|
||||
proc dialPeer*(n: WakuNode, address: string) {.async.} =
|
||||
info "dialPeer", address = address
|
||||
# XXX: This turns ipfs into p2p, not quite sure why
|
||||
let remotePeer = parsePeerInfo(address)
|
||||
|
||||
info "Dialing peer", ma = remotePeer.addrs[0]
|
||||
# NOTE This is dialing on WakuRelay protocol specifically
|
||||
# TODO Keep track of conn and connected state somewhere (WakuRelay?)
|
||||
#p.conn = await p.switch.dial(remotePeer, WakuRelayCodec)
|
||||
#p.connected = true
|
||||
discard n.switch.dial(remotePeer, WakuRelayCodec)
|
||||
info "Post switch dial"
|
||||
|
||||
proc setStorePeer*(n: WakuNode, address: string) =
|
||||
info "dialPeer", address = address
|
||||
|
||||
let remotePeer = parsePeerInfo(address)
|
||||
|
||||
n.wakuStore.setPeer(remotePeer)
|
||||
|
||||
proc setFilterPeer*(n: WakuNode, address: string) =
|
||||
info "dialPeer", address = address
|
||||
|
||||
let remotePeer = parsePeerInfo(address)
|
||||
|
||||
n.wakuFilter.setPeer(remotePeer)
|
||||
|
||||
proc connectToNodes*(n: WakuNode, nodes: openArray[string]) =
|
||||
for nodeId in nodes:
|
||||
info "connectToNodes", node = nodeId
|
||||
# XXX: This seems...brittle
|
||||
discard dialPeer(n, nodeId)
|
||||
|
||||
when isMainModule:
|
||||
import
|
||||
std/strutils,
|
||||
confutils, json_rpc/rpcserver, metrics,
|
||||
./config, ./rpc/wakurpc, ../common
|
||||
|
||||
proc parsePeerInfo(address: string): PeerInfo =
|
||||
let multiAddr = MultiAddress.initAddress(address)
|
||||
let parts = address.split("/")
|
||||
return PeerInfo.init(parts[^1], [multiAddr])
|
||||
|
||||
proc dialPeer(n: WakuNode, address: string) {.async.} =
|
||||
info "dialPeer", address = address
|
||||
# XXX: This turns ipfs into p2p, not quite sure why
|
||||
let remotePeer = parsePeerInfo(address)
|
||||
|
||||
info "Dialing peer", ma = remotePeer.addrs[0]
|
||||
# NOTE This is dialing on WakuRelay protocol specifically
|
||||
# TODO Keep track of conn and connected state somewhere (WakuRelay?)
|
||||
#p.conn = await p.switch.dial(remotePeer, WakuRelayCodec)
|
||||
#p.connected = true
|
||||
discard n.switch.dial(remotePeer, WakuRelayCodec)
|
||||
info "Post switch dial"
|
||||
|
||||
proc setStorePeer(n: WakuNode, address: string) =
|
||||
info "dialPeer", address = address
|
||||
|
||||
let remotePeer = parsePeerInfo(address)
|
||||
|
||||
n.wakuStore.setPeer(remotePeer)
|
||||
|
||||
proc setFilterPeer(n: WakuNode, address: string) =
|
||||
info "dialPeer", address = address
|
||||
|
||||
let remotePeer = parsePeerInfo(address)
|
||||
|
||||
n.wakuFilter.setPeer(remotePeer)
|
||||
|
||||
proc connectToNodes(n: WakuNode, nodes: openArray[string]) =
|
||||
for nodeId in nodes:
|
||||
info "connectToNodes", node = nodeId
|
||||
# XXX: This seems...brittle
|
||||
discard dialPeer(n, nodeId)
|
||||
# Waku 1
|
||||
# let whisperENode = ENode.fromString(nodeId).expect("correct node")
|
||||
# traceAsyncErrors node.peerPool.connectToNode(newNode(whisperENode))
|
||||
|
||||
proc startRpc(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port) =
|
||||
let
|
||||
ta = initTAddress(rpcIp, rpcPort)
|
||||
|
||||
133
waku/node/wakubridge.nim
Normal file
133
waku/node/wakubridge.nim
Normal file
@ -0,0 +1,133 @@
|
||||
import
|
||||
std/strutils,
|
||||
chronos, confutils, chronicles, chronicles/topics_registry, metrics,
|
||||
stew/shims/net as stewNet, json_rpc/rpcserver,
|
||||
# Waku v1 imports
|
||||
eth/[keys, p2p], eth/common/utils,
|
||||
eth/p2p/[enode, whispernodes],
|
||||
../protocol/v1/waku_protocol, ./common,
|
||||
./v1/rpc/[waku, wakusim, key_storage], ./v1/waku_helpers,
|
||||
# Waku v2 imports
|
||||
libp2p/crypto/crypto,
|
||||
./v2/wakunode2,
|
||||
./v2/rpc/wakurpc,
|
||||
# Common cli config
|
||||
./config_bridge
|
||||
|
||||
const clientIdV1 = "nim-waku v1 node"
|
||||
|
||||
proc startWakuV1(config: WakuNodeConf, rng: ref BrHmacDrbgContext):
|
||||
EthereumNode =
|
||||
let
|
||||
(ipExt, _, _) = setupNat(config.nat, clientIdV1,
|
||||
Port(config.devp2pTcpPort + config.portsShift),
|
||||
Port(config.udpPort + config.portsShift))
|
||||
# TODO: EthereumNode should have a better split of binding address and
|
||||
# external address. Also, can't have different ports as it stands now.
|
||||
address = if ipExt.isNone():
|
||||
Address(ip: parseIpAddress("0.0.0.0"),
|
||||
tcpPort: Port(config.devp2pTcpPort + config.portsShift),
|
||||
udpPort: Port(config.udpPort + config.portsShift))
|
||||
else:
|
||||
Address(ip: ipExt.get(),
|
||||
tcpPort: Port(config.devp2pTcpPort + config.portsShift),
|
||||
udpPort: Port(config.udpPort + config.portsShift))
|
||||
|
||||
# Set-up node
|
||||
var node = newEthereumNode(config.nodekeyv1, address, 1, nil, clientIdV1,
|
||||
addAllCapabilities = false, rng = rng)
|
||||
node.addCapability Waku # Always enable Waku protocol
|
||||
# Set up the Waku configuration.
|
||||
# This node is being set up as a bridge so it gets configured as a node with
|
||||
# a full bloom filter so that it will receive and forward all messages.
|
||||
# TODO: What is the PoW setting now?
|
||||
let wakuConfig = WakuConfig(powRequirement: config.wakuPow,
|
||||
bloom: some(fullBloom()), isLightNode: false,
|
||||
maxMsgSize: waku_protocol.defaultMaxMsgSize,
|
||||
topics: none(seq[waku_protocol.Topic]))
|
||||
node.configureWaku(wakuConfig)
|
||||
|
||||
# Optionally direct connect with a set of nodes
|
||||
if config.staticnodesv1.len > 0: connectToNodes(node, config.staticnodesv1)
|
||||
elif config.fleetv1 == prod: connectToNodes(node, WhisperNodes)
|
||||
elif config.fleetv1 == staging: connectToNodes(node, WhisperNodesStaging)
|
||||
elif config.fleetv1 == test: connectToNodes(node, WhisperNodesTest)
|
||||
|
||||
let connectedFut = node.connectToNetwork(@[],
|
||||
true, # Always enable listening
|
||||
false # Disable discovery (only discovery v4 is currently supported)
|
||||
)
|
||||
connectedFut.callback = proc(data: pointer) {.gcsafe.} =
|
||||
{.gcsafe.}:
|
||||
if connectedFut.failed:
|
||||
fatal "connectToNetwork failed", msg = connectedFut.readError.msg
|
||||
quit(1)
|
||||
|
||||
return node
|
||||
|
||||
proc startWakuV2(config: WakuNodeConf): Future[WakuNode] {.async.} =
|
||||
let
|
||||
(extIp, extTcpPort, _) = setupNat(config.nat, clientId,
|
||||
Port(uint16(config.libp2pTcpPort) + config.portsShift),
|
||||
Port(uint16(config.udpPort) + config.portsShift))
|
||||
node = WakuNode.init(config.nodeKeyv2, config.listenAddress,
|
||||
Port(uint16(config.libp2pTcpPort) + config.portsShift), extIp, extTcpPort)
|
||||
|
||||
await node.start()
|
||||
|
||||
if config.store:
|
||||
mountStore(node)
|
||||
|
||||
if config.filter:
|
||||
mountFilter(node)
|
||||
|
||||
if config.relay:
|
||||
waitFor mountRelay(node, config.topics.split(" "))
|
||||
|
||||
if config.staticnodesv2.len > 0:
|
||||
connectToNodes(node, config.staticnodesv2)
|
||||
|
||||
if config.storenode != "":
|
||||
setStorePeer(node, config.storenode)
|
||||
|
||||
if config.filternode != "":
|
||||
setFilterPeer(node, config.filternode)
|
||||
|
||||
return node
|
||||
|
||||
when isMainModule:
|
||||
let
|
||||
rng = keys.newRng()
|
||||
let conf = WakuNodeConf.load()
|
||||
|
||||
if conf.logLevel != LogLevel.NONE:
|
||||
setLogLevel(conf.logLevel)
|
||||
|
||||
let
|
||||
nodev1 = startWakuV1(conf, rng)
|
||||
nodev2 = waitFor startWakuV2(conf)
|
||||
|
||||
if conf.rpc:
|
||||
let ta = initTAddress(conf.rpcAddress,
|
||||
Port(conf.rpcPort + conf.portsShift))
|
||||
var rpcServer = newRpcHttpServer([ta])
|
||||
# Waku v1 RPC
|
||||
# TODO: Commented out the Waku v1 RPC calls as there is a conflict because
|
||||
# of exact same named rpc calls between v1 and v2
|
||||
# let keys = newKeyStorage()
|
||||
# setupWakuRPC(nodev1, keys, rpcServer, rng)
|
||||
setupWakuSimRPC(nodev1, rpcServer)
|
||||
# Waku v2 rpc
|
||||
setupWakuRPC(nodev2, rpcServer)
|
||||
|
||||
rpcServer.start()
|
||||
|
||||
when defined(insecure):
|
||||
if conf.metricsServer:
|
||||
let
|
||||
address = conf.metricsServerAddress
|
||||
port = conf.metricsServerPort + conf.portsShift
|
||||
info "Starting metrics HTTP server", address, port
|
||||
metrics.startHttpServer($address, Port(port))
|
||||
|
||||
runForever()
|
||||
Loading…
x
Reference in New Issue
Block a user