{.push raises: [Defect].}

# Bridge between a Waku v1 (devp2p) node and a Waku v2 (libp2p) node.
# Messages received on one network are converted and re-published on the
# other; a small FIFO of message hashes prevents bridging the same message
# twice.

import
  std/[tables, hashes, sequtils],
  chronos, confutils, chronicles, chronicles/topics_registry,
  metrics, metrics/chronos_httpserver,
  stew/byteutils,
  stew/shims/net as stewNet, json_rpc/rpcserver,
  # Waku v1 imports
  eth/[keys, p2p], eth/common/utils,
  eth/p2p/enode,
  ../v1/protocol/waku_protocol,
  # Waku v2 imports
  libp2p/crypto/crypto,
  ../v2/utils/namespacing,
  ../v2/node/wakunode2,
  # Common cli config
  ./config_bridge

declarePublicCounter waku_bridge_transfers, "Number of messages transferred between Waku v1 and v2 networks", ["type"]
declarePublicCounter waku_bridge_dropped, "Number of messages dropped", ["type"]

logScope:
  topics = "wakubridge"

##################
# Default values #
##################

const
  ClientIdV1 = "nim-waku v1 node"
  DefaultTTL = 5'u32
  DeduplQSize = 20  # Maximum number of seen messages to keep in deduplication queue

#########
# Types #
#########

type
  WakuBridge* = ref object of RootObj
    nodev1*: EthereumNode              # Waku v1 side of the bridge
    nodev2*: WakuNode                  # Waku v2 side of the bridge
    nodev2PubsubTopic: wakunode2.Topic # Pubsub topic to bridge to/from
    seen: seq[hashes.Hash]             # FIFO queue of seen WakuMessages. Used for deduplication.

####################
# Helper functions #
####################

# Deduplication

proc containsOrAdd(sequence: var seq[hashes.Hash], hash: hashes.Hash): bool =
  ## Returns `true` if `hash` is already in `sequence`. Otherwise appends it
  ## and returns `false`, first evicting the oldest entry once the queue
  ## holds `DeduplQSize` items.
  if sequence.contains(hash):
    return true

  if sequence.len >= DeduplQSize:
    trace "Deduplication queue full. Removing oldest item."
    sequence.delete 0, 0  # Remove first item in queue

  sequence.add(hash)

  return false

# Topic conversion

proc toV2ContentTopic*(v1Topic: waku_protocol.Topic): ContentTopic =
  ## Convert a 4-byte array v1 topic to a namespaced content topic
  ## with format `/waku/1/<v1-topic-bytes-as-hex>/rlp`
  ##
  ## <v1-topic-bytes-as-hex> is prefixed with `0x`

  var namespacedTopic = NamespacedTopic()

  namespacedTopic.application = "waku"
  namespacedTopic.version = "1"
  namespacedTopic.topicName = "0x" & v1Topic.toHex()
  namespacedTopic.encoding = "rlp"

  return ContentTopic($namespacedTopic)

proc toV1Topic*(contentTopic: ContentTopic): waku_protocol.Topic {.raises: [Defect, LPError, ValueError]} =
  ## Extracts the 4-byte array v1 topic from a content topic
  ## with format `/waku/1/<v1-topic-bytes-as-hex>/<encoding>`
  ##
  ## Only the topic-name segment is used; it is parsed as a hex string.
  ## Raises `ValueError` (see the `raises` list) when conversion fails.

  hexToByteArray(hexStr = NamespacedTopic.fromString(contentTopic).tryGet().topicName,
                 N = 4)  # Byte array length

# Message conversion

func toWakuMessage(env: Envelope): WakuMessage =
  ## Translate a Waku v1 envelope to a Waku v2 message
  WakuMessage(payload: env.data,
              contentTopic: toV2ContentTopic(env.topic),
              version: 1)

proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} =
  ## Converts a v1 envelope and publishes it on the bridge's v2 pubsub topic,
  ## unless an identical message has already been seen.
  let msg = env.toWakuMessage()

  if bridge.seen.containsOrAdd(msg.encode().buffer.hash()):
    # This is a duplicate message. Return
    trace "Already seen. Dropping.", msg=msg
    waku_bridge_dropped.inc(labelValues = ["duplicate"])
    return

  trace "Sending message to V2", msg=msg

  waku_bridge_transfers.inc(labelValues = ["v1_to_v2"])

  await bridge.nodev2.publish(bridge.nodev2PubsubTopic, msg)

proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe, raises: [Defect, LPError, ValueError].} =
  ## Posts a v2 message on the v1 network, unless an identical message has
  ## already been seen. Raises `ValueError` when the content topic cannot be
  ## converted to a v1 topic.
  if bridge.seen.containsOrAdd(msg.encode().buffer.hash()):
    # This is a duplicate message. Return
    trace "Already seen. Dropping.", msg=msg
    waku_bridge_dropped.inc(labelValues = ["duplicate"])
    return

  # Convert the topic before touching the transfer counter, so a message
  # whose content topic cannot be converted is not counted as transferred.
  # The raw content-topic bytes are deliberately not sliced here: doing so
  # raises an uncatchable Defect for topics shorter than four bytes.
  let v1Topic = toV1Topic(msg.contentTopic)

  trace "Sending message to V1", msg=msg

  waku_bridge_transfers.inc(labelValues = ["v2_to_v1"])

  discard bridge.nodev1.postMessage(ttl = DefaultTTL,
                                    topic = v1Topic,
                                    payload = msg.payload)

