From 3ba8134d10c00c2ecada4f413b36c54642cdcee5 Mon Sep 17 00:00:00 2001 From: rymnc Date: Thu, 10 Nov 2022 17:43:11 +0000 Subject: [PATCH] deploy: f86cc88592c0529c3f31b786e8787ae6cf1ba4db --- .gitignore | 8 + tests/v2/test_waku_rln_relay.nim | 82 +++-- tests/v2/test_waku_rln_relay_onchain.nim | 32 +- waku/v2/protocol/waku_rln_relay/rln.nim | 12 +- .../waku_rln_relay/waku_rln_relay_types.nim | 3 +- .../waku_rln_relay/waku_rln_relay_utils.nim | 288 ++++++++++++------ 6 files changed, 284 insertions(+), 141 deletions(-) diff --git a/.gitignore b/.gitignore index f531e937a..488f534d1 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,11 @@ testPath.txt # Nimbus Build System nimbus-build-system.paths + +# sqlite db +*.db +*.db-shm +*.db-wal +*.sqlite3 +*.sqlite3-shm +*.sqlite3-wal \ No newline at end of file diff --git a/tests/v2/test_waku_rln_relay.nim b/tests/v2/test_waku_rln_relay.nim index c4690d793..9c8b668fb 100644 --- a/tests/v2/test_waku_rln_relay.nim +++ b/tests/v2/test_waku_rln_relay.nim @@ -214,6 +214,19 @@ suite "Waku rln relay": check: deletionSuccess + test "insertMembers rln utils": + # create an RLN instance which also includes an empty Merkle tree + let rlnInstance = createRLNInstance() + require: + rlnInstance.isOk() + let rln = rlnInstance.get() + # generate a key pair + let keyPairRes = rln.membershipKeyGen() + require: + keypairRes.isOk() + check: + rln.insertMembers(0, @[keyPairRes.get().idCommitment]) + test "insertMember rln utils": # create an RLN instance which also includes an empty Merkle tree let rlnInstance = createRLNInstance() @@ -330,7 +343,7 @@ suite "Waku rln relay": let keyPairRes = rln.membershipKeyGen() require: keyPairRes.isOk() - let memberInserted = rln.insertMember(keypairRes.get().idCommitment) + let memberInserted = rln.insertMembers(0, @[keypairRes.get().idCommitment]) require: memberInserted @@ -502,7 +515,7 @@ suite "Waku rln relay": let # peer's index in the Merkle Tree - index = 5 + index = 5'u # create a membership key pair memKeysRes = membershipKeyGen(rln) @@ -511,21 +524,23 @@ suite "Waku rln relay": let memKeys = memKeysRes.get() + var members = newSeq[IDCommitment]() # Create a Merkle tree with random members - for i in 0..10: - var memberAdded: bool = false + for i in 0'u..10'u: if (i == index): # insert the current peer's pk - memberAdded = rln.insertMember(memKeys.idCommitment) + members.add(memKeys.idCommitment) else: # create a new key pair let memberKeysRes = rln.membershipKeyGen() require: memberKeysRes.isOk() - memberAdded = rln.insertMember(memberKeysRes.get().idCommitment) - # check the member is added - require: - memberAdded + members.add(memberKeysRes.get().idCommitment) + + # Batch the insert + let batchInsertRes = rln.insertMembers(0, members) + require: + batchInsertRes # prepare the message let messageBytes = "Hello".toBytes() @@ -545,7 +560,8 @@ suite "Waku rln relay": # verify the proof let verified = rln.proofVerify(data = messageBytes, - proof = proof) + proof = proof, + validRoots = @[rln.getMerkleRoot().value()]) # Ensure the proof verification did not error out @@ -561,7 +577,7 @@ suite "Waku rln relay": let # peer's index in the Merkle Tree - index = 5 + index = 5'u # create a membership key pair memKeysRes = membershipKeyGen(rln) @@ -571,17 +587,17 @@ suite "Waku rln relay": let memKeys = memKeysRes.get() # Create a Merkle tree with random members - for i in 0..10: + for i in 0'u..10'u: var memberAdded: bool = false if (i == index): # insert the current peer's pk - memberAdded = rln.insertMember(memKeys.idCommitment) + memberAdded = rln.insertMembers(i, @[memKeys.idCommitment]) else: # create a new key pair let memberKeysRes = rln.membershipKeyGen() require: memberKeysRes.isOk() - memberAdded = rln.insertMember(memberKeysRes.get().idCommitment) + memberAdded = rln.insertMembers(i, @[memberKeysRes.get().idCommitment]) # check the member is added require: memberAdded @@ -628,7 +644,7 @@ suite "Waku rln relay": let # peer's index in the Merkle Tree. - index = 5 + index = 5'u # create a membership key pair memKeysRes = membershipKeyGen(rlnRelay.rlnInstance) @@ -637,24 +653,27 @@ suite "Waku rln relay": let memKeys = memKeysRes.get() - let membershipCount = AcceptableRootWindowSize + 5 + let membershipCount: uint = AcceptableRootWindowSize + 5'u - # Create a Merkle tree with random members - for i in 0..membershipCount: - var memberIsAdded: RlnRelayResult[void] + var members = newSeq[MembershipKeyPair]() + + # Generate membership keys + for i in 0'u..membershipCount: if (i == index): # insert the current peer's pk - memberIsAdded = rlnRelay.insertMember(memKeys.idCommitment) + members.add(memKeys) else: # create a new key pair let memberKeysRes = rlnRelay.rlnInstance.membershipKeyGen() require: memberKeysRes.isOk() - memberIsAdded = rlnRelay.insertMember(memberKeysRes.get().idCommitment) - # require that the member is added - require: - memberIsAdded.isOk() - + members.add(memberKeysRes.get()) + + # Batch inserts into the tree + let insertedRes = rlnRelay.insertMembers(0, members.mapIt(it.idCommitment)) + require: + insertedRes.isOk() + # Given: # This step includes constructing a valid message with the latest merkle root # prepare the message @@ -686,11 +705,12 @@ suite "Waku rln relay": # Progress the local tree by removing members for i in 0..AcceptableRootWindowSize - 2: - discard rlnRelay.removeMember(MembershipIndex(i)) + let res = rlnRelay.removeMember(MembershipIndex(i)) # Ensure the local tree root has changed let currentMerkleRoot = rlnRelay.rlnInstance.getMerkleRoot() require: + res.isOk() currentMerkleRoot.isOk() currentMerkleRoot.value() != validProof.merkleRoot @@ -720,7 +740,7 @@ suite "Waku rln relay": let # peer's index in the Merkle Tree. - index = 6 + index = 6'u # create a membership key pair memKeysRes = membershipKeyGen(rlnRelay.rlnInstance) @@ -729,20 +749,20 @@ suite "Waku rln relay": let memKeys = memKeysRes.get() - let membershipCount = AcceptableRootWindowSize + 5 + let membershipCount: uint = AcceptableRootWindowSize + 5'u # Create a Merkle tree with random members - for i in 0..membershipCount: + for i in 0'u..membershipCount: var memberIsAdded: RlnRelayResult[void] if (i == index): # insert the current peer's pk - memberIsAdded = rlnRelay.insertMember(memKeys.idCommitment) + memberIsAdded = rlnRelay.insertMembers(i, @[memKeys.idCommitment]) else: # create a new key pair let memberKeysRes = rlnRelay.rlnInstance.membershipKeyGen() require: memberKeysRes.isOk() - memberIsAdded = rlnRelay.insertMember(memberKeysRes.get().idCommitment) + memberIsAdded = rlnRelay.insertMembers(i, @[memberKeysRes.get().idCommitment]) # require that the member is added require: memberIsAdded.isOk() diff --git a/tests/v2/test_waku_rln_relay_onchain.nim b/tests/v2/test_waku_rln_relay_onchain.nim index 2937248cf..79061d862 100644 --- a/tests/v2/test_waku_rln_relay_onchain.nim +++ b/tests/v2/test_waku_rln_relay_onchain.nim @@ -3,7 +3,7 @@ {.used.} import - std/[options, osproc, streams, strutils], + std/[options, osproc, streams, strutils, sequtils], testutils/unittests, chronos, chronicles, stint, web3, json, stew/byteutils, stew/shims/net as stewNet, libp2p/crypto/crypto, @@ -279,13 +279,15 @@ procSuite "Waku-rln-relay": debug "membership commitment key", pk2 = pk2 var events = [newFuture[void](), newFuture[void]()] - proc handler(pubkey: Uint256, index: Uint256): RlnRelayResult[void] = - debug "handler is called", pubkey = pubkey, index = index - if pubkey == pk: - events[0].complete() - if pubkey == pk2: - events[1].complete() - let isSuccessful = rlnPeer.rlnInstance.insertMember(pubkey.toIDCommitment()) + var futIndex = 0 + var handler: GroupUpdateHandler + handler = proc (blockNumber: BlockNumber, + members: seq[MembershipTuple]): RlnRelayResult[void] = + debug "handler is called", members = members + events[futIndex].complete() + futIndex += 1 + let index = members[0].index + let isSuccessful = rlnPeer.rlnInstance.insertMembers(index, members.mapIt(it.idComm)) check: isSuccessful return ok() @@ -305,7 +307,7 @@ procSuite "Waku-rln-relay": let tx2 = await contractObj.register(pk2).send(value = MembershipFee) debug "a member is registered", tx2 = tx2 - # wait for all the events to be received by the rlnPeer + # wait for the events to be processed await all(events) # release resources ----------------------- @@ -405,12 +407,12 @@ procSuite "Waku-rln-relay": # Create a group of 10 members var group = newSeq[IDCommitment]() - for i in 0..10: + for i in 0'u..10'u: var memberAdded: bool = false - if (uint(i) == index): + if (i == index): # insert the current peer's pk group.add(keyPair.idCommitment) - memberAdded = rln.insertMember(keyPair.idCommitment) + memberAdded = rln.insertMembers(i, @[keyPair.idCommitment]) doAssert(memberAdded) debug "member key", key = keyPair.idCommitment.inHex else: @@ -419,7 +421,7 @@ procSuite "Waku-rln-relay": memberKeyPairRes.isOk() let memberKeyPair = memberKeyPairRes.get() group.add(memberKeyPair.idCommitment) - let memberAdded = rln.insertMember(memberKeyPair.idCommitment) + let memberAdded = rln.insertMembers(i, @[memberKeyPair.idCommitment]) require: memberAdded debug "member key", key = memberKeyPair.idCommitment.inHex @@ -491,8 +493,8 @@ procSuite "Waku-rln-relay": # add the rln keys to the Merkle tree let - memberIsAdded1 = rln.insertMember(keyPair1.idCommitment) - memberIsAdded2 = rln.insertMember(keyPair2.idCommitment) + memberIsAdded1 = rln.insertMembers(0, @[keyPair1.idCommitment]) + memberIsAdded2 = rln.insertMembers(1, @[keyPair2.idCommitment]) require: memberIsAdded1 diff --git a/waku/v2/protocol/waku_rln_relay/rln.nim b/waku/v2/protocol/waku_rln_relay/rln.nim index 1d912a14a..1528cc163 100644 --- a/waku/v2/protocol/waku_rln_relay/rln.nim +++ b/waku/v2/protocol/waku_rln_relay/rln.nim @@ -58,12 +58,20 @@ proc set_leaf*(ctx: ptr RLN, index: uint, input_buffer: ptr Buffer): bool {.impo ## the input_buffer holds a serialized leaf of 32 bytes ## the return bool value indicates the success or failure of the operation -proc set_leaves*(ctx: ptr RLN, input_buffer: ptr Buffer): bool {.importc: "set_leaves".} +proc init_tree_with_leaves*(ctx: ptr RLN, input_buffer: ptr Buffer): bool {.importc: "init_tree_with_leaves".} ## sets multiple leaves in the tree stored by ctx to the value passed by input_buffer ## the input_buffer holds a serialized vector of leaves (32 bytes each) +## the input_buffer size is prefixed by a 8 bytes integer indicating the number of leaves ## leaves are set one after each other starting from index 0 ## the return bool value indicates the success or failure of the operation +proc set_leaves_from*(ctx: ptr RLN, index: uint, input_buffer: ptr Buffer): bool {.importc: "set_leaves_from".} +## sets multiple leaves in the tree stored by ctx to the value passed by input_buffer +## the input_buffer holds a serialized vector of leaves (32 bytes each) +## the input_buffer size is prefixed by a 8 bytes integer indicating the number of leaves +## leaves are set one after each other starting from index `index` +## the return bool value indicates the success or failure of the operation + proc reset_tree*(ctx: ptr RLN, tree_height: uint): bool {.importc: "set_tree".} ## resets the tree stored by ctx to the the empty tree (all leaves set to 0) of height tree_height ## the return bool value indicates the success or failure of the operation @@ -154,4 +162,4 @@ proc hash*(ctx: ptr RLN, ## the hash output is generated and populated inside output_buffer ## the output_buffer contains 32 bytes hash output -{.pop.} +{.pop.} \ No newline at end of file diff --git a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_types.nim b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_types.nim index 8f726aa96..c56744199 100644 --- a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_types.nim +++ b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_types.nim @@ -95,6 +95,7 @@ type WakuRLNRelay* = ref object lastEpoch*: Epoch # the epoch of the last published rln message validMerkleRoots*: Deque[MerkleNode] # An array of valid merkle roots, which are updated in a FIFO fashion lastSeenMembershipIndex*: MembershipIndex # the last seen membership index + lastProcessedBlock*: BlockNumber # the last processed block number type MessageValidationResult* {.pure.} = enum @@ -152,4 +153,4 @@ proc encode*(nsp: RateLimitProof): ProtoBuffer = output.finish3() - return output + return output \ No newline at end of file diff --git a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim index b623a24cd..5903d8132 100644 --- a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim +++ b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim @@ -7,6 +7,7 @@ import std/[sequtils, tables, times, os, deques], chronicles, options, chronos, stint, confutils, + strutils, web3, json, web3/ethtypes, eth/keys, @@ -41,7 +42,9 @@ type WakuRlnConfig* = object type SpamHandler* = proc(wakuMessage: WakuMessage): void {.gcsafe, closure, raises: [Defect].} RegistrationHandler* = proc(txHash: string): void {.gcsafe, closure, raises: [Defect].} - GroupUpdateHandler* = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] {.gcsafe.} + GroupUpdateHandler* = proc(blockNumber: BlockNumber, + members: seq[MembershipTuple]): RlnRelayResult[void] {.gcsafe.} + MembershipTuple* = tuple[index: MembershipIndex, idComm: IDCommitment] # membership contract interface contract(MembershipContract): @@ -158,7 +161,7 @@ proc register*(idComm: IDCommitment, ethAccountAddress: Option[Address], ethAcco # web3.privateKey = some(ethAccountPrivateKey) var sender = web3.contractSender(MembershipContract, membershipContractAddress) # creates a Sender object with a web3 field and contract address of type Address - debug "registering an id commitment", idComm=idComm.inHex + debug "registering an id commitment", idComm=idComm.inHex() let pk = idComm.toUInt256() var txHash: TxHash @@ -185,7 +188,7 @@ proc register*(idComm: IDCommitment, ethAccountAddress: Option[Address], ethAcco eventIdCommUint = UInt256.fromBytesBE(argumentsBytes[0..31]) eventIndex = UInt256.fromBytesBE(argumentsBytes[32..^1]) eventIdComm = eventIdCommUint.toIDCommitment() - debug "the identity commitment key extracted from tx log", eventIdComm=eventIdComm.inHex + debug "the identity commitment key extracted from tx log", eventIdComm=eventIdComm.inHex() debug "the index of registered identity commitment key", eventIndex=eventIndex if eventIdComm != idComm: @@ -353,16 +356,51 @@ proc proofVerify*(rlnInstance: ptr RLN, if not validProof: return ok(false) - - return ok(true) + else: + return ok(true) proc insertMember*(rlnInstance: ptr RLN, idComm: IDCommitment): bool = + ## inserts a member to the tree + ## returns true if the member is inserted successfully + ## returns false if the member could not be inserted var pkBuffer = toBuffer(idComm) let pkBufferPtr = addr pkBuffer # add the member to the tree - var member_is_added = update_next_member(rlnInstance, pkBufferPtr) - return member_is_added + let memberAdded = update_next_member(rlnInstance, pkBufferPtr) + return memberAdded + +proc serializeIdCommitments*(idComms: seq[IDCommitment]): seq[byte] = + ## serializes a seq of IDCommitments to a byte seq + ## the serialization is based on https://github.com/status-im/nwaku/blob/37bd29fbc37ce5cf636734e7dd410b1ed27b88c8/waku/v2/protocol/waku_rln_relay/rln.nim#L142 + ## the order of serialization is |id_commitment_len<8>|id_commitment| + var idCommsBytes = newSeq[byte]() + + # serialize the idComms, with its length prefixed + let len = toBytes(uint64(idComms.len), Endianness.littleEndian) + idCommsBytes.add(len) + + for idComm in idComms: + idCommsBytes = concat(idCommsBytes, @idComm) + + return idCommsBytes + +proc insertMembers*(rlnInstance: ptr RLN, + index: MembershipIndex, + idComms: seq[IDCommitment]): bool = + ## Insert multiple members i.e., identity commitments + ## returns true if the insertion is successful + ## returns false if any of the insertions fails + ## Note: This proc is atomic, i.e., if any of the insertions fails, all the previous insertions are rolled back + + # serialize the idComms + let idCommsBytes = serializeIdCommitments(idComms) + + var idCommsBuffer = idCommsBytes.toBuffer() + let idCommsBufferPtr = addr idCommsBuffer + # add the member to the tree + let membersAdded = set_leaves_from(rlnInstance, index, idCommsBufferPtr) + return membersAdded proc removeMember*(rlnInstance: ptr RLN, index: MembershipIndex): bool = let deletion_success = delete_member(rlnInstance, index) @@ -392,14 +430,16 @@ proc updateValidRootQueue*(wakuRlnRelay: WakuRLNRelay, root: MerkleNode): void = # Push the next root into the queue wakuRlnRelay.validMerkleRoots.addLast(root) -proc insertMember*(wakuRlnRelay: WakuRLNRelay, idComm: IDCommitment): RlnRelayResult[void] = - ## inserts a new id commitment into the local merkle tree, and adds the changed root to the +proc insertMembers*(wakuRlnRelay: WakuRLNRelay, + index: MembershipIndex, + idComms: seq[IDCommitment]): RlnRelayResult[void] = + ## inserts a sequence of id commitments into the local merkle tree, and adds the changed root to the ## queue of valid roots ## Returns an error if the insertion fails waku_rln_membership_insertion_duration_seconds.nanosecondTime: - let actionSucceeded = wakuRlnRelay.rlnInstance.insertMember(idComm) + let actionSucceeded = wakuRlnRelay.rlnInstance.insertMembers(index, idComms) if not actionSucceeded: - return err("could not insert id commitment into the merkle tree") + return err("could not insert id commitments into the merkle tree") let rootAfterUpdate = ?wakuRlnRelay.rlnInstance.getMerkleRoot() wakuRlnRelay.updateValidRootQueue(rootAfterUpdate) @@ -453,12 +493,10 @@ proc calcMerkleRoot*(list: seq[IDCommitment]): RlnRelayResult[string] = let rln = rlnInstance.get() # create a Merkle tree - for i in 0..list.len-1: - var member_is_added = false - member_is_added = rln.insertMember(list[i]) - doAssert(member_is_added) - - let root = rln.getMerkleRoot().value().inHex + let membersAdded = rln.insertMembers(0, list) + if not membersAdded: + return err("could not insert members into the tree") + let root = rln.getMerkleRoot().value().inHex() return ok(root) proc createMembershipList*(n: int): RlnRelayResult[( @@ -476,6 +514,7 @@ proc createMembershipList*(n: int): RlnRelayResult[( let rln = rlnInstance.get() var output = newSeq[(string, string)]() + var idCommitments = newSeq[IDCommitment]() for i in 0..n-1: # generate a key pair @@ -483,16 +522,17 @@ proc createMembershipList*(n: int): RlnRelayResult[( if keypairRes.isErr(): return err("could not generate a key pair: " & keypairRes.error()) let keypair = keypairRes.get() - let keyTuple = (keypair.idKey.inHex, keypair.idCommitment.inHex) + let keyTuple = (keypair.idKey.inHex(), keypair.idCommitment.inHex()) output.add(keyTuple) - # insert the key to the Merkle tree - let inserted = rln.insertMember(keypair.idCommitment) - if not inserted: - return err("could not insert the key into the Merkle tree") + idCommitments.add(keypair.idCommitment) + + # Insert members into tree + let membersAdded = rln.insertMembers(0, idCommitments) + if not membersAdded: + return err("could not insert members into the tree") - - let root = rln.getMerkleRoot().value().inHex + let root = rln.getMerkleRoot().value().inHex() return ok((output, root)) proc rlnRelayStaticSetUp*(rlnRelayMembershipIndex: MembershipIndex): RlnRelayResult[(Option[seq[ @@ -663,14 +703,14 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, if gap > MaxEpochGap: # message's epoch is too old or too ahead # accept messages whose epoch is within +-MaxEpochGap from the current epoch - debug "invalid message: epoch gap exceeds a threshold", gap = gap, + warn "invalid message: epoch gap exceeds a threshold", gap = gap, payload = string.fromBytes(msg.payload) waku_rln_invalid_messages_total.inc(labelValues=["invalid_epoch"]) return MessageValidationResult.Invalid ## TODO: FIXME after resolving this issue https://github.com/status-im/nwaku/issues/1247 if not rlnPeer.validateRoot(msg.proof.merkleRoot): - debug "invalid message: provided root does not belong to acceptable window of roots", provided=msg.proof.merkleRoot, validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex) + debug "invalid message: provided root does not belong to acceptable window of roots", provided=msg.proof.merkleRoot, validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex()) waku_rln_invalid_messages_total.inc(labelValues=["invalid_root"]) # return MessageValidationResult.Invalid @@ -685,6 +725,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, if proofVerificationRes.isErr(): waku_rln_errors_total.inc(labelValues=["proof_verification"]) + warn "invalid message: proof verification failed", payload = string.fromBytes(msg.payload) return MessageValidationResult.Invalid if not proofVerificationRes.value(): # invalid proof @@ -741,11 +782,9 @@ proc appendRLNProof*(rlnPeer: WakuRLNRelay, msg: var WakuMessage, proc addAll*(wakuRlnRelay: WakuRLNRelay, list: seq[IDCommitment]): RlnRelayResult[void] = # add members to the Merkle tree of the `rlnInstance` ## Returns an error if it cannot add any member to the Merkle tree - for i in 0..list.len-1: - let member = list[i] - let memberAdded = wakuRlnRelay.insertMember(member) - if not memberAdded.isOk(): - return err(memberAdded.error()) + let membersAdded = wakuRlnRelay.insertMembers(0, list) + if not membersAdded.isOk(): + return err("failed to add members to the Merkle tree") return ok() proc generateGroupUpdateHandler(rlnPeer: WakuRLNRelay): GroupUpdateHandler = @@ -753,53 +792,84 @@ proc generateGroupUpdateHandler(rlnPeer: WakuRLNRelay): GroupUpdateHandler = ## TODO: check the index and the pubkey depending on ## the group update operation var handler: GroupUpdateHandler - handler = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] = - var pk: IDCommitment - try: - pk = pubkey.toIDCommitment() - except: - return err("invalid pubkey") - let isSuccessful = rlnPeer.insertMember(pk) + handler = proc(blockNumber: BlockNumber, members: seq[MembershipTuple]): RlnRelayResult[void] = + let startingIndex = members[0].index + debug "starting index", startingIndex = startingIndex, members = members.mapIt(it.idComm.inHex()) + let isSuccessful = rlnPeer.insertMembers(startingIndex, members.mapIt(it.idComm)) if isSuccessful.isErr(): - return err("failed to add a new member to the Merkle tree") + return err("failed to add new members to the Merkle tree") else: - debug "new member added to the Merkle tree", pubkey=pubkey, index=index - debug "acceptable window", validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex) - let membershipIndex = index.toMembershipIndex() - if rlnPeer.lastSeenMembershipIndex != membershipIndex + 1: - warn "membership index gap, may have lost connection", gap = membershipIndex - rlnPeer.lastSeenMembershipIndex - rlnPeer.lastSeenMembershipIndex = membershipIndex + debug "new members added to the Merkle tree", pubkeys=members.mapIt(it.idComm.inHex()) , startingIndex=startingIndex + debug "acceptable window", validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex()) + let lastIndex = members[0].index + members.len.uint - 1 + let indexGap = startingIndex - rlnPeer.lastSeenMembershipIndex + if not (toSeq(startingIndex..lastIndex) == members.mapIt(it.index)): + return err("the indexes of the new members are not in order") + if indexGap != 1.uint: + warn "membership index gap, may have lost connection", lastIndex, currIndex=rlnPeer.lastSeenMembershipIndex, indexGap = indexGap + rlnPeer.lastSeenMembershipIndex = lastIndex + rlnPeer.lastProcessedBlock = blockNumber + debug "last processed block", blockNumber = blockNumber return ok() return handler -proc subscribeToMemberRegistrations(web3: Web3, - contractAddress: Address, - fromBlock: string = "0x0", - handler: GroupUpdateHandler): Future[Subscription] {.async, gcsafe.} = - ## subscribes to member registrations, on a given membership group contract - ## `fromBlock` indicates the block number from which the subscription starts - ## `handler` is a callback that is called when a new member is registered - ## the callback is called with the pubkey and the index of the new member - ## TODO: need a similar proc for member deletions - var contractObj = web3.contractSender(MembershipContract, contractAddress) +proc parse*(event: type MemberRegistered, + log: JsonNode): RlnRelayResult[MembershipTuple] = + ## parses the `data` parameter of the `MemberRegistered` event `log` + ## returns an error if it cannot parse the `data` parameter + var pubkey: UInt256 + var index: UInt256 + var data: string + # Remove the 0x prefix + try: + data = strip0xPrefix(log["data"].getStr()) + except CatchableError: + return err("failed to parse the data field of the MemberRegistered event: " & getCurrentExceptionMsg()) + var offset = 0 + try: + # Parse the pubkey + offset += decode(data, offset, pubkey) + # Parse the index + offset += decode(data, offset, index) + return ok((index: index.toMembershipIndex(), + idComm: pubkey.toIDCommitment())) + except: + return err("failed to parse the data field of the MemberRegistered event") - let onMemberRegistered = proc (pubkey: Uint256, index: Uint256) {.gcsafe.} = - debug "onRegister", pubkey = pubkey, index = index - var groupUpdateRes: RlnRelayResult[void] - try: - groupUpdateRes = handler(pubkey, index) - except Exception as err: - error "failed to handle group update", err = err.msg - if groupUpdateRes.isErr(): - error "Error handling new member registration", err=groupUpdateRes.error() +type BlockTable = OrderedTable[BlockNumber, seq[MembershipTuple]] +proc getHistoricalEvents*(ethClientUri: string, + contractAddress: Address, + fromBlock: string = "0x0", + toBlock: string = "latest"): Future[RlnRelayResult[BlockTable]] {.async, gcsafe.} = + ## `ethClientUri` is the URI of the Ethereum client + ## `contractAddress` is the address of the contract + ## `fromBlock` is the block number from which the events are fetched + ## `toBlock` is the block number to which the events are fetched + ## returns a table that maps block numbers to the list of members registered in that block + ## returns an error if it cannot retrieve the historical events + let web3 = await newWeb3(ethClientUri) + let contract = web3.contractSender(MembershipContract, contractAddress) + # Get the historical events, and insert memberships into the tree + let historicalEvents = await contract.getJsonLogs(MemberRegistered, + fromBlock=some(fromBlock.blockId()), + toBlock=some(toBlock.blockId())) + # Create a table that maps block numbers to the list of members registered in that block + var blockTable = OrderedTable[BlockNumber, seq[MembershipTuple]]() + for log in historicalEvents: + # batch according to log.blockNumber + let blockNumber = parseHexInt(log["blockNumber"].getStr()).uint + let parsedEventRes = parse(MemberRegistered, log) - let onError = proc (err: CatchableError) = - error "Error in subscription", err=err.msg - - return await contractObj.subscribe(MemberRegistered, - %*{"fromBlock": fromBlock, "address": contractAddress}, - onMemberRegistered, - onError) + if parsedEventRes.isErr(): + error "failed to parse the MemberRegistered event", error=parsedEventRes.error() + return err("failed to parse the MemberRegistered event") + let parsedEvent = parsedEventRes.get() + # Add the parsed event to the table + if blockTable.hasKey(blockNumber): + blockTable[blockNumber].add(parsedEvent) + else: + blockTable[blockNumber] = @[parsedEvent] + return ok(blockTable) proc subscribeToGroupEvents*(ethClientUri: string, ethAccountAddress: Option[Address] = none(Address), @@ -809,25 +879,61 @@ proc subscribeToGroupEvents*(ethClientUri: string, ## connects to the eth client whose URI is supplied as `ethClientUri` ## subscribes to the `MemberRegistered` event emitted from the `MembershipContract` which is available on the supplied `contractAddress` ## it collects all the events starting from the given `blockNumber` - ## for every received event, it calls the `handler` + ## for every received block, it calls the `handler` let web3 = await newWeb3(ethClientUri) - var latestBlock: Quantity + let contract = web3.contractSender(MembershipContract, contractAddress) + + let blockTableRes = await getHistoricalEvents(ethClientUri, + contractAddress, + fromBlock=blockNumber) + if blockTableRes.isErr(): + error "failed to retrieve historical events", error=blockTableRes.error + return + let blockTable = blockTableRes.get() + # Update MT by batch + for blockNumber, members in blockTable.pairs(): + debug "updating the Merkle tree", blockNumber=blockNumber, members=members + let res = handler(blockNumber, members) + if res.isErr(): + error "failed to update the Merkle tree", error=res.error + + # We don't need the block table after this point + discard blockTable + + var latestBlock: BlockNumber + let handleLog = proc(blockHeader: BlockHeader) {.async, gcsafe.} = + try: + let membershipRegistrationLogs = await contract.getJsonLogs(MemberRegistered, + blockHash = some(blockheader.hash)) + if membershipRegistrationLogs.len == 0: + return + var members: seq[MembershipTuple] + for log in membershipRegistrationLogs: + let parsedEventRes = parse(MemberRegistered, log) + if parsedEventRes.isErr(): + fatal "failed to parse the MemberRegistered event", error=parsedEventRes.error() + return + let parsedEvent = parsedEventRes.get() + members.add(parsedEvent) + let res = handler(blockHeader.number.uint, members) + if res.isErr(): + error "failed to update the Merkle tree", error=res.error + except CatchableError: + warn "failed to get logs", error=getCurrentExceptionMsg() + return let newHeadCallback = proc (blockheader: BlockHeader) {.gcsafe.} = - latestBlock = blockheader.number + latestBlock = blockheader.number.uint debug "block received", blockNumber = latestBlock + # get logs from the last block + try: + asyncSpawn handleLog(blockHeader) + except CatchableError: + warn "failed to handle log: ", error=getCurrentExceptionMsg() + let newHeadErrorHandler = proc (err: CatchableError) {.gcsafe.} = error "Error from subscription: ", err=err.msg discard await web3.subscribeForBlockHeaders(newHeadCallback, newHeadErrorHandler) - proc startSubscription(web3: Web3) {.async, gcsafe.} = - # subscribe to the MemberRegistered events - # TODO: can do similarly for deletion events, though it is not yet supported - # TODO: add block number for reconnection logic - discard await subscribeToMemberRegistrations(web3 = web3, - contractAddress = contractAddress, - handler = handler) - - await startSubscription(web3) web3.onDisconnect = proc() = debug "connection to ethereum node dropped", lastBlock = latestBlock @@ -924,12 +1030,10 @@ proc mountRlnRelayStatic*(node: WakuNode, pubsubTopic: pubsubTopic, contentTopic: contentTopic) - # add members to the Merkle tree - for index in 0..group.len-1: - let member = group[index] - let memberAdded = rlnPeer.insertMember(member) - if memberAdded.isErr(): - return err("member addition to the Merkle tree failed: " & memberAdded.error()) + # add members to the Merkle tree + let membersAdded = rlnPeer.insertMembers(0, group) + if membersAdded.isErr(): + return err("member addition to the Merkle tree failed: " & membersAdded.error) # adds a topic validator for the supplied pubsub topic at the relay protocol # messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped @@ -1096,8 +1200,8 @@ proc mount(node: WakuNode, if mountRes.isErr(): return err("Failed to mount WakuRLNRelay: " & mountRes.error()) - info "membership id key", idkey=memKeyPairOpt.get().idKey.inHex - info "membership id commitment key", idCommitmentkey=memKeyPairOpt.get().idCommitment.inHex + info "membership id key", idkey=memKeyPairOpt.get().idKey.inHex() + info "membership id commitment key", idCommitmentkey=memKeyPairOpt.get().idCommitment.inHex() # check the correct construction of the tree by comparing the calculated root against the expected root # no error should happen as it is already captured in the unit tests @@ -1111,7 +1215,7 @@ proc mount(node: WakuNode, let root = rootRes.value() - if root.inHex != expectedRoot: + if root.inHex() != expectedRoot: error "root mismatch: something went wrong not in Merkle tree construction" debug "the calculated root", root info "WakuRLNRelay is mounted successfully", pubsubtopic=conf.rlnRelayPubsubTopic, contentTopic=conf.rlnRelayContentTopic