mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-05 23:43:07 +00:00
chore(rln): remove old and add new rln metric (#1926)
This commit is contained in:
parent
66ab55a8ef
commit
fa716d9b92
@ -69,9 +69,9 @@ template initializedGuard(g: OnchainGroupManager): untyped =
|
|||||||
if not g.initialized:
|
if not g.initialized:
|
||||||
raise newException(ValueError, "OnchainGroupManager is not initialized")
|
raise newException(ValueError, "OnchainGroupManager is not initialized")
|
||||||
|
|
||||||
method atomicBatch*(g: OnchainGroupManager,
|
method atomicBatch*(g: OnchainGroupManager,
|
||||||
start: MembershipIndex,
|
start: MembershipIndex,
|
||||||
idCommitments = newSeq[IDCommitment](),
|
idCommitments = newSeq[IDCommitment](),
|
||||||
toRemoveIndices = newSeq[MembershipIndex]()): Future[void] {.async.} =
|
toRemoveIndices = newSeq[MembershipIndex]()): Future[void] {.async.} =
|
||||||
initializedGuard(g)
|
initializedGuard(g)
|
||||||
|
|
||||||
@ -79,6 +79,7 @@ method atomicBatch*(g: OnchainGroupManager,
|
|||||||
let operationSuccess = g.rlnInstance.atomicWrite(some(start), idCommitments, toRemoveIndices)
|
let operationSuccess = g.rlnInstance.atomicWrite(some(start), idCommitments, toRemoveIndices)
|
||||||
if not operationSuccess:
|
if not operationSuccess:
|
||||||
raise newException(ValueError, "atomic batch operation failed")
|
raise newException(ValueError, "atomic batch operation failed")
|
||||||
|
waku_rln_number_registered_memberships.inc(int64(idCommitments.len - toRemoveIndices.len))
|
||||||
|
|
||||||
if g.registerCb.isSome():
|
if g.registerCb.isSome():
|
||||||
var membersSeq = newSeq[Membership]()
|
var membersSeq = newSeq[Membership]()
|
||||||
@ -249,7 +250,7 @@ proc getBlockTable(g: OnchainGroupManager,
|
|||||||
|
|
||||||
return blockTable
|
return blockTable
|
||||||
|
|
||||||
proc handleEvents(g: OnchainGroupManager,
|
proc handleEvents(g: OnchainGroupManager,
|
||||||
blockTable: BlockTable): Future[void] {.async.} =
|
blockTable: BlockTable): Future[void] {.async.} =
|
||||||
initializedGuard(g)
|
initializedGuard(g)
|
||||||
|
|
||||||
@ -259,7 +260,7 @@ proc handleEvents(g: OnchainGroupManager,
|
|||||||
let removalIndices = members.filterIt(it[1]).mapIt(it[0].index)
|
let removalIndices = members.filterIt(it[1]).mapIt(it[0].index)
|
||||||
let idCommitments = members.mapIt(it[0].idCommitment)
|
let idCommitments = members.mapIt(it[0].idCommitment)
|
||||||
await g.atomicBatch(start = startIndex,
|
await g.atomicBatch(start = startIndex,
|
||||||
idCommitments = idCommitments,
|
idCommitments = idCommitments,
|
||||||
toRemoveIndices = removalIndices)
|
toRemoveIndices = removalIndices)
|
||||||
g.latestIndex = startIndex + MembershipIndex(idCommitments.len())
|
g.latestIndex = startIndex + MembershipIndex(idCommitments.len())
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
@ -277,7 +278,7 @@ proc handleRemovedEvents(g: OnchainGroupManager, blockTable: BlockTable): Future
|
|||||||
for blockNumber, members in blockTable.pairs():
|
for blockNumber, members in blockTable.pairs():
|
||||||
if members.anyIt(it[1]):
|
if members.anyIt(it[1]):
|
||||||
numRemovedBlocks += 1
|
numRemovedBlocks += 1
|
||||||
|
|
||||||
await g.backfillRootQueue(numRemovedBlocks)
|
await g.backfillRootQueue(numRemovedBlocks)
|
||||||
|
|
||||||
proc getAndHandleEvents(g: OnchainGroupManager,
|
proc getAndHandleEvents(g: OnchainGroupManager,
|
||||||
@ -289,7 +290,7 @@ proc getAndHandleEvents(g: OnchainGroupManager,
|
|||||||
await g.handleEvents(blockTable)
|
await g.handleEvents(blockTable)
|
||||||
await g.handleRemovedEvents(blockTable)
|
await g.handleRemovedEvents(blockTable)
|
||||||
|
|
||||||
let latestProcessedBlock = if toBlock.isSome(): toBlock.get()
|
let latestProcessedBlock = if toBlock.isSome(): toBlock.get()
|
||||||
else: fromBlock
|
else: fromBlock
|
||||||
g.latestProcessedBlock = some(latestProcessedBlock)
|
g.latestProcessedBlock = some(latestProcessedBlock)
|
||||||
let metadataSetRes = g.rlnInstance.setMetadata(RlnMetadata(
|
let metadataSetRes = g.rlnInstance.setMetadata(RlnMetadata(
|
||||||
@ -343,7 +344,7 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
|
|||||||
let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
|
let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
|
||||||
try:
|
try:
|
||||||
# we always want to sync from last processed block => latest
|
# we always want to sync from last processed block => latest
|
||||||
if fromBlock == BlockNumber(0) or
|
if fromBlock == BlockNumber(0) or
|
||||||
fromBlock + BlockNumber(blockChunkSize) < latestBlock:
|
fromBlock + BlockNumber(blockChunkSize) < latestBlock:
|
||||||
# chunk events
|
# chunk events
|
||||||
while true:
|
while true:
|
||||||
@ -421,8 +422,7 @@ method startGroupSync*(g: OnchainGroupManager): Future[void] {.async.} =
|
|||||||
g.idCredentials = some(idCredential)
|
g.idCredentials = some(idCredential)
|
||||||
|
|
||||||
debug "registering commitment on contract"
|
debug "registering commitment on contract"
|
||||||
waku_rln_registration_duration_seconds.nanosecondTime:
|
await g.register(idCredential)
|
||||||
await g.register(idCredential)
|
|
||||||
if g.registrationHandler.isSome():
|
if g.registrationHandler.isSome():
|
||||||
# We need to callback with the tx hash
|
# We need to callback with the tx hash
|
||||||
let handler = g.registrationHandler.get()
|
let handler = g.registrationHandler.get()
|
||||||
@ -518,5 +518,5 @@ method stop*(g: OnchainGroupManager): Future[void] {.async.} =
|
|||||||
let flushed = g.rlnInstance.flush()
|
let flushed = g.rlnInstance.flush()
|
||||||
if not flushed:
|
if not flushed:
|
||||||
error "failed to flush to the tree db"
|
error "failed to flush to the tree db"
|
||||||
|
|
||||||
g.initialized = false
|
g.initialized = false
|
||||||
|
|||||||
@ -35,12 +35,11 @@ declarePublicHistogram(identifier = waku_rln_valid_messages_total,
|
|||||||
buckets = generateBucketsForHistogram(AcceptableRootWindowSize))
|
buckets = generateBucketsForHistogram(AcceptableRootWindowSize))
|
||||||
declarePublicCounter(waku_rln_errors_total, "number of errors detected while operating the rln relay", ["type"])
|
declarePublicCounter(waku_rln_errors_total, "number of errors detected while operating the rln relay", ["type"])
|
||||||
declarePublicCounter(waku_rln_proof_verification_total, "number of times the rln proofs are verified")
|
declarePublicCounter(waku_rln_proof_verification_total, "number of times the rln proofs are verified")
|
||||||
|
declarePublicCounter(waku_rln_number_registered_memberships, "number of registered and active rln memberships")
|
||||||
|
|
||||||
# Timing metrics
|
# Timing metrics
|
||||||
declarePublicGauge(waku_rln_proof_verification_duration_seconds, "time taken to verify a proof")
|
declarePublicGauge(waku_rln_proof_verification_duration_seconds, "time taken to verify a proof")
|
||||||
declarePublicGauge(waku_rln_relay_mounting_duration_seconds, "time taken to mount the waku rln relay")
|
|
||||||
declarePublicGauge(waku_rln_proof_generation_duration_seconds, "time taken to generate a proof")
|
declarePublicGauge(waku_rln_proof_generation_duration_seconds, "time taken to generate a proof")
|
||||||
declarePublicGauge(waku_rln_registration_duration_seconds, "time taken to register to a rln membership set")
|
|
||||||
declarePublicGauge(waku_rln_instance_creation_duration_seconds, "time taken to create an rln instance")
|
declarePublicGauge(waku_rln_instance_creation_duration_seconds, "time taken to create an rln instance")
|
||||||
declarePublicGauge(waku_rln_membership_insertion_duration_seconds, "time taken to insert a new member into the local merkle tree")
|
declarePublicGauge(waku_rln_membership_insertion_duration_seconds, "time taken to insert a new member into the local merkle tree")
|
||||||
declarePublicGauge(waku_rln_membership_credentials_import_duration_seconds, "time taken to import membership credentials")
|
declarePublicGauge(waku_rln_membership_credentials_import_duration_seconds, "time taken to import membership credentials")
|
||||||
|
|||||||
@ -399,9 +399,7 @@ proc new*(T: type WakuRlnRelay,
|
|||||||
## Returns an error if the rln-relay protocol could not be mounted.
|
## Returns an error if the rln-relay protocol could not be mounted.
|
||||||
debug "rln-relay input validation passed"
|
debug "rln-relay input validation passed"
|
||||||
try:
|
try:
|
||||||
waku_rln_relay_mounting_duration_seconds.nanosecondTime:
|
let rlnRelay = await mount(conf, registrationHandler)
|
||||||
let rlnRelay = await mount(conf,
|
|
||||||
registrationHandler)
|
|
||||||
return ok(rlnRelay)
|
return ok(rlnRelay)
|
||||||
except CatchableError as e:
|
except CatchableError as e:
|
||||||
return err(e.msg)
|
return err(e.msg)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user