deploy: f86cc88592c0529c3f31b786e8787ae6cf1ba4db

This commit is contained in:
rymnc 2022-11-10 17:43:11 +00:00
parent 4f2d556e08
commit 3ba8134d10
6 changed files with 284 additions and 141 deletions

8
.gitignore vendored
View File

@ -43,3 +43,11 @@ testPath.txt
# Nimbus Build System
nimbus-build-system.paths
# sqlite db
*.db
*.db-shm
*.db-wal
*.sqlite3
*.sqlite3-shm
*.sqlite3-wal

View File

@ -214,6 +214,19 @@ suite "Waku rln relay":
check:
deletionSuccess
test "insertMembers rln utils":
# create an RLN instance which also includes an empty Merkle tree
let rlnInstance = createRLNInstance()
require:
rlnInstance.isOk()
let rln = rlnInstance.get()
# generate a key pair
let keyPairRes = rln.membershipKeyGen()
require:
keypairRes.isOk()
check:
rln.insertMembers(0, @[keyPairRes.get().idCommitment])
test "insertMember rln utils":
# create an RLN instance which also includes an empty Merkle tree
let rlnInstance = createRLNInstance()
@ -330,7 +343,7 @@ suite "Waku rln relay":
let keyPairRes = rln.membershipKeyGen()
require:
keyPairRes.isOk()
let memberInserted = rln.insertMember(keypairRes.get().idCommitment)
let memberInserted = rln.insertMembers(0, @[keypairRes.get().idCommitment])
require:
memberInserted
@ -502,7 +515,7 @@ suite "Waku rln relay":
let
# peer's index in the Merkle Tree
index = 5
index = 5'u
# create a membership key pair
memKeysRes = membershipKeyGen(rln)
@ -511,21 +524,23 @@ suite "Waku rln relay":
let memKeys = memKeysRes.get()
var members = newSeq[IDCommitment]()
# Create a Merkle tree with random members
for i in 0..10:
var memberAdded: bool = false
for i in 0'u..10'u:
if (i == index):
# insert the current peer's pk
memberAdded = rln.insertMember(memKeys.idCommitment)
members.add(memKeys.idCommitment)
else:
# create a new key pair
let memberKeysRes = rln.membershipKeyGen()
require:
memberKeysRes.isOk()
memberAdded = rln.insertMember(memberKeysRes.get().idCommitment)
# check the member is added
require:
memberAdded
members.add(memberKeysRes.get().idCommitment)
# Batch the insert
let batchInsertRes = rln.insertMembers(0, members)
require:
batchInsertRes
# prepare the message
let messageBytes = "Hello".toBytes()
@ -545,7 +560,8 @@ suite "Waku rln relay":
# verify the proof
let verified = rln.proofVerify(data = messageBytes,
proof = proof)
proof = proof,
validRoots = @[rln.getMerkleRoot().value()])
# Ensure the proof verification did not error out
@ -561,7 +577,7 @@ suite "Waku rln relay":
let
# peer's index in the Merkle Tree
index = 5
index = 5'u
# create a membership key pair
memKeysRes = membershipKeyGen(rln)
@ -571,17 +587,17 @@ suite "Waku rln relay":
let memKeys = memKeysRes.get()
# Create a Merkle tree with random members
for i in 0..10:
for i in 0'u..10'u:
var memberAdded: bool = false
if (i == index):
# insert the current peer's pk
memberAdded = rln.insertMember(memKeys.idCommitment)
memberAdded = rln.insertMembers(i, @[memKeys.idCommitment])
else:
# create a new key pair
let memberKeysRes = rln.membershipKeyGen()
require:
memberKeysRes.isOk()
memberAdded = rln.insertMember(memberKeysRes.get().idCommitment)
memberAdded = rln.insertMembers(i, @[memberKeysRes.get().idCommitment])
# check the member is added
require:
memberAdded
@ -628,7 +644,7 @@ suite "Waku rln relay":
let
# peer's index in the Merkle Tree.
index = 5
index = 5'u
# create a membership key pair
memKeysRes = membershipKeyGen(rlnRelay.rlnInstance)
@ -637,24 +653,27 @@ suite "Waku rln relay":
let memKeys = memKeysRes.get()
let membershipCount = AcceptableRootWindowSize + 5
let membershipCount: uint = AcceptableRootWindowSize + 5'u
# Create a Merkle tree with random members
for i in 0..membershipCount:
var memberIsAdded: RlnRelayResult[void]
var members = newSeq[MembershipKeyPair]()
# Generate membership keys
for i in 0'u..membershipCount:
if (i == index):
# insert the current peer's pk
memberIsAdded = rlnRelay.insertMember(memKeys.idCommitment)
members.add(memKeys)
else:
# create a new key pair
let memberKeysRes = rlnRelay.rlnInstance.membershipKeyGen()
require:
memberKeysRes.isOk()
memberIsAdded = rlnRelay.insertMember(memberKeysRes.get().idCommitment)
# require that the member is added
require:
memberIsAdded.isOk()
members.add(memberKeysRes.get())
# Batch inserts into the tree
let insertedRes = rlnRelay.insertMembers(0, members.mapIt(it.idCommitment))
require:
insertedRes.isOk()
# Given:
# This step includes constructing a valid message with the latest merkle root
# prepare the message
@ -686,11 +705,12 @@ suite "Waku rln relay":
# Progress the local tree by removing members
for i in 0..AcceptableRootWindowSize - 2:
discard rlnRelay.removeMember(MembershipIndex(i))
let res = rlnRelay.removeMember(MembershipIndex(i))
# Ensure the local tree root has changed
let currentMerkleRoot = rlnRelay.rlnInstance.getMerkleRoot()
require:
res.isOk()
currentMerkleRoot.isOk()
currentMerkleRoot.value() != validProof.merkleRoot
@ -720,7 +740,7 @@ suite "Waku rln relay":
let
# peer's index in the Merkle Tree.
index = 6
index = 6'u
# create a membership key pair
memKeysRes = membershipKeyGen(rlnRelay.rlnInstance)
@ -729,20 +749,20 @@ suite "Waku rln relay":
let memKeys = memKeysRes.get()
let membershipCount = AcceptableRootWindowSize + 5
let membershipCount: uint = AcceptableRootWindowSize + 5'u
# Create a Merkle tree with random members
for i in 0..membershipCount:
for i in 0'u..membershipCount:
var memberIsAdded: RlnRelayResult[void]
if (i == index):
# insert the current peer's pk
memberIsAdded = rlnRelay.insertMember(memKeys.idCommitment)
memberIsAdded = rlnRelay.insertMembers(i, @[memKeys.idCommitment])
else:
# create a new key pair
let memberKeysRes = rlnRelay.rlnInstance.membershipKeyGen()
require:
memberKeysRes.isOk()
memberIsAdded = rlnRelay.insertMember(memberKeysRes.get().idCommitment)
memberIsAdded = rlnRelay.insertMembers(i, @[memberKeysRes.get().idCommitment])
# require that the member is added
require:
memberIsAdded.isOk()

