mirror of https://github.com/status-im/go-waku.git
feat: rln metrics
This commit is contained in:
parent
ddb08adbbd
commit
cb3f5da322
|
@ -52,6 +52,7 @@ func (w *WakuNode) setupRLNRelay() error {
|
||||||
w.opts.keystorePassword,
|
w.opts.keystorePassword,
|
||||||
w.opts.keystoreIndex,
|
w.opts.keystoreIndex,
|
||||||
true,
|
true,
|
||||||
|
w.opts.prometheusReg,
|
||||||
w.log,
|
w.log,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -59,7 +60,7 @@ func (w *WakuNode) setupRLNRelay() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rlnRelay, err := rln.New(groupManager, w.opts.rlnTreePath, w.timesource, w.log)
|
rlnRelay, err := rln.New(groupManager, w.opts.rlnTreePath, w.timesource, w.opts.prometheusReg, w.log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,11 +6,13 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/accounts/abi/bind"
|
"github.com/ethereum/go-ethereum/accounts/abi/bind"
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/ethclient"
|
"github.com/ethereum/go-ethereum/ethclient"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/waku-org/go-waku/logging"
|
"github.com/waku-org/go-waku/logging"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts"
|
"github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
|
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
|
||||||
|
@ -27,8 +29,9 @@ var RLNAppInfo = keystore.AppInfo{
|
||||||
}
|
}
|
||||||
|
|
||||||
type DynamicGroupManager struct {
|
type DynamicGroupManager struct {
|
||||||
rln *rln.RLN
|
rln *rln.RLN
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
|
metrics Metrics
|
||||||
|
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
@ -95,6 +98,8 @@ func handler(gm *DynamicGroupManager, events []*contracts.RLNMemberRegistered) e
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
gm.metrics.RecordRegisteredMembership(toInsertTable.Len() - toRemoveTable.Len())
|
||||||
|
|
||||||
gm.lastBlockProcessed = lastBlockProcessed
|
gm.lastBlockProcessed = lastBlockProcessed
|
||||||
err = gm.SetMetadata(RLNMetadata{
|
err = gm.SetMetadata(RLNMetadata{
|
||||||
LastProcessedBlock: gm.lastBlockProcessed,
|
LastProcessedBlock: gm.lastBlockProcessed,
|
||||||
|
@ -121,6 +126,7 @@ func NewDynamicGroupManager(
|
||||||
keystorePassword string,
|
keystorePassword string,
|
||||||
keystoreIndex uint,
|
keystoreIndex uint,
|
||||||
saveKeystore bool,
|
saveKeystore bool,
|
||||||
|
reg prometheus.Registerer,
|
||||||
log *zap.Logger,
|
log *zap.Logger,
|
||||||
) (*DynamicGroupManager, error) {
|
) (*DynamicGroupManager, error) {
|
||||||
log = log.Named("rln-dynamic")
|
log = log.Named("rln-dynamic")
|
||||||
|
@ -147,6 +153,7 @@ func NewDynamicGroupManager(
|
||||||
keystorePassword: password,
|
keystorePassword: password,
|
||||||
keystoreIndex: keystoreIndex,
|
keystoreIndex: keystoreIndex,
|
||||||
log: log,
|
log: log,
|
||||||
|
metrics: newMetrics(reg),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -190,6 +197,7 @@ func (gm *DynamicGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN,
|
||||||
}
|
}
|
||||||
|
|
||||||
if gm.identityCredential == nil && gm.keystorePassword != "" && gm.keystorePath != "" {
|
if gm.identityCredential == nil && gm.keystorePassword != "" && gm.keystorePath != "" {
|
||||||
|
start := time.Now()
|
||||||
credentials, err := keystore.GetMembershipCredentials(gm.log,
|
credentials, err := keystore.GetMembershipCredentials(gm.log,
|
||||||
gm.keystorePath,
|
gm.keystorePath,
|
||||||
gm.keystorePassword,
|
gm.keystorePassword,
|
||||||
|
@ -202,6 +210,7 @@ func (gm *DynamicGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN,
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
gm.metrics.RecordMembershipCredentialsImportDuration(time.Since(start))
|
||||||
|
|
||||||
if len(credentials) != 0 {
|
if len(credentials) != 0 {
|
||||||
if int(gm.keystoreIndex) <= len(credentials)-1 {
|
if int(gm.keystoreIndex) <= len(credentials)-1 {
|
||||||
|
@ -247,11 +256,13 @@ func (gm *DynamicGroupManager) InsertMembers(toInsert *om.OrderedMap) error {
|
||||||
|
|
||||||
// TODO: should we track indexes to identify missing?
|
// TODO: should we track indexes to identify missing?
|
||||||
startIndex := rln.MembershipIndex(uint(oldestIndexInBlock.Int64()))
|
startIndex := rln.MembershipIndex(uint(oldestIndexInBlock.Int64()))
|
||||||
|
start := time.Now()
|
||||||
err := gm.rln.InsertMembers(startIndex, idCommitments)
|
err := gm.rln.InsertMembers(startIndex, idCommitments)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
gm.log.Error("inserting members into merkletree", zap.Error(err))
|
gm.log.Error("inserting members into merkletree", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
gm.metrics.RecordMembershipInsertionDuration(time.Since(start))
|
||||||
|
|
||||||
_, err = gm.rootTracker.UpdateLatestRoot(pair.Key.(uint64))
|
_, err = gm.rootTracker.UpdateLatestRoot(pair.Key.(uint64))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts"
|
"github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
|
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
|
||||||
|
@ -45,6 +46,7 @@ func TestHandler(t *testing.T) {
|
||||||
wg: sync.WaitGroup{},
|
wg: sync.WaitGroup{},
|
||||||
chainId: big.NewInt(1),
|
chainId: big.NewInt(1),
|
||||||
rootTracker: rootTracker,
|
rootTracker: rootTracker,
|
||||||
|
metrics: newMetrics(prometheus.DefaultRegisterer),
|
||||||
}
|
}
|
||||||
|
|
||||||
root0 := [32]byte{62, 31, 25, 34, 223, 182, 113, 211, 249, 18, 247, 234, 70, 30, 10, 136, 238, 132, 143, 221, 225, 43, 108, 24, 171, 26, 210, 197, 106, 231, 52, 33}
|
root0 := [32]byte{62, 31, 25, 34, 223, 182, 113, 211, 249, 18, 247, 234, 70, 30, 10, 136, 238, 132, 143, 221, 225, 43, 108, 24, 171, 26, 210, 197, 106, 231, 52, 33}
|
||||||
|
@ -88,21 +90,8 @@ func TestHandler(t *testing.T) {
|
||||||
// We should restore the valid roots from the buffer at the state the moment the chain forked
|
// We should restore the valid roots from the buffer at the state the moment the chain forked
|
||||||
// In this case, just adding the original merkle root from empty tree
|
// In this case, just adding the original merkle root from empty tree
|
||||||
validRootsBeforeFork := roots[0:3]
|
validRootsBeforeFork := roots[0:3]
|
||||||
events = []*contracts.RLNMemberRegistered{eventBuilder(3, true, 0xdddd, 4)}
|
|
||||||
|
|
||||||
err = handler(gm, events)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
roots = gm.rootTracker.Roots()
|
|
||||||
require.Len(t, roots, 4)
|
|
||||||
require.Equal(t, roots[0], root0)
|
|
||||||
require.Equal(t, roots[1], validRootsBeforeFork[0])
|
|
||||||
require.Equal(t, roots[2], validRootsBeforeFork[1])
|
|
||||||
require.Equal(t, roots[3], validRootsBeforeFork[2])
|
|
||||||
require.Len(t, rootTracker.Buffer(), 0)
|
|
||||||
|
|
||||||
// Adding multiple events for same block
|
|
||||||
events = []*contracts.RLNMemberRegistered{
|
events = []*contracts.RLNMemberRegistered{
|
||||||
|
eventBuilder(3, true, 0xdddd, 4),
|
||||||
eventBuilder(3, false, 0xdddd, 4),
|
eventBuilder(3, false, 0xdddd, 4),
|
||||||
eventBuilder(3, false, 0xeeee, 5),
|
eventBuilder(3, false, 0xeeee, 5),
|
||||||
}
|
}
|
||||||
|
@ -112,5 +101,19 @@ func TestHandler(t *testing.T) {
|
||||||
|
|
||||||
roots = gm.rootTracker.Roots()
|
roots = gm.rootTracker.Roots()
|
||||||
require.Len(t, roots, 5)
|
require.Len(t, roots, 5)
|
||||||
|
require.Equal(t, roots[0], root0)
|
||||||
|
require.Equal(t, roots[1], validRootsBeforeFork[0])
|
||||||
|
require.Equal(t, roots[2], validRootsBeforeFork[1])
|
||||||
|
require.Equal(t, roots[3], validRootsBeforeFork[2])
|
||||||
|
require.Len(t, rootTracker.Buffer(), 0)
|
||||||
|
|
||||||
|
// Adding multiple events for same block
|
||||||
|
events = []*contracts.RLNMemberRegistered{}
|
||||||
|
|
||||||
|
err = handler(gm, events)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
roots = gm.rootTracker.Roots()
|
||||||
|
require.Len(t, roots, 5)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,69 @@
|
||||||
|
package dynamic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p/p2p/metricshelper"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
var numberRegisteredMemberships = prometheus.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "waku_rln_number_registered_memberships",
|
||||||
|
Help: "number of registered and active rln memberships",
|
||||||
|
})
|
||||||
|
|
||||||
|
var membershipInsertionDurationSeconds = prometheus.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Name: "waku_rln_membership_insertion_duration_seconds",
|
||||||
|
Help: "time taken to insert a new member into the local merkle tree",
|
||||||
|
})
|
||||||
|
|
||||||
|
var membershipCredentialsImportDurationSeconds = prometheus.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Name: "waku_rln_membership_credentials_import_duration_seconds",
|
||||||
|
Help: "time taken to import membership credentials",
|
||||||
|
})
|
||||||
|
|
||||||
|
var collectors = []prometheus.Collector{
|
||||||
|
numberRegisteredMemberships,
|
||||||
|
membershipInsertionDurationSeconds,
|
||||||
|
membershipCredentialsImportDurationSeconds,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Metrics exposes the functions required to update prometheus metrics for lightpush protocol
|
||||||
|
type Metrics interface {
|
||||||
|
RecordRegisteredMembership(num int)
|
||||||
|
RecordMembershipInsertionDuration(duration time.Duration)
|
||||||
|
RecordMembershipCredentialsImportDuration(duration time.Duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
type metricsImpl struct {
|
||||||
|
reg prometheus.Registerer
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMetrics(reg prometheus.Registerer) Metrics {
|
||||||
|
metricshelper.RegisterCollectors(reg, collectors...)
|
||||||
|
return &metricsImpl{
|
||||||
|
reg: reg,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordMembershipInsertionDuration records how long did it take to insert members into th merkle tree
|
||||||
|
func (m *metricsImpl) RecordMembershipInsertionDuration(duration time.Duration) {
|
||||||
|
membershipInsertionDurationSeconds.Set(duration.Seconds())
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordMembershipCredentialsImport records how long did it take to import the membership credentials
|
||||||
|
func (m *metricsImpl) RecordMembershipCredentialsImportDuration(duration time.Duration) {
|
||||||
|
membershipCredentialsImportDurationSeconds.Set(duration.Seconds())
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordRegisteredMembership records the number of registered memberships
|
||||||
|
func (m *metricsImpl) RecordRegisteredMembership(num int) {
|
||||||
|
if num < 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
numberRegisteredMemberships.Add(float64(num))
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
package group_manager
|
package group_manager
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/waku-org/go-zerokit-rln/rln"
|
"github.com/waku-org/go-zerokit-rln/rln"
|
||||||
|
@ -75,6 +76,25 @@ func (m *MerkleRootTracker) Backfill(fromBlockNumber uint64) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ContainsRoot is used to check whether a merkle tree root is contained in the list of valid merkle roots or not
|
||||||
|
func (m *MerkleRootTracker) ContainsRoot(root [32]byte) bool {
|
||||||
|
return m.IndexOf(root) > -1
|
||||||
|
}
|
||||||
|
|
||||||
|
// IndexOf returns the index of a root if present in the list of valid merkle roots
|
||||||
|
func (m *MerkleRootTracker) IndexOf(root [32]byte) int {
|
||||||
|
m.RLock()
|
||||||
|
defer m.RUnlock()
|
||||||
|
|
||||||
|
for i := range m.validMerkleRoots {
|
||||||
|
if bytes.Equal(m.validMerkleRoots[i].root[:], root[:]) {
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
func (m *MerkleRootTracker) UpdateLatestRoot(blockNumber uint64) (rln.MerkleNode, error) {
|
func (m *MerkleRootTracker) UpdateLatestRoot(blockNumber uint64) (rln.MerkleNode, error) {
|
||||||
m.Lock()
|
m.Lock()
|
||||||
defer m.Unlock()
|
defer m.Unlock()
|
||||||
|
|
|
@ -0,0 +1,172 @@
|
||||||
|
package rln
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p/p2p/metricshelper"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
var messagesTotal = prometheus.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "waku_rln_messages_total",
|
||||||
|
Help: "number of messages received in the RLN validator",
|
||||||
|
})
|
||||||
|
|
||||||
|
var spamMessagesTotal = prometheus.NewCounterVec(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "waku_rln_spam_messages_total",
|
||||||
|
Help: "number of spam messages detected",
|
||||||
|
},
|
||||||
|
[]string{"contentTopic"})
|
||||||
|
|
||||||
|
var invalidMessagesTotal = prometheus.NewCounterVec(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "waku_rln_invalid_messages_total",
|
||||||
|
Help: "number of invalid messages detected",
|
||||||
|
},
|
||||||
|
[]string{"type"})
|
||||||
|
|
||||||
|
func generateBucketsForHistogram(length int) []float64 {
|
||||||
|
// Generate a custom set of 5 buckets for a given length
|
||||||
|
numberOfBuckets := 5
|
||||||
|
stepSize := length / numberOfBuckets
|
||||||
|
var buckets []float64
|
||||||
|
for i := 1; i <= 5; i++ {
|
||||||
|
buckets = append(buckets, float64(stepSize*i))
|
||||||
|
}
|
||||||
|
return buckets
|
||||||
|
}
|
||||||
|
|
||||||
|
// This metric will be useful in detecting the index of the root in the acceptable window of roots
|
||||||
|
var validMessagesTotal = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||||
|
Name: "waku_rln_valid_messages_total",
|
||||||
|
Help: "number of valid messages with their roots tracked",
|
||||||
|
Buckets: generateBucketsForHistogram(acceptableRootWindowSize),
|
||||||
|
})
|
||||||
|
|
||||||
|
var errorsTotal = prometheus.NewCounterVec(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "waku_rln_errors_total",
|
||||||
|
Help: "number of errors detected while operating the rln relay",
|
||||||
|
},
|
||||||
|
[]string{"type"},
|
||||||
|
)
|
||||||
|
|
||||||
|
var proofVerificationTotal = prometheus.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "waku_rln_proof_verification_total",
|
||||||
|
Help: "number of times the rln proofs are verified",
|
||||||
|
})
|
||||||
|
|
||||||
|
var proofVerificationDurationSeconds = prometheus.NewHistogram(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Name: "waku_rln_proof_verification_duration_seconds",
|
||||||
|
Help: "time taken to verify a proof",
|
||||||
|
})
|
||||||
|
|
||||||
|
var proofGenerationDurationSeconds = prometheus.NewHistogram(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Name: "waku_rln_proof_generation_duration_seconds",
|
||||||
|
Help: "time taken to generate a proof",
|
||||||
|
})
|
||||||
|
|
||||||
|
var instanceCreationDurationSeconds = prometheus.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Name: "waku_rln_instance_creation_duration_seconds",
|
||||||
|
Help: "time taken to create an rln instance",
|
||||||
|
})
|
||||||
|
|
||||||
|
var collectors = []prometheus.Collector{
|
||||||
|
messagesTotal,
|
||||||
|
spamMessagesTotal,
|
||||||
|
invalidMessagesTotal,
|
||||||
|
errorsTotal,
|
||||||
|
validMessagesTotal,
|
||||||
|
proofVerificationTotal,
|
||||||
|
proofVerificationDurationSeconds,
|
||||||
|
proofGenerationDurationSeconds,
|
||||||
|
instanceCreationDurationSeconds,
|
||||||
|
}
|
||||||
|
|
||||||
|
type errCategory string
|
||||||
|
|
||||||
|
var (
|
||||||
|
proofMetadataExtractionErr errCategory = "proof_metadata_extraction"
|
||||||
|
proofVerificationErr errCategory = "proof_verification"
|
||||||
|
duplicateCheckErr errCategory = "duplicate_check"
|
||||||
|
logInsertionErr errCategory = "log_insertion"
|
||||||
|
)
|
||||||
|
|
||||||
|
type invalidCategory string
|
||||||
|
|
||||||
|
var (
|
||||||
|
invalidNoProof invalidCategory = "no_proof"
|
||||||
|
invalidEpoch invalidCategory = "invalid_epoch"
|
||||||
|
invalidRoot invalidCategory = "invalid_root"
|
||||||
|
invalidProof invalidCategory = "invalid_proof"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Metrics exposes the functions required to update prometheus metrics for lightpush protocol
|
||||||
|
type Metrics interface {
|
||||||
|
RecordMessage()
|
||||||
|
RecordSpam(contentTopic string)
|
||||||
|
RecordInvalidMessage(cause invalidCategory)
|
||||||
|
RecordError(err errCategory)
|
||||||
|
RecordProofVerification(duration time.Duration)
|
||||||
|
RecordProofGeneration(duration time.Duration)
|
||||||
|
RecordValidMessages(rootIndex int)
|
||||||
|
RecordInstanceCreation(duration time.Duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
type metricsImpl struct {
|
||||||
|
reg prometheus.Registerer
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMetrics(reg prometheus.Registerer) Metrics {
|
||||||
|
metricshelper.RegisterCollectors(reg, collectors...)
|
||||||
|
return &metricsImpl{
|
||||||
|
reg: reg,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordMessage is used to increase the counter for the number of messages received in the RLN validator
|
||||||
|
func (m *metricsImpl) RecordMessage() {
|
||||||
|
messagesTotal.Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordSpam is used to increase the counter for the number of spam of messages received
|
||||||
|
func (m *metricsImpl) RecordSpam(contentTopic string) {
|
||||||
|
spamMessagesTotal.WithLabelValues(contentTopic).Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordError increases the counter for different error types
|
||||||
|
func (m *metricsImpl) RecordError(err errCategory) {
|
||||||
|
errorsTotal.WithLabelValues(string(err)).Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordProofVerification increases the counter for the number of proof verifications perfomed, and the duration of these
|
||||||
|
func (m *metricsImpl) RecordProofVerification(duration time.Duration) {
|
||||||
|
proofVerificationTotal.Inc()
|
||||||
|
proofVerificationDurationSeconds.Observe(duration.Seconds())
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordProofGeneration measures the duration to generate a proof
|
||||||
|
func (m *metricsImpl) RecordProofGeneration(duration time.Duration) {
|
||||||
|
proofGenerationDurationSeconds.Observe(duration.Seconds())
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordInstanceCreation records how long did it take to instantiate RLN
|
||||||
|
func (m *metricsImpl) RecordInstanceCreation(duration time.Duration) {
|
||||||
|
instanceCreationDurationSeconds.Set(duration.Seconds())
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordInvalidMessage increases the counter for different types of invalid messages
|
||||||
|
func (m *metricsImpl) RecordInvalidMessage(cause invalidCategory) {
|
||||||
|
invalidMessagesTotal.WithLabelValues(string(cause)).Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordValidMessages records the root index used for valid messages
|
||||||
|
func (m *metricsImpl) RecordValidMessages(rootIndex int) {
|
||||||
|
validMessagesTotal.Observe(float64(rootIndex))
|
||||||
|
}
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/waku-org/go-zerokit-rln/rln"
|
"github.com/waku-org/go-zerokit-rln/rln"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
@ -149,7 +150,7 @@ func (s *WakuRLNRelayDynamicSuite) TestDynamicGroupManagement() {
|
||||||
_, membershipGroupIndex := s.register(u1Credentials, s.u1PrivKey, keystorePath1)
|
_, membershipGroupIndex := s.register(u1Credentials, s.u1PrivKey, keystorePath1)
|
||||||
defer s.removeCredentials(keystorePath1)
|
defer s.removeCredentials(keystorePath1)
|
||||||
|
|
||||||
gm, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, keystorePath1, keystorePassword, 0, false, utils.Logger())
|
gm, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, keystorePath1, keystorePassword, 0, false, prometheus.DefaultRegisterer, utils.Logger())
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
// initialize the WakuRLNRelay
|
// initialize the WakuRLNRelay
|
||||||
|
@ -230,10 +231,10 @@ func (s *WakuRLNRelayDynamicSuite) TestMerkleTreeConstruction() {
|
||||||
// TODO: This assumes the keystoreIndex is 0, but there are two possible credentials in this keystore due to using the same contract address
|
// TODO: This assumes the keystoreIndex is 0, but there are two possible credentials in this keystore due to using the same contract address
|
||||||
// when credentials1 and credentials2 were registered. We should remove this hardcoded value and obtain the correct value when the credentials are persisted
|
// when credentials1 and credentials2 were registered. We should remove this hardcoded value and obtain the correct value when the credentials are persisted
|
||||||
keystoreIndex := uint(0)
|
keystoreIndex := uint(0)
|
||||||
gm, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, "./test_onchain.json", keystorePassword, keystoreIndex, false, utils.Logger())
|
gm, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, "./test_onchain.json", keystorePassword, keystoreIndex, false, prometheus.DefaultRegisterer, utils.Logger())
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
rlnRelay, err := New(gm, "test-merkle-tree.db", timesource.NewDefaultClock(), utils.Logger())
|
rlnRelay, err := New(gm, "test-merkle-tree.db", timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
err = rlnRelay.Start(context.TODO())
|
err = rlnRelay.Start(context.TODO())
|
||||||
|
@ -264,10 +265,10 @@ func (s *WakuRLNRelayDynamicSuite) TestCorrectRegistrationOfPeers() {
|
||||||
defer s.removeCredentials(keystorePath1)
|
defer s.removeCredentials(keystorePath1)
|
||||||
|
|
||||||
// mount the rln relay protocol in the on-chain/dynamic mode
|
// mount the rln relay protocol in the on-chain/dynamic mode
|
||||||
gm1, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, keystorePath1, keystorePassword, 0, false, utils.Logger())
|
gm1, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, keystorePath1, keystorePassword, 0, false, prometheus.DefaultRegisterer, utils.Logger())
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
rlnRelay1, err := New(gm1, "test-correct-registration-1.db", timesource.NewDefaultClock(), utils.Logger())
|
rlnRelay1, err := New(gm1, "test-correct-registration-1.db", timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
err = rlnRelay1.Start(context.TODO())
|
err = rlnRelay1.Start(context.TODO())
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
@ -281,10 +282,10 @@ func (s *WakuRLNRelayDynamicSuite) TestCorrectRegistrationOfPeers() {
|
||||||
defer s.removeCredentials(keystorePath2)
|
defer s.removeCredentials(keystorePath2)
|
||||||
|
|
||||||
// mount the rln relay protocol in the on-chain/dynamic mode
|
// mount the rln relay protocol in the on-chain/dynamic mode
|
||||||
gm2, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, keystorePath2, keystorePassword, 0, false, utils.Logger())
|
gm2, err := dynamic.NewDynamicGroupManager(s.clientAddr, s.rlnAddr, membershipGroupIndex, keystorePath2, keystorePassword, 0, false, prometheus.DefaultRegisterer, utils.Logger())
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
rlnRelay2, err := New(gm2, "test-correct-registration-2.db", timesource.NewDefaultClock(), utils.Logger())
|
rlnRelay2, err := New(gm2, "test-correct-registration-2.db", timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
err = rlnRelay2.Start(context.TODO())
|
err = rlnRelay2.Start(context.TODO())
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
|
@ -59,7 +59,7 @@ func (s *WakuRLNRelaySuite) TestOffchainMode() {
|
||||||
groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, utils.Logger())
|
groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, utils.Logger())
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
wakuRLNRelay, err := New(groupManager, "", timesource.NewDefaultClock(), utils.Logger())
|
wakuRLNRelay, err := New(groupManager, "", timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
err = wakuRLNRelay.Start(context.TODO())
|
err = wakuRLNRelay.Start(context.TODO())
|
||||||
|
@ -180,6 +180,7 @@ func (s *WakuRLNRelaySuite) TestValidateMessage() {
|
||||||
RLN: rlnInstance,
|
RLN: rlnInstance,
|
||||||
nullifierLog: make(map[r.Nullifier][]r.ProofMetadata),
|
nullifierLog: make(map[r.Nullifier][]r.ProofMetadata),
|
||||||
log: utils.Logger(),
|
log: utils.Logger(),
|
||||||
|
metrics: newMetrics(prometheus.DefaultRegisterer),
|
||||||
}
|
}
|
||||||
|
|
||||||
//get the current epoch time
|
//get the current epoch time
|
||||||
|
|
|
@ -12,6 +12,8 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/waku-org/go-waku/logging"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
|
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
|
||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
|
@ -29,6 +31,7 @@ type GroupManager interface {
|
||||||
|
|
||||||
type WakuRLNRelay struct {
|
type WakuRLNRelay struct {
|
||||||
timesource timesource.Timesource
|
timesource timesource.Timesource
|
||||||
|
metrics Metrics
|
||||||
|
|
||||||
groupManager GroupManager
|
groupManager GroupManager
|
||||||
rootTracker *group_manager.MerkleRootTracker
|
rootTracker *group_manager.MerkleRootTracker
|
||||||
|
@ -48,12 +51,16 @@ func New(
|
||||||
groupManager GroupManager,
|
groupManager GroupManager,
|
||||||
treePath string,
|
treePath string,
|
||||||
timesource timesource.Timesource,
|
timesource timesource.Timesource,
|
||||||
|
reg prometheus.Registerer,
|
||||||
log *zap.Logger) (*WakuRLNRelay, error) {
|
log *zap.Logger) (*WakuRLNRelay, error) {
|
||||||
|
|
||||||
if treePath == "" {
|
if treePath == "" {
|
||||||
treePath = rlnDefaultTreePath
|
treePath = rlnDefaultTreePath
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metrics := newMetrics(reg)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
rlnInstance, err := rln.NewWithConfig(rln.DefaultTreeDepth, &rln.TreeConfig{
|
rlnInstance, err := rln.NewWithConfig(rln.DefaultTreeDepth, &rln.TreeConfig{
|
||||||
CacheCapacity: 15000,
|
CacheCapacity: 15000,
|
||||||
Mode: rln.HighThroughput,
|
Mode: rln.HighThroughput,
|
||||||
|
@ -64,6 +71,7 @@ func New(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
metrics.RecordInstanceCreation(time.Since(start))
|
||||||
|
|
||||||
rootTracker, err := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance)
|
rootTracker, err := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -75,6 +83,7 @@ func New(
|
||||||
RLN: rlnInstance,
|
RLN: rlnInstance,
|
||||||
groupManager: groupManager,
|
groupManager: groupManager,
|
||||||
rootTracker: rootTracker,
|
rootTracker: rootTracker,
|
||||||
|
metrics: metrics,
|
||||||
log: log,
|
log: log,
|
||||||
timesource: timesource,
|
timesource: timesource,
|
||||||
nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata),
|
nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata),
|
||||||
|
@ -183,12 +192,14 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime
|
||||||
if msgProof == nil {
|
if msgProof == nil {
|
||||||
// message does not contain a proof
|
// message does not contain a proof
|
||||||
rlnRelay.log.Debug("invalid message: message does not contain a proof")
|
rlnRelay.log.Debug("invalid message: message does not contain a proof")
|
||||||
|
rlnRelay.metrics.RecordInvalidMessage(invalidNoProof)
|
||||||
return invalidMessage, nil
|
return invalidMessage, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
proofMD, err := rlnRelay.RLN.ExtractMetadata(*msgProof)
|
proofMD, err := rlnRelay.RLN.ExtractMetadata(*msgProof)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rlnRelay.log.Debug("could not extract metadata", zap.Error(err))
|
rlnRelay.log.Debug("could not extract metadata", zap.Error(err))
|
||||||
|
rlnRelay.metrics.RecordError(proofMetadataExtractionErr)
|
||||||
return invalidMessage, nil
|
return invalidMessage, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -198,18 +209,30 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime
|
||||||
// message's epoch is too old or too ahead
|
// message's epoch is too old or too ahead
|
||||||
// accept messages whose epoch is within +-MAX_EPOCH_GAP from the current epoch
|
// accept messages whose epoch is within +-MAX_EPOCH_GAP from the current epoch
|
||||||
rlnRelay.log.Debug("invalid message: epoch gap exceeds a threshold", zap.Int64("gap", gap))
|
rlnRelay.log.Debug("invalid message: epoch gap exceeds a threshold", zap.Int64("gap", gap))
|
||||||
|
rlnRelay.metrics.RecordInvalidMessage(invalidEpoch)
|
||||||
|
|
||||||
return invalidMessage, nil
|
return invalidMessage, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !(rlnRelay.rootTracker.ContainsRoot(msgProof.MerkleRoot)) {
|
||||||
|
rlnRelay.log.Debug("invalid message: unexpected root", logging.HexBytes("msgRoot", msg.RateLimitProof.MerkleRoot))
|
||||||
|
rlnRelay.metrics.RecordInvalidMessage(invalidRoot)
|
||||||
|
return invalidMessage, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
valid, err := rlnRelay.verifyProof(msg, msgProof)
|
valid, err := rlnRelay.verifyProof(msg, msgProof)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rlnRelay.log.Debug("could not verify proof", zap.Error(err))
|
rlnRelay.log.Debug("could not verify proof", zap.Error(err))
|
||||||
|
rlnRelay.metrics.RecordError(proofVerificationErr)
|
||||||
return invalidMessage, nil
|
return invalidMessage, nil
|
||||||
}
|
}
|
||||||
|
rlnRelay.metrics.RecordProofVerification(time.Since(start))
|
||||||
|
|
||||||
if !valid {
|
if !valid {
|
||||||
// invalid proof
|
// invalid proof
|
||||||
rlnRelay.log.Debug("Invalid proof")
|
rlnRelay.log.Debug("Invalid proof")
|
||||||
|
rlnRelay.metrics.RecordInvalidMessage(invalidProof)
|
||||||
return invalidMessage, nil
|
return invalidMessage, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -217,6 +240,7 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime
|
||||||
hasDup, err := rlnRelay.HasDuplicate(proofMD)
|
hasDup, err := rlnRelay.HasDuplicate(proofMD)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rlnRelay.log.Debug("validation error", zap.Error(err))
|
rlnRelay.log.Debug("validation error", zap.Error(err))
|
||||||
|
rlnRelay.metrics.RecordError(duplicateCheckErr)
|
||||||
return validationError, err
|
return validationError, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -230,10 +254,16 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime
|
||||||
// it will never error out
|
// it will never error out
|
||||||
_, err = rlnRelay.updateLog(proofMD)
|
_, err = rlnRelay.updateLog(proofMD)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
rlnRelay.log.Debug("could not insert proof into log")
|
||||||
|
rlnRelay.metrics.RecordError(logInsertionErr)
|
||||||
return validationError, err
|
return validationError, err
|
||||||
}
|
}
|
||||||
|
|
||||||
rlnRelay.log.Debug("message is valid")
|
rlnRelay.log.Debug("message is valid")
|
||||||
|
|
||||||
|
rootIndex := rlnRelay.rootTracker.IndexOf(msgProof.MerkleRoot)
|
||||||
|
rlnRelay.metrics.RecordValidMessages(rootIndex)
|
||||||
|
|
||||||
return validMessage, nil
|
return validMessage, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -254,10 +284,12 @@ func (rlnRelay *WakuRLNRelay) AppendRLNProof(msg *pb.WakuMessage, senderEpochTim
|
||||||
|
|
||||||
input := toRLNSignal(msg)
|
input := toRLNSignal(msg)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
proof, err := rlnRelay.generateProof(input, rln.CalcEpoch(senderEpochTime))
|
proof, err := rlnRelay.generateProof(input, rln.CalcEpoch(senderEpochTime))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
rlnRelay.metrics.RecordProofGeneration(time.Since(start))
|
||||||
|
|
||||||
msg.RateLimitProof = proof
|
msg.RateLimitProof = proof
|
||||||
|
|
||||||
|
@ -271,6 +303,8 @@ func (rlnRelay *WakuRLNRelay) Validator(
|
||||||
return func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool {
|
return func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool {
|
||||||
rlnRelay.log.Debug("rln-relay topic validator called")
|
rlnRelay.log.Debug("rln-relay topic validator called")
|
||||||
|
|
||||||
|
rlnRelay.metrics.RecordMessage()
|
||||||
|
|
||||||
wakuMessage := &pb.WakuMessage{}
|
wakuMessage := &pb.WakuMessage{}
|
||||||
if err := proto.Unmarshal(message.Data, wakuMessage); err != nil {
|
if err := proto.Unmarshal(message.Data, wakuMessage); err != nil {
|
||||||
rlnRelay.log.Debug("could not unmarshal message")
|
rlnRelay.log.Debug("could not unmarshal message")
|
||||||
|
@ -300,6 +334,8 @@ func (rlnRelay *WakuRLNRelay) Validator(
|
||||||
zap.String("id", hex.EncodeToString([]byte(message.ID))),
|
zap.String("id", hex.EncodeToString([]byte(message.ID))),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
rlnRelay.metrics.RecordSpam(wakuMessage.ContentTopic)
|
||||||
|
|
||||||
if spamHandler != nil {
|
if spamHandler != nil {
|
||||||
if err := spamHandler(wakuMessage); err != nil {
|
if err := spamHandler(wakuMessage); err != nil {
|
||||||
rlnRelay.log.Error("executing spam handler", zap.Error(err))
|
rlnRelay.log.Error("executing spam handler", zap.Error(err))
|
||||||
|
|
Loading…
Reference in New Issue