{.push raises: [Defect].} import std/[tables, times, strutils, hashes, sequtils], chronos, confutils, chronicles, chronicles/topics_registry, chronos/streams/tlsstream, metrics, metrics/chronos_httpserver, stew/[byteutils, endians2], stew/shims/net as stewNet, json_rpc/rpcserver, # Matterbridge client imports ../../../waku/common/utils/matterbridge_client, # Waku v2 imports libp2p/crypto/crypto, libp2p/errors, ../../../waku/v2/protocol/waku_message, ../../../waku/v2/node/wakunode2, # Chat 2 imports ../chat2, # Common cli config ./config_chat2bridge declarePublicCounter chat2_mb_transfers, "Number of messages transferred between chat2 and Matterbridge", ["type"] declarePublicCounter chat2_mb_dropped, "Number of messages dropped", ["reason"] logScope: topics = "chat2bridge" ################## # Default values # ################## const DefaultTopic* = chat2.DefaultTopic DeduplQSize = 20 # Maximum number of seen messages to keep in deduplication queue ######### # Types # ######### type Chat2MatterBridge* = ref object of RootObj mbClient*: MatterbridgeClient nodev2*: WakuNode running: bool pollPeriod: chronos.Duration seen: seq[Hash] #FIFO queue contentTopic: string MbMessageHandler* = proc (jsonNode: JsonNode) {.gcsafe.} ################### # Helper funtions # ###################S proc containsOrAdd(sequence: var seq[Hash], hash: Hash): bool = if sequence.contains(hash): return true if sequence.len >= DeduplQSize: trace "Deduplication queue full. Removing oldest item." sequence.delete 0, 0 # Remove first item in queue sequence.add(hash) return false proc toWakuMessage(cmb: Chat2MatterBridge, jsonNode: JsonNode): WakuMessage {.raises: [Defect, KeyError]} = # Translates a Matterbridge API JSON response to a Waku v2 message let msgFields = jsonNode.getFields() # @TODO error handling here - verify expected fields let chat2pb = Chat2Message(timestamp: getTime().toUnix(), # @TODO use provided timestamp nick: msgFields["username"].getStr(), payload: msgFields["text"].getStr().toBytes()).encode() WakuMessage(payload: chat2pb.buffer, contentTopic: cmb.contentTopic, version: 0) proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} = let msg = cmb.toWakuMessage(jsonNode) if cmb.seen.containsOrAdd(msg.payload.hash()): # This is a duplicate message. Return. chat2_mb_dropped.inc(labelValues = ["duplicate"]) return trace "Post Matterbridge message to chat2" chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"]) await cmb.nodev2.publish(DefaultTopic, msg) proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} = if cmb.seen.containsOrAdd(msg.payload.hash()): # This is a duplicate message. Return. chat2_mb_dropped.inc(labelValues = ["duplicate"]) return if msg.contentTopic != cmb.contentTopic: # Only bridge messages on the configured content topic chat2_mb_dropped.inc(labelValues = ["filtered"]) return trace "Post chat2 message to Matterbridge" chat2_mb_transfers.inc(labelValues = ["chat2_to_mb"]) let chat2Msg = Chat2Message.init(msg.payload) assert chat2Msg.isOk let postRes = cmb.mbClient.postMessage(text = string.fromBytes(chat2Msg[].payload), username = chat2Msg[].nick) if postRes.isErr() or (postRes[] == false): chat2_mb_dropped.inc(labelValues = ["duplicate"]) error "Matterbridge host unreachable. Dropping message." proc pollMatterbridge(cmb: Chat2MatterBridge, handler: MbMessageHandler) {.async.} = while cmb.running: let getRes = cmb.mbClient.getMessages() if getRes.isOk(): for jsonNode in getRes[]: handler(jsonNode) else: error "Matterbridge host unreachable. Sleeping before retrying." await sleepAsync(chronos.seconds(10)) await sleepAsync(cmb.pollPeriod) ############## # Public API # ############## proc new*(T: type Chat2MatterBridge, # Matterbridge initialisation mbHostUri: string, mbGateway: string, # NodeV2 initialisation nodev2Key: crypto.PrivateKey, nodev2BindIp: ValidIpAddress, nodev2BindPort: Port, nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](), contentTopic: string): T {.raises: [Defect, ValueError, KeyError, TLSStreamProtocolError, IOError, LPError].} = # Setup Matterbridge let mbClient = MatterbridgeClient.new(mbHostUri, mbGateway) # Let's verify the Matterbridge configuration before continuing let clientHealth = mbClient.isHealthy() if clientHealth.isOk() and clientHealth[]: info "Reached Matterbridge host", host=mbClient.host else: raise newException(ValueError, "Matterbridge client not reachable/healthy") # Setup Waku v2 node let nodev2 = WakuNode.new(nodev2Key, nodev2BindIp, nodev2BindPort, nodev2ExtIp, nodev2ExtPort) return Chat2MatterBridge(mbClient: mbClient, nodev2: nodev2, running: false, pollPeriod: chronos.seconds(1), contentTopic: contentTopic) proc start*(cmb: Chat2MatterBridge) {.async.} = info "Starting Chat2MatterBridge" cmb.running = true debug "Start polling Matterbridge" # Start Matterbridge polling (@TODO: use streaming interface) proc mbHandler(jsonNode: JsonNode) {.gcsafe, raises: [Exception].} = trace "Bridging message from Matterbridge to chat2", jsonNode=jsonNode waitFor cmb.toChat2(jsonNode) asyncSpawn cmb.pollMatterbridge(mbHandler) # Start Waku v2 node debug "Start listening on Waku v2" await cmb.nodev2.start() # Always mount relay for bridge # `triggerSelf` is false on a `bridge` to avoid duplicates await cmb.nodev2.mountRelay(triggerSelf = false) # Bridging # Handle messages on Waku v2 and bridge to Matterbridge proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe, raises: [Defect].} = let msg = WakuMessage.init(data) if msg.isOk(): trace "Bridging message from Chat2 to Matterbridge", msg=msg[] cmb.toMatterbridge(msg[]) cmb.nodev2.subscribe(DefaultTopic, relayHandler) proc stop*(cmb: Chat2MatterBridge) {.async.} = info "Stopping Chat2MatterBridge" cmb.running = false await cmb.nodev2.stop() {.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError when isMainModule: import ../../../waku/common/utils/nat, ../../../waku/v2/node/jsonrpc/[debug_api, filter_api, relay_api, store_api] proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: Chat2MatterbridgeConf) {.raises: [Exception].} = installDebugApiHandlers(node, rpcServer) # Install enabled API handlers: if conf.relay: let topicCache = newTable[string, seq[WakuMessage]]() installRelayApiHandlers(node, rpcServer, topicCache) if conf.filter: let messageCache = newTable[ContentTopic, seq[WakuMessage]]() installFilterApiHandlers(node, rpcServer, messageCache) if conf.store: installStoreApiHandlers(node, rpcServer) rpcServer.start() let rng = newRng() conf = Chat2MatterbridgeConf.load() if conf.logLevel != LogLevel.NONE: setLogLevel(conf.logLevel) # Load address configuration let (nodev2ExtIp, nodev2ExtPort, _) = setupNat(conf.nat, clientId, Port(uint16(conf.libp2pTcpPort) + conf.portsShift), Port(uint16(conf.udpPort) + conf.portsShift)) ## The following heuristic assumes that, in absence of manual ## config, the external port is the same as the bind port. extPort = if nodev2ExtIp.isSome() and nodev2ExtPort.isNone(): some(Port(uint16(conf.libp2pTcpPort) + conf.portsShift)) else: nodev2ExtPort let bridge = Chat2Matterbridge.new( mbHostUri = "http://" & $initTAddress(conf.mbHostAddress, Port(conf.mbHostPort)), mbGateway = conf.mbGateway, nodev2Key = conf.nodekey, nodev2BindIp = conf.listenAddress, nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift), nodev2ExtIp = nodev2ExtIp, nodev2ExtPort = extPort, contentTopic = conf.contentTopic) waitFor bridge.start() # Now load rest of config # Mount configured Waku v2 protocols waitFor mountLibp2pPing(bridge.nodev2) if conf.store: waitFor mountStore(bridge.nodev2) if conf.filter: waitFor mountFilter(bridge.nodev2) if conf.staticnodes.len > 0: waitFor connectToNodes(bridge.nodev2, conf.staticnodes) if conf.storenode != "": setStorePeer(bridge.nodev2, conf.storenode) if conf.filternode != "": setFilterPeer(bridge.nodev2, conf.filternode) if conf.rpc: let ta = initTAddress(conf.rpcAddress, Port(conf.rpcPort + conf.portsShift)) var rpcServer = newRpcHttpServer([ta]) # Waku v2 rpc startV2Rpc(bridge.nodev2, rpcServer, conf) rpcServer.start() if conf.metricsServer: let address = conf.metricsServerAddress port = conf.metricsServerPort + conf.portsShift info "Starting metrics HTTP server", address, port startMetricsHttpServer($address, Port(port)) runForever()