nwaku/waku/common/wakubridge.nim

196 lines
6.4 KiB
Nim

import
std/[strutils, tables],
chronos, confutils, chronicles, chronicles/topics_registry, metrics,
stew/endians2,
stew/shims/net as stewNet, json_rpc/rpcserver,
# Waku v1 imports
eth/[keys, p2p], eth/common/utils,
eth/p2p/[enode, whispernodes],
../v1/protocol/waku_protocol,
./utils/nat,
../v1/node/rpc/wakusim,
../v1/node/rpc/waku,
../v1/node/rpc/key_storage,
../v1/node/waku_helpers,
# Waku v2 imports
libp2p/crypto/crypto,
../v2/protocol/waku_filter/waku_filter_types,
../v2/node/wakunode2,
../v2/node/jsonrpc/[debug_api,
filter_api,
relay_api,
store_api],
# Common cli config
./config_bridge
# Client identifier handed to NAT setup and to the Waku v1 (devp2p) node.
const clientIdV1 = "nim-waku v1 node"

# Waku v2 pubsub topic on which bridged messages are published and received.
const defaultBridgeTopic = "/waku/2/default-bridge/proto"

# TTL attached to every message posted on Waku v1.
# NOTE(review): unit is presumably seconds - confirm against `postMessage`.
const defaultTTL = 5'u32
proc toWakuMessage(env: Envelope): WakuMessage =
  ## Builds the Waku v2 counterpart of a Waku v1 envelope.
  ##
  ## The envelope data is carried over as the payload, the envelope topic
  ## bytes are read as a `uint32` to form the content topic, and the message
  ## version is set to 1.
  let topicAsInt = uint32.fromBytes(env.topic)
  result = WakuMessage(
    payload: env.data,
    contentTopic: ContentTopic(topicAsInt),
    version: 1)
proc toWakuV2(env: Envelope, nodev2: WakuNode) {.async.} =
  ## Translates a Waku v1 envelope to a v2 message and publishes it through
  ## `nodev2` on the default bridge topic.
  let bridged = toWakuMessage(env)
  await nodev2.publish(defaultBridgeTopic, bridged)
proc toWakuV1(msg: WakuMessage, nodev1: EthereumNode) {.gcsafe.} =
  ## Posts a Waku v2 message on the Waku v1 network through `nodev1`.
  ##
  ## The content topic is serialised to bytes to obtain the v1 topic and the
  ## payload is forwarded unchanged. The result of `postMessage` is
  ## intentionally ignored.
  let
    v1Topic = msg.contentTopic.toBytes()
    v1Payload = msg.payload
  discard nodev1.postMessage(ttl = defaultTTL,
                             topic = v1Topic,
                             payload = v1Payload)
proc startWakuV1(config: WakuNodeConf, rng: ref BrHmacDrbgContext):
    EthereumNode =
  ## Creates, configures and connects the Waku v1 side of the bridge.
  ##
  ## Steps performed:
  ## * NAT setup for the shifted devp2p TCP and UDP ports,
  ## * creation of an `EthereumNode` with only the Waku capability,
  ## * Waku configuration with a full bloom filter (bridge receives all),
  ## * optional direct connections to static nodes or a fleet,
  ## * start of listening (discovery disabled).
  ##
  ## The process quits with exit code 1 if `connectToNetwork` fails.
  let
    # Both ports are offset by the configured shift.
    v1TcpPort = Port(config.devp2pTcpPort + config.portsShift)
    v1UdpPort = Port(config.udpPort + config.portsShift)
    (ipExt, _, _) = setupNat(config.nat, clientIdV1, v1TcpPort, v1UdpPort)
    # TODO: EthereumNode should have a better split of binding address and
    # external address. Also, can't have different ports as it stands now.
    address =
      if ipExt.isNone():
        # No external IP found: fall back to the wildcard address.
        Address(ip: parseIpAddress("0.0.0.0"),
                tcpPort: v1TcpPort,
                udpPort: v1UdpPort)
      else:
        Address(ip: ipExt.get(),
                tcpPort: v1TcpPort,
                udpPort: v1UdpPort)

  # Set-up node
  var node = newEthereumNode(config.nodekeyv1, address, NetworkId(1), nil,
                             clientIdV1, addAllCapabilities = false,
                             rng = rng)
  node.addCapability Waku # Always enable Waku protocol

  # Set up the Waku configuration.
  # This node is being set up as a bridge so it gets configured as a node with
  # a full bloom filter so that it will receive and forward all messages.
  # TODO: What is the PoW setting now?
  let wakuConfig = WakuConfig(
    powRequirement: config.wakuPow,
    bloom: some(fullBloom()),
    isLightNode: false,
    maxMsgSize: waku_protocol.defaultMaxMsgSize,
    topics: none(seq[waku_protocol.Topic]))
  node.configureWaku(wakuConfig)

  # Optionally direct connect with a set of nodes; explicitly configured
  # static nodes take precedence over any fleet selection.
  if config.staticnodesv1.len > 0:
    connectToNodes(node, config.staticnodesv1)
  elif config.fleetv1 == prod:
    connectToNodes(node, WhisperNodes)
  elif config.fleetv1 == staging:
    connectToNodes(node, WhisperNodesStaging)
  elif config.fleetv1 == test:
    connectToNodes(node, WhisperNodesTest)

  let connectedFut = node.connectToNetwork(
    @[],
    true,  # Always enable listening
    false) # Disable discovery (only discovery v4 is currently supported)

  # A failure to connect is fatal for the bridge: log it and exit.
  connectedFut.callback = proc(data: pointer) {.gcsafe.} =
    {.gcsafe.}:
      if connectedFut.failed:
        fatal "connectToNetwork failed", msg = connectedFut.readError.msg
        quit(1)

  node
