mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 08:23:08 +00:00
chore: improve epoch monitoring (#3197)
This commit is contained in:
parent
d5f18cf455
commit
aef2a7045f
@ -200,14 +200,10 @@ method generateProof*(
|
||||
messageId = messageId,
|
||||
).valueOr:
|
||||
return err("proof generation failed: " & $error)
|
||||
return ok(proof)
|
||||
|
||||
if lastProcessedEpoch != epoch:
|
||||
lastProcessedEpoch = epoch
|
||||
waku_rln_proof_remining.set(g.userMessageLimit.get().float64 - 1)
|
||||
else:
|
||||
waku_rln_proof_remining.dec()
|
||||
waku_rln_proofs_generated_total.inc()
|
||||
waku_rln_remaining_proofs_per_epoch.dec()
|
||||
waku_rln_total_generated_proofs.inc()
|
||||
return ok(proof)
|
||||
|
||||
method isReady*(g: GroupManager): Future[bool] {.base, async.} =
|
||||
raise newException(
|
||||
|
||||
@ -63,12 +63,12 @@ declarePublicGauge(
|
||||
)
|
||||
|
||||
declarePublicGauge(
|
||||
waku_rln_proof_remining,
|
||||
waku_rln_remaining_proofs_per_epoch,
|
||||
"number of proofs remaining to be generated for the current epoch",
|
||||
)
|
||||
|
||||
declarePublicGauge(
|
||||
waku_rln_proofs_generated_total,
|
||||
waku_rln_total_generated_proofs,
|
||||
"total number of proofs generated since the node started",
|
||||
)
|
||||
|
||||
@ -84,6 +84,7 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
|
||||
var cumulativeValidMessages = 0.float64
|
||||
var cumulativeProofsVerified = 0.float64
|
||||
var cumulativeProofsGenerated = 0.float64
|
||||
var cumulativeProofsRemaining = 100.float64
|
||||
|
||||
when defined(metrics):
|
||||
logMetrics = proc() =
|
||||
@ -102,7 +103,10 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
|
||||
waku_rln_proof_verification_total, cumulativeProofsVerified
|
||||
)
|
||||
let freshProofsGeneratedCount =
|
||||
parseAndAccumulate(waku_rln_proofs_generated_total, cumulativeProofsGenerated)
|
||||
parseAndAccumulate(waku_rln_total_generated_proofs, cumulativeProofsGenerated)
|
||||
let freshProofsRemainingCount = parseAndAccumulate(
|
||||
waku_rln_remaining_proofs_per_epoch, cumulativeProofsRemaining
|
||||
)
|
||||
|
||||
info "Total messages", count = freshMsgCount
|
||||
info "Total spam messages", count = freshSpamCount
|
||||
@ -111,4 +115,6 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
|
||||
info "Total errors", count = freshErrorCount
|
||||
info "Total proofs verified", count = freshProofsVerifiedCount
|
||||
info "Total proofs generated", count = freshProofsGeneratedCount
|
||||
info "Total proofs remaining", count = freshProofsRemainingCount
|
||||
|
||||
return logMetrics
|
||||
|
||||
@ -89,6 +89,7 @@ type WakuRLNRelay* = ref object of RootObj
|
||||
groupManager*: GroupManager
|
||||
onFatalErrorAction*: OnFatalErrorHandler
|
||||
nonceManager*: NonceManager
|
||||
epochMonitorFuture*: Future[void]
|
||||
|
||||
proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch =
|
||||
## gets time `t` as `flaot64` with subseconds resolution in the fractional part
|
||||
@ -96,6 +97,18 @@ proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch =
|
||||
let e = uint64(t / rlnPeer.rlnEpochSizeSec.float64)
|
||||
return toEpoch(e)
|
||||
|
||||
proc nextEpoch*(rlnPeer: WakuRLNRelay, time: float64): float64 =
|
||||
let
|
||||
currentEpoch = uint64(time / rlnPeer.rlnEpochSizeSec.float64)
|
||||
nextEpochTime = float64(currentEpoch + 1) * rlnPeer.rlnEpochSizeSec.float64
|
||||
currentTime = epochTime()
|
||||
|
||||
# Ensure we always return a future time
|
||||
if nextEpochTime > currentTime:
|
||||
return nextEpochTime
|
||||
else:
|
||||
return epochTime()
|
||||
|
||||
proc stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} =
|
||||
## stops the rln-relay protocol
|
||||
## Throws an error if it cannot stop the rln-relay protocol
|
||||
@ -392,6 +405,19 @@ proc generateRlnValidator*(
|
||||
|
||||
return validator
|
||||
|
||||
proc monitorEpochs(wakuRlnRelay: WakuRLNRelay) {.async.} =
|
||||
while true:
|
||||
try:
|
||||
waku_rln_remaining_proofs_per_epoch.set(
|
||||
wakuRlnRelay.groupManager.userMessageLimit.get().float64
|
||||
)
|
||||
except CatchableError:
|
||||
error "Error in epoch monitoring", error = getCurrentExceptionMsg()
|
||||
|
||||
let nextEpochTime = wakuRlnRelay.nextEpoch(epochTime())
|
||||
let sleepDuration = int((nextEpochTime - epochTime()) * 1000)
|
||||
await sleepAsync(sleepDuration)
|
||||
|
||||
proc mount(
|
||||
conf: WakuRlnConfig, registrationHandler = none(RegistrationHandler)
|
||||
): Future[RlnRelayResult[WakuRlnRelay]] {.async.} =
|
||||
@ -445,16 +471,18 @@ proc mount(
|
||||
(await groupManager.startGroupSync()).isOkOr:
|
||||
return err("could not start the group sync: " & $error)
|
||||
|
||||
return ok(
|
||||
WakuRLNRelay(
|
||||
groupManager: groupManager,
|
||||
nonceManager:
|
||||
NonceManager.init(conf.rlnRelayUserMessageLimit, conf.rlnEpochSizeSec.float),
|
||||
rlnEpochSizeSec: conf.rlnEpochSizeSec,
|
||||
rlnMaxEpochGap: max(uint64(MaxClockGapSeconds / float64(conf.rlnEpochSizeSec)), 1),
|
||||
onFatalErrorAction: conf.onFatalErrorAction,
|
||||
)
|
||||
wakuRlnRelay = WakuRLNRelay(
|
||||
groupManager: groupManager,
|
||||
nonceManager:
|
||||
NonceManager.init(conf.rlnRelayUserMessageLimit, conf.rlnEpochSizeSec.float),
|
||||
rlnEpochSizeSec: conf.rlnEpochSizeSec,
|
||||
rlnMaxEpochGap: max(uint64(MaxClockGapSeconds / float64(conf.rlnEpochSizeSec)), 1),
|
||||
onFatalErrorAction: conf.onFatalErrorAction,
|
||||
)
|
||||
|
||||
# Start epoch monitoring in the background
|
||||
wakuRlnRelay.epochMonitorFuture = monitorEpochs(wakuRlnRelay)
|
||||
return ok(wakuRlnRelay)
|
||||
|
||||
proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} =
|
||||
## returns true if the rln-relay protocol is ready to relay messages
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user