From e1c3fca7ae96f4d8efbedb7801c52ef1e47d985b Mon Sep 17 00:00:00 2001 From: jm-clius Date: Thu, 6 May 2021 14:09:35 +0000 Subject: [PATCH] deploy: c5e75801495ea444524f8de92c0726c9c9c00c77 --- Makefile | 6 +- examples/v2/chat2.nim | 5 +- examples/v2/matterbridge/chat2bridge.nim | 230 ++++++++++++++++++ .../v2/matterbridge/config_chat2bridge.nim | 149 ++++++++++++ .../vendor/libbacktrace-upstream/libtool | 2 +- waku.nimble | 5 + waku/common/README.md | 1 + waku/common/utils/matterbridge_client.nim | 55 +++++ waku/v2/node/wakunode2.nim | 2 +- 9 files changed, 449 insertions(+), 6 deletions(-) create mode 100644 examples/v2/matterbridge/chat2bridge.nim create mode 100644 examples/v2/matterbridge/config_chat2bridge.nim create mode 100644 waku/common/utils/matterbridge_client.nim diff --git a/Makefile b/Makefile index 0779c68f4..ff9b3e6f4 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,7 @@ GIT_SUBMODULE_UPDATE := git submodule update --init --recursive else # "variables.mk" was included. Business as usual until the end of this file. # default target, because it's the first one that doesn't start with '.' -all: | wakunode1 sim1 example1 wakunode2 sim2 example2 chat2 bridge +all: | wakunode1 sim1 example1 wakunode2 sim2 example2 chat2 bridge chat2bridge # must be included after the default target -include $(BUILD_SYSTEM_DIR)/makefiles/targets.mk @@ -141,6 +141,10 @@ bridge: | build deps echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim bridge $(NIM_PARAMS) waku.nims +chat2bridge: | build deps + echo -e $(BUILD_MSG) "build/$@" && \ + $(ENV_SCRIPT) nim chat2bridge $(NIM_PARAMS) waku.nims + # Builds and run the test suite (Waku v1 + v2) test: | test1 test2 diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index ca9b022c6..51fe6ecdd 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -35,9 +35,8 @@ const Help = """ const PayloadV1* {.booldefine.} = false - DefaultTopic = "/waku/2/default-waku/proto" - - DefaultContentTopic = ContentTopic("dingpu") + DefaultTopic* = "/waku/2/default-waku/proto" + DefaultContentTopic* = ContentTopic("dingpu") # XXX Connected is a bit annoying, because incoming connections don't trigger state change # Could poll connection pool or something here, I suppose diff --git a/examples/v2/matterbridge/chat2bridge.nim b/examples/v2/matterbridge/chat2bridge.nim new file mode 100644 index 000000000..343174f2d --- /dev/null +++ b/examples/v2/matterbridge/chat2bridge.nim @@ -0,0 +1,230 @@ +import + std/[tables, times, strutils], + chronos, confutils, chronicles, chronicles/topics_registry, metrics, + stew/[byteutils, endians2], + stew/shims/net as stewNet, json_rpc/rpcserver, + # Matterbridge client imports + ../../../waku/common/utils/matterbridge_client, + # Waku v2 imports + libp2p/crypto/crypto, + ../../../waku/v2/protocol/waku_filter/waku_filter_types, + ../../../waku/v2/node/wakunode2, + # Chat 2 imports + ../chat2, + # Common cli config + ./config_chat2bridge + +declarePublicCounter chat2_mb_transfers, "Number of messages transferred between chat2 and Matterbridge", ["type"] + +logScope: + topics = "chat2bridge" + +################## +# Default values # +################## + +const + DefaultTopic* = chat2.DefaultTopic + DefaultContentTopic* = chat2.DefaultContentTopic + +######### +# Types # +######### + +type + Chat2MatterBridge* = ref object of RootObj + mbClient*: MatterbridgeClient + nodev2*: WakuNode + running: bool + pollPeriod: chronos.Duration + + MbMessageHandler* = proc (jsonNode: JsonNode) {.gcsafe.} + +################### +# Helper funtions # +###################S + +proc toWakuMessage(jsonNode: JsonNode): WakuMessage = + # Translates a Matterbridge API JSON response to a Waku v2 message + let msgFields = jsonNode.getFields() + + # @TODO error handling here - verify expected fields + + let chat2pb = Chat2Message(timestamp: getTime().toUnix(), # @TODO use provided timestamp + nick: msgFields["username"].getStr(), + payload: msgFields["text"].getStr().toBytes()).encode() + + WakuMessage(payload: chat2pb.buffer, + contentTopic: DefaultContentTopic, + version: 0) + +proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} = + chat2_mb_transfers.inc(labelValues = ["v1_to_v2"]) + + trace "Post Matterbridge message to chat2" + + await cmb.nodev2.publish(DefaultTopic, jsonNode.toWakuMessage()) + +proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe.} = + chat2_mb_transfers.inc(labelValues = ["v2_to_v1"]) + + trace "Post chat2 message to Matterbridge" + + let chat2Msg = Chat2Message.init(msg.payload) + + assert chat2Msg.isOk + + cmb.mbClient.postMessage(text = string.fromBytes(chat2Msg[].payload), + username = chat2Msg[].nick) + +proc pollMatterbridge(cmb: Chat2MatterBridge, handler: MbMessageHandler) {.async.} = + while cmb.running: + for jsonNode in cmb.mbClient.getMessages(): + handler(jsonNode) + + await sleepAsync(cmb.pollPeriod) + +############## +# Public API # +############## +proc new*(T: type Chat2MatterBridge, + # Matterbridge initialisation + mbHostUri: string, + mbGateway: string, + # NodeV2 initialisation + nodev2Key: crypto.PrivateKey, + nodev2BindIp: ValidIpAddress, nodev2BindPort: Port, + nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port]()): T = + + # Setup Matterbridge + let + mbClient = MatterbridgeClient.new(mbHostUri, mbGateway) + + # Setup Waku v2 node + let + nodev2 = WakuNode.init(nodev2Key, + nodev2BindIp, nodev2BindPort, + nodev2ExtIp, nodev2ExtPort) + + return Chat2MatterBridge(mbClient: mbClient, nodev2: nodev2, running: false, pollPeriod: chronos.seconds(1)) + +proc start*(cmb: Chat2MatterBridge) {.async.} = + info "Starting Chat2MatterBridge" + + cmb.running = true + + debug "Start polling Matterbridge" + + # Start Matterbridge polling (@TODO: use streaming interface) + proc mbHandler(jsonNode: JsonNode) {.gcsafe.} = + trace "Bridging message from Matterbridge to chat2", jsonNode=jsonNode + waitFor cmb.toChat2(jsonNode) + + asyncSpawn cmb.pollMatterbridge(mbHandler) + + # Start Waku v2 node + debug "Start listening on Waku v2" + await cmb.nodev2.start() + + cmb.nodev2.mountRelay() # Always mount relay for bridge + + # Bridging + # Handle messages on Waku v2 and bridge to Matterbridge + proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + trace "Bridging message from Chat2 to Matterbridge", msg=msg[] + cmb.toMatterbridge(msg[]) + + cmb.nodev2.subscribe(DefaultTopic, relayHandler) + +proc stop*(cmb: Chat2MatterBridge) {.async.} = + info "Stopping Chat2MatterBridge" + + cmb.running = false + + await cmb.nodev2.stop() + +when isMainModule: + import + ../../../waku/common/utils/nat, + ../../../waku/v2/node/jsonrpc/[debug_api, + filter_api, + relay_api, + store_api] + + proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: Chat2MatterbridgeConf) = + installDebugApiHandlers(node, rpcServer) + + # Install enabled API handlers: + if conf.relay: + let topicCache = newTable[string, seq[WakuMessage]]() + installRelayApiHandlers(node, rpcServer, topicCache) + + if conf.filter: + let messageCache = newTable[ContentTopic, seq[WakuMessage]]() + installFilterApiHandlers(node, rpcServer, messageCache) + + if conf.store: + installStoreApiHandlers(node, rpcServer) + + rpcServer.start() + + let + rng = newRng() + conf = Chat2MatterbridgeConf.load() + + if conf.logLevel != LogLevel.NONE: + setLogLevel(conf.logLevel) + + # Load address configuration + let + (nodev2ExtIp, nodev2ExtPort, _) = setupNat(conf.nat, clientId, + Port(uint16(conf.libp2pTcpPort) + conf.portsShift), + Port(uint16(conf.udpPort) + conf.portsShift)) + + let + bridge = Chat2Matterbridge.new( + mbHostUri = conf.mbHostUri, + mbGateway = conf.mbGateway, + nodev2Key = conf.nodeKeyv2, + nodev2BindIp = conf.listenAddress, nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift), + nodev2ExtIp = nodev2ExtIp, nodev2ExtPort = nodev2ExtPort) + + waitFor bridge.start() + + # Now load rest of config + # Mount configured Waku v2 protocols + if conf.store: + mountStore(bridge.nodev2) + + if conf.filter: + mountFilter(bridge.nodev2) + + if conf.staticnodesv2.len > 0: + waitFor connectToNodes(bridge.nodev2, conf.staticnodesv2) + + if conf.storenode != "": + setStorePeer(bridge.nodev2, conf.storenode) + + if conf.filternode != "": + setFilterPeer(bridge.nodev2, conf.filternode) + + if conf.rpc: + let ta = initTAddress(conf.rpcAddress, + Port(conf.rpcPort + conf.portsShift)) + var rpcServer = newRpcHttpServer([ta]) + # Waku v2 rpc + startV2Rpc(bridge.nodev2, rpcServer, conf) + + rpcServer.start() + + when defined(insecure): + if conf.metricsServer: + let + address = conf.metricsServerAddress + port = conf.metricsServerPort + conf.portsShift + info "Starting metrics HTTP server", address, port + metrics.startHttpServer($address, Port(port)) + + runForever() diff --git a/examples/v2/matterbridge/config_chat2bridge.nim b/examples/v2/matterbridge/config_chat2bridge.nim new file mode 100644 index 000000000..56e950e97 --- /dev/null +++ b/examples/v2/matterbridge/config_chat2bridge.nim @@ -0,0 +1,149 @@ +import + confutils, confutils/defs, confutils/std/net, chronicles, chronos, + libp2p/crypto/[crypto, secp], + eth/keys + +type + Chat2MatterbridgeConf* = object + logLevel* {. + desc: "Sets the log level" + defaultValue: LogLevel.INFO + name: "log-level" .}: LogLevel + + listenAddress* {. + defaultValue: defaultListenAddress(config) + desc: "Listening address for the LibP2P traffic" + name: "listen-address"}: ValidIpAddress + + libp2pTcpPort* {. + desc: "Libp2p TCP listening port (for Waku v2)" + defaultValue: 9000 + name: "libp2p-tcp-port" .}: uint16 + + udpPort* {. + desc: "UDP listening port" + defaultValue: 9000 + name: "udp-port" .}: uint16 + + portsShift* {. + desc: "Add a shift to all default port numbers" + defaultValue: 0 + name: "ports-shift" .}: uint16 + + nat* {. + desc: "Specify method to use for determining public address. " & + "Must be one of: any, none, upnp, pmp, extip:" + defaultValue: "any" .}: string + + rpc* {. + desc: "Enable Waku RPC server" + defaultValue: false + name: "rpc" .}: bool + + rpcAddress* {. + desc: "Listening address of the RPC server", + defaultValue: ValidIpAddress.init("127.0.0.1") + name: "rpc-address" }: ValidIpAddress + + rpcPort* {. + desc: "Listening port of the RPC server" + defaultValue: 8545 + name: "rpc-port" .}: uint16 + + metricsServer* {. + desc: "Enable the metrics server" + defaultValue: false + name: "metrics-server" .}: bool + + metricsServerAddress* {. + desc: "Listening address of the metrics server" + defaultValue: ValidIpAddress.init("127.0.0.1") + name: "metrics-server-address" }: ValidIpAddress + + metricsServerPort* {. + desc: "Listening HTTP port of the metrics server" + defaultValue: 8008 + name: "metrics-server-port" .}: uint16 + + ### Waku v2 options + staticnodesv2* {. + desc: "Multiaddr of peer to directly connect with. Argument may be repeated" + name: "staticnodev2" }: seq[string] + + nodekeyv2* {. + desc: "P2P node private key as hex" + defaultValue: crypto.PrivateKey.random(Secp256k1, newRng()[]).tryGet() + name: "nodekeyv2" }: crypto.PrivateKey + + topics* {. + desc: "Default topics to subscribe to (space separated list)" + defaultValue: "/waku/2/default-waku/proto" + name: "topics" .}: string + + store* {. + desc: "Flag whether to start store protocol", + defaultValue: true + name: "store" }: bool + + filter* {. + desc: "Flag whether to start filter protocol", + defaultValue: false + name: "filter" }: bool + + relay* {. + desc: "Flag whether to start relay protocol", + defaultValue: true + name: "relay" }: bool + + storenode* {. + desc: "Multiaddr of peer to connect with for waku store protocol" + defaultValue: "" + name: "storenode" }: string + + filternode* {. + desc: "Multiaddr of peer to connect with for waku filter protocol" + defaultValue: "" + name: "filternode" }: string + + # Matterbridge options + mbHostUri* {. + desc: "Matterbridge host API address" + defaultValue: "http://127.0.0.1:4242" + name: "mb-host-uri" }: string + + mbGateway* {. + desc: "Matterbridge gateway" + defaultValue: "gateway1" + name: "mb-gateway" }: string + +proc parseCmdArg*(T: type keys.KeyPair, p: TaintedString): T = + try: + let privkey = keys.PrivateKey.fromHex(string(p)).tryGet() + result = privkey.toKeyPair() + except CatchableError: + raise newException(ConfigurationError, "Invalid private key") + +proc completeCmdArg*(T: type keys.KeyPair, val: TaintedString): seq[string] = + return @[] + +proc parseCmdArg*(T: type crypto.PrivateKey, p: TaintedString): T = + let key = SkPrivateKey.init(p) + if key.isOk(): + crypto.PrivateKey(scheme: Secp256k1, skkey: key.get()) + else: + raise newException(ConfigurationError, "Invalid private key") + +proc completeCmdArg*(T: type crypto.PrivateKey, val: TaintedString): seq[string] = + return @[] + +proc parseCmdArg*(T: type ValidIpAddress, p: TaintedString): T = + try: + result = ValidIpAddress.init(p) + except CatchableError: + raise newException(ConfigurationError, "Invalid IP address") + +proc completeCmdArg*(T: type ValidIpAddress, val: TaintedString): seq[string] = + return @[] + +func defaultListenAddress*(conf: Chat2MatterbridgeConf): ValidIpAddress = + (static ValidIpAddress.init("0.0.0.0")) diff --git a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool index 75bb1a103..c915d376d 100755 --- a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool +++ b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool @@ -2,7 +2,7 @@ # libtool - Provide generalized library-building support services. # Generated automatically by config.status (libbacktrace) version-unused -# Libtool was configured on host fv-az275-461: +# Libtool was configured on host fv-az182-381: # NOTE: Changes made to this file will be lost: look at ltmain.sh. # # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, diff --git a/waku.nimble b/waku.nimble index c29a52f8d..78a9318a1 100644 --- a/waku.nimble +++ b/waku.nimble @@ -87,3 +87,8 @@ task chat2, "Build example Waku v2 chat usage": task bridge, "Build Waku v1 - v2 bridge": buildBinary "wakubridge", "waku/common/", "-d:chronicles_log_level=DEBUG" + +task chat2bridge, "Build chat2-matterbridge": + let name = "chat2bridge" + + buildBinary name, "examples/v2/matterbridge/", "-d:chronicles_log_level=DEBUG" \ No newline at end of file diff --git a/waku/common/README.md b/waku/common/README.md index c2f8d09a2..973bdfea6 100644 --- a/waku/common/README.md +++ b/waku/common/README.md @@ -5,3 +5,4 @@ This folder contains (a) modules that use both Waku v1 and Waku v2. and (b) util Examples include: - Bridge between v1 and v2 - NAT traversal +- interworking with protocols external to Waku (such as Matterbridge) diff --git a/waku/common/utils/matterbridge_client.nim b/waku/common/utils/matterbridge_client.nim new file mode 100644 index 000000000..631402b5c --- /dev/null +++ b/waku/common/utils/matterbridge_client.nim @@ -0,0 +1,55 @@ +import + std/[httpclient, json, uri, options] + +const + # Resource locators + stream* = "/api/stream" + messages* = "/api/messages" + message* = "/api/message" + health* = "/api/health" + +type + MatterbridgeClient* = ref object of RootObj + hostClient*: HttpClient + host*: Uri + gateway*: string + +proc new*(T: type MatterbridgeClient, + hostUri: string, + gateway = "gateway1"): MatterbridgeClient = + let mbClient = MatterbridgeClient() + + mbClient.hostClient = newHttpClient() + mbClient.hostClient.headers = newHttpHeaders({ "Content-Type": "application/json" }) + + mbClient.host = parseUri(hostUri) + mbClient.gateway = gateway + + return mbClient + +proc getMessages*(mb: MatterbridgeClient): seq[JsonNode] = + let response = mb.hostClient.get($(mb.host / messages)) + assert response.status == "200 OK" + + return parseJson(response.body()).getElems() + +proc postMessage*(mb: MatterbridgeClient, msg: JsonNode) = + let response = mb.hostClient.request($(mb.host / message), + httpMethod = HttpPost, + body = $msg) + + assert response.status == "200 OK" + + # @TODO: better error-handling here + +proc postMessage*(mb: MatterbridgeClient, text: string, username: string) = + let jsonNode = %* {"text": text, + "username": username, + "gateway": mb.gateway} + + mb.postMessage(jsonNode) + +proc isHealthy*(mb: MatterbridgeClient): bool = + let response = mb.hostClient.get($(mb.host / health)) + + return response.status == "200 OK" and response.body == "OK" diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index d1cbcc555..befec780b 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -376,7 +376,7 @@ proc mountSwap*(node: WakuNode) = # NYI - Do we need this? #node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription()) -proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool) = +proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false) = info "mounting store" if node.wakuSwap.isNil: