diff --git a/waku/v2/protocol/relay/validators.go b/waku/v2/protocol/relay/validators.go index 09ae2047..ed145fdf 100644 --- a/waku/v2/protocol/relay/validators.go +++ b/waku/v2/protocol/relay/validators.go @@ -44,7 +44,15 @@ func validatorFnBuilder(topic string, publicKey *ecdsa.PublicKey) validatorFn { func (w *WakuRelay) AddSignedTopicValidator(topic string, publicKey *ecdsa.PublicKey) error { w.log.Info("adding validator to signed topic", zap.String("topic", topic), zap.String("publicKey", hex.EncodeToString(elliptic.Marshal(publicKey.Curve, publicKey.X, publicKey.Y)))) err := w.pubsub.RegisterTopicValidator(topic, validatorFnBuilder(topic, publicKey)) - return err + if err != nil { + return err + } + + if !w.IsSubscribed(topic) { + w.log.Warn("relay is not subscribed to signed topic", zap.String("topic", topic)) + } + + return nil } func SignMessage(privKey *ecdsa.PrivateKey, topic string, msg *pb.WakuMessage) error { diff --git a/waku/v2/protocol/relay/validators_test.go b/waku/v2/protocol/relay/validators_test.go index 4f6f6016..8f2bff5b 100644 --- a/waku/v2/protocol/relay/validators_test.go +++ b/waku/v2/protocol/relay/validators_test.go @@ -11,6 +11,8 @@ import ( pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" "google.golang.org/protobuf/proto" ) @@ -25,6 +27,7 @@ func TestMsgHash(t *testing.T) { msg := &pb.WakuMessage{ Payload: payload, ContentTopic: contentTopic, + Timestamp: utils.GetUnixEpoch(timesource.NewDefaultClock()), } err := SignMessage(prvKey, pubsubTopic, msg) diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index a0936da5..8f425ea5 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -8,7 +8,6 @@ import ( "sync" "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "go.opencensus.io/stats" "go.opencensus.io/tag" @@ -133,6 +132,13 @@ func (w *WakuRelay) Topics() []string { return result } +func (w *WakuRelay) IsSubscribed(topic string) bool { + defer w.topicsMutex.Unlock() + w.topicsMutex.Lock() + _, ok := w.relaySubs[topic] + return ok +} + // SetPubSub is used to set an implementation of the pubsub system func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) { w.pubsub = pubSub @@ -154,6 +160,7 @@ func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) { return pubSubTopic, nil } +/* func (w *WakuRelay) validatorFactory(pubsubTopic string) func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool { return func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool { msg := new(pb.WakuMessage) @@ -161,6 +168,7 @@ func (w *WakuRelay) validatorFactory(pubsubTopic string) func(ctx context.Contex return err == nil } } +*/ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err error) { sub, ok := w.relaySubs[topic] @@ -170,10 +178,14 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro return nil, err } - err = w.pubsub.RegisterTopicValidator(topic, w.validatorFactory(topic)) - if err != nil { - return nil, err - } + /* + // TODO: Add a function to validate the WakuMessage integrity + // Rejects messages that are not WakuMessage + err = w.pubsub.RegisterTopicValidator(topic, w.validatorFactory(topic)) + if err != nil { + return nil, err + } + */ sub, err = pubSubTopic.Subscribe() if err != nil {