diff --git a/config.nims b/config.nims index 27e92da6b..9a99adf06 100644 --- a/config.nims +++ b/config.nims @@ -5,6 +5,9 @@ if defined(release): else: switch("nimcache", "nimcache/debug/$projectName") +# Add mix-rln-spam-protection-plugin to search path (all platforms) +switch("path", "./vendor/mix-rln-spam-protection-plugin/src") + if defined(windows): switch("passL", "rln.lib") switch("define", "postgres=false") diff --git a/flake.lock b/flake.lock index 225bfc3ed..b856db23c 100644 --- a/flake.lock +++ b/flake.lock @@ -20,7 +20,8 @@ "inputs": { "nixpkgs": "nixpkgs", "rust-overlay": "rust-overlay", - "zerokit": "zerokit" + "zerokit": "zerokit", + "zerokitMix": "zerokitMix" } }, "rust-overlay": { @@ -64,6 +65,27 @@ "type": "github" } }, + "rust-overlay_3": { + "inputs": { + "nixpkgs": [ + "zerokitMix", + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1771297684, + "narHash": "sha256-wieWskQxZLPlNXX06JEB0bMoS/ZYQ89xBzF0RL9lyLs=", + "owner": "oxalica", + "repo": "rust-overlay", + "rev": "755d3669699a7c62aef35af187d75dc2728cfd85", + "type": "github" + }, + "original": { + "owner": "oxalica", + "repo": "rust-overlay", + "type": "github" + } + }, "zerokit": { "inputs": { "nixpkgs": [ @@ -85,6 +107,28 @@ "rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63", "type": "github" } + }, + "zerokitMix": { + "inputs": { + "nixpkgs": [ + "nixpkgs" + ], + "rust-overlay": "rust-overlay_3" + }, + "locked": { + "lastModified": 1773917887, + "narHash": "sha256-5a2cL26uw7NdLCD0gCA3tS7uX8W9yxRGqcPhWNevstM=", + "ref": "refs/tags/v2.0.0", + "rev": "0ef1364c86c9c2c441284389c46b0b461feab893", + "revCount": 371, + "type": "git", + "url": "https://github.com/vacp2p/zerokit" + }, + "original": { + "ref": "refs/tags/v2.0.0", + "type": "git", + "url": "https://github.com/vacp2p/zerokit" + } } }, "root": "root", diff --git a/flake.nix b/flake.nix index 5b76fd9ac..f71c356c2 100644 --- a/flake.nix +++ b/flake.nix @@ -24,9 +24,15 @@ url = "github:vacp2p/zerokit/5e64cb8822bee65eed6cf459f95ae72b80c6ba63"; inputs.nixpkgs.follows = "nixpkgs"; }; + # Mix RLN spam protection requires a separate, newer zerokit (v2.0.0). + # Pinned by tag to match what scripts/build_rln_mix.sh uses by default. + zerokitMix = { + url = "git+https://github.com/vacp2p/zerokit?ref=refs/tags/v2.0.0"; + inputs.nixpkgs.follows = "nixpkgs"; + }; }; - outputs = { self, nixpkgs, rust-overlay, zerokit }: + outputs = { self, nixpkgs, rust-overlay, zerokit, zerokitMix }: let systems = [ "x86_64-linux" "aarch64-linux" @@ -71,13 +77,26 @@ let pkgs = pkgsFor system; - zerokitRln = import ./nix/zerokit.nix { inherit zerokit system; }; + zerokitRln = import ./nix/zerokit.nix { + inherit zerokit system; + vendorHash = "sha256-PNwEdZLgGQPqQDrEK2hsQtSybVfBbD6xn4K47fPFJUU="; + }; + zerokitMixRln = import ./nix/zerokit.nix { + zerokit = zerokitMix; + inherit system; + vendorHash = "sha256-SoMl0QBBgTG1b4UhOlErlzWmg3J6G0xOC0tNDddOptA="; + }; liblogosdelivery = pkgs.callPackage ./nix/default.nix { inherit pkgs; src = ./.; - inherit zerokitRln; + inherit zerokitRln zerokitMixRln; gitVersion = "v${nimbleVersion}-g${builtins.substring 0 6 shortRev}"; + # The .lgx wrapper does not put its variant dir on dyld's search + # path, so nim's runtime dlopen("libpq.dylib") from db_postgres + # fails even though libpq.dylib ships in the bundle. Disable + # postgres so the dylib has no runtime libpq dependency. + enablePostgres = false; }; wakucanary = pkgs.callPackage ./nix/default.nix { diff --git a/liblogosdelivery/declare_lib.nim b/liblogosdelivery/declare_lib.nim index 77e992eea..26ad5fd9a 100644 --- a/liblogosdelivery/declare_lib.nim +++ b/liblogosdelivery/declare_lib.nim @@ -1,6 +1,7 @@ import ffi import std/locks import logos_delivery/waku/factory/waku +import logos_delivery/waku/waku_mix/logos_core_client as mix_rln_client declareLibrary("logosdelivery") @@ -31,3 +32,47 @@ proc logosdelivery_set_event_callback( ctx[].eventCallback = cast[pointer](callback) ctx[].eventUserData = userData + +proc logosdelivery_init(): cint {.dynlib, exportc, cdecl.} = + initializeLibrary() + # Default log level is configured at compile time via chronicles_log_level; + # the runtime setLogLevel resolution conflicts with std/termios's two-arg + # template under ff8d518 — leave the compile-time default in effect. + return RET_OK + +proc logosdelivery_set_rln_fetcher( + ctx: ptr FFIContext[Waku], fetcher: mix_rln_client.RlnFetcherFunc, fetcherData: pointer +) {.dynlib, exportc, cdecl.} = + if fetcher.isNil: + echo "error: nil fetcher in logosdelivery_set_rln_fetcher" + return + mix_rln_client.setRlnFetcher(fetcher, fetcherData) + +proc logosdelivery_set_rln_config( + ctx: ptr FFIContext[Waku], configAccountId: cstring, leafIndex: cint +): cint {.dynlib, exportc, cdecl.} = + if configAccountId.isNil: + return RET_ERR + mix_rln_client.setRlnConfig($configAccountId, leafIndex.int) + return RET_OK + +proc logosdelivery_set_rln_identity( + ctx: ptr FFIContext[Waku], idSecretHashHex: cstring +) {.dynlib, exportc, cdecl.} = + if idSecretHashHex.isNil: + return + mix_rln_client.setRlnIdentity($idSecretHashHex) + +proc logosdelivery_push_roots( + ctx: ptr FFIContext[Waku], rootsJson: cstring +) {.dynlib, exportc, cdecl.} = + if rootsJson.isNil: + return + mix_rln_client.pushRoots($rootsJson) + +proc logosdelivery_push_proof( + ctx: ptr FFIContext[Waku], proofJson: cstring +) {.dynlib, exportc, cdecl.} = + if proofJson.isNil: + return + mix_rln_client.pushProof($proofJson) diff --git a/liblogosdelivery/liblogosdelivery.h b/liblogosdelivery/liblogosdelivery.h index 5092db9f2..ff446285b 100644 --- a/liblogosdelivery/liblogosdelivery.h +++ b/liblogosdelivery/liblogosdelivery.h @@ -93,6 +93,18 @@ extern "C" FFICallBack callback, void *userData); + int logosdelivery_init(void); + + // RLN fetcher: C++ implements this, Nim calls it to get roots/proofs from RLN module. + typedef int (*RlnFetcherFunc)(const char *method, const char *params, + FFICallBack callback, void *callbackData, void *fetcherData); + + void logosdelivery_set_rln_fetcher(void *ctx, RlnFetcherFunc fetcher, void *fetcherData); + int logosdelivery_set_rln_config(void *ctx, const char *configAccountId, int leafIndex); + void logosdelivery_set_rln_identity(void *ctx, const char *idSecretHashHex); + void logosdelivery_push_roots(void *ctx, const char *rootsJson); + void logosdelivery_push_proof(void *ctx, const char *proofJson); + #ifdef __cplusplus } #endif diff --git a/liblogosdelivery/nim.cfg b/liblogosdelivery/nim.cfg index 3fd5adb32..501719997 100644 --- a/liblogosdelivery/nim.cfg +++ b/liblogosdelivery/nim.cfg @@ -11,6 +11,8 @@ "../vendor/nim-ffi" --path: "../" +--path: + "../vendor/mix-rln-spam-protection-plugin/src" # Optimization and debugging --opt: diff --git a/logos_delivery/waku/factory/conf_builder/mix_conf_builder.nim b/logos_delivery/waku/factory/conf_builder/mix_conf_builder.nim index 1daba2fb4..7061ca6db 100644 --- a/logos_delivery/waku/factory/conf_builder/mix_conf_builder.nim +++ b/logos_delivery/waku/factory/conf_builder/mix_conf_builder.nim @@ -14,6 +14,12 @@ type MixConfBuilder* = object mixNodes: seq[MixNodePubInfo] userMessageLimit: Option[int] disableSpamProtection: bool + useOnchainLEZ: bool + gifterService: bool + gifterWalletAccount: string + gifterNode: string + gifterAllowlist: string + gifterAuthKey: string proc init*(T: type MixConfBuilder): MixConfBuilder = MixConfBuilder() @@ -33,6 +39,24 @@ proc withUserMessageLimit*(b: var MixConfBuilder, limit: int) = proc withDisableSpamProtection*(b: var MixConfBuilder, disable: bool) = b.disableSpamProtection = disable +proc withUseOnchainLEZ*(b: var MixConfBuilder, use: bool) = + b.useOnchainLEZ = use + +proc withGifterService*(b: var MixConfBuilder, enabled: bool) = + b.gifterService = enabled + +proc withGifterWalletAccount*(b: var MixConfBuilder, account: string) = + b.gifterWalletAccount = account + +proc withGifterNode*(b: var MixConfBuilder, node: string) = + b.gifterNode = node + +proc withGifterAllowlist*(b: var MixConfBuilder, allowlist: string) = + b.gifterAllowlist = allowlist + +proc withGifterAuthKey*(b: var MixConfBuilder, authKey: string) = + b.gifterAuthKey = authKey + proc build*(b: MixConfBuilder): Result[Option[MixConf], string] = if not b.enabled.get(false): return ok(none[MixConf]()) @@ -48,6 +72,12 @@ proc build*(b: MixConfBuilder): Result[Option[MixConf], string] = mixNodes: b.mixNodes, userMessageLimit: b.userMessageLimit, disableSpamProtection: b.disableSpamProtection, + useOnchainLEZ: b.useOnchainLEZ, + gifterService: b.gifterService, + gifterWalletAccount: b.gifterWalletAccount, + gifterNode: b.gifterNode, + gifterAllowlist: b.gifterAllowlist, + gifterAuthKey: b.gifterAuthKey, ) ) ) @@ -62,6 +92,12 @@ proc build*(b: MixConfBuilder): Result[Option[MixConf], string] = mixNodes: b.mixNodes, userMessageLimit: b.userMessageLimit, disableSpamProtection: b.disableSpamProtection, + useOnchainLEZ: b.useOnchainLEZ, + gifterService: b.gifterService, + gifterWalletAccount: b.gifterWalletAccount, + gifterNode: b.gifterNode, + gifterAllowlist: b.gifterAllowlist, + gifterAuthKey: b.gifterAuthKey, ) ) ) diff --git a/logos_delivery/waku/factory/node_factory.nim b/logos_delivery/waku/factory/node_factory.nim index d0a0af5f8..703d13a8d 100644 --- a/logos_delivery/waku/factory/node_factory.nim +++ b/logos_delivery/waku/factory/node_factory.nim @@ -1,5 +1,6 @@ import - std/[options, sequtils], + std/[options, os, sequtils, json, strutils, sets], + eth/common/[addresses, keys], chronicles, chronos, libp2p/peerid, @@ -23,6 +24,12 @@ import ../waku_core, ../waku_core/codecs, ../waku_rln_relay, + ../waku_mix/logos_core_client as mix_lez_client, + ../waku_mix/protocol as mix_protocol, + mix_rln_spam_protection/onchain_group_manager, + mix_rln_spam_protection/rln_interface as mix_rln_interface, + ../waku_rln_relay/rln_gifter/protocol as rln_gifter_protocol, + ../waku_rln_relay/rln_gifter/client as rln_gifter_client, ../discovery/waku_dnsdisc, ../waku_archive/retention_policy as policy, ../waku_archive/retention_policy/builder as policy_builder, @@ -169,10 +176,235 @@ proc setupProtocols( await node.mountMix( conf.clusterId, mixConf.mixKey, mixConf.mixnodes, mixConf.userMessageLimit, mixConf.disableSpamProtection, + useOnchainLEZ = mixConf.useOnchainLEZ, ) ).isOkOr: return err("failed to mount waku mix protocol: " & $error) + # Wire OnchainLEZGroupManager to the LEZ RLN module via the fetcher + # callback bridge (setRlnConfig is called later by the C++ plugin). + if mixConf.useOnchainLEZ and not node.wakuMix.isNil: + let gm = node.wakuMix.mixRlnSpamProtection.groupManager + if gm of OnchainLEZGroupManager: + let lezGm = OnchainLEZGroupManager(gm) + # Adapt logos_core_client callbacks to OnchainLEZGroupManager types + let clientFetchRoots = mix_lez_client.makeFetchLatestRoots() + let clientFetchProof = mix_lez_client.makeFetchMerkleProof() + + let fetchRoots: onchain_group_manager.FetchRootsCallback = clientFetchRoots + let fetchProof: onchain_group_manager.FetchProofCallback = clientFetchProof + lezGm.setFetchCallbacks(fetchRoots, fetchProof) + mix_lez_client.setGroupManagerRef(lezGm) + info "Wired LEZ callbacks for mix RLN spam protection" + + # Mount RLN gifter server if configured + if mixConf.gifterService: + let walletAccount = mixConf.gifterWalletAccount + + # All gifter-initiated registrations funnel through a single + # serialization worker so the gifter's wallet never has two + # unconfirmed txns outstanding from the same signer — concurrent + # submissions otherwise silently fail at the sequencer with + # "Nonce mismatch" because the wallet refetches the chain nonce + # per call and several submissions read the same value before + # any commits. See cleanup/MODE_A_GIFTER_SLOT_BUG.md. + # + # The libp2p handler enqueues a job with an optimistic + # leaf_index taken from a local counter, returns immediately + # (so the stream doesn't time out), and the worker drains the + # queue one tx at a time. Clients reconcile the authoritative + # leaf via the existing status RPC + watcher. + type GifterJob = ref object + identityCommitment: seq[byte] + rateLimit: uint64 + assigned: uint64 + + let gifterQueue = newAsyncQueue[GifterJob]() + var gifterNextLeaf: uint64 = 0 + + proc gifterSubmitOnce( + idc: seq[byte], rateLimit: uint64 + ): Result[uint64, string] {.gcsafe.} = + let (configAccount, _) = mix_lez_client.getRlnConfig() + if configAccount.len == 0: + return err("RLN config not set on gifter node") + let holdingAccount = + if walletAccount.len > 0: walletAccount else: configAccount + var idCommitmentHex = newStringOfCap(idc.len * 2) + for b in idc: + idCommitmentHex.add(toHex(int(b), 2)) + let params = + "{\"configAccountId\":\"" & configAccount & + "\",\"userHoldingAccountId\":\"" & holdingAccount & + "\",\"idCommitment\":\"" & idCommitmentHex & + "\",\"rateLimit\":" & $rateLimit & "}" + let regResult = + mix_lez_client.callRlnFetcher("register_member", params) + if regResult.isErr: + return err(regResult.error) + try: + let parsed = parseJson(regResult.get()) + if parsed.hasKey("error"): + return err(parsed["error"].getStr("register_member failed")) + return ok(parsed["leaf_index"].getInt().uint64) + except CatchableError as e: + return err("failed to parse register_member result: " & e.msg) + + proc waitForChainCommit( + idc: seq[byte], deadlineMs: int + ): Future[Result[uint64, string]] {.async, gcsafe.} = + ## Poll is_member_registered until the just-submitted tx + ## commits. Without this gate, the worker's next iteration + ## would race the wallet nonce-refetch against the previous + ## tx's block inclusion and submit two txns with the same + ## nonce. + const pollMs = 2_000 + let (configAccount, _) = mix_lez_client.getRlnConfig() + if configAccount.len == 0: + return err("RLN config not set") + var idHex = newStringOfCap(idc.len * 2) + for b in idc: + idHex.add(toHex(int(b), 2)) + let params = + "{\"configAccountId\":\"" & configAccount & + "\",\"idCommitment\":\"" & idHex & "\"}" + let deadline = Moment.now() + chronos.milliseconds(deadlineMs) + while Moment.now() < deadline: + await sleepAsync(chronos.milliseconds(pollMs)) + let raw = mix_lez_client.callRlnFetcher( + "is_member_registered", params) + if raw.isErr: continue + try: + let parsed = parseJson(raw.get()) + if parsed.hasKey("registered") and + parsed["registered"].getBool() and + parsed.hasKey("leaf_index"): + return ok(parsed["leaf_index"].getInt().uint64) + except CatchableError: + continue + return err("confirmation timeout") + + proc gifterWorker() {.async, gcsafe.} = + # Confirmation deadline must cover one block-include cycle plus + # propagation lag on the slowest chain we ship against. Testnet + # blocks ~60-90s + finality lag → 5min headroom. + const confirmDeadlineMs = 300_000 + while true: + let job = await gifterQueue.popFirst() + let res = gifterSubmitOnce(job.identityCommitment, job.rateLimit) + if res.isErr: + error "Gifter worker: submission failed", + optimistic = job.assigned, err = res.error + continue + # Wait until the tx commits before processing the next job + # so the wallet's next nonce fetch sees the advanced value. + let confRes = + await waitForChainCommit(job.identityCommitment, confirmDeadlineMs) + if confRes.isErr: + warn "Gifter worker: tx did not confirm within deadline", + optimistic = job.assigned, err = confRes.error + continue + let actual = confRes.get() + if actual >= gifterNextLeaf: + gifterNextLeaf = actual + 1 + if actual != job.assigned: + info "Gifter worker: chain leaf differs from optimistic ack", + optimistic = job.assigned, actual = actual + else: + debug "Gifter worker: submission accepted at expected leaf", + leaf = actual + + let registerHandler: rln_gifter_protocol.RegisterMemberHandler = + proc( + identityCommitment: seq[byte], rateLimit: uint64 + ): Future[Result[rln_gifter_protocol.MembershipAllocationSuccess, string]] {.async, gcsafe.} = + let (configAccount, _) = mix_lez_client.getRlnConfig() + if configAccount.len == 0: + return err("RLN config not set on gifter node") + let assigned = gifterNextLeaf + gifterNextLeaf += 1 + let job = GifterJob( + identityCommitment: identityCommitment, + rateLimit: rateLimit, + assigned: assigned, + ) + try: + await gifterQueue.addLast(job) + except CatchableError as e: + return err("failed to enqueue gifter job: " & e.msg) + return ok(rln_gifter_protocol.MembershipAllocationSuccess( + leafIndex: assigned, + merkleRoot: @[], + blockNumber: 0'u64, + transactionHash: @[], + configAccountId: some(configAccount), + )) + + var auth = none(rln_gifter_protocol.EthAllowlistAuth) + if mixConf.gifterAllowlist.len > 0: + var addrs: HashSet[Address] + for piece in mixConf.gifterAllowlist.split(','): + let s = piece.strip() + if s.len == 0: + continue + let parsedAddr = + try: + Address.fromHex(s) + except ValueError as e: + return err("invalid gifter allowlist address '" & s & "': " & e.msg) + addrs.incl(parsedAddr) + if addrs.len > 0: + auth = some(rln_gifter_protocol.EthAllowlistAuth( + addresses: addrs, consumed: initHashSet[Address]() + )) + info "RLN gifter allowlist auth enabled", count = addrs.len + + let statusHandler: rln_gifter_protocol.MembershipStatusHandler = + proc( + configAccountId: string, identityCommitment: seq[byte] + ): Future[Result[rln_gifter_protocol.MembershipStatusResponse, string]] + {.async, gcsafe.} = + var idHex = newStringOfCap(identityCommitment.len * 2) + for b in identityCommitment: + idHex.add(toHex(int(b), 2)) + let params = + "{\"configAccountId\":\"" & configAccountId & + "\",\"idCommitment\":\"" & idHex & "\"}" + let raw = mix_lez_client.callRlnFetcher( + "is_member_registered", params) + if raw.isErr: + return err(raw.error) + try: + let parsed = parseJson(raw.get()) + var resp = rln_gifter_protocol.MembershipStatusResponse( + registered: false) + if parsed.hasKey("registered") and + parsed["registered"].getBool(): + resp.registered = true + if parsed.hasKey("leaf_index"): + resp.leafIndex = some(parsed["leaf_index"].getInt().uint64) + return ok(resp) + except CatchableError as e: + return err("failed to parse is_member_registered: " & e.msg) + + let gifter = rln_gifter_protocol.WakuRlnGifter.new( + node.peerManager, node.rng, registerHandler, auth, statusHandler + ) + node.switch.mount(gifter, protocolMatcher(WakuRlnGifterCodec)) + node.wakuRlnGifter = gifter + let gifterStatus = rln_gifter_protocol.WakuRlnGifterStatus.new( + statusHandler + ) + node.switch.mount( + gifterStatus, protocolMatcher(WakuRlnGifterStatusCodec)) + asyncSpawn gifterWorker() + info "RLN gifter service mounted for mix", + statusCodec = WakuRlnGifterStatusCodec + + # Defer client registration to startNode (needs running switch). + if mixConf.gifterNode.len > 0: + info "Gifter client mode: registration deferred to startNode()" + # Setup extended kademlia discovery if conf.kademliaDiscoveryConf.isSome(): let mixPubKey = @@ -425,6 +657,13 @@ proc startNode*( except CatchableError: return err("failed to start waku node: " & getCurrentExceptionMsg()) + # Start deferred OnchainLEZ poll loop now that the switch is fully started. + if conf.mixConf.isSome() and conf.mixConf.get().useOnchainLEZ and + not node.wakuMix.isNil(): + let gm = node.wakuMix.mixRlnSpamProtection.groupManager + if gm of OnchainLEZGroupManager: + OnchainLEZGroupManager(gm).startPolling() + # Connect to configured static nodes if conf.staticNodes.len > 0: try: @@ -440,6 +679,177 @@ proc startNode*( return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()) + # The gifter is also a mix relay and needs its own membership to forward. + # Deferred to startNode so the wallet RPC subprocess is wired first. + if conf.mixConf.isSome() and conf.mixConf.get().useOnchainLEZ and + conf.mixConf.get().gifterService and not node.wakuMix.isNil() and + not node.wakuRlnGifter.isNil(): + let gm = node.wakuMix.mixRlnSpamProtection.groupManager + if gm of OnchainLEZGroupManager: + let lezGm = OnchainLEZGroupManager(gm) + if lezGm.membershipIndex.isNone: + let selfCred = + if lezGm.credentials.isSome: + lezGm.credentials.get() + else: + mix_rln_interface.membershipKeyGen().valueOr: + return err("failed to generate gifter RLN identity: " & $error) + let gifter = node.wakuRlnGifter + asyncSpawn (proc(): Future[void] {.async.} = + # Handler reads configAccount, which is set by setRlnConfig. + while true: + let (cfg, _) = mix_lez_client.getRlnConfig() + if cfg.len > 0: break + await sleepAsync(500.milliseconds) + info "Self-registering gifter as mix relay", + identityCommitmentLen = selfCred.idCommitment.len + try: + let selfRes = await gifter.registerHandler( + @(selfCred.idCommitment), uint64(lezGm.userMessageLimit) + ) + if selfRes.isErr: + warn "Gifter self-registration failed", error = selfRes.error + return + let success = selfRes.get() + if success.configAccountId.isNone: + warn "Gifter self-registration response missing configAccountId" + return + let configAccount = success.configAccountId.get() + lezGm.credentials = some(selfCred) + lezGm.membershipIndex = + some(onchain_group_manager.MembershipIndex(success.leafIndex)) + mix_lez_client.setRlnConfig(configAccount, success.leafIndex.int) + info "Gifter self-registered as mix relay", + leafIndex = success.leafIndex, + configAccount = configAccount + # The gifter's own handler now returns an optimistic leaf + # (queued submission); poll the status RPC in-process to + # reconcile if the chain assigned a different leaf. + let watcherLezGm = lezGm + let watcherConfigAccount = configAccount + let watcherIdc = @(selfCred.idCommitment) + let optimisticLeaf = success.leafIndex + asyncSpawn (proc(): Future[void] {.async.} = + const selfPollMs = 5_000 + const selfDeadlineMs = 1_800_000 + let deadline = Moment.now() + + chronos.milliseconds(selfDeadlineMs) + while Moment.now() < deadline: + await sleepAsync(chronos.milliseconds(selfPollMs)) + let qr = + try: + await gifter.statusHandler( + watcherConfigAccount, watcherIdc) + except CatchableError: + continue + if qr.isErr: continue + let resp = qr.get() + if not resp.registered: continue + if resp.leafIndex.isNone: continue + let authLeaf = resp.leafIndex.get() + if some(onchain_group_manager.MembershipIndex(authLeaf)) != + watcherLezGm.membershipIndex: + info "Gifter self-reg leaf corrected from optimistic", + optimistic = optimisticLeaf, authoritative = authLeaf + watcherLezGm.membershipIndex = + some(onchain_group_manager.MembershipIndex(authLeaf)) + mix_lez_client.setRlnConfig( + watcherConfigAccount, authLeaf.int) + else: + info "Gifter self-reg confirmed on-chain", + leafIndex = authLeaf + watcherLezGm.markMembershipConfirmed() + return + warn "Gifter self-reg confirmation timed out", + optimisticLeaf = optimisticLeaf + )() + except CatchableError as e: + warn "Gifter self-registration exception", error = e.msg + )() + + # RLN gifter client registration — runs after switch start so the gifter peer is reachable. + if conf.mixConf.isSome() and conf.mixConf.get().useOnchainLEZ and + conf.mixConf.get().gifterNode.len > 0 and not node.wakuMix.isNil(): + let mixConf = conf.mixConf.get() + let gm = node.wakuMix.mixRlnSpamProtection.groupManager + if gm of OnchainLEZGroupManager: + let lezGm = OnchainLEZGroupManager(gm) + let gifterClient = rln_gifter_client.WakuRlnGifterClient.new( + node.peerManager, node.rng + ) + let gifterPeer = parsePeerInfo(mixConf.gifterNode).valueOr: + return err("failed to parse gifter peer: " & error) + node.peerManager.addServicePeer(gifterPeer, WakuRlnGifterCodec) + + # Use keystore credentials if available, otherwise generate new ones + let idCred = + if lezGm.credentials.isSome: + lezGm.credentials.get() + else: + mix_rln_interface.membershipKeyGen().valueOr: + return err("failed to generate RLN identity: " & $error) + let idCommitmentBytes = @(idCred.idCommitment) + + info "Registering via RLN gifter", + gifterPeer = mixConf.gifterNode, + identityCommitmentLen = idCommitmentBytes.len, + fromKeystore = lezGm.credentials.isSome + + var authType: seq[byte] + var authPayload: seq[byte] + if mixConf.gifterAuthKey.len > 0: + let seckey = PrivateKey.fromHex(mixConf.gifterAuthKey).valueOr: + return err("invalid mix-gifter-auth-key: " & $error) + let sig = seckey.sign(rln_gifter_protocol.eip191Message(idCommitmentBytes)) + authPayload = @(sig.toRaw()) + for c in rln_gifter_protocol.EthAllowlistAuthType: + authType.add(byte(c)) + info "Signing gifter request with EIP-191 auth key", + signer = seckey.toPublicKey().to(Address).to0xHex() + + var success: rln_gifter_protocol.MembershipAllocationSuccess + try: + let res = await gifterClient.requestMembership( + idCommitmentBytes, + some(uint64(lezGm.userMessageLimit)), + gifterPeer, + authType, + authPayload, + ) + if res.isErr: + return err("failed to register via gifter: " & res.error) + success = res.get() + except CatchableError: + return err("gifter registration exception: " & getCurrentExceptionMsg()) + + let configAccountId = success.configAccountId.valueOr: + return err("gifter response missing configAccountId extension") + + lezGm.credentials = some(idCred) + lezGm.membershipIndex = some(onchain_group_manager.MembershipIndex(success.leafIndex)) + mix_lez_client.setRlnConfig(configAccountId, success.leafIndex.int) + + info "Registered via RLN gifter", + leafIndex = success.leafIndex, + configAccount = configAccountId + + # Correct the optimistic leaf via the status codec if a concurrent + # registration tx beat ours to the slot. Self-verify drops bad proofs + # until the poll loop picks up the corrected witness. + let watcherLezGm = lezGm + let watcherConfigAccount = configAccountId + asyncSpawn gifterClient.watchMembershipConfirmation( + gifterPeer, configAccountId, idCommitmentBytes, success.leafIndex, + "Mix-node", + proc(authLeaf: uint64) {.gcsafe, raises: [].} = + if some(onchain_group_manager.MembershipIndex(authLeaf)) != + watcherLezGm.membershipIndex: + watcherLezGm.membershipIndex = + some(onchain_group_manager.MembershipIndex(authLeaf)) + mix_lez_client.setRlnConfig(watcherConfigAccount, authLeaf.int) + watcherLezGm.markMembershipConfirmed(), + ) + # retrieve px peers and add the to the peer store if conf.remotePeerExchangeNode.isSome(): var desiredOutDegree = DefaultPXNumPeersReq @@ -464,6 +874,16 @@ proc startNode*( (await node.wakuKademlia.start(minMixPeers = minMixPeers)).isOkOr: return err("failed to start kademlia discovery: " & error) + # Re-publish gossipsub trigger after switch + kademlia are up. The dummy + # publish in WakuMix.start() fires too early in LEZ mode (0 peers on topic), + # so SUBSCRIBE messages never propagate without this second publish. + if conf.mixConf.isSome() and conf.mixConf.get().useOnchainLEZ and + not node.wakuMix.isNil(): + try: + await node.wakuMix.publishGossipsubTrigger() + except CatchableError: + warn "gossipsub trigger publish failed", error = getCurrentExceptionMsg() + return ok() proc setupNode*( diff --git a/logos_delivery/waku/factory/waku_conf.nim b/logos_delivery/waku/factory/waku_conf.nim index d20dc263c..1274302ed 100644 --- a/logos_delivery/waku/factory/waku_conf.nim +++ b/logos_delivery/waku/factory/waku_conf.nim @@ -53,6 +53,12 @@ type MixConf* = ref object mixnodes*: seq[MixNodePubInfo] userMessageLimit*: Option[int] disableSpamProtection*: bool + useOnchainLEZ*: bool + gifterService*: bool + gifterWalletAccount*: string + gifterNode*: string + gifterAllowlist*: string + gifterAuthKey*: string type KademliaDiscoveryConf* = object bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] diff --git a/logos_delivery/waku/node/subscription_manager.nim b/logos_delivery/waku/node/subscription_manager.nim index e5995f572..5a15a50c8 100644 --- a/logos_delivery/waku/node/subscription_manager.nim +++ b/logos_delivery/waku/node/subscription_manager.nim @@ -15,6 +15,7 @@ import waku_filter_v2/common as filter_common, waku_filter_v2/client as filter_client, waku_filter_v2/protocol as filter_protocol, + waku_mix/protocol as mix_protocol, events/health_events, events/message_events, events/peer_events, diff --git a/logos_delivery/waku/node/waku_node.nim b/logos_delivery/waku/node/waku_node.nim index 43ffb6ac9..0c3fdf0a4 100644 --- a/logos_delivery/waku/node/waku_node.nim +++ b/logos_delivery/waku/node/waku_node.nim @@ -41,6 +41,7 @@ import waku_store_sync, waku_filter_v2, waku_filter_v2/client as filter_client, + waku_filter_v2/common as filter_common, waku_metadata, waku_rendezvous/protocol, waku_rendezvous/client as rendezvous_client, @@ -52,6 +53,7 @@ import waku_enr, waku_peer_exchange, waku_rln_relay, + waku_rln_relay/rln_gifter/protocol as rln_gifter_protocol, common/option_shims, common/rate_limit/setting, common/callbacks, @@ -134,6 +136,12 @@ type ## Kernel API Relay appHandlers (if any) subscriptionManager*: SubscriptionManager wakuMix*: WakuMix + wakuRlnGifter*: rln_gifter_protocol.WakuRlnGifter + ## Set when gifterService=true. Exposed so startNode can invoke its + ## registerHandler for the gifter's self-registration. + edgeTopicsHealth*: Table[PubsubTopic, TopicHealth] + edgeHealthEvent*: AsyncEvent + edgeHealthLoop: Future[void] kademliaDiscoveryLoop*: Future[void] wakuKademlia*: WakuKademlia ports*: BoundPorts @@ -328,6 +336,7 @@ proc mountMix*( mixnodes: seq[MixNodePubInfo], userMessageLimit: Option[int] = none(int), disableSpamProtection: bool = false, + useOnchainLEZ: bool = false, ): Future[Result[void, string]] {.async.} = info "mounting mix protocol", nodeId = node.info #TODO log the config used @@ -360,7 +369,7 @@ proc mountMix*( node.wakuMix = WakuMix.new( localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes, publishMessage, - userMessageLimit, disableSpamProtection, + userMessageLimit, disableSpamProtection, useOnchainLEZ, ).valueOr: error "Waku Mix protocol initialization failed", err = error return @@ -528,6 +537,46 @@ proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string] return ok() +const EdgeTopicHealthyThreshold = 2 + +proc calculateEdgeTopicHealth(node: WakuNode, shard: PubsubTopic): TopicHealth = + let filterPeers = + node.peerManager.getPeersForShard(filter_common.WakuFilterSubscribeCodec, shard) + let lightpushPeers = + node.peerManager.getPeersForShard(lightpush_protocol.WakuLightPushCodec, shard) + + if filterPeers >= EdgeTopicHealthyThreshold and + lightpushPeers >= EdgeTopicHealthyThreshold: + return TopicHealth.SUFFICIENTLY_HEALTHY + elif filterPeers > 0 and lightpushPeers > 0: + return TopicHealth.MINIMALLY_HEALTHY + + return TopicHealth.UNHEALTHY + +proc loopEdgeHealth(node: WakuNode) {.async.} = + while node.started: + await node.edgeHealthEvent.wait() + node.edgeHealthEvent.clear() + + try: + for shard in node.edgeTopicsHealth.keys: + if not node.wakuRelay.isNil and node.wakuRelay.isSubscribed(shard): + continue + + let oldHealth = node.edgeTopicsHealth.getOrDefault(shard, TopicHealth.UNHEALTHY) + let newHealth = node.calculateEdgeTopicHealth(shard) + if newHealth != oldHealth: + node.edgeTopicsHealth[shard] = newHealth + EventShardTopicHealthChange.emit(node.brokerCtx, shard, newHealth) + except CancelledError: + break + except CatchableError as e: + # KEEP: async health loop must survive transient peer-manager errors. + warn "Error in edge health check", error = e.msg + + # safety cooldown to protect from edge cases + await sleepAsync(100.milliseconds) + proc startProvidersAndListeners*(node: WakuNode) = RequestRelayShard.setProvider( node.brokerCtx, @@ -616,6 +665,16 @@ proc start*(node: WakuNode) {.async.} = if isBindIpWithZeroPort(address): zeroPortPresent = true + # Perform relay-specific startup tasks TODO: this should be rethought + if not node.wakuRelay.isNil(): + await node.wakuRelay.start() + + if not node.wakuMix.isNil(): + await node.wakuMix.start() + + if not node.wakuMetadata.isNil(): + await node.wakuMetadata.start() + if not node.wakuStoreResume.isNil(): await node.wakuStoreResume.start() @@ -683,6 +742,7 @@ proc stop*(node: WakuNode) {.async.} = try: await node.wakuRlnRelay.stop() ## this can raise an exception except Exception: + # KEEP: shutdown path; continue tearing down other subsystems regardless. error "exception stopping the node", error = getCurrentExceptionMsg() if not node.wakuArchive.isNil(): diff --git a/logos_delivery/waku/waku_core/codecs.nim b/logos_delivery/waku/waku_core/codecs.nim index f0f0c977e..f1e3f7e33 100644 --- a/logos_delivery/waku/waku_core/codecs.nim +++ b/logos_delivery/waku/waku_core/codecs.nim @@ -10,3 +10,8 @@ const WakuMetadataCodec* = "/vac/waku/metadata/1.0.0" WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1" WakuRendezVousCodec* = "/vac/waku/rendezvous/1.0.0" + WakuRlnGifterCodec* = "/logos/rln/membership/1.0.0" + # Separate codec so clients can poll registration status on short-lived + # streams instead of holding the original register stream open past the + # libp2p timeout. + WakuRlnGifterStatusCodec* = "/logos/rln/membership/status/1.0.0" diff --git a/logos_delivery/waku/waku_mix/logos_core_client.nim b/logos_delivery/waku/waku_mix/logos_core_client.nim new file mode 100644 index 000000000..8392f8e3f --- /dev/null +++ b/logos_delivery/waku/waku_mix/logos_core_client.nim @@ -0,0 +1,395 @@ +{.push raises: [].} + +## Mix RLN client: fetches roots/proofs from logos-core via the C++ RLN module. +## The C++ delivery module registers an RLN fetcher at startup; event-push +## caching avoids round-trips on the hot path. + +import std/[json, strutils, locks, algorithm, options] +import chronos, chronos/threadsync +import results +import chronicles +import mix_rln_spam_protection/group_manager {.all.} +import mix_rln_spam_protection/types {.all.} +import mix_rln_spam_protection/rln_interface +import mix_rln_spam_protection/onchain_group_manager + +export onchain_group_manager.ExternalMerkleProof + +logScope: + topics = "waku mix rln-lez-client" + +type + FetchLatestRootsCallback* = + proc(): Future[Result[seq[MerkleNode], string]] {.gcsafe, raises: [].} + FetchMerkleProofCallback* = + proc(index: MembershipIndex): Future[Result[ExternalMerkleProof, string]] {.gcsafe, raises: [].} + +type + RlnFetchCallback* = proc(callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer) {.cdecl, gcsafe, raises: [].} + RlnFetcherFunc* = proc( + methodName: cstring, params: cstring, + callback: RlnFetchCallback, + callbackData: pointer, fetcherData: pointer + ): cint {.cdecl, gcsafe, raises: [].} + +const RLN_CONFIG_ACCOUNT_ID_CAP = 64 + ## Max bytes for the base58-encoded config account ID (real values ≤44 + ## chars). Sized as a value-type buffer so cross-thread access doesn't + ## go through Nim's heap/GC — see rlnConfigAccountIdBuf below. + +var + rlnFetcherLock: Lock + rlnFetcher: RlnFetcherFunc + rlnFetcherData: pointer + ## Fixed-size byte buffer + length, not a Nim string: the FFI worker + ## thread reads concurrently with main-thread writes, and a heap string + ## here SIGSEGVs under testnet load (GC collects the old value mid-read + ## during the 200-500ms HTTPS fetchRoots awaits). + rlnConfigAccountIdBuf: array[RLN_CONFIG_ACCOUNT_ID_CAP, char] + rlnConfigAccountIdLen: int = 0 + rlnLeafIndex: int = -1 + rlnIdentitySecretHash: string + rlnGroupManager: pointer + ## OnchainLEZGroupManager ref erased to `pointer` so this lock-protected + ## global is not tracked by Nim's GC: setGroupManagerRef may be invoked + ## from a thread distinct from the one that later runs setRlnIdentity, + ## and storing a ref in a cross-thread global risks GC interference. + ## Cast back to OnchainLEZGroupManager in setRlnIdentity to attach credentials. + cachedRootsJson: string + cachedProofJson: string + +rlnFetcherLock.initLock() + +proc setRlnFetcher*(fetcher: RlnFetcherFunc, fetcherData: pointer) {.gcsafe.} = + {.gcsafe.}: + rlnFetcherLock.acquire() + rlnFetcher = fetcher + rlnFetcherData = fetcherData + rlnFetcherLock.release() + +proc setRlnConfig*(configAccountId: string, leafIndex: int) {.gcsafe.} = + {.gcsafe.}: + rlnFetcherLock.acquire() + let n = min(configAccountId.len, RLN_CONFIG_ACCOUNT_ID_CAP) + for i in 0 ..< n: + rlnConfigAccountIdBuf[i] = configAccountId[i] + rlnConfigAccountIdLen = n + rlnLeafIndex = leafIndex + rlnFetcherLock.release() + +proc setGroupManagerRef*(lezGm: OnchainLEZGroupManager) {.gcsafe.} = + ## Store the group manager so setRlnIdentity can attach credentials later. + ## Erases the typed ref to `pointer` internally so the lock-protected global + ## stays outside Nim's cross-thread GC tracking (see rlnGroupManager). + {.gcsafe.}: + rlnFetcherLock.acquire() + rlnGroupManager = cast[pointer](lezGm) + rlnFetcherLock.release() + +proc setRlnIdentity*(idSecretHashHex: string) {.gcsafe.} = + ## Regenerate the full credential via membershipKeyGen(seed=idSecretHash) + ## and attach it to the group manager. Called by the C++ plugin after + ## selfRegisterRln completes. + {.gcsafe.}: + rlnFetcherLock.acquire() + rlnIdentitySecretHash = idSecretHashHex + let gm = rlnGroupManager + let leafIdx = rlnLeafIndex + rlnFetcherLock.release() + + trace "Set mix RLN identity", hashPrefix = idSecretHashHex[0 .. min(7, idSecretHashHex.len - 1)] + + if not gm.isNil and idSecretHashHex.len == 64: + var seedBytes: seq[byte] + for i in 0 ..< 32: + try: + seedBytes.add(byte(parseHexInt(idSecretHashHex[i * 2 .. i * 2 + 1]))) + except ValueError: + warn "Invalid hex in identity hash" + return + + # Generate full credential using the same seed that selfRegisterRln used + # via generate_identity. membershipKeyGen(seed) produces deterministic output. + let cred = membershipKeyGen(seedBytes).valueOr: + warn "Failed to regenerate full credential from seed", error = $error + return + + let gmRef = cast[OnchainLEZGroupManager](gm) + gmRef.credentials = some(cred) + if leafIdx >= 0: + gmRef.membershipIndex = some(types.MembershipIndex(leafIdx)) + info "Set full RLN identity on group manager", + leafIndex = leafIdx, + commitment = cred.idCommitment[0 .. 7].toHex() & "..." + +proc getRlnIdentity*(): string {.gcsafe.} = + {.gcsafe.}: + rlnFetcherLock.acquire() + result = rlnIdentitySecretHash + rlnFetcherLock.release() + +proc getRlnConfig*(): (string, int) {.gcsafe.} = + {.gcsafe.}: + rlnFetcherLock.acquire() + let n = rlnConfigAccountIdLen + let leafIdx = rlnLeafIndex + var s = newString(n) + for i in 0 ..< n: + s[i] = rlnConfigAccountIdBuf[i] + rlnFetcherLock.release() + result = (s, leafIdx) + +proc pushRoots*(rootsJson: string) {.gcsafe.} = + {.gcsafe.}: + rlnFetcherLock.acquire() + cachedRootsJson = rootsJson + rlnFetcherLock.release() + trace "Received roots via event push", len = rootsJson.len + +proc pushProof*(proofJson: string) {.gcsafe.} = + {.gcsafe.}: + rlnFetcherLock.acquire() + cachedProofJson = proofJson + rlnFetcherLock.release() + trace "Received proof via event push", len = proofJson.len + +proc getCachedRoots(): string {.gcsafe.} = + {.gcsafe.}: + rlnFetcherLock.acquire() + result = cachedRootsJson + rlnFetcherLock.release() + +proc getCachedProof(): string {.gcsafe.} = + {.gcsafe.}: + rlnFetcherLock.acquire() + result = cachedProofJson + rlnFetcherLock.release() + +type FetchResult = object + json: string + errMsg: string + success: bool + +proc callRlnFetcher*(methodName: string, params: string): Result[string, string] {.gcsafe.} = + {.gcsafe.}: + rlnFetcherLock.acquire() + let fetcher = rlnFetcher + let data = rlnFetcherData + rlnFetcherLock.release() + + if fetcher.isNil: + return err("RLN fetcher not registered") + + var fetchResult: FetchResult + + let cb: RlnFetchCallback = proc(callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer) {.cdecl, gcsafe, raises: [].} = + let res = cast[ptr FetchResult](userData) + if callerRet == 0 and not msg.isNil and len > 0: + res[].json = newString(len.int) + copyMem(addr res[].json[0], msg, len.int) + res[].success = true + elif not msg.isNil and len > 0: + res[].errMsg = newString(len.int) + copyMem(addr res[].errMsg[0], msg, len.int) + res[].success = false + else: + res[].success = (callerRet == 0) + + let ret = fetcher(methodName.cstring, params.cstring, cb, addr fetchResult, data) + if ret != 0 or not fetchResult.success: + if fetchResult.errMsg.len > 0: + return err(fetchResult.errMsg) + return err("RLN fetcher returned error code: " & $ret) + if fetchResult.json.len == 0: + return err("RLN fetcher returned empty response") + return ok(fetchResult.json) + +type ThreadArgs = object + fetcher: RlnFetcherFunc + fetcherData: pointer + methodBuf: cstring + paramsBuf: cstring + res: ptr FetchResult + sig: ThreadSignalPtr + +proc fetcherThreadBody(args: ThreadArgs) {.thread.} = + let cb: RlnFetchCallback = proc(callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer) {.cdecl, gcsafe, raises: [].} = + let r = cast[ptr FetchResult](userData) + if callerRet == 0 and not msg.isNil and len > 0: + r[].json = newString(len.int) + copyMem(addr r[].json[0], msg, len.int) + r[].success = true + elif not msg.isNil and len > 0: + r[].errMsg = newString(len.int) + copyMem(addr r[].errMsg[0], msg, len.int) + r[].success = false + else: + r[].success = (callerRet == 0) + + discard args.fetcher(args.methodBuf, args.paramsBuf, cb, args.res, args.fetcherData) + discard args.sig.fireSync() + +proc callRlnFetcherAsync*(methodName: string, params: string): Future[Result[string, string]] {.async.} = + ## Runs the fetcher on a dedicated thread so HTTPS blocking calls don't + ## stall the chronos event loop. + {.gcsafe.}: + rlnFetcherLock.acquire() + let fetcher = rlnFetcher + let data = rlnFetcherData + rlnFetcherLock.release() + + if fetcher.isNil: + return err("RLN fetcher not registered") + + let signal = ThreadSignalPtr.new().valueOr: + return err("failed to create thread signal") + defer: + discard signal.close() + + # Stable cross-thread copies — `methodName`/`params` live on the GC heap. + # IMPORTANT: guard the unsafeAddr deref — `unsafeAddr s[0]` on an empty + # Nim string is UB (the backing buffer can be nil), and SIGSEGVs at the + # polling layer (e.g. get_valid_roots with no config yet). allocShared0 + # already zero-fills, so the copy is safely no-op when len is 0. + var methodCopy = allocShared0(methodName.len + 1) + var paramsCopy = allocShared0(params.len + 1) + if methodName.len > 0: + copyMem(methodCopy, unsafeAddr methodName[0], methodName.len) + if params.len > 0: + copyMem(paramsCopy, unsafeAddr params[0], params.len) + defer: + deallocShared(methodCopy) + deallocShared(paramsCopy) + + var fetchRes: FetchResult + var thread: Thread[ThreadArgs] + + createThread(thread, fetcherThreadBody, + ThreadArgs( + fetcher: fetcher, + fetcherData: data, + methodBuf: cast[cstring](methodCopy), + paramsBuf: cast[cstring](paramsCopy), + res: addr fetchRes, + sig: signal, + )) + + await signal.wait() + joinThread(thread) + + if not fetchRes.success: + if fetchRes.errMsg.len > 0: + return err(fetchRes.errMsg) + return err("RLN fetcher async call failed") + if fetchRes.json.len == 0: + return err("RLN fetcher returned empty response") + return ok(fetchRes.json) + +proc hexToBytes32(hex: string): Result[array[32, byte], string] = + var h = hex + if h.startsWith("0x") or h.startsWith("0X"): + h = h[2 .. ^1] + if h.len != 64: + return err("Expected 64 hex chars, got " & $h.len) + var output: array[32, byte] + for i in 0 ..< 32: + try: + output[i] = byte(parseHexInt(h[i * 2 .. i * 2 + 1])) + except ValueError: + return err("Invalid hex at position " & $i) + ok(output) + +proc hexToBytes32LE(hex: string): Result[array[32, byte], string] = + ## Parse hex string to bytes in little-endian order (for zerokit field elements). + ## LEZ/Ethereum return big-endian hex; zerokit expects LE internally. + var res = hexToBytes32(hex).valueOr: + return err(error) + var reversed: array[32, byte] + for i in 0 ..< 32: + reversed[i] = res[31 - i] + ok(reversed) + +proc parseRootsJson*(snapshot: string): Result[seq[MerkleNode], string] = + if snapshot.len == 0: + return err("No roots data") + try: + let parsed = parseJson(snapshot) + var roots: seq[MerkleNode] + for elem in parsed: + let root = hexToBytes32(elem.getStr()).valueOr: + return err("Invalid root hex: " & error) + roots.add(MerkleNode(root)) + return ok(roots) + except CatchableError as e: + return err("Failed to parse roots: " & e.msg) + +proc parseExternalProof(snapshot: string): Result[ExternalMerkleProof, string] = + if snapshot.len == 0: + return err("No merkle proof data") + try: + let parsed = parseJson(snapshot) + let root = hexToBytes32(parsed["root"].getStr()).valueOr: + return err("Invalid root hex: " & error) + var pathElements: seq[byte] + for elem in parsed["path_elements"]: + let elemBytes = hexToBytes32(elem.getStr()).valueOr: + return err("Invalid pathElement hex: " & error) + for b in elemBytes: + pathElements.add(b) + var identityPathIndex: seq[byte] + for idx in parsed["path_indices"]: + identityPathIndex.add(byte(idx.getInt())) + var validRoots: seq[MerkleNode] + if parsed.hasKey("valid_roots"): + for r in parsed["valid_roots"]: + let rb = hexToBytes32(r.getStr()).valueOr: + continue + validRoots.add(MerkleNode(rb)) + ok(ExternalMerkleProof( + pathElements: pathElements, + identityPathIndex: identityPathIndex, + root: MerkleNode(root), + validRoots: validRoots, + )) + except CatchableError as e: + err("Failed to parse proof: " & e.msg) + +proc makeFetchLatestRoots*(): FetchLatestRootsCallback = + return proc(): Future[Result[seq[MerkleNode], string]] {.async, gcsafe, raises: [].} = + let cached = getCachedRoots() + if cached.len > 0: + let res = parseRootsJson(cached) + if res.isOk: + trace "Using cached roots from event push", count = res.get().len + return res + let (configAccount, _) = getRlnConfig() + if configAccount.len == 0: + return err("RLN config not set") + let rootsJson = callRlnFetcher("get_valid_roots", configAccount) + if rootsJson.isErr: + return err(rootsJson.error) + let res = parseRootsJson(rootsJson.get()) + if res.isOk: + trace "Fetched roots from RLN module via fetcher", count = res.get().len + return res + +proc makeFetchMerkleProof*(): FetchMerkleProofCallback = + return proc( + index: MembershipIndex + ): Future[Result[ExternalMerkleProof, string]] {.async, gcsafe, raises: [].} = + let cached = getCachedProof() + if cached.len > 0: + let res = parseExternalProof(cached) + if res.isOk: + trace "Using cached proof from event push", index = index + return res + let (configAccount, _) = getRlnConfig() + if configAccount.len == 0: + return err("RLN config not set") + let params = configAccount & "," & $index + let proofJson = callRlnFetcher("get_merkle_proofs", params) + if proofJson.isErr: + return err(proofJson.error) + return parseExternalProof(proofJson.get()) + +{.pop.} diff --git a/logos_delivery/waku/waku_mix/protocol.nim b/logos_delivery/waku/waku_mix/protocol.nim index 1521e8052..da422f5eb 100644 --- a/logos_delivery/waku/waku_mix/protocol.nim +++ b/logos_delivery/waku/waku_mix/protocol.nim @@ -110,6 +110,7 @@ proc new*( publishMessage: PublishMessage, userMessageLimit: Option[int] = none(int), disableSpamProtection: bool = false, + useOnchainLEZ: bool = false, ): WakuMixResult[T] = let mixPubKey = public(mixPrivKey) trace "mixPubKey", mixPubKey = mixPubKey @@ -128,9 +129,12 @@ proc new*( var spamProtectionOpt = default(Opt[SpamProtection]) if not disableSpamProtection: - # Initialize spam protection with persistent credentials + # Initialize spam protection with persistent credentials. In LEZ mode, + # roots/proofs are fetched from the on-chain tree via the RLN module; + # only the identity is local. let peerId = peermgr.switch.peerInfo.peerId var spamProtectionConfig = defaultConfig() + spamProtectionConfig.useOnchainLEZ = useOnchainLEZ spamProtectionConfig.keystorePath = "rln_keystore_" & $peerId & ".json" spamProtectionConfig.keystorePassword = "mix-rln-password" if userMessageLimit.isSome(): @@ -385,6 +389,24 @@ proc registerDoSProtectionWithNetwork*(mix: WakuMix) = return mix.dosRegistrationTask = mix.dosRegistrationRetryLoop() +proc publishGossipsubTrigger*(mix: WakuMix) {.async.} = + ## Publish dummy messages to spam protection topics to trigger gossipsub + ## SUBSCRIBE flow and kademlia peer exchange. Call AFTER the node is fully + ## started (switch running, peers connected) so the messages actually reach + ## gossipsub mesh peers. In off-chain mode the side effect of registerSelf() + ## broadcasting already triggers SUBSCRIBE; in LEZ mode registerSelf() doesn't + ## fire (membership comes from on-chain) so we publish explicitly. + if mix.publishMessage.isNil or mix.mixRlnSpamProtection.isNil: + return + let spTopics = mix.getSpamProtectionContentTopics() + for ct in spTopics: + let msg = WakuMessage(contentTopic: ct, payload: @[byte(0)]) + let pubRes = await mix.publishMessage(msg) + if pubRes.isErr: + debug "gossipsub trigger failed (expected if no peers yet)", contentTopic = ct + else: + info "Published gossipsub trigger (post-start)", contentTopic = ct + method stop*(mix: WakuMix) {.async.} = # Cancel the in-flight DoS-protection registration retry loop, if any if not mix.dosRegistrationTask.isNil and not mix.dosRegistrationTask.finished: diff --git a/logos_delivery/waku/waku_rln_relay/rln_gifter/client.nim b/logos_delivery/waku/waku_rln_relay/rln_gifter/client.nim new file mode 100644 index 000000000..7399549e0 --- /dev/null +++ b/logos_delivery/waku/waku_rln_relay/rln_gifter/client.nim @@ -0,0 +1,176 @@ +{.push raises: [].} + +import std/options, results, chronicles, chronos, bearssl/rand +import libp2p/stream/connection +import + ../../node/peer_manager, + ../../waku_core, + ../../utils/requests, + ./rpc, + ./rpc_codec + +logScope: + topics = "waku rln-gifter client" + +type + RlnGifterResult* = Result[MembershipAllocationSuccess, string] + + WakuRlnGifterClient* = ref object + rng*: ref rand.HmacDrbgContext + peerManager*: PeerManager + +proc new*( + T: type WakuRlnGifterClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext +): T = + WakuRlnGifterClient(peerManager: peerManager, rng: rng) + +proc requestMembership*( + wc: WakuRlnGifterClient, + identityCommitment: seq[byte], + rateLimit: Option[uint64], + peer: RemotePeerInfo, + authenticationType: seq[byte] = @[], + authenticationPayload: seq[byte] = @[], +): Future[RlnGifterResult] {.async.} = + let request = RlnGifterRequest( + requestId: generateRequestId(wc.rng), + authenticationType: authenticationType, + authenticationPayload: authenticationPayload, + identityCommitment: identityCommitment, + rateLimit: rateLimit, + ) + + info "requesting RLN membership from gifter", + requestId = request.requestId, + identityCommitmentLen = identityCommitment.len + + # Retry dial with backoff (gifter node may still be initializing) + var connection: Connection + var dialAttempts = 0 + while true: + let connOpt = await wc.peerManager.dialPeer(peer, WakuRlnGifterCodec) + if connOpt.isSome: + connection = connOpt.get() + break + dialAttempts += 1 + if dialAttempts >= 5: + return err("failed to dial gifter peer after " & $dialAttempts & " attempts") + warn "gifter dial failed, retrying", attempt = dialAttempts + await sleepAsync(seconds(5)) + + try: + await connection.writeLP(request.encode().buffer) + except LPStreamError: + return err("failed to write request: " & getCurrentExceptionMsg()) + + var buffer: seq[byte] + try: + buffer = await connection.readLp(DefaultMaxRpcSize) + except LPStreamError: + return err("failed to read response: " & getCurrentExceptionMsg()) + + # Do NOT close the connection here. Let it leak and be cleaned up by GC. + # Calling closeWithEOF triggers yamux cleanup that crashes the delivery module + # process when the FFI boundary returns to C++ before yamux completes. + + let response = RlnGifterResponse.decode(buffer).valueOr: + return err("failed to decode response: " & $error) + + if response.requestId != request.requestId: + return err("requestId mismatch") + + if not response.authSuccess: + let desc = response.error.get( + if response.failure.isSome: response.failure.get().errorMessage + else: "authentication failed" + ) + return err("authentication failed: " & desc) + + if response.failure.isSome: + return err("registration failed: " & response.failure.get().errorMessage) + + let success = response.success.valueOr: + return err("response missing success/failure result") + + info "RLN membership granted", leafIndex = success.leafIndex + + return ok(success) + +proc queryMembershipStatus*( + wc: WakuRlnGifterClient, + peer: RemotePeerInfo, + configAccountId: string, + identityCommitment: seq[byte], +): Future[Result[MembershipStatusResponse, string]] {.async.} = + ## Short-lived RPC: each call opens its own stream so pollers don't hold + ## a stream open past the libp2p timeout. + let connOpt = await wc.peerManager.dialPeer(peer, WakuRlnGifterStatusCodec) + if connOpt.isNone: + return err("failed to dial gifter status peer") + let connection = connOpt.get() + + let req = MembershipStatusRequest( + configAccountId: configAccountId, + identityCommitment: identityCommitment, + ) + try: + await connection.writeLP(req.encode().buffer) + except LPStreamError: + return err("failed to write status request: " & getCurrentExceptionMsg()) + + var buffer: seq[byte] + try: + buffer = await connection.readLp(DefaultMaxRpcSize) + except LPStreamError: + return err("failed to read status response: " & getCurrentExceptionMsg()) + + let resp = MembershipStatusResponse.decode(buffer).valueOr: + return err("failed to decode status response: " & $error) + + return ok(resp) + +proc watchMembershipConfirmation*( + wc: WakuRlnGifterClient, + peer: RemotePeerInfo, + configAccountId: string, + identityCommitment: seq[byte], + optimisticLeaf: uint64, + label: string, + onConfirmed: proc(authLeaf: uint64) {.gcsafe, raises: [].}, +): Future[void] {.async.} = + ## Poll the status codec until the membership PDA exists, then call + ## `onConfirmed` with the authoritative leaf — whether or not it differs + ## from `optimisticLeaf`. Returns on first confirmation or after the + ## deadline elapses. + const pollEveryMs = 30_000 + const deadlineMs = 1_800_000 + let deadline = Moment.now() + chronos.milliseconds(deadlineMs) + while Moment.now() < deadline: + try: + await sleepAsync(chronos.milliseconds(pollEveryMs)) + except CancelledError: + return + let qr = + try: + await wc.queryMembershipStatus(peer, configAccountId, identityCommitment) + except CancelledError: + return + except CatchableError as e: + Result[MembershipStatusResponse, string].err( + "queryMembershipStatus raised: " & e.msg) + if qr.isErr: continue + let resp = qr.get() + if resp.errorMessage.isSome: continue + if not resp.registered: continue + if resp.leafIndex.isSome: + let authLeaf = resp.leafIndex.get() + if authLeaf != optimisticLeaf: + info "membership leaf corrected from optimistic", + label = label, optimistic = optimisticLeaf, authoritative = authLeaf + else: + info "membership confirmed on-chain", + label = label, leafIndex = authLeaf + onConfirmed(authLeaf) + return + warn "membership confirmation timed out", + label = label, optimisticLeaf = optimisticLeaf diff --git a/logos_delivery/waku/waku_rln_relay/rln_gifter/protocol.nim b/logos_delivery/waku/waku_rln_relay/rln_gifter/protocol.nim new file mode 100644 index 000000000..d9395d2e5 --- /dev/null +++ b/logos_delivery/waku/waku_rln_relay/rln_gifter/protocol.nim @@ -0,0 +1,249 @@ +{.push raises: [].} + +import + std/[options, sets], + results, + chronicles, + chronos, + bearssl/rand, + eth/common/[addresses, keys] +import + ../../node/peer_manager/peer_manager, + ../../waku_core, + ./rpc, + ./rpc_codec +export rpc + +logScope: + topics = "waku rln-gifter" + +type + RegisterMemberHandler* = proc( + identityCommitment: seq[byte], rateLimit: uint64 + ): Future[Result[MembershipAllocationSuccess, string]] {.async, gcsafe.} + + MembershipStatusHandler* = proc( + configAccountId: string, identityCommitment: seq[byte] + ): Future[Result[MembershipStatusResponse, string]] {.async, gcsafe.} + + EthAllowlistAuth* = ref object + addresses*: HashSet[Address] + consumed*: HashSet[Address] + + WakuRlnGifter* = ref object of LPProtocol + rng*: ref rand.HmacDrbgContext + peerManager*: PeerManager + registerHandler*: RegisterMemberHandler + statusHandler*: MembershipStatusHandler + auth*: Option[EthAllowlistAuth] + + WakuRlnGifterStatus* = ref object of LPProtocol + statusHandler*: MembershipStatusHandler + +proc toHexLower(b: openArray[byte]): string = + result = newStringOfCap(b.len * 2) + const digits = "0123456789abcdef" + for x in b: + result.add(digits[int(x shr 4)]) + result.add(digits[int(x and 0x0f)]) + +proc eip191Message*(idCommitment: openArray[byte]): seq[byte] = + ## The EIP-191 personal_sign envelope wraps the lowercase hex representation + ## of the 32-byte identity commitment. Hex is used (rather than raw bytes) + ## so the signed message is human-readable in wallets that surface it. + let hex = toHexLower(idCommitment) + let prefix = "\x19Ethereum Signed Message:\n" & $hex.len + result = newSeqOfCap[byte](prefix.len + hex.len) + for c in prefix: + result.add(byte(c)) + for c in hex: + result.add(byte(c)) + +proc verifyEip191*( + idCommitment: openArray[byte], sigBytes: openArray[byte] +): Result[Address, string] = + if sigBytes.len != 65: + return err("signature must be 65 bytes, got " & $sigBytes.len) + let sig = Signature.fromRaw(sigBytes).valueOr: + return err("invalid signature encoding: " & $error) + let pub = sig.recover(eip191Message(idCommitment)).valueOr: + return err("signature recovery failed: " & $error) + ok(pub.to(Address)) + +proc failureResponse( + requestId: string, authSuccess: bool, message: string +): RlnGifterResponse = + RlnGifterResponse( + requestId: requestId, + authSuccess: authSuccess, + error: some(message), + failure: some(MembershipAllocationFailure(errorMessage: message)), + ) + +proc handleRequest( + wg: WakuRlnGifter, peerId: PeerId, buffer: seq[byte] +): Future[RlnGifterResponse] {.async.} = + let request = RlnGifterRequest.decode(buffer).valueOr: + error "failed to decode RLN gifter request", error = $error + return failureResponse("N/A", false, "decode error: " & $error) + + info "handling RLN gifter request", + peerId = peerId, + requestId = request.requestId, + identityCommitment = toHexLower(request.identityCommitment)[0 .. min(15, request.identityCommitment.len * 2 - 1)] & "..." + + if request.identityCommitment.len != 32: + return failureResponse( + request.requestId, true, "identity_commitment must be 32 bytes" + ) + + var authorizedSigner: Option[Address] + if wg.auth.isSome: + let auth = wg.auth.get() + let authType = + block: + var s = newStringOfCap(request.authenticationType.len) + for b in request.authenticationType: s.add(char(b)) + s + if authType != EthAllowlistAuthType: + return failureResponse( + request.requestId, false, + "unsupported authentication_type: '" & authType & "'", + ) + if request.authenticationPayload.len == 0: + return failureResponse( + request.requestId, false, "missing authentication_payload" + ) + let signer = verifyEip191( + request.identityCommitment, request.authenticationPayload + ).valueOr: + return failureResponse( + request.requestId, false, "signature verification failed: " & error + ) + if signer notin auth.addresses: + return failureResponse( + request.requestId, false, "address not allowlisted: " & signer.to0xHex() + ) + if signer in auth.consumed: + return failureResponse( + request.requestId, false, "address already used: " & signer.to0xHex() + ) + authorizedSigner = some(signer) + + let effectiveRateLimit = request.rateLimit.get(100'u64) + let success = (await wg.registerHandler(request.identityCommitment, effectiveRateLimit)).valueOr: + error "RLN gifter registration failed", error = error + return RlnGifterResponse( + requestId: request.requestId, + authSuccess: true, + failure: some(MembershipAllocationFailure(errorMessage: error)), + ) + + if authorizedSigner.isSome and wg.auth.isSome: + wg.auth.get().consumed.incl(authorizedSigner.get()) + + info "RLN gifter registration succeeded", + leafIndex = success.leafIndex, + requestId = request.requestId + + return RlnGifterResponse( + requestId: request.requestId, + authSuccess: true, + success: some(success), + ) + +proc initProtocolHandler(wg: WakuRlnGifter) = + proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} = + var rpc: RlnGifterResponse + # NOTE: Do NOT close the connection from the server side. The client closes + # its side after reading the response. If the server closes first, the remote + # FIN triggers yamux cleanup on the client side after createNode returns, + # causing a use-after-free crash in the delivery module process. + + var buffer: seq[byte] + try: + buffer = await conn.readLp(DefaultMaxRpcSize) + except LPStreamError: + error "rln-gifter read stream failed", error = getCurrentExceptionMsg() + return + + try: + rpc = await wg.handleRequest(conn.peerId, buffer) + except CatchableError: + error "rln-gifter handleRequest failed", error = getCurrentExceptionMsg() + rpc = failureResponse("N/A", true, "internal error") + + try: + await conn.writeLp(rpc.encode().buffer) + except LPStreamError: + error "rln-gifter write stream failed", error = getCurrentExceptionMsg() + + wg.handler = handler + wg.codec = WakuRlnGifterCodec + +proc new*( + T: type WakuRlnGifter, + peerManager: PeerManager, + rng: ref rand.HmacDrbgContext, + registerHandler: RegisterMemberHandler, + auth: Option[EthAllowlistAuth] = none(EthAllowlistAuth), + statusHandler: MembershipStatusHandler = nil, +): T = + let wg = WakuRlnGifter( + rng: rng, + peerManager: peerManager, + registerHandler: registerHandler, + statusHandler: statusHandler, + auth: auth, + ) + wg.initProtocolHandler() + return wg + +proc initStatusProtocolHandler(ws: WakuRlnGifterStatus) = + proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} = + var buffer: seq[byte] + try: + buffer = await conn.readLp(DefaultMaxRpcSize) + except LPStreamError: + error "rln-gifter-status read stream failed", + error = getCurrentExceptionMsg() + return + + var resp = MembershipStatusResponse(registered: false) + let req = MembershipStatusRequest.decode(buffer).valueOr: + resp.errorMessage = some("decode error: " & $error) + try: + await conn.writeLp(resp.encode().buffer) + except LPStreamError: + discard + return + + if ws.statusHandler.isNil: + resp.errorMessage = some("status handler not wired") + else: + try: + let r = await ws.statusHandler(req.configAccountId, req.identityCommitment) + if r.isErr: + resp.errorMessage = some(r.error) + else: + resp = r.get() + except CatchableError: + resp.errorMessage = some( + "status handler raised: " & getCurrentExceptionMsg()) + + try: + await conn.writeLp(resp.encode().buffer) + except LPStreamError: + error "rln-gifter-status write stream failed", + error = getCurrentExceptionMsg() + + ws.handler = handler + ws.codec = WakuRlnGifterStatusCodec + +proc new*( + T: type WakuRlnGifterStatus, + statusHandler: MembershipStatusHandler, +): T = + let ws = WakuRlnGifterStatus(statusHandler: statusHandler) + ws.initStatusProtocolHandler() + return ws diff --git a/logos_delivery/waku/waku_rln_relay/rln_gifter/rpc.nim b/logos_delivery/waku/waku_rln_relay/rln_gifter/rpc.nim new file mode 100644 index 000000000..1d886dc64 --- /dev/null +++ b/logos_delivery/waku/waku_rln_relay/rln_gifter/rpc.nim @@ -0,0 +1,42 @@ +import std/options + +type + MembershipAllocationSuccess* = object + leafIndex*: uint64 + merkleRoot*: seq[byte] + blockNumber*: uint64 + transactionHash*: seq[byte] + # Non-spec extension (high tag): LEZ config account that owns the + # membership. Required so the client can route on-chain queries. + configAccountId*: Option[string] + + MembershipAllocationFailure* = object + errorMessage*: string + + RlnGifterRequest* = object + requestId*: string + authenticationType*: seq[byte] + authenticationPayload*: seq[byte] + identityCommitment*: seq[byte] + rateLimit*: Option[uint64] + + RlnGifterResponse* = object + requestId*: string + authSuccess*: bool + error*: Option[string] + success*: Option[MembershipAllocationSuccess] + failure*: Option[MembershipAllocationFailure] + + MembershipStatusRequest* = object + configAccountId*: string + identityCommitment*: seq[byte] + + MembershipStatusResponse* = object + registered*: bool + leafIndex*: Option[uint64] + # Set on lookup failure (e.g. wallet RPC error); distinct from a + # successful registered=false answer. + errorMessage*: Option[string] + +const + EthAllowlistAuthType* = "eth-allowlist" diff --git a/logos_delivery/waku/waku_rln_relay/rln_gifter/rpc_codec.nim b/logos_delivery/waku/waku_rln_relay/rln_gifter/rpc_codec.nim new file mode 100644 index 000000000..3601b5583 --- /dev/null +++ b/logos_delivery/waku/waku_rln_relay/rln_gifter/rpc_codec.nim @@ -0,0 +1,182 @@ +{.push raises: [].} + +import std/options +import ../../common/protobuf, ./rpc + +const DefaultMaxRpcSize* = 4096 + +proc encode*(rpc: MembershipAllocationSuccess): ProtoBuffer = + var pb = initProtoBuffer() + pb.write3(1, rpc.leafIndex) + pb.write3(2, rpc.merkleRoot) + pb.write3(3, rpc.blockNumber) + pb.write3(4, rpc.transactionHash) + if rpc.configAccountId.isSome: + pb.write3(100, rpc.configAccountId.get()) + pb.finish3() + return pb + +proc decode*(T: type MembershipAllocationSuccess, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + var msg = MembershipAllocationSuccess() + + var leafIndex: uint64 + if ?pb.getField(1, leafIndex): + msg.leafIndex = leafIndex + + var merkleRoot: seq[byte] + if ?pb.getField(2, merkleRoot): + msg.merkleRoot = merkleRoot + + var blockNumber: uint64 + if ?pb.getField(3, blockNumber): + msg.blockNumber = blockNumber + + var transactionHash: seq[byte] + if ?pb.getField(4, transactionHash): + msg.transactionHash = transactionHash + + var configAccountId: string + if ?pb.getField(100, configAccountId): + msg.configAccountId = some(configAccountId) + + return ok(msg) + +proc encode*(rpc: MembershipAllocationFailure): ProtoBuffer = + var pb = initProtoBuffer() + pb.write3(1, rpc.errorMessage) + pb.finish3() + return pb + +proc decode*(T: type MembershipAllocationFailure, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + var msg = MembershipAllocationFailure() + var errorMessage: string + if ?pb.getField(1, errorMessage): + msg.errorMessage = errorMessage + return ok(msg) + +proc encode*(rpc: RlnGifterRequest): ProtoBuffer = + var pb = initProtoBuffer() + pb.write3(1, rpc.requestId) + pb.write3(2, rpc.authenticationType) + pb.write3(3, rpc.authenticationPayload) + pb.write3(4, rpc.identityCommitment) + if rpc.rateLimit.isSome: + pb.write3(5, rpc.rateLimit.get()) + pb.finish3() + return pb + +proc decode*(T: type RlnGifterRequest, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + var rpc = RlnGifterRequest() + + var requestId: string + if not ?pb.getField(1, requestId): + return err(ProtobufError.missingRequiredField("request_id")) + rpc.requestId = requestId + + var authenticationType: seq[byte] + discard ?pb.getField(2, authenticationType) + rpc.authenticationType = authenticationType + + var authenticationPayload: seq[byte] + discard ?pb.getField(3, authenticationPayload) + rpc.authenticationPayload = authenticationPayload + + var identityCommitment: seq[byte] + if not ?pb.getField(4, identityCommitment): + return err(ProtobufError.missingRequiredField("identity_commitment")) + rpc.identityCommitment = identityCommitment + + var rateLimit: uint64 + if ?pb.getField(5, rateLimit): + rpc.rateLimit = some(rateLimit) + + return ok(rpc) + +proc encode*(rpc: RlnGifterResponse): ProtoBuffer = + var pb = initProtoBuffer() + pb.write3(1, rpc.requestId) + pb.write3(2, rpc.authSuccess) + if rpc.error.isSome: + pb.write3(3, rpc.error.get()) + if rpc.success.isSome: + pb.write3(4, rpc.success.get().encode().buffer) + if rpc.failure.isSome: + pb.write3(5, rpc.failure.get().encode().buffer) + pb.finish3() + return pb + +proc decode*(T: type RlnGifterResponse, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + var rpc = RlnGifterResponse() + + var requestId: string + if not ?pb.getField(1, requestId): + return err(ProtobufError.missingRequiredField("request_id")) + rpc.requestId = requestId + + var authSuccess: bool + if not ?pb.getField(2, authSuccess): + return err(ProtobufError.missingRequiredField("auth_success")) + rpc.authSuccess = authSuccess + + var error: string + if ?pb.getField(3, error): + rpc.error = some(error) + + var successBuf: seq[byte] + if ?pb.getField(4, successBuf): + rpc.success = some(?MembershipAllocationSuccess.decode(successBuf)) + + var failureBuf: seq[byte] + if ?pb.getField(5, failureBuf): + rpc.failure = some(?MembershipAllocationFailure.decode(failureBuf)) + + return ok(rpc) + +proc encode*(req: MembershipStatusRequest): ProtoBuffer = + var pb = initProtoBuffer() + pb.write3(1, req.configAccountId) + pb.write3(2, req.identityCommitment) + pb.finish3() + return pb + +proc decode*(T: type MembershipStatusRequest, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + var req = MembershipStatusRequest() + var configAccountId: string + if not ?pb.getField(1, configAccountId): + return err(ProtobufError.missingRequiredField("config_account_id")) + req.configAccountId = configAccountId + var identityCommitment: seq[byte] + if not ?pb.getField(2, identityCommitment): + return err(ProtobufError.missingRequiredField("identity_commitment")) + req.identityCommitment = identityCommitment + return ok(req) + +proc encode*(resp: MembershipStatusResponse): ProtoBuffer = + var pb = initProtoBuffer() + pb.write3(1, resp.registered) + if resp.leafIndex.isSome: + pb.write3(2, resp.leafIndex.get()) + if resp.errorMessage.isSome: + pb.write3(3, resp.errorMessage.get()) + pb.finish3() + return pb + +proc decode*(T: type MembershipStatusResponse, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + var resp = MembershipStatusResponse() + var registered: bool + if not ?pb.getField(1, registered): + return err(ProtobufError.missingRequiredField("registered")) + resp.registered = registered + var leafIndex: uint64 + if ?pb.getField(2, leafIndex): + resp.leafIndex = some(leafIndex) + var errorMessage: string + if ?pb.getField(3, errorMessage): + resp.errorMessage = some(errorMessage) + return ok(resp) diff --git a/nix/default.nix b/nix/default.nix index dfe537f24..67071a5ca 100644 --- a/nix/default.nix +++ b/nix/default.nix @@ -1,6 +1,7 @@ { pkgs , src , zerokitRln +, zerokitMixRln ? null , targets ? [] , gitVersion ? "n/a" , enablePostgres ? true @@ -42,6 +43,12 @@ let # few mode-specific flags (e.g. --app:lib, --noMain, --header); everything # else (paths, defines, threading, gc, nimcache, rln linkage) is constant. # $NAT_TRAV and $NIMCACHE are shell variables defined in buildPhase. + # Optional mix-rln link flag: appended when zerokitMixRln is provided so + # liblogosdelivery's mix-rln-spam-protection-plugin path resolves at link + # time. Empty otherwise (vanilla liblogosdelivery build). + mixRlnLinkArg = pkgs.lib.optionalString (zerokitMixRln != null) + "--passL:${zerokitMixRln}/lib/librln.a --passL:-lm"; + nimCompile = { outFile, sourceFile, extraArgs ? [] }: '' nim c \ --noNimblePath \ @@ -49,6 +56,7 @@ let --path:$NAT_TRAV \ --path:$NAT_TRAV/src \ --passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \ + ${mixRlnLinkArg} \ ${nimDefineArgs} \ --threads:on \ --mm:refc \ diff --git a/nix/zerokit.nix b/nix/zerokit.nix index a112186c3..c9ca81068 100644 --- a/nix/zerokit.nix +++ b/nix/zerokit.nix @@ -1,9 +1,11 @@ -# zerokit rln built from source; overrides the stale v2.0.2 vendor cargoHash. -{ zerokit, system }: +# zerokit rln built from source; overrides the stale vendor cargoHash. +# vendorHash differs per zerokit version (2.0.0 vs 2.0.2 etc.) and must be +# passed in by the caller so the same builder works for both pins. +{ zerokit, system, vendorHash }: zerokit.packages.${system}.rln.overrideAttrs (old: { cargoDeps = old.cargoDeps.overrideAttrs (oldCargoDeps: { vendorStaging = oldCargoDeps.vendorStaging.overrideAttrs (_: { - outputHash = "sha256-PNwEdZLgGQPqQDrEK2hsQtSybVfBbD6xn4K47fPFJUU="; + outputHash = vendorHash; }); }); }) diff --git a/scripts/build_rln_mix.sh b/scripts/build_rln_mix.sh new file mode 100755 index 000000000..786ef76f5 --- /dev/null +++ b/scripts/build_rln_mix.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash + +# Build a separate, pinned RLN library for mix spam-protection usage. +# This keeps the main nwaku RLN dependency flow unchanged. + +set -euo pipefail + +source_dir="${1:-}" +version="${2:-}" +output_file="${3:-}" +repo_url="${4:-https://github.com/vacp2p/zerokit.git}" + +if [[ -z "${source_dir}" || -z "${version}" || -z "${output_file}" ]]; then + echo "Usage: $0 [repo_url]" + exit 1 +fi + +mkdir -p "$(dirname "${source_dir}")" +mkdir -p "$(dirname "${output_file}")" + +if [[ ! -d "${source_dir}/.git" ]]; then + echo "Cloning zerokit ${version} from ${repo_url}..." + if [[ -e "${source_dir}" ]]; then + echo "Path exists but is not a git repository: ${source_dir}" + echo "Please remove it and retry." + exit 1 + fi + git clone --depth 1 --branch "${version}" "${repo_url}" "${source_dir}" +else + echo "Using existing zerokit checkout in ${source_dir}" + current_tag="$(git -C "${source_dir}" describe --tags --exact-match 2>/dev/null || true)" + if [[ "${current_tag}" != "${version}" ]]; then + echo "Updating zerokit checkout to ${version}..." + git -C "${source_dir}" fetch --tags origin "${version}" + git -C "${source_dir}" checkout --detach "${version}" + fi +fi + +echo "Building mix RLN library from source (version ${version})..." +cargo build --release -p rln --manifest-path "${source_dir}/rln/Cargo.toml" + +cp "${source_dir}/target/release/librln.a" "${output_file}" +echo "Successfully built ${output_file}" diff --git a/tests/waku_rln_relay/test_all.nim b/tests/waku_rln_relay/test_all.nim index 706fff49e..0892ea35f 100644 --- a/tests/waku_rln_relay/test_all.nim +++ b/tests/waku_rln_relay/test_all.nim @@ -4,4 +4,5 @@ import ./test_rln_group_manager_onchain, ./test_waku_rln_relay, ./test_wakunode_rln_relay, - ./test_rln_nonce_manager + ./test_rln_nonce_manager, + ./test_rln_gifter diff --git a/tests/waku_rln_relay/test_rln_gifter.nim b/tests/waku_rln_relay/test_rln_gifter.nim new file mode 100644 index 000000000..40521b941 --- /dev/null +++ b/tests/waku_rln_relay/test_rln_gifter.nim @@ -0,0 +1,136 @@ +{.used.} + +import std/options, results +import testutils/unittests +import eth/common/[addresses, keys] +import waku/waku_rln_relay/rln_gifter/rpc +import waku/waku_rln_relay/rln_gifter/rpc_codec +import waku/waku_rln_relay/rln_gifter/protocol as rln_gifter_protocol + +proc eip191Sign(seckey: PrivateKey, idCommitment: openArray[byte]): seq[byte] = + @(seckey.sign(eip191Message(idCommitment)).toRaw()) + +proc bytesOf(s: string): seq[byte] = + result = newSeqOfCap[byte](s.len) + for c in s: + result.add(byte(c)) + +const + TestSecretHex = + "1111111111111111111111111111111111111111111111111111111111111111" + +let TestCommitment = @[ + byte 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, + 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, + 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, + 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, +] + +let TamperedCommitment = @[ + byte 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, + 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, + 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, + 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, +] + +suite "RLN gifter EIP-191 auth": + test "verifyEip191 recovers the signer address": + let sk = PrivateKey.fromHex(TestSecretHex).expect("valid key") + let expected = sk.toPublicKey().to(Address) + let sig = eip191Sign(sk, TestCommitment) + + let recovered = verifyEip191(TestCommitment, sig).expect("recoverable") + check recovered == expected + + test "verifyEip191 rejects wrong-length payload": + let bad = newSeq[byte](64) + let res = verifyEip191(TestCommitment, bad) + check res.isErr + + test "verifyEip191 produces a different address when the message differs": + let sk = PrivateKey.fromHex(TestSecretHex).expect("valid key") + let expected = sk.toPublicKey().to(Address) + let sig = eip191Sign(sk, TestCommitment) + + let recovered = verifyEip191(TamperedCommitment, sig).expect("recoverable") + check recovered != expected + + test "verifyEip191 rejects malformed signature bytes": + let bogus = newSeq[byte](65) + let res = verifyEip191(TestCommitment, bogus) + check res.isErr + +suite "RLN gifter request codec": + test "round-trips all fields when populated": + let req = RlnGifterRequest( + requestId: "req-1", + authenticationType: bytesOf("eth-allowlist"), + authenticationPayload: @[byte 0xde, 0xad, 0xbe, 0xef], + identityCommitment: TestCommitment, + rateLimit: some(42'u64), + ) + let decoded = RlnGifterRequest.decode(req.encode().buffer).expect("decodes") + check: + decoded.requestId == "req-1" + decoded.authenticationType == bytesOf("eth-allowlist") + decoded.authenticationPayload == @[byte 0xde, 0xad, 0xbe, 0xef] + decoded.identityCommitment == TestCommitment + decoded.rateLimit == some(42'u64) + + test "rate_limit is optional on the wire": + let req = RlnGifterRequest( + requestId: "req-2", + authenticationType: @[], + authenticationPayload: @[], + identityCommitment: TestCommitment, + rateLimit: none(uint64), + ) + let decoded = RlnGifterRequest.decode(req.encode().buffer).expect("decodes") + check: + decoded.requestId == "req-2" + decoded.identityCommitment == TestCommitment + decoded.rateLimit.isNone + +suite "RLN gifter response codec": + test "encodes/decodes a success result": + let resp = RlnGifterResponse( + requestId: "req-1", + authSuccess: true, + error: none(string), + success: some(MembershipAllocationSuccess( + leafIndex: 7'u64, + merkleRoot: @[byte 0x01, 0x02, 0x03], + blockNumber: 42'u64, + transactionHash: @[byte 0xaa, 0xbb], + configAccountId: some("acct-123"), + )), + failure: none(MembershipAllocationFailure), + ) + let decoded = RlnGifterResponse.decode(resp.encode().buffer).expect("decodes") + check: + decoded.requestId == "req-1" + decoded.authSuccess == true + decoded.success.isSome + decoded.success.get().leafIndex == 7'u64 + decoded.success.get().merkleRoot == @[byte 0x01, 0x02, 0x03] + decoded.success.get().blockNumber == 42'u64 + decoded.success.get().transactionHash == @[byte 0xaa, 0xbb] + decoded.success.get().configAccountId == some("acct-123") + decoded.failure.isNone + + test "encodes/decodes a failure result": + let resp = RlnGifterResponse( + requestId: "req-2", + authSuccess: false, + error: some("address not allowlisted"), + success: none(MembershipAllocationSuccess), + failure: some(MembershipAllocationFailure(errorMessage: "address not allowlisted")), + ) + let decoded = RlnGifterResponse.decode(resp.encode().buffer).expect("decodes") + check: + decoded.requestId == "req-2" + decoded.authSuccess == false + decoded.error == some("address not allowlisted") + decoded.failure.isSome + decoded.failure.get().errorMessage == "address not allowlisted" + decoded.success.isNone diff --git a/tools/confutils/cli_args.nim b/tools/confutils/cli_args.nim index aebfa1f8d..561a838bd 100644 --- a/tools/confutils/cli_args.nim +++ b/tools/confutils/cli_args.nim @@ -654,6 +654,45 @@ with the drawback of consuming some more bandwidth.""", name: "mix-disable-spam-protection" .}: bool + mixOnchainLEZ* {. + desc: "Use on-chain LEZ (LSSA sequencer) for mix RLN spam protection instead of off-chain keystores.", + defaultValue: false, + name: "mix-onchain-lez" + .}: bool + + mixGifterService* {. + desc: "Run as RLN gifter service for mix nodes.", + defaultValue: false, + name: "mix-gifter-service" + .}: bool + + mixGifterWalletAccount* {. + desc: "Wallet account ID for RLN gifter registration payments.", + defaultValue: "", + name: "mix-gifter-wallet-account" + .}: string + + mixGifterNode* {. + desc: "Multiaddress of the RLN gifter node (for client auto-registration).", + defaultValue: "", + name: "mix-gifter-node" + .}: string + + mixGifterAllowlist* {. + desc: + "Comma-separated 0x-prefixed Ethereum addresses allowed to redeem an RLN membership via the gifter (one-shot per address). Empty disables auth.", + defaultValue: "", + name: "mix-gifter-allowlist" + .}: string + + mixGifterAuthKey* {. + desc: + "64-hex-char secp256k1 private key used to sign gifter-membership requests (EIP-191 over the idCommitment hex). Empty disables signing.", + defaultValue: "", + name: "mix-gifter-auth-key" + .}: string + + # Kademlia Discovery config enableKadDiscovery* {. desc: @@ -1089,6 +1128,12 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.mixConf.withEnabled(n.mix) b.mixConf.withMixNodes(n.mixnodes) b.withMix(n.mix) + b.mixConf.withUseOnchainLEZ(n.mixOnchainLEZ) + b.mixConf.withGifterService(n.mixGifterService) + b.mixConf.withGifterWalletAccount(n.mixGifterWalletAccount) + b.mixConf.withGifterNode(n.mixGifterNode) + b.mixConf.withGifterAllowlist(n.mixGifterAllowlist) + b.mixConf.withGifterAuthKey(n.mixGifterAuthKey) if n.mixkey.isSome(): b.mixConf.withMixKey(n.mixkey.get()) if n.mixUserMessageLimit.isSome():