mirror of https://github.com/waku-org/nwaku.git
Chore(rln-relay): decouples rln-relay and wakunode2 modules (#1019)
* adds wakunode2_types * removes unused imports
This commit is contained in:
parent
9be64a55b6
commit
3e20127933
|
@ -3,11 +3,11 @@
|
||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/options, sequtils, times,
|
std/options,
|
||||||
testutils/unittests, chronos, chronicles, stint, web3, json,
|
testutils/unittests, chronos, chronicles, stint, web3, json,
|
||||||
stew/byteutils, stew/shims/net as stewNet,
|
stew/byteutils, stew/shims/net as stewNet,
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
../../waku/v2/protocol/waku_rln_relay/[rln, waku_rln_relay_utils,
|
../../waku/v2/protocol/waku_rln_relay/[waku_rln_relay_utils,
|
||||||
waku_rln_relay_types, rln_relay_contract],
|
waku_rln_relay_types, rln_relay_contract],
|
||||||
../../waku/v2/node/wakunode2,
|
../../waku/v2/node/wakunode2,
|
||||||
../test_helpers,
|
../test_helpers,
|
||||||
|
|
|
@ -24,7 +24,8 @@ import
|
||||||
./storage/migration/migration_types,
|
./storage/migration/migration_types,
|
||||||
./peer_manager/peer_manager,
|
./peer_manager/peer_manager,
|
||||||
./dnsdisc/waku_dnsdisc,
|
./dnsdisc/waku_dnsdisc,
|
||||||
./discv5/waku_discv5
|
./discv5/waku_discv5,
|
||||||
|
wakunode2_types
|
||||||
|
|
||||||
export
|
export
|
||||||
builders,
|
builders,
|
||||||
|
@ -33,14 +34,11 @@ export
|
||||||
waku_swap,
|
waku_swap,
|
||||||
waku_filter,
|
waku_filter,
|
||||||
waku_lightpush,
|
waku_lightpush,
|
||||||
waku_rln_relay_types
|
waku_rln_relay_types,
|
||||||
|
wakunode2_types
|
||||||
|
|
||||||
when defined(rln):
|
when defined(rln):
|
||||||
import
|
import ../protocol/waku_rln_relay/waku_rln_relay_utils
|
||||||
libp2p/protocols/pubsub/rpc/messages,
|
|
||||||
libp2p/protocols/pubsub/pubsub,
|
|
||||||
web3, web3/ethtypes,
|
|
||||||
../protocol/waku_rln_relay/[rln, waku_rln_relay_utils]
|
|
||||||
|
|
||||||
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
|
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
|
||||||
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
|
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
|
||||||
|
@ -59,41 +57,6 @@ const defaultTopic* = "/waku/2/default-waku/proto"
|
||||||
# Default Waku Filter Timeout
|
# Default Waku Filter Timeout
|
||||||
const WakuFilterTimeout: Duration = 1.days
|
const WakuFilterTimeout: Duration = 1.days
|
||||||
|
|
||||||
|
|
||||||
# key and crypto modules different
|
|
||||||
type
|
|
||||||
KeyPair* = crypto.KeyPair
|
|
||||||
PublicKey* = crypto.PublicKey
|
|
||||||
PrivateKey* = crypto.PrivateKey
|
|
||||||
|
|
||||||
# XXX: Weird type, should probably be using pubsub Topic object name?
|
|
||||||
Topic* = string
|
|
||||||
Message* = seq[byte]
|
|
||||||
|
|
||||||
WakuInfo* = object
|
|
||||||
# NOTE One for simplicity, can extend later as needed
|
|
||||||
listenAddresses*: seq[string]
|
|
||||||
enrUri*: string
|
|
||||||
#multiaddrStrings*: seq[string]
|
|
||||||
|
|
||||||
# NOTE based on Eth2Node in NBC eth2_network.nim
|
|
||||||
WakuNode* = ref object of RootObj
|
|
||||||
peerManager*: PeerManager
|
|
||||||
switch*: Switch
|
|
||||||
wakuRelay*: WakuRelay
|
|
||||||
wakuStore*: WakuStore
|
|
||||||
wakuFilter*: WakuFilter
|
|
||||||
wakuSwap*: WakuSwap
|
|
||||||
wakuRlnRelay*: WakuRLNRelay
|
|
||||||
wakuLightPush*: WakuLightPush
|
|
||||||
enr*: enr.Record
|
|
||||||
libp2pPing*: Ping
|
|
||||||
filters*: Filters
|
|
||||||
rng*: ref BrHmacDrbgContext
|
|
||||||
wakuDiscv5*: WakuDiscoveryV5
|
|
||||||
announcedAddresses* : seq[MultiAddress]
|
|
||||||
started*: bool # Indicates that node has started listening
|
|
||||||
|
|
||||||
proc protocolMatcher(codec: string): Matcher =
|
proc protocolMatcher(codec: string): Matcher =
|
||||||
## Returns a protocol matcher function for the provided codec
|
## Returns a protocol matcher function for the provided codec
|
||||||
proc match(proto: string): bool {.gcsafe.} =
|
proc match(proto: string): bool {.gcsafe.} =
|
||||||
|
@ -496,180 +459,6 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: boo
|
||||||
|
|
||||||
node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec))
|
node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec))
|
||||||
|
|
||||||
when defined(rln):
|
|
||||||
proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string, contentTopic: ContentTopic, spamHandler: Option[SpamHandler] = none(SpamHandler)) =
|
|
||||||
## this procedure is a thin wrapper for the pubsub addValidator method
|
|
||||||
## it sets a validator for the waku messages published on the supplied pubsubTopic and contentTopic
|
|
||||||
## if contentTopic is empty, then validation takes place for All the messages published on the given pubsubTopic
|
|
||||||
## the message validation logic is according to https://rfc.vac.dev/spec/17/
|
|
||||||
proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} =
|
|
||||||
trace "rln-relay topic validator is called"
|
|
||||||
let msg = WakuMessage.init(message.data)
|
|
||||||
if msg.isOk():
|
|
||||||
let
|
|
||||||
wakumessage = msg.value()
|
|
||||||
payload = string.fromBytes(wakumessage.payload)
|
|
||||||
|
|
||||||
# check the contentTopic
|
|
||||||
if (wakumessage.contentTopic != "") and (contentTopic != "") and (wakumessage.contentTopic != contentTopic):
|
|
||||||
trace "content topic did not match:", contentTopic=wakumessage.contentTopic, payload=payload
|
|
||||||
return pubsub.ValidationResult.Accept
|
|
||||||
|
|
||||||
# validate the message
|
|
||||||
let
|
|
||||||
validationRes = node.wakuRlnRelay.validateMessage(wakumessage)
|
|
||||||
proof = toHex(wakumessage.proof.proof)
|
|
||||||
epoch = fromEpoch(wakumessage.proof.epoch)
|
|
||||||
root = toHex(wakumessage.proof.merkleRoot)
|
|
||||||
shareX = toHex(wakumessage.proof.shareX)
|
|
||||||
shareY = toHex(wakumessage.proof.shareY)
|
|
||||||
nullifier = toHex(wakumessage.proof.nullifier)
|
|
||||||
case validationRes:
|
|
||||||
of Valid:
|
|
||||||
debug "message validity is verified, relaying:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload
|
|
||||||
trace "message validity is verified, relaying:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
|
|
||||||
return pubsub.ValidationResult.Accept
|
|
||||||
of Invalid:
|
|
||||||
debug "message validity could not be verified, discarding:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload
|
|
||||||
trace "message validity could not be verified, discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
|
|
||||||
return pubsub.ValidationResult.Reject
|
|
||||||
of Spam:
|
|
||||||
debug "A spam message is found! yay! discarding:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload
|
|
||||||
trace "A spam message is found! yay! discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
|
|
||||||
if spamHandler.isSome:
|
|
||||||
let handler = spamHandler.get
|
|
||||||
handler(wakumessage)
|
|
||||||
return pubsub.ValidationResult.Reject
|
|
||||||
# set a validator for the supplied pubsubTopic
|
|
||||||
let pb = PubSub(node.wakuRelay)
|
|
||||||
pb.addValidator(pubsubTopic, validator)
|
|
||||||
|
|
||||||
proc mountRlnRelayStatic*(node: WakuNode,
|
|
||||||
group: seq[IDCommitment],
|
|
||||||
memKeyPair: MembershipKeyPair,
|
|
||||||
memIndex: MembershipIndex,
|
|
||||||
pubsubTopic: string,
|
|
||||||
contentTopic: ContentTopic,
|
|
||||||
spamHandler: Option[SpamHandler] = none(SpamHandler)) {.raises: [Defect, IOError].}=
|
|
||||||
# TODO return a bool value to indicate the success of the call
|
|
||||||
|
|
||||||
debug "mounting rln-relay in off-chain/static mode"
|
|
||||||
# check whether inputs are provided
|
|
||||||
# relay protocol is the prerequisite of rln-relay
|
|
||||||
if node.wakuRelay.isNil:
|
|
||||||
error "Failed to mount WakuRLNRelay. Relay protocol is not mounted."
|
|
||||||
return
|
|
||||||
# check whether the pubsub topic is supported at the relay level
|
|
||||||
if pubsubTopic notin node.wakuRelay.defaultTopics:
|
|
||||||
error "Failed to mount WakuRLNRelay. The relay protocol does not support the configured pubsub topic.", pubsubTopic=pubsubTopic
|
|
||||||
return
|
|
||||||
|
|
||||||
debug "rln-relay input validation passed"
|
|
||||||
|
|
||||||
# check the peer's index and the inclusion of user's identity commitment in the group
|
|
||||||
doAssert((memKeyPair.idCommitment) == group[int(memIndex)])
|
|
||||||
|
|
||||||
# create an RLN instance
|
|
||||||
var rlnInstance = createRLNInstance()
|
|
||||||
doAssert(rlnInstance.isOk)
|
|
||||||
var rln = rlnInstance.value
|
|
||||||
|
|
||||||
# add members to the Merkle tree
|
|
||||||
for index in 0..group.len-1:
|
|
||||||
let member = group[index]
|
|
||||||
let member_is_added = rln.insertMember(member)
|
|
||||||
doAssert(member_is_added)
|
|
||||||
|
|
||||||
# create the WakuRLNRelay
|
|
||||||
var rlnPeer = WakuRLNRelay(membershipKeyPair: memKeyPair,
|
|
||||||
membershipIndex: memIndex,
|
|
||||||
rlnInstance: rln,
|
|
||||||
pubsubTopic: pubsubTopic,
|
|
||||||
contentTopic: contentTopic)
|
|
||||||
|
|
||||||
# adds a topic validator for the supplied pubsub topic at the relay protocol
|
|
||||||
# messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped
|
|
||||||
# the topic validator checks for the correct non-spamming proof of the message
|
|
||||||
node.addRLNRelayValidator(pubsubTopic, contentTopic, spamHandler)
|
|
||||||
debug "rln relay topic validator is mounted successfully", pubsubTopic=pubsubTopic, contentTopic=contentTopic
|
|
||||||
|
|
||||||
node.wakuRlnRelay = rlnPeer
|
|
||||||
|
|
||||||
|
|
||||||
proc mountRlnRelayDynamic*(node: WakuNode,
|
|
||||||
ethClientAddr: string = "",
|
|
||||||
ethAccAddr: web3.Address,
|
|
||||||
memContractAddr: web3.Address,
|
|
||||||
memKeyPair: Option[MembershipKeyPair] = none(MembershipKeyPair),
|
|
||||||
memIndex: Option[MembershipIndex] = none(MembershipIndex),
|
|
||||||
pubsubTopic: string,
|
|
||||||
contentTopic: ContentTopic,
|
|
||||||
spamHandler: Option[SpamHandler] = none(SpamHandler)) {.async.} =
|
|
||||||
debug "mounting rln-relay in on-chain/dynamic mode"
|
|
||||||
# TODO return a bool value to indicate the success of the call
|
|
||||||
# relay protocol is the prerequisite of rln-relay
|
|
||||||
if node.wakuRelay.isNil:
|
|
||||||
error "Failed to mount WakuRLNRelay. Relay protocol is not mounted."
|
|
||||||
return
|
|
||||||
# check whether the pubsub topic is supported at the relay level
|
|
||||||
if pubsubTopic notin node.wakuRelay.defaultTopics:
|
|
||||||
error "Failed to mount WakuRLNRelay. The relay protocol does not support the configured pubsub topic.", pubsubTopic=pubsubTopic
|
|
||||||
return
|
|
||||||
debug "rln-relay input validation passed"
|
|
||||||
|
|
||||||
# create an RLN instance
|
|
||||||
var rlnInstance = createRLNInstance()
|
|
||||||
doAssert(rlnInstance.isOk)
|
|
||||||
var rln = rlnInstance.value
|
|
||||||
|
|
||||||
# prepare rln membership key pair
|
|
||||||
var
|
|
||||||
keyPair: MembershipKeyPair
|
|
||||||
rlnIndex: MembershipIndex
|
|
||||||
if memKeyPair.isNone: # if non provided, create one and register to the contract
|
|
||||||
trace "no rln-relay key is provided, generating one"
|
|
||||||
let keyPairOpt = rln.membershipKeyGen()
|
|
||||||
doAssert(keyPairOpt.isSome)
|
|
||||||
keyPair = keyPairOpt.get()
|
|
||||||
# register the rln-relay peer to the membership contract
|
|
||||||
let regIndexRes = await register(idComm = keyPair.idCommitment, ethAccountAddress = ethAccAddr, ethClientAddress = ethClientAddr, membershipContractAddress = memContractAddr)
|
|
||||||
# check whether registration is done
|
|
||||||
doAssert(regIndexRes.isOk())
|
|
||||||
rlnIndex = regIndexRes.value
|
|
||||||
debug "peer is successfully registered into the membership contract", rlnIndex=rlnIndex, idComm=keyPair.idCommitment.toHex(), idKey=keyPair.idKey.toHex()
|
|
||||||
else:
|
|
||||||
keyPair = memKeyPair.get()
|
|
||||||
rlnIndex = memIndex.get()
|
|
||||||
|
|
||||||
# create the WakuRLNRelay
|
|
||||||
var rlnPeer = WakuRLNRelay(membershipKeyPair: keyPair,
|
|
||||||
membershipIndex: rlnIndex,
|
|
||||||
membershipContractAddress: memContractAddr,
|
|
||||||
ethClientAddress: ethClientAddr,
|
|
||||||
ethAccountAddress: ethAccAddr,
|
|
||||||
rlnInstance: rln,
|
|
||||||
pubsubTopic: pubsubTopic,
|
|
||||||
contentTopic: contentTopic)
|
|
||||||
|
|
||||||
|
|
||||||
proc handler(pubkey: Uint256, index: Uint256) =
|
|
||||||
debug "a new key is added", pubkey=pubkey
|
|
||||||
# assuming all the members arrive in order
|
|
||||||
let pk = pubkey.toIDCommitment()
|
|
||||||
let isSuccessful = rlnPeer.rlnInstance.insertMember(pk)
|
|
||||||
debug "received pk", pk=pk.toHex, index =index
|
|
||||||
doAssert(isSuccessful)
|
|
||||||
|
|
||||||
asyncSpawn rlnPeer.handleGroupUpdates(handler)
|
|
||||||
debug "dynamic group management is started"
|
|
||||||
# adds a topic validator for the supplied pubsub topic at the relay protocol
|
|
||||||
# messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped
|
|
||||||
# the topic validator checks for the correct non-spamming proof of the message
|
|
||||||
addRLNRelayValidator(node, pubsubTopic, contentTopic, spamHandler)
|
|
||||||
debug "rln relay topic validator is mounted successfully", pubsubTopic=pubsubTopic, contentTopic=contentTopic
|
|
||||||
|
|
||||||
node.wakuRlnRelay = rlnPeer
|
|
||||||
|
|
||||||
|
|
||||||
proc startRelay*(node: WakuNode) {.async.} =
|
proc startRelay*(node: WakuNode) {.async.} =
|
||||||
if node.wakuRelay.isNil:
|
if node.wakuRelay.isNil:
|
||||||
|
@ -1226,51 +1015,7 @@ when isMainModule:
|
||||||
|
|
||||||
when defined(rln):
|
when defined(rln):
|
||||||
if conf.rlnRelay:
|
if conf.rlnRelay:
|
||||||
if not conf.rlnRelayDynamic:
|
node.mountRlnRelay(conf)
|
||||||
info " setting up waku-rln-relay in on-chain mode... "
|
|
||||||
# set up rln relay inputs
|
|
||||||
let (groupOpt, memKeyPairOpt, memIndexOpt) = rlnRelayStaticSetUp(conf.rlnRelayMemIndex)
|
|
||||||
if memIndexOpt.isNone:
|
|
||||||
error "failed to mount WakuRLNRelay"
|
|
||||||
else:
|
|
||||||
# mount rlnrelay in off-chain mode with a static group of users
|
|
||||||
node.mountRlnRelayStatic(group = groupOpt.get(), memKeyPair = memKeyPairOpt.get(), memIndex= memIndexOpt.get(), pubsubTopic = conf.rlnRelayPubsubTopic, contentTopic = conf.rlnRelayContentTopic)
|
|
||||||
|
|
||||||
info "membership id key", idkey=memKeyPairOpt.get().idKey.toHex
|
|
||||||
info "membership id commitment key", idCommitmentkey=memKeyPairOpt.get().idCommitment.toHex
|
|
||||||
|
|
||||||
# check the correct construction of the tree by comparing the calculated root against the expected root
|
|
||||||
# no error should happen as it is already captured in the unit tests
|
|
||||||
# TODO have added this check to account for unseen corner cases, will remove it later
|
|
||||||
let
|
|
||||||
root = node.wakuRlnRelay.rlnInstance.getMerkleRoot.value.toHex()
|
|
||||||
expectedRoot = STATIC_GROUP_MERKLE_ROOT
|
|
||||||
if root != expectedRoot:
|
|
||||||
error "root mismatch: something went wrong not in Merkle tree construction"
|
|
||||||
debug "the calculated root", root
|
|
||||||
info "WakuRLNRelay is mounted successfully", pubsubtopic=conf.rlnRelayPubsubTopic, contentTopic=conf.rlnRelayContentTopic
|
|
||||||
else:
|
|
||||||
info " setting up waku-rln-relay in on-chain mode... "
|
|
||||||
|
|
||||||
# read related inputs to run rln-relay in on-chain mode and do type conversion when needed
|
|
||||||
let
|
|
||||||
ethAccountAddr = web3.fromHex(web3.Address, conf.rlnRelayEthAccount)
|
|
||||||
ethClientAddr = conf.rlnRelayEthClientAddress
|
|
||||||
ethMemContractAddress = web3.fromHex(web3.Address, conf.rlnRelayEthMemContractAddress)
|
|
||||||
rlnRelayId = conf.rlnRelayIdKey
|
|
||||||
rlnRelayIdCommitmentKey = conf.rlnRelayIdCommitmentKey
|
|
||||||
rlnRelayIndex = conf.rlnRelayMemIndex
|
|
||||||
# check if the peer has provided its rln credentials
|
|
||||||
if rlnRelayIdCommitmentKey != "" and rlnRelayId != "":
|
|
||||||
# type conversation from hex strings to MembershipKeyPair
|
|
||||||
let keyPair = @[(rlnRelayId, rlnRelayIdCommitmentKey)]
|
|
||||||
let memKeyPair = keyPair.toMembershipKeyPairs()[0]
|
|
||||||
# mount the rln relay protocol in the on-chain/dynamic mode
|
|
||||||
waitFor node.mountRlnRelayDynamic(memContractAddr = ethMemContractAddress, ethClientAddr = ethClientAddr, memKeyPair = some(memKeyPair), memIndex = some(rlnRelayIndex), ethAccAddr = ethAccountAddr, pubsubTopic = conf.rlnRelayPubsubTopic, contentTopic = conf.rlnRelayContentTopic)
|
|
||||||
else:
|
|
||||||
# no rln credential is provided
|
|
||||||
# mount the rln relay protocol in the on-chain/dynamic mode
|
|
||||||
waitFor node.mountRlnRelayDynamic(memContractAddr = ethMemContractAddress, ethClientAddr = ethClientAddr, ethAccAddr = ethAccountAddr, pubsubTopic = conf.rlnRelayPubsubTopic, contentTopic = conf.rlnRelayContentTopic)
|
|
||||||
|
|
||||||
if conf.swap:
|
if conf.swap:
|
||||||
mountSwap(node)
|
mountSwap(node)
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
import
|
||||||
|
eth/p2p/discoveryv5/enr,
|
||||||
|
libp2p/crypto/crypto,
|
||||||
|
libp2p/protocols/ping,
|
||||||
|
../protocol/waku_relay,
|
||||||
|
../protocol/waku_store/waku_store,
|
||||||
|
../protocol/waku_swap/waku_swap,
|
||||||
|
../protocol/waku_filter/waku_filter,
|
||||||
|
../protocol/waku_lightpush/waku_lightpush,
|
||||||
|
../protocol/waku_rln_relay/waku_rln_relay_types,
|
||||||
|
./peer_manager/peer_manager,
|
||||||
|
./discv5/waku_discv5
|
||||||
|
|
||||||
|
|
||||||
|
# key and crypto modules different
|
||||||
|
type
|
||||||
|
KeyPair* = crypto.KeyPair
|
||||||
|
PublicKey* = crypto.PublicKey
|
||||||
|
PrivateKey* = crypto.PrivateKey
|
||||||
|
|
||||||
|
# XXX: Weird type, should probably be using pubsub Topic object name?
|
||||||
|
Topic* = string
|
||||||
|
Message* = seq[byte]
|
||||||
|
|
||||||
|
WakuInfo* = object
|
||||||
|
# NOTE One for simplicity, can extend later as needed
|
||||||
|
listenAddresses*: seq[string]
|
||||||
|
enrUri*: string
|
||||||
|
#multiaddrStrings*: seq[string]
|
||||||
|
|
||||||
|
# NOTE based on Eth2Node in NBC eth2_network.nim
|
||||||
|
WakuNode* = ref object of RootObj
|
||||||
|
peerManager*: PeerManager
|
||||||
|
switch*: Switch
|
||||||
|
wakuRelay*: WakuRelay
|
||||||
|
wakuStore*: WakuStore
|
||||||
|
wakuFilter*: WakuFilter
|
||||||
|
wakuSwap*: WakuSwap
|
||||||
|
wakuRlnRelay*: WakuRLNRelay
|
||||||
|
wakuLightPush*: WakuLightPush
|
||||||
|
enr*: enr.Record
|
||||||
|
libp2pPing*: Ping
|
||||||
|
filters*: Filters
|
||||||
|
rng*: ref BrHmacDrbgContext
|
||||||
|
wakuDiscv5*: WakuDiscoveryV5
|
||||||
|
announcedAddresses* : seq[MultiAddress]
|
||||||
|
started*: bool # Indicates that node has started listening
|
|
@ -3,14 +3,21 @@
|
||||||
import
|
import
|
||||||
std/sequtils, tables, times,
|
std/sequtils, tables, times,
|
||||||
chronicles, options, chronos, stint,
|
chronicles, options, chronos, stint,
|
||||||
|
confutils,
|
||||||
web3, json,
|
web3, json,
|
||||||
|
web3/ethtypes,
|
||||||
eth/keys,
|
eth/keys,
|
||||||
|
libp2p/protocols/pubsub/rpc/messages,
|
||||||
|
libp2p/protocols/pubsub/pubsub,
|
||||||
stew/results,
|
stew/results,
|
||||||
stew/[byteutils, arrayops, endians2],
|
stew/[byteutils, arrayops, endians2],
|
||||||
rln,
|
rln,
|
||||||
waku_rln_relay_types,
|
waku_rln_relay_types,
|
||||||
|
../../node/[wakunode2_types,config],
|
||||||
../waku_message
|
../waku_message
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakurlnrelayutils"
|
topics = "wakurlnrelayutils"
|
||||||
|
|
||||||
|
@ -626,3 +633,226 @@ proc subscribeToGroupEvents(ethClientUri: string, contractAddress: Address, bloc
|
||||||
proc handleGroupUpdates*(rlnPeer: WakuRLNRelay, handler: RegistrationEventHandler) {.async, gcsafe.} =
|
proc handleGroupUpdates*(rlnPeer: WakuRLNRelay, handler: RegistrationEventHandler) {.async, gcsafe.} =
|
||||||
# mounts the supplied handler for the registration events emitting from the membership contract
|
# mounts the supplied handler for the registration events emitting from the membership contract
|
||||||
await subscribeToGroupEvents(ethClientUri = rlnPeer.ethClientAddress, contractAddress = rlnPeer.membershipContractAddress, handler = handler)
|
await subscribeToGroupEvents(ethClientUri = rlnPeer.ethClientAddress, contractAddress = rlnPeer.membershipContractAddress, handler = handler)
|
||||||
|
|
||||||
|
proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string, contentTopic: ContentTopic, spamHandler: Option[SpamHandler] = none(SpamHandler)) =
|
||||||
|
## this procedure is a thin wrapper for the pubsub addValidator method
|
||||||
|
## it sets a validator for the waku messages published on the supplied pubsubTopic and contentTopic
|
||||||
|
## if contentTopic is empty, then validation takes place for All the messages published on the given pubsubTopic
|
||||||
|
## the message validation logic is according to https://rfc.vac.dev/spec/17/
|
||||||
|
proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} =
|
||||||
|
trace "rln-relay topic validator is called"
|
||||||
|
let msg = WakuMessage.init(message.data)
|
||||||
|
if msg.isOk():
|
||||||
|
let
|
||||||
|
wakumessage = msg.value()
|
||||||
|
payload = string.fromBytes(wakumessage.payload)
|
||||||
|
|
||||||
|
# check the contentTopic
|
||||||
|
if (wakumessage.contentTopic != "") and (contentTopic != "") and (wakumessage.contentTopic != contentTopic):
|
||||||
|
trace "content topic did not match:", contentTopic=wakumessage.contentTopic, payload=payload
|
||||||
|
return pubsub.ValidationResult.Accept
|
||||||
|
|
||||||
|
# validate the message
|
||||||
|
let
|
||||||
|
validationRes = node.wakuRlnRelay.validateMessage(wakumessage)
|
||||||
|
proof = toHex(wakumessage.proof.proof)
|
||||||
|
epoch = fromEpoch(wakumessage.proof.epoch)
|
||||||
|
root = toHex(wakumessage.proof.merkleRoot)
|
||||||
|
shareX = toHex(wakumessage.proof.shareX)
|
||||||
|
shareY = toHex(wakumessage.proof.shareY)
|
||||||
|
nullifier = toHex(wakumessage.proof.nullifier)
|
||||||
|
case validationRes:
|
||||||
|
of Valid:
|
||||||
|
debug "message validity is verified, relaying:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload
|
||||||
|
trace "message validity is verified, relaying:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
|
||||||
|
return pubsub.ValidationResult.Accept
|
||||||
|
of Invalid:
|
||||||
|
debug "message validity could not be verified, discarding:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload
|
||||||
|
trace "message validity could not be verified, discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
|
||||||
|
return pubsub.ValidationResult.Reject
|
||||||
|
of Spam:
|
||||||
|
debug "A spam message is found! yay! discarding:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload
|
||||||
|
trace "A spam message is found! yay! discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
|
||||||
|
if spamHandler.isSome:
|
||||||
|
let handler = spamHandler.get
|
||||||
|
handler(wakumessage)
|
||||||
|
return pubsub.ValidationResult.Reject
|
||||||
|
# set a validator for the supplied pubsubTopic
|
||||||
|
let pb = PubSub(node.wakuRelay)
|
||||||
|
pb.addValidator(pubsubTopic, validator)
|
||||||
|
|
||||||
|
proc mountRlnRelayStatic*(node: WakuNode,
|
||||||
|
group: seq[IDCommitment],
|
||||||
|
memKeyPair: MembershipKeyPair,
|
||||||
|
memIndex: MembershipIndex,
|
||||||
|
pubsubTopic: string,
|
||||||
|
contentTopic: ContentTopic,
|
||||||
|
spamHandler: Option[SpamHandler] = none(SpamHandler)) {.raises: [Defect, IOError].}=
|
||||||
|
# TODO return a bool value to indicate the success of the call
|
||||||
|
|
||||||
|
debug "mounting rln-relay in off-chain/static mode"
|
||||||
|
# check whether inputs are provided
|
||||||
|
# relay protocol is the prerequisite of rln-relay
|
||||||
|
if node.wakuRelay.isNil:
|
||||||
|
error "Failed to mount WakuRLNRelay. Relay protocol is not mounted."
|
||||||
|
return
|
||||||
|
# check whether the pubsub topic is supported at the relay level
|
||||||
|
if pubsubTopic notin node.wakuRelay.defaultTopics:
|
||||||
|
error "Failed to mount WakuRLNRelay. The relay protocol does not support the configured pubsub topic.", pubsubTopic=pubsubTopic
|
||||||
|
return
|
||||||
|
|
||||||
|
debug "rln-relay input validation passed"
|
||||||
|
|
||||||
|
# check the peer's index and the inclusion of user's identity commitment in the group
|
||||||
|
doAssert((memKeyPair.idCommitment) == group[int(memIndex)])
|
||||||
|
|
||||||
|
# create an RLN instance
|
||||||
|
var rlnInstance = createRLNInstance()
|
||||||
|
doAssert(rlnInstance.isOk)
|
||||||
|
var rln = rlnInstance.value
|
||||||
|
|
||||||
|
# add members to the Merkle tree
|
||||||
|
for index in 0..group.len-1:
|
||||||
|
let member = group[index]
|
||||||
|
let member_is_added = rln.insertMember(member)
|
||||||
|
doAssert(member_is_added)
|
||||||
|
|
||||||
|
# create the WakuRLNRelay
|
||||||
|
var rlnPeer = WakuRLNRelay(membershipKeyPair: memKeyPair,
|
||||||
|
membershipIndex: memIndex,
|
||||||
|
rlnInstance: rln,
|
||||||
|
pubsubTopic: pubsubTopic,
|
||||||
|
contentTopic: contentTopic)
|
||||||
|
|
||||||
|
# adds a topic validator for the supplied pubsub topic at the relay protocol
|
||||||
|
# messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped
|
||||||
|
# the topic validator checks for the correct non-spamming proof of the message
|
||||||
|
node.addRLNRelayValidator(pubsubTopic, contentTopic, spamHandler)
|
||||||
|
debug "rln relay topic validator is mounted successfully", pubsubTopic=pubsubTopic, contentTopic=contentTopic
|
||||||
|
|
||||||
|
node.wakuRlnRelay = rlnPeer
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
proc mountRlnRelayDynamic*(node: WakuNode,
|
||||||
|
ethClientAddr: string = "",
|
||||||
|
ethAccAddr: web3.Address,
|
||||||
|
memContractAddr: web3.Address,
|
||||||
|
memKeyPair: Option[MembershipKeyPair] = none(MembershipKeyPair),
|
||||||
|
memIndex: Option[MembershipIndex] = none(MembershipIndex),
|
||||||
|
pubsubTopic: string,
|
||||||
|
contentTopic: ContentTopic,
|
||||||
|
spamHandler: Option[SpamHandler] = none(SpamHandler)) {.async.} =
|
||||||
|
debug "mounting rln-relay in on-chain/dynamic mode"
|
||||||
|
# TODO return a bool value to indicate the success of the call
|
||||||
|
# relay protocol is the prerequisite of rln-relay
|
||||||
|
if node.wakuRelay.isNil:
|
||||||
|
error "Failed to mount WakuRLNRelay. Relay protocol is not mounted."
|
||||||
|
return
|
||||||
|
# check whether the pubsub topic is supported at the relay level
|
||||||
|
if pubsubTopic notin node.wakuRelay.defaultTopics:
|
||||||
|
error "Failed to mount WakuRLNRelay. The relay protocol does not support the configured pubsub topic.", pubsubTopic=pubsubTopic
|
||||||
|
return
|
||||||
|
debug "rln-relay input validation passed"
|
||||||
|
|
||||||
|
# create an RLN instance
|
||||||
|
var rlnInstance = createRLNInstance()
|
||||||
|
doAssert(rlnInstance.isOk)
|
||||||
|
var rln = rlnInstance.value
|
||||||
|
|
||||||
|
# prepare rln membership key pair
|
||||||
|
var
|
||||||
|
keyPair: MembershipKeyPair
|
||||||
|
rlnIndex: MembershipIndex
|
||||||
|
if memKeyPair.isNone: # if non provided, create one and register to the contract
|
||||||
|
trace "no rln-relay key is provided, generating one"
|
||||||
|
let keyPairOpt = rln.membershipKeyGen()
|
||||||
|
doAssert(keyPairOpt.isSome)
|
||||||
|
keyPair = keyPairOpt.get()
|
||||||
|
# register the rln-relay peer to the membership contract
|
||||||
|
let regIndexRes = await register(idComm = keyPair.idCommitment, ethAccountAddress = ethAccAddr, ethClientAddress = ethClientAddr, membershipContractAddress = memContractAddr)
|
||||||
|
# check whether registration is done
|
||||||
|
doAssert(regIndexRes.isOk())
|
||||||
|
rlnIndex = regIndexRes.value
|
||||||
|
debug "peer is successfully registered into the membership contract"
|
||||||
|
else:
|
||||||
|
keyPair = memKeyPair.get()
|
||||||
|
rlnIndex = memIndex.get()
|
||||||
|
|
||||||
|
# create the WakuRLNRelay
|
||||||
|
var rlnPeer = WakuRLNRelay(membershipKeyPair: keyPair,
|
||||||
|
membershipIndex: rlnIndex,
|
||||||
|
membershipContractAddress: memContractAddr,
|
||||||
|
ethClientAddress: ethClientAddr,
|
||||||
|
ethAccountAddress: ethAccAddr,
|
||||||
|
rlnInstance: rln,
|
||||||
|
pubsubTopic: pubsubTopic,
|
||||||
|
contentTopic: contentTopic)
|
||||||
|
|
||||||
|
|
||||||
|
proc handler(pubkey: Uint256, index: Uint256) =
|
||||||
|
debug "a new key is added", pubkey=pubkey
|
||||||
|
# assuming all the members arrive in order
|
||||||
|
let pk = pubkey.toIDCommitment()
|
||||||
|
let isSuccessful = rlnPeer.rlnInstance.insertMember(pk)
|
||||||
|
debug "received pk", pk=pk.toHex, index =index
|
||||||
|
doAssert(isSuccessful)
|
||||||
|
|
||||||
|
asyncSpawn rlnPeer.handleGroupUpdates(handler)
|
||||||
|
debug "dynamic group management is started"
|
||||||
|
# adds a topic validator for the supplied pubsub topic at the relay protocol
|
||||||
|
# messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped
|
||||||
|
# the topic validator checks for the correct non-spamming proof of the message
|
||||||
|
addRLNRelayValidator(node, pubsubTopic, contentTopic, spamHandler)
|
||||||
|
debug "rln relay topic validator is mounted successfully", pubsubTopic=pubsubTopic, contentTopic=contentTopic
|
||||||
|
|
||||||
|
node.wakuRlnRelay = rlnPeer
|
||||||
|
|
||||||
|
|
||||||
|
proc mountRlnRelay*(node: WakuNode, conf: WakuNodeConf) {.raises: [Defect, ValueError, IOError, CatchableError].} =
|
||||||
|
if not conf.rlnRelayDynamic:
|
||||||
|
info " setting up waku-rln-relay in on-chain mode... "
|
||||||
|
# set up rln relay inputs
|
||||||
|
let (groupOpt, memKeyPairOpt, memIndexOpt) = rlnRelayStaticSetUp(conf.rlnRelayMemIndex)
|
||||||
|
if memIndexOpt.isNone:
|
||||||
|
error "failed to mount WakuRLNRelay"
|
||||||
|
else:
|
||||||
|
# mount rlnrelay in off-chain mode with a static group of users
|
||||||
|
node.mountRlnRelayStatic(group = groupOpt.get(), memKeyPair = memKeyPairOpt.get(), memIndex= memIndexOpt.get(), pubsubTopic = conf.rlnRelayPubsubTopic, contentTopic = conf.rlnRelayContentTopic)
|
||||||
|
|
||||||
|
info "membership id key", idkey=memKeyPairOpt.get().idKey.toHex
|
||||||
|
info "membership id commitment key", idCommitmentkey=memKeyPairOpt.get().idCommitment.toHex
|
||||||
|
|
||||||
|
# check the correct construction of the tree by comparing the calculated root against the expected root
|
||||||
|
# no error should happen as it is already captured in the unit tests
|
||||||
|
# TODO have added this check to account for unseen corner cases, will remove it later
|
||||||
|
let
|
||||||
|
root = node.wakuRlnRelay.rlnInstance.getMerkleRoot.value.toHex()
|
||||||
|
expectedRoot = STATIC_GROUP_MERKLE_ROOT
|
||||||
|
if root != expectedRoot:
|
||||||
|
error "root mismatch: something went wrong not in Merkle tree construction"
|
||||||
|
debug "the calculated root", root
|
||||||
|
info "WakuRLNRelay is mounted successfully", pubsubtopic=conf.rlnRelayPubsubTopic, contentTopic=conf.rlnRelayContentTopic
|
||||||
|
else:
|
||||||
|
info " setting up waku-rln-relay in on-chain mode... "
|
||||||
|
|
||||||
|
# read related inputs to run rln-relay in on-chain mode and do type conversion when needed
|
||||||
|
let
|
||||||
|
ethAccountAddr = web3.fromHex(web3.Address, conf.rlnRelayEthAccount)
|
||||||
|
ethClientAddr = conf.rlnRelayEthClientAddress
|
||||||
|
ethMemContractAddress = web3.fromHex(web3.Address, conf.rlnRelayEthMemContractAddress)
|
||||||
|
rlnRelayId = conf.rlnRelayIdKey
|
||||||
|
rlnRelayIdCommitmentKey = conf.rlnRelayIdCommitmentKey
|
||||||
|
rlnRelayIndex = conf.rlnRelayMemIndex
|
||||||
|
# check if the peer has provided its rln credentials
|
||||||
|
if rlnRelayIdCommitmentKey != "" and rlnRelayId != "":
|
||||||
|
# type conversation from hex strings to MembershipKeyPair
|
||||||
|
let keyPair = @[(rlnRelayId, rlnRelayIdCommitmentKey)]
|
||||||
|
let memKeyPair = keyPair.toMembershipKeyPairs()[0]
|
||||||
|
# mount the rln relay protocol in the on-chain/dynamic mode
|
||||||
|
waitFor node.mountRlnRelayDynamic(memContractAddr = ethMemContractAddress, ethClientAddr = ethClientAddr, memKeyPair = some(memKeyPair), memIndex = some(rlnRelayIndex), ethAccAddr = ethAccountAddr, pubsubTopic = conf.rlnRelayPubsubTopic, contentTopic = conf.rlnRelayContentTopic)
|
||||||
|
else:
|
||||||
|
# no rln credential is provided
|
||||||
|
# mount the rln relay protocol in the on-chain/dynamic mode
|
||||||
|
waitFor node.mountRlnRelayDynamic(memContractAddr = ethMemContractAddress, ethClientAddr = ethClientAddr, ethAccAddr = ethAccountAddr, pubsubTopic = conf.rlnRelayPubsubTopic, contentTopic = conf.rlnRelayContentTopic)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue