mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-03-02 23:23:13 +00:00
integrate RLN spam protection into mix protocol with OffChainGroupManager and logos-messaging for broadcasting membership changes and general coordination with API updates and fixes
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
b9ffbfeb65
commit
4a165f09d3
1
.gitignore
vendored
1
.gitignore
vendored
@ -43,6 +43,7 @@ node_modules/
|
||||
|
||||
# RLN / keystore
|
||||
rlnKeystore.json
|
||||
rln_keystore*.json
|
||||
*.tar.gz
|
||||
|
||||
# Nimbus Build System
|
||||
|
||||
5
.gitmodules
vendored
5
.gitmodules
vendored
@ -195,3 +195,8 @@
|
||||
url = https://github.com/logos-messaging/nim-ffi/
|
||||
ignore = untracked
|
||||
branch = master
|
||||
[submodule "vendor/mix-rln-spam-protection-plugin"]
|
||||
path = vendor/mix-rln-spam-protection-plugin
|
||||
url = https://github.com/logos-co/mix-rln-spam-protection-plugin.git
|
||||
ignore = untracked
|
||||
branch = master
|
||||
|
||||
@ -35,6 +35,7 @@ import
|
||||
import
|
||||
waku/[
|
||||
waku_core,
|
||||
waku_core/topics/sharding,
|
||||
waku_lightpush/common,
|
||||
waku_lightpush/rpc,
|
||||
waku_enr,
|
||||
@ -47,6 +48,8 @@ import
|
||||
common/utils/nat,
|
||||
waku_store/common,
|
||||
waku_filter_v2/client,
|
||||
waku_filter_v2/common as filter_common,
|
||||
waku_mix/protocol,
|
||||
common/logging,
|
||||
],
|
||||
./config_chat2mix
|
||||
@ -57,6 +60,105 @@ import ../../waku/waku_rln_relay
|
||||
logScope:
|
||||
topics = "chat2 mix"
|
||||
|
||||
#########################
|
||||
## Mix Spam Protection ##
|
||||
#########################
|
||||
|
||||
# Forward declaration
|
||||
proc maintainSpamProtectionSubscription(
|
||||
node: WakuNode, contentTopics: seq[ContentTopic]
|
||||
) {.async.}
|
||||
|
||||
proc setupMixSpamProtectionViaFilter(node: WakuNode) {.async.} =
|
||||
## Setup filter-based spam protection coordination for mix protocol.
|
||||
## Since chat2mix doesn't use relay, we subscribe via filter to receive
|
||||
## spam protection coordination messages.
|
||||
|
||||
# Register message handler for spam protection coordination
|
||||
let spamTopics = node.wakuMix.getSpamProtectionContentTopics()
|
||||
|
||||
proc handleSpamMessage(
|
||||
pubsubTopic: PubsubTopic, message: WakuMessage
|
||||
): Future[void] {.async, gcsafe.} =
|
||||
await node.wakuMix.handleMessage(pubsubTopic, message)
|
||||
|
||||
node.wakuFilterClient.registerPushHandler(handleSpamMessage)
|
||||
|
||||
# Wait for filter peer and maintain subscription
|
||||
asyncSpawn maintainSpamProtectionSubscription(node, spamTopics)
|
||||
|
||||
proc maintainSpamProtectionSubscription(
|
||||
node: WakuNode, contentTopics: seq[ContentTopic]
|
||||
) {.async.} =
|
||||
## Maintain filter subscription for spam protection topics.
|
||||
## Monitors subscription health with periodic pings and re-subscribes on failure.
|
||||
const RetryInterval = chronos.seconds(5)
|
||||
const SubscriptionMaintenance = chronos.seconds(30)
|
||||
const MaxFailedSubscribes = 3
|
||||
var currentFilterPeer: Option[RemotePeerInfo] = none(RemotePeerInfo)
|
||||
var noFailedSubscribes = 0
|
||||
|
||||
while true:
|
||||
# Select or reuse filter peer
|
||||
if currentFilterPeer.isNone():
|
||||
let filterPeerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
|
||||
if filterPeerOpt.isNone():
|
||||
debug "No filter peer available yet for spam protection, retrying..."
|
||||
await sleepAsync(RetryInterval)
|
||||
continue
|
||||
currentFilterPeer = some(filterPeerOpt.get())
|
||||
info "Selected filter peer for spam protection",
|
||||
peer = currentFilterPeer.get().peerId
|
||||
|
||||
# Check if subscription is still alive with ping
|
||||
let pingErr = (await node.wakuFilterClient.ping(currentFilterPeer.get())).errorOr:
|
||||
# Subscription is alive, wait before next check
|
||||
await sleepAsync(SubscriptionMaintenance)
|
||||
if noFailedSubscribes > 0:
|
||||
noFailedSubscribes = 0
|
||||
continue
|
||||
|
||||
# Subscription lost, need to re-subscribe
|
||||
warn "Spam protection filter subscription ping failed, re-subscribing",
|
||||
error = pingErr, peer = currentFilterPeer.get().peerId
|
||||
|
||||
# Determine pubsub topic from content topics (using auto-sharding)
|
||||
if node.wakuAutoSharding.isNone():
|
||||
error "Auto-sharding not configured, cannot determine pubsub topic for spam protection"
|
||||
await sleepAsync(RetryInterval)
|
||||
continue
|
||||
|
||||
let shardRes = node.wakuAutoSharding.get().getShard(contentTopics[0])
|
||||
if shardRes.isErr():
|
||||
error "Failed to determine shard for spam protection", error = shardRes.error
|
||||
await sleepAsync(RetryInterval)
|
||||
continue
|
||||
|
||||
let shard = shardRes.get()
|
||||
let pubsubTopic: PubsubTopic = shard # converter toPubsubTopic
|
||||
|
||||
# Subscribe to spam protection topics
|
||||
let res = await node.wakuFilterClient.subscribe(
|
||||
currentFilterPeer.get(), pubsubTopic, contentTopics
|
||||
)
|
||||
if res.isErr():
|
||||
noFailedSubscribes += 1
|
||||
warn "Failed to subscribe to spam protection topics via filter",
|
||||
error = res.error, topics = contentTopics, failCount = noFailedSubscribes
|
||||
|
||||
if noFailedSubscribes >= MaxFailedSubscribes:
|
||||
# Try with a different peer
|
||||
warn "Max subscription failures reached, selecting new filter peer"
|
||||
currentFilterPeer = none(RemotePeerInfo)
|
||||
noFailedSubscribes = 0
|
||||
|
||||
await sleepAsync(RetryInterval)
|
||||
else:
|
||||
info "Successfully subscribed to spam protection topics via filter",
|
||||
topics = contentTopics, peer = currentFilterPeer.get().peerId
|
||||
noFailedSubscribes = 0
|
||||
await sleepAsync(SubscriptionMaintenance)
|
||||
|
||||
const Help =
|
||||
"""
|
||||
Commands: /[?|help|connect|nick|exit]
|
||||
@ -210,20 +312,21 @@ proc publish(c: Chat, line: string) {.async.} =
|
||||
try:
|
||||
if not c.node.wakuLightpushClient.isNil():
|
||||
# Attempt lightpush with mix
|
||||
|
||||
(
|
||||
waitFor c.node.lightpushPublish(
|
||||
some(c.conf.getPubsubTopic(c.node, c.contentTopic)),
|
||||
message,
|
||||
none(RemotePeerInfo),
|
||||
true,
|
||||
)
|
||||
).isOkOr:
|
||||
error "failed to publish lightpush message", error = error
|
||||
let res = await c.node.lightpushPublish(
|
||||
some(c.conf.getPubsubTopic(c.node, c.contentTopic)),
|
||||
message,
|
||||
none(RemotePeerInfo),
|
||||
true,
|
||||
)
|
||||
if res.isErr():
|
||||
error "failed to publish lightpush message", error = res.error
|
||||
echo "Error: " & res.error.desc.get("unknown error")
|
||||
else:
|
||||
error "failed to publish message as lightpush client is not initialized"
|
||||
echo "Error: lightpush client is not initialized"
|
||||
except CatchableError:
|
||||
error "caught error publishing message: ", error = getCurrentExceptionMsg()
|
||||
echo "Error: " & getCurrentExceptionMsg()
|
||||
|
||||
# TODO This should read or be subscribe handler subscribe
|
||||
proc readAndPrint(c: Chat) {.async.} =
|
||||
@ -452,7 +555,11 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
error "failed to generate mix key pair", error = error
|
||||
return
|
||||
|
||||
(await node.mountMix(conf.clusterId, mixPrivKey, conf.mixnodes)).isOkOr:
|
||||
(
|
||||
await node.mountMix(
|
||||
conf.clusterId, mixPrivKey, conf.mixnodes, some(conf.rlnUserMessageLimit)
|
||||
)
|
||||
).isOkOr:
|
||||
error "failed to mount waku mix protocol: ", error = $error
|
||||
quit(QuitFailure)
|
||||
|
||||
@ -487,6 +594,10 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
|
||||
#await node.mountRendezvousClient(conf.clusterId)
|
||||
|
||||
# Subscribe to spam protection coordination topics via filter since chat2mix doesn't use relay
|
||||
if not node.wakuFilterClient.isNil():
|
||||
asyncSpawn setupMixSpamProtectionViaFilter(node)
|
||||
|
||||
await node.start()
|
||||
|
||||
node.peerManager.start()
|
||||
|
||||
@ -237,6 +237,13 @@ type
|
||||
name: "kad-bootstrap-node"
|
||||
.}: seq[string]
|
||||
|
||||
## RLN spam protection config
|
||||
rlnUserMessageLimit* {.
|
||||
desc: "Maximum messages per epoch for RLN spam protection.",
|
||||
defaultValue: 100,
|
||||
name: "rln-user-message-limit"
|
||||
.}: int
|
||||
|
||||
proc parseCmdArg*(T: type MixNodePubInfo, p: string): T =
|
||||
let elements = p.split(":")
|
||||
if elements.len != 2:
|
||||
|
||||
@ -1,5 +1,8 @@
|
||||
import os
|
||||
|
||||
when fileExists("nimbus-build-system.paths"):
|
||||
include "nimbus-build-system.paths"
|
||||
|
||||
if defined(release):
|
||||
switch("nimcache", "nimcache/release/$projectName")
|
||||
else:
|
||||
|
||||
@ -3,66 +3,127 @@
|
||||
## Aim
|
||||
|
||||
Simulate a local mixnet along with a chat app to publish using mix.
|
||||
This is helpful to test any changes while development.
|
||||
It includes scripts that run a `4 node` mixnet along with a lightpush service node(without mix) that can be used to test quickly.
|
||||
This is helpful to test any changes during development.
|
||||
|
||||
## Simulation Details
|
||||
|
||||
Note that before running the simulation both `wakunode2` and `chat2mix` have to be built.
|
||||
The simulation includes:
|
||||
|
||||
1. A 5-node mixnet where `run_mix_node.sh` is the bootstrap node for the other 4 nodes
|
||||
2. Two chat app instances that publish messages using lightpush protocol over the mixnet
|
||||
|
||||
### Available Scripts
|
||||
|
||||
| Script | Description |
|
||||
|--------|-------------|
|
||||
| `run_mix_node.sh` | Bootstrap mix node (must be started first) |
|
||||
| `run_mix_node1.sh` | Mix node 1 |
|
||||
| `run_mix_node2.sh` | Mix node 2 |
|
||||
| `run_mix_node3.sh` | Mix node 3 |
|
||||
| `run_mix_node4.sh` | Mix node 4 |
|
||||
| `run_chat_mix.sh` | Chat app instance 1 |
|
||||
| `run_chat_mix1.sh` | Chat app instance 2 |
|
||||
| `build_setup.sh` | Build and generate RLN credentials |
|
||||
|
||||
## Prerequisites
|
||||
|
||||
Before running the simulation, build `wakunode2` and `chat2mix`:
|
||||
|
||||
```bash
|
||||
cd <repo-root-dir>
|
||||
make wakunode2
|
||||
make chat2mix
|
||||
make wakunode2 chat2mix
|
||||
```
|
||||
|
||||
Simulation includes scripts for:
|
||||
## RLN Spam Protection Setup
|
||||
|
||||
1. a 4 waku-node mixnet where `node1` is bootstrap node for the other 3 nodes.
|
||||
2. scripts to run chat app that publishes using lightpush protocol over the mixnet
|
||||
Before running the simulation, generate RLN credentials and the shared Merkle tree for all nodes:
|
||||
|
||||
```bash
|
||||
cd simulations/mixnet
|
||||
./build_setup.sh
|
||||
```
|
||||
|
||||
This script will:
|
||||
|
||||
1. Build and run the `setup_credentials` tool
|
||||
2. Generate RLN credentials for all nodes (5 mix nodes + 2 chat clients)
|
||||
3. Create `rln_tree.db` - the shared Merkle tree with all members
|
||||
4. Create keystore files (`rln_keystore_{peerId}.json`) for each node
|
||||
|
||||
**Important:** All scripts must be run from this directory (`simulations/mixnet/`) so they can access their credentials and tree file.
|
||||
|
||||
To regenerate credentials (e.g., after adding new nodes), run `./build_setup.sh` again - it will clean up old files first.
|
||||
|
||||
## Usage
|
||||
|
||||
Start the service node with below command, which acts as bootstrap node for all other mix nodes.
|
||||
### Step 1: Start the Mix Nodes
|
||||
|
||||
`./run_lp_service_node.sh`
|
||||
Start the bootstrap node first (in a separate terminal):
|
||||
|
||||
To run the nodes for mixnet run the 4 node scripts in different terminals as below.
|
||||
|
||||
`./run_mix_node1.sh`
|
||||
|
||||
Look for following 2 log lines to ensure node ran successfully and has also mounted mix protocol.
|
||||
|
||||
```log
|
||||
INF 2025-08-01 14:51:05.445+05:30 mounting mix protocol topics="waku node" tid=39996871 file=waku_node.nim:231 nodeId="(listenAddresses: @[\"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o\"], enrUri: \"enr:-NC4QKYtas8STkenlqBTJ3a1TTLzJA2DsGGbFlnxem9aSM2IXm-CSVZULdk2467bAyFnepnt8KP_QlfDzdaMXd_zqtwBgmlkgnY0gmlwhH8AAAGHbWl4LWtleaCdCc5iT3bo9gYmXtucyit96bQXcqbXhL3a-S_6j7p9LIptdWx0aWFkZHJzgIJyc4UAAgEAAIlzZWNwMjU2azGhA6RFtVJVBh0SYOoP8xrgnXSlpiFARmQkF9d8Rn4fSeiog3RjcILqYYN1ZHCCIymFd2FrdTIt\")"
|
||||
|
||||
INF 2025-08-01 14:49:23.467+05:30 Node setup complete topics="wakunode main" tid=39994244 file=wakunode2.nim:104
|
||||
```bash
|
||||
./run_mix_node.sh
|
||||
```
|
||||
|
||||
Once all the 4 nodes are up without any issues, run the script to start the chat application.
|
||||
Look for the following log lines to ensure the node started successfully:
|
||||
|
||||
`./run_chat_app.sh`
|
||||
```log
|
||||
INF mounting mix protocol topics="waku node"
|
||||
INF Node setup complete topics="wakunode main"
|
||||
```
|
||||
|
||||
Enter a nickname to be used.
|
||||
Verify RLN spam protection initialized correctly by checking for these logs:
|
||||
|
||||
```log
|
||||
INF Initializing MixRlnSpamProtection
|
||||
INF MixRlnSpamProtection initialized, waiting for sync
|
||||
DBG Tree loaded from file
|
||||
INF MixRlnSpamProtection started
|
||||
```
|
||||
|
||||
Then start the remaining mix nodes in separate terminals:
|
||||
|
||||
```bash
|
||||
./run_mix_node1.sh
|
||||
./run_mix_node2.sh
|
||||
./run_mix_node3.sh
|
||||
./run_mix_node4.sh
|
||||
```
|
||||
|
||||
### Step 2: Start the Chat Applications
|
||||
|
||||
Once all 5 mix nodes are running, start the first chat app:
|
||||
|
||||
```bash
|
||||
./run_chat_mix.sh
|
||||
```
|
||||
|
||||
Enter a nickname when prompted:
|
||||
|
||||
```bash
|
||||
pubsub topic is: /waku/2/rs/2/0
|
||||
Choose a nickname >>
|
||||
```
|
||||
|
||||
Once you see below log, it means the app is ready for publishing messages over the mixnet.
|
||||
Once you see the following log, the app is ready to publish messages over the mixnet:
|
||||
|
||||
```bash
|
||||
Welcome, test!
|
||||
Listening on
|
||||
/ip4/192.168.68.64/tcp/60000/p2p/16Uiu2HAkxDGqix1ifY3wF1ZzojQWRAQEdKP75wn1LJMfoHhfHz57
|
||||
/ip4/<local-network-ip>/tcp/60000/p2p/16Uiu2HAkxDGqix1ifY3wF1ZzojQWRAQEdKP75wn1LJMfoHhfHz57
|
||||
ready to publish messages now
|
||||
```
|
||||
|
||||
Follow similar instructions to run second instance of chat app.
|
||||
Once both the apps run successfully, send a message and check if it is received by the other app.
|
||||
Start the second chat app in another terminal:
|
||||
|
||||
You can exit the chat apps by entering `/exit` as below
|
||||
```bash
|
||||
./run_chat_mix1.sh
|
||||
```
|
||||
|
||||
### Step 3: Test Messaging
|
||||
|
||||
Once both chat apps are running, send a message from one and verify it is received by the other.
|
||||
|
||||
To exit the chat apps, enter `/exit`:
|
||||
|
||||
```bash
|
||||
>> /exit
|
||||
|
||||
32
simulations/mixnet/build_setup.sh
Executable file
32
simulations/mixnet/build_setup.sh
Executable file
@ -0,0 +1,32 @@
|
||||
#!/bin/bash
|
||||
cd "$(dirname "$0")"
|
||||
MIXNET_DIR=$(pwd)
|
||||
cd ../..
|
||||
ROOT_DIR=$(pwd)
|
||||
|
||||
# Clean up old files first
|
||||
rm -f "$MIXNET_DIR/rln_tree.db" "$MIXNET_DIR"/rln_keystore_*.json
|
||||
|
||||
echo "Building and running credentials setup..."
|
||||
# Compile to temp location, then run from mixnet directory
|
||||
nim c -d:release --mm:refc \
|
||||
--passL:"-L$ROOT_DIR/vendor/zerokit/target/release -lrln" \
|
||||
-o:/tmp/setup_credentials_$$ \
|
||||
"$MIXNET_DIR/setup_credentials.nim" 2>&1 | tail -30
|
||||
|
||||
# Run from mixnet directory so files are created there
|
||||
cd "$MIXNET_DIR"
|
||||
/tmp/setup_credentials_$$
|
||||
|
||||
# Clean up temp binary
|
||||
rm -f /tmp/setup_credentials_$$
|
||||
|
||||
# Verify output
|
||||
if [ -f "rln_tree.db" ]; then
|
||||
echo ""
|
||||
echo "Tree file ready at: $(pwd)/rln_tree.db"
|
||||
ls -la rln_keystore_*.json 2>/dev/null | wc -l | xargs -I {} echo "Generated {} keystore files"
|
||||
else
|
||||
echo "Setup failed - rln_tree.db not found"
|
||||
exit 1
|
||||
fi
|
||||
@ -13,7 +13,7 @@ discv5-udp-port = 9002
|
||||
discv5-enr-auto-update = true
|
||||
discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"]
|
||||
kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"]
|
||||
rest = false
|
||||
rest = true
|
||||
rest-admin = false
|
||||
ports-shift = 3
|
||||
num-shards-in-network = 1
|
||||
|
||||
@ -13,7 +13,7 @@ discv5-udp-port = 9003
|
||||
discv5-enr-auto-update = true
|
||||
discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"]
|
||||
kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"]
|
||||
rest = false
|
||||
rest = true
|
||||
rest-admin = false
|
||||
ports-shift = 4
|
||||
num-shards-in-network = 1
|
||||
|
||||
@ -13,7 +13,7 @@ discv5-udp-port = 9004
|
||||
discv5-enr-auto-update = true
|
||||
discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"]
|
||||
kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"]
|
||||
rest = false
|
||||
rest = true
|
||||
rest-admin = false
|
||||
ports-shift = 5
|
||||
num-shards-in-network = 1
|
||||
|
||||
@ -1,2 +1,2 @@
|
||||
../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --kad-bootstrap-node="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"
|
||||
../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --nodekey="cb6fe589db0e5d5b48f7e82d33093e4d9d35456f4aaffc2322c473a173b2ac49" --kad-bootstrap-node="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --fleet="none"
|
||||
#--mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
|
||||
|
||||
@ -1,2 +1 @@
|
||||
../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE
|
||||
#--mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
|
||||
../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --nodekey="35eace7ccb246f20c487e05015ca77273d8ecaed0ed683de3d39bf4f69336feb" --mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f" --mixnode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o:9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c" --fleet="none"
|
||||
|
||||
@ -1 +1,2 @@
|
||||
../../build/wakunode2 --config-file="config.toml" 2>&1 | tee mix_node.log
|
||||
|
||||
|
||||
139
simulations/mixnet/setup_credentials.nim
Normal file
139
simulations/mixnet/setup_credentials.nim
Normal file
@ -0,0 +1,139 @@
|
||||
{.push raises: [].}
|
||||
|
||||
## Setup script to generate RLN credentials and shared Merkle tree for mix nodes.
|
||||
##
|
||||
## This script:
|
||||
## 1. Generates credentials for each node (identified by peer ID)
|
||||
## 2. Registers all credentials in a shared Merkle tree
|
||||
## 3. Saves the tree to rln_tree.db
|
||||
## 4. Saves individual keystores named by peer ID
|
||||
##
|
||||
## Usage: nim c -r setup_credentials.nim
|
||||
|
||||
import std/[os, strformat, options], chronicles, chronos, results
|
||||
|
||||
import
|
||||
mix_rln_spam_protection/credentials,
|
||||
mix_rln_spam_protection/group_manager,
|
||||
mix_rln_spam_protection/rln_interface,
|
||||
mix_rln_spam_protection/types
|
||||
|
||||
const
|
||||
KeystorePassword = "mix-rln-password" # Must match protocol.nim
|
||||
DefaultUserMessageLimit = 100'u64 # Network-wide default rate limit
|
||||
SpammerUserMessageLimit = 3'u64 # Lower limit for spammer testing
|
||||
|
||||
# Peer IDs derived from nodekeys in config files
|
||||
# config.toml: nodekey = "f98e3fba96c32e8d1967d460f1b79457380e1a895f7971cecc8528abe733781a"
|
||||
# config1.toml: nodekey = "09e9d134331953357bd38bbfce8edb377f4b6308b4f3bfbe85c610497053d684"
|
||||
# config2.toml: nodekey = "ed54db994682e857d77cd6fb81be697382dc43aa5cd78e16b0ec8098549f860e"
|
||||
# config3.toml: nodekey = "42f96f29f2d6670938b0864aced65a332dcf5774103b4c44ec4d0ea4ef3c47d6"
|
||||
# config4.toml: nodekey = "3ce887b3c34b7a92dd2868af33941ed1dbec4893b054572cd5078da09dd923d4"
|
||||
# chat2mix.sh: nodekey = "cb6fe589db0e5d5b48f7e82d33093e4d9d35456f4aaffc2322c473a173b2ac49"
|
||||
# chat2mix1.sh: nodekey = "35eace7ccb246f20c487e05015ca77273d8ecaed0ed683de3d39bf4f69336feb"
|
||||
|
||||
# Node info: (peerId, userMessageLimit)
|
||||
NodeConfigs = [
|
||||
("16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", DefaultUserMessageLimit),
|
||||
# config.toml (service node)
|
||||
("16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF", DefaultUserMessageLimit),
|
||||
# config1.toml (mix node 1)
|
||||
("16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA", DefaultUserMessageLimit),
|
||||
# config2.toml (mix node 2)
|
||||
("16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f", DefaultUserMessageLimit),
|
||||
# config3.toml (mix node 3)
|
||||
("16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu", DefaultUserMessageLimit),
|
||||
# config4.toml (mix node 4)
|
||||
("16Uiu2HAm1QxSjNvNbsT2xtLjRGAsBLVztsJiTHr9a3EK96717hpj", DefaultUserMessageLimit),
|
||||
# chat2mix client 1
|
||||
("16Uiu2HAmC9h26U1C83FJ5xpE32ghqya8CaZHX1Y7qpfHNnRABscN", DefaultUserMessageLimit),
|
||||
# chat2mix client 2
|
||||
]
|
||||
|
||||
proc setupCredentialsAndTree() {.async.} =
|
||||
## Generate credentials for all nodes and create a shared tree
|
||||
|
||||
echo "=== RLN Credentials Setup ==="
|
||||
echo "Generating credentials for ", NodeConfigs.len, " nodes...\n"
|
||||
|
||||
# Generate credentials for all nodes
|
||||
var allCredentials:
|
||||
seq[tuple[peerId: string, cred: IdentityCredential, rateLimit: uint64]]
|
||||
for (peerId, rateLimit) in NodeConfigs:
|
||||
let cred = generateCredentials().valueOr:
|
||||
echo "Failed to generate credentials for ", peerId, ": ", error
|
||||
quit(1)
|
||||
|
||||
allCredentials.add((peerId: peerId, cred: cred, rateLimit: rateLimit))
|
||||
echo "Generated credentials for ", peerId
|
||||
echo " idCommitment: ", cred.idCommitment.toHex()[0 .. 15], "..."
|
||||
echo " userMessageLimit: ", rateLimit
|
||||
|
||||
echo ""
|
||||
|
||||
# Create a group manager directly to build the tree
|
||||
let rlnInstance = newRLNInstance().valueOr:
|
||||
echo "Failed to create RLN instance: ", error
|
||||
quit(1)
|
||||
|
||||
let groupManager = newOffchainGroupManager(rlnInstance, "/mix/rln/membership/v1")
|
||||
|
||||
# Initialize the group manager
|
||||
let initRes = await groupManager.init()
|
||||
if initRes.isErr:
|
||||
echo "Failed to initialize group manager: ", initRes.error
|
||||
quit(1)
|
||||
|
||||
# Register all credentials in the tree with their specific rate limits
|
||||
echo "Registering all credentials in the Merkle tree..."
|
||||
for i, entry in allCredentials:
|
||||
let index = (
|
||||
await groupManager.registerWithLimit(entry.cred.idCommitment, entry.rateLimit)
|
||||
).valueOr:
|
||||
echo "Failed to register credential for ", entry.peerId, ": ", error
|
||||
quit(1)
|
||||
echo " Registered ",
|
||||
entry.peerId, " at index ", index, " (limit: ", entry.rateLimit, ")"
|
||||
|
||||
echo ""
|
||||
|
||||
# Save the tree to disk
|
||||
echo "Saving tree to rln_tree.db..."
|
||||
let saveRes = groupManager.saveTreeToFile("rln_tree.db")
|
||||
if saveRes.isErr:
|
||||
echo "Failed to save tree: ", saveRes.error
|
||||
quit(1)
|
||||
echo "Tree saved successfully!"
|
||||
|
||||
echo ""
|
||||
|
||||
# Save each credential to a keystore file named by peer ID
|
||||
echo "Saving keystores..."
|
||||
for i, entry in allCredentials:
|
||||
let keystorePath = &"rln_keystore_{entry.peerId}.json"
|
||||
|
||||
# Save with membership index and rate limit
|
||||
let saveResult = saveKeystore(
|
||||
entry.cred,
|
||||
KeystorePassword,
|
||||
keystorePath,
|
||||
some(MembershipIndex(i)),
|
||||
some(entry.rateLimit),
|
||||
)
|
||||
if saveResult.isErr:
|
||||
echo "Failed to save keystore for ", entry.peerId, ": ", saveResult.error
|
||||
quit(1)
|
||||
echo " Saved: ", keystorePath, " (limit: ", entry.rateLimit, ")"
|
||||
|
||||
echo ""
|
||||
echo "=== Setup Complete ==="
|
||||
echo " Tree file: rln_tree.db (", NodeConfigs.len, " members)"
|
||||
echo " Keystores: rln_keystore_{peerId}.json"
|
||||
echo " Password: ", KeystorePassword
|
||||
echo " Default rate limit: ", DefaultUserMessageLimit
|
||||
echo " Spammer rate limit: ", SpammerUserMessageLimit
|
||||
echo ""
|
||||
echo "Note: All nodes must use the same rln_tree.db file."
|
||||
|
||||
when isMainModule:
|
||||
waitFor setupCredentialsAndTree()
|
||||
1
vendor/mix-rln-spam-protection-plugin
vendored
Submodule
1
vendor/mix-rln-spam-protection-plugin
vendored
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit b3e3f72932d53fe0959e182fc081fe14c0c2c2f0
|
||||
@ -28,7 +28,8 @@ import
|
||||
../../waku_archive,
|
||||
../../waku_store_sync,
|
||||
../peer_manager,
|
||||
../../waku_rln_relay
|
||||
../../waku_rln_relay,
|
||||
../../waku_mix
|
||||
|
||||
export waku_relay.WakuRelayHandler
|
||||
|
||||
@ -82,13 +83,21 @@ proc registerRelayHandler(
|
||||
|
||||
node.wakuStoreReconciliation.messageIngress(topic, msg)
|
||||
|
||||
proc mixHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
if node.wakuMix.isNil():
|
||||
return
|
||||
|
||||
await node.wakuMix.handleMessage(topic, msg)
|
||||
|
||||
let uniqueTopicHandler = proc(
|
||||
topic: PubsubTopic, msg: WakuMessage
|
||||
): Future[void] {.async, gcsafe.} =
|
||||
# TODO: Why are all these synchronous in nature? wouldn't it be better to run them concurrently?
|
||||
await traceHandler(topic, msg)
|
||||
await filterHandler(topic, msg)
|
||||
await archiveHandler(topic, msg)
|
||||
await syncHandler(topic, msg)
|
||||
await mixHandler(topic, msg)
|
||||
await appHandler(topic, msg)
|
||||
|
||||
node.wakuRelay.subscribe(topic, uniqueTopicHandler)
|
||||
|
||||
125
waku/node/waku_mix_coordination.nim
Normal file
125
waku/node/waku_mix_coordination.nim
Normal file
@ -0,0 +1,125 @@
|
||||
## Mix spam protection coordination via filter protocol
|
||||
## This module handles filter-based subscription for spam protection coordination
|
||||
## when relay is not available.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import chronos, chronicles, std/options
|
||||
import
|
||||
../waku_core,
|
||||
../waku_core/topics/sharding,
|
||||
../waku_filter_v2/common,
|
||||
./peer_manager,
|
||||
../waku_filter_v2/client,
|
||||
../waku_mix/protocol
|
||||
|
||||
logScope:
|
||||
topics = "waku node mix_coordination"
|
||||
|
||||
# Type aliases for callbacks to avoid circular imports
|
||||
type
|
||||
FilterSubscribeProc* = proc(
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic],
|
||||
peer: RemotePeerInfo,
|
||||
): Future[FilterSubscribeResult] {.async, gcsafe.}
|
||||
|
||||
FilterPingProc* =
|
||||
proc(peer: RemotePeerInfo): Future[FilterSubscribeResult] {.async, gcsafe.}
|
||||
|
||||
# Forward declaration
|
||||
proc subscribeSpamProtectionViaFilter(
|
||||
wakuMix: WakuMix,
|
||||
peerManager: PeerManager,
|
||||
filterClient: WakuFilterClient,
|
||||
filterSubscribe: FilterSubscribeProc,
|
||||
contentTopics: seq[ContentTopic],
|
||||
) {.async.}
|
||||
|
||||
proc setupSpamProtectionViaFilter*(
|
||||
wakuMix: WakuMix,
|
||||
peerManager: PeerManager,
|
||||
filterClient: WakuFilterClient,
|
||||
filterSubscribe: FilterSubscribeProc,
|
||||
) =
|
||||
## Set up filter-based spam protection coordination.
|
||||
## Registers message handler and spawns subscription maintenance task.
|
||||
let spamTopics = wakuMix.getSpamProtectionContentTopics()
|
||||
if spamTopics.len == 0:
|
||||
return
|
||||
|
||||
info "Relay not available, subscribing to spam protection via filter",
|
||||
topics = spamTopics
|
||||
|
||||
# Register handler for spam protection messages
|
||||
filterClient.registerPushHandler(
|
||||
proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
|
||||
if message.contentTopic in spamTopics:
|
||||
await wakuMix.handleMessage(pubsubTopic, message)
|
||||
)
|
||||
|
||||
# Wait for filter peer to be available and maintain subscription
|
||||
asyncSpawn subscribeSpamProtectionViaFilter(
|
||||
wakuMix, peerManager, filterClient, filterSubscribe, spamTopics
|
||||
)
|
||||
|
||||
proc subscribeSpamProtectionViaFilter(
|
||||
wakuMix: WakuMix,
|
||||
peerManager: PeerManager,
|
||||
filterClient: WakuFilterClient,
|
||||
filterSubscribe: FilterSubscribeProc,
|
||||
contentTopics: seq[ContentTopic],
|
||||
) {.async.} =
|
||||
## Subscribe to spam protection topics via filter and maintain the subscription.
|
||||
## Waits for a filter peer to be available before subscribing.
|
||||
## Continuously monitors the subscription health with periodic pings.
|
||||
const RetryInterval = chronos.seconds(5)
|
||||
const SubscriptionMaintenance = chronos.seconds(30)
|
||||
const MaxFailedSubscribes = 3
|
||||
var currentFilterPeer: Option[RemotePeerInfo] = none(RemotePeerInfo)
|
||||
var noFailedSubscribes = 0
|
||||
|
||||
while true:
|
||||
# Select or reuse filter peer
|
||||
if currentFilterPeer.isNone():
|
||||
let filterPeerOpt = peerManager.selectPeer(WakuFilterSubscribeCodec)
|
||||
if filterPeerOpt.isNone():
|
||||
debug "No filter peer available yet for spam protection, retrying..."
|
||||
await sleepAsync(RetryInterval)
|
||||
continue
|
||||
currentFilterPeer = some(filterPeerOpt.get())
|
||||
info "Selected filter peer for spam protection",
|
||||
peer = currentFilterPeer.get().peerId
|
||||
|
||||
# Check if subscription is still alive with ping
|
||||
let pingErr = (await filterClient.ping(currentFilterPeer.get())).errorOr:
|
||||
# Subscription is alive, wait before next check
|
||||
await sleepAsync(SubscriptionMaintenance)
|
||||
if noFailedSubscribes > 0:
|
||||
noFailedSubscribes = 0
|
||||
continue
|
||||
|
||||
# Subscription lost, need to re-subscribe
|
||||
warn "Spam protection filter subscription ping failed, re-subscribing",
|
||||
error = pingErr, peer = currentFilterPeer.get().peerId
|
||||
|
||||
# Subscribe to spam protection topics
|
||||
let res =
|
||||
await filterSubscribe(none(PubsubTopic), contentTopics, currentFilterPeer.get())
|
||||
if res.isErr():
|
||||
noFailedSubscribes += 1
|
||||
warn "Failed to subscribe to spam protection topics via filter",
|
||||
error = res.error, topics = contentTopics, failCount = noFailedSubscribes
|
||||
|
||||
if noFailedSubscribes >= MaxFailedSubscribes:
|
||||
# Try with a different peer
|
||||
warn "Max subscription failures reached, selecting new filter peer"
|
||||
currentFilterPeer = none(RemotePeerInfo)
|
||||
noFailedSubscribes = 0
|
||||
|
||||
await sleepAsync(RetryInterval)
|
||||
else:
|
||||
info "Successfully subscribed to spam protection topics via filter",
|
||||
topics = contentTopics, peer = currentFilterPeer.get().peerId
|
||||
noFailedSubscribes = 0
|
||||
await sleepAsync(SubscriptionMaintenance)
|
||||
@ -194,6 +194,18 @@ proc getShardsGetter(node: WakuNode, configuredShards: seq[uint16]): GetShards =
|
||||
return shards
|
||||
return configuredShards
|
||||
|
||||
proc getRelayMixHandler*(node: WakuNode): Option[WakuRelayHandler] =
|
||||
## Returns a handler for mix spam protection coordination messages if mix is mounted
|
||||
if node.wakuMix.isNil():
|
||||
return none(WakuRelayHandler)
|
||||
|
||||
let handler: WakuRelayHandler = proc(
|
||||
pubsubTopic: PubsubTopic, message: WakuMessage
|
||||
): Future[void] {.async, gcsafe.} =
|
||||
await node.wakuMix.handleMessage(pubsubTopic, message)
|
||||
|
||||
return some(handler)
|
||||
|
||||
proc getCapabilitiesGetter(node: WakuNode): GetCapabilities =
|
||||
return proc(): seq[Capabilities] {.closure, gcsafe, raises: [].} =
|
||||
if node.wakuRelay.isNil():
|
||||
@ -314,6 +326,7 @@ proc mountMix*(
|
||||
clusterId: uint16,
|
||||
mixPrivKey: Curve25519Key,
|
||||
mixnodes: seq[MixNodePubInfo],
|
||||
userMessageLimit: Option[int] = none(int),
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
info "mounting mix protocol", nodeId = node.info #TODO log the config used
|
||||
|
||||
@ -324,8 +337,29 @@ proc mountMix*(
|
||||
return err("Failed to convert multiaddress to string.")
|
||||
info "local addr", localaddr = localaddrStr
|
||||
|
||||
# Create callback to publish coordination messages via relay
|
||||
let publishMessage: PublishMessage = proc(
|
||||
message: WakuMessage
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
# Inline implementation of publish logic to avoid circular import
|
||||
if node.wakuRelay.isNil():
|
||||
return err("WakuRelay not mounted")
|
||||
|
||||
# Derive pubsub topic from content topic using auto sharding
|
||||
let pubsubTopic =
|
||||
if node.wakuAutoSharding.isNone():
|
||||
return err("Auto sharding not configured")
|
||||
else:
|
||||
node.wakuAutoSharding.get().getShard(message.contentTopic).valueOr:
|
||||
return err("Autosharding error: " & error)
|
||||
|
||||
# Publish via relay
|
||||
discard await node.wakuRelay.publish(pubsubTopic, message)
|
||||
return ok()
|
||||
|
||||
node.wakuMix = WakuMix.new(
|
||||
localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes
|
||||
localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes, publishMessage,
|
||||
userMessageLimit,
|
||||
).valueOr:
|
||||
error "Waku Mix protocol initialization failed", err = error
|
||||
return
|
||||
@ -335,9 +369,10 @@ proc mountMix*(
|
||||
node.switch.mount(node.wakuMix)
|
||||
catchRes.isOkOr:
|
||||
return err(error.msg)
|
||||
return ok()
|
||||
|
||||
## Waku Sync
|
||||
# Note: start() is called later in WakuNode.start(), not here during mount
|
||||
|
||||
return ok()
|
||||
|
||||
proc mountStoreSync*(
|
||||
node: WakuNode,
|
||||
@ -633,7 +668,7 @@ proc start*(node: WakuNode) {.async.} =
|
||||
await node.startRelay()
|
||||
|
||||
if not node.wakuMix.isNil():
|
||||
node.wakuMix.start()
|
||||
await node.wakuMix.start()
|
||||
|
||||
if not node.wakuMetadata.isNil():
|
||||
node.wakuMetadata.start()
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import chronicles, std/options, chronos, results, metrics
|
||||
import chronicles, std/[options, sequtils], chronos, results, metrics
|
||||
|
||||
import
|
||||
libp2p/crypto/curve25519,
|
||||
@ -10,14 +10,18 @@ import
|
||||
libp2p/protocols/mix/mix_protocol,
|
||||
libp2p/protocols/mix/mix_metrics,
|
||||
libp2p/protocols/mix/delay_strategy,
|
||||
libp2p/[multiaddress, peerid],
|
||||
libp2p/protocols/mix/spam_protection,
|
||||
libp2p/[multiaddress, multicodec, peerid, peerinfo],
|
||||
eth/common/keys
|
||||
|
||||
import
|
||||
waku/node/peer_manager,
|
||||
waku/waku_core,
|
||||
waku/waku_enr,
|
||||
waku/node/peer_manager/waku_peer_store
|
||||
waku/node/peer_manager/waku_peer_store,
|
||||
mix_rln_spam_protection,
|
||||
waku/waku_relay,
|
||||
waku/common/nimchronos
|
||||
|
||||
logScope:
|
||||
topics = "waku mix"
|
||||
@ -25,10 +29,16 @@ logScope:
|
||||
const minMixPoolSize = 4
|
||||
|
||||
type
|
||||
PublishMessage* = proc(message: WakuMessage): Future[Result[void, string]] {.
|
||||
async, gcsafe, raises: []
|
||||
.}
|
||||
|
||||
WakuMix* = ref object of MixProtocol
|
||||
peerManager*: PeerManager
|
||||
clusterId: uint16
|
||||
pubKey*: Curve25519Key
|
||||
mixRlnSpamProtection*: MixRlnSpamProtection
|
||||
publishMessage*: PublishMessage
|
||||
|
||||
WakuMixResult*[T] = Result[T, string]
|
||||
|
||||
@ -41,11 +51,9 @@ proc processBootNodes(
|
||||
) =
|
||||
var count = 0
|
||||
for node in bootnodes:
|
||||
let pInfo = parsePeerInfo(node.multiAddr).valueOr:
|
||||
error "Failed to get peer id from multiaddress: ",
|
||||
error = error, multiAddr = $node.multiAddr
|
||||
let (peerId, networkAddr) = parseFullAddress(node.multiAddr).valueOr:
|
||||
error "Failed to parse multiaddress", multiAddr = node.multiAddr, error = error
|
||||
continue
|
||||
let peerId = pInfo.peerId
|
||||
var peerPubKey: crypto.PublicKey
|
||||
if not peerId.extractPublicKey(peerPubKey):
|
||||
warn "Failed to extract public key from peerId, skipping node", peerId = peerId
|
||||
@ -65,10 +73,10 @@ proc processBootNodes(
|
||||
count.inc()
|
||||
|
||||
peermgr.addPeer(
|
||||
RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey))
|
||||
RemotePeerInfo.init(peerId, @[networkAddr], mixPubKey = some(node.pubKey))
|
||||
)
|
||||
mix_pool_size.set(count)
|
||||
info "using mix bootstrap nodes ", count = count
|
||||
debug "using mix bootstrap nodes ", count = count
|
||||
|
||||
proc new*(
|
||||
T: type WakuMix,
|
||||
@ -77,9 +85,11 @@ proc new*(
|
||||
clusterId: uint16,
|
||||
mixPrivKey: Curve25519Key,
|
||||
bootnodes: seq[MixNodePubInfo],
|
||||
publishMessage: PublishMessage,
|
||||
userMessageLimit: Option[int] = none(int),
|
||||
): WakuMixResult[T] =
|
||||
let mixPubKey = public(mixPrivKey)
|
||||
info "mixPubKey", mixPubKey = mixPubKey
|
||||
trace "mixPubKey", mixPubKey = mixPubKey
|
||||
let nodeMultiAddr = MultiAddress.init(nodeAddr).valueOr:
|
||||
return err("failed to parse mix node address: " & $nodeAddr & ", error: " & error)
|
||||
let localMixNodeInfo = initMixNodeInfo(
|
||||
@ -89,10 +99,31 @@ proc new*(
|
||||
if bootnodes.len < minMixPoolSize:
|
||||
warn "publishing with mix won't work until atleast 3 mix nodes in node pool"
|
||||
|
||||
var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey)
|
||||
# Initialize spam protection with persistent credentials
|
||||
# Use peerID in keystore path so multiple peers can run from same directory
|
||||
# Tree path is shared across all nodes to maintain the full membership set
|
||||
let peerId = peermgr.switch.peerInfo.peerId
|
||||
var spamProtectionConfig = defaultConfig()
|
||||
spamProtectionConfig.keystorePath = "rln_keystore_" & $peerId & ".json"
|
||||
spamProtectionConfig.keystorePassword = "mix-rln-password"
|
||||
if userMessageLimit.isSome():
|
||||
spamProtectionConfig.userMessageLimit = userMessageLimit.get()
|
||||
# rlnResourcesPath left empty to use bundled resources (via "tree_height_/" placeholder)
|
||||
|
||||
let spamProtection = newMixRlnSpamProtection(spamProtectionConfig).valueOr:
|
||||
return err("failed to create spam protection: " & error)
|
||||
|
||||
var m = WakuMix(
|
||||
peerManager: peermgr,
|
||||
clusterId: clusterId,
|
||||
pubKey: mixPubKey,
|
||||
mixRlnSpamProtection: spamProtection,
|
||||
publishMessage: publishMessage,
|
||||
)
|
||||
procCall MixProtocol(m).init(
|
||||
localMixNodeInfo,
|
||||
peermgr.switch,
|
||||
spamProtection = Opt.some(SpamProtection(spamProtection)),
|
||||
delayStrategy =
|
||||
ExponentialDelayStrategy.new(meanDelayMs = 50, rng = crypto.newRng()),
|
||||
)
|
||||
@ -101,15 +132,160 @@ proc new*(
|
||||
|
||||
if m.nodePool.len < minMixPoolSize:
|
||||
warn "publishing with mix won't work until atleast 3 mix nodes in node pool"
|
||||
|
||||
return ok(m)
|
||||
|
||||
proc poolSize*(mix: WakuMix): int =
|
||||
mix.nodePool.len
|
||||
|
||||
method start*(mix: WakuMix) =
|
||||
proc setupSpamProtectionCallbacks(mix: WakuMix) =
|
||||
## Set up the publish callback for spam protection coordination.
|
||||
## This enables the plugin to broadcast membership updates and proof metadata
|
||||
## via Waku relay.
|
||||
if mix.publishMessage.isNil():
|
||||
warn "PublishMessage callback not available, spam protection coordination disabled"
|
||||
return
|
||||
|
||||
let publishCallback: PublishCallback = proc(
|
||||
contentTopic: string, data: seq[byte]
|
||||
) {.async.} =
|
||||
# Create a WakuMessage for the coordination data
|
||||
let msg = WakuMessage(
|
||||
payload: data,
|
||||
contentTopic: contentTopic,
|
||||
ephemeral: true, # Coordination messages don't need to be stored
|
||||
timestamp: getNowInNanosecondTime(),
|
||||
)
|
||||
|
||||
# Delegate to node's publish API which handles topic derivation and relay publishing
|
||||
let res = await mix.publishMessage(msg)
|
||||
if res.isErr():
|
||||
warn "Failed to publish spam protection coordination message",
|
||||
contentTopic = contentTopic, error = res.error
|
||||
return
|
||||
|
||||
trace "Published spam protection coordination message", contentTopic = contentTopic
|
||||
|
||||
mix.mixRlnSpamProtection.setPublishCallback(publishCallback)
|
||||
trace "Spam protection publish callback configured"
|
||||
|
||||
proc handleMessage*(
|
||||
mix: WakuMix, pubsubTopic: PubsubTopic, message: WakuMessage
|
||||
) {.async, gcsafe.} =
|
||||
## Handle incoming messages for spam protection coordination.
|
||||
## This should be called from the relay handler for coordination content topics.
|
||||
if mix.mixRlnSpamProtection.isNil():
|
||||
return
|
||||
|
||||
let contentTopic = message.contentTopic
|
||||
|
||||
if contentTopic == mix.mixRlnSpamProtection.getMembershipContentTopic():
|
||||
# Handle membership update
|
||||
let res = await mix.mixRlnSpamProtection.handleMembershipUpdate(message.payload)
|
||||
if res.isErr:
|
||||
warn "Failed to handle membership update", error = res.error
|
||||
else:
|
||||
trace "Handled membership update"
|
||||
|
||||
# Persist tree after membership changes (temporary solution)
|
||||
# TODO: Replace with proper persistence strategy (e.g., periodic snapshots)
|
||||
let saveRes = mix.mixRlnSpamProtection.saveTree()
|
||||
if saveRes.isErr:
|
||||
debug "Failed to save tree after membership update", error = saveRes.error
|
||||
else:
|
||||
trace "Saved tree after membership update"
|
||||
elif contentTopic == mix.mixRlnSpamProtection.getProofMetadataContentTopic():
|
||||
# Handle proof metadata for network-wide spam detection
|
||||
let res = mix.mixRlnSpamProtection.handleProofMetadata(message.payload)
|
||||
if res.isErr:
|
||||
warn "Failed to handle proof metadata", error = res.error
|
||||
else:
|
||||
trace "Handled proof metadata"
|
||||
|
||||
proc getSpamProtectionContentTopics*(mix: WakuMix): seq[string] =
|
||||
## Get the content topics used by spam protection for coordination.
|
||||
## Use these to set up relay subscriptions.
|
||||
if mix.mixRlnSpamProtection.isNil():
|
||||
return @[]
|
||||
return mix.mixRlnSpamProtection.getContentTopics()
|
||||
|
||||
proc saveSpamProtectionTree*(mix: WakuMix): Result[void, string] =
|
||||
## Save the spam protection membership tree to disk.
|
||||
## This allows preserving the tree state across restarts.
|
||||
if mix.mixRlnSpamProtection.isNil():
|
||||
return err("Spam protection not initialized")
|
||||
|
||||
mix.mixRlnSpamProtection.saveTree().mapErr(
|
||||
proc(e: string): string =
|
||||
e
|
||||
)
|
||||
|
||||
proc loadSpamProtectionTree*(mix: WakuMix): Result[void, string] =
|
||||
## Load the spam protection membership tree from disk.
|
||||
## Call this before init() to restore tree state from previous runs.
|
||||
## TODO: This is a temporary solution. Ideally nodes should sync tree state
|
||||
## via a store query for historical membership messages or via dedicated
|
||||
## tree sync protocol.
|
||||
if mix.mixRlnSpamProtection.isNil():
|
||||
return err("Spam protection not initialized")
|
||||
|
||||
mix.mixRlnSpamProtection.loadTree().mapErr(
|
||||
proc(e: string): string =
|
||||
e
|
||||
)
|
||||
|
||||
method start*(mix: WakuMix) {.async.} =
|
||||
info "starting waku mix protocol"
|
||||
|
||||
# Set up spam protection callbacks and start
|
||||
if not mix.mixRlnSpamProtection.isNil():
|
||||
# Initialize spam protection (MixProtocol.init() does NOT call init() on the plugin)
|
||||
let initRes = await mix.mixRlnSpamProtection.init()
|
||||
if initRes.isErr:
|
||||
error "Failed to initialize spam protection", error = initRes.error
|
||||
else:
|
||||
# Load existing tree to sync with other members
|
||||
# This should be done after init() (which loads credentials)
|
||||
# but before registerSelf() (which adds us to the tree)
|
||||
let loadRes = mix.mixRlnSpamProtection.loadTree()
|
||||
if loadRes.isErr:
|
||||
debug "No existing tree found or failed to load, starting fresh",
|
||||
error = loadRes.error
|
||||
else:
|
||||
debug "Loaded existing spam protection membership tree from disk"
|
||||
|
||||
# Restore our credentials to the tree (after tree load, whether it succeeded or not)
|
||||
# This ensures our member is in the tree if we have an index from keystore
|
||||
let restoreRes = mix.mixRlnSpamProtection.restoreCredentialsToTree()
|
||||
if restoreRes.isErr:
|
||||
error "Failed to restore credentials to tree", error = restoreRes.error
|
||||
|
||||
# Set up publish callback (must be before start so registerSelf can use it)
|
||||
mix.setupSpamProtectionCallbacks()
|
||||
|
||||
let startRes = await mix.mixRlnSpamProtection.start()
|
||||
if startRes.isErr:
|
||||
error "Failed to start spam protection", error = startRes.error
|
||||
else:
|
||||
# Register self to broadcast membership to the network
|
||||
let registerRes = await mix.mixRlnSpamProtection.registerSelf()
|
||||
if registerRes.isErr:
|
||||
error "Failed to register spam protection credentials",
|
||||
error = registerRes.error
|
||||
else:
|
||||
debug "Registered spam protection credentials", index = registerRes.get()
|
||||
|
||||
# Save tree to persist membership state
|
||||
let saveRes = mix.mixRlnSpamProtection.saveTree()
|
||||
if saveRes.isErr:
|
||||
warn "Failed to save spam protection tree", error = saveRes.error
|
||||
else:
|
||||
trace "Saved spam protection tree to disk"
|
||||
|
||||
method stop*(mix: WakuMix) {.async.} =
|
||||
discard
|
||||
# Stop spam protection
|
||||
if not mix.mixRlnSpamProtection.isNil():
|
||||
await mix.mixRlnSpamProtection.stop()
|
||||
debug "Spam protection stopped"
|
||||
|
||||
# Mix Protocol
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user