feat(mix): mixnet sender-anonymity for Logos Chat (static RLN, no LEZ)

Global, restart-based Required/None anonymity mode: route chat messages through the
libp2p mixnet for sender anonymity, on the logos-delivery (efafdfdc2) nwaku stack.
Static RLN spam protection; no on-chain LEZ gifter / dynamic membership.

Delivery (src/chat/delivery/waku_client.nim):
- WakuConfig.mixEnabled/mixNodes/minMixPoolSize; parseMixNodes, waitForMixPool,
  getMixPoolSize, mixReady.
- Required mode: sendBytes -> lightpushPublish(mixify=true) over the mix pool,
  fail-fast below minMixPoolSize (no relay fallback). None mode: relay publish.
  Errors propagate up to chat_send_message.
- Receive via WakuFilter (subscribe to static peers; no relay mounted), refreshed
  by a 60s keep-alive.
- Static RLN: pre-populated rln_tree.db + per-peer keystore; nodekey config to adopt
  a provisioned identity. No per-send root-convergence wait (static membership).

API / build:
- chat_get_mix_status FFI -> {mixEnabled,mixReady,mixPoolSize,minPoolSize}.
- Reproducible nix build: librln consumed as a cdylib (avoids the two-Rust-staticlib
  symbol collision); -d:libp2p_mix_experimental_exit_is_dest.
- vendor/nwaku -> efafdfdc2; vendor/nim-protobuf-serialization -> 38d24eb (0.4.0).
This commit is contained in:
Prem Chaitanya Prathi 2026-06-25 21:45:41 +05:30
parent 15f68f2ec2
commit c9dfefa498
No known key found for this signature in database
17 changed files with 531 additions and 5571 deletions

3
.dockerignore Normal file
View File

@ -0,0 +1,3 @@
# Everything — the Dockerfile doesn't COPY from the build context.
# Guest binaries are staged via docker cp separately.
*

4
.gitmodules vendored
View File

@ -3,9 +3,9 @@
url = https://github.com/status-im/nimbus-build-system.git
[submodule "vendor/nwaku"]
path = vendor/nwaku
url = https://github.com/waku-org/nwaku.git
url = https://github.com/logos-messaging/logos-delivery.git
ignore = untracked
branch = master
branch = feat/cover-traffic
[submodule "vendor/nim-protobuf-serialization"]
path = vendor/nim-protobuf-serialization
url = https://github.com/status-im/nim-protobuf-serialization.git

View File

