diff --git a/Makefile b/Makefile index b5907282b..c63f79a10 100644 --- a/Makefile +++ b/Makefile @@ -228,6 +228,10 @@ chat2: | build deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim chat2 $(NIM_PARAMS) waku.nims +chat2mix: | build deps librln + echo -e $(BUILD_MSG) "build/$@" && \ + $(ENV_SCRIPT) nim chat2mix $(NIM_PARAMS) waku.nims + rln-db-inspector: | build deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim rln_db_inspector $(NIM_PARAMS) waku.nims diff --git a/apps/chat2mix/chat2mix.nim b/apps/chat2mix/chat2mix.nim new file mode 100644 index 000000000..15b2eb40a --- /dev/null +++ b/apps/chat2mix/chat2mix.nim @@ -0,0 +1,701 @@ +## chat2 is an example of usage of Waku v2. For suggested usage options, please +## see dingpu tutorial in docs folder. + +when not (compileOption("threads")): + {.fatal: "Please, compile this program with the --threads:on option!".} + +{.push raises: [].} + +import std/[strformat, strutils, times, options, random, sequtils] +import + confutils, + chronicles, + chronos, + eth/keys, + bearssl, + stew/[byteutils, results], + metrics, + metrics/chronos_httpserver +import + libp2p/[ + switch, # manage transports, a single entry point for dialing and listening + crypto/crypto, # cryptographic functions + stream/connection, # create and close stream read / write connections + multiaddress, + # encode different addressing schemes. For example, /ip4/7.7.7.7/tcp/6543 means it is using IPv4 protocol and TCP + peerinfo, + # manage the information of a peer, such as peer ID and public / private key + peerid, # Implement how peers interact + protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs + nameresolving/dnsresolver, + ] # define DNS resolution +import + waku/[ + waku_core, + waku_lightpush_legacy/common, + waku_lightpush_legacy/rpc, + waku_enr, + discovery/waku_dnsdisc, + waku_store_legacy, + waku_node, + node/waku_metrics, + node/peer_manager, + factory/builder, + common/utils/nat, + waku_relay, + waku_store/common, + waku_filter_v2/client, + ], + ./config_chat2mix + +import libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/pubsub/pubsub +import ../../waku/waku_rln_relay + +const Help = + """ + Commands: /[?|help|connect|nick|exit] + help: Prints this help + connect: dials a remote peer + nick: change nickname for current chat session + exit: exits chat session +""" + +# XXX Connected is a bit annoying, because incoming connections don't trigger state change +# Could poll connection pool or something here, I suppose +# TODO Ensure connected turns true on incoming connections, or get rid of it +type Chat = ref object + node: WakuNode # waku node for publishing, subscribing, etc + transp: StreamTransport # transport streams between read & write file descriptor + subscribed: bool # indicates if a node is subscribed or not to a topic + connected: bool # if the node is connected to another peer + started: bool # if the node has started + nick: string # nickname for this chat session + prompt: bool # chat prompt is showing + contentTopic: string # default content topic for chat messages + conf: Chat2Conf # configuration for chat2 + +type + PrivateKey* = crypto.PrivateKey + Topic* = waku_core.PubsubTopic + +##################### +## chat2 protobufs ## +##################### + +type + SelectResult*[T] = Result[T, string] + + Chat2Message* = object + timestamp*: int64 + nick*: string + payload*: seq[byte] + +proc getPubsubTopic*( + conf: Chat2Conf, node: WakuNode, contentTopic: string +): PubsubTopic = + let shard = node.wakuSharding.getShard(contentTopic).valueOr: + echo "Could not parse content topic: " & error + return "" #TODO: fix this. + return $RelayShard(clusterId: conf.clusterId, shardId: shard.shardId) + +proc init*(T: type Chat2Message, buffer: seq[byte]): ProtoResult[T] = + var msg = Chat2Message() + let pb = initProtoBuffer(buffer) + + var timestamp: uint64 + discard ?pb.getField(1, timestamp) + msg.timestamp = int64(timestamp) + + discard ?pb.getField(2, msg.nick) + discard ?pb.getField(3, msg.payload) + + ok(msg) + +proc encode*(message: Chat2Message): ProtoBuffer = + var serialised = initProtoBuffer() + + serialised.write(1, uint64(message.timestamp)) + serialised.write(2, message.nick) + serialised.write(3, message.payload) + + return serialised + +proc toString*(message: Chat2Message): string = + # Get message date and timestamp in local time + let time = message.timestamp.fromUnix().local().format("'<'MMM' 'dd,' 'HH:mm'>'") + + return time & " " & message.nick & ": " & string.fromBytes(message.payload) + +##################### + +proc connectToNodes(c: Chat, nodes: seq[string]) {.async.} = + echo "Connecting to nodes" + await c.node.connectToNodes(nodes) + c.connected = true + +proc showChatPrompt(c: Chat) = + if not c.prompt: + try: + stdout.write(">> ") + stdout.flushFile() + c.prompt = true + except IOError: + discard + +proc getChatLine(c: Chat, msg: WakuMessage): Result[string, string] = + # No payload encoding/encryption from Waku + let + pb = Chat2Message.init(msg.payload) + chatLine = + if pb.isOk: + pb[].toString() + else: + string.fromBytes(msg.payload) + return ok(chatline) + +proc printReceivedMessage(c: Chat, msg: WakuMessage) = + let + pb = Chat2Message.init(msg.payload) + chatLine = + if pb.isOk: + pb[].toString() + else: + string.fromBytes(msg.payload) + try: + echo &"{chatLine}" + except ValueError: + # Formatting fail. Print chat line in any case. + echo chatLine + + c.prompt = false + showChatPrompt(c) + trace "Printing message", chatLine, contentTopic = msg.contentTopic + +proc readNick(transp: StreamTransport): Future[string] {.async.} = + # Chat prompt + stdout.write("Choose a nickname >> ") + stdout.flushFile() + return await transp.readLine() + +proc startMetricsServer( + serverIp: IpAddress, serverPort: Port +): Result[MetricsHttpServerRef, string] = + info "Starting metrics HTTP server", serverIp = $serverIp, serverPort = $serverPort + + let metricsServerRes = MetricsHttpServerRef.new($serverIp, serverPort) + if metricsServerRes.isErr(): + return err("metrics HTTP server start failed: " & $metricsServerRes.error) + + let server = metricsServerRes.value + try: + waitFor server.start() + except CatchableError: + return err("metrics HTTP server start failed: " & getCurrentExceptionMsg()) + + info "Metrics HTTP server started", serverIp = $serverIp, serverPort = $serverPort + ok(metricsServerRes.value) + +proc publish(c: Chat, line: string) = + # First create a Chat2Message protobuf with this line of text + let time = getTime().toUnix() + let chat2pb = + Chat2Message(timestamp: time, nick: c.nick, payload: line.toBytes()).encode() + + ## @TODO: error handling on failure + proc handler(response: PushResponse) {.gcsafe, closure.} = + trace "lightpush response received", response = response + + var message = WakuMessage( + payload: chat2pb.buffer, + contentTopic: c.contentTopic, + version: 0, + timestamp: getNanosecondTime(time), + ) + + try: + if not c.node.wakuLightpushClient.isNil(): + # Attempt lightpush with mix + echo "Attempting to publish message as lightpush" + #( + discard c.node.lightpushPublish( + some(c.conf.getPubsubTopic(c.node, c.contentTopic)), + message, + none(RemotePeerInfo), + false, + ) #TODO: Not waiting for response, have to change once SURB is implmented + #).isOkOr: + # error "failed to publish lightpush message", error = error + else: + error "failed to publish message as lightpush client is not initialized" + except CatchableError: + error "caught error publishing message: ", error = getCurrentExceptionMsg() + +# TODO This should read or be subscribe handler subscribe +proc readAndPrint(c: Chat) {.async.} = + while true: + # while p.connected: + # # TODO: echo &"{p.id} -> " + # + # echo cast[string](await p.conn.readLp(1024)) + #echo "readAndPrint subscribe NYI" + await sleepAsync(100) + +# TODO Implement +proc writeAndPrint(c: Chat) {.async.} = + while true: + # Connect state not updated on incoming WakuRelay connections + # if not c.connected: + # echo "type an address or wait for a connection:" + # echo "type /[help|?] for help" + + # Chat prompt + showChatPrompt(c) + + let line = await c.transp.readLine() + if line.startsWith("/help") or line.startsWith("/?") or not c.started: + echo Help + continue + + # if line.startsWith("/disconnect"): + # echo "Ending current session" + # if p.connected and p.conn.closed.not: + # await p.conn.close() + # p.connected = false + elif line.startsWith("/connect"): + # TODO Should be able to connect to multiple peers for Waku chat + if c.connected: + echo "already connected to at least one peer" + continue + + echo "enter address of remote peer" + let address = await c.transp.readLine() + if address.len > 0: + await c.connectToNodes(@[address]) + elif line.startsWith("/nick"): + # Set a new nickname + c.nick = await readNick(c.transp) + echo "You are now known as " & c.nick + elif line.startsWith("/exit"): + echo "quitting..." + + try: + await c.node.stop() + except: + echo "exception happened when stopping: " & getCurrentExceptionMsg() + + quit(QuitSuccess) + else: + # XXX connected state problematic + if c.started: + echo "publishing message: " & line + c.publish(line) + # TODO Connect to peer logic? + else: + try: + if line.startsWith("/") and "p2p" in line: + await c.connectToNodes(@[line]) + except: + echo &"unable to dial remote peer {line}" + echo getCurrentExceptionMsg() + +proc readWriteLoop(c: Chat) {.async.} = + asyncSpawn c.writeAndPrint() # execute the async function but does not block + asyncSpawn c.readAndPrint() + +proc readInput(wfd: AsyncFD) {.thread, raises: [Defect, CatchableError].} = + ## This procedure performs reading from `stdin` and sends data over + ## pipe to main thread. + let transp = fromPipe(wfd) + + while true: + let line = stdin.readLine() + discard waitFor transp.write(line & "\r\n") + +var alreadyUsedServicePeers {.threadvar.}: seq[RemotePeerInfo] + +proc selectRandomServicePeer*( + pm: PeerManager, actualPeer: Option[RemotePeerInfo], codec: string +): Result[RemotePeerInfo, void] = + if actualPeer.isSome(): + alreadyUsedServicePeers.add(actualPeer.get()) + + let supportivePeers = pm.switch.peerStore.getPeersByProtocol(codec).filterIt( + it notin alreadyUsedServicePeers + ) + if supportivePeers.len == 0: + return err() + + let rndPeerIndex = rand(0 .. supportivePeers.len - 1) + return ok(supportivePeers[rndPeerIndex]) + +proc maintainSubscription( + wakuNode: WakuNode, + filterPubsubTopic: PubsubTopic, + filterContentTopic: ContentTopic, + filterPeer: RemotePeerInfo, + preventPeerSwitch: bool, +) {.async.} = + var actualFilterPeer = filterPeer + const maxFailedSubscribes = 3 + const maxFailedServiceNodeSwitches = 10 + var noFailedSubscribes = 0 + var noFailedServiceNodeSwitches = 0 + while true: + info "maintaining subscription at", peer = constructMultiaddrStr(actualFilterPeer) + # First use filter-ping to check if we have an active subscription + let pingRes = await wakuNode.wakuFilterClient.ping(actualFilterPeer) + if pingRes.isErr(): + # No subscription found. Let's subscribe. + error "ping failed.", err = pingRes.error + trace "no subscription found. Sending subscribe request" + + let subscribeRes = await wakuNode.filterSubscribe( + some(filterPubsubTopic), filterContentTopic, actualFilterPeer + ) + + if subscribeRes.isErr(): + noFailedSubscribes += 1 + error "Subscribe request failed.", + err = subscribeRes.error, + peer = actualFilterPeer, + failCount = noFailedSubscribes + + # TODO: disconnet from failed actualFilterPeer + # asyncSpawn(wakuNode.peerManager.switch.disconnect(p)) + # wakunode.peerManager.peerStore.delete(actualFilterPeer) + + if noFailedSubscribes < maxFailedSubscribes: + await sleepAsync(2000) # Wait a bit before retrying + continue + elif not preventPeerSwitch: + let peerOpt = selectRandomServicePeer( + wakuNode.peerManager, some(actualFilterPeer), WakuFilterSubscribeCodec + ) + if peerOpt.isOk(): + actualFilterPeer = peerOpt.get() + + info "Found new peer for codec", + codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer) + + noFailedSubscribes = 0 + continue # try again with new peer without delay + else: + error "Failed to find new service peer. Exiting." + noFailedServiceNodeSwitches += 1 + break + else: + if noFailedSubscribes > 0: + noFailedSubscribes -= 1 + + notice "subscribe request successful." + else: + info "subscription is live." + + await sleepAsync(30000) # Subscription maintenance interval + +{.pop.} + # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError +proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = + let + transp = fromPipe(rfd) + conf = Chat2Conf.load() + nodekey = + if conf.nodekey.isSome(): + conf.nodekey.get() + else: + PrivateKey.random(Secp256k1, rng[]).tryGet() + + # set log level + if conf.logLevel != LogLevel.NONE: + setLogLevel(conf.logLevel) + + let natRes = setupNat( + conf.nat, + clientId, + Port(uint16(conf.tcpPort) + conf.portsShift), + Port(uint16(conf.udpPort) + conf.portsShift), + ) + + if natRes.isErr(): + raise newException(ValueError, "setupNat error " & natRes.error) + + let (extIp, extTcpPort, extUdpPort) = natRes.get() + + var enrBuilder = EnrBuilder.init(nodeKey) + + let recordRes = enrBuilder.build() + let record = + if recordRes.isErr(): + error "failed to create enr record", error = recordRes.error + quit(QuitFailure) + else: + recordRes.get() + + let node = block: + var builder = WakuNodeBuilder.init() + builder.withNodeKey(nodeKey) + builder.withRecord(record) + + builder + .withNetworkConfigurationDetails( + conf.listenAddress, + Port(uint16(conf.tcpPort) + conf.portsShift), + extIp, + extTcpPort, + wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift), + wsEnabled = conf.websocketSupport, + wssEnabled = conf.websocketSecureSupport, + ) + .tryGet() + builder.build().tryGet() + + await node.start() + #TODO: fix hard-coding. + node.mountSharding(conf.clusterId, 1).isOkOr: + error "failed to mount waku sharding: ", error = error + quit(QuitFailure) + #[ if conf.rlnRelayCredPath == "": + raise newException(ConfigurationError, "rln-relay-cred-path MUST be passed") + + if conf.relay: + let shards = + conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) + (await node.mountRelay()).isOkOr: + echo "failed to mount relay: " & error + return + ]# + await node.mountLibp2pPing() + let pubsubTopic = conf.getPubsubTopic(node, conf.contentTopic) + echo "pubsub topic is: " & pubsubTopic + let nick = await readNick(transp) + echo "Welcome, " & nick & "!" + + var chat = Chat( + node: node, + transp: transp, + subscribed: true, + connected: false, + started: true, + nick: nick, + prompt: false, + contentTopic: conf.contentTopic, + conf: conf, + ) + + if conf.staticnodes.len > 0: + echo "Connecting to static peers..." + await connectToNodes(chat, conf.staticnodes) + + var dnsDiscoveryUrl = none(string) + + if conf.fleet != Fleet.none: + # Use DNS discovery to connect to selected fleet + echo "Connecting to " & $conf.fleet & " fleet using DNS discovery..." + + if conf.fleet == Fleet.test: + dnsDiscoveryUrl = some( + "enrtree://AOGYWMBYOUIMOENHXCHILPKY3ZRFEULMFI4DOM442QSZ73TT2A7VI@test.waku.nodes.status.im" + ) + else: + # Connect to sandbox by default + dnsDiscoveryUrl = some( + "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im" + ) + elif conf.dnsDiscoveryUrl != "": + # No pre-selected fleet. Discover nodes via DNS using user config + debug "Discovering nodes using Waku DNS discovery", url = conf.dnsDiscoveryUrl + dnsDiscoveryUrl = some(conf.dnsDiscoveryUrl) + + var discoveredNodes: seq[RemotePeerInfo] + + if dnsDiscoveryUrl.isSome: + var nameServers: seq[TransportAddress] + for ip in conf.dnsDiscoveryNameServers: + nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 + + let dnsResolver = DnsResolver.new(nameServers) + + proc resolver(domain: string): Future[string] {.async, gcsafe.} = + trace "resolving", domain = domain + let resolved = await dnsResolver.resolveTxt(domain) + return resolved[0] # Use only first answer + + var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver) + if wakuDnsDiscovery.isOk: + let discoveredPeers = await wakuDnsDiscovery.get().findPeers() + if discoveredPeers.isOk: + info "Connecting to discovered peers" + discoveredNodes = discoveredPeers.get() + echo "Discovered and connecting to " & $discoveredNodes + waitFor chat.node.connectToNodes(discoveredNodes) + else: + warn "Failed to init Waku DNS discovery" + + let peerInfo = node.switch.peerInfo + let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId + echo &"Listening on\n {listenStr}" + + if (conf.storenode != "") or (conf.store == true): + await node.mountStore() + + var storenode: Option[RemotePeerInfo] + + if conf.storenode != "": + let peerInfo = parsePeerInfo(conf.storenode) + if peerInfo.isOk(): + storenode = some(peerInfo.value) + else: + error "Incorrect conf.storenode", error = peerInfo.error + elif discoveredNodes.len > 0: + echo "Store enabled, but no store nodes configured. Choosing one at random from discovered peers" + storenode = some(discoveredNodes[rand(0 .. len(discoveredNodes) - 1)]) + + if storenode.isSome(): + # We have a viable storenode. Let's query it for historical messages. + echo "Connecting to storenode: " & $(storenode.get()) + + node.mountStoreClient() + node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec) + + proc storeHandler(response: StoreQueryResponse) {.gcsafe.} = + for msg in response.messages: + let payload = + if msg.message.isSome(): + msg.message.get().payload + else: + newSeq[byte](0) + + let + pb = Chat2Message.init(payload) + chatLine = + if pb.isOk: + pb[].toString() + else: + string.fromBytes(payload) + echo &"{chatLine}" + info "Hit store handler" + + let queryRes = await node.query( + StoreQueryRequest(contentTopics: @[chat.contentTopic]), storenode.get() + ) + if queryRes.isOk(): + storeHandler(queryRes.value) + + if conf.edgemode: #Mount light protocol clients + node.mountLightPushClient() + await node.mountFilterClient() + let filterHandler = proc( + pubsubTopic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, closure.} = + trace "Hit filter handler", contentTopic = msg.contentTopic + chat.printReceivedMessage(msg) + + node.wakuFilterClient.registerPushHandler(filterHandler) + + if conf.serviceNode != "": #TODO: use one of discovered nodes if not present. + let peerInfo = parsePeerInfo(conf.serviceNode) + if peerInfo.isOk(): + #await mountLegacyLightPush(node) + node.peerManager.addServicePeer(peerInfo.value, WakuLightpushCodec) + # Start maintaining subscription + asyncSpawn maintainSubscription( + node, pubsubTopic, conf.contentTopic, peerInfo.value, false + ) + else: + error "LightPushClient not mounted. Couldn't parse conf.serviceNode", + error = peerInfo.error + + # Subscribe to a topic, if relay is mounted + if conf.relay: + proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + trace "Hit subscribe handler", topic + + if msg.contentTopic == chat.contentTopic: + chat.printReceivedMessage(msg) + + node.subscribe((kind: PubsubSub, topic: pubsubTopic), WakuRelayHandler(handler)).isOkOr: + error "failed to subscribe to pubsub topic", topic = pubsubTopic, error = error + + #[ if conf.rlnRelay: + info "WakuRLNRelay is enabled" + + proc spamHandler(wakuMessage: WakuMessage) {.gcsafe, closure.} = + debug "spam handler is called" + let chatLineResult = chat.getChatLine(wakuMessage) + if chatLineResult.isOk(): + echo "A spam message is found and discarded : ", chatLineResult.value + else: + echo "A spam message is found and discarded" + chat.prompt = false + showChatPrompt(chat) + + echo "rln-relay preparation is in progress..." + + let rlnConf = WakuRlnConfig( + dynamic: conf.rlnRelayDynamic, + credIndex: conf.rlnRelayCredIndex, + chainId: UInt256.fromBytesBE(conf.rlnRelayChainId.toBytesBE()), + ethClientUrls: conf.ethClientUrls.mapIt(string(it)), + creds: some( + RlnRelayCreds( + path: conf.rlnRelayCredPath, password: conf.rlnRelayCredPassword + ) + ), + userMessageLimit: conf.rlnRelayUserMessageLimit, + epochSizeSec: conf.rlnEpochSizeSec, + treePath: conf.rlnRelayTreePath, + ) + + waitFor node.mountRlnRelay(rlnConf, spamHandler = some(spamHandler)) + + let membershipIndex = node.wakuRlnRelay.groupManager.membershipIndex.get() + let identityCredential = node.wakuRlnRelay.groupManager.idCredentials.get() + echo "your membership index is: ", membershipIndex + echo "your rln identity commitment key is: ", + identityCredential.idCommitment.inHex() + else: + info "WakuRLNRelay is disabled" + echo "WakuRLNRelay is disabled, please enable it by passing in the --rln-relay flag" ]# + if conf.metricsLogging: + startMetricsLog() + + if conf.metricsServer: + let metricsServer = startMetricsServer( + conf.metricsServerAddress, Port(conf.metricsServerPort + conf.portsShift) + ) + + await chat.readWriteLoop() + + runForever() + +proc main(rng: ref HmacDrbgContext) {.async.} = + let (rfd, wfd) = createAsyncPipe() + if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe: + raise newException(ValueError, "Could not initialize pipe!") + + var thread: Thread[AsyncFD] + thread.createThread(readInput, wfd) + try: + await processInput(rfd, rng) + # Handle only ConfigurationError for now + # TODO: Throw other errors from the mounting procedure + except ConfigurationError as e: + raise e + +when isMainModule: # isMainModule = true when the module is compiled as the main file + let rng = crypto.newRng() + try: + waitFor(main(rng)) + except CatchableError as e: + raise e + +## Dump of things that can be improved: +## +## - Incoming dialed peer does not change connected state (not relying on it for now) +## - Unclear if staticnode argument works (can enter manually) +## - Don't trigger self / double publish own messages +## - Test/default to cluster node connection (diff protocol version) +## - Redirect logs to separate file +## - Expose basic publish/subscribe etc commands with /syntax +## - Show part of peerid to know who sent message +## - Deal with protobuf messages (e.g. other chat protocol, or encrypted) diff --git a/apps/chat2mix/config_chat2mix.nim b/apps/chat2mix/config_chat2mix.nim new file mode 100644 index 000000000..424a59bec --- /dev/null +++ b/apps/chat2mix/config_chat2mix.nim @@ -0,0 +1,353 @@ +import + chronicles, + chronos, + confutils, + confutils/defs, + confutils/std/net, + eth/keys, + libp2p/crypto/crypto, + libp2p/crypto/secp, + nimcrypto/utils, + std/strutils, + regex +import waku/waku_core + +type + Fleet* = enum + none + prod + test + + EthRpcUrl* = distinct string + + Chat2Conf* = object ## General node config + edgemode* {. + defaultValue: true, desc: "Run the app in edge mode", name: "edge-mode" + .}: bool + + logLevel* {. + desc: "Sets the log level.", defaultValue: LogLevel.INFO, name: "log-level" + .}: LogLevel + + nodekey* {.desc: "P2P node private key as 64 char hex string.", name: "nodekey".}: + Option[crypto.PrivateKey] + + listenAddress* {. + defaultValue: defaultListenAddress(config), + desc: "Listening address for the LibP2P traffic.", + name: "listen-address" + .}: IpAddress + + tcpPort* {.desc: "TCP listening port.", defaultValue: 60000, name: "tcp-port".}: + Port + + udpPort* {.desc: "UDP listening port.", defaultValue: 60000, name: "udp-port".}: + Port + + portsShift* {. + desc: "Add a shift to all port numbers.", defaultValue: 0, name: "ports-shift" + .}: uint16 + + nat* {. + desc: + "Specify method to use for determining public address. " & + "Must be one of: any, none, upnp, pmp, extip:.", + defaultValue: "any" + .}: string + + ## Persistence config + dbPath* {. + desc: "The database path for peristent storage", defaultValue: "", name: "db-path" + .}: string + + persistPeers* {. + desc: "Enable peer persistence: true|false", + defaultValue: false, + name: "persist-peers" + .}: bool + + persistMessages* {. + desc: "Enable message persistence: true|false", + defaultValue: false, + name: "persist-messages" + .}: bool + + ## Relay config + relay* {. + desc: "Enable relay protocol: true|false", defaultValue: true, name: "relay" + .}: bool + + staticnodes* {. + desc: "Peer multiaddr to directly connect with. Argument may be repeated.", + name: "staticnode" + .}: seq[string] + + keepAlive* {. + desc: "Enable keep-alive for idle connections: true|false", + defaultValue: false, + name: "keep-alive" + .}: bool + + clusterId* {. + desc: + "Cluster id that the node is running in. Node in a different cluster id is disconnected.", + defaultValue: 42, + name: "cluster-id" + .}: uint16 + + shards* {. + desc: + "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.", + defaultValue: @[uint16(0)], + name: "shard" + .}: seq[uint16] + + ## Store config + store* {. + desc: "Enable store protocol: true|false", defaultValue: true, name: "store" + .}: bool + + storenode* {. + desc: "Peer multiaddr to query for storage.", defaultValue: "", name: "storenode" + .}: string + + ## Filter config + filter* {. + desc: "Enable filter protocol: true|false", defaultValue: false, name: "filter" + .}: bool + + ## Lightpush config + lightpush* {. + desc: "Enable lightpush protocol: true|false", + defaultValue: false, + name: "lightpush" + .}: bool + + servicenode* {. + desc: "Peer multiaddr to request lightpush and filter services", + defaultValue: "", + name: "servicenode" + .}: string + + ## Metrics config + metricsServer* {. + desc: "Enable the metrics server: true|false", + defaultValue: false, + name: "metrics-server" + .}: bool + + metricsServerAddress* {. + desc: "Listening address of the metrics server.", + defaultValue: parseIpAddress("127.0.0.1"), + name: "metrics-server-address" + .}: IpAddress + + metricsServerPort* {. + desc: "Listening HTTP port of the metrics server.", + defaultValue: 8008, + name: "metrics-server-port" + .}: uint16 + + metricsLogging* {. + desc: "Enable metrics logging: true|false", + defaultValue: true, + name: "metrics-logging" + .}: bool + + ## DNS discovery config + dnsDiscovery* {. + desc: + "Deprecated, please set dns-discovery-url instead. Enable discovering nodes via DNS", + defaultValue: false, + name: "dns-discovery" + .}: bool + + dnsDiscoveryUrl* {. + desc: "URL for DNS node list in format 'enrtree://@'", + defaultValue: "", + name: "dns-discovery-url" + .}: string + + dnsDiscoveryNameServers* {. + desc: "DNS name server IPs to query. Argument may be repeated.", + defaultValue: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")], + name: "dns-discovery-name-server" + .}: seq[IpAddress] + + ## Chat2 configuration + fleet* {. + desc: + "Select the fleet to connect to. This sets the DNS discovery URL to the selected fleet.", + defaultValue: Fleet.none, + name: "fleet" + .}: Fleet + + contentTopic* {. + desc: "Content topic for chat messages.", + defaultValue: "/toy-chat/2/huilong/proto", + name: "content-topic" + .}: string + + ## Websocket Configuration + websocketSupport* {. + desc: "Enable websocket: true|false", + defaultValue: false, + name: "websocket-support" + .}: bool + + websocketPort* {. + desc: "WebSocket listening port.", defaultValue: 8000, name: "websocket-port" + .}: Port + + websocketSecureSupport* {. + desc: "WebSocket Secure Support.", + defaultValue: false, + name: "websocket-secure-support" + .}: bool ## rln-relay configuration + +#[ rlnRelay* {. + desc: "Enable spam protection through rln-relay: true|false", + defaultValue: false, + name: "rln-relay" + .}: bool + + rlnRelayChainId* {. + desc: + "Chain ID of the provided contract (optional, will fetch from RPC provider if not used)", + defaultValue: 0, + name: "rln-relay-chain-id" + .}: uint + + rlnRelayCredPath* {. + desc: "The path for peristing rln-relay credential", + defaultValue: "", + name: "rln-relay-cred-path" + .}: string + + rlnRelayCredIndex* {. + desc: "the index of the onchain commitment to use", name: "rln-relay-cred-index" + .}: Option[uint] + + rlnRelayDynamic* {. + desc: "Enable waku-rln-relay with on-chain dynamic group management: true|false", + defaultValue: false, + name: "rln-relay-dynamic" + .}: bool + + rlnRelayIdKey* {. + desc: "Rln relay identity secret key as a Hex string", + defaultValue: "", + name: "rln-relay-id-key" + .}: string + + rlnRelayIdCommitmentKey* {. + desc: "Rln relay identity commitment key as a Hex string", + defaultValue: "", + name: "rln-relay-id-commitment-key" + .}: string + + ethClientUrls* {. + desc: + "HTTP address of an Ethereum testnet client e.g., http://localhost:8540/. Argument may be repeated.", + defaultValue: newSeq[EthRpcUrl](0), + name: "rln-relay-eth-client-address" + .}: seq[EthRpcUrl] + + rlnRelayEthContractAddress* {. + desc: "Address of membership contract on an Ethereum testnet", + defaultValue: "", + name: "rln-relay-eth-contract-address" + .}: string + + rlnRelayCredPassword* {. + desc: "Password for encrypting RLN credentials", + defaultValue: "", + name: "rln-relay-cred-password" + .}: string + + rlnRelayUserMessageLimit* {. + desc: + "Set a user message limit for the rln membership registration. Must be a positive integer. Default is 1.", + defaultValue: 1, + name: "rln-relay-user-message-limit" + .}: uint64 + + rlnEpochSizeSec* {. + desc: + "Epoch size in seconds used to rate limit RLN memberships. Default is 1 second.", + defaultValue: 1, + name: "rln-relay-epoch-sec" + .}: uint64 + + rlnRelayTreePath* {. + desc: "Path to the RLN merkle tree sled db (https://github.com/spacejam/sled)", + defaultValue: "", + name: "rln-relay-tree-path" + .}: string ]# + +# NOTE: Keys are different in nim-libp2p +proc parseCmdArg*(T: type crypto.PrivateKey, p: string): T = + try: + let key = SkPrivateKey.init(utils.fromHex(p)).tryGet() + # XXX: Here at the moment + result = crypto.PrivateKey(scheme: Secp256k1, skkey: key) + except CatchableError as e: + raise newException(ValueError, "Invalid private key") + +proc completeCmdArg*(T: type crypto.PrivateKey, val: string): seq[string] = + return @[] + +proc parseCmdArg*(T: type IpAddress, p: string): T = + try: + result = parseIpAddress(p) + except CatchableError as e: + raise newException(ValueError, "Invalid IP address") + +proc completeCmdArg*(T: type IpAddress, val: string): seq[string] = + return @[] + +proc parseCmdArg*(T: type Port, p: string): T = + try: + result = Port(parseInt(p)) + except CatchableError as e: + raise newException(ValueError, "Invalid Port number") + +proc completeCmdArg*(T: type Port, val: string): seq[string] = + return @[] + +proc parseCmdArg*(T: type Option[uint], p: string): T = + try: + some(parseUint(p)) + except CatchableError: + raise newException(ValueError, "Invalid unsigned integer") + +proc completeCmdArg*(T: type EthRpcUrl, val: string): seq[string] = + return @[] + +proc parseCmdArg*(T: type EthRpcUrl, s: string): T = + ## allowed patterns: + ## http://url:port + ## https://url:port + ## http://url:port/path + ## https://url:port/path + ## http://url/with/path + ## http://url:port/path?query + ## https://url:port/path?query + ## disallowed patterns: + ## any valid/invalid ws or wss url + var httpPattern = + re2"^(https?):\/\/((localhost)|([\w_-]+(?:(?:\.[\w_-]+)+)))(:[0-9]{1,5})?([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])*" + var wsPattern = + re2"^(wss?):\/\/((localhost)|([\w_-]+(?:(?:\.[\w_-]+)+)))(:[0-9]{1,5})?([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])*" + if regex.match(s, wsPattern): + raise newException( + ValueError, "Websocket RPC URL is not supported, Please use an HTTP URL" + ) + if not regex.match(s, httpPattern): + raise newException(ValueError, "Invalid HTTP RPC URL") + return EthRpcUrl(s) + +func defaultListenAddress*(conf: Chat2Conf): IpAddress = + # TODO: How should we select between IPv4 and IPv6 + # Maybe there should be a config option for this. + (static parseIpAddress("0.0.0.0")) diff --git a/apps/chat2mix/nim.cfg b/apps/chat2mix/nim.cfg new file mode 100644 index 000000000..2231f2ebe --- /dev/null +++ b/apps/chat2mix/nim.cfg @@ -0,0 +1,4 @@ +-d:chronicles_line_numbers +-d:chronicles_runtime_filtering:on +-d:discv5_protocol_id:d5waku +path = "../.." diff --git a/waku.nimble b/waku.nimble index 3cb1a5e16..b68626a4e 100644 --- a/waku.nimble +++ b/waku.nimble @@ -149,6 +149,14 @@ task chat2, "Build example Waku chat usage": let name = "chat2" buildBinary name, "apps/chat2/", "-d:chronicles_sinks=textlines[file] -d:ssl" +task chat2mix, "Build example Waku chat mix usage": + # NOTE For debugging, set debug level. For chat usage we want minimal log + # output to STDOUT. Can be fixed by redirecting logs to file (e.g.) + #buildBinary name, "examples/", "-d:chronicles_log_level=WARN" + + let name = "chat2mix" + buildBinary name, "apps/chat2mix/", "-d:chronicles_sinks=textlines[file] -d:ssl" + task chat2bridge, "Build chat2bridge": let name = "chat2bridge" buildBinary name, "apps/chat2bridge/"