proc startWakuV2(config: WakuNodeConf): Future[WakuNode] {.async.} =
  ## Creates and starts the Waku v2 side of the bridge.
  ##
  ## Performs NAT setup for the shifted libp2p TCP and UDP ports, initialises
  ## and starts a `WakuNode`, mounts the store / filter / relay protocols that
  ## are enabled in `config`, connects to the configured static nodes and sets
  ## the store and filter peers when given.
  ##
  ## Returns the started node.
  let
    (extIp, extTcpPort, _) = setupNat(config.nat, clientId,
      Port(uint16(config.libp2pTcpPort) + config.portsShift),
      Port(uint16(config.udpPort) + config.portsShift))
    node = WakuNode.init(config.nodeKeyv2, config.listenAddress,
      Port(uint16(config.libp2pTcpPort) + config.portsShift),
      extIp, extTcpPort)

  await node.start()

  if config.store:
    mountStore(node)

  if config.filter:
    mountFilter(node)

  if config.relay:
    # Relay topics are given as a single space-separated string.
    mountRelay(node, config.topics.split(" "))

  if config.staticnodesv2.len > 0:
    # Must be awaited: a blocking `waitFor` inside an async proc would
    # re-enter the event loop instead of yielding to it.
    await connectToNodes(node, config.staticnodesv2)

  if config.storenode != "":
    setStorePeer(node, config.storenode)

  if config.filternode != "":
    setFilterPeer(node, config.filternode)

  return node
when isMainModule:
  proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer,
                  conf: WakuNodeConf) =
    ## Installs the Waku v2 JSON-RPC handlers on `rpcServer` and starts it.
    ##
    ## The debug API is always installed; relay, filter and store APIs are
    ## installed only when the matching protocol is enabled in `conf`.
    installDebugApiHandlers(node, rpcServer)

    # Install enabled API handlers:
    if conf.relay:
      let topicCache = newTable[string, seq[WakuMessage]]()
      installRelayApiHandlers(node, rpcServer, topicCache)

    if conf.filter:
      let messageCache = newTable[ContentTopic, seq[WakuMessage]]()
      installFilterApiHandlers(node, rpcServer, messageCache)

    if conf.store:
      installStoreApiHandlers(node, rpcServer)

    rpcServer.start()

  let
    rng = keys.newRng()

  let conf = WakuNodeConf.load()

  if conf.logLevel != LogLevel.NONE:
    setLogLevel(conf.logLevel)

  var
    nodev1 {.threadvar.}: EthereumNode
    nodev2 {.threadvar.}: WakuNode

  nodev1 = startWakuV1(conf, rng)
  nodev2 = waitFor startWakuV2(conf)

  # Handle messages on Waku v1 and bridge to Waku v2
  proc handleEnvReceived(envelope: Envelope) {.gcsafe.} =
    debug "Bridging envelope from V1 to V2", envelope=envelope
    # Schedule the publish instead of blocking on it: this synchronous handler
    # is presumably invoked from within the running event loop, where
    # `waitFor` would re-enter the dispatcher.
    asyncSpawn envelope.toWakuV2(nodev2)

  nodev1.registerEnvReceivedHandler(handleEnvReceived)

  # Handle messages on Waku v2 and bridge to Waku v1
  proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} =
    let msg = WakuMessage.init(data)
    # Data that does not decode to a WakuMessage is silently dropped.
    if msg.isOk():
      debug "Bridging message from V2 to V1", msg=msg[]
      msg[].toWakuV1(nodev1)

  nodev2.subscribe(defaultBridgeTopic, relayHandler)

  if conf.rpc:
    let ta = initTAddress(conf.rpcAddress,
                          Port(conf.rpcPort + conf.portsShift))
    var rpcServer = newRpcHttpServer([ta])

    # Waku v1 RPC
    # (named `keyStorage` so the `keys` module qualifier is not shadowed)
    let keyStorage = newKeyStorage()
    setupWakuRPC(nodev1, keyStorage, rpcServer, rng)
    setupWakuSimRPC(nodev1, rpcServer)

    # Waku v2 rpc
    # `startV2Rpc` installs the v2 handlers and starts the server, so the v1
    # handlers above must be set up first and no second `start()` is needed.
    startV2Rpc(nodev2, rpcServer, conf)

  when defined(insecure):
    if conf.metricsServer:
      let
        address = conf.metricsServerAddress
        port = conf.metricsServerPort + conf.portsShift
      info "Starting metrics HTTP server", address, port
      metrics.startHttpServer($address, Port(port))

  runForever()