@ -1,6 +1,7 @@
export BUILD_SYSTEM_DIR := vendor/nimbus-build-system
export EXCLUDED_NIM_PACKAGES := vendor/nwaku/vendor/nim-dnsdisc/vendor \
vendor/nwaku/vendor/nimbus-build-system \
vendor/nwaku/vendor/nim-ffi \
vendor/nim-sds/vendor
LINK_PCRE := 0
FORMAT_MSG := "\\x1B[95mFormatting:\\x1B[39m"
@ -69,7 +70,7 @@ TARGET ?= prod
## Git version
GIT_VERSION ?= $(shell git describe --abbrev=6 --always --tags)
## Compilation parameters. If defined in the CLI the assignments won't be executed
NIM_PARAMS := $(NIM_PARAMS) -d:git_version=\"$(GIT_VERSION)\"
NIM_PARAMS := $(NIM_PARAMS) -d:git_version=\"$(GIT_VERSION)\" -d:libp2p_mix_experimental_exit_is_dest
##################
## Dependencies ##
@ -78,6 +79,46 @@ NIM_PARAMS := $(NIM_PARAMS) -d:git_version=\"$(GIT_VERSION)\"
CARGO_TARGET_DIR ?= rust-bundle/target
RUST_BUNDLE_LIB := $(CARGO_TARGET_DIR)/release/liblogoschat_rust_bundle.a
# Mix RLN spam protection library (zerokit v2.0.2, stateless, for the
# mix-rln-spam-protection-plugin).
#
# liblogoschat also statically links libchat's Rust bundle; embedding librln
# statically too would put two copies of the Rust runtime in one image and
# collide at link time (`_rust_eh_personality`, `_ffi_c_string_free`).
#
# On darwin we link librln as a cdylib (librln.dylib) so its Rust runtime stays
# in its own image — nothing collides and no symbol surgery is needed. The
# shared lib is staged into build/ beside liblogoschat.dylib with an @rpath
# install name. On other platforms we keep the static archive and let the linker
# take the first copy of the duplicate runtime symbols via
# --allow-multiple-definition.
MIX_LIBRLN_VERSION ?= v2.0.2
ifeq ($(detected_OS),Darwin)
MIX_LIBRLN_DYLIB := $(CURDIR)/build/librln.dylib
MIX_LIBRLN_NIM_PARAMS := --passL:-L$(CURDIR)/build --passL:-lrln --passL:-Wl,-rpath,@loader_path
.PHONY: mix-librln
mix-librln: | $(MIX_LIBRLN_DYLIB)
$(MIX_LIBRLN_DYLIB):
echo -e $(BUILD_MSG) "$@" && \
mkdir -p $(CURDIR)/build && \
$(CURDIR)/scripts/fetch_mix_librln_dylib.sh $(MIX_LIBRLN_DYLIB) $(MIX_LIBRLN_VERSION)
else
MIX_LIBRLN_SRC := $(CURDIR)/vendor/nwaku/librln_$(MIX_LIBRLN_VERSION).a
MIX_LIBRLN_FILE := $(CURDIR)/build/librln_mix_$(MIX_LIBRLN_VERSION).a
MIX_LIBRLN_NIM_PARAMS := --passL:$(MIX_LIBRLN_FILE) --passL:"-Wl,--allow-multiple-definition"
.PHONY: mix-librln
mix-librln: | $(MIX_LIBRLN_FILE)
$(MIX_LIBRLN_FILE): $(MIX_LIBRLN_SRC)
echo -e $(BUILD_MSG) "$@" && \
mkdir -p $(CURDIR)/build && \
cp $(MIX_LIBRLN_SRC) $(MIX_LIBRLN_FILE)
endif
# libchat and rln each embed Rust std when built as staticlibs; linking both
# causes duplicate-symbol errors. rust-bundle/ links them as rlibs so std
# is emitted once. [1]
@ -107,9 +148,9 @@ tests: | build-rust-bundle build-waku-nat logos_chat.nims
##########
# Ensure there is a nimble task with a name that matches the target
tui bot_echo pingpong: | build-rust-bundle build-waku-nat logos_chat.nims
tui bot_echo pingpong: | build-rust-bundle build-waku-nat mix-librln logos_chat.nims
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim $@ $(NIM_PARAMS) \
$(ENV_SCRIPT) nim $@ $(NIM_PARAMS) $(MIX_LIBRLN_NIM_PARAMS) \
--passL:$(RUST_BUNDLE_LIB) --passL:-lm \
--path:src logos_chat.nims
@ -129,9 +170,9 @@ endif
LIBLOGOSCHAT := build/liblogoschat.$(LIBLOGOSCHAT_EXT)
.PHONY: liblogoschat
liblogoschat: | build-rust-bundle build-waku-nat logos_chat.nims
liblogoschat: | build-rust-bundle build-waku-nat mix-librln logos_chat.nims
echo -e $(BUILD_MSG) "$(LIBLOGOSCHAT)" && \
$(ENV_SCRIPT) nim liblogoschat $(NIM_PARAMS) \
$(ENV_SCRIPT) nim liblogoschat $(NIM_PARAMS) $(MIX_LIBRLN_NIM_PARAMS) \
--passL:$(RUST_BUNDLE_LIB) --passL:-lm \
--path:src logos_chat.nims && \
echo -e "\n\x1B[92mLibrary built successfully:\x1B[39m" && \

View File

@ -1,4 +1,4 @@
import std/os
import std/[os, strutils]
# all vendor subdirectories
for dir in walkDir(thisDir() / "vendor"):
@ -7,4 +7,20 @@ for dir in walkDir(thisDir() / "vendor"):
switch("path", dir.path / "src")
switch("path", thisDir() / "vendor/libchat/nim-bindings")
switch("path", thisDir() / "vendor/libchat/nim-bindings/src")
switch("path", thisDir() / "vendor/libchat/nim-bindings/src")
# nwaku (PR #3807) consumes deps via nimble rather than vendored submodules,
# so add each package dir under nimbledeps/pkgs2 (and its src/) to the nim
# search path. The dir name carries the "<name>-<version>-<sha>" stamp; nim
# resolves imports by package name relative to those paths.
let nwakuDeps = thisDir() / "vendor/nwaku/nimbledeps/pkgs2"
if dirExists(nwakuDeps):
for pkg in walkDir(nwakuDeps):
if pkg.kind == pcDir:
# The chat ships its own FFI framework via vendor/nim-ffi; skip the
# nwaku-vendored `ffi` package(s) to avoid a declareLibrary API clash.
if pkg.path.lastPathPart.startsWith("ffi-"):
continue
switch("path", pkg.path)
if dirExists(pkg.path / "src"):
switch("path", pkg.path / "src")

View File

