diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index eaddf565..d49b5b9d 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -113,6 +113,12 @@ func (wf *WakuFilterLightnode) onRequest(ctx context.Context) func(s network.Str defer s.Close() logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer())) + if !wf.subscriptions.IsSubscribedTo(s.Conn().RemotePeer()) { + logger.Warn("received message push from unknown peer", logging.HostID("peerID", s.Conn().RemotePeer())) + metrics.RecordFilterError(ctx, "unknown_peer_messagepush") + return + } + reader := pbio.NewDelimitedReader(s, math.MaxInt32) messagePush := &pb.MessagePushV2{} @@ -123,6 +129,12 @@ func (wf *WakuFilterLightnode) onRequest(ctx context.Context) func(s network.Str return } + if !wf.subscriptions.Has(s.Conn().RemotePeer(), messagePush.PubsubTopic, messagePush.WakuMessage.ContentTopic) { + logger.Warn("received messagepush with invalid subscription parameters", logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", messagePush.PubsubTopic), zap.String("contentTopic", messagePush.WakuMessage.ContentTopic)) + metrics.RecordFilterError(ctx, "invalid_subscription_message") + return + } + metrics.RecordFilterMessage(ctx, "PushMessage", 1) wf.notify(s.Conn().RemotePeer(), messagePush.PubsubTopic, messagePush.WakuMessage) @@ -237,7 +249,7 @@ func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter Cont // FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol func (wf *WakuFilterLightnode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) { - if !wf.subscriptions.Has(peerID, contentFilter.Topic, contentFilter.ContentTopics) { + if !wf.subscriptions.Has(peerID, contentFilter.Topic, contentFilter.ContentTopics...) { return nil, errors.New("subscription does not exist") } diff --git a/waku/v2/protocol/filter/subscriptions_map.go b/waku/v2/protocol/filter/subscriptions_map.go index 663d419b..c0afbbcb 100644 --- a/waku/v2/protocol/filter/subscriptions_map.go +++ b/waku/v2/protocol/filter/subscriptions_map.go @@ -76,7 +76,18 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, topic string, conte return details } -func (sub *SubscriptionsMap) Has(peerID peer.ID, topic string, contentTopics []string) bool { +func (sub *SubscriptionsMap) IsSubscribedTo(peerID peer.ID) bool { + sub.RLock() + defer sub.RUnlock() + + _, ok := sub.items[peerID] + return ok +} + +func (sub *SubscriptionsMap) Has(peerID peer.ID, topic string, contentTopics ...string) bool { + sub.RLock() + defer sub.RUnlock() + // Check if peer exits peerSubscription, ok := sub.items[peerID] if !ok {