diff --git a/waku/v2/protocol/filter/filter_subscribers.go b/waku/v2/protocol/filter/filter_subscribers.go index 4d6c4f67..49c1d306 100644 --- a/waku/v2/protocol/filter/filter_subscribers.go +++ b/waku/v2/protocol/filter/filter_subscribers.go @@ -14,6 +14,15 @@ type Subscriber struct { filter pb.FilterRequest // @TODO MAKE THIS A SEQUENCE AGAIN? } +func (sub Subscriber) HasContentTopic(topic string) bool { + for _, filter := range sub.filter.ContentFilters { + if filter.ContentTopic == topic { + return true + } + } + return false +} + type Subscribers struct { sync.RWMutex subscribers []Subscriber @@ -36,25 +45,16 @@ func (sub *Subscribers) Append(s Subscriber) int { return len(sub.subscribers) } -func (subs *Subscribers) SubscriberHasContentTopic(sub Subscriber, topic string) bool { - subs.RLock() - defer subs.RUnlock() - for _, filter := range sub.filter.ContentFilters { - if filter.ContentTopic == topic { - return true - } - } - return false -} - -func (sub *Subscribers) Items() <-chan Subscriber { +func (sub *Subscribers) Items(topic *string) <-chan Subscriber { c := make(chan Subscriber) f := func() { sub.RLock() defer sub.RUnlock() - for _, value := range sub.subscribers { - c <- value + for _, s := range sub.subscribers { + if topic == nil || s.HasContentTopic(*topic) { + c <- s + } } close(c) } diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index ffd5b955..b7f2fbc1 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -189,7 +189,7 @@ func (wf *WakuFilter) FilterListener() { g := new(errgroup.Group) // Each subscriber is a light node that earlier on invoked // a FilterRequest on this node - for subscriber := range wf.subscribers.Items() { + for subscriber := range wf.subscribers.Items(&(msg.ContentTopic)) { logger := logger.With(logging.HostID("subscriber", subscriber.peer)) subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines if subscriber.filter.Topic != "" && subscriber.filter.Topic != topic { @@ -199,18 +199,15 @@ func (wf *WakuFilter) FilterListener() { continue } - if wf.subscribers.SubscriberHasContentTopic(subscriber, msg.ContentTopic) { - logger.Info("found matching content topic", zap.String("contentTopic", msg.ContentTopic)) - // Do a message push to light node - logger.Info("pushing message to light node") - g.Go(func() (err error) { - err = wf.pushMessage(subscriber, msg) - if err != nil { - logger.Error("pushing message", zap.Error(err)) - } - return err - }) - } + // Do a message push to light node + logger.Info("pushing message to light node", zap.String("contentTopic", msg.ContentTopic)) + g.Go(func() (err error) { + err = wf.pushMessage(subscriber, msg) + if err != nil { + logger.Error("pushing message", zap.Error(err)) + } + return err + }) } return g.Wait() diff --git a/waku/v2/protocol/filter/waku_filter_test.go b/waku/v2/protocol/filter/waku_filter_test.go index 3b0dcfd7..aca14ef9 100644 --- a/waku/v2/protocol/filter/waku_filter_test.go +++ b/waku/v2/protocol/filter/waku_filter_test.go @@ -215,7 +215,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { time.Sleep(1 * time.Second) require.False(t, node2Filter.subscribers.IsFailedPeer(host1.ID())) // Failed peer has been removed - for subscriber := range node2Filter.subscribers.Items() { + for subscriber := range node2Filter.subscribers.Items(nil) { if subscriber.peer == node1.h.ID() { require.Fail(t, "Subscriber should not exist") }