From 4a165f09d36200475d0538e9d39e3cab6baa9706 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 13 Feb 2026 18:49:30 +0530 Subject: [PATCH] integrate RLN spam protection into mix protocol with OffChainGroupManager and logos-messaging for broadcasting membership changes and general coordination with API updates and fixes Co-Authored-By: Claude Opus 4.6 --- .gitignore | 1 + .gitmodules | 5 + apps/chat2mix/chat2mix.nim | 133 +++++++++++++-- apps/chat2mix/config_chat2mix.nim | 7 + config.nims | 3 + simulations/mixnet/README.md | 117 +++++++++---- simulations/mixnet/build_setup.sh | 32 ++++ simulations/mixnet/config2.toml | 2 +- simulations/mixnet/config3.toml | 2 +- simulations/mixnet/config4.toml | 2 +- simulations/mixnet/run_chat_mix.sh | 2 +- simulations/mixnet/run_chat_mix1.sh | 3 +- simulations/mixnet/run_mix_node.sh | 1 + simulations/mixnet/setup_credentials.nim | 139 ++++++++++++++++ vendor/mix-rln-spam-protection-plugin | 1 + waku/node/kernel_api/relay.nim | 11 +- waku/node/waku_mix_coordination.nim | 125 ++++++++++++++ waku/node/waku_node.nim | 43 ++++- waku/waku_mix/protocol.nim | 202 +++++++++++++++++++++-- 19 files changed, 768 insertions(+), 63 deletions(-) create mode 100755 simulations/mixnet/build_setup.sh create mode 100644 simulations/mixnet/setup_credentials.nim create mode 160000 vendor/mix-rln-spam-protection-plugin create mode 100644 waku/node/waku_mix_coordination.nim diff --git a/.gitignore b/.gitignore index 5222a0d5e..6f209cbca 100644 --- a/.gitignore +++ b/.gitignore @@ -43,6 +43,7 @@ node_modules/ # RLN / keystore rlnKeystore.json +rln_keystore*.json *.tar.gz # Nimbus Build System diff --git a/.gitmodules b/.gitmodules index 6a63491e3..19c4eec17 100644 --- a/.gitmodules +++ b/.gitmodules @@ -195,3 +195,8 @@ url = https://github.com/logos-messaging/nim-ffi/ ignore = untracked branch = master +[submodule "vendor/mix-rln-spam-protection-plugin"] + path = vendor/mix-rln-spam-protection-plugin + url = https://github.com/logos-co/mix-rln-spam-protection-plugin.git + ignore = untracked + branch = master diff --git a/apps/chat2mix/chat2mix.nim b/apps/chat2mix/chat2mix.nim index 558454307..d88325568 100644 --- a/apps/chat2mix/chat2mix.nim +++ b/apps/chat2mix/chat2mix.nim @@ -35,6 +35,7 @@ import import waku/[ waku_core, + waku_core/topics/sharding, waku_lightpush/common, waku_lightpush/rpc, waku_enr, @@ -47,6 +48,8 @@ import common/utils/nat, waku_store/common, waku_filter_v2/client, + waku_filter_v2/common as filter_common, + waku_mix/protocol, common/logging, ], ./config_chat2mix @@ -57,6 +60,105 @@ import ../../waku/waku_rln_relay logScope: topics = "chat2 mix" +######################### +## Mix Spam Protection ## +######################### + +# Forward declaration +proc maintainSpamProtectionSubscription( + node: WakuNode, contentTopics: seq[ContentTopic] +) {.async.} + +proc setupMixSpamProtectionViaFilter(node: WakuNode) {.async.} = + ## Setup filter-based spam protection coordination for mix protocol. + ## Since chat2mix doesn't use relay, we subscribe via filter to receive + ## spam protection coordination messages. + + # Register message handler for spam protection coordination + let spamTopics = node.wakuMix.getSpamProtectionContentTopics() + + proc handleSpamMessage( + pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[void] {.async, gcsafe.} = + await node.wakuMix.handleMessage(pubsubTopic, message) + + node.wakuFilterClient.registerPushHandler(handleSpamMessage) + + # Wait for filter peer and maintain subscription + asyncSpawn maintainSpamProtectionSubscription(node, spamTopics) + +proc maintainSpamProtectionSubscription( + node: WakuNode, contentTopics: seq[ContentTopic] +) {.async.} = + ## Maintain filter subscription for spam protection topics. + ## Monitors subscription health with periodic pings and re-subscribes on failure. + const RetryInterval = chronos.seconds(5) + const SubscriptionMaintenance = chronos.seconds(30) + const MaxFailedSubscribes = 3 + var currentFilterPeer: Option[RemotePeerInfo] = none(RemotePeerInfo) + var noFailedSubscribes = 0 + + while true: + # Select or reuse filter peer + if currentFilterPeer.isNone(): + let filterPeerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec) + if filterPeerOpt.isNone(): + debug "No filter peer available yet for spam protection, retrying..." + await sleepAsync(RetryInterval) + continue + currentFilterPeer = some(filterPeerOpt.get()) + info "Selected filter peer for spam protection", + peer = currentFilterPeer.get().peerId + + # Check if subscription is still alive with ping + let pingErr = (await node.wakuFilterClient.ping(currentFilterPeer.get())).errorOr: + # Subscription is alive, wait before next check + await sleepAsync(SubscriptionMaintenance) + if noFailedSubscribes > 0: + noFailedSubscribes = 0 + continue + + # Subscription lost, need to re-subscribe + warn "Spam protection filter subscription ping failed, re-subscribing", + error = pingErr, peer = currentFilterPeer.get().peerId + + # Determine pubsub topic from content topics (using auto-sharding) + if node.wakuAutoSharding.isNone(): + error "Auto-sharding not configured, cannot determine pubsub topic for spam protection" + await sleepAsync(RetryInterval) + continue + + let shardRes = node.wakuAutoSharding.get().getShard(contentTopics[0]) + if shardRes.isErr(): + error "Failed to determine shard for spam protection", error = shardRes.error + await sleepAsync(RetryInterval) + continue + + let shard = shardRes.get() + let pubsubTopic: PubsubTopic = shard # converter toPubsubTopic + + # Subscribe to spam protection topics + let res = await node.wakuFilterClient.subscribe( + currentFilterPeer.get(), pubsubTopic, contentTopics + ) + if res.isErr(): + noFailedSubscribes += 1 + warn "Failed to subscribe to spam protection topics via filter", + error = res.error, topics = contentTopics, failCount = noFailedSubscribes + + if noFailedSubscribes >= MaxFailedSubscribes: + # Try with a different peer + warn "Max subscription failures reached, selecting new filter peer" + currentFilterPeer = none(RemotePeerInfo) + noFailedSubscribes = 0 + + await sleepAsync(RetryInterval) + else: + info "Successfully subscribed to spam protection topics via filter", + topics = contentTopics, peer = currentFilterPeer.get().peerId + noFailedSubscribes = 0 + await sleepAsync(SubscriptionMaintenance) + const Help = """ Commands: /[?|help|connect|nick|exit] @@ -210,20 +312,21 @@ proc publish(c: Chat, line: string) {.async.} = try: if not c.node.wakuLightpushClient.isNil(): # Attempt lightpush with mix - - ( - waitFor c.node.lightpushPublish( - some(c.conf.getPubsubTopic(c.node, c.contentTopic)), - message, - none(RemotePeerInfo), - true, - ) - ).isOkOr: - error "failed to publish lightpush message", error = error + let res = await c.node.lightpushPublish( + some(c.conf.getPubsubTopic(c.node, c.contentTopic)), + message, + none(RemotePeerInfo), + true, + ) + if res.isErr(): + error "failed to publish lightpush message", error = res.error + echo "Error: " & res.error.desc.get("unknown error") else: error "failed to publish message as lightpush client is not initialized" + echo "Error: lightpush client is not initialized" except CatchableError: error "caught error publishing message: ", error = getCurrentExceptionMsg() + echo "Error: " & getCurrentExceptionMsg() # TODO This should read or be subscribe handler subscribe proc readAndPrint(c: Chat) {.async.} = @@ -452,7 +555,11 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = error "failed to generate mix key pair", error = error return - (await node.mountMix(conf.clusterId, mixPrivKey, conf.mixnodes)).isOkOr: + ( + await node.mountMix( + conf.clusterId, mixPrivKey, conf.mixnodes, some(conf.rlnUserMessageLimit) + ) + ).isOkOr: error "failed to mount waku mix protocol: ", error = $error quit(QuitFailure) @@ -487,6 +594,10 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = #await node.mountRendezvousClient(conf.clusterId) + # Subscribe to spam protection coordination topics via filter since chat2mix doesn't use relay + if not node.wakuFilterClient.isNil(): + asyncSpawn setupMixSpamProtectionViaFilter(node) + await node.start() node.peerManager.start() diff --git a/apps/chat2mix/config_chat2mix.nim b/apps/chat2mix/config_chat2mix.nim index 46cd481d7..e17827533 100644 --- a/apps/chat2mix/config_chat2mix.nim +++ b/apps/chat2mix/config_chat2mix.nim @@ -237,6 +237,13 @@ type name: "kad-bootstrap-node" .}: seq[string] + ## RLN spam protection config + rlnUserMessageLimit* {. + desc: "Maximum messages per epoch for RLN spam protection.", + defaultValue: 100, + name: "rln-user-message-limit" + .}: int + proc parseCmdArg*(T: type MixNodePubInfo, p: string): T = let elements = p.split(":") if elements.len != 2: diff --git a/config.nims b/config.nims index f74fe183f..895f6e41a 100644 --- a/config.nims +++ b/config.nims @@ -1,5 +1,8 @@ import os +when fileExists("nimbus-build-system.paths"): + include "nimbus-build-system.paths" + if defined(release): switch("nimcache", "nimcache/release/$projectName") else: diff --git a/simulations/mixnet/README.md b/simulations/mixnet/README.md index fcc67b6e1..a70b43e62 100644 --- a/simulations/mixnet/README.md +++ b/simulations/mixnet/README.md @@ -3,66 +3,127 @@ ## Aim Simulate a local mixnet along with a chat app to publish using mix. -This is helpful to test any changes while development. -It includes scripts that run a `4 node` mixnet along with a lightpush service node(without mix) that can be used to test quickly. +This is helpful to test any changes during development. ## Simulation Details -Note that before running the simulation both `wakunode2` and `chat2mix` have to be built. +The simulation includes: + +1. A 5-node mixnet where `run_mix_node.sh` is the bootstrap node for the other 4 nodes +2. Two chat app instances that publish messages using lightpush protocol over the mixnet + +### Available Scripts + +| Script | Description | +|--------|-------------| +| `run_mix_node.sh` | Bootstrap mix node (must be started first) | +| `run_mix_node1.sh` | Mix node 1 | +| `run_mix_node2.sh` | Mix node 2 | +| `run_mix_node3.sh` | Mix node 3 | +| `run_mix_node4.sh` | Mix node 4 | +| `run_chat_mix.sh` | Chat app instance 1 | +| `run_chat_mix1.sh` | Chat app instance 2 | +| `build_setup.sh` | Build and generate RLN credentials | + +## Prerequisites + +Before running the simulation, build `wakunode2` and `chat2mix`: ```bash cd -make wakunode2 -make chat2mix +make wakunode2 chat2mix ``` -Simulation includes scripts for: +## RLN Spam Protection Setup -1. a 4 waku-node mixnet where `node1` is bootstrap node for the other 3 nodes. -2. scripts to run chat app that publishes using lightpush protocol over the mixnet +Before running the simulation, generate RLN credentials and the shared Merkle tree for all nodes: + +```bash +cd simulations/mixnet +./build_setup.sh +``` + +This script will: + +1. Build and run the `setup_credentials` tool +2. Generate RLN credentials for all nodes (5 mix nodes + 2 chat clients) +3. Create `rln_tree.db` - the shared Merkle tree with all members +4. Create keystore files (`rln_keystore_{peerId}.json`) for each node + +**Important:** All scripts must be run from this directory (`simulations/mixnet/`) so they can access their credentials and tree file. + +To regenerate credentials (e.g., after adding new nodes), run `./build_setup.sh` again - it will clean up old files first. ## Usage -Start the service node with below command, which acts as bootstrap node for all other mix nodes. +### Step 1: Start the Mix Nodes -`./run_lp_service_node.sh` +Start the bootstrap node first (in a separate terminal): -To run the nodes for mixnet run the 4 node scripts in different terminals as below. - -`./run_mix_node1.sh` - -Look for following 2 log lines to ensure node ran successfully and has also mounted mix protocol. - -```log -INF 2025-08-01 14:51:05.445+05:30 mounting mix protocol topics="waku node" tid=39996871 file=waku_node.nim:231 nodeId="(listenAddresses: @[\"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o\"], enrUri: \"enr:-NC4QKYtas8STkenlqBTJ3a1TTLzJA2DsGGbFlnxem9aSM2IXm-CSVZULdk2467bAyFnepnt8KP_QlfDzdaMXd_zqtwBgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzgIJyc4UAAgEAAIlzZWNwMjU2azGhA6RFtVJVBh0SYOoP8xrgnXSlpiFARmQkF9d8Rn4fSeiog3RjcILqYYN1ZHCCIymFd2FrdTIt\")" - -INF 2025-08-01 14:49:23.467+05:30 Node setup complete topics="wakunode main" tid=39994244 file=wakunode2.nim:104 +```bash +./run_mix_node.sh ``` -Once all the 4 nodes are up without any issues, run the script to start the chat application. +Look for the following log lines to ensure the node started successfully: -`./run_chat_app.sh` +```log +INF mounting mix protocol topics="waku node" +INF Node setup complete topics="wakunode main" +``` -Enter a nickname to be used. +Verify RLN spam protection initialized correctly by checking for these logs: + +```log +INF Initializing MixRlnSpamProtection +INF MixRlnSpamProtection initialized, waiting for sync +DBG Tree loaded from file +INF MixRlnSpamProtection started +``` + +Then start the remaining mix nodes in separate terminals: + +```bash +./run_mix_node1.sh +./run_mix_node2.sh +./run_mix_node3.sh +./run_mix_node4.sh +``` + +### Step 2: Start the Chat Applications + +Once all 5 mix nodes are running, start the first chat app: + +```bash +./run_chat_mix.sh +``` + +Enter a nickname when prompted: ```bash pubsub topic is: /waku/2/rs/2/0 Choose a nickname >> ``` -Once you see below log, it means the app is ready for publishing messages over the mixnet. +Once you see the following log, the app is ready to publish messages over the mixnet: ```bash Welcome, test! Listening on - /ip4/192.168.68.64/tcp/60000/p2p/16Uiu2HAkxDGqix1ifY3wF1ZzojQWRAQEdKP75wn1LJMfoHhfHz57 + /ip4//tcp/60000/p2p/16Uiu2HAkxDGqix1ifY3wF1ZzojQWRAQEdKP75wn1LJMfoHhfHz57 ready to publish messages now ``` -Follow similar instructions to run second instance of chat app. -Once both the apps run successfully, send a message and check if it is received by the other app. +Start the second chat app in another terminal: -You can exit the chat apps by entering `/exit` as below +```bash +./run_chat_mix1.sh +``` + +### Step 3: Test Messaging + +Once both chat apps are running, send a message from one and verify it is received by the other. + +To exit the chat apps, enter `/exit`: ```bash >> /exit diff --git a/simulations/mixnet/build_setup.sh b/simulations/mixnet/build_setup.sh new file mode 100755 index 000000000..8265d88c1 --- /dev/null +++ b/simulations/mixnet/build_setup.sh @@ -0,0 +1,32 @@ +#!/bin/bash +cd "$(dirname "$0")" +MIXNET_DIR=$(pwd) +cd ../.. +ROOT_DIR=$(pwd) + +# Clean up old files first +rm -f "$MIXNET_DIR/rln_tree.db" "$MIXNET_DIR"/rln_keystore_*.json + +echo "Building and running credentials setup..." +# Compile to temp location, then run from mixnet directory +nim c -d:release --mm:refc \ + --passL:"-L$ROOT_DIR/vendor/zerokit/target/release -lrln" \ + -o:/tmp/setup_credentials_$$ \ + "$MIXNET_DIR/setup_credentials.nim" 2>&1 | tail -30 + +# Run from mixnet directory so files are created there +cd "$MIXNET_DIR" +/tmp/setup_credentials_$$ + +# Clean up temp binary +rm -f /tmp/setup_credentials_$$ + +# Verify output +if [ -f "rln_tree.db" ]; then + echo "" + echo "Tree file ready at: $(pwd)/rln_tree.db" + ls -la rln_keystore_*.json 2>/dev/null | wc -l | xargs -I {} echo "Generated {} keystore files" +else + echo "Setup failed - rln_tree.db not found" + exit 1 +fi diff --git a/simulations/mixnet/config2.toml b/simulations/mixnet/config2.toml index c40e41103..3acd2bf8a 100644 --- a/simulations/mixnet/config2.toml +++ b/simulations/mixnet/config2.toml @@ -13,7 +13,7 @@ discv5-udp-port = 9002 discv5-enr-auto-update = true discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"] kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"] -rest = false +rest = true rest-admin = false ports-shift = 3 num-shards-in-network = 1 diff --git a/simulations/mixnet/config3.toml b/simulations/mixnet/config3.toml index 80c19b34b..bd8e7c4e9 100644 --- a/simulations/mixnet/config3.toml +++ b/simulations/mixnet/config3.toml @@ -13,7 +13,7 @@ discv5-udp-port = 9003 discv5-enr-auto-update = true discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"] kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"] -rest = false +rest = true rest-admin = false ports-shift = 4 num-shards-in-network = 1 diff --git a/simulations/mixnet/config4.toml b/simulations/mixnet/config4.toml index ed5b2dad0..f174250d5 100644 --- a/simulations/mixnet/config4.toml +++ b/simulations/mixnet/config4.toml @@ -13,7 +13,7 @@ discv5-udp-port = 9004 discv5-enr-auto-update = true discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"] kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"] -rest = false +rest = true rest-admin = false ports-shift = 5 num-shards-in-network = 1 diff --git a/simulations/mixnet/run_chat_mix.sh b/simulations/mixnet/run_chat_mix.sh index f711c055e..ef0575375 100755 --- a/simulations/mixnet/run_chat_mix.sh +++ b/simulations/mixnet/run_chat_mix.sh @@ -1,2 +1,2 @@ -../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --kad-bootstrap-node="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" +../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --nodekey="cb6fe589db0e5d5b48f7e82d33093e4d9d35456f4aaffc2322c473a173b2ac49" --kad-bootstrap-node="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --fleet="none" #--mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f" diff --git a/simulations/mixnet/run_chat_mix1.sh b/simulations/mixnet/run_chat_mix1.sh index 7323bb3a9..5961fce45 100755 --- a/simulations/mixnet/run_chat_mix1.sh +++ b/simulations/mixnet/run_chat_mix1.sh @@ -1,2 +1 @@ -../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE -#--mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f" +../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --nodekey="35eace7ccb246f20c487e05015ca77273d8ecaed0ed683de3d39bf4f69336feb" --mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f" --mixnode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o:9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c" --fleet="none" diff --git a/simulations/mixnet/run_mix_node.sh b/simulations/mixnet/run_mix_node.sh index 2b293540c..5d9ff70d8 100755 --- a/simulations/mixnet/run_mix_node.sh +++ b/simulations/mixnet/run_mix_node.sh @@ -1 +1,2 @@ ../../build/wakunode2 --config-file="config.toml" 2>&1 | tee mix_node.log + diff --git a/simulations/mixnet/setup_credentials.nim b/simulations/mixnet/setup_credentials.nim new file mode 100644 index 000000000..77c796354 --- /dev/null +++ b/simulations/mixnet/setup_credentials.nim @@ -0,0 +1,139 @@ +{.push raises: [].} + +## Setup script to generate RLN credentials and shared Merkle tree for mix nodes. +## +## This script: +## 1. Generates credentials for each node (identified by peer ID) +## 2. Registers all credentials in a shared Merkle tree +## 3. Saves the tree to rln_tree.db +## 4. Saves individual keystores named by peer ID +## +## Usage: nim c -r setup_credentials.nim + +import std/[os, strformat, options], chronicles, chronos, results + +import + mix_rln_spam_protection/credentials, + mix_rln_spam_protection/group_manager, + mix_rln_spam_protection/rln_interface, + mix_rln_spam_protection/types + +const + KeystorePassword = "mix-rln-password" # Must match protocol.nim + DefaultUserMessageLimit = 100'u64 # Network-wide default rate limit + SpammerUserMessageLimit = 3'u64 # Lower limit for spammer testing + + # Peer IDs derived from nodekeys in config files + # config.toml: nodekey = "f98e3fba96c32e8d1967d460f1b79457380e1a895f7971cecc8528abe733781a" + # config1.toml: nodekey = "09e9d134331953357bd38bbfce8edb377f4b6308b4f3bfbe85c610497053d684" + # config2.toml: nodekey = "ed54db994682e857d77cd6fb81be697382dc43aa5cd78e16b0ec8098549f860e" + # config3.toml: nodekey = "42f96f29f2d6670938b0864aced65a332dcf5774103b4c44ec4d0ea4ef3c47d6" + # config4.toml: nodekey = "3ce887b3c34b7a92dd2868af33941ed1dbec4893b054572cd5078da09dd923d4" + # chat2mix.sh: nodekey = "cb6fe589db0e5d5b48f7e82d33093e4d9d35456f4aaffc2322c473a173b2ac49" + # chat2mix1.sh: nodekey = "35eace7ccb246f20c487e05015ca77273d8ecaed0ed683de3d39bf4f69336feb" + + # Node info: (peerId, userMessageLimit) + NodeConfigs = [ + ("16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", DefaultUserMessageLimit), + # config.toml (service node) + ("16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF", DefaultUserMessageLimit), + # config1.toml (mix node 1) + ("16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA", DefaultUserMessageLimit), + # config2.toml (mix node 2) + ("16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f", DefaultUserMessageLimit), + # config3.toml (mix node 3) + ("16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu", DefaultUserMessageLimit), + # config4.toml (mix node 4) + ("16Uiu2HAm1QxSjNvNbsT2xtLjRGAsBLVztsJiTHr9a3EK96717hpj", DefaultUserMessageLimit), + # chat2mix client 1 + ("16Uiu2HAmC9h26U1C83FJ5xpE32ghqya8CaZHX1Y7qpfHNnRABscN", DefaultUserMessageLimit), + # chat2mix client 2 + ] + +proc setupCredentialsAndTree() {.async.} = + ## Generate credentials for all nodes and create a shared tree + + echo "=== RLN Credentials Setup ===" + echo "Generating credentials for ", NodeConfigs.len, " nodes...\n" + + # Generate credentials for all nodes + var allCredentials: + seq[tuple[peerId: string, cred: IdentityCredential, rateLimit: uint64]] + for (peerId, rateLimit) in NodeConfigs: + let cred = generateCredentials().valueOr: + echo "Failed to generate credentials for ", peerId, ": ", error + quit(1) + + allCredentials.add((peerId: peerId, cred: cred, rateLimit: rateLimit)) + echo "Generated credentials for ", peerId + echo " idCommitment: ", cred.idCommitment.toHex()[0 .. 15], "..." + echo " userMessageLimit: ", rateLimit + + echo "" + + # Create a group manager directly to build the tree + let rlnInstance = newRLNInstance().valueOr: + echo "Failed to create RLN instance: ", error + quit(1) + + let groupManager = newOffchainGroupManager(rlnInstance, "/mix/rln/membership/v1") + + # Initialize the group manager + let initRes = await groupManager.init() + if initRes.isErr: + echo "Failed to initialize group manager: ", initRes.error + quit(1) + + # Register all credentials in the tree with their specific rate limits + echo "Registering all credentials in the Merkle tree..." + for i, entry in allCredentials: + let index = ( + await groupManager.registerWithLimit(entry.cred.idCommitment, entry.rateLimit) + ).valueOr: + echo "Failed to register credential for ", entry.peerId, ": ", error + quit(1) + echo " Registered ", + entry.peerId, " at index ", index, " (limit: ", entry.rateLimit, ")" + + echo "" + + # Save the tree to disk + echo "Saving tree to rln_tree.db..." + let saveRes = groupManager.saveTreeToFile("rln_tree.db") + if saveRes.isErr: + echo "Failed to save tree: ", saveRes.error + quit(1) + echo "Tree saved successfully!" + + echo "" + + # Save each credential to a keystore file named by peer ID + echo "Saving keystores..." + for i, entry in allCredentials: + let keystorePath = &"rln_keystore_{entry.peerId}.json" + + # Save with membership index and rate limit + let saveResult = saveKeystore( + entry.cred, + KeystorePassword, + keystorePath, + some(MembershipIndex(i)), + some(entry.rateLimit), + ) + if saveResult.isErr: + echo "Failed to save keystore for ", entry.peerId, ": ", saveResult.error + quit(1) + echo " Saved: ", keystorePath, " (limit: ", entry.rateLimit, ")" + + echo "" + echo "=== Setup Complete ===" + echo " Tree file: rln_tree.db (", NodeConfigs.len, " members)" + echo " Keystores: rln_keystore_{peerId}.json" + echo " Password: ", KeystorePassword + echo " Default rate limit: ", DefaultUserMessageLimit + echo " Spammer rate limit: ", SpammerUserMessageLimit + echo "" + echo "Note: All nodes must use the same rln_tree.db file." + +when isMainModule: + waitFor setupCredentialsAndTree() diff --git a/vendor/mix-rln-spam-protection-plugin b/vendor/mix-rln-spam-protection-plugin new file mode 160000 index 000000000..b3e3f7293 --- /dev/null +++ b/vendor/mix-rln-spam-protection-plugin @@ -0,0 +1 @@ +Subproject commit b3e3f72932d53fe0959e182fc081fe14c0c2c2f0 diff --git a/waku/node/kernel_api/relay.nim b/waku/node/kernel_api/relay.nim index a0a128449..374943167 100644 --- a/waku/node/kernel_api/relay.nim +++ b/waku/node/kernel_api/relay.nim @@ -28,7 +28,8 @@ import ../../waku_archive, ../../waku_store_sync, ../peer_manager, - ../../waku_rln_relay + ../../waku_rln_relay, + ../../waku_mix export waku_relay.WakuRelayHandler @@ -82,13 +83,21 @@ proc registerRelayHandler( node.wakuStoreReconciliation.messageIngress(topic, msg) + proc mixHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + if node.wakuMix.isNil(): + return + + await node.wakuMix.handleMessage(topic, msg) + let uniqueTopicHandler = proc( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = + # TODO: Why are all these synchronous in nature? wouldn't it be better to run them concurrently? await traceHandler(topic, msg) await filterHandler(topic, msg) await archiveHandler(topic, msg) await syncHandler(topic, msg) + await mixHandler(topic, msg) await appHandler(topic, msg) node.wakuRelay.subscribe(topic, uniqueTopicHandler) diff --git a/waku/node/waku_mix_coordination.nim b/waku/node/waku_mix_coordination.nim new file mode 100644 index 000000000..3eb53750e --- /dev/null +++ b/waku/node/waku_mix_coordination.nim @@ -0,0 +1,125 @@ +## Mix spam protection coordination via filter protocol +## This module handles filter-based subscription for spam protection coordination +## when relay is not available. + +{.push raises: [].} + +import chronos, chronicles, std/options +import + ../waku_core, + ../waku_core/topics/sharding, + ../waku_filter_v2/common, + ./peer_manager, + ../waku_filter_v2/client, + ../waku_mix/protocol + +logScope: + topics = "waku node mix_coordination" + +# Type aliases for callbacks to avoid circular imports +type + FilterSubscribeProc* = proc( + pubsubTopic: Option[PubsubTopic], + contentTopics: seq[ContentTopic], + peer: RemotePeerInfo, + ): Future[FilterSubscribeResult] {.async, gcsafe.} + + FilterPingProc* = + proc(peer: RemotePeerInfo): Future[FilterSubscribeResult] {.async, gcsafe.} + +# Forward declaration +proc subscribeSpamProtectionViaFilter( + wakuMix: WakuMix, + peerManager: PeerManager, + filterClient: WakuFilterClient, + filterSubscribe: FilterSubscribeProc, + contentTopics: seq[ContentTopic], +) {.async.} + +proc setupSpamProtectionViaFilter*( + wakuMix: WakuMix, + peerManager: PeerManager, + filterClient: WakuFilterClient, + filterSubscribe: FilterSubscribeProc, +) = + ## Set up filter-based spam protection coordination. + ## Registers message handler and spawns subscription maintenance task. + let spamTopics = wakuMix.getSpamProtectionContentTopics() + if spamTopics.len == 0: + return + + info "Relay not available, subscribing to spam protection via filter", + topics = spamTopics + + # Register handler for spam protection messages + filterClient.registerPushHandler( + proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + if message.contentTopic in spamTopics: + await wakuMix.handleMessage(pubsubTopic, message) + ) + + # Wait for filter peer to be available and maintain subscription + asyncSpawn subscribeSpamProtectionViaFilter( + wakuMix, peerManager, filterClient, filterSubscribe, spamTopics + ) + +proc subscribeSpamProtectionViaFilter( + wakuMix: WakuMix, + peerManager: PeerManager, + filterClient: WakuFilterClient, + filterSubscribe: FilterSubscribeProc, + contentTopics: seq[ContentTopic], +) {.async.} = + ## Subscribe to spam protection topics via filter and maintain the subscription. + ## Waits for a filter peer to be available before subscribing. + ## Continuously monitors the subscription health with periodic pings. + const RetryInterval = chronos.seconds(5) + const SubscriptionMaintenance = chronos.seconds(30) + const MaxFailedSubscribes = 3 + var currentFilterPeer: Option[RemotePeerInfo] = none(RemotePeerInfo) + var noFailedSubscribes = 0 + + while true: + # Select or reuse filter peer + if currentFilterPeer.isNone(): + let filterPeerOpt = peerManager.selectPeer(WakuFilterSubscribeCodec) + if filterPeerOpt.isNone(): + debug "No filter peer available yet for spam protection, retrying..." + await sleepAsync(RetryInterval) + continue + currentFilterPeer = some(filterPeerOpt.get()) + info "Selected filter peer for spam protection", + peer = currentFilterPeer.get().peerId + + # Check if subscription is still alive with ping + let pingErr = (await filterClient.ping(currentFilterPeer.get())).errorOr: + # Subscription is alive, wait before next check + await sleepAsync(SubscriptionMaintenance) + if noFailedSubscribes > 0: + noFailedSubscribes = 0 + continue + + # Subscription lost, need to re-subscribe + warn "Spam protection filter subscription ping failed, re-subscribing", + error = pingErr, peer = currentFilterPeer.get().peerId + + # Subscribe to spam protection topics + let res = + await filterSubscribe(none(PubsubTopic), contentTopics, currentFilterPeer.get()) + if res.isErr(): + noFailedSubscribes += 1 + warn "Failed to subscribe to spam protection topics via filter", + error = res.error, topics = contentTopics, failCount = noFailedSubscribes + + if noFailedSubscribes >= MaxFailedSubscribes: + # Try with a different peer + warn "Max subscription failures reached, selecting new filter peer" + currentFilterPeer = none(RemotePeerInfo) + noFailedSubscribes = 0 + + await sleepAsync(RetryInterval) + else: + info "Successfully subscribed to spam protection topics via filter", + topics = contentTopics, peer = currentFilterPeer.get().peerId + noFailedSubscribes = 0 + await sleepAsync(SubscriptionMaintenance) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 254387c32..6f869e364 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -194,6 +194,18 @@ proc getShardsGetter(node: WakuNode, configuredShards: seq[uint16]): GetShards = return shards return configuredShards +proc getRelayMixHandler*(node: WakuNode): Option[WakuRelayHandler] = + ## Returns a handler for mix spam protection coordination messages if mix is mounted + if node.wakuMix.isNil(): + return none(WakuRelayHandler) + + let handler: WakuRelayHandler = proc( + pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[void] {.async, gcsafe.} = + await node.wakuMix.handleMessage(pubsubTopic, message) + + return some(handler) + proc getCapabilitiesGetter(node: WakuNode): GetCapabilities = return proc(): seq[Capabilities] {.closure, gcsafe, raises: [].} = if node.wakuRelay.isNil(): @@ -314,6 +326,7 @@ proc mountMix*( clusterId: uint16, mixPrivKey: Curve25519Key, mixnodes: seq[MixNodePubInfo], + userMessageLimit: Option[int] = none(int), ): Future[Result[void, string]] {.async.} = info "mounting mix protocol", nodeId = node.info #TODO log the config used @@ -324,8 +337,29 @@ proc mountMix*( return err("Failed to convert multiaddress to string.") info "local addr", localaddr = localaddrStr + # Create callback to publish coordination messages via relay + let publishMessage: PublishMessage = proc( + message: WakuMessage + ): Future[Result[void, string]] {.async.} = + # Inline implementation of publish logic to avoid circular import + if node.wakuRelay.isNil(): + return err("WakuRelay not mounted") + + # Derive pubsub topic from content topic using auto sharding + let pubsubTopic = + if node.wakuAutoSharding.isNone(): + return err("Auto sharding not configured") + else: + node.wakuAutoSharding.get().getShard(message.contentTopic).valueOr: + return err("Autosharding error: " & error) + + # Publish via relay + discard await node.wakuRelay.publish(pubsubTopic, message) + return ok() + node.wakuMix = WakuMix.new( - localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes + localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes, publishMessage, + userMessageLimit, ).valueOr: error "Waku Mix protocol initialization failed", err = error return @@ -335,9 +369,10 @@ proc mountMix*( node.switch.mount(node.wakuMix) catchRes.isOkOr: return err(error.msg) - return ok() -## Waku Sync + # Note: start() is called later in WakuNode.start(), not here during mount + + return ok() proc mountStoreSync*( node: WakuNode, @@ -633,7 +668,7 @@ proc start*(node: WakuNode) {.async.} = await node.startRelay() if not node.wakuMix.isNil(): - node.wakuMix.start() + await node.wakuMix.start() if not node.wakuMetadata.isNil(): node.wakuMetadata.start() diff --git a/waku/waku_mix/protocol.nim b/waku/waku_mix/protocol.nim index 2c972bef6..7cc7d7a7e 100644 --- a/waku/waku_mix/protocol.nim +++ b/waku/waku_mix/protocol.nim @@ -1,6 +1,6 @@ {.push raises: [].} -import chronicles, std/options, chronos, results, metrics +import chronicles, std/[options, sequtils], chronos, results, metrics import libp2p/crypto/curve25519, @@ -10,14 +10,18 @@ import libp2p/protocols/mix/mix_protocol, libp2p/protocols/mix/mix_metrics, libp2p/protocols/mix/delay_strategy, - libp2p/[multiaddress, peerid], + libp2p/protocols/mix/spam_protection, + libp2p/[multiaddress, multicodec, peerid, peerinfo], eth/common/keys import waku/node/peer_manager, waku/waku_core, waku/waku_enr, - waku/node/peer_manager/waku_peer_store + waku/node/peer_manager/waku_peer_store, + mix_rln_spam_protection, + waku/waku_relay, + waku/common/nimchronos logScope: topics = "waku mix" @@ -25,10 +29,16 @@ logScope: const minMixPoolSize = 4 type + PublishMessage* = proc(message: WakuMessage): Future[Result[void, string]] {. + async, gcsafe, raises: [] + .} + WakuMix* = ref object of MixProtocol peerManager*: PeerManager clusterId: uint16 pubKey*: Curve25519Key + mixRlnSpamProtection*: MixRlnSpamProtection + publishMessage*: PublishMessage WakuMixResult*[T] = Result[T, string] @@ -41,11 +51,9 @@ proc processBootNodes( ) = var count = 0 for node in bootnodes: - let pInfo = parsePeerInfo(node.multiAddr).valueOr: - error "Failed to get peer id from multiaddress: ", - error = error, multiAddr = $node.multiAddr + let (peerId, networkAddr) = parseFullAddress(node.multiAddr).valueOr: + error "Failed to parse multiaddress", multiAddr = node.multiAddr, error = error continue - let peerId = pInfo.peerId var peerPubKey: crypto.PublicKey if not peerId.extractPublicKey(peerPubKey): warn "Failed to extract public key from peerId, skipping node", peerId = peerId @@ -65,10 +73,10 @@ proc processBootNodes( count.inc() peermgr.addPeer( - RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey)) + RemotePeerInfo.init(peerId, @[networkAddr], mixPubKey = some(node.pubKey)) ) mix_pool_size.set(count) - info "using mix bootstrap nodes ", count = count + debug "using mix bootstrap nodes ", count = count proc new*( T: type WakuMix, @@ -77,9 +85,11 @@ proc new*( clusterId: uint16, mixPrivKey: Curve25519Key, bootnodes: seq[MixNodePubInfo], + publishMessage: PublishMessage, + userMessageLimit: Option[int] = none(int), ): WakuMixResult[T] = let mixPubKey = public(mixPrivKey) - info "mixPubKey", mixPubKey = mixPubKey + trace "mixPubKey", mixPubKey = mixPubKey let nodeMultiAddr = MultiAddress.init(nodeAddr).valueOr: return err("failed to parse mix node address: " & $nodeAddr & ", error: " & error) let localMixNodeInfo = initMixNodeInfo( @@ -89,10 +99,31 @@ proc new*( if bootnodes.len < minMixPoolSize: warn "publishing with mix won't work until atleast 3 mix nodes in node pool" - var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey) + # Initialize spam protection with persistent credentials + # Use peerID in keystore path so multiple peers can run from same directory + # Tree path is shared across all nodes to maintain the full membership set + let peerId = peermgr.switch.peerInfo.peerId + var spamProtectionConfig = defaultConfig() + spamProtectionConfig.keystorePath = "rln_keystore_" & $peerId & ".json" + spamProtectionConfig.keystorePassword = "mix-rln-password" + if userMessageLimit.isSome(): + spamProtectionConfig.userMessageLimit = userMessageLimit.get() + # rlnResourcesPath left empty to use bundled resources (via "tree_height_/" placeholder) + + let spamProtection = newMixRlnSpamProtection(spamProtectionConfig).valueOr: + return err("failed to create spam protection: " & error) + + var m = WakuMix( + peerManager: peermgr, + clusterId: clusterId, + pubKey: mixPubKey, + mixRlnSpamProtection: spamProtection, + publishMessage: publishMessage, + ) procCall MixProtocol(m).init( localMixNodeInfo, peermgr.switch, + spamProtection = Opt.some(SpamProtection(spamProtection)), delayStrategy = ExponentialDelayStrategy.new(meanDelayMs = 50, rng = crypto.newRng()), ) @@ -101,15 +132,160 @@ proc new*( if m.nodePool.len < minMixPoolSize: warn "publishing with mix won't work until atleast 3 mix nodes in node pool" + return ok(m) proc poolSize*(mix: WakuMix): int = mix.nodePool.len -method start*(mix: WakuMix) = +proc setupSpamProtectionCallbacks(mix: WakuMix) = + ## Set up the publish callback for spam protection coordination. + ## This enables the plugin to broadcast membership updates and proof metadata + ## via Waku relay. + if mix.publishMessage.isNil(): + warn "PublishMessage callback not available, spam protection coordination disabled" + return + + let publishCallback: PublishCallback = proc( + contentTopic: string, data: seq[byte] + ) {.async.} = + # Create a WakuMessage for the coordination data + let msg = WakuMessage( + payload: data, + contentTopic: contentTopic, + ephemeral: true, # Coordination messages don't need to be stored + timestamp: getNowInNanosecondTime(), + ) + + # Delegate to node's publish API which handles topic derivation and relay publishing + let res = await mix.publishMessage(msg) + if res.isErr(): + warn "Failed to publish spam protection coordination message", + contentTopic = contentTopic, error = res.error + return + + trace "Published spam protection coordination message", contentTopic = contentTopic + + mix.mixRlnSpamProtection.setPublishCallback(publishCallback) + trace "Spam protection publish callback configured" + +proc handleMessage*( + mix: WakuMix, pubsubTopic: PubsubTopic, message: WakuMessage +) {.async, gcsafe.} = + ## Handle incoming messages for spam protection coordination. + ## This should be called from the relay handler for coordination content topics. + if mix.mixRlnSpamProtection.isNil(): + return + + let contentTopic = message.contentTopic + + if contentTopic == mix.mixRlnSpamProtection.getMembershipContentTopic(): + # Handle membership update + let res = await mix.mixRlnSpamProtection.handleMembershipUpdate(message.payload) + if res.isErr: + warn "Failed to handle membership update", error = res.error + else: + trace "Handled membership update" + + # Persist tree after membership changes (temporary solution) + # TODO: Replace with proper persistence strategy (e.g., periodic snapshots) + let saveRes = mix.mixRlnSpamProtection.saveTree() + if saveRes.isErr: + debug "Failed to save tree after membership update", error = saveRes.error + else: + trace "Saved tree after membership update" + elif contentTopic == mix.mixRlnSpamProtection.getProofMetadataContentTopic(): + # Handle proof metadata for network-wide spam detection + let res = mix.mixRlnSpamProtection.handleProofMetadata(message.payload) + if res.isErr: + warn "Failed to handle proof metadata", error = res.error + else: + trace "Handled proof metadata" + +proc getSpamProtectionContentTopics*(mix: WakuMix): seq[string] = + ## Get the content topics used by spam protection for coordination. + ## Use these to set up relay subscriptions. + if mix.mixRlnSpamProtection.isNil(): + return @[] + return mix.mixRlnSpamProtection.getContentTopics() + +proc saveSpamProtectionTree*(mix: WakuMix): Result[void, string] = + ## Save the spam protection membership tree to disk. + ## This allows preserving the tree state across restarts. + if mix.mixRlnSpamProtection.isNil(): + return err("Spam protection not initialized") + + mix.mixRlnSpamProtection.saveTree().mapErr( + proc(e: string): string = + e + ) + +proc loadSpamProtectionTree*(mix: WakuMix): Result[void, string] = + ## Load the spam protection membership tree from disk. + ## Call this before init() to restore tree state from previous runs. + ## TODO: This is a temporary solution. Ideally nodes should sync tree state + ## via a store query for historical membership messages or via dedicated + ## tree sync protocol. + if mix.mixRlnSpamProtection.isNil(): + return err("Spam protection not initialized") + + mix.mixRlnSpamProtection.loadTree().mapErr( + proc(e: string): string = + e + ) + +method start*(mix: WakuMix) {.async.} = info "starting waku mix protocol" + # Set up spam protection callbacks and start + if not mix.mixRlnSpamProtection.isNil(): + # Initialize spam protection (MixProtocol.init() does NOT call init() on the plugin) + let initRes = await mix.mixRlnSpamProtection.init() + if initRes.isErr: + error "Failed to initialize spam protection", error = initRes.error + else: + # Load existing tree to sync with other members + # This should be done after init() (which loads credentials) + # but before registerSelf() (which adds us to the tree) + let loadRes = mix.mixRlnSpamProtection.loadTree() + if loadRes.isErr: + debug "No existing tree found or failed to load, starting fresh", + error = loadRes.error + else: + debug "Loaded existing spam protection membership tree from disk" + + # Restore our credentials to the tree (after tree load, whether it succeeded or not) + # This ensures our member is in the tree if we have an index from keystore + let restoreRes = mix.mixRlnSpamProtection.restoreCredentialsToTree() + if restoreRes.isErr: + error "Failed to restore credentials to tree", error = restoreRes.error + + # Set up publish callback (must be before start so registerSelf can use it) + mix.setupSpamProtectionCallbacks() + + let startRes = await mix.mixRlnSpamProtection.start() + if startRes.isErr: + error "Failed to start spam protection", error = startRes.error + else: + # Register self to broadcast membership to the network + let registerRes = await mix.mixRlnSpamProtection.registerSelf() + if registerRes.isErr: + error "Failed to register spam protection credentials", + error = registerRes.error + else: + debug "Registered spam protection credentials", index = registerRes.get() + + # Save tree to persist membership state + let saveRes = mix.mixRlnSpamProtection.saveTree() + if saveRes.isErr: + warn "Failed to save spam protection tree", error = saveRes.error + else: + trace "Saved spam protection tree to disk" + method stop*(mix: WakuMix) {.async.} = - discard + # Stop spam protection + if not mix.mixRlnSpamProtection.isNil(): + await mix.mixRlnSpamProtection.stop() + debug "Spam protection stopped" # Mix Protocol