diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index a8398787d..6dd24fb47 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -628,6 +628,12 @@ proc start*(node: WakuNode) {.async.} = # After switch.start, run custom Logos Delivery relay start logic await node.reconnectRelayPeers() + # Kick off the DoS-protection registration broadcast now that peers are + # reconnected. Fire-and-forget: the proc returns immediately and an + # internal background task retries until the broadcast lands. + if not node.wakuMix.isNil(): + node.wakuMix.registerDoSProtectionWithNetwork() + node.started = true if not node.wakuFilterClient.isNil(): diff --git a/waku/waku_mix/protocol.nim b/waku/waku_mix/protocol.nim index 7400d7011..8a12250a3 100644 --- a/waku/waku_mix/protocol.nim +++ b/waku/waku_mix/protocol.nim @@ -39,6 +39,10 @@ type pubKey*: Curve25519Key mixRlnSpamProtection*: MixRlnSpamProtection publishMessage*: PublishMessage + dosRegistrationTask: Future[void] + ## Background task that retries DoS-protection self-registration until + ## it succeeds. nil until kicked off via registerDoSProtectionWithNetwork; + ## cancelled in stop(). WakuMixResult*[T] = Result[T, string] @@ -233,54 +237,113 @@ proc loadSpamProtectionTree*(mix: WakuMix): Result[void, string] = ) method start*(mix: WakuMix) {.async.} = + ## Local-only mix protocol initialization. Does NOT touch the network. + ## The network-dependent self-registration broadcast is handled separately + ## by registerDoSProtectionWithNetwork so that this proc can run before + ## peers are connected without blocking on relay startup. info "starting waku mix protocol" - # Set up spam protection callbacks and start if not mix.mixRlnSpamProtection.isNil(): # Initialize spam protection (MixProtocol.init() does NOT call init() on the plugin) let initRes = await mix.mixRlnSpamProtection.init() if initRes.isErr: error "Failed to initialize spam protection", error = initRes.error + return + + # Load existing tree to sync with other members. + # Should be done after init() (which loads credentials) but before + # registerSelf() (which adds us to the tree). + let loadRes = mix.mixRlnSpamProtection.loadTree() + if loadRes.isErr: + debug "No existing tree found or failed to load, starting fresh", + error = loadRes.error else: - # Load existing tree to sync with other members - # This should be done after init() (which loads credentials) - # but before registerSelf() (which adds us to the tree) - let loadRes = mix.mixRlnSpamProtection.loadTree() - if loadRes.isErr: - debug "No existing tree found or failed to load, starting fresh", - error = loadRes.error - else: - debug "Loaded existing spam protection membership tree from disk" + debug "Loaded existing spam protection membership tree from disk" - # Restore our credentials to the tree (after tree load, whether it succeeded or not) - # This ensures our member is in the tree if we have an index from keystore - let restoreRes = mix.mixRlnSpamProtection.restoreCredentialsToTree() - if restoreRes.isErr: - error "Failed to restore credentials to tree", error = restoreRes.error + # Restore our credentials to the tree (after tree load, whether it succeeded or not). + # Ensures our member is in the tree if we have an index from keystore. + let restoreRes = mix.mixRlnSpamProtection.restoreCredentialsToTree() + if restoreRes.isErr: + error "Failed to restore credentials to tree", error = restoreRes.error - # Set up publish callback (must be before start so registerSelf can use it) - mix.setupSpamProtectionCallbacks() + # Set up publish callback. Must be before the network-side registration so + # the plugin's groupManager.register can broadcast the membership update. + mix.setupSpamProtectionCallbacks() - let startRes = await mix.mixRlnSpamProtection.start() - if startRes.isErr: - error "Failed to start spam protection", error = startRes.error - else: - # Register self to broadcast membership to the network - let registerRes = await mix.mixRlnSpamProtection.registerSelf() - if registerRes.isErr: - error "Failed to register spam protection credentials", - error = registerRes.error - else: - debug "Registered spam protection credentials", index = registerRes.get() + let startRes = await mix.mixRlnSpamProtection.start() + if startRes.isErr: + error "Failed to start spam protection", error = startRes.error - # Save tree to persist membership state + info "waku mix protocol started" + +proc dosRegistrationRetryLoop(mix: WakuMix) {.async.} = + ## Indefinitely retry the DoS-protection self-registration broadcast until + ## it succeeds (or this task is cancelled by WakuMix.stop()). For nodes that + ## already have a membership index in their keystore, registerSelf early- + ## returns and the loop exits on the first attempt. For fresh nodes, the + ## broadcast needs at least one relay peer subscribed to the membership + ## topic to land — this loop survives transient "no peers yet" failures. + ## + ## TODO: Remove once RLN membership moves on-chain. With on-chain membership + ## peers discover each other via the contract / a watcher rather than via a + ## pubsub broadcast, so the retry loop (and the whole publishCallback path + ## from registerSelf) becomes unnecessary. + ## + ## Retry pacing uses exponential backoff (5s, 10s, 20s, ..., capped at 5min) + ## so persistent misconfiguration — e.g., relay never available — degrades + ## to one log line every 5 minutes after the initial ramp instead of every + ## 5 seconds forever. + const InitialRetryDelay = chronos.seconds(5) + const MaxRetryDelay = chronos.minutes(5) + var delay = InitialRetryDelay + while true: + try: + let registerRes = await mix.mixRlnSpamProtection.registerSelf() + if registerRes.isOk(): + debug "DoS-protection self-registration succeeded", + index = registerRes.get() + # Persist tree only after a successful register — for fresh nodes this + # captures the new index; for keystore nodes it's a harmless no-op. let saveRes = mix.mixRlnSpamProtection.saveTree() if saveRes.isErr: warn "Failed to save spam protection tree", error = saveRes.error else: trace "Saved spam protection tree to disk" + return # success — exit the loop + warn "DoS-protection self-registration failed, retrying", + error = registerRes.error, nextDelay = delay + except CancelledError as e: + debug "DoS-protection registration loop cancelled" + raise e + except CatchableError as e: + warn "DoS-protection registration raised, retrying", + error = e.msg, nextDelay = delay + await sleepAsync(delay) + delay = min(delay * 2, MaxRetryDelay) + +proc registerDoSProtectionWithNetwork*(mix: WakuMix) = + ## Kick off an indefinite background task that broadcasts this node's + ## DoS-protection (RLN) membership registration to other mix nodes via + ## relay. Returns immediately so callers don't block on a possibly-slow + ## broadcast. The task is cancelled when WakuMix.stop() is called. + if mix.mixRlnSpamProtection.isNil(): + return + # Guard against kicking off the retry loop when the plugin isn't actually + # usable (e.g., mix.start()'s init/start steps failed). Without this check + # the loop would spin forever logging "Plugin not initialized" warnings. + if not mix.mixRlnSpamProtection.isReady(): + warn "Skipping DoS-protection registration: plugin not ready" + return + # Re-call safety: don't spawn a second loop if one is still in flight. + if not mix.dosRegistrationTask.isNil and not mix.dosRegistrationTask.finished: + debug "DoS-protection registration already in progress, skipping" + return + mix.dosRegistrationTask = mix.dosRegistrationRetryLoop() method stop*(mix: WakuMix) {.async.} = + # Cancel the in-flight DoS-protection registration retry loop, if any + if not mix.dosRegistrationTask.isNil and not mix.dosRegistrationTask.finished: + await mix.dosRegistrationTask.cancelAndWait() # Stop spam protection if not mix.mixRlnSpamProtection.isNil(): await mix.mixRlnSpamProtection.stop()