feat: deprecated sync

This commit is contained in:
darshankabariya 2025-03-18 18:11:01 +05:30
parent 80ed94e302
commit 278b5089be

View File

@ -12,7 +12,172 @@ import
logScope:
topics = "waku rln_relay onchain_sync_group_manager"
type OnChainSyncGroupManager* = ref object of OnchainGroupManager
type OnchainSyncGroupManager* = ref object of GroupManager
ethClientUrl*: string
ethContractAddress*: string
ethRpc*: Option[Web3]
wakuRlnContract*: Option[WakuRlnContractWithSender]
chainId*: uint
keystorePath*: Option[string]
keystorePassword*: Option[string]
registrationHandler*: Option[RegistrationHandler]
# Much simpler state tracking
contractSynced*: bool
template initializedGuard(g: OnchainGroupManager): untyped =
if not g.initialized:
raise newException(CatchableError, "OnchainGroupManager is not initialized")
proc resultifiedInitGuard(g: OnchainGroupManager): GroupManagerResult[void] =
try:
initializedGuard(g)
return ok()
except CatchableError:
return err("OnchainGroupManager is not initialized")
template retryWrapper(
g: OnchainGroupManager, res: auto, errStr: string, body: untyped
): auto =
retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction):
body
proc setMetadata*(
g: OnchainGroupManager, lastProcessedBlock = none(BlockNumber)
): GroupManagerResult[void] =
let normalizedBlock =
if lastProcessedBlock.isSome():
lastProcessedBlock.get()
else:
g.latestProcessedBlock
try:
let metadataSetRes = g.rlnInstance.setMetadata(
RlnMetadata(
lastProcessedBlock: normalizedBlock.uint64,
chainId: g.chainId,
contractAddress: g.ethContractAddress,
validRoots: g.validRoots.toSeq(),
)
)
if metadataSetRes.isErr():
return err("failed to persist rln metadata: " & metadataSetRes.error)
except CatchableError:
return err("failed to persist rln metadata: " & getCurrentExceptionMsg())
return ok()
method atomicBatch*(
g: OnchainGroupManager,
start: MembershipIndex,
rateCommitments = newSeq[RawRateCommitment](),
toRemoveIndices = newSeq[MembershipIndex](),
): Future[void] {.async: (raises: [Exception]), base.} =
initializedGuard(g)
waku_rln_membership_insertion_duration_seconds.nanosecondTime:
let operationSuccess =
g.rlnInstance.atomicWrite(some(start), rateCommitments, toRemoveIndices)
if not operationSuccess:
raise newException(CatchableError, "atomic batch operation failed")
# TODO: when slashing is enabled, we need to track slashed members
waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
if g.registerCb.isSome():
var membersSeq = newSeq[Membership]()
for i in 0 ..< rateCommitments.len:
var index = start + MembershipIndex(i)
debug "registering member to callback",
rateCommitment = rateCommitments[i], index = index
let member = Membership(rateCommitment: rateCommitments[i], index: index)
membersSeq.add(member)
await g.registerCb.get()(membersSeq)
g.validRootBuffer = g.slideRootQueue()
method register*(
g: OnchainGroupManager, rateCommitment: RateCommitment
): Future[void] {.async: (raises: [Exception]).} =
initializedGuard(g)
try:
let leaf = rateCommitment.toLeaf().get()
await g.registerBatch(@[leaf])
except CatchableError:
raise newException(ValueError, getCurrentExceptionMsg())
method registerBatch*(
g: OnchainGroupManager, rateCommitments: seq[RawRateCommitment]
): Future[void] {.async: (raises: [Exception]).} =
initializedGuard(g)
await g.atomicBatch(g.latestIndex, rateCommitments)
g.latestIndex += MembershipIndex(rateCommitments.len)
method register*(
g: OnchainGroupManager,
identityCredential: IdentityCredential,
userMessageLimit: UserMessageLimit,
): Future[void] {.async: (raises: [Exception]).} =
initializedGuard(g)
let ethRpc = g.ethRpc.get()
let wakuRlnContract = g.wakuRlnContract.get()
var gasPrice: int
g.retryWrapper(gasPrice, "Failed to get gas price"):
int(await ethRpc.provider.eth_gasPrice()) * 2
let idCommitment = identityCredential.idCommitment.toUInt256()
debug "registering the member",
idCommitment = idCommitment, userMessageLimit = userMessageLimit
var txHash: TxHash
g.retryWrapper(txHash, "Failed to register the member"):
await wakuRlnContract.register(idCommitment, userMessageLimit.stuint(32)).send(
gasPrice = gasPrice
)
# wait for the transaction to be mined
var tsReceipt: ReceiptObject
g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"):
await ethRpc.getMinedTransactionReceipt(txHash)
debug "registration transaction mined", txHash = txHash
g.registrationTxHash = some(txHash)
# the receipt topic holds the hash of signature of the raised events
# TODO: make this robust. search within the event list for the event
debug "ts receipt", receipt = tsReceipt[]
if tsReceipt.status.isNone() or tsReceipt.status.get() != 1.Quantity:
raise newException(ValueError, "register: transaction failed")
let firstTopic = tsReceipt.logs[0].topics[0]
# the hash of the signature of MemberRegistered(uint256,uint32) event is equal to the following hex value
if firstTopic !=
cast[FixedBytes[32]](keccak.keccak256.digest("MemberRegistered(uint256,uint32)").data):
raise newException(ValueError, "register: unexpected event signature")
# the arguments of the raised event i.e., MemberRegistered are encoded inside the data field
# data = rateCommitment encoded as 256 bits || index encoded as 32 bits
let arguments = tsReceipt.logs[0].data
debug "tx log data", arguments = arguments
let
# In TX log data, uints are encoded in big endian
membershipIndex = UInt256.fromBytesBE(arguments[32 ..^ 1])
debug "parsed membershipIndex", membershipIndex
g.userMessageLimit = some(userMessageLimit)
g.membershipIndex = some(membershipIndex.toMembershipIndex())
# don't handle member insertion into the tree here, it will be handled by the event listener
return
method withdraw*(
g: OnchainGroupManager, idCommitment: IDCommitment
): Future[void] {.async: (raises: [Exception]).} =
initializedGuard(g) # TODO: after slashing is enabled on the contract
method withdrawBatch*(
g: OnchainGroupManager, idCommitments: seq[IDCommitment]
): Future[void] {.async: (raises: [Exception]).} =
initializedGuard(g)
proc fetchMerkleProof*(g: OnchainSyncGroupManager) {.async.} =
let index = stuint(g.membershipIndex.get(), 256)
@ -30,7 +195,7 @@ proc fetchMerkleRoot*(g: OnchainSyncGroupManager) {.async.} =
return merkleRoot
method generateProof*(
g: OnChainSyncGroupManager,
g: OnchainSyncGroupManager,
data: seq[byte],
epoch: Epoch,
messageId: MessageId,
@ -108,7 +273,7 @@ method generateProof*(
return ok(output)
method verifyProof*(
g: OnChainSyncGroupManager, input: openArray[byte], proof: RateLimitProof
g: OnchainSyncGroupManager, input: openArray[byte], proof: RateLimitProof
): GroupManagerResult[bool] {.base, gcsafe, raises: [].} =
## verifies the proof, returns an error if the proof verification fails
## returns true if the proof is valid
@ -139,4 +304,114 @@ method verifyProof*(
if not validProof:
return ok(false)
else:
return ok(true)
return ok(true)
method init*(g: OnchainSyncGroupManager): Future[GroupManagerResult[void]] {.async.} =
# check if the Ethereum client is reachable
var ethRpc: Web3
g.retryWrapper(ethRpc, "Failed to connect to the Ethereum client"):
await newWeb3(g.ethClientUrl)
var fetchedChainId: uint
g.retryWrapper(fetchedChainId, "Failed to get the chain id"):
uint(await ethRpc.provider.eth_chainId())
# Set the chain id
if g.chainId == 0:
warn "Chain ID not set in config, using RPC Provider's Chain ID",
providerChainId = fetchedChainId
if g.chainId != 0 and g.chainId != fetchedChainId:
return err(
"The RPC Provided a Chain ID which is different than the provided Chain ID: provided = " &
$g.chainId & ", actual = " & $fetchedChainId
)
g.chainId = fetchedChainId
if g.ethPrivateKey.isSome():
let pk = g.ethPrivateKey.get()
let parsedPk = keys.PrivateKey.fromHex(pk).valueOr:
return err("failed to parse the private key" & ": " & $error)
ethRpc.privateKey = Opt.some(parsedPk)
ethRpc.defaultAccount =
ethRpc.privateKey.get().toPublicKey().toCanonicalAddress().Address
let contractAddress = web3.fromHex(web3.Address, g.ethContractAddress)
let wakuRlnContract = ethRpc.contractSender(WakuRlnContract, contractAddress)
g.ethRpc = some(ethRpc)
g.wakuRlnContract = some(wakuRlnContract)
if g.keystorePath.isSome() and g.keystorePassword.isSome():
if not fileExists(g.keystorePath.get()):
error "File provided as keystore path does not exist", path = g.keystorePath.get()
return err("File provided as keystore path does not exist")
var keystoreQuery = KeystoreMembership(
membershipContract:
MembershipContract(chainId: $g.chainId, address: g.ethContractAddress)
)
if g.membershipIndex.isSome():
keystoreQuery.treeIndex = MembershipIndex(g.membershipIndex.get())
waku_rln_membership_credentials_import_duration_seconds.nanosecondTime:
let keystoreCred = getMembershipCredentials(
path = g.keystorePath.get(),
password = g.keystorePassword.get(),
query = keystoreQuery,
appInfo = RLNAppInfo,
).valueOr:
return err("failed to get the keystore credentials: " & $error)
g.membershipIndex = some(keystoreCred.treeIndex)
g.userMessageLimit = some(keystoreCred.userMessageLimit)
# now we check on the contract if the commitment actually has a membership
try:
let membershipExists = await wakuRlnContract
.memberExists(keystoreCred.identityCredential.idCommitment.toUInt256())
.call()
if membershipExists == 0:
return err("the commitment does not have a membership")
except CatchableError:
return err("failed to check if the commitment has a membership")
g.idCredentials = some(keystoreCred.identityCredential)
let metadataGetOptRes = g.rlnInstance.getMetadata()
if metadataGetOptRes.isErr():
warn "could not initialize with persisted rln metadata"
elif metadataGetOptRes.get().isSome():
let metadata = metadataGetOptRes.get().get()
if metadata.chainId != uint(g.chainId):
return err("persisted data: chain id mismatch")
if metadata.contractAddress != g.ethContractAddress.toLower():
return err("persisted data: contract address mismatch")
g.rlnRelayMaxMessageLimit =
cast[uint64](await wakuRlnContract.MAX_MESSAGE_LIMIT().call())
proc onDisconnect() {.async.} =
error "Ethereum client disconnected"
let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)
info "reconnecting with the Ethereum client, and restarting group sync",
fromBlock = fromBlock
var newEthRpc: Web3
g.retryWrapper(newEthRpc, "Failed to reconnect with the Ethereum client"):
await newWeb3(g.ethClientUrl)
newEthRpc.ondisconnect = ethRpc.ondisconnect
g.ethRpc = some(newEthRpc)
try:
await g.startOnchainSync()
except CatchableError, Exception:
g.onFatalErrorAction(
"failed to restart group sync" & ": " & getCurrentExceptionMsg()
)
ethRpc.ondisconnect = proc() =
asyncSpawn onDisconnect()
waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
g.initialized = true
return ok()