mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-02-08 16:03:11 +00:00
feat: deprecated sync
This commit is contained in:
parent
c9e5bc98f5
commit
830653df6d
@ -12,7 +12,172 @@ import
|
|||||||
logScope:
|
logScope:
|
||||||
topics = "waku rln_relay onchain_sync_group_manager"
|
topics = "waku rln_relay onchain_sync_group_manager"
|
||||||
|
|
||||||
type OnChainSyncGroupManager* = ref object of OnchainGroupManager
|
type OnchainSyncGroupManager* = ref object of GroupManager
|
||||||
|
ethClientUrl*: string
|
||||||
|
ethContractAddress*: string
|
||||||
|
ethRpc*: Option[Web3]
|
||||||
|
wakuRlnContract*: Option[WakuRlnContractWithSender]
|
||||||
|
chainId*: uint
|
||||||
|
keystorePath*: Option[string]
|
||||||
|
keystorePassword*: Option[string]
|
||||||
|
registrationHandler*: Option[RegistrationHandler]
|
||||||
|
# Much simpler state tracking
|
||||||
|
contractSynced*: bool
|
||||||
|
|
||||||
|
|
||||||
|
template initializedGuard(g: OnchainGroupManager): untyped =
|
||||||
|
if not g.initialized:
|
||||||
|
raise newException(CatchableError, "OnchainGroupManager is not initialized")
|
||||||
|
|
||||||
|
proc resultifiedInitGuard(g: OnchainGroupManager): GroupManagerResult[void] =
|
||||||
|
try:
|
||||||
|
initializedGuard(g)
|
||||||
|
return ok()
|
||||||
|
except CatchableError:
|
||||||
|
return err("OnchainGroupManager is not initialized")
|
||||||
|
|
||||||
|
template retryWrapper(
|
||||||
|
g: OnchainGroupManager, res: auto, errStr: string, body: untyped
|
||||||
|
): auto =
|
||||||
|
retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction):
|
||||||
|
body
|
||||||
|
|
||||||
|
proc setMetadata*(
|
||||||
|
g: OnchainGroupManager, lastProcessedBlock = none(BlockNumber)
|
||||||
|
): GroupManagerResult[void] =
|
||||||
|
let normalizedBlock =
|
||||||
|
if lastProcessedBlock.isSome():
|
||||||
|
lastProcessedBlock.get()
|
||||||
|
else:
|
||||||
|
g.latestProcessedBlock
|
||||||
|
try:
|
||||||
|
let metadataSetRes = g.rlnInstance.setMetadata(
|
||||||
|
RlnMetadata(
|
||||||
|
lastProcessedBlock: normalizedBlock.uint64,
|
||||||
|
chainId: g.chainId,
|
||||||
|
contractAddress: g.ethContractAddress,
|
||||||
|
validRoots: g.validRoots.toSeq(),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if metadataSetRes.isErr():
|
||||||
|
return err("failed to persist rln metadata: " & metadataSetRes.error)
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to persist rln metadata: " & getCurrentExceptionMsg())
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
method atomicBatch*(
|
||||||
|
g: OnchainGroupManager,
|
||||||
|
start: MembershipIndex,
|
||||||
|
rateCommitments = newSeq[RawRateCommitment](),
|
||||||
|
toRemoveIndices = newSeq[MembershipIndex](),
|
||||||
|
): Future[void] {.async: (raises: [Exception]), base.} =
|
||||||
|
initializedGuard(g)
|
||||||
|
|
||||||
|
waku_rln_membership_insertion_duration_seconds.nanosecondTime:
|
||||||
|
let operationSuccess =
|
||||||
|
g.rlnInstance.atomicWrite(some(start), rateCommitments, toRemoveIndices)
|
||||||
|
if not operationSuccess:
|
||||||
|
raise newException(CatchableError, "atomic batch operation failed")
|
||||||
|
# TODO: when slashing is enabled, we need to track slashed members
|
||||||
|
waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
|
||||||
|
|
||||||
|
if g.registerCb.isSome():
|
||||||
|
var membersSeq = newSeq[Membership]()
|
||||||
|
for i in 0 ..< rateCommitments.len:
|
||||||
|
var index = start + MembershipIndex(i)
|
||||||
|
debug "registering member to callback",
|
||||||
|
rateCommitment = rateCommitments[i], index = index
|
||||||
|
let member = Membership(rateCommitment: rateCommitments[i], index: index)
|
||||||
|
membersSeq.add(member)
|
||||||
|
await g.registerCb.get()(membersSeq)
|
||||||
|
|
||||||
|
g.validRootBuffer = g.slideRootQueue()
|
||||||
|
|
||||||
|
method register*(
|
||||||
|
g: OnchainGroupManager, rateCommitment: RateCommitment
|
||||||
|
): Future[void] {.async: (raises: [Exception]).} =
|
||||||
|
initializedGuard(g)
|
||||||
|
|
||||||
|
try:
|
||||||
|
let leaf = rateCommitment.toLeaf().get()
|
||||||
|
await g.registerBatch(@[leaf])
|
||||||
|
except CatchableError:
|
||||||
|
raise newException(ValueError, getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
method registerBatch*(
|
||||||
|
g: OnchainGroupManager, rateCommitments: seq[RawRateCommitment]
|
||||||
|
): Future[void] {.async: (raises: [Exception]).} =
|
||||||
|
initializedGuard(g)
|
||||||
|
|
||||||
|
await g.atomicBatch(g.latestIndex, rateCommitments)
|
||||||
|
g.latestIndex += MembershipIndex(rateCommitments.len)
|
||||||
|
|
||||||
|
method register*(
|
||||||
|
g: OnchainGroupManager,
|
||||||
|
identityCredential: IdentityCredential,
|
||||||
|
userMessageLimit: UserMessageLimit,
|
||||||
|
): Future[void] {.async: (raises: [Exception]).} =
|
||||||
|
initializedGuard(g)
|
||||||
|
|
||||||
|
let ethRpc = g.ethRpc.get()
|
||||||
|
let wakuRlnContract = g.wakuRlnContract.get()
|
||||||
|
|
||||||
|
var gasPrice: int
|
||||||
|
g.retryWrapper(gasPrice, "Failed to get gas price"):
|
||||||
|
int(await ethRpc.provider.eth_gasPrice()) * 2
|
||||||
|
let idCommitment = identityCredential.idCommitment.toUInt256()
|
||||||
|
|
||||||
|
debug "registering the member",
|
||||||
|
idCommitment = idCommitment, userMessageLimit = userMessageLimit
|
||||||
|
var txHash: TxHash
|
||||||
|
g.retryWrapper(txHash, "Failed to register the member"):
|
||||||
|
await wakuRlnContract.register(idCommitment, userMessageLimit.stuint(32)).send(
|
||||||
|
gasPrice = gasPrice
|
||||||
|
)
|
||||||
|
|
||||||
|
# wait for the transaction to be mined
|
||||||
|
var tsReceipt: ReceiptObject
|
||||||
|
g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"):
|
||||||
|
await ethRpc.getMinedTransactionReceipt(txHash)
|
||||||
|
debug "registration transaction mined", txHash = txHash
|
||||||
|
g.registrationTxHash = some(txHash)
|
||||||
|
# the receipt topic holds the hash of signature of the raised events
|
||||||
|
# TODO: make this robust. search within the event list for the event
|
||||||
|
debug "ts receipt", receipt = tsReceipt[]
|
||||||
|
|
||||||
|
if tsReceipt.status.isNone() or tsReceipt.status.get() != 1.Quantity:
|
||||||
|
raise newException(ValueError, "register: transaction failed")
|
||||||
|
|
||||||
|
let firstTopic = tsReceipt.logs[0].topics[0]
|
||||||
|
# the hash of the signature of MemberRegistered(uint256,uint32) event is equal to the following hex value
|
||||||
|
if firstTopic !=
|
||||||
|
cast[FixedBytes[32]](keccak.keccak256.digest("MemberRegistered(uint256,uint32)").data):
|
||||||
|
raise newException(ValueError, "register: unexpected event signature")
|
||||||
|
|
||||||
|
# the arguments of the raised event i.e., MemberRegistered are encoded inside the data field
|
||||||
|
# data = rateCommitment encoded as 256 bits || index encoded as 32 bits
|
||||||
|
let arguments = tsReceipt.logs[0].data
|
||||||
|
debug "tx log data", arguments = arguments
|
||||||
|
let
|
||||||
|
# In TX log data, uints are encoded in big endian
|
||||||
|
membershipIndex = UInt256.fromBytesBE(arguments[32 ..^ 1])
|
||||||
|
|
||||||
|
debug "parsed membershipIndex", membershipIndex
|
||||||
|
g.userMessageLimit = some(userMessageLimit)
|
||||||
|
g.membershipIndex = some(membershipIndex.toMembershipIndex())
|
||||||
|
|
||||||
|
# don't handle member insertion into the tree here, it will be handled by the event listener
|
||||||
|
return
|
||||||
|
|
||||||
|
method withdraw*(
|
||||||
|
g: OnchainGroupManager, idCommitment: IDCommitment
|
||||||
|
): Future[void] {.async: (raises: [Exception]).} =
|
||||||
|
initializedGuard(g) # TODO: after slashing is enabled on the contract
|
||||||
|
|
||||||
|
method withdrawBatch*(
|
||||||
|
g: OnchainGroupManager, idCommitments: seq[IDCommitment]
|
||||||
|
): Future[void] {.async: (raises: [Exception]).} =
|
||||||
|
initializedGuard(g)
|
||||||
|
|
||||||
proc fetchMerkleProof*(g: OnchainSyncGroupManager) {.async.} =
|
proc fetchMerkleProof*(g: OnchainSyncGroupManager) {.async.} =
|
||||||
let index = stuint(g.membershipIndex.get(), 256)
|
let index = stuint(g.membershipIndex.get(), 256)
|
||||||
@ -30,7 +195,7 @@ proc fetchMerkleRoot*(g: OnchainSyncGroupManager) {.async.} =
|
|||||||
return merkleRoot
|
return merkleRoot
|
||||||
|
|
||||||
method generateProof*(
|
method generateProof*(
|
||||||
g: OnChainSyncGroupManager,
|
g: OnchainSyncGroupManager,
|
||||||
data: seq[byte],
|
data: seq[byte],
|
||||||
epoch: Epoch,
|
epoch: Epoch,
|
||||||
messageId: MessageId,
|
messageId: MessageId,
|
||||||
@ -108,7 +273,7 @@ method generateProof*(
|
|||||||
return ok(output)
|
return ok(output)
|
||||||
|
|
||||||
method verifyProof*(
|
method verifyProof*(
|
||||||
g: OnChainSyncGroupManager, input: openArray[byte], proof: RateLimitProof
|
g: OnchainSyncGroupManager, input: openArray[byte], proof: RateLimitProof
|
||||||
): GroupManagerResult[bool] {.base, gcsafe, raises: [].} =
|
): GroupManagerResult[bool] {.base, gcsafe, raises: [].} =
|
||||||
## verifies the proof, returns an error if the proof verification fails
|
## verifies the proof, returns an error if the proof verification fails
|
||||||
## returns true if the proof is valid
|
## returns true if the proof is valid
|
||||||
@ -139,4 +304,114 @@ method verifyProof*(
|
|||||||
if not validProof:
|
if not validProof:
|
||||||
return ok(false)
|
return ok(false)
|
||||||
else:
|
else:
|
||||||
return ok(true)
|
return ok(true)
|
||||||
|
|
||||||
|
method init*(g: OnchainSyncGroupManager): Future[GroupManagerResult[void]] {.async.} =
|
||||||
|
# check if the Ethereum client is reachable
|
||||||
|
var ethRpc: Web3
|
||||||
|
g.retryWrapper(ethRpc, "Failed to connect to the Ethereum client"):
|
||||||
|
await newWeb3(g.ethClientUrl)
|
||||||
|
|
||||||
|
var fetchedChainId: uint
|
||||||
|
g.retryWrapper(fetchedChainId, "Failed to get the chain id"):
|
||||||
|
uint(await ethRpc.provider.eth_chainId())
|
||||||
|
|
||||||
|
# Set the chain id
|
||||||
|
if g.chainId == 0:
|
||||||
|
warn "Chain ID not set in config, using RPC Provider's Chain ID",
|
||||||
|
providerChainId = fetchedChainId
|
||||||
|
|
||||||
|
if g.chainId != 0 and g.chainId != fetchedChainId:
|
||||||
|
return err(
|
||||||
|
"The RPC Provided a Chain ID which is different than the provided Chain ID: provided = " &
|
||||||
|
$g.chainId & ", actual = " & $fetchedChainId
|
||||||
|
)
|
||||||
|
|
||||||
|
g.chainId = fetchedChainId
|
||||||
|
|
||||||
|
if g.ethPrivateKey.isSome():
|
||||||
|
let pk = g.ethPrivateKey.get()
|
||||||
|
let parsedPk = keys.PrivateKey.fromHex(pk).valueOr:
|
||||||
|
return err("failed to parse the private key" & ": " & $error)
|
||||||
|
ethRpc.privateKey = Opt.some(parsedPk)
|
||||||
|
ethRpc.defaultAccount =
|
||||||
|
ethRpc.privateKey.get().toPublicKey().toCanonicalAddress().Address
|
||||||
|
|
||||||
|
let contractAddress = web3.fromHex(web3.Address, g.ethContractAddress)
|
||||||
|
let wakuRlnContract = ethRpc.contractSender(WakuRlnContract, contractAddress)
|
||||||
|
|
||||||
|
g.ethRpc = some(ethRpc)
|
||||||
|
g.wakuRlnContract = some(wakuRlnContract)
|
||||||
|
|
||||||
|
if g.keystorePath.isSome() and g.keystorePassword.isSome():
|
||||||
|
if not fileExists(g.keystorePath.get()):
|
||||||
|
error "File provided as keystore path does not exist", path = g.keystorePath.get()
|
||||||
|
return err("File provided as keystore path does not exist")
|
||||||
|
|
||||||
|
var keystoreQuery = KeystoreMembership(
|
||||||
|
membershipContract:
|
||||||
|
MembershipContract(chainId: $g.chainId, address: g.ethContractAddress)
|
||||||
|
)
|
||||||
|
if g.membershipIndex.isSome():
|
||||||
|
keystoreQuery.treeIndex = MembershipIndex(g.membershipIndex.get())
|
||||||
|
waku_rln_membership_credentials_import_duration_seconds.nanosecondTime:
|
||||||
|
let keystoreCred = getMembershipCredentials(
|
||||||
|
path = g.keystorePath.get(),
|
||||||
|
password = g.keystorePassword.get(),
|
||||||
|
query = keystoreQuery,
|
||||||
|
appInfo = RLNAppInfo,
|
||||||
|
).valueOr:
|
||||||
|
return err("failed to get the keystore credentials: " & $error)
|
||||||
|
|
||||||
|
g.membershipIndex = some(keystoreCred.treeIndex)
|
||||||
|
g.userMessageLimit = some(keystoreCred.userMessageLimit)
|
||||||
|
# now we check on the contract if the commitment actually has a membership
|
||||||
|
try:
|
||||||
|
let membershipExists = await wakuRlnContract
|
||||||
|
.memberExists(keystoreCred.identityCredential.idCommitment.toUInt256())
|
||||||
|
.call()
|
||||||
|
if membershipExists == 0:
|
||||||
|
return err("the commitment does not have a membership")
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to check if the commitment has a membership")
|
||||||
|
|
||||||
|
g.idCredentials = some(keystoreCred.identityCredential)
|
||||||
|
|
||||||
|
let metadataGetOptRes = g.rlnInstance.getMetadata()
|
||||||
|
if metadataGetOptRes.isErr():
|
||||||
|
warn "could not initialize with persisted rln metadata"
|
||||||
|
elif metadataGetOptRes.get().isSome():
|
||||||
|
let metadata = metadataGetOptRes.get().get()
|
||||||
|
if metadata.chainId != uint(g.chainId):
|
||||||
|
return err("persisted data: chain id mismatch")
|
||||||
|
if metadata.contractAddress != g.ethContractAddress.toLower():
|
||||||
|
return err("persisted data: contract address mismatch")
|
||||||
|
|
||||||
|
g.rlnRelayMaxMessageLimit =
|
||||||
|
cast[uint64](await wakuRlnContract.MAX_MESSAGE_LIMIT().call())
|
||||||
|
|
||||||
|
proc onDisconnect() {.async.} =
|
||||||
|
error "Ethereum client disconnected"
|
||||||
|
let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)
|
||||||
|
info "reconnecting with the Ethereum client, and restarting group sync",
|
||||||
|
fromBlock = fromBlock
|
||||||
|
var newEthRpc: Web3
|
||||||
|
g.retryWrapper(newEthRpc, "Failed to reconnect with the Ethereum client"):
|
||||||
|
await newWeb3(g.ethClientUrl)
|
||||||
|
newEthRpc.ondisconnect = ethRpc.ondisconnect
|
||||||
|
g.ethRpc = some(newEthRpc)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await g.startOnchainSync()
|
||||||
|
except CatchableError, Exception:
|
||||||
|
g.onFatalErrorAction(
|
||||||
|
"failed to restart group sync" & ": " & getCurrentExceptionMsg()
|
||||||
|
)
|
||||||
|
|
||||||
|
ethRpc.ondisconnect = proc() =
|
||||||
|
asyncSpawn onDisconnect()
|
||||||
|
|
||||||
|
waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
|
||||||
|
g.initialized = true
|
||||||
|
|
||||||
|
return ok()
|
||||||
Loading…
x
Reference in New Issue
Block a user