diff --git a/waku/waku_rln_relay/group_manager/group_manager_base.nim b/waku/waku_rln_relay/group_manager/group_manager_base.nim index 64eb7964d..818b36140 100644 --- a/waku/waku_rln_relay/group_manager/group_manager_base.nim +++ b/waku/waku_rln_relay/group_manager/group_manager_base.nim @@ -200,14 +200,10 @@ method generateProof*( messageId = messageId, ).valueOr: return err("proof generation failed: " & $error) - return ok(proof) - if lastProcessedEpoch != epoch: - lastProcessedEpoch = epoch - waku_rln_proof_remining.set(g.userMessageLimit.get().float64 - 1) - else: - waku_rln_proof_remining.dec() - waku_rln_proofs_generated_total.inc() + waku_rln_remaining_proofs_per_epoch.dec() + waku_rln_total_generated_proofs.inc() + return ok(proof) method isReady*(g: GroupManager): Future[bool] {.base, async.} = raise newException( diff --git a/waku/waku_rln_relay/protocol_metrics.nim b/waku/waku_rln_relay/protocol_metrics.nim index 06fa909bc..121727809 100644 --- a/waku/waku_rln_relay/protocol_metrics.nim +++ b/waku/waku_rln_relay/protocol_metrics.nim @@ -63,12 +63,12 @@ declarePublicGauge( ) declarePublicGauge( - waku_rln_proof_remining, + waku_rln_remaining_proofs_per_epoch, "number of proofs remaining to be generated for the current epoch", ) declarePublicGauge( - waku_rln_proofs_generated_total, + waku_rln_total_generated_proofs, "total number of proofs generated since the node started", ) @@ -84,6 +84,7 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger = var cumulativeValidMessages = 0.float64 var cumulativeProofsVerified = 0.float64 var cumulativeProofsGenerated = 0.float64 + var cumulativeProofsRemaining = 100.float64 when defined(metrics): logMetrics = proc() = @@ -102,7 +103,10 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger = waku_rln_proof_verification_total, cumulativeProofsVerified ) let freshProofsGeneratedCount = - parseAndAccumulate(waku_rln_proofs_generated_total, cumulativeProofsGenerated) + parseAndAccumulate(waku_rln_total_generated_proofs, cumulativeProofsGenerated) + let freshProofsRemainingCount = parseAndAccumulate( + waku_rln_remaining_proofs_per_epoch, cumulativeProofsRemaining + ) info "Total messages", count = freshMsgCount info "Total spam messages", count = freshSpamCount @@ -111,4 +115,6 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger = info "Total errors", count = freshErrorCount info "Total proofs verified", count = freshProofsVerifiedCount info "Total proofs generated", count = freshProofsGeneratedCount + info "Total proofs remaining", count = freshProofsRemainingCount + return logMetrics diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index 535cee4a2..a1a6afc23 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -89,6 +89,7 @@ type WakuRLNRelay* = ref object of RootObj groupManager*: GroupManager onFatalErrorAction*: OnFatalErrorHandler nonceManager*: NonceManager + epochMonitorFuture*: Future[void] proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch = ## gets time `t` as `flaot64` with subseconds resolution in the fractional part @@ -96,6 +97,18 @@ proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch = let e = uint64(t / rlnPeer.rlnEpochSizeSec.float64) return toEpoch(e) +proc nextEpoch*(rlnPeer: WakuRLNRelay, time: float64): float64 = + let + currentEpoch = uint64(time / rlnPeer.rlnEpochSizeSec.float64) + nextEpochTime = float64(currentEpoch + 1) * rlnPeer.rlnEpochSizeSec.float64 + currentTime = epochTime() + + # Ensure we always return a future time + if nextEpochTime > currentTime: + return nextEpochTime + else: + return epochTime() + proc stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} = ## stops the rln-relay protocol ## Throws an error if it cannot stop the rln-relay protocol @@ -392,6 +405,19 @@ proc generateRlnValidator*( return validator +proc monitorEpochs(wakuRlnRelay: WakuRLNRelay) {.async.} = + while true: + try: + waku_rln_remaining_proofs_per_epoch.set( + wakuRlnRelay.groupManager.userMessageLimit.get().float64 + ) + except CatchableError: + error "Error in epoch monitoring", error = getCurrentExceptionMsg() + + let nextEpochTime = wakuRlnRelay.nextEpoch(epochTime()) + let sleepDuration = int((nextEpochTime - epochTime()) * 1000) + await sleepAsync(sleepDuration) + proc mount( conf: WakuRlnConfig, registrationHandler = none(RegistrationHandler) ): Future[RlnRelayResult[WakuRlnRelay]] {.async.} = @@ -445,16 +471,18 @@ proc mount( (await groupManager.startGroupSync()).isOkOr: return err("could not start the group sync: " & $error) - return ok( - WakuRLNRelay( - groupManager: groupManager, - nonceManager: - NonceManager.init(conf.rlnRelayUserMessageLimit, conf.rlnEpochSizeSec.float), - rlnEpochSizeSec: conf.rlnEpochSizeSec, - rlnMaxEpochGap: max(uint64(MaxClockGapSeconds / float64(conf.rlnEpochSizeSec)), 1), - onFatalErrorAction: conf.onFatalErrorAction, - ) + wakuRlnRelay = WakuRLNRelay( + groupManager: groupManager, + nonceManager: + NonceManager.init(conf.rlnRelayUserMessageLimit, conf.rlnEpochSizeSec.float), + rlnEpochSizeSec: conf.rlnEpochSizeSec, + rlnMaxEpochGap: max(uint64(MaxClockGapSeconds / float64(conf.rlnEpochSizeSec)), 1), + onFatalErrorAction: conf.onFatalErrorAction, ) + + # Start epoch monitoring in the background + wakuRlnRelay.epochMonitorFuture = monitorEpochs(wakuRlnRelay) + return ok(wakuRlnRelay) proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} = ## returns true if the rln-relay protocol is ready to relay messages