View File

@ -3,7 +3,7 @@
{.used.}
import
std/[options, osproc, streams, strutils],
std/[options, osproc, streams, strutils, sequtils],
testutils/unittests, chronos, chronicles, stint, web3, json,
stew/byteutils, stew/shims/net as stewNet,
libp2p/crypto/crypto,
@ -279,13 +279,15 @@ procSuite "Waku-rln-relay":
debug "membership commitment key", pk2 = pk2
var events = [newFuture[void](), newFuture[void]()]
proc handler(pubkey: Uint256, index: Uint256): RlnRelayResult[void] =
debug "handler is called", pubkey = pubkey, index = index
if pubkey == pk:
events[0].complete()
if pubkey == pk2:
events[1].complete()
let isSuccessful = rlnPeer.rlnInstance.insertMember(pubkey.toIDCommitment())
var futIndex = 0
var handler: GroupUpdateHandler
handler = proc (blockNumber: BlockNumber,
members: seq[MembershipTuple]): RlnRelayResult[void] =
debug "handler is called", members = members
events[futIndex].complete()
futIndex += 1
let index = members[0].index
let isSuccessful = rlnPeer.rlnInstance.insertMembers(index, members.mapIt(it.idComm))
check:
isSuccessful
return ok()
@ -305,7 +307,7 @@ procSuite "Waku-rln-relay":
let tx2 = await contractObj.register(pk2).send(value = MembershipFee)
debug "a member is registered", tx2 = tx2
# wait for all the events to be received by the rlnPeer
# wait for the events to be processed
await all(events)
# release resources -----------------------
@ -405,12 +407,12 @@ procSuite "Waku-rln-relay":
# Create a group of 10 members
var group = newSeq[IDCommitment]()
for i in 0..10:
for i in 0'u..10'u:
var memberAdded: bool = false
if (uint(i) == index):
if (i == index):
# insert the current peer's pk
group.add(keyPair.idCommitment)
memberAdded = rln.insertMember(keyPair.idCommitment)
memberAdded = rln.insertMembers(i, @[keyPair.idCommitment])
doAssert(memberAdded)
debug "member key", key = keyPair.idCommitment.inHex
else:
@ -419,7 +421,7 @@ procSuite "Waku-rln-relay":
memberKeyPairRes.isOk()
let memberKeyPair = memberKeyPairRes.get()
group.add(memberKeyPair.idCommitment)
let memberAdded = rln.insertMember(memberKeyPair.idCommitment)
let memberAdded = rln.insertMembers(i, @[memberKeyPair.idCommitment])
require:
memberAdded
debug "member key", key = memberKeyPair.idCommitment.inHex
@ -491,8 +493,8 @@ procSuite "Waku-rln-relay":
# add the rln keys to the Merkle tree
let
memberIsAdded1 = rln.insertMember(keyPair1.idCommitment)
memberIsAdded2 = rln.insertMember(keyPair2.idCommitment)
memberIsAdded1 = rln.insertMembers(0, @[keyPair1.idCommitment])
memberIsAdded2 = rln.insertMembers(1, @[keyPair2.idCommitment])
require:
memberIsAdded1

