diff --git a/Makefile b/Makefile index 5da2d6076..d15668673 100644 --- a/Makefile +++ b/Makefile @@ -165,7 +165,7 @@ nimbus-build-system-nimble-dir: .PHONY: librln LIBRLN_BUILDDIR := $(CURDIR)/vendor/zerokit -LIBRLN_VERSION := v0.5.1 +LIBRLN_VERSION := v0.7.0 ifeq ($(detected_OS),Windows) LIBRLN_FILE := rln.lib diff --git a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim index b6fc44e27..25a3166ce 100644 --- a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim +++ b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim @@ -3,7 +3,7 @@ {.push raises: [].} import - std/[options, sequtils, deques], + std/[options, sequtils, deques, random], results, stew/byteutils, testutils/unittests, @@ -13,7 +13,8 @@ import web3, libp2p/crypto/crypto, eth/keys, - tests/testlib/testasync + tests/testlib/testasync, + tests/testlib/testutils import waku/[ @@ -47,7 +48,6 @@ suite "Onchain group manager": manager.ethRpc.isSome() manager.wakuRlnContract.isSome() manager.initialized - manager.rlnContractDeployedBlockNumber > 0.Quantity manager.rlnRelayMaxMessageLimit == 100 asyncTest "should error on initialization when chainId does not match": @@ -97,18 +97,13 @@ suite "Onchain group manager": echo "---" asyncTest "should error if contract does not exist": - var triggeredError = false - manager.ethContractAddress = "0x0000000000000000000000000000000000000000" - manager.onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} = - echo "---" - discard - "Failed to get the deployed block number. Have you set the correct contract address?: No response from the Web3 provider" - echo msg - echo "---" - triggeredError = true - discard await manager.init() + var triggeredError = false + try: + discard await manager.init() + except CatchableError: + triggeredError = true check triggeredError @@ -119,103 +114,71 @@ suite "Onchain group manager": (await manager.init()).isErrOr: raiseAssert "Expected error when keystore file doesn't exist" - asyncTest "startGroupSync: should start group sync": + asyncTest "trackRootChanges: start tracking roots": (await manager.init()).isOkOr: raiseAssert $error - (await manager.startGroupSync()).isOkOr: - raiseAssert $error + discard manager.trackRootChanges() - asyncTest "startGroupSync: should guard against uninitialized state": - (await manager.startGroupSync()).isErrOr: - raiseAssert "Expected error when not initialized" + asyncTest "trackRootChanges: should guard against uninitialized state": + try: + discard manager.trackRootChanges() + except CatchableError: + check getCurrentExceptionMsg().len == 38 - asyncTest "startGroupSync: should sync to the state of the group": + asyncTest "trackRootChanges: should sync to the state of the group": let credentials = generateCredentials(manager.rlnInstance) - let rateCommitment = getRateCommitment(credentials, UserMessageLimit(1)).valueOr: - raiseAssert $error (await manager.init()).isOkOr: raiseAssert $error - let merkleRootBefore = manager.rlnInstance.getMerkleRoot().valueOr: - raiseAssert $error - - let fut = newFuture[void]("startGroupSync") - - proc generateCallback(fut: Future[void]): OnRegisterCallback = - proc callback(registrations: seq[Membership]): Future[void] {.async.} = - check: - registrations.len == 1 - registrations[0].index == 0 - registrations[0].rateCommitment == rateCommitment - fut.complete() - - return callback + let merkleRootBefore = manager.fetchMerkleRoot() try: - manager.onRegister(generateCallback(fut)) await manager.register(credentials, UserMessageLimit(1)) - (await manager.startGroupSync()).isOkOr: - raiseAssert $error except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() - await fut + discard await withTimeout(trackRootChanges(manager), 15.seconds) - let merkleRootAfter = manager.rlnInstance.getMerkleRoot().valueOr: + let merkleRootAfter = manager.fetchMerkleRoot() + + let metadataSetRes = manager.setMetadata() + assert metadataSetRes.isOk(), metadataSetRes.error + + let metadataOpt = getMetadata(manager.rlnInstance).valueOr: raiseAssert $error - let metadataOpt = manager.rlnInstance.getMetadata().valueOr: - raiseAssert $error + assert metadataOpt.isSome(), "metadata is not set" + let metadata = metadataOpt.get() + check: - metadataOpt.get().validRoots == manager.validRoots.toSeq() + metadata.validRoots == manager.validRoots.toSeq() merkleRootBefore != merkleRootAfter - asyncTest "startGroupSync: should fetch history correctly": + asyncTest "trackRootChanges: should fetch history correctly": + # TODO: We can't use `trackRootChanges()` directly in this test because its current implementation + # relies on a busy loop rather than event-based monitoring. As a result, some root changes + # may be missed, leading to inconsistent test results (i.e., it may randomly return true or false). + # To ensure reliability, we use the `updateRoots()` function to validate the `validRoots` window + # after each registration. const credentialCount = 6 let credentials = generateCredentials(manager.rlnInstance, credentialCount) (await manager.init()).isOkOr: raiseAssert $error - let merkleRootBefore = manager.rlnInstance.getMerkleRoot().valueOr: - raiseAssert $error - - type TestGroupSyncFuts = array[0 .. credentialCount - 1, Future[void]] - var futures: TestGroupSyncFuts - for i in 0 ..< futures.len(): - futures[i] = newFuture[void]() - proc generateCallback( - futs: TestGroupSyncFuts, credentials: seq[IdentityCredential] - ): OnRegisterCallback = - var futureIndex = 0 - proc callback(registrations: seq[Membership]): Future[void] {.async.} = - let rateCommitment = - getRateCommitment(credentials[futureIndex], UserMessageLimit(1)) - if registrations.len == 1 and - registrations[0].rateCommitment == rateCommitment.get() and - registrations[0].index == MembershipIndex(futureIndex): - futs[futureIndex].complete() - futureIndex += 1 - - return callback + let merkleRootBefore = manager.fetchMerkleRoot() try: - manager.onRegister(generateCallback(futures, credentials)) - (await manager.startGroupSync()).isOkOr: - raiseAssert $error - for i in 0 ..< credentials.len(): await manager.register(credentials[i], UserMessageLimit(1)) + discard await manager.updateRoots() except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() - await allFutures(futures) - - let merkleRootAfter = manager.rlnInstance.getMerkleRoot().valueOr: - raiseAssert $error + let merkleRootAfter = manager.fetchMerkleRoot() check: merkleRootBefore != merkleRootAfter - manager.validRootBuffer.len() == credentialCount - AcceptableRootWindowSize + manager.validRoots.len() == credentialCount asyncTest "register: should guard against uninitialized state": let dummyCommitment = default(IDCommitment) @@ -232,14 +195,12 @@ suite "Onchain group manager": assert false, "exception raised: " & getCurrentExceptionMsg() asyncTest "register: should register successfully": + # TODO :- similar to ```trackRootChanges: should fetch history correctly``` (await manager.init()).isOkOr: raiseAssert $error - (await manager.startGroupSync()).isOkOr: - raiseAssert $error let idCommitment = generateCredentials(manager.rlnInstance).idCommitment - let merkleRootBefore = manager.rlnInstance.getMerkleRoot().valueOr: - raiseAssert $error + let merkleRootBefore = manager.fetchMerkleRoot() try: await manager.register( @@ -251,10 +212,10 @@ suite "Onchain group manager": assert false, "exception raised when calling register: " & getCurrentExceptionMsg() - let merkleRootAfter = manager.rlnInstance.getMerkleRoot().valueOr: - raiseAssert $error + let merkleRootAfter = manager.fetchMerkleRoot() + check: - merkleRootAfter.inHex() != merkleRootBefore.inHex() + merkleRootAfter != merkleRootBefore manager.latestIndex == 1 asyncTest "register: callback is called": @@ -264,19 +225,19 @@ suite "Onchain group manager": let fut = newFuture[void]() proc callback(registrations: seq[Membership]): Future[void] {.async.} = - let rateCommitment = getRateCommitment(idCredentials, UserMessageLimit(1)) + let rateCommitment = getRateCommitment(idCredentials, UserMessageLimit(1)).get() check: registrations.len == 1 - registrations[0].rateCommitment == rateCommitment.get() + registrations[0].rateCommitment == rateCommitment registrations[0].index == 0 fut.complete() - manager.onRegister(callback) (await manager.init()).isOkOr: raiseAssert $error + + manager.onRegister(callback) + try: - (await manager.startGroupSync()).isOkOr: - raiseAssert $error await manager.register( RateCommitment( idCommitment: idCommitment, userMessageLimit: UserMessageLimit(1) @@ -298,38 +259,43 @@ suite "Onchain group manager": assert false, "exception raised: " & getCurrentExceptionMsg() asyncTest "validateRoot: should validate good root": - let credentials = generateCredentials(manager.rlnInstance) - (await manager.init()).isOkOr: - raiseAssert $error + let idCredentials = generateCredentials(manager.rlnInstance) + let idCommitment = idCredentials.idCommitment let fut = newFuture[void]() proc callback(registrations: seq[Membership]): Future[void] {.async.} = if registrations.len == 1 and registrations[0].rateCommitment == - getRateCommitment(credentials, UserMessageLimit(1)).get() and + getRateCommitment(idCredentials, UserMessageLimit(1)).get() and registrations[0].index == 0: - manager.idCredentials = some(credentials) + manager.idCredentials = some(idCredentials) fut.complete() manager.onRegister(callback) + (await manager.init()).isOkOr: + raiseAssert $error + try: - (await manager.startGroupSync()).isOkOr: - raiseAssert $error - await manager.register(credentials, UserMessageLimit(1)) + await manager.register(idCredentials, UserMessageLimit(1)) except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() await fut + let rootUpdated = await manager.updateRoots() + + if rootUpdated: + let proofResult = await manager.fetchMerkleProofElements() + if proofResult.isErr(): + error "Failed to fetch Merkle proof", error = proofResult.error + manager.merkleProofCache = proofResult.get() let messageBytes = "Hello".toBytes() - # prepare the epoch let epoch = default(Epoch) debug "epoch in bytes", epochHex = epoch.inHex() - # generate proof let validProofRes = manager.generateProof( data = messageBytes, epoch = epoch, messageId = MessageId(1) ) @@ -338,38 +304,39 @@ suite "Onchain group manager": validProofRes.isOk() let validProof = validProofRes.get() - # validate the root (should be true) let validated = manager.validateRoot(validProof.merkleRoot) check: validated asyncTest "validateRoot: should reject bad root": + let idCredentials = generateCredentials(manager.rlnInstance) + let idCommitment = idCredentials.idCommitment + (await manager.init()).isOkOr: raiseAssert $error - (await manager.startGroupSync()).isOkOr: - raiseAssert $error - let credentials = generateCredentials(manager.rlnInstance) - - ## Assume the registration occured out of band - manager.idCredentials = some(credentials) - manager.membershipIndex = some(MembershipIndex(0)) manager.userMessageLimit = some(UserMessageLimit(1)) + manager.membershipIndex = some(MembershipIndex(0)) + manager.idCredentials = some(idCredentials) + + manager.merkleProofCache = newSeq[byte](640) + for i in 0 ..< 640: + manager.merkleProofCache[i] = byte(rand(255)) let messageBytes = "Hello".toBytes() - # prepare the epoch let epoch = default(Epoch) debug "epoch in bytes", epochHex = epoch.inHex() - # generate proof - let validProof = manager.generateProof( - data = messageBytes, epoch = epoch, messageId = MessageId(0) - ).valueOr: - raiseAssert $error + let validProofRes = manager.generateProof( + data = messageBytes, epoch = epoch, messageId = MessageId(1) + ) + + check: + validProofRes.isOk() + let validProof = validProofRes.get() - # validate the root (should be false) let validated = manager.validateRoot(validProof.merkleRoot) check: @@ -393,13 +360,19 @@ suite "Onchain group manager": manager.onRegister(callback) try: - (await manager.startGroupSync()).isOkOr: - raiseAssert $error await manager.register(credentials, UserMessageLimit(1)) except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() await fut + let rootUpdated = await manager.updateRoots() + + if rootUpdated: + let proofResult = await manager.fetchMerkleProofElements() + if proofResult.isErr(): + error "Failed to fetch Merkle proof", error = proofResult.error + manager.merkleProofCache = proofResult.get() + let messageBytes = "Hello".toBytes() # prepare the epoch @@ -412,7 +385,6 @@ suite "Onchain group manager": ).valueOr: raiseAssert $error - # verify the proof (should be true) let verified = manager.verifyProof(messageBytes, validProof).valueOr: raiseAssert $error @@ -422,31 +394,23 @@ suite "Onchain group manager": asyncTest "verifyProof: should reject invalid proof": (await manager.init()).isOkOr: raiseAssert $error - (await manager.startGroupSync()).isOkOr: - raiseAssert $error let idCredential = generateCredentials(manager.rlnInstance) try: - await manager.register( - RateCommitment( - idCommitment: idCredential.idCommitment, userMessageLimit: UserMessageLimit(1) - ) - ) + await manager.register(idCredential, UserMessageLimit(1)) except Exception, CatchableError: assert false, "exception raised when calling startGroupSync: " & getCurrentExceptionMsg() - let idCredential2 = generateCredentials(manager.rlnInstance) - - ## Assume the registration occured out of band - manager.idCredentials = some(idCredential2) - manager.membershipIndex = some(MembershipIndex(0)) - manager.userMessageLimit = some(UserMessageLimit(1)) - let messageBytes = "Hello".toBytes() - # prepare the epoch + let rootUpdated = await manager.updateRoots() + + manager.merkleProofCache = newSeq[byte](640) + for i in 0 ..< 640: + manager.merkleProofCache[i] = byte(rand(255)) + let epoch = default(Epoch) debug "epoch in bytes", epochHex = epoch.inHex() @@ -466,8 +430,8 @@ suite "Onchain group manager": check: verified == false - asyncTest "backfillRootQueue: should backfill roots in event of chain reorg": - const credentialCount = 6 + asyncTest "root queue should be updated correctly": + const credentialCount = 12 let credentials = generateCredentials(manager.rlnInstance, credentialCount) (await manager.init()).isOkOr: raiseAssert $error @@ -493,33 +457,17 @@ suite "Onchain group manager": try: manager.onRegister(generateCallback(futures, credentials)) - (await manager.startGroupSync()).isOkOr: - raiseAssert $error for i in 0 ..< credentials.len(): await manager.register(credentials[i], UserMessageLimit(1)) + discard await manager.updateRoots() except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() await allFutures(futures) - # At this point, we should have a full root queue, 5 roots, and partial buffer of 1 root check: - manager.validRoots.len() == credentialCount - 1 - manager.validRootBuffer.len() == 1 - - # We can now simulate a chain reorg by calling backfillRootQueue - let expectedLastRoot = manager.validRootBuffer[0] - try: - await manager.backfillRootQueue(1) - except Exception, CatchableError: - assert false, "exception raised: " & getCurrentExceptionMsg() - - # We should now have 5 roots in the queue, and no partial buffer - check: - manager.validRoots.len() == credentialCount - 1 - manager.validRootBuffer.len() == 0 - manager.validRoots[credentialCount - 2] == expectedLastRoot + manager.validRoots.len() == credentialCount asyncTest "isReady should return false if ethRpc is none": (await manager.init()).isOkOr: @@ -536,25 +484,9 @@ suite "Onchain group manager": check: isReady == false - asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed": - (await manager.init()).isOkOr: - raiseAssert $error - - var isReady = true - try: - isReady = await manager.isReady() - except Exception, CatchableError: - assert false, "exception raised: " & getCurrentExceptionMsg() - - check: - isReady == false - asyncTest "isReady should return true if ethRpc is ready": (await manager.init()).isOkOr: raiseAssert $error - # node can only be ready after group sync is done - (await manager.startGroupSync()).isOkOr: - raiseAssert $error var isReady = false try: diff --git a/tests/waku_rln_relay/test_wakunode_rln_relay.nim b/tests/waku_rln_relay/test_wakunode_rln_relay.nim index a5237dab1..3ff6923e0 100644 --- a/tests/waku_rln_relay/test_wakunode_rln_relay.nim +++ b/tests/waku_rln_relay/test_wakunode_rln_relay.nim @@ -529,7 +529,6 @@ procSuite "WakuNode - RLN relay": xasyncTest "clearNullifierLog: should clear epochs > MaxEpochGap": ## This is skipped because is flaky and made CI randomly fail but is useful to run manually - # Given two nodes let contentTopic = ContentTopic("/waku/2/default-content/proto") diff --git a/vendor/zerokit b/vendor/zerokit index b9d27039c..ba467d370 160000 --- a/vendor/zerokit +++ b/vendor/zerokit @@ -1 +1 @@ -Subproject commit b9d27039c3266af108882d7a8bafc37400d29855 +Subproject commit ba467d370c56b7432522227de22fbd664d44ef3e diff --git a/waku/waku_rln_relay/conversion_utils.nim b/waku/waku_rln_relay/conversion_utils.nim index e710fea62..4a168ebeb 100644 --- a/waku/waku_rln_relay/conversion_utils.nim +++ b/waku/waku_rln_relay/conversion_utils.nim @@ -27,9 +27,6 @@ proc inHex*( valueHex = "0" & valueHex return toLowerAscii(valueHex) -proc toUserMessageLimit*(v: UInt256): UserMessageLimit = - return cast[UserMessageLimit](v) - proc encodeLengthPrefix*(input: openArray[byte]): seq[byte] = ## returns length prefixed version of the input ## with the following format [len<8>|input] @@ -78,6 +75,31 @@ proc serialize*( ) return output +proc serialize*(witness: RLNWitnessInput): seq[byte] = + ## Serializes the RLN witness into a byte array following zerokit's expected format. + ## The serialized format includes: + ## - identity_secret (32 bytes, little-endian with zero padding) + ## - user_message_limit (32 bytes, little-endian with zero padding) + ## - message_id (32 bytes, little-endian with zero padding) + ## - merkle tree depth (8 bytes, little-endian) = path_elements.len / 32 + ## - path_elements (each 32 bytes, ordered bottom-to-top) + ## - merkle tree depth again (8 bytes, little-endian) + ## - identity_path_index (sequence of bits as bytes, 0 = left, 1 = right) + ## - x (32 bytes, little-endian with zero padding) + ## - external_nullifier (32 bytes, little-endian with zero padding) + var buffer: seq[byte] + buffer.add(@(witness.identity_secret)) + buffer.add(@(witness.user_message_limit)) + buffer.add(@(witness.message_id)) + buffer.add(toBytes(uint64(witness.path_elements.len / 32), Endianness.littleEndian)) + for element in witness.path_elements: + buffer.add(element) + buffer.add(toBytes(uint64(witness.path_elements.len / 32), Endianness.littleEndian)) + buffer.add(witness.identity_path_index) + buffer.add(@(witness.x)) + buffer.add(@(witness.external_nullifier)) + return buffer + proc serialize*(proof: RateLimitProof, data: openArray[byte]): seq[byte] = ## a private proc to convert RateLimitProof and data to a byte seq ## this conversion is used in the proof verification proc @@ -133,3 +155,25 @@ func `+`*(a, b: Quantity): Quantity {.borrow.} func u256*(n: Quantity): UInt256 {.inline.} = n.uint64.stuint(256) + +proc uint64ToField*(n: uint64): array[32, byte] = + var output: array[32, byte] + let bytes = toBytes(n, Endianness.littleEndian) + output[0 ..< bytes.len] = bytes + return output + +proc UInt256ToField*(v: UInt256): array[32, byte] = + return cast[array[32, byte]](v) # already doesn't use `result` + +proc seqToField*(s: seq[byte]): array[32, byte] = + var output: array[32, byte] + let len = min(s.len, 32) + for i in 0 ..< len: + output[i] = s[i] + return output + +proc uint64ToIndex*(index: MembershipIndex, depth: int): seq[byte] = + var output = newSeq[byte](depth) + for i in 0 ..< depth: + output[i] = byte((index shr i) and 1) # LSB-first bit decomposition + return output diff --git a/waku/waku_rln_relay/group_manager/group_manager_base.nim b/waku/waku_rln_relay/group_manager/group_manager_base.nim index 818b36140..4a1c84e55 100644 --- a/waku/waku_rln_relay/group_manager/group_manager_base.nim +++ b/waku/waku_rln_relay/group_manager/group_manager_base.nim @@ -145,7 +145,6 @@ method validateRoot*( g: GroupManager, root: MerkleNode ): bool {.base, gcsafe, raises: [].} = ## validates the root against the valid roots queue - # Check if the root is in the valid roots queue if g.indexOfRoot(root) >= 0: return true return false @@ -175,7 +174,7 @@ method verifyProof*( method generateProof*( g: GroupManager, - data: openArray[byte], + data: seq[byte], epoch: Epoch, messageId: MessageId, rlnIdentifier = DefaultRlnIdentifier, @@ -189,6 +188,7 @@ method generateProof*( return err("membership index is not set") if g.userMessageLimit.isNone(): return err("user message limit is not set") + waku_rln_proof_generation_duration_seconds.nanosecondTime: let proof = proofGen( rlnInstance = g.rlnInstance, @@ -201,8 +201,6 @@ method generateProof*( ).valueOr: return err("proof generation failed: " & $error) - waku_rln_remaining_proofs_per_epoch.dec() - waku_rln_total_generated_proofs.inc() return ok(proof) method isReady*(g: GroupManager): Future[bool] {.base, async.} = diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index e61ffb956..fe3db9102 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -10,19 +10,18 @@ import nimcrypto/keccak as keccak, stint, json, - std/tables, + std/[strutils, tables, algorithm], stew/[byteutils, arrayops], - sequtils, - strutils + sequtils + import ../../../waku_keystore, ../../rln, + ../../rln/rln_interface, ../../conversion_utils, ../group_manager_base, ./retry_wrapper -from strutils import parseHexInt - export group_manager_base logScope: @@ -31,19 +30,23 @@ logScope: # using the when predicate does not work within the contract macro, hence need to dupe contract(WakuRlnContract): # this serves as an entrypoint into the rln membership set - proc register(idCommitment: UInt256, userMessageLimit: EthereumUInt32) + proc register(idCommitment: UInt256, userMessageLimit: UInt32) # Initializes the implementation contract (only used in unit tests) proc initialize(maxMessageLimit: UInt256) # this event is raised when a new member is registered - proc MemberRegistered(rateCommitment: UInt256, index: EthereumUInt32) {.event.} + proc MemberRegistered(rateCommitment: UInt256, index: UInt32) {.event.} # this function denotes existence of a given user - proc memberExists(idCommitment: Uint256): UInt256 {.view.} + proc memberExists(idCommitment: UInt256): UInt256 {.view.} # this constant describes the next index of a new member proc commitmentIndex(): UInt256 {.view.} # this constant describes the block number this contract was deployed on proc deployedBlockNumber(): UInt256 {.view.} # this constant describes max message limit of rln contract proc MAX_MESSAGE_LIMIT(): UInt256 {.view.} + # this function returns the merkleProof for a given index + # proc merkleProofElements(index: UInt40): seq[byte] {.view.} + # this function returns the merkle root + proc root(): UInt256 {.view.} type WakuRlnContractWithSender = Sender[WakuRlnContract] @@ -52,42 +55,14 @@ type ethPrivateKey*: Option[string] ethContractAddress*: string ethRpc*: Option[Web3] - rlnContractDeployedBlockNumber*: BlockNumber wakuRlnContract*: Option[WakuRlnContractWithSender] - latestProcessedBlock*: BlockNumber registrationTxHash*: Option[TxHash] chainId*: uint keystorePath*: Option[string] keystorePassword*: Option[string] registrationHandler*: Option[RegistrationHandler] - # this buffer exists to backfill appropriate roots for the merkle tree, - # in event of a reorg. we store 5 in the buffer. Maybe need to revisit this, - # because the average reorg depth is 1 to 2 blocks. - validRootBuffer*: Deque[MerkleNode] - # interval loop to shut down gracefully - blockFetchingActive*: bool - -const DefaultKeyStorePath* = "rlnKeystore.json" -const DefaultKeyStorePassword* = "password" - -const DefaultBlockPollRate* = 6.seconds - -template initializedGuard(g: OnchainGroupManager): untyped = - if not g.initialized: - raise newException(CatchableError, "OnchainGroupManager is not initialized") - -proc resultifiedInitGuard(g: OnchainGroupManager): GroupManagerResult[void] = - try: - initializedGuard(g) - return ok() - except CatchableError: - return err("OnchainGroupManager is not initialized") - -template retryWrapper( - g: OnchainGroupManager, res: auto, errStr: string, body: untyped -): auto = - retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction): - body + latestProcessedBlock*: BlockNumber + merkleProofCache*: seq[byte] proc setMetadata*( g: OnchainGroupManager, lastProcessedBlock = none(BlockNumber) @@ -112,33 +87,109 @@ proc setMetadata*( return err("failed to persist rln metadata: " & getCurrentExceptionMsg()) return ok() -method atomicBatch*( - g: OnchainGroupManager, - start: MembershipIndex, - rateCommitments = newSeq[RawRateCommitment](), - toRemoveIndices = newSeq[MembershipIndex](), -): Future[void] {.async: (raises: [Exception]), base.} = - initializedGuard(g) +proc fetchMerkleProofElements*( + g: OnchainGroupManager +): Future[Result[seq[byte], string]] {.async.} = + try: + let membershipIndex = g.membershipIndex.get() + let index40 = stuint(membershipIndex, 40) - waku_rln_membership_insertion_duration_seconds.nanosecondTime: - let operationSuccess = - g.rlnInstance.atomicWrite(some(start), rateCommitments, toRemoveIndices) - if not operationSuccess: - raise newException(CatchableError, "atomic batch operation failed") - # TODO: when slashing is enabled, we need to track slashed members - waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet())) + let methodSig = "merkleProofElements(uint40)" + let methodIdDigest = keccak.keccak256.digest(methodSig) + let methodId = methodIdDigest.data[0 .. 3] - if g.registerCb.isSome(): - var membersSeq = newSeq[Membership]() - for i in 0 ..< rateCommitments.len: - var index = start + MembershipIndex(i) - debug "registering member to callback", - rateCommitment = rateCommitments[i], index = index - let member = Membership(rateCommitment: rateCommitments[i], index: index) - membersSeq.add(member) - await g.registerCb.get()(membersSeq) + var paddedParam = newSeq[byte](32) + let indexBytes = index40.toBytesBE() + for i in 0 ..< min(indexBytes.len, paddedParam.len): + paddedParam[paddedParam.len - indexBytes.len + i] = indexBytes[i] - g.validRootBuffer = g.slideRootQueue() + var callData = newSeq[byte]() + for b in methodId: + callData.add(b) + callData.add(paddedParam) + + var tx: TransactionArgs + tx.to = Opt.some(fromHex(Address, g.ethContractAddress)) + tx.data = Opt.some(callData) + + let responseBytes = await g.ethRpc.get().provider.eth_call(tx, "latest") + + return ok(responseBytes) + except CatchableError: + error "Failed to fetch Merkle proof elements", error = getCurrentExceptionMsg() + return err("Failed to fetch merkle proof elements: " & getCurrentExceptionMsg()) + +proc fetchMerkleRoot*( + g: OnchainGroupManager +): Future[Result[UInt256, string]] {.async.} = + try: + let merkleRootInvocation = g.wakuRlnContract.get().root() + let merkleRoot = await merkleRootInvocation.call() + return ok(merkleRoot) + except CatchableError: + error "Failed to fetch Merkle root", error = getCurrentExceptionMsg() + return err("Failed to fetch merkle root: " & getCurrentExceptionMsg()) + +template initializedGuard(g: OnchainGroupManager): untyped = + if not g.initialized: + raise newException(CatchableError, "OnchainGroupManager is not initialized") + +template retryWrapper( + g: OnchainGroupManager, res: auto, errStr: string, body: untyped +): auto = + retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction): + body + +method validateRoot*(g: OnchainGroupManager, root: MerkleNode): bool = + if g.validRoots.find(root) >= 0: + return true + return false + +proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} = + let rootRes = await g.fetchMerkleRoot() + if rootRes.isErr(): + return false + + let merkleRoot = UInt256ToField(rootRes.get()) + if g.validRoots.len == 0: + g.validRoots.addLast(merkleRoot) + return true + + if g.validRoots[g.validRoots.len - 1] != merkleRoot: + if g.validRoots.len > AcceptableRootWindowSize: + discard g.validRoots.popFirst() + g.validRoots.addLast(merkleRoot) + return true + + return false + +proc trackRootChanges*(g: OnchainGroupManager) {.async: (raises: [CatchableError]).} = + try: + initializedGuard(g) + let ethRpc = g.ethRpc.get() + let wakuRlnContract = g.wakuRlnContract.get() + + const rpcDelay = 5.seconds + + while true: + let rootUpdated = await g.updateRoots() + + if rootUpdated: + if g.membershipIndex.isNone(): + error "membershipIndex is not set; skipping proof update" + else: + let proofResult = await g.fetchMerkleProofElements() + if proofResult.isErr(): + error "Failed to fetch Merkle proof", error = proofResult.error + g.merkleProofCache = proofResult.get() + + # also need update registerd membership + let memberCount = cast[int64](await wakuRlnContract.commitmentIndex().call()) + waku_rln_number_registered_memberships.set(float64(memberCount)) + + await sleepAsync(rpcDelay) + except CatchableError: + error "Fatal error in trackRootChanges", error = getCurrentExceptionMsg() method register*( g: OnchainGroupManager, rateCommitment: RateCommitment @@ -147,18 +198,14 @@ method register*( try: let leaf = rateCommitment.toLeaf().get() - await g.registerBatch(@[leaf]) + if g.registerCb.isSome(): + let idx = g.latestIndex + debug "registering member via callback", rateCommitment = leaf, index = idx + await g.registerCb.get()(@[Membership(rateCommitment: leaf, index: idx)]) + g.latestIndex.inc() except CatchableError: raise newException(ValueError, getCurrentExceptionMsg()) -method registerBatch*( - g: OnchainGroupManager, rateCommitments: seq[RawRateCommitment] -): Future[void] {.async: (raises: [Exception]).} = - initializedGuard(g) - - await g.atomicBatch(g.latestIndex, rateCommitments) - g.latestIndex += MembershipIndex(rateCommitments.len) - method register*( g: OnchainGroupManager, identityCredential: IdentityCredential, @@ -212,8 +259,19 @@ method register*( debug "parsed membershipIndex", membershipIndex g.userMessageLimit = some(userMessageLimit) g.membershipIndex = some(membershipIndex.toMembershipIndex()) + g.idCredentials = some(identityCredential) + + let rateCommitment = RateCommitment( + idCommitment: identityCredential.idCommitment, userMessageLimit: userMessageLimit + ) + .toLeaf() + .get() + + if g.registerCb.isSome(): + let member = Membership(rateCommitment: rateCommitment, index: g.latestIndex) + await g.registerCb.get()(@[member]) + g.latestIndex.inc() - # don't handle member insertion into the tree here, it will be handled by the event listener return method withdraw*( @@ -226,304 +284,173 @@ method withdrawBatch*( ): Future[void] {.async: (raises: [Exception]).} = initializedGuard(g) - # TODO: after slashing is enabled on the contract, use atomicBatch internally +proc getRootFromProofAndIndex( + g: OnchainGroupManager, elements: seq[byte], bits: seq[byte] +): GroupManagerResult[array[32, byte]] = + # this is a helper function to get root from merkle proof elements and index + # it's currently not used anywhere, but can be used to verify the root from the proof and index + # Compute leaf hash from idCommitment and messageLimit + let messageLimitField = uint64ToField(g.userMessageLimit.get()) + let leafHashRes = poseidon(@[g.idCredentials.get().idCommitment, @messageLimitField]) + if leafHashRes.isErr(): + return err("Failed to compute leaf hash: " & leafHashRes.error) -proc parseEvent( - event: type MemberRegistered, log: JsonNode -): GroupManagerResult[Membership] = - ## parses the `data` parameter of the `MemberRegistered` event `log` - ## returns an error if it cannot parse the `data` parameter - var rateCommitment: UInt256 - var index: UInt256 - var data: seq[byte] - try: - data = hexToSeqByte(log["data"].getStr()) - except ValueError: - return err( - "failed to parse the data field of the MemberRegistered event: " & - getCurrentExceptionMsg() - ) - var offset = 0 - try: - # Parse the rateCommitment - offset += decode(data, 0, offset, rateCommitment) - # Parse the index - offset += decode(data, 0, offset, index) - return ok( - Membership( - rateCommitment: rateCommitment.toRateCommitment(), - index: index.toMembershipIndex(), - ) - ) - except CatchableError: - return err("failed to parse the data field of the MemberRegistered event") + var hash = leafHashRes.get() + for i in 0 ..< bits.len: + let sibling = elements[i * 32 .. (i + 1) * 32 - 1] -type BlockTable* = OrderedTable[BlockNumber, seq[(Membership, bool)]] + let hashRes = + if bits[i] == 0: + poseidon(@[@hash, sibling]) + else: + poseidon(@[sibling, @hash]) -proc backfillRootQueue*( - g: OnchainGroupManager, len: uint -): Future[void] {.async: (raises: [Exception]).} = - if len > 0: - # backfill the tree's acceptable roots - for i in 0 .. len - 1: - # remove the last root - g.validRoots.popLast() - for i in 0 .. len - 1: - # add the backfilled root - g.validRoots.addLast(g.validRootBuffer.popLast()) + hash = hashRes.valueOr: + return err("Failed to compute poseidon hash: " & error) + hash = hashRes.get() -proc insert( - blockTable: var BlockTable, - blockNumber: BlockNumber, - member: Membership, - removed: bool, -) = - let memberTuple = (member, removed) - if blockTable.hasKeyOrPut(blockNumber, @[memberTuple]): - try: - blockTable[blockNumber].add(memberTuple) - except KeyError: # qed - error "could not insert member into block table", - blockNumber = blockNumber, member = member + return ok(hash) -proc getRawEvents( - g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: BlockNumber -): Future[JsonNode] {.async: (raises: [Exception]).} = - initializedGuard(g) +method generateProof*( + g: OnchainGroupManager, + data: seq[byte], + epoch: Epoch, + messageId: MessageId, + rlnIdentifier = DefaultRlnIdentifier, +): GroupManagerResult[RateLimitProof] {.gcsafe, raises: [].} = + ## Generates an RLN proof using the cached Merkle proof and custom witness + # Ensure identity credentials and membership index are set + if g.idCredentials.isNone(): + return err("identity credentials are not set") + if g.membershipIndex.isNone(): + return err("membership index is not set") + if g.userMessageLimit.isNone(): + return err("user message limit is not set") - let ethRpc = g.ethRpc.get() - let wakuRlnContract = g.wakuRlnContract.get() + if (g.merkleProofCache.len mod 32) != 0: + return err("Invalid merkle proof cache length") - var eventStrs: seq[JsonString] - g.retryWrapper(eventStrs, "Failed to get the events"): - await wakuRlnContract.getJsonLogs( - MemberRegistered, - fromBlock = Opt.some(fromBlock.blockId()), - toBlock = Opt.some(toBlock.blockId()), - ) + let identity_secret = seqToField(g.idCredentials.get().idSecretHash) + let user_message_limit = uint64ToField(g.userMessageLimit.get()) + let message_id = uint64ToField(messageId) + var path_elements = newSeq[byte](0) - var events = newJArray() - for eventStr in eventStrs: - events.add(parseJson(eventStr.string)) - return events + if (g.merkleProofCache.len mod 32) != 0: + return err("Invalid merkle proof cache length") -proc getBlockTable( - g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: BlockNumber -): Future[BlockTable] {.async: (raises: [Exception]).} = - initializedGuard(g) + let identity_path_index = uint64ToIndex(g.membershipIndex.get(), 20) + for i in 0 ..< g.merkleProofCache.len div 32: + let chunk = g.merkleProofCache[i * 32 .. (i + 1) * 32 - 1] + path_elements.add(chunk.reversed()) - var blockTable = default(BlockTable) + let x = keccak.keccak256.digest(data) - let events = await g.getRawEvents(fromBlock, toBlock) + let extNullifier = poseidon(@[@(epoch), @(rlnIdentifier)]).valueOr: + return err("Failed to compute external nullifier: " & error) - if events.len == 0: - trace "no events found" - return blockTable + let witness = RLNWitnessInput( + identity_secret: identity_secret, + user_message_limit: user_message_limit, + message_id: message_id, + path_elements: path_elements, + identity_path_index: identity_path_index, + x: x, + external_nullifier: extNullifier, + ) - for event in events: - let blockNumber = parseHexInt(event["blockNumber"].getStr()).BlockNumber - let removed = event["removed"].getBool() - let parsedEventRes = parseEvent(MemberRegistered, event) - if parsedEventRes.isErr(): - error "failed to parse the MemberRegistered event", error = parsedEventRes.error() - raise newException(ValueError, "failed to parse the MemberRegistered event") - let parsedEvent = parsedEventRes.get() - blockTable.insert(blockNumber, parsedEvent, removed) + let serializedWitness = serialize(witness) - return blockTable + var input_witness_buffer = toBuffer(serializedWitness) -proc handleEvents( - g: OnchainGroupManager, blockTable: BlockTable -): Future[void] {.async: (raises: [Exception]).} = - initializedGuard(g) + # Generate the proof using the zerokit API + var output_witness_buffer: Buffer + let witness_success = generate_proof_with_witness( + g.rlnInstance, addr input_witness_buffer, addr output_witness_buffer + ) - for blockNumber, members in blockTable.pairs(): - try: - let startIndex = blockTable[blockNumber].filterIt(not it[1])[0][0].index - let removalIndices = members.filterIt(it[1]).mapIt(it[0].index) - let rateCommitments = members.mapIt(it[0].rateCommitment) - await g.atomicBatch( - start = startIndex, - rateCommitments = rateCommitments, - toRemoveIndices = removalIndices, - ) - g.latestIndex = startIndex + MembershipIndex(rateCommitments.len) - trace "new members added to the Merkle tree", - commitments = rateCommitments.mapIt(it.inHex) - except CatchableError: - error "failed to insert members into the tree", error = getCurrentExceptionMsg() - raise newException(ValueError, "failed to insert members into the tree") + if not witness_success: + return err("Failed to generate proof") - return + # Parse the proof into a RateLimitProof object + var proofValue = cast[ptr array[320, byte]](output_witness_buffer.`ptr`) + let proofBytes: array[320, byte] = proofValue[] -proc handleRemovedEvents( - g: OnchainGroupManager, blockTable: BlockTable -): Future[void] {.async: (raises: [Exception]).} = - initializedGuard(g) + ## parse the proof as [ proof<128> | root<32> | external_nullifier<32> | share_x<32> | share_y<32> | nullifier<32> ] + let + proofOffset = 128 + rootOffset = proofOffset + 32 + externalNullifierOffset = rootOffset + 32 + shareXOffset = externalNullifierOffset + 32 + shareYOffset = shareXOffset + 32 + nullifierOffset = shareYOffset + 32 - # count number of blocks that have been removed - var numRemovedBlocks: uint = 0 - for blockNumber, members in blockTable.pairs(): - if members.anyIt(it[1]): - numRemovedBlocks += 1 + var + zkproof: ZKSNARK + proofRoot, shareX, shareY: MerkleNode + externalNullifier: ExternalNullifier + nullifier: Nullifier - await g.backfillRootQueue(numRemovedBlocks) + discard zkproof.copyFrom(proofBytes[0 .. proofOffset - 1]) + discard proofRoot.copyFrom(proofBytes[proofOffset .. rootOffset - 1]) + discard + externalNullifier.copyFrom(proofBytes[rootOffset .. externalNullifierOffset - 1]) + discard shareX.copyFrom(proofBytes[externalNullifierOffset .. shareXOffset - 1]) + discard shareY.copyFrom(proofBytes[shareXOffset .. shareYOffset - 1]) + discard nullifier.copyFrom(proofBytes[shareYOffset .. nullifierOffset - 1]) -proc getAndHandleEvents( - g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: BlockNumber -): Future[bool] {.async: (raises: [Exception]).} = - initializedGuard(g) - let blockTable = await g.getBlockTable(fromBlock, toBlock) - try: - await g.handleEvents(blockTable) - await g.handleRemovedEvents(blockTable) - except CatchableError: - error "failed to handle events", error = getCurrentExceptionMsg() - raise newException(ValueError, "failed to handle events") + # Create the RateLimitProof object + let output = RateLimitProof( + proof: zkproof, + merkleRoot: proofRoot, + externalNullifier: externalNullifier, + epoch: epoch, + rlnIdentifier: rlnIdentifier, + shareX: shareX, + shareY: shareY, + nullifier: nullifier, + ) - g.latestProcessedBlock = toBlock - return true + debug "Proof generated successfully" -proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration) = - g.blockFetchingActive = false + waku_rln_remaining_proofs_per_epoch.dec() + waku_rln_total_generated_proofs.inc() + return ok(output) - proc runIntervalLoop() {.async, gcsafe.} = - g.blockFetchingActive = true +method verifyProof*( + g: OnchainGroupManager, # verifier context + input: seq[byte], # raw message data (signal) + proof: RateLimitProof, # proof received from the peer +): GroupManagerResult[bool] {.gcsafe, raises: [].} = + ## -- Verifies an RLN rate-limit proof against the set of valid Merkle roots -- - while g.blockFetchingActive: - var retCb: bool - g.retryWrapper(retCb, "Failed to run the interval block fetching loop"): - await cb() - await sleepAsync(interval) + var normalizedProof = proof - # using asyncSpawn is OK here since - # we make use of the error handling provided by - # OnFatalErrorHandler - asyncSpawn runIntervalLoop() + normalizedProof.externalNullifier = poseidon( + @[@(proof.epoch), @(proof.rlnIdentifier)] + ).valueOr: + return err("Failed to compute external nullifier: " & error) -proc getNewBlockCallback(g: OnchainGroupManager): proc = - let ethRpc = g.ethRpc.get() - proc wrappedCb(): Future[bool] {.async, gcsafe.} = - var latestBlock: BlockNumber - g.retryWrapper(latestBlock, "Failed to get the latest block number"): - cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) + let proofBytes = serialize(normalizedProof, input) + let proofBuffer = proofBytes.toBuffer() - if latestBlock <= g.latestProcessedBlock: - return - # get logs from the last block - # inc by 1 to prevent double processing - let fromBlock = g.latestProcessedBlock + 1 - var handleBlockRes: bool - g.retryWrapper(handleBlockRes, "Failed to handle new block"): - await g.getAndHandleEvents(fromBlock, latestBlock) + let rootsBytes = serialize(g.validRoots.items().toSeq()) + let rootsBuffer = rootsBytes.toBuffer() - # cannot use isOkOr here because results in a compile-time error that - # shows the error is void for some reason - let setMetadataRes = g.setMetadata() - if setMetadataRes.isErr(): - error "failed to persist rln metadata", error = setMetadataRes.error + var validProof: bool # out-param + let ffiOk = verify_with_roots( + g.rlnInstance, # RLN context created at init() + addr proofBuffer, # (proof + signal) + addr rootsBuffer, # valid Merkle roots + addr validProof # will be set by the FFI call + , + ) - return handleBlockRes + if not ffiOk: + return err("could not verify the proof") + else: + trace "Proof verified successfully !" - return wrappedCb - -proc startListeningToEvents( - g: OnchainGroupManager -): Future[void] {.async: (raises: [Exception]).} = - initializedGuard(g) - - let ethRpc = g.ethRpc.get() - let newBlockCallback = g.getNewBlockCallback() - g.runInInterval(newBlockCallback, DefaultBlockPollRate) - -proc batchAwaitBlockHandlingFuture( - g: OnchainGroupManager, futs: seq[Future[bool]] -): Future[void] {.async: (raises: [Exception]).} = - for fut in futs: - try: - var handleBlockRes: bool - g.retryWrapper(handleBlockRes, "Failed to handle block"): - await fut - except CatchableError: - raise newException( - CatchableError, "could not fetch events from block: " & getCurrentExceptionMsg() - ) - -proc startOnchainSync( - g: OnchainGroupManager -): Future[void] {.async: (raises: [Exception]).} = - initializedGuard(g) - - let ethRpc = g.ethRpc.get() - - # static block chunk size - let blockChunkSize = 2_000.BlockNumber - # delay between rpc calls to not overload the rate limit - let rpcDelay = 200.milliseconds - # max number of futures to run concurrently - let maxFutures = 10 - - var fromBlock: BlockNumber = - if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber: - info "syncing from last processed block", blockNumber = g.latestProcessedBlock - g.latestProcessedBlock + 1 - else: - info "syncing from rln contract deployed block", - blockNumber = g.rlnContractDeployedBlockNumber - g.rlnContractDeployedBlockNumber - - var futs = newSeq[Future[bool]]() - var currentLatestBlock: BlockNumber - g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"): - cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) - - try: - # we always want to sync from last processed block => latest - # chunk events - while true: - # if the fromBlock is less than 2k blocks behind the current block - # then fetch the new toBlock - if fromBlock >= currentLatestBlock: - break - - if fromBlock + blockChunkSize > currentLatestBlock: - g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"): - cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) - - let toBlock = min(fromBlock + blockChunkSize, currentLatestBlock) - debug "fetching events", fromBlock = fromBlock, toBlock = toBlock - await sleepAsync(rpcDelay) - futs.add(g.getAndHandleEvents(fromBlock, toBlock)) - if futs.len >= maxFutures or toBlock == currentLatestBlock: - await g.batchAwaitBlockHandlingFuture(futs) - g.setMetadata(lastProcessedBlock = some(toBlock)).isOkOr: - error "failed to persist rln metadata", error = $error - futs = newSeq[Future[bool]]() - fromBlock = toBlock + 1 - except CatchableError: - raise newException( - CatchableError, - "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg(), - ) - - # listen to blockheaders and contract events - try: - await g.startListeningToEvents() - except CatchableError: - raise newException( - ValueError, "failed to start listening to events: " & getCurrentExceptionMsg() - ) - -method startGroupSync*( - g: OnchainGroupManager -): Future[GroupManagerResult[void]] {.async.} = - ?resultifiedInitGuard(g) - # Get archive history - try: - await startOnchainSync(g) - return ok() - except CatchableError, Exception: - return err("failed to start group sync: " & getCurrentExceptionMsg()) + return ok(validProof) method onRegister*(g: OnchainGroupManager, cb: OnRegisterCallback) {.gcsafe.} = g.registerCb = some(cb) @@ -609,53 +536,27 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} let metadata = metadataGetOptRes.get().get() if metadata.chainId != uint(g.chainId): return err("persisted data: chain id mismatch") - if metadata.contractAddress != g.ethContractAddress.toLower(): return err("persisted data: contract address mismatch") - g.latestProcessedBlock = metadata.lastProcessedBlock.BlockNumber - g.validRoots = metadata.validRoots.toDeque() - var deployedBlockNumber: Uint256 - g.retryWrapper( - deployedBlockNumber, - "Failed to get the deployed block number. Have you set the correct contract address?", - ): - await wakuRlnContract.deployedBlockNumber().call() - debug "using rln contract", deployedBlockNumber, rlnContractAddress = contractAddress - g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber) - g.latestProcessedBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber) g.rlnRelayMaxMessageLimit = cast[uint64](await wakuRlnContract.MAX_MESSAGE_LIMIT().call()) proc onDisconnect() {.async.} = error "Ethereum client disconnected" - let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber) - info "reconnecting with the Ethereum client, and restarting group sync", - fromBlock = fromBlock var newEthRpc: Web3 g.retryWrapper(newEthRpc, "Failed to reconnect with the Ethereum client"): await newWeb3(g.ethClientUrl) newEthRpc.ondisconnect = ethRpc.ondisconnect g.ethRpc = some(newEthRpc) - try: - await g.startOnchainSync() - except CatchableError, Exception: - g.onFatalErrorAction( - "failed to restart group sync" & ": " & getCurrentExceptionMsg() - ) - ethRpc.ondisconnect = proc() = asyncSpawn onDisconnect() - waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet())) g.initialized = true - return ok() method stop*(g: OnchainGroupManager): Future[void] {.async, gcsafe.} = - g.blockFetchingActive = false - if g.ethRpc.isSome(): g.ethRpc.get().ondisconnect = nil await g.ethRpc.get().close() @@ -665,26 +566,13 @@ method stop*(g: OnchainGroupManager): Future[void] {.async, gcsafe.} = g.initialized = false -proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async, gcsafe.} = - let ethRpc = g.ethRpc.get() - - var syncing: SyncingStatus - g.retryWrapper(syncing, "Failed to get the syncing status"): - await ethRpc.provider.eth_syncing() - return syncing.syncing - method isReady*(g: OnchainGroupManager): Future[bool] {.async.} = initializedGuard(g) if g.ethRpc.isNone(): return false - var currentBlock: BlockNumber - g.retryWrapper(currentBlock, "Failed to get the current block number"): - cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber()) - - # the node is still able to process messages if it is behind the latest block by a factor of the valid roots - if u256(g.latestProcessedBlock.uint64) < (u256(currentBlock) - u256(g.validRoots.len)): + if g.wakuRlnContract.isNone(): return false - return not (await g.isSyncing()) + return true diff --git a/waku/waku_rln_relay/protocol_metrics.nim b/waku/waku_rln_relay/protocol_metrics.nim index 121727809..2210328f4 100644 --- a/waku/waku_rln_relay/protocol_metrics.nim +++ b/waku/waku_rln_relay/protocol_metrics.nim @@ -85,6 +85,7 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger = var cumulativeProofsVerified = 0.float64 var cumulativeProofsGenerated = 0.float64 var cumulativeProofsRemaining = 100.float64 + var cumulativeRegisteredMember = 0.float64 when defined(metrics): logMetrics = proc() = @@ -107,6 +108,9 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger = let freshProofsRemainingCount = parseAndAccumulate( waku_rln_remaining_proofs_per_epoch, cumulativeProofsRemaining ) + let freshRegisteredMemberCount = parseAndAccumulate( + waku_rln_number_registered_memberships, cumulativeRegisteredMember + ) info "Total messages", count = freshMsgCount info "Total spam messages", count = freshSpamCount @@ -116,5 +120,6 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger = info "Total proofs verified", count = freshProofsVerifiedCount info "Total proofs generated", count = freshProofsGeneratedCount info "Total proofs remaining", count = freshProofsRemainingCount + info "Total registered members", count = freshRegisteredMemberCount return logMetrics diff --git a/waku/waku_rln_relay/protocol_types.nim b/waku/waku_rln_relay/protocol_types.nim index 97b1c34ea..c6f52e00b 100644 --- a/waku/waku_rln_relay/protocol_types.nim +++ b/waku/waku_rln_relay/protocol_types.nim @@ -52,6 +52,20 @@ type RateLimitProof* = object ## the external nullifier used for the generation of the `proof` (derived from poseidon([epoch, rln_identifier])) externalNullifier*: ExternalNullifier +type UInt40* = StUint[40] +type UInt32* = StUint[32] + +type + Field = array[32, byte] # Field element representation (256 bits) + RLNWitnessInput* = object + identity_secret*: Field + user_message_limit*: Field + message_id*: Field + path_elements*: seq[byte] + identity_path_index*: seq[byte] + x*: Field + external_nullifier*: Field + type ProofMetadata* = object nullifier*: Nullifier shareX*: MerkleNode diff --git a/waku/waku_rln_relay/rln/rln_interface.nim b/waku/waku_rln_relay/rln/rln_interface.nim index cc468b124..27b3bbee9 100644 --- a/waku/waku_rln_relay/rln/rln_interface.nim +++ b/waku/waku_rln_relay/rln/rln_interface.nim @@ -130,6 +130,21 @@ proc generate_proof*( ## integers wrapped in <> indicate value sizes in bytes ## the return bool value indicates the success or failure of the operation +proc generate_proof_with_witness*( + ctx: ptr RLN, input_buffer: ptr Buffer, output_buffer: ptr Buffer +): bool {.importc: "generate_rln_proof_with_witness".} + +## rln-v2 +## "witness" term refer to collection of secret inputs with proper serialization +## input_buffer has to be serialized as [ identity_secret<32> | user_message_limit<32> | message_id<32> | path_elements> | identity_path_index> | x<32> | external_nullifier<32> ] +## output_buffer holds the proof data and should be parsed as [ proof<128> | root<32> | external_nullifier<32> | share_x<32> | share_y<32> | nullifier<32> ] +## rln-v1 +## input_buffer has to be serialized as [ id_key<32> | path_elements> | identity_path_index> | x<32> | epoch<32> | rln_identifier<32> ] +## output_buffer holds the proof data and should be parsed as [ proof<128> | root<32> | epoch<32> | share_x<32> | share_y<32> | nullifier<32> | rln_identifier<32> ] +## integers wrapped in <> indicate value sizes in bytes +## path_elements and identity_path_index serialize a merkle proof and are vectors of elements of 32 and 1 bytes respectively +## the return bool value indicates the success or failure of the operation + proc verify*( ctx: ptr RLN, proof_buffer: ptr Buffer, proof_is_valid_ptr: ptr bool ): bool {.importc: "verify_rln_proof".} diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index 268f1c93d..a42b04f8b 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -98,6 +98,7 @@ type WakuRLNRelay* = ref object of RootObj onFatalErrorAction*: OnFatalErrorHandler nonceManager*: NonceManager epochMonitorFuture*: Future[void] + rootChangesFuture*: Future[void] proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch = ## gets time `t` as `flaot64` with subseconds resolution in the fractional part @@ -252,6 +253,7 @@ proc validateMessage*( waku_rln_errors_total.inc(labelValues = ["proof_verification"]) warn "invalid message: proof verification failed", payloadLen = msg.payload.len return MessageValidationResult.Invalid + if not proofVerificationRes.value(): # invalid proof warn "invalid message: invalid proof", payloadLen = msg.payload.len @@ -467,9 +469,6 @@ proc mount( # Initialize the groupManager (await groupManager.init()).isOkOr: return err("could not initialize the group manager: " & $error) - # Start the group sync - (await groupManager.startGroupSync()).isOkOr: - return err("could not start the group sync: " & $error) wakuRlnRelay = WakuRLNRelay( groupManager: groupManager, @@ -479,6 +478,11 @@ proc mount( onFatalErrorAction: conf.onFatalErrorAction, ) + # track root changes on smart contract merkle tree + if groupManager of OnchainGroupManager: + let onchainManager = cast[OnchainGroupManager](groupManager) + wakuRlnRelay.rootChangesFuture = onchainManager.trackRootChanges() + # Start epoch monitoring in the background wakuRlnRelay.epochMonitorFuture = monitorEpochs(wakuRlnRelay) return ok(wakuRlnRelay)