diff --git a/waku/v2/node/wakunode2_rln.go b/waku/v2/node/wakunode2_rln.go index 33a28792..568c9751 100644 --- a/waku/v2/node/wakunode2_rln.go +++ b/waku/v2/node/wakunode2_rln.go @@ -52,6 +52,7 @@ func (w *WakuNode) setupRLNRelay() error { w.opts.keystorePassword, w.opts.keystoreIndex, true, + w.opts.prometheusReg, w.log, ) if err != nil { @@ -59,7 +60,7 @@ func (w *WakuNode) setupRLNRelay() error { } } - rlnRelay, err := rln.New(groupManager, w.opts.rlnTreePath, w.timesource, w.log) + rlnRelay, err := rln.New(groupManager, w.opts.rlnTreePath, w.timesource, w.opts.prometheusReg, w.log) if err != nil { return err } diff --git a/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go b/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go index 637d791b..2046fc93 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go @@ -6,11 +6,13 @@ import ( "fmt" "math/big" "sync" + "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" + "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts" "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager" @@ -27,8 +29,9 @@ var RLNAppInfo = keystore.AppInfo{ } type DynamicGroupManager struct { - rln *rln.RLN - log *zap.Logger + rln *rln.RLN + log *zap.Logger + metrics Metrics cancel context.CancelFunc wg sync.WaitGroup @@ -95,6 +98,8 @@ func handler(gm *DynamicGroupManager, events []*contracts.RLNMemberRegistered) e return err } + gm.metrics.RecordRegisteredMembership(toInsertTable.Len() - toRemoveTable.Len()) + gm.lastBlockProcessed = lastBlockProcessed err = gm.SetMetadata(RLNMetadata{ LastProcessedBlock: gm.lastBlockProcessed, @@ -121,6 +126,7 @@ func NewDynamicGroupManager( keystorePassword string, keystoreIndex uint, saveKeystore bool, + reg prometheus.Registerer, log *zap.Logger, ) (*DynamicGroupManager, error) { log = log.Named("rln-dynamic") @@ -147,6 +153,7 @@ func NewDynamicGroupManager( keystorePassword: password, keystoreIndex: keystoreIndex, log: log, + metrics: newMetrics(reg), }, nil } @@ -190,6 +197,7 @@ func (gm *DynamicGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, } if gm.identityCredential == nil && gm.keystorePassword != "" && gm.keystorePath != "" { + start := time.Now() credentials, err := keystore.GetMembershipCredentials(gm.log, gm.keystorePath, gm.keystorePassword, @@ -202,6 +210,7 @@ func (gm *DynamicGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, if err != nil { return err } + gm.metrics.RecordMembershipCredentialsImportDuration(time.Since(start)) if len(credentials) != 0 { if int(gm.keystoreIndex) <= len(credentials)-1 { @@ -247,11 +256,13 @@ func (gm *DynamicGroupManager) InsertMembers(toInsert *om.OrderedMap) error { // TODO: should we track indexes to identify missing? startIndex := rln.MembershipIndex(uint(oldestIndexInBlock.Int64())) + start := time.Now() err := gm.rln.InsertMembers(startIndex, idCommitments) if err != nil { gm.log.Error("inserting members into merkletree", zap.Error(err)) return err } + gm.metrics.RecordMembershipInsertionDuration(time.Since(start)) _, err = gm.rootTracker.UpdateLatestRoot(pair.Key.(uint64)) if err != nil { diff --git a/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go b/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go index 70c985c9..50c461b6 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/ethereum/go-ethereum/core/types" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts" "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager" @@ -45,6 +46,7 @@ func TestHandler(t *testing.T) { wg: sync.WaitGroup{}, chainId: big.NewInt(1), rootTracker: rootTracker, + metrics: newMetrics(prometheus.DefaultRegisterer), } root0 := [32]byte{62, 31, 25, 34, 223, 182, 113, 211, 249, 18, 247, 234, 70, 30, 10, 136, 238, 132, 143, 221, 225, 43, 108, 24, 171, 26, 210, 197, 106, 231, 52, 33} @@ -88,21 +90,8 @@ func TestHandler(t *testing.T) { // We should restore the valid roots from the buffer at the state the moment the chain forked // In this case, just adding the original merkle root from empty tree validRootsBeforeFork := roots[0:3] - events = []*contracts.RLNMemberRegistered{eventBuilder(3, true, 0xdddd, 4)} - - err = handler(gm, events) - require.NoError(t, err) - - roots = gm.rootTracker.Roots() - require.Len(t, roots, 4) - require.Equal(t, roots[0], root0) - require.Equal(t, roots[1], validRootsBeforeFork[0]) - require.Equal(t, roots[2], validRootsBeforeFork[1]) - require.Equal(t, roots[3], validRootsBeforeFork[2]) - require.Len(t, rootTracker.Buffer(), 0) - - // Adding multiple events for same block events = []*contracts.RLNMemberRegistered{ + eventBuilder(3, true, 0xdddd, 4), eventBuilder(3, false, 0xdddd, 4), eventBuilder(3, false, 0xeeee, 5), } @@ -112,5 +101,19 @@ func TestHandler(t *testing.T) { roots = gm.rootTracker.Roots() require.Len(t, roots, 5) + require.Equal(t, roots[0], root0) + require.Equal(t, roots[1], validRootsBeforeFork[0]) + require.Equal(t, roots[2], validRootsBeforeFork[1]) + require.Equal(t, roots[3], validRootsBeforeFork[2]) + require.Len(t, rootTracker.Buffer(), 0) + + // Adding multiple events for same block + events = []*contracts.RLNMemberRegistered{} + + err = handler(gm, events) + require.NoError(t, err) + + roots = gm.rootTracker.Roots() + require.Len(t, roots, 5) } diff --git a/waku/v2/protocol/rln/group_manager/dynamic/metrics.go b/waku/v2/protocol/rln/group_manager/dynamic/metrics.go new file mode 100644 index 00000000..a11fc201 --- /dev/null +++ b/waku/v2/protocol/rln/group_manager/dynamic/metrics.go @@ -0,0 +1,69 @@ +package dynamic + +import ( + "time" + + "github.com/libp2p/go-libp2p/p2p/metricshelper" + "github.com/prometheus/client_golang/prometheus" +) + +var numberRegisteredMemberships = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "waku_rln_number_registered_memberships", + Help: "number of registered and active rln memberships", + }) + +var membershipInsertionDurationSeconds = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "waku_rln_membership_insertion_duration_seconds", + Help: "time taken to insert a new member into the local merkle tree", + }) + +var membershipCredentialsImportDurationSeconds = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "waku_rln_membership_credentials_import_duration_seconds", + Help: "time taken to import membership credentials", + }) + +var collectors = []prometheus.Collector{ + numberRegisteredMemberships, + membershipInsertionDurationSeconds, + membershipCredentialsImportDurationSeconds, +} + +// Metrics exposes the functions required to update prometheus metrics for lightpush protocol +type Metrics interface { + RecordRegisteredMembership(num int) + RecordMembershipInsertionDuration(duration time.Duration) + RecordMembershipCredentialsImportDuration(duration time.Duration) +} + +type metricsImpl struct { + reg prometheus.Registerer +} + +func newMetrics(reg prometheus.Registerer) Metrics { + metricshelper.RegisterCollectors(reg, collectors...) + return &metricsImpl{ + reg: reg, + } +} + +// RecordMembershipInsertionDuration records how long did it take to insert members into th merkle tree +func (m *metricsImpl) RecordMembershipInsertionDuration(duration time.Duration) { + membershipInsertionDurationSeconds.Set(duration.Seconds()) +} + +// RecordMembershipCredentialsImport records how long did it take to import the membership credentials +func (m *metricsImpl) RecordMembershipCredentialsImportDuration(duration time.Duration) { + membershipCredentialsImportDurationSeconds.Set(duration.Seconds()) +} + +// RecordRegisteredMembership records the number of registered memberships +func (m *metricsImpl) RecordRegisteredMembership(num int) { + if num < 0 { + return + } + + numberRegisteredMemberships.Add(float64(num)) +} diff --git a/waku/v2/protocol/rln/group_manager/root_tracker.go b/waku/v2/protocol/rln/group_manager/root_tracker.go index 5fa29573..7ea84303 100644 --- a/waku/v2/protocol/rln/group_manager/root_tracker.go +++ b/waku/v2/protocol/rln/group_manager/root_tracker.go @@ -1,6 +1,7 @@ package group_manager import ( + "bytes" "sync" "github.com/waku-org/go-zerokit-rln/rln" @@ -75,6 +76,25 @@ func (m *MerkleRootTracker) Backfill(fromBlockNumber uint64) { } } +// ContainsRoot is used to check whether a merkle tree root is contained in the list of valid merkle roots or not +func (m *MerkleRootTracker) ContainsRoot(root [32]byte) bool { + return m.IndexOf(root) > -1 +} + +// IndexOf returns the index of a root if present in the list of valid merkle roots +func (m *MerkleRootTracker) IndexOf(root [32]byte) int { + m.RLock() + defer m.RUnlock() + + for i := range m.validMerkleRoots { + if bytes.Equal(m.validMerkleRoots[i].root[:], root[:]) { + return i + } + } + + return -1 +} + func (m *MerkleRootTracker) UpdateLatestRoot(blockNumber uint64) (rln.MerkleNode, error) { m.Lock() defer m.Unlock() diff --git a/waku/v2/protocol/rln/metrics.go b/waku/v2/protocol/rln/metrics.go new file mode 100644 index 00000000..47acad2b --- /dev/null +++ b/waku/v2/protocol/rln/metrics.go @@ -0,0 +1,172 @@ +package rln + +import ( + "time" + + "github.com/libp2p/go-libp2p/p2p/metricshelper" + "github.com/prometheus/client_golang/prometheus" +) + +var messagesTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "waku_rln_messages_total", + Help: "number of messages received in the RLN validator", + }) + +var spamMessagesTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_rln_spam_messages_total", + Help: "number of spam messages detected", + }, + []string{"contentTopic"}) + +var invalidMessagesTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_rln_invalid_messages_total", + Help: "number of invalid messages detected", + }, + []string{"type"}) + +func generateBucketsForHistogram(length int) []float64 { + // Generate a custom set of 5 buckets for a given length + numberOfBuckets := 5 + stepSize := length / numberOfBuckets + var buckets []float64 + for i := 1; i <= 5; i++ { + buckets = append(buckets, float64(stepSize*i)) + } + return buckets +} + +// This metric will be useful in detecting the index of the root in the acceptable window of roots +var validMessagesTotal = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "waku_rln_valid_messages_total", + Help: "number of valid messages with their roots tracked", + Buckets: generateBucketsForHistogram(acceptableRootWindowSize), +}) + +var errorsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_rln_errors_total", + Help: "number of errors detected while operating the rln relay", + }, + []string{"type"}, +) + +var proofVerificationTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "waku_rln_proof_verification_total", + Help: "number of times the rln proofs are verified", + }) + +var proofVerificationDurationSeconds = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: "waku_rln_proof_verification_duration_seconds", + Help: "time taken to verify a proof", + }) + +var proofGenerationDurationSeconds = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: "waku_rln_proof_generation_duration_seconds", + Help: "time taken to generate a proof", + }) + +var instanceCreationDurationSeconds = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "waku_rln_instance_creation_duration_seconds", + Help: "time taken to create an rln instance", + }) + +var collectors = []prometheus.Collector{ + messagesTotal, + spamMessagesTotal, + invalidMessagesTotal, + errorsTotal, + validMessagesTotal, + proofVerificationTotal, + proofVerificationDurationSeconds, + proofGenerationDurationSeconds, + instanceCreationDurationSeconds, +} + +type errCategory string + +var ( + proofMetadataExtractionErr errCategory = "proof_metadata_extraction" + proofVerificationErr errCategory = "proof_verification" + duplicateCheckErr errCategory = "duplicate_check" + logInsertionErr errCategory = "log_insertion" +) + +type invalidCategory string + +var ( + invalidNoProof invalidCategory = "no_proof" + invalidEpoch invalidCategory = "invalid_epoch" + invalidRoot invalidCategory = "invalid_root" + invalidProof invalidCategory = "invalid_proof" +) + +// Metrics exposes the functions required to update prometheus metrics for lightpush protocol +type Metrics interface { + RecordMessage() + RecordSpam(contentTopic string) + RecordInvalidMessage(cause invalidCategory) + RecordError(err errCategory) + RecordProofVerification(duration time.Duration) + RecordProofGeneration(duration time.Duration) + RecordValidMessages(rootIndex int) + RecordInstanceCreation(duration time.Duration) +} + +type metricsImpl struct { + reg prometheus.Registerer +} + +func newMetrics(reg prometheus.Registerer) Metrics { + metricshelper.RegisterCollectors(reg, collectors...) + return &metricsImpl{ + reg: reg, + } +} + +// RecordMessage is used to increase the counter for the number of messages received in the RLN validator +func (m *metricsImpl) RecordMessage() { + messagesTotal.Inc() +} + +// RecordSpam is used to increase the counter for the number of spam of messages received +func (m *metricsImpl) RecordSpam(contentTopic string) { + spamMessagesTotal.WithLabelValues(contentTopic).Inc() +} + +// RecordError increases the counter for different error types +func (m *metricsImpl) RecordError(err errCategory) { + errorsTotal.WithLabelValues(string(err)).Inc() +} + +// RecordProofVerification increases the counter for the number of proof verifications perfomed, and the duration of these +func (m *metricsImpl) RecordProofVerification(duration time.Duration) { + proofVerificationTotal.Inc() + proofVerificationDurationSeconds.Observe(duration.Seconds()) +} + +// RecordProofGeneration measures the duration to generate a proof +func (m *metricsImpl) RecordProofGeneration(duration time.Duration) { + proofGenerationDurationSeconds.Observe(duration.Seconds()) +} + +// RecordInstanceCreation records how long did it take to instantiate RLN +func (m *metricsImpl) RecordInstanceCreation(duration time.Duration) { + instanceCreationDurationSeconds.Set(duration.Seconds()) +} + +// RecordInvalidMessage increases the counter for different types of invalid messages +func (m *metricsImpl) RecordInvalidMessage(cause invalidCategory) { + invalidMessagesTotal.WithLabelValues(string(cause)).Inc() +} + +// RecordValidMessages records the root index used for valid messages +func (m *metricsImpl) RecordValidMessages(rootIndex int) { + validMessagesTotal.Observe(float64(rootIndex)) +} diff --git a/waku/v2/protocol/rln/onchain_test.go b/waku/v2/protocol/rln/onchain_test.go index c3240c6f..8bf35797 100644 --- a/waku/v2/protocol/rln/onchain_test.go +++ b/waku/v2/protocol/rln/onchain_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-zerokit-rln/rln" "go.uber.org/zap" @@ -149,7 +150,7 @@ func (s *WakuRLNRelayDynamicSuite) TestDynamicGroupManagement() { _, membershipGroupIndex := s.register(u1Credentials, s.u1PrivKey, keystorePath1) defer s.removeCredentials(keystorePath1) - gm, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, keystorePath1, keystorePassword, 0, false, utils.Logger()) + gm, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, keystorePath1, keystorePassword, 0, false, prometheus.DefaultRegisterer, utils.Logger()) s.Require().NoError(err) // initialize the WakuRLNRelay @@ -230,10 +231,10 @@ func (s *WakuRLNRelayDynamicSuite) TestMerkleTreeConstruction() { // TODO: This assumes the keystoreIndex is 0, but there are two possible credentials in this keystore due to using the same contract address // when credentials1 and credentials2 were registered. We should remove this hardcoded value and obtain the correct value when the credentials are persisted keystoreIndex := uint(0) - gm, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, "./test_onchain.json", keystorePassword, keystoreIndex, false, utils.Logger()) + gm, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, "./test_onchain.json", keystorePassword, keystoreIndex, false, prometheus.DefaultRegisterer, utils.Logger()) s.Require().NoError(err) - rlnRelay, err := New(gm, "test-merkle-tree.db", timesource.NewDefaultClock(), utils.Logger()) + rlnRelay, err := New(gm, "test-merkle-tree.db", timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) s.Require().NoError(err) err = rlnRelay.Start(context.TODO()) @@ -264,10 +265,10 @@ func (s *WakuRLNRelayDynamicSuite) TestCorrectRegistrationOfPeers() { defer s.removeCredentials(keystorePath1) // mount the rln relay protocol in the on-chain/dynamic mode - gm1, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, keystorePath1, keystorePassword, 0, false, utils.Logger()) + gm1, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, keystorePath1, keystorePassword, 0, false, prometheus.DefaultRegisterer, utils.Logger()) s.Require().NoError(err) - rlnRelay1, err := New(gm1, "test-correct-registration-1.db", timesource.NewDefaultClock(), utils.Logger()) + rlnRelay1, err := New(gm1, "test-correct-registration-1.db", timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) s.Require().NoError(err) err = rlnRelay1.Start(context.TODO()) s.Require().NoError(err) @@ -281,10 +282,10 @@ func (s *WakuRLNRelayDynamicSuite) TestCorrectRegistrationOfPeers() { defer s.removeCredentials(keystorePath2) // mount the rln relay protocol in the on-chain/dynamic mode - gm2, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, keystorePath2, keystorePassword, 0, false, utils.Logger()) + gm2, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, keystorePath2, keystorePassword, 0, false, prometheus.DefaultRegisterer, utils.Logger()) s.Require().NoError(err) - rlnRelay2, err := New(gm2, "test-correct-registration-2.db", timesource.NewDefaultClock(), utils.Logger()) + rlnRelay2, err := New(gm2, "test-correct-registration-2.db", timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) s.Require().NoError(err) err = rlnRelay2.Start(context.TODO()) s.Require().NoError(err) diff --git a/waku/v2/protocol/rln/rln_relay_test.go b/waku/v2/protocol/rln/rln_relay_test.go index 859b73a4..bf755303 100644 --- a/waku/v2/protocol/rln/rln_relay_test.go +++ b/waku/v2/protocol/rln/rln_relay_test.go @@ -59,7 +59,7 @@ func (s *WakuRLNRelaySuite) TestOffchainMode() { groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, utils.Logger()) s.Require().NoError(err) - wakuRLNRelay, err := New(groupManager, "", timesource.NewDefaultClock(), utils.Logger()) + wakuRLNRelay, err := New(groupManager, "", timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) s.Require().NoError(err) err = wakuRLNRelay.Start(context.TODO()) @@ -180,6 +180,7 @@ func (s *WakuRLNRelaySuite) TestValidateMessage() { RLN: rlnInstance, nullifierLog: make(map[r.Nullifier][]r.ProofMetadata), log: utils.Logger(), + metrics: newMetrics(prometheus.DefaultRegisterer), } //get the current epoch time diff --git a/waku/v2/protocol/rln/waku_rln_relay.go b/waku/v2/protocol/rln/waku_rln_relay.go index 65540694..bc70792e 100644 --- a/waku/v2/protocol/rln/waku_rln_relay.go +++ b/waku/v2/protocol/rln/waku_rln_relay.go @@ -12,6 +12,8 @@ import ( "github.com/ethereum/go-ethereum/log" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" + "github.com/prometheus/client_golang/prometheus" + "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager" "github.com/waku-org/go-waku/waku/v2/timesource" @@ -29,6 +31,7 @@ type GroupManager interface { type WakuRLNRelay struct { timesource timesource.Timesource + metrics Metrics groupManager GroupManager rootTracker *group_manager.MerkleRootTracker @@ -48,12 +51,16 @@ func New( groupManager GroupManager, treePath string, timesource timesource.Timesource, + reg prometheus.Registerer, log *zap.Logger) (*WakuRLNRelay, error) { if treePath == "" { treePath = rlnDefaultTreePath } + metrics := newMetrics(reg) + + start := time.Now() rlnInstance, err := rln.NewWithConfig(rln.DefaultTreeDepth, &rln.TreeConfig{ CacheCapacity: 15000, Mode: rln.HighThroughput, @@ -64,6 +71,7 @@ func New( if err != nil { return nil, err } + metrics.RecordInstanceCreation(time.Since(start)) rootTracker, err := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance) if err != nil { @@ -75,6 +83,7 @@ func New( RLN: rlnInstance, groupManager: groupManager, rootTracker: rootTracker, + metrics: metrics, log: log, timesource: timesource, nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata), @@ -183,12 +192,14 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime if msgProof == nil { // message does not contain a proof rlnRelay.log.Debug("invalid message: message does not contain a proof") + rlnRelay.metrics.RecordInvalidMessage(invalidNoProof) return invalidMessage, nil } proofMD, err := rlnRelay.RLN.ExtractMetadata(*msgProof) if err != nil { rlnRelay.log.Debug("could not extract metadata", zap.Error(err)) + rlnRelay.metrics.RecordError(proofMetadataExtractionErr) return invalidMessage, nil } @@ -198,18 +209,30 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime // message's epoch is too old or too ahead // accept messages whose epoch is within +-MAX_EPOCH_GAP from the current epoch rlnRelay.log.Debug("invalid message: epoch gap exceeds a threshold", zap.Int64("gap", gap)) + rlnRelay.metrics.RecordInvalidMessage(invalidEpoch) + return invalidMessage, nil } + if !(rlnRelay.rootTracker.ContainsRoot(msgProof.MerkleRoot)) { + rlnRelay.log.Debug("invalid message: unexpected root", logging.HexBytes("msgRoot", msg.RateLimitProof.MerkleRoot)) + rlnRelay.metrics.RecordInvalidMessage(invalidRoot) + return invalidMessage, nil + } + + start := time.Now() valid, err := rlnRelay.verifyProof(msg, msgProof) if err != nil { rlnRelay.log.Debug("could not verify proof", zap.Error(err)) + rlnRelay.metrics.RecordError(proofVerificationErr) return invalidMessage, nil } + rlnRelay.metrics.RecordProofVerification(time.Since(start)) if !valid { // invalid proof rlnRelay.log.Debug("Invalid proof") + rlnRelay.metrics.RecordInvalidMessage(invalidProof) return invalidMessage, nil } @@ -217,6 +240,7 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime hasDup, err := rlnRelay.HasDuplicate(proofMD) if err != nil { rlnRelay.log.Debug("validation error", zap.Error(err)) + rlnRelay.metrics.RecordError(duplicateCheckErr) return validationError, err } @@ -230,10 +254,16 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime // it will never error out _, err = rlnRelay.updateLog(proofMD) if err != nil { + rlnRelay.log.Debug("could not insert proof into log") + rlnRelay.metrics.RecordError(logInsertionErr) return validationError, err } rlnRelay.log.Debug("message is valid") + + rootIndex := rlnRelay.rootTracker.IndexOf(msgProof.MerkleRoot) + rlnRelay.metrics.RecordValidMessages(rootIndex) + return validMessage, nil } @@ -254,10 +284,12 @@ func (rlnRelay *WakuRLNRelay) AppendRLNProof(msg *pb.WakuMessage, senderEpochTim input := toRLNSignal(msg) + start := time.Now() proof, err := rlnRelay.generateProof(input, rln.CalcEpoch(senderEpochTime)) if err != nil { return err } + rlnRelay.metrics.RecordProofGeneration(time.Since(start)) msg.RateLimitProof = proof @@ -271,6 +303,8 @@ func (rlnRelay *WakuRLNRelay) Validator( return func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool { rlnRelay.log.Debug("rln-relay topic validator called") + rlnRelay.metrics.RecordMessage() + wakuMessage := &pb.WakuMessage{} if err := proto.Unmarshal(message.Data, wakuMessage); err != nil { rlnRelay.log.Debug("could not unmarshal message") @@ -300,6 +334,8 @@ func (rlnRelay *WakuRLNRelay) Validator( zap.String("id", hex.EncodeToString([]byte(message.ID))), ) + rlnRelay.metrics.RecordSpam(wakuMessage.ContentTopic) + if spamHandler != nil { if err := spamHandler(wakuMessage); err != nil { rlnRelay.log.Error("executing spam handler", zap.Error(err))