From 18efd2c73711364062688c2a7aba071967609495 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 6 Sep 2023 16:02:30 -0400 Subject: [PATCH] chore(rln-relay): clean up nullifier table every MaxEpochGap --- waku/v2/protocol/rln/nullifier_log.go | 120 +++++++++++++++++++++++++ waku/v2/protocol/rln/onchain_test.go | 2 +- waku/v2/protocol/rln/rln_relay_test.go | 17 ++-- waku/v2/protocol/rln/waku_rln_relay.go | 84 ++--------------- 4 files changed, 137 insertions(+), 86 deletions(-) create mode 100644 waku/v2/protocol/rln/nullifier_log.go diff --git a/waku/v2/protocol/rln/nullifier_log.go b/waku/v2/protocol/rln/nullifier_log.go new file mode 100644 index 00000000..1bf6263a --- /dev/null +++ b/waku/v2/protocol/rln/nullifier_log.go @@ -0,0 +1,120 @@ +package rln + +import ( + "bytes" + "context" + "errors" + "sync" + "time" + + "github.com/waku-org/go-zerokit-rln/rln" + "go.uber.org/zap" +) + +// NullifierLog is the log of nullifiers and Shamir shares of the past messages grouped per epoch +type NullifierLog struct { + sync.RWMutex + + log *zap.Logger + nullifierLog map[rln.Nullifier][]rln.ProofMetadata // Might make sense to replace this map by a shrinkable map due to https://github.com/golang/go/issues/20135. + nullifierQueue []rln.Nullifier +} + +// NewNullifierLog creates an instance of NullifierLog +func NewNullifierLog(ctx context.Context, log *zap.Logger) *NullifierLog { + result := &NullifierLog{ + nullifierLog: make(map[rln.Nullifier][]rln.ProofMetadata), + log: log, + } + + go result.cleanup(ctx) + + return result +} + +var errAlreadyExists = errors.New("proof already exists") + +// Insert stores a proof in the nullifier log only if it doesnt exist already +func (n *NullifierLog) Insert(proofMD rln.ProofMetadata) error { + n.Lock() + defer n.Unlock() + + proofs, ok := n.nullifierLog[proofMD.ExternalNullifier] + if ok { + // check if an identical record exists + for _, p := range proofs { + if p.Equals(proofMD) { + // TODO: slashing logic + return errAlreadyExists + } + } + } + + n.nullifierLog[proofMD.ExternalNullifier] = append(proofs, proofMD) + n.nullifierQueue = append(n.nullifierQueue, proofMD.ExternalNullifier) + return nil +} + +// HasDuplicate returns true if there is another message in the `nullifierLog` with the same +// epoch and nullifier as `msg`'s epoch and nullifier but different Shamir secret shares +// otherwise, returns false +func (n *NullifierLog) HasDuplicate(proofMD rln.ProofMetadata) (bool, error) { + n.RLock() + defer n.RUnlock() + + proofs, ok := n.nullifierLog[proofMD.ExternalNullifier] + if !ok { + // epoch does not exist + return false, nil + } + + for _, p := range proofs { + if p.Equals(proofMD) { + // there is an identical record, ignore the msg + return true, nil + } + } + + // check for a message with the same nullifier but different secret shares + matched := false + for _, it := range proofs { + if bytes.Equal(it.Nullifier[:], proofMD.Nullifier[:]) && (!bytes.Equal(it.ShareX[:], proofMD.ShareX[:]) || !bytes.Equal(it.ShareY[:], proofMD.ShareY[:])) { + matched = true + break + } + } + + return matched, nil +} + +// cleanup cleans up the log every time there are more than MaxEpochGap epochs stored in it +func (n *NullifierLog) cleanup(ctx context.Context) { + t := time.NewTicker(1 * time.Minute) // TODO: tune this + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-t.C: + func() { + n.Lock() + defer n.Unlock() + + if int64(len(n.nullifierQueue)) < maxEpochGap { + return + } + + n.log.Debug("clearing epochs from the nullifier log", zap.Int64("count", maxEpochGap)) + + toDelete := n.nullifierQueue[0:maxEpochGap] + for _, l := range toDelete { + delete(n.nullifierLog, l) + } + n.nullifierQueue = n.nullifierQueue[maxEpochGap:] + }() + } + } + +} diff --git a/waku/v2/protocol/rln/onchain_test.go b/waku/v2/protocol/rln/onchain_test.go index 3eaf057e..16e32d20 100644 --- a/waku/v2/protocol/rln/onchain_test.go +++ b/waku/v2/protocol/rln/onchain_test.go @@ -160,7 +160,7 @@ func (s *WakuRLNRelayDynamicSuite) TestDynamicGroupManagement() { RLN: rlnInstance, }, log: utils.Logger(), - nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata), + nullifierLog: NewNullifierLog(context.TODO(), utils.Logger()), } err = rlnRelay.Start(context.TODO()) diff --git a/waku/v2/protocol/rln/rln_relay_test.go b/waku/v2/protocol/rln/rln_relay_test.go index 1f99f12f..8dcfe553 100644 --- a/waku/v2/protocol/rln/rln_relay_test.go +++ b/waku/v2/protocol/rln/rln_relay_test.go @@ -91,7 +91,7 @@ func (s *WakuRLNRelaySuite) TestUpdateLogAndHasDuplicate() { s.Require().NoError(err) rlnRelay := &WakuRLNRelay{ - nullifierLog: make(map[r.Nullifier][]r.ProofMetadata), + nullifierLog: NewNullifierLog(context.TODO(), utils.Logger()), Details: group_manager.Details{ RootTracker: rootTracker, }, @@ -132,27 +132,25 @@ func (s *WakuRLNRelaySuite) TestUpdateLogAndHasDuplicate() { // check whether hasDuplicate correctly finds records with the same nullifiers but different secret shares // no duplicate for wm1 should be found, since the log is empty - result1, err := rlnRelay.HasDuplicate(md1) + result1, err := rlnRelay.nullifierLog.HasDuplicate(md1) s.Require().NoError(err) s.Require().False(result1) // No duplicate is found // Add it to the log - added, err := rlnRelay.updateLog(md1) + err = rlnRelay.nullifierLog.Insert(md1) s.Require().NoError(err) - s.Require().True(added) // no duplicate for wm2 should be found, its nullifier differs from wm1 - result2, err := rlnRelay.HasDuplicate(md2) + result2, err := rlnRelay.nullifierLog.HasDuplicate(md2) s.Require().NoError(err) s.Require().False(result2) // No duplicate is found // Add it to the log - added, err = rlnRelay.updateLog(md2) + err = rlnRelay.nullifierLog.Insert(md2) s.Require().NoError(err) - s.Require().True(added) // wm3 has the same nullifier as wm1 but different secret shares, it should be detected as duplicate - result3, err := rlnRelay.HasDuplicate(md3) + result3, err := rlnRelay.nullifierLog.HasDuplicate(md3) s.Require().NoError(err) s.Require().True(result3) // It's a duplicate @@ -189,8 +187,7 @@ func (s *WakuRLNRelaySuite) TestValidateMessage() { RootTracker: rootTracker, RLN: rlnInstance, }, - - nullifierLog: make(map[r.Nullifier][]r.ProofMetadata), + nullifierLog: NewNullifierLog(context.TODO(), utils.Logger()), log: utils.Logger(), metrics: newMetrics(prometheus.DefaultRegisterer), } diff --git a/waku/v2/protocol/rln/waku_rln_relay.go b/waku/v2/protocol/rln/waku_rln_relay.go index fab6ae86..4e489a7d 100644 --- a/waku/v2/protocol/rln/waku_rln_relay.go +++ b/waku/v2/protocol/rln/waku_rln_relay.go @@ -1,12 +1,10 @@ package rln import ( - "bytes" "context" "encoding/hex" "errors" "math" - "sync" "time" "github.com/ethereum/go-ethereum/log" @@ -28,9 +26,7 @@ type WakuRLNRelay struct { group_manager.Details - // the log of nullifiers and Shamir shares of the past messages grouped per epoch - nullifierLogLock sync.RWMutex - nullifierLog map[rln.Nullifier][]rln.ProofMetadata + nullifierLog *NullifierLog log *zap.Logger } @@ -67,17 +63,18 @@ func New( // create the WakuRLNRelay rlnPeer := &WakuRLNRelay{ - Details: Details, - metrics: newMetrics(reg), - log: log, - timesource: timesource, - nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata), + Details: Details, + metrics: newMetrics(reg), + log: log, + timesource: timesource, } return rlnPeer } func (rlnRelay *WakuRLNRelay) Start(ctx context.Context) error { + rlnRelay.nullifierLog = NewNullifierLog(ctx, rlnRelay.log) + err := rlnRelay.GroupManager.Start(ctx) if err != nil { return err @@ -93,72 +90,12 @@ func (rlnRelay *WakuRLNRelay) Stop() error { return rlnRelay.GroupManager.Stop() } -func (rlnRelay *WakuRLNRelay) HasDuplicate(proofMD rln.ProofMetadata) (bool, error) { - // returns true if there is another message in the `nullifierLog` of the `rlnPeer` with the same - // epoch and nullifier as `msg`'s epoch and nullifier but different Shamir secret shares - // otherwise, returns false - - rlnRelay.nullifierLogLock.RLock() - proofs, ok := rlnRelay.nullifierLog[proofMD.ExternalNullifier] - rlnRelay.nullifierLogLock.RUnlock() - - // check if the epoch exists - if !ok { - return false, nil - } - - for _, p := range proofs { - if p.Equals(proofMD) { - // there is an identical record, ignore rhe mag - return true, nil - } - } - - // check for a message with the same nullifier but different secret shares - matched := false - for _, it := range proofs { - if bytes.Equal(it.Nullifier[:], proofMD.Nullifier[:]) && (!bytes.Equal(it.ShareX[:], proofMD.ShareX[:]) || !bytes.Equal(it.ShareY[:], proofMD.ShareY[:])) { - matched = true - break - } - } - - return matched, nil -} - -func (rlnRelay *WakuRLNRelay) updateLog(proofMD rln.ProofMetadata) (bool, error) { - rlnRelay.nullifierLogLock.Lock() - defer rlnRelay.nullifierLogLock.Unlock() - proofs, ok := rlnRelay.nullifierLog[proofMD.ExternalNullifier] - - // check if the epoch exists - if !ok { - rlnRelay.nullifierLog[proofMD.ExternalNullifier] = []rln.ProofMetadata{proofMD} - return true, nil - } - - // check if an identical record exists - for _, p := range proofs { - if p.Equals(proofMD) { - // TODO: slashing logic - return true, nil - } - } - - // add proofMD to the log - proofs = append(proofs, proofMD) - rlnRelay.nullifierLog[proofMD.ExternalNullifier] = proofs - - return true, nil -} - // ValidateMessage validates the supplied message based on the waku-rln-relay routing protocol i.e., // the message's epoch is within `maxEpochGap` of the current epoch // the message's has valid rate limit proof // the message's does not violate the rate limit // if `optionalTime` is supplied, then the current epoch is calculated based on that, otherwise the current time will be used func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime *time.Time) (messageValidationResult, error) { - // if msg == nil { return validationError, errors.New("nil message") } @@ -222,7 +159,7 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime } // check if double messaging has happened - hasDup, err := rlnRelay.HasDuplicate(proofMD) + hasDup, err := rlnRelay.nullifierLog.HasDuplicate(proofMD) if err != nil { rlnRelay.log.Debug("validation error", zap.Error(err)) rlnRelay.metrics.RecordError(duplicateCheckErr) @@ -234,10 +171,7 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime return spamMessage, nil } - // insert the message to the log - // the result of `updateLog` is discarded because message insertion is guaranteed by the implementation i.e., - // it will never error out - _, err = rlnRelay.updateLog(proofMD) + err = rlnRelay.nullifierLog.Insert(proofMD) if err != nil { rlnRelay.log.Debug("could not insert proof into log") rlnRelay.metrics.RecordError(logInsertionErr)