test(rln-relay): setup heartbeat ws subscription and log when connection drops (#1253)

* test(rln-relay): setup heartbeat ws subscription and log when connection drops

* fix(rln-relay): quantity

* fix(rln-relay): allow fromBlock to be passed to sub proc

* chore(rln-relay): cleaner logs

* fix(rln-relay): typo
This commit is contained in:
Aaryamann Challani 2022-10-10 17:55:12 +05:30 committed by GitHub
parent db9ccbe52a
commit 3a80fae634
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 32 additions and 21 deletions

View File

@ -824,7 +824,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage,
## TODO: FIXME after resolving this issue https://github.com/status-im/nwaku/issues/1247 ## TODO: FIXME after resolving this issue https://github.com/status-im/nwaku/issues/1247
if not rlnPeer.validateRoot(msg.proof.merkleRoot): if not rlnPeer.validateRoot(msg.proof.merkleRoot):
debug "invalid message: provided root does not belong to acceptable window of roots", provided=msg.proof.merkleRoot, validRoots=rlnPeer.validMerkleRoots debug "invalid message: provided root does not belong to acceptable window of roots", provided=msg.proof.merkleRoot, validRoots=rlnPeer.validMerkleRoots.mapIt("0x" & (it.toHex))
waku_rln_invalid_messages_total.inc(labelValues=["invalid_root"]) waku_rln_invalid_messages_total.inc(labelValues=["invalid_root"])
# return MessageValidationResult.Invalid # return MessageValidationResult.Invalid
@ -906,24 +906,9 @@ proc addAll*(wakuRlnRelay: WakuRLNRelay, list: seq[IDCommitment]): RlnRelayResul
# the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface # the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface
type RegistrationEventHandler = proc(pubkey: Uint256, index: Uint256): void {.gcsafe, closure, raises: [Defect].} type RegistrationEventHandler = proc(pubkey: Uint256, index: Uint256): void {.gcsafe, closure, raises: [Defect].}
proc subscribeToMemberRegistrations(web3: Web3, contractAddress: Address, handler: RegistrationEventHandler, fromBlock: string = "0x0"): Future[Subscription] {.async, gcsafe} =
proc subscribeToGroupEvents(ethClientUri: string, ethAccountAddress: Address, contractAddress: Address, blockNumber: string = "0x0", handler: RegistrationEventHandler) {.async, gcsafe.} =
## connects to the eth client whose URI is supplied as `ethClientUri`
## subscribes to the `MemberRegistered` event emitted from the `MembershipContract` which is available on the supplied `contractAddress`
## it collects all the events starting from the given `blockNumber`
## for every received event, it calls the `handler`
# connect to the eth client
let web3 = await newWeb3(ethClientUri)
# prepare a contract sender to interact with it
var contractObj = web3.contractSender(MembershipContract, contractAddress) var contractObj = web3.contractSender(MembershipContract, contractAddress)
web3.defaultAccount = ethAccountAddress return await contractObj.subscribe(MemberRegistered, %*{"fromBlock": fromBlock, "address": contractAddress}) do(pubkey: Uint256, index: Uint256){.raises: [Defect], gcsafe.}:
# set the gas price twice the suggested price in order for the fast mining
# let gasPrice = int(await web3.provider.eth_gasPrice()) * 2
# subscribe to the MemberRegistered events
# TODO can do similarly for deletion events, though it is not yet supported
discard await contractObj.subscribe(MemberRegistered, %*{"fromBlock": blockNumber, "address": contractAddress}) do(pubkey: Uint256, index: Uint256){.raises: [Defect], gcsafe.}:
try: try:
debug "onRegister", pubkey = pubkey, index = index debug "onRegister", pubkey = pubkey, index = index
handler(pubkey, index) handler(pubkey, index)
@ -934,6 +919,32 @@ proc subscribeToGroupEvents(ethClientUri: string, ethAccountAddress: Address, co
do (err: CatchableError): do (err: CatchableError):
error "Error from subscription: ", err=err.msg error "Error from subscription: ", err=err.msg
proc subscribeToGroupEvents(ethClientUri: string, ethAccountAddress: Address, contractAddress: Address, blockNumber: string = "0x0", handler: RegistrationEventHandler) {.async, gcsafe.} =
## connects to the eth client whose URI is supplied as `ethClientUri`
## subscribes to the `MemberRegistered` event emitted from the `MembershipContract` which is available on the supplied `contractAddress`
## it collects all the events starting from the given `blockNumber`
## for every received event, it calls the `handler`
let web3 = await newWeb3(ethClientUri)
var latestBlock: Quantity
let newHeadCallback = proc (blockheader: BlockHeader) {.gcsafe.} =
latestBlock = blockheader.number
debug "block received", blockNumber = latestBlock
let newHeadErrorHandler = proc (err: CatchableError) {.gcsafe.} =
error "Error from subscription: ", err=err.msg
discard await web3.subscribeForBlockHeaders(newHeadCallback, newHeadErrorHandler)
proc startSubscription(web3: Web3) {.async, gcsafe.} =
# subscribe to the MemberRegistered events
# TODO can do similarly for deletion events, though it is not yet supported
discard await subscribeToMemberRegistrations(web3, contractAddress, handler, blockNumber)
await startSubscription(web3)
web3.onDisconnect = proc() =
debug "connection to ethereum node dropped", lastBlock = latestBlock
proc handleGroupUpdates*(rlnPeer: WakuRLNRelay, handler: RegistrationEventHandler) {.async, gcsafe.} = proc handleGroupUpdates*(rlnPeer: WakuRLNRelay, handler: RegistrationEventHandler) {.async, gcsafe.} =
# mounts the supplied handler for the registration events emitting from the membership contract # mounts the supplied handler for the registration events emitting from the membership contract
await subscribeToGroupEvents(ethClientUri = rlnPeer.ethClientAddress, ethAccountAddress = rlnPeer.ethAccountAddress, contractAddress = rlnPeer.membershipContractAddress, handler = handler) await subscribeToGroupEvents(ethClientUri = rlnPeer.ethClientAddress, ethAccountAddress = rlnPeer.ethAccountAddress, contractAddress = rlnPeer.membershipContractAddress, handler = handler)
@ -1109,8 +1120,8 @@ proc mountRlnRelayDynamic*(node: WakuNode,
# assuming all the members arrive in order # assuming all the members arrive in order
let pk = pubkey.toIDCommitment() let pk = pubkey.toIDCommitment()
let isSuccessful = rlnPeer.insertMember(pk) let isSuccessful = rlnPeer.insertMember(pk)
debug "received pk", pk=pk.toHex, index =index debug "received pk", pk=pk.toHex, index=index
debug "acceptable window", validRoots=rlnPeer.validMerkleRoots debug "acceptable window", validRoots=rlnPeer.validMerkleRoots.mapIt("0x" & (it.toHex))
doAssert(isSuccessful.isOk()) doAssert(isSuccessful.isOk())
asyncSpawn rlnPeer.handleGroupUpdates(handler) asyncSpawn rlnPeer.handleGroupUpdates(handler)