feat(rln-relay): track last seen event (#1296)

* feat(rln-relay): track last seen event

* fix(rln-relay): clean up subscribeToMemberRegistrations proc

* fix(rln-relay): tests

* fix(rln-relay): unnecessary try-except

* fix(rln-relay): proc descriptions, logging

Co-authored-by: G <28568419+s1fr0@users.noreply.github.com>
This commit is contained in:
Aaryamann Challani 2022-11-01 08:15:34 +05:30 committed by GitHub
parent 24d288ccb4
commit cd73029a0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 79 additions and 38 deletions

View File

@ -205,7 +205,7 @@ procSuite "Waku-rln-relay":
debug "membership commitment key", pk2 = pk2
var events = [newFuture[void](), newFuture[void]()]
proc handler(pubkey: Uint256, index: Uint256) =
proc handler(pubkey: Uint256, index: Uint256): RlnRelayResult[void] =
debug "handler is called", pubkey = pubkey, index = index
if pubkey == pk:
events[0].complete()
@ -214,9 +214,14 @@ procSuite "Waku-rln-relay":
let isSuccessful = rlnPeer.rlnInstance.insertMember(pubkey.toIDCommitment())
check:
isSuccessful
return ok()
# mount the handler for listening to the contract events
await rlnPeer.handleGroupUpdates(handler)
await subscribeToGroupEvents(ethClientUri = EthClient,
ethAccountAddress = some(accounts[0]),
contractAddress = contractAddress,
blockNumber = "0x0",
handler = handler)
# register a member to the contract
let tx = await contractObj.register(pk).send(value = MembershipFee)

View File

@ -106,6 +106,7 @@ when defined(rln) or (not defined(rln) and not defined(rlnzerokit)):
nullifierLog*: Table[Epoch, seq[ProofMetadata]]
lastEpoch*: Epoch # the epoch of the last published rln message
validMerkleRoots*: Deque[MerkleNode] # An array of valid merkle roots, which are updated in a FIFO fashion
lastSeenMembershipIndex*: MembershipIndex # the last seen membership index
when defined(rlnzerokit):
type WakuRLNRelay* = ref object
@ -130,6 +131,7 @@ when defined(rlnzerokit):
nullifierLog*: Table[Epoch, seq[ProofMetadata]]
lastEpoch*: Epoch # the epoch of the last published rln message
validMerkleRoots*: Deque[MerkleNode] # An array of valid merkle roots, which are updated in a FIFO fashion
lastSeenMembershipIndex*: MembershipIndex # the last seen membership index
type MessageValidationResult* {.pure.} = enum

View File

@ -195,8 +195,8 @@ proc inHex*(value: IDKey or IDCommitment or MerkleNode or Nullifier or Epoch or
return valueHex
proc toMembershipIndex(v: UInt256): MembershipIndex =
let result: MembershipIndex = cast[MembershipIndex](v)
return result
let membershipIndex: MembershipIndex = cast[MembershipIndex](v)
return membershipIndex
proc register*(idComm: IDCommitment, ethAccountAddress: Option[Address], ethAccountPrivKey: keys.PrivateKey, ethClientAddress: string, membershipContractAddress: Address, registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)): Future[Result[MembershipIndex, string]] {.async.} =
# TODO may need to also get eth Account Private Key as PrivateKey
@ -920,23 +920,62 @@ proc addAll*(wakuRlnRelay: WakuRLNRelay, list: seq[IDCommitment]): RlnRelayResul
return err(memberAdded.error())
return ok()
# the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface
type RegistrationEventHandler = proc(pubkey: Uint256, index: Uint256): void {.gcsafe, closure, raises: [Defect].}
type GroupUpdateHandler* = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] {.gcsafe, raises: [Defect].}
proc subscribeToMemberRegistrations(web3: Web3, contractAddress: Address, handler: RegistrationEventHandler, fromBlock: string = "0x0"): Future[Subscription] {.async, gcsafe} =
var contractObj = web3.contractSender(MembershipContract, contractAddress)
return await contractObj.subscribe(MemberRegistered, %*{"fromBlock": fromBlock, "address": contractAddress}) do(pubkey: Uint256, index: Uint256){.raises: [Defect], gcsafe.}:
proc generateGroupUpdateHandler(rlnPeer: WakuRLNRelay): GroupUpdateHandler =
## assuming all the members arrive in order
## TODO: check the index and the pubkey depending on
## the group update operation
var handler: GroupUpdateHandler
handler = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] {.raises: [Defect].} =
var pk: IDCommitment
try:
debug "onRegister", pubkey = pubkey, index = index
handler(pubkey, index)
except Exception as err:
# chronos still raises exceptions which inherit directly from Exception
error "Error handling new member registration: ", err=err.msg
doAssert false, err.msg
do (err: CatchableError):
error "Error from subscription: ", err=err.msg
pk = pubkey.toIDCommitment()
except:
return err("invalid pubkey")
let isSuccessful = rlnPeer.insertMember(pk)
if isSuccessful.isErr():
return err("failed to add a new member to the Merkle tree")
else:
debug "new member added to the Merkle tree", pubkey=pubkey, index=index
debug "acceptable window", validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex)
let membershipIndex = index.toMembershipIndex()
if rlnPeer.lastSeenMembershipIndex != membershipIndex + 1:
warn "membership index gap, may have lost connection", gap = membershipIndex - rlnPeer.lastSeenMembershipIndex
rlnPeer.lastSeenMembershipIndex = membershipIndex
return ok()
return handler
proc subscribeToGroupEvents(ethClientUri: string, ethAccountAddress: Option[Address] = none(Address), contractAddress: Address, blockNumber: string = "0x0", handler: RegistrationEventHandler) {.async, gcsafe.} =
proc subscribeToMemberRegistrations(web3: Web3,
contractAddress: Address,
fromBlock: string = "0x0",
handler: GroupUpdateHandler): Future[Subscription] {.async, gcsafe.} =
## subscribes to member registrations, on a given membership group contract
## `fromBlock` indicates the block number from which the subscription starts
## `handler` is a callback that is called when a new member is registered
## the callback is called with the pubkey and the index of the new member
## TODO: need a similar proc for member deletions
var contractObj = web3.contractSender(MembershipContract, contractAddress)
let onMemberRegistered = proc (pubkey: Uint256, index: Uint256) {.gcsafe.} =
debug "onRegister", pubkey = pubkey, index = index
let groupUpdateRes = handler(pubkey, index)
if groupUpdateRes.isErr():
error "Error handling new member registration", err=groupUpdateRes.error()
let onError = proc (err: CatchableError) =
error "Error in subscription", err=err.msg
return await contractObj.subscribe(MemberRegistered,
%*{"fromBlock": fromBlock, "address": contractAddress},
onMemberRegistered,
onError)
proc subscribeToGroupEvents*(ethClientUri: string,
ethAccountAddress: Option[Address] = none(Address),
contractAddress: Address,
blockNumber: string = "0x0",
handler: GroupUpdateHandler) {.async, gcsafe.} =
## connects to the eth client whose URI is supplied as `ethClientUri`
## subscribes to the `MemberRegistered` event emitted from the `MembershipContract` which is available on the supplied `contractAddress`
## it collects all the events starting from the given `blockNumber`
@ -952,21 +991,27 @@ proc subscribeToGroupEvents(ethClientUri: string, ethAccountAddress: Option[Addr
proc startSubscription(web3: Web3) {.async, gcsafe.} =
# subscribe to the MemberRegistered events
# TODO can do similarly for deletion events, though it is not yet supported
discard await subscribeToMemberRegistrations(web3, contractAddress, handler, blockNumber)
# TODO: can do similarly for deletion events, though it is not yet supported
# TODO: add block number for reconnection logic
discard await subscribeToMemberRegistrations(web3 = web3,
contractAddress = contractAddress,
handler = handler)
await startSubscription(web3)
web3.onDisconnect = proc() =
debug "connection to ethereum node dropped", lastBlock = latestBlock
proc handleGroupUpdates*(rlnPeer: WakuRLNRelay) {.async, gcsafe.} =
## generates the groupUpdateHandler which is called when a new member is registered,
## and has the WakuRLNRelay instance as a closure
let handler = generateGroupUpdateHandler(rlnPeer)
await subscribeToGroupEvents(ethClientUri = rlnPeer.ethClientAddress,
ethAccountAddress = rlnPeer.ethAccountAddress,
contractAddress = rlnPeer.membershipContractAddress,
handler = handler)
proc handleGroupUpdates*(rlnPeer: WakuRLNRelay, handler: RegistrationEventHandler) {.async, gcsafe.} =
# mounts the supplied handler for the registration events emitting from the membership contract
await subscribeToGroupEvents(ethClientUri = rlnPeer.ethClientAddress, ethAccountAddress = rlnPeer.ethAccountAddress, contractAddress = rlnPeer.membershipContractAddress, handler = handler)
proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string, contentTopic: ContentTopic, spamHandler: Option[SpamHandler] = none(SpamHandler)) =
## this procedure is a thin wrapper for the pubsub addValidator method
## it sets a validator for the waku messages published on the supplied pubsubTopic and contentTopic
@ -1063,8 +1108,7 @@ proc mountRlnRelayStatic*(node: WakuNode,
node.addRLNRelayValidator(pubsubTopic, contentTopic, spamHandler)
debug "rln relay topic validator is mounted successfully", pubsubTopic=pubsubTopic, contentTopic=contentTopic
node.wakuRlnRelay = rlnPeer
node.wakuRlnRelay = rlnPeer
proc mountRlnRelayDynamic*(node: WakuNode,
ethClientAddr: string = "",
@ -1131,17 +1175,7 @@ proc mountRlnRelayDynamic*(node: WakuNode,
pubsubTopic: pubsubTopic,
contentTopic: contentTopic)
proc handler(pubkey: Uint256, index: Uint256) =
debug "a new key is added", pubkey=pubkey
# assuming all the members arrive in order
let pk = pubkey.toIDCommitment()
let isSuccessful = rlnPeer.insertMember(pk)
debug "received pk", pk=pk.inHex, index=index
debug "acceptable window", validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex)
doAssert(isSuccessful.isOk())
asyncSpawn rlnPeer.handleGroupUpdates(handler)
asyncSpawn rlnPeer.handleGroupUpdates()
debug "dynamic group management is started"
# adds a topic validator for the supplied pubsub topic at the relay protocol
# messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped