From 830653df6d683c19e0ff2b4b46fb995dbbb7b123 Mon Sep 17 00:00:00 2001 From: darshankabariya Date: Tue, 18 Mar 2025 18:11:01 +0530 Subject: [PATCH] feat: deprecated sync --- .../on_chain_sync/group_manager.nim | 283 +++++++++++++++++- 1 file changed, 279 insertions(+), 4 deletions(-) diff --git a/waku/waku_rln_relay/group_manager/on_chain_sync/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain_sync/group_manager.nim index 4fa4969af..e2640283f 100644 --- a/waku/waku_rln_relay/group_manager/on_chain_sync/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain_sync/group_manager.nim @@ -12,7 +12,172 @@ import logScope: topics = "waku rln_relay onchain_sync_group_manager" -type OnChainSyncGroupManager* = ref object of OnchainGroupManager +type OnchainSyncGroupManager* = ref object of GroupManager + ethClientUrl*: string + ethContractAddress*: string + ethRpc*: Option[Web3] + wakuRlnContract*: Option[WakuRlnContractWithSender] + chainId*: uint + keystorePath*: Option[string] + keystorePassword*: Option[string] + registrationHandler*: Option[RegistrationHandler] + # Much simpler state tracking + contractSynced*: bool + + +template initializedGuard(g: OnchainGroupManager): untyped = + if not g.initialized: + raise newException(CatchableError, "OnchainGroupManager is not initialized") + +proc resultifiedInitGuard(g: OnchainGroupManager): GroupManagerResult[void] = + try: + initializedGuard(g) + return ok() + except CatchableError: + return err("OnchainGroupManager is not initialized") + +template retryWrapper( + g: OnchainGroupManager, res: auto, errStr: string, body: untyped +): auto = + retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction): + body + +proc setMetadata*( + g: OnchainGroupManager, lastProcessedBlock = none(BlockNumber) +): GroupManagerResult[void] = + let normalizedBlock = + if lastProcessedBlock.isSome(): + lastProcessedBlock.get() + else: + g.latestProcessedBlock + try: + let metadataSetRes = g.rlnInstance.setMetadata( + RlnMetadata( + lastProcessedBlock: normalizedBlock.uint64, + chainId: g.chainId, + contractAddress: g.ethContractAddress, + validRoots: g.validRoots.toSeq(), + ) + ) + if metadataSetRes.isErr(): + return err("failed to persist rln metadata: " & metadataSetRes.error) + except CatchableError: + return err("failed to persist rln metadata: " & getCurrentExceptionMsg()) + return ok() + +method atomicBatch*( + g: OnchainGroupManager, + start: MembershipIndex, + rateCommitments = newSeq[RawRateCommitment](), + toRemoveIndices = newSeq[MembershipIndex](), +): Future[void] {.async: (raises: [Exception]), base.} = + initializedGuard(g) + + waku_rln_membership_insertion_duration_seconds.nanosecondTime: + let operationSuccess = + g.rlnInstance.atomicWrite(some(start), rateCommitments, toRemoveIndices) + if not operationSuccess: + raise newException(CatchableError, "atomic batch operation failed") + # TODO: when slashing is enabled, we need to track slashed members + waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet())) + + if g.registerCb.isSome(): + var membersSeq = newSeq[Membership]() + for i in 0 ..< rateCommitments.len: + var index = start + MembershipIndex(i) + debug "registering member to callback", + rateCommitment = rateCommitments[i], index = index + let member = Membership(rateCommitment: rateCommitments[i], index: index) + membersSeq.add(member) + await g.registerCb.get()(membersSeq) + + g.validRootBuffer = g.slideRootQueue() + +method register*( + g: OnchainGroupManager, rateCommitment: RateCommitment +): Future[void] {.async: (raises: [Exception]).} = + initializedGuard(g) + + try: + let leaf = rateCommitment.toLeaf().get() + await g.registerBatch(@[leaf]) + except CatchableError: + raise newException(ValueError, getCurrentExceptionMsg()) + +method registerBatch*( + g: OnchainGroupManager, rateCommitments: seq[RawRateCommitment] +): Future[void] {.async: (raises: [Exception]).} = + initializedGuard(g) + + await g.atomicBatch(g.latestIndex, rateCommitments) + g.latestIndex += MembershipIndex(rateCommitments.len) + +method register*( + g: OnchainGroupManager, + identityCredential: IdentityCredential, + userMessageLimit: UserMessageLimit, +): Future[void] {.async: (raises: [Exception]).} = + initializedGuard(g) + + let ethRpc = g.ethRpc.get() + let wakuRlnContract = g.wakuRlnContract.get() + + var gasPrice: int + g.retryWrapper(gasPrice, "Failed to get gas price"): + int(await ethRpc.provider.eth_gasPrice()) * 2 + let idCommitment = identityCredential.idCommitment.toUInt256() + + debug "registering the member", + idCommitment = idCommitment, userMessageLimit = userMessageLimit + var txHash: TxHash + g.retryWrapper(txHash, "Failed to register the member"): + await wakuRlnContract.register(idCommitment, userMessageLimit.stuint(32)).send( + gasPrice = gasPrice + ) + + # wait for the transaction to be mined + var tsReceipt: ReceiptObject + g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"): + await ethRpc.getMinedTransactionReceipt(txHash) + debug "registration transaction mined", txHash = txHash + g.registrationTxHash = some(txHash) + # the receipt topic holds the hash of signature of the raised events + # TODO: make this robust. search within the event list for the event + debug "ts receipt", receipt = tsReceipt[] + + if tsReceipt.status.isNone() or tsReceipt.status.get() != 1.Quantity: + raise newException(ValueError, "register: transaction failed") + + let firstTopic = tsReceipt.logs[0].topics[0] + # the hash of the signature of MemberRegistered(uint256,uint32) event is equal to the following hex value + if firstTopic != + cast[FixedBytes[32]](keccak.keccak256.digest("MemberRegistered(uint256,uint32)").data): + raise newException(ValueError, "register: unexpected event signature") + + # the arguments of the raised event i.e., MemberRegistered are encoded inside the data field + # data = rateCommitment encoded as 256 bits || index encoded as 32 bits + let arguments = tsReceipt.logs[0].data + debug "tx log data", arguments = arguments + let + # In TX log data, uints are encoded in big endian + membershipIndex = UInt256.fromBytesBE(arguments[32 ..^ 1]) + + debug "parsed membershipIndex", membershipIndex + g.userMessageLimit = some(userMessageLimit) + g.membershipIndex = some(membershipIndex.toMembershipIndex()) + + # don't handle member insertion into the tree here, it will be handled by the event listener + return + +method withdraw*( + g: OnchainGroupManager, idCommitment: IDCommitment +): Future[void] {.async: (raises: [Exception]).} = + initializedGuard(g) # TODO: after slashing is enabled on the contract + +method withdrawBatch*( + g: OnchainGroupManager, idCommitments: seq[IDCommitment] +): Future[void] {.async: (raises: [Exception]).} = + initializedGuard(g) proc fetchMerkleProof*(g: OnchainSyncGroupManager) {.async.} = let index = stuint(g.membershipIndex.get(), 256) @@ -30,7 +195,7 @@ proc fetchMerkleRoot*(g: OnchainSyncGroupManager) {.async.} = return merkleRoot method generateProof*( - g: OnChainSyncGroupManager, + g: OnchainSyncGroupManager, data: seq[byte], epoch: Epoch, messageId: MessageId, @@ -108,7 +273,7 @@ method generateProof*( return ok(output) method verifyProof*( - g: OnChainSyncGroupManager, input: openArray[byte], proof: RateLimitProof + g: OnchainSyncGroupManager, input: openArray[byte], proof: RateLimitProof ): GroupManagerResult[bool] {.base, gcsafe, raises: [].} = ## verifies the proof, returns an error if the proof verification fails ## returns true if the proof is valid @@ -139,4 +304,114 @@ method verifyProof*( if not validProof: return ok(false) else: - return ok(true) \ No newline at end of file + return ok(true) + +method init*(g: OnchainSyncGroupManager): Future[GroupManagerResult[void]] {.async.} = + # check if the Ethereum client is reachable + var ethRpc: Web3 + g.retryWrapper(ethRpc, "Failed to connect to the Ethereum client"): + await newWeb3(g.ethClientUrl) + + var fetchedChainId: uint + g.retryWrapper(fetchedChainId, "Failed to get the chain id"): + uint(await ethRpc.provider.eth_chainId()) + + # Set the chain id + if g.chainId == 0: + warn "Chain ID not set in config, using RPC Provider's Chain ID", + providerChainId = fetchedChainId + + if g.chainId != 0 and g.chainId != fetchedChainId: + return err( + "The RPC Provided a Chain ID which is different than the provided Chain ID: provided = " & + $g.chainId & ", actual = " & $fetchedChainId + ) + + g.chainId = fetchedChainId + + if g.ethPrivateKey.isSome(): + let pk = g.ethPrivateKey.get() + let parsedPk = keys.PrivateKey.fromHex(pk).valueOr: + return err("failed to parse the private key" & ": " & $error) + ethRpc.privateKey = Opt.some(parsedPk) + ethRpc.defaultAccount = + ethRpc.privateKey.get().toPublicKey().toCanonicalAddress().Address + + let contractAddress = web3.fromHex(web3.Address, g.ethContractAddress) + let wakuRlnContract = ethRpc.contractSender(WakuRlnContract, contractAddress) + + g.ethRpc = some(ethRpc) + g.wakuRlnContract = some(wakuRlnContract) + + if g.keystorePath.isSome() and g.keystorePassword.isSome(): + if not fileExists(g.keystorePath.get()): + error "File provided as keystore path does not exist", path = g.keystorePath.get() + return err("File provided as keystore path does not exist") + + var keystoreQuery = KeystoreMembership( + membershipContract: + MembershipContract(chainId: $g.chainId, address: g.ethContractAddress) + ) + if g.membershipIndex.isSome(): + keystoreQuery.treeIndex = MembershipIndex(g.membershipIndex.get()) + waku_rln_membership_credentials_import_duration_seconds.nanosecondTime: + let keystoreCred = getMembershipCredentials( + path = g.keystorePath.get(), + password = g.keystorePassword.get(), + query = keystoreQuery, + appInfo = RLNAppInfo, + ).valueOr: + return err("failed to get the keystore credentials: " & $error) + + g.membershipIndex = some(keystoreCred.treeIndex) + g.userMessageLimit = some(keystoreCred.userMessageLimit) + # now we check on the contract if the commitment actually has a membership + try: + let membershipExists = await wakuRlnContract + .memberExists(keystoreCred.identityCredential.idCommitment.toUInt256()) + .call() + if membershipExists == 0: + return err("the commitment does not have a membership") + except CatchableError: + return err("failed to check if the commitment has a membership") + + g.idCredentials = some(keystoreCred.identityCredential) + + let metadataGetOptRes = g.rlnInstance.getMetadata() + if metadataGetOptRes.isErr(): + warn "could not initialize with persisted rln metadata" + elif metadataGetOptRes.get().isSome(): + let metadata = metadataGetOptRes.get().get() + if metadata.chainId != uint(g.chainId): + return err("persisted data: chain id mismatch") + if metadata.contractAddress != g.ethContractAddress.toLower(): + return err("persisted data: contract address mismatch") + + g.rlnRelayMaxMessageLimit = + cast[uint64](await wakuRlnContract.MAX_MESSAGE_LIMIT().call()) + + proc onDisconnect() {.async.} = + error "Ethereum client disconnected" + let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber) + info "reconnecting with the Ethereum client, and restarting group sync", + fromBlock = fromBlock + var newEthRpc: Web3 + g.retryWrapper(newEthRpc, "Failed to reconnect with the Ethereum client"): + await newWeb3(g.ethClientUrl) + newEthRpc.ondisconnect = ethRpc.ondisconnect + g.ethRpc = some(newEthRpc) + + try: + await g.startOnchainSync() + except CatchableError, Exception: + g.onFatalErrorAction( + "failed to restart group sync" & ": " & getCurrentExceptionMsg() + ) + + ethRpc.ondisconnect = proc() = + asyncSpawn onDisconnect() + + waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet())) + g.initialized = true + + return ok() \ No newline at end of file