mirror of https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 08:23:08 +00:00

feat: make clean

This commit is contained in:
parent 3e33d3803c
commit 1b2be76e5f
@@ -50,7 +50,7 @@ suite "Onchain group manager":
      manager.ethRpc.isSome()
      manager.wakuRlnContract.isSome()
      manager.initialized
      manager.rlnContractDeployedBlockNumber > 0.Quantity
      # manager.rlnContractDeployedBlockNumber > 0.Quantity
      manager.rlnRelayMaxMessageLimit == 100

  asyncTest "should error on initialization when chainId does not match":

@@ -1,701 +1,5 @@
{.push raises: [].}

# {.push raises: [].}
#
# import
#   os,
#   web3,
#   web3/eth_api_types,
#   web3/primitives,
#   eth/keys as keys,
#   chronicles,
#   nimcrypto/keccak as keccak,
#   stint,
#   json,
#   std/tables,
#   stew/[byteutils, arrayops],
#   sequtils,
#   strutils
# import
#   ../../../waku_keystore,
#   ../../rln,
#   ../../conversion_utils,
#   ../group_manager_base,
#   ./retry_wrapper
#
# from strutils import parseHexInt
#
# export group_manager_base
#
# logScope:
#   topics = "waku rln_relay onchain_group_manager"
#
# # using the when predicate does not work within the contract macro, hence the
# # need to duplicate it
# contract(WakuRlnContract):
#   # this serves as an entrypoint into the rln membership set
#   proc register(idCommitment: UInt256, userMessageLimit: EthereumUInt32)
#   # Initializes the implementation contract (only used in unit tests)
#   proc initialize(maxMessageLimit: UInt256)
#   # this event is raised when a new member is registered
#   proc MemberRegistered(rateCommitment: UInt256, index: EthereumUInt32) {.event.}
#   # this function denotes the existence of a given user
#   proc memberExists(idCommitment: Uint256): UInt256 {.view.}
#   # this constant describes the next index of a new member
#   proc commitmentIndex(): UInt256 {.view.}
#   # this constant describes the block number this contract was deployed on
#   proc deployedBlockNumber(): UInt256 {.view.}
#   # this constant describes the max message limit of the rln contract
#   proc MAX_MESSAGE_LIMIT(): UInt256 {.view.}
#   # this function returns the merkle proof elements for a given index
#   proc merkleProofElements(index: Uint256): seq[Uint256] {.view.}
#   # this function returns the Merkle root
#   proc root(): Uint256 {.view.}
#
# type
#   WakuRlnContractWithSender = Sender[WakuRlnContract]
#   OnchainGroupManager* = ref object of GroupManager
#     ethClientUrl*: string
#     ethPrivateKey*: Option[string]
#     ethContractAddress*: string
#     ethRpc*: Option[Web3]
#     rlnContractDeployedBlockNumber*: BlockNumber
#     wakuRlnContract*: Option[WakuRlnContractWithSender]
#     latestProcessedBlock*: BlockNumber
#     registrationTxHash*: Option[TxHash]
#     chainId*: uint
#     keystorePath*: Option[string]
#     keystorePassword*: Option[string]
#     registrationHandler*: Option[RegistrationHandler]
#     # this buffer exists to backfill appropriate roots for the merkle tree in
#     # the event of a reorg. we store 5 roots in the buffer; this may need
#     # revisiting, since the average reorg depth is only 1 to 2 blocks.
#     validRootBuffer*: Deque[MerkleNode]
#     # flag that lets the interval block-fetching loop shut down gracefully
#     blockFetchingActive*: bool
#
# const DefaultKeyStorePath* = "rlnKeystore.json"
# const DefaultKeyStorePassword* = "password"
#
# const DefaultBlockPollRate* = 6.seconds
#
# template initializedGuard(g: OnchainGroupManager): untyped =
#   if not g.initialized:
#     raise newException(CatchableError, "OnchainGroupManager is not initialized")
#
# proc resultifiedInitGuard(g: OnchainGroupManager): GroupManagerResult[void] =
#   try:
#     initializedGuard(g)
#     return ok()
#   except CatchableError:
#     return err("OnchainGroupManager is not initialized")
#
# template retryWrapper(
#     g: OnchainGroupManager, res: auto, errStr: string, body: untyped
# ): auto =
#   retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction):
#     body
#
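# usage sketch (illustrative, not part of this commit): retryWrapper assigns the
# body's value to `res`, retrying with RetryStrategy defaults and invoking
# g.onFatalErrorAction(errStr) once the retries are exhausted, e.g.:
#
#   var gasPrice: int
#   g.retryWrapper(gasPrice, "Failed to get gas price"):
#     int(await ethRpc.provider.eth_gasPrice()) * 2
#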
# proc setMetadata*(
#     g: OnchainGroupManager, lastProcessedBlock = none(BlockNumber)
# ): GroupManagerResult[void] =
#   let normalizedBlock =
#     if lastProcessedBlock.isSome():
#       lastProcessedBlock.get()
#     else:
#       g.latestProcessedBlock
#   try:
#     let metadataSetRes = g.rlnInstance.setMetadata(
#       RlnMetadata(
#         lastProcessedBlock: normalizedBlock.uint64,
#         chainId: g.chainId,
#         contractAddress: g.ethContractAddress,
#         validRoots: g.validRoots.toSeq(),
#       )
#     )
#     if metadataSetRes.isErr():
#       return err("failed to persist rln metadata: " & metadataSetRes.error)
#   except CatchableError:
#     return err("failed to persist rln metadata: " & getCurrentExceptionMsg())
#   return ok()
#
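# persistence sketch (illustrative, values assumed): the RlnMetadata row written
# here is what init() later reads back, so e.g. a record with
# lastProcessedBlock: 12_345'u64 lets a restarted node resume syncing from block
# 12_346 instead of replaying events from the contract's deployment block.
#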
# method atomicBatch*(
#     g: OnchainGroupManager,
#     start: MembershipIndex,
#     rateCommitments = newSeq[RawRateCommitment](),
#     toRemoveIndices = newSeq[MembershipIndex](),
# ): Future[void] {.async: (raises: [Exception]), base.} =
#   initializedGuard(g)
#
#   waku_rln_membership_insertion_duration_seconds.nanosecondTime:
#     let operationSuccess =
#       g.rlnInstance.atomicWrite(some(start), rateCommitments, toRemoveIndices)
#   if not operationSuccess:
#     raise newException(CatchableError, "atomic batch operation failed")
#   # TODO: when slashing is enabled, we need to track slashed members
#   waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
#
#   if g.registerCb.isSome():
#     var membersSeq = newSeq[Membership]()
#     for i in 0 ..< rateCommitments.len:
#       var index = start + MembershipIndex(i)
#       debug "registering member to callback",
#         rateCommitment = rateCommitments[i], index = index
#       let member = Membership(rateCommitment: rateCommitments[i], index: index)
#       membersSeq.add(member)
#     await g.registerCb.get()(membersSeq)
#
#   g.validRootBuffer = g.slideRootQueue()
#
# method register*(
#     g: OnchainGroupManager, rateCommitment: RateCommitment
# ): Future[void] {.async: (raises: [Exception]).} =
#   initializedGuard(g)
#
#   try:
#     let leaf = rateCommitment.toLeaf().get()
#     await g.registerBatch(@[leaf])
#   except CatchableError:
#     raise newException(ValueError, getCurrentExceptionMsg())
#
# method registerBatch*(
#     g: OnchainGroupManager, rateCommitments: seq[RawRateCommitment]
# ): Future[void] {.async: (raises: [Exception]).} =
#   initializedGuard(g)
#
#   await g.atomicBatch(g.latestIndex, rateCommitments)
#   g.latestIndex += MembershipIndex(rateCommitments.len)
#
# method register*(
#     g: OnchainGroupManager,
#     identityCredential: IdentityCredential,
#     userMessageLimit: UserMessageLimit,
# ): Future[void] {.async: (raises: [Exception]).} =
#   initializedGuard(g)
#
#   let ethRpc = g.ethRpc.get()
#   let wakuRlnContract = g.wakuRlnContract.get()
#
#   var gasPrice: int
#   g.retryWrapper(gasPrice, "Failed to get gas price"):
#     int(await ethRpc.provider.eth_gasPrice()) * 2
#   let idCommitment = identityCredential.idCommitment.toUInt256()
#
#   debug "registering the member",
#     idCommitment = idCommitment, userMessageLimit = userMessageLimit
#   var txHash: TxHash
#   g.retryWrapper(txHash, "Failed to register the member"):
#     await wakuRlnContract.register(idCommitment, userMessageLimit.stuint(32)).send(
#       gasPrice = gasPrice
#     )
#
#   # wait for the transaction to be mined
#   var tsReceipt: ReceiptObject
#   g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"):
#     await ethRpc.getMinedTransactionReceipt(txHash)
#   debug "registration transaction mined", txHash = txHash
#   g.registrationTxHash = some(txHash)
#   # the receipt topic holds the hash of the signature of the raised events
#   # TODO: make this robust. search within the event list for the event
#   debug "ts receipt", receipt = tsReceipt[]
#
#   if tsReceipt.status.isNone() or tsReceipt.status.get() != 1.Quantity:
#     raise newException(ValueError, "register: transaction failed")
#
#   let firstTopic = tsReceipt.logs[0].topics[0]
#   # the first topic must equal the hash of the event signature,
#   # keccak256("MemberRegistered(uint256,uint32)")
#   if firstTopic !=
#       cast[FixedBytes[32]](keccak.keccak256.digest("MemberRegistered(uint256,uint32)").data):
#     raise newException(ValueError, "register: unexpected event signature")
#
#   # the arguments of the raised event, i.e. MemberRegistered, are encoded inside the data field
#   # data = rateCommitment encoded as 256 bits || index encoded as 32 bits
#   let arguments = tsReceipt.logs[0].data
#   debug "tx log data", arguments = arguments
#   let
#     # In TX log data, uints are encoded in big endian
#     membershipIndex = UInt256.fromBytesBE(arguments[32 ..^ 1])
#
#   debug "parsed membershipIndex", membershipIndex
#   g.userMessageLimit = some(userMessageLimit)
#   g.membershipIndex = some(membershipIndex.toMembershipIndex())
#
#   # don't handle member insertion into the tree here; it is handled by the event listener
#   return
#
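# log layout sketch (assumes standard ABI encoding, not stated in this diff):
# topics[0] is keccak256("MemberRegistered(uint256,uint32)") because neither
# event argument is indexed, so both land in `data`:
#
#   data[0 ..< 32]   rateCommitment, big-endian uint256
#   data[32 ..< 64]  index, a uint32 left-padded to 32 bytes
#
# which is why the index is recovered with UInt256.fromBytesBE(arguments[32 ..^ 1]).
#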
# method withdraw*(
#     g: OnchainGroupManager, idCommitment: IDCommitment
# ): Future[void] {.async: (raises: [Exception]).} =
#   initializedGuard(g) # TODO: after slashing is enabled on the contract
#
# method withdrawBatch*(
#     g: OnchainGroupManager, idCommitments: seq[IDCommitment]
# ): Future[void] {.async: (raises: [Exception]).} =
#   initializedGuard(g)
#
#   # TODO: after slashing is enabled on the contract, use atomicBatch internally
#
# proc parseEvent(
#     event: type MemberRegistered, log: JsonNode
# ): GroupManagerResult[Membership] =
#   ## parses the `data` parameter of the `MemberRegistered` event `log`
#   ## returns an error if it cannot parse the `data` parameter
#   var rateCommitment: UInt256
#   var index: UInt256
#   var data: seq[byte]
#   try:
#     data = hexToSeqByte(log["data"].getStr())
#   except ValueError:
#     return err(
#       "failed to parse the data field of the MemberRegistered event: " &
#       getCurrentExceptionMsg()
#     )
#   var offset = 0
#   try:
#     # Parse the rateCommitment
#     offset += decode(data, 0, offset, rateCommitment)
#     # Parse the index
#     offset += decode(data, 0, offset, index)
#     return ok(
#       Membership(
#         rateCommitment: rateCommitment.toRateCommitment(),
#         index: index.toMembershipIndex(),
#       )
#     )
#   except CatchableError:
#     return err("failed to parse the data field of the MemberRegistered event")
#
# type BlockTable* = OrderedTable[BlockNumber, seq[(Membership, bool)]]
#
# proc backfillRootQueue*(
#     g: OnchainGroupManager, len: uint
# ): Future[void] {.async: (raises: [Exception]).} =
#   if len > 0:
#     # backfill the tree's acceptable roots
#     for i in 0 .. len - 1:
#       # remove the last root
#       g.validRoots.popLast()
#     for i in 0 .. len - 1:
#       # add the backfilled root
#       g.validRoots.addLast(g.validRootBuffer.popLast())
#
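# behavior sketch (illustrative): on a reorg that invalidated N blocks, the N
# newest entries of g.validRoots are dropped and replaced with the N most
# recent roots buffered in g.validRootBuffer, restoring the pre-reorg
# acceptable-root window.
#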
# proc insert(
#     blockTable: var BlockTable,
#     blockNumber: BlockNumber,
#     member: Membership,
#     removed: bool,
# ) =
#   let memberTuple = (member, removed)
#   if blockTable.hasKeyOrPut(blockNumber, @[memberTuple]):
#     try:
#       blockTable[blockNumber].add(memberTuple)
#     except KeyError: # qed
#       error "could not insert member into block table",
#         blockNumber = blockNumber, member = member
#
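# shape sketch (illustrative values): BlockTable groups parsed memberships by
# block and tags each with its `removed` flag, e.g.
#
#   {1200.BlockNumber: @[(memberA, false)],
#    1201.BlockNumber: @[(memberB, false), (memberB, true)]}
#
# where a true flag marks a log that was removed by a reorg.
#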
# proc getRawEvents(
#     g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: BlockNumber
# ): Future[JsonNode] {.async: (raises: [Exception]).} =
#   initializedGuard(g)
#
#   let ethRpc = g.ethRpc.get()
#   let wakuRlnContract = g.wakuRlnContract.get()
#
#   var eventStrs: seq[JsonString]
#   g.retryWrapper(eventStrs, "Failed to get the events"):
#     await wakuRlnContract.getJsonLogs(
#       MemberRegistered,
#       fromBlock = Opt.some(fromBlock.blockId()),
#       toBlock = Opt.some(toBlock.blockId()),
#     )
#
#   var events = newJArray()
#   for eventStr in eventStrs:
#     events.add(parseJson(eventStr.string))
#   return events
#
# proc getBlockTable(
#     g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: BlockNumber
# ): Future[BlockTable] {.async: (raises: [Exception]).} =
#   initializedGuard(g)
#
#   var blockTable = default(BlockTable)
#
#   let events = await g.getRawEvents(fromBlock, toBlock)
#
#   if events.len == 0:
#     trace "no events found"
#     return blockTable
#
#   for event in events:
#     let blockNumber = parseHexInt(event["blockNumber"].getStr()).BlockNumber
#     let removed = event["removed"].getBool()
#     let parsedEventRes = parseEvent(MemberRegistered, event)
#     if parsedEventRes.isErr():
#       error "failed to parse the MemberRegistered event", error = parsedEventRes.error()
#       raise newException(ValueError, "failed to parse the MemberRegistered event")
#     let parsedEvent = parsedEventRes.get()
#     blockTable.insert(blockNumber, parsedEvent, removed)
#
#   return blockTable
#
# proc handleEvents(
#     g: OnchainGroupManager, blockTable: BlockTable
# ): Future[void] {.async: (raises: [Exception]).} =
#   initializedGuard(g)
#
#   for blockNumber, members in blockTable.pairs():
#     try:
#       let startIndex = blockTable[blockNumber].filterIt(not it[1])[0][0].index
#       let removalIndices = members.filterIt(it[1]).mapIt(it[0].index)
#       let rateCommitments = members.mapIt(it[0].rateCommitment)
#       await g.atomicBatch(
#         start = startIndex,
#         rateCommitments = rateCommitments,
#         toRemoveIndices = removalIndices,
#       )
#
#       g.latestIndex = startIndex + MembershipIndex(rateCommitments.len)
#       trace "new members added to the Merkle tree",
#         commitments = rateCommitments.mapIt(it.inHex)
#     except CatchableError:
#       error "failed to insert members into the tree", error = getCurrentExceptionMsg()
#       raise newException(ValueError, "failed to insert members into the tree")
#
#   return
#
# proc handleRemovedEvents(
#     g: OnchainGroupManager, blockTable: BlockTable
# ): Future[void] {.async: (raises: [Exception]).} =
#   initializedGuard(g)
#
#   # count the number of blocks that contain removed events
#   var numRemovedBlocks: uint = 0
#   for blockNumber, members in blockTable.pairs():
#     if members.anyIt(it[1]):
#       numRemovedBlocks += 1
#
#   await g.backfillRootQueue(numRemovedBlocks)
#
# proc getAndHandleEvents(
#     g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: BlockNumber
# ): Future[bool] {.async: (raises: [Exception]).} =
#   initializedGuard(g)
#   let blockTable = await g.getBlockTable(fromBlock, toBlock)
#   try:
#     await g.handleEvents(blockTable)
#     await g.handleRemovedEvents(blockTable)
#   except CatchableError:
#     error "failed to handle events", error = getCurrentExceptionMsg()
#     raise newException(ValueError, "failed to handle events")
#
#   g.latestProcessedBlock = toBlock
#   return true
#
# proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration) =
#   g.blockFetchingActive = false
#
#   proc runIntervalLoop() {.async, gcsafe.} =
#     g.blockFetchingActive = true
#
#     while g.blockFetchingActive:
#       var retCb: bool
#       g.retryWrapper(retCb, "Failed to run the interval block fetching loop"):
#         await cb()
#       await sleepAsync(interval)
#
#   # using asyncSpawn is OK here since we make use of the error handling
#   # provided by OnFatalErrorHandler
#   asyncSpawn runIntervalLoop()
#
# proc getNewBlockCallback(g: OnchainGroupManager): proc =
#   let ethRpc = g.ethRpc.get()
#   proc wrappedCb(): Future[bool] {.async, gcsafe.} =
#     var latestBlock: BlockNumber
#     g.retryWrapper(latestBlock, "Failed to get the latest block number"):
#       cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
#
#     if latestBlock <= g.latestProcessedBlock:
#       return
#     # get logs from the last processed block onwards;
#     # increment by 1 to prevent double processing
#     let fromBlock = g.latestProcessedBlock + 1
#     var handleBlockRes: bool
#     g.retryWrapper(handleBlockRes, "Failed to handle new block"):
#       await g.getAndHandleEvents(fromBlock, latestBlock)
#
#     # cannot use isOkOr here because it results in a compile-time error
#     # claiming the error is void
#     let setMetadataRes = g.setMetadata()
#     if setMetadataRes.isErr():
#       error "failed to persist rln metadata", error = setMetadataRes.error
#
#     return handleBlockRes
#
#   return wrappedCb
#
# proc startListeningToEvents(
#     g: OnchainGroupManager
# ): Future[void] {.async: (raises: [Exception]).} =
#   initializedGuard(g)
#
#   let ethRpc = g.ethRpc.get()
#   let newBlockCallback = g.getNewBlockCallback()
#   g.runInInterval(newBlockCallback, DefaultBlockPollRate)
#
# proc batchAwaitBlockHandlingFuture(
#     g: OnchainGroupManager, futs: seq[Future[bool]]
# ): Future[void] {.async: (raises: [Exception]).} =
#   for fut in futs:
#     try:
#       var handleBlockRes: bool
#       g.retryWrapper(handleBlockRes, "Failed to handle block"):
#         await fut
#     except CatchableError:
#       raise newException(
#         CatchableError, "could not fetch events from block: " & getCurrentExceptionMsg()
#       )
#
# proc startOnchain(
#     g: OnchainGroupManager
# ): Future[void] {.async: (raises: [Exception]).} =
#   initializedGuard(g)
#
#   let ethRpc = g.ethRpc.get()
#
#   # static block chunk size
#   let blockChunkSize = 2_000.BlockNumber
#   # delay between rpc calls so as not to overload the rate limit
#   let rpcDelay = 200.milliseconds
#   # max number of futures to run concurrently
#   let maxFutures = 10
#
#   var fromBlock: BlockNumber =
#     if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber:
#       info "syncing from last processed block", blockNumber = g.latestProcessedBlock
#       g.latestProcessedBlock + 1
#     else:
#       info "syncing from rln contract deployed block",
#         blockNumber = g.rlnContractDeployedBlockNumber
#       g.rlnContractDeployedBlockNumber
#
#   var futs = newSeq[Future[bool]]()
#   var currentLatestBlock: BlockNumber
#   g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
#     cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
#
#   try:
#     # we always want to sync from the last processed block => latest,
#     # fetching events in chunks
#     while true:
#       # stop once fromBlock has caught up with the chain head
#       if fromBlock >= currentLatestBlock:
#         break
#
#       # if fromBlock is less than one chunk behind the current head,
#       # refresh the latest block number before picking the new toBlock
#       if fromBlock + blockChunkSize > currentLatestBlock:
#         g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
#           cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
#
#       let toBlock = min(fromBlock + blockChunkSize, currentLatestBlock)
#       debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
#       await sleepAsync(rpcDelay)
#       futs.add(g.getAndHandleEvents(fromBlock, toBlock))
#       if futs.len >= maxFutures or toBlock == currentLatestBlock:
#         await g.batchAwaitBlockHandlingFuture(futs)
#         g.setMetadata(lastProcessedBlock = some(toBlock)).isOkOr:
#           error "failed to persist rln metadata", error = $error
#         futs = newSeq[Future[bool]]()
#       fromBlock = toBlock + 1
#   except CatchableError:
#     raise newException(
#       CatchableError,
#       "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg(),
#     )
#
#   # listen to block headers and contract events
#   try:
#     await g.startListeningToEvents()
#   except CatchableError:
#     raise newException(
#       ValueError, "failed to start listening to events: " & getCurrentExceptionMsg()
#     )
#
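# worked example (illustrative numbers): with blockChunkSize = 2_000 and
# maxFutures = 10, syncing from block 0 to a head at 50_000 issues the ranges
# [0, 2_000], [2_001, 4_001], ... and awaits each batch of 10 in-flight
# futures (roughly 20_000 blocks) before persisting metadata and continuing.
#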
# method startGroupSync*(
#     g: OnchainGroupManager
# ): Future[GroupManagerResult[void]] {.async.} =
#   ?resultifiedInitGuard(g)
#   # Get archive history
#   try:
#     await startOnchain(g)
#     return ok()
#   except CatchableError, Exception:
#     return err("failed to start group sync: " & getCurrentExceptionMsg())
#
# method onRegister*(g: OnchainGroupManager, cb: OnRegisterCallback) {.gcsafe.} =
#   g.registerCb = some(cb)
#
# method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
#   g.withdrawCb = some(cb)
#
# method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} =
#   # check if the Ethereum client is reachable
#   var ethRpc: Web3
#   g.retryWrapper(ethRpc, "Failed to connect to the Ethereum client"):
#     await newWeb3(g.ethClientUrl)
#
#   var fetchedChainId: uint
#   g.retryWrapper(fetchedChainId, "Failed to get the chain id"):
#     uint(await ethRpc.provider.eth_chainId())
#
#   # Set the chain id
#   if g.chainId == 0:
#     warn "Chain ID not set in config, using the RPC provider's chain ID",
#       providerChainId = fetchedChainId
#
#   if g.chainId != 0 and g.chainId != fetchedChainId:
#     return err(
#       "The RPC provider returned a chain ID different from the configured one: configured = " &
#       $g.chainId & ", actual = " & $fetchedChainId
#     )
#
#   g.chainId = fetchedChainId
#
#   if g.ethPrivateKey.isSome():
#     let pk = g.ethPrivateKey.get()
#     let parsedPk = keys.PrivateKey.fromHex(pk).valueOr:
#       return err("failed to parse the private key" & ": " & $error)
#     ethRpc.privateKey = Opt.some(parsedPk)
#     ethRpc.defaultAccount =
#       ethRpc.privateKey.get().toPublicKey().toCanonicalAddress().Address
#
#   let contractAddress = web3.fromHex(web3.Address, g.ethContractAddress)
#   let wakuRlnContract = ethRpc.contractSender(WakuRlnContract, contractAddress)
#
#   g.ethRpc = some(ethRpc)
#   g.wakuRlnContract = some(wakuRlnContract)
#
#   if g.keystorePath.isSome() and g.keystorePassword.isSome():
#     if not fileExists(g.keystorePath.get()):
#       error "File provided as keystore path does not exist", path = g.keystorePath.get()
#       return err("File provided as keystore path does not exist")
#
#     var keystoreQuery = KeystoreMembership(
#       membershipContract:
#         MembershipContract(chainId: $g.chainId, address: g.ethContractAddress)
#     )
#     if g.membershipIndex.isSome():
#       keystoreQuery.treeIndex = MembershipIndex(g.membershipIndex.get())
#     waku_rln_membership_credentials_import_duration_seconds.nanosecondTime:
#       let keystoreCred = getMembershipCredentials(
#         path = g.keystorePath.get(),
#         password = g.keystorePassword.get(),
#         query = keystoreQuery,
#         appInfo = RLNAppInfo,
#       ).valueOr:
#         return err("failed to get the keystore credentials: " & $error)
#
#     g.membershipIndex = some(keystoreCred.treeIndex)
#     g.userMessageLimit = some(keystoreCred.userMessageLimit)
#     # now we check on the contract if the commitment actually has a membership
#     try:
#       let membershipExists = await wakuRlnContract
#         .memberExists(keystoreCred.identityCredential.idCommitment.toUInt256())
#         .call()
#       if membershipExists == 0:
#         return err("the commitment does not have a membership")
#     except CatchableError:
#       return err("failed to check if the commitment has a membership")
#
#     g.idCredentials = some(keystoreCred.identityCredential)
#
#   let metadataGetOptRes = g.rlnInstance.getMetadata()
#   if metadataGetOptRes.isErr():
#     warn "could not initialize with persisted rln metadata"
#   elif metadataGetOptRes.get().isSome():
#     let metadata = metadataGetOptRes.get().get()
#     if metadata.chainId != uint(g.chainId):
#       return err("persisted data: chain id mismatch")
#
#     if metadata.contractAddress != g.ethContractAddress.toLower():
#       return err("persisted data: contract address mismatch")
#     g.latestProcessedBlock = metadata.lastProcessedBlock.BlockNumber
#     g.validRoots = metadata.validRoots.toDeque()
#
#   var deployedBlockNumber: Uint256
#   g.retryWrapper(
#     deployedBlockNumber,
#     "Failed to get the deployed block number. Have you set the correct contract address?",
#   ):
#     await wakuRlnContract.deployedBlockNumber().call()
#   debug "using rln contract", deployedBlockNumber, rlnContractAddress = contractAddress
#   g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber)
#   g.latestProcessedBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)
#   g.rlnRelayMaxMessageLimit =
#     cast[uint64](await wakuRlnContract.MAX_MESSAGE_LIMIT().call())
#
#   proc onDisconnect() {.async.} =
#     error "Ethereum client disconnected"
#     let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)
#     info "reconnecting with the Ethereum client, and restarting group sync",
#       fromBlock = fromBlock
#     var newEthRpc: Web3
#     g.retryWrapper(newEthRpc, "Failed to reconnect with the Ethereum client"):
#       await newWeb3(g.ethClientUrl)
#     newEthRpc.ondisconnect = ethRpc.ondisconnect
#     g.ethRpc = some(newEthRpc)
#
#     try:
#       await g.startOnchain()
#     except CatchableError, Exception:
#       g.onFatalErrorAction(
#         "failed to restart group sync" & ": " & getCurrentExceptionMsg()
#       )
#
#   ethRpc.ondisconnect = proc() =
#     asyncSpawn onDisconnect()
#
#   waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
#   g.initialized = true
#
#   return ok()
#
# method stop*(g: OnchainGroupManager): Future[void] {.async, gcsafe.} =
#   g.blockFetchingActive = false
#
#   if g.ethRpc.isSome():
#     g.ethRpc.get().ondisconnect = nil
#     await g.ethRpc.get().close()
#   let flushed = g.rlnInstance.flush()
#   if not flushed:
#     error "failed to flush to the tree db"
#
#   g.initialized = false
#
# proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async, gcsafe.} =
#   let ethRpc = g.ethRpc.get()
#
#   var syncing: SyncingStatus
#   g.retryWrapper(syncing, "Failed to get the syncing status"):
#     await ethRpc.provider.eth_syncing()
#   return syncing.syncing
#
# method isReady*(g: OnchainGroupManager): Future[bool] {.async.} =
#   initializedGuard(g)
#
#   if g.ethRpc.isNone():
#     return false
#
#   var currentBlock: BlockNumber
#   g.retryWrapper(currentBlock, "Failed to get the current block number"):
#     cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber())
#
#   # the node can still process messages while it lags behind the latest block
#   # by at most the number of valid roots
#   if u256(g.latestProcessedBlock.uint64) < (u256(currentBlock) - u256(g.validRoots.len)):
#     return false
#
#   return not (await g.isSyncing())
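# worked example (illustrative numbers): with g.validRoots.len == 5 and a
# current block of 1_000, the node reports ready only while
# g.latestProcessedBlock >= 995 and the Ethereum client is not syncing.
#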
import
  os,
  web3,

@@ -1,417 +0,0 @@
{.push raises: [].}

import
  std/[tables, options],
  chronos,
  web3,
  stint,
  ../on_chain/group_manager as onchain,
  ../../rln,
  ../../conversion_utils

logScope:
  topics = "waku rln_relay onchain_sync_group_manager"

# using the when predicate does not work within the contract macro, hence the
# need to duplicate it
contract(WakuRlnContract):
  # this serves as an entrypoint into the rln membership set
  proc register(idCommitment: UInt256, userMessageLimit: EthereumUInt32)
  # Initializes the implementation contract (only used in unit tests)
  proc initialize(maxMessageLimit: UInt256)
  # this event is raised when a new member is registered
  proc MemberRegistered(rateCommitment: UInt256, index: EthereumUInt32) {.event.}
  # this function denotes the existence of a given user
  proc memberExists(idCommitment: Uint256): UInt256 {.view.}
  # this constant describes the next index of a new member
  proc commitmentIndex(): UInt256 {.view.}
  # this constant describes the block number this contract was deployed on
  proc deployedBlockNumber(): UInt256 {.view.}
  # this constant describes the max message limit of the rln contract
  proc MAX_MESSAGE_LIMIT(): UInt256 {.view.}
  # this function returns the merkle proof elements for a given index
  proc merkleProofElements(index: Uint256): seq[Uint256] {.view.}
  # this function returns the Merkle root
  proc root(): Uint256 {.view.}

type WakuRlnContractWithSender = Sender[WakuRlnContract]

type OnchainSyncGroupManager* = ref object of GroupManager
  ethClientUrl*: string
  ethPrivateKey*: Option[string]
  ethContractAddress*: string
  ethRpc*: Option[Web3]
  wakuRlnContract*: Option[WakuRlnContractWithSender]
  registrationTxHash*: Option[TxHash]
  chainId*: uint
  keystorePath*: Option[string]
  keystorePassword*: Option[string]
  registrationHandler*: Option[RegistrationHandler]
  validRootBuffer*: Deque[MerkleNode]

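# design note (inferred from this file, not stated in the diff): unlike
# OnchainGroupManager above, this manager keeps no local Merkle tree; it reads
# the proof path and root straight from the contract (fetchMerkleProof /
# fetchMerkleRoot below) and feeds them to zerokit as a witness in generateProof.
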
proc fetchMerkleProof*(g: OnchainSyncGroupManager): Future[seq[UInt256]] {.async.} =
  let index = stuint(g.membershipIndex.get(), 256)
  try:
    # build the contract invocation, then await the eth_call and return its result
    let merkleProofInvocation = g.wakuRlnContract.get().merkleProofElements(index)
    let merkleProof = await merkleProofInvocation.call()
    return merkleProof
  except CatchableError:
    error "Failed to fetch merkle proof", error = getCurrentExceptionMsg()

proc fetchMerkleRoot*(g: OnchainSyncGroupManager): Future[UInt256] {.async.} =
  let merkleRootInvocation = g.wakuRlnContract.get().root()
  let merkleRoot = await merkleRootInvocation.call()
  return merkleRoot

template initializedGuard(g: OnchainSyncGroupManager): untyped =
  if not g.initialized:
    raise newException(CatchableError, "OnchainSyncGroupManager is not initialized")

template retryWrapper(
    g: OnchainSyncGroupManager, res: auto, errStr: string, body: untyped
): auto =
  retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction):
    body

method validateRoot*(
    g: OnchainSyncGroupManager, root: MerkleNode
): bool {.base, gcsafe, raises: [].} =
  if g.validRootBuffer.find(root) >= 0:
    return true
  return false

proc slideRootQueue*(g: OnchainSyncGroupManager) {.async.} =
  # fetchMerkleRoot is async and raises (CatchableError) on failure,
  # so a plain await suffices here
  let rootAfterUpdate = await g.fetchMerkleRoot()

  # drop the oldest roots so the buffer stays within AcceptableRootWindowSize
  let overflowCount = g.validRootBuffer.len - AcceptableRootWindowSize + 1
  if overflowCount > 0:
    for i in 0 ..< overflowCount:
      g.validRootBuffer.popFirst()

  g.validRootBuffer.addLast(rootAfterUpdate)

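# behavior sketch (illustrative): validRootBuffer acts as a sliding window over
# the AcceptableRootWindowSize most recent on-chain roots, so a proof generated
# against a slightly stale root still passes validateRoot above.
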
method atomicBatch*(
    g: OnchainSyncGroupManager,
    start: MembershipIndex,
    rateCommitments = newSeq[RawRateCommitment](),
    toRemoveIndices = newSeq[MembershipIndex](),
): Future[void] {.async: (raises: [Exception]), base.} =
  initializedGuard(g)

  if g.registerCb.isSome():
    var membersSeq = newSeq[Membership]()
    for i in 0 ..< rateCommitments.len:
      var index = start + MembershipIndex(i)
      debug "registering member to callback",
        rateCommitment = rateCommitments[i], index = index
      let member = Membership(rateCommitment: rateCommitments[i], index: index)
      membersSeq.add(member)
    await g.registerCb.get()(membersSeq)

  await g.slideRootQueue()

method register*(
    g: OnchainSyncGroupManager, rateCommitment: RateCommitment
): Future[void] {.async: (raises: [Exception]).} =
  initializedGuard(g)

  try:
    let leaf = rateCommitment.toLeaf().get()
    await g.registerBatch(@[leaf])
  except CatchableError:
    raise newException(ValueError, getCurrentExceptionMsg())

method registerBatch*(
    g: OnchainSyncGroupManager, rateCommitments: seq[RawRateCommitment]
): Future[void] {.async: (raises: [Exception]).} =
  initializedGuard(g)

  await g.atomicBatch(g.latestIndex, rateCommitments)
  g.latestIndex += MembershipIndex(rateCommitments.len)

method register*(
    g: OnchainSyncGroupManager,
    identityCredential: IdentityCredential,
    userMessageLimit: UserMessageLimit,
): Future[void] {.async: (raises: [Exception]).} =
  initializedGuard(g)

  let ethRpc = g.ethRpc.get()
  let wakuRlnContract = g.wakuRlnContract.get()

  var gasPrice: int
  g.retryWrapper(gasPrice, "Failed to get gas price"):
    int(await ethRpc.provider.eth_gasPrice()) * 2
  let idCommitment = identityCredential.idCommitment.toUInt256()

  debug "registering the member",
    idCommitment = idCommitment, userMessageLimit = userMessageLimit
  var txHash: TxHash
  g.retryWrapper(txHash, "Failed to register the member"):
    await wakuRlnContract.register(idCommitment, userMessageLimit.stuint(32)).send(
      gasPrice = gasPrice
    )

  # wait for the transaction to be mined
  var tsReceipt: ReceiptObject
  g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"):
    await ethRpc.getMinedTransactionReceipt(txHash)
  debug "registration transaction mined", txHash = txHash
  g.registrationTxHash = some(txHash)
  # the receipt topic holds the hash of the signature of the raised events
  # TODO: make this robust. search within the event list for the event
  debug "ts receipt", receipt = tsReceipt[]

  if tsReceipt.status.isNone() or tsReceipt.status.get() != 1.Quantity:
    raise newException(ValueError, "register: transaction failed")

  let firstTopic = tsReceipt.logs[0].topics[0]
  # the first topic must equal the hash of the event signature,
  # keccak256("MemberRegistered(uint256,uint32)")
  if firstTopic !=
      cast[FixedBytes[32]](keccak.keccak256.digest("MemberRegistered(uint256,uint32)").data):
    raise newException(ValueError, "register: unexpected event signature")

  # the arguments of the raised event, i.e. MemberRegistered, are encoded inside the data field
  # data = rateCommitment encoded as 256 bits || index encoded as 32 bits
  let arguments = tsReceipt.logs[0].data
  debug "tx log data", arguments = arguments
  let
    # In TX log data, uints are encoded in big endian
    membershipIndex = UInt256.fromBytesBE(arguments[32 ..^ 1])

  debug "parsed membershipIndex", membershipIndex
  g.userMessageLimit = some(userMessageLimit)
  g.membershipIndex = some(membershipIndex.toMembershipIndex())

  return

method withdraw*(
    g: OnchainSyncGroupManager, idCommitment: IDCommitment
): Future[void] {.async: (raises: [Exception]).} =
  initializedGuard(g) # TODO: after slashing is enabled on the contract

method withdrawBatch*(
    g: OnchainSyncGroupManager, idCommitments: seq[IDCommitment]
): Future[void] {.async: (raises: [Exception]).} =
  initializedGuard(g)

method generateProof*(
    g: OnchainSyncGroupManager,
    data: seq[byte],
    epoch: Epoch,
    messageId: MessageId,
    rlnIdentifier = DefaultRlnIdentifier,
): Future[GroupManagerResult[RateLimitProof]] {.async.} =
  ## Generates an RLN proof using the cached Merkle proof and a custom witness
  # Ensure identity credentials and membership index are set
  if g.idCredentials.isNone():
    return err("identity credentials are not set")
  if g.membershipIndex.isNone():
    return err("membership index is not set")
  if g.userMessageLimit.isNone():
    return err("user message limit is not set")

  # poseidon_hash returns a Result (see verifyProof below); unwrap it first
  let extNullifier = poseidon_hash([epoch, rlnIdentifier]).valueOr:
    return err("could not construct the external nullifier")

  # Prepare the witness
  let witness = Witness(
    identity_secret: g.idCredentials.get().idSecretHash,
    user_message_limit: g.userMessageLimit.get(),
    message_id: messageId,
    path_elements: await g.fetchMerkleProof(),
    identity_path_index: g.membershipIndex.get(),
    x: data,
    external_nullifier: extNullifier,
  )

  let serializedWitness = serialize(witness)
  var inputBuffer = toBuffer(serializedWitness)

  # Generate the proof using the zerokit API
  # (zerokit expects the RLN context as the first argument)
  var outputBuffer: Buffer
  let success = generate_proof_with_witness(
    g.rlnInstance, addr inputBuffer, addr outputBuffer
  )
  if not success:
    return err("Failed to generate proof")

  # Parse the proof into a RateLimitProof object
  var proofValue = cast[ptr array[320, byte]](outputBuffer.`ptr`)
  let proofBytes: array[320, byte] = proofValue[]

  ## parse the proof as [ proof<128> | root<32> | external_nullifier<32> | share_x<32> | share_y<32> | nullifier<32> ]
  let
    proofOffset = 128
    rootOffset = proofOffset + 32
    externalNullifierOffset = rootOffset + 32
    shareXOffset = externalNullifierOffset + 32
    shareYOffset = shareXOffset + 32
    nullifierOffset = shareYOffset + 32

  var
    zkproof: ZKSNARK
    proofRoot, shareX, shareY: MerkleNode
    externalNullifier: ExternalNullifier
    nullifier: Nullifier

  discard zkproof.copyFrom(proofBytes[0 .. proofOffset - 1])
  discard proofRoot.copyFrom(proofBytes[proofOffset .. rootOffset - 1])
  discard
    externalNullifier.copyFrom(proofBytes[rootOffset .. externalNullifierOffset - 1])
  discard shareX.copyFrom(proofBytes[externalNullifierOffset .. shareXOffset - 1])
  discard shareY.copyFrom(proofBytes[shareXOffset .. shareYOffset - 1])
  discard nullifier.copyFrom(proofBytes[shareYOffset .. nullifierOffset - 1])

  # Create the RateLimitProof object
  let output = RateLimitProof(
    proof: zkproof,
    merkleRoot: proofRoot,
    externalNullifier: externalNullifier,
    epoch: epoch,
    rlnIdentifier: rlnIdentifier,
    shareX: shareX,
    shareY: shareY,
    nullifier: nullifier,
  )
  return ok(output)

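# usage sketch (illustrative, caller-side): a publisher would generate and
# attach the proof roughly as
#
#   let proofRes = await g.generateProof(payload, epoch, messageId)
#   if proofRes.isErr(): ...
#
# the snake_case witness fields above appear to mirror zerokit's expected
# witness layout.
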
method verifyProof*(
    g: OnchainSyncGroupManager, input: openArray[byte], proof: RateLimitProof
): GroupManagerResult[bool] {.base, gcsafe, raises: [].} =
  ## verifies the proof, returns an error if the proof verification fails
  ## returns true if the proof is valid
  var normalizedProof = proof
  # recompute the externalNullifier from the proof's own epoch and
  # rlnIdentifier; verification will fail if a malicious peer attaches an
  # invalid epoch+rlnIdentifier pair
  normalizedProof.externalNullifier = poseidon_hash([proof.epoch, proof.rlnIdentifier]).valueOr:
    return err("could not construct the external nullifier")

  var
    proofBytes = serialize(normalizedProof, input)
    proofBuffer = proofBytes.toBuffer()
    validProof: bool
    rootsBytes = serialize(g.validRoots.toSeq())
    rootsBuffer = rootsBytes.toBuffer()

  trace "serialized proof", proof = byteutils.toHex(proofBytes)

  # zerokit expects the RLN context as the first argument
  let verifyIsSuccessful = verify_with_roots(
    g.rlnInstance, addr proofBuffer, addr rootsBuffer, addr validProof
  )
  if not verifyIsSuccessful:
    # something went wrong in the verification call
    warn "could not verify validity of the proof", proof = proof
    return err("could not verify the proof")

  return ok(validProof)

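# note (behavior assumed from the zerokit API): verify_with_roots only accepts
# the proof when its Merkle root is inside the serialized validRoots window, so
# a proof built against a root older than the window is rejected even when the
# zk verification itself would succeed.
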
method init*(g: OnchainSyncGroupManager): Future[GroupManagerResult[void]] {.async.} =
  # check if the Ethereum client is reachable
  var ethRpc: Web3
  g.retryWrapper(ethRpc, "Failed to connect to the Ethereum client"):
    await newWeb3(g.ethClientUrl)

  var fetchedChainId: uint
  g.retryWrapper(fetchedChainId, "Failed to get the chain id"):
    uint(await ethRpc.provider.eth_chainId())

  # Set the chain id
  if g.chainId == 0:
    warn "Chain ID not set in config, using the RPC provider's chain ID",
      providerChainId = fetchedChainId

  if g.chainId != 0 and g.chainId != fetchedChainId:
    return err(
      "The RPC provider returned a chain ID different from the configured one: configured = " &
      $g.chainId & ", actual = " & $fetchedChainId
    )

  g.chainId = fetchedChainId

  if g.ethPrivateKey.isSome():
    let pk = g.ethPrivateKey.get()
    let parsedPk = keys.PrivateKey.fromHex(pk).valueOr:
      return err("failed to parse the private key" & ": " & $error)
    ethRpc.privateKey = Opt.some(parsedPk)
    ethRpc.defaultAccount =
      ethRpc.privateKey.get().toPublicKey().toCanonicalAddress().Address

  let contractAddress = web3.fromHex(web3.Address, g.ethContractAddress)
  let wakuRlnContract = ethRpc.contractSender(WakuRlnContract, contractAddress)

  g.ethRpc = some(ethRpc)
  g.wakuRlnContract = some(wakuRlnContract)

  if g.keystorePath.isSome() and g.keystorePassword.isSome():
    if not fileExists(g.keystorePath.get()):
      error "File provided as keystore path does not exist", path = g.keystorePath.get()
      return err("File provided as keystore path does not exist")

    var keystoreQuery = KeystoreMembership(
      membershipContract:
        MembershipContract(chainId: $g.chainId, address: g.ethContractAddress)
    )
    if g.membershipIndex.isSome():
      keystoreQuery.treeIndex = MembershipIndex(g.membershipIndex.get())
    waku_rln_membership_credentials_import_duration_seconds.nanosecondTime:
      let keystoreCred = getMembershipCredentials(
        path = g.keystorePath.get(),
        password = g.keystorePassword.get(),
        query = keystoreQuery,
        appInfo = RLNAppInfo,
      ).valueOr:
        return err("failed to get the keystore credentials: " & $error)

    g.membershipIndex = some(keystoreCred.treeIndex)
    g.userMessageLimit = some(keystoreCred.userMessageLimit)
    # now we check on the contract if the commitment actually has a membership
    try:
      let membershipExists = await wakuRlnContract
        .memberExists(keystoreCred.identityCredential.idCommitment.toUInt256())
        .call()
      if membershipExists == 0:
        return err("the commitment does not have a membership")
    except CatchableError:
      return err("failed to check if the commitment has a membership")

    g.idCredentials = some(keystoreCred.identityCredential)

  let metadataGetOptRes = g.rlnInstance.getMetadata()
  if metadataGetOptRes.isErr():
    warn "could not initialize with persisted rln metadata"
  elif metadataGetOptRes.get().isSome():
    let metadata = metadataGetOptRes.get().get()
    if metadata.chainId != uint(g.chainId):
      return err("persisted data: chain id mismatch")
    if metadata.contractAddress != g.ethContractAddress.toLower():
      return err("persisted data: contract address mismatch")

  g.rlnRelayMaxMessageLimit =
    cast[uint64](await wakuRlnContract.MAX_MESSAGE_LIMIT().call())

  proc onDisconnect() {.async.} =
    error "Ethereum client disconnected"
    info "reconnecting with the Ethereum client, and restarting group sync"
    var newEthRpc: Web3
    g.retryWrapper(newEthRpc, "Failed to reconnect with the Ethereum client"):
      await newWeb3(g.ethClientUrl)
    newEthRpc.ondisconnect = ethRpc.ondisconnect
    g.ethRpc = some(newEthRpc)

    try:
      await g.startOnchainSync()
    except CatchableError, Exception:
      g.onFatalErrorAction(
        "failed to restart group sync" & ": " & getCurrentExceptionMsg()
      )

  ethRpc.ondisconnect = proc() =
    asyncSpawn onDisconnect()

  waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
  g.initialized = true

  return ok()