diff --git a/waku/v2/protocol/filter/filter_subscribers.go b/waku/v2/protocol/filter/filter_subscribers.go index 0c9adf85..4d6c4f67 100644 --- a/waku/v2/protocol/filter/filter_subscribers.go +++ b/waku/v2/protocol/filter/filter_subscribers.go @@ -36,6 +36,17 @@ func (sub *Subscribers) Append(s Subscriber) int { return len(sub.subscribers) } +func (subs *Subscribers) SubscriberHasContentTopic(sub Subscriber, topic string) bool { + subs.RLock() + defer subs.RUnlock() + for _, filter := range sub.filter.ContentFilters { + if filter.ContentTopic == topic { + return true + } + } + return false +} + func (sub *Subscribers) Items() <-chan Subscriber { c := make(chan Subscriber) @@ -59,6 +70,13 @@ func (sub *Subscribers) Length() int { return len(sub.subscribers) } +func (sub *Subscribers) IsFailedPeer(peerID peer.ID) bool { + sub.RLock() + defer sub.RUnlock() + _, ok := sub.failedPeers[peerID] + return ok +} + func (sub *Subscribers) FlagAsSuccess(peerID peer.ID) { sub.Lock() defer sub.Unlock() diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index 135c6f4b..ffd5b955 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -199,21 +199,17 @@ func (wf *WakuFilter) FilterListener() { continue } - for _, filter := range subscriber.filter.ContentFilters { - if msg.ContentTopic == filter.ContentTopic { - logger.Info("found matching content topic", zap.Stringer("filter", filter)) - // Do a message push to light node - logger.Info("pushing message to light node") - g.Go(func() (err error) { - err = wf.pushMessage(subscriber, msg) - if err != nil { - logger.Error("pushing message", zap.Error(err)) - } - return err - }) - // Break if we have found a match - break - } + if wf.subscribers.SubscriberHasContentTopic(subscriber, msg.ContentTopic) { + logger.Info("found matching content topic", zap.String("contentTopic", msg.ContentTopic)) + // Do a message push to light node + logger.Info("pushing message to light node") + g.Go(func() (err error) { + err = wf.pushMessage(subscriber, msg) + if err != nil { + logger.Error("pushing message", zap.Error(err)) + } + return err + }) } } diff --git a/waku/v2/protocol/filter/waku_filter_test.go b/waku/v2/protocol/filter/waku_filter_test.go index 2b1370cc..3b0dcfd7 100644 --- a/waku/v2/protocol/filter/waku_filter_test.go +++ b/waku/v2/protocol/filter/waku_filter_test.go @@ -175,8 +175,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { // Sleep to make sure the filter is subscribed time.Sleep(2 * time.Second) - _, ok := node2Filter.subscribers.failedPeers[host1.ID()] - require.True(t, ok) + require.True(t, node2Filter.subscribers.IsFailedPeer(host1.ID())) var wg sync.WaitGroup @@ -187,8 +186,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) // Failure is removed - _, ok := node2Filter.subscribers.failedPeers[host1.ID()] - require.False(t, ok) + require.False(t, node2Filter.subscribers.IsFailedPeer(host1.ID())) }() @@ -207,8 +205,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { // TODO: find out how to eliminate this sleep time.Sleep(1 * time.Second) - _, ok = node2Filter.subscribers.failedPeers[host1.ID()] - require.True(t, ok) + require.True(t, node2Filter.subscribers.IsFailedPeer(host1.ID())) time.Sleep(3 * time.Second) @@ -216,8 +213,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { require.NoError(t, err) time.Sleep(1 * time.Second) - _, ok = node2Filter.subscribers.failedPeers[host1.ID()] - require.False(t, ok) // Failed peer has been removed + require.False(t, node2Filter.subscribers.IsFailedPeer(host1.ID())) // Failed peer has been removed for subscriber := range node2Filter.subscribers.Items() { if subscriber.peer == node1.h.ID() {