mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-13 00:05:10 +00:00
chore: everything in one comment
This commit is contained in:
parent
049fbeabbb
commit
3b374a3ab0
@ -200,14 +200,10 @@ method generateProof*(
|
|||||||
messageId = messageId,
|
messageId = messageId,
|
||||||
).valueOr:
|
).valueOr:
|
||||||
return err("proof generation failed: " & $error)
|
return err("proof generation failed: " & $error)
|
||||||
return ok(proof)
|
|
||||||
|
|
||||||
if lastProcessedEpoch != epoch:
|
waku_rln_remaining_proofs_per_epoch.dec()
|
||||||
lastProcessedEpoch = epoch
|
waku_rln_total_generated_proofs.inc()
|
||||||
waku_rln_proof_remining.set(g.userMessageLimit.get().float64 - 1)
|
return ok(proof)
|
||||||
else:
|
|
||||||
waku_rln_proof_remining.dec()
|
|
||||||
waku_rln_proofs_generated_total.inc()
|
|
||||||
|
|
||||||
method isReady*(g: GroupManager): Future[bool] {.base, async.} =
|
method isReady*(g: GroupManager): Future[bool] {.base, async.} =
|
||||||
raise newException(
|
raise newException(
|
||||||
|
@ -63,12 +63,12 @@ declarePublicGauge(
|
|||||||
)
|
)
|
||||||
|
|
||||||
declarePublicGauge(
|
declarePublicGauge(
|
||||||
waku_rln_proof_remining,
|
waku_rln_remaining_proofs_per_epoch,
|
||||||
"number of proofs remaining to be generated for the current epoch",
|
"number of proofs remaining to be generated for the current epoch",
|
||||||
)
|
)
|
||||||
|
|
||||||
declarePublicGauge(
|
declarePublicGauge(
|
||||||
waku_rln_proofs_generated_total,
|
waku_rln_total_generated_proofs,
|
||||||
"total number of proofs generated since the node started",
|
"total number of proofs generated since the node started",
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -84,6 +84,7 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
|
|||||||
var cumulativeValidMessages = 0.float64
|
var cumulativeValidMessages = 0.float64
|
||||||
var cumulativeProofsVerified = 0.float64
|
var cumulativeProofsVerified = 0.float64
|
||||||
var cumulativeProofsGenerated = 0.float64
|
var cumulativeProofsGenerated = 0.float64
|
||||||
|
var cumulativeProofsRemaining = 100.float64
|
||||||
|
|
||||||
when defined(metrics):
|
when defined(metrics):
|
||||||
logMetrics = proc() =
|
logMetrics = proc() =
|
||||||
@ -102,7 +103,9 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
|
|||||||
waku_rln_proof_verification_total, cumulativeProofsVerified
|
waku_rln_proof_verification_total, cumulativeProofsVerified
|
||||||
)
|
)
|
||||||
let freshProofsGeneratedCount =
|
let freshProofsGeneratedCount =
|
||||||
parseAndAccumulate(waku_rln_proofs_generated_total, cumulativeProofsGenerated)
|
parseAndAccumulate(waku_rln_total_generated_proofs, cumulativeProofsGenerated)
|
||||||
|
let freshProofsRemainingCount =
|
||||||
|
parseAndAccumulate(waku_rln_remaining_proofs_per_epoch, cumulativeProofsRemaining)
|
||||||
|
|
||||||
info "Total messages", count = freshMsgCount
|
info "Total messages", count = freshMsgCount
|
||||||
info "Total spam messages", count = freshSpamCount
|
info "Total spam messages", count = freshSpamCount
|
||||||
@ -111,4 +114,5 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
|
|||||||
info "Total errors", count = freshErrorCount
|
info "Total errors", count = freshErrorCount
|
||||||
info "Total proofs verified", count = freshProofsVerifiedCount
|
info "Total proofs verified", count = freshProofsVerifiedCount
|
||||||
info "Total proofs generated", count = freshProofsGeneratedCount
|
info "Total proofs generated", count = freshProofsGeneratedCount
|
||||||
|
info "Total proofs remaining", count = freshProofsRemainingCount
|
||||||
return logMetrics
|
return logMetrics
|
||||||
|
@ -89,6 +89,7 @@ type WakuRLNRelay* = ref object of RootObj
|
|||||||
groupManager*: GroupManager
|
groupManager*: GroupManager
|
||||||
onFatalErrorAction*: OnFatalErrorHandler
|
onFatalErrorAction*: OnFatalErrorHandler
|
||||||
nonceManager*: NonceManager
|
nonceManager*: NonceManager
|
||||||
|
epochMonitorFuture*: Future[void]
|
||||||
|
|
||||||
proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch =
|
proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch =
|
||||||
## gets time `t` as `flaot64` with subseconds resolution in the fractional part
|
## gets time `t` as `flaot64` with subseconds resolution in the fractional part
|
||||||
@ -96,12 +97,35 @@ proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch =
|
|||||||
let e = uint64(t / rlnPeer.rlnEpochSizeSec.float64)
|
let e = uint64(t / rlnPeer.rlnEpochSizeSec.float64)
|
||||||
return toEpoch(e)
|
return toEpoch(e)
|
||||||
|
|
||||||
|
proc nextEpoch*(rlnPeer: WakuRLNRelay, t: float64): float64 =
|
||||||
|
# Calculates the next epoch time from the given time `t`.
|
||||||
|
let currentEpoch = rlnPeer.calcEpoch(t)
|
||||||
|
var timePtr = t
|
||||||
|
|
||||||
|
# Increment by minutes until the epoch changes
|
||||||
|
while rlnPeer.calcEpoch(timePtr) == currentEpoch:
|
||||||
|
timePtr += 60 # 1 minute
|
||||||
|
|
||||||
|
# Backtrack to the last minute of the current epoch
|
||||||
|
timePtr -= 60
|
||||||
|
|
||||||
|
# Increment by seconds to find the exact transition
|
||||||
|
while rlnPeer.calcEpoch(timePtr) == currentEpoch:
|
||||||
|
timePtr += 1 # 1 second
|
||||||
|
|
||||||
|
# Ensure the returned time is in the future
|
||||||
|
if timePtr > epochTime():
|
||||||
|
return timePtr
|
||||||
|
else:
|
||||||
|
return epochTime()
|
||||||
|
|
||||||
proc stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} =
|
proc stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} =
|
||||||
## stops the rln-relay protocol
|
## stops the rln-relay protocol
|
||||||
## Throws an error if it cannot stop the rln-relay protocol
|
## Throws an error if it cannot stop the rln-relay protocol
|
||||||
|
|
||||||
# stop the group sync, and flush data to tree db
|
# stop the group sync, and flush data to tree db
|
||||||
info "stopping rln-relay"
|
info "stopping rln-relay"
|
||||||
|
await rlnPeer.epochMonitorFuture.cancelAndWait()
|
||||||
await rlnPeer.groupManager.stop()
|
await rlnPeer.groupManager.stop()
|
||||||
|
|
||||||
proc hasDuplicate*(
|
proc hasDuplicate*(
|
||||||
@ -392,6 +416,18 @@ proc generateRlnValidator*(
|
|||||||
|
|
||||||
return validator
|
return validator
|
||||||
|
|
||||||
|
proc monitorEpochs(wakuRlnRelay: WakuRLNRelay): Future[void] {.async.} =
|
||||||
|
while true:
|
||||||
|
try:
|
||||||
|
waku_rln_remaining_proofs_per_epoch.set(
|
||||||
|
wakuRlnRelay.groupManager.userMessageLimit.get().float64
|
||||||
|
)
|
||||||
|
except CatchableError:
|
||||||
|
error "Error in epoch monitoring", error = getCurrentExceptionMsg()
|
||||||
|
|
||||||
|
let nextEpochTime = wakuRlnRelay.nextEpoch(epochTime())
|
||||||
|
await sleepAsync(int(wakuRlnRelay.rlnEpochSizeSec * 1000))
|
||||||
|
|
||||||
proc mount(
|
proc mount(
|
||||||
conf: WakuRlnConfig, registrationHandler = none(RegistrationHandler)
|
conf: WakuRlnConfig, registrationHandler = none(RegistrationHandler)
|
||||||
): Future[RlnRelayResult[WakuRlnRelay]] {.async.} =
|
): Future[RlnRelayResult[WakuRlnRelay]] {.async.} =
|
||||||
@ -445,17 +481,19 @@ proc mount(
|
|||||||
(await groupManager.startGroupSync()).isOkOr:
|
(await groupManager.startGroupSync()).isOkOr:
|
||||||
return err("could not start the group sync: " & $error)
|
return err("could not start the group sync: " & $error)
|
||||||
|
|
||||||
return ok(
|
wakuRlnRelay = WakuRLNRelay(
|
||||||
WakuRLNRelay(
|
groupManager: groupManager,
|
||||||
groupManager: groupManager,
|
nonceManager:
|
||||||
nonceManager:
|
NonceManager.init(conf.rlnRelayUserMessageLimit, conf.rlnEpochSizeSec.float),
|
||||||
NonceManager.init(conf.rlnRelayUserMessageLimit, conf.rlnEpochSizeSec.float),
|
rlnEpochSizeSec: conf.rlnEpochSizeSec,
|
||||||
rlnEpochSizeSec: conf.rlnEpochSizeSec,
|
rlnMaxEpochGap: max(uint64(MaxClockGapSeconds / float64(conf.rlnEpochSizeSec)), 1),
|
||||||
rlnMaxEpochGap: max(uint64(MaxClockGapSeconds / float64(conf.rlnEpochSizeSec)), 1),
|
onFatalErrorAction: conf.onFatalErrorAction,
|
||||||
onFatalErrorAction: conf.onFatalErrorAction,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Start epoch monitoring in the background
|
||||||
|
wakuRlnRelay.epochMonitorFuture = monitorEpochs(wakuRlnRelay)
|
||||||
|
return ok(wakuRlnRelay)
|
||||||
|
|
||||||
proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} =
|
proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} =
|
||||||
## returns true if the rln-relay protocol is ready to relay messages
|
## returns true if the rln-relay protocol is ready to relay messages
|
||||||
## returns false otherwise
|
## returns false otherwise
|
||||||
|
Loading…
x
Reference in New Issue
Block a user