import
  std/tables,
  chronos, confutils, chronicles, chronicles/topics_registry, metrics,
  stew/[byteutils, objects],
  stew/shims/net as stewNet, json_rpc/rpcserver,
  # Waku v1 imports
  eth/[keys, p2p], eth/common/utils,
  eth/p2p/enode,
  ../v1/protocol/waku_protocol,
  # Waku v2 imports
  libp2p/crypto/crypto,
  ../v2/protocol/waku_filter/waku_filter_types,
  ../v2/node/wakunode2,
  # Common cli config
  ./config_bridge

declarePublicCounter waku_bridge_transfers, "Number of messages transferred between Waku v1 and v2 networks", ["type"]

logScope:
  topics = "wakubridge"

##################
# Default values #
##################

const
  defaultBridgeTopic* = "/waku/2/default-bridge/proto"
    ## Waku v2 pubsub topic the bridge publishes to and subscribes on.
  clientIdV1 = "nim-waku v1 node"
  defaultTTL = 5'u32
    ## TTL passed to `postMessage` for messages bridged from v2 to v1.
  v1TopicLen = 4
    ## Number of bytes taken from a v2 content topic to build a v1 topic.

#########
# Types #
#########

type
  WakuBridge* = ref object of RootObj
    ## Pairs one Waku v1 node with one Waku v2 node so that messages can be
    ## forwarded between the two networks.
    nodev1*: EthereumNode
    nodev2*: WakuNode

####################
# Helper functions #
####################

func toWakuMessage(env: Envelope): WakuMessage =
  ## Translate a Waku v1 envelope to a Waku v2 message: the envelope data
  ## becomes the payload, the envelope topic bytes become the content topic
  ## and `version` is set to 1.
  WakuMessage(payload: env.data,
              contentTopic: ContentTopic(string.fromBytes(env.topic)),
              version: 1)

proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} =
  ## Publish a Waku v1 envelope on the v2 node under `defaultBridgeTopic`
  ## and count it in the `v1_to_v2` transfer metric.
  waku_bridge_transfers.inc(labelValues = ["v1_to_v2"])

  await bridge.nodev2.publish(defaultBridgeTopic, env.toWakuMessage())

proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe.} =
  ## Post a Waku v2 message on the v1 node.
  ##
  ## The v1 topic is built from the first `v1TopicLen` bytes of the v2
  ## content topic. Messages whose content topic is shorter than that cannot
  ## be mapped and are dropped (and not counted as transferred) instead of
  ## crashing the bridge on an out-of-range slice.
  # @TODO: use namespacing to map v2 contentTopics to v1 topics
  let topicBytes = msg.contentTopic.toBytes()

  if topicBytes.len < v1TopicLen:
    debug "Content topic too short to map to a Waku v1 topic, dropping message",
      topicLen = topicBytes.len
    return

  waku_bridge_transfers.inc(labelValues = ["v2_to_v1"])

  # The result of postMessage is deliberately ignored (best effort).
  discard bridge.nodev1.postMessage(ttl = defaultTTL,
                                    topic = toArray(v1TopicLen, topicBytes[0 ..< v1TopicLen]),
                                    payload = msg.payload)

##############
# Public API #
##############

proc new*(T: type WakuBridge,
          # NodeV1 initialisation
          nodev1Key: keys.KeyPair,
          nodev1Address: Address,
          powRequirement = 0.002,
          rng: ref BrHmacDrbgContext,
          # NodeV2 initialisation
          nodev2Key: crypto.PrivateKey,
          nodev2BindIp: ValidIpAddress, nodev2BindPort: Port,
          nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port]()): T =
  ## Create a bridge holding a freshly configured Waku v1 node and a Waku v2
  ## node. Neither node is started here; call `start` for that.

  # Setup Waku v1 node
  var
    nodev1 = newEthereumNode(keys = nodev1Key, address = nodev1Address,
                             networkId = NetworkId(1), chain = nil,
                             clientId = clientIdV1, addAllCapabilities = false,
                             rng = rng)

  nodev1.addCapability Waku # Always enable Waku protocol

  # Setup the Waku configuration.
  # This node is being set up as a bridge so it gets configured as a node with
  # a full bloom filter so that it will receive and forward all messages.
  # TODO: What is the PoW setting now?
  let wakuConfig = WakuConfig(powRequirement: powRequirement,
                              bloom: some(fullBloom()),
                              isLightNode: false,
                              maxMsgSize: waku_protocol.defaultMaxMsgSize,
                              topics: none(seq[waku_protocol.Topic]))
  nodev1.configureWaku(wakuConfig)

  # Setup Waku v2 node
  let
    nodev2 = WakuNode.init(nodev2Key,
                           nodev2BindIp, nodev2BindPort,
                           nodev2ExtIp, nodev2ExtPort)

  return WakuBridge(nodev1: nodev1, nodev2: nodev2)