View File

@ -58,12 +58,20 @@ proc set_leaf*(ctx: ptr RLN, index: uint, input_buffer: ptr Buffer): bool {.impo
## the input_buffer holds a serialized leaf of 32 bytes
## the return bool value indicates the success or failure of the operation
proc set_leaves*(ctx: ptr RLN, input_buffer: ptr Buffer): bool {.importc: "set_leaves".}
proc init_tree_with_leaves*(ctx: ptr RLN, input_buffer: ptr Buffer): bool {.importc: "init_tree_with_leaves".}
## sets multiple leaves in the tree stored by ctx to the value passed by input_buffer
## the input_buffer holds a serialized vector of leaves (32 bytes each)
## the input_buffer size is prefixed by a 8 bytes integer indicating the number of leaves
## leaves are set one after each other starting from index 0
## the return bool value indicates the success or failure of the operation
proc set_leaves_from*(ctx: ptr RLN, index: uint, input_buffer: ptr Buffer): bool {.importc: "set_leaves_from".}
## sets multiple leaves in the tree stored by ctx to the value passed by input_buffer
## the input_buffer holds a serialized vector of leaves (32 bytes each)
## the input_buffer size is prefixed by a 8 bytes integer indicating the number of leaves
## leaves are set one after each other starting from index `index`
## the return bool value indicates the success or failure of the operation
proc reset_tree*(ctx: ptr RLN, tree_height: uint): bool {.importc: "set_tree".}
## resets the tree stored by ctx to the the empty tree (all leaves set to 0) of height tree_height
## the return bool value indicates the success or failure of the operation
@ -154,4 +162,4 @@ proc hash*(ctx: ptr RLN,
## the hash output is generated and populated inside output_buffer
## the output_buffer contains 32 bytes hash output
{.pop.}
{.pop.}

View File

@ -95,6 +95,7 @@ type WakuRLNRelay* = ref object
lastEpoch*: Epoch # the epoch of the last published rln message
validMerkleRoots*: Deque[MerkleNode] # An array of valid merkle roots, which are updated in a FIFO fashion
lastSeenMembershipIndex*: MembershipIndex # the last seen membership index
lastProcessedBlock*: BlockNumber # the last processed block number
type
MessageValidationResult* {.pure.} = enum
@ -152,4 +153,4 @@ proc encode*(nsp: RateLimitProof): ProtoBuffer =
output.finish3()
return output
return output

View File

@ -7,6 +7,7 @@ import
std/[sequtils, tables, times, os, deques],
chronicles, options, chronos, stint,
confutils,
strutils,
web3, json,
web3/ethtypes,
eth/keys,
@ -41,7 +42,9 @@ type WakuRlnConfig* = object
type
SpamHandler* = proc(wakuMessage: WakuMessage): void {.gcsafe, closure, raises: [Defect].}
RegistrationHandler* = proc(txHash: string): void {.gcsafe, closure, raises: [Defect].}
GroupUpdateHandler* = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] {.gcsafe.}
GroupUpdateHandler* = proc(blockNumber: BlockNumber,
members: seq[MembershipTuple]): RlnRelayResult[void] {.gcsafe.}
MembershipTuple* = tuple[index: MembershipIndex, idComm: IDCommitment]
# membership contract interface
contract(MembershipContract):
@ -158,7 +161,7 @@ proc register*(idComm: IDCommitment, ethAccountAddress: Option[Address], ethAcco
# web3.privateKey = some(ethAccountPrivateKey)
var sender = web3.contractSender(MembershipContract, membershipContractAddress) # creates a Sender object with a web3 field and contract address of type Address
debug "registering an id commitment", idComm=idComm.inHex
debug "registering an id commitment", idComm=idComm.inHex()
let pk = idComm.toUInt256()
var txHash: TxHash
@ -185,7 +188,7 @@ proc register*(idComm: IDCommitment, ethAccountAddress: Option[Address], ethAcco
eventIdCommUint = UInt256.fromBytesBE(argumentsBytes[0..31])
eventIndex = UInt256.fromBytesBE(argumentsBytes[32..^1])
eventIdComm = eventIdCommUint.toIDCommitment()
debug "the identity commitment key extracted from tx log", eventIdComm=eventIdComm.inHex
debug "the identity commitment key extracted from tx log", eventIdComm=eventIdComm.inHex()
debug "the index of registered identity commitment key", eventIndex=eventIndex
if eventIdComm != idComm:
@ -353,16 +356,51 @@ proc proofVerify*(rlnInstance: ptr RLN,
if not validProof:
return ok(false)
return ok(true)
else:
return ok(true)
proc insertMember*(rlnInstance: ptr RLN, idComm: IDCommitment): bool =
## inserts a member to the tree
## returns true if the member is inserted successfully
## returns false if the member could not be inserted
var pkBuffer = toBuffer(idComm)
let pkBufferPtr = addr pkBuffer
# add the member to the tree
var member_is_added = update_next_member(rlnInstance, pkBufferPtr)
return member_is_added
let memberAdded = update_next_member(rlnInstance, pkBufferPtr)
return memberAdded
proc serializeIdCommitments*(idComms: seq[IDCommitment]): seq[byte] =
## serializes a seq of IDCommitments to a byte seq
## the serialization is based on https://github.com/status-im/nwaku/blob/37bd29fbc37ce5cf636734e7dd410b1ed27b88c8/waku/v2/protocol/waku_rln_relay/rln.nim#L142
## the order of serialization is |id_commitment_len<8>|id_commitment<var>|
var idCommsBytes = newSeq[byte]()
# serialize the idComms, with its length prefixed
let len = toBytes(uint64(idComms.len), Endianness.littleEndian)
idCommsBytes.add(len)
for idComm in idComms:
idCommsBytes = concat(idCommsBytes, @idComm)
return idCommsBytes
proc insertMembers*(rlnInstance: ptr RLN,
index: MembershipIndex,
idComms: seq[IDCommitment]): bool =
## Insert multiple members i.e., identity commitments
## returns true if the insertion is successful
## returns false if any of the insertions fails
## Note: This proc is atomic, i.e., if any of the insertions fails, all the previous insertions are rolled back
# serialize the idComms
let idCommsBytes = serializeIdCommitments(idComms)
var idCommsBuffer = idCommsBytes.toBuffer()
let idCommsBufferPtr = addr idCommsBuffer
# add the member to the tree
let membersAdded = set_leaves_from(rlnInstance, index, idCommsBufferPtr)
return membersAdded
proc removeMember*(rlnInstance: ptr RLN, index: MembershipIndex): bool =
let deletion_success = delete_member(rlnInstance, index)
@ -392,14 +430,16 @@ proc updateValidRootQueue*(wakuRlnRelay: WakuRLNRelay, root: MerkleNode): void =
# Push the next root into the queue
wakuRlnRelay.validMerkleRoots.addLast(root)
proc insertMember*(wakuRlnRelay: WakuRLNRelay, idComm: IDCommitment): RlnRelayResult[void] =
## inserts a new id commitment into the local merkle tree, and adds the changed root to the
proc insertMembers*(wakuRlnRelay: WakuRLNRelay,
index: MembershipIndex,
idComms: seq[IDCommitment]): RlnRelayResult[void] =
## inserts a sequence of id commitments into the local merkle tree, and adds the changed root to the
## queue of valid roots
## Returns an error if the insertion fails
waku_rln_membership_insertion_duration_seconds.nanosecondTime:
let actionSucceeded = wakuRlnRelay.rlnInstance.insertMember(idComm)
let actionSucceeded = wakuRlnRelay.rlnInstance.insertMembers(index, idComms)
if not actionSucceeded:
return err("could not insert id commitment into the merkle tree")
return err("could not insert id commitments into the merkle tree")
let rootAfterUpdate = ?wakuRlnRelay.rlnInstance.getMerkleRoot()
wakuRlnRelay.updateValidRootQueue(rootAfterUpdate)
@ -453,12 +493,10 @@ proc calcMerkleRoot*(list: seq[IDCommitment]): RlnRelayResult[string] =
let rln = rlnInstance.get()
# create a Merkle tree
for i in 0..list.len-1:
var member_is_added = false
member_is_added = rln.insertMember(list[i])
doAssert(member_is_added)
let root = rln.getMerkleRoot().value().inHex
let membersAdded = rln.insertMembers(0, list)
if not membersAdded:
return err("could not insert members into the tree")
let root = rln.getMerkleRoot().value().inHex()
return ok(root)
proc createMembershipList*(n: int): RlnRelayResult[(
@ -476,6 +514,7 @@ proc createMembershipList*(n: int): RlnRelayResult[(
let rln = rlnInstance.get()
var output = newSeq[(string, string)]()
var idCommitments = newSeq[IDCommitment]()
for i in 0..n-1:
# generate a key pair
@ -483,16 +522,17 @@ proc createMembershipList*(n: int): RlnRelayResult[(
if keypairRes.isErr():
return err("could not generate a key pair: " & keypairRes.error())
let keypair = keypairRes.get()
let keyTuple = (keypair.idKey.inHex, keypair.idCommitment.inHex)
let keyTuple = (keypair.idKey.inHex(), keypair.idCommitment.inHex())
output.add(keyTuple)
# insert the key to the Merkle tree
let inserted = rln.insertMember(keypair.idCommitment)
if not inserted:
return err("could not insert the key into the Merkle tree")
idCommitments.add(keypair.idCommitment)
# Insert members into tree
let membersAdded = rln.insertMembers(0, idCommitments)
if not membersAdded:
return err("could not insert members into the tree")
let root = rln.getMerkleRoot().value().inHex
let root = rln.getMerkleRoot().value().inHex()
return ok((output, root))
proc rlnRelayStaticSetUp*(rlnRelayMembershipIndex: MembershipIndex): RlnRelayResult[(Option[seq[
@ -663,14 +703,14 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage,
if gap > MaxEpochGap:
# message's epoch is too old or too ahead
# accept messages whose epoch is within +-MaxEpochGap from the current epoch
debug "invalid message: epoch gap exceeds a threshold", gap = gap,
warn "invalid message: epoch gap exceeds a threshold", gap = gap,
payload = string.fromBytes(msg.payload)
waku_rln_invalid_messages_total.inc(labelValues=["invalid_epoch"])
return MessageValidationResult.Invalid
## TODO: FIXME after resolving this issue https://github.com/status-im/nwaku/issues/1247
if not rlnPeer.validateRoot(msg.proof.merkleRoot):
debug "invalid message: provided root does not belong to acceptable window of roots", provided=msg.proof.merkleRoot, validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex)
debug "invalid message: provided root does not belong to acceptable window of roots", provided=msg.proof.merkleRoot, validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex())
waku_rln_invalid_messages_total.inc(labelValues=["invalid_root"])
# return MessageValidationResult.Invalid
@ -685,6 +725,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage,
if proofVerificationRes.isErr():
waku_rln_errors_total.inc(labelValues=["proof_verification"])
warn "invalid message: proof verification failed", payload = string.fromBytes(msg.payload)
return MessageValidationResult.Invalid
if not proofVerificationRes.value():
# invalid proof
@ -741,11 +782,9 @@ proc appendRLNProof*(rlnPeer: WakuRLNRelay, msg: var WakuMessage,
proc addAll*(wakuRlnRelay: WakuRLNRelay, list: seq[IDCommitment]): RlnRelayResult[void] =
# add members to the Merkle tree of the `rlnInstance`
## Returns an error if it cannot add any member to the Merkle tree
for i in 0..list.len-1:
let member = list[i]
let memberAdded = wakuRlnRelay.insertMember(member)
if not memberAdded.isOk():
return err(memberAdded.error())
let membersAdded = wakuRlnRelay.insertMembers(0, list)
if not membersAdded.isOk():
return err("failed to add members to the Merkle tree")
return ok()
proc generateGroupUpdateHandler(rlnPeer: WakuRLNRelay): GroupUpdateHandler =
@ -753,53 +792,84 @@ proc generateGroupUpdateHandler(rlnPeer: WakuRLNRelay): GroupUpdateHandler =
## TODO: check the index and the pubkey depending on
## the group update operation
var handler: GroupUpdateHandler
handler = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] =
var pk: IDCommitment
try:
pk = pubkey.toIDCommitment()
except:
return err("invalid pubkey")
let isSuccessful = rlnPeer.insertMember(pk)
handler = proc(blockNumber: BlockNumber, members: seq[MembershipTuple]): RlnRelayResult[void] =
let startingIndex = members[0].index
debug "starting index", startingIndex = startingIndex, members = members.mapIt(it.idComm.inHex())
let isSuccessful = rlnPeer.insertMembers(startingIndex, members.mapIt(it.idComm))
if isSuccessful.isErr():
return err("failed to add a new member to the Merkle tree")
return err("failed to add new members to the Merkle tree")
else:
debug "new member added to the Merkle tree", pubkey=pubkey, index=index
debug "acceptable window", validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex)
let membershipIndex = index.toMembershipIndex()
if rlnPeer.lastSeenMembershipIndex != membershipIndex + 1:
warn "membership index gap, may have lost connection", gap = membershipIndex - rlnPeer.lastSeenMembershipIndex
rlnPeer.lastSeenMembershipIndex = membershipIndex
debug "new members added to the Merkle tree", pubkeys=members.mapIt(it.idComm.inHex()) , startingIndex=startingIndex
debug "acceptable window", validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex())
let lastIndex = members[0].index + members.len.uint - 1
let indexGap = startingIndex - rlnPeer.lastSeenMembershipIndex
if not (toSeq(startingIndex..lastIndex) == members.mapIt(it.index)):
return err("the indexes of the new members are not in order")
if indexGap != 1.uint:
warn "membership index gap, may have lost connection", lastIndex, currIndex=rlnPeer.lastSeenMembershipIndex, indexGap = indexGap
rlnPeer.lastSeenMembershipIndex = lastIndex
rlnPeer.lastProcessedBlock = blockNumber
debug "last processed block", blockNumber = blockNumber
return ok()
return handler
proc subscribeToMemberRegistrations(web3: Web3,
contractAddress: Address,
fromBlock: string = "0x0",
handler: GroupUpdateHandler): Future[Subscription] {.async, gcsafe.} =
## subscribes to member registrations, on a given membership group contract
## `fromBlock` indicates the block number from which the subscription starts
## `handler` is a callback that is called when a new member is registered
## the callback is called with the pubkey and the index of the new member
## TODO: need a similar proc for member deletions
var contractObj = web3.contractSender(MembershipContract, contractAddress)
proc parse*(event: type MemberRegistered,
log: JsonNode): RlnRelayResult[MembershipTuple] =
## parses the `data` parameter of the `MemberRegistered` event `log`
## returns an error if it cannot parse the `data` parameter
var pubkey: UInt256
var index: UInt256
var data: string
# Remove the 0x prefix
try:
data = strip0xPrefix(log["data"].getStr())
except CatchableError:
return err("failed to parse the data field of the MemberRegistered event: " & getCurrentExceptionMsg())
var offset = 0
try:
# Parse the pubkey
offset += decode(data, offset, pubkey)
# Parse the index
offset += decode(data, offset, index)
return ok((index: index.toMembershipIndex(),
idComm: pubkey.toIDCommitment()))
except:
return err("failed to parse the data field of the MemberRegistered event")
let onMemberRegistered = proc (pubkey: Uint256, index: Uint256) {.gcsafe.} =
debug "onRegister", pubkey = pubkey, index = index
var groupUpdateRes: RlnRelayResult[void]
try:
groupUpdateRes = handler(pubkey, index)
except Exception as err:
error "failed to handle group update", err = err.msg
if groupUpdateRes.isErr():
error "Error handling new member registration", err=groupUpdateRes.error()
type BlockTable = OrderedTable[BlockNumber, seq[MembershipTuple]]
proc getHistoricalEvents*(ethClientUri: string,
contractAddress: Address,
fromBlock: string = "0x0",
toBlock: string = "latest"): Future[RlnRelayResult[BlockTable]] {.async, gcsafe.} =
## `ethClientUri` is the URI of the Ethereum client
## `contractAddress` is the address of the contract
## `fromBlock` is the block number from which the events are fetched
## `toBlock` is the block number to which the events are fetched
## returns a table that maps block numbers to the list of members registered in that block
## returns an error if it cannot retrieve the historical events
let web3 = await newWeb3(ethClientUri)
let contract = web3.contractSender(MembershipContract, contractAddress)
# Get the historical events, and insert memberships into the tree
let historicalEvents = await contract.getJsonLogs(MemberRegistered,
fromBlock=some(fromBlock.blockId()),
toBlock=some(toBlock.blockId()))
# Create a table that maps block numbers to the list of members registered in that block
var blockTable = OrderedTable[BlockNumber, seq[MembershipTuple]]()
for log in historicalEvents:
# batch according to log.blockNumber
let blockNumber = parseHexInt(log["blockNumber"].getStr()).uint
let parsedEventRes = parse(MemberRegistered, log)
let onError = proc (err: CatchableError) =
error "Error in subscription", err=err.msg
return await contractObj.subscribe(MemberRegistered,
%*{"fromBlock": fromBlock, "address": contractAddress},
onMemberRegistered,
onError)
if parsedEventRes.isErr():
error "failed to parse the MemberRegistered event", error=parsedEventRes.error()
return err("failed to parse the MemberRegistered event")
let parsedEvent = parsedEventRes.get()
# Add the parsed event to the table
if blockTable.hasKey(blockNumber):
blockTable[blockNumber].add(parsedEvent)
else:
blockTable[blockNumber] = @[parsedEvent]
return ok(blockTable)
proc subscribeToGroupEvents*(ethClientUri: string,
ethAccountAddress: Option[Address] = none(Address),
@ -809,25 +879,61 @@ proc subscribeToGroupEvents*(ethClientUri: string,
## connects to the eth client whose URI is supplied as `ethClientUri`
## subscribes to the `MemberRegistered` event emitted from the `MembershipContract` which is available on the supplied `contractAddress`
## it collects all the events starting from the given `blockNumber`
## for every received event, it calls the `handler`
## for every received block, it calls the `handler`
let web3 = await newWeb3(ethClientUri)
var latestBlock: Quantity
let contract = web3.contractSender(MembershipContract, contractAddress)
let blockTableRes = await getHistoricalEvents(ethClientUri,
contractAddress,
fromBlock=blockNumber)
if blockTableRes.isErr():
error "failed to retrieve historical events", error=blockTableRes.error
return
let blockTable = blockTableRes.get()
# Update MT by batch
for blockNumber, members in blockTable.pairs():
debug "updating the Merkle tree", blockNumber=blockNumber, members=members
let res = handler(blockNumber, members)
if res.isErr():
error "failed to update the Merkle tree", error=res.error
# We don't need the block table after this point
discard blockTable
var latestBlock: BlockNumber
let handleLog = proc(blockHeader: BlockHeader) {.async, gcsafe.} =
try:
let membershipRegistrationLogs = await contract.getJsonLogs(MemberRegistered,
blockHash = some(blockheader.hash))
if membershipRegistrationLogs.len == 0:
return
var members: seq[MembershipTuple]
for log in membershipRegistrationLogs:
let parsedEventRes = parse(MemberRegistered, log)
if parsedEventRes.isErr():
fatal "failed to parse the MemberRegistered event", error=parsedEventRes.error()
return
let parsedEvent = parsedEventRes.get()
members.add(parsedEvent)
let res = handler(blockHeader.number.uint, members)
if res.isErr():
error "failed to update the Merkle tree", error=res.error
except CatchableError:
warn "failed to get logs", error=getCurrentExceptionMsg()
return
let newHeadCallback = proc (blockheader: BlockHeader) {.gcsafe.} =
latestBlock = blockheader.number
latestBlock = blockheader.number.uint
debug "block received", blockNumber = latestBlock
# get logs from the last block
try:
asyncSpawn handleLog(blockHeader)
except CatchableError:
warn "failed to handle log: ", error=getCurrentExceptionMsg()
let newHeadErrorHandler = proc (err: CatchableError) {.gcsafe.} =
error "Error from subscription: ", err=err.msg
discard await web3.subscribeForBlockHeaders(newHeadCallback, newHeadErrorHandler)
proc startSubscription(web3: Web3) {.async, gcsafe.} =
# subscribe to the MemberRegistered events
# TODO: can do similarly for deletion events, though it is not yet supported
# TODO: add block number for reconnection logic
discard await subscribeToMemberRegistrations(web3 = web3,
contractAddress = contractAddress,
handler = handler)
await startSubscription(web3)
web3.onDisconnect = proc() =
debug "connection to ethereum node dropped", lastBlock = latestBlock
@ -924,12 +1030,10 @@ proc mountRlnRelayStatic*(node: WakuNode,
pubsubTopic: pubsubTopic,
contentTopic: contentTopic)
# add members to the Merkle tree
for index in 0..group.len-1:
let member = group[index]
let memberAdded = rlnPeer.insertMember(member)
if memberAdded.isErr():
return err("member addition to the Merkle tree failed: " & memberAdded.error())
# add members to the Merkle tree
let membersAdded = rlnPeer.insertMembers(0, group)
if membersAdded.isErr():
return err("member addition to the Merkle tree failed: " & membersAdded.error)
# adds a topic validator for the supplied pubsub topic at the relay protocol
# messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped
@ -1096,8 +1200,8 @@ proc mount(node: WakuNode,
if mountRes.isErr():
return err("Failed to mount WakuRLNRelay: " & mountRes.error())
info "membership id key", idkey=memKeyPairOpt.get().idKey.inHex
info "membership id commitment key", idCommitmentkey=memKeyPairOpt.get().idCommitment.inHex
info "membership id key", idkey=memKeyPairOpt.get().idKey.inHex()
info "membership id commitment key", idCommitmentkey=memKeyPairOpt.get().idCommitment.inHex()
# check the correct construction of the tree by comparing the calculated root against the expected root
# no error should happen as it is already captured in the unit tests
@ -1111,7 +1215,7 @@ proc mount(node: WakuNode,
let root = rootRes.value()
if root.inHex != expectedRoot:
if root.inHex() != expectedRoot:
error "root mismatch: something went wrong not in Merkle tree construction"
debug "the calculated root", root
info "WakuRLNRelay is mounted successfully", pubsubtopic=conf.rlnRelayPubsubTopic, contentTopic=conf.rlnRelayContentTopic