go-waku/waku/v2/protocol/rln/waku_rln_relay.go

315 lines
9.0 KiB
Go
Raw Normal View History

2022-07-05 21:28:34 +00:00
package rln
import (
"context"
"errors"
"math"
"time"
2023-04-03 21:45:19 +00:00
"github.com/ethereum/go-ethereum/log"
2023-08-22 19:30:04 +00:00
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
rlnpb "github.com/waku-org/go-waku/waku/v2/protocol/rln/pb"
2022-12-09 03:08:04 +00:00
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-zerokit-rln/rln"
2022-07-05 21:28:34 +00:00
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
2022-07-05 21:28:34 +00:00
)
type WakuRLNRelay struct {
2022-12-09 03:08:04 +00:00
timesource timesource.Timesource
2023-08-22 19:30:04 +00:00
metrics Metrics
2022-07-28 14:04:33 +00:00
group_manager.Details
2022-10-21 19:49:55 +00:00
nullifierLog *NullifierLog
2022-07-05 21:28:34 +00:00
2023-04-03 21:45:19 +00:00
log *zap.Logger
2022-07-05 21:28:34 +00:00
}
const rlnDefaultTreePath = "./rln_tree.db"
func GetRLNInstanceAndRootTracker(treePath string) (*rln.RLN, *group_manager.MerkleRootTracker, error) {
if treePath == "" {
treePath = rlnDefaultTreePath
}
rlnInstance, err := rln.NewWithConfig(rln.DefaultTreeDepth, &rln.TreeConfig{
CacheCapacity: 15000,
Mode: rln.HighThroughput,
Compression: false,
FlushInterval: 500 * time.Millisecond,
Path: treePath,
})
2023-04-03 21:45:19 +00:00
if err != nil {
return nil, nil, err
2022-08-18 16:27:10 +00:00
}
rootTracker := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance)
return rlnInstance, rootTracker, nil
}
func New(
Details group_manager.Details,
timesource timesource.Timesource,
reg prometheus.Registerer,
log *zap.Logger) *WakuRLNRelay {
2023-04-12 21:53:23 +00:00
2023-04-03 21:45:19 +00:00
// create the WakuRLNRelay
rlnPeer := &WakuRLNRelay{
Details: Details,
metrics: newMetrics(reg),
log: log,
timesource: timesource,
2023-04-03 21:45:19 +00:00
}
2022-07-05 21:28:34 +00:00
return rlnPeer
2023-04-03 21:45:19 +00:00
}
func (rlnRelay *WakuRLNRelay) Start(ctx context.Context) error {
rlnRelay.nullifierLog = NewNullifierLog(ctx, rlnRelay.log)
err := rlnRelay.GroupManager.Start(ctx)
2022-07-05 21:28:34 +00:00
if err != nil {
2023-04-03 21:45:19 +00:00
return err
}
log.Info("rln relay topic validator mounted")
2022-07-05 21:28:34 +00:00
2023-04-03 21:45:19 +00:00
return nil
2022-07-05 21:28:34 +00:00
}
// Stop will stop any operation or goroutine started while using WakuRLNRelay
func (rlnRelay *WakuRLNRelay) Stop() error {
return rlnRelay.GroupManager.Stop()
2023-04-03 21:45:19 +00:00
}
// ValidateMessage validates the supplied message based on the waku-rln-relay routing protocol i.e.,
// the message's epoch is within `maxEpochGap` of the current epoch
// the message's has valid rate limit proof
// the message's does not violate the rate limit
// if `optionalTime` is supplied, then the current epoch is calculated based on that, otherwise the current time will be used
func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime *time.Time) (messageValidationResult, error) {
2022-07-05 21:28:34 +00:00
if msg == nil {
return validationError, errors.New("nil message")
2022-07-05 21:28:34 +00:00
}
// checks if the `msg`'s epoch is far from the current epoch
// it corresponds to the validation of rln external nullifier
var epoch rln.Epoch
2022-07-05 21:28:34 +00:00
if optionalTime != nil {
epoch = rln.CalcEpoch(*optionalTime)
2022-07-05 21:28:34 +00:00
} else {
// get current rln epoch
epoch = rln.CalcEpoch(rlnRelay.timesource.Now())
2022-07-05 21:28:34 +00:00
}
msgProof, err := BytesToRateLimitProof(msg.RateLimitProof)
if err != nil {
rlnRelay.log.Debug("invalid message: could not extract proof", zap.Error(err))
rlnRelay.metrics.RecordInvalidMessage(proofExtractionErr)
}
2022-07-06 16:59:38 +00:00
if msgProof == nil {
// message does not contain a proof
rlnRelay.log.Debug("invalid message: message does not contain a proof")
2023-08-22 19:30:04 +00:00
rlnRelay.metrics.RecordInvalidMessage(invalidNoProof)
return invalidMessage, nil
2022-07-06 16:59:38 +00:00
}
2022-07-05 21:28:34 +00:00
proofMD, err := rlnRelay.RLN.ExtractMetadata(*msgProof)
2023-04-03 21:45:19 +00:00
if err != nil {
rlnRelay.log.Debug("could not extract metadata", zap.Error(err))
2023-08-22 19:30:04 +00:00
rlnRelay.metrics.RecordError(proofMetadataExtractionErr)
return invalidMessage, nil
2023-04-03 21:45:19 +00:00
}
2022-07-06 16:59:38 +00:00
// calculate the gaps and validate the epoch
gap := rln.Diff(epoch, msgProof.Epoch)
if int64(math.Abs(float64(gap))) > maxEpochGap {
2022-07-05 21:28:34 +00:00
// message's epoch is too old or too ahead
// accept messages whose epoch is within +-MAX_EPOCH_GAP from the current epoch
rlnRelay.log.Debug("invalid message: epoch gap exceeds a threshold", zap.Int64("gap", gap))
2023-08-22 19:30:04 +00:00
rlnRelay.metrics.RecordInvalidMessage(invalidEpoch)
return invalidMessage, nil
}
if !(rlnRelay.RootTracker.ContainsRoot(msgProof.MerkleRoot)) {
rlnRelay.log.Debug("invalid message: unexpected root", logging.HexBytes("msgRoot", msgProof.MerkleRoot[:]))
2023-08-22 19:30:04 +00:00
rlnRelay.metrics.RecordInvalidMessage(invalidRoot)
return invalidMessage, nil
2022-07-05 21:28:34 +00:00
}
2023-08-22 19:30:04 +00:00
start := time.Now()
valid, err := rlnRelay.verifyProof(msg, msgProof)
2022-10-07 22:58:16 +00:00
if err != nil {
rlnRelay.log.Debug("could not verify proof", zap.Error(err))
2023-08-22 19:30:04 +00:00
rlnRelay.metrics.RecordError(proofVerificationErr)
return invalidMessage, nil
2022-10-07 22:58:16 +00:00
}
2023-08-22 19:30:04 +00:00
rlnRelay.metrics.RecordProofVerification(time.Since(start))
2022-10-07 22:58:16 +00:00
if !valid {
2022-07-05 21:28:34 +00:00
// invalid proof
rlnRelay.log.Debug("Invalid proof")
2023-08-22 19:30:04 +00:00
rlnRelay.metrics.RecordInvalidMessage(invalidProof)
return invalidMessage, nil
2022-07-05 21:28:34 +00:00
}
// check if double messaging has happened
hasDup, err := rlnRelay.nullifierLog.HasDuplicate(proofMD)
2022-07-05 21:28:34 +00:00
if err != nil {
rlnRelay.log.Debug("validation error", zap.Error(err))
2023-08-22 19:30:04 +00:00
rlnRelay.metrics.RecordError(duplicateCheckErr)
return validationError, err
2022-07-05 21:28:34 +00:00
}
if hasDup {
rlnRelay.log.Debug("spam received")
return spamMessage, nil
2022-07-05 21:28:34 +00:00
}
err = rlnRelay.nullifierLog.Insert(proofMD)
2022-07-05 21:28:34 +00:00
if err != nil {
2023-08-22 19:30:04 +00:00
rlnRelay.log.Debug("could not insert proof into log")
rlnRelay.metrics.RecordError(logInsertionErr)
return validationError, err
2022-07-05 21:28:34 +00:00
}
rlnRelay.log.Debug("message is valid")
2023-08-22 19:30:04 +00:00
rootIndex := rlnRelay.RootTracker.IndexOf(msgProof.MerkleRoot)
2023-08-22 19:30:04 +00:00
rlnRelay.metrics.RecordValidMessages(rootIndex)
return validMessage, nil
2022-07-05 21:28:34 +00:00
}
func (rlnRelay *WakuRLNRelay) verifyProof(msg *pb.WakuMessage, proof *rln.RateLimitProof) (bool, error) {
contentTopicBytes := []byte(msg.ContentTopic)
input := append(msg.Payload, contentTopicBytes...)
return rlnRelay.RLN.Verify(input, *proof, rlnRelay.RootTracker.Roots()...)
}
func (rlnRelay *WakuRLNRelay) AppendRLNProof(msg *pb.WakuMessage, senderEpochTime time.Time) error {
2022-07-05 21:28:34 +00:00
// returns error if it could not create and append a `RateLimitProof` to the supplied `msg`
// `senderEpochTime` indicates the number of seconds passed since Unix epoch. The fractional part holds sub-seconds.
// The `epoch` field of `RateLimitProof` is derived from the provided `senderEpochTime` (using `calcEpoch()`)
if msg == nil {
return errors.New("nil message")
}
input := toRLNSignal(msg)
2023-08-22 19:30:04 +00:00
start := time.Now()
proof, err := rlnRelay.generateProof(input, rln.CalcEpoch(senderEpochTime))
2022-07-05 21:28:34 +00:00
if err != nil {
return err
}
2023-08-22 19:30:04 +00:00
rlnRelay.metrics.RecordProofGeneration(time.Since(start))
2022-07-05 21:28:34 +00:00
b, err := proto.Marshal(proof)
if err != nil {
return err
}
msg.RateLimitProof = b
//If msgTimeStamp is not set, then set it to timestamp of proof
if msg.Timestamp == nil {
msg.Timestamp = proto.Int64(senderEpochTime.Unix())
}
2022-07-05 21:28:34 +00:00
return nil
}
// Validator returns a validator for the waku messages.
// The message validation logic is according to https://rfc.vac.dev/spec/17/
func (rlnRelay *WakuRLNRelay) Validator(
spamHandler SpamHandler) func(ctx context.Context, msg *pb.WakuMessage, topic string) bool {
return func(ctx context.Context, msg *pb.WakuMessage, topic string) bool {
2022-07-05 21:28:34 +00:00
hash := msg.Hash(topic)
2023-08-22 19:30:04 +00:00
log := rlnRelay.log.With(
logging.HexBytes("hash", hash),
zap.String("pubsubTopic", topic),
zap.String("contentTopic", msg.ContentTopic),
)
log.Debug("rln-relay topic validator called")
rlnRelay.metrics.RecordMessage()
2022-07-05 21:28:34 +00:00
// validate the message
validationRes, err := rlnRelay.ValidateMessage(msg, nil)
2022-07-05 21:28:34 +00:00
if err != nil {
log.Debug("validating message", zap.Error(err))
2022-07-05 21:28:34 +00:00
return false
}
switch validationRes {
case validMessage:
log.Debug("message verified")
2022-07-05 21:28:34 +00:00
return true
case invalidMessage:
log.Debug("message could not be verified")
2022-10-04 23:15:39 +00:00
return false
case spamMessage:
log.Debug("spam message found")
2022-07-05 21:28:34 +00:00
rlnRelay.metrics.RecordSpam(msg.ContentTopic)
2023-08-22 19:30:04 +00:00
2022-07-05 21:28:34 +00:00
if spamHandler != nil {
if err := spamHandler(msg, topic); err != nil {
log.Error("executing spam handler", zap.Error(err))
2022-07-05 21:28:34 +00:00
}
}
return false
default:
log.Debug("unhandled validation result", zap.Int("validationResult", int(validationRes)))
2022-07-05 21:28:34 +00:00
return false
}
}
}
func (rlnRelay *WakuRLNRelay) generateProof(input []byte, epoch rln.Epoch) (*rlnpb.RateLimitProof, error) {
identityCredentials, err := rlnRelay.GroupManager.IdentityCredentials()
if err != nil {
return nil, err
2022-07-05 21:28:34 +00:00
}
membershipIndex := rlnRelay.GroupManager.MembershipIndex()
2022-07-05 21:28:34 +00:00
proof, err := rlnRelay.RLN.GenerateProof(input, identityCredentials, membershipIndex, epoch)
if err != nil {
return nil, err
2022-07-05 21:28:34 +00:00
}
return &rlnpb.RateLimitProof{
Proof: proof.Proof[:],
MerkleRoot: proof.MerkleRoot[:],
Epoch: proof.Epoch[:],
ShareX: proof.ShareX[:],
ShareY: proof.ShareY[:],
Nullifier: proof.Nullifier[:],
RlnIdentifier: proof.RLNIdentifier[:],
}, nil
2022-07-05 21:28:34 +00:00
}
func (rlnRelay *WakuRLNRelay) IdentityCredential() (rln.IdentityCredential, error) {
return rlnRelay.GroupManager.IdentityCredentials()
}
func (rlnRelay *WakuRLNRelay) MembershipIndex() uint {
return rlnRelay.GroupManager.MembershipIndex()
}
2023-09-11 21:34:56 +00:00
// IsReady returns true if the RLN Relay protocol is ready to relay messages
func (rlnRelay *WakuRLNRelay) IsReady(ctx context.Context) (bool, error) {
return rlnRelay.GroupManager.IsReady(ctx)
}