mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-06-26 11:29:28 +00:00
feat(mix,rln-gifter): LEZ-backed RLN mix + 2-phase gifter protocol (rebased onto PR #3807; plugin via nimble)
Ports commit 14562878 onto PR #3807's logos_delivery/ layout. Changes vs original 14562878: - Plugin consumed via nimble dep (no submodule add); the PR #3807 nimble resolution already provides it. - mountMix signature carries both PR #3807's disableSpamProtection AND the patch's useOnchainLEZ parameters (callsite passes both). - WakuMix.new branches on disableSpamProtection while also setting useOnchainLEZ on the spam-protection config. Cover-traffic params come from the plugin config when RLN is enabled, fall back to waku defaults when disabled. - start() retains HEAD's split between local-only init and the separate registerDoSProtectionWithNetwork retry loop. publishGossipsubTrigger is promoted to a standalone proc instead of inline in an else branch. - MixProtocol.init uses libp2p_mix's new Opt-wrapped coverTraffic and the MixRlnSpamProtection.new constructor (not the renamed newMixRlnSpamProtection). - Makefile reverted to HEAD: PR #3807 dropped the separate mix-librln archive in favor of a single stateless zerokit, so the patch's MIX_LIBRLN_* additions are obsolete. - README/sim helper config files: HEAD's values retained (env-specific paths from patch author dropped). Adds: - logos_delivery/waku/waku_mix/logos_core_client.nim (RLN client layer) - logos_delivery/waku/waku_mix/coordination handler (via waku_node) - logos_delivery/waku/waku_rln_relay/rln_gifter/{client,protocol,rpc,rpc_codec}.nim - 5 new C FFI exports (logosdelivery_set_rln_fetcher, _set_rln_config, _set_rln_identity, _push_roots, _push_proof) - --mix-gifter-* CLI flags + MixConf fields + MixConfBuilder methods - tests/waku_rln_relay/test_rln_gifter.nim
This commit is contained in:
parent
e47641f111
commit
b40ee69e0f
@ -5,6 +5,9 @@ if defined(release):
|
||||
else:
|
||||
switch("nimcache", "nimcache/debug/$projectName")
|
||||
|
||||
# Add mix-rln-spam-protection-plugin to search path (all platforms)
|
||||
switch("path", "./vendor/mix-rln-spam-protection-plugin/src")
|
||||
|
||||
if defined(windows):
|
||||
switch("passL", "rln.lib")
|
||||
switch("define", "postgres=false")
|
||||
|
||||
46
flake.lock
generated
46
flake.lock
generated
@ -20,7 +20,8 @@
|
||||
"inputs": {
|
||||
"nixpkgs": "nixpkgs",
|
||||
"rust-overlay": "rust-overlay",
|
||||
"zerokit": "zerokit"
|
||||
"zerokit": "zerokit",
|
||||
"zerokitMix": "zerokitMix"
|
||||
}
|
||||
},
|
||||
"rust-overlay": {
|
||||
@ -64,6 +65,27 @@
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"rust-overlay_3": {
|
||||
"inputs": {
|
||||
"nixpkgs": [
|
||||
"zerokitMix",
|
||||
"nixpkgs"
|
||||
]
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1771297684,
|
||||
"narHash": "sha256-wieWskQxZLPlNXX06JEB0bMoS/ZYQ89xBzF0RL9lyLs=",
|
||||
"owner": "oxalica",
|
||||
"repo": "rust-overlay",
|
||||
"rev": "755d3669699a7c62aef35af187d75dc2728cfd85",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "oxalica",
|
||||
"repo": "rust-overlay",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"zerokit": {
|
||||
"inputs": {
|
||||
"nixpkgs": [
|
||||
@ -85,6 +107,28 @@
|
||||
"rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"zerokitMix": {
|
||||
"inputs": {
|
||||
"nixpkgs": [
|
||||
"nixpkgs"
|
||||
],
|
||||
"rust-overlay": "rust-overlay_3"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1773917887,
|
||||
"narHash": "sha256-5a2cL26uw7NdLCD0gCA3tS7uX8W9yxRGqcPhWNevstM=",
|
||||
"ref": "refs/tags/v2.0.0",
|
||||
"rev": "0ef1364c86c9c2c441284389c46b0b461feab893",
|
||||
"revCount": 371,
|
||||
"type": "git",
|
||||
"url": "https://github.com/vacp2p/zerokit"
|
||||
},
|
||||
"original": {
|
||||
"ref": "refs/tags/v2.0.0",
|
||||
"type": "git",
|
||||
"url": "https://github.com/vacp2p/zerokit"
|
||||
}
|
||||
}
|
||||
},
|
||||
"root": "root",
|
||||
|
||||
25
flake.nix
25
flake.nix
@ -24,9 +24,15 @@
|
||||
url = "github:vacp2p/zerokit/5e64cb8822bee65eed6cf459f95ae72b80c6ba63";
|
||||
inputs.nixpkgs.follows = "nixpkgs";
|
||||
};
|
||||
# Mix RLN spam protection requires a separate, newer zerokit (v2.0.0).
|
||||
# Pinned by tag to match what scripts/build_rln_mix.sh uses by default.
|
||||
zerokitMix = {
|
||||
url = "git+https://github.com/vacp2p/zerokit?ref=refs/tags/v2.0.0";
|
||||
inputs.nixpkgs.follows = "nixpkgs";
|
||||
};
|
||||
};
|
||||
|
||||
outputs = { self, nixpkgs, rust-overlay, zerokit }:
|
||||
outputs = { self, nixpkgs, rust-overlay, zerokit, zerokitMix }:
|
||||
let
|
||||
systems = [
|
||||
"x86_64-linux" "aarch64-linux"
|
||||
@ -71,13 +77,26 @@
|
||||
let
|
||||
pkgs = pkgsFor system;
|
||||
|
||||
zerokitRln = import ./nix/zerokit.nix { inherit zerokit system; };
|
||||
zerokitRln = import ./nix/zerokit.nix {
|
||||
inherit zerokit system;
|
||||
vendorHash = "sha256-PNwEdZLgGQPqQDrEK2hsQtSybVfBbD6xn4K47fPFJUU=";
|
||||
};
|
||||
zerokitMixRln = import ./nix/zerokit.nix {
|
||||
zerokit = zerokitMix;
|
||||
inherit system;
|
||||
vendorHash = "sha256-SoMl0QBBgTG1b4UhOlErlzWmg3J6G0xOC0tNDddOptA=";
|
||||
};
|
||||
|
||||
liblogosdelivery = pkgs.callPackage ./nix/default.nix {
|
||||
inherit pkgs;
|
||||
src = ./.;
|
||||
inherit zerokitRln;
|
||||
inherit zerokitRln zerokitMixRln;
|
||||
gitVersion = "v${nimbleVersion}-g${builtins.substring 0 6 shortRev}";
|
||||
# The .lgx wrapper does not put its variant dir on dyld's search
|
||||
# path, so nim's runtime dlopen("libpq.dylib") from db_postgres
|
||||
# fails even though libpq.dylib ships in the bundle. Disable
|
||||
# postgres so the dylib has no runtime libpq dependency.
|
||||
enablePostgres = false;
|
||||
};
|
||||
|
||||
wakucanary = pkgs.callPackage ./nix/default.nix {
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import ffi
|
||||
import std/locks
|
||||
import logos_delivery/waku/factory/waku
|
||||
import logos_delivery/waku/waku_mix/logos_core_client as mix_rln_client
|
||||
|
||||
declareLibrary("logosdelivery")
|
||||
|
||||
@ -31,3 +32,47 @@ proc logosdelivery_set_event_callback(
|
||||
|
||||
ctx[].eventCallback = cast[pointer](callback)
|
||||
ctx[].eventUserData = userData
|
||||
|
||||
proc logosdelivery_init(): cint {.dynlib, exportc, cdecl.} =
|
||||
initializeLibrary()
|
||||
# Default log level is configured at compile time via chronicles_log_level;
|
||||
# the runtime setLogLevel resolution conflicts with std/termios's two-arg
|
||||
# template under ff8d518 — leave the compile-time default in effect.
|
||||
return RET_OK
|
||||
|
||||
proc logosdelivery_set_rln_fetcher(
|
||||
ctx: ptr FFIContext[Waku], fetcher: mix_rln_client.RlnFetcherFunc, fetcherData: pointer
|
||||
) {.dynlib, exportc, cdecl.} =
|
||||
if fetcher.isNil:
|
||||
echo "error: nil fetcher in logosdelivery_set_rln_fetcher"
|
||||
return
|
||||
mix_rln_client.setRlnFetcher(fetcher, fetcherData)
|
||||
|
||||
proc logosdelivery_set_rln_config(
|
||||
ctx: ptr FFIContext[Waku], configAccountId: cstring, leafIndex: cint
|
||||
): cint {.dynlib, exportc, cdecl.} =
|
||||
if configAccountId.isNil:
|
||||
return RET_ERR
|
||||
mix_rln_client.setRlnConfig($configAccountId, leafIndex.int)
|
||||
return RET_OK
|
||||
|
||||
proc logosdelivery_set_rln_identity(
|
||||
ctx: ptr FFIContext[Waku], idSecretHashHex: cstring
|
||||
) {.dynlib, exportc, cdecl.} =
|
||||
if idSecretHashHex.isNil:
|
||||
return
|
||||
mix_rln_client.setRlnIdentity($idSecretHashHex)
|
||||
|
||||
proc logosdelivery_push_roots(
|
||||
ctx: ptr FFIContext[Waku], rootsJson: cstring
|
||||
) {.dynlib, exportc, cdecl.} =
|
||||
if rootsJson.isNil:
|
||||
return
|
||||
mix_rln_client.pushRoots($rootsJson)
|
||||
|
||||
proc logosdelivery_push_proof(
|
||||
ctx: ptr FFIContext[Waku], proofJson: cstring
|
||||
) {.dynlib, exportc, cdecl.} =
|
||||
if proofJson.isNil:
|
||||
return
|
||||
mix_rln_client.pushProof($proofJson)
|
||||
|
||||
@ -93,6 +93,18 @@ extern "C"
|
||||
FFICallBack callback,
|
||||
void *userData);
|
||||
|
||||
int logosdelivery_init(void);
|
||||
|
||||
// RLN fetcher: C++ implements this, Nim calls it to get roots/proofs from RLN module.
|
||||
typedef int (*RlnFetcherFunc)(const char *method, const char *params,
|
||||
FFICallBack callback, void *callbackData, void *fetcherData);
|
||||
|
||||
void logosdelivery_set_rln_fetcher(void *ctx, RlnFetcherFunc fetcher, void *fetcherData);
|
||||
int logosdelivery_set_rln_config(void *ctx, const char *configAccountId, int leafIndex);
|
||||
void logosdelivery_set_rln_identity(void *ctx, const char *idSecretHashHex);
|
||||
void logosdelivery_push_roots(void *ctx, const char *rootsJson);
|
||||
void logosdelivery_push_proof(void *ctx, const char *proofJson);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
@ -11,6 +11,8 @@
|
||||
"../vendor/nim-ffi"
|
||||
--path:
|
||||
"../"
|
||||
--path:
|
||||
"../vendor/mix-rln-spam-protection-plugin/src"
|
||||
|
||||
# Optimization and debugging
|
||||
--opt:
|
||||
|
||||
@ -14,6 +14,12 @@ type MixConfBuilder* = object
|
||||
mixNodes: seq[MixNodePubInfo]
|
||||
userMessageLimit: Option[int]
|
||||
disableSpamProtection: bool
|
||||
useOnchainLEZ: bool
|
||||
gifterService: bool
|
||||
gifterWalletAccount: string
|
||||
gifterNode: string
|
||||
gifterAllowlist: string
|
||||
gifterAuthKey: string
|
||||
|
||||
proc init*(T: type MixConfBuilder): MixConfBuilder =
|
||||
MixConfBuilder()
|
||||
@ -33,6 +39,24 @@ proc withUserMessageLimit*(b: var MixConfBuilder, limit: int) =
|
||||
proc withDisableSpamProtection*(b: var MixConfBuilder, disable: bool) =
|
||||
b.disableSpamProtection = disable
|
||||
|
||||
proc withUseOnchainLEZ*(b: var MixConfBuilder, use: bool) =
|
||||
b.useOnchainLEZ = use
|
||||
|
||||
proc withGifterService*(b: var MixConfBuilder, enabled: bool) =
|
||||
b.gifterService = enabled
|
||||
|
||||
proc withGifterWalletAccount*(b: var MixConfBuilder, account: string) =
|
||||
b.gifterWalletAccount = account
|
||||
|
||||
proc withGifterNode*(b: var MixConfBuilder, node: string) =
|
||||
b.gifterNode = node
|
||||
|
||||
proc withGifterAllowlist*(b: var MixConfBuilder, allowlist: string) =
|
||||
b.gifterAllowlist = allowlist
|
||||
|
||||
proc withGifterAuthKey*(b: var MixConfBuilder, authKey: string) =
|
||||
b.gifterAuthKey = authKey
|
||||
|
||||
proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
|
||||
if not b.enabled.get(false):
|
||||
return ok(none[MixConf]())
|
||||
@ -48,6 +72,12 @@ proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
|
||||
mixNodes: b.mixNodes,
|
||||
userMessageLimit: b.userMessageLimit,
|
||||
disableSpamProtection: b.disableSpamProtection,
|
||||
useOnchainLEZ: b.useOnchainLEZ,
|
||||
gifterService: b.gifterService,
|
||||
gifterWalletAccount: b.gifterWalletAccount,
|
||||
gifterNode: b.gifterNode,
|
||||
gifterAllowlist: b.gifterAllowlist,
|
||||
gifterAuthKey: b.gifterAuthKey,
|
||||
)
|
||||
)
|
||||
)
|
||||
@ -62,6 +92,12 @@ proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
|
||||
mixNodes: b.mixNodes,
|
||||
userMessageLimit: b.userMessageLimit,
|
||||
disableSpamProtection: b.disableSpamProtection,
|
||||
useOnchainLEZ: b.useOnchainLEZ,
|
||||
gifterService: b.gifterService,
|
||||
gifterWalletAccount: b.gifterWalletAccount,
|
||||
gifterNode: b.gifterNode,
|
||||
gifterAllowlist: b.gifterAllowlist,
|
||||
gifterAuthKey: b.gifterAuthKey,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import
|
||||
std/[options, sequtils],
|
||||
std/[options, os, sequtils, json, strutils, sets],
|
||||
eth/common/[addresses, keys],
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/peerid,
|
||||
@ -23,6 +24,12 @@ import
|
||||
../waku_core,
|
||||
../waku_core/codecs,
|
||||
../waku_rln_relay,
|
||||
../waku_mix/logos_core_client as mix_lez_client,
|
||||
../waku_mix/protocol as mix_protocol,
|
||||
mix_rln_spam_protection/onchain_group_manager,
|
||||
mix_rln_spam_protection/rln_interface as mix_rln_interface,
|
||||
../waku_rln_relay/rln_gifter/protocol as rln_gifter_protocol,
|
||||
../waku_rln_relay/rln_gifter/client as rln_gifter_client,
|
||||
../discovery/waku_dnsdisc,
|
||||
../waku_archive/retention_policy as policy,
|
||||
../waku_archive/retention_policy/builder as policy_builder,
|
||||
@ -169,10 +176,235 @@ proc setupProtocols(
|
||||
await node.mountMix(
|
||||
conf.clusterId, mixConf.mixKey, mixConf.mixnodes, mixConf.userMessageLimit,
|
||||
mixConf.disableSpamProtection,
|
||||
useOnchainLEZ = mixConf.useOnchainLEZ,
|
||||
)
|
||||
).isOkOr:
|
||||
return err("failed to mount waku mix protocol: " & $error)
|
||||
|
||||
# Wire OnchainLEZGroupManager to the LEZ RLN module via the fetcher
|
||||
# callback bridge (setRlnConfig is called later by the C++ plugin).
|
||||
if mixConf.useOnchainLEZ and not node.wakuMix.isNil:
|
||||
let gm = node.wakuMix.mixRlnSpamProtection.groupManager
|
||||
if gm of OnchainLEZGroupManager:
|
||||
let lezGm = OnchainLEZGroupManager(gm)
|
||||
# Adapt logos_core_client callbacks to OnchainLEZGroupManager types
|
||||
let clientFetchRoots = mix_lez_client.makeFetchLatestRoots()
|
||||
let clientFetchProof = mix_lez_client.makeFetchMerkleProof()
|
||||
|
||||
let fetchRoots: onchain_group_manager.FetchRootsCallback = clientFetchRoots
|
||||
let fetchProof: onchain_group_manager.FetchProofCallback = clientFetchProof
|
||||
lezGm.setFetchCallbacks(fetchRoots, fetchProof)
|
||||
mix_lez_client.setGroupManagerRef(lezGm)
|
||||
info "Wired LEZ callbacks for mix RLN spam protection"
|
||||
|
||||
# Mount RLN gifter server if configured
|
||||
if mixConf.gifterService:
|
||||
let walletAccount = mixConf.gifterWalletAccount
|
||||
|
||||
# All gifter-initiated registrations funnel through a single
|
||||
# serialization worker so the gifter's wallet never has two
|
||||
# unconfirmed txns outstanding from the same signer — concurrent
|
||||
# submissions otherwise silently fail at the sequencer with
|
||||
# "Nonce mismatch" because the wallet refetches the chain nonce
|
||||
# per call and several submissions read the same value before
|
||||
# any commits. See cleanup/MODE_A_GIFTER_SLOT_BUG.md.
|
||||
#
|
||||
# The libp2p handler enqueues a job with an optimistic
|
||||
# leaf_index taken from a local counter, returns immediately
|
||||
# (so the stream doesn't time out), and the worker drains the
|
||||
# queue one tx at a time. Clients reconcile the authoritative
|
||||
# leaf via the existing status RPC + watcher.
|
||||
type GifterJob = ref object
|
||||
identityCommitment: seq[byte]
|
||||
rateLimit: uint64
|
||||
assigned: uint64
|
||||
|
||||
let gifterQueue = newAsyncQueue[GifterJob]()
|
||||
var gifterNextLeaf: uint64 = 0
|
||||
|
||||
proc gifterSubmitOnce(
|
||||
idc: seq[byte], rateLimit: uint64
|
||||
): Result[uint64, string] {.gcsafe.} =
|
||||
let (configAccount, _) = mix_lez_client.getRlnConfig()
|
||||
if configAccount.len == 0:
|
||||
return err("RLN config not set on gifter node")
|
||||
let holdingAccount =
|
||||
if walletAccount.len > 0: walletAccount else: configAccount
|
||||
var idCommitmentHex = newStringOfCap(idc.len * 2)
|
||||
for b in idc:
|
||||
idCommitmentHex.add(toHex(int(b), 2))
|
||||
let params =
|
||||
"{\"configAccountId\":\"" & configAccount &
|
||||
"\",\"userHoldingAccountId\":\"" & holdingAccount &
|
||||
"\",\"idCommitment\":\"" & idCommitmentHex &
|
||||
"\",\"rateLimit\":" & $rateLimit & "}"
|
||||
let regResult =
|
||||
mix_lez_client.callRlnFetcher("register_member", params)
|
||||
if regResult.isErr:
|
||||
return err(regResult.error)
|
||||
try:
|
||||
let parsed = parseJson(regResult.get())
|
||||
if parsed.hasKey("error"):
|
||||
return err(parsed["error"].getStr("register_member failed"))
|
||||
return ok(parsed["leaf_index"].getInt().uint64)
|
||||
except CatchableError as e:
|
||||
return err("failed to parse register_member result: " & e.msg)
|
||||
|
||||
proc waitForChainCommit(
|
||||
idc: seq[byte], deadlineMs: int
|
||||
): Future[Result[uint64, string]] {.async, gcsafe.} =
|
||||
## Poll is_member_registered until the just-submitted tx
|
||||
## commits. Without this gate, the worker's next iteration
|
||||
## would race the wallet nonce-refetch against the previous
|
||||
## tx's block inclusion and submit two txns with the same
|
||||
## nonce.
|
||||
const pollMs = 2_000
|
||||
let (configAccount, _) = mix_lez_client.getRlnConfig()
|
||||
if configAccount.len == 0:
|
||||
return err("RLN config not set")
|
||||
var idHex = newStringOfCap(idc.len * 2)
|
||||
for b in idc:
|
||||
idHex.add(toHex(int(b), 2))
|
||||
let params =
|
||||
"{\"configAccountId\":\"" & configAccount &
|
||||
"\",\"idCommitment\":\"" & idHex & "\"}"
|
||||
let deadline = Moment.now() + chronos.milliseconds(deadlineMs)
|
||||
while Moment.now() < deadline:
|
||||
await sleepAsync(chronos.milliseconds(pollMs))
|
||||
let raw = mix_lez_client.callRlnFetcher(
|
||||
"is_member_registered", params)
|
||||
if raw.isErr: continue
|
||||
try:
|
||||
let parsed = parseJson(raw.get())
|
||||
if parsed.hasKey("registered") and
|
||||
parsed["registered"].getBool() and
|
||||
parsed.hasKey("leaf_index"):
|
||||
return ok(parsed["leaf_index"].getInt().uint64)
|
||||
except CatchableError:
|
||||
continue
|
||||
return err("confirmation timeout")
|
||||
|
||||
proc gifterWorker() {.async, gcsafe.} =
|
||||
# Confirmation deadline must cover one block-include cycle plus
|
||||
# propagation lag on the slowest chain we ship against. Testnet
|
||||
# blocks ~60-90s + finality lag → 5min headroom.
|
||||
const confirmDeadlineMs = 300_000
|
||||
while true:
|
||||
let job = await gifterQueue.popFirst()
|
||||
let res = gifterSubmitOnce(job.identityCommitment, job.rateLimit)
|
||||
if res.isErr:
|
||||
error "Gifter worker: submission failed",
|
||||
optimistic = job.assigned, err = res.error
|
||||
continue
|
||||
# Wait until the tx commits before processing the next job
|
||||
# so the wallet's next nonce fetch sees the advanced value.
|
||||
let confRes =
|
||||
await waitForChainCommit(job.identityCommitment, confirmDeadlineMs)
|
||||
if confRes.isErr:
|
||||
warn "Gifter worker: tx did not confirm within deadline",
|
||||
optimistic = job.assigned, err = confRes.error
|
||||
continue
|
||||
let actual = confRes.get()
|
||||
if actual >= gifterNextLeaf:
|
||||
gifterNextLeaf = actual + 1
|
||||
if actual != job.assigned:
|
||||
info "Gifter worker: chain leaf differs from optimistic ack",
|
||||
optimistic = job.assigned, actual = actual
|
||||
else:
|
||||
debug "Gifter worker: submission accepted at expected leaf",
|
||||
leaf = actual
|
||||
|
||||
let registerHandler: rln_gifter_protocol.RegisterMemberHandler =
|
||||
proc(
|
||||
identityCommitment: seq[byte], rateLimit: uint64
|
||||
): Future[Result[rln_gifter_protocol.MembershipAllocationSuccess, string]] {.async, gcsafe.} =
|
||||
let (configAccount, _) = mix_lez_client.getRlnConfig()
|
||||
if configAccount.len == 0:
|
||||
return err("RLN config not set on gifter node")
|
||||
let assigned = gifterNextLeaf
|
||||
gifterNextLeaf += 1
|
||||
let job = GifterJob(
|
||||
identityCommitment: identityCommitment,
|
||||
rateLimit: rateLimit,
|
||||
assigned: assigned,
|
||||
)
|
||||
try:
|
||||
await gifterQueue.addLast(job)
|
||||
except CatchableError as e:
|
||||
return err("failed to enqueue gifter job: " & e.msg)
|
||||
return ok(rln_gifter_protocol.MembershipAllocationSuccess(
|
||||
leafIndex: assigned,
|
||||
merkleRoot: @[],
|
||||
blockNumber: 0'u64,
|
||||
transactionHash: @[],
|
||||
configAccountId: some(configAccount),
|
||||
))
|
||||
|
||||
var auth = none(rln_gifter_protocol.EthAllowlistAuth)
|
||||
if mixConf.gifterAllowlist.len > 0:
|
||||
var addrs: HashSet[Address]
|
||||
for piece in mixConf.gifterAllowlist.split(','):
|
||||
let s = piece.strip()
|
||||
if s.len == 0:
|
||||
continue
|
||||
let parsedAddr =
|
||||
try:
|
||||
Address.fromHex(s)
|
||||
except ValueError as e:
|
||||
return err("invalid gifter allowlist address '" & s & "': " & e.msg)
|
||||
addrs.incl(parsedAddr)
|
||||
if addrs.len > 0:
|
||||
auth = some(rln_gifter_protocol.EthAllowlistAuth(
|
||||
addresses: addrs, consumed: initHashSet[Address]()
|
||||
))
|
||||
info "RLN gifter allowlist auth enabled", count = addrs.len
|
||||
|
||||
let statusHandler: rln_gifter_protocol.MembershipStatusHandler =
|
||||
proc(
|
||||
configAccountId: string, identityCommitment: seq[byte]
|
||||
): Future[Result[rln_gifter_protocol.MembershipStatusResponse, string]]
|
||||
{.async, gcsafe.} =
|
||||
var idHex = newStringOfCap(identityCommitment.len * 2)
|
||||
for b in identityCommitment:
|
||||
idHex.add(toHex(int(b), 2))
|
||||
let params =
|
||||
"{\"configAccountId\":\"" & configAccountId &
|
||||
"\",\"idCommitment\":\"" & idHex & "\"}"
|
||||
let raw = mix_lez_client.callRlnFetcher(
|
||||
"is_member_registered", params)
|
||||
if raw.isErr:
|
||||
return err(raw.error)
|
||||
try:
|
||||
let parsed = parseJson(raw.get())
|
||||
var resp = rln_gifter_protocol.MembershipStatusResponse(
|
||||
registered: false)
|
||||
if parsed.hasKey("registered") and
|
||||
parsed["registered"].getBool():
|
||||
resp.registered = true
|
||||
if parsed.hasKey("leaf_index"):
|
||||
resp.leafIndex = some(parsed["leaf_index"].getInt().uint64)
|
||||
return ok(resp)
|
||||
except CatchableError as e:
|
||||
return err("failed to parse is_member_registered: " & e.msg)
|
||||
|
||||
let gifter = rln_gifter_protocol.WakuRlnGifter.new(
|
||||
node.peerManager, node.rng, registerHandler, auth, statusHandler
|
||||
)
|
||||
node.switch.mount(gifter, protocolMatcher(WakuRlnGifterCodec))
|
||||
node.wakuRlnGifter = gifter
|
||||
let gifterStatus = rln_gifter_protocol.WakuRlnGifterStatus.new(
|
||||
statusHandler
|
||||
)
|
||||
node.switch.mount(
|
||||
gifterStatus, protocolMatcher(WakuRlnGifterStatusCodec))
|
||||
asyncSpawn gifterWorker()
|
||||
info "RLN gifter service mounted for mix",
|
||||
statusCodec = WakuRlnGifterStatusCodec
|
||||
|
||||
# Defer client registration to startNode (needs running switch).
|
||||
if mixConf.gifterNode.len > 0:
|
||||
info "Gifter client mode: registration deferred to startNode()"
|
||||
|
||||
# Setup extended kademlia discovery
|
||||
if conf.kademliaDiscoveryConf.isSome():
|
||||
let mixPubKey =
|
||||
@ -425,6 +657,13 @@ proc startNode*(
|
||||
except CatchableError:
|
||||
return err("failed to start waku node: " & getCurrentExceptionMsg())
|
||||
|
||||
# Start deferred OnchainLEZ poll loop now that the switch is fully started.
|
||||
if conf.mixConf.isSome() and conf.mixConf.get().useOnchainLEZ and
|
||||
not node.wakuMix.isNil():
|
||||
let gm = node.wakuMix.mixRlnSpamProtection.groupManager
|
||||
if gm of OnchainLEZGroupManager:
|
||||
OnchainLEZGroupManager(gm).startPolling()
|
||||
|
||||
# Connect to configured static nodes
|
||||
if conf.staticNodes.len > 0:
|
||||
try:
|
||||
@ -440,6 +679,177 @@ proc startNode*(
|
||||
return
|
||||
err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg())
|
||||
|
||||
# The gifter is also a mix relay and needs its own membership to forward.
|
||||
# Deferred to startNode so the wallet RPC subprocess is wired first.
|
||||
if conf.mixConf.isSome() and conf.mixConf.get().useOnchainLEZ and
|
||||
conf.mixConf.get().gifterService and not node.wakuMix.isNil() and
|
||||
not node.wakuRlnGifter.isNil():
|
||||
let gm = node.wakuMix.mixRlnSpamProtection.groupManager
|
||||
if gm of OnchainLEZGroupManager:
|
||||
let lezGm = OnchainLEZGroupManager(gm)
|
||||
if lezGm.membershipIndex.isNone:
|
||||
let selfCred =
|
||||
if lezGm.credentials.isSome:
|
||||
lezGm.credentials.get()
|
||||
else:
|
||||
mix_rln_interface.membershipKeyGen().valueOr:
|
||||
return err("failed to generate gifter RLN identity: " & $error)
|
||||
let gifter = node.wakuRlnGifter
|
||||
asyncSpawn (proc(): Future[void] {.async.} =
|
||||
# Handler reads configAccount, which is set by setRlnConfig.
|
||||
while true:
|
||||
let (cfg, _) = mix_lez_client.getRlnConfig()
|
||||
if cfg.len > 0: break
|
||||
await sleepAsync(500.milliseconds)
|
||||
info "Self-registering gifter as mix relay",
|
||||
identityCommitmentLen = selfCred.idCommitment.len
|
||||
try:
|
||||
let selfRes = await gifter.registerHandler(
|
||||
@(selfCred.idCommitment), uint64(lezGm.userMessageLimit)
|
||||
)
|
||||
if selfRes.isErr:
|
||||
warn "Gifter self-registration failed", error = selfRes.error
|
||||
return
|
||||
let success = selfRes.get()
|
||||
if success.configAccountId.isNone:
|
||||
warn "Gifter self-registration response missing configAccountId"
|
||||
return
|
||||
let configAccount = success.configAccountId.get()
|
||||
lezGm.credentials = some(selfCred)
|
||||
lezGm.membershipIndex =
|
||||
some(onchain_group_manager.MembershipIndex(success.leafIndex))
|
||||
mix_lez_client.setRlnConfig(configAccount, success.leafIndex.int)
|
||||
info "Gifter self-registered as mix relay",
|
||||
leafIndex = success.leafIndex,
|
||||
configAccount = configAccount
|
||||
# The gifter's own handler now returns an optimistic leaf
|
||||
# (queued submission); poll the status RPC in-process to
|
||||
# reconcile if the chain assigned a different leaf.
|
||||
let watcherLezGm = lezGm
|
||||
let watcherConfigAccount = configAccount
|
||||
let watcherIdc = @(selfCred.idCommitment)
|
||||
let optimisticLeaf = success.leafIndex
|
||||
asyncSpawn (proc(): Future[void] {.async.} =
|
||||
const selfPollMs = 5_000
|
||||
const selfDeadlineMs = 1_800_000
|
||||
let deadline = Moment.now() +
|
||||
chronos.milliseconds(selfDeadlineMs)
|
||||
while Moment.now() < deadline:
|
||||
await sleepAsync(chronos.milliseconds(selfPollMs))
|
||||
let qr =
|
||||
try:
|
||||
await gifter.statusHandler(
|
||||
watcherConfigAccount, watcherIdc)
|
||||
except CatchableError:
|
||||
continue
|
||||
if qr.isErr: continue
|
||||
let resp = qr.get()
|
||||
if not resp.registered: continue
|
||||
if resp.leafIndex.isNone: continue
|
||||
let authLeaf = resp.leafIndex.get()
|
||||
if some(onchain_group_manager.MembershipIndex(authLeaf)) !=
|
||||
watcherLezGm.membershipIndex:
|
||||
info "Gifter self-reg leaf corrected from optimistic",
|
||||
optimistic = optimisticLeaf, authoritative = authLeaf
|
||||
watcherLezGm.membershipIndex =
|
||||
some(onchain_group_manager.MembershipIndex(authLeaf))
|
||||
mix_lez_client.setRlnConfig(
|
||||
watcherConfigAccount, authLeaf.int)
|
||||
else:
|
||||
info "Gifter self-reg confirmed on-chain",
|
||||
leafIndex = authLeaf
|
||||
watcherLezGm.markMembershipConfirmed()
|
||||
return
|
||||
warn "Gifter self-reg confirmation timed out",
|
||||
optimisticLeaf = optimisticLeaf
|
||||
)()
|
||||
except CatchableError as e:
|
||||
warn "Gifter self-registration exception", error = e.msg
|
||||
)()
|
||||
|
||||
# RLN gifter client registration — runs after switch start so the gifter peer is reachable.
|
||||
if conf.mixConf.isSome() and conf.mixConf.get().useOnchainLEZ and
|
||||
conf.mixConf.get().gifterNode.len > 0 and not node.wakuMix.isNil():
|
||||
let mixConf = conf.mixConf.get()
|
||||
let gm = node.wakuMix.mixRlnSpamProtection.groupManager
|
||||
if gm of OnchainLEZGroupManager:
|
||||
let lezGm = OnchainLEZGroupManager(gm)
|
||||
let gifterClient = rln_gifter_client.WakuRlnGifterClient.new(
|
||||
node.peerManager, node.rng
|
||||
)
|
||||
let gifterPeer = parsePeerInfo(mixConf.gifterNode).valueOr:
|
||||
return err("failed to parse gifter peer: " & error)
|
||||
node.peerManager.addServicePeer(gifterPeer, WakuRlnGifterCodec)
|
||||
|
||||
# Use keystore credentials if available, otherwise generate new ones
|
||||
let idCred =
|
||||
if lezGm.credentials.isSome:
|
||||
lezGm.credentials.get()
|
||||
else:
|
||||
mix_rln_interface.membershipKeyGen().valueOr:
|
||||
return err("failed to generate RLN identity: " & $error)
|
||||
let idCommitmentBytes = @(idCred.idCommitment)
|
||||
|
||||
info "Registering via RLN gifter",
|
||||
gifterPeer = mixConf.gifterNode,
|
||||
identityCommitmentLen = idCommitmentBytes.len,
|
||||
fromKeystore = lezGm.credentials.isSome
|
||||
|
||||
var authType: seq[byte]
|
||||
var authPayload: seq[byte]
|
||||
if mixConf.gifterAuthKey.len > 0:
|
||||
let seckey = PrivateKey.fromHex(mixConf.gifterAuthKey).valueOr:
|
||||
return err("invalid mix-gifter-auth-key: " & $error)
|
||||
let sig = seckey.sign(rln_gifter_protocol.eip191Message(idCommitmentBytes))
|
||||
authPayload = @(sig.toRaw())
|
||||
for c in rln_gifter_protocol.EthAllowlistAuthType:
|
||||
authType.add(byte(c))
|
||||
info "Signing gifter request with EIP-191 auth key",
|
||||
signer = seckey.toPublicKey().to(Address).to0xHex()
|
||||
|
||||
var success: rln_gifter_protocol.MembershipAllocationSuccess
|
||||
try:
|
||||
let res = await gifterClient.requestMembership(
|
||||
idCommitmentBytes,
|
||||
some(uint64(lezGm.userMessageLimit)),
|
||||
gifterPeer,
|
||||
authType,
|
||||
authPayload,
|
||||
)
|
||||
if res.isErr:
|
||||
return err("failed to register via gifter: " & res.error)
|
||||
success = res.get()
|
||||
except CatchableError:
|
||||
return err("gifter registration exception: " & getCurrentExceptionMsg())
|
||||
|
||||
let configAccountId = success.configAccountId.valueOr:
|
||||
return err("gifter response missing configAccountId extension")
|
||||
|
||||
lezGm.credentials = some(idCred)
|
||||
lezGm.membershipIndex = some(onchain_group_manager.MembershipIndex(success.leafIndex))
|
||||
mix_lez_client.setRlnConfig(configAccountId, success.leafIndex.int)
|
||||
|
||||
info "Registered via RLN gifter",
|
||||
leafIndex = success.leafIndex,
|
||||
configAccount = configAccountId
|
||||
|
||||
# Correct the optimistic leaf via the status codec if a concurrent
|
||||
# registration tx beat ours to the slot. Self-verify drops bad proofs
|
||||
# until the poll loop picks up the corrected witness.
|
||||
let watcherLezGm = lezGm
|
||||
let watcherConfigAccount = configAccountId
|
||||
asyncSpawn gifterClient.watchMembershipConfirmation(
|
||||
gifterPeer, configAccountId, idCommitmentBytes, success.leafIndex,
|
||||
"Mix-node",
|
||||
proc(authLeaf: uint64) {.gcsafe, raises: [].} =
|
||||
if some(onchain_group_manager.MembershipIndex(authLeaf)) !=
|
||||
watcherLezGm.membershipIndex:
|
||||
watcherLezGm.membershipIndex =
|
||||
some(onchain_group_manager.MembershipIndex(authLeaf))
|
||||
mix_lez_client.setRlnConfig(watcherConfigAccount, authLeaf.int)
|
||||
watcherLezGm.markMembershipConfirmed(),
|
||||
)
|
||||
|
||||
# retrieve px peers and add the to the peer store
|
||||
if conf.remotePeerExchangeNode.isSome():
|
||||
var desiredOutDegree = DefaultPXNumPeersReq
|
||||
@ -464,6 +874,16 @@ proc startNode*(
|
||||
(await node.wakuKademlia.start(minMixPeers = minMixPeers)).isOkOr:
|
||||
return err("failed to start kademlia discovery: " & error)
|
||||
|
||||
# Re-publish gossipsub trigger after switch + kademlia are up. The dummy
|
||||
# publish in WakuMix.start() fires too early in LEZ mode (0 peers on topic),
|
||||
# so SUBSCRIBE messages never propagate without this second publish.
|
||||
if conf.mixConf.isSome() and conf.mixConf.get().useOnchainLEZ and
|
||||
not node.wakuMix.isNil():
|
||||
try:
|
||||
await node.wakuMix.publishGossipsubTrigger()
|
||||
except CatchableError:
|
||||
warn "gossipsub trigger publish failed", error = getCurrentExceptionMsg()
|
||||
|
||||
return ok()
|
||||
|
||||
proc setupNode*(
|
||||
|
||||
@ -53,6 +53,12 @@ type MixConf* = ref object
|
||||
mixnodes*: seq[MixNodePubInfo]
|
||||
userMessageLimit*: Option[int]
|
||||
disableSpamProtection*: bool
|
||||
useOnchainLEZ*: bool
|
||||
gifterService*: bool
|
||||
gifterWalletAccount*: string
|
||||
gifterNode*: string
|
||||
gifterAllowlist*: string
|
||||
gifterAuthKey*: string
|
||||
|
||||
type KademliaDiscoveryConf* = object
|
||||
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
|
||||
|
||||
@ -15,6 +15,7 @@ import
|
||||
waku_filter_v2/common as filter_common,
|
||||
waku_filter_v2/client as filter_client,
|
||||
waku_filter_v2/protocol as filter_protocol,
|
||||
waku_mix/protocol as mix_protocol,
|
||||
events/health_events,
|
||||
events/message_events,
|
||||
events/peer_events,
|
||||
|
||||
@ -41,6 +41,7 @@ import
|
||||
waku_store_sync,
|
||||
waku_filter_v2,
|
||||
waku_filter_v2/client as filter_client,
|
||||
waku_filter_v2/common as filter_common,
|
||||
waku_metadata,
|
||||
waku_rendezvous/protocol,
|
||||
waku_rendezvous/client as rendezvous_client,
|
||||
@ -52,6 +53,7 @@ import
|
||||
waku_enr,
|
||||
waku_peer_exchange,
|
||||
waku_rln_relay,
|
||||
waku_rln_relay/rln_gifter/protocol as rln_gifter_protocol,
|
||||
common/option_shims,
|
||||
common/rate_limit/setting,
|
||||
common/callbacks,
|
||||
@ -134,6 +136,12 @@ type
|
||||
## Kernel API Relay appHandlers (if any)
|
||||
subscriptionManager*: SubscriptionManager
|
||||
wakuMix*: WakuMix
|
||||
wakuRlnGifter*: rln_gifter_protocol.WakuRlnGifter
|
||||
## Set when gifterService=true. Exposed so startNode can invoke its
|
||||
## registerHandler for the gifter's self-registration.
|
||||
edgeTopicsHealth*: Table[PubsubTopic, TopicHealth]
|
||||
edgeHealthEvent*: AsyncEvent
|
||||
edgeHealthLoop: Future[void]
|
||||
kademliaDiscoveryLoop*: Future[void]
|
||||
wakuKademlia*: WakuKademlia
|
||||
ports*: BoundPorts
|
||||
@ -328,6 +336,7 @@ proc mountMix*(
|
||||
mixnodes: seq[MixNodePubInfo],
|
||||
userMessageLimit: Option[int] = none(int),
|
||||
disableSpamProtection: bool = false,
|
||||
useOnchainLEZ: bool = false,
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
info "mounting mix protocol", nodeId = node.info #TODO log the config used
|
||||
|
||||
@ -360,7 +369,7 @@ proc mountMix*(
|
||||
|
||||
node.wakuMix = WakuMix.new(
|
||||
localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes, publishMessage,
|
||||
userMessageLimit, disableSpamProtection,
|
||||
userMessageLimit, disableSpamProtection, useOnchainLEZ,
|
||||
).valueOr:
|
||||
error "Waku Mix protocol initialization failed", err = error
|
||||
return
|
||||
@ -528,6 +537,46 @@ proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string]
|
||||
|
||||
return ok()
|
||||
|
||||
const EdgeTopicHealthyThreshold = 2
|
||||
|
||||
proc calculateEdgeTopicHealth(node: WakuNode, shard: PubsubTopic): TopicHealth =
|
||||
let filterPeers =
|
||||
node.peerManager.getPeersForShard(filter_common.WakuFilterSubscribeCodec, shard)
|
||||
let lightpushPeers =
|
||||
node.peerManager.getPeersForShard(lightpush_protocol.WakuLightPushCodec, shard)
|
||||
|
||||
if filterPeers >= EdgeTopicHealthyThreshold and
|
||||
lightpushPeers >= EdgeTopicHealthyThreshold:
|
||||
return TopicHealth.SUFFICIENTLY_HEALTHY
|
||||
elif filterPeers > 0 and lightpushPeers > 0:
|
||||
return TopicHealth.MINIMALLY_HEALTHY
|
||||
|
||||
return TopicHealth.UNHEALTHY
|
||||
|
||||
proc loopEdgeHealth(node: WakuNode) {.async.} =
|
||||
while node.started:
|
||||
await node.edgeHealthEvent.wait()
|
||||
node.edgeHealthEvent.clear()
|
||||
|
||||
try:
|
||||
for shard in node.edgeTopicsHealth.keys:
|
||||
if not node.wakuRelay.isNil and node.wakuRelay.isSubscribed(shard):
|
||||
continue
|
||||
|
||||
let oldHealth = node.edgeTopicsHealth.getOrDefault(shard, TopicHealth.UNHEALTHY)
|
||||
let newHealth = node.calculateEdgeTopicHealth(shard)
|
||||
if newHealth != oldHealth:
|
||||
node.edgeTopicsHealth[shard] = newHealth
|
||||
EventShardTopicHealthChange.emit(node.brokerCtx, shard, newHealth)
|
||||
except CancelledError:
|
||||
break
|
||||
except CatchableError as e:
|
||||
# KEEP: async health loop must survive transient peer-manager errors.
|
||||
warn "Error in edge health check", error = e.msg
|
||||
|
||||
# safety cooldown to protect from edge cases
|
||||
await sleepAsync(100.milliseconds)
|
||||
|
||||
proc startProvidersAndListeners*(node: WakuNode) =
|
||||
RequestRelayShard.setProvider(
|
||||
node.brokerCtx,
|
||||
@ -616,6 +665,16 @@ proc start*(node: WakuNode) {.async.} =
|
||||
if isBindIpWithZeroPort(address):
|
||||
zeroPortPresent = true
|
||||
|
||||
# Perform relay-specific startup tasks TODO: this should be rethought
|
||||
if not node.wakuRelay.isNil():
|
||||
await node.wakuRelay.start()
|
||||
|
||||
if not node.wakuMix.isNil():
|
||||
await node.wakuMix.start()
|
||||
|
||||
if not node.wakuMetadata.isNil():
|
||||
await node.wakuMetadata.start()
|
||||
|
||||
if not node.wakuStoreResume.isNil():
|
||||
await node.wakuStoreResume.start()
|
||||
|
||||
@ -683,6 +742,7 @@ proc stop*(node: WakuNode) {.async.} =
|
||||
try:
|
||||
await node.wakuRlnRelay.stop() ## this can raise an exception
|
||||
except Exception:
|
||||
# KEEP: shutdown path; continue tearing down other subsystems regardless.
|
||||
error "exception stopping the node", error = getCurrentExceptionMsg()
|
||||
|
||||
if not node.wakuArchive.isNil():
|
||||
|
||||
@ -10,3 +10,8 @@ const
|
||||
WakuMetadataCodec* = "/vac/waku/metadata/1.0.0"
|
||||
WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1"
|
||||
WakuRendezVousCodec* = "/vac/waku/rendezvous/1.0.0"
|
||||
WakuRlnGifterCodec* = "/logos/rln/membership/1.0.0"
|
||||
# Separate codec so clients can poll registration status on short-lived
|
||||
# streams instead of holding the original register stream open past the
|
||||
# libp2p timeout.
|
||||
WakuRlnGifterStatusCodec* = "/logos/rln/membership/status/1.0.0"
|
||||
|
||||
395
logos_delivery/waku/waku_mix/logos_core_client.nim
Normal file
395
logos_delivery/waku/waku_mix/logos_core_client.nim
Normal file
@ -0,0 +1,395 @@
|
||||
{.push raises: [].}
|
||||
|
||||
## Mix RLN client: fetches roots/proofs from logos-core via the C++ RLN module.
|
||||
## The C++ delivery module registers an RLN fetcher at startup; event-push
|
||||
## caching avoids round-trips on the hot path.
|
||||
|
||||
import std/[json, strutils, locks, algorithm, options]
|
||||
import chronos, chronos/threadsync
|
||||
import results
|
||||
import chronicles
|
||||
import mix_rln_spam_protection/group_manager {.all.}
|
||||
import mix_rln_spam_protection/types {.all.}
|
||||
import mix_rln_spam_protection/rln_interface
|
||||
import mix_rln_spam_protection/onchain_group_manager
|
||||
|
||||
export onchain_group_manager.ExternalMerkleProof
|
||||
|
||||
logScope:
|
||||
topics = "waku mix rln-lez-client"
|
||||
|
||||
type
|
||||
FetchLatestRootsCallback* =
|
||||
proc(): Future[Result[seq[MerkleNode], string]] {.gcsafe, raises: [].}
|
||||
FetchMerkleProofCallback* =
|
||||
proc(index: MembershipIndex): Future[Result[ExternalMerkleProof, string]] {.gcsafe, raises: [].}
|
||||
|
||||
type
|
||||
RlnFetchCallback* = proc(callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer) {.cdecl, gcsafe, raises: [].}
|
||||
RlnFetcherFunc* = proc(
|
||||
methodName: cstring, params: cstring,
|
||||
callback: RlnFetchCallback,
|
||||
callbackData: pointer, fetcherData: pointer
|
||||
): cint {.cdecl, gcsafe, raises: [].}
|
||||
|
||||
const RLN_CONFIG_ACCOUNT_ID_CAP = 64
|
||||
## Max bytes for the base58-encoded config account ID (real values ≤44
|
||||
## chars). Sized as a value-type buffer so cross-thread access doesn't
|
||||
## go through Nim's heap/GC — see rlnConfigAccountIdBuf below.
|
||||
|
||||
var
|
||||
rlnFetcherLock: Lock
|
||||
rlnFetcher: RlnFetcherFunc
|
||||
rlnFetcherData: pointer
|
||||
## Fixed-size byte buffer + length, not a Nim string: the FFI worker
|
||||
## thread reads concurrently with main-thread writes, and a heap string
|
||||
## here SIGSEGVs under testnet load (GC collects the old value mid-read
|
||||
## during the 200-500ms HTTPS fetchRoots awaits).
|
||||
rlnConfigAccountIdBuf: array[RLN_CONFIG_ACCOUNT_ID_CAP, char]
|
||||
rlnConfigAccountIdLen: int = 0
|
||||
rlnLeafIndex: int = -1
|
||||
rlnIdentitySecretHash: string
|
||||
rlnGroupManager: pointer
|
||||
## OnchainLEZGroupManager ref erased to `pointer` so this lock-protected
|
||||
## global is not tracked by Nim's GC: setGroupManagerRef may be invoked
|
||||
## from a thread distinct from the one that later runs setRlnIdentity,
|
||||
## and storing a ref in a cross-thread global risks GC interference.
|
||||
## Cast back to OnchainLEZGroupManager in setRlnIdentity to attach credentials.
|
||||
cachedRootsJson: string
|
||||
cachedProofJson: string
|
||||
|
||||
rlnFetcherLock.initLock()
|
||||
|
||||
proc setRlnFetcher*(fetcher: RlnFetcherFunc, fetcherData: pointer) {.gcsafe.} =
|
||||
{.gcsafe.}:
|
||||
rlnFetcherLock.acquire()
|
||||
rlnFetcher = fetcher
|
||||
rlnFetcherData = fetcherData
|
||||
rlnFetcherLock.release()
|
||||
|
||||
proc setRlnConfig*(configAccountId: string, leafIndex: int) {.gcsafe.} =
|
||||
{.gcsafe.}:
|
||||
rlnFetcherLock.acquire()
|
||||
let n = min(configAccountId.len, RLN_CONFIG_ACCOUNT_ID_CAP)
|
||||
for i in 0 ..< n:
|
||||
rlnConfigAccountIdBuf[i] = configAccountId[i]
|
||||
rlnConfigAccountIdLen = n
|
||||
rlnLeafIndex = leafIndex
|
||||
rlnFetcherLock.release()
|
||||
|
||||
proc setGroupManagerRef*(lezGm: OnchainLEZGroupManager) {.gcsafe.} =
|
||||
## Store the group manager so setRlnIdentity can attach credentials later.
|
||||
## Erases the typed ref to `pointer` internally so the lock-protected global
|
||||
## stays outside Nim's cross-thread GC tracking (see rlnGroupManager).
|
||||
{.gcsafe.}:
|
||||
rlnFetcherLock.acquire()
|
||||
rlnGroupManager = cast[pointer](lezGm)
|
||||
rlnFetcherLock.release()
|
||||
|
||||
proc setRlnIdentity*(idSecretHashHex: string) {.gcsafe.} =
|
||||
## Regenerate the full credential via membershipKeyGen(seed=idSecretHash)
|
||||
## and attach it to the group manager. Called by the C++ plugin after
|
||||
## selfRegisterRln completes.
|
||||
{.gcsafe.}:
|
||||
rlnFetcherLock.acquire()
|
||||
rlnIdentitySecretHash = idSecretHashHex
|
||||
let gm = rlnGroupManager
|
||||
let leafIdx = rlnLeafIndex
|
||||
rlnFetcherLock.release()
|
||||
|
||||
trace "Set mix RLN identity", hashPrefix = idSecretHashHex[0 .. min(7, idSecretHashHex.len - 1)]
|
||||
|
||||
if not gm.isNil and idSecretHashHex.len == 64:
|
||||
var seedBytes: seq[byte]
|
||||
for i in 0 ..< 32:
|
||||
try:
|
||||
seedBytes.add(byte(parseHexInt(idSecretHashHex[i * 2 .. i * 2 + 1])))
|
||||
except ValueError:
|
||||
warn "Invalid hex in identity hash"
|
||||
return
|
||||
|
||||
# Generate full credential using the same seed that selfRegisterRln used
|
||||
# via generate_identity. membershipKeyGen(seed) produces deterministic output.
|
||||
let cred = membershipKeyGen(seedBytes).valueOr:
|
||||
warn "Failed to regenerate full credential from seed", error = $error
|
||||
return
|
||||
|
||||
let gmRef = cast[OnchainLEZGroupManager](gm)
|
||||
gmRef.credentials = some(cred)
|
||||
if leafIdx >= 0:
|
||||
gmRef.membershipIndex = some(types.MembershipIndex(leafIdx))
|
||||
info "Set full RLN identity on group manager",
|
||||
leafIndex = leafIdx,
|
||||
commitment = cred.idCommitment[0 .. 7].toHex() & "..."
|
||||
|
||||
proc getRlnIdentity*(): string {.gcsafe.} =
|
||||
{.gcsafe.}:
|
||||
rlnFetcherLock.acquire()
|
||||
result = rlnIdentitySecretHash
|
||||
rlnFetcherLock.release()
|
||||
|
||||
proc getRlnConfig*(): (string, int) {.gcsafe.} =
|
||||
{.gcsafe.}:
|
||||
rlnFetcherLock.acquire()
|
||||
let n = rlnConfigAccountIdLen
|
||||
let leafIdx = rlnLeafIndex
|
||||
var s = newString(n)
|
||||
for i in 0 ..< n:
|
||||
s[i] = rlnConfigAccountIdBuf[i]
|
||||
rlnFetcherLock.release()
|
||||
result = (s, leafIdx)
|
||||
|
||||
proc pushRoots*(rootsJson: string) {.gcsafe.} =
|
||||
{.gcsafe.}:
|
||||
rlnFetcherLock.acquire()
|
||||
cachedRootsJson = rootsJson
|
||||
rlnFetcherLock.release()
|
||||
trace "Received roots via event push", len = rootsJson.len
|
||||
|
||||
proc pushProof*(proofJson: string) {.gcsafe.} =
|
||||
{.gcsafe.}:
|
||||
rlnFetcherLock.acquire()
|
||||
cachedProofJson = proofJson
|
||||
rlnFetcherLock.release()
|
||||
trace "Received proof via event push", len = proofJson.len
|
||||
|
||||
proc getCachedRoots(): string {.gcsafe.} =
|
||||
{.gcsafe.}:
|
||||
rlnFetcherLock.acquire()
|
||||
result = cachedRootsJson
|
||||
rlnFetcherLock.release()
|
||||
|
||||
proc getCachedProof(): string {.gcsafe.} =
|
||||
{.gcsafe.}:
|
||||
rlnFetcherLock.acquire()
|
||||
result = cachedProofJson
|
||||
rlnFetcherLock.release()
|
||||
|
||||
type FetchResult = object
|
||||
json: string
|
||||
errMsg: string
|
||||
success: bool
|
||||
|
||||
proc callRlnFetcher*(methodName: string, params: string): Result[string, string] {.gcsafe.} =
|
||||
{.gcsafe.}:
|
||||
rlnFetcherLock.acquire()
|
||||
let fetcher = rlnFetcher
|
||||
let data = rlnFetcherData
|
||||
rlnFetcherLock.release()
|
||||
|
||||
if fetcher.isNil:
|
||||
return err("RLN fetcher not registered")
|
||||
|
||||
var fetchResult: FetchResult
|
||||
|
||||
let cb: RlnFetchCallback = proc(callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer) {.cdecl, gcsafe, raises: [].} =
|
||||
let res = cast[ptr FetchResult](userData)
|
||||
if callerRet == 0 and not msg.isNil and len > 0:
|
||||
res[].json = newString(len.int)
|
||||
copyMem(addr res[].json[0], msg, len.int)
|
||||
res[].success = true
|
||||
elif not msg.isNil and len > 0:
|
||||
res[].errMsg = newString(len.int)
|
||||
copyMem(addr res[].errMsg[0], msg, len.int)
|
||||
res[].success = false
|
||||
else:
|
||||
res[].success = (callerRet == 0)
|
||||
|
||||
let ret = fetcher(methodName.cstring, params.cstring, cb, addr fetchResult, data)
|
||||
if ret != 0 or not fetchResult.success:
|
||||
if fetchResult.errMsg.len > 0:
|
||||
return err(fetchResult.errMsg)
|
||||
return err("RLN fetcher returned error code: " & $ret)
|
||||
if fetchResult.json.len == 0:
|
||||
return err("RLN fetcher returned empty response")
|
||||
return ok(fetchResult.json)
|
||||
|
||||
type ThreadArgs = object
|
||||
fetcher: RlnFetcherFunc
|
||||
fetcherData: pointer
|
||||
methodBuf: cstring
|
||||
paramsBuf: cstring
|
||||
res: ptr FetchResult
|
||||
sig: ThreadSignalPtr
|
||||
|
||||
proc fetcherThreadBody(args: ThreadArgs) {.thread.} =
|
||||
let cb: RlnFetchCallback = proc(callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer) {.cdecl, gcsafe, raises: [].} =
|
||||
let r = cast[ptr FetchResult](userData)
|
||||
if callerRet == 0 and not msg.isNil and len > 0:
|
||||
r[].json = newString(len.int)
|
||||
copyMem(addr r[].json[0], msg, len.int)
|
||||
r[].success = true
|
||||
elif not msg.isNil and len > 0:
|
||||
r[].errMsg = newString(len.int)
|
||||
copyMem(addr r[].errMsg[0], msg, len.int)
|
||||
r[].success = false
|
||||
else:
|
||||
r[].success = (callerRet == 0)
|
||||
|
||||
discard args.fetcher(args.methodBuf, args.paramsBuf, cb, args.res, args.fetcherData)
|
||||
discard args.sig.fireSync()
|
||||
|
||||
proc callRlnFetcherAsync*(methodName: string, params: string): Future[Result[string, string]] {.async.} =
|
||||
## Runs the fetcher on a dedicated thread so HTTPS blocking calls don't
|
||||
## stall the chronos event loop.
|
||||
{.gcsafe.}:
|
||||
rlnFetcherLock.acquire()
|
||||
let fetcher = rlnFetcher
|
||||
let data = rlnFetcherData
|
||||
rlnFetcherLock.release()
|
||||
|
||||
if fetcher.isNil:
|
||||
return err("RLN fetcher not registered")
|
||||
|
||||
let signal = ThreadSignalPtr.new().valueOr:
|
||||
return err("failed to create thread signal")
|
||||
defer:
|
||||
discard signal.close()
|
||||
|
||||
# Stable cross-thread copies — `methodName`/`params` live on the GC heap.
|
||||
# IMPORTANT: guard the unsafeAddr deref — `unsafeAddr s[0]` on an empty
|
||||
# Nim string is UB (the backing buffer can be nil), and SIGSEGVs at the
|
||||
# polling layer (e.g. get_valid_roots with no config yet). allocShared0
|
||||
# already zero-fills, so the copy is safely no-op when len is 0.
|
||||
var methodCopy = allocShared0(methodName.len + 1)
|
||||
var paramsCopy = allocShared0(params.len + 1)
|
||||
if methodName.len > 0:
|
||||
copyMem(methodCopy, unsafeAddr methodName[0], methodName.len)
|
||||
if params.len > 0:
|
||||
copyMem(paramsCopy, unsafeAddr params[0], params.len)
|
||||
defer:
|
||||
deallocShared(methodCopy)
|
||||
deallocShared(paramsCopy)
|
||||
|
||||
var fetchRes: FetchResult
|
||||
var thread: Thread[ThreadArgs]
|
||||
|
||||
createThread(thread, fetcherThreadBody,
|
||||
ThreadArgs(
|
||||
fetcher: fetcher,
|
||||
fetcherData: data,
|
||||
methodBuf: cast[cstring](methodCopy),
|
||||
paramsBuf: cast[cstring](paramsCopy),
|
||||
res: addr fetchRes,
|
||||
sig: signal,
|
||||
))
|
||||
|
||||
await signal.wait()
|
||||
joinThread(thread)
|
||||
|
||||
if not fetchRes.success:
|
||||
if fetchRes.errMsg.len > 0:
|
||||
return err(fetchRes.errMsg)
|
||||
return err("RLN fetcher async call failed")
|
||||
if fetchRes.json.len == 0:
|
||||
return err("RLN fetcher returned empty response")
|
||||
return ok(fetchRes.json)
|
||||
|
||||
proc hexToBytes32(hex: string): Result[array[32, byte], string] =
|
||||
var h = hex
|
||||
if h.startsWith("0x") or h.startsWith("0X"):
|
||||
h = h[2 .. ^1]
|
||||
if h.len != 64:
|
||||
return err("Expected 64 hex chars, got " & $h.len)
|
||||
var output: array[32, byte]
|
||||
for i in 0 ..< 32:
|
||||
try:
|
||||
output[i] = byte(parseHexInt(h[i * 2 .. i * 2 + 1]))
|
||||
except ValueError:
|
||||
return err("Invalid hex at position " & $i)
|
||||
ok(output)
|
||||
|
||||
proc hexToBytes32LE(hex: string): Result[array[32, byte], string] =
|
||||
## Parse hex string to bytes in little-endian order (for zerokit field elements).
|
||||
## LEZ/Ethereum return big-endian hex; zerokit expects LE internally.
|
||||
var res = hexToBytes32(hex).valueOr:
|
||||
return err(error)
|
||||
var reversed: array[32, byte]
|
||||
for i in 0 ..< 32:
|
||||
reversed[i] = res[31 - i]
|
||||
ok(reversed)
|
||||
|
||||
proc parseRootsJson*(snapshot: string): Result[seq[MerkleNode], string] =
|
||||
if snapshot.len == 0:
|
||||
return err("No roots data")
|
||||
try:
|
||||
let parsed = parseJson(snapshot)
|
||||
var roots: seq[MerkleNode]
|
||||
for elem in parsed:
|
||||
let root = hexToBytes32(elem.getStr()).valueOr:
|
||||
return err("Invalid root hex: " & error)
|
||||
roots.add(MerkleNode(root))
|
||||
return ok(roots)
|
||||
except CatchableError as e:
|
||||
return err("Failed to parse roots: " & e.msg)
|
||||
|
||||
proc parseExternalProof(snapshot: string): Result[ExternalMerkleProof, string] =
|
||||
if snapshot.len == 0:
|
||||
return err("No merkle proof data")
|
||||
try:
|
||||
let parsed = parseJson(snapshot)
|
||||
let root = hexToBytes32(parsed["root"].getStr()).valueOr:
|
||||
return err("Invalid root hex: " & error)
|
||||
var pathElements: seq[byte]
|
||||
for elem in parsed["path_elements"]:
|
||||
let elemBytes = hexToBytes32(elem.getStr()).valueOr:
|
||||
return err("Invalid pathElement hex: " & error)
|
||||
for b in elemBytes:
|
||||
pathElements.add(b)
|
||||
var identityPathIndex: seq[byte]
|
||||
for idx in parsed["path_indices"]:
|
||||
identityPathIndex.add(byte(idx.getInt()))
|
||||
var validRoots: seq[MerkleNode]
|
||||
if parsed.hasKey("valid_roots"):
|
||||
for r in parsed["valid_roots"]:
|
||||
let rb = hexToBytes32(r.getStr()).valueOr:
|
||||
continue
|
||||
validRoots.add(MerkleNode(rb))
|
||||
ok(ExternalMerkleProof(
|
||||
pathElements: pathElements,
|
||||
identityPathIndex: identityPathIndex,
|
||||
root: MerkleNode(root),
|
||||
validRoots: validRoots,
|
||||
))
|
||||
except CatchableError as e:
|
||||
err("Failed to parse proof: " & e.msg)
|
||||
|
||||
proc makeFetchLatestRoots*(): FetchLatestRootsCallback =
|
||||
return proc(): Future[Result[seq[MerkleNode], string]] {.async, gcsafe, raises: [].} =
|
||||
let cached = getCachedRoots()
|
||||
if cached.len > 0:
|
||||
let res = parseRootsJson(cached)
|
||||
if res.isOk:
|
||||
trace "Using cached roots from event push", count = res.get().len
|
||||
return res
|
||||
let (configAccount, _) = getRlnConfig()
|
||||
if configAccount.len == 0:
|
||||
return err("RLN config not set")
|
||||
let rootsJson = callRlnFetcher("get_valid_roots", configAccount)
|
||||
if rootsJson.isErr:
|
||||
return err(rootsJson.error)
|
||||
let res = parseRootsJson(rootsJson.get())
|
||||
if res.isOk:
|
||||
trace "Fetched roots from RLN module via fetcher", count = res.get().len
|
||||
return res
|
||||
|
||||
proc makeFetchMerkleProof*(): FetchMerkleProofCallback =
|
||||
return proc(
|
||||
index: MembershipIndex
|
||||
): Future[Result[ExternalMerkleProof, string]] {.async, gcsafe, raises: [].} =
|
||||
let cached = getCachedProof()
|
||||
if cached.len > 0:
|
||||
let res = parseExternalProof(cached)
|
||||
if res.isOk:
|
||||
trace "Using cached proof from event push", index = index
|
||||
return res
|
||||
let (configAccount, _) = getRlnConfig()
|
||||
if configAccount.len == 0:
|
||||
return err("RLN config not set")
|
||||
let params = configAccount & "," & $index
|
||||
let proofJson = callRlnFetcher("get_merkle_proofs", params)
|
||||
if proofJson.isErr:
|
||||
return err(proofJson.error)
|
||||
return parseExternalProof(proofJson.get())
|
||||
|
||||
{.pop.}
|
||||
@ -110,6 +110,7 @@ proc new*(
|
||||
publishMessage: PublishMessage,
|
||||
userMessageLimit: Option[int] = none(int),
|
||||
disableSpamProtection: bool = false,
|
||||
useOnchainLEZ: bool = false,
|
||||
): WakuMixResult[T] =
|
||||
let mixPubKey = public(mixPrivKey)
|
||||
trace "mixPubKey", mixPubKey = mixPubKey
|
||||
@ -128,9 +129,12 @@ proc new*(
|
||||
|
||||
var spamProtectionOpt = default(Opt[SpamProtection])
|
||||
if not disableSpamProtection:
|
||||
# Initialize spam protection with persistent credentials
|
||||
# Initialize spam protection with persistent credentials. In LEZ mode,
|
||||
# roots/proofs are fetched from the on-chain tree via the RLN module;
|
||||
# only the identity is local.
|
||||
let peerId = peermgr.switch.peerInfo.peerId
|
||||
var spamProtectionConfig = defaultConfig()
|
||||
spamProtectionConfig.useOnchainLEZ = useOnchainLEZ
|
||||
spamProtectionConfig.keystorePath = "rln_keystore_" & $peerId & ".json"
|
||||
spamProtectionConfig.keystorePassword = "mix-rln-password"
|
||||
if userMessageLimit.isSome():
|
||||
@ -385,6 +389,24 @@ proc registerDoSProtectionWithNetwork*(mix: WakuMix) =
|
||||
return
|
||||
mix.dosRegistrationTask = mix.dosRegistrationRetryLoop()
|
||||
|
||||
proc publishGossipsubTrigger*(mix: WakuMix) {.async.} =
|
||||
## Publish dummy messages to spam protection topics to trigger gossipsub
|
||||
## SUBSCRIBE flow and kademlia peer exchange. Call AFTER the node is fully
|
||||
## started (switch running, peers connected) so the messages actually reach
|
||||
## gossipsub mesh peers. In off-chain mode the side effect of registerSelf()
|
||||
## broadcasting already triggers SUBSCRIBE; in LEZ mode registerSelf() doesn't
|
||||
## fire (membership comes from on-chain) so we publish explicitly.
|
||||
if mix.publishMessage.isNil or mix.mixRlnSpamProtection.isNil:
|
||||
return
|
||||
let spTopics = mix.getSpamProtectionContentTopics()
|
||||
for ct in spTopics:
|
||||
let msg = WakuMessage(contentTopic: ct, payload: @[byte(0)])
|
||||
let pubRes = await mix.publishMessage(msg)
|
||||
if pubRes.isErr:
|
||||
debug "gossipsub trigger failed (expected if no peers yet)", contentTopic = ct
|
||||
else:
|
||||
info "Published gossipsub trigger (post-start)", contentTopic = ct
|
||||
|
||||
method stop*(mix: WakuMix) {.async.} =
|
||||
# Cancel the in-flight DoS-protection registration retry loop, if any
|
||||
if not mix.dosRegistrationTask.isNil and not mix.dosRegistrationTask.finished:
|
||||
|
||||
176
logos_delivery/waku/waku_rln_relay/rln_gifter/client.nim
Normal file
176
logos_delivery/waku/waku_rln_relay/rln_gifter/client.nim
Normal file
@ -0,0 +1,176 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/options, results, chronicles, chronos, bearssl/rand
|
||||
import libp2p/stream/connection
|
||||
import
|
||||
../../node/peer_manager,
|
||||
../../waku_core,
|
||||
../../utils/requests,
|
||||
./rpc,
|
||||
./rpc_codec
|
||||
|
||||
logScope:
|
||||
topics = "waku rln-gifter client"
|
||||
|
||||
type
|
||||
RlnGifterResult* = Result[MembershipAllocationSuccess, string]
|
||||
|
||||
WakuRlnGifterClient* = ref object
|
||||
rng*: ref rand.HmacDrbgContext
|
||||
peerManager*: PeerManager
|
||||
|
||||
proc new*(
|
||||
T: type WakuRlnGifterClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext
|
||||
): T =
|
||||
WakuRlnGifterClient(peerManager: peerManager, rng: rng)
|
||||
|
||||
proc requestMembership*(
|
||||
wc: WakuRlnGifterClient,
|
||||
identityCommitment: seq[byte],
|
||||
rateLimit: Option[uint64],
|
||||
peer: RemotePeerInfo,
|
||||
authenticationType: seq[byte] = @[],
|
||||
authenticationPayload: seq[byte] = @[],
|
||||
): Future[RlnGifterResult] {.async.} =
|
||||
let request = RlnGifterRequest(
|
||||
requestId: generateRequestId(wc.rng),
|
||||
authenticationType: authenticationType,
|
||||
authenticationPayload: authenticationPayload,
|
||||
identityCommitment: identityCommitment,
|
||||
rateLimit: rateLimit,
|
||||
)
|
||||
|
||||
info "requesting RLN membership from gifter",
|
||||
requestId = request.requestId,
|
||||
identityCommitmentLen = identityCommitment.len
|
||||
|
||||
# Retry dial with backoff (gifter node may still be initializing)
|
||||
var connection: Connection
|
||||
var dialAttempts = 0
|
||||
while true:
|
||||
let connOpt = await wc.peerManager.dialPeer(peer, WakuRlnGifterCodec)
|
||||
if connOpt.isSome:
|
||||
connection = connOpt.get()
|
||||
break
|
||||
dialAttempts += 1
|
||||
if dialAttempts >= 5:
|
||||
return err("failed to dial gifter peer after " & $dialAttempts & " attempts")
|
||||
warn "gifter dial failed, retrying", attempt = dialAttempts
|
||||
await sleepAsync(seconds(5))
|
||||
|
||||
try:
|
||||
await connection.writeLP(request.encode().buffer)
|
||||
except LPStreamError:
|
||||
return err("failed to write request: " & getCurrentExceptionMsg())
|
||||
|
||||
var buffer: seq[byte]
|
||||
try:
|
||||
buffer = await connection.readLp(DefaultMaxRpcSize)
|
||||
except LPStreamError:
|
||||
return err("failed to read response: " & getCurrentExceptionMsg())
|
||||
|
||||
# Do NOT close the connection here. Let it leak and be cleaned up by GC.
|
||||
# Calling closeWithEOF triggers yamux cleanup that crashes the delivery module
|
||||
# process when the FFI boundary returns to C++ before yamux completes.
|
||||
|
||||
let response = RlnGifterResponse.decode(buffer).valueOr:
|
||||
return err("failed to decode response: " & $error)
|
||||
|
||||
if response.requestId != request.requestId:
|
||||
return err("requestId mismatch")
|
||||
|
||||
if not response.authSuccess:
|
||||
let desc = response.error.get(
|
||||
if response.failure.isSome: response.failure.get().errorMessage
|
||||
else: "authentication failed"
|
||||
)
|
||||
return err("authentication failed: " & desc)
|
||||
|
||||
if response.failure.isSome:
|
||||
return err("registration failed: " & response.failure.get().errorMessage)
|
||||
|
||||
let success = response.success.valueOr:
|
||||
return err("response missing success/failure result")
|
||||
|
||||
info "RLN membership granted", leafIndex = success.leafIndex
|
||||
|
||||
return ok(success)
|
||||
|
||||
proc queryMembershipStatus*(
|
||||
wc: WakuRlnGifterClient,
|
||||
peer: RemotePeerInfo,
|
||||
configAccountId: string,
|
||||
identityCommitment: seq[byte],
|
||||
): Future[Result[MembershipStatusResponse, string]] {.async.} =
|
||||
## Short-lived RPC: each call opens its own stream so pollers don't hold
|
||||
## a stream open past the libp2p timeout.
|
||||
let connOpt = await wc.peerManager.dialPeer(peer, WakuRlnGifterStatusCodec)
|
||||
if connOpt.isNone:
|
||||
return err("failed to dial gifter status peer")
|
||||
let connection = connOpt.get()
|
||||
|
||||
let req = MembershipStatusRequest(
|
||||
configAccountId: configAccountId,
|
||||
identityCommitment: identityCommitment,
|
||||
)
|
||||
try:
|
||||
await connection.writeLP(req.encode().buffer)
|
||||
except LPStreamError:
|
||||
return err("failed to write status request: " & getCurrentExceptionMsg())
|
||||
|
||||
var buffer: seq[byte]
|
||||
try:
|
||||
buffer = await connection.readLp(DefaultMaxRpcSize)
|
||||
except LPStreamError:
|
||||
return err("failed to read status response: " & getCurrentExceptionMsg())
|
||||
|
||||
let resp = MembershipStatusResponse.decode(buffer).valueOr:
|
||||
return err("failed to decode status response: " & $error)
|
||||
|
||||
return ok(resp)
|
||||
|
||||
proc watchMembershipConfirmation*(
|
||||
wc: WakuRlnGifterClient,
|
||||
peer: RemotePeerInfo,
|
||||
configAccountId: string,
|
||||
identityCommitment: seq[byte],
|
||||
optimisticLeaf: uint64,
|
||||
label: string,
|
||||
onConfirmed: proc(authLeaf: uint64) {.gcsafe, raises: [].},
|
||||
): Future[void] {.async.} =
|
||||
## Poll the status codec until the membership PDA exists, then call
|
||||
## `onConfirmed` with the authoritative leaf — whether or not it differs
|
||||
## from `optimisticLeaf`. Returns on first confirmation or after the
|
||||
## deadline elapses.
|
||||
const pollEveryMs = 30_000
|
||||
const deadlineMs = 1_800_000
|
||||
let deadline = Moment.now() + chronos.milliseconds(deadlineMs)
|
||||
while Moment.now() < deadline:
|
||||
try:
|
||||
await sleepAsync(chronos.milliseconds(pollEveryMs))
|
||||
except CancelledError:
|
||||
return
|
||||
let qr =
|
||||
try:
|
||||
await wc.queryMembershipStatus(peer, configAccountId, identityCommitment)
|
||||
except CancelledError:
|
||||
return
|
||||
except CatchableError as e:
|
||||
Result[MembershipStatusResponse, string].err(
|
||||
"queryMembershipStatus raised: " & e.msg)
|
||||
if qr.isErr: continue
|
||||
let resp = qr.get()
|
||||
if resp.errorMessage.isSome: continue
|
||||
if not resp.registered: continue
|
||||
if resp.leafIndex.isSome:
|
||||
let authLeaf = resp.leafIndex.get()
|
||||
if authLeaf != optimisticLeaf:
|
||||
info "membership leaf corrected from optimistic",
|
||||
label = label, optimistic = optimisticLeaf, authoritative = authLeaf
|
||||
else:
|
||||
info "membership confirmed on-chain",
|
||||
label = label, leafIndex = authLeaf
|
||||
onConfirmed(authLeaf)
|
||||
return
|
||||
warn "membership confirmation timed out",
|
||||
label = label, optimisticLeaf = optimisticLeaf
|
||||
249
logos_delivery/waku/waku_rln_relay/rln_gifter/protocol.nim
Normal file
249
logos_delivery/waku/waku_rln_relay/rln_gifter/protocol.nim
Normal file
@ -0,0 +1,249 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[options, sets],
|
||||
results,
|
||||
chronicles,
|
||||
chronos,
|
||||
bearssl/rand,
|
||||
eth/common/[addresses, keys]
|
||||
import
|
||||
../../node/peer_manager/peer_manager,
|
||||
../../waku_core,
|
||||
./rpc,
|
||||
./rpc_codec
|
||||
export rpc
|
||||
|
||||
logScope:
|
||||
topics = "waku rln-gifter"
|
||||
|
||||
type
|
||||
RegisterMemberHandler* = proc(
|
||||
identityCommitment: seq[byte], rateLimit: uint64
|
||||
): Future[Result[MembershipAllocationSuccess, string]] {.async, gcsafe.}
|
||||
|
||||
MembershipStatusHandler* = proc(
|
||||
configAccountId: string, identityCommitment: seq[byte]
|
||||
): Future[Result[MembershipStatusResponse, string]] {.async, gcsafe.}
|
||||
|
||||
EthAllowlistAuth* = ref object
|
||||
addresses*: HashSet[Address]
|
||||
consumed*: HashSet[Address]
|
||||
|
||||
WakuRlnGifter* = ref object of LPProtocol
|
||||
rng*: ref rand.HmacDrbgContext
|
||||
peerManager*: PeerManager
|
||||
registerHandler*: RegisterMemberHandler
|
||||
statusHandler*: MembershipStatusHandler
|
||||
auth*: Option[EthAllowlistAuth]
|
||||
|
||||
WakuRlnGifterStatus* = ref object of LPProtocol
|
||||
statusHandler*: MembershipStatusHandler
|
||||
|
||||
proc toHexLower(b: openArray[byte]): string =
|
||||
result = newStringOfCap(b.len * 2)
|
||||
const digits = "0123456789abcdef"
|
||||
for x in b:
|
||||
result.add(digits[int(x shr 4)])
|
||||
result.add(digits[int(x and 0x0f)])
|
||||
|
||||
proc eip191Message*(idCommitment: openArray[byte]): seq[byte] =
|
||||
## The EIP-191 personal_sign envelope wraps the lowercase hex representation
|
||||
## of the 32-byte identity commitment. Hex is used (rather than raw bytes)
|
||||
## so the signed message is human-readable in wallets that surface it.
|
||||
let hex = toHexLower(idCommitment)
|
||||
let prefix = "\x19Ethereum Signed Message:\n" & $hex.len
|
||||
result = newSeqOfCap[byte](prefix.len + hex.len)
|
||||
for c in prefix:
|
||||
result.add(byte(c))
|
||||
for c in hex:
|
||||
result.add(byte(c))
|
||||
|
||||
proc verifyEip191*(
|
||||
idCommitment: openArray[byte], sigBytes: openArray[byte]
|
||||
): Result[Address, string] =
|
||||
if sigBytes.len != 65:
|
||||
return err("signature must be 65 bytes, got " & $sigBytes.len)
|
||||
let sig = Signature.fromRaw(sigBytes).valueOr:
|
||||
return err("invalid signature encoding: " & $error)
|
||||
let pub = sig.recover(eip191Message(idCommitment)).valueOr:
|
||||
return err("signature recovery failed: " & $error)
|
||||
ok(pub.to(Address))
|
||||
|
||||
proc failureResponse(
|
||||
requestId: string, authSuccess: bool, message: string
|
||||
): RlnGifterResponse =
|
||||
RlnGifterResponse(
|
||||
requestId: requestId,
|
||||
authSuccess: authSuccess,
|
||||
error: some(message),
|
||||
failure: some(MembershipAllocationFailure(errorMessage: message)),
|
||||
)
|
||||
|
||||
proc handleRequest(
|
||||
wg: WakuRlnGifter, peerId: PeerId, buffer: seq[byte]
|
||||
): Future[RlnGifterResponse] {.async.} =
|
||||
let request = RlnGifterRequest.decode(buffer).valueOr:
|
||||
error "failed to decode RLN gifter request", error = $error
|
||||
return failureResponse("N/A", false, "decode error: " & $error)
|
||||
|
||||
info "handling RLN gifter request",
|
||||
peerId = peerId,
|
||||
requestId = request.requestId,
|
||||
identityCommitment = toHexLower(request.identityCommitment)[0 .. min(15, request.identityCommitment.len * 2 - 1)] & "..."
|
||||
|
||||
if request.identityCommitment.len != 32:
|
||||
return failureResponse(
|
||||
request.requestId, true, "identity_commitment must be 32 bytes"
|
||||
)
|
||||
|
||||
var authorizedSigner: Option[Address]
|
||||
if wg.auth.isSome:
|
||||
let auth = wg.auth.get()
|
||||
let authType =
|
||||
block:
|
||||
var s = newStringOfCap(request.authenticationType.len)
|
||||
for b in request.authenticationType: s.add(char(b))
|
||||
s
|
||||
if authType != EthAllowlistAuthType:
|
||||
return failureResponse(
|
||||
request.requestId, false,
|
||||
"unsupported authentication_type: '" & authType & "'",
|
||||
)
|
||||
if request.authenticationPayload.len == 0:
|
||||
return failureResponse(
|
||||
request.requestId, false, "missing authentication_payload"
|
||||
)
|
||||
let signer = verifyEip191(
|
||||
request.identityCommitment, request.authenticationPayload
|
||||
).valueOr:
|
||||
return failureResponse(
|
||||
request.requestId, false, "signature verification failed: " & error
|
||||
)
|
||||
if signer notin auth.addresses:
|
||||
return failureResponse(
|
||||
request.requestId, false, "address not allowlisted: " & signer.to0xHex()
|
||||
)
|
||||
if signer in auth.consumed:
|
||||
return failureResponse(
|
||||
request.requestId, false, "address already used: " & signer.to0xHex()
|
||||
)
|
||||
authorizedSigner = some(signer)
|
||||
|
||||
let effectiveRateLimit = request.rateLimit.get(100'u64)
|
||||
let success = (await wg.registerHandler(request.identityCommitment, effectiveRateLimit)).valueOr:
|
||||
error "RLN gifter registration failed", error = error
|
||||
return RlnGifterResponse(
|
||||
requestId: request.requestId,
|
||||
authSuccess: true,
|
||||
failure: some(MembershipAllocationFailure(errorMessage: error)),
|
||||
)
|
||||
|
||||
if authorizedSigner.isSome and wg.auth.isSome:
|
||||
wg.auth.get().consumed.incl(authorizedSigner.get())
|
||||
|
||||
info "RLN gifter registration succeeded",
|
||||
leafIndex = success.leafIndex,
|
||||
requestId = request.requestId
|
||||
|
||||
return RlnGifterResponse(
|
||||
requestId: request.requestId,
|
||||
authSuccess: true,
|
||||
success: some(success),
|
||||
)
|
||||
|
||||
proc initProtocolHandler(wg: WakuRlnGifter) =
|
||||
proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
|
||||
var rpc: RlnGifterResponse
|
||||
# NOTE: Do NOT close the connection from the server side. The client closes
|
||||
# its side after reading the response. If the server closes first, the remote
|
||||
# FIN triggers yamux cleanup on the client side after createNode returns,
|
||||
# causing a use-after-free crash in the delivery module process.
|
||||
|
||||
var buffer: seq[byte]
|
||||
try:
|
||||
buffer = await conn.readLp(DefaultMaxRpcSize)
|
||||
except LPStreamError:
|
||||
error "rln-gifter read stream failed", error = getCurrentExceptionMsg()
|
||||
return
|
||||
|
||||
try:
|
||||
rpc = await wg.handleRequest(conn.peerId, buffer)
|
||||
except CatchableError:
|
||||
error "rln-gifter handleRequest failed", error = getCurrentExceptionMsg()
|
||||
rpc = failureResponse("N/A", true, "internal error")
|
||||
|
||||
try:
|
||||
await conn.writeLp(rpc.encode().buffer)
|
||||
except LPStreamError:
|
||||
error "rln-gifter write stream failed", error = getCurrentExceptionMsg()
|
||||
|
||||
wg.handler = handler
|
||||
wg.codec = WakuRlnGifterCodec
|
||||
|
||||
proc new*(
|
||||
T: type WakuRlnGifter,
|
||||
peerManager: PeerManager,
|
||||
rng: ref rand.HmacDrbgContext,
|
||||
registerHandler: RegisterMemberHandler,
|
||||
auth: Option[EthAllowlistAuth] = none(EthAllowlistAuth),
|
||||
statusHandler: MembershipStatusHandler = nil,
|
||||
): T =
|
||||
let wg = WakuRlnGifter(
|
||||
rng: rng,
|
||||
peerManager: peerManager,
|
||||
registerHandler: registerHandler,
|
||||
statusHandler: statusHandler,
|
||||
auth: auth,
|
||||
)
|
||||
wg.initProtocolHandler()
|
||||
return wg
|
||||
|
||||
proc initStatusProtocolHandler(ws: WakuRlnGifterStatus) =
|
||||
proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
|
||||
var buffer: seq[byte]
|
||||
try:
|
||||
buffer = await conn.readLp(DefaultMaxRpcSize)
|
||||
except LPStreamError:
|
||||
error "rln-gifter-status read stream failed",
|
||||
error = getCurrentExceptionMsg()
|
||||
return
|
||||
|
||||
var resp = MembershipStatusResponse(registered: false)
|
||||
let req = MembershipStatusRequest.decode(buffer).valueOr:
|
||||
resp.errorMessage = some("decode error: " & $error)
|
||||
try:
|
||||
await conn.writeLp(resp.encode().buffer)
|
||||
except LPStreamError:
|
||||
discard
|
||||
return
|
||||
|
||||
if ws.statusHandler.isNil:
|
||||
resp.errorMessage = some("status handler not wired")
|
||||
else:
|
||||
try:
|
||||
let r = await ws.statusHandler(req.configAccountId, req.identityCommitment)
|
||||
if r.isErr:
|
||||
resp.errorMessage = some(r.error)
|
||||
else:
|
||||
resp = r.get()
|
||||
except CatchableError:
|
||||
resp.errorMessage = some(
|
||||
"status handler raised: " & getCurrentExceptionMsg())
|
||||
|
||||
try:
|
||||
await conn.writeLp(resp.encode().buffer)
|
||||
except LPStreamError:
|
||||
error "rln-gifter-status write stream failed",
|
||||
error = getCurrentExceptionMsg()
|
||||
|
||||
ws.handler = handler
|
||||
ws.codec = WakuRlnGifterStatusCodec
|
||||
|
||||
proc new*(
|
||||
T: type WakuRlnGifterStatus,
|
||||
statusHandler: MembershipStatusHandler,
|
||||
): T =
|
||||
let ws = WakuRlnGifterStatus(statusHandler: statusHandler)
|
||||
ws.initStatusProtocolHandler()
|
||||
return ws
|
||||
42
logos_delivery/waku/waku_rln_relay/rln_gifter/rpc.nim
Normal file
42
logos_delivery/waku/waku_rln_relay/rln_gifter/rpc.nim
Normal file
@ -0,0 +1,42 @@
|
||||
import std/options
|
||||
|
||||
type
|
||||
MembershipAllocationSuccess* = object
|
||||
leafIndex*: uint64
|
||||
merkleRoot*: seq[byte]
|
||||
blockNumber*: uint64
|
||||
transactionHash*: seq[byte]
|
||||
# Non-spec extension (high tag): LEZ config account that owns the
|
||||
# membership. Required so the client can route on-chain queries.
|
||||
configAccountId*: Option[string]
|
||||
|
||||
MembershipAllocationFailure* = object
|
||||
errorMessage*: string
|
||||
|
||||
RlnGifterRequest* = object
|
||||
requestId*: string
|
||||
authenticationType*: seq[byte]
|
||||
authenticationPayload*: seq[byte]
|
||||
identityCommitment*: seq[byte]
|
||||
rateLimit*: Option[uint64]
|
||||
|
||||
RlnGifterResponse* = object
|
||||
requestId*: string
|
||||
authSuccess*: bool
|
||||
error*: Option[string]
|
||||
success*: Option[MembershipAllocationSuccess]
|
||||
failure*: Option[MembershipAllocationFailure]
|
||||
|
||||
MembershipStatusRequest* = object
|
||||
configAccountId*: string
|
||||
identityCommitment*: seq[byte]
|
||||
|
||||
MembershipStatusResponse* = object
|
||||
registered*: bool
|
||||
leafIndex*: Option[uint64]
|
||||
# Set on lookup failure (e.g. wallet RPC error); distinct from a
|
||||
# successful registered=false answer.
|
||||
errorMessage*: Option[string]
|
||||
|
||||
const
|
||||
EthAllowlistAuthType* = "eth-allowlist"
|
||||
182
logos_delivery/waku/waku_rln_relay/rln_gifter/rpc_codec.nim
Normal file
182
logos_delivery/waku/waku_rln_relay/rln_gifter/rpc_codec.nim
Normal file
@ -0,0 +1,182 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/options
|
||||
import ../../common/protobuf, ./rpc
|
||||
|
||||
const DefaultMaxRpcSize* = 4096
|
||||
|
||||
proc encode*(rpc: MembershipAllocationSuccess): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
pb.write3(1, rpc.leafIndex)
|
||||
pb.write3(2, rpc.merkleRoot)
|
||||
pb.write3(3, rpc.blockNumber)
|
||||
pb.write3(4, rpc.transactionHash)
|
||||
if rpc.configAccountId.isSome:
|
||||
pb.write3(100, rpc.configAccountId.get())
|
||||
pb.finish3()
|
||||
return pb
|
||||
|
||||
proc decode*(T: type MembershipAllocationSuccess, buffer: seq[byte]): ProtobufResult[T] =
|
||||
let pb = initProtoBuffer(buffer)
|
||||
var msg = MembershipAllocationSuccess()
|
||||
|
||||
var leafIndex: uint64
|
||||
if ?pb.getField(1, leafIndex):
|
||||
msg.leafIndex = leafIndex
|
||||
|
||||
var merkleRoot: seq[byte]
|
||||
if ?pb.getField(2, merkleRoot):
|
||||
msg.merkleRoot = merkleRoot
|
||||
|
||||
var blockNumber: uint64
|
||||
if ?pb.getField(3, blockNumber):
|
||||
msg.blockNumber = blockNumber
|
||||
|
||||
var transactionHash: seq[byte]
|
||||
if ?pb.getField(4, transactionHash):
|
||||
msg.transactionHash = transactionHash
|
||||
|
||||
var configAccountId: string
|
||||
if ?pb.getField(100, configAccountId):
|
||||
msg.configAccountId = some(configAccountId)
|
||||
|
||||
return ok(msg)
|
||||
|
||||
proc encode*(rpc: MembershipAllocationFailure): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
pb.write3(1, rpc.errorMessage)
|
||||
pb.finish3()
|
||||
return pb
|
||||
|
||||
proc decode*(T: type MembershipAllocationFailure, buffer: seq[byte]): ProtobufResult[T] =
|
||||
let pb = initProtoBuffer(buffer)
|
||||
var msg = MembershipAllocationFailure()
|
||||
var errorMessage: string
|
||||
if ?pb.getField(1, errorMessage):
|
||||
msg.errorMessage = errorMessage
|
||||
return ok(msg)
|
||||
|
||||
proc encode*(rpc: RlnGifterRequest): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
pb.write3(1, rpc.requestId)
|
||||
pb.write3(2, rpc.authenticationType)
|
||||
pb.write3(3, rpc.authenticationPayload)
|
||||
pb.write3(4, rpc.identityCommitment)
|
||||
if rpc.rateLimit.isSome:
|
||||
pb.write3(5, rpc.rateLimit.get())
|
||||
pb.finish3()
|
||||
return pb
|
||||
|
||||
proc decode*(T: type RlnGifterRequest, buffer: seq[byte]): ProtobufResult[T] =
|
||||
let pb = initProtoBuffer(buffer)
|
||||
var rpc = RlnGifterRequest()
|
||||
|
||||
var requestId: string
|
||||
if not ?pb.getField(1, requestId):
|
||||
return err(ProtobufError.missingRequiredField("request_id"))
|
||||
rpc.requestId = requestId
|
||||
|
||||
var authenticationType: seq[byte]
|
||||
discard ?pb.getField(2, authenticationType)
|
||||
rpc.authenticationType = authenticationType
|
||||
|
||||
var authenticationPayload: seq[byte]
|
||||
discard ?pb.getField(3, authenticationPayload)
|
||||
rpc.authenticationPayload = authenticationPayload
|
||||
|
||||
var identityCommitment: seq[byte]
|
||||
if not ?pb.getField(4, identityCommitment):
|
||||
return err(ProtobufError.missingRequiredField("identity_commitment"))
|
||||
rpc.identityCommitment = identityCommitment
|
||||
|
||||
var rateLimit: uint64
|
||||
if ?pb.getField(5, rateLimit):
|
||||
rpc.rateLimit = some(rateLimit)
|
||||
|
||||
return ok(rpc)
|
||||
|
||||
proc encode*(rpc: RlnGifterResponse): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
pb.write3(1, rpc.requestId)
|
||||
pb.write3(2, rpc.authSuccess)
|
||||
if rpc.error.isSome:
|
||||
pb.write3(3, rpc.error.get())
|
||||
if rpc.success.isSome:
|
||||
pb.write3(4, rpc.success.get().encode().buffer)
|
||||
if rpc.failure.isSome:
|
||||
pb.write3(5, rpc.failure.get().encode().buffer)
|
||||
pb.finish3()
|
||||
return pb
|
||||
|
||||
proc decode*(T: type RlnGifterResponse, buffer: seq[byte]): ProtobufResult[T] =
|
||||
let pb = initProtoBuffer(buffer)
|
||||
var rpc = RlnGifterResponse()
|
||||
|
||||
var requestId: string
|
||||
if not ?pb.getField(1, requestId):
|
||||
return err(ProtobufError.missingRequiredField("request_id"))
|
||||
rpc.requestId = requestId
|
||||
|
||||
var authSuccess: bool
|
||||
if not ?pb.getField(2, authSuccess):
|
||||
return err(ProtobufError.missingRequiredField("auth_success"))
|
||||
rpc.authSuccess = authSuccess
|
||||
|
||||
var error: string
|
||||
if ?pb.getField(3, error):
|
||||
rpc.error = some(error)
|
||||
|
||||
var successBuf: seq[byte]
|
||||
if ?pb.getField(4, successBuf):
|
||||
rpc.success = some(?MembershipAllocationSuccess.decode(successBuf))
|
||||
|
||||
var failureBuf: seq[byte]
|
||||
if ?pb.getField(5, failureBuf):
|
||||
rpc.failure = some(?MembershipAllocationFailure.decode(failureBuf))
|
||||
|
||||
return ok(rpc)
|
||||
|
||||
proc encode*(req: MembershipStatusRequest): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
pb.write3(1, req.configAccountId)
|
||||
pb.write3(2, req.identityCommitment)
|
||||
pb.finish3()
|
||||
return pb
|
||||
|
||||
proc decode*(T: type MembershipStatusRequest, buffer: seq[byte]): ProtobufResult[T] =
|
||||
let pb = initProtoBuffer(buffer)
|
||||
var req = MembershipStatusRequest()
|
||||
var configAccountId: string
|
||||
if not ?pb.getField(1, configAccountId):
|
||||
return err(ProtobufError.missingRequiredField("config_account_id"))
|
||||
req.configAccountId = configAccountId
|
||||
var identityCommitment: seq[byte]
|
||||
if not ?pb.getField(2, identityCommitment):
|
||||
return err(ProtobufError.missingRequiredField("identity_commitment"))
|
||||
req.identityCommitment = identityCommitment
|
||||
return ok(req)
|
||||
|
||||
proc encode*(resp: MembershipStatusResponse): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
pb.write3(1, resp.registered)
|
||||
if resp.leafIndex.isSome:
|
||||
pb.write3(2, resp.leafIndex.get())
|
||||
if resp.errorMessage.isSome:
|
||||
pb.write3(3, resp.errorMessage.get())
|
||||
pb.finish3()
|
||||
return pb
|
||||
|
||||
proc decode*(T: type MembershipStatusResponse, buffer: seq[byte]): ProtobufResult[T] =
|
||||
let pb = initProtoBuffer(buffer)
|
||||
var resp = MembershipStatusResponse()
|
||||
var registered: bool
|
||||
if not ?pb.getField(1, registered):
|
||||
return err(ProtobufError.missingRequiredField("registered"))
|
||||
resp.registered = registered
|
||||
var leafIndex: uint64
|
||||
if ?pb.getField(2, leafIndex):
|
||||
resp.leafIndex = some(leafIndex)
|
||||
var errorMessage: string
|
||||
if ?pb.getField(3, errorMessage):
|
||||
resp.errorMessage = some(errorMessage)
|
||||
return ok(resp)
|
||||
@ -1,6 +1,7 @@
|
||||
{ pkgs
|
||||
, src
|
||||
, zerokitRln
|
||||
, zerokitMixRln ? null
|
||||
, targets ? []
|
||||
, gitVersion ? "n/a"
|
||||
, enablePostgres ? true
|
||||
@ -42,6 +43,12 @@ let
|
||||
# few mode-specific flags (e.g. --app:lib, --noMain, --header); everything
|
||||
# else (paths, defines, threading, gc, nimcache, rln linkage) is constant.
|
||||
# $NAT_TRAV and $NIMCACHE are shell variables defined in buildPhase.
|
||||
# Optional mix-rln link flag: appended when zerokitMixRln is provided so
|
||||
# liblogosdelivery's mix-rln-spam-protection-plugin path resolves at link
|
||||
# time. Empty otherwise (vanilla liblogosdelivery build).
|
||||
mixRlnLinkArg = pkgs.lib.optionalString (zerokitMixRln != null)
|
||||
"--passL:${zerokitMixRln}/lib/librln.a --passL:-lm";
|
||||
|
||||
nimCompile = { outFile, sourceFile, extraArgs ? [] }: ''
|
||||
nim c \
|
||||
--noNimblePath \
|
||||
@ -49,6 +56,7 @@ let
|
||||
--path:$NAT_TRAV \
|
||||
--path:$NAT_TRAV/src \
|
||||
--passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \
|
||||
${mixRlnLinkArg} \
|
||||
${nimDefineArgs} \
|
||||
--threads:on \
|
||||
--mm:refc \
|
||||
|
||||
@ -1,9 +1,11 @@
|
||||
# zerokit rln built from source; overrides the stale v2.0.2 vendor cargoHash.
|
||||
{ zerokit, system }:
|
||||
# zerokit rln built from source; overrides the stale vendor cargoHash.
|
||||
# vendorHash differs per zerokit version (2.0.0 vs 2.0.2 etc.) and must be
|
||||
# passed in by the caller so the same builder works for both pins.
|
||||
{ zerokit, system, vendorHash }:
|
||||
zerokit.packages.${system}.rln.overrideAttrs (old: {
|
||||
cargoDeps = old.cargoDeps.overrideAttrs (oldCargoDeps: {
|
||||
vendorStaging = oldCargoDeps.vendorStaging.overrideAttrs (_: {
|
||||
outputHash = "sha256-PNwEdZLgGQPqQDrEK2hsQtSybVfBbD6xn4K47fPFJUU=";
|
||||
outputHash = vendorHash;
|
||||
});
|
||||
});
|
||||
})
|
||||
|
||||
43
scripts/build_rln_mix.sh
Executable file
43
scripts/build_rln_mix.sh
Executable file
@ -0,0 +1,43 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# Build a separate, pinned RLN library for mix spam-protection usage.
|
||||
# This keeps the main nwaku RLN dependency flow unchanged.
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
source_dir="${1:-}"
|
||||
version="${2:-}"
|
||||
output_file="${3:-}"
|
||||
repo_url="${4:-https://github.com/vacp2p/zerokit.git}"
|
||||
|
||||
if [[ -z "${source_dir}" || -z "${version}" || -z "${output_file}" ]]; then
|
||||
echo "Usage: $0 <source_dir> <version_tag> <output_file> [repo_url]"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
mkdir -p "$(dirname "${source_dir}")"
|
||||
mkdir -p "$(dirname "${output_file}")"
|
||||
|
||||
if [[ ! -d "${source_dir}/.git" ]]; then
|
||||
echo "Cloning zerokit ${version} from ${repo_url}..."
|
||||
if [[ -e "${source_dir}" ]]; then
|
||||
echo "Path exists but is not a git repository: ${source_dir}"
|
||||
echo "Please remove it and retry."
|
||||
exit 1
|
||||
fi
|
||||
git clone --depth 1 --branch "${version}" "${repo_url}" "${source_dir}"
|
||||
else
|
||||
echo "Using existing zerokit checkout in ${source_dir}"
|
||||
current_tag="$(git -C "${source_dir}" describe --tags --exact-match 2>/dev/null || true)"
|
||||
if [[ "${current_tag}" != "${version}" ]]; then
|
||||
echo "Updating zerokit checkout to ${version}..."
|
||||
git -C "${source_dir}" fetch --tags origin "${version}"
|
||||
git -C "${source_dir}" checkout --detach "${version}"
|
||||
fi
|
||||
fi
|
||||
|
||||
echo "Building mix RLN library from source (version ${version})..."
|
||||
cargo build --release -p rln --manifest-path "${source_dir}/rln/Cargo.toml"
|
||||
|
||||
cp "${source_dir}/target/release/librln.a" "${output_file}"
|
||||
echo "Successfully built ${output_file}"
|
||||
@ -4,4 +4,5 @@ import
|
||||
./test_rln_group_manager_onchain,
|
||||
./test_waku_rln_relay,
|
||||
./test_wakunode_rln_relay,
|
||||
./test_rln_nonce_manager
|
||||
./test_rln_nonce_manager,
|
||||
./test_rln_gifter
|
||||
|
||||
136
tests/waku_rln_relay/test_rln_gifter.nim
Normal file
136
tests/waku_rln_relay/test_rln_gifter.nim
Normal file
@ -0,0 +1,136 @@
|
||||
{.used.}
|
||||
|
||||
import std/options, results
|
||||
import testutils/unittests
|
||||
import eth/common/[addresses, keys]
|
||||
import waku/waku_rln_relay/rln_gifter/rpc
|
||||
import waku/waku_rln_relay/rln_gifter/rpc_codec
|
||||
import waku/waku_rln_relay/rln_gifter/protocol as rln_gifter_protocol
|
||||
|
||||
proc eip191Sign(seckey: PrivateKey, idCommitment: openArray[byte]): seq[byte] =
|
||||
@(seckey.sign(eip191Message(idCommitment)).toRaw())
|
||||
|
||||
proc bytesOf(s: string): seq[byte] =
|
||||
result = newSeqOfCap[byte](s.len)
|
||||
for c in s:
|
||||
result.add(byte(c))
|
||||
|
||||
const
|
||||
TestSecretHex =
|
||||
"1111111111111111111111111111111111111111111111111111111111111111"
|
||||
|
||||
let TestCommitment = @[
|
||||
byte 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab,
|
||||
0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab,
|
||||
0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab,
|
||||
0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab,
|
||||
]
|
||||
|
||||
let TamperedCommitment = @[
|
||||
byte 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd,
|
||||
0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd,
|
||||
0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd,
|
||||
0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd,
|
||||
]
|
||||
|
||||
suite "RLN gifter EIP-191 auth":
|
||||
test "verifyEip191 recovers the signer address":
|
||||
let sk = PrivateKey.fromHex(TestSecretHex).expect("valid key")
|
||||
let expected = sk.toPublicKey().to(Address)
|
||||
let sig = eip191Sign(sk, TestCommitment)
|
||||
|
||||
let recovered = verifyEip191(TestCommitment, sig).expect("recoverable")
|
||||
check recovered == expected
|
||||
|
||||
test "verifyEip191 rejects wrong-length payload":
|
||||
let bad = newSeq[byte](64)
|
||||
let res = verifyEip191(TestCommitment, bad)
|
||||
check res.isErr
|
||||
|
||||
test "verifyEip191 produces a different address when the message differs":
|
||||
let sk = PrivateKey.fromHex(TestSecretHex).expect("valid key")
|
||||
let expected = sk.toPublicKey().to(Address)
|
||||
let sig = eip191Sign(sk, TestCommitment)
|
||||
|
||||
let recovered = verifyEip191(TamperedCommitment, sig).expect("recoverable")
|
||||
check recovered != expected
|
||||
|
||||
test "verifyEip191 rejects malformed signature bytes":
|
||||
let bogus = newSeq[byte](65)
|
||||
let res = verifyEip191(TestCommitment, bogus)
|
||||
check res.isErr
|
||||
|
||||
suite "RLN gifter request codec":
|
||||
test "round-trips all fields when populated":
|
||||
let req = RlnGifterRequest(
|
||||
requestId: "req-1",
|
||||
authenticationType: bytesOf("eth-allowlist"),
|
||||
authenticationPayload: @[byte 0xde, 0xad, 0xbe, 0xef],
|
||||
identityCommitment: TestCommitment,
|
||||
rateLimit: some(42'u64),
|
||||
)
|
||||
let decoded = RlnGifterRequest.decode(req.encode().buffer).expect("decodes")
|
||||
check:
|
||||
decoded.requestId == "req-1"
|
||||
decoded.authenticationType == bytesOf("eth-allowlist")
|
||||
decoded.authenticationPayload == @[byte 0xde, 0xad, 0xbe, 0xef]
|
||||
decoded.identityCommitment == TestCommitment
|
||||
decoded.rateLimit == some(42'u64)
|
||||
|
||||
test "rate_limit is optional on the wire":
|
||||
let req = RlnGifterRequest(
|
||||
requestId: "req-2",
|
||||
authenticationType: @[],
|
||||
authenticationPayload: @[],
|
||||
identityCommitment: TestCommitment,
|
||||
rateLimit: none(uint64),
|
||||
)
|
||||
let decoded = RlnGifterRequest.decode(req.encode().buffer).expect("decodes")
|
||||
check:
|
||||
decoded.requestId == "req-2"
|
||||
decoded.identityCommitment == TestCommitment
|
||||
decoded.rateLimit.isNone
|
||||
|
||||
suite "RLN gifter response codec":
|
||||
test "encodes/decodes a success result":
|
||||
let resp = RlnGifterResponse(
|
||||
requestId: "req-1",
|
||||
authSuccess: true,
|
||||
error: none(string),
|
||||
success: some(MembershipAllocationSuccess(
|
||||
leafIndex: 7'u64,
|
||||
merkleRoot: @[byte 0x01, 0x02, 0x03],
|
||||
blockNumber: 42'u64,
|
||||
transactionHash: @[byte 0xaa, 0xbb],
|
||||
configAccountId: some("acct-123"),
|
||||
)),
|
||||
failure: none(MembershipAllocationFailure),
|
||||
)
|
||||
let decoded = RlnGifterResponse.decode(resp.encode().buffer).expect("decodes")
|
||||
check:
|
||||
decoded.requestId == "req-1"
|
||||
decoded.authSuccess == true
|
||||
decoded.success.isSome
|
||||
decoded.success.get().leafIndex == 7'u64
|
||||
decoded.success.get().merkleRoot == @[byte 0x01, 0x02, 0x03]
|
||||
decoded.success.get().blockNumber == 42'u64
|
||||
decoded.success.get().transactionHash == @[byte 0xaa, 0xbb]
|
||||
decoded.success.get().configAccountId == some("acct-123")
|
||||
decoded.failure.isNone
|
||||
|
||||
test "encodes/decodes a failure result":
|
||||
let resp = RlnGifterResponse(
|
||||
requestId: "req-2",
|
||||
authSuccess: false,
|
||||
error: some("address not allowlisted"),
|
||||
success: none(MembershipAllocationSuccess),
|
||||
failure: some(MembershipAllocationFailure(errorMessage: "address not allowlisted")),
|
||||
)
|
||||
let decoded = RlnGifterResponse.decode(resp.encode().buffer).expect("decodes")
|
||||
check:
|
||||
decoded.requestId == "req-2"
|
||||
decoded.authSuccess == false
|
||||
decoded.error == some("address not allowlisted")
|
||||
decoded.failure.isSome
|
||||
decoded.failure.get().errorMessage == "address not allowlisted"
|
||||
decoded.success.isNone
|
||||
@ -654,6 +654,45 @@ with the drawback of consuming some more bandwidth.""",
|
||||
name: "mix-disable-spam-protection"
|
||||
.}: bool
|
||||
|
||||
mixOnchainLEZ* {.
|
||||
desc: "Use on-chain LEZ (LSSA sequencer) for mix RLN spam protection instead of off-chain keystores.",
|
||||
defaultValue: false,
|
||||
name: "mix-onchain-lez"
|
||||
.}: bool
|
||||
|
||||
mixGifterService* {.
|
||||
desc: "Run as RLN gifter service for mix nodes.",
|
||||
defaultValue: false,
|
||||
name: "mix-gifter-service"
|
||||
.}: bool
|
||||
|
||||
mixGifterWalletAccount* {.
|
||||
desc: "Wallet account ID for RLN gifter registration payments.",
|
||||
defaultValue: "",
|
||||
name: "mix-gifter-wallet-account"
|
||||
.}: string
|
||||
|
||||
mixGifterNode* {.
|
||||
desc: "Multiaddress of the RLN gifter node (for client auto-registration).",
|
||||
defaultValue: "",
|
||||
name: "mix-gifter-node"
|
||||
.}: string
|
||||
|
||||
mixGifterAllowlist* {.
|
||||
desc:
|
||||
"Comma-separated 0x-prefixed Ethereum addresses allowed to redeem an RLN membership via the gifter (one-shot per address). Empty disables auth.",
|
||||
defaultValue: "",
|
||||
name: "mix-gifter-allowlist"
|
||||
.}: string
|
||||
|
||||
mixGifterAuthKey* {.
|
||||
desc:
|
||||
"64-hex-char secp256k1 private key used to sign gifter-membership requests (EIP-191 over the idCommitment hex). Empty disables signing.",
|
||||
defaultValue: "",
|
||||
name: "mix-gifter-auth-key"
|
||||
.}: string
|
||||
|
||||
|
||||
# Kademlia Discovery config
|
||||
enableKadDiscovery* {.
|
||||
desc:
|
||||
@ -1089,6 +1128,12 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
|
||||
b.mixConf.withEnabled(n.mix)
|
||||
b.mixConf.withMixNodes(n.mixnodes)
|
||||
b.withMix(n.mix)
|
||||
b.mixConf.withUseOnchainLEZ(n.mixOnchainLEZ)
|
||||
b.mixConf.withGifterService(n.mixGifterService)
|
||||
b.mixConf.withGifterWalletAccount(n.mixGifterWalletAccount)
|
||||
b.mixConf.withGifterNode(n.mixGifterNode)
|
||||
b.mixConf.withGifterAllowlist(n.mixGifterAllowlist)
|
||||
b.mixConf.withGifterAuthKey(n.mixGifterAuthKey)
|
||||
if n.mixkey.isSome():
|
||||
b.mixConf.withMixKey(n.mixkey.get())
|
||||
if n.mixUserMessageLimit.isSome():
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user