mirror of
https://github.com/status-im/go-waku.git
synced 2025-01-27 05:56:07 +00:00
chore(rln-relay): clean up nullifier table every MaxEpochGap
This commit is contained in:
parent
793c059ea7
commit
18efd2c737
120
waku/v2/protocol/rln/nullifier_log.go
Normal file
120
waku/v2/protocol/rln/nullifier_log.go
Normal file
@ -0,0 +1,120 @@
|
||||
package rln
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/waku-org/go-zerokit-rln/rln"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// NullifierLog is the log of nullifiers and Shamir shares of the past messages grouped per epoch
|
||||
type NullifierLog struct {
|
||||
sync.RWMutex
|
||||
|
||||
log *zap.Logger
|
||||
nullifierLog map[rln.Nullifier][]rln.ProofMetadata // Might make sense to replace this map by a shrinkable map due to https://github.com/golang/go/issues/20135.
|
||||
nullifierQueue []rln.Nullifier
|
||||
}
|
||||
|
||||
// NewNullifierLog creates an instance of NullifierLog
|
||||
func NewNullifierLog(ctx context.Context, log *zap.Logger) *NullifierLog {
|
||||
result := &NullifierLog{
|
||||
nullifierLog: make(map[rln.Nullifier][]rln.ProofMetadata),
|
||||
log: log,
|
||||
}
|
||||
|
||||
go result.cleanup(ctx)
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
var errAlreadyExists = errors.New("proof already exists")
|
||||
|
||||
// Insert stores a proof in the nullifier log only if it doesnt exist already
|
||||
func (n *NullifierLog) Insert(proofMD rln.ProofMetadata) error {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
proofs, ok := n.nullifierLog[proofMD.ExternalNullifier]
|
||||
if ok {
|
||||
// check if an identical record exists
|
||||
for _, p := range proofs {
|
||||
if p.Equals(proofMD) {
|
||||
// TODO: slashing logic
|
||||
return errAlreadyExists
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
n.nullifierLog[proofMD.ExternalNullifier] = append(proofs, proofMD)
|
||||
n.nullifierQueue = append(n.nullifierQueue, proofMD.ExternalNullifier)
|
||||
return nil
|
||||
}
|
||||
|
||||
// HasDuplicate returns true if there is another message in the `nullifierLog` with the same
|
||||
// epoch and nullifier as `msg`'s epoch and nullifier but different Shamir secret shares
|
||||
// otherwise, returns false
|
||||
func (n *NullifierLog) HasDuplicate(proofMD rln.ProofMetadata) (bool, error) {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
proofs, ok := n.nullifierLog[proofMD.ExternalNullifier]
|
||||
if !ok {
|
||||
// epoch does not exist
|
||||
return false, nil
|
||||
}
|
||||
|
||||
for _, p := range proofs {
|
||||
if p.Equals(proofMD) {
|
||||
// there is an identical record, ignore the msg
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// check for a message with the same nullifier but different secret shares
|
||||
matched := false
|
||||
for _, it := range proofs {
|
||||
if bytes.Equal(it.Nullifier[:], proofMD.Nullifier[:]) && (!bytes.Equal(it.ShareX[:], proofMD.ShareX[:]) || !bytes.Equal(it.ShareY[:], proofMD.ShareY[:])) {
|
||||
matched = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return matched, nil
|
||||
}
|
||||
|
||||
// cleanup cleans up the log every time there are more than MaxEpochGap epochs stored in it
|
||||
func (n *NullifierLog) cleanup(ctx context.Context) {
|
||||
t := time.NewTicker(1 * time.Minute) // TODO: tune this
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-t.C:
|
||||
func() {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
if int64(len(n.nullifierQueue)) < maxEpochGap {
|
||||
return
|
||||
}
|
||||
|
||||
n.log.Debug("clearing epochs from the nullifier log", zap.Int64("count", maxEpochGap))
|
||||
|
||||
toDelete := n.nullifierQueue[0:maxEpochGap]
|
||||
for _, l := range toDelete {
|
||||
delete(n.nullifierLog, l)
|
||||
}
|
||||
n.nullifierQueue = n.nullifierQueue[maxEpochGap:]
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -160,7 +160,7 @@ func (s *WakuRLNRelayDynamicSuite) TestDynamicGroupManagement() {
|
||||
RLN: rlnInstance,
|
||||
},
|
||||
log: utils.Logger(),
|
||||
nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata),
|
||||
nullifierLog: NewNullifierLog(context.TODO(), utils.Logger()),
|
||||
}
|
||||
|
||||
err = rlnRelay.Start(context.TODO())
|
||||
|
@ -91,7 +91,7 @@ func (s *WakuRLNRelaySuite) TestUpdateLogAndHasDuplicate() {
|
||||
s.Require().NoError(err)
|
||||
|
||||
rlnRelay := &WakuRLNRelay{
|
||||
nullifierLog: make(map[r.Nullifier][]r.ProofMetadata),
|
||||
nullifierLog: NewNullifierLog(context.TODO(), utils.Logger()),
|
||||
Details: group_manager.Details{
|
||||
RootTracker: rootTracker,
|
||||
},
|
||||
@ -132,27 +132,25 @@ func (s *WakuRLNRelaySuite) TestUpdateLogAndHasDuplicate() {
|
||||
|
||||
// check whether hasDuplicate correctly finds records with the same nullifiers but different secret shares
|
||||
// no duplicate for wm1 should be found, since the log is empty
|
||||
result1, err := rlnRelay.HasDuplicate(md1)
|
||||
result1, err := rlnRelay.nullifierLog.HasDuplicate(md1)
|
||||
s.Require().NoError(err)
|
||||
s.Require().False(result1) // No duplicate is found
|
||||
|
||||
// Add it to the log
|
||||
added, err := rlnRelay.updateLog(md1)
|
||||
err = rlnRelay.nullifierLog.Insert(md1)
|
||||
s.Require().NoError(err)
|
||||
s.Require().True(added)
|
||||
|
||||
// no duplicate for wm2 should be found, its nullifier differs from wm1
|
||||
result2, err := rlnRelay.HasDuplicate(md2)
|
||||
result2, err := rlnRelay.nullifierLog.HasDuplicate(md2)
|
||||
s.Require().NoError(err)
|
||||
s.Require().False(result2) // No duplicate is found
|
||||
|
||||
// Add it to the log
|
||||
added, err = rlnRelay.updateLog(md2)
|
||||
err = rlnRelay.nullifierLog.Insert(md2)
|
||||
s.Require().NoError(err)
|
||||
s.Require().True(added)
|
||||
|
||||
// wm3 has the same nullifier as wm1 but different secret shares, it should be detected as duplicate
|
||||
result3, err := rlnRelay.HasDuplicate(md3)
|
||||
result3, err := rlnRelay.nullifierLog.HasDuplicate(md3)
|
||||
s.Require().NoError(err)
|
||||
s.Require().True(result3) // It's a duplicate
|
||||
|
||||
@ -189,8 +187,7 @@ func (s *WakuRLNRelaySuite) TestValidateMessage() {
|
||||
RootTracker: rootTracker,
|
||||
RLN: rlnInstance,
|
||||
},
|
||||
|
||||
nullifierLog: make(map[r.Nullifier][]r.ProofMetadata),
|
||||
nullifierLog: NewNullifierLog(context.TODO(), utils.Logger()),
|
||||
log: utils.Logger(),
|
||||
metrics: newMetrics(prometheus.DefaultRegisterer),
|
||||
}
|
||||
|
@ -1,12 +1,10 @@
|
||||
package rln
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
@ -28,9 +26,7 @@ type WakuRLNRelay struct {
|
||||
|
||||
group_manager.Details
|
||||
|
||||
// the log of nullifiers and Shamir shares of the past messages grouped per epoch
|
||||
nullifierLogLock sync.RWMutex
|
||||
nullifierLog map[rln.Nullifier][]rln.ProofMetadata
|
||||
nullifierLog *NullifierLog
|
||||
|
||||
log *zap.Logger
|
||||
}
|
||||
@ -67,17 +63,18 @@ func New(
|
||||
|
||||
// create the WakuRLNRelay
|
||||
rlnPeer := &WakuRLNRelay{
|
||||
Details: Details,
|
||||
metrics: newMetrics(reg),
|
||||
log: log,
|
||||
timesource: timesource,
|
||||
nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata),
|
||||
Details: Details,
|
||||
metrics: newMetrics(reg),
|
||||
log: log,
|
||||
timesource: timesource,
|
||||
}
|
||||
|
||||
return rlnPeer
|
||||
}
|
||||
|
||||
func (rlnRelay *WakuRLNRelay) Start(ctx context.Context) error {
|
||||
rlnRelay.nullifierLog = NewNullifierLog(ctx, rlnRelay.log)
|
||||
|
||||
err := rlnRelay.GroupManager.Start(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -93,72 +90,12 @@ func (rlnRelay *WakuRLNRelay) Stop() error {
|
||||
return rlnRelay.GroupManager.Stop()
|
||||
}
|
||||
|
||||
func (rlnRelay *WakuRLNRelay) HasDuplicate(proofMD rln.ProofMetadata) (bool, error) {
|
||||
// returns true if there is another message in the `nullifierLog` of the `rlnPeer` with the same
|
||||
// epoch and nullifier as `msg`'s epoch and nullifier but different Shamir secret shares
|
||||
// otherwise, returns false
|
||||
|
||||
rlnRelay.nullifierLogLock.RLock()
|
||||
proofs, ok := rlnRelay.nullifierLog[proofMD.ExternalNullifier]
|
||||
rlnRelay.nullifierLogLock.RUnlock()
|
||||
|
||||
// check if the epoch exists
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
for _, p := range proofs {
|
||||
if p.Equals(proofMD) {
|
||||
// there is an identical record, ignore rhe mag
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// check for a message with the same nullifier but different secret shares
|
||||
matched := false
|
||||
for _, it := range proofs {
|
||||
if bytes.Equal(it.Nullifier[:], proofMD.Nullifier[:]) && (!bytes.Equal(it.ShareX[:], proofMD.ShareX[:]) || !bytes.Equal(it.ShareY[:], proofMD.ShareY[:])) {
|
||||
matched = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return matched, nil
|
||||
}
|
||||
|
||||
func (rlnRelay *WakuRLNRelay) updateLog(proofMD rln.ProofMetadata) (bool, error) {
|
||||
rlnRelay.nullifierLogLock.Lock()
|
||||
defer rlnRelay.nullifierLogLock.Unlock()
|
||||
proofs, ok := rlnRelay.nullifierLog[proofMD.ExternalNullifier]
|
||||
|
||||
// check if the epoch exists
|
||||
if !ok {
|
||||
rlnRelay.nullifierLog[proofMD.ExternalNullifier] = []rln.ProofMetadata{proofMD}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// check if an identical record exists
|
||||
for _, p := range proofs {
|
||||
if p.Equals(proofMD) {
|
||||
// TODO: slashing logic
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// add proofMD to the log
|
||||
proofs = append(proofs, proofMD)
|
||||
rlnRelay.nullifierLog[proofMD.ExternalNullifier] = proofs
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// ValidateMessage validates the supplied message based on the waku-rln-relay routing protocol i.e.,
|
||||
// the message's epoch is within `maxEpochGap` of the current epoch
|
||||
// the message's has valid rate limit proof
|
||||
// the message's does not violate the rate limit
|
||||
// if `optionalTime` is supplied, then the current epoch is calculated based on that, otherwise the current time will be used
|
||||
func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime *time.Time) (messageValidationResult, error) {
|
||||
//
|
||||
if msg == nil {
|
||||
return validationError, errors.New("nil message")
|
||||
}
|
||||
@ -222,7 +159,7 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime
|
||||
}
|
||||
|
||||
// check if double messaging has happened
|
||||
hasDup, err := rlnRelay.HasDuplicate(proofMD)
|
||||
hasDup, err := rlnRelay.nullifierLog.HasDuplicate(proofMD)
|
||||
if err != nil {
|
||||
rlnRelay.log.Debug("validation error", zap.Error(err))
|
||||
rlnRelay.metrics.RecordError(duplicateCheckErr)
|
||||
@ -234,10 +171,7 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime
|
||||
return spamMessage, nil
|
||||
}
|
||||
|
||||
// insert the message to the log
|
||||
// the result of `updateLog` is discarded because message insertion is guaranteed by the implementation i.e.,
|
||||
// it will never error out
|
||||
_, err = rlnRelay.updateLog(proofMD)
|
||||
err = rlnRelay.nullifierLog.Insert(proofMD)
|
||||
if err != nil {
|
||||
rlnRelay.log.Debug("could not insert proof into log")
|
||||
rlnRelay.metrics.RecordError(logInsertionErr)
|
||||
|
Loading…
x
Reference in New Issue
Block a user