mirror of
https://github.com/logos-messaging/nim-chat-poc.git
synced 2026-07-04 15:39:32 +00:00
feat: add mix+LEZ RLN integration for logos-chat
Integrate logos-chat with the LEZ-backed mix network: - Update vendor/nwaku to logos-delivery fork (feat/mix-rln-gifter-sim) with mix protocol, OnchainLEZGroupManager, RLN gifter client, and mix-rln-spam-protection-plugin - Add vendor/logos-lez-rln submodule for reproducible LEZ module builds - Add RLN FFI surface to liblogoschat: chat_set_rln_fetcher, chat_set_rln_config, chat_push_roots, chat_push_proof, etc. - Wire gifter client for RLN membership registration via mix network - Switch to filter-based message reception (relay: false, filter: true) - Send via lightpushPublish(mixify=true) for Sphinx onion routing - Add mix-librln separate build + duplicate symbol resolution - Exclude vendor/logos-lez-rln from nimble link creation - Update nix build for mix-librln and cross-platform support
This commit is contained in:
parent
15f68f2ec2
commit
4060bb67c1
6
.gitmodules
vendored
6
.gitmodules
vendored
@ -3,7 +3,7 @@
|
||||
url = https://github.com/status-im/nimbus-build-system.git
|
||||
[submodule "vendor/nwaku"]
|
||||
path = vendor/nwaku
|
||||
url = https://github.com/waku-org/nwaku.git
|
||||
url = https://github.com/adklempner/logos-delivery.git
|
||||
ignore = untracked
|
||||
branch = master
|
||||
[submodule "vendor/nim-protobuf-serialization"]
|
||||
@ -28,3 +28,7 @@
|
||||
path = vendor/nim-ffi
|
||||
url = https://github.com/logos-messaging/nim-ffi.git
|
||||
branch = master
|
||||
[submodule "vendor/logos-lez-rln"]
|
||||
path = vendor/logos-lez-rln
|
||||
url = https://github.com/logos-co/logos-lez-rln.git
|
||||
branch = feat/mix-rln-gifter-sim
|
||||
|
||||
33
Makefile
33
Makefile
@ -1,7 +1,9 @@
|
||||
export BUILD_SYSTEM_DIR := vendor/nimbus-build-system
|
||||
export EXCLUDED_NIM_PACKAGES := vendor/nwaku/vendor/nim-dnsdisc/vendor \
|
||||
vendor/nwaku/vendor/nimbus-build-system \
|
||||
vendor/nim-sds/vendor
|
||||
vendor/nwaku/vendor/nim-ffi \
|
||||
vendor/nim-sds/vendor \
|
||||
vendor/logos-lez-rln
|
||||
LINK_PCRE := 0
|
||||
FORMAT_MSG := "\\x1B[95mFormatting:\\x1B[39m"
|
||||
# we don't want an error here, so we can handle things later, in the ".DEFAULT" target
|
||||
@ -69,7 +71,7 @@ TARGET ?= prod
|
||||
## Git version
|
||||
GIT_VERSION ?= $(shell git describe --abbrev=6 --always --tags)
|
||||
## Compilation parameters. If defined in the CLI the assignments won't be executed
|
||||
NIM_PARAMS := $(NIM_PARAMS) -d:git_version=\"$(GIT_VERSION)\"
|
||||
NIM_PARAMS := $(NIM_PARAMS) -d:git_version=\"$(GIT_VERSION)\" -d:libp2p_mix_experimental_exit_is_dest
|
||||
|
||||
##################
|
||||
## Dependencies ##
|
||||
@ -78,6 +80,25 @@ NIM_PARAMS := $(NIM_PARAMS) -d:git_version=\"$(GIT_VERSION)\"
|
||||
CARGO_TARGET_DIR ?= rust-bundle/target
|
||||
RUST_BUNDLE_LIB := $(CARGO_TARGET_DIR)/release/liblogoschat_rust_bundle.a
|
||||
|
||||
# Mix RLN spam protection library (separate zerokit build for mix-rln-spam-protection-plugin)
|
||||
MIX_LIBRLN_VERSION ?= v2.0.0
|
||||
MIX_LIBRLN_FILE ?= $(CURDIR)/build/librln_mix_$(MIX_LIBRLN_VERSION).a
|
||||
MIX_LIBRLN_NIM_PARAMS := --passL:$(MIX_LIBRLN_FILE) --passL:-lm
|
||||
ifneq ($(detected_OS),Darwin)
|
||||
MIX_LIBRLN_NIM_PARAMS += --passL:"-Wl,--allow-multiple-definition"
|
||||
endif
|
||||
|
||||
.PHONY: mix-librln
|
||||
mix-librln: | $(MIX_LIBRLN_FILE)
|
||||
|
||||
$(MIX_LIBRLN_FILE):
|
||||
echo -e $(BUILD_MSG) "$@" && \
|
||||
$(CURDIR)/vendor/nwaku/scripts/build_rln_mix.sh \
|
||||
$(CURDIR)/build/zerokit_$(MIX_LIBRLN_VERSION) \
|
||||
$(MIX_LIBRLN_VERSION) \
|
||||
$(MIX_LIBRLN_FILE) && \
|
||||
$(CURDIR)/scripts/fix_mix_librln_dupes.sh $(MIX_LIBRLN_FILE) $(RUST_BUNDLE_LIB)
|
||||
|
||||
# libchat and rln each embed Rust std when built as staticlibs; linking both
|
||||
# causes duplicate-symbol errors. rust-bundle/ links them as rlibs so std
|
||||
# is emitted once. [1]
|
||||
@ -107,9 +128,9 @@ tests: | build-rust-bundle build-waku-nat logos_chat.nims
|
||||
##########
|
||||
|
||||
# Ensure there is a nimble task with a name that matches the target
|
||||
tui bot_echo pingpong: | build-rust-bundle build-waku-nat logos_chat.nims
|
||||
tui bot_echo pingpong: | build-rust-bundle build-waku-nat mix-librln logos_chat.nims
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
$(ENV_SCRIPT) nim $@ $(NIM_PARAMS) \
|
||||
$(ENV_SCRIPT) nim $@ $(NIM_PARAMS) $(MIX_LIBRLN_NIM_PARAMS) \
|
||||
--passL:$(RUST_BUNDLE_LIB) --passL:-lm \
|
||||
--path:src logos_chat.nims
|
||||
|
||||
@ -129,9 +150,9 @@ endif
|
||||
LIBLOGOSCHAT := build/liblogoschat.$(LIBLOGOSCHAT_EXT)
|
||||
|
||||
.PHONY: liblogoschat
|
||||
liblogoschat: | build-rust-bundle build-waku-nat logos_chat.nims
|
||||
liblogoschat: | build-rust-bundle build-waku-nat mix-librln logos_chat.nims
|
||||
echo -e $(BUILD_MSG) "$(LIBLOGOSCHAT)" && \
|
||||
$(ENV_SCRIPT) nim liblogoschat $(NIM_PARAMS) \
|
||||
$(ENV_SCRIPT) nim liblogoschat $(NIM_PARAMS) $(MIX_LIBRLN_NIM_PARAMS) \
|
||||
--passL:$(RUST_BUNDLE_LIB) --passL:-lm \
|
||||
--path:src logos_chat.nims && \
|
||||
echo -e "\n\x1B[92mLibrary built successfully:\x1B[39m" && \
|
||||
|
||||
@ -50,7 +50,20 @@ proc createChatClient(
|
||||
wakuCfg.staticPeers = @[]
|
||||
for peer in config["staticPeers"]:
|
||||
wakuCfg.staticPeers.add(peer.getStr())
|
||||
|
||||
|
||||
if config.hasKey("mixEnabled"):
|
||||
wakuCfg.mixEnabled = config["mixEnabled"].getBool(false)
|
||||
if config.hasKey("mixNodes"):
|
||||
wakuCfg.mixNodes = @[]
|
||||
for node in config["mixNodes"]:
|
||||
wakuCfg.mixNodes.add(node.getStr())
|
||||
if config.hasKey("destPeerAddr"):
|
||||
wakuCfg.destPeerAddr = config["destPeerAddr"].getStr("")
|
||||
if config.hasKey("minMixPoolSize"):
|
||||
wakuCfg.minMixPoolSize = config["minMixPoolSize"].getInt(4)
|
||||
if config.hasKey("gifterNodeAddr"):
|
||||
wakuCfg.gifterNodeAddr = config["gifterNodeAddr"].getStr("")
|
||||
|
||||
# Create Waku client
|
||||
let wakuClient = initWakuClient(wakuCfg)
|
||||
|
||||
@ -117,6 +130,28 @@ proc chat_get_id(
|
||||
let clientId = ctx.myLib[].getId()
|
||||
return ok(clientId)
|
||||
|
||||
#################################################
|
||||
# Mix Protocol Status
|
||||
#################################################
|
||||
|
||||
proc chat_get_mix_status(
|
||||
ctx: ptr FFIContext[ChatClient],
|
||||
callback: FFICallBack,
|
||||
userData: pointer
|
||||
) {.ffi.} =
|
||||
let client = ctx.myLib[]
|
||||
let mixEnabled = client.ds.cfg.mixEnabled
|
||||
var poolSize = 0
|
||||
if mixEnabled:
|
||||
poolSize = client.ds.getMixPoolSize()
|
||||
let status = %*{
|
||||
"mixEnabled": mixEnabled,
|
||||
"mixReady": client.ds.mixReady,
|
||||
"mixPoolSize": poolSize,
|
||||
"minPoolSize": client.ds.cfg.minMixPoolSize
|
||||
}
|
||||
return ok($status)
|
||||
|
||||
#################################################
|
||||
# Conversation List Operations
|
||||
#################################################
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import ffi
|
||||
import src/chat/client
|
||||
import waku/waku_mix/logos_core_client as mix_rln_client
|
||||
|
||||
declareLibrary("logoschat")
|
||||
|
||||
@ -11,3 +12,40 @@ proc set_event_callback(
|
||||
ctx[].eventCallback = cast[pointer](callback)
|
||||
ctx[].eventUserData = userData
|
||||
|
||||
proc chat_set_rln_fetcher(
|
||||
ctx: ptr FFIContext[ChatClient], fetcher: mix_rln_client.RlnFetcherFunc, fetcherData: pointer
|
||||
) {.dynlib, exportc, cdecl.} =
|
||||
if fetcher.isNil:
|
||||
echo "error: nil fetcher in chat_set_rln_fetcher"
|
||||
return
|
||||
mix_rln_client.setRlnFetcher(fetcher, fetcherData)
|
||||
|
||||
proc chat_set_rln_config(
|
||||
ctx: ptr FFIContext[ChatClient], configAccountId: cstring, leafIndex: cint
|
||||
): cint {.dynlib, exportc, cdecl.} =
|
||||
if configAccountId.isNil:
|
||||
return RET_ERR
|
||||
mix_rln_client.setRlnConfig($configAccountId, leafIndex.int)
|
||||
return RET_OK
|
||||
|
||||
proc chat_set_rln_identity(
|
||||
ctx: ptr FFIContext[ChatClient], idSecretHashHex: cstring
|
||||
) {.dynlib, exportc, cdecl.} =
|
||||
if idSecretHashHex.isNil:
|
||||
return
|
||||
mix_rln_client.setRlnIdentity($idSecretHashHex)
|
||||
|
||||
proc chat_push_roots(
|
||||
ctx: ptr FFIContext[ChatClient], rootsJson: cstring
|
||||
) {.dynlib, exportc, cdecl.} =
|
||||
if rootsJson.isNil:
|
||||
return
|
||||
mix_rln_client.pushRoots($rootsJson)
|
||||
|
||||
proc chat_push_proof(
|
||||
ctx: ptr FFIContext[ChatClient], proofJson: cstring
|
||||
) {.dynlib, exportc, cdecl.} =
|
||||
if proofJson.isNil:
|
||||
return
|
||||
mix_rln_client.pushProof($proofJson)
|
||||
|
||||
|
||||
@ -30,6 +30,10 @@ typedef void (*FFICallBack)(int callerRet, const char *msg, size_t len,
|
||||
// - "clusterId": int - Waku cluster ID (optional)
|
||||
// - "shardId": int - Waku shard ID (optional)
|
||||
// - "staticPeers": array of strings - static peer multiaddrs (optional)
|
||||
// - "mixEnabled": bool - enable mix protocol for sender anonymity (default: false)
|
||||
// - "mixNodes": array of strings - mix bootstrap nodes as "multiaddr:mixPubKeyHex"
|
||||
// - "destPeerAddr": string - lightpush destination peer "multiaddr/p2p/peerId"
|
||||
// - "minMixPoolSize": int - minimum mix pool size before sending via mix (default: 4)
|
||||
void *chat_new(const char *configJson, FFICallBack callback, void *userData);
|
||||
|
||||
// Start the chat client and begin listening for messages
|
||||
@ -97,6 +101,38 @@ int chat_get_identity(void *ctx, FFICallBack callback, void *userData);
|
||||
// Returns the intro bundle as an ASCII string (format: logos_chatintro_<version>_<base64url payload>)
|
||||
int chat_create_intro_bundle(void *ctx, FFICallBack callback, void *userData);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
// Mix Protocol Status
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// Get mix protocol status
|
||||
// Returns JSON: {"mixEnabled":bool,"mixReady":bool,"mixPoolSize":int,"minPoolSize":int}
|
||||
int chat_get_mix_status(void *ctx, FFICallBack callback, void *userData);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
// RLN Integration (for logos-core module wiring)
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// RLN fetcher callback type: C++ implements this, Nim calls it to get
|
||||
// roots/proofs from the RLN module via LogosAPI IPC.
|
||||
typedef int (*RlnFetcherFunc)(const char *method, const char *params,
|
||||
FFICallBack callback, void *callbackData, void *fetcherData);
|
||||
|
||||
// Register the RLN fetcher callback (called by C++ plugin during init)
|
||||
void chat_set_rln_fetcher(void *ctx, RlnFetcherFunc fetcher, void *fetcherData);
|
||||
|
||||
// Set RLN configuration (account ID + leaf index in the on-chain tree)
|
||||
int chat_set_rln_config(void *ctx, const char *configAccountId, int leafIndex);
|
||||
|
||||
// Set RLN identity credential (seed hex for proof generation)
|
||||
void chat_set_rln_identity(void *ctx, const char *idSecretHashHex);
|
||||
|
||||
// Push valid merkle roots from RLN module events
|
||||
void chat_push_roots(void *ctx, const char *rootsJson);
|
||||
|
||||
// Push merkle proof from RLN module events
|
||||
void chat_push_proof(void *ctx, const char *proofJson);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
{ lib, stdenv, nim, which, pkg-config, writeScriptBin,
|
||||
openssl, miniupnpc, libnatpmp,
|
||||
openssl, miniupnpc, libnatpmp, rustPlatform, fetchFromGitHub, darwin ? {},
|
||||
src, # logos-chat source (self from flake, with submodules=1)
|
||||
rustBundleDrv }: # result of rust_bundle.nix
|
||||
|
||||
@ -13,20 +13,50 @@ assert lib.assertMsg ((src.submodules or false) == true)
|
||||
|
||||
let
|
||||
revision = lib.substring 0 8 (src.rev or "dirty");
|
||||
logosChatSrc = src;
|
||||
|
||||
zerokitMixSrc = fetchFromGitHub {
|
||||
owner = "vacp2p";
|
||||
repo = "zerokit";
|
||||
rev = "v2.0.0";
|
||||
hash = "sha256-5a2cL26uw7NdLCD0gCA3tS7uX8W9yxRGqcPhWNevstM=";
|
||||
};
|
||||
|
||||
mixRlnLib = rustPlatform.buildRustPackage {
|
||||
pname = "librln-mix";
|
||||
version = "2.0.0";
|
||||
src = zerokitMixSrc;
|
||||
cargoHash = "sha256-SoMl0QBBgTG1b4UhOlErlzWmg3J6G0xOC0tNDddOptA=";
|
||||
buildAndTestSubdir = "rln";
|
||||
buildType = "release";
|
||||
nativeBuildInputs = lib.optionals stdenv.isDarwin [ darwin.cctools ];
|
||||
doCheck = false;
|
||||
installPhase = ''
|
||||
mkdir -p $out/lib
|
||||
find target -name "librln.a" -exec cp {} $out/lib/librln_mix_v2.0.0.a \;
|
||||
ls $out/lib/librln_mix_v2.0.0.a
|
||||
'' + lib.optionalString stdenv.isDarwin ''
|
||||
bash ${logosChatSrc}/scripts/fix_mix_librln_dupes.sh $out/lib/librln_mix_v2.0.0.a ${rustBundleDrv}/lib/liblogoschat_rust_bundle.a
|
||||
'';
|
||||
};
|
||||
in stdenv.mkDerivation {
|
||||
pname = "liblogoschat";
|
||||
version = "0.1.0";
|
||||
inherit src;
|
||||
|
||||
NIMFLAGS = lib.concatStringsSep " " [
|
||||
NIMFLAGS = lib.concatStringsSep " " ([
|
||||
"--passL:${rustBundleDrv}/lib/liblogoschat_rust_bundle.a"
|
||||
"--passL:${mixRlnLib}/lib/librln_mix_v2.0.0.a"
|
||||
"--passL:-lm"
|
||||
"-d:miniupnpcUseSystemLibs"
|
||||
"-d:libnatpmpUseSystemLibs"
|
||||
"--passL:-lminiupnpc"
|
||||
"--passL:-lnatpmp"
|
||||
"-d:git_version=${revision}"
|
||||
];
|
||||
"-d:libp2p_mix_experimental_exit_is_dest"
|
||||
] ++ lib.optionals stdenv.isLinux [
|
||||
"--passL:-Wl,--allow-multiple-definition"
|
||||
]);
|
||||
|
||||
nativeBuildInputs = let
|
||||
fakeGit = writeScriptBin "git" ''
|
||||
|
||||
4052
rust-bundle/Cargo.lock
generated
4052
rust-bundle/Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -9,4 +9,4 @@ crate-type = ["staticlib"]
|
||||
|
||||
[dependencies]
|
||||
libchat = { path = "../vendor/libchat/conversations" }
|
||||
rln = { path = "../vendor/nwaku/vendor/zerokit/rln", features = ["arkzkey"] }
|
||||
rln = { path = "../vendor/nwaku/vendor/zerokit/rln" }
|
||||
|
||||
41
scripts/fix_mix_librln_dupes.sh
Executable file
41
scripts/fix_mix_librln_dupes.sh
Executable file
@ -0,0 +1,41 @@
|
||||
#!/usr/bin/env bash
|
||||
# Fix duplicate symbols between librln_mix and rust-bundle on macOS.
|
||||
set -uo pipefail
|
||||
|
||||
LIB="$(cd "$(dirname "$1")" && pwd)/$(basename "$1")"
|
||||
RUST_BUNDLE="${2:-}"
|
||||
[ -n "$RUST_BUNDLE" ] && RUST_BUNDLE="$(cd "$(dirname "$RUST_BUNDLE")" && pwd)/$(basename "$RUST_BUNDLE")"
|
||||
|
||||
case "$(uname -s)" in
|
||||
Darwin)
|
||||
[ -z "$RUST_BUNDLE" ] && echo "Usage: $0 <mix-lib> <rust-bundle-lib>" && exit 0
|
||||
|
||||
WORK=$(mktemp -d)
|
||||
trap 'rm -rf "$WORK"' EXIT
|
||||
|
||||
# Match all global symbols (T=text, D=data, S=common, B=BSS, etc — uppercase = global)
|
||||
(nm "$RUST_BUNDLE" 2>/dev/null || true) | grep " [TDSBCR] " | awk '{print $3}' | sort -u > "$WORK/b.txt"
|
||||
(nm "$LIB" 2>/dev/null || true) | grep " [TDSBCR] " | awk '{print $3}' | sort -u > "$WORK/m.txt"
|
||||
comm -12 "$WORK/b.txt" "$WORK/m.txt" > "$WORK/d.txt"
|
||||
DCOUNT=$(wc -l < "$WORK/d.txt" | tr -d ' ')
|
||||
[ "$DCOUNT" -eq 0 ] && echo "No duplicates." && exit 0
|
||||
echo "Localizing $DCOUNT duplicate symbols in $(basename "$LIB")..."
|
||||
|
||||
mkdir "$WORK/o" && cd "$WORK/o"
|
||||
ar x "$LIB"
|
||||
FIXED=0
|
||||
for f in *.o; do
|
||||
# Get this object's global text symbols, intersect with dupes
|
||||
(nm "$f" 2>/dev/null || true) | grep " [TDSBCR] " | awk '{print $3}' | sort -u > "$WORK/obj.txt"
|
||||
comm -12 "$WORK/d.txt" "$WORK/obj.txt" > "$WORK/obj_dupes.txt"
|
||||
if [ -s "$WORK/obj_dupes.txt" ]; then
|
||||
nmedit -R "$WORK/obj_dupes.txt" "$f"
|
||||
FIXED=$((FIXED + 1))
|
||||
fi
|
||||
done
|
||||
rm "$LIB"
|
||||
ar rcs "$LIB" *.o
|
||||
echo "Fixed $FIXED objects."
|
||||
;;
|
||||
*) echo "No fix needed on $(uname -s)" ;;
|
||||
esac
|
||||
@ -237,7 +237,7 @@ proc messageQueueConsumer(client: ChatClient) {.async.} =
|
||||
proc start*(client: ChatClient) {.async.} =
|
||||
## Start `ChatClient` and listens for incoming messages.
|
||||
client.ds.addDispatchQueue(client.inboundQueue)
|
||||
asyncSpawn client.ds.start()
|
||||
await client.ds.start()
|
||||
|
||||
client.isRunning = true
|
||||
|
||||
|
||||
@ -4,8 +4,14 @@ import
|
||||
confutils,
|
||||
eth/p2p/discoveryv5/enr as eth_enr,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/peerid,
|
||||
std/random,
|
||||
libp2p/protocols/mix,
|
||||
libp2p/protocols/mix/curve25519 as mix_curve25519,
|
||||
libp2p/protocols/mix/entry_connection,
|
||||
libp2p/protocols/mix/mix_protocol as mix_proto,
|
||||
nimcrypto/utils as ncrutils,
|
||||
std/[random, strutils],
|
||||
stew/byteutils,
|
||||
strformat,
|
||||
waku/[
|
||||
@ -13,13 +19,22 @@ import
|
||||
common/enr as common_enr,
|
||||
node/peer_manager,
|
||||
waku_core,
|
||||
waku_core/codecs,
|
||||
waku_node,
|
||||
waku_enr,
|
||||
waku_mix/protocol as waku_mix_protocol,
|
||||
waku_mix/logos_core_client as mix_lez_client,
|
||||
waku_lightpush/client as lightpush_client,
|
||||
discovery/waku_discv5,
|
||||
discovery/waku_dnsdisc,
|
||||
factory/builder,
|
||||
waku_filter_v2/client,
|
||||
]
|
||||
waku_rln_relay/rln_gifter/client as rln_gifter_client,
|
||||
waku_rln_relay/rln_gifter/protocol as rln_gifter_protocol,
|
||||
],
|
||||
mix_rln_spam_protection/onchain_group_manager,
|
||||
mix_rln_spam_protection/rln_interface as mix_rln_interface,
|
||||
mix_rln_spam_protection/spam_protection
|
||||
|
||||
|
||||
logScope:
|
||||
@ -40,6 +55,7 @@ proc toChatPayload*(msg: WakuMessage, pubsubTopic: PubsubTopic): ChatPayload =
|
||||
const
|
||||
# Placeholder
|
||||
FilterContentTopic = ContentTopic("/chatsdk/test/proto")
|
||||
LibchatDeliveryAddress = ContentTopic("delivery_address")
|
||||
|
||||
## Logos.dev Fleet ENRs
|
||||
|
||||
@ -77,12 +93,17 @@ type QueueRef* = ref object
|
||||
|
||||
|
||||
type WakuConfig* = object
|
||||
nodekey*: crypto.PrivateKey # TODO: protect key exposure
|
||||
nodekey*: crypto.PrivateKey # TODO: protect key exposure
|
||||
port*: uint16
|
||||
clusterId*: uint16
|
||||
shardId*: seq[uint16]
|
||||
pubsubTopic*: string
|
||||
staticPeers*: seq[string]
|
||||
mixEnabled*: bool
|
||||
mixNodes*: seq[string]
|
||||
destPeerAddr*: string
|
||||
minMixPoolSize*: int
|
||||
gifterNodeAddr*: string
|
||||
|
||||
type
|
||||
WakuClient* = ref object
|
||||
@ -90,6 +111,8 @@ type
|
||||
node*: WakuNode
|
||||
dispatchQueues: seq[QueueRef]
|
||||
staticPeerList: seq[RemotePeerInfo]
|
||||
mixReady*: bool
|
||||
destPeerId: PeerId
|
||||
|
||||
|
||||
proc DefaultConfig*(): WakuConfig =
|
||||
@ -100,17 +123,51 @@ proc DefaultConfig*(): WakuConfig =
|
||||
|
||||
result = WakuConfig(nodeKey: nodeKey, port: port, clusterId: clusterId,
|
||||
shardId: @[shardId], pubsubTopic: &"/waku/2/rs/{clusterId}/{shardId}",
|
||||
staticPeers: LogosDevStaticPeers)
|
||||
staticPeers: LogosDevStaticPeers,
|
||||
mixEnabled: false, mixNodes: @[], destPeerAddr: "", minMixPoolSize: 4,
|
||||
gifterNodeAddr: "")
|
||||
|
||||
|
||||
proc sendBytes*(client: WakuClient, contentTopic: string,
|
||||
bytes: seq[byte]) {.async.} =
|
||||
|
||||
let msg = WakuMessage(contentTopic: contentTopic, payload: bytes)
|
||||
let res = await client.node.publish(some(PubsubTopic(client.cfg.pubsubTopic)), msg)
|
||||
if res.isErr:
|
||||
error "Failed to Publish", err = res.error,
|
||||
pubsubTopic = client.cfg.pubsubTopic
|
||||
|
||||
if client.cfg.mixEnabled:
|
||||
# Wait for mix pool to be ready before sending
|
||||
while not client.mixReady:
|
||||
info "Waiting for mix pool before sending..."
|
||||
await sleepAsync(2.seconds)
|
||||
|
||||
# Wait for RLN spam protection to be ready (roots + proofs fetched from LEZ)
|
||||
if not client.node.wakuMix.isNil:
|
||||
var attempts = 0
|
||||
while not client.node.wakuMix.mixRlnSpamProtection.isReady() and attempts < 45:
|
||||
if attempts mod 5 == 0:
|
||||
info "Waiting for RLN spam protection readiness...", attempt = attempts
|
||||
await sleepAsync(2.seconds)
|
||||
attempts += 1
|
||||
if client.node.wakuMix.mixRlnSpamProtection.isReady():
|
||||
info "RLN spam protection ready, waiting 30s for root convergence across mix nodes"
|
||||
await sleepAsync(30.seconds)
|
||||
else:
|
||||
warn "RLN spam protection not ready after timeout, sending anyway"
|
||||
|
||||
if client.cfg.mixEnabled and client.mixReady:
|
||||
info "Sending via mix (lightpushPublish)", contentTopic = contentTopic, mixPoolSize = client.node.getMixNodePoolSize()
|
||||
let res = await client.node.lightpushPublish(
|
||||
some(PubsubTopic(client.cfg.pubsubTopic)), msg, none(RemotePeerInfo), mixify = true
|
||||
)
|
||||
if res.isErr:
|
||||
error "Failed to publish via mix"
|
||||
else:
|
||||
info "Message sent via mix successfully"
|
||||
else:
|
||||
warn "Sending via relay fallback (mix not ready or not enabled)",
|
||||
mixEnabled = client.cfg.mixEnabled, mixReady = client.mixReady
|
||||
let res = await client.node.publish(some(PubsubTopic(client.cfg.pubsubTopic)), msg)
|
||||
if res.isErr:
|
||||
error "Failed to Publish", err = res.error,
|
||||
pubsubTopic = client.cfg.pubsubTopic
|
||||
|
||||
proc buildWakuNode(cfg: WakuConfig): WakuNode =
|
||||
let
|
||||
@ -145,6 +202,38 @@ proc buildWakuNode(cfg: WakuConfig): WakuNode =
|
||||
result = node
|
||||
|
||||
|
||||
proc splitPeerIdAndAddr(maddr: string): (string, string) =
|
||||
let parts = maddr.split("/p2p/")
|
||||
if parts.len != 2:
|
||||
error "Invalid multiaddress format", maddr = maddr
|
||||
return ("", "")
|
||||
return (parts[0], parts[1])
|
||||
|
||||
proc parseMixNodes(nodeStrs: seq[string]): seq[MixNodePubInfo] =
|
||||
for nodeStr in nodeStrs:
|
||||
let elements = nodeStr.split(":")
|
||||
if elements.len != 2:
|
||||
error "Invalid mixnode format, expected multiaddr:mixPubKeyHex", node = nodeStr
|
||||
continue
|
||||
result.add(MixNodePubInfo(
|
||||
multiAddr: elements[0],
|
||||
pubKey: intoCurve25519Key(ncrutils.fromHex(elements[1]))
|
||||
))
|
||||
|
||||
proc waitForMixPool(client: WakuClient) {.async.} =
|
||||
while client.node.getMixNodePoolSize() < client.cfg.minMixPoolSize:
|
||||
info "Waiting for mix node pool",
|
||||
current = client.node.getMixNodePoolSize(),
|
||||
required = client.cfg.minMixPoolSize
|
||||
await sleepAsync(1000.milliseconds)
|
||||
client.mixReady = true
|
||||
notice "Mix node pool ready", poolSize = client.node.getMixNodePoolSize()
|
||||
|
||||
proc getMixPoolSize*(client: WakuClient): int =
|
||||
if client.cfg.mixEnabled:
|
||||
return client.node.getMixNodePoolSize()
|
||||
return 0
|
||||
|
||||
proc taskKeepAlive(client: WakuClient) {.async.} =
|
||||
while true:
|
||||
for peerInfo in client.staticPeerList:
|
||||
@ -157,7 +246,7 @@ proc taskKeepAlive(client: WakuClient) {.async.} =
|
||||
|
||||
# TODO: Use filter. Removing this stops relay from working so keeping for now
|
||||
let subscribeRes = await client.node.wakuFilterClient.subscribe(
|
||||
peerInfo, client.cfg.pubsubTopic, @[FilterContentTopic]
|
||||
peerInfo, client.cfg.pubsubTopic, @[FilterContentTopic, LibchatDeliveryAddress]
|
||||
)
|
||||
|
||||
if subscribeRes.isErr():
|
||||
@ -170,65 +259,6 @@ proc taskKeepAlive(client: WakuClient) {.async.} =
|
||||
|
||||
await sleepAsync(60.seconds) # Subscription maintenance interval
|
||||
|
||||
proc start*(client: WakuClient) {.async.} =
|
||||
setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT)
|
||||
await client.node.mountFilter()
|
||||
await client.node.mountFilterClient()
|
||||
|
||||
await client.node.start()
|
||||
(await client.node.mountRelay()).isOkOr:
|
||||
error "failed to mount relay", error = error
|
||||
quit(1)
|
||||
|
||||
client.node.peerManager.start()
|
||||
|
||||
# Connect to all configured static peers
|
||||
if client.staticPeerList.len > 0:
|
||||
info "Connecting to static peers", peerCount = client.staticPeerList.len
|
||||
asyncSpawn client.node.connectToNodes(client.staticPeerList)
|
||||
else:
|
||||
warn "No valid static peers configured"
|
||||
|
||||
let subscription: SubscriptionEvent = (kind: PubsubSub, topic:
|
||||
client.cfg.pubsubTopic)
|
||||
|
||||
proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
|
||||
let payloadStr = string.fromBytes(msg.payload)
|
||||
debug "message received",
|
||||
pubsubTopic = topic,
|
||||
contentTopic = msg.contentTopic
|
||||
|
||||
let payload = msg.toChatPayload(topic)
|
||||
|
||||
for queueRef in client.dispatchQueues:
|
||||
await queueRef.queue.put(payload)
|
||||
|
||||
let res = subscribe(client.node, subscription, handler)
|
||||
if res.isErr:
|
||||
error "Subscribe failed", err = res.error
|
||||
|
||||
await allFutures(taskKeepAlive(client))
|
||||
|
||||
proc initWakuClient*(cfg: WakuConfig): WakuClient =
|
||||
# Parse ENRs from static peers configuration
|
||||
var peerInfos: seq[RemotePeerInfo] = @[]
|
||||
for enrStr in cfg.staticPeers:
|
||||
let enrRecord = eth_enr.Record.fromURI(enrStr).valueOr:
|
||||
error "Failed to parse ENR in initWakuClient", enr = enrStr, err = error
|
||||
continue
|
||||
|
||||
let peerInfo = enrRecord.toRemotePeerInfo().valueOr:
|
||||
error "Failed to convert ENR to PeerInfo in initWakuClient", enr = enrStr, err = error
|
||||
continue
|
||||
|
||||
peerInfos.add(peerInfo)
|
||||
|
||||
result = WakuClient(cfg: cfg, node: buildWakuNode(cfg), dispatchQueues: @[],
|
||||
staticPeerList: peerInfos)
|
||||
|
||||
proc addDispatchQueue*(client: var WakuClient, queue: QueueRef) =
|
||||
client.dispatchQueues.add(queue)
|
||||
|
||||
proc getConnectedPeerCount*(client: WakuClient): int =
|
||||
var count = 0
|
||||
for peerId, peerInfo in client.node.peerManager.switch.peerStore.peers:
|
||||
@ -236,5 +266,179 @@ proc getConnectedPeerCount*(client: WakuClient): int =
|
||||
inc count
|
||||
return count
|
||||
|
||||
proc start*(client: WakuClient) {.async.} =
|
||||
setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT)
|
||||
await client.node.mountFilter()
|
||||
await client.node.mountFilterClient()
|
||||
|
||||
client.node.mountAutoSharding(client.cfg.clusterId, uint32(client.cfg.shardId.len)).isOkOr:
|
||||
error "failed to mount auto sharding", error = error
|
||||
|
||||
if client.cfg.mixEnabled:
|
||||
let (mixPrivKey, mixPubKey) = mix_curve25519.generateKeyPair().valueOr:
|
||||
error "Failed to generate mix key pair", error = error
|
||||
quit(QuitFailure)
|
||||
let mixNodeInfos = parseMixNodes(client.cfg.mixNodes)
|
||||
client.node.mountLightPushClient()
|
||||
(await client.node.mountMix(client.cfg.clusterId, mixPrivKey, mixNodeInfos,
|
||||
useOnchainLEZ = true)).isOkOr:
|
||||
error "Failed to mount mix protocol", error = $error
|
||||
quit(QuitFailure)
|
||||
|
||||
# Wire LEZ callbacks BEFORE node.start() so spam protection can initialize
|
||||
if client.cfg.mixEnabled and not client.node.wakuMix.isNil:
|
||||
let gm = client.node.wakuMix.mixRlnSpamProtection.groupManager
|
||||
if gm of OnchainLEZGroupManager:
|
||||
let lezGm = OnchainLEZGroupManager(gm)
|
||||
let clientFetchRoots = mix_lez_client.makeFetchLatestRoots()
|
||||
let clientFetchProof = mix_lez_client.makeFetchMerkleProof()
|
||||
let fetchRoots: onchain_group_manager.FetchRootsCallback =
|
||||
proc(): Future[Result[seq[onchain_group_manager.MerkleNode], string]] {.async, gcsafe, raises: [].} =
|
||||
let res = await clientFetchRoots()
|
||||
if res.isOk:
|
||||
var nodes: seq[onchain_group_manager.MerkleNode]
|
||||
for r in res.get():
|
||||
nodes.add(onchain_group_manager.MerkleNode(r))
|
||||
return ok(nodes)
|
||||
else:
|
||||
return err(res.error)
|
||||
let fetchProof: onchain_group_manager.FetchProofCallback =
|
||||
proc(index: onchain_group_manager.MembershipIndex): Future[Result[onchain_group_manager.ExternalMerkleProof, string]] {.async, gcsafe, raises: [].} =
|
||||
let res = await clientFetchProof(mix_lez_client.MembershipIndex(index))
|
||||
if res.isOk:
|
||||
let p = res.get()
|
||||
return ok(onchain_group_manager.ExternalMerkleProof(
|
||||
pathElements: p.pathElements,
|
||||
identityPathIndex: p.identityPathIndex,
|
||||
root: onchain_group_manager.MerkleNode(p.root),
|
||||
))
|
||||
else:
|
||||
return err(res.error)
|
||||
lezGm.setFetchCallbacks(fetchRoots, fetchProof)
|
||||
mix_lez_client.setGroupManagerRef(cast[pointer](lezGm))
|
||||
info "Wired LEZ callbacks for mix RLN spam protection"
|
||||
|
||||
let (_, destId) = splitPeerIdAndAddr(client.cfg.destPeerAddr)
|
||||
client.destPeerId = PeerId.init(destId).valueOr:
|
||||
error "Failed to parse destination peer ID", error = error
|
||||
quit(QuitFailure)
|
||||
asyncSpawn client.waitForMixPool()
|
||||
|
||||
await client.node.start()
|
||||
|
||||
client.node.peerManager.start()
|
||||
|
||||
# Register filter push handler for incoming messages (no relay/gossipsub needed)
|
||||
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
debug "filter message received",
|
||||
pubsubTopic = pubsubTopic,
|
||||
contentTopic = msg.contentTopic
|
||||
|
||||
let payload = msg.toChatPayload(pubsubTopic)
|
||||
for queueRef in client.dispatchQueues:
|
||||
await queueRef.queue.put(payload)
|
||||
|
||||
client.node.wakuFilterClient.registerPushHandler(filterHandler)
|
||||
|
||||
# Connect to all configured static peers
|
||||
if client.staticPeerList.len > 0:
|
||||
info "Connecting to static peers", peerCount = client.staticPeerList.len
|
||||
await client.node.connectToNodes(client.staticPeerList)
|
||||
info "Connected to static peers"
|
||||
else:
|
||||
warn "No valid static peers configured"
|
||||
|
||||
if client.cfg.mixEnabled and client.cfg.gifterNodeAddr.len > 0 and
|
||||
not client.node.wakuMix.isNil:
|
||||
let gm = client.node.wakuMix.mixRlnSpamProtection.groupManager
|
||||
if gm of OnchainLEZGroupManager:
|
||||
let lezGm = OnchainLEZGroupManager(gm)
|
||||
let gifterClient = rln_gifter_client.WakuRlnGifterClient.new(
|
||||
client.node.peerManager, client.node.rng
|
||||
)
|
||||
let gifterPeer = parsePeerInfo(client.cfg.gifterNodeAddr).valueOr:
|
||||
error "Failed to parse gifter peer", error = error
|
||||
quit(QuitFailure)
|
||||
client.node.peerManager.addServicePeer(gifterPeer, WakuRlnGifterCodec)
|
||||
|
||||
let idCred =
|
||||
if lezGm.credentials.isSome:
|
||||
lezGm.credentials.get()
|
||||
else:
|
||||
mix_rln_interface.membershipKeyGen().valueOr:
|
||||
error "Failed to generate RLN identity", error = $error
|
||||
quit(QuitFailure)
|
||||
var idCommitmentHex = ""
|
||||
for b in idCred.idCommitment:
|
||||
idCommitmentHex.add(toHex(int(b), 2))
|
||||
|
||||
info "Registering via RLN gifter",
|
||||
gifterPeer = client.cfg.gifterNodeAddr,
|
||||
idCommitment = idCommitmentHex[0 .. 15] & "..."
|
||||
|
||||
let regRes = await gifterClient.requestMembership(
|
||||
idCommitmentHex, uint64(lezGm.userMessageLimit), gifterPeer
|
||||
)
|
||||
if regRes.isErr:
|
||||
error "Failed to register via gifter", error = regRes.error
|
||||
quit(QuitFailure)
|
||||
let regResult = regRes.get()
|
||||
|
||||
lezGm.credentials = some(idCred)
|
||||
lezGm.membershipIndex = some(onchain_group_manager.MembershipIndex(regResult.leafIndex))
|
||||
mix_lez_client.setRlnConfig(regResult.configAccountId, regResult.leafIndex.int)
|
||||
|
||||
info "Registered via RLN gifter",
|
||||
leafIndex = regResult.leafIndex,
|
||||
configAccount = regResult.configAccountId
|
||||
|
||||
asyncSpawn taskKeepAlive(client)
|
||||
|
||||
if client.cfg.mixEnabled and not client.node.wakuMix.isNil:
|
||||
let gm = client.node.wakuMix.mixRlnSpamProtection.groupManager
|
||||
if gm of OnchainLEZGroupManager:
|
||||
OnchainLEZGroupManager(gm).startPolling()
|
||||
|
||||
info "Waku client started",
|
||||
relayMounted = not client.node.wakuRelay.isNil,
|
||||
mixMounted = not client.node.wakuMix.isNil,
|
||||
connectedPeers = client.getConnectedPeerCount()
|
||||
|
||||
# Debug: periodically log relay/mesh status
|
||||
proc meshStatusTask(client: WakuClient) {.async.} =
|
||||
while true:
|
||||
await sleepAsync(15.seconds)
|
||||
let connPeers = client.getConnectedPeerCount()
|
||||
info "Peer status",
|
||||
pubsubTopic = client.cfg.pubsubTopic,
|
||||
connectedPeers = connPeers,
|
||||
mixReady = client.mixReady,
|
||||
mixPoolSize = (if client.cfg.mixEnabled and not client.node.wakuMix.isNil: client.node.getMixNodePoolSize() else: 0)
|
||||
|
||||
asyncSpawn meshStatusTask(client)
|
||||
|
||||
proc initWakuClient*(cfg: WakuConfig): WakuClient =
|
||||
var peerInfos: seq[RemotePeerInfo] = @[]
|
||||
for peerStr in cfg.staticPeers:
|
||||
if peerStr.startsWith("/"):
|
||||
let peerInfo = parsePeerInfo(peerStr).valueOr:
|
||||
error "Failed to parse multiaddr peer", peer = peerStr, err = error
|
||||
continue
|
||||
peerInfos.add(peerInfo)
|
||||
else:
|
||||
let enrRecord = eth_enr.Record.fromURI(peerStr).valueOr:
|
||||
error "Failed to parse ENR", enr = peerStr, err = error
|
||||
continue
|
||||
let peerInfo = enrRecord.toRemotePeerInfo().valueOr:
|
||||
error "Failed to convert ENR to PeerInfo", enr = peerStr, err = error
|
||||
continue
|
||||
peerInfos.add(peerInfo)
|
||||
|
||||
result = WakuClient(cfg: cfg, node: buildWakuNode(cfg), dispatchQueues: @[],
|
||||
staticPeerList: peerInfos)
|
||||
|
||||
proc addDispatchQueue*(client: var WakuClient, queue: QueueRef) =
|
||||
client.dispatchQueues.add(queue)
|
||||
|
||||
proc stop*(client: WakuClient) {.async.} =
|
||||
await client.node.stop()
|
||||
|
||||
1
vendor/logos-lez-rln
vendored
Submodule
1
vendor/logos-lez-rln
vendored
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit ad17d9026b5bc69c6c42d7617fb08b18ae135f42
|
||||
2
vendor/nwaku
vendored
2
vendor/nwaku
vendored
@ -1 +1 @@
|
||||
Subproject commit 41146a9193c1e360b9a0049d672260a72c4ca2bf
|
||||
Subproject commit 48065f7d703568dbc458abf9432a63adc2e80b33
|
||||
Loading…
x
Reference in New Issue
Block a user