diff --git a/.gitignore b/.gitignore index 5222a0d5e..6f209cbca 100644 --- a/.gitignore +++ b/.gitignore @@ -43,6 +43,7 @@ node_modules/ # RLN / keystore rlnKeystore.json +rln_keystore*.json *.tar.gz # Nimbus Build System diff --git a/.gitmodules b/.gitmodules index 6a63491e3..19c4eec17 100644 --- a/.gitmodules +++ b/.gitmodules @@ -195,3 +195,8 @@ url = https://github.com/logos-messaging/nim-ffi/ ignore = untracked branch = master +[submodule "vendor/mix-rln-spam-protection-plugin"] + path = vendor/mix-rln-spam-protection-plugin + url = https://github.com/logos-co/mix-rln-spam-protection-plugin.git + ignore = untracked + branch = master diff --git a/Makefile b/Makefile index afd2389d2..22377a37f 100644 --- a/Makefile +++ b/Makefile @@ -187,10 +187,15 @@ nimbus-build-system-nimble-dir: ################## ## RLN ## ################## -.PHONY: librln +.PHONY: librln mix-librln LIBRLN_BUILDDIR := $(CURDIR)/vendor/zerokit LIBRLN_VERSION := v0.9.0 +MIX_LIBRLN_VERSION ?= v2.0.0 +MIX_LIBRLN_REPO ?= https://github.com/vacp2p/zerokit.git +MIX_LIBRLN_SRCDIR ?= $(CURDIR)/build/zerokit_$(MIX_LIBRLN_VERSION) +MIX_LIBRLN_FILE ?= $(CURDIR)/build/librln_mix_$(MIX_LIBRLN_VERSION).a +MIX_LIBRLN_NIM_PARAMS := --passL:$(MIX_LIBRLN_FILE) --passL:-lm ifeq ($(detected_OS),Windows) LIBRLN_FILE ?= rln.lib @@ -202,12 +207,19 @@ $(LIBRLN_FILE): echo -e $(BUILD_MSG) "$@" && \ ./scripts/build_rln.sh $(LIBRLN_BUILDDIR) $(LIBRLN_VERSION) $(LIBRLN_FILE) +$(MIX_LIBRLN_FILE): + echo -e $(BUILD_MSG) "$@" && \ + ./scripts/build_rln_mix.sh $(MIX_LIBRLN_SRCDIR) $(MIX_LIBRLN_VERSION) $(MIX_LIBRLN_FILE) $(MIX_LIBRLN_REPO) + librln: | $(LIBRLN_FILE) $(eval NIM_PARAMS += --passL:$(LIBRLN_FILE) --passL:-lm) +mix-librln: | $(MIX_LIBRLN_FILE) + clean-librln: cargo clean --manifest-path vendor/zerokit/rln/Cargo.toml rm -f $(LIBRLN_FILE) + rm -f $(MIX_LIBRLN_FILE) # Extend clean target clean: | clean-librln @@ -232,10 +244,10 @@ testwaku: | build deps rln-deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim test -d:os=$(shell uname) $(NIM_PARAMS) waku.nims -wakunode2: | build deps librln +wakunode2: | build deps librln mix-librln echo -e $(BUILD_MSG) "build/$@" && \ \ - $(ENV_SCRIPT) nim wakunode2 $(NIM_PARAMS) waku.nims + $(ENV_SCRIPT) nim wakunode2 $(NIM_PARAMS) $(MIX_LIBRLN_NIM_PARAMS) waku.nims benchmarks: | build deps librln echo -e $(BUILD_MSG) "build/$@" && \ @@ -253,9 +265,9 @@ chat2: | build deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim chat2 $(NIM_PARAMS) waku.nims -chat2mix: | build deps librln +chat2mix: | build deps librln mix-librln echo -e $(BUILD_MSG) "build/$@" && \ - $(ENV_SCRIPT) nim chat2mix $(NIM_PARAMS) waku.nims + $(ENV_SCRIPT) nim chat2mix $(NIM_PARAMS) $(MIX_LIBRLN_NIM_PARAMS) waku.nims rln-db-inspector: | build deps librln echo -e $(BUILD_MSG) "build/$@" && \ diff --git a/apps/chat2mix/chat2mix.nim b/apps/chat2mix/chat2mix.nim index 8b786d7b6..ea3cbd376 100644 --- a/apps/chat2mix/chat2mix.nim +++ b/apps/chat2mix/chat2mix.nim @@ -35,6 +35,7 @@ import import waku/[ waku_core, + waku_core/topics/sharding, waku_lightpush/common, waku_lightpush/rpc, waku_enr, @@ -47,6 +48,8 @@ import common/utils/nat, waku_store/common, waku_filter_v2/client, + waku_filter_v2/common as filter_common, + waku_mix/protocol, common/logging, ], ./config_chat2mix @@ -57,7 +60,108 @@ import ../../waku/waku_rln_relay logScope: topics = "chat2 mix" -const Help = """ + +######################### +## Mix Spam Protection ## +######################### + +# Forward declaration +proc maintainSpamProtectionSubscription( + node: WakuNode, contentTopics: seq[ContentTopic] +) {.async.} + +proc setupMixSpamProtectionViaFilter(node: WakuNode) {.async.} = + ## Setup filter-based spam protection coordination for mix protocol. + ## Since chat2mix doesn't use relay, we subscribe via filter to receive + ## spam protection coordination messages. + + # Register message handler for spam protection coordination + let spamTopics = node.wakuMix.getSpamProtectionContentTopics() + + proc handleSpamMessage( + pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[void] {.async, gcsafe.} = + await node.wakuMix.handleMessage(pubsubTopic, message) + + node.wakuFilterClient.registerPushHandler(handleSpamMessage) + + # Wait for filter peer and maintain subscription + asyncSpawn maintainSpamProtectionSubscription(node, spamTopics) + +proc maintainSpamProtectionSubscription( + node: WakuNode, contentTopics: seq[ContentTopic] +) {.async.} = + ## Maintain filter subscription for spam protection topics. + ## Monitors subscription health with periodic pings and re-subscribes on failure. + const RetryInterval = chronos.seconds(5) + const SubscriptionMaintenance = chronos.seconds(30) + const MaxFailedSubscribes = 3 + var currentFilterPeer: Option[RemotePeerInfo] = none(RemotePeerInfo) + var noFailedSubscribes = 0 + + while true: + # Select or reuse filter peer + if currentFilterPeer.isNone(): + let filterPeerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec) + if filterPeerOpt.isNone(): + debug "No filter peer available yet for spam protection, retrying..." + await sleepAsync(RetryInterval) + continue + currentFilterPeer = some(filterPeerOpt.get()) + info "Selected filter peer for spam protection", + peer = currentFilterPeer.get().peerId + + # Check if subscription is still alive with ping + let pingErr = (await node.wakuFilterClient.ping(currentFilterPeer.get())).errorOr: + # Subscription is alive, wait before next check + await sleepAsync(SubscriptionMaintenance) + if noFailedSubscribes > 0: + noFailedSubscribes = 0 + continue + + # Subscription lost, need to re-subscribe + warn "Spam protection filter subscription ping failed, re-subscribing", + error = pingErr, peer = currentFilterPeer.get().peerId + + # Determine pubsub topic from content topics (using auto-sharding) + if node.wakuAutoSharding.isNone(): + error "Auto-sharding not configured, cannot determine pubsub topic for spam protection" + await sleepAsync(RetryInterval) + continue + + let shardRes = node.wakuAutoSharding.get().getShard(contentTopics[0]) + if shardRes.isErr(): + error "Failed to determine shard for spam protection", error = shardRes.error + await sleepAsync(RetryInterval) + continue + + let shard = shardRes.get() + let pubsubTopic: PubsubTopic = shard # converter toPubsubTopic + + # Subscribe to spam protection topics + let res = await node.wakuFilterClient.subscribe( + currentFilterPeer.get(), pubsubTopic, contentTopics + ) + if res.isErr(): + noFailedSubscribes += 1 + warn "Failed to subscribe to spam protection topics via filter", + error = res.error, topics = contentTopics, failCount = noFailedSubscribes + + if noFailedSubscribes >= MaxFailedSubscribes: + # Try with a different peer + warn "Max subscription failures reached, selecting new filter peer" + currentFilterPeer = none(RemotePeerInfo) + noFailedSubscribes = 0 + + await sleepAsync(RetryInterval) + else: + info "Successfully subscribed to spam protection topics via filter", + topics = contentTopics, peer = currentFilterPeer.get().peerId + noFailedSubscribes = 0 + await sleepAsync(SubscriptionMaintenance) + +const Help = + """ Commands: /[?|help|connect|nick|exit] help: Prints this help connect: dials a remote peer @@ -209,20 +313,21 @@ proc publish(c: Chat, line: string) {.async.} = try: if not c.node.wakuLightpushClient.isNil(): # Attempt lightpush with mix - - ( - waitFor c.node.lightpushPublish( - some(c.conf.getPubsubTopic(c.node, c.contentTopic)), - message, - none(RemotePeerInfo), - true, - ) - ).isOkOr: - error "failed to publish lightpush message", error = error + let res = await c.node.lightpushPublish( + some(c.conf.getPubsubTopic(c.node, c.contentTopic)), + message, + none(RemotePeerInfo), + true, + ) + if res.isErr(): + error "failed to publish lightpush message", error = res.error + echo "Error: " & res.error.desc.get("unknown error") else: error "failed to publish message as lightpush client is not initialized" + echo "Error: lightpush client is not initialized" except CatchableError: error "caught error publishing message: ", error = getCurrentExceptionMsg() + echo "Error: " & getCurrentExceptionMsg() # TODO This should read or be subscribe handler subscribe proc readAndPrint(c: Chat) {.async.} = @@ -451,7 +556,11 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = error "failed to generate mix key pair", error = error return - (await node.mountMix(conf.clusterId, mixPrivKey, conf.mixnodes)).isOkOr: + ( + await node.mountMix( + conf.clusterId, mixPrivKey, conf.mixnodes, some(conf.rlnUserMessageLimit) + ) + ).isOkOr: error "failed to mount waku mix protocol: ", error = $error quit(QuitFailure) @@ -486,6 +595,10 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = #await node.mountRendezvousClient(conf.clusterId) + # Subscribe to spam protection coordination topics via filter since chat2mix doesn't use relay + if not node.wakuFilterClient.isNil(): + asyncSpawn setupMixSpamProtectionViaFilter(node) + await node.start() node.peerManager.start() diff --git a/apps/chat2mix/config_chat2mix.nim b/apps/chat2mix/config_chat2mix.nim index 4e5a32e6d..85b5cfc94 100644 --- a/apps/chat2mix/config_chat2mix.nim +++ b/apps/chat2mix/config_chat2mix.nim @@ -236,6 +236,13 @@ type name: "kad-bootstrap-node" .}: seq[string] + ## RLN spam protection config + rlnUserMessageLimit* {. + desc: "Maximum messages per epoch for RLN spam protection.", + defaultValue: 100, + name: "rln-user-message-limit" + .}: int + proc parseCmdArg*(T: type MixNodePubInfo, p: string): T = let elements = p.split(":") if elements.len != 2: diff --git a/scripts/build_rln_mix.sh b/scripts/build_rln_mix.sh new file mode 100755 index 000000000..786ef76f5 --- /dev/null +++ b/scripts/build_rln_mix.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash + +# Build a separate, pinned RLN library for mix spam-protection usage. +# This keeps the main nwaku RLN dependency flow unchanged. + +set -euo pipefail + +source_dir="${1:-}" +version="${2:-}" +output_file="${3:-}" +repo_url="${4:-https://github.com/vacp2p/zerokit.git}" + +if [[ -z "${source_dir}" || -z "${version}" || -z "${output_file}" ]]; then + echo "Usage: $0 [repo_url]" + exit 1 +fi + +mkdir -p "$(dirname "${source_dir}")" +mkdir -p "$(dirname "${output_file}")" + +if [[ ! -d "${source_dir}/.git" ]]; then + echo "Cloning zerokit ${version} from ${repo_url}..." + if [[ -e "${source_dir}" ]]; then + echo "Path exists but is not a git repository: ${source_dir}" + echo "Please remove it and retry." + exit 1 + fi + git clone --depth 1 --branch "${version}" "${repo_url}" "${source_dir}" +else + echo "Using existing zerokit checkout in ${source_dir}" + current_tag="$(git -C "${source_dir}" describe --tags --exact-match 2>/dev/null || true)" + if [[ "${current_tag}" != "${version}" ]]; then + echo "Updating zerokit checkout to ${version}..." + git -C "${source_dir}" fetch --tags origin "${version}" + git -C "${source_dir}" checkout --detach "${version}" + fi +fi + +echo "Building mix RLN library from source (version ${version})..." +cargo build --release -p rln --manifest-path "${source_dir}/rln/Cargo.toml" + +cp "${source_dir}/target/release/librln.a" "${output_file}" +echo "Successfully built ${output_file}" diff --git a/simulations/mixnet/README.md b/simulations/mixnet/README.md index fcc67b6e1..99b0ba50b 100644 --- a/simulations/mixnet/README.md +++ b/simulations/mixnet/README.md @@ -3,66 +3,128 @@ ## Aim Simulate a local mixnet along with a chat app to publish using mix. -This is helpful to test any changes while development. -It includes scripts that run a `4 node` mixnet along with a lightpush service node(without mix) that can be used to test quickly. +This is helpful to test any changes during development. ## Simulation Details -Note that before running the simulation both `wakunode2` and `chat2mix` have to be built. +The simulation includes: + +1. A 5-node mixnet where `run_mix_node.sh` is the bootstrap node for the other 4 nodes +2. Two chat app instances that publish messages using lightpush protocol over the mixnet + +### Available Scripts + +| Script | Description | +| ------------------ | ------------------------------------------ | +| `run_mix_node.sh` | Bootstrap mix node (must be started first) | +| `run_mix_node1.sh` | Mix node 1 | +| `run_mix_node2.sh` | Mix node 2 | +| `run_mix_node3.sh` | Mix node 3 | +| `run_mix_node4.sh` | Mix node 4 | +| `run_chat_mix.sh` | Chat app instance 1 | +| `run_chat_mix1.sh` | Chat app instance 2 | +| `build_setup.sh` | Build and generate RLN credentials | + +## Prerequisites + +Before running the simulation, build `wakunode2` and `chat2mix`: ```bash cd -make wakunode2 -make chat2mix +source env.sh +make wakunode2 chat2mix ``` -Simulation includes scripts for: +## RLN Spam Protection Setup -1. a 4 waku-node mixnet where `node1` is bootstrap node for the other 3 nodes. -2. scripts to run chat app that publishes using lightpush protocol over the mixnet +Generate RLN credentials and the shared Merkle tree for all nodes: + +```bash +cd simulations/mixnet +./build_setup.sh +``` + +This script will: + +1. Build and run the `setup_credentials` tool +2. Generate RLN credentials for all nodes (5 mix nodes + 2 chat clients) +3. Create `rln_tree.db` - the shared Merkle tree with all members +4. Create keystore files (`rln_keystore_{peerId}.json`) for each node + +**Important:** All scripts must be run from this directory (`simulations/mixnet/`) so they can access their credentials and tree file. + +To regenerate credentials (e.g., after adding new nodes), run `./build_setup.sh` again - it will clean up old files first. ## Usage -Start the service node with below command, which acts as bootstrap node for all other mix nodes. +### Step 1: Start the Mix Nodes -`./run_lp_service_node.sh` +Start the bootstrap node first (in a separate terminal): -To run the nodes for mixnet run the 4 node scripts in different terminals as below. - -`./run_mix_node1.sh` - -Look for following 2 log lines to ensure node ran successfully and has also mounted mix protocol. - -```log -INF 2025-08-01 14:51:05.445+05:30 mounting mix protocol topics="waku node" tid=39996871 file=waku_node.nim:231 nodeId="(listenAddresses: @[\"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o\"], enrUri: \"enr:-NC4QKYtas8STkenlqBTJ3a1TTLzJA2DsGGbFlnxem9aSM2IXm-CSVZULdk2467bAyFnepnt8KP_QlfDzdaMXd_zqtwBgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzgIJyc4UAAgEAAIlzZWNwMjU2azGhA6RFtVJVBh0SYOoP8xrgnXSlpiFARmQkF9d8Rn4fSeiog3RjcILqYYN1ZHCCIymFd2FrdTIt\")" - -INF 2025-08-01 14:49:23.467+05:30 Node setup complete topics="wakunode main" tid=39994244 file=wakunode2.nim:104 +```bash +./run_mix_node.sh ``` -Once all the 4 nodes are up without any issues, run the script to start the chat application. +Look for the following log lines to ensure the node started successfully: -`./run_chat_app.sh` +```log +INF mounting mix protocol topics="waku node" +INF Node setup complete topics="wakunode main" +``` -Enter a nickname to be used. +Verify RLN spam protection initialized correctly by checking for these logs: + +```log +INF Initializing MixRlnSpamProtection +INF MixRlnSpamProtection initialized, waiting for sync +DBG Tree loaded from file +INF MixRlnSpamProtection started +``` + +Then start the remaining mix nodes in separate terminals: + +```bash +./run_mix_node1.sh +./run_mix_node2.sh +./run_mix_node3.sh +./run_mix_node4.sh +``` + +### Step 2: Start the Chat Applications + +Once all 5 mix nodes are running, start the first chat app: + +```bash +./run_chat_mix.sh +``` + +Enter a nickname when prompted: ```bash pubsub topic is: /waku/2/rs/2/0 Choose a nickname >> ``` -Once you see below log, it means the app is ready for publishing messages over the mixnet. +Once you see the following log, the app is ready to publish messages over the mixnet: ```bash Welcome, test! Listening on - /ip4/192.168.68.64/tcp/60000/p2p/16Uiu2HAkxDGqix1ifY3wF1ZzojQWRAQEdKP75wn1LJMfoHhfHz57 + /ip4//tcp/60000/p2p/16Uiu2HAkxDGqix1ifY3wF1ZzojQWRAQEdKP75wn1LJMfoHhfHz57 ready to publish messages now ``` -Follow similar instructions to run second instance of chat app. -Once both the apps run successfully, send a message and check if it is received by the other app. +Start the second chat app in another terminal: -You can exit the chat apps by entering `/exit` as below +```bash +./run_chat_mix1.sh +``` + +### Step 3: Test Messaging + +Once both chat apps are running, send a message from one and verify it is received by the other. + +To exit the chat apps, enter `/exit`: ```bash >> /exit diff --git a/simulations/mixnet/build_setup.sh b/simulations/mixnet/build_setup.sh new file mode 100755 index 000000000..d36475da7 --- /dev/null +++ b/simulations/mixnet/build_setup.sh @@ -0,0 +1,36 @@ +#!/bin/bash +cd "$(dirname "$0")" +MIXNET_DIR=$(pwd) +cd ../.. +ROOT_DIR=$(pwd) +source "$ROOT_DIR/env.sh" + +# Prefer explicitly provided RLN library path, otherwise use local migrated build. +LIBRLN_PATH=${LIBRLN_PATH:-"/Users/prem/Code/mix-rln-spam-protection-plugin/librln.a"} + +# Clean up old files first +rm -f "$MIXNET_DIR/rln_tree.db" "$MIXNET_DIR"/rln_keystore_*.json + +echo "Building and running credentials setup..." +# Compile to temp location, then run from mixnet directory +nim c -d:release --mm:refc \ + --passL:"$LIBRLN_PATH" --passL:-lm \ + -o:/tmp/setup_credentials_$$ \ + "$MIXNET_DIR/setup_credentials.nim" 2>&1 | tail -30 + +# Run from mixnet directory so files are created there +cd "$MIXNET_DIR" +/tmp/setup_credentials_$$ + +# Clean up temp binary +rm -f /tmp/setup_credentials_$$ + +# Verify output +if [ -f "rln_tree.db" ]; then + echo "" + echo "Tree file ready at: $(pwd)/rln_tree.db" + ls -la rln_keystore_*.json 2>/dev/null | wc -l | xargs -I {} echo "Generated {} keystore files" +else + echo "Setup failed - rln_tree.db not found" + exit 1 +fi diff --git a/simulations/mixnet/config2.toml b/simulations/mixnet/config2.toml index c40e41103..3acd2bf8a 100644 --- a/simulations/mixnet/config2.toml +++ b/simulations/mixnet/config2.toml @@ -13,7 +13,7 @@ discv5-udp-port = 9002 discv5-enr-auto-update = true discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"] kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"] -rest = false +rest = true rest-admin = false ports-shift = 3 num-shards-in-network = 1 diff --git a/simulations/mixnet/config3.toml b/simulations/mixnet/config3.toml index 80c19b34b..bd8e7c4e9 100644 --- a/simulations/mixnet/config3.toml +++ b/simulations/mixnet/config3.toml @@ -13,7 +13,7 @@ discv5-udp-port = 9003 discv5-enr-auto-update = true discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"] kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"] -rest = false +rest = true rest-admin = false ports-shift = 4 num-shards-in-network = 1 diff --git a/simulations/mixnet/config4.toml b/simulations/mixnet/config4.toml index ed5b2dad0..f174250d5 100644 --- a/simulations/mixnet/config4.toml +++ b/simulations/mixnet/config4.toml @@ -13,7 +13,7 @@ discv5-udp-port = 9004 discv5-enr-auto-update = true discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"] kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"] -rest = false +rest = true rest-admin = false ports-shift = 5 num-shards-in-network = 1 diff --git a/simulations/mixnet/run_chat_mix.sh b/simulations/mixnet/run_chat_mix.sh index f711c055e..ef0575375 100755 --- a/simulations/mixnet/run_chat_mix.sh +++ b/simulations/mixnet/run_chat_mix.sh @@ -1,2 +1,2 @@ -../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --kad-bootstrap-node="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" +../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --nodekey="cb6fe589db0e5d5b48f7e82d33093e4d9d35456f4aaffc2322c473a173b2ac49" --kad-bootstrap-node="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --fleet="none" #--mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f" diff --git a/simulations/mixnet/run_chat_mix1.sh b/simulations/mixnet/run_chat_mix1.sh index 7323bb3a9..5961fce45 100755 --- a/simulations/mixnet/run_chat_mix1.sh +++ b/simulations/mixnet/run_chat_mix1.sh @@ -1,2 +1 @@ -../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE -#--mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f" +../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --nodekey="35eace7ccb246f20c487e05015ca77273d8ecaed0ed683de3d39bf4f69336feb" --mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f" --mixnode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o:9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c" --fleet="none" diff --git a/simulations/mixnet/run_mix_node.sh b/simulations/mixnet/run_mix_node.sh index 2b293540c..5d9ff70d8 100755 --- a/simulations/mixnet/run_mix_node.sh +++ b/simulations/mixnet/run_mix_node.sh @@ -1 +1,2 @@ ../../build/wakunode2 --config-file="config.toml" 2>&1 | tee mix_node.log + diff --git a/simulations/mixnet/setup_credentials.nim b/simulations/mixnet/setup_credentials.nim new file mode 100644 index 000000000..77c796354 --- /dev/null +++ b/simulations/mixnet/setup_credentials.nim @@ -0,0 +1,139 @@ +{.push raises: [].} + +## Setup script to generate RLN credentials and shared Merkle tree for mix nodes. +## +## This script: +## 1. Generates credentials for each node (identified by peer ID) +## 2. Registers all credentials in a shared Merkle tree +## 3. Saves the tree to rln_tree.db +## 4. Saves individual keystores named by peer ID +## +## Usage: nim c -r setup_credentials.nim + +import std/[os, strformat, options], chronicles, chronos, results + +import + mix_rln_spam_protection/credentials, + mix_rln_spam_protection/group_manager, + mix_rln_spam_protection/rln_interface, + mix_rln_spam_protection/types + +const + KeystorePassword = "mix-rln-password" # Must match protocol.nim + DefaultUserMessageLimit = 100'u64 # Network-wide default rate limit + SpammerUserMessageLimit = 3'u64 # Lower limit for spammer testing + + # Peer IDs derived from nodekeys in config files + # config.toml: nodekey = "f98e3fba96c32e8d1967d460f1b79457380e1a895f7971cecc8528abe733781a" + # config1.toml: nodekey = "09e9d134331953357bd38bbfce8edb377f4b6308b4f3bfbe85c610497053d684" + # config2.toml: nodekey = "ed54db994682e857d77cd6fb81be697382dc43aa5cd78e16b0ec8098549f860e" + # config3.toml: nodekey = "42f96f29f2d6670938b0864aced65a332dcf5774103b4c44ec4d0ea4ef3c47d6" + # config4.toml: nodekey = "3ce887b3c34b7a92dd2868af33941ed1dbec4893b054572cd5078da09dd923d4" + # chat2mix.sh: nodekey = "cb6fe589db0e5d5b48f7e82d33093e4d9d35456f4aaffc2322c473a173b2ac49" + # chat2mix1.sh: nodekey = "35eace7ccb246f20c487e05015ca77273d8ecaed0ed683de3d39bf4f69336feb" + + # Node info: (peerId, userMessageLimit) + NodeConfigs = [ + ("16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", DefaultUserMessageLimit), + # config.toml (service node) + ("16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF", DefaultUserMessageLimit), + # config1.toml (mix node 1) + ("16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA", DefaultUserMessageLimit), + # config2.toml (mix node 2) + ("16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f", DefaultUserMessageLimit), + # config3.toml (mix node 3) + ("16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu", DefaultUserMessageLimit), + # config4.toml (mix node 4) + ("16Uiu2HAm1QxSjNvNbsT2xtLjRGAsBLVztsJiTHr9a3EK96717hpj", DefaultUserMessageLimit), + # chat2mix client 1 + ("16Uiu2HAmC9h26U1C83FJ5xpE32ghqya8CaZHX1Y7qpfHNnRABscN", DefaultUserMessageLimit), + # chat2mix client 2 + ] + +proc setupCredentialsAndTree() {.async.} = + ## Generate credentials for all nodes and create a shared tree + + echo "=== RLN Credentials Setup ===" + echo "Generating credentials for ", NodeConfigs.len, " nodes...\n" + + # Generate credentials for all nodes + var allCredentials: + seq[tuple[peerId: string, cred: IdentityCredential, rateLimit: uint64]] + for (peerId, rateLimit) in NodeConfigs: + let cred = generateCredentials().valueOr: + echo "Failed to generate credentials for ", peerId, ": ", error + quit(1) + + allCredentials.add((peerId: peerId, cred: cred, rateLimit: rateLimit)) + echo "Generated credentials for ", peerId + echo " idCommitment: ", cred.idCommitment.toHex()[0 .. 15], "..." + echo " userMessageLimit: ", rateLimit + + echo "" + + # Create a group manager directly to build the tree + let rlnInstance = newRLNInstance().valueOr: + echo "Failed to create RLN instance: ", error + quit(1) + + let groupManager = newOffchainGroupManager(rlnInstance, "/mix/rln/membership/v1") + + # Initialize the group manager + let initRes = await groupManager.init() + if initRes.isErr: + echo "Failed to initialize group manager: ", initRes.error + quit(1) + + # Register all credentials in the tree with their specific rate limits + echo "Registering all credentials in the Merkle tree..." + for i, entry in allCredentials: + let index = ( + await groupManager.registerWithLimit(entry.cred.idCommitment, entry.rateLimit) + ).valueOr: + echo "Failed to register credential for ", entry.peerId, ": ", error + quit(1) + echo " Registered ", + entry.peerId, " at index ", index, " (limit: ", entry.rateLimit, ")" + + echo "" + + # Save the tree to disk + echo "Saving tree to rln_tree.db..." + let saveRes = groupManager.saveTreeToFile("rln_tree.db") + if saveRes.isErr: + echo "Failed to save tree: ", saveRes.error + quit(1) + echo "Tree saved successfully!" + + echo "" + + # Save each credential to a keystore file named by peer ID + echo "Saving keystores..." + for i, entry in allCredentials: + let keystorePath = &"rln_keystore_{entry.peerId}.json" + + # Save with membership index and rate limit + let saveResult = saveKeystore( + entry.cred, + KeystorePassword, + keystorePath, + some(MembershipIndex(i)), + some(entry.rateLimit), + ) + if saveResult.isErr: + echo "Failed to save keystore for ", entry.peerId, ": ", saveResult.error + quit(1) + echo " Saved: ", keystorePath, " (limit: ", entry.rateLimit, ")" + + echo "" + echo "=== Setup Complete ===" + echo " Tree file: rln_tree.db (", NodeConfigs.len, " members)" + echo " Keystores: rln_keystore_{peerId}.json" + echo " Password: ", KeystorePassword + echo " Default rate limit: ", DefaultUserMessageLimit + echo " Spammer rate limit: ", SpammerUserMessageLimit + echo "" + echo "Note: All nodes must use the same rln_tree.db file." + +when isMainModule: + waitFor setupCredentialsAndTree() diff --git a/tools/confutils/cli_args.nim b/tools/confutils/cli_args.nim index a99ba43ee..6bd2d937e 100644 --- a/tools/confutils/cli_args.nim +++ b/tools/confutils/cli_args.nim @@ -1138,4 +1138,7 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = of WakuMode.noMode: discard # use explicit CLI flags as-is + b.kademliaDiscoveryConf.withEnabled(n.enableKadDiscovery) + b.kademliaDiscoveryConf.withBootstrapNodes(n.kadBootstrapNodes) + return b.build() diff --git a/vendor/mix-rln-spam-protection-plugin b/vendor/mix-rln-spam-protection-plugin new file mode 160000 index 000000000..037f8e100 --- /dev/null +++ b/vendor/mix-rln-spam-protection-plugin @@ -0,0 +1 @@ +Subproject commit 037f8e100bfedffdbad1c4442e760d10a2437428 diff --git a/waku/discovery/waku_kademlia.nim b/waku/discovery/waku_kademlia.nim index 94b63a321..aab3a9819 100644 --- a/waku/discovery/waku_kademlia.nim +++ b/waku/discovery/waku_kademlia.nim @@ -194,7 +194,7 @@ proc runDiscoveryLoop( info "extended kademlia discovery loop started", interval = interval try: - while true: + while wk.running: # Wait for node to be started if not wk.isNodeStarted.isNil() and not wk.isNodeStarted(): await sleepAsync(ExtendedKademliaDiscoveryStartupDelay) @@ -258,6 +258,8 @@ proc start*( except CatchableError as e: return err("failed to start kademlia discovery: " & e.msg) + wk.running = true + wk.discoveryLoop = wk.runDiscoveryLoop(interval, minMixPeers) info "kademlia discovery started" diff --git a/waku/node/kernel_api/relay.nim b/waku/node/kernel_api/relay.nim index c5a11ff02..1b583cabe 100644 --- a/waku/node/kernel_api/relay.nim +++ b/waku/node/kernel_api/relay.nim @@ -27,6 +27,7 @@ import waku_archive, waku_store_sync, waku_rln_relay, + waku_mix, node/waku_node, node/peer_manager, common/broker/broker_context, @@ -91,6 +92,12 @@ proc registerRelayHandler( node.wakuStoreReconciliation.messageIngress(topic, msg) + proc mixHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + if node.wakuMix.isNil(): + return + + await node.wakuMix.handleMessage(topic, msg) + proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = MessageSeenEvent.emit(node.brokerCtx, topic, msg) @@ -101,6 +108,7 @@ proc registerRelayHandler( await filterHandler(topic, msg) await archiveHandler(topic, msg) await syncHandler(topic, msg) + await mixHandler(topic, msg) await internalHandler(topic, msg) # Call the legacy (kernel API) app handler if it exists. diff --git a/waku/node/waku_mix_coordination.nim b/waku/node/waku_mix_coordination.nim new file mode 100644 index 000000000..3eb53750e --- /dev/null +++ b/waku/node/waku_mix_coordination.nim @@ -0,0 +1,125 @@ +## Mix spam protection coordination via filter protocol +## This module handles filter-based subscription for spam protection coordination +## when relay is not available. + +{.push raises: [].} + +import chronos, chronicles, std/options +import + ../waku_core, + ../waku_core/topics/sharding, + ../waku_filter_v2/common, + ./peer_manager, + ../waku_filter_v2/client, + ../waku_mix/protocol + +logScope: + topics = "waku node mix_coordination" + +# Type aliases for callbacks to avoid circular imports +type + FilterSubscribeProc* = proc( + pubsubTopic: Option[PubsubTopic], + contentTopics: seq[ContentTopic], + peer: RemotePeerInfo, + ): Future[FilterSubscribeResult] {.async, gcsafe.} + + FilterPingProc* = + proc(peer: RemotePeerInfo): Future[FilterSubscribeResult] {.async, gcsafe.} + +# Forward declaration +proc subscribeSpamProtectionViaFilter( + wakuMix: WakuMix, + peerManager: PeerManager, + filterClient: WakuFilterClient, + filterSubscribe: FilterSubscribeProc, + contentTopics: seq[ContentTopic], +) {.async.} + +proc setupSpamProtectionViaFilter*( + wakuMix: WakuMix, + peerManager: PeerManager, + filterClient: WakuFilterClient, + filterSubscribe: FilterSubscribeProc, +) = + ## Set up filter-based spam protection coordination. + ## Registers message handler and spawns subscription maintenance task. + let spamTopics = wakuMix.getSpamProtectionContentTopics() + if spamTopics.len == 0: + return + + info "Relay not available, subscribing to spam protection via filter", + topics = spamTopics + + # Register handler for spam protection messages + filterClient.registerPushHandler( + proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + if message.contentTopic in spamTopics: + await wakuMix.handleMessage(pubsubTopic, message) + ) + + # Wait for filter peer to be available and maintain subscription + asyncSpawn subscribeSpamProtectionViaFilter( + wakuMix, peerManager, filterClient, filterSubscribe, spamTopics + ) + +proc subscribeSpamProtectionViaFilter( + wakuMix: WakuMix, + peerManager: PeerManager, + filterClient: WakuFilterClient, + filterSubscribe: FilterSubscribeProc, + contentTopics: seq[ContentTopic], +) {.async.} = + ## Subscribe to spam protection topics via filter and maintain the subscription. + ## Waits for a filter peer to be available before subscribing. + ## Continuously monitors the subscription health with periodic pings. + const RetryInterval = chronos.seconds(5) + const SubscriptionMaintenance = chronos.seconds(30) + const MaxFailedSubscribes = 3 + var currentFilterPeer: Option[RemotePeerInfo] = none(RemotePeerInfo) + var noFailedSubscribes = 0 + + while true: + # Select or reuse filter peer + if currentFilterPeer.isNone(): + let filterPeerOpt = peerManager.selectPeer(WakuFilterSubscribeCodec) + if filterPeerOpt.isNone(): + debug "No filter peer available yet for spam protection, retrying..." + await sleepAsync(RetryInterval) + continue + currentFilterPeer = some(filterPeerOpt.get()) + info "Selected filter peer for spam protection", + peer = currentFilterPeer.get().peerId + + # Check if subscription is still alive with ping + let pingErr = (await filterClient.ping(currentFilterPeer.get())).errorOr: + # Subscription is alive, wait before next check + await sleepAsync(SubscriptionMaintenance) + if noFailedSubscribes > 0: + noFailedSubscribes = 0 + continue + + # Subscription lost, need to re-subscribe + warn "Spam protection filter subscription ping failed, re-subscribing", + error = pingErr, peer = currentFilterPeer.get().peerId + + # Subscribe to spam protection topics + let res = + await filterSubscribe(none(PubsubTopic), contentTopics, currentFilterPeer.get()) + if res.isErr(): + noFailedSubscribes += 1 + warn "Failed to subscribe to spam protection topics via filter", + error = res.error, topics = contentTopics, failCount = noFailedSubscribes + + if noFailedSubscribes >= MaxFailedSubscribes: + # Try with a different peer + warn "Max subscription failures reached, selecting new filter peer" + currentFilterPeer = none(RemotePeerInfo) + noFailedSubscribes = 0 + + await sleepAsync(RetryInterval) + else: + info "Successfully subscribed to spam protection topics via filter", + topics = contentTopics, peer = currentFilterPeer.get().peerId + noFailedSubscribes = 0 + await sleepAsync(SubscriptionMaintenance) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 506a3e592..c84e480aa 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -181,6 +181,18 @@ proc getShardsGetter(node: WakuNode, configuredShards: seq[uint16]): GetShards = return shards return configuredShards +proc getRelayMixHandler*(node: WakuNode): Option[WakuRelayHandler] = + ## Returns a handler for mix spam protection coordination messages if mix is mounted + if node.wakuMix.isNil(): + return none(WakuRelayHandler) + + let handler: WakuRelayHandler = proc( + pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[void] {.async, gcsafe.} = + await node.wakuMix.handleMessage(pubsubTopic, message) + + return some(handler) + proc getCapabilitiesGetter(node: WakuNode): GetCapabilities = return proc(): seq[Capabilities] {.closure, gcsafe, raises: [].} = if node.wakuRelay.isNil(): @@ -301,6 +313,7 @@ proc mountMix*( clusterId: uint16, mixPrivKey: Curve25519Key, mixnodes: seq[MixNodePubInfo], + userMessageLimit: Option[int] = none(int), ): Future[Result[void, string]] {.async.} = info "mounting mix protocol", nodeId = node.info #TODO log the config used @@ -311,8 +324,29 @@ proc mountMix*( return err("Failed to convert multiaddress to string.") info "local addr", localaddr = localaddrStr + # Create callback to publish coordination messages via relay + let publishMessage: PublishMessage = proc( + message: WakuMessage + ): Future[Result[void, string]] {.async.} = + # Inline implementation of publish logic to avoid circular import + if node.wakuRelay.isNil(): + return err("WakuRelay not mounted") + + # Derive pubsub topic from content topic using auto sharding + let pubsubTopic = + if node.wakuAutoSharding.isNone(): + return err("Auto sharding not configured") + else: + node.wakuAutoSharding.get().getShard(message.contentTopic).valueOr: + return err("Autosharding error: " & error) + + # Publish via relay + discard await node.wakuRelay.publish(pubsubTopic, message) + return ok() + node.wakuMix = WakuMix.new( - localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes + localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes, publishMessage, + userMessageLimit, ).valueOr: error "Waku Mix protocol initialization failed", err = error return @@ -322,9 +356,10 @@ proc mountMix*( node.switch.mount(node.wakuMix) catchRes.isOkOr: return err(error.msg) - return ok() -## Waku Sync + # Note: start() is called later in WakuNode.start(), not here during mount + + return ok() proc mountStoreSync*( node: WakuNode, @@ -583,7 +618,7 @@ proc start*(node: WakuNode) {.async.} = await node.startRelay() if not node.wakuMix.isNil(): - node.wakuMix.start() + await node.wakuMix.start() if not node.wakuMetadata.isNil(): node.wakuMetadata.start() diff --git a/waku/waku_mix/protocol.nim b/waku/waku_mix/protocol.nim index e31929b71..7400d7011 100644 --- a/waku/waku_mix/protocol.nim +++ b/waku/waku_mix/protocol.nim @@ -1,6 +1,6 @@ {.push raises: [].} -import chronicles, std/options, chronos, results, metrics +import chronicles, std/[options, sequtils], chronos, results, metrics import libp2p/crypto/curve25519, @@ -10,14 +10,18 @@ import libp2p/protocols/mix/mix_protocol, libp2p/protocols/mix/mix_metrics, libp2p/protocols/mix/delay_strategy, - libp2p/[multiaddress, peerid], + libp2p/protocols/mix/spam_protection, + libp2p/[multiaddress, multicodec, peerid, peerinfo], eth/common/keys import waku/node/peer_manager, waku/waku_core, waku/waku_enr, - waku/node/peer_manager/waku_peer_store + waku/node/peer_manager/waku_peer_store, + mix_rln_spam_protection, + waku/waku_relay, + waku/common/nimchronos logScope: topics = "waku mix" @@ -25,10 +29,16 @@ logScope: const minMixPoolSize = 4 type + PublishMessage* = proc(message: WakuMessage): Future[Result[void, string]] {. + async, gcsafe, raises: [] + .} + WakuMix* = ref object of MixProtocol peerManager*: PeerManager clusterId: uint16 pubKey*: Curve25519Key + mixRlnSpamProtection*: MixRlnSpamProtection + publishMessage*: PublishMessage WakuMixResult*[T] = Result[T, string] @@ -41,11 +51,9 @@ proc processBootNodes( ) = var count = 0 for node in bootnodes: - let pInfo = parsePeerInfo(node.multiAddr).valueOr: - error "Failed to get peer id from multiaddress: ", - error = error, multiAddr = $node.multiAddr + let (peerId, networkAddr) = parseFullAddress(node.multiAddr).valueOr: + error "Failed to parse multiaddress", multiAddr = node.multiAddr, error = error continue - let peerId = pInfo.peerId var peerPubKey: crypto.PublicKey if not peerId.extractPublicKey(peerPubKey): warn "Failed to extract public key from peerId, skipping node", peerId = peerId @@ -65,10 +73,10 @@ proc processBootNodes( count.inc() peermgr.addPeer( - RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey)) + RemotePeerInfo.init(peerId, @[networkAddr], mixPubKey = some(node.pubKey)) ) mix_pool_size.set(count) - info "using mix bootstrap nodes ", count = count + debug "using mix bootstrap nodes ", count = count proc new*( T: typedesc[WakuMix], @@ -77,9 +85,11 @@ proc new*( clusterId: uint16, mixPrivKey: Curve25519Key, bootnodes: seq[MixNodePubInfo], + publishMessage: PublishMessage, + userMessageLimit: Option[int] = none(int), ): WakuMixResult[T] = let mixPubKey = public(mixPrivKey) - info "mixPubKey", mixPubKey = mixPubKey + trace "mixPubKey", mixPubKey = mixPubKey let nodeMultiAddr = MultiAddress.init(nodeAddr).valueOr: return err("failed to parse mix node address: " & $nodeAddr & ", error: " & error) let localMixNodeInfo = initMixNodeInfo( @@ -87,27 +97,193 @@ proc new*( peermgr.switch.peerInfo.publicKey.skkey, peermgr.switch.peerInfo.privateKey.skkey, ) - var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey) + # Initialize spam protection with persistent credentials + # Use peerID in keystore path so multiple peers can run from same directory + # Tree path is shared across all nodes to maintain the full membership set + let peerId = peermgr.switch.peerInfo.peerId + var spamProtectionConfig = defaultConfig() + spamProtectionConfig.keystorePath = "rln_keystore_" & $peerId & ".json" + spamProtectionConfig.keystorePassword = "mix-rln-password" + if userMessageLimit.isSome(): + spamProtectionConfig.userMessageLimit = userMessageLimit.get() + # rlnResourcesPath left empty to use bundled resources (via "tree_height_/" placeholder) + + let spamProtection = newMixRlnSpamProtection(spamProtectionConfig).valueOr: + return err("failed to create spam protection: " & error) + + var m = WakuMix( + peerManager: peermgr, + clusterId: clusterId, + pubKey: mixPubKey, + mixRlnSpamProtection: spamProtection, + publishMessage: publishMessage, + ) procCall MixProtocol(m).init( localMixNodeInfo, peermgr.switch, + spamProtection = Opt.some(SpamProtection(spamProtection)), delayStrategy = - ExponentialDelayStrategy.new(meanDelayMs = 50, rng = crypto.newRng()), + ExponentialDelayStrategy.new(meanDelayMs = 100, rng = crypto.newRng()), ) processBootNodes(bootnodes, peermgr, m) if m.nodePool.len < minMixPoolSize: warn "publishing with mix won't work until atleast 3 mix nodes in node pool" + return ok(m) proc poolSize*(mix: WakuMix): int = mix.nodePool.len -method start*(mix: WakuMix) = +proc setupSpamProtectionCallbacks(mix: WakuMix) = + ## Set up the publish callback for spam protection coordination. + ## This enables the plugin to broadcast membership updates and proof metadata + ## via Waku relay. + if mix.publishMessage.isNil(): + warn "PublishMessage callback not available, spam protection coordination disabled" + return + + let publishCallback: PublishCallback = proc( + contentTopic: string, data: seq[byte] + ) {.async.} = + # Create a WakuMessage for the coordination data + let msg = WakuMessage( + payload: data, + contentTopic: contentTopic, + ephemeral: true, # Coordination messages don't need to be stored + timestamp: getNowInNanosecondTime(), + ) + + # Delegate to node's publish API which handles topic derivation and relay publishing + let res = await mix.publishMessage(msg) + if res.isErr(): + warn "Failed to publish spam protection coordination message", + contentTopic = contentTopic, error = res.error + return + + trace "Published spam protection coordination message", contentTopic = contentTopic + + mix.mixRlnSpamProtection.setPublishCallback(publishCallback) + trace "Spam protection publish callback configured" + +proc handleMessage*( + mix: WakuMix, pubsubTopic: PubsubTopic, message: WakuMessage +) {.async, gcsafe.} = + ## Handle incoming messages for spam protection coordination. + ## This should be called from the relay handler for coordination content topics. + if mix.mixRlnSpamProtection.isNil(): + return + + let contentTopic = message.contentTopic + + if contentTopic == mix.mixRlnSpamProtection.getMembershipContentTopic(): + # Handle membership update + let res = await mix.mixRlnSpamProtection.handleMembershipUpdate(message.payload) + if res.isErr: + warn "Failed to handle membership update", error = res.error + else: + trace "Handled membership update" + + # Persist tree after membership changes (temporary solution) + # TODO: Replace with proper persistence strategy (e.g., periodic snapshots) + let saveRes = mix.mixRlnSpamProtection.saveTree() + if saveRes.isErr: + debug "Failed to save tree after membership update", error = saveRes.error + else: + trace "Saved tree after membership update" + elif contentTopic == mix.mixRlnSpamProtection.getProofMetadataContentTopic(): + # Handle proof metadata for network-wide spam detection + let res = mix.mixRlnSpamProtection.handleProofMetadata(message.payload) + if res.isErr: + warn "Failed to handle proof metadata", error = res.error + else: + trace "Handled proof metadata" + +proc getSpamProtectionContentTopics*(mix: WakuMix): seq[string] = + ## Get the content topics used by spam protection for coordination. + ## Use these to set up relay subscriptions. + if mix.mixRlnSpamProtection.isNil(): + return @[] + return mix.mixRlnSpamProtection.getContentTopics() + +proc saveSpamProtectionTree*(mix: WakuMix): Result[void, string] = + ## Save the spam protection membership tree to disk. + ## This allows preserving the tree state across restarts. + if mix.mixRlnSpamProtection.isNil(): + return err("Spam protection not initialized") + + mix.mixRlnSpamProtection.saveTree().mapErr( + proc(e: string): string = + e + ) + +proc loadSpamProtectionTree*(mix: WakuMix): Result[void, string] = + ## Load the spam protection membership tree from disk. + ## Call this before init() to restore tree state from previous runs. + ## TODO: This is a temporary solution. Ideally nodes should sync tree state + ## via a store query for historical membership messages or via dedicated + ## tree sync protocol. + if mix.mixRlnSpamProtection.isNil(): + return err("Spam protection not initialized") + + mix.mixRlnSpamProtection.loadTree().mapErr( + proc(e: string): string = + e + ) + +method start*(mix: WakuMix) {.async.} = info "starting waku mix protocol" + # Set up spam protection callbacks and start + if not mix.mixRlnSpamProtection.isNil(): + # Initialize spam protection (MixProtocol.init() does NOT call init() on the plugin) + let initRes = await mix.mixRlnSpamProtection.init() + if initRes.isErr: + error "Failed to initialize spam protection", error = initRes.error + else: + # Load existing tree to sync with other members + # This should be done after init() (which loads credentials) + # but before registerSelf() (which adds us to the tree) + let loadRes = mix.mixRlnSpamProtection.loadTree() + if loadRes.isErr: + debug "No existing tree found or failed to load, starting fresh", + error = loadRes.error + else: + debug "Loaded existing spam protection membership tree from disk" + + # Restore our credentials to the tree (after tree load, whether it succeeded or not) + # This ensures our member is in the tree if we have an index from keystore + let restoreRes = mix.mixRlnSpamProtection.restoreCredentialsToTree() + if restoreRes.isErr: + error "Failed to restore credentials to tree", error = restoreRes.error + + # Set up publish callback (must be before start so registerSelf can use it) + mix.setupSpamProtectionCallbacks() + + let startRes = await mix.mixRlnSpamProtection.start() + if startRes.isErr: + error "Failed to start spam protection", error = startRes.error + else: + # Register self to broadcast membership to the network + let registerRes = await mix.mixRlnSpamProtection.registerSelf() + if registerRes.isErr: + error "Failed to register spam protection credentials", + error = registerRes.error + else: + debug "Registered spam protection credentials", index = registerRes.get() + + # Save tree to persist membership state + let saveRes = mix.mixRlnSpamProtection.saveTree() + if saveRes.isErr: + warn "Failed to save spam protection tree", error = saveRes.error + else: + trace "Saved spam protection tree to disk" + method stop*(mix: WakuMix) {.async.} = - discard + # Stop spam protection + if not mix.mixRlnSpamProtection.isNil(): + await mix.mixRlnSpamProtection.stop() + debug "Spam protection stopped" # Mix Protocol