feat: deprecate sync / local merkle tree (#3312)

This commit is contained in:
Darshan K 2025-05-09 05:37:58 +05:30 committed by GitHub
parent cc66c7fe78
commit d86babac3a
11 changed files with 458 additions and 559 deletions

View File

@ -165,7 +165,7 @@ nimbus-build-system-nimble-dir:
.PHONY: librln
LIBRLN_BUILDDIR := $(CURDIR)/vendor/zerokit
LIBRLN_VERSION := v0.5.1
LIBRLN_VERSION := v0.7.0
ifeq ($(detected_OS),Windows)
LIBRLN_FILE := rln.lib

View File

@ -3,7 +3,7 @@
{.push raises: [].}
import
std/[options, sequtils, deques],
std/[options, sequtils, deques, random],
results,
stew/byteutils,
testutils/unittests,
@ -13,7 +13,8 @@ import
web3,
libp2p/crypto/crypto,
eth/keys,
tests/testlib/testasync
tests/testlib/testasync,
tests/testlib/testutils
import
waku/[
@ -47,7 +48,6 @@ suite "Onchain group manager":
manager.ethRpc.isSome()
manager.wakuRlnContract.isSome()
manager.initialized
manager.rlnContractDeployedBlockNumber > 0.Quantity
manager.rlnRelayMaxMessageLimit == 100
asyncTest "should error on initialization when chainId does not match":
@ -97,18 +97,13 @@ suite "Onchain group manager":
echo "---"
asyncTest "should error if contract does not exist":
var triggeredError = false
manager.ethContractAddress = "0x0000000000000000000000000000000000000000"
manager.onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
echo "---"
discard
"Failed to get the deployed block number. Have you set the correct contract address?: No response from the Web3 provider"
echo msg
echo "---"
triggeredError = true
discard await manager.init()
var triggeredError = false
try:
discard await manager.init()
except CatchableError:
triggeredError = true
check triggeredError
@ -119,103 +114,71 @@ suite "Onchain group manager":
(await manager.init()).isErrOr:
raiseAssert "Expected error when keystore file doesn't exist"
asyncTest "startGroupSync: should start group sync":
asyncTest "trackRootChanges: start tracking roots":
(await manager.init()).isOkOr:
raiseAssert $error
(await manager.startGroupSync()).isOkOr:
raiseAssert $error
discard manager.trackRootChanges()
asyncTest "startGroupSync: should guard against uninitialized state":
(await manager.startGroupSync()).isErrOr:
raiseAssert "Expected error when not initialized"
asyncTest "trackRootChanges: should guard against uninitialized state":
try:
discard manager.trackRootChanges()
except CatchableError:
check getCurrentExceptionMsg().len == 38
asyncTest "startGroupSync: should sync to the state of the group":
asyncTest "trackRootChanges: should sync to the state of the group":
let credentials = generateCredentials(manager.rlnInstance)
let rateCommitment = getRateCommitment(credentials, UserMessageLimit(1)).valueOr:
raiseAssert $error
(await manager.init()).isOkOr:
raiseAssert $error
let merkleRootBefore = manager.rlnInstance.getMerkleRoot().valueOr:
raiseAssert $error
let fut = newFuture[void]("startGroupSync")
proc generateCallback(fut: Future[void]): OnRegisterCallback =
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
check:
registrations.len == 1
registrations[0].index == 0
registrations[0].rateCommitment == rateCommitment
fut.complete()
return callback
let merkleRootBefore = manager.fetchMerkleRoot()
try:
manager.onRegister(generateCallback(fut))
await manager.register(credentials, UserMessageLimit(1))
(await manager.startGroupSync()).isOkOr:
raiseAssert $error
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
await fut
discard await withTimeout(trackRootChanges(manager), 15.seconds)
let merkleRootAfter = manager.rlnInstance.getMerkleRoot().valueOr:
let merkleRootAfter = manager.fetchMerkleRoot()
let metadataSetRes = manager.setMetadata()
assert metadataSetRes.isOk(), metadataSetRes.error
let metadataOpt = getMetadata(manager.rlnInstance).valueOr:
raiseAssert $error
let metadataOpt = manager.rlnInstance.getMetadata().valueOr:
raiseAssert $error
assert metadataOpt.isSome(), "metadata is not set"
let metadata = metadataOpt.get()
check:
metadataOpt.get().validRoots == manager.validRoots.toSeq()
metadata.validRoots == manager.validRoots.toSeq()
merkleRootBefore != merkleRootAfter
asyncTest "startGroupSync: should fetch history correctly":
asyncTest "trackRootChanges: should fetch history correctly":
# TODO: We can't use `trackRootChanges()` directly in this test because its current implementation
# relies on a busy loop rather than event-based monitoring. As a result, some root changes
# may be missed, leading to inconsistent test results (i.e., it may randomly return true or false).
# To ensure reliability, we use the `updateRoots()` function to validate the `validRoots` window
# after each registration.
const credentialCount = 6
let credentials = generateCredentials(manager.rlnInstance, credentialCount)
(await manager.init()).isOkOr:
raiseAssert $error
let merkleRootBefore = manager.rlnInstance.getMerkleRoot().valueOr:
raiseAssert $error
type TestGroupSyncFuts = array[0 .. credentialCount - 1, Future[void]]
var futures: TestGroupSyncFuts
for i in 0 ..< futures.len():
futures[i] = newFuture[void]()
proc generateCallback(
futs: TestGroupSyncFuts, credentials: seq[IdentityCredential]
): OnRegisterCallback =
var futureIndex = 0
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
let rateCommitment =
getRateCommitment(credentials[futureIndex], UserMessageLimit(1))
if registrations.len == 1 and
registrations[0].rateCommitment == rateCommitment.get() and
registrations[0].index == MembershipIndex(futureIndex):
futs[futureIndex].complete()
futureIndex += 1
return callback
let merkleRootBefore = manager.fetchMerkleRoot()
try:
manager.onRegister(generateCallback(futures, credentials))
(await manager.startGroupSync()).isOkOr:
raiseAssert $error
for i in 0 ..< credentials.len():
await manager.register(credentials[i], UserMessageLimit(1))
discard await manager.updateRoots()
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
await allFutures(futures)
let merkleRootAfter = manager.rlnInstance.getMerkleRoot().valueOr:
raiseAssert $error
let merkleRootAfter = manager.fetchMerkleRoot()
check:
merkleRootBefore != merkleRootAfter
manager.validRootBuffer.len() == credentialCount - AcceptableRootWindowSize
manager.validRoots.len() == credentialCount
asyncTest "register: should guard against uninitialized state":
let dummyCommitment = default(IDCommitment)
@ -232,14 +195,12 @@ suite "Onchain group manager":
assert false, "exception raised: " & getCurrentExceptionMsg()
asyncTest "register: should register successfully":
# TODO :- similar to ```trackRootChanges: should fetch history correctly```
(await manager.init()).isOkOr:
raiseAssert $error
(await manager.startGroupSync()).isOkOr:
raiseAssert $error
let idCommitment = generateCredentials(manager.rlnInstance).idCommitment
let merkleRootBefore = manager.rlnInstance.getMerkleRoot().valueOr:
raiseAssert $error
let merkleRootBefore = manager.fetchMerkleRoot()
try:
await manager.register(
@ -251,10 +212,10 @@ suite "Onchain group manager":
assert false,
"exception raised when calling register: " & getCurrentExceptionMsg()
let merkleRootAfter = manager.rlnInstance.getMerkleRoot().valueOr:
raiseAssert $error
let merkleRootAfter = manager.fetchMerkleRoot()
check:
merkleRootAfter.inHex() != merkleRootBefore.inHex()
merkleRootAfter != merkleRootBefore
manager.latestIndex == 1
asyncTest "register: callback is called":
@ -264,19 +225,19 @@ suite "Onchain group manager":
let fut = newFuture[void]()
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
let rateCommitment = getRateCommitment(idCredentials, UserMessageLimit(1))
let rateCommitment = getRateCommitment(idCredentials, UserMessageLimit(1)).get()
check:
registrations.len == 1
registrations[0].rateCommitment == rateCommitment.get()
registrations[0].rateCommitment == rateCommitment
registrations[0].index == 0
fut.complete()
manager.onRegister(callback)
(await manager.init()).isOkOr:
raiseAssert $error
manager.onRegister(callback)
try:
(await manager.startGroupSync()).isOkOr:
raiseAssert $error
await manager.register(
RateCommitment(
idCommitment: idCommitment, userMessageLimit: UserMessageLimit(1)
@ -298,38 +259,43 @@ suite "Onchain group manager":
assert false, "exception raised: " & getCurrentExceptionMsg()
asyncTest "validateRoot: should validate good root":
let credentials = generateCredentials(manager.rlnInstance)
(await manager.init()).isOkOr:
raiseAssert $error
let idCredentials = generateCredentials(manager.rlnInstance)
let idCommitment = idCredentials.idCommitment
let fut = newFuture[void]()
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
if registrations.len == 1 and
registrations[0].rateCommitment ==
getRateCommitment(credentials, UserMessageLimit(1)).get() and
getRateCommitment(idCredentials, UserMessageLimit(1)).get() and
registrations[0].index == 0:
manager.idCredentials = some(credentials)
manager.idCredentials = some(idCredentials)
fut.complete()
manager.onRegister(callback)
(await manager.init()).isOkOr:
raiseAssert $error
try:
(await manager.startGroupSync()).isOkOr:
raiseAssert $error
await manager.register(credentials, UserMessageLimit(1))
await manager.register(idCredentials, UserMessageLimit(1))
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
await fut
let rootUpdated = await manager.updateRoots()
if rootUpdated:
let proofResult = await manager.fetchMerkleProofElements()
if proofResult.isErr():
error "Failed to fetch Merkle proof", error = proofResult.error
manager.merkleProofCache = proofResult.get()
let messageBytes = "Hello".toBytes()
# prepare the epoch
let epoch = default(Epoch)
debug "epoch in bytes", epochHex = epoch.inHex()
# generate proof
let validProofRes = manager.generateProof(
data = messageBytes, epoch = epoch, messageId = MessageId(1)
)
@ -338,38 +304,39 @@ suite "Onchain group manager":
validProofRes.isOk()
let validProof = validProofRes.get()
# validate the root (should be true)
let validated = manager.validateRoot(validProof.merkleRoot)
check:
validated
asyncTest "validateRoot: should reject bad root":
let idCredentials = generateCredentials(manager.rlnInstance)
let idCommitment = idCredentials.idCommitment
(await manager.init()).isOkOr:
raiseAssert $error
(await manager.startGroupSync()).isOkOr:
raiseAssert $error
let credentials = generateCredentials(manager.rlnInstance)
## Assume the registration occured out of band
manager.idCredentials = some(credentials)
manager.membershipIndex = some(MembershipIndex(0))
manager.userMessageLimit = some(UserMessageLimit(1))
manager.membershipIndex = some(MembershipIndex(0))
manager.idCredentials = some(idCredentials)
manager.merkleProofCache = newSeq[byte](640)
for i in 0 ..< 640:
manager.merkleProofCache[i] = byte(rand(255))
let messageBytes = "Hello".toBytes()
# prepare the epoch
let epoch = default(Epoch)
debug "epoch in bytes", epochHex = epoch.inHex()
# generate proof
let validProof = manager.generateProof(
data = messageBytes, epoch = epoch, messageId = MessageId(0)
).valueOr:
raiseAssert $error
let validProofRes = manager.generateProof(
data = messageBytes, epoch = epoch, messageId = MessageId(1)
)
check:
validProofRes.isOk()
let validProof = validProofRes.get()
# validate the root (should be false)
let validated = manager.validateRoot(validProof.merkleRoot)
check:
@ -393,13 +360,19 @@ suite "Onchain group manager":
manager.onRegister(callback)
try:
(await manager.startGroupSync()).isOkOr:
raiseAssert $error
await manager.register(credentials, UserMessageLimit(1))
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
await fut
let rootUpdated = await manager.updateRoots()
if rootUpdated:
let proofResult = await manager.fetchMerkleProofElements()
if proofResult.isErr():
error "Failed to fetch Merkle proof", error = proofResult.error
manager.merkleProofCache = proofResult.get()
let messageBytes = "Hello".toBytes()
# prepare the epoch
@ -412,7 +385,6 @@ suite "Onchain group manager":
).valueOr:
raiseAssert $error
# verify the proof (should be true)
let verified = manager.verifyProof(messageBytes, validProof).valueOr:
raiseAssert $error
@ -422,31 +394,23 @@ suite "Onchain group manager":
asyncTest "verifyProof: should reject invalid proof":
(await manager.init()).isOkOr:
raiseAssert $error
(await manager.startGroupSync()).isOkOr:
raiseAssert $error
let idCredential = generateCredentials(manager.rlnInstance)
try:
await manager.register(
RateCommitment(
idCommitment: idCredential.idCommitment, userMessageLimit: UserMessageLimit(1)
)
)
await manager.register(idCredential, UserMessageLimit(1))
except Exception, CatchableError:
assert false,
"exception raised when calling startGroupSync: " & getCurrentExceptionMsg()
let idCredential2 = generateCredentials(manager.rlnInstance)
## Assume the registration occured out of band
manager.idCredentials = some(idCredential2)
manager.membershipIndex = some(MembershipIndex(0))
manager.userMessageLimit = some(UserMessageLimit(1))
let messageBytes = "Hello".toBytes()
# prepare the epoch
let rootUpdated = await manager.updateRoots()
manager.merkleProofCache = newSeq[byte](640)
for i in 0 ..< 640:
manager.merkleProofCache[i] = byte(rand(255))
let epoch = default(Epoch)
debug "epoch in bytes", epochHex = epoch.inHex()
@ -466,8 +430,8 @@ suite "Onchain group manager":
check:
verified == false
asyncTest "backfillRootQueue: should backfill roots in event of chain reorg":
const credentialCount = 6
asyncTest "root queue should be updated correctly":
const credentialCount = 12
let credentials = generateCredentials(manager.rlnInstance, credentialCount)
(await manager.init()).isOkOr:
raiseAssert $error
@ -493,33 +457,17 @@ suite "Onchain group manager":
try:
manager.onRegister(generateCallback(futures, credentials))
(await manager.startGroupSync()).isOkOr:
raiseAssert $error
for i in 0 ..< credentials.len():
await manager.register(credentials[i], UserMessageLimit(1))
discard await manager.updateRoots()
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
await allFutures(futures)
# At this point, we should have a full root queue, 5 roots, and partial buffer of 1 root
check:
manager.validRoots.len() == credentialCount - 1
manager.validRootBuffer.len() == 1
# We can now simulate a chain reorg by calling backfillRootQueue
let expectedLastRoot = manager.validRootBuffer[0]
try:
await manager.backfillRootQueue(1)
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
# We should now have 5 roots in the queue, and no partial buffer
check:
manager.validRoots.len() == credentialCount - 1
manager.validRootBuffer.len() == 0
manager.validRoots[credentialCount - 2] == expectedLastRoot
manager.validRoots.len() == credentialCount
asyncTest "isReady should return false if ethRpc is none":
(await manager.init()).isOkOr:
@ -536,25 +484,9 @@ suite "Onchain group manager":
check:
isReady == false
asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed":
(await manager.init()).isOkOr:
raiseAssert $error
var isReady = true
try:
isReady = await manager.isReady()
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
check:
isReady == false
asyncTest "isReady should return true if ethRpc is ready":
(await manager.init()).isOkOr:
raiseAssert $error
# node can only be ready after group sync is done
(await manager.startGroupSync()).isOkOr:
raiseAssert $error
var isReady = false
try:

View File

@ -529,7 +529,6 @@ procSuite "WakuNode - RLN relay":
xasyncTest "clearNullifierLog: should clear epochs > MaxEpochGap":
## This is skipped because is flaky and made CI randomly fail but is useful to run manually
# Given two nodes
let
contentTopic = ContentTopic("/waku/2/default-content/proto")

2
vendor/zerokit vendored

@ -1 +1 @@
Subproject commit b9d27039c3266af108882d7a8bafc37400d29855
Subproject commit ba467d370c56b7432522227de22fbd664d44ef3e

View File

@ -27,9 +27,6 @@ proc inHex*(
valueHex = "0" & valueHex
return toLowerAscii(valueHex)
proc toUserMessageLimit*(v: UInt256): UserMessageLimit =
return cast[UserMessageLimit](v)
proc encodeLengthPrefix*(input: openArray[byte]): seq[byte] =
## returns length prefixed version of the input
## with the following format [len<8>|input<var>]
@ -78,6 +75,31 @@ proc serialize*(
)
return output
proc serialize*(witness: RLNWitnessInput): seq[byte] =
## Serializes the RLN witness into a byte array following zerokit's expected format.
## The serialized format includes:
## - identity_secret (32 bytes, little-endian with zero padding)
## - user_message_limit (32 bytes, little-endian with zero padding)
## - message_id (32 bytes, little-endian with zero padding)
## - merkle tree depth (8 bytes, little-endian) = path_elements.len / 32
## - path_elements (each 32 bytes, ordered bottom-to-top)
## - merkle tree depth again (8 bytes, little-endian)
## - identity_path_index (sequence of bits as bytes, 0 = left, 1 = right)
## - x (32 bytes, little-endian with zero padding)
## - external_nullifier (32 bytes, little-endian with zero padding)
var buffer: seq[byte]
buffer.add(@(witness.identity_secret))
buffer.add(@(witness.user_message_limit))
buffer.add(@(witness.message_id))
buffer.add(toBytes(uint64(witness.path_elements.len / 32), Endianness.littleEndian))
for element in witness.path_elements:
buffer.add(element)
buffer.add(toBytes(uint64(witness.path_elements.len / 32), Endianness.littleEndian))
buffer.add(witness.identity_path_index)
buffer.add(@(witness.x))
buffer.add(@(witness.external_nullifier))
return buffer
proc serialize*(proof: RateLimitProof, data: openArray[byte]): seq[byte] =
## a private proc to convert RateLimitProof and data to a byte seq
## this conversion is used in the proof verification proc
@ -133,3 +155,25 @@ func `+`*(a, b: Quantity): Quantity {.borrow.}
func u256*(n: Quantity): UInt256 {.inline.} =
n.uint64.stuint(256)
proc uint64ToField*(n: uint64): array[32, byte] =
var output: array[32, byte]
let bytes = toBytes(n, Endianness.littleEndian)
output[0 ..< bytes.len] = bytes
return output
proc UInt256ToField*(v: UInt256): array[32, byte] =
return cast[array[32, byte]](v) # already doesn't use `result`
proc seqToField*(s: seq[byte]): array[32, byte] =
var output: array[32, byte]
let len = min(s.len, 32)
for i in 0 ..< len:
output[i] = s[i]
return output
proc uint64ToIndex*(index: MembershipIndex, depth: int): seq[byte] =
var output = newSeq[byte](depth)
for i in 0 ..< depth:
output[i] = byte((index shr i) and 1) # LSB-first bit decomposition
return output

View File

@ -145,7 +145,6 @@ method validateRoot*(
g: GroupManager, root: MerkleNode
): bool {.base, gcsafe, raises: [].} =
## validates the root against the valid roots queue
# Check if the root is in the valid roots queue
if g.indexOfRoot(root) >= 0:
return true
return false
@ -175,7 +174,7 @@ method verifyProof*(
method generateProof*(
g: GroupManager,
data: openArray[byte],
data: seq[byte],
epoch: Epoch,
messageId: MessageId,
rlnIdentifier = DefaultRlnIdentifier,
@ -189,6 +188,7 @@ method generateProof*(
return err("membership index is not set")
if g.userMessageLimit.isNone():
return err("user message limit is not set")
waku_rln_proof_generation_duration_seconds.nanosecondTime:
let proof = proofGen(
rlnInstance = g.rlnInstance,
@ -201,8 +201,6 @@ method generateProof*(
).valueOr:
return err("proof generation failed: " & $error)
waku_rln_remaining_proofs_per_epoch.dec()
waku_rln_total_generated_proofs.inc()
return ok(proof)
method isReady*(g: GroupManager): Future[bool] {.base, async.} =

View File

@ -10,19 +10,18 @@ import
nimcrypto/keccak as keccak,
stint,
json,
std/tables,
std/[strutils, tables, algorithm],
stew/[byteutils, arrayops],
sequtils,
strutils
sequtils
import
../../../waku_keystore,
../../rln,
../../rln/rln_interface,
../../conversion_utils,
../group_manager_base,
./retry_wrapper
from strutils import parseHexInt
export group_manager_base
logScope:
@ -31,19 +30,23 @@ logScope:
# using the when predicate does not work within the contract macro, hence need to dupe
contract(WakuRlnContract):
# this serves as an entrypoint into the rln membership set
proc register(idCommitment: UInt256, userMessageLimit: EthereumUInt32)
proc register(idCommitment: UInt256, userMessageLimit: UInt32)
# Initializes the implementation contract (only used in unit tests)
proc initialize(maxMessageLimit: UInt256)
# this event is raised when a new member is registered
proc MemberRegistered(rateCommitment: UInt256, index: EthereumUInt32) {.event.}
proc MemberRegistered(rateCommitment: UInt256, index: UInt32) {.event.}
# this function denotes existence of a given user
proc memberExists(idCommitment: Uint256): UInt256 {.view.}
proc memberExists(idCommitment: UInt256): UInt256 {.view.}
# this constant describes the next index of a new member
proc commitmentIndex(): UInt256 {.view.}
# this constant describes the block number this contract was deployed on
proc deployedBlockNumber(): UInt256 {.view.}
# this constant describes max message limit of rln contract
proc MAX_MESSAGE_LIMIT(): UInt256 {.view.}
# this function returns the merkleProof for a given index
# proc merkleProofElements(index: UInt40): seq[byte] {.view.}
# this function returns the merkle root
proc root(): UInt256 {.view.}
type
WakuRlnContractWithSender = Sender[WakuRlnContract]
@ -52,42 +55,14 @@ type
ethPrivateKey*: Option[string]
ethContractAddress*: string
ethRpc*: Option[Web3]
rlnContractDeployedBlockNumber*: BlockNumber
wakuRlnContract*: Option[WakuRlnContractWithSender]
latestProcessedBlock*: BlockNumber
registrationTxHash*: Option[TxHash]
chainId*: uint
keystorePath*: Option[string]
keystorePassword*: Option[string]
registrationHandler*: Option[RegistrationHandler]
# this buffer exists to backfill appropriate roots for the merkle tree,
# in event of a reorg. we store 5 in the buffer. Maybe need to revisit this,
# because the average reorg depth is 1 to 2 blocks.
validRootBuffer*: Deque[MerkleNode]
# interval loop to shut down gracefully
blockFetchingActive*: bool
const DefaultKeyStorePath* = "rlnKeystore.json"
const DefaultKeyStorePassword* = "password"
const DefaultBlockPollRate* = 6.seconds
template initializedGuard(g: OnchainGroupManager): untyped =
if not g.initialized:
raise newException(CatchableError, "OnchainGroupManager is not initialized")
proc resultifiedInitGuard(g: OnchainGroupManager): GroupManagerResult[void] =
try:
initializedGuard(g)
return ok()
except CatchableError:
return err("OnchainGroupManager is not initialized")
template retryWrapper(
g: OnchainGroupManager, res: auto, errStr: string, body: untyped
): auto =
retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction):
body
latestProcessedBlock*: BlockNumber
merkleProofCache*: seq[byte]
proc setMetadata*(
g: OnchainGroupManager, lastProcessedBlock = none(BlockNumber)
@ -112,33 +87,109 @@ proc setMetadata*(
return err("failed to persist rln metadata: " & getCurrentExceptionMsg())
return ok()
method atomicBatch*(
g: OnchainGroupManager,
start: MembershipIndex,
rateCommitments = newSeq[RawRateCommitment](),
toRemoveIndices = newSeq[MembershipIndex](),
): Future[void] {.async: (raises: [Exception]), base.} =
initializedGuard(g)
proc fetchMerkleProofElements*(
g: OnchainGroupManager
): Future[Result[seq[byte], string]] {.async.} =
try:
let membershipIndex = g.membershipIndex.get()
let index40 = stuint(membershipIndex, 40)
waku_rln_membership_insertion_duration_seconds.nanosecondTime:
let operationSuccess =
g.rlnInstance.atomicWrite(some(start), rateCommitments, toRemoveIndices)
if not operationSuccess:
raise newException(CatchableError, "atomic batch operation failed")
# TODO: when slashing is enabled, we need to track slashed members
waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
let methodSig = "merkleProofElements(uint40)"
let methodIdDigest = keccak.keccak256.digest(methodSig)
let methodId = methodIdDigest.data[0 .. 3]
if g.registerCb.isSome():
var membersSeq = newSeq[Membership]()
for i in 0 ..< rateCommitments.len:
var index = start + MembershipIndex(i)
debug "registering member to callback",
rateCommitment = rateCommitments[i], index = index
let member = Membership(rateCommitment: rateCommitments[i], index: index)
membersSeq.add(member)
await g.registerCb.get()(membersSeq)
var paddedParam = newSeq[byte](32)
let indexBytes = index40.toBytesBE()
for i in 0 ..< min(indexBytes.len, paddedParam.len):
paddedParam[paddedParam.len - indexBytes.len + i] = indexBytes[i]
g.validRootBuffer = g.slideRootQueue()
var callData = newSeq[byte]()
for b in methodId:
callData.add(b)
callData.add(paddedParam)
var tx: TransactionArgs
tx.to = Opt.some(fromHex(Address, g.ethContractAddress))
tx.data = Opt.some(callData)
let responseBytes = await g.ethRpc.get().provider.eth_call(tx, "latest")
return ok(responseBytes)
except CatchableError:
error "Failed to fetch Merkle proof elements", error = getCurrentExceptionMsg()
return err("Failed to fetch merkle proof elements: " & getCurrentExceptionMsg())
proc fetchMerkleRoot*(
g: OnchainGroupManager
): Future[Result[UInt256, string]] {.async.} =
try:
let merkleRootInvocation = g.wakuRlnContract.get().root()
let merkleRoot = await merkleRootInvocation.call()
return ok(merkleRoot)
except CatchableError:
error "Failed to fetch Merkle root", error = getCurrentExceptionMsg()
return err("Failed to fetch merkle root: " & getCurrentExceptionMsg())
template initializedGuard(g: OnchainGroupManager): untyped =
if not g.initialized:
raise newException(CatchableError, "OnchainGroupManager is not initialized")
template retryWrapper(
g: OnchainGroupManager, res: auto, errStr: string, body: untyped
): auto =
retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction):
body
method validateRoot*(g: OnchainGroupManager, root: MerkleNode): bool =
if g.validRoots.find(root) >= 0:
return true
return false
proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} =
let rootRes = await g.fetchMerkleRoot()
if rootRes.isErr():
return false
let merkleRoot = UInt256ToField(rootRes.get())
if g.validRoots.len == 0:
g.validRoots.addLast(merkleRoot)
return true
if g.validRoots[g.validRoots.len - 1] != merkleRoot:
if g.validRoots.len > AcceptableRootWindowSize:
discard g.validRoots.popFirst()
g.validRoots.addLast(merkleRoot)
return true
return false
proc trackRootChanges*(g: OnchainGroupManager) {.async: (raises: [CatchableError]).} =
try:
initializedGuard(g)
let ethRpc = g.ethRpc.get()
let wakuRlnContract = g.wakuRlnContract.get()
const rpcDelay = 5.seconds
while true:
let rootUpdated = await g.updateRoots()
if rootUpdated:
if g.membershipIndex.isNone():
error "membershipIndex is not set; skipping proof update"
else:
let proofResult = await g.fetchMerkleProofElements()
if proofResult.isErr():
error "Failed to fetch Merkle proof", error = proofResult.error
g.merkleProofCache = proofResult.get()
# also need update registerd membership
let memberCount = cast[int64](await wakuRlnContract.commitmentIndex().call())
waku_rln_number_registered_memberships.set(float64(memberCount))
await sleepAsync(rpcDelay)
except CatchableError:
error "Fatal error in trackRootChanges", error = getCurrentExceptionMsg()
method register*(
g: OnchainGroupManager, rateCommitment: RateCommitment
@ -147,18 +198,14 @@ method register*(
try:
let leaf = rateCommitment.toLeaf().get()
await g.registerBatch(@[leaf])
if g.registerCb.isSome():
let idx = g.latestIndex
debug "registering member via callback", rateCommitment = leaf, index = idx
await g.registerCb.get()(@[Membership(rateCommitment: leaf, index: idx)])
g.latestIndex.inc()
except CatchableError:
raise newException(ValueError, getCurrentExceptionMsg())
method registerBatch*(
g: OnchainGroupManager, rateCommitments: seq[RawRateCommitment]
): Future[void] {.async: (raises: [Exception]).} =
initializedGuard(g)
await g.atomicBatch(g.latestIndex, rateCommitments)
g.latestIndex += MembershipIndex(rateCommitments.len)
method register*(
g: OnchainGroupManager,
identityCredential: IdentityCredential,
@ -212,8 +259,19 @@ method register*(
debug "parsed membershipIndex", membershipIndex
g.userMessageLimit = some(userMessageLimit)
g.membershipIndex = some(membershipIndex.toMembershipIndex())
g.idCredentials = some(identityCredential)
let rateCommitment = RateCommitment(
idCommitment: identityCredential.idCommitment, userMessageLimit: userMessageLimit
)
.toLeaf()
.get()
if g.registerCb.isSome():
let member = Membership(rateCommitment: rateCommitment, index: g.latestIndex)
await g.registerCb.get()(@[member])
g.latestIndex.inc()
# don't handle member insertion into the tree here, it will be handled by the event listener
return
method withdraw*(
@ -226,304 +284,173 @@ method withdrawBatch*(
): Future[void] {.async: (raises: [Exception]).} =
initializedGuard(g)
# TODO: after slashing is enabled on the contract, use atomicBatch internally
proc getRootFromProofAndIndex(
g: OnchainGroupManager, elements: seq[byte], bits: seq[byte]
): GroupManagerResult[array[32, byte]] =
# this is a helper function to get root from merkle proof elements and index
# it's currently not used anywhere, but can be used to verify the root from the proof and index
# Compute leaf hash from idCommitment and messageLimit
let messageLimitField = uint64ToField(g.userMessageLimit.get())
let leafHashRes = poseidon(@[g.idCredentials.get().idCommitment, @messageLimitField])
if leafHashRes.isErr():
return err("Failed to compute leaf hash: " & leafHashRes.error)
proc parseEvent(
event: type MemberRegistered, log: JsonNode
): GroupManagerResult[Membership] =
## parses the `data` parameter of the `MemberRegistered` event `log`
## returns an error if it cannot parse the `data` parameter
var rateCommitment: UInt256
var index: UInt256
var data: seq[byte]
try:
data = hexToSeqByte(log["data"].getStr())
except ValueError:
return err(
"failed to parse the data field of the MemberRegistered event: " &
getCurrentExceptionMsg()
)
var offset = 0
try:
# Parse the rateCommitment
offset += decode(data, 0, offset, rateCommitment)
# Parse the index
offset += decode(data, 0, offset, index)
return ok(
Membership(
rateCommitment: rateCommitment.toRateCommitment(),
index: index.toMembershipIndex(),
)
)
except CatchableError:
return err("failed to parse the data field of the MemberRegistered event")
var hash = leafHashRes.get()
for i in 0 ..< bits.len:
let sibling = elements[i * 32 .. (i + 1) * 32 - 1]
type BlockTable* = OrderedTable[BlockNumber, seq[(Membership, bool)]]
let hashRes =
if bits[i] == 0:
poseidon(@[@hash, sibling])
else:
poseidon(@[sibling, @hash])
proc backfillRootQueue*(
g: OnchainGroupManager, len: uint
): Future[void] {.async: (raises: [Exception]).} =
if len > 0:
# backfill the tree's acceptable roots
for i in 0 .. len - 1:
# remove the last root
g.validRoots.popLast()
for i in 0 .. len - 1:
# add the backfilled root
g.validRoots.addLast(g.validRootBuffer.popLast())
hash = hashRes.valueOr:
return err("Failed to compute poseidon hash: " & error)
hash = hashRes.get()
proc insert(
blockTable: var BlockTable,
blockNumber: BlockNumber,
member: Membership,
removed: bool,
) =
let memberTuple = (member, removed)
if blockTable.hasKeyOrPut(blockNumber, @[memberTuple]):
try:
blockTable[blockNumber].add(memberTuple)
except KeyError: # qed
error "could not insert member into block table",
blockNumber = blockNumber, member = member
return ok(hash)
proc getRawEvents(
g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: BlockNumber
): Future[JsonNode] {.async: (raises: [Exception]).} =
initializedGuard(g)
method generateProof*(
g: OnchainGroupManager,
data: seq[byte],
epoch: Epoch,
messageId: MessageId,
rlnIdentifier = DefaultRlnIdentifier,
): GroupManagerResult[RateLimitProof] {.gcsafe, raises: [].} =
## Generates an RLN proof using the cached Merkle proof and custom witness
# Ensure identity credentials and membership index are set
if g.idCredentials.isNone():
return err("identity credentials are not set")
if g.membershipIndex.isNone():
return err("membership index is not set")
if g.userMessageLimit.isNone():
return err("user message limit is not set")
let ethRpc = g.ethRpc.get()
let wakuRlnContract = g.wakuRlnContract.get()
if (g.merkleProofCache.len mod 32) != 0:
return err("Invalid merkle proof cache length")
var eventStrs: seq[JsonString]
g.retryWrapper(eventStrs, "Failed to get the events"):
await wakuRlnContract.getJsonLogs(
MemberRegistered,
fromBlock = Opt.some(fromBlock.blockId()),
toBlock = Opt.some(toBlock.blockId()),
)
let identity_secret = seqToField(g.idCredentials.get().idSecretHash)
let user_message_limit = uint64ToField(g.userMessageLimit.get())
let message_id = uint64ToField(messageId)
var path_elements = newSeq[byte](0)
var events = newJArray()
for eventStr in eventStrs:
events.add(parseJson(eventStr.string))
return events
if (g.merkleProofCache.len mod 32) != 0:
return err("Invalid merkle proof cache length")
proc getBlockTable(
g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: BlockNumber
): Future[BlockTable] {.async: (raises: [Exception]).} =
initializedGuard(g)
let identity_path_index = uint64ToIndex(g.membershipIndex.get(), 20)
for i in 0 ..< g.merkleProofCache.len div 32:
let chunk = g.merkleProofCache[i * 32 .. (i + 1) * 32 - 1]
path_elements.add(chunk.reversed())
var blockTable = default(BlockTable)
let x = keccak.keccak256.digest(data)
let events = await g.getRawEvents(fromBlock, toBlock)
let extNullifier = poseidon(@[@(epoch), @(rlnIdentifier)]).valueOr:
return err("Failed to compute external nullifier: " & error)
if events.len == 0:
trace "no events found"
return blockTable
let witness = RLNWitnessInput(
identity_secret: identity_secret,
user_message_limit: user_message_limit,
message_id: message_id,
path_elements: path_elements,
identity_path_index: identity_path_index,
x: x,
external_nullifier: extNullifier,
)
for event in events:
let blockNumber = parseHexInt(event["blockNumber"].getStr()).BlockNumber
let removed = event["removed"].getBool()
let parsedEventRes = parseEvent(MemberRegistered, event)
if parsedEventRes.isErr():
error "failed to parse the MemberRegistered event", error = parsedEventRes.error()
raise newException(ValueError, "failed to parse the MemberRegistered event")
let parsedEvent = parsedEventRes.get()
blockTable.insert(blockNumber, parsedEvent, removed)
let serializedWitness = serialize(witness)
return blockTable
var input_witness_buffer = toBuffer(serializedWitness)
proc handleEvents(
g: OnchainGroupManager, blockTable: BlockTable
): Future[void] {.async: (raises: [Exception]).} =
initializedGuard(g)
# Generate the proof using the zerokit API
var output_witness_buffer: Buffer
let witness_success = generate_proof_with_witness(
g.rlnInstance, addr input_witness_buffer, addr output_witness_buffer
)
for blockNumber, members in blockTable.pairs():
try:
let startIndex = blockTable[blockNumber].filterIt(not it[1])[0][0].index
let removalIndices = members.filterIt(it[1]).mapIt(it[0].index)
let rateCommitments = members.mapIt(it[0].rateCommitment)
await g.atomicBatch(
start = startIndex,
rateCommitments = rateCommitments,
toRemoveIndices = removalIndices,
)
g.latestIndex = startIndex + MembershipIndex(rateCommitments.len)
trace "new members added to the Merkle tree",
commitments = rateCommitments.mapIt(it.inHex)
except CatchableError:
error "failed to insert members into the tree", error = getCurrentExceptionMsg()
raise newException(ValueError, "failed to insert members into the tree")
if not witness_success:
return err("Failed to generate proof")
return
# Parse the proof into a RateLimitProof object
var proofValue = cast[ptr array[320, byte]](output_witness_buffer.`ptr`)
let proofBytes: array[320, byte] = proofValue[]
proc handleRemovedEvents(
g: OnchainGroupManager, blockTable: BlockTable
): Future[void] {.async: (raises: [Exception]).} =
initializedGuard(g)
## parse the proof as [ proof<128> | root<32> | external_nullifier<32> | share_x<32> | share_y<32> | nullifier<32> ]
let
proofOffset = 128
rootOffset = proofOffset + 32
externalNullifierOffset = rootOffset + 32
shareXOffset = externalNullifierOffset + 32
shareYOffset = shareXOffset + 32
nullifierOffset = shareYOffset + 32
# count number of blocks that have been removed
var numRemovedBlocks: uint = 0
for blockNumber, members in blockTable.pairs():
if members.anyIt(it[1]):
numRemovedBlocks += 1
var
zkproof: ZKSNARK
proofRoot, shareX, shareY: MerkleNode
externalNullifier: ExternalNullifier
nullifier: Nullifier
await g.backfillRootQueue(numRemovedBlocks)
discard zkproof.copyFrom(proofBytes[0 .. proofOffset - 1])
discard proofRoot.copyFrom(proofBytes[proofOffset .. rootOffset - 1])
discard
externalNullifier.copyFrom(proofBytes[rootOffset .. externalNullifierOffset - 1])
discard shareX.copyFrom(proofBytes[externalNullifierOffset .. shareXOffset - 1])
discard shareY.copyFrom(proofBytes[shareXOffset .. shareYOffset - 1])
discard nullifier.copyFrom(proofBytes[shareYOffset .. nullifierOffset - 1])
proc getAndHandleEvents(
g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: BlockNumber
): Future[bool] {.async: (raises: [Exception]).} =
initializedGuard(g)
let blockTable = await g.getBlockTable(fromBlock, toBlock)
try:
await g.handleEvents(blockTable)
await g.handleRemovedEvents(blockTable)
except CatchableError:
error "failed to handle events", error = getCurrentExceptionMsg()
raise newException(ValueError, "failed to handle events")
# Create the RateLimitProof object
let output = RateLimitProof(
proof: zkproof,
merkleRoot: proofRoot,
externalNullifier: externalNullifier,
epoch: epoch,
rlnIdentifier: rlnIdentifier,
shareX: shareX,
shareY: shareY,
nullifier: nullifier,
)
g.latestProcessedBlock = toBlock
return true
debug "Proof generated successfully"
proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration) =
g.blockFetchingActive = false
waku_rln_remaining_proofs_per_epoch.dec()
waku_rln_total_generated_proofs.inc()
return ok(output)
proc runIntervalLoop() {.async, gcsafe.} =
g.blockFetchingActive = true
method verifyProof*(
g: OnchainGroupManager, # verifier context
input: seq[byte], # raw message data (signal)
proof: RateLimitProof, # proof received from the peer
): GroupManagerResult[bool] {.gcsafe, raises: [].} =
## -- Verifies an RLN rate-limit proof against the set of valid Merkle roots --
while g.blockFetchingActive:
var retCb: bool
g.retryWrapper(retCb, "Failed to run the interval block fetching loop"):
await cb()
await sleepAsync(interval)
var normalizedProof = proof
# using asyncSpawn is OK here since
# we make use of the error handling provided by
# OnFatalErrorHandler
asyncSpawn runIntervalLoop()
normalizedProof.externalNullifier = poseidon(
@[@(proof.epoch), @(proof.rlnIdentifier)]
).valueOr:
return err("Failed to compute external nullifier: " & error)
proc getNewBlockCallback(g: OnchainGroupManager): proc =
let ethRpc = g.ethRpc.get()
proc wrappedCb(): Future[bool] {.async, gcsafe.} =
var latestBlock: BlockNumber
g.retryWrapper(latestBlock, "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
let proofBytes = serialize(normalizedProof, input)
let proofBuffer = proofBytes.toBuffer()
if latestBlock <= g.latestProcessedBlock:
return
# get logs from the last block
# inc by 1 to prevent double processing
let fromBlock = g.latestProcessedBlock + 1
var handleBlockRes: bool
g.retryWrapper(handleBlockRes, "Failed to handle new block"):
await g.getAndHandleEvents(fromBlock, latestBlock)
let rootsBytes = serialize(g.validRoots.items().toSeq())
let rootsBuffer = rootsBytes.toBuffer()
# cannot use isOkOr here because results in a compile-time error that
# shows the error is void for some reason
let setMetadataRes = g.setMetadata()
if setMetadataRes.isErr():
error "failed to persist rln metadata", error = setMetadataRes.error
var validProof: bool # out-param
let ffiOk = verify_with_roots(
g.rlnInstance, # RLN context created at init()
addr proofBuffer, # (proof + signal)
addr rootsBuffer, # valid Merkle roots
addr validProof # will be set by the FFI call
,
)
return handleBlockRes
if not ffiOk:
return err("could not verify the proof")
else:
trace "Proof verified successfully !"
return wrappedCb
proc startListeningToEvents(
g: OnchainGroupManager
): Future[void] {.async: (raises: [Exception]).} =
initializedGuard(g)
let ethRpc = g.ethRpc.get()
let newBlockCallback = g.getNewBlockCallback()
g.runInInterval(newBlockCallback, DefaultBlockPollRate)
proc batchAwaitBlockHandlingFuture(
g: OnchainGroupManager, futs: seq[Future[bool]]
): Future[void] {.async: (raises: [Exception]).} =
for fut in futs:
try:
var handleBlockRes: bool
g.retryWrapper(handleBlockRes, "Failed to handle block"):
await fut
except CatchableError:
raise newException(
CatchableError, "could not fetch events from block: " & getCurrentExceptionMsg()
)
proc startOnchainSync(
g: OnchainGroupManager
): Future[void] {.async: (raises: [Exception]).} =
initializedGuard(g)
let ethRpc = g.ethRpc.get()
# static block chunk size
let blockChunkSize = 2_000.BlockNumber
# delay between rpc calls to not overload the rate limit
let rpcDelay = 200.milliseconds
# max number of futures to run concurrently
let maxFutures = 10
var fromBlock: BlockNumber =
if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber:
info "syncing from last processed block", blockNumber = g.latestProcessedBlock
g.latestProcessedBlock + 1
else:
info "syncing from rln contract deployed block",
blockNumber = g.rlnContractDeployedBlockNumber
g.rlnContractDeployedBlockNumber
var futs = newSeq[Future[bool]]()
var currentLatestBlock: BlockNumber
g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
try:
# we always want to sync from last processed block => latest
# chunk events
while true:
# if the fromBlock is less than 2k blocks behind the current block
# then fetch the new toBlock
if fromBlock >= currentLatestBlock:
break
if fromBlock + blockChunkSize > currentLatestBlock:
g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
let toBlock = min(fromBlock + blockChunkSize, currentLatestBlock)
debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
await sleepAsync(rpcDelay)
futs.add(g.getAndHandleEvents(fromBlock, toBlock))
if futs.len >= maxFutures or toBlock == currentLatestBlock:
await g.batchAwaitBlockHandlingFuture(futs)
g.setMetadata(lastProcessedBlock = some(toBlock)).isOkOr:
error "failed to persist rln metadata", error = $error
futs = newSeq[Future[bool]]()
fromBlock = toBlock + 1
except CatchableError:
raise newException(
CatchableError,
"failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg(),
)
# listen to blockheaders and contract events
try:
await g.startListeningToEvents()
except CatchableError:
raise newException(
ValueError, "failed to start listening to events: " & getCurrentExceptionMsg()
)
method startGroupSync*(
g: OnchainGroupManager
): Future[GroupManagerResult[void]] {.async.} =
?resultifiedInitGuard(g)
# Get archive history
try:
await startOnchainSync(g)
return ok()
except CatchableError, Exception:
return err("failed to start group sync: " & getCurrentExceptionMsg())
return ok(validProof)
method onRegister*(g: OnchainGroupManager, cb: OnRegisterCallback) {.gcsafe.} =
g.registerCb = some(cb)
@ -609,53 +536,27 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.}
let metadata = metadataGetOptRes.get().get()
if metadata.chainId != uint(g.chainId):
return err("persisted data: chain id mismatch")
if metadata.contractAddress != g.ethContractAddress.toLower():
return err("persisted data: contract address mismatch")
g.latestProcessedBlock = metadata.lastProcessedBlock.BlockNumber
g.validRoots = metadata.validRoots.toDeque()
var deployedBlockNumber: Uint256
g.retryWrapper(
deployedBlockNumber,
"Failed to get the deployed block number. Have you set the correct contract address?",
):
await wakuRlnContract.deployedBlockNumber().call()
debug "using rln contract", deployedBlockNumber, rlnContractAddress = contractAddress
g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber)
g.latestProcessedBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)
g.rlnRelayMaxMessageLimit =
cast[uint64](await wakuRlnContract.MAX_MESSAGE_LIMIT().call())
proc onDisconnect() {.async.} =
error "Ethereum client disconnected"
let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)
info "reconnecting with the Ethereum client, and restarting group sync",
fromBlock = fromBlock
var newEthRpc: Web3
g.retryWrapper(newEthRpc, "Failed to reconnect with the Ethereum client"):
await newWeb3(g.ethClientUrl)
newEthRpc.ondisconnect = ethRpc.ondisconnect
g.ethRpc = some(newEthRpc)
try:
await g.startOnchainSync()
except CatchableError, Exception:
g.onFatalErrorAction(
"failed to restart group sync" & ": " & getCurrentExceptionMsg()
)
ethRpc.ondisconnect = proc() =
asyncSpawn onDisconnect()
waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
g.initialized = true
return ok()
method stop*(g: OnchainGroupManager): Future[void] {.async, gcsafe.} =
g.blockFetchingActive = false
if g.ethRpc.isSome():
g.ethRpc.get().ondisconnect = nil
await g.ethRpc.get().close()
@ -665,26 +566,13 @@ method stop*(g: OnchainGroupManager): Future[void] {.async, gcsafe.} =
g.initialized = false
proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async, gcsafe.} =
let ethRpc = g.ethRpc.get()
var syncing: SyncingStatus
g.retryWrapper(syncing, "Failed to get the syncing status"):
await ethRpc.provider.eth_syncing()
return syncing.syncing
method isReady*(g: OnchainGroupManager): Future[bool] {.async.} =
initializedGuard(g)
if g.ethRpc.isNone():
return false
var currentBlock: BlockNumber
g.retryWrapper(currentBlock, "Failed to get the current block number"):
cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber())
# the node is still able to process messages if it is behind the latest block by a factor of the valid roots
if u256(g.latestProcessedBlock.uint64) < (u256(currentBlock) - u256(g.validRoots.len)):
if g.wakuRlnContract.isNone():
return false
return not (await g.isSyncing())
return true

View File

@ -85,6 +85,7 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
var cumulativeProofsVerified = 0.float64
var cumulativeProofsGenerated = 0.float64
var cumulativeProofsRemaining = 100.float64
var cumulativeRegisteredMember = 0.float64
when defined(metrics):
logMetrics = proc() =
@ -107,6 +108,9 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
let freshProofsRemainingCount = parseAndAccumulate(
waku_rln_remaining_proofs_per_epoch, cumulativeProofsRemaining
)
let freshRegisteredMemberCount = parseAndAccumulate(
waku_rln_number_registered_memberships, cumulativeRegisteredMember
)
info "Total messages", count = freshMsgCount
info "Total spam messages", count = freshSpamCount
@ -116,5 +120,6 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
info "Total proofs verified", count = freshProofsVerifiedCount
info "Total proofs generated", count = freshProofsGeneratedCount
info "Total proofs remaining", count = freshProofsRemainingCount
info "Total registered members", count = freshRegisteredMemberCount
return logMetrics

View File

@ -52,6 +52,20 @@ type RateLimitProof* = object
## the external nullifier used for the generation of the `proof` (derived from poseidon([epoch, rln_identifier]))
externalNullifier*: ExternalNullifier
type UInt40* = StUint[40]
type UInt32* = StUint[32]
type
Field = array[32, byte] # Field element representation (256 bits)
RLNWitnessInput* = object
identity_secret*: Field
user_message_limit*: Field
message_id*: Field
path_elements*: seq[byte]
identity_path_index*: seq[byte]
x*: Field
external_nullifier*: Field
type ProofMetadata* = object
nullifier*: Nullifier
shareX*: MerkleNode

View File

@ -130,6 +130,21 @@ proc generate_proof*(
## integers wrapped in <> indicate value sizes in bytes
## the return bool value indicates the success or failure of the operation
proc generate_proof_with_witness*(
ctx: ptr RLN, input_buffer: ptr Buffer, output_buffer: ptr Buffer
): bool {.importc: "generate_rln_proof_with_witness".}
## rln-v2
## "witness" term refer to collection of secret inputs with proper serialization
## input_buffer has to be serialized as [ identity_secret<32> | user_message_limit<32> | message_id<32> | path_elements<Vec<32>> | identity_path_index<Vec<1>> | x<32> | external_nullifier<32> ]
## output_buffer holds the proof data and should be parsed as [ proof<128> | root<32> | external_nullifier<32> | share_x<32> | share_y<32> | nullifier<32> ]
## rln-v1
## input_buffer has to be serialized as [ id_key<32> | path_elements<Vec<32>> | identity_path_index<Vec<1>> | x<32> | epoch<32> | rln_identifier<32> ]
## output_buffer holds the proof data and should be parsed as [ proof<128> | root<32> | epoch<32> | share_x<32> | share_y<32> | nullifier<32> | rln_identifier<32> ]
## integers wrapped in <> indicate value sizes in bytes
## path_elements and identity_path_index serialize a merkle proof and are vectors of elements of 32 and 1 bytes respectively
## the return bool value indicates the success or failure of the operation
proc verify*(
ctx: ptr RLN, proof_buffer: ptr Buffer, proof_is_valid_ptr: ptr bool
): bool {.importc: "verify_rln_proof".}

View File

@ -98,6 +98,7 @@ type WakuRLNRelay* = ref object of RootObj
onFatalErrorAction*: OnFatalErrorHandler
nonceManager*: NonceManager
epochMonitorFuture*: Future[void]
rootChangesFuture*: Future[void]
proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch =
## gets time `t` as `flaot64` with subseconds resolution in the fractional part
@ -252,6 +253,7 @@ proc validateMessage*(
waku_rln_errors_total.inc(labelValues = ["proof_verification"])
warn "invalid message: proof verification failed", payloadLen = msg.payload.len
return MessageValidationResult.Invalid
if not proofVerificationRes.value():
# invalid proof
warn "invalid message: invalid proof", payloadLen = msg.payload.len
@ -467,9 +469,6 @@ proc mount(
# Initialize the groupManager
(await groupManager.init()).isOkOr:
return err("could not initialize the group manager: " & $error)
# Start the group sync
(await groupManager.startGroupSync()).isOkOr:
return err("could not start the group sync: " & $error)
wakuRlnRelay = WakuRLNRelay(
groupManager: groupManager,
@ -479,6 +478,11 @@ proc mount(
onFatalErrorAction: conf.onFatalErrorAction,
)
# track root changes on smart contract merkle tree
if groupManager of OnchainGroupManager:
let onchainManager = cast[OnchainGroupManager](groupManager)
wakuRlnRelay.rootChangesFuture = onchainManager.trackRootChanges()
# Start epoch monitoring in the background
wakuRlnRelay.epochMonitorFuture = monitorEpochs(wakuRlnRelay)
return ok(wakuRlnRelay)