proc start*(bridge: WakuBridge) {.async.} =
  ## Start both nodes and install the handlers that forward messages in both
  ## directions. The process quits with exit code 1 if the v1 node fails to
  ## connect to the network.
  info "Starting WakuBridge"

  debug "Start listening on Waku v1"
  # Start listening on Waku v1 node
  let connectedFut = bridge.nodev1.connectToNetwork(@[],
    true, # Always enable listening
    false # Disable discovery (only discovery v4 is currently supported)
    )
  connectedFut.callback = proc(data: pointer) {.gcsafe.} =
    {.gcsafe.}:
      if connectedFut.failed:
        fatal "connectToNetwork failed", msg = connectedFut.readError.msg
        quit(1)

  # Start Waku v2 node
  debug "Start listening on Waku v2"
  await bridge.nodev2.start()
  bridge.nodev2.mountRelay() # Always mount relay for bridge

  # Bridging
  # Handle messages on Waku v1 and bridge to Waku v2
  proc handleEnvReceived(envelope: Envelope) {.gcsafe.} =
    trace "Bridging envelope from V1 to V2", envelope=envelope
    # This handler is invoked from within the running event loop, so the
    # publish must be scheduled rather than waited for: `waitFor` would
    # re-enter the dispatcher.
    asyncSpawn bridge.toWakuV2(envelope)

  bridge.nodev1.registerEnvReceivedHandler(handleEnvReceived)

  # Handle messages on Waku v2 and bridge to Waku v1
  proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} =
    let msg = WakuMessage.init(data)
    # Data that does not decode to a WakuMessage is silently ignored.
    if msg.isOk():
      trace "Bridging message from V2 to V1", msg=msg[]
      bridge.toWakuV1(msg[])

  bridge.nodev2.subscribe(defaultBridgeTopic, relayHandler)

proc stop*(bridge: WakuBridge) {.async.} =
  ## Stop the Waku v2 node. NOTE: the Waku v1 node is not stopped here.
  await bridge.nodev2.stop()

when isMainModule:
  import
    eth/p2p/whispernodes,
    ./utils/nat,
    ../v1/node/rpc/wakusim,
    ../v1/node/rpc/waku,
    ../v1/node/rpc/key_storage,
    ../v1/node/waku_helpers,
    ../v2/node/jsonrpc/[debug_api, filter_api, relay_api, store_api]

  proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: WakuNodeConf) =
    ## Install the Waku v2 JSON-RPC handlers enabled in `conf` on `rpcServer`
    ## and then start the server.
    installDebugApiHandlers(node, rpcServer)

    # Install enabled API handlers:
    if conf.relay:
      let topicCache = newTable[string, seq[WakuMessage]]()
      installRelayApiHandlers(node, rpcServer, topicCache)

    if conf.filter:
      let messageCache = newTable[ContentTopic, seq[WakuMessage]]()
      installFilterApiHandlers(node, rpcServer, messageCache)

    if conf.store:
      installStoreApiHandlers(node, rpcServer)

    rpcServer.start()

  let
    rng = keys.newRng()
    conf = WakuNodeConf.load()

  if conf.logLevel != LogLevel.NONE:
    setLogLevel(conf.logLevel)

  # Load address configuration
  let
    (nodev1ExtIp, _, _) = setupNat(conf.nat, clientIdV1,
                                   Port(conf.devp2pTcpPort + conf.portsShift),
                                   Port(conf.udpPort + conf.portsShift))
    # TODO: EthereumNode should have a better split of binding address and
    # external address. Also, can't have different ports as it stands now.
    nodev1Address =
      if nodev1ExtIp.isNone():
        Address(ip: parseIpAddress("0.0.0.0"),
                tcpPort: Port(conf.devp2pTcpPort + conf.portsShift),
                udpPort: Port(conf.udpPort + conf.portsShift))
      else:
        Address(ip: nodev1ExtIp.get(),
                tcpPort: Port(conf.devp2pTcpPort + conf.portsShift),
                udpPort: Port(conf.udpPort + conf.portsShift))
    (nodev2ExtIp, nodev2ExtPort, _) = setupNat(conf.nat, clientId,
                                               Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
                                               Port(uint16(conf.udpPort) + conf.portsShift))

  let
    bridge = WakuBridge.new(nodev1Key = conf.nodekeyv1,
                            nodev1Address = nodev1Address,
                            powRequirement = conf.wakuPow,
                            rng = rng,
                            nodev2Key = conf.nodeKeyv2,
                            nodev2BindIp = conf.listenAddress,
                            nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
                            nodev2ExtIp = nodev2ExtIp,
                            nodev2ExtPort = nodev2ExtPort)

  waitFor bridge.start()

  # Now load rest of config
  # Optionally direct connect nodev1 with a set of nodes
  if conf.staticnodesv1.len > 0: connectToNodes(bridge.nodev1, conf.staticnodesv1)
  elif conf.fleetv1 == prod: connectToNodes(bridge.nodev1, WhisperNodes)
  elif conf.fleetv1 == staging: connectToNodes(bridge.nodev1, WhisperNodesStaging)
  elif conf.fleetv1 == test: connectToNodes(bridge.nodev1, WhisperNodesTest)

  # Mount configured Waku v2 protocols
  if conf.store:
    mountStore(bridge.nodev2, persistMessages = conf.persistmessages)

  if conf.filter:
    mountFilter(bridge.nodev2)

  if conf.staticnodesv2.len > 0:
    waitFor connectToNodes(bridge.nodev2, conf.staticnodesv2)

  if conf.storenode != "":
    setStorePeer(bridge.nodev2, conf.storenode)

  if conf.filternode != "":
    setFilterPeer(bridge.nodev2, conf.filternode)

  if conf.rpc:
    let ta = initTAddress(conf.rpcAddress,
                          Port(conf.rpcPort + conf.portsShift))
    var rpcServer = newRpcHttpServer([ta])
    # Waku v1 RPC
    let keys = newKeyStorage()
    setupWakuRPC(bridge.nodev1, keys, rpcServer, rng)
    setupWakuSimRPC(bridge.nodev1, rpcServer)
    # Waku v2 rpc
    # startV2Rpc also starts the server once every handler (v1 and v2) has
    # been installed, so no separate rpcServer.start() call is needed here.
    startV2Rpc(bridge.nodev2, rpcServer, conf)

  when defined(insecure):
    if conf.metricsServer:
      let
        address = conf.metricsServerAddress
        port = conf.metricsServerPort + conf.portsShift
      info "Starting metrics HTTP server", address, port
      metrics.startHttpServer($address, Port(port))

  runForever()