From a6892bfebe4150873c18a2390de4abe3266ad3f6 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 29 Aug 2025 11:52:38 +0530 Subject: [PATCH] chore: chat2 with mix (#3506) * add mixify option to lightpush publish * initialize mix protocol and peerExchange to populate node pool in chat2 * scripts for mixnet sim and chat2mix --- Makefile | 4 + apps/chat2mix/chat2mix.nim | 704 ++++++++++++++++++++++ apps/chat2mix/config_chat2mix.nim | 282 +++++++++ apps/chat2mix/nim.cfg | 4 + simulations/README.md | 4 + simulations/mixnet/README.md | 70 +++ simulations/mixnet/config.toml | 25 + simulations/mixnet/config1.toml | 27 + simulations/mixnet/config2.toml | 27 + simulations/mixnet/config3.toml | 27 + simulations/mixnet/config4.toml | 27 + simulations/mixnet/run_chat_mix.sh | 1 + simulations/mixnet/run_chat_mix1.sh | 1 + simulations/mixnet/run_lp_service_node.sh | 1 + simulations/mixnet/run_mix_node1.sh | 1 + simulations/mixnet/run_mix_node2.sh | 1 + simulations/mixnet/run_mix_node3.sh | 1 + simulations/mixnet/run_mix_node4.sh | 1 + vendor/nim-libp2p | 2 +- waku.nimble | 8 + waku/node/waku_node.nim | 35 +- 21 files changed, 1247 insertions(+), 6 deletions(-) create mode 100644 apps/chat2mix/chat2mix.nim create mode 100644 apps/chat2mix/config_chat2mix.nim create mode 100644 apps/chat2mix/nim.cfg create mode 100644 simulations/README.md create mode 100644 simulations/mixnet/README.md create mode 100644 simulations/mixnet/config.toml create mode 100644 simulations/mixnet/config1.toml create mode 100644 simulations/mixnet/config2.toml create mode 100644 simulations/mixnet/config3.toml create mode 100644 simulations/mixnet/config4.toml create mode 100755 simulations/mixnet/run_chat_mix.sh create mode 100755 simulations/mixnet/run_chat_mix1.sh create mode 100755 simulations/mixnet/run_lp_service_node.sh create mode 100755 simulations/mixnet/run_mix_node1.sh create mode 100755 simulations/mixnet/run_mix_node2.sh create mode 100755 simulations/mixnet/run_mix_node3.sh create mode 100755 simulations/mixnet/run_mix_node4.sh diff --git a/Makefile b/Makefile index b5907282b..c63f79a10 100644 --- a/Makefile +++ b/Makefile @@ -228,6 +228,10 @@ chat2: | build deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim chat2 $(NIM_PARAMS) waku.nims +chat2mix: | build deps librln + echo -e $(BUILD_MSG) "build/$@" && \ + $(ENV_SCRIPT) nim chat2mix $(NIM_PARAMS) waku.nims + rln-db-inspector: | build deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim rln_db_inspector $(NIM_PARAMS) waku.nims diff --git a/apps/chat2mix/chat2mix.nim b/apps/chat2mix/chat2mix.nim new file mode 100644 index 000000000..03ad18708 --- /dev/null +++ b/apps/chat2mix/chat2mix.nim @@ -0,0 +1,704 @@ +## chat2 is an example of usage of Waku v2. For suggested usage options, please +## see dingpu tutorial in docs folder. + +when not (compileOption("threads")): + {.fatal: "Please, compile this program with the --threads:on option!".} + +{.push raises: [].} + +import std/[strformat, strutils, times, options, random, sequtils] +import + confutils, + chronicles, + chronos, + eth/keys, + bearssl, + stew/[byteutils, results], + metrics, + metrics/chronos_httpserver +import + libp2p/[ + switch, # manage transports, a single entry point for dialing and listening + crypto/crypto, # cryptographic functions + stream/connection, # create and close stream read / write connections + multiaddress, + # encode different addressing schemes. For example, /ip4/7.7.7.7/tcp/6543 means it is using IPv4 protocol and TCP + peerinfo, + # manage the information of a peer, such as peer ID and public / private key + peerid, # Implement how peers interact + protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs + nameresolving/dnsresolver, + ] # define DNS resolution +import mix/curve25519 +import + waku/[ + waku_core, + waku_lightpush_legacy/common, + waku_lightpush_legacy/rpc, + waku_enr, + discovery/waku_dnsdisc, + waku_store_legacy, + waku_node, + node/waku_metrics, + node/peer_manager, + factory/builder, + common/utils/nat, + waku_relay, + waku_store/common, + waku_filter_v2/client, + ], + ./config_chat2mix + +import libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/pubsub/pubsub +import ../../waku/waku_rln_relay + +logScope: + topics = "chat2 mix" + +const Help = + """ + Commands: /[?|help|connect|nick|exit] + help: Prints this help + connect: dials a remote peer + nick: change nickname for current chat session + exit: exits chat session +""" + +# XXX Connected is a bit annoying, because incoming connections don't trigger state change +# Could poll connection pool or something here, I suppose +# TODO Ensure connected turns true on incoming connections, or get rid of it +type Chat = ref object + node: WakuNode # waku node for publishing, subscribing, etc + transp: StreamTransport # transport streams between read & write file descriptor + subscribed: bool # indicates if a node is subscribed or not to a topic + connected: bool # if the node is connected to another peer + started: bool # if the node has started + nick: string # nickname for this chat session + prompt: bool # chat prompt is showing + contentTopic: string # default content topic for chat messages + conf: Chat2Conf # configuration for chat2 + +type + PrivateKey* = crypto.PrivateKey + Topic* = waku_core.PubsubTopic + +##################### +## chat2 protobufs ## +##################### + +type + SelectResult*[T] = Result[T, string] + + Chat2Message* = object + timestamp*: int64 + nick*: string + payload*: seq[byte] + +proc getPubsubTopic*( + conf: Chat2Conf, node: WakuNode, contentTopic: string +): PubsubTopic = + let shard = node.wakuSharding.getShard(contentTopic).valueOr: + echo "Could not parse content topic: " & error + return "" #TODO: fix this. + return $RelayShard(clusterId: conf.clusterId, shardId: shard.shardId) + +proc init*(T: type Chat2Message, buffer: seq[byte]): ProtoResult[T] = + var msg = Chat2Message() + let pb = initProtoBuffer(buffer) + + var timestamp: uint64 + discard ?pb.getField(1, timestamp) + msg.timestamp = int64(timestamp) + + discard ?pb.getField(2, msg.nick) + discard ?pb.getField(3, msg.payload) + + ok(msg) + +proc encode*(message: Chat2Message): ProtoBuffer = + var serialised = initProtoBuffer() + + serialised.write(1, uint64(message.timestamp)) + serialised.write(2, message.nick) + serialised.write(3, message.payload) + + return serialised + +proc toString*(message: Chat2Message): string = + # Get message date and timestamp in local time + let time = message.timestamp.fromUnix().local().format("'<'MMM' 'dd,' 'HH:mm'>'") + + return time & " " & message.nick & ": " & string.fromBytes(message.payload) + +##################### + +proc connectToNodes(c: Chat, nodes: seq[string]) {.async.} = + echo "Connecting to nodes" + await c.node.connectToNodes(nodes) + c.connected = true + +proc showChatPrompt(c: Chat) = + if not c.prompt: + try: + stdout.write(">> ") + stdout.flushFile() + c.prompt = true + except IOError: + discard + +proc getChatLine(c: Chat, msg: WakuMessage): Result[string, string] = + # No payload encoding/encryption from Waku + let + pb = Chat2Message.init(msg.payload) + chatLine = + if pb.isOk: + pb[].toString() + else: + string.fromBytes(msg.payload) + return ok(chatline) + +proc printReceivedMessage(c: Chat, msg: WakuMessage) = + let + pb = Chat2Message.init(msg.payload) + chatLine = + if pb.isOk: + pb[].toString() + else: + string.fromBytes(msg.payload) + try: + echo &"{chatLine}" + except ValueError: + # Formatting fail. Print chat line in any case. + echo chatLine + + c.prompt = false + showChatPrompt(c) + trace "Printing message", chatLine, contentTopic = msg.contentTopic + +proc readNick(transp: StreamTransport): Future[string] {.async.} = + # Chat prompt + stdout.write("Choose a nickname >> ") + stdout.flushFile() + return await transp.readLine() + +proc startMetricsServer( + serverIp: IpAddress, serverPort: Port +): Result[MetricsHttpServerRef, string] = + info "Starting metrics HTTP server", serverIp = $serverIp, serverPort = $serverPort + + let metricsServerRes = MetricsHttpServerRef.new($serverIp, serverPort) + if metricsServerRes.isErr(): + return err("metrics HTTP server start failed: " & $metricsServerRes.error) + + let server = metricsServerRes.value + try: + waitFor server.start() + except CatchableError: + return err("metrics HTTP server start failed: " & getCurrentExceptionMsg()) + + info "Metrics HTTP server started", serverIp = $serverIp, serverPort = $serverPort + ok(metricsServerRes.value) + +proc publish(c: Chat, line: string) = + # First create a Chat2Message protobuf with this line of text + let time = getTime().toUnix() + let chat2pb = + Chat2Message(timestamp: time, nick: c.nick, payload: line.toBytes()).encode() + + ## @TODO: error handling on failure + proc handler(response: PushResponse) {.gcsafe, closure.} = + trace "lightpush response received", response = response + + var message = WakuMessage( + payload: chat2pb.buffer, + contentTopic: c.contentTopic, + version: 0, + timestamp: getNanosecondTime(time), + ) + + try: + if not c.node.wakuLightpushClient.isNil(): + # Attempt lightpush with mix + #( + discard c.node.lightpushPublish( + some(c.conf.getPubsubTopic(c.node, c.contentTopic)), + message, + none(RemotePeerInfo), + true, + ) #TODO: Not waiting for response, have to change once SURB is implmented + #).isOkOr: + # error "failed to publish lightpush message", error = error + else: + error "failed to publish message as lightpush client is not initialized" + except CatchableError: + error "caught error publishing message: ", error = getCurrentExceptionMsg() + +# TODO This should read or be subscribe handler subscribe +proc readAndPrint(c: Chat) {.async.} = + while true: + # while p.connected: + # # TODO: echo &"{p.id} -> " + # + # echo cast[string](await p.conn.readLp(1024)) + #echo "readAndPrint subscribe NYI" + await sleepAsync(100) + +# TODO Implement +proc writeAndPrint(c: Chat) {.async.} = + while true: + # Connect state not updated on incoming WakuRelay connections + # if not c.connected: + # echo "type an address or wait for a connection:" + # echo "type /[help|?] for help" + + # Chat prompt + showChatPrompt(c) + + let line = await c.transp.readLine() + if line.startsWith("/help") or line.startsWith("/?") or not c.started: + echo Help + continue + + # if line.startsWith("/disconnect"): + # echo "Ending current session" + # if p.connected and p.conn.closed.not: + # await p.conn.close() + # p.connected = false + elif line.startsWith("/connect"): + # TODO Should be able to connect to multiple peers for Waku chat + if c.connected: + echo "already connected to at least one peer" + continue + + echo "enter address of remote peer" + let address = await c.transp.readLine() + if address.len > 0: + await c.connectToNodes(@[address]) + elif line.startsWith("/nick"): + # Set a new nickname + c.nick = await readNick(c.transp) + echo "You are now known as " & c.nick + elif line.startsWith("/exit"): + echo "quitting..." + + try: + await c.node.stop() + except: + echo "exception happened when stopping: " & getCurrentExceptionMsg() + + quit(QuitSuccess) + else: + # XXX connected state problematic + if c.started: + echo "publishing message: " & line + c.publish(line) + # TODO Connect to peer logic? + else: + try: + if line.startsWith("/") and "p2p" in line: + await c.connectToNodes(@[line]) + except: + echo &"unable to dial remote peer {line}" + echo getCurrentExceptionMsg() + +proc readWriteLoop(c: Chat) {.async.} = + asyncSpawn c.writeAndPrint() # execute the async function but does not block + asyncSpawn c.readAndPrint() + +proc readInput(wfd: AsyncFD) {.thread, raises: [Defect, CatchableError].} = + ## This procedure performs reading from `stdin` and sends data over + ## pipe to main thread. + let transp = fromPipe(wfd) + + while true: + let line = stdin.readLine() + discard waitFor transp.write(line & "\r\n") + +var alreadyUsedServicePeers {.threadvar.}: seq[RemotePeerInfo] + +proc selectRandomServicePeer*( + pm: PeerManager, actualPeer: Option[RemotePeerInfo], codec: string +): Result[RemotePeerInfo, void] = + if actualPeer.isSome(): + alreadyUsedServicePeers.add(actualPeer.get()) + + let supportivePeers = pm.switch.peerStore.getPeersByProtocol(codec).filterIt( + it notin alreadyUsedServicePeers + ) + if supportivePeers.len == 0: + return err() + + let rndPeerIndex = rand(0 .. supportivePeers.len - 1) + return ok(supportivePeers[rndPeerIndex]) + +proc maintainSubscription( + wakuNode: WakuNode, + filterPubsubTopic: PubsubTopic, + filterContentTopic: ContentTopic, + filterPeer: RemotePeerInfo, + preventPeerSwitch: bool, +) {.async.} = + var actualFilterPeer = filterPeer + const maxFailedSubscribes = 3 + const maxFailedServiceNodeSwitches = 10 + var noFailedSubscribes = 0 + var noFailedServiceNodeSwitches = 0 + while true: + info "maintaining subscription at", peer = constructMultiaddrStr(actualFilterPeer) + # First use filter-ping to check if we have an active subscription + let pingRes = await wakuNode.wakuFilterClient.ping(actualFilterPeer) + if pingRes.isErr(): + # No subscription found. Let's subscribe. + error "ping failed.", err = pingRes.error + trace "no subscription found. Sending subscribe request" + + let subscribeRes = await wakuNode.filterSubscribe( + some(filterPubsubTopic), filterContentTopic, actualFilterPeer + ) + + if subscribeRes.isErr(): + noFailedSubscribes += 1 + error "Subscribe request failed.", + err = subscribeRes.error, + peer = actualFilterPeer, + failCount = noFailedSubscribes + + # TODO: disconnet from failed actualFilterPeer + # asyncSpawn(wakuNode.peerManager.switch.disconnect(p)) + # wakunode.peerManager.peerStore.delete(actualFilterPeer) + + if noFailedSubscribes < maxFailedSubscribes: + await sleepAsync(2000) # Wait a bit before retrying + continue + elif not preventPeerSwitch: + let peerOpt = selectRandomServicePeer( + wakuNode.peerManager, some(actualFilterPeer), WakuFilterSubscribeCodec + ) + if peerOpt.isOk(): + actualFilterPeer = peerOpt.get() + + info "Found new peer for codec", + codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer) + + noFailedSubscribes = 0 + continue # try again with new peer without delay + else: + error "Failed to find new service peer. Exiting." + noFailedServiceNodeSwitches += 1 + break + else: + if noFailedSubscribes > 0: + noFailedSubscribes -= 1 + + notice "subscribe request successful." + else: + info "subscription is live." + + await sleepAsync(30000) # Subscription maintenance interval + +proc processMixNodes(localnode: WakuNode, nodes: seq[string]) {.async.} = + if nodes.len == 0: + return + + info "Processing mix nodes: ", nodes = $nodes + for node in nodes: + var enrRec: enr.Record + if enrRec.fromURI(node): + let peerInfo = enrRec.toRemotePeerInfo().valueOr: + error "Failed to parse mix node", error = error + continue + localnode.peermanager.addPeer(peerInfo, Discv5) + info "Added mix node", peer = peerInfo + else: + error "Failed to parse mix node ENR", node = node + +{.pop.} + # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError +proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = + let + transp = fromPipe(rfd) + conf = Chat2Conf.load() + nodekey = + if conf.nodekey.isSome(): + conf.nodekey.get() + else: + PrivateKey.random(Secp256k1, rng[]).tryGet() + + # set log level + if conf.logLevel != LogLevel.NONE: + setLogLevel(conf.logLevel) + + let natRes = setupNat( + conf.nat, + clientId, + Port(uint16(conf.tcpPort) + conf.portsShift), + Port(uint16(conf.udpPort) + conf.portsShift), + ) + + if natRes.isErr(): + raise newException(ValueError, "setupNat error " & natRes.error) + + let (extIp, extTcpPort, extUdpPort) = natRes.get() + + var enrBuilder = EnrBuilder.init(nodeKey) + + enrBuilder.withWakuRelaySharding( + RelayShards(clusterId: conf.clusterId, shardIds: conf.shards) + ).isOkOr: + error "failed to add sharded topics to ENR", error = error + quit(QuitFailure) + + let recordRes = enrBuilder.build() + let record = + if recordRes.isErr(): + error "failed to create enr record", error = recordRes.error + quit(QuitFailure) + else: + recordRes.get() + + let node = block: + var builder = WakuNodeBuilder.init() + builder.withNodeKey(nodeKey) + builder.withRecord(record) + + builder + .withNetworkConfigurationDetails( + conf.listenAddress, + Port(uint16(conf.tcpPort) + conf.portsShift), + extIp, + extTcpPort, + wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift), + wsEnabled = conf.websocketSupport, + wssEnabled = conf.websocketSecureSupport, + ) + .tryGet() + builder.build().tryGet() + + node.mountSharding(conf.clusterId, conf.numShardsInNetwork).isOkOr: + error "failed to mount waku sharding: ", error = error + quit(QuitFailure) + node.mountMetadata(conf.clusterId).isOkOr: + error "failed to mount waku metadata protocol: ", err = error + quit(QuitFailure) + + try: + await node.mountPeerExchange() + except CatchableError: + error "failed to mount waku peer-exchange protocol", + error = getCurrentExceptionMsg() + quit(QuitFailure) + + let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr: + error "failed to generate mix key pair", error = error + return + + (await node.mountMix(conf.clusterId, mixPrivKey)).isOkOr: + error "failed to mount waku mix protocol: ", error = $error + quit(QuitFailure) + if conf.mixnodes.len > 0: + await processMixNodes(node, conf.mixnodes) + await node.start() + + node.peerManager.start() + + #[ if conf.rlnRelayCredPath == "": + raise newException(ConfigurationError, "rln-relay-cred-path MUST be passed") + + if conf.relay: + let shards = + conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) + (await node.mountRelay()).isOkOr: + echo "failed to mount relay: " & error + return + ]# + await node.mountLibp2pPing() + let pubsubTopic = conf.getPubsubTopic(node, conf.contentTopic) + echo "pubsub topic is: " & pubsubTopic + let nick = await readNick(transp) + echo "Welcome, " & nick & "!" + + var chat = Chat( + node: node, + transp: transp, + subscribed: true, + connected: false, + started: true, + nick: nick, + prompt: false, + contentTopic: conf.contentTopic, + conf: conf, + ) + + var dnsDiscoveryUrl = none(string) + + if conf.fleet != Fleet.none: + # Use DNS discovery to connect to selected fleet + echo "Connecting to " & $conf.fleet & " fleet using DNS discovery..." + + if conf.fleet == Fleet.test: + dnsDiscoveryUrl = some( + "enrtree://AOGYWMBYOUIMOENHXCHILPKY3ZRFEULMFI4DOM442QSZ73TT2A7VI@test.waku.nodes.status.im" + ) + else: + # Connect to sandbox by default + dnsDiscoveryUrl = some( + "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im" + ) + elif conf.dnsDiscoveryUrl != "": + # No pre-selected fleet. Discover nodes via DNS using user config + debug "Discovering nodes using Waku DNS discovery", url = conf.dnsDiscoveryUrl + dnsDiscoveryUrl = some(conf.dnsDiscoveryUrl) + + var discoveredNodes: seq[RemotePeerInfo] + + if dnsDiscoveryUrl.isSome: + var nameServers: seq[TransportAddress] + for ip in conf.dnsDiscoveryNameServers: + nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 + + let dnsResolver = DnsResolver.new(nameServers) + + proc resolver(domain: string): Future[string] {.async, gcsafe.} = + trace "resolving", domain = domain + let resolved = await dnsResolver.resolveTxt(domain) + return resolved[0] # Use only first answer + + var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver) + if wakuDnsDiscovery.isOk: + let discoveredPeers = await wakuDnsDiscovery.get().findPeers() + if discoveredPeers.isOk: + info "Connecting to discovered peers" + discoveredNodes = discoveredPeers.get() + echo "Discovered and connecting to " & $discoveredNodes + waitFor chat.node.connectToNodes(discoveredNodes) + else: + warn "Failed to init Waku DNS discovery" + + let peerInfo = node.switch.peerInfo + let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId + echo &"Listening on\n {listenStr}" + + if (conf.storenode != "") or (conf.store == true): + await node.mountStore() + + var storenode: Option[RemotePeerInfo] + + if conf.storenode != "": + let peerInfo = parsePeerInfo(conf.storenode) + if peerInfo.isOk(): + storenode = some(peerInfo.value) + else: + error "Incorrect conf.storenode", error = peerInfo.error + elif discoveredNodes.len > 0: + echo "Store enabled, but no store nodes configured. Choosing one at random from discovered peers" + storenode = some(discoveredNodes[rand(0 .. len(discoveredNodes) - 1)]) + + if storenode.isSome(): + # We have a viable storenode. Let's query it for historical messages. + echo "Connecting to storenode: " & $(storenode.get()) + + node.mountStoreClient() + node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec) + + proc storeHandler(response: StoreQueryResponse) {.gcsafe.} = + for msg in response.messages: + let payload = + if msg.message.isSome(): + msg.message.get().payload + else: + newSeq[byte](0) + + let + pb = Chat2Message.init(payload) + chatLine = + if pb.isOk: + pb[].toString() + else: + string.fromBytes(payload) + echo &"{chatLine}" + info "Hit store handler" + + let queryRes = await node.query( + StoreQueryRequest(contentTopics: @[chat.contentTopic]), storenode.get() + ) + if queryRes.isOk(): + storeHandler(queryRes.value) + + if conf.edgemode: #Mount light protocol clients + node.mountLightPushClient() + await node.mountFilterClient() + let filterHandler = proc( + pubsubTopic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, closure.} = + trace "Hit filter handler", contentTopic = msg.contentTopic + chat.printReceivedMessage(msg) + + node.wakuFilterClient.registerPushHandler(filterHandler) + + if conf.serviceNode != "": #TODO: use one of discovered nodes if not present. + let peerInfo = parsePeerInfo(conf.serviceNode) + if peerInfo.isOk(): + #await mountLegacyLightPush(node) + node.peerManager.addServicePeer(peerInfo.value, WakuLightpushCodec) + node.peerManager.addServicePeer(peerInfo.value, WakuPeerExchangeCodec) + # Start maintaining subscription + asyncSpawn maintainSubscription( + node, pubsubTopic, conf.contentTopic, peerInfo.value, false + ) + else: + error "LightPushClient not mounted. Couldn't parse conf.serviceNode", + error = peerInfo.error + # TODO: Loop faster + node.startPeerExchangeLoop() + + while node.getMixNodePoolSize() < 3: + info "waiting for mix nodes to be discovered", + currentpoolSize = node.getMixNodePoolSize() + await sleepAsync(1000) + notice "ready to publish with mix node pool size ", + currentpoolSize = node.getMixNodePoolSize() + echo "ready to publish messages now" + + if conf.metricsLogging: + startMetricsLog() + + if conf.metricsServer: + let metricsServer = startMetricsServer( + conf.metricsServerAddress, Port(conf.metricsServerPort + conf.portsShift) + ) + + await chat.readWriteLoop() + + runForever() + +proc main(rng: ref HmacDrbgContext) {.async.} = + let (rfd, wfd) = createAsyncPipe() + if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe: + raise newException(ValueError, "Could not initialize pipe!") + + var thread: Thread[AsyncFD] + thread.createThread(readInput, wfd) + try: + await processInput(rfd, rng) + # Handle only ConfigurationError for now + # TODO: Throw other errors from the mounting procedure + except ConfigurationError as e: + raise e + +when isMainModule: # isMainModule = true when the module is compiled as the main file + let rng = crypto.newRng() + try: + waitFor(main(rng)) + except CatchableError as e: + raise e + +## Dump of things that can be improved: +## +## - Incoming dialed peer does not change connected state (not relying on it for now) +## - Unclear if staticnode argument works (can enter manually) +## - Don't trigger self / double publish own messages +## - Test/default to cluster node connection (diff protocol version) +## - Redirect logs to separate file +## - Expose basic publish/subscribe etc commands with /syntax +## - Show part of peerid to know who sent message +## - Deal with protobuf messages (e.g. other chat protocol, or encrypted) diff --git a/apps/chat2mix/config_chat2mix.nim b/apps/chat2mix/config_chat2mix.nim new file mode 100644 index 000000000..3c0efe291 --- /dev/null +++ b/apps/chat2mix/config_chat2mix.nim @@ -0,0 +1,282 @@ +import chronicles, chronos, std/strutils, regex + +import + eth/keys, + libp2p/crypto/crypto, + libp2p/crypto/secp, + nimcrypto/utils, + confutils, + confutils/defs, + confutils/std/net + +import waku/waku_core + +type + Fleet* = enum + none + prod + test + + EthRpcUrl* = distinct string + + Chat2Conf* = object ## General node config + edgemode* {. + defaultValue: true, desc: "Run the app in edge mode", name: "edge-mode" + .}: bool + + logLevel* {. + desc: "Sets the log level.", defaultValue: LogLevel.INFO, name: "log-level" + .}: LogLevel + + nodekey* {.desc: "P2P node private key as 64 char hex string.", name: "nodekey".}: + Option[crypto.PrivateKey] + + listenAddress* {. + defaultValue: defaultListenAddress(config), + desc: "Listening address for the LibP2P traffic.", + name: "listen-address" + .}: IpAddress + + tcpPort* {.desc: "TCP listening port.", defaultValue: 60000, name: "tcp-port".}: + Port + + udpPort* {.desc: "UDP listening port.", defaultValue: 60000, name: "udp-port".}: + Port + + portsShift* {. + desc: "Add a shift to all port numbers.", defaultValue: 0, name: "ports-shift" + .}: uint16 + + nat* {. + desc: + "Specify method to use for determining public address. " & + "Must be one of: any, none, upnp, pmp, extip:.", + defaultValue: "any" + .}: string + + ## Persistence config + dbPath* {. + desc: "The database path for peristent storage", defaultValue: "", name: "db-path" + .}: string + + persistPeers* {. + desc: "Enable peer persistence: true|false", + defaultValue: false, + name: "persist-peers" + .}: bool + + persistMessages* {. + desc: "Enable message persistence: true|false", + defaultValue: false, + name: "persist-messages" + .}: bool + + ## Relay config + relay* {. + desc: "Enable relay protocol: true|false", defaultValue: true, name: "relay" + .}: bool + + staticnodes* {. + desc: "Peer multiaddr to directly connect with. Argument may be repeated.", + name: "staticnode" + .}: seq[string] + + mixnodes* {. + desc: "Peer ENR to add as a mixnode. Argument may be repeated.", name: "mixnode" + .}: seq[string] + + keepAlive* {. + desc: "Enable keep-alive for idle connections: true|false", + defaultValue: false, + name: "keep-alive" + .}: bool + + clusterId* {. + desc: + "Cluster id that the node is running in. Node in a different cluster id is disconnected.", + defaultValue: 2, + name: "cluster-id" + .}: uint16 + + numShardsInNetwork* {. + desc: "Number of shards in the network", + defaultValue: 1, + name: "num-shards-in-network" + .}: uint32 + + shards* {. + desc: + "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.", + defaultValue: @[uint16(0)], + name: "shard" + .}: seq[uint16] + + ## Store config + store* {. + desc: "Enable store protocol: true|false", defaultValue: false, name: "store" + .}: bool + + storenode* {. + desc: "Peer multiaddr to query for storage.", defaultValue: "", name: "storenode" + .}: string + + ## Filter config + filter* {. + desc: "Enable filter protocol: true|false", defaultValue: false, name: "filter" + .}: bool + + ## Lightpush config + lightpush* {. + desc: "Enable lightpush protocol: true|false", + defaultValue: false, + name: "lightpush" + .}: bool + + servicenode* {. + desc: "Peer multiaddr to request lightpush and filter services", + defaultValue: "", + name: "servicenode" + .}: string + + ## Metrics config + metricsServer* {. + desc: "Enable the metrics server: true|false", + defaultValue: false, + name: "metrics-server" + .}: bool + + metricsServerAddress* {. + desc: "Listening address of the metrics server.", + defaultValue: parseIpAddress("127.0.0.1"), + name: "metrics-server-address" + .}: IpAddress + + metricsServerPort* {. + desc: "Listening HTTP port of the metrics server.", + defaultValue: 8008, + name: "metrics-server-port" + .}: uint16 + + metricsLogging* {. + desc: "Enable metrics logging: true|false", + defaultValue: true, + name: "metrics-logging" + .}: bool + + ## DNS discovery config + dnsDiscovery* {. + desc: + "Deprecated, please set dns-discovery-url instead. Enable discovering nodes via DNS", + defaultValue: false, + name: "dns-discovery" + .}: bool + + dnsDiscoveryUrl* {. + desc: "URL for DNS node list in format 'enrtree://@'", + defaultValue: "", + name: "dns-discovery-url" + .}: string + + dnsDiscoveryNameServers* {. + desc: "DNS name server IPs to query. Argument may be repeated.", + defaultValue: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")], + name: "dns-discovery-name-server" + .}: seq[IpAddress] + + ## Chat2 configuration + fleet* {. + desc: + "Select the fleet to connect to. This sets the DNS discovery URL to the selected fleet.", + defaultValue: Fleet.none, + name: "fleet" + .}: Fleet + + contentTopic* {. + desc: "Content topic for chat messages.", + defaultValue: "/toy-chat-mix/2/huilong/proto", + name: "content-topic" + .}: string + + ## Websocket Configuration + websocketSupport* {. + desc: "Enable websocket: true|false", + defaultValue: false, + name: "websocket-support" + .}: bool + + websocketPort* {. + desc: "WebSocket listening port.", defaultValue: 8000, name: "websocket-port" + .}: Port + + websocketSecureSupport* {. + desc: "WebSocket Secure Support.", + defaultValue: false, + name: "websocket-secure-support" + .}: bool ## rln-relay configuration + +# NOTE: Keys are different in nim-libp2p +proc parseCmdArg*(T: type crypto.PrivateKey, p: string): T = + try: + let key = SkPrivateKey.init(utils.fromHex(p)).tryGet() + # XXX: Here at the moment + result = crypto.PrivateKey(scheme: Secp256k1, skkey: key) + except CatchableError as e: + raise newException(ValueError, "Invalid private key") + +proc completeCmdArg*(T: type crypto.PrivateKey, val: string): seq[string] = + return @[] + +proc parseCmdArg*(T: type IpAddress, p: string): T = + try: + result = parseIpAddress(p) + except CatchableError as e: + raise newException(ValueError, "Invalid IP address") + +proc completeCmdArg*(T: type IpAddress, val: string): seq[string] = + return @[] + +proc parseCmdArg*(T: type Port, p: string): T = + try: + result = Port(parseInt(p)) + except CatchableError as e: + raise newException(ValueError, "Invalid Port number") + +proc completeCmdArg*(T: type Port, val: string): seq[string] = + return @[] + +proc parseCmdArg*(T: type Option[uint], p: string): T = + try: + some(parseUint(p)) + except CatchableError: + raise newException(ValueError, "Invalid unsigned integer") + +proc completeCmdArg*(T: type EthRpcUrl, val: string): seq[string] = + return @[] + +proc parseCmdArg*(T: type EthRpcUrl, s: string): T = + ## allowed patterns: + ## http://url:port + ## https://url:port + ## http://url:port/path + ## https://url:port/path + ## http://url/with/path + ## http://url:port/path?query + ## https://url:port/path?query + ## disallowed patterns: + ## any valid/invalid ws or wss url + var httpPattern = + re2"^(https?):\/\/((localhost)|([\w_-]+(?:(?:\.[\w_-]+)+)))(:[0-9]{1,5})?([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])*" + var wsPattern = + re2"^(wss?):\/\/((localhost)|([\w_-]+(?:(?:\.[\w_-]+)+)))(:[0-9]{1,5})?([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])*" + if regex.match(s, wsPattern): + raise newException( + ValueError, "Websocket RPC URL is not supported, Please use an HTTP URL" + ) + if not regex.match(s, httpPattern): + raise newException(ValueError, "Invalid HTTP RPC URL") + return EthRpcUrl(s) + +func defaultListenAddress*(conf: Chat2Conf): IpAddress = + # TODO: How should we select between IPv4 and IPv6 + # Maybe there should be a config option for this. + (static parseIpAddress("0.0.0.0")) diff --git a/apps/chat2mix/nim.cfg b/apps/chat2mix/nim.cfg new file mode 100644 index 000000000..2231f2ebe --- /dev/null +++ b/apps/chat2mix/nim.cfg @@ -0,0 +1,4 @@ +-d:chronicles_line_numbers +-d:chronicles_runtime_filtering:on +-d:discv5_protocol_id:d5waku +path = "../.." diff --git a/simulations/README.md b/simulations/README.md new file mode 100644 index 000000000..c035fc96d --- /dev/null +++ b/simulations/README.md @@ -0,0 +1,4 @@ +# Purpose + +This is a place where any simulation related scripts and utilities can be stored. +Checkout mixnet folder to get an idea. diff --git a/simulations/mixnet/README.md b/simulations/mixnet/README.md new file mode 100644 index 000000000..fcc67b6e1 --- /dev/null +++ b/simulations/mixnet/README.md @@ -0,0 +1,70 @@ +# Mixnet simulation + +## Aim + +Simulate a local mixnet along with a chat app to publish using mix. +This is helpful to test any changes while development. +It includes scripts that run a `4 node` mixnet along with a lightpush service node(without mix) that can be used to test quickly. + +## Simulation Details + +Note that before running the simulation both `wakunode2` and `chat2mix` have to be built. + +```bash +cd +make wakunode2 +make chat2mix +``` + +Simulation includes scripts for: + +1. a 4 waku-node mixnet where `node1` is bootstrap node for the other 3 nodes. +2. scripts to run chat app that publishes using lightpush protocol over the mixnet + +## Usage + +Start the service node with below command, which acts as bootstrap node for all other mix nodes. + +`./run_lp_service_node.sh` + +To run the nodes for mixnet run the 4 node scripts in different terminals as below. + +`./run_mix_node1.sh` + +Look for following 2 log lines to ensure node ran successfully and has also mounted mix protocol. + +```log +INF 2025-08-01 14:51:05.445+05:30 mounting mix protocol topics="waku node" tid=39996871 file=waku_node.nim:231 nodeId="(listenAddresses: @[\"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o\"], enrUri: \"enr:-NC4QKYtas8STkenlqBTJ3a1TTLzJA2DsGGbFlnxem9aSM2IXm-CSVZULdk2467bAyFnepnt8KP_QlfDzdaMXd_zqtwBgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzgIJyc4UAAgEAAIlzZWNwMjU2azGhA6RFtVJVBh0SYOoP8xrgnXSlpiFARmQkF9d8Rn4fSeiog3RjcILqYYN1ZHCCIymFd2FrdTIt\")" + +INF 2025-08-01 14:49:23.467+05:30 Node setup complete topics="wakunode main" tid=39994244 file=wakunode2.nim:104 +``` + +Once all the 4 nodes are up without any issues, run the script to start the chat application. + +`./run_chat_app.sh` + +Enter a nickname to be used. + +```bash +pubsub topic is: /waku/2/rs/2/0 +Choose a nickname >> +``` + +Once you see below log, it means the app is ready for publishing messages over the mixnet. + +```bash +Welcome, test! +Listening on + /ip4/192.168.68.64/tcp/60000/p2p/16Uiu2HAkxDGqix1ifY3wF1ZzojQWRAQEdKP75wn1LJMfoHhfHz57 +ready to publish messages now +``` + +Follow similar instructions to run second instance of chat app. +Once both the apps run successfully, send a message and check if it is received by the other app. + +You can exit the chat apps by entering `/exit` as below + +```bash +>> /exit +quitting... +``` diff --git a/simulations/mixnet/config.toml b/simulations/mixnet/config.toml new file mode 100644 index 000000000..cf6c65688 --- /dev/null +++ b/simulations/mixnet/config.toml @@ -0,0 +1,25 @@ +log-level = "INFO" +relay = true +#mix = true +filter = true +store = false +lightpush = true +max-connections = 150 +peer-exchange = true +metrics-logging = false +cluster-id = 2 +discv5-discovery = true +discv5-udp-port = 9000 +discv5-enr-auto-update = true +rest = true +rest-admin = true +ports-shift = 1 +num-shards-in-network = 1 +shard = [0] +agent-string = "nwaku-mix" +nodekey = "f98e3fba96c32e8d1967d460f1b79457380e1a895f7971cecc8528abe733781a" +mixkey = "a87db88246ec0eedda347b9b643864bee3d6933eb15ba41e6d58cb678d813258" +rendezvous = true +listen-address = "127.0.0.1" +nat = "extip:127.0.0.1" +ip-colocation-limit=0 diff --git a/simulations/mixnet/config1.toml b/simulations/mixnet/config1.toml new file mode 100644 index 000000000..d85925209 --- /dev/null +++ b/simulations/mixnet/config1.toml @@ -0,0 +1,27 @@ +log-level = "INFO" +relay = true +mix = true +filter = true +store = false +lightpush = true +max-connections = 150 +peer-exchange = true +metrics-logging = false +cluster-id = 2 +discv5-discovery = true +discv5-udp-port = 9001 +discv5-enr-auto-update = true +discv5-bootstrap-node = ["enr:-Nq4QHkkKSYWg12nFHlB-Yjb53I922uquTL26R7G6Kk7Z0gNJx2N4qCt_fq0RFg-_ZDX5ju8SatMjNYyImPprl37utMEgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzigAIBMCoAWkG6mGCcnOFAAIBAACJc2VjcDI1NmsxoQOkRbVSVQYdEmDqD_Ma4J10paYhQEZkJBfXfEZ-H0noqIN0Y3CC6mGDdWRwgiMphXdha3UyLQ"] +rest = true +rest-admin = true +ports-shift = 2 +num-shards-in-network = 1 +shard = [0] +agent-string = "nwaku-mix" +nodekey = "09e9d134331953357bd38bbfce8edb377f4b6308b4f3bfbe85c610497053d684" +mixkey = "c86029e02c05a7e25182974b519d0d52fcbafeca6fe191fbb64857fb05be1a53" +rendezvous = true +listen-address = "127.0.0.1" +nat = "extip:127.0.0.1" +ip-colocation-limit=0 +#staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"] diff --git a/simulations/mixnet/config2.toml b/simulations/mixnet/config2.toml new file mode 100644 index 000000000..94174be83 --- /dev/null +++ b/simulations/mixnet/config2.toml @@ -0,0 +1,27 @@ +log-level = "INFO" +relay = true +mix = true +filter = true +store = false +lightpush = true +max-connections = 150 +peer-exchange = true +metrics-logging = false +cluster-id = 2 +discv5-discovery = true +discv5-udp-port = 9002 +discv5-enr-auto-update = true +discv5-bootstrap-node = ["enr:-Nq4QHkkKSYWg12nFHlB-Yjb53I922uquTL26R7G6Kk7Z0gNJx2N4qCt_fq0RFg-_ZDX5ju8SatMjNYyImPprl37utMEgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzigAIBMCoAWkG6mGCcnOFAAIBAACJc2VjcDI1NmsxoQOkRbVSVQYdEmDqD_Ma4J10paYhQEZkJBfXfEZ-H0noqIN0Y3CC6mGDdWRwgiMphXdha3UyLQ"] +rest = false +rest-admin = false +ports-shift = 3 +num-shards-in-network = 1 +shard = [0] +agent-string = "nwaku-mix" +nodekey = "ed54db994682e857d77cd6fb81be697382dc43aa5cd78e16b0ec8098549f860e" +mixkey = "b858ac16bbb551c4b2973313b1c8c8f7ea469fca03f1608d200bbf58d388ec7f" +rendezvous = true +listen-address = "127.0.0.1" +nat = "extip:127.0.0.1" +ip-colocation-limit=0 +#staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"] diff --git a/simulations/mixnet/config3.toml b/simulations/mixnet/config3.toml new file mode 100644 index 000000000..6b81942b9 --- /dev/null +++ b/simulations/mixnet/config3.toml @@ -0,0 +1,27 @@ +log-level = "INFO" +relay = true +mix = true +filter = true +store = false +lightpush = true +max-connections = 150 +peer-exchange = true +metrics-logging = false +cluster-id = 2 +discv5-discovery = true +discv5-udp-port = 9003 +discv5-enr-auto-update = true +discv5-bootstrap-node = ["enr:-Nq4QHkkKSYWg12nFHlB-Yjb53I922uquTL26R7G6Kk7Z0gNJx2N4qCt_fq0RFg-_ZDX5ju8SatMjNYyImPprl37utMEgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzigAIBMCoAWkG6mGCcnOFAAIBAACJc2VjcDI1NmsxoQOkRbVSVQYdEmDqD_Ma4J10paYhQEZkJBfXfEZ-H0noqIN0Y3CC6mGDdWRwgiMphXdha3UyLQ"] +rest = false +rest-admin = false +ports-shift = 4 +num-shards-in-network = 1 +shard = [0] +agent-string = "nwaku-mix" +nodekey = "42f96f29f2d6670938b0864aced65a332dcf5774103b4c44ec4d0ea4ef3c47d6" +mixkey = "d8bd379bb394b0f22dd236d63af9f1a9bc45266beffc3fbbe19e8b6575f2535b" +rendezvous = true +listen-address = "127.0.0.1" +nat = "extip:127.0.0.1" +ip-colocation-limit=0 +#staticnode = ["/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"] diff --git a/simulations/mixnet/config4.toml b/simulations/mixnet/config4.toml new file mode 100644 index 000000000..2a33f3f2d --- /dev/null +++ b/simulations/mixnet/config4.toml @@ -0,0 +1,27 @@ +log-level = "DEBUG" +relay = true +mix = true +filter = true +store = false +lightpush = true +max-connections = 150 +peer-exchange = true +metrics-logging = false +cluster-id = 2 +discv5-discovery = true +discv5-udp-port = 9004 +discv5-enr-auto-update = true +discv5-bootstrap-node = ["enr:-Nq4QHkkKSYWg12nFHlB-Yjb53I922uquTL26R7G6Kk7Z0gNJx2N4qCt_fq0RFg-_ZDX5ju8SatMjNYyImPprl37utMEgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzigAIBMCoAWkG6mGCcnOFAAIBAACJc2VjcDI1NmsxoQOkRbVSVQYdEmDqD_Ma4J10paYhQEZkJBfXfEZ-H0noqIN0Y3CC6mGDdWRwgiMphXdha3UyLQ"] +rest = false +rest-admin = false +ports-shift = 5 +num-shards-in-network = 1 +shard = [0] +agent-string = "nwaku-mix" +#nodekey = "3ce887b3c34b7a92dd2868af33941ed1dbec4893b054572cd5078da09dd923d4" +mixkey = "780fff09e51e98df574e266bf3266ec6a3a1ddfcf7da826a349a29c137009d49" +rendezvous = true +listen-address = "127.0.0.1" +nat = "extip:127.0.0.1" +ip-colocation-limit=0 +#staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF"] diff --git a/simulations/mixnet/run_chat_mix.sh b/simulations/mixnet/run_chat_mix.sh new file mode 100755 index 000000000..e1aceaf42 --- /dev/null +++ b/simulations/mixnet/run_chat_mix.sh @@ -0,0 +1 @@ +../../build/chat2mix --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --mixnode="enr:-Nq4QHkkKSYWg12nFHlB-Yjb53I922uquTL26R7G6Kk7Z0gNJx2N4qCt_fq0RFg-_ZDX5ju8SatMjNYyImPprl37utMEgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzigAIBMCoAWkG6mGCcnOFAAIBAACJc2VjcDI1NmsxoQOkRbVSVQYdEmDqD_Ma4J10paYhQEZkJBfXfEZ-H0noqIN0Y3CC6mGDdWRwgiMphXdha3UyLQ" --mixnode="enr:-NC4QH3HbfXxl0emm33s-6ovpu4VEA959XDSMU7rQOMfS8w6U9WB39Y25Z_ZOcgegg2SQBoGsX4kwHpNdsd-ZWzuGasBgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCSMehtpkMlApAKhPhnAEznhjKrUs2OMLHsMizXlXEMKoptdWx0aWFkZHJzgIJyc4UAAgEAAIlzZWNwMjU2azGhA3pHyDDU-7wjAyVNqnT8Tu9V3XBxoWCD9VRpp5VZJlhUg3RjcILqYoN1ZHCCIyuFd2FrdTIt" --mixnode="enr:-Nq4QMAtUyBnD7j_o7qsXLuKWn2fxvSOC0EciyK91qSQyqCib_bPxRqYbjZLNh0YOSI0t0xN23Kp46OctZtzRhw_hxsCgmlkgnY0gmlwhH8AAAGHbWl4LWtleaAnXNaInh8pykjlue24ANGpT0nxPTk6Ds8aB691NQbebIptdWx0aWFkZHJzigAIBMCoREAG6mOCcnOFAAIBAACJc2VjcDI1NmsxoQPYhmrbTqylbdenVfvO2U0w6EC4A-l5lwvu3QWL7IqkO4N0Y3CC6mODdWRwgiMthXdha3UyLQ" --mixnode="enr:-Nq4QHo7054AuNsTZde5A5GcklDZmcrumkd32BBW3UUlLh7lBuYstu8dmClolil8g3nDQBqwU_B5-iEfVS1UVxRWoVoDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaDg7VlKjVBmgb4HXo4jcjR4OI-xgkd_ekaTCaJecHb8GIptdWx0aWFkZHJzigAIBMCoREAG6mSCcnOFAAIBAACJc2VjcDI1NmsxoQOnphVC3U5zmOCkjOI2tY0v8K5QkXSaE5xO37q3iFfKGIN0Y3CC6mSDdWRwgiMvhXdha3UyLQ" diff --git a/simulations/mixnet/run_chat_mix1.sh b/simulations/mixnet/run_chat_mix1.sh new file mode 100755 index 000000000..dfd65d2f3 --- /dev/null +++ b/simulations/mixnet/run_chat_mix1.sh @@ -0,0 +1 @@ +../../build/chat2mix --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --mixnode="enr:-Nq4QHkkKSYWg12nFHlB-Yjb53I922uquTL26R7G6Kk7Z0gNJx2N4qCt_fq0RFg-_ZDX5ju8SatMjNYyImPprl37utMEgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzigAIBMCoAWkG6mGCcnOFAAIBAACJc2VjcDI1NmsxoQOkRbVSVQYdEmDqD_Ma4J10paYhQEZkJBfXfEZ-H0noqIN0Y3CC6mGDdWRwgiMphXdha3UyLQ" --mixnode="enr:-NC4QH3HbfXxl0emm33s-6ovpu4VEA959XDSMU7rQOMfS8w6U9WB39Y25Z_ZOcgegg2SQBoGsX4kwHpNdsd-ZWzuGasBgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCSMehtpkMlApAKhPhnAEznhjKrUs2OMLHsMizXlXEMKoptdWx0aWFkZHJzgIJyc4UAAgEAAIlzZWNwMjU2azGhA3pHyDDU-7wjAyVNqnT8Tu9V3XBxoWCD9VRpp5VZJlhUg3RjcILqYoN1ZHCCIyuFd2FrdTIt" --mixnode="enr:-Nq4QMAtUyBnD7j_o7qsXLuKWn2fxvSOC0EciyK91qSQyqCib_bPxRqYbjZLNh0YOSI0t0xN23Kp46OctZtzRhw_hxsCgmlkgnY0gmlwhH8AAAGHbWl4LWtleaAnXNaInh8pykjlue24ANGpT0nxPTk6Ds8aB691NQbebIptdWx0aWFkZHJzigAIBMCoREAG6mOCcnOFAAIBAACJc2VjcDI1NmsxoQPYhmrbTqylbdenVfvO2U0w6EC4A-l5lwvu3QWL7IqkO4N0Y3CC6mODdWRwgiMthXdha3UyLQ" --mixnode="enr:-Nq4QHo7054AuNsTZde5A5GcklDZmcrumkd32BBW3UUlLh7lBuYstu8dmClolil8g3nDQBqwU_B5-iEfVS1UVxRWoVoDgmlkgnY0gmlwhH8AAAGHbWl4LWtleaDg7VlKjVBmgb4HXo4jcjR4OI-xgkd_ekaTCaJecHb8GIptdWx0aWFkZHJzigAIBMCoREAG6mSCcnOFAAIBAACJc2VjcDI1NmsxoQOnphVC3U5zmOCkjOI2tY0v8K5QkXSaE5xO37q3iFfKGIN0Y3CC6mSDdWRwgiMvhXdha3UyLQ" --ports-shift=1 diff --git a/simulations/mixnet/run_lp_service_node.sh b/simulations/mixnet/run_lp_service_node.sh new file mode 100755 index 000000000..1d005796e --- /dev/null +++ b/simulations/mixnet/run_lp_service_node.sh @@ -0,0 +1 @@ +../../build/wakunode2 --config-file="config.toml" diff --git a/simulations/mixnet/run_mix_node1.sh b/simulations/mixnet/run_mix_node1.sh new file mode 100755 index 000000000..024eb3f99 --- /dev/null +++ b/simulations/mixnet/run_mix_node1.sh @@ -0,0 +1 @@ +../../build/wakunode2 --config-file="config1.toml" diff --git a/simulations/mixnet/run_mix_node2.sh b/simulations/mixnet/run_mix_node2.sh new file mode 100755 index 000000000..e55a9bac8 --- /dev/null +++ b/simulations/mixnet/run_mix_node2.sh @@ -0,0 +1 @@ +../../build/wakunode2 --config-file="config2.toml" diff --git a/simulations/mixnet/run_mix_node3.sh b/simulations/mixnet/run_mix_node3.sh new file mode 100755 index 000000000..dca8119a3 --- /dev/null +++ b/simulations/mixnet/run_mix_node3.sh @@ -0,0 +1 @@ +../../build/wakunode2 --config-file="config3.toml" diff --git a/simulations/mixnet/run_mix_node4.sh b/simulations/mixnet/run_mix_node4.sh new file mode 100755 index 000000000..9cf25158b --- /dev/null +++ b/simulations/mixnet/run_mix_node4.sh @@ -0,0 +1 @@ +../../build/wakunode2 --config-file="config4.toml" diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index cd60b254a..4c0d11c8b 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit cd60b254a0700b0daac7a6cb2c0c48860b57c539 +Subproject commit 4c0d11c8b59dca7076e8e3acc4714d56a5d67c6e diff --git a/waku.nimble b/waku.nimble index 3cb1a5e16..b68626a4e 100644 --- a/waku.nimble +++ b/waku.nimble @@ -149,6 +149,14 @@ task chat2, "Build example Waku chat usage": let name = "chat2" buildBinary name, "apps/chat2/", "-d:chronicles_sinks=textlines[file] -d:ssl" +task chat2mix, "Build example Waku chat mix usage": + # NOTE For debugging, set debug level. For chat usage we want minimal log + # output to STDOUT. Can be fixed by redirecting logs to file (e.g.) + #buildBinary name, "examples/", "-d:chronicles_log_level=WARN" + + let name = "chat2mix" + buildBinary name, "apps/chat2mix/", "-d:chronicles_sinks=textlines[file] -d:ssl" + task chat2bridge, "Build chat2bridge": let name = "chat2bridge" buildBinary name, "apps/chat2bridge/" diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index b998d585e..387618dff 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -27,7 +27,8 @@ import mix/mix_protocol, mix/curve25519, mix/protocol, - mix/mix_metrics + mix/mix_metrics, + mix/entry_connection import ../waku_core, @@ -1182,17 +1183,36 @@ proc lightpushPublishHandler( pubsubTopic: PubsubTopic, message: WakuMessage, peer: RemotePeerInfo | PeerInfo, + mixify: bool = false, ): Future[lightpush_protocol.WakuLightPushResult] {.async.} = let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() + if not node.wakuLightpushClient.isNil(): notice "publishing message with lightpush", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, target_peer_id = peer.peerId, - msg_hash = msgHash - return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer) + msg_hash = msgHash, + mixify = mixify + if mixify: + let conn = MixEntryConnection.newConn( + $peer.addrs[0], #TODO: How to handle multiple addresses? + peer.peerId, + ProtocolType.fromString(WakuLightPushCodec), + node.mix, + ) + return await node.wakuLightpushClient.publishWithConn( + pubsubTopic, message, conn, peer.peerId + ) + else: + return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer) if not node.wakuLightPush.isNil(): + if mixify: + error "mixify is not supported with self hosted lightpush" + return lighpushErrorResult( + SERVICE_NOT_AVAILABLE, "Waku lightpush with mix not available" + ) notice "publishing message with self hosted lightpush", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, @@ -1206,11 +1226,16 @@ proc lightpushPublish*( pubsubTopic: Option[PubsubTopic], message: WakuMessage, peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo), + mixify: bool = false, ): Future[lightpush_protocol.WakuLightPushResult] {.async.} = if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil(): error "failed to publish message as lightpush not available" return lighpushErrorResult(SERVICE_NOT_AVAILABLE, "Waku lightpush not available") - + if mixify and node.mix.isNil(): + error "failed to publish message using mix as mix protocol is not mounted" + return lighpushErrorResult( + SERVICE_NOT_AVAILABLE, "Waku lightpush with mix not available" + ) let toPeer: RemotePeerInfo = peerOpt.valueOr: if not node.wakuLightPush.isNil(): RemotePeerInfo.init(node.peerId()) @@ -1233,7 +1258,7 @@ proc lightpushPublish*( error "lightpush publish error", error = msg return lighpushErrorResult(INTERNAL_SERVER_ERROR, msg) - return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer) + return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer, mixify) ## Waku RLN Relay proc mountRlnRelay*(