chore(rln): run rln in all relay pubsubtopics + remove cli flags

This commit is contained in:
Richard Ramos 2023-08-21 16:54:13 -04:00 committed by richΛrd
parent 5422af8130
commit 624fe1bb2f
16 changed files with 55 additions and 183 deletions

View File

@ -6,7 +6,6 @@ package main
import (
cli "github.com/urfave/cli/v2"
wcli "github.com/waku-org/go-waku/waku/cliutils"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
func rlnFlags() []cli.Flag {
@ -23,18 +22,6 @@ func rlnFlags() []cli.Flag {
Usage: "the index of credentials to use, within a specific rln membership set",
Destination: &options.RLNRelay.MembershipGroupIndex,
},
&cli.StringFlag{
Name: "rln-relay-pubsub-topic",
Value: "/waku/2/default-waku/proto",
Usage: "the pubsub topic for which rln-relay gets enabled",
Destination: &options.RLNRelay.PubsubTopic,
},
&cli.StringFlag{
Name: "rln-relay-content-topic",
Value: protocol.NewContentTopic("toy-chat", 3, "mingde", "proto").String(),
Usage: "the content topic for which rln-relay gets enabled",
Destination: &options.RLNRelay.ContentTopic,
},
&cli.BoolFlag{
Name: "rln-relay-dynamic",
Usage: "Enable waku-rln-relay with on-chain dynamic group management",

View File

@ -17,13 +17,11 @@ func checkForRLN(logger *zap.Logger, options NodeOptions, nodeOpts *[]node.WakuN
failOnErr(errors.New("relay not available"), "Could not enable RLN Relay")
}
if !options.RLNRelay.Dynamic {
*nodeOpts = append(*nodeOpts, node.WithStaticRLNRelay(options.RLNRelay.PubsubTopic, options.RLNRelay.ContentTopic, rln.MembershipIndex(options.RLNRelay.MembershipGroupIndex), nil))
*nodeOpts = append(*nodeOpts, node.WithStaticRLNRelay(rln.MembershipIndex(options.RLNRelay.MembershipGroupIndex), nil))
} else {
// TODO: too many parameters in this function
// consider passing a config struct instead
*nodeOpts = append(*nodeOpts, node.WithDynamicRLNRelay(
options.RLNRelay.PubsubTopic,
options.RLNRelay.ContentTopic,
options.RLNRelay.CredentialsPath,
options.RLNRelay.CredentialsPassword,
options.RLNRelay.CredentialsIndex,

View File

@ -40,8 +40,6 @@ type RLNRelayOptions struct {
CredentialsIndex uint
TreePath string
MembershipGroupIndex uint
PubsubTopic string
ContentTopic string
Dynamic bool
ETHClientAddress string
MembershipContractAddress common.Address

View File

@ -141,7 +141,7 @@ func (c *Chat) receiveMessages() {
case value := <-c.C:
msgContentTopic := value.Message().ContentTopic
if msgContentTopic != c.options.ContentTopic || (c.options.RLNRelay.Enable && msgContentTopic != c.options.RLNRelay.ContentTopic) {
if msgContentTopic != c.options.ContentTopic {
continue // Discard messages from other topics
}

View File

@ -50,8 +50,6 @@ func execute(options Options) {
if options.RLNRelay.Dynamic {
fmt.Println("Setting up dynamic rln...")
opts = append(opts, node.WithDynamicRLNRelay(
options.RLNRelay.PubsubTopic,
options.RLNRelay.ContentTopic,
options.RLNRelay.CredentialsPath,
options.RLNRelay.CredentialsPassword,
options.RLNRelay.CredentialsIndex,
@ -63,8 +61,6 @@ func execute(options Options) {
))
} else {
opts = append(opts, node.WithStaticRLNRelay(
options.RLNRelay.PubsubTopic,
options.RLNRelay.ContentTopic,
uint(options.RLNRelay.MembershipIndex),
spamHandler))
}

View File

@ -6,7 +6,6 @@ import (
"github.com/waku-org/go-waku/waku/cliutils"
wcli "github.com/waku-org/go-waku/waku/cliutils"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/urfave/cli/v2"
)
@ -198,18 +197,6 @@ func getFlags() []cli.Flag {
Usage: "the index of credentials to use",
Destination: &options.RLNRelay.CredentialsIndex,
},
&cli.StringFlag{
Name: "rln-relay-pubsub-topic",
Value: relay.DefaultWakuTopic,
Usage: "the pubsub topic for which rln-relay gets enabled",
Destination: &options.RLNRelay.PubsubTopic,
},
&cli.StringFlag{
Name: "rln-relay-content-topic",
Value: testnetContentTopic,
Usage: "the content topic for which rln-relay gets enabled",
Destination: &options.RLNRelay.ContentTopic,
},
&cli.BoolFlag{
Name: "rln-relay-dynamic",
Usage: "Enable waku-rln-relay with on-chain dynamic group management",

View File

@ -34,8 +34,6 @@ type RLNRelayOptions struct {
CredentialsIndex uint
MembershipGroupIndex uint
MembershipIndex uint
PubsubTopic string
ContentTopic string
Dynamic bool
ETHClientAddress string
MembershipContractAddress common.Address

View File

@ -10,6 +10,7 @@ import (
backoffv4 "github.com/cenkalti/backoff/v4"
golog "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/crypto"
@ -66,10 +67,14 @@ type IdentityCredential = struct {
IDCommitment byte32 `json:"idCommitment"`
}
type SpamHandler = func(message *pb.WakuMessage) error
type RLNRelay interface {
IdentityCredential() (IdentityCredential, error)
MembershipIndex() (uint, error)
AppendRLNProof(msg *pb.WakuMessage, senderEpochTime time.Time) error
Validator(spamHandler SpamHandler) func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool
Start(ctx context.Context) error
Stop() error
}
@ -275,6 +280,14 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
}
w.rendezvous = rendezvous.NewRendezvous(w.opts.rendezvousDB, w.peerConnector, w.log)
if w.opts.enableRelay {
err = w.setupRLNRelay()
if err != nil {
return nil, err
}
}
w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.opts.prometheusReg, w.log, w.opts.pubsubOpts...)
w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullNode, w.timesource, w.opts.prometheusReg, w.log, w.opts.legacyFilterOpts...)
w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...)
@ -480,7 +493,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
}
if w.opts.enableRLN {
err = w.mountRlnRelay(ctx)
err = w.startRlnRelay(ctx)
if err != nil {
return err
}

View File

@ -9,7 +9,11 @@ func (w *WakuNode) RLNRelay() RLNRelay {
return nil
}
func (w *WakuNode) mountRlnRelay(ctx context.Context) error {
func (w *WakuNode) setupRLNRelay() error {
return nil
}
func (w *WakuNode) startRlnRelay(ctx context.Context) error {
return nil
}

View File

@ -8,11 +8,11 @@ import (
"context"
"errors"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/waku-org/go-waku/waku/v2/protocol/rln"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/static"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
)
// RLNRelay is used to access any operation related to Waku RLN protocol
@ -20,13 +20,7 @@ func (w *WakuNode) RLNRelay() RLNRelay {
return w.rlnRelay
}
func (w *WakuNode) mountRlnRelay(ctx context.Context) error {
// check whether inputs are provided
// relay protocol is the prerequisite of rln-relay
if w.Relay() == nil {
return errors.New("relay protocol is required")
}
func (w *WakuNode) setupRLNRelay() error {
var err error
var groupManager rln.GroupManager
@ -61,18 +55,27 @@ func (w *WakuNode) mountRlnRelay(ctx context.Context) error {
}
}
rlnRelay, err := rln.New(w.Relay(), groupManager, w.opts.rlnTreePath, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.timesource, w.log)
if err != nil {
return err
}
err = rlnRelay.Start(ctx)
rlnRelay, err := rln.New(groupManager, w.opts.rlnTreePath, w.timesource, w.log)
if err != nil {
return err
}
w.rlnRelay = rlnRelay
// Adding RLN as a default validator
w.opts.pubsubOpts = append(w.opts.pubsubOpts, pubsub.WithDefaultValidator(rlnRelay.Validator(w.opts.rlnSpamHandler)))
return nil
}
func (w *WakuNode) startRlnRelay(ctx context.Context) error {
rlnRelay := w.rlnRelay.(*rln.WakuRLNRelay)
err := rlnRelay.Start(ctx)
if err != nil {
return err
}
if !w.opts.rlnRelayDynamic {
// check the correct construction of the tree by comparing the calculated root against the expected root
// no error should happen as it is already captured in the unit tests
@ -91,7 +94,7 @@ func (w *WakuNode) mountRlnRelay(ctx context.Context) error {
}
}
w.log.Info("mounted waku RLN relay", zap.String("pubsubTopic", w.opts.rlnRelayPubsubTopic), zap.String("contentTopic", w.opts.rlnRelayContentTopic))
w.log.Info("mounted waku RLN relay")
return nil
}

View File

@ -95,8 +95,6 @@ type WakuNodeParameters struct {
enableRLN bool
rlnRelayMemIndex uint
rlnRelayPubsubTopic string
rlnRelayContentTopic string
rlnRelayDynamic bool
rlnSpamHandler func(message *pb.WakuMessage) error
rlnETHClientAddress string

View File

@ -11,13 +11,11 @@ import (
// WithStaticRLNRelay enables the Waku V2 RLN protocol in offchain mode
// Requires the `gowaku_rln` build constrain (or the env variable RLN=true if building go-waku)
func WithStaticRLNRelay(pubsubTopic string, contentTopic string, memberIndex r.MembershipIndex, spamHandler rln.SpamHandler) WakuNodeOption {
func WithStaticRLNRelay(memberIndex r.MembershipIndex, spamHandler rln.SpamHandler) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableRLN = true
params.rlnRelayDynamic = false
params.rlnRelayMemIndex = memberIndex
params.rlnRelayPubsubTopic = pubsubTopic
params.rlnRelayContentTopic = contentTopic
params.rlnSpamHandler = spamHandler
return nil
}
@ -25,15 +23,13 @@ func WithStaticRLNRelay(pubsubTopic string, contentTopic string, memberIndex r.M
// WithDynamicRLNRelay enables the Waku V2 RLN protocol in onchain mode.
// Requires the `gowaku_rln` build constrain (or the env variable RLN=true if building go-waku)
func WithDynamicRLNRelay(pubsubTopic string, contentTopic string, keystorePath string, keystorePassword string, keystoreIndex uint, treePath string, membershipContract common.Address, membershipGroupIndex uint, spamHandler rln.SpamHandler, ethClientAddress string) WakuNodeOption {
func WithDynamicRLNRelay(keystorePath string, keystorePassword string, keystoreIndex uint, treePath string, membershipContract common.Address, membershipGroupIndex uint, spamHandler rln.SpamHandler, ethClientAddress string) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableRLN = true
params.rlnRelayDynamic = true
params.keystorePassword = keystorePassword
params.keystorePath = keystorePath
params.keystoreIndex = keystoreIndex
params.rlnRelayPubsubTopic = pubsubTopic
params.rlnRelayContentTopic = contentTopic
params.rlnSpamHandler = spamHandler
params.rlnETHClientAddress = ethClientAddress
params.rlnMembershipContractAddress = membershipContract

View File

@ -179,6 +179,7 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou
pubsub.WithSeenMessagesTTL(2 * time.Minute),
pubsub.WithPeerScore(w.peerScoreParams, w.peerScoreThresholds),
pubsub.WithPeerScoreInspect(w.peerScoreInspector, 6*time.Second),
// TODO: to improve - setup default validator only if no default validator has been set.
pubsub.WithDefaultValidator(func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool {
msg := new(pb.WakuMessage)
err := proto.Unmarshal(message.Data, msg)

View File

@ -6,14 +6,12 @@ package rln
import (
"context"
"crypto/ecdsa"
"crypto/rand"
"fmt"
"math/big"
"os"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
@ -23,8 +21,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/stretchr/testify/suite"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic"
@ -145,18 +141,6 @@ func (s *WakuRLNRelayDynamicSuite) TestDynamicGroupManagement() {
rlnInstance, err := rln.NewRLN()
s.Require().NoError(err)
port, err := tests.FindFreePort(s.T(), "", 5)
s.Require().NoError(err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
s.Require().NoError(err)
relay := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.TODO())
defer relay.Stop()
s.Require().NoError(err)
rt, err := group_manager.NewMerkleRootTracker(5, rlnInstance)
s.Require().NoError(err)
@ -172,7 +156,6 @@ func (s *WakuRLNRelayDynamicSuite) TestDynamicGroupManagement() {
rlnRelay := &WakuRLNRelay{
rootTracker: rt,
groupManager: gm,
relay: relay,
RLN: rlnInstance,
log: utils.Logger(),
nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata),
@ -243,25 +226,6 @@ func (s *WakuRLNRelayDynamicSuite) TestMerkleTreeConstruction() {
_, membershipGroupIndex := s.register(credentials1, s.u1PrivKey, "./test_onchain.json")
_, membershipGroupIndex = s.register(credentials2, s.u1PrivKey, "./test_onchain.json")
// Creating relay
port, err := tests.FindFreePort(s.T(), "", 5)
s.Require().NoError(err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
s.Require().NoError(err)
relay := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.TODO())
defer relay.Stop()
s.Require().NoError(err)
// Subscribing to topic
sub, err := relay.SubscribeToTopic(context.TODO(), RLNRELAY_PUBSUB_TOPIC)
s.Require().NoError(err)
defer sub.Unsubscribe()
// mount the rln relay protocol in the on-chain/dynamic mode
// TODO: This assumes the keystoreIndex is 0, but there are two possible credentials in this keystore due to using the same contract address
// when credentials1 and credentials2 were registered. We should remove this hardcoded value and obtain the correct value when the credentials are persisted
@ -269,7 +233,7 @@ func (s *WakuRLNRelayDynamicSuite) TestMerkleTreeConstruction() {
gm, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, "./test_onchain.json", keystorePassword, keystoreIndex, false, utils.Logger())
s.Require().NoError(err)
rlnRelay, err := New(relay, gm, "test-merkle-tree.db", RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, timesource.NewDefaultClock(), utils.Logger())
rlnRelay, err := New(gm, "test-merkle-tree.db", timesource.NewDefaultClock(), utils.Logger())
s.Require().NoError(err)
err = rlnRelay.Start(context.TODO())
@ -292,21 +256,6 @@ func (s *WakuRLNRelayDynamicSuite) TestCorrectRegistrationOfPeers() {
s.Require().NoError(err)
// Node 1 ============================================================
port1, err := tests.FindFreePort(s.T(), "", 5)
s.Require().NoError(err)
host1, err := tests.MakeHost(context.Background(), port1, rand.Reader)
s.Require().NoError(err)
relay1 := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay1.SetHost(host1)
err = relay1.Start(context.TODO())
defer relay1.Stop()
s.Require().NoError(err)
sub1, err := relay1.SubscribeToTopic(context.TODO(), RLNRELAY_PUBSUB_TOPIC)
s.Require().NoError(err)
defer sub1.Unsubscribe()
// Register credentials1 in contract and keystore1
credentials1 := s.generateCredentials(rlnInstance)
@ -318,27 +267,12 @@ func (s *WakuRLNRelayDynamicSuite) TestCorrectRegistrationOfPeers() {
gm1, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, keystorePath1, keystorePassword, 0, false, utils.Logger())
s.Require().NoError(err)
rlnRelay1, err := New(relay1, gm1, "test-correct-registration-1.db", RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, timesource.NewDefaultClock(), utils.Logger())
rlnRelay1, err := New(gm1, "test-correct-registration-1.db", timesource.NewDefaultClock(), utils.Logger())
s.Require().NoError(err)
err = rlnRelay1.Start(context.TODO())
s.Require().NoError(err)
// Node 2 ============================================================
port2, err := tests.FindFreePort(s.T(), "", 5)
s.Require().NoError(err)
host2, err := tests.MakeHost(context.Background(), port2, rand.Reader)
s.Require().NoError(err)
relay2 := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay2.SetHost(host2)
err = relay2.Start(context.TODO())
defer relay2.Stop()
s.Require().NoError(err)
sub2, err := relay2.SubscribeToTopic(context.TODO(), RLNRELAY_PUBSUB_TOPIC)
s.Require().NoError(err)
defer sub2.Unsubscribe()
// Register credentials2 in contract and keystore2
credentials2 := s.generateCredentials(rlnInstance)
@ -350,7 +284,7 @@ func (s *WakuRLNRelayDynamicSuite) TestCorrectRegistrationOfPeers() {
gm2, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, keystorePath2, keystorePassword, 0, false, utils.Logger())
s.Require().NoError(err)
rlnRelay2, err := New(relay2, gm2, "test-correct-registration-2.db", RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, timesource.NewDefaultClock(), utils.Logger())
rlnRelay2, err := New(gm2, "test-correct-registration-2.db", timesource.NewDefaultClock(), utils.Logger())
s.Require().NoError(err)
err = rlnRelay2.Start(context.TODO())
s.Require().NoError(err)

View File

@ -59,7 +59,7 @@ func (s *WakuRLNRelaySuite) TestOffchainMode() {
groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, utils.Logger())
s.Require().NoError(err)
wakuRLNRelay, err := New(relay, groupManager, "", RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, timesource.NewDefaultClock(), utils.Logger())
wakuRLNRelay, err := New(groupManager, "", timesource.NewDefaultClock(), utils.Logger())
s.Require().NoError(err)
err = wakuRLNRelay.Start(context.TODO())

View File

@ -13,7 +13,6 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-zerokit-rln/rln"
@ -29,17 +28,11 @@ type GroupManager interface {
}
type WakuRLNRelay struct {
relay *relay.WakuRelay
timesource timesource.Timesource
groupManager GroupManager
rootTracker *group_manager.MerkleRootTracker
// pubsubTopic is the topic for which rln relay is mounted
pubsubTopic string
contentTopic string
spamHandler SpamHandler
RLN *rln.RLN
// the log of nullifiers and Shamir shares of the past messages grouped per epoch
@ -52,12 +45,8 @@ type WakuRLNRelay struct {
const rlnDefaultTreePath = "./rln_tree.db"
func New(
relay *relay.WakuRelay,
groupManager GroupManager,
treePath string,
pubsubTopic string,
contentTopic string,
spamHandler SpamHandler,
timesource timesource.Timesource,
log *zap.Logger) (*WakuRLNRelay, error) {
@ -86,10 +75,6 @@ func New(
RLN: rlnInstance,
groupManager: groupManager,
rootTracker: rootTracker,
pubsubTopic: pubsubTopic,
contentTopic: contentTopic,
relay: relay,
spamHandler: spamHandler,
log: log,
timesource: timesource,
nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata),
@ -104,15 +89,7 @@ func (rlnRelay *WakuRLNRelay) Start(ctx context.Context) error {
return err
}
// adds a topic validator for the supplied pubsub topic at the relay protocol
// messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped
// the topic validator checks for the correct non-spamming proof of the message
err = rlnRelay.addValidator(rlnRelay.relay, rlnRelay.pubsubTopic, rlnRelay.contentTopic, rlnRelay.spamHandler)
if err != nil {
return err
}
log.Info("rln relay topic validator mounted", zap.String("pubsubTopic", rlnRelay.pubsubTopic), zap.String("contentTopic", rlnRelay.contentTopic))
log.Info("rln relay topic validator mounted")
return nil
}
@ -287,15 +264,11 @@ func (rlnRelay *WakuRLNRelay) AppendRLNProof(msg *pb.WakuMessage, senderEpochTim
return nil
}
// this function sets a validator for the waku messages published on the supplied pubsubTopic and contentTopic
// if contentTopic is empty, then validation takes place for All the messages published on the given pubsubTopic
// the message validation logic is according to https://rfc.vac.dev/spec/17/
func (rlnRelay *WakuRLNRelay) addValidator(
relay *relay.WakuRelay,
pubsubTopic string,
contentTopic string,
spamHandler SpamHandler) error {
validator := func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool {
// Validator returns a validator for the waku messages.
// The message validation logic is according to https://rfc.vac.dev/spec/17/
func (rlnRelay *WakuRLNRelay) Validator(
spamHandler SpamHandler) func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool {
return func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool {
rlnRelay.log.Debug("rln-relay topic validator called")
wakuMessage := &pb.WakuMessage{}
@ -304,12 +277,6 @@ func (rlnRelay *WakuRLNRelay) addValidator(
return true
}
// check the contentTopic
if (wakuMessage.ContentTopic != "") && (contentTopic != "") && (wakuMessage.ContentTopic != contentTopic) {
rlnRelay.log.Debug("content topic did not match", zap.String("contentTopic", contentTopic))
return true
}
// validate the message
validationRes, err := rlnRelay.ValidateMessage(wakuMessage, nil)
if err != nil {
@ -320,20 +287,17 @@ func (rlnRelay *WakuRLNRelay) addValidator(
switch validationRes {
case validMessage:
rlnRelay.log.Debug("message verified",
zap.String("pubsubTopic", pubsubTopic),
zap.String("id", hex.EncodeToString(wakuMessage.Hash(pubsubTopic))),
zap.String("id", hex.EncodeToString([]byte(message.ID))),
)
return true
case invalidMessage:
rlnRelay.log.Debug("message could not be verified",
zap.String("pubsubTopic", pubsubTopic),
zap.String("id", hex.EncodeToString(wakuMessage.Hash(pubsubTopic))),
zap.String("id", hex.EncodeToString([]byte(message.ID))),
)
return false
case spamMessage:
rlnRelay.log.Debug("spam message found",
zap.String("pubsubTopic", pubsubTopic),
zap.String("id", hex.EncodeToString(wakuMessage.Hash(pubsubTopic))),
zap.String("id", hex.EncodeToString([]byte(message.ID))),
)
if spamHandler != nil {
@ -348,11 +312,6 @@ func (rlnRelay *WakuRLNRelay) addValidator(
return false
}
}
// In case there's a topic validator registered
_ = relay.PubSub().UnregisterTopicValidator(pubsubTopic)
return relay.PubSub().RegisterTopicValidator(pubsubTopic, validator)
}
func (rlnRelay *WakuRLNRelay) generateProof(input []byte, epoch rln.Epoch) (*pb.RateLimitProof, error) {