Merge 7d54103053748f4a28724e22ad7a794a0dcc78ae into 549bf8bc4309aadb30530a181fa1933bced2c9a7

This commit is contained in:
Prem Chaitanya Prathi 2026-04-07 13:41:57 +05:30 committed by GitHub
commit 53eef39a52
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 838 additions and 70 deletions

1
.gitignore vendored
View File

@ -43,6 +43,7 @@ node_modules/
# RLN / keystore
rlnKeystore.json
rln_keystore*.json
*.tar.gz
# Nimbus Build System

5
.gitmodules vendored
View File

@ -195,3 +195,8 @@
url = https://github.com/logos-messaging/nim-ffi/
ignore = untracked
branch = master
[submodule "vendor/mix-rln-spam-protection-plugin"]
path = vendor/mix-rln-spam-protection-plugin
url = https://github.com/logos-co/mix-rln-spam-protection-plugin.git
ignore = untracked
branch = master

View File

@ -187,10 +187,15 @@ nimbus-build-system-nimble-dir:
##################
## RLN ##
##################
.PHONY: librln
.PHONY: librln mix-librln
LIBRLN_BUILDDIR := $(CURDIR)/vendor/zerokit
LIBRLN_VERSION := v0.9.0
MIX_LIBRLN_VERSION ?= v2.0.0
MIX_LIBRLN_REPO ?= https://github.com/vacp2p/zerokit.git
MIX_LIBRLN_SRCDIR ?= $(CURDIR)/build/zerokit_$(MIX_LIBRLN_VERSION)
MIX_LIBRLN_FILE ?= $(CURDIR)/build/librln_mix_$(MIX_LIBRLN_VERSION).a
MIX_LIBRLN_NIM_PARAMS := --passL:$(MIX_LIBRLN_FILE) --passL:-lm
ifeq ($(detected_OS),Windows)
LIBRLN_FILE ?= rln.lib
@ -202,12 +207,19 @@ $(LIBRLN_FILE):
echo -e $(BUILD_MSG) "$@" && \
./scripts/build_rln.sh $(LIBRLN_BUILDDIR) $(LIBRLN_VERSION) $(LIBRLN_FILE)
$(MIX_LIBRLN_FILE):
echo -e $(BUILD_MSG) "$@" && \
./scripts/build_rln_mix.sh $(MIX_LIBRLN_SRCDIR) $(MIX_LIBRLN_VERSION) $(MIX_LIBRLN_FILE) $(MIX_LIBRLN_REPO)
librln: | $(LIBRLN_FILE)
$(eval NIM_PARAMS += --passL:$(LIBRLN_FILE) --passL:-lm)
mix-librln: | $(MIX_LIBRLN_FILE)
clean-librln:
cargo clean --manifest-path vendor/zerokit/rln/Cargo.toml
rm -f $(LIBRLN_FILE)
rm -f $(MIX_LIBRLN_FILE)
# Extend clean target
clean: | clean-librln
@ -232,10 +244,10 @@ testwaku: | build deps rln-deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim test -d:os=$(shell uname) $(NIM_PARAMS) waku.nims
wakunode2: | build deps librln
wakunode2: | build deps librln mix-librln
echo -e $(BUILD_MSG) "build/$@" && \
\
$(ENV_SCRIPT) nim wakunode2 $(NIM_PARAMS) waku.nims
$(ENV_SCRIPT) nim wakunode2 $(NIM_PARAMS) $(MIX_LIBRLN_NIM_PARAMS) waku.nims
benchmarks: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
@ -253,9 +265,9 @@ chat2: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim chat2 $(NIM_PARAMS) waku.nims
chat2mix: | build deps librln
chat2mix: | build deps librln mix-librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim chat2mix $(NIM_PARAMS) waku.nims
$(ENV_SCRIPT) nim chat2mix $(NIM_PARAMS) $(MIX_LIBRLN_NIM_PARAMS) waku.nims
rln-db-inspector: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \

View File

@ -35,6 +35,7 @@ import
import
waku/[
waku_core,
waku_core/topics/sharding,
waku_lightpush/common,
waku_lightpush/rpc,
waku_enr,
@ -47,6 +48,8 @@ import
common/utils/nat,
waku_store/common,
waku_filter_v2/client,
waku_filter_v2/common as filter_common,
waku_mix/protocol,
common/logging,
],
./config_chat2mix
@ -57,7 +60,108 @@ import ../../waku/waku_rln_relay
logScope:
topics = "chat2 mix"
const Help = """
#########################
## Mix Spam Protection ##
#########################
# Forward declaration
proc maintainSpamProtectionSubscription(
node: WakuNode, contentTopics: seq[ContentTopic]
) {.async.}
proc setupMixSpamProtectionViaFilter(node: WakuNode) {.async.} =
  ## Setup filter-based spam protection coordination for mix protocol.
  ## Since chat2mix doesn't use relay, we subscribe via filter to receive
  ## spam protection coordination messages.
  let spamTopics = node.wakuMix.getSpamProtectionContentTopics()
  if spamTopics.len == 0:
    # Spam protection is not active on this node; nothing to subscribe to.
    # (Also avoids spawning a maintenance loop that would index spamTopics[0].)
    return

  # Register message handler for spam protection coordination.
  proc handleSpamMessage(
      pubsubTopic: PubsubTopic, message: WakuMessage
  ): Future[void] {.async, gcsafe.} =
    # The filter client pushes every subscribed message through registered
    # handlers; forward only spam-protection coordination traffic to mix
    # (consistent with the node-level mix coordination module).
    if message.contentTopic in spamTopics:
      await node.wakuMix.handleMessage(pubsubTopic, message)

  node.wakuFilterClient.registerPushHandler(handleSpamMessage)

  # Wait for filter peer and maintain subscription
  asyncSpawn maintainSpamProtectionSubscription(node, spamTopics)
proc maintainSpamProtectionSubscription(
    node: WakuNode, contentTopics: seq[ContentTopic]
) {.async.} =
  ## Maintain filter subscription for spam protection topics.
  ## Monitors subscription health with periodic pings and re-subscribes on failure.
  const RetryInterval = chronos.seconds(5) # backoff between failed attempts
  const SubscriptionMaintenance = chronos.seconds(30) # interval between health pings
  const MaxFailedSubscribes = 3 # failures tolerated before switching peer

  if contentTopics.len == 0:
    # Guard: contentTopics[0] is used below to derive the shard; an empty
    # seq would raise an IndexDefect inside the maintenance loop.
    error "No spam protection content topics provided, aborting subscription maintenance"
    return

  var currentFilterPeer: Option[RemotePeerInfo] = none(RemotePeerInfo)
  var noFailedSubscribes = 0

  while true:
    # Select or reuse filter peer
    if currentFilterPeer.isNone():
      let filterPeerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
      if filterPeerOpt.isNone():
        debug "No filter peer available yet for spam protection, retrying..."
        await sleepAsync(RetryInterval)
        continue
      currentFilterPeer = some(filterPeerOpt.get())
      info "Selected filter peer for spam protection",
        peer = currentFilterPeer.get().peerId

    # Check if subscription is still alive with ping
    let pingErr = (await node.wakuFilterClient.ping(currentFilterPeer.get())).errorOr:
      # Subscription is alive, wait before next check
      await sleepAsync(SubscriptionMaintenance)
      if noFailedSubscribes > 0:
        noFailedSubscribes = 0
      continue

    # Subscription lost, need to re-subscribe
    warn "Spam protection filter subscription ping failed, re-subscribing",
      error = pingErr, peer = currentFilterPeer.get().peerId

    # Determine pubsub topic from content topics (using auto-sharding)
    if node.wakuAutoSharding.isNone():
      error "Auto-sharding not configured, cannot determine pubsub topic for spam protection"
      await sleepAsync(RetryInterval)
      continue
    let shardRes = node.wakuAutoSharding.get().getShard(contentTopics[0])
    if shardRes.isErr():
      error "Failed to determine shard for spam protection", error = shardRes.error
      await sleepAsync(RetryInterval)
      continue
    let shard = shardRes.get()
    let pubsubTopic: PubsubTopic = shard # converter toPubsubTopic

    # Subscribe to spam protection topics
    let res = await node.wakuFilterClient.subscribe(
      currentFilterPeer.get(), pubsubTopic, contentTopics
    )
    if res.isErr():
      noFailedSubscribes += 1
      warn "Failed to subscribe to spam protection topics via filter",
        error = res.error, topics = contentTopics, failCount = noFailedSubscribes
      if noFailedSubscribes >= MaxFailedSubscribes:
        # Try with a different peer
        warn "Max subscription failures reached, selecting new filter peer"
        currentFilterPeer = none(RemotePeerInfo)
        noFailedSubscribes = 0
      await sleepAsync(RetryInterval)
    else:
      info "Successfully subscribed to spam protection topics via filter",
        topics = contentTopics, peer = currentFilterPeer.get().peerId
      noFailedSubscribes = 0
      await sleepAsync(SubscriptionMaintenance)
const Help =
"""
Commands: /[?|help|connect|nick|exit]
help: Prints this help
connect: dials a remote peer
@ -209,20 +313,21 @@ proc publish(c: Chat, line: string) {.async.} =
try:
if not c.node.wakuLightpushClient.isNil():
# Attempt lightpush with mix
(
waitFor c.node.lightpushPublish(
some(c.conf.getPubsubTopic(c.node, c.contentTopic)),
message,
none(RemotePeerInfo),
true,
)
).isOkOr:
error "failed to publish lightpush message", error = error
let res = await c.node.lightpushPublish(
some(c.conf.getPubsubTopic(c.node, c.contentTopic)),
message,
none(RemotePeerInfo),
true,
)
if res.isErr():
error "failed to publish lightpush message", error = res.error
echo "Error: " & res.error.desc.get("unknown error")
else:
error "failed to publish message as lightpush client is not initialized"
echo "Error: lightpush client is not initialized"
except CatchableError:
error "caught error publishing message: ", error = getCurrentExceptionMsg()
echo "Error: " & getCurrentExceptionMsg()
# TODO: this read loop should be replaced by a proper subscribe handler
proc readAndPrint(c: Chat) {.async.} =
@ -451,7 +556,11 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
error "failed to generate mix key pair", error = error
return
(await node.mountMix(conf.clusterId, mixPrivKey, conf.mixnodes)).isOkOr:
(
await node.mountMix(
conf.clusterId, mixPrivKey, conf.mixnodes, some(conf.rlnUserMessageLimit)
)
).isOkOr:
error "failed to mount waku mix protocol: ", error = $error
quit(QuitFailure)
@ -486,6 +595,10 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
#await node.mountRendezvousClient(conf.clusterId)
# Subscribe to spam protection coordination topics via filter since chat2mix doesn't use relay
if not node.wakuFilterClient.isNil():
asyncSpawn setupMixSpamProtectionViaFilter(node)
await node.start()
node.peerManager.start()

View File

@ -236,6 +236,13 @@ type
name: "kad-bootstrap-node"
.}: seq[string]
## RLN spam protection config
rlnUserMessageLimit* {.
desc: "Maximum messages per epoch for RLN spam protection.",
defaultValue: 100,
name: "rln-user-message-limit"
.}: int
proc parseCmdArg*(T: type MixNodePubInfo, p: string): T =
let elements = p.split(":")
if elements.len != 2:

43
scripts/build_rln_mix.sh Executable file
View File

@ -0,0 +1,43 @@
#!/usr/bin/env bash
# Build a separate, pinned RLN library for mix spam-protection usage.
# This keeps the main nwaku RLN dependency flow unchanged.
#
# Usage: build_rln_mix.sh <source_dir> <version_tag> <output_file> [repo_url]
#   source_dir  - zerokit checkout location (cloned here if absent)
#   version_tag - git tag to build (e.g. v2.0.0)
#   output_file - destination path for the built static library
#   repo_url    - zerokit repository (defaults to vacp2p/zerokit)
set -euo pipefail

source_dir="${1:-}"
version="${2:-}"
output_file="${3:-}"
repo_url="${4:-https://github.com/vacp2p/zerokit.git}"

if [[ -z "${source_dir}" || -z "${version}" || -z "${output_file}" ]]; then
    echo "Usage: $0 <source_dir> <version_tag> <output_file> [repo_url]"
    exit 1
fi

# Ensure parent directories exist for both the checkout and the output file.
mkdir -p "$(dirname "${source_dir}")"
mkdir -p "$(dirname "${output_file}")"

if [[ ! -d "${source_dir}/.git" ]]; then
    echo "Cloning zerokit ${version} from ${repo_url}..."
    # Refuse to clobber a pre-existing non-git path at the checkout location.
    if [[ -e "${source_dir}" ]]; then
        echo "Path exists but is not a git repository: ${source_dir}"
        echo "Please remove it and retry."
        exit 1
    fi
    git clone --depth 1 --branch "${version}" "${repo_url}" "${source_dir}"
else
    echo "Using existing zerokit checkout in ${source_dir}"
    # Re-pin the existing checkout to the requested tag if it has drifted.
    # `describe --exact-match` fails when HEAD is not exactly on a tag;
    # `|| true` keeps set -e from aborting in that case.
    current_tag="$(git -C "${source_dir}" describe --tags --exact-match 2>/dev/null || true)"
    if [[ "${current_tag}" != "${version}" ]]; then
        echo "Updating zerokit checkout to ${version}..."
        git -C "${source_dir}" fetch --tags origin "${version}"
        git -C "${source_dir}" checkout --detach "${version}"
    fi
fi

echo "Building mix RLN library from source (version ${version})..."
cargo build --release -p rln --manifest-path "${source_dir}/rln/Cargo.toml"
# Cargo places workspace artifacts under <source_dir>/target/release.
cp "${source_dir}/target/release/librln.a" "${output_file}"
echo "Successfully built ${output_file}"

View File

@ -3,66 +3,128 @@
## Aim
Simulate a local mixnet along with a chat app to publish using mix.
This is helpful to test any changes while development.
It includes scripts that run a `4 node` mixnet along with a lightpush service node(without mix) that can be used to test quickly.
This is helpful to test any changes during development.
## Simulation Details
Note that before running the simulation both `wakunode2` and `chat2mix` have to be built.
The simulation includes:
1. A 5-node mixnet where `run_mix_node.sh` is the bootstrap node for the other 4 nodes
2. Two chat app instances that publish messages using lightpush protocol over the mixnet
### Available Scripts
| Script | Description |
| ------------------ | ------------------------------------------ |
| `run_mix_node.sh` | Bootstrap mix node (must be started first) |
| `run_mix_node1.sh` | Mix node 1 |
| `run_mix_node2.sh` | Mix node 2 |
| `run_mix_node3.sh` | Mix node 3 |
| `run_mix_node4.sh` | Mix node 4 |
| `run_chat_mix.sh` | Chat app instance 1 |
| `run_chat_mix1.sh` | Chat app instance 2 |
| `build_setup.sh` | Build and generate RLN credentials |
## Prerequisites
Before running the simulation, build `wakunode2` and `chat2mix`:
```bash
cd <repo-root-dir>
make wakunode2
make chat2mix
source env.sh
make wakunode2 chat2mix
```
Simulation includes scripts for:
## RLN Spam Protection Setup
1. a 4 waku-node mixnet where `node1` is bootstrap node for the other 3 nodes.
2. scripts to run chat app that publishes using lightpush protocol over the mixnet
Generate RLN credentials and the shared Merkle tree for all nodes:
```bash
cd simulations/mixnet
./build_setup.sh
```
This script will:
1. Build and run the `setup_credentials` tool
2. Generate RLN credentials for all nodes (5 mix nodes + 2 chat clients)
3. Create `rln_tree.db` - the shared Merkle tree with all members
4. Create keystore files (`rln_keystore_{peerId}.json`) for each node
**Important:** All scripts must be run from this directory (`simulations/mixnet/`) so they can access their credentials and tree file.
To regenerate credentials (e.g., after adding new nodes), run `./build_setup.sh` again - it will clean up old files first.
## Usage
Start the service node with below command, which acts as bootstrap node for all other mix nodes.
### Step 1: Start the Mix Nodes
`./run_lp_service_node.sh`
Start the bootstrap node first (in a separate terminal):
To run the nodes for mixnet run the 4 node scripts in different terminals as below.
`./run_mix_node1.sh`
Look for following 2 log lines to ensure node ran successfully and has also mounted mix protocol.
```log
INF 2025-08-01 14:51:05.445+05:30 mounting mix protocol topics="waku node" tid=39996871 file=waku_node.nim:231 nodeId="(listenAddresses: @[\"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o\"], enrUri: \"enr:-NC4QKYtas8STkenlqBTJ3a1TTLzJA2DsGGbFlnxem9aSM2IXm-CSVZULdk2467bAyFnepnt8KP_QlfDzdaMXd_zqtwBgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzgIJyc4UAAgEAAIlzZWNwMjU2azGhA6RFtVJVBh0SYOoP8xrgnXSlpiFARmQkF9d8Rn4fSeiog3RjcILqYYN1ZHCCIymFd2FrdTIt\")"
INF 2025-08-01 14:49:23.467+05:30 Node setup complete topics="wakunode main" tid=39994244 file=wakunode2.nim:104
```bash
./run_mix_node.sh
```
Once all the 4 nodes are up without any issues, run the script to start the chat application.
Look for the following log lines to ensure the node started successfully:
`./run_chat_app.sh`
```log
INF mounting mix protocol topics="waku node"
INF Node setup complete topics="wakunode main"
```
Enter a nickname to be used.
Verify RLN spam protection initialized correctly by checking for these logs:
```log
INF Initializing MixRlnSpamProtection
INF MixRlnSpamProtection initialized, waiting for sync
DBG Tree loaded from file
INF MixRlnSpamProtection started
```
Then start the remaining mix nodes in separate terminals:
```bash
./run_mix_node1.sh
./run_mix_node2.sh
./run_mix_node3.sh
./run_mix_node4.sh
```
### Step 2: Start the Chat Applications
Once all 5 mix nodes are running, start the first chat app:
```bash
./run_chat_mix.sh
```
Enter a nickname when prompted:
```bash
pubsub topic is: /waku/2/rs/2/0
Choose a nickname >>
```
Once you see below log, it means the app is ready for publishing messages over the mixnet.
Once you see the following log, the app is ready to publish messages over the mixnet:
```bash
Welcome, test!
Listening on
/ip4/192.168.68.64/tcp/60000/p2p/16Uiu2HAkxDGqix1ifY3wF1ZzojQWRAQEdKP75wn1LJMfoHhfHz57
/ip4/<local-network-ip>/tcp/60000/p2p/16Uiu2HAkxDGqix1ifY3wF1ZzojQWRAQEdKP75wn1LJMfoHhfHz57
ready to publish messages now
```
Follow similar instructions to run second instance of chat app.
Once both the apps run successfully, send a message and check if it is received by the other app.
Start the second chat app in another terminal:
You can exit the chat apps by entering `/exit` as below
```bash
./run_chat_mix1.sh
```
### Step 3: Test Messaging
Once both chat apps are running, send a message from one and verify it is received by the other.
To exit the chat apps, enter `/exit`:
```bash
>> /exit

View File

@ -0,0 +1,36 @@
#!/bin/bash
# Build and run the RLN credentials setup tool for the mixnet simulation.
# Produces rln_tree.db (shared Merkle tree) and one keystore file per node
# in this directory (simulations/mixnet/).
# Note: -u is deliberately not set so that sourcing env.sh stays safe.
set -eo pipefail

cd "$(dirname "$0")"
MIXNET_DIR=$(pwd)
cd ../..
ROOT_DIR=$(pwd)
source "$ROOT_DIR/env.sh"

# Prefer explicitly provided RLN library path, otherwise use the library
# produced by `make mix-librln` (MIX_LIBRLN_FILE in the top-level Makefile).
LIBRLN_PATH=${LIBRLN_PATH:-"$ROOT_DIR/build/librln_mix_v2.0.0.a"}
if [ ! -f "$LIBRLN_PATH" ]; then
    echo "RLN library not found: $LIBRLN_PATH"
    echo "Run 'make mix-librln' first, or set LIBRLN_PATH to an existing librln.a"
    exit 1
fi

# Clean up old files first so credentials are regenerated from scratch.
rm -f "$MIXNET_DIR/rln_tree.db" "$MIXNET_DIR"/rln_keystore_*.json

echo "Building and running credentials setup..."

# Compile to temp location, then run from mixnet directory.
# The trap guarantees the temp binary is removed even if a step fails.
TMP_BIN="/tmp/setup_credentials_$$"
trap 'rm -f "$TMP_BIN"' EXIT

# pipefail makes the pipeline report nim's exit status, so a compile
# failure is detected here instead of running a missing binary below.
if ! nim c -d:release --mm:refc \
    --passL:"$LIBRLN_PATH" --passL:-lm \
    -o:"$TMP_BIN" \
    "$MIXNET_DIR/setup_credentials.nim" 2>&1 | tail -30; then
    echo "Setup failed - could not compile setup_credentials.nim"
    exit 1
fi

# Run from mixnet directory so files are created there.
cd "$MIXNET_DIR"
"$TMP_BIN"

# Verify output.
if [ -f "rln_tree.db" ]; then
    echo ""
    echo "Tree file ready at: $(pwd)/rln_tree.db"
    keystore_count=$(find . -maxdepth 1 -name 'rln_keystore_*.json' | wc -l | tr -d ' ')
    echo "Generated ${keystore_count} keystore files"
else
    echo "Setup failed - rln_tree.db not found"
    exit 1
fi

View File

@ -13,7 +13,7 @@ discv5-udp-port = 9002
discv5-enr-auto-update = true
discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"]
kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"]
rest = false
rest = true
rest-admin = false
ports-shift = 3
num-shards-in-network = 1

View File

@ -13,7 +13,7 @@ discv5-udp-port = 9003
discv5-enr-auto-update = true
discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"]
kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"]
rest = false
rest = true
rest-admin = false
ports-shift = 4
num-shards-in-network = 1

View File

@ -13,7 +13,7 @@ discv5-udp-port = 9004
discv5-enr-auto-update = true
discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"]
kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"]
rest = false
rest = true
rest-admin = false
ports-shift = 5
num-shards-in-network = 1

View File

@ -1,2 +1,2 @@
../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --kad-bootstrap-node="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"
../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --nodekey="cb6fe589db0e5d5b48f7e82d33093e4d9d35456f4aaffc2322c473a173b2ac49" --kad-bootstrap-node="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --fleet="none"
#--mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"

View File

@ -1,2 +1 @@
../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE
#--mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --nodekey="35eace7ccb246f20c487e05015ca77273d8ecaed0ed683de3d39bf4f69336feb" --mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f" --mixnode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o:9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c" --fleet="none"

View File

@ -1 +1,2 @@
../../build/wakunode2 --config-file="config.toml" 2>&1 | tee mix_node.log

View File

@ -0,0 +1,139 @@
{.push raises: [].}

## Setup script to generate RLN credentials and shared Merkle tree for mix nodes.
##
## This script:
## 1. Generates credentials for each node (identified by peer ID)
## 2. Registers all credentials in a shared Merkle tree
## 3. Saves the tree to rln_tree.db
## 4. Saves individual keystores named by peer ID
##
## Usage: nim c -r setup_credentials.nim

import std/[os, strformat, options], chronicles, chronos, results

import
  mix_rln_spam_protection/credentials,
  mix_rln_spam_protection/group_manager,
  mix_rln_spam_protection/rln_interface,
  mix_rln_spam_protection/types

const
  KeystorePassword = "mix-rln-password" # Must match protocol.nim
  DefaultUserMessageLimit = 100'u64 # Network-wide default rate limit
  # NOTE(review): SpammerUserMessageLimit is declared (and echoed in the
  # summary below) but never assigned to any entry in NodeConfigs — confirm
  # whether a low-limit "spammer" test node was intended.
  SpammerUserMessageLimit = 3'u64 # Lower limit for spammer testing

  # Peer IDs derived from nodekeys in config files
  # config.toml: nodekey = "f98e3fba96c32e8d1967d460f1b79457380e1a895f7971cecc8528abe733781a"
  # config1.toml: nodekey = "09e9d134331953357bd38bbfce8edb377f4b6308b4f3bfbe85c610497053d684"
  # config2.toml: nodekey = "ed54db994682e857d77cd6fb81be697382dc43aa5cd78e16b0ec8098549f860e"
  # config3.toml: nodekey = "42f96f29f2d6670938b0864aced65a332dcf5774103b4c44ec4d0ea4ef3c47d6"
  # config4.toml: nodekey = "3ce887b3c34b7a92dd2868af33941ed1dbec4893b054572cd5078da09dd923d4"
  # chat2mix.sh: nodekey = "cb6fe589db0e5d5b48f7e82d33093e4d9d35456f4aaffc2322c473a173b2ac49"
  # chat2mix1.sh: nodekey = "35eace7ccb246f20c487e05015ca77273d8ecaed0ed683de3d39bf4f69336feb"

  # Node info: (peerId, userMessageLimit)
  NodeConfigs = [
    ("16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", DefaultUserMessageLimit),
    # config.toml (service node)
    ("16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF", DefaultUserMessageLimit),
    # config1.toml (mix node 1)
    ("16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA", DefaultUserMessageLimit),
    # config2.toml (mix node 2)
    ("16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f", DefaultUserMessageLimit),
    # config3.toml (mix node 3)
    ("16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu", DefaultUserMessageLimit),
    # config4.toml (mix node 4)
    ("16Uiu2HAm1QxSjNvNbsT2xtLjRGAsBLVztsJiTHr9a3EK96717hpj", DefaultUserMessageLimit),
    # chat2mix client 1
    ("16Uiu2HAmC9h26U1C83FJ5xpE32ghqya8CaZHX1Y7qpfHNnRABscN", DefaultUserMessageLimit),
    # chat2mix client 2
  ]

proc setupCredentialsAndTree() {.async.} =
  ## Generate credentials for all nodes and create a shared tree.
  ## On any failure the process exits with status 1 after printing the error.
  echo "=== RLN Credentials Setup ==="
  echo "Generating credentials for ", NodeConfigs.len, " nodes...\n"

  # Generate credentials for all nodes
  var allCredentials:
    seq[tuple[peerId: string, cred: IdentityCredential, rateLimit: uint64]]
  for (peerId, rateLimit) in NodeConfigs:
    let cred = generateCredentials().valueOr:
      echo "Failed to generate credentials for ", peerId, ": ", error
      quit(1)
    allCredentials.add((peerId: peerId, cred: cred, rateLimit: rateLimit))
    echo "Generated credentials for ", peerId
    echo " idCommitment: ", cred.idCommitment.toHex()[0 .. 15], "..."
    echo " userMessageLimit: ", rateLimit
    echo ""

  # Create a group manager directly to build the tree
  let rlnInstance = newRLNInstance().valueOr:
    echo "Failed to create RLN instance: ", error
    quit(1)
  let groupManager = newOffchainGroupManager(rlnInstance, "/mix/rln/membership/v1")

  # Initialize the group manager
  let initRes = await groupManager.init()
  if initRes.isErr:
    echo "Failed to initialize group manager: ", initRes.error
    quit(1)

  # Register all credentials in the tree with their specific rate limits.
  # Registration order determines each credential's MembershipIndex, which
  # is why the keystore-saving loop below reuses the same seq order.
  echo "Registering all credentials in the Merkle tree..."
  for i, entry in allCredentials:
    let index = (
      await groupManager.registerWithLimit(entry.cred.idCommitment, entry.rateLimit)
    ).valueOr:
      echo "Failed to register credential for ", entry.peerId, ": ", error
      quit(1)
    echo " Registered ",
      entry.peerId, " at index ", index, " (limit: ", entry.rateLimit, ")"
  echo ""

  # Save the tree to disk
  echo "Saving tree to rln_tree.db..."
  let saveRes = groupManager.saveTreeToFile("rln_tree.db")
  if saveRes.isErr:
    echo "Failed to save tree: ", saveRes.error
    quit(1)
  echo "Tree saved successfully!"
  echo ""

  # Save each credential to a keystore file named by peer ID
  echo "Saving keystores..."
  for i, entry in allCredentials:
    let keystorePath = &"rln_keystore_{entry.peerId}.json"
    # Save with membership index and rate limit; `i` mirrors the
    # registration order above, so it matches the index in the tree.
    let saveResult = saveKeystore(
      entry.cred,
      KeystorePassword,
      keystorePath,
      some(MembershipIndex(i)),
      some(entry.rateLimit),
    )
    if saveResult.isErr:
      echo "Failed to save keystore for ", entry.peerId, ": ", saveResult.error
      quit(1)
    echo " Saved: ", keystorePath, " (limit: ", entry.rateLimit, ")"
  echo ""
  echo "=== Setup Complete ==="
  echo " Tree file: rln_tree.db (", NodeConfigs.len, " members)"
  echo " Keystores: rln_keystore_{peerId}.json"
  echo " Password: ", KeystorePassword
  echo " Default rate limit: ", DefaultUserMessageLimit
  echo " Spammer rate limit: ", SpammerUserMessageLimit
  echo ""
  echo "Note: All nodes must use the same rln_tree.db file."

when isMainModule:
  waitFor setupCredentialsAndTree()

View File

@ -1138,4 +1138,7 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
of WakuMode.noMode:
discard # use explicit CLI flags as-is
b.kademliaDiscoveryConf.withEnabled(n.enableKadDiscovery)
b.kademliaDiscoveryConf.withBootstrapNodes(n.kadBootstrapNodes)
return b.build()

@ -0,0 +1 @@
Subproject commit 037f8e100bfedffdbad1c4442e760d10a2437428

View File

@ -194,7 +194,7 @@ proc runDiscoveryLoop(
info "extended kademlia discovery loop started", interval = interval
try:
while true:
while wk.running:
# Wait for node to be started
if not wk.isNodeStarted.isNil() and not wk.isNodeStarted():
await sleepAsync(ExtendedKademliaDiscoveryStartupDelay)
@ -258,6 +258,8 @@ proc start*(
except CatchableError as e:
return err("failed to start kademlia discovery: " & e.msg)
wk.running = true
wk.discoveryLoop = wk.runDiscoveryLoop(interval, minMixPeers)
info "kademlia discovery started"

View File

@ -27,6 +27,7 @@ import
waku_archive,
waku_store_sync,
waku_rln_relay,
waku_mix,
node/waku_node,
node/peer_manager,
common/broker/broker_context,
@ -91,6 +92,12 @@ proc registerRelayHandler(
node.wakuStoreReconciliation.messageIngress(topic, msg)
proc mixHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
if node.wakuMix.isNil():
return
await node.wakuMix.handleMessage(topic, msg)
proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
MessageSeenEvent.emit(node.brokerCtx, topic, msg)
@ -101,6 +108,7 @@ proc registerRelayHandler(
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
await syncHandler(topic, msg)
await mixHandler(topic, msg)
await internalHandler(topic, msg)
# Call the legacy (kernel API) app handler if it exists.

View File

@ -0,0 +1,125 @@
## Mix spam protection coordination via filter protocol
## This module handles filter-based subscription for spam protection coordination
## when relay is not available.
{.push raises: [].}
import chronos, chronicles, std/options
import
../waku_core,
../waku_core/topics/sharding,
../waku_filter_v2/common,
./peer_manager,
../waku_filter_v2/client,
../waku_mix/protocol
logScope:
topics = "waku node mix_coordination"
# Type aliases for callbacks to avoid circular imports
type
FilterSubscribeProc* = proc(
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
peer: RemotePeerInfo,
): Future[FilterSubscribeResult] {.async, gcsafe.}
FilterPingProc* =
proc(peer: RemotePeerInfo): Future[FilterSubscribeResult] {.async, gcsafe.}
# Forward declaration
proc subscribeSpamProtectionViaFilter(
wakuMix: WakuMix,
peerManager: PeerManager,
filterClient: WakuFilterClient,
filterSubscribe: FilterSubscribeProc,
contentTopics: seq[ContentTopic],
) {.async.}
proc setupSpamProtectionViaFilter*(
    wakuMix: WakuMix,
    peerManager: PeerManager,
    filterClient: WakuFilterClient,
    filterSubscribe: FilterSubscribeProc,
) =
  ## Wire up filter-based coordination for mix spam protection.
  ## Registers a push handler that forwards coordination messages to the
  ## mix protocol, then spawns a background task that keeps the filter
  ## subscription alive. No-op when spam protection exposes no topics.
  let coordinationTopics = wakuMix.getSpamProtectionContentTopics()
  if coordinationTopics.len == 0:
    # Spam protection is inactive; nothing to subscribe to.
    return

  info "Relay not available, subscribing to spam protection via filter",
    topics = coordinationTopics

  # Forward only coordination traffic to the mix protocol; the filter
  # client pushes every subscribed message through registered handlers.
  filterClient.registerPushHandler(
    proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
      if message.contentTopic in coordinationTopics:
        await wakuMix.handleMessage(pubsubTopic, message)
  )

  # Background task: waits for a filter peer and maintains the subscription.
  asyncSpawn subscribeSpamProtectionViaFilter(
    wakuMix, peerManager, filterClient, filterSubscribe, coordinationTopics
  )
proc subscribeSpamProtectionViaFilter(
    wakuMix: WakuMix,
    peerManager: PeerManager,
    filterClient: WakuFilterClient,
    filterSubscribe: FilterSubscribeProc,
    contentTopics: seq[ContentTopic],
) {.async.} =
  ## Subscribe to spam protection topics via filter and maintain the subscription.
  ## Waits for a filter peer to be available before subscribing.
  ## Continuously monitors the subscription health with periodic pings.
  ## Runs forever; intended to be started with asyncSpawn.
  const RetryInterval = chronos.seconds(5) # backoff between failed attempts
  const SubscriptionMaintenance = chronos.seconds(30) # interval between health pings
  const MaxFailedSubscribes = 3 # failures tolerated before switching peer

  var currentFilterPeer: Option[RemotePeerInfo] = none(RemotePeerInfo)
  var noFailedSubscribes = 0

  while true:
    # Select or reuse filter peer
    if currentFilterPeer.isNone():
      let filterPeerOpt = peerManager.selectPeer(WakuFilterSubscribeCodec)
      if filterPeerOpt.isNone():
        debug "No filter peer available yet for spam protection, retrying..."
        await sleepAsync(RetryInterval)
        continue
      currentFilterPeer = some(filterPeerOpt.get())
      info "Selected filter peer for spam protection",
        peer = currentFilterPeer.get().peerId

    # Check if subscription is still alive with ping.
    # errorOr: the block runs only when the ping succeeded; otherwise
    # pingErr is bound to the error and execution continues below.
    let pingErr = (await filterClient.ping(currentFilterPeer.get())).errorOr:
      # Subscription is alive, wait before next check
      await sleepAsync(SubscriptionMaintenance)
      if noFailedSubscribes > 0:
        noFailedSubscribes = 0
      continue

    # Subscription lost, need to re-subscribe
    warn "Spam protection filter subscription ping failed, re-subscribing",
      error = pingErr, peer = currentFilterPeer.get().peerId

    # Subscribe to spam protection topics.
    # none(PubsubTopic) lets the callback derive the pubsub topic
    # (e.g. via auto-sharding) — assumed; confirm against the injected
    # FilterSubscribeProc implementation.
    let res =
      await filterSubscribe(none(PubsubTopic), contentTopics, currentFilterPeer.get())
    if res.isErr():
      noFailedSubscribes += 1
      warn "Failed to subscribe to spam protection topics via filter",
        error = res.error, topics = contentTopics, failCount = noFailedSubscribes
      if noFailedSubscribes >= MaxFailedSubscribes:
        # Try with a different peer
        warn "Max subscription failures reached, selecting new filter peer"
        currentFilterPeer = none(RemotePeerInfo)
        noFailedSubscribes = 0
      await sleepAsync(RetryInterval)
    else:
      info "Successfully subscribed to spam protection topics via filter",
        topics = contentTopics, peer = currentFilterPeer.get().peerId
      noFailedSubscribes = 0
      await sleepAsync(SubscriptionMaintenance)

View File

@ -181,6 +181,18 @@ proc getShardsGetter(node: WakuNode, configuredShards: seq[uint16]): GetShards =
return shards
return configuredShards
proc getRelayMixHandler*(node: WakuNode): Option[WakuRelayHandler] =
  ## Build a relay handler that forwards incoming messages to the mix
  ## protocol's spam protection coordination logic.
  ## Returns `none` when mix is not mounted on this node.
  if not node.wakuMix.isNil():
    let forwardToMix: WakuRelayHandler = proc(
        pubsubTopic: PubsubTopic, message: WakuMessage
    ): Future[void] {.async, gcsafe.} =
      await node.wakuMix.handleMessage(pubsubTopic, message)
    return some(forwardToMix)
  none(WakuRelayHandler)
proc getCapabilitiesGetter(node: WakuNode): GetCapabilities =
return proc(): seq[Capabilities] {.closure, gcsafe, raises: [].} =
if node.wakuRelay.isNil():
@ -301,6 +313,7 @@ proc mountMix*(
clusterId: uint16,
mixPrivKey: Curve25519Key,
mixnodes: seq[MixNodePubInfo],
userMessageLimit: Option[int] = none(int),
): Future[Result[void, string]] {.async.} =
info "mounting mix protocol", nodeId = node.info #TODO log the config used
@ -311,8 +324,29 @@ proc mountMix*(
return err("Failed to convert multiaddress to string.")
info "local addr", localaddr = localaddrStr
# Create callback to publish coordination messages via relay
let publishMessage: PublishMessage = proc(
message: WakuMessage
): Future[Result[void, string]] {.async.} =
# Inline implementation of publish logic to avoid circular import
if node.wakuRelay.isNil():
return err("WakuRelay not mounted")
# Derive pubsub topic from content topic using auto sharding
let pubsubTopic =
if node.wakuAutoSharding.isNone():
return err("Auto sharding not configured")
else:
node.wakuAutoSharding.get().getShard(message.contentTopic).valueOr:
return err("Autosharding error: " & error)
# Publish via relay
discard await node.wakuRelay.publish(pubsubTopic, message)
return ok()
node.wakuMix = WakuMix.new(
localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes
localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes, publishMessage,
userMessageLimit,
).valueOr:
error "Waku Mix protocol initialization failed", err = error
return
@ -322,9 +356,10 @@ proc mountMix*(
node.switch.mount(node.wakuMix)
catchRes.isOkOr:
return err(error.msg)
return ok()
## Waku Sync
# Note: start() is called later in WakuNode.start(), not here during mount
return ok()
proc mountStoreSync*(
node: WakuNode,
@ -583,7 +618,7 @@ proc start*(node: WakuNode) {.async.} =
await node.startRelay()
if not node.wakuMix.isNil():
node.wakuMix.start()
await node.wakuMix.start()
if not node.wakuMetadata.isNil():
node.wakuMetadata.start()

View File

@ -1,6 +1,6 @@
{.push raises: [].}
import chronicles, std/options, chronos, results, metrics
import chronicles, std/[options, sequtils], chronos, results, metrics
import
libp2p/crypto/curve25519,
@ -10,14 +10,18 @@ import
libp2p/protocols/mix/mix_protocol,
libp2p/protocols/mix/mix_metrics,
libp2p/protocols/mix/delay_strategy,
libp2p/[multiaddress, peerid],
libp2p/protocols/mix/spam_protection,
libp2p/[multiaddress, multicodec, peerid, peerinfo],
eth/common/keys
import
waku/node/peer_manager,
waku/waku_core,
waku/waku_enr,
waku/node/peer_manager/waku_peer_store
waku/node/peer_manager/waku_peer_store,
mix_rln_spam_protection,
waku/waku_relay,
waku/common/nimchronos
logScope:
topics = "waku mix"
@ -25,10 +29,16 @@ logScope:
const minMixPoolSize = 4
type
PublishMessage* = proc(message: WakuMessage): Future[Result[void, string]] {.
async, gcsafe, raises: []
.}
WakuMix* = ref object of MixProtocol
peerManager*: PeerManager
clusterId: uint16
pubKey*: Curve25519Key
mixRlnSpamProtection*: MixRlnSpamProtection
publishMessage*: PublishMessage
WakuMixResult*[T] = Result[T, string]
@ -41,11 +51,9 @@ proc processBootNodes(
) =
var count = 0
for node in bootnodes:
let pInfo = parsePeerInfo(node.multiAddr).valueOr:
error "Failed to get peer id from multiaddress: ",
error = error, multiAddr = $node.multiAddr
let (peerId, networkAddr) = parseFullAddress(node.multiAddr).valueOr:
error "Failed to parse multiaddress", multiAddr = node.multiAddr, error = error
continue
let peerId = pInfo.peerId
var peerPubKey: crypto.PublicKey
if not peerId.extractPublicKey(peerPubKey):
warn "Failed to extract public key from peerId, skipping node", peerId = peerId
@ -65,10 +73,10 @@ proc processBootNodes(
count.inc()
peermgr.addPeer(
RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey))
RemotePeerInfo.init(peerId, @[networkAddr], mixPubKey = some(node.pubKey))
)
mix_pool_size.set(count)
info "using mix bootstrap nodes ", count = count
debug "using mix bootstrap nodes ", count = count
proc new*(
T: typedesc[WakuMix],
@ -77,9 +85,11 @@ proc new*(
clusterId: uint16,
mixPrivKey: Curve25519Key,
bootnodes: seq[MixNodePubInfo],
publishMessage: PublishMessage,
userMessageLimit: Option[int] = none(int),
): WakuMixResult[T] =
let mixPubKey = public(mixPrivKey)
info "mixPubKey", mixPubKey = mixPubKey
trace "mixPubKey", mixPubKey = mixPubKey
let nodeMultiAddr = MultiAddress.init(nodeAddr).valueOr:
return err("failed to parse mix node address: " & $nodeAddr & ", error: " & error)
let localMixNodeInfo = initMixNodeInfo(
@ -87,27 +97,193 @@ proc new*(
peermgr.switch.peerInfo.publicKey.skkey, peermgr.switch.peerInfo.privateKey.skkey,
)
var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey)
# Initialize spam protection with persistent credentials
# Use peerID in keystore path so multiple peers can run from same directory
# Tree path is shared across all nodes to maintain the full membership set
let peerId = peermgr.switch.peerInfo.peerId
var spamProtectionConfig = defaultConfig()
spamProtectionConfig.keystorePath = "rln_keystore_" & $peerId & ".json"
spamProtectionConfig.keystorePassword = "mix-rln-password"
if userMessageLimit.isSome():
spamProtectionConfig.userMessageLimit = userMessageLimit.get()
# rlnResourcesPath left empty to use bundled resources (via "tree_height_/" placeholder)
let spamProtection = newMixRlnSpamProtection(spamProtectionConfig).valueOr:
return err("failed to create spam protection: " & error)
var m = WakuMix(
peerManager: peermgr,
clusterId: clusterId,
pubKey: mixPubKey,
mixRlnSpamProtection: spamProtection,
publishMessage: publishMessage,
)
procCall MixProtocol(m).init(
localMixNodeInfo,
peermgr.switch,
spamProtection = Opt.some(SpamProtection(spamProtection)),
delayStrategy =
ExponentialDelayStrategy.new(meanDelayMs = 50, rng = crypto.newRng()),
ExponentialDelayStrategy.new(meanDelayMs = 100, rng = crypto.newRng()),
)
processBootNodes(bootnodes, peermgr, m)
if m.nodePool.len < minMixPoolSize:
warn "publishing with mix won't work until atleast 3 mix nodes in node pool"
return ok(m)
proc poolSize*(mix: WakuMix): int =
  ## Current number of mix nodes known in the local node pool.
  len(mix.nodePool)
method start*(mix: WakuMix) =
proc setupSpamProtectionCallbacks(mix: WakuMix) =
  ## Set up the publish callback for spam protection coordination.
  ## This enables the plugin to broadcast membership updates and proof metadata
  ## via Waku relay. Does nothing (with a warning) when no publish callback
  ## was supplied at construction time.
  if mix.publishMessage.isNil():
    warn "PublishMessage callback not available, spam protection coordination disabled"
    return
  let onCoordinationData: PublishCallback = proc(
      contentTopic: string, data: seq[byte]
  ) {.async.} =
    # Wrap the raw coordination payload in a WakuMessage
    let coordinationMsg = WakuMessage(
      payload: data,
      contentTopic: contentTopic,
      ephemeral: true, # Coordination messages don't need to be stored
      timestamp: getNowInNanosecondTime(),
    )
    # Delegate to node's publish API which handles topic derivation and relay publishing
    let publishRes = await mix.publishMessage(coordinationMsg)
    if publishRes.isOk():
      trace "Published spam protection coordination message", contentTopic = contentTopic
    else:
      warn "Failed to publish spam protection coordination message",
        contentTopic = contentTopic, error = publishRes.error
  mix.mixRlnSpamProtection.setPublishCallback(onCoordinationData)
  trace "Spam protection publish callback configured"
proc handleMessage*(
    mix: WakuMix, pubsubTopic: PubsubTopic, message: WakuMessage
) {.async, gcsafe.} =
  ## Handle incoming messages for spam protection coordination.
  ## This should be called from the relay handler for coordination content topics.
  ## Messages on other content topics are silently ignored.
  if mix.mixRlnSpamProtection.isNil():
    return
  let spamProtection = mix.mixRlnSpamProtection
  let topic = message.contentTopic
  if topic == spamProtection.getMembershipContentTopic():
    # Handle membership update
    let updateRes = await spamProtection.handleMembershipUpdate(message.payload)
    if updateRes.isErr:
      warn "Failed to handle membership update", error = updateRes.error
    else:
      trace "Handled membership update"
      # Persist tree after membership changes (temporary solution)
      # TODO: Replace with proper persistence strategy (e.g., periodic snapshots)
      let persistRes = spamProtection.saveTree()
      if persistRes.isErr:
        debug "Failed to save tree after membership update", error = persistRes.error
      else:
        trace "Saved tree after membership update"
  elif topic == spamProtection.getProofMetadataContentTopic():
    # Handle proof metadata for network-wide spam detection
    let metadataRes = spamProtection.handleProofMetadata(message.payload)
    if metadataRes.isErr:
      warn "Failed to handle proof metadata", error = metadataRes.error
    else:
      trace "Handled proof metadata"
proc getSpamProtectionContentTopics*(mix: WakuMix): seq[string] =
  ## Get the content topics used by spam protection for coordination.
  ## Use these to set up relay subscriptions.
  ## Yields an empty seq when spam protection is not initialized.
  if mix.mixRlnSpamProtection.isNil():
    @[]
  else:
    mix.mixRlnSpamProtection.getContentTopics()
proc saveSpamProtectionTree*(mix: WakuMix): Result[void, string] =
  ## Save the spam protection membership tree to disk.
  ## This allows preserving the tree state across restarts.
  ## Returns an error when spam protection is not initialized or the save fails.
  if mix.mixRlnSpamProtection.isNil():
    return err("Spam protection not initialized")
  # The plugin already reports errors as strings, so the previous identity
  # `mapErr(proc(e: string): string = e)` was a no-op and has been removed.
  mix.mixRlnSpamProtection.saveTree()
proc loadSpamProtectionTree*(mix: WakuMix): Result[void, string] =
  ## Load the spam protection membership tree from disk.
  ## Call this before init() to restore tree state from previous runs.
  ## Returns an error when spam protection is not initialized or the load fails.
  ## TODO: This is a temporary solution. Ideally nodes should sync tree state
  ## via a store query for historical membership messages or via dedicated
  ## tree sync protocol.
  if mix.mixRlnSpamProtection.isNil():
    return err("Spam protection not initialized")
  # The plugin already reports errors as strings, so the previous identity
  # `mapErr(proc(e: string): string = e)` was a no-op and has been removed.
  mix.mixRlnSpamProtection.loadTree()
method start*(mix: WakuMix) {.async.} =
  ## Start the mix protocol and bring up the RLN spam protection plugin.
  ## Ordering matters: init (load credentials) -> loadTree (sync with peers)
  ## -> restoreCredentialsToTree -> set publish callback -> start plugin
  ## -> registerSelf (broadcast membership) -> saveTree (persist state).
  ## Failures are logged but do not abort node startup.
  ## NOTE(review): the MixProtocol base `start` is not invoked here via
  ## `procCall` — confirm the base method has no required startup work.
  info "starting waku mix protocol"
  # Set up spam protection callbacks and start
  if not mix.mixRlnSpamProtection.isNil():
    # Initialize spam protection (MixProtocol.init() does NOT call init() on the plugin)
    let initRes = await mix.mixRlnSpamProtection.init()
    if initRes.isErr:
      error "Failed to initialize spam protection", error = initRes.error
    else:
      # Load existing tree to sync with other members
      # This should be done after init() (which loads credentials)
      # but before registerSelf() (which adds us to the tree)
      let loadRes = mix.mixRlnSpamProtection.loadTree()
      if loadRes.isErr:
        # A missing tree file is expected on first run, hence debug level
        debug "No existing tree found or failed to load, starting fresh",
          error = loadRes.error
      else:
        debug "Loaded existing spam protection membership tree from disk"
      # Restore our credentials to the tree (after tree load, whether it succeeded or not)
      # This ensures our member is in the tree if we have an index from keystore
      let restoreRes = mix.mixRlnSpamProtection.restoreCredentialsToTree()
      if restoreRes.isErr:
        error "Failed to restore credentials to tree", error = restoreRes.error
      # Set up publish callback (must be before start so registerSelf can use it)
      mix.setupSpamProtectionCallbacks()
      let startRes = await mix.mixRlnSpamProtection.start()
      if startRes.isErr:
        error "Failed to start spam protection", error = startRes.error
      else:
        # Register self to broadcast membership to the network
        let registerRes = await mix.mixRlnSpamProtection.registerSelf()
        if registerRes.isErr:
          error "Failed to register spam protection credentials",
            error = registerRes.error
        else:
          debug "Registered spam protection credentials", index = registerRes.get()
          # Save tree to persist membership state
          let saveRes = mix.mixRlnSpamProtection.saveTree()
          if saveRes.isErr:
            warn "Failed to save spam protection tree", error = saveRes.error
          else:
            trace "Saved spam protection tree to disk"
method stop*(mix: WakuMix) {.async.} =
  ## Stop the mix protocol's spam protection plugin (if initialized).
  discard # NOTE(review): stray no-op left over from the previously empty body — candidate for removal
  # Stop spam protection
  if not mix.mixRlnSpamProtection.isNil():
    await mix.mixRlnSpamProtection.stop()
    debug "Spam protection stopped"
# Mix Protocol