feat(rln-relay): group manager integration (#1496)

* feat(rln-relay): init group manager integration

fix(rln-relay): integrate group manager. todo spam and reg handlers

fix(rln-relay): decouple waku-relay and waku-rln-relay

fix(rln-relay): compiles now

fix(chat2): compilation

fix(rln-relay): wip segfault

fix(rln-relay): segfault

fix(chat2|wakunode2): use optional field

fix(rln-relay): wakunode test

fix(rln-relay): uncomment fields in proto decode

fix(rln-relay): used pragma on tests

fix(rln-relay): include cred processing

fix(rln-relay): add reg callback

fix(rln-relay): args to mount

fix(rln-relay): add timeout to waitForExit

fix(rln-relay): use osproc term instead of posix kill

fix(rln-relay): use poParentStream to prevent deadlock

fix(rln-relay): remove poParentStream, remove ganache log output

* fix(rln-relay): abstract tuple into own type
This commit is contained in:
Aaryamann Challani 2023-02-28 19:08:30 +05:30 committed by GitHub
parent 982c98c080
commit 7c9339b2e5
18 changed files with 935 additions and 2306 deletions

4
.gitignore vendored
View File

@ -37,8 +37,10 @@ node_modules/
# Ignore Jetbrains IDE files
.idea/
# RLN / keystore
rlnCredentials.txt
testPath.txt
rlnKeystore.json
# Nimbus Build System
nimbus-build-system.paths

View File

@ -9,7 +9,7 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/[tables, strformat, strutils, times, json, options, random]
import std/[strformat, strutils, times, json, options, random]
import confutils, chronicles, chronos, stew/shims/net as stewNet,
eth/keys, bearssl, stew/[byteutils, results],
nimcrypto/pbkdf2
@ -563,7 +563,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
rlnRelayDynamic: conf.rlnRelayDynamic,
rlnRelayPubsubTopic: conf.rlnRelayPubsubTopic,
rlnRelayContentTopic: conf.rlnRelayContentTopic,
rlnRelayMembershipIndex: conf.rlnRelayMembershipIndex,
rlnRelayMembershipIndex: some(conf.rlnRelayMembershipIndex),
rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress,
rlnRelayEthAccountPrivateKey: conf.rlnRelayEthAccountPrivateKey,
@ -576,11 +576,13 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
spamHandler=some(spamHandler),
registrationHandler=some(registrationHandler))
echo "your membership index is: ", node.wakuRlnRelay.membershipIndex
echo "your rln identity trapdoor is: ", node.wakuRlnRelay.identityCredential.idTrapdoor.inHex()
echo "your rln identity nullifier is: ", node.wakuRlnRelay.identityCredential.idNullifier.inHex()
echo "your rln identity secret hash is: ", node.wakuRlnRelay.identityCredential.idSecretHash.inHex()
echo "your rln identity commitment key is: ", node.wakuRlnRelay.identityCredential.idCommitment.inHex()
let membershipIndex = node.wakuRlnRelay.groupManager.membershipIndex.get()
let identityCredential = node.wakuRlnRelay.groupManager.idCredentials.get()
echo "your membership index is: ", membershipIndex
echo "your rln identity trapdoor is: ", identityCredential.idTrapdoor.inHex()
echo "your rln identity nullifier is: ", identityCredential.idNullifier.inHex()
echo "your rln identity secret hash is: ", identityCredential.idSecretHash.inHex()
echo "your rln identity commitment key is: ", identityCredential.idCommitment.inHex()
else:
info "WakuRLNRelay is disabled"
if conf.rlnRelay:

View File

@ -397,7 +397,7 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
rlnRelayDynamic: conf.rlnRelayDynamic,
rlnRelayPubsubTopic: conf.rlnRelayPubsubTopic,
rlnRelayContentTopic: conf.rlnRelayContentTopic,
rlnRelayMembershipIndex: conf.rlnRelayMembershipIndex,
rlnRelayMembershipIndex: some(conf.rlnRelayMembershipIndex),
rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress,
rlnRelayEthAccountPrivateKey: conf.rlnRelayEthAccountPrivateKey,

View File

@ -96,7 +96,8 @@ when defined(rln):
import
./v2/test_waku_rln_relay,
./v2/test_wakunode_rln_relay,
./v2/test_waku_rln_relay_onchain
./v2/test_rln_group_manager_onchain,
./v2/test_rln_group_manager_static
# Waku swap test suite
import

View File

@ -1,3 +1,5 @@
{.used.}
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
@ -5,7 +7,7 @@ else:
import
std/[options, osproc, streams, strutils, sequtils],
stew/results,
stew/[results, byteutils],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
@ -25,8 +27,6 @@ import
./testlib/common,
./test_utils
from posix import kill, SIGINT
proc generateCredentials(rlnInstance: ptr RLN): IdentityCredential =
let credRes = membershipKeyGen(rlnInstance)
return credRes.get()
@ -145,20 +145,18 @@ proc runGanache(): Process =
proc stopGanache(runGanache: Process) {.used.} =
let ganachePID = runGanache.processID
# We gracefully terminate Ganache daemon by sending a SIGINT signal to the runGanache PID to trigger RPC server termination and clean-up
let returnCodeSIGINT = kill(ganachePID.int32, SIGINT)
debug "Sent SIGINT to Ganache", ganachePID=ganachePID, returnCode=returnCodeSIGINT
# We wait the daemon to exit
try:
let returnCodeExit = runGanache.waitForExit()
debug "Ganache daemon terminated", returnCode=returnCodeExit
debug "Ganache daemon run log", log=runGanache.outputstream.readAll()
# We terminate Ganache daemon by sending a SIGTERM signal to the runGanache PID to trigger RPC server termination and clean-up
terminate(runGanache)
# NOTE: the below line must remain commented out, otherwise it will cause a deadlocked state
# ref: https://nim-lang.org/docs/osproc.html#waitForExit%2CProcess%2Cint
# debug "ganache logs", logs=runGanache.outputstream.readAll()
debug "Sent SIGTERM to Ganache", ganachePID=ganachePID
except:
error "Ganache daemon termination failed"
error "Ganache daemon termination failed: ", err = getCurrentExceptionMsg()
proc setup(): Future[OnchainGroupManager] {.async.} =
proc setup(signer = true): Future[OnchainGroupManager] {.async.} =
let rlnInstanceRes = createRlnInstance()
require:
rlnInstanceRes.isOk()
@ -172,14 +170,16 @@ proc setup(): Future[OnchainGroupManager] {.async.} =
let accounts = await web3.provider.eth_accounts()
web3.defaultAccount = accounts[1]
let (pk, _) = await createEthAccount()
var pk = none(string)
if signer:
let (privateKey, _) = await createEthAccount()
pk = some($privateKey)
let onchainConfig = OnchainGroupManagerConfig(ethClientUrl: EthClient,
ethContractAddress: $contractAddress,
ethPrivateKey: some($pk))
let manager {.used.} = OnchainGroupManager(config: onchainConfig,
rlnInstance: rlnInstance)
let manager = OnchainGroupManager(ethClientUrl: EthClient,
ethContractAddress: $contractAddress,
ethPrivateKey: pk,
rlnInstance: rlnInstance,
saveKeystore: false)
return manager
@ -192,9 +192,9 @@ suite "Onchain group manager":
await manager.init()
check:
manager.config.ethRpc.isSome()
manager.config.rlnContract.isSome()
manager.config.membershipFee.isSome()
manager.ethRpc.isSome()
manager.rlnContract.isSome()
manager.membershipFee.isSome()
manager.initialized
asyncTest "startGroupSync: should start group sync":
@ -211,9 +211,7 @@ suite "Onchain group manager":
asyncTest "startGroupSync: should sync to the state of the group":
let manager = await setup()
let credentials = generateCredentials(manager.rlnInstance)
manager.idCredentials = some(credentials)
await manager.init()
let merkleRootBeforeRes = manager.rlnInstance.getMerkleRoot()
@ -221,21 +219,22 @@ suite "Onchain group manager":
merkleRootBeforeRes.isOk()
let merkleRootBefore = merkleRootBeforeRes.get()
let future = newFuture[void]("startGroupSync")
let fut = newFuture[void]("startGroupSync")
proc generateCallback(fut: Future[void], idCommitment: IDCommitment): OnRegisterCallback =
proc generateCallback(fut: Future[void]): OnRegisterCallback =
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
require:
registrations.len == 1
registrations[0].idCommitment == idCommitment
registrations[0].idCommitment == manager.idCredentials.get().idCommitment
registrations[0].index == 0
fut.complete()
return callback
manager.onRegister(generateCallback(future, credentials.idCommitment))
manager.onRegister(generateCallback(fut))
await manager.startGroupSync()
await future
await fut
let merkleRootAfterRes = manager.rlnInstance.getMerkleRoot()
require:
@ -260,12 +259,11 @@ suite "Onchain group manager":
proc generateCallback(futs: array[0..4, Future[system.void]], credentials: seq[IdentityCredential]): OnRegisterCallback =
var futureIndex = 0
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
require:
registrations.len == 1
registrations[0].idCommitment == credentials[futureIndex].idCommitment
registrations[0].index == MembershipIndex(futureIndex)
futs[futureIndex].complete()
futureIndex += 1
if registrations.len == 1 and
registrations[0].idCommitment == credentials[futureIndex].idCommitment and
registrations[0].index == MembershipIndex(futureIndex + 1):
futs[futureIndex].complete()
futureIndex += 1
return callback
manager.onRegister(generateCallback(futures, credentials))
@ -292,7 +290,7 @@ suite "Onchain group manager":
await manager.register(dummyCommitment)
asyncTest "register: should register successfully":
let manager = await setup()
let manager = await setup(false)
await manager.init()
await manager.startGroupSync()
@ -311,9 +309,8 @@ suite "Onchain group manager":
manager.latestIndex == 1
asyncTest "register: callback is called":
let manager = await setup()
let manager = await setup(false)
var callbackCalled = false
let idCommitment = generateCredentials(manager.rlnInstance).idCommitment
let fut = newFuture[void]()
@ -323,7 +320,6 @@ suite "Onchain group manager":
registrations.len == 1
registrations[0].idCommitment == idCommitment
registrations[0].index == 0
callbackCalled = true
fut.complete()
manager.onRegister(callback)
@ -333,8 +329,6 @@ suite "Onchain group manager":
await manager.register(idCommitment)
await fut
check:
callbackCalled
asyncTest "withdraw: should guard against uninitialized state":
let manager = await setup()
@ -343,6 +337,146 @@ suite "Onchain group manager":
expect(ValueError):
await manager.withdraw(idSecretHash)
asyncTest "validateRoot: should validate good root":
let manager = await setup()
await manager.init()
let fut = newFuture[void]()
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
if registrations.len == 1 and
registrations[0].idCommitment == manager.idCredentials.get().idCommitment and
registrations[0].index == 0:
fut.complete()
manager.onRegister(callback)
await manager.startGroupSync()
await fut
let messageBytes = "Hello".toBytes()
# prepare the epoch
let epoch = default(Epoch)
debug "epoch in bytes", epochHex = epoch.inHex()
# generate proof
let validProofRes = manager.generateProof(data = messageBytes,
epoch = epoch)
require:
validProofRes.isOk()
let validProof = validProofRes.get()
# validate the root (should be true)
let validated = manager.validateRoot(validProof.merkleRoot)
check:
validated
asyncTest "validateRoot: should reject bad root":
let manager = await setup()
await manager.init()
await manager.startGroupSync()
let idCredential = generateCredentials(manager.rlnInstance)
## Assume the registration occured out of band
manager.idCredentials = some(idCredential)
manager.membershipIndex = some(MembershipIndex(0))
let messageBytes = "Hello".toBytes()
# prepare the epoch
let epoch = default(Epoch)
debug "epoch in bytes", epochHex = epoch.inHex()
# generate proof
let validProofRes = manager.generateProof(data = messageBytes,
epoch = epoch)
require:
validProofRes.isOk()
let validProof = validProofRes.get()
# validate the root (should be false)
let validated = manager.validateRoot(validProof.merkleRoot)
check:
validated == false
asyncTest "verifyProof: should verify valid proof":
let manager = await setup()
await manager.init()
let fut = newFuture[void]()
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
if registrations.len == 1 and
registrations[0].idCommitment == manager.idCredentials.get().idCommitment and
registrations[0].index == 0:
fut.complete()
manager.onRegister(callback)
await manager.startGroupSync()
await fut
let messageBytes = "Hello".toBytes()
# prepare the epoch
let epoch = default(Epoch)
debug "epoch in bytes", epochHex = epoch.inHex()
# generate proof
let validProofRes = manager.generateProof(data = messageBytes,
epoch = epoch)
require:
validProofRes.isOk()
let validProof = validProofRes.get()
# verify the proof (should be true)
let verifiedRes = manager.verifyProof(messageBytes, validProof)
require:
verifiedRes.isOk()
check:
verifiedRes.get()
asyncTest "verifyProof: should reject invalid proof":
let manager = await setup()
await manager.init()
await manager.startGroupSync()
let idCredential = generateCredentials(manager.rlnInstance)
await manager.register(idCredential.idCommitment)
let idCredential2 = generateCredentials(manager.rlnInstance)
## Assume the registration occured out of band
manager.idCredentials = some(idCredential2)
manager.membershipIndex = some(MembershipIndex(0))
let messageBytes = "Hello".toBytes()
# prepare the epoch
let epoch = default(Epoch)
debug "epoch in bytes", epochHex = epoch.inHex()
# generate proof
let invalidProofRes = manager.generateProof(data = messageBytes,
epoch = epoch)
require:
invalidProofRes.isOk()
let invalidProof = invalidProofRes.get()
# verify the proof (should be false)
let verifiedRes = manager.verifyProof(messageBytes, invalidProof)
require:
verifiedRes.isOk()
check:
verifiedRes.get() == false
################################
## Terminating/removing Ganache
################################

View File

@ -1,3 +1,5 @@
{.used.}
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
@ -38,12 +40,10 @@ suite "Static group manager":
let rlnInstance = rlnInstanceRes.get()
let credentials = generateCredentials(rlnInstance, 10)
let staticConfig = StaticGroupManagerConfig(groupSize: 10,
membershipIndex: 5,
groupKeys: credentials)
let manager {.used.} = StaticGroupManager(config: staticConfig,
rlnInstance: rlnInstance)
let manager {.used.} = StaticGroupManager(rlnInstance: rlnInstance,
groupSize: 10,
membershipIndex: some(MembershipIndex(5)),
groupKeys: credentials)
asyncTest "should initialize successfully":
let merkleRootBeforeRes = manager.rlnInstance.getMerkleRoot()
@ -58,34 +58,33 @@ suite "Static group manager":
let merkleRootAfter = merkleRootAfterRes.get()
check:
manager.idCredentials.isSome()
manager.config.groupKeys.len == 10
manager.config.groupSize == 10
manager.config.membershipIndex == 5
manager.config.groupKeys[5] == manager.idCredentials.get()
manager.groupKeys.len == 10
manager.groupSize == 10
manager.membershipIndex == some(MembershipIndex(5))
manager.groupKeys[5] == manager.idCredentials.get()
manager.latestIndex == 9
merkleRootAfter.inHex() != merkleRootBefore.inHex()
asyncTest "startGroupSync: should start group sync":
await manager.init()
require:
manager.validRoots.len() == 1
manager.rlnInstance.getMerkleRoot().get() == manager.validRoots[0]
await manager.startGroupSync()
asyncTest "startGroupSync: should guard against uninitialized state":
let staticConfig = StaticGroupManagerConfig(groupSize: 0,
membershipIndex: 0,
groupKeys: @[])
let manager = StaticGroupManager(config: staticConfig,
let manager = StaticGroupManager(groupSize: 0,
membershipIndex: some(MembershipIndex(0)),
groupKeys: @[],
rlnInstance: rlnInstance)
expect(ValueError):
await manager.startGroupSync()
asyncTest "register: should guard against uninitialized state":
let staticConfig = StaticGroupManagerConfig(groupSize: 0,
membershipIndex: 0,
groupKeys: @[])
let manager = StaticGroupManager(config: staticConfig,
let manager = StaticGroupManager(groupSize: 0,
membershipIndex: some(MembershipIndex(0)),
groupKeys: @[],
rlnInstance: rlnInstance)
let dummyCommitment = default(IDCommitment)

View File

@ -11,79 +11,14 @@ import
stint,
libp2p/crypto/crypto
import
../../waku/v2/node/waku_node,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_rln_relay,
../../waku/v2/protocol/waku_keystore,
./testlib/waku2
./testlib/common
const RlnRelayPubsubTopic = "waku/2/rlnrelay/proto"
const RlnRelayContentTopic = "waku/2/rlnrelay/proto"
suite "Waku rln relay":
asyncTest "mount waku-rln-relay in the off-chain mode":
let
nodeKey = generateSecp256k1Key()
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(0))
await node.start()
# preparing inputs to mount rln-relay
# create a group of 100 membership keys
let memListRes = createMembershipList(100)
require:
memListRes.isOk()
let (groupCredentials, root) = memListRes.get()
require:
groupCredentials.len == 100
let
# convert the keys to IdentityCredential structs
groupIdCredentialsRes = groupCredentials.toIdentityCredentials()
require:
groupIdCredentialsRes.isOk()
let
groupIdCredentials = groupIdCredentialsRes.get()
# extract the id commitments
groupIDCommitments = groupIdCredentials.mapIt(it.idCommitment)
debug "groupIdCredentials", groupIdCredentials
debug "groupIDCommitments", groupIDCommitments
# index indicates the position of a membership credential in the static list of group keys i.e., groupIdCredentials
# the corresponding credential will be used to mount rlnRelay on the current node
# index also represents the index of the leaf in the Merkle tree that contains node's commitment key
let index = MembershipIndex(5)
# -------- mount rln-relay in the off-chain mode
await node.mountRelay(@[RlnRelayPubsubTopic])
let mountRes = node.wakuRelay.mountRlnRelayStatic(group = groupIDCommitments,
memIdCredential = groupIdCredentials[index],
memIndex = index,
pubsubTopic = RlnRelayPubsubTopic,
contentTopic = RlnRelayContentTopic)
require:
mountRes.isOk()
let wakuRlnRelay = mountRes.get()
# get the root of Merkle tree which is constructed inside the mountRlnRelay proc
let calculatedRootRes = wakuRlnRelay.rlnInstance.getMerkleRoot()
require:
calculatedRootRes.isOk()
let calculatedRoot = calculatedRootRes.get().inHex()
debug "calculated root by mountRlnRelay", calculatedRoot
# this part checks whether the Merkle tree is constructed correctly inside the mountRlnRelay proc
# this check is done by comparing the tree root resulted from mountRlnRelay i.e., calculatedRoot
# against the root which is the expected root
check:
calculatedRoot == root
await node.stop()
suite "Waku rln relay":
test "key_gen Nim Wrappers":
@ -436,9 +371,14 @@ suite "Waku rln relay":
hash.inHex()
test "create a list of membership keys and construct a Merkle tree based on the list":
let rlnInstance = createRLNInstance()
require:
rlnInstance.isOk()
let rln = rlnInstance.get()
let
groupSize = 100
memListRes = createMembershipList(groupSize)
memListRes = rln.createMembershipList(groupSize)
require:
memListRes.isOk()
@ -452,7 +392,7 @@ suite "Waku rln relay":
list.len == groupSize # check the number of keys
root.len == HashHexSize # check the size of the calculated tree root
test "check correctness of toIdentityCredentials and calcMerkleRoot":
test "check correctness of toIdentityCredentials":
let groupKeys = StaticGroupKeys
# create a set of IdentityCredentials objects from groupKeys
@ -464,12 +404,21 @@ suite "Waku rln relay":
# extract the id commitments
let groupIDCommitments = groupIdCredentials.mapIt(it.idCommitment)
# calculate the Merkle tree root out of the extracted id commitments
let rootRes = calcMerkleRoot(groupIDCommitments)
let rlnInstance = createRLNInstance()
require:
rlnInstance.isOk()
let rln = rlnInstance.get()
# create a Merkle tree
let membersAdded = rln.insertMembers(0, groupIDCommitments)
require:
membersAdded
let rootRes = rln.getMerkleRoot()
require:
rootRes.isOk()
let root = rootRes.get()
let root = rootRes.get().inHex()
debug "groupIdCredentials", groupIdCredentials
debug "groupIDCommitments", groupIDCommitments
@ -516,314 +465,6 @@ suite "Waku rln relay":
check:
decodednsp.value == rateLimitProof
test "test proofVerify and proofGen for a valid proof":
let rlnInstance = createRLNInstance()
require:
rlnInstance.isOk()
let rln = rlnInstance.get()
let
# peer's index in the Merkle Tree
index = 5'u
# create an identity credential
idCredentialRes = membershipKeyGen(rln)
require:
idCredentialRes.isOk()
let idCredential = idCredentialRes.get()
var members = newSeq[IDCommitment]()
# Create a Merkle tree with random members
for i in 0'u..10'u:
if (i == index):
# insert the current peer's pk
members.add(idCredential.idCommitment)
else:
# create a new identity credential
let idCredentialRes = rln.membershipKeyGen()
require:
idCredentialRes.isOk()
members.add(idCredentialRes.get().idCommitment)
# Batch the insert
let batchInsertRes = rln.insertMembers(0, members)
require:
batchInsertRes
# prepare the message
let messageBytes = "Hello".toBytes()
# prepare the epoch
let epoch = default(Epoch)
debug "epoch", epochHex = epoch.inHex()
# generate proof
let proofRes = rln.proofGen(data = messageBytes,
memKeys = idCredential,
memIndex = MembershipIndex(index),
epoch = epoch)
require:
proofRes.isOk()
let proof = proofRes.value
# verify the proof
let verified = rln.proofVerify(data = messageBytes,
proof = proof,
validRoots = @[rln.getMerkleRoot().value()])
# Ensure the proof verification did not error out
require:
verified.isOk()
check:
verified.value() == true
test "test proofVerify and proofGen for an invalid proof":
let rlnInstance = createRLNInstance()
require:
rlnInstance.isOk()
let rln = rlnInstance.get()
let
# peer's index in the Merkle Tree
index = 5'u
# create an identity credential
idCredentialRes = membershipKeyGen(rln)
require:
idCredentialRes.isOk()
let idCredential = idCredentialRes.get()
# Create a Merkle tree with random members
for i in 0'u..10'u:
var memberAdded: bool = false
if (i == index):
# insert the current peer's pk
memberAdded = rln.insertMembers(i, @[idCredential.idCommitment])
else:
# create a new identity credential
let idCredentialRes = rln.membershipKeyGen()
require:
idCredentialRes.isOk()
memberAdded = rln.insertMembers(i, @[idCredentialRes.get().idCommitment])
# check the member is added
require:
memberAdded
# prepare the message
let messageBytes = "Hello".toBytes()
# prepare the epoch
let epoch = default(Epoch)
debug "epoch in bytes", epochHex = epoch.inHex()
let badIndex = 4
# generate proof
let proofRes = rln.proofGen(data = messageBytes,
memKeys = idCredential,
memIndex = MembershipIndex(badIndex),
epoch = epoch)
require:
proofRes.isOk()
let proof = proofRes.value
# verify the proof (should not be verified) against the internal RLN tree root
let verified = rln.proofVerify(data = messageBytes,
proof = proof,
validRoots = @[rln.getMerkleRoot().value()])
require:
verified.isOk()
check:
verified.value() == false
test "validate roots which are part of the acceptable window":
# Setup:
# This step consists of creating the rln instance and waku-rln-relay,
# Inserting members, and creating a valid proof with the merkle root
# create an RLN instance
let rlnInstance = createRLNInstance()
require:
rlnInstance.isOk()
let rln = rlnInstance.get()
let rlnRelay = WakuRLNRelay(rlnInstance:rln)
let
# peer's index in the Merkle Tree.
index = 5'u
# create an identity credential
idCredentialRes = membershipKeyGen(rlnRelay.rlnInstance)
require:
idCredentialRes.isOk()
let idCredential = idCredentialRes.get()
let membershipCount: uint = AcceptableRootWindowSize + 5'u
var members = newSeq[IdentityCredential]()
# Generate membership keys
for i in 0'u..membershipCount:
if (i == index):
# insert the current peer's pk
members.add(idCredential)
else:
# create a new identity credential
let idCredentialRes = rlnRelay.rlnInstance.membershipKeyGen()
require:
idCredentialRes.isOk()
members.add(idCredentialRes.get())
# Batch inserts into the tree
let insertedRes = rlnRelay.insertMembers(0, members.mapIt(it.idCommitment))
require:
insertedRes.isOk()
# Given:
# This step includes constructing a valid message with the latest merkle root
# prepare the message
let messageBytes = "Hello".toBytes()
# prepare the epoch
let epoch = default(Epoch)
debug "epoch in bytes", epochHex = epoch.inHex()
# generate proof
let validProofRes = rlnRelay.rlnInstance.proofGen(data = messageBytes,
memKeys = idCredential,
memIndex = MembershipIndex(index),
epoch = epoch)
require:
validProofRes.isOk()
let validProof = validProofRes.value
# validate the root (should be true)
let verified = rlnRelay.validateRoot(validProof.merkleRoot)
require:
verified == true
# When:
# This test depends on the local merkle tree root being part of a
# acceptable set of roots, which is denoted by AcceptableRootWindowSize
# The following action is equivalent to a member being removed upon listening to the events emitted by the contract
# Progress the local tree by removing members
for i in 0..AcceptableRootWindowSize - 2:
let res = rlnRelay.removeMember(MembershipIndex(i))
# Ensure the local tree root has changed
let currentMerkleRoot = rlnRelay.rlnInstance.getMerkleRoot()
require:
res.isOk()
currentMerkleRoot.isOk()
currentMerkleRoot.value() != validProof.merkleRoot
# Then:
# we try to verify a root against this window,
# which should return true
let olderRootVerified = rlnRelay.validateRoot(validProof.merkleRoot)
check:
olderRootVerified == true
test "invalidate roots which are not part of the acceptable window":
# Setup:
# This step consists of creating the rln instance and waku-rln-relay,
# Inserting members, and creating a valid proof with the merkle root
require:
AcceptableRootWindowSize < 10
# create an RLN instance
let rlnInstance = createRLNInstance()
require:
rlnInstance.isOk()
let rln = rlnInstance.get()
let rlnRelay = WakuRLNRelay(rlnInstance:rln)
let
# peer's index in the Merkle Tree.
index = 6'u
# create an identity credential
idCredentialRes = membershipKeyGen(rlnRelay.rlnInstance)
require:
idCredentialRes.isOk()
let idCredential = idCredentialRes.get()
let membershipCount: uint = AcceptableRootWindowSize + 5'u
# Create a Merkle tree with random members
for i in 0'u..membershipCount:
var memberIsAdded: RlnRelayResult[void]
if (i == index):
# insert the current peer's pk
memberIsAdded = rlnRelay.insertMembers(i, @[idCredential.idCommitment])
else:
# create a new identity credential
let idCredentialRes = rlnRelay.rlnInstance.membershipKeyGen()
require:
idCredentialRes.isOk()
memberIsAdded = rlnRelay.insertMembers(i, @[idCredentialRes.get().idCommitment])
# require that the member is added
require:
memberIsAdded.isOk()
# Given:
# This step includes constructing a valid message with the latest merkle root
# prepare the message
let messageBytes = "Hello".toBytes()
# prepare the epoch
let epoch = default(Epoch)
debug "epoch in bytes", epochHex = epoch.inHex()
# generate proof
let validProofRes = rlnRelay.rlnInstance.proofGen(data = messageBytes,
memKeys = idCredential,
memIndex = MembershipIndex(index),
epoch = epoch)
require:
validProofRes.isOk()
let validProof = validProofRes.value
# validate the root (should be true)
let verified = rlnRelay.validateRoot(validProof.merkleRoot)
require:
verified == true
# When:
# This test depends on the local merkle tree root being part of a
# acceptable set of roots, which is denoted by AcceptableRootWindowSize
# The following action is equivalent to a member being removed upon listening to the events emitted by the contract
# Progress the local tree by removing members
for i in 0..AcceptableRootWindowSize:
discard rlnRelay.removeMember(MembershipIndex(i))
# Ensure the local tree root has changed
let currentMerkleRoot = rlnRelay.rlnInstance.getMerkleRoot()
require:
currentMerkleRoot.isOk()
currentMerkleRoot.value() != validProof.merkleRoot
# Then:
# we try to verify a proof against this window,
# which should return false
let olderRootVerified = rlnRelay.validateRoot(validProof.merkleRoot)
check:
olderRootVerified == false
test "toEpoch and fromEpoch consistency check":
# check edge cases
let
@ -913,47 +554,17 @@ suite "Waku rln relay":
# it is a duplicate
result3.value == true
test "validateMessage test":
# setup a wakurlnrelay peer with a static group----------
# create a group of 100 membership keys
let memListRes = createMembershipList(100)
require:
memListRes.isOk()
let
(groupKeys, _) = memListRes.get()
# convert the keys to IdentityCredential structs
groupIdCredentialsRes = groupKeys.toIdentityCredentials()
require:
groupIdCredentialsRes.isOk()
let groupIdCredentials = groupIdCredentialsRes.get()
# extract the id commitments
let groupIDCommitments = groupIdCredentials.mapIt(it.idCommitment)
debug "groupIdCredentials", groupIdCredentials
debug "groupIDCommitments", groupIDCommitments
# index indicates the position of an identity credential in the static list of group keys i.e., groupIdCredentials
# the corresponding identity credential will be used to mount rlnRelay on the current node
# index also represents the index of the leaf in the Merkle tree that contains node's commitment key
asyncTest "validateMessage test":
let index = MembershipIndex(5)
# create an RLN instance
let rlnInstance = createRLNInstance()
let rlnConf = WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayPubsubTopic: RlnRelayPubsubTopic,
rlnRelayContentTopic: RlnRelayContentTopic,
rlnRelayMembershipIndex: some(index.uint))
let wakuRlnRelayRes = await WakuRlnRelay.new(rlnConf)
require:
rlnInstance.isOk()
let rln = rlnInstance.get()
let
wakuRlnRelay = WakuRLNRelay(membershipIndex: index,
identityCredential: groupIdCredentials[index], rlnInstance: rln)
# add members
let commitmentAddRes = wakuRlnRelay.addAll(groupIDCommitments)
require:
commitmentAddRes.isOk()
wakuRlnRelayRes.isOk()
let wakuRlnRelay = wakuRlnRelayRes.get()
# get the current epoch time
let time = epochTime()

View File

@ -1,634 +0,0 @@
# contains rln-relay tests that require interaction with Ganache i.e., onchain tests
{.used.}
import
std/[options, osproc, streams, strutils, sequtils],
testutils/unittests, chronos, chronicles, stint, web3, json,
stew/byteutils, stew/shims/net as stewNet,
libp2p/crypto/crypto,
eth/keys,
../../waku/v2/protocol/waku_keystore,
../../waku/v2/protocol/waku_rln_relay,
../../waku/v2/node/waku_node,
./testlib/common,
./testlib/waku2,
./test_utils
from posix import kill, SIGINT
const RlnRelayPubsubTopic = "waku/2/rlnrelay/proto"
const RlnRelayContentTopic = "waku/2/rlnrelay/proto"
# contract ABI
contract(MembershipContract):
proc register(pubkey: Uint256) # external payable
proc MemberRegistered(pubkey: Uint256, index: Uint256) {.event.}
# proc registerBatch(pubkeys: seq[Uint256]) # external payable
# proc withdraw(secret: Uint256, pubkeyIndex: Uint256, receiver: Address)
# proc withdrawBatch( secrets: seq[Uint256], pubkeyIndex: seq[Uint256], receiver: seq[Address])
# a util function used for testing purposes
# it deploys membership contract on Ganache (or any Eth client available on EthClient address)
# must be edited if used for a different contract than membership contract
proc uploadRLNContract*(ethClientAddress: string): Future[Address] {.async.} =
let web3 = await newWeb3(ethClientAddress)
debug "web3 connected to", ethClientAddress
# fetch the list of registered accounts
let accounts = await web3.provider.eth_accounts()
web3.defaultAccount = accounts[1]
let add = web3.defaultAccount
debug "contract deployer account address ", add
let balance = await web3.provider.eth_getBalance(web3.defaultAccount, "latest")
debug "Initial account balance: ", balance
# deploy the poseidon hash contract and gets its address
let
hasherReceipt = await web3.deployContract(PoseidonHasherCode)
hasherAddress = hasherReceipt.contractAddress.get
debug "hasher address: ", hasherAddress
# encode membership contract inputs to 32 bytes zero-padded
let
membershipFeeEncoded = encode(MembershipFee).data
depthEncoded = encode(MerkleTreeDepth.u256).data
hasherAddressEncoded = encode(hasherAddress).data
# this is the contract constructor input
contractInput = membershipFeeEncoded & depthEncoded & hasherAddressEncoded
debug "encoded membership fee: ", membershipFeeEncoded
debug "encoded depth: ", depthEncoded
debug "encoded hasher address: ", hasherAddressEncoded
debug "encoded contract input:", contractInput
# deploy membership contract with its constructor inputs
let receipt = await web3.deployContract(MembershipContractCode,
contractInput = contractInput)
let contractAddress = receipt.contractAddress.get
debug "Address of the deployed membership contract: ", contractAddress
let newBalance = await web3.provider.eth_getBalance(web3.defaultAccount, "latest")
debug "Account balance after the contract deployment: ", newBalance
await web3.close()
debug "disconnected from ", ethClientAddress
return contractAddress
proc createEthAccount(): Future[(keys.PrivateKey, Address)] {.async.} =
let web3 = await newWeb3(EthClient)
let accounts = await web3.provider.eth_accounts()
let gasPrice = int(await web3.provider.eth_gasPrice())
web3.defaultAccount = accounts[0]
let pk = keys.PrivateKey.random(rng[])
let acc = Address(toCanonicalAddress(pk.toPublicKey()))
var tx:EthSend
tx.source = accounts[0]
tx.value = some(ethToWei(10.u256))
tx.to = some(acc)
tx.gasPrice = some(gasPrice)
# Send 10 eth to acc
discard await web3.send(tx)
let balance = await web3.provider.eth_getBalance(acc, "latest")
assert(balance == ethToWei(10.u256))
return (pk, acc)
# Installs Ganache Daemon
proc installGanache() =
# We install Ganache.
# Packages will be installed to the ./build folder through the --prefix option
let installGanache = startProcess("npm", args = ["install", "ganache", "--prefix", "./build"], options = {poUsePath})
let returnCode = installGanache.waitForExit()
debug "Ganache install log", returnCode=returnCode, log=installGanache.outputstream.readAll()
# Uninstalls Ganache Daemon
proc uninstallGanache() =
# We uninstall Ganache
# Packages will be uninstalled from the ./build folder through the --prefix option.
# Passed option is
# --save: Package will be removed from your dependencies.
# See npm documentation https://docs.npmjs.com/cli/v6/commands/npm-uninstall for further details
let uninstallGanache = startProcess("npm", args = ["uninstall", "ganache", "--save", "--prefix", "./build"], options = {poUsePath})
let returnCode = uninstallGanache.waitForExit()
debug "Ganache uninstall log", returnCode=returnCode, log=uninstallGanache.outputstream.readAll()
# Runs Ganache daemon
proc runGanache(): Process =
# We run directly "node node_modules/ganache/dist/node/cli.js" rather than using "npx ganache", so that the daemon does not spawn in a new child process.
# In this way, we can directly send a SIGINT signal to the corresponding PID to gracefully terminate Ganache without dealing with multiple processes.
# Passed options are
# --port Port to listen on.
# --miner.blockGasLimit Sets the block gas limit in WEI.
# --wallet.defaultBalance The default account balance, specified in ether.
# See ganache documentation https://www.npmjs.com/package/ganache for more details
let runGanache = startProcess("node", args = ["./build/node_modules/ganache/dist/node/cli.js", "--port", "8540", "--miner.blockGasLimit", "300000000000000", "--wallet.defaultBalance", "10000"], options = {poUsePath})
let ganachePID = runGanache.processID
# We read stdout from Ganache to see when daemon is ready
var ganacheStartLog: string
var cmdline: string
while true:
if runGanache.outputstream.readLine(cmdline):
ganacheStartLog.add(cmdline)
if cmdline.contains("Listening on 127.0.0.1:8540"):
break
debug "Ganache daemon is running and ready", pid=ganachePID, startLog=ganacheStartLog
return runGanache
# Stops Ganache daemon
proc stopGanache(runGanache: Process) =
let ganachePID = runGanache.processID
# We gracefully terminate Ganache daemon by sending a SIGINT signal to the runGanache PID to trigger RPC server termination and clean-up
let returnCodeSIGINT = kill(ganachePID.int32, SIGINT)
debug "Sent SIGINT to Ganache", ganachePID=ganachePID, returnCode=returnCodeSIGINT
# We wait the daemon to exit
let returnCodeExit = runGanache.waitForExit()
debug "Ganache daemon terminated", returnCode=returnCodeExit
debug "Ganache daemon run log", log=runGanache.outputstream.readAll()
procSuite "Waku-rln-relay":
################################
## Installing/running Ganache
################################
# We install Ganache
installGanache()
# We run Ganache
let runGanache = runGanache()
asyncTest "event subscription":
# preparation ------------------------------
debug "ethereum client address", EthClient
let contractAddress = await uploadRLNContract(EthClient)
# connect to the eth client
let web3 = await newWeb3(EthClient)
debug "web3 connected to", EthClient
# fetch the list of registered accounts
let accounts = await web3.provider.eth_accounts()
web3.defaultAccount = accounts[1]
debug "contract deployer account address ",
defaultAccount = web3.defaultAccount
# prepare a contract sender to interact with it
let contractObj = web3.contractSender(MembershipContract,
contractAddress) # creates a Sender object with a web3 field and contract address of type Address
# create an RLN instance
let rlnInstance = createRLNInstance()
require:
rlnInstance.isOk()
# generate the membership keys
let identityCredentialRes = membershipKeyGen(rlnInstance.get())
require:
identityCredentialRes.isOk()
let identityCredential = identityCredentialRes.get()
let pk = identityCredential.idCommitment.toUInt256()
debug "membership commitment key", pk = pk
# test ------------------------------
let fut = newFuture[void]()
let s = await contractObj.subscribe(MemberRegistered, %*{"fromBlock": "0x0",
"address": contractAddress}) do(
idCommitment: Uint256, index: Uint256){.raises: [Defect], gcsafe.}:
try:
debug "onRegister", idCommitment = idCommitment, index = index
require:
idCommitment == pk
fut.complete()
except Exception as err:
# chronos still raises exceptions which inherit directly from Exception
doAssert false, err.msg
do (err: CatchableError):
echo "Error from subscription: ", err.msg
# register a member
let tx = await contractObj.register(pk).send(value = MembershipFee)
debug "a member is registered", tx = tx
# wait for the event to be received
await fut
# release resources -----------------------
await web3.close()
asyncTest "dynamic group management":
# preparation ------------------------------
debug "ethereum client address", EthClient
let contractAddress = await uploadRLNContract(EthClient)
# connect to the eth client
let web3 = await newWeb3(EthClient)
debug "web3 connected to", EthClient
# fetch the list of registered accounts
let accounts = await web3.provider.eth_accounts()
web3.defaultAccount = accounts[1]
debug "contract deployer account address ",
defaultAccount = web3.defaultAccount
# prepare a contract sender to interact with it
let contractObj = web3.contractSender(MembershipContract,
contractAddress) # creates a Sender object with a web3 field and contract address of type Address
# test ------------------------------
# create an RLN instance
let rlnInstance = createRLNInstance()
require:
rlnInstance.isOk()
let rln = rlnInstance.get()
let idCredentialRes = rln.membershipKeyGen()
require:
idCredentialRes.isOk()
let idCredential = idCredentialRes.get()
let pk = idCredential.idCommitment.toUInt256()
debug "membership commitment key", pk = pk
# initialize the WakuRLNRelay
let rlnPeer = WakuRLNRelay(identityCredential: idCredential,
membershipIndex: MembershipIndex(0),
ethClientAddress: EthClient,
ethAccountAddress: some(accounts[0]),
membershipContractAddress: contractAddress,
rlnInstance: rln)
# generate another identity credential
let idCredential2Res = rln.membershipKeyGen()
require:
idCredential2Res.isOk()
let idCredential2 = idCredential2Res.get()
let pk2 = idCredential2.idCommitment.toUInt256()
debug "membership commitment key", pk2 = pk2
let events = [newFuture[void](), newFuture[void]()]
var futIndex = 0
var handler: GroupUpdateHandler
handler = proc (blockNumber: BlockNumber,
members: seq[MembershipTuple]): RlnRelayResult[void] =
debug "handler is called", members = members
events[futIndex].complete()
futIndex += 1
let index = members[0].index
let insertRes = rlnPeer.insertMembers(index, members.mapIt(it.idComm))
check:
insertRes.isOk()
return ok()
# mount the handler for listening to the contract events
await subscribeToGroupEvents(ethClientUri = EthClient,
ethAccountAddress = some(accounts[0]),
contractAddress = contractAddress,
blockNumber = "0x0",
handler = handler)
# register a member to the contract
let tx = await contractObj.register(pk).send(value = MembershipFee)
debug "a member is registered", tx = tx
# register another member to the contract
let tx2 = await contractObj.register(pk2).send(value = MembershipFee)
debug "a member is registered", tx2 = tx2
# wait for the events to be processed
await allFutures(events)
# release resources -----------------------
await web3.close()
asyncTest "insert a key to the membership contract":
# preparation ------------------------------
debug "ethereum client address", EthClient
let contractAddress = await uploadRLNContract(EthClient)
# connect to the eth client
let web3 = await newWeb3(EthClient)
debug "web3 connected to", EthClient
# fetch the list of registered accounts
let accounts = await web3.provider.eth_accounts()
web3.defaultAccount = accounts[1]
let add = web3.defaultAccount
debug "contract deployer account address ", add
# prepare a contract sender to interact with it
let sender = web3.contractSender(MembershipContract,
contractAddress) # creates a Sender object with a web3 field and contract address of type Address
# send takes the following parameters, c: ContractCallBase, value = 0.u256, gas = 3000000'u64 gasPrice = 0
# should use send proc for the contract functions that update the state of the contract
let tx = await sender.register(20.u256).send(value = MembershipFee) # value is the membership fee
debug "The hash of registration tx: ", tx
# let members: array[2, uint256] = [20.u256, 21.u256]
# debug "This is the batch registration result ", await sender.registerBatch(members).send(value = (members.len * MembershipFee)) # value is the membership fee
let balance = await web3.provider.eth_getBalance(web3.defaultAccount, "latest")
debug "Balance after registration: ", balance
await web3.close()
debug "disconnected from", EthClient
asyncTest "registration procedure":
# preparation ------------------------------
# deploy the contract
let contractAddress = await uploadRLNContract(EthClient)
# prepare rln-relay peer inputs
let
web3 = await newWeb3(EthClient)
await web3.close()
# create an RLN instance
let rlnInstance = createRLNInstance()
require:
rlnInstance.isOk()
# generate the membership keys
let identityCredentialRes = membershipKeyGen(rlnInstance.get())
require:
identityCredentialRes.isOk()
let identityCredential = identityCredentialRes.get()
# create an Ethereum private key and the corresponding account
let (ethPrivKey, ethacc) = await createEthAccount()
# test ------------------------------
# initialize the WakuRLNRelay
let rlnPeer = WakuRLNRelay(identityCredential: identityCredential,
membershipIndex: MembershipIndex(0),
ethClientAddress: EthClient,
ethAccountPrivateKey: some(ethPrivKey),
ethAccountAddress: some(ethacc),
membershipContractAddress: contractAddress)
# register the rln-relay peer to the membership contract
let isSuccessful = await rlnPeer.register()
check:
isSuccessful.isOk()
asyncTest "mounting waku rln-relay: check correct Merkle tree construction in the static/off-chain group management":
# preparation ------------------------------
let
nodeKey = generateSecp256k1Key()
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(0))
await node.start()
# create current peer's pk
let rlnInstance = createRLNInstance()
require:
rlnInstance.isOk()
let rln = rlnInstance.get()
# generate an identity credential
let idCredentialRes = rln.membershipKeyGen()
require:
idCredentialRes.isOk()
let idCredential = idCredentialRes.get()
# current peer index in the Merkle tree
let index = uint(5)
# Create a group of 10 members
var group = newSeq[IDCommitment]()
for i in 0'u..10'u:
var memberAdded: bool = false
if (i == index):
# insert the current peer's pk
group.add(idCredential.idCommitment)
memberAdded = rln.insertMembers(i, @[idCredential.idCommitment])
doAssert(memberAdded)
debug "member key", key = idCredential.idCommitment.inHex
else:
let idCredentialRes = rln.membershipKeyGen()
require:
idCredentialRes.isOk()
let idCredential = idCredentialRes.get()
group.add(idCredential.idCommitment)
let memberAdded = rln.insertMembers(i, @[idCredential.idCommitment])
require:
memberAdded
debug "member key", key = idCredential.idCommitment.inHex
let expectedRoot = rln.getMerkleRoot().value().inHex
debug "expected root ", expectedRoot
# test ------------------------------
# start rln-relay
await node.mountRelay(@[RlnRelayPubsubTopic])
let mountRes = mountRlnRelayStatic(wakuRelay = node.wakuRelay,
group = group,
memIdCredential = idCredential,
memIndex = index,
pubsubTopic = RlnRelayPubsubTopic,
contentTopic = RlnRelayContentTopic)
require:
mountRes.isOk()
let wakuRlnRelay = mountRes.get()
let calculatedRoot = wakuRlnRelay.rlnInstance.getMerkleRoot().value().inHex()
debug "calculated root ", calculatedRoot
check:
expectedRoot == calculatedRoot
await node.stop()
asyncTest "mounting waku rln-relay: check correct Merkle tree construction in the dynamic/onchain group management":
# preparation ------------------------------
let
nodeKey = generateSecp256k1Key()
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(0))
await node.start()
# deploy the contract
let contractAddress = await uploadRLNContract(EthClient)
# prepare rln-relay inputs
let
web3 = await newWeb3(EthClient)
accounts = await web3.provider.eth_accounts()
# choose one of the existing accounts for the rln-relay peer
ethAccountAddress = accounts[0]
web3.defaultAccount = accounts[0]
# create an rln instance
let rlnInstance = createRLNInstance()
require:
rlnInstance.isOk()
let rln = rlnInstance.get()
# create two identity credentials
let
idCredential1Res = rln.membershipKeyGen()
idCredential2Res = rln.membershipKeyGen()
require:
idCredential1Res.isOk()
idCredential2Res.isOk()
let
idCredential1 = idCredential1Res.get()
idCredential2 = idCredential2Res.get()
pk1 = idCredential1.idCommitment.toUInt256()
pk2 = idCredential2.idCommitment.toUInt256()
debug "member key1", key = idCredential1.idCommitment.inHex
debug "member key2", key = idCredential2.idCommitment.inHex
# add the rln keys to the Merkle tree
let
memberIsAdded1 = rln.insertMembers(0, @[idCredential1.idCommitment])
memberIsAdded2 = rln.insertMembers(1, @[idCredential2.idCommitment])
require:
memberIsAdded1
memberIsAdded2
# get the Merkle root
let expectedRoot = rln.getMerkleRoot().value().inHex
# prepare a contract sender to interact with it
let contractObj = web3.contractSender(MembershipContract,
contractAddress) # creates a Sender object with a web3 field and contract address of type Address
# register the members to the contract
let tx1Hash = await contractObj.register(pk1).send(value = MembershipFee)
debug "a member is registered", tx1 = tx1Hash
# register another member to the contract
let tx2Hash = await contractObj.register(pk2).send(value = MembershipFee)
debug "a member is registered", tx2 = tx2Hash
# create an Ethereum private key and the corresponding account
let (ethPrivKey, ethacc) = await createEthAccount()
# test ------------------------------
# start rln-relay
await node.mountRelay(@[RlnRelayPubsubTopic])
let mountRes = await mountRlnRelayDynamic(wakuRelay = node.wakuRelay,
ethClientAddr = EthClient,
ethAccountAddress = some(ethacc),
ethAccountPrivKeyOpt = some(ethPrivKey),
memContractAddr = contractAddress,
memIdCredential = some(idCredential1),
memIndex = some(MembershipIndex(0)),
pubsubTopic = RlnRelayPubsubTopic,
contentTopic = RlnRelayContentTopic)
require:
mountRes.isOk()
let wakuRlnRelay = mountRes.get()
await sleepAsync(2000.milliseconds()) # wait for the event to reach the group handler
# rln pks are inserted into the rln peer's Merkle tree and the resulting root
# is expected to be the same as the calculatedRoot i.e., the one calculated outside of the mountRlnRelayDynamic proc
let calculatedRoot = wakuRlnRelay.rlnInstance.getMerkleRoot().value().inHex
debug "calculated root ", calculatedRoot=calculatedRoot
debug "expected root ", expectedRoot=expectedRoot
check:
expectedRoot == calculatedRoot
await web3.close()
await node.stop()
asyncTest "mounting waku rln-relay: check correct registration of peers without rln-relay credentials in dynamic/on-chain mode":
# deploy the contract
let contractAddress = await uploadRLNContract(EthClient)
# prepare rln-relay inputs
let
web3 = await newWeb3(EthClient)
accounts = await web3.provider.eth_accounts()
# choose two of the existing accounts for the rln-relay peers
ethAccountAddress1 = accounts[0]
ethAccountAddress2 = accounts[1]
await web3.close()
# prepare two nodes
let
nodeKey = generateSecp256k1Key()
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(0))
await node.start()
let
nodeKey2 = generateSecp256k1Key()
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(0))
await node2.start()
# create an Ethereum private key and the corresponding account
let (ethPrivKey, ethacc) = await createEthAccount()
# start rln-relay on the first node, leave rln-relay credentials empty
await node.mountRelay(@[RlnRelayPubsubTopic])
let mountRes = await mountRlnRelayDynamic(wakuRelay=node.wakuRelay,
ethClientAddr = EthClient,
ethAccountAddress = some(ethacc),
ethAccountPrivKeyOpt = some(ethPrivKey),
memContractAddr = contractAddress,
memIdCredential = none(IdentityCredential),
memIndex = none(MembershipIndex),
pubsubTopic = RlnRelayPubsubTopic,
contentTopic = RlnRelayContentTopic)
require:
mountRes.isOk()
let wakuRlnRelay = mountRes.get()
# start rln-relay on the second node, leave rln-relay credentials empty
await node2.mountRelay(@[RlnRelayPubsubTopic])
let mountRes2 = await mountRlnRelayDynamic(wakuRelay=node2.wakuRelay,
ethClientAddr = EthClient,
ethAccountAddress = some(ethacc),
ethAccountPrivKeyOpt = some(ethPrivKey),
memContractAddr = contractAddress,
memIdCredential = none(IdentityCredential),
memIndex = none(MembershipIndex),
pubsubTopic = RlnRelayPubsubTopic,
contentTopic = RlnRelayContentTopic)
require:
mountRes2.isOk()
let wakuRlnRelay2 = mountRes2.get()
# the two nodes should be registered into the contract
# since nodes are spun up sequentially
# the first node has index 0 whereas the second node gets index 1
check:
wakuRlnRelay.membershipIndex == MembershipIndex(0)
wakuRlnRelay2.membershipIndex == MembershipIndex(1)
await node.stop()
await node2.stop()
################################
## Terminating/removing Ganache
################################
# We stop Ganache daemon
stopGanache(runGanache)
# We uninstall Ganache
uninstallGanache()

View File

@ -24,7 +24,6 @@ import
from std/times import epochTime
const RlnRelayPubsubTopic = "waku/2/rlnrelay/proto"
procSuite "WakuNode - RLN relay":
@ -52,7 +51,7 @@ procSuite "WakuNode - RLN relay":
await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
rlnRelayContentTopic: contentTopic,
rlnRelayMembershipIndex: MembershipIndex(1),
rlnRelayMembershipIndex: some(MembershipIndex(1)),
))
await node1.start()
@ -63,7 +62,7 @@ procSuite "WakuNode - RLN relay":
await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
rlnRelayContentTopic: contentTopic,
rlnRelayMembershipIndex: MembershipIndex(2),
rlnRelayMembershipIndex: some(MembershipIndex(2)),
))
await node2.start()
@ -74,7 +73,7 @@ procSuite "WakuNode - RLN relay":
await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
rlnRelayContentTopic: contentTopic,
rlnRelayMembershipIndex: MembershipIndex(3),
rlnRelayMembershipIndex: some(MembershipIndex(3)),
))
await node3.start()
@ -132,8 +131,6 @@ procSuite "WakuNode - RLN relay":
rlnRelayPubSubTopic = RlnRelayPubsubTopic
contentTopic = ContentTopic("/waku/2/default-content/proto")
# set up three nodes
# node1
# set up three nodes
# node1
await node1.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic])
@ -142,7 +139,7 @@ procSuite "WakuNode - RLN relay":
await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
rlnRelayContentTopic: contentTopic,
rlnRelayMembershipIndex: MembershipIndex(1),
rlnRelayMembershipIndex: some(MembershipIndex(1)),
))
await node1.start()
@ -153,7 +150,7 @@ procSuite "WakuNode - RLN relay":
await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
rlnRelayContentTopic: contentTopic,
rlnRelayMembershipIndex: MembershipIndex(2),
rlnRelayMembershipIndex: some(MembershipIndex(2)),
))
await node2.start()
@ -164,7 +161,7 @@ procSuite "WakuNode - RLN relay":
await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
rlnRelayContentTopic: contentTopic,
rlnRelayMembershipIndex: MembershipIndex(3),
rlnRelayMembershipIndex: some(MembershipIndex(3)),
))
await node3.start()
@ -197,10 +194,8 @@ procSuite "WakuNode - RLN relay":
contentTopicBytes = contentTopic.toBytes
input = concat(payload, contentTopicBytes)
extraBytes: seq[byte] = @[byte(1),2,3]
rateLimitProofRes = node1.wakuRlnRelay.rlnInstance.proofGen(data = concat(input, extraBytes), # we add extra bytes to invalidate proof verification against original payload
memKeys = node1.wakuRlnRelay.identityCredential,
memIndex = MembershipIndex(1),
epoch = epoch)
rateLimitProofRes = node1.wakuRlnRelay.groupManager.generateProof(concat(input, extraBytes), # we add extra bytes to invalidate proof verification against original payload
epoch)
require:
rateLimitProofRes.isOk()
let rateLimitProof = rateLimitProofRes.get().encode().buffer
@ -249,7 +244,7 @@ procSuite "WakuNode - RLN relay":
await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
rlnRelayContentTopic: contentTopic,
rlnRelayMembershipIndex: MembershipIndex(1),
rlnRelayMembershipIndex: some(MembershipIndex(1)),
))
await node1.start()
@ -261,7 +256,7 @@ procSuite "WakuNode - RLN relay":
await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
rlnRelayContentTopic: contentTopic,
rlnRelayMembershipIndex: MembershipIndex(2),
rlnRelayMembershipIndex: some(MembershipIndex(2)),
))
await node2.start()
@ -273,7 +268,7 @@ procSuite "WakuNode - RLN relay":
await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
rlnRelayContentTopic: contentTopic,
rlnRelayMembershipIndex: MembershipIndex(3),
rlnRelayMembershipIndex: some(MembershipIndex(3)),
))
await node3.start()

View File

@ -884,14 +884,24 @@ when defined(rln):
registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)) {.async.} =
info "mounting rln relay"
let rlnRelayRes = await WakuRlnRelay.new(node.wakuRelay,
rlnConf,
spamHandler,
if node.wakuRelay.isNil():
error "WakuRelay protocol is not mounted, cannot mount WakuRlnRelay"
return
# TODO: check whether the pubsub topic is supported at the relay level
# if rlnConf.rlnRelayPubsubTopic notin node.wakuRelay.defaultPubsubTopics:
# error "The relay protocol does not support the configured pubsub topic for WakuRlnRelay"
let rlnRelayRes = await WakuRlnRelay.new(rlnConf,
registrationHandler)
if rlnRelayRes.isErr():
error "failed to mount rln relay", error=rlnRelayRes.error
error "failed to mount WakuRlnRelay", error=rlnRelayRes.error
return
node.wakuRlnRelay = rlnRelayRes.get()
let rlnRelay = rlnRelayRes.get()
let validator = generateRlnValidator(rlnRelay, spamHandler)
let pb = PubSub(node.wakuRelay)
pb.addValidator(rlnRelay.pubsubTopic, validator)
node.wakuRlnRelay = rlnRelay
## Waku peer-exchange

View File

@ -2,16 +2,18 @@ import
./waku_rln_relay/rln,
./waku_rln_relay/constants,
./waku_rln_relay/protocol_types,
./waku_rln_relay/group_manager,
./waku_rln_relay/protocol_metrics,
./waku_rln_relay/conversion_utils,
./waku_rln_relay/utils,
./waku_rln_relay/rln_relay,
./waku_rln_relay/contract
export
rln,
constants,
protocol_types,
group_manager,
protocol_metrics,
conversion_utils,
utils,
rln_relay,
contract

View File

@ -4,21 +4,22 @@ else:
{.push raises: [].}
import
std/[sequtils],
std/[sequtils, strutils, algorithm],
web3,
chronicles,
stew/[arrayops, results, endians2],
stint
import
./constants,
./protocol_types
./constants,
./protocol_types
import
../waku_keystore
export
web3,
chronicles,
stint
stint,
constants
logScope:
topics = "waku rln_relay conversion_utils"
@ -35,6 +36,22 @@ proc toMembershipIndex*(v: UInt256): MembershipIndex =
let membershipIndex: MembershipIndex = cast[MembershipIndex](v)
return membershipIndex
proc inHex*(value: IdentityTrapdoor or
IdentityNullifier or
IdentitySecretHash or
IDCommitment or
MerkleNode or
Nullifier or
Epoch or
RlnIdentifier): string =
var valueHex = "" #UInt256.fromBytesLE(value)
for b in value.reversed():
valueHex = valueHex & b.toHex()
# We pad leading zeroes
while valueHex.len < value.len * 2:
valueHex = "0" & valueHex
return toLowerAscii(valueHex)
proc appendLength*(input: openArray[byte]): seq[byte] =
## returns length prefixed version of the input
## with the following format [len<8>|input<var>]
@ -98,7 +115,8 @@ proc serializeIdCommitments*(idComms: seq[IDCommitment]): seq[byte] =
return idCommsBytes
# Converts a sequence of tuples containing 4 string (i.e. identity trapdoor, nullifier, secret hash and commitment) to an IndentityCredential
proc toIdentityCredentials*(groupKeys: seq[(string, string, string, string)]): RlnRelayResult[seq[
type RawMembershipCredentials* = (string, string, string, string)
proc toIdentityCredentials*(groupKeys: seq[RawMembershipCredentials]): RlnRelayResult[seq[
IdentityCredential]] =
## groupKeys is sequence of membership key tuples in the form of (identity key, identity commitment) all in the hexadecimal format
## the toIdentityCredentials proc populates a sequence of IdentityCredentials using the supplied groupKeys
@ -120,27 +138,6 @@ proc toIdentityCredentials*(groupKeys: seq[(string, string, string, string)]): R
return err("could not convert the group key to bytes: " & err.msg)
return ok(groupIdCredentials)
# Converts a sequence of tuples containing 2 string (i.e. identity secret hash and commitment) to an IndentityCredential
proc toIdentityCredentials*(groupKeys: seq[(string, string)]): RlnRelayResult[seq[
IdentityCredential]] =
## groupKeys is sequence of membership key tuples in the form of (identity key, identity commitment) all in the hexadecimal format
## the toIdentityCredentials proc populates a sequence of IdentityCredentials using the supplied groupKeys
## Returns an error if the conversion fails
var groupIdCredentials = newSeq[IdentityCredential]()
for i in 0..groupKeys.len-1:
try:
let
idSecretHash = IdentitySecretHash(@(hexToUint[CredentialByteSize](groupKeys[i][0]).toBytesLE()))
idCommitment = IDCommitment(@(hexToUint[CredentialByteSize](groupKeys[i][1]).toBytesLE()))
groupIdCredentials.add(IdentityCredential(idSecretHash: idSecretHash,
idCommitment: idCommitment))
except ValueError as err:
warn "could not convert the group key to bytes", err = err.msg
return err("could not convert the group key to bytes: " & err.msg)
return ok(groupIdCredentials)
proc toEpoch*(t: uint64): Epoch =
## converts `t` to `Epoch` in little-endian order
let bytes = toBytes(t, Endianness.littleEndian)

View File

@ -1,15 +1,18 @@
import
../protocol_types
../protocol_types,
../rln
import
options,
chronos,
stew/results
stew/results,
std/[deques, sequtils]
export
options,
chronos,
results,
protocol_types
protocol_types,
deques
# This module contains the GroupManager interface
# The GroupManager is responsible for managing the group state
@ -26,42 +29,43 @@ type OnWithdrawCallback* = proc (withdrawals: seq[Membership]): Future[void] {.g
type GroupManagerResult*[T] = Result[T, string]
type
GroupManager*[Config] = ref object of RootObj
GroupManager* = ref object of RootObj
idCredentials*: Option[IdentityCredential]
membershipIndex*: Option[MembershipIndex]
registerCb*: Option[OnRegisterCallback]
withdrawCb*: Option[OnWithdrawCallback]
config*: Config
rlnInstance*: ptr RLN
initialized*: bool
latestIndex*: MembershipIndex
validRoots*: Deque[MerkleNode]
# This proc is used to initialize the group manager
# Any initialization logic should be implemented here
method init*(g: GroupManager): Future[void] {.base,gcsafe.} =
return err("init proc for " & $g.kind & " is not implemented yet")
raise newException(CatchableError, "init proc for " & $g.type & " is not implemented yet")
# This proc is used to start the group sync process
# It should be used to sync the group state with the rest of the group members
method startGroupSync*(g: GroupManager): Future[void] {.base,gcsafe.} =
return err("startGroupSync proc for " & $g.kind & " is not implemented yet")
raise newException(CatchableError, "startGroupSync proc for " & $g.type & " is not implemented yet")
# This proc is used to register a new identity commitment into the merkle tree
# The user may or may not have the identity secret to this commitment
# It should be used when detecting new members in the group, and syncing the group state
method register*(g: GroupManager, idCommitment: IDCommitment): Future[void] {.base,gcsafe.} =
return err("register proc for " & $g.kind & " is not implemented yet")
raise newException(CatchableError, "register proc for " & $g.type & " is not implemented yet")
# This proc is used to register a new identity commitment into the merkle tree
# The user should have the identity secret to this commitment
# It should be used when the user wants to join the group
method register*(g: GroupManager, credentials: IdentityCredential): Future[void] {.base,gcsafe.} =
return err("register proc for " & $g.kind & " is not implemented yet")
raise newException(CatchableError, "register proc for " & $g.type & " is not implemented yet")
# This proc is used to register a batch of new identity commitments into the merkle tree
# The user may or may not have the identity secret to these commitments
# It should be used when detecting a batch of new members in the group, and syncing the group state
method registerBatch*(g: GroupManager, idCommitments: seq[IDCommitment]): Future[void] {.base,gcsafe.} =
return err("registerBatch proc for " & $g.kind & " is not implemented yet")
raise newException(CatchableError, "registerBatch proc for " & $g.type & " is not implemented yet")
# This proc is used to set a callback that will be called when a new identity commitment is registered
# The callback may be called multiple times, and should be used to for any post processing
@ -71,14 +75,73 @@ method onRegister*(g: GroupManager, cb: OnRegisterCallback) {.base,gcsafe.} =
# This proc is used to withdraw/remove an identity commitment from the merkle tree
# The user should have the identity secret hash to this commitment, by either deriving it, or owning it
method withdraw*(g: GroupManager, identitySecretHash: IdentitySecretHash): Future[void] {.base,gcsafe.} =
return err("withdraw proc for " & $g.kind & " is not implemented yet")
raise newException(CatchableError, "withdraw proc for " & $g.type & " is not implemented yet")
# This proc is used to withdraw/remove a batch of identity commitments from the merkle tree
# The user should have the identity secret hash to these commitments, by either deriving them, or owning them
method withdrawBatch*(g: GroupManager, identitySecretHashes: seq[IdentitySecretHash]): Future[void] {.base,gcsafe.} =
return err("withdrawBatch proc for " & $g.kind & " is not implemented yet")
raise newException(CatchableError, "withdrawBatch proc for " & $g.type & " is not implemented yet")
# This proc is used to set a callback that will be called when an identity commitment is withdrawn
# The callback may be called multiple times, and should be used to for any post processing
method onWithdraw*(g: GroupManager, cb: OnWithdrawCallback) {.base,gcsafe.} =
g.withdrawCb = some(cb)
# Acceptable roots for merkle root validation of incoming messages
const AcceptableRootWindowSize* = 5
proc updateValidRootQueue*(rootQueue: var Deque[MerkleNode], root: MerkleNode): void =
## updates the root queue with the latest root and pops the oldest one when the capacity of `AcceptableRootWindowSize` is reached
let overflowCount = rootQueue.len() - AcceptableRootWindowSize
if overflowCount >= 0:
# Delete the oldest `overflowCount` elements in the deque (index 0..`overflowCount`)
for i in 0..overflowCount:
rootQueue.popFirst()
# Push the next root into the queue
rootQueue.addLast(root)
method indexOfRoot*(g: GroupManager, root: MerkleNode): int {.base,gcsafe,raises:[].} =
## returns the index of the root in the merkle tree.
## returns -1 if the root is not found
return g.validRoots.find(root)
method validateRoot*(g: GroupManager, root: MerkleNode): bool {.base,gcsafe,raises:[].} =
## validates the root against the valid roots queue
# Check if the root is in the valid roots queue
if g.indexOfRoot(root) >= 0:
return true
return false
template updateValidRootQueue*(g: GroupManager) =
let rootRes = g.rlnInstance.getMerkleRoot()
if rootRes.isErr():
raise newException(ValueError, "failed to get merkle root")
let rootAfterUpdate = rootRes.get()
updateValidRootQueue(g.validRoots, rootAfterUpdate)
method verifyProof*(g: GroupManager,
input: openArray[byte],
proof: RateLimitProof): GroupManagerResult[bool] {.base,gcsafe,raises:[].} =
## verifies the proof against the input and the current merkle root
let proofVerifyRes = g.rlnInstance.proofVerify(input, proof, g.validRoots.items().toSeq())
if proofVerifyRes.isErr():
return err("proof verification failed: " & $proofVerifyRes.error())
return ok(proofVerifyRes.value())
method generateProof*(g: GroupManager,
data: openArray[byte],
epoch: Epoch): GroupManagerResult[RateLimitProof] {.base,gcsafe,raises:[].} =
## generates a proof for the given data and epoch
## the proof is generated using the current merkle root
if g.idCredentials.isNone():
return err("identity credentials are not set")
if g.membershipIndex.isNone():
return err("membership index is not set")
let proofGenRes = proofGen(rlnInstance = g.rlnInstance,
data = data,
memKeys = g.idCredentials.get(),
memIndex = g.membershipIndex.get(),
epoch = epoch)
if proofGenRes.isErr():
return err("proof generation failed: " & $proofGenRes.error())
return ok(proofGenRes.value())

View File

@ -14,6 +14,7 @@ import
stew/[byteutils, arrayops],
sequtils
import
../../../waku_keystore,
../../rln,
../../conversion_utils,
../group_manager_base
@ -37,23 +38,29 @@ contract(RlnContract):
type
RlnContractWithSender = Sender[RlnContract]
OnchainGroupManagerConfig* = object
OnchainGroupManager* = ref object of GroupManager
ethClientUrl*: string
ethPrivateKey*: Option[string]
ethContractAddress*: string
ethRpc*: Option[Web3]
rlnContract*: Option[RlnContractWithSender]
membershipFee*: Option[Uint256]
membershipIndex*: Option[MembershipIndex]
latestProcessedBlock*: Option[BlockNumber]
registrationTxHash*: Option[TxHash]
chainId*: Option[Quantity]
keystorePath*: Option[string]
keystorePassword*: Option[string]
saveKeystore*: bool
registrationHandler*: Option[RegistrationHandler]
OnchainGroupManager* = ref object of GroupManager[OnchainGroupManagerConfig]
const DefaultKeyStorePath* = "rlnKeystore.json"
const DefaultKeyStorePassword* = "password"
template initializedGuard*(g: OnchainGroupManager): untyped =
if not g.initialized:
raise newException(ValueError, "OnchainGroupManager is not initialized")
proc register*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void] {.async.} =
method register*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void] {.async.} =
initializedGuard(g)
let memberInserted = g.rlnInstance.insertMember(idCommitment)
@ -63,11 +70,13 @@ proc register*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void]
if g.registerCb.isSome():
await g.registerCb.get()(@[Membership(idCommitment: idCommitment, index: g.latestIndex)])
g.updateValidRootQueue()
g.latestIndex += 1
return
proc registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} =
method registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} =
initializedGuard(g)
let membersInserted = g.rlnInstance.insertMembers(g.latestIndex, idCommitments)
@ -83,16 +92,18 @@ proc registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): F
membersSeq.add(member)
await g.registerCb.get()(membersSeq)
g.updateValidRootQueue()
g.latestIndex += MembershipIndex(idCommitments.len())
return
proc register*(g: OnchainGroupManager, identityCredentials: IdentityCredential): Future[void] {.async.} =
method register*(g: OnchainGroupManager, identityCredentials: IdentityCredential): Future[void] {.async.} =
initializedGuard(g)
let ethRpc = g.config.ethRpc.get()
let rlnContract = g.config.rlnContract.get()
let membershipFee = g.config.membershipFee.get()
let ethRpc = g.ethRpc.get()
let rlnContract = g.rlnContract.get()
let membershipFee = g.membershipFee.get()
let gasPrice = int(await ethRpc.provider.eth_gasPrice()) * 2
let idCommitment = identityCredentials.idCommitment.toUInt256()
@ -104,8 +115,11 @@ proc register*(g: OnchainGroupManager, identityCredentials: IdentityCredential):
except ValueError as e:
raise newException(ValueError, "could not register the member: " & e.msg)
# wait for the transaction to be mined
let tsReceipt = await ethRpc.getMinedTransactionReceipt(txHash)
g.registrationTxHash = some(txHash)
# the receipt topic holds the hash of signature of the raised events
# TODO: make this robust. search within the event list for the event
let firstTopic = tsReceipt.logs[0].topics[0]
@ -122,17 +136,17 @@ proc register*(g: OnchainGroupManager, identityCredentials: IdentityCredential):
# In TX log data, uints are encoded in big endian
eventIndex = UInt256.fromBytesBE(argumentsBytes[32..^1])
g.config.membershipIndex = some(eventIndex.toMembershipIndex())
g.membershipIndex = some(eventIndex.toMembershipIndex())
# don't handle member insertion into the tree here, it will be handled by the event listener
return
proc withdraw*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void] {.async.} =
method withdraw*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void] {.async.} =
initializedGuard(g)
# TODO: after slashing is enabled on the contract
proc withdrawBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} =
method withdrawBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} =
initializedGuard(g)
# TODO: after slashing is enabled on the contract
@ -164,8 +178,8 @@ type BlockTable* = OrderedTable[BlockNumber, seq[Membership]]
proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[BlockNumber] = none(BlockNumber)): Future[BlockTable] {.async.} =
initializedGuard(g)
let ethRpc = g.config.ethRpc.get()
let rlnContract = g.config.rlnContract.get()
let ethRpc = g.ethRpc.get()
let rlnContract = g.rlnContract.get()
var normalizedToBlock: BlockNumber
if toBlock.isSome():
@ -214,9 +228,9 @@ proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Fu
let indexGap = startingIndex - latestIndex
if not (toSeq(startingIndex..lastIndex) == members.mapIt(it.index)):
raise newException(ValueError, "membership indices are not sequential")
if indexGap != 1.uint and lastIndex != latestIndex:
if indexGap != 1.uint and lastIndex != latestIndex and startingIndex != 0.uint:
warn "membership index gap, may have lost connection", lastIndex, currIndex=latestIndex, indexGap = indexGap
g.config.latestProcessedBlock = some(blockNumber)
g.latestProcessedBlock = some(blockNumber)
return
@ -244,7 +258,7 @@ proc newHeadErrCallback(error: CatchableError) =
proc startListeningToEvents*(g: OnchainGroupManager): Future[void] {.async.} =
initializedGuard(g)
let ethRpc = g.config.ethRpc.get()
let ethRpc = g.ethRpc.get()
let newHeadCallback = g.getNewHeadCallback()
try:
discard await ethRpc.subscribeForBlockHeaders(newHeadCallback, newHeadErrCallback)
@ -265,7 +279,45 @@ proc startOnchainSync*(g: OnchainGroupManager, fromBlock: BlockNumber = BlockNum
except:
raise newException(ValueError, "failed to start listening to events: " & getCurrentExceptionMsg())
proc startGroupSync*(g: OnchainGroupManager): Future[void] {.async.} =
proc persistCredentials*(g: OnchainGroupManager): GroupManagerResult[void] =
if not g.saveKeystore:
return ok()
if g.idCredentials.isNone():
return err("no credentials to persist")
let index = g.membershipIndex.get()
let idCredential = g.idCredentials.get()
var path = DefaultKeystorePath
var password = DefaultKeystorePassword
if g.keystorePath.isSome():
path = g.keystorePath.get()
else:
warn "keystore: no credentials path set, using default path", path=DefaultKeystorePath
if g.keystorePassword.isSome():
password = g.keystorePassword.get()
else:
warn "keystore: no credentials password set, using default password", password=DefaultKeystorePassword
let keystoreCred = MembershipCredentials(
identityCredential: idCredential,
membershipGroups: @[MembershipGroup(
membershipContract: MembershipContract(
chainId: $g.chainId.get(),
address: g.ethContractAddress
),
treeIndex: index
)]
)
let persistRes = addMembershipCredentials(path, @[keystoreCred], password, RLNAppInfo)
if persistRes.isErr():
error "keystore: failed to persist credentials", error=persistRes.error()
return ok()
method startGroupSync*(g: OnchainGroupManager): Future[void] {.async.} =
initializedGuard(g)
# Get archive history
try:
@ -273,28 +325,46 @@ proc startGroupSync*(g: OnchainGroupManager): Future[void] {.async.} =
except:
raise newException(ValueError, "failed to start onchain sync service: " & getCurrentExceptionMsg())
if g.config.ethPrivateKey.isSome() and g.idCredentials.isSome():
if g.ethPrivateKey.isSome() and g.idCredentials.isNone():
let idCredentialRes = g.rlnInstance.membershipKeyGen()
if idCredentialRes.isErr():
raise newException(CatchableError, "Identity credential generation failed")
let idCredential = idCredentialRes.get()
g.idCredentials = some(idCredential)
debug "registering commitment on contract"
await g.register(g.idCredentials.get())
await g.register(idCredential)
if g.registrationHandler.isSome():
# We need to callback with the tx hash
let handler = g.registrationHandler.get()
handler($g.registrationTxHash.get())
let persistRes = g.persistCredentials()
if persistRes.isErr():
error "failed to persist credentials", error=persistRes.error()
return
proc onRegister*(g: OnchainGroupManager, cb: OnRegisterCallback) {.gcsafe.} =
method onRegister*(g: OnchainGroupManager, cb: OnRegisterCallback) {.gcsafe.} =
g.registerCb = some(cb)
proc onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
g.withdrawCb = some(cb)
proc init*(g: OnchainGroupManager): Future[void] {.async.} =
method init*(g: OnchainGroupManager): Future[void] {.async.} =
var ethRpc: Web3
var contract: RlnContractWithSender
# check if the Ethereum client is reachable
try:
ethRpc = await newWeb3(g.config.ethClientUrl)
ethRpc = await newWeb3(g.ethClientUrl)
except:
raise newException(ValueError, "could not connect to the Ethereum client")
let contractAddress = web3.fromHex(web3.Address, g.config.ethContractAddress)
# Set the chain id
let chainId = await ethRpc.provider.eth_chainId()
g.chainId = some(chainId)
let contractAddress = web3.fromHex(web3.Address, g.ethContractAddress)
contract = ethRpc.contractSender(RlnContract, contractAddress)
# check if the contract exists by calling a static function
@ -304,25 +374,39 @@ proc init*(g: OnchainGroupManager): Future[void] {.async.} =
except:
raise newException(ValueError, "could not get the membership deposit")
if g.config.ethPrivateKey.isSome():
let pk = string(g.config.ethPrivateKey.get())
if g.ethPrivateKey.isSome():
let pk = g.ethPrivateKey.get()
let pkParseRes = keys.PrivateKey.fromHex(pk)
if pkParseRes.isErr():
raise newException(ValueError, "could not parse the private key")
ethRpc.privateKey = some(pkParseRes.get())
g.config.ethRpc = some(ethRpc)
g.config.rlnContract = some(contract)
g.config.membershipFee = some(membershipFee)
g.ethRpc = some(ethRpc)
g.rlnContract = some(contract)
g.membershipFee = some(membershipFee)
if g.keystorePath.isSome() and g.keystorePassword.isSome():
let parsedCredsRes = getMembershipCredentials(path = g.keystorePath.get(),
password = g.keystorePassword.get(),
filterMembershipContracts = @[MembershipContract(chainId: $chainId,
address: g.ethContractAddress)],
appInfo = RLNAppInfo)
if parsedCredsRes.isErr():
raise newException(ValueError, "could not parse the keystore: " & $parsedCredsRes.error())
let parsedCreds = parsedCredsRes.get()
if parsedCreds.len == 0:
raise newException(ValueError, "keystore is empty")
# TODO: accept an index from the config (related: https://github.com/waku-org/nwaku/pull/1466)
g.idCredentials = some(parsedCreds[0].identityCredential)
g.membershipIndex = some(parsedCreds[0].membershipGroups[0].treeIndex)
ethRpc.ondisconnect = proc() =
error "Ethereum client disconnected"
let fromBlock = g.config.latestProcessedBlock.get()
let fromBlock = g.latestProcessedBlock.get()
info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock
try:
asyncSpawn g.startOnchainSync(fromBlock)
except:
error "failed to restart group sync"
error "failed to restart group sync", error = getCurrentExceptionMsg()
g.initialized = true

View File

@ -7,22 +7,22 @@ export
group_manager_base
type
StaticGroupManagerConfig* = object
StaticGroupManager* = ref object of GroupManager
groupKeys*: seq[IdentityCredential]
groupSize*: uint
membershipIndex*: MembershipIndex
StaticGroupManager* = ref object of GroupManager[StaticGroupManagerConfig]
template initializedGuard*(g: StaticGroupManager): untyped =
if not g.initialized:
raise newException(ValueError, "StaticGroupManager is not initialized")
proc init*(g: StaticGroupManager): Future[void] {.async,gcsafe.} =
method init*(g: StaticGroupManager): Future[void] {.async,gcsafe.} =
if g.membershipIndex.isNone():
raise newException(ValueError, "Membership index is not set")
let
groupSize = g.config.groupSize
groupKeys = g.config.groupKeys
membershipIndex = g.config.membershipIndex
groupSize = g.groupSize
groupKeys = g.groupKeys
membershipIndex = g.membershipIndex.get()
if membershipIndex < MembershipIndex(0) or membershipIndex >= MembershipIndex(groupSize):
raise newException(ValueError, "Invalid membership index. Must be within 0 and " & $(groupSize - 1) & "but was " & $membershipIndex)
@ -34,33 +34,37 @@ proc init*(g: StaticGroupManager): Future[void] {.async,gcsafe.} =
if not membersInserted:
raise newException(ValueError, "Failed to insert members into the merkle tree")
g.updateValidRootQueue()
g.latestIndex += MembershipIndex(idCommitments.len() - 1)
g.initialized = true
return
proc startGroupSync*(g: StaticGroupManager): Future[void] =
method startGroupSync*(g: StaticGroupManager): Future[void] =
initializedGuard(g)
var retFuture = newFuture[void]("StaticGroupManager.sta rtGroupSync")
var retFuture = newFuture[void]("StaticGroupManager.startGroupSync")
# No-op
retFuture.complete()
return retFuture
proc register*(g: StaticGroupManager, idCommitment: IDCommitment): Future[void] {.async.} =
method register*(g: StaticGroupManager, idCommitment: IDCommitment): Future[void] {.async.} =
initializedGuard(g)
let memberInserted = g.rlnInstance.insertMember(idCommitment)
if not memberInserted:
raise newException(ValueError, "Failed to insert member into the merkle tree")
g.updateValidRootQueue()
g.latestIndex += 1
if g.registerCb.isSome():
await g.registerCb.get()(@[Membership(idCommitment: idCommitment, index: g.latestIndex)])
return
proc registerBatch*(g: StaticGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} =
method registerBatch*(g: StaticGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} =
initializedGuard(g)
let membersInserted = g.rlnInstance.insertMembers(g.latestIndex + 1, idCommitments)
@ -73,14 +77,16 @@ proc registerBatch*(g: StaticGroupManager, idCommitments: seq[IDCommitment]): Fu
memberSeq.add(Membership(idCommitment: idCommitments[i], index: g.latestIndex + MembershipIndex(i)))
await g.registerCb.get()(memberSeq)
g.updateValidRootQueue()
g.latestIndex += MembershipIndex(idCommitments.len() - 1)
return
proc withdraw*(g: StaticGroupManager, idSecretHash: IdentitySecretHash): Future[void] {.async.} =
method withdraw*(g: StaticGroupManager, idSecretHash: IdentitySecretHash): Future[void] {.async.} =
initializedGuard(g)
let groupKeys = g.config.groupKeys
let groupKeys = g.groupKeys
for i in 0..<groupKeys.len():
if groupKeys[i].idSecretHash == idSecretHash:
@ -96,15 +102,15 @@ proc withdraw*(g: StaticGroupManager, idSecretHash: IdentitySecretHash): Future[
return
proc withdrawBatch*(g: StaticGroupManager, idSecretHashes: seq[IdentitySecretHash]): Future[void] {.async.} =
method withdrawBatch*(g: StaticGroupManager, idSecretHashes: seq[IdentitySecretHash]): Future[void] {.async.} =
initializedGuard(g)
# call withdraw on each idSecretHash
for idSecretHash in idSecretHashes:
await g.withdraw(idSecretHash)
proc onRegister*(g: StaticGroupManager, cb: OnRegisterCallback) {.gcsafe.} =
method onRegister*(g: StaticGroupManager, cb: OnRegisterCallback) {.gcsafe.} =
g.registerCb = some(cb)
proc onWithdraw*(g: StaticGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
method onWithdraw*(g: StaticGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
g.withdrawCb = some(cb)

View File

@ -6,13 +6,18 @@ else:
import
std/[options, tables, deques],
stew/arrayops,
chronos,
chronos,
web3,
eth/keys
import
../waku_message,
../waku_keystore,
../../../common/protobuf
export
waku_keystore,
waku_message
type RlnRelayResult*[T] = Result[T, string]
## RLN is a Nim wrapper for the data types used in zerokit RLN
@ -52,39 +57,14 @@ type ProofMetadata* = object
shareX*: MerkleNode
shareY*: MerkleNode
type WakuRLNRelay* = ref object
identityCredential*: IdentityCredential
# membershipIndex denotes the index of a leaf in the Merkle tree
# that contains the pk of the current peer
# this index is used to retrieve the peer's authentication path
membershipIndex*: MembershipIndex
membershipContractAddress*: Address
ethClientAddress*: string
ethAccountAddress*: Option[Address]
# this field is required for signing transactions
# TODO may need to erase this ethAccountPrivateKey when is not used
# TODO may need to make ethAccountPrivateKey mandatory
ethAccountPrivateKey*: Option[PrivateKey]
rlnInstance*: ptr RLN
pubsubTopic*: string # the pubsub topic for which rln relay is mounted
# contentTopic should be of type waku_message.ContentTopic, however, due to recursive module dependency, the underlying type of ContentTopic is used instead
# TODO a long-term solution is to place types with recursive dependency inside one file
contentTopic*: string
# the log of nullifiers and Shamir shares of the past messages grouped per epoch
nullifierLog*: Table[Epoch, seq[ProofMetadata]]
lastEpoch*: Epoch # the epoch of the last published rln message
validMerkleRoots*: Deque[MerkleNode] # An array of valid merkle roots, which are updated in a FIFO fashion
lastSeenMembershipIndex*: MembershipIndex # the last seen membership index
lastProcessedBlock*: BlockNumber # the last processed block number
type
type
MessageValidationResult* {.pure.} = enum
Valid,
Invalid,
Valid,
Invalid,
Spam
MerkleNodeResult* = RlnRelayResult[MerkleNode]
RateLimitProofResult* = RlnRelayResult[RateLimitProof]
# Protobufs enc and init
proc init*(T: type RateLimitProof, buffer: seq[byte]): ProtoResult[T] =
var nsp: RateLimitProof
@ -113,7 +93,7 @@ proc init*(T: type RateLimitProof, buffer: seq[byte]): ProtoResult[T] =
var nullifier: seq[byte]
discard ? pb.getField(6, nullifier)
discard nsp.nullifier.copyFrom(nullifier)
var rlnIdentifier: seq[byte]
discard ? pb.getField(7, rlnIdentifier)
discard nsp.rlnIdentifier.copyFrom(rlnIdentifier)
@ -133,4 +113,8 @@ proc encode*(nsp: RateLimitProof): ProtoBuffer =
output.finish3()
return output
return output
type
SpamHandler* = proc(wakuMessage: WakuMessage): void {.gcsafe, closure, raises: [Defect].}
RegistrationHandler* = proc(txHash: string): void {.gcsafe, closure, raises: [Defect].}

View File

@ -0,0 +1,415 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[algorithm, sequtils, strutils, tables, times, os, deques],
chronicles, options, chronos, stint,
confutils,
strutils,
web3, json,
web3/ethtypes,
eth/keys,
libp2p/protocols/pubsub/rpc/messages,
libp2p/protocols/pubsub/pubsub,
stew/results,
stew/[byteutils, arrayops]
import
./group_manager,
./rln,
./conversion_utils,
./constants,
./protocol_types,
./protocol_metrics
import
../../utils/time,
../waku_keystore
logScope:
topics = "waku rln_relay"
type WakuRlnConfig* = object
rlnRelayDynamic*: bool
rlnRelayPubsubTopic*: PubsubTopic
rlnRelayContentTopic*: ContentTopic
rlnRelayMembershipIndex*: Option[uint]
rlnRelayEthContractAddress*: string
rlnRelayEthClientAddress*: string
rlnRelayEthAccountPrivateKey*: string
rlnRelayEthAccountAddress*: string
rlnRelayCredPath*: string
rlnRelayCredentialsPassword*: string
proc createMembershipList*(rln: ptr RLN, n: int): RlnRelayResult[(
seq[RawMembershipCredentials], string
)] =
## createMembershipList produces a sequence of identity credentials in the form of (identity trapdoor, identity nullifier, identity secret hash, id commitment) in the hexadecimal format
## this proc also returns the root of a Merkle tree constructed out of the identity commitment keys of the generated list
## the output of this proc is used to initialize a static group keys (to test waku-rln-relay in the off-chain mode)
## Returns an error if it cannot create the membership list
var output = newSeq[RawMembershipCredentials]()
var idCommitments = newSeq[IDCommitment]()
for i in 0..n-1:
# generate an identity credential
let idCredentialRes = rln.membershipKeyGen()
if idCredentialRes.isErr():
return err("could not generate an identity credential: " & idCredentialRes.error())
let idCredential = idCredentialRes.get()
let idTuple = (idCredential.idTrapdoor.inHex(), idCredential.idNullifier.inHex(), idCredential.idSecretHash.inHex(), idCredential.idCommitment.inHex())
output.add(idTuple)
idCommitments.add(idCredential.idCommitment)
# Insert members into tree
let membersAdded = rln.insertMembers(0, idCommitments)
if not membersAdded:
return err("could not insert members into the tree")
let root = rln.getMerkleRoot().value().inHex()
return ok((output, root))
proc calcEpoch*(t: float64): Epoch =
## gets time `t` as `flaot64` with subseconds resolution in the fractional part
## and returns its corresponding rln `Epoch` value
let e = uint64(t/EpochUnitSeconds)
return toEpoch(e)
type WakuRLNRelay* = ref object of RootObj
pubsubTopic*: string # the pubsub topic for which rln relay is mounted
# contentTopic should be of type waku_message.ContentTopic, however, due to recursive module dependency, the underlying type of ContentTopic is used instead
# TODO a long-term solution is to place types with recursive dependency inside one file
contentTopic*: string
# the log of nullifiers and Shamir shares of the past messages grouped per epoch
nullifierLog*: Table[Epoch, seq[ProofMetadata]]
lastEpoch*: Epoch # the epoch of the last published rln message
groupManager*: GroupManager
proc hasDuplicate*(rlnPeer: WakuRLNRelay, msg: WakuMessage): RlnRelayResult[bool] =
## returns true if there is another message in the `nullifierLog` of the `rlnPeer` with the same
## epoch and nullifier as `msg`'s epoch and nullifier but different Shamir secret shares
## otherwise, returns false
## Returns an error if it cannot check for duplicates
let decodeRes = RateLimitProof.init(msg.proof)
if decodeRes.isErr():
return err("failed to decode the RLN proof")
let proof = decodeRes.get()
# extract the proof metadata of the supplied `msg`
let proofMD = ProofMetadata(
nullifier: proof.nullifier,
shareX: proof.shareX,
shareY: proof.shareY
)
# check if the epoch exists
if not rlnPeer.nullifierLog.hasKey(proof.epoch):
return ok(false)
try:
if rlnPeer.nullifierLog[proof.epoch].contains(proofMD):
# there is an identical record, ignore rhe mag
return ok(false)
# check for a message with the same nullifier but different secret shares
let matched = rlnPeer.nullifierLog[proof.epoch].filterIt((
it.nullifier == proofMD.nullifier) and ((it.shareX != proofMD.shareX) or
(it.shareY != proofMD.shareY)))
if matched.len != 0:
# there is a duplicate
return ok(true)
# there is no duplicate
return ok(false)
except KeyError as e:
return err("the epoch was not found")
proc updateLog*(rlnPeer: WakuRLNRelay, msg: WakuMessage): RlnRelayResult[bool] =
## extracts the `ProofMetadata` of the supplied messages `msg` and
## saves it in the `nullifierLog` of the `rlnPeer`
## Returns an error if it cannot update the log
let decodeRes = RateLimitProof.init(msg.proof)
if decodeRes.isErr():
return err("failed to decode the RLN proof")
let proof = decodeRes.get()
# extract the proof metadata of the supplied `msg`
let proofMD = ProofMetadata(
nullifier: proof.nullifier,
shareX: proof.shareX,
shareY: proof.shareY
)
debug "proof metadata", proofMD = proofMD
# check if the epoch exists
if not rlnPeer.nullifierLog.hasKey(proof.epoch):
rlnPeer.nullifierLog[proof.epoch] = @[proofMD]
return ok(true)
try:
# check if an identical record exists
if rlnPeer.nullifierLog[proof.epoch].contains(proofMD):
return ok(true)
# add proofMD to the log
rlnPeer.nullifierLog[proof.epoch].add(proofMD)
return ok(true)
except KeyError as e:
return err("the epoch was not found")
proc getCurrentEpoch*(): Epoch =
## gets the current rln Epoch time
return calcEpoch(epochTime())
proc absDiff*(e1, e2: Epoch): uint64 =
## returns the absolute difference between the two rln `Epoch`s `e1` and `e2`
## i.e., e1 - e2
# convert epochs to their corresponding unsigned numerical values
let
epoch1 = fromEpoch(e1)
epoch2 = fromEpoch(e2)
# Manually perform an `abs` calculation
if epoch1 > epoch2:
return epoch1 - epoch2
else:
return epoch2 - epoch1
proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage,
timeOption: Option[float64] = none(float64)): MessageValidationResult =
## validate the supplied `msg` based on the waku-rln-relay routing protocol i.e.,
## the `msg`'s epoch is within MaxEpochGap of the current epoch
## the `msg` has valid rate limit proof
## the `msg` does not violate the rate limit
## `timeOption` indicates Unix epoch time (fractional part holds sub-seconds)
## if `timeOption` is supplied, then the current epoch is calculated based on that
let decodeRes = RateLimitProof.init(msg.proof)
if decodeRes.isErr():
return MessageValidationResult.Invalid
let proof = decodeRes.get()
# track message count for metrics
waku_rln_messages_total.inc()
# checks if the `msg`'s epoch is far from the current epoch
# it corresponds to the validation of rln external nullifier
var epoch: Epoch
if timeOption.isSome():
epoch = calcEpoch(timeOption.get())
else:
# get current rln epoch
epoch = getCurrentEpoch()
debug "current epoch", currentEpoch = fromEpoch(epoch)
let
msgEpoch = proof.epoch
# calculate the gaps
gap = absDiff(epoch, msgEpoch)
debug "message epoch", msgEpoch = fromEpoch(msgEpoch)
# validate the epoch
if gap > MaxEpochGap:
# message's epoch is too old or too ahead
# accept messages whose epoch is within +-MaxEpochGap from the current epoch
warn "invalid message: epoch gap exceeds a threshold", gap = gap,
payload = string.fromBytes(msg.payload), msgEpoch = fromEpoch(proof.epoch)
waku_rln_invalid_messages_total.inc(labelValues=["invalid_epoch"])
return MessageValidationResult.Invalid
let rootValidationRes = rlnPeer.groupManager.validateRoot(proof.merkleRoot)
if not rootValidationRes:
debug "invalid message: provided root does not belong to acceptable window of roots", provided=proof.merkleRoot, validRoots=rlnPeer.groupManager.validRoots.mapIt(it.inHex())
waku_rln_invalid_messages_total.inc(labelValues=["invalid_root"])
return MessageValidationResult.Invalid
# verify the proof
let
contentTopicBytes = msg.contentTopic.toBytes
input = concat(msg.payload, contentTopicBytes)
waku_rln_proof_verification_total.inc()
waku_rln_proof_verification_duration_seconds.nanosecondTime:
let proofVerificationRes = rlnPeer.groupManager.verifyProof(input, proof)
if proofVerificationRes.isErr():
waku_rln_errors_total.inc(labelValues=["proof_verification"])
warn "invalid message: proof verification failed", payload = string.fromBytes(msg.payload)
return MessageValidationResult.Invalid
if not proofVerificationRes.value():
# invalid proof
debug "invalid message: invalid proof", payload = string.fromBytes(msg.payload)
waku_rln_invalid_messages_total.inc(labelValues=["invalid_proof"])
return MessageValidationResult.Invalid
# check if double messaging has happened
let hasDup = rlnPeer.hasDuplicate(msg)
if hasDup.isErr():
waku_rln_errors_total.inc(labelValues=["duplicate_check"])
elif hasDup.value == true:
debug "invalid message: message is spam", payload = string.fromBytes(msg.payload)
waku_rln_spam_messages_total.inc()
return MessageValidationResult.Spam
# insert the message to the log
# the result of `updateLog` is discarded because message insertion is guaranteed by the implementation i.e.,
# it will never error out
discard rlnPeer.updateLog(msg)
debug "message is valid", payload = string.fromBytes(msg.payload)
let rootIndex = rlnPeer.groupManager.indexOfRoot(proof.merkleRoot)
waku_rln_valid_messages_total.observe(rootIndex.toFloat())
return MessageValidationResult.Valid
proc toRLNSignal*(wakumessage: WakuMessage): seq[byte] =
## it is a utility proc that prepares the `data` parameter of the proof generation procedure i.e., `proofGen` that resides in the current module
## it extracts the `contentTopic` and the `payload` of the supplied `wakumessage` and serializes them into a byte sequence
let
contentTopicBytes = wakumessage.contentTopic.toBytes()
output = concat(wakumessage.payload, contentTopicBytes)
return output
proc appendRLNProof*(rlnPeer: WakuRLNRelay,
msg: var WakuMessage,
senderEpochTime: float64): bool =
## returns true if it can create and append a `RateLimitProof` to the supplied `msg`
## returns false otherwise
## `senderEpochTime` indicates the number of seconds passed since Unix epoch. The fractional part holds sub-seconds.
## The `epoch` field of `RateLimitProof` is derived from the provided `senderEpochTime` (using `calcEpoch()`)
let input = msg.toRLNSignal()
let epoch = calcEpoch(senderEpochTime)
let proofGenRes = rlnPeer.groupManager.generateProof(input, epoch)
if proofGenRes.isErr():
return false
msg.proof = proofGenRes.get().encode().buffer
return true
proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay,
spamHandler: Option[SpamHandler] = none(SpamHandler)): pubsub.ValidatorHandler =
## this procedure is a thin wrapper for the pubsub addValidator method
## it sets a validator for the waku messages published on the supplied pubsubTopic and contentTopic
## if contentTopic is empty, then validation takes place for All the messages published on the given pubsubTopic
## the message validation logic is according to https://rfc.vac.dev/spec/17/
let contentTopic = wakuRlnRelay.contentTopic
proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} =
trace "rln-relay topic validator is called"
let decodeRes = WakuMessage.decode(message.data)
if decodeRes.isOk():
let
wakumessage = decodeRes.value
payload = string.fromBytes(wakumessage.payload)
# check the contentTopic
if (wakumessage.contentTopic != "") and (contentTopic != "") and (wakumessage.contentTopic != contentTopic):
trace "content topic did not match:", contentTopic=wakumessage.contentTopic, payload=payload
return pubsub.ValidationResult.Accept
let decodeRes = RateLimitProof.init(wakumessage.proof)
if decodeRes.isErr():
return pubsub.ValidationResult.Reject
let msgProof = decodeRes.get()
# validate the message
let
validationRes = wakuRlnRelay.validateMessage(wakumessage)
proof = toHex(msgProof.proof)
epoch = fromEpoch(msgProof.epoch)
root = inHex(msgProof.merkleRoot)
shareX = inHex(msgProof.shareX)
shareY = inHex(msgProof.shareY)
nullifier = inHex(msgProof.nullifier)
case validationRes:
of Valid:
debug "message validity is verified, relaying:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload
trace "message validity is verified, relaying:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
return pubsub.ValidationResult.Accept
of Invalid:
debug "message validity could not be verified, discarding:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload
trace "message validity could not be verified, discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
return pubsub.ValidationResult.Reject
of Spam:
debug "A spam message is found! yay! discarding:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload
trace "A spam message is found! yay! discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
if spamHandler.isSome():
let handler = spamHandler.get()
handler(wakumessage)
return pubsub.ValidationResult.Reject
return validator
proc mount(conf: WakuRlnConfig,
registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)
): Future[WakuRlnRelay] {.async.} =
var
groupManager: GroupManager
credentials: MembershipCredentials
persistCredentials = false
# create an RLN instance
let rlnInstanceRes = createRLNInstance()
if rlnInstanceRes.isErr():
raise newException(CatchableError, "RLN instance creation failed")
let rlnInstance = rlnInstanceRes.get()
if not conf.rlnRelayDynamic:
# static setup
let parsedGroupKeysRes = StaticGroupKeys.toIdentityCredentials()
if parsedGroupKeysRes.isErr():
raise newException(ValueError, "Static group keys are not valid")
groupManager = StaticGroupManager(groupSize: StaticGroupSize,
groupKeys: parsedGroupKeysRes.get(),
membershipIndex: conf.rlnRelayMembershipIndex,
rlnInstance: rlnInstance)
# we don't persist credentials in static mode since they exist in ./constants.nim
else:
# dynamic setup
proc useValueOrNone(s: string): Option[string] =
if s == "": none(string) else: some(s)
let
ethPrivateKey = useValueOrNone(conf.rlnRelayEthAccountPrivateKey)
rlnRelayCredPath = useValueOrNone(conf.rlnRelayCredPath)
rlnRelayCredentialsPassword = useValueOrNone(conf.rlnRelayCredentialsPassword)
groupManager = OnchainGroupManager(ethClientUrl: conf.rlnRelayEthClientAddress,
ethContractAddress: $conf.rlnRelayEthContractAddress,
ethPrivateKey: ethPrivateKey,
rlnInstance: rlnInstance,
registrationHandler: registrationHandler,
keystorePath: rlnRelayCredPath,
keystorePassword: rlnRelayCredentialsPassword,
saveKeystore: true)
# Initialize the groupManager
await groupManager.init()
# Start the group sync
await groupManager.startGroupSync()
return WakuRLNRelay(pubsubTopic: conf.rlnRelayPubsubTopic,
contentTopic: conf.rlnRelayContentTopic,
groupManager: groupManager)
proc new*(T: type WakuRlnRelay,
conf: WakuRlnConfig,
registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)
): Future[RlnRelayResult[WakuRlnRelay]] {.async.} =
## Mounts the rln-relay protocol on the node.
## The rln-relay protocol can be mounted in two modes: on-chain and off-chain.
## Returns an error if the rln-relay protocol could not be mounted.
debug "rln-relay input validation passed"
try:
waku_rln_relay_mounting_duration_seconds.nanosecondTime:
let rlnRelay = await mount(conf,
registrationHandler)
return ok(rlnRelay)
except CatchableError as e:
return err(e.msg)

File diff suppressed because it is too large Load Diff