@ -26,10 +26,14 @@
};
libchatDrv = pkgs.callPackage ./nix/libchat.nix {};
rustBundleDrv = pkgs.callPackage ./nix/rust_bundle.nix { src = self; };
# efafdfdc2's nwaku consumes its nim deps via nimble (nimbledeps/pkgs2),
# which isn't reproducible in the nix sandbox. Instead fetch them from
# vendor/nwaku's autogenerated deps.nix (mirrors logos-delivery's build).
nwakuDeps = import (self + "/vendor/nwaku/nix/deps.nix") { inherit pkgs; };
in {
packages.default = pkgs.callPackage ./nix/default.nix {
src = self;
inherit rustBundleDrv;
inherit rustBundleDrv nwakuDeps;
};
devShells.default = pkgs.callPackage ./nix/shell.nix {
inherit libchatDrv;

View File

@ -50,7 +50,23 @@ proc createChatClient(
wakuCfg.staticPeers = @[]
for peer in config["staticPeers"]:
wakuCfg.staticPeers.add(peer.getStr())
if config.hasKey("mixEnabled"):
wakuCfg.mixEnabled = config["mixEnabled"].getBool(false)
if config.hasKey("mixNodes"):
wakuCfg.mixNodes = @[]
for node in config["mixNodes"]:
wakuCfg.mixNodes.add(node.getStr())
if config.hasKey("minMixPoolSize"):
wakuCfg.minMixPoolSize = config["minMixPoolSize"].getInt(4)
# Adopt a fixed identity (e.g. a provisioned mix-sim chat credential) so the
# peer-ID-derived RLN keystore (rln_keystore_<peerId>.json) resolves.
if config.hasKey("nodekey"):
let hex = config["nodekey"].getStr()
if hex.len > 0:
wakuCfg.nodekey = parseNodeKey(hex).valueOr:
return err("invalid nodekey: " & error)
# Create Waku client
let wakuClient = initWakuClient(wakuCfg)
@ -117,6 +133,28 @@ proc chat_get_id(
let clientId = ctx.myLib[].getId()
return ok(clientId)
#################################################
# Mix Protocol Status
#################################################
proc chat_get_mix_status(
ctx: ptr FFIContext[ChatClient],
callback: FFICallBack,
userData: pointer
) {.ffi.} =
let client = ctx.myLib[]
let mixEnabled = client.ds.cfg.mixEnabled
var poolSize = 0
if mixEnabled:
poolSize = client.ds.getMixPoolSize()
let status = %*{
"mixEnabled": mixEnabled,
"mixReady": client.ds.mixReady,
"mixPoolSize": poolSize,
"minPoolSize": client.ds.cfg.minMixPoolSize
}
return ok($status)
#################################################
# Conversation List Operations
#################################################

View File

@ -30,6 +30,10 @@ typedef void (*FFICallBack)(int callerRet, const char *msg, size_t len,
// - "clusterId": int - Waku cluster ID (optional)
// - "shardId": int - Waku shard ID (optional)
// - "staticPeers": array of strings - static peer multiaddrs (optional)
// - "mixEnabled": bool - enable mix protocol for sender anonymity (default: false)
// - "mixNodes": array of strings - mix bootstrap nodes as "multiaddr:mixPubKeyHex"
// - "destPeerAddr": string - lightpush destination peer "multiaddr/p2p/peerId"
// - "minMixPoolSize": int - minimum mix pool size before sending via mix (default: 4)
void *chat_new(const char *configJson, FFICallBack callback, void *userData);
// Start the chat client and begin listening for messages
@ -97,6 +101,14 @@ int chat_get_identity(void *ctx, FFICallBack callback, void *userData);
// Returns the intro bundle as an ASCII string (format: logos_chatintro_<version>_<base64url payload>)
int chat_create_intro_bundle(void *ctx, FFICallBack callback, void *userData);
//////////////////////////////////////////////////////////////////////////////
// Mix Protocol Status
//////////////////////////////////////////////////////////////////////////////
// Get mix protocol status
// Returns JSON: {"mixEnabled":bool,"mixReady":bool,"mixPoolSize":int,"minPoolSize":int}
int chat_get_mix_status(void *ctx, FFICallBack callback, void *userData);
#ifdef __cplusplus
}
#endif

View File

@ -1,6 +1,7 @@
{ lib, stdenv, nim, which, pkg-config, writeScriptBin,
openssl, miniupnpc, libnatpmp,
{ lib, stdenv, nim, which, pkg-config, writeScriptBin, fetchurl,
openssl, miniupnpc, libnatpmp, rustPlatform, fetchFromGitHub, darwin ? {},
src, # logos-chat source (self from flake, with submodules=1)
nwakuDeps, # efafdfdc2 nim deps from vendor/nwaku/nix/deps.nix
rustBundleDrv }: # result of rust_bundle.nix
# NOTE: this build requires git submodules to be present in src.
@ -13,12 +14,57 @@ assert lib.assertMsg ((src.submodules or false) == true)
let
revision = lib.substring 0 8 (src.rev or "dirty");
# nwaku (efafdfdc2) nim deps on the path. Drop `ffi` — the chat ships its own
# vendor/nim-ffi (1-arg declareLibrary); nwaku's ffi (2-arg) would clash.
nwakuDepPaths = lib.concatStringsSep " " (builtins.concatMap
(p: [ "--path:${p}" "--path:${p}/src" ])
(builtins.attrValues (builtins.removeAttrs nwakuDeps [ "ffi" ])));
# Mix RLN spam-protection lib. The plugin builds a STATELESS zerokit RLN
# instance via `ffi_rln_new()` (the full build segfaults there — it expects
# tree resources the stateless build compiles out). So we use the prebuilt
# stateless release asset — the same one the local `make` build and the mix sim
# use — fetched reproducibly by hash.
#
# liblogoschat already statically links libchat's rust bundle; embedding librln
# statically too would put two copies of the Rust runtime in one image and
# collide on `_rust_eh_personality` / `_ffi_c_string_free`. On darwin we link
# librln's cdylib (its runtime stays in its own image — no symbol surgery), with
# an @rpath install name so liblogoschat.dylib loads it from beside itself. On
# linux we keep the static .a + `-Wl,--allow-multiple-definition`.
rlnTriplet = {
aarch64-darwin = "aarch64-apple-darwin";
x86_64-darwin = "x86_64-apple-darwin";
x86_64-linux = "x86_64-unknown-linux-gnu";
aarch64-linux = "aarch64-unknown-linux-gnu";
}.${stdenv.hostPlatform.system} or (throw "no stateless librln triplet for ${stdenv.hostPlatform.system}");
rlnHash = {
aarch64-darwin = "sha256-f2YppkPsKFdN00j+IY8fpvsebWTIb9lW/V1/vOTiVKU=";
}.${stdenv.hostPlatform.system} or (throw "add stateless librln hash for ${stdenv.hostPlatform.system}");
rlnTarball = fetchurl {
url = "https://github.com/vacp2p/zerokit/releases/download/v2.0.2/${rlnTriplet}-stateless-rln.tar.gz";
hash = rlnHash;
};
mixRlnDylib = stdenv.mkDerivation {
name = "librln-mix-stateless-v2.0.2";
dontUnpack = true;
buildPhase = ''
mkdir -p $out/lib
tar -xzf ${rlnTarball}
cp release/librln.* $out/lib/
chmod +w $out/lib/*
'' + lib.optionalString stdenv.isDarwin ''
install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib
'';
dontInstall = true;
};
in stdenv.mkDerivation {
pname = "liblogoschat";
version = "0.1.0";
inherit src;
NIMFLAGS = lib.concatStringsSep " " [
NIMFLAGS = (lib.concatStringsSep " " ([
"--passL:${rustBundleDrv}/lib/liblogoschat_rust_bundle.a"
"--passL:-lm"
"-d:miniupnpcUseSystemLibs"
@ -26,7 +72,16 @@ in stdenv.mkDerivation {
"--passL:-lminiupnpc"
"--passL:-lnatpmp"
"-d:git_version=${revision}"
];
"-d:libp2p_mix_experimental_exit_is_dest"
] ++ lib.optionals stdenv.isDarwin [
# link librln via its cdylib so its Rust runtime stays in its own image
"--passL:-L${mixRlnDylib}/lib"
"--passL:-lrln"
"--passL:-Wl,-rpath,@loader_path"
] ++ lib.optionals stdenv.isLinux [
"--passL:${mixRlnDylib}/lib/librln.a"
"--passL:-Wl,--allow-multiple-definition"
])) + " " + nwakuDepPaths;
nativeBuildInputs = let
fakeGit = writeScriptBin "git" ''
@ -69,6 +124,11 @@ in stdenv.mkDerivation {
cp build/liblogoschat.so $out/lib/ 2>/dev/null || true
cp build/liblogoschat.dylib $out/lib/ 2>/dev/null || true
ls $out/lib/liblogoschat.* > /dev/null
# Ship librln.dylib beside liblogoschat.dylib so its @rpath/@loader_path
# load command resolves at runtime (darwin only).
${lib.optionalString stdenv.isDarwin ''
cp ${mixRlnDylib}/lib/librln.dylib $out/lib/
''}
cp library/liblogoschat.h $out/include/
runHook postInstall
'';

5491
rust-bundle/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -9,4 +9,3 @@ crate-type = ["staticlib"]
[dependencies]
libchat = { path = "../vendor/libchat/conversations" }
rln = { path = "../vendor/nwaku/vendor/zerokit/rln", features = ["arkzkey"] }

View File

@ -1,4 +1,6 @@
// Force both rlibs into this staticlib.
// Their #[no_mangle] pub extern "C" symbols are exported from librust_bundle.a.
// Force libchat's rlib into this staticlib.
// Its #[no_mangle] pub extern "C" symbols are exported from librust_bundle.a.
// rln is now linked separately via MIX_LIBRLN_FILE (librln_mix v2.0.2 stateless)
// so importing it here would duplicate `ffi_c_string_free` (defined by both
// libchat/double_ratchets and zerokit/rln).
extern crate libchat;
extern crate rln;

View File

@ -0,0 +1,45 @@
#!/usr/bin/env bash
# Stage the mix-RLN shared library (librln.dylib / librln.so) for the host
# platform.
#
# liblogoschat also statically links libchat's Rust bundle; linking librln
# statically too puts two copies of the Rust runtime in one image and collides
# at link time (`_rust_eh_personality`, `_ffi_c_string_free`). Consuming librln
# as a cdylib keeps its runtime in its own image — nothing collides and no
# symbol surgery is needed. We pull the prebuilt shared lib from zerokit's
# stateless release (the same asset build_rln.sh uses for the static archive)
# and give it an @rpath install name so liblogoschat.dylib loads it from beside
# itself at runtime.
set -euo pipefail
out="${1:?usage: fetch_mix_librln_dylib.sh <out-path> [rln-version]}"
rln_version="${2:-v2.0.2}"
case "$(uname -s)" in
Darwin) ext="dylib" ;;
*) ext="so" ;;
esac
# Host target triple, e.g. aarch64-apple-darwin / x86_64-unknown-linux-gnu.
host_triplet="$(rustc --version --verbose | awk '/host:/{print $2}')"
tarball="${host_triplet}-stateless-rln.tar.gz"
url="https://github.com/vacp2p/zerokit/releases/download/${rln_version}/${tarball}"
work="$(mktemp -d)"
trap 'rm -rf "${work}"' EXIT
echo "Fetching mix RLN shared lib: ${url}"
curl --silent --fail-with-body -L "${url}" -o "${work}/${tarball}"
tar -xzf "${work}/${tarball}" -C "${work}"
mkdir -p "$(dirname "${out}")"
cp "${work}/release/librln.${ext}" "${out}"
chmod +w "${out}"
# Repoint the install name to @rpath so the consuming library finds it via its
# own @loader_path (we ship librln.dylib beside liblogoschat.dylib).
if [ "${ext}" = "dylib" ]; then
install_name_tool -id "@rpath/$(basename "${out}")" "${out}"
fi
echo "Staged $(basename "${out}") at ${out}"

View File

@ -145,10 +145,14 @@ proc createIntroBundle*(self: ChatClient): seq[byte] =
notice "IntroBundleCreated", client = self.getId(),
bundle = result
proc sendPayloads(ds: WakuClient, payloads: seq[PayloadResult]) =
proc sendPayloads(
ds: WakuClient, payloads: seq[PayloadResult]
): Future[Result[void, string]] {.async.} =
for payload in payloads:
# TODO: (P2) surface errors
discard ds.sendBytes(payload.address, payload.data)
let res = await ds.sendBytes(payload.address, payload.data)
if res.isErr:
return err(res.error)
return ok()
#################################################
@ -168,8 +172,10 @@ proc newPrivateConversation*(client: ChatClient,
return some(ChatError(code: errLibChat, context:fmt"got: {error}" ))
client.ds.sendPayloads(payloads);
let sendRes = await client.ds.sendPayloads(payloads)
if sendRes.isErr:
error "newPrivateConversation failed to deliver", err = sendRes.error
return some(ChatError(code: errLibChat, context: sendRes.error))
client.notifyNewConversation(Conversation(ctx: client.libchatCtx,
convoId : convoId, ds: client.ds, convo_type: ConvoType.PrivateV1
@ -212,7 +218,10 @@ proc sendMessage*(convo: Conversation, content: Content) : Future[MessageId] {.a
error "SendMessage", e=error
return "error"
convo.ds.sendPayloads(payloads);
let sendRes = await convo.ds.sendPayloads(payloads)
if sendRes.isErr:
error "SendMessage failed to deliver", err = sendRes.error
raise newException(CatchableError, sendRes.error)
#################################################
@ -237,7 +246,7 @@ proc messageQueueConsumer(client: ChatClient) {.async.} =
proc start*(client: ChatClient) {.async.} =
## Start `ChatClient` and listens for incoming messages.
client.ds.addDispatchQueue(client.inboundQueue)
asyncSpawn client.ds.start()
await client.ds.start()
client.isRunning = true

View File

@ -2,24 +2,39 @@ import
chronicles,
chronos,
confutils,
eth/common/addresses as eth_addresses,
eth/common/keys as eth_keys,
eth/p2p/discoveryv5/enr as eth_enr,
bearssl/rand,
libp2p/crypto/crypto,
libp2p/crypto/secp,
libp2p/crypto/curve25519,
libp2p/crypto/rng as libp2p_rng,
libp2p/peerid,
std/random,
libp2p_mix,
libp2p_mix/curve25519 as mix_curve25519,
libp2p_mix/entry_connection,
libp2p_mix/mix_protocol as mix_proto,
nimcrypto/utils as ncrutils,
std/[random, strutils],
stew/byteutils,
strformat,
waku/[
logos_delivery/waku/[
common/logging,
common/enr as common_enr,
node/peer_manager,
waku_core,
waku_core/codecs,
waku_node,
waku_enr,
waku_mix/protocol as waku_mix_protocol,
waku_lightpush/client as lightpush_client,
discovery/waku_discv5,
discovery/waku_dnsdisc,
factory/builder,
waku_filter_v2/client,
]
],
mix_rln_spam_protection/spam_protection
logScope:
@ -40,6 +55,7 @@ proc toChatPayload*(msg: WakuMessage, pubsubTopic: PubsubTopic): ChatPayload =
const
# Placeholder
FilterContentTopic = ContentTopic("/chatsdk/test/proto")
LibchatDeliveryAddress = ContentTopic("delivery_address")
## Logos.dev Fleet ENRs
@ -77,12 +93,15 @@ type QueueRef* = ref object
type WakuConfig* = object
nodekey*: crypto.PrivateKey # TODO: protect key exposure
nodekey*: crypto.PrivateKey # TODO: protect key exposure
port*: uint16
clusterId*: uint16
shardId*: seq[uint16]
pubsubTopic*: string
staticPeers*: seq[string]
mixEnabled*: bool
mixNodes*: seq[string]
minMixPoolSize*: int
type
WakuClient* = ref object
@ -90,27 +109,89 @@ type
node*: WakuNode
dispatchQueues: seq[QueueRef]
staticPeerList: seq[RemotePeerInfo]
mixReady*: bool
proc DefaultConfig*(): WakuConfig =
let nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[]
# libp2p 1.15.3+: PrivateKey.random expects a libp2p Rng (ref object
# wrapping a HmacDrbgContext); wrap BearSSL's rng accordingly.
let drbg = HmacDrbgContext.new()
let nodeKey = crypto.PrivateKey.random(
Secp256k1, libp2p_rng.newBearSslRng(drbg)
).valueOr:
raise newException(ValueError, "failed to generate nodeKey: " & $error)
let clusterId = 2'u16
let shardId = 1'u16
var port: uint16 = 50000'u16 + uint16(rand(200))
result = WakuConfig(nodeKey: nodeKey, port: port, clusterId: clusterId,
shardId: @[shardId], pubsubTopic: &"/waku/2/rs/{clusterId}/{shardId}",
staticPeers: LogosDevStaticPeers)
staticPeers: LogosDevStaticPeers,
mixEnabled: false, mixNodes: @[], minMixPoolSize: 4)
proc sendBytes*(client: WakuClient, contentTopic: string,
bytes: seq[byte]) {.async.} =
proc parseNodeKey*(hex: string): Result[crypto.PrivateKey, string] =
## Parse a 64-char hex secp256k1 private key (e.g. to adopt a provisioned
## identity whose RLN keystore the mix node pool already knows).
let sk = SkPrivateKey.init(ncrutils.fromHex(hex)).valueOr:
return err("invalid nodekey hex: " & $error)
ok(crypto.PrivateKey(scheme: Secp256k1, skkey: sk))
proc sendBytes*(
client: WakuClient, contentTopic: string, bytes: seq[byte]
): Future[Result[void, string]] {.async.} =
let msg = WakuMessage(contentTopic: contentTopic, payload: bytes)
if client.cfg.mixEnabled:
# Required (mix) mode: never fall back to relay. Fail fast if the mix pool is
# below the minimum so the caller/UI can surface "can't send anonymously"
# instead of blocking indefinitely or silently relaying.
if not client.mixReady:
warn "Mix not ready: not enough mix peers",
poolSize = client.node.getMixNodePoolSize(),
required = client.cfg.minMixPoolSize
return err("not enough mix peers available")
# Bounded wait for RLN spam-protection readiness so the first send right after
# startup doesn't race RLN init. Membership is STATIC: every mix node loads the
# same immutable rln_tree.db at startup, so the Merkle root is identical across
# nodes from t=0 and never changes. There is no root to "converge", so no
# per-send wait is needed (that only mattered for dynamic on-chain membership).
if not client.node.wakuMix.isNil:
var attempts = 0
while not client.node.wakuMix.mixRlnSpamProtection.isReady() and attempts < 300:
if attempts mod 5 == 0:
info "Waiting for RLN spam protection readiness...", attempt = attempts
await sleepAsync(2.seconds)
attempts += 1
if not client.node.wakuMix.mixRlnSpamProtection.isReady():
warn "RLN spam protection not ready after timeout, sending anyway"
info "Sending via mix (lightpushPublish)",
contentTopic = contentTopic, mixPoolSize = client.node.getMixNodePoolSize()
let publishFut = client.node.lightpushPublish(
some(PubsubTopic(client.cfg.pubsubTopic)), msg, none(RemotePeerInfo), mixify = true
)
# Forward delivery is independent of the SURB reply, so a SURB-reply timeout
# is a warning (message may still have been delivered), not a failure.
if not await publishFut.withTimeout(60.seconds):
await publishFut.cancelAndWait()
warn "Mix lightpush: no SURB reply within 60s (forward path independent of SURB reply)"
return ok()
let res = publishFut.read()
if res.isErr:
error "Failed to publish via mix", err = $res.error
return err("mix send failed")
info "Message sent via mix successfully"
return ok()
# None mode: normal relay publish.
let res = await client.node.publish(some(PubsubTopic(client.cfg.pubsubTopic)), msg)
if res.isErr:
error "Failed to Publish", err = res.error,
pubsubTopic = client.cfg.pubsubTopic
error "Failed to Publish", err = res.error, pubsubTopic = client.cfg.pubsubTopic
return err("relay send failed")
return ok()
proc buildWakuNode(cfg: WakuConfig): WakuNode =
let
@ -145,89 +226,60 @@ proc buildWakuNode(cfg: WakuConfig): WakuNode =
result = node
proc splitPeerIdAndAddr(maddr: string): (string, string) =
let parts = maddr.split("/p2p/")
if parts.len != 2:
error "Invalid multiaddress format", maddr = maddr
return ("", "")
return (parts[0], parts[1])
proc parseMixNodes(nodeStrs: seq[string]): seq[MixNodePubInfo] =
for nodeStr in nodeStrs:
let elements = nodeStr.split(":")
if elements.len != 2:
error "Invalid mixnode format, expected multiaddr:mixPubKeyHex", node = nodeStr
continue
result.add(MixNodePubInfo(
multiAddr: elements[0],
pubKey: intoCurve25519Key(ncrutils.fromHex(elements[1]))
))
proc waitForMixPool(client: WakuClient) {.async.} =
while client.node.getMixNodePoolSize() < client.cfg.minMixPoolSize:
info "Waiting for mix node pool",
current = client.node.getMixNodePoolSize(),
required = client.cfg.minMixPoolSize
await sleepAsync(1000.milliseconds)
client.mixReady = true
notice "Mix node pool ready", poolSize = client.node.getMixNodePoolSize()
proc getMixPoolSize*(client: WakuClient): int =
if client.cfg.mixEnabled:
return client.node.getMixNodePoolSize()
return 0
proc subscribeAllStaticPeers(client: WakuClient) {.async.} =
## Issues a fresh filter subscribe to every static peer for both
## FilterContentTopic and LibchatDeliveryAddress. Idempotent on the relay
## side. Called from start() (so the receiver is ready before any sender
## activity) and from taskKeepAlive (so subscriptions don't silently lapse
## on long-running clients — ping-then-subscribe was unreliable because
## a successful ping doesn't guarantee the content-topic subscription
## state is still live on the relay).
for peerInfo in client.staticPeerList:
let subscribeRes = await client.node.wakuFilterClient.subscribe(
peerInfo, client.cfg.pubsubTopic, @[FilterContentTopic, LibchatDeliveryAddress]
)
if subscribeRes.isErr():
warn "filter subscribe failed",
peerId = $peerInfo.peerId, err = subscribeRes.error
else:
debug "filter subscribe ok", peerId = $peerInfo.peerId
proc taskKeepAlive(client: WakuClient) {.async.} =
while true:
for peerInfo in client.staticPeerList:
debug "maintaining subscription", peerId = $peerInfo.peerId
# First use filter-ping to check if we have an active subscription
let pingRes = await client.node.wakuFilterClient.ping(peerInfo)
if pingRes.isErr():
# No subscription found. Let's subscribe.
warn "no subscription found. Sending subscribe request"
# TODO: Use filter. Removing this stops relay from working so keeping for now
let subscribeRes = await client.node.wakuFilterClient.subscribe(
peerInfo, client.cfg.pubsubTopic, @[FilterContentTopic]
)
if subscribeRes.isErr():
error "subscribe request failed. Skipping.", err = subscribeRes.error
continue
else:
debug "subscribe request successful."
else:
debug "subscription found."
await sleepAsync(60.seconds) # Subscription maintenance interval
proc start*(client: WakuClient) {.async.} =
setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT)
await client.node.mountFilter()
await client.node.mountFilterClient()
await client.node.start()
(await client.node.mountRelay()).isOkOr:
error "failed to mount relay", error = error
quit(1)
client.node.peerManager.start()
# Connect to all configured static peers
if client.staticPeerList.len > 0:
info "Connecting to static peers", peerCount = client.staticPeerList.len
asyncSpawn client.node.connectToNodes(client.staticPeerList)
else:
warn "No valid static peers configured"
let subscription: SubscriptionEvent = (kind: PubsubSub, topic:
client.cfg.pubsubTopic)
proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
let payloadStr = string.fromBytes(msg.payload)
debug "message received",
pubsubTopic = topic,
contentTopic = msg.contentTopic
let payload = msg.toChatPayload(topic)
for queueRef in client.dispatchQueues:
await queueRef.queue.put(payload)
let res = subscribe(client.node, subscription, handler)
if res.isErr:
error "Subscribe failed", err = res.error
await allFutures(taskKeepAlive(client))
proc initWakuClient*(cfg: WakuConfig): WakuClient =
# Parse ENRs from static peers configuration
var peerInfos: seq[RemotePeerInfo] = @[]
for enrStr in cfg.staticPeers:
let enrRecord = eth_enr.Record.fromURI(enrStr).valueOr:
error "Failed to parse ENR in initWakuClient", enr = enrStr, err = error
continue
let peerInfo = enrRecord.toRemotePeerInfo().valueOr:
error "Failed to convert ENR to PeerInfo in initWakuClient", enr = enrStr, err = error
continue
peerInfos.add(peerInfo)
result = WakuClient(cfg: cfg, node: buildWakuNode(cfg), dispatchQueues: @[],
staticPeerList: peerInfos)
proc addDispatchQueue*(client: var WakuClient, queue: QueueRef) =
client.dispatchQueues.add(queue)
await client.subscribeAllStaticPeers()
proc getConnectedPeerCount*(client: WakuClient): int =
var count = 0
@ -236,5 +288,97 @@ proc getConnectedPeerCount*(client: WakuClient): int =
inc count
return count
proc start*(client: WakuClient) {.async.} =
setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT)
await client.node.mountFilter()
await client.node.mountFilterClient()
client.node.mountAutoSharding(client.cfg.clusterId, uint32(client.cfg.shardId.len)).isOkOr:
error "failed to mount auto sharding", error = error
if client.cfg.mixEnabled:
let (mixPrivKey, mixPubKey) = mix_curve25519.generateKeyPair().valueOr:
error "Failed to generate mix key pair", error = error
quit(QuitFailure)
let mixNodeInfos = parseMixNodes(client.cfg.mixNodes)
client.node.mountLightPushClient()
(await client.node.mountMix(client.cfg.clusterId, mixPrivKey, mixNodeInfos,
disableSpamProtection = false)).isOkOr:
error "Failed to mount mix protocol", error = $error
quit(QuitFailure)
asyncSpawn client.waitForMixPool()
await client.node.start()
client.node.peerManager.start()
# Register filter push handler for incoming messages (no relay/gossipsub needed)
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
debug "filter message received",
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic
let payload = msg.toChatPayload(pubsubTopic)
for queueRef in client.dispatchQueues:
await queueRef.queue.put(payload)
client.node.wakuFilterClient.registerPushHandler(filterHandler)
# Connect to all configured static peers
if client.staticPeerList.len > 0:
info "Connecting to static peers", peerCount = client.staticPeerList.len
await client.node.connectToNodes(client.staticPeerList)
info "Connected to static peers"
else:
warn "No valid static peers configured"
# Subscribe to every static peer SYNCHRONOUSLY before start() returns so
# the receiver has live content-topic subscriptions in place before any
# sender mix-publish fires. taskKeepAlive then re-subscribes every 60s
# to defend against silent server-side subscription expiry.
await client.subscribeAllStaticPeers()
asyncSpawn taskKeepAlive(client)
info "Waku client started",
relayMounted = not client.node.wakuRelay.isNil,
mixMounted = not client.node.wakuMix.isNil,
connectedPeers = client.getConnectedPeerCount()
# Debug: periodically log relay/mesh status
proc meshStatusTask(client: WakuClient) {.async.} =
while true:
await sleepAsync(15.seconds)
let connPeers = client.getConnectedPeerCount()
info "Peer status",
pubsubTopic = client.cfg.pubsubTopic,
connectedPeers = connPeers,
mixReady = client.mixReady,
mixPoolSize = (if client.cfg.mixEnabled and not client.node.wakuMix.isNil: client.node.getMixNodePoolSize() else: 0)
asyncSpawn meshStatusTask(client)
proc initWakuClient*(cfg: WakuConfig): WakuClient =
var peerInfos: seq[RemotePeerInfo] = @[]
for peerStr in cfg.staticPeers:
if peerStr.startsWith("/"):
let peerInfo = parsePeerInfo(peerStr).valueOr:
error "Failed to parse multiaddr peer", peer = peerStr, err = error
continue
peerInfos.add(peerInfo)
else:
let enrRecord = eth_enr.Record.fromURI(peerStr).valueOr:
error "Failed to parse ENR", enr = peerStr, err = error
continue
let peerInfo = enrRecord.toRemotePeerInfo().valueOr:
error "Failed to convert ENR to PeerInfo", enr = peerStr, err = error
continue
peerInfos.add(peerInfo)
result = WakuClient(cfg: cfg, node: buildWakuNode(cfg), dispatchQueues: @[],
staticPeerList: peerInfos)
proc addDispatchQueue*(client: var WakuClient, queue: QueueRef) =
client.dispatchQueues.add(queue)
proc stop*(client: WakuClient) {.async.} =
await client.node.stop()

View File

@ -1,4 +1,4 @@
import waku/waku_core
import logos_delivery/waku/waku_core
import std/[macros, times]
import blake2
import strutils

@ -1 +1 @@
Subproject commit 4d74e157cdf1bdcd0ffd41519ebde740c4b80447
Subproject commit 38d24eb3bd93e605fb88199da71d36b1ec0ad60d

2
vendor/nwaku vendored

@ -1 +1 @@
Subproject commit 41146a9193c1e360b9a0049d672260a72c4ca2bf
Subproject commit efafdfdc24f369942951dd7ceeec37e5913b19eb