diff --git a/waku/waku_rln_relay/group_manager/on_chain_sync/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain_sync/group_manager.nim index e2640283f..a6074292d 100644 --- a/waku/waku_rln_relay/group_manager/on_chain_sync/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain_sync/group_manager.nim @@ -21,66 +21,82 @@ type OnchainSyncGroupManager* = ref object of GroupManager keystorePath*: Option[string] keystorePassword*: Option[string] registrationHandler*: Option[RegistrationHandler] - # Much simpler state tracking - contractSynced*: bool + validRootBuffer*: Deque[MerkleNode] +# using the when predicate does not work within the contract macro, hence need to dupe +contract(WakuRlnContract): + # this serves as an entrypoint into the rln membership set + proc register(idCommitment: UInt256, userMessageLimit: EthereumUInt32) + # Initializes the implementation contract (only used in unit tests) + proc initialize(maxMessageLimit: UInt256) + # this event is raised when a new member is registered + proc MemberRegistered(rateCommitment: UInt256, index: EthereumUInt32) {.event.} + # this function denotes existence of a given user + proc memberExists(idCommitment: Uint256): UInt256 {.view.} + # this constant describes the next index of a new member + proc commitmentIndex(): UInt256 {.view.} + # this constant describes the block number this contract was deployed on + proc deployedBlockNumber(): UInt256 {.view.} + # this constant describes max message limit of rln contract + proc MAX_MESSAGE_LIMIT(): UInt256 {.view.} + # this function returns the merkleProof for a given index + proc merkleProofElements(index: Uint256): seq[Uint256] {.view.} + # this function returns the Merkle root + proc root(): Uint256 {.view.} + +proc fetchMerkleProof*(g: OnchainSyncGroupManager) {.async.} = + let index = stuint(g.membershipIndex.get(), 256) + try: + let merkleProofInvocation = g.wakuRlnContract.get().merkleProofElements(index) + let merkleProof = await merkleProofInvocation.call() + # Await the contract call and extract the result + return merkleProof + except CatchableError: + error "Failed to fetch merkle proof: " & getCurrentExceptionMsg() + +proc fetchMerkleRoot*(g: OnchainSyncGroupManager) {.async.} = + let merkleRootInvocation = g.wakuRlnContract.get().root() + let merkleRoot = await merkleRootInvocation.call() + return merkleRoot template initializedGuard(g: OnchainGroupManager): untyped = if not g.initialized: raise newException(CatchableError, "OnchainGroupManager is not initialized") -proc resultifiedInitGuard(g: OnchainGroupManager): GroupManagerResult[void] = - try: - initializedGuard(g) - return ok() - except CatchableError: - return err("OnchainGroupManager is not initialized") - template retryWrapper( - g: OnchainGroupManager, res: auto, errStr: string, body: untyped + g: OnchainSyncGroupManager, res: auto, errStr: string, body: untyped ): auto = retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction): body -proc setMetadata*( - g: OnchainGroupManager, lastProcessedBlock = none(BlockNumber) -): GroupManagerResult[void] = - let normalizedBlock = - if lastProcessedBlock.isSome(): - lastProcessedBlock.get() - else: - g.latestProcessedBlock - try: - let metadataSetRes = g.rlnInstance.setMetadata( - RlnMetadata( - lastProcessedBlock: normalizedBlock.uint64, - chainId: g.chainId, - contractAddress: g.ethContractAddress, - validRoots: g.validRoots.toSeq(), - ) - ) - if metadataSetRes.isErr(): - return err("failed to persist rln metadata: " & metadataSetRes.error) - except CatchableError: - return err("failed to persist rln metadata: " & getCurrentExceptionMsg()) - return ok() +method validateRoot*( + g: OnchainSyncGroupManager, root: MerkleNode +): bool {.base, gcsafe, raises: [].} = + if g.validRootBuffer.find(root) >= 0: + return true + return false + +proc slideRootQueue*(g: OnchainSyncGroupManager): untyped = + let rootRes = g.fetchMerkleRoot() + if rootRes.isErr(): + raise newException(ValueError, "failed to get merkle root") + let rootAfterUpdate = rootRes.get() + + let overflowCount = g.validRootBuffer.len - AcceptableRootWindowSize + 1 + if overflowCount > 0: + for i in 0 ..< overflowCount: + g.validRootBuffer.popFirst() + + g.validRootBuffer.addLast(rootAfterUpdate) method atomicBatch*( - g: OnchainGroupManager, + g: OnchainSyncGroupManager, start: MembershipIndex, rateCommitments = newSeq[RawRateCommitment](), toRemoveIndices = newSeq[MembershipIndex](), ): Future[void] {.async: (raises: [Exception]), base.} = initializedGuard(g) - waku_rln_membership_insertion_duration_seconds.nanosecondTime: - let operationSuccess = - g.rlnInstance.atomicWrite(some(start), rateCommitments, toRemoveIndices) - if not operationSuccess: - raise newException(CatchableError, "atomic batch operation failed") - # TODO: when slashing is enabled, we need to track slashed members - waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet())) - if g.registerCb.isSome(): var membersSeq = newSeq[Membership]() for i in 0 ..< rateCommitments.len: @@ -91,10 +107,10 @@ method atomicBatch*( membersSeq.add(member) await g.registerCb.get()(membersSeq) - g.validRootBuffer = g.slideRootQueue() + g.slideRootQueue() method register*( - g: OnchainGroupManager, rateCommitment: RateCommitment + g: OnchainSyncGroupManager, rateCommitment: RateCommitment ): Future[void] {.async: (raises: [Exception]).} = initializedGuard(g) @@ -105,7 +121,7 @@ method register*( raise newException(ValueError, getCurrentExceptionMsg()) method registerBatch*( - g: OnchainGroupManager, rateCommitments: seq[RawRateCommitment] + g: OnchainSyncGroupManager, rateCommitments: seq[RawRateCommitment] ): Future[void] {.async: (raises: [Exception]).} = initializedGuard(g) @@ -113,7 +129,7 @@ method registerBatch*( g.latestIndex += MembershipIndex(rateCommitments.len) method register*( - g: OnchainGroupManager, + g: OnchainSyncGroupManager, identityCredential: IdentityCredential, userMessageLimit: UserMessageLimit, ): Future[void] {.async: (raises: [Exception]).} = @@ -166,34 +182,18 @@ method register*( g.userMessageLimit = some(userMessageLimit) g.membershipIndex = some(membershipIndex.toMembershipIndex()) - # don't handle member insertion into the tree here, it will be handled by the event listener return method withdraw*( - g: OnchainGroupManager, idCommitment: IDCommitment + g: OnchainSyncGroupManager, idCommitment: IDCommitment ): Future[void] {.async: (raises: [Exception]).} = initializedGuard(g) # TODO: after slashing is enabled on the contract method withdrawBatch*( - g: OnchainGroupManager, idCommitments: seq[IDCommitment] + g: OnchainSyncGroupManager, idCommitments: seq[IDCommitment] ): Future[void] {.async: (raises: [Exception]).} = initializedGuard(g) -proc fetchMerkleProof*(g: OnchainSyncGroupManager) {.async.} = - let index = stuint(g.membershipIndex.get(), 256) - try: - let merkleProofInvocation = g.wakuRlnContract.get().merkleProofElements(index) - let merkleProof = await merkleProofInvocation.call() - # Await the contract call and extract the result - return merkleProof - except CatchableError: - error "Failed to fetch merkle proof: " & getCurrentExceptionMsg() - -proc fetchMerkleRoot*(g: OnchainSyncGroupManager) {.async.} = - let merkleRootInvocation = g.wakuRlnContract.get().root() - let merkleRoot = await merkleRootInvocation.call() - return merkleRoot - method generateProof*( g: OnchainSyncGroupManager, data: seq[byte], @@ -386,7 +386,7 @@ method init*(g: OnchainSyncGroupManager): Future[GroupManagerResult[void]] {.asy return err("persisted data: chain id mismatch") if metadata.contractAddress != g.ethContractAddress.toLower(): return err("persisted data: contract address mismatch") - + g.rlnRelayMaxMessageLimit = cast[uint64](await wakuRlnContract.MAX_MESSAGE_LIMIT().call())