##############
# Public API #
##############

proc new*(T: type WakuBridge,
          # NodeV1 initialisation
          nodev1Key: keys.KeyPair,
          nodev1Address: Address,
          powRequirement = 0.002,
          rng: ref BrHmacDrbgContext,
          topicInterest = none(seq[waku_protocol.Topic]),
          bloom = some(fullBloom()),
          # NodeV2 initialisation
          nodev2Key: crypto.PrivateKey,
          nodev2BindIp: ValidIpAddress, nodev2BindPort: Port,
          nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](),
          # Bridge configuration
          nodev2PubsubTopic: wakunode2.Topic): T
  {.raises: [Defect, LPError].} =
  ## Creates a bridge consisting of a configured (not yet started) Waku v1
  ## node and Waku v2 node. `nodev2PubsubTopic` is the v2 pubsub topic that
  ## messages are bridged to and from.

  # Setup Waku v1 node
  var
    nodev1 = newEthereumNode(keys = nodev1Key, address = nodev1Address,
                             networkId = NetworkId(1), chain = nil, clientId = ClientIdV1,
                             addAllCapabilities = false, rng = rng)

  nodev1.addCapability Waku # Always enable Waku protocol

  # Setup the Waku configuration.
  # This node is being set up as a bridge. By default it gets configured as a node with
  # a full bloom filter so that it will receive and forward all messages.
  # It is, however, possible to configure a topic interest to bridge only
  # selected messages.
  # TODO: What is the PoW setting now?
  let wakuConfig = WakuConfig(powRequirement: powRequirement,
                              bloom: bloom,
                              isLightNode: false,
                              maxMsgSize: waku_protocol.defaultMaxMsgSize,
                              topics: topicInterest)
  nodev1.configureWaku(wakuConfig)

  # Setup Waku v2 node
  let
    nodev2 = WakuNode.new(nodev2Key,
                          nodev2BindIp, nodev2BindPort,
                          nodev2ExtIp, nodev2ExtPort)

  return WakuBridge(nodev1: nodev1, nodev2: nodev2, nodev2PubsubTopic: nodev2PubsubTopic)

proc start*(bridge: WakuBridge) {.async.} =
  ## Starts both nodes, mounts relay on the v2 node and installs the
  ## handlers that forward messages in both directions.
  info "Starting WakuBridge"

  debug "Start listening on Waku v1"
  # Start listening on Waku v1 node
  let connectedFut = bridge.nodev1.connectToNetwork(
    @[],
    true, # Always enable listening
    false # Disable discovery (only discovery v4 is currently supported)
    )
  # The future is not awaited; a failure to connect terminates the process.
  connectedFut.callback = proc(data: pointer) {.gcsafe.} =
    {.gcsafe.}:
      if connectedFut.failed:
        fatal "connectToNetwork failed", msg = connectedFut.readError.msg
        quit(1)

  # Start Waku v2 node
  debug "Start listening on Waku v2"
  await bridge.nodev2.start()

  # Always mount relay for bridge.
  # `triggerSelf` is false on a `bridge` to avoid duplicates
  bridge.nodev2.mountRelay(triggerSelf = false)

  # Bridging
  # Handle messages on Waku v1 and bridge to Waku v2
  proc handleEnvReceived(envelope: Envelope) {.gcsafe, raises: [Defect].} =
    trace "Bridging envelope from V1 to V2", envelope=envelope
    asyncSpawn bridge.toWakuV2(envelope)

  bridge.nodev1.registerEnvReceivedHandler(handleEnvReceived)

  # Handle messages on Waku v2 and bridge to Waku v1
  proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} =
    let msg = WakuMessage.init(data)
    if msg.isOk():
      try:
        trace "Bridging message from V2 to V1", msg=msg.tryGet()
        bridge.toWakuV1(msg.tryGet())
      except ValueError:
        trace "Failed to convert message to Waku v1. Check content-topic format.", msg=msg
        waku_bridge_dropped.inc(labelValues = ["value_error"])

  bridge.nodev2.subscribe(bridge.nodev2PubsubTopic, relayHandler)

proc stop*(bridge: WakuBridge) {.async.} =
  ## Stops the Waku v2 node. The Waku v1 node is not stopped here.
  await bridge.nodev2.stop()

{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
when isMainModule:
  import
    eth/p2p/whispernodes,
    ./utils/nat,
    ../v1/node/rpc/wakusim,
    ../v1/node/rpc/waku,
    ../v1/node/rpc/key_storage,
    ../v1/node/waku_helpers,
    ../v2/node/jsonrpc/[debug_api,
                        filter_api,
                        relay_api,
                        store_api]

  proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: WakuNodeConf) =
    ## Installs the Waku v2 JSON-RPC handlers enabled in `conf` and starts
    ## `rpcServer`.
    installDebugApiHandlers(node, rpcServer)

    # Install enabled API handlers:
    if conf.relay:
      let topicCache = newTable[string, seq[WakuMessage]]()
      installRelayApiHandlers(node, rpcServer, topicCache)

    if conf.filter:
      let messageCache = newTable[ContentTopic, seq[WakuMessage]]()
      installFilterApiHandlers(node, rpcServer, messageCache)

    if conf.store:
      installStoreApiHandlers(node, rpcServer)

    rpcServer.start()

  let
    rng = keys.newRng()
    conf = WakuNodeConf.load()

  if conf.logLevel != LogLevel.NONE:
    setLogLevel(conf.logLevel)

  # Load address configuration
  let
    (nodev1ExtIp, _, _) = setupNat(conf.nat, ClientIdV1,
                                   Port(conf.devp2pTcpPort + conf.portsShift),
                                   Port(conf.udpPort + conf.portsShift))
    # TODO: EthereumNode should have a better split of binding address and
    # external address. Also, can't have different ports as it stands now.
    nodev1Address = if nodev1ExtIp.isNone():
                      Address(ip: parseIpAddress("0.0.0.0"),
                              tcpPort: Port(conf.devp2pTcpPort + conf.portsShift),
                              udpPort: Port(conf.udpPort + conf.portsShift))
                    else:
                      Address(ip: nodev1ExtIp.get(),
                              tcpPort: Port(conf.devp2pTcpPort + conf.portsShift),
                              udpPort: Port(conf.udpPort + conf.portsShift))
    (nodev2ExtIp, nodev2ExtPort, _) = setupNat(conf.nat, clientId,
                                               Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
                                               Port(uint16(conf.udpPort) + conf.portsShift))

  # Topic interest and bloom
  var topicInterest: Option[seq[waku_protocol.Topic]]
  var bloom: Option[Bloom]

  if conf.wakuV1TopicInterest:
    # Topic interest is enabled with an (initially) empty topic list
    var topics: seq[waku_protocol.Topic]
    topicInterest = some(topics)
  else:
    bloom = some(fullBloom())

  let
    bridge = WakuBridge.new(nodev1Key = conf.nodekeyV1,
                            nodev1Address = nodev1Address,
                            powRequirement = conf.wakuV1Pow,
                            rng = rng,
                            topicInterest = topicInterest,
                            bloom = bloom,
                            nodev2Key = conf.nodekeyV2,
                            nodev2BindIp = conf.listenAddress,
                            nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
                            nodev2ExtIp = nodev2ExtIp,
                            nodev2ExtPort = nodev2ExtPort,
                            nodev2PubsubTopic = conf.bridgePubsubTopic)

  waitFor bridge.start()

  # Now load rest of config
  # Optionally direct connect nodev1 with a set of nodes
  if conf.staticnodesV1.len > 0: connectToNodes(bridge.nodev1, conf.staticnodesV1)
  elif conf.fleetV1 == prod: connectToNodes(bridge.nodev1, WhisperNodes)
  elif conf.fleetV1 == staging: connectToNodes(bridge.nodev1, WhisperNodesStaging)
  elif conf.fleetV1 == test: connectToNodes(bridge.nodev1, WhisperNodesTest)

  # Mount configured Waku v2 protocols
  mountLibp2pPing(bridge.nodev2)

  if conf.store:
    mountStore(bridge.nodev2, persistMessages = false)  # Bridge does not persist messages

  if conf.filter:
    mountFilter(bridge.nodev2)

  if conf.staticnodesV2.len > 0:
    waitFor connectToNodes(bridge.nodev2, conf.staticnodesV2)

  if conf.storenode != "":
    setStorePeer(bridge.nodev2, conf.storenode)

  if conf.filternode != "":
    setFilterPeer(bridge.nodev2, conf.filternode)

  if conf.rpc:
    let ta = initTAddress(conf.rpcAddress,
      Port(conf.rpcPort + conf.portsShift))
    var rpcServer = newRpcHttpServer([ta])
    # Waku v1 RPC
    let keys = newKeyStorage()
    setupWakuRPC(bridge.nodev1, keys, rpcServer, rng)
    setupWakuSimRPC(bridge.nodev1, rpcServer)
    # Waku v2 rpc. `startV2Rpc` also starts the server, so the v1 handlers
    # above must be installed first and no further `start()` call is needed.
    startV2Rpc(bridge.nodev2, rpcServer, conf)

  if conf.metricsServer:
    let
      address = conf.metricsServerAddress
      port = conf.metricsServerPort + conf.portsShift
    info "Starting metrics HTTP server", address, port
    startMetricsHttpServer($address, Port(port))

  runForever()