go-waku/waku/v2/protocol/rln/waku_rln_relay.go

377 lines
11 KiB
Go
Raw Normal View History

2022-07-05 21:28:34 +00:00
package rln
import (
"bytes"
"context"
"encoding/hex"
2022-07-05 21:28:34 +00:00
"errors"
"math"
2022-10-09 15:54:20 +00:00
"sync"
2022-07-05 21:28:34 +00:00
"time"
2023-04-03 21:45:19 +00:00
"github.com/ethereum/go-ethereum/log"
2022-07-05 21:28:34 +00:00
pubsub "github.com/libp2p/go-libp2p-pubsub"
2022-10-19 19:39:32 +00:00
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
2022-12-09 03:08:04 +00:00
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-zerokit-rln/rln"
2022-07-05 21:28:34 +00:00
"go.uber.org/zap"
proto "google.golang.org/protobuf/proto"
2022-07-05 21:28:34 +00:00
)
2023-04-03 21:45:19 +00:00
type GroupManager interface {
Start(ctx context.Context, rln *rln.RLN, rootTracker *group_manager.MerkleRootTracker) error
IdentityCredentials() (rln.IdentityCredential, error)
MembershipIndex() (rln.MembershipIndex, error)
2023-04-03 21:45:19 +00:00
Stop()
}
2022-10-21 19:49:55 +00:00
2022-07-05 21:28:34 +00:00
type WakuRLNRelay struct {
relay *relay.WakuRelay
2022-12-09 03:08:04 +00:00
timesource timesource.Timesource
2022-07-28 14:04:33 +00:00
2023-04-03 21:45:19 +00:00
groupManager GroupManager
rootTracker *group_manager.MerkleRootTracker
2022-07-05 21:28:34 +00:00
2023-04-03 21:45:19 +00:00
// pubsubTopic is the topic for which rln relay is mounted
pubsubTopic string
contentTopic string
spamHandler SpamHandler
2022-08-18 16:27:10 +00:00
RLN *rln.RLN
2022-10-21 19:49:55 +00:00
2022-07-05 21:28:34 +00:00
// the log of nullifiers and Shamir shares of the past messages grouped per epoch
2022-10-09 15:54:20 +00:00
nullifierLogLock sync.RWMutex
nullifierLog map[rln.Nullifier][]rln.ProofMetadata
2022-07-05 21:28:34 +00:00
2023-04-03 21:45:19 +00:00
log *zap.Logger
2022-07-05 21:28:34 +00:00
}
2023-04-03 21:45:19 +00:00
func New(
relay *relay.WakuRelay,
groupManager GroupManager,
pubsubTopic string,
contentTopic string,
spamHandler SpamHandler,
timesource timesource.Timesource,
log *zap.Logger) (*WakuRLNRelay, error) {
rlnInstance, err := rln.NewRLN()
2023-04-03 21:45:19 +00:00
if err != nil {
return nil, err
2022-08-18 16:27:10 +00:00
}
rootTracker, err := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance)
2023-04-12 21:53:23 +00:00
if err != nil {
return nil, err
}
2023-04-03 21:45:19 +00:00
// create the WakuRLNRelay
rlnPeer := &WakuRLNRelay{
RLN: rlnInstance,
groupManager: groupManager,
2023-04-12 21:53:23 +00:00
rootTracker: rootTracker,
2023-04-03 21:45:19 +00:00
pubsubTopic: pubsubTopic,
contentTopic: contentTopic,
relay: relay,
spamHandler: spamHandler,
log: log,
timesource: timesource,
nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata),
2023-04-03 21:45:19 +00:00
}
2022-07-05 21:28:34 +00:00
2023-04-03 21:45:19 +00:00
return rlnPeer, nil
}
func (rlnRelay *WakuRLNRelay) Start(ctx context.Context) error {
err := rlnRelay.groupManager.Start(ctx, rlnRelay.RLN, rlnRelay.rootTracker)
2022-07-05 21:28:34 +00:00
if err != nil {
2023-04-03 21:45:19 +00:00
return err
}
// adds a topic validator for the supplied pubsub topic at the relay protocol
// messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped
// the topic validator checks for the correct non-spamming proof of the message
err = rlnRelay.addValidator(rlnRelay.relay, rlnRelay.pubsubTopic, rlnRelay.contentTopic, rlnRelay.spamHandler)
2023-04-03 21:45:19 +00:00
if err != nil {
return err
2022-07-05 21:28:34 +00:00
}
log.Info("rln relay topic validator mounted", zap.String("pubsubTopic", rlnRelay.pubsubTopic), zap.String("contentTopic", rlnRelay.contentTopic))
2022-07-05 21:28:34 +00:00
2023-04-03 21:45:19 +00:00
return nil
2022-07-05 21:28:34 +00:00
}
func (rlnRelay *WakuRLNRelay) Stop() {
rlnRelay.groupManager.Stop()
2023-04-03 21:45:19 +00:00
}
func (rlnRelay *WakuRLNRelay) HasDuplicate(proofMD rln.ProofMetadata) (bool, error) {
2022-07-05 21:28:34 +00:00
// returns true if there is another message in the `nullifierLog` of the `rlnPeer` with the same
// epoch and nullifier as `msg`'s epoch and nullifier but different Shamir secret shares
// otherwise, returns false
rlnRelay.nullifierLogLock.RLock()
proofs, ok := rlnRelay.nullifierLog[proofMD.ExternalNullifier]
rlnRelay.nullifierLogLock.RUnlock()
2022-07-05 21:28:34 +00:00
// check if the epoch exists
if !ok {
return false, nil
}
for _, p := range proofs {
if p.Equals(proofMD) {
// there is an identical record, ignore rhe mag
return true, nil
2022-07-05 21:28:34 +00:00
}
}
// check for a message with the same nullifier but different secret shares
matched := false
for _, it := range proofs {
if bytes.Equal(it.Nullifier[:], proofMD.Nullifier[:]) && (!bytes.Equal(it.ShareX[:], proofMD.ShareX[:]) || !bytes.Equal(it.ShareY[:], proofMD.ShareY[:])) {
matched = true
break
}
}
return matched, nil
}
func (rlnRelay *WakuRLNRelay) updateLog(proofMD rln.ProofMetadata) (bool, error) {
rlnRelay.nullifierLogLock.Lock()
defer rlnRelay.nullifierLogLock.Unlock()
proofs, ok := rlnRelay.nullifierLog[proofMD.ExternalNullifier]
2022-07-05 21:28:34 +00:00
// check if the epoch exists
if !ok {
rlnRelay.nullifierLog[proofMD.ExternalNullifier] = []rln.ProofMetadata{proofMD}
2022-07-05 21:28:34 +00:00
return true, nil
}
// check if an identical record exists
for _, p := range proofs {
if p.Equals(proofMD) {
2023-04-03 21:45:19 +00:00
// TODO: slashing logic
2022-07-05 21:28:34 +00:00
return true, nil
}
}
// add proofMD to the log
proofs = append(proofs, proofMD)
rlnRelay.nullifierLog[proofMD.ExternalNullifier] = proofs
2022-07-05 21:28:34 +00:00
return true, nil
}
// ValidateMessage validates the supplied message based on the waku-rln-relay routing protocol i.e.,
// the message's epoch is within `maxEpochGap` of the current epoch
// the message's has valid rate limit proof
// the message's does not violate the rate limit
// if `optionalTime` is supplied, then the current epoch is calculated based on that, otherwise the current time will be used
func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime *time.Time) (messageValidationResult, error) {
//
2022-07-05 21:28:34 +00:00
if msg == nil {
return validationError, errors.New("nil message")
2022-07-05 21:28:34 +00:00
}
// checks if the `msg`'s epoch is far from the current epoch
// it corresponds to the validation of rln external nullifier
var epoch rln.Epoch
2022-07-05 21:28:34 +00:00
if optionalTime != nil {
epoch = rln.CalcEpoch(*optionalTime)
2022-07-05 21:28:34 +00:00
} else {
// get current rln epoch
epoch = rln.CalcEpoch(rlnRelay.timesource.Now())
2022-07-05 21:28:34 +00:00
}
msgProof := toRateLimitProof(msg)
2022-07-06 16:59:38 +00:00
if msgProof == nil {
// message does not contain a proof
rlnRelay.log.Debug("invalid message: message does not contain a proof")
return invalidMessage, nil
2022-07-06 16:59:38 +00:00
}
2022-07-05 21:28:34 +00:00
proofMD, err := rlnRelay.RLN.ExtractMetadata(*msgProof)
2023-04-03 21:45:19 +00:00
if err != nil {
rlnRelay.log.Debug("could not extract metadata", zap.Error(err))
return invalidMessage, nil
2023-04-03 21:45:19 +00:00
}
2022-07-06 16:59:38 +00:00
// calculate the gaps and validate the epoch
gap := rln.Diff(epoch, msgProof.Epoch)
if int64(math.Abs(float64(gap))) > maxEpochGap {
2022-07-05 21:28:34 +00:00
// message's epoch is too old or too ahead
// accept messages whose epoch is within +-MAX_EPOCH_GAP from the current epoch
rlnRelay.log.Debug("invalid message: epoch gap exceeds a threshold", zap.Int64("gap", gap))
return invalidMessage, nil
2022-07-05 21:28:34 +00:00
}
valid, err := rlnRelay.verifyProof(msg, msgProof)
2022-10-07 22:58:16 +00:00
if err != nil {
rlnRelay.log.Debug("could not verify proof", zap.Error(err))
return invalidMessage, nil
2022-10-07 22:58:16 +00:00
}
if !valid {
2022-07-05 21:28:34 +00:00
// invalid proof
rlnRelay.log.Debug("Invalid proof")
return invalidMessage, nil
2022-07-05 21:28:34 +00:00
}
// check if double messaging has happened
hasDup, err := rlnRelay.HasDuplicate(proofMD)
2022-07-05 21:28:34 +00:00
if err != nil {
rlnRelay.log.Debug("validation error", zap.Error(err))
return validationError, err
2022-07-05 21:28:34 +00:00
}
if hasDup {
rlnRelay.log.Debug("spam received")
return spamMessage, nil
2022-07-05 21:28:34 +00:00
}
// insert the message to the log
// the result of `updateLog` is discarded because message insertion is guaranteed by the implementation i.e.,
// it will never error out
_, err = rlnRelay.updateLog(proofMD)
2022-07-05 21:28:34 +00:00
if err != nil {
return validationError, err
2022-07-05 21:28:34 +00:00
}
rlnRelay.log.Debug("message is valid")
return validMessage, nil
2022-07-05 21:28:34 +00:00
}
// verifyProof checks proof against the signal derived from msg (its payload
// followed by its content topic bytes), using the roots currently reported
// by the root tracker. It returns the result of RLN.Verify unchanged.
func (rlnRelay *WakuRLNRelay) verifyProof(msg *pb.WakuMessage, proof *rln.RateLimitProof) (bool, error) {
	// Build the signal in a fresh buffer. Appending directly to msg.Payload
	// could write the content topic into spare capacity of the payload's
	// backing array, mutating memory shared with whoever else holds it.
	input := make([]byte, 0, len(msg.Payload)+len(msg.ContentTopic))
	input = append(input, msg.Payload...)
	input = append(input, msg.ContentTopic...)

	return rlnRelay.RLN.Verify(input, *proof, rlnRelay.rootTracker.Roots()...)
}
func (rlnRelay *WakuRLNRelay) AppendRLNProof(msg *pb.WakuMessage, senderEpochTime time.Time) error {
2022-07-05 21:28:34 +00:00
// returns error if it could not create and append a `RateLimitProof` to the supplied `msg`
// `senderEpochTime` indicates the number of seconds passed since Unix epoch. The fractional part holds sub-seconds.
// The `epoch` field of `RateLimitProof` is derived from the provided `senderEpochTime` (using `calcEpoch()`)
if msg == nil {
return errors.New("nil message")
}
input := toRLNSignal(msg)
proof, err := rlnRelay.generateProof(input, rln.CalcEpoch(senderEpochTime))
2022-07-05 21:28:34 +00:00
if err != nil {
return err
}
msg.RateLimitProof = proof
2022-07-05 21:28:34 +00:00
return nil
}
// this function sets a validator for the waku messages published on the supplied pubsubTopic and contentTopic
// if contentTopic is empty, then validation takes place for All the messages published on the given pubsubTopic
// the message validation logic is according to https://rfc.vac.dev/spec/17/
func (rlnRelay *WakuRLNRelay) addValidator(
2022-07-05 21:28:34 +00:00
relay *relay.WakuRelay,
pubsubTopic string,
contentTopic string,
2022-07-06 20:29:20 +00:00
spamHandler SpamHandler) error {
2022-07-05 21:28:34 +00:00
validator := func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool {
rlnRelay.log.Debug("rln-relay topic validator called")
2022-07-05 21:28:34 +00:00
wakuMessage := &pb.WakuMessage{}
if err := proto.Unmarshal(message.Data, wakuMessage); err != nil {
rlnRelay.log.Debug("could not unmarshal message")
2022-07-05 21:28:34 +00:00
return true
}
// check the contentTopic
if (wakuMessage.ContentTopic != "") && (contentTopic != "") && (wakuMessage.ContentTopic != contentTopic) {
rlnRelay.log.Debug("content topic did not match", zap.String("contentTopic", contentTopic))
2022-07-05 21:28:34 +00:00
return true
}
// validate the message
validationRes, err := rlnRelay.ValidateMessage(wakuMessage, nil)
2022-07-05 21:28:34 +00:00
if err != nil {
rlnRelay.log.Debug("validating message", zap.Error(err))
2022-07-05 21:28:34 +00:00
return false
}
switch validationRes {
case validMessage:
rlnRelay.log.Debug("message verified",
zap.String("pubsubTopic", pubsubTopic),
zap.String("id", hex.EncodeToString(wakuMessage.Hash(pubsubTopic))),
2022-07-05 21:28:34 +00:00
)
return true
case invalidMessage:
rlnRelay.log.Debug("message could not be verified",
zap.String("pubsubTopic", pubsubTopic),
zap.String("id", hex.EncodeToString(wakuMessage.Hash(pubsubTopic))),
2022-07-05 21:28:34 +00:00
)
2022-10-04 23:15:39 +00:00
return false
case spamMessage:
rlnRelay.log.Debug("spam message found",
zap.String("pubsubTopic", pubsubTopic),
zap.String("id", hex.EncodeToString(wakuMessage.Hash(pubsubTopic))),
2022-07-05 21:28:34 +00:00
)
if spamHandler != nil {
if err := spamHandler(wakuMessage); err != nil {
rlnRelay.log.Error("executing spam handler", zap.Error(err))
2022-07-05 21:28:34 +00:00
}
}
return false
default:
rlnRelay.log.Debug("unhandled validation result", zap.Int("validationResult", int(validationRes)))
2022-07-05 21:28:34 +00:00
return false
}
}
// In case there's a topic validator registered
_ = relay.PubSub().UnregisterTopicValidator(pubsubTopic)
2022-07-06 20:29:20 +00:00
return relay.PubSub().RegisterTopicValidator(pubsubTopic, validator)
2022-07-05 21:28:34 +00:00
}
func (rlnRelay *WakuRLNRelay) generateProof(input []byte, epoch rln.Epoch) (*pb.RateLimitProof, error) {
identityCredentials, err := rlnRelay.groupManager.IdentityCredentials()
if err != nil {
return nil, err
2022-07-05 21:28:34 +00:00
}
membershipIndex, err := rlnRelay.groupManager.MembershipIndex()
if err != nil {
return nil, err
2022-07-05 21:28:34 +00:00
}
proof, err := rlnRelay.RLN.GenerateProof(input, identityCredentials, membershipIndex, epoch)
if err != nil {
return nil, err
2022-07-05 21:28:34 +00:00
}
return &pb.RateLimitProof{
Proof: proof.Proof[:],
MerkleRoot: proof.MerkleRoot[:],
Epoch: proof.Epoch[:],
ShareX: proof.ShareX[:],
ShareY: proof.ShareY[:],
Nullifier: proof.Nullifier[:],
RlnIdentifier: proof.RLNIdentifier[:],
}, nil
2022-07-05 21:28:34 +00:00
}
// IdentityCredential returns the identity credential reported by the group
// manager, together with any error the group manager returns.
func (rlnRelay *WakuRLNRelay) IdentityCredential() (rln.IdentityCredential, error) {
return rlnRelay.groupManager.IdentityCredentials()
}
// MembershipIndex returns the membership index reported by the group
// manager, together with any error the group manager returns.
// NOTE(review): the declared result type is uint while GroupManager returns
// rln.MembershipIndex — presumably these are the same type; confirm.
func (rlnRelay *WakuRLNRelay) MembershipIndex() (uint, error) {
return rlnRelay.groupManager.MembershipIndex()
}