From cd73029a0c1d2a7d7e54ee1149c92814b8423fb4 Mon Sep 17 00:00:00 2001 From: Aaryamann Challani <43716372+rymnc@users.noreply.github.com> Date: Tue, 1 Nov 2022 08:15:34 +0530 Subject: [PATCH] feat(rln-relay): track last seen event (#1296) * feat(rln-relay): track last seen event * fix(rln-relay): clean up subscribeToMemberRegistrations proc * fix(rln-relay): tests * fix(rln-relay): unnecessary try-except * fix(rln-relay): proc descriptions, logging Co-authored-by: G <28568419+s1fr0@users.noreply.github.com> --- tests/v2/test_waku_rln_relay_onchain.nim | 9 +- .../waku_rln_relay/waku_rln_relay_types.nim | 2 + .../waku_rln_relay/waku_rln_relay_utils.nim | 106 ++++++++++++------ 3 files changed, 79 insertions(+), 38 deletions(-) diff --git a/tests/v2/test_waku_rln_relay_onchain.nim b/tests/v2/test_waku_rln_relay_onchain.nim index f1243a3f6..9b1c927b7 100644 --- a/tests/v2/test_waku_rln_relay_onchain.nim +++ b/tests/v2/test_waku_rln_relay_onchain.nim @@ -205,7 +205,7 @@ procSuite "Waku-rln-relay": debug "membership commitment key", pk2 = pk2 var events = [newFuture[void](), newFuture[void]()] - proc handler(pubkey: Uint256, index: Uint256) = + proc handler(pubkey: Uint256, index: Uint256): RlnRelayResult[void] = debug "handler is called", pubkey = pubkey, index = index if pubkey == pk: events[0].complete() @@ -214,9 +214,14 @@ procSuite "Waku-rln-relay": let isSuccessful = rlnPeer.rlnInstance.insertMember(pubkey.toIDCommitment()) check: isSuccessful + return ok() # mount the handler for listening to the contract events - await rlnPeer.handleGroupUpdates(handler) + await subscribeToGroupEvents(ethClientUri = EthClient, + ethAccountAddress = some(accounts[0]), + contractAddress = contractAddress, + blockNumber = "0x0", + handler = handler) # register a member to the contract let tx = await contractObj.register(pk).send(value = MembershipFee) diff --git a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_types.nim b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_types.nim index 7d66a44a3..4713d439a 100644 --- a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_types.nim +++ b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_types.nim @@ -106,6 +106,7 @@ when defined(rln) or (not defined(rln) and not defined(rlnzerokit)): nullifierLog*: Table[Epoch, seq[ProofMetadata]] lastEpoch*: Epoch # the epoch of the last published rln message validMerkleRoots*: Deque[MerkleNode] # An array of valid merkle roots, which are updated in a FIFO fashion + lastSeenMembershipIndex*: MembershipIndex # the last seen membership index when defined(rlnzerokit): type WakuRLNRelay* = ref object @@ -130,6 +131,7 @@ when defined(rlnzerokit): nullifierLog*: Table[Epoch, seq[ProofMetadata]] lastEpoch*: Epoch # the epoch of the last published rln message validMerkleRoots*: Deque[MerkleNode] # An array of valid merkle roots, which are updated in a FIFO fashion + lastSeenMembershipIndex*: MembershipIndex # the last seen membership index type MessageValidationResult* {.pure.} = enum diff --git a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim index 96dc4a890..19b876d7e 100644 --- a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim +++ b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim @@ -195,8 +195,8 @@ proc inHex*(value: IDKey or IDCommitment or MerkleNode or Nullifier or Epoch or return valueHex proc toMembershipIndex(v: UInt256): MembershipIndex = - let result: MembershipIndex = cast[MembershipIndex](v) - return result + let membershipIndex: MembershipIndex = cast[MembershipIndex](v) + return membershipIndex proc register*(idComm: IDCommitment, ethAccountAddress: Option[Address], ethAccountPrivKey: keys.PrivateKey, ethClientAddress: string, membershipContractAddress: Address, registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)): Future[Result[MembershipIndex, string]] {.async.} = # TODO may need to also get eth Account Private Key as PrivateKey @@ -920,23 +920,62 @@ proc addAll*(wakuRlnRelay: WakuRLNRelay, list: seq[IDCommitment]): RlnRelayResul return err(memberAdded.error()) return ok() -# the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface -type RegistrationEventHandler = proc(pubkey: Uint256, index: Uint256): void {.gcsafe, closure, raises: [Defect].} +type GroupUpdateHandler* = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] {.gcsafe, raises: [Defect].} -proc subscribeToMemberRegistrations(web3: Web3, contractAddress: Address, handler: RegistrationEventHandler, fromBlock: string = "0x0"): Future[Subscription] {.async, gcsafe} = - var contractObj = web3.contractSender(MembershipContract, contractAddress) - return await contractObj.subscribe(MemberRegistered, %*{"fromBlock": fromBlock, "address": contractAddress}) do(pubkey: Uint256, index: Uint256){.raises: [Defect], gcsafe.}: +proc generateGroupUpdateHandler(rlnPeer: WakuRLNRelay): GroupUpdateHandler = + ## assuming all the members arrive in order + ## TODO: check the index and the pubkey depending on + ## the group update operation + var handler: GroupUpdateHandler + handler = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] {.raises: [Defect].} = + var pk: IDCommitment try: - debug "onRegister", pubkey = pubkey, index = index - handler(pubkey, index) - except Exception as err: - # chronos still raises exceptions which inherit directly from Exception - error "Error handling new member registration: ", err=err.msg - doAssert false, err.msg - do (err: CatchableError): - error "Error from subscription: ", err=err.msg + pk = pubkey.toIDCommitment() + except: + return err("invalid pubkey") + let isSuccessful = rlnPeer.insertMember(pk) + if isSuccessful.isErr(): + return err("failed to add a new member to the Merkle tree") + else: + debug "new member added to the Merkle tree", pubkey=pubkey, index=index + debug "acceptable window", validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex) + let membershipIndex = index.toMembershipIndex() + if rlnPeer.lastSeenMembershipIndex != membershipIndex + 1: + warn "membership index gap, may have lost connection", gap = membershipIndex - rlnPeer.lastSeenMembershipIndex + rlnPeer.lastSeenMembershipIndex = membershipIndex + return ok() + return handler -proc subscribeToGroupEvents(ethClientUri: string, ethAccountAddress: Option[Address] = none(Address), contractAddress: Address, blockNumber: string = "0x0", handler: RegistrationEventHandler) {.async, gcsafe.} = +proc subscribeToMemberRegistrations(web3: Web3, + contractAddress: Address, + fromBlock: string = "0x0", + handler: GroupUpdateHandler): Future[Subscription] {.async, gcsafe.} = + ## subscribes to member registrations, on a given membership group contract + ## `fromBlock` indicates the block number from which the subscription starts + ## `handler` is a callback that is called when a new member is registered + ## the callback is called with the pubkey and the index of the new member + ## TODO: need a similar proc for member deletions + var contractObj = web3.contractSender(MembershipContract, contractAddress) + + let onMemberRegistered = proc (pubkey: Uint256, index: Uint256) {.gcsafe.} = + debug "onRegister", pubkey = pubkey, index = index + let groupUpdateRes = handler(pubkey, index) + if groupUpdateRes.isErr(): + error "Error handling new member registration", err=groupUpdateRes.error() + + let onError = proc (err: CatchableError) = + error "Error in subscription", err=err.msg + + return await contractObj.subscribe(MemberRegistered, + %*{"fromBlock": fromBlock, "address": contractAddress}, + onMemberRegistered, + onError) + +proc subscribeToGroupEvents*(ethClientUri: string, + ethAccountAddress: Option[Address] = none(Address), + contractAddress: Address, + blockNumber: string = "0x0", + handler: GroupUpdateHandler) {.async, gcsafe.} = ## connects to the eth client whose URI is supplied as `ethClientUri` ## subscribes to the `MemberRegistered` event emitted from the `MembershipContract` which is available on the supplied `contractAddress` ## it collects all the events starting from the given `blockNumber` @@ -952,21 +991,27 @@ proc subscribeToGroupEvents(ethClientUri: string, ethAccountAddress: Option[Addr proc startSubscription(web3: Web3) {.async, gcsafe.} = # subscribe to the MemberRegistered events - # TODO can do similarly for deletion events, though it is not yet supported - discard await subscribeToMemberRegistrations(web3, contractAddress, handler, blockNumber) + # TODO: can do similarly for deletion events, though it is not yet supported + # TODO: add block number for reconnection logic + discard await subscribeToMemberRegistrations(web3 = web3, + contractAddress = contractAddress, + handler = handler) await startSubscription(web3) web3.onDisconnect = proc() = debug "connection to ethereum node dropped", lastBlock = latestBlock +proc handleGroupUpdates*(rlnPeer: WakuRLNRelay) {.async, gcsafe.} = + ## generates the groupUpdateHandler which is called when a new member is registered, + ## and has the WakuRLNRelay instance as a closure + let handler = generateGroupUpdateHandler(rlnPeer) + await subscribeToGroupEvents(ethClientUri = rlnPeer.ethClientAddress, + ethAccountAddress = rlnPeer.ethAccountAddress, + contractAddress = rlnPeer.membershipContractAddress, + handler = handler) -proc handleGroupUpdates*(rlnPeer: WakuRLNRelay, handler: RegistrationEventHandler) {.async, gcsafe.} = - # mounts the supplied handler for the registration events emitting from the membership contract - await subscribeToGroupEvents(ethClientUri = rlnPeer.ethClientAddress, ethAccountAddress = rlnPeer.ethAccountAddress, contractAddress = rlnPeer.membershipContractAddress, handler = handler) - - proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string, contentTopic: ContentTopic, spamHandler: Option[SpamHandler] = none(SpamHandler)) = ## this procedure is a thin wrapper for the pubsub addValidator method ## it sets a validator for the waku messages published on the supplied pubsubTopic and contentTopic @@ -1063,8 +1108,7 @@ proc mountRlnRelayStatic*(node: WakuNode, node.addRLNRelayValidator(pubsubTopic, contentTopic, spamHandler) debug "rln relay topic validator is mounted successfully", pubsubTopic=pubsubTopic, contentTopic=contentTopic - node.wakuRlnRelay = rlnPeer - + node.wakuRlnRelay = rlnPeer proc mountRlnRelayDynamic*(node: WakuNode, ethClientAddr: string = "", @@ -1131,17 +1175,7 @@ proc mountRlnRelayDynamic*(node: WakuNode, pubsubTopic: pubsubTopic, contentTopic: contentTopic) - - proc handler(pubkey: Uint256, index: Uint256) = - debug "a new key is added", pubkey=pubkey - # assuming all the members arrive in order - let pk = pubkey.toIDCommitment() - let isSuccessful = rlnPeer.insertMember(pk) - debug "received pk", pk=pk.inHex, index=index - debug "acceptable window", validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex) - doAssert(isSuccessful.isOk()) - - asyncSpawn rlnPeer.handleGroupUpdates(handler) + asyncSpawn rlnPeer.handleGroupUpdates() debug "dynamic group management is started" # adds a topic validator for the supplied pubsub topic at the relay protocol # messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped