diff --git a/waku/v2/protocol/filterv2/client.go b/waku/v2/protocol/filterv2/client.go index d737fcaf..51467b90 100644 --- a/waku/v2/protocol/filterv2/client.go +++ b/waku/v2/protocol/filterv2/client.go @@ -213,8 +213,8 @@ func (wf *WakuFilterPush) Subscribe(ctx context.Context, contentFilter ContentFi return nil } -// SubscriptionChannel is used to obtain an object from which you could receive messages received via filter protocol -func (wf *WakuFilterPush) SubscriptionChannel(peerID peer.ID, topic string, contentTopics []string) *SubscriptionDetails { +// FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol +func (wf *WakuFilterPush) FilterSubscription(peerID peer.ID, topic string, contentTopics []string) *SubscriptionDetails { return wf.subscriptions.NewSubscription(peerID, topic, contentTopics) } diff --git a/waku/v2/protocol/filterv2/options.go b/waku/v2/protocol/filterv2/options.go index 504116d6..71784714 100644 --- a/waku/v2/protocol/filterv2/options.go +++ b/waku/v2/protocol/filterv2/options.go @@ -89,6 +89,12 @@ func UnsubscribeAll() FilterUnsubscribeOption { } } +func Peer(p peer.ID) FilterUnsubscribeOption { + return func(params *FilterUnsubscribeParameters) { + params.selectedPeer = p + } +} + func RequestID(requestId []byte) FilterUnsubscribeOption { return func(params *FilterUnsubscribeParameters) { params.requestId = requestId diff --git a/waku/v2/protocol/filterv2/options_test.go b/waku/v2/protocol/filterv2/options_test.go new file mode 100644 index 00000000..274260b1 --- /dev/null +++ b/waku/v2/protocol/filterv2/options_test.go @@ -0,0 +1,52 @@ +package filterv2 + +import ( + "context" + "crypto/rand" + "testing" + + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +func TestFilterOption(t *testing.T) { + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + host, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + + options := []FilterSubscribeOption{ + WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), + WithAutomaticPeerSelection(), + WithFastestPeerSelection(context.Background()), + } + + params := new(FilterSubscribeParameters) + params.host = host + params.log = utils.Logger() + + for _, opt := range options { + opt(params) + } + + require.Equal(t, host, params.host) + require.NotNil(t, params.selectedPeer) + + options2 := []FilterUnsubscribeOption{ + AutomaticRequestId(), + UnsubscribeAll(), + Peer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), + } + + params2 := new(FilterUnsubscribeParameters) + + for _, opt := range options2 { + opt(params2) + } + + require.NotNil(t, params2.selectedPeer) + require.True(t, params2.unsubscribeAll) + +} diff --git a/waku/v2/protocol/filterv2/subscribers_map.go b/waku/v2/protocol/filterv2/subscribers_map.go index bcd23f9a..eee9679d 100644 --- a/waku/v2/protocol/filterv2/subscribers_map.go +++ b/waku/v2/protocol/filterv2/subscribers_map.go @@ -104,14 +104,19 @@ func (sub *SubscribersMap) Delete(peerID peer.ID, pubsubTopic string, contentTop sub.removeFromInterestMap(peerID, pubsubTopic, c) } + pubsubTopicMap[pubsubTopic] = contentTopicsMap + // No more content topics available. Removing content topic completely if len(contentTopicsMap) == 0 { delete(pubsubTopicMap, pubsubTopic) } - pubsubTopicMap[pubsubTopic] = contentTopicsMap sub.items[peerID] = pubsubTopicMap + if len(sub.items[peerID]) == 0 { + delete(sub.items, peerID) + } + return nil } @@ -129,6 +134,7 @@ func (sub *SubscribersMap) deleteAll(peerID peer.ID) error { } delete(sub.items, peerID) + delete(sub.failedPeers, peerID) return nil } diff --git a/waku/v2/protocol/filterv2/subscribers_map_test.go b/waku/v2/protocol/filterv2/subscribers_map_test.go new file mode 100644 index 00000000..4fbd0851 --- /dev/null +++ b/waku/v2/protocol/filterv2/subscribers_map_test.go @@ -0,0 +1,127 @@ +package filterv2 + +import ( + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const TOPIC = "/test/topic" + +func createPeerId(t *testing.T) peer.ID { + peerId, err := test.RandPeerID() + assert.NoError(t, err) + return peerId +} + +func firstSubscriber(subs *SubscribersMap, pubsubTopic string, contentTopic string) peer.ID { + for sub := range subs.Items(pubsubTopic, contentTopic) { + return sub + } + return "" +} + +func TestAppend(t *testing.T) { + subs := NewSubscribersMap(5 * time.Second) + peerId := createPeerId(t) + + subs.Set(peerId, TOPIC, []string{"topic1"}) + + sub := firstSubscriber(subs, TOPIC, "topic1") + assert.NotNil(t, sub) + + // Adding to existing peer + subs.Set(peerId, TOPIC, []string{"topic2"}) + + sub = firstSubscriber(subs, TOPIC, "topic2") + assert.NotNil(t, sub) + + subs.Set(peerId, TOPIC+"2", []string{"topic1"}) + + sub = firstSubscriber(subs, TOPIC+"2", "topic1") + assert.NotNil(t, sub) + + sub = firstSubscriber(subs, TOPIC, "topic2") + assert.Nil(t, sub) +} + +func TestRemove(t *testing.T) { + subs := NewSubscribersMap(5 * time.Second) + peerId := createPeerId(t) + + subs.Set(peerId, TOPIC+"1", []string{"topic1", "topic2"}) + subs.Set(peerId, TOPIC+"2", []string{"topic1"}) + + subs.DeleteAll(peerId) + + sub := firstSubscriber(subs, TOPIC+"1", "topic1") + assert.Nil(t, sub) + + sub = firstSubscriber(subs, TOPIC+"1", "topic2") + assert.Nil(t, sub) + + sub = firstSubscriber(subs, TOPIC+"2", "topic1") + assert.Nil(t, sub) + + assert.False(t, subs.Has(peerId)) + + _, found := subs.Get(peerId) + assert.False(t, found) + + _, ok := subs.items[peerId] + assert.False(t, ok) +} + +func TestRemovePartial(t *testing.T) { + subs := NewSubscribersMap(5 * time.Second) + peerId := createPeerId(t) + + subs.Set(peerId, TOPIC, []string{"topic1", "topic2"}) + err := subs.Delete(peerId, TOPIC, []string{"topic1"}) + require.NoError(t, err) + + sub := firstSubscriber(subs, TOPIC, "topic2") + assert.NotNil(t, sub) +} + +func TestRemoveBogus(t *testing.T) { + subs := NewSubscribersMap(5 * time.Second) + peerId := createPeerId(t) + + subs.Set(peerId, TOPIC, []string{"topic1", "topic2"}) + err := subs.Delete(peerId, TOPIC, []string{"does not exist", "topic1"}) + require.NoError(t, err) + + sub := firstSubscriber(subs, TOPIC, "topic1") + assert.Nil(t, sub) + sub = firstSubscriber(subs, TOPIC, "does not exist") + assert.Nil(t, sub) + + err = subs.Delete(peerId, "DOES_NOT_EXIST", []string{"topic1"}) + require.Error(t, err) +} + +func TestSuccessFailure(t *testing.T) { + subs := NewSubscribersMap(5 * time.Second) + peerId := createPeerId(t) + + subs.Set(peerId, TOPIC, []string{"topic1", "topic2"}) + + subs.FlagAsFailure(peerId) + require.True(t, subs.IsFailedPeer(peerId)) + + subs.FlagAsFailure(peerId) + require.False(t, subs.Has(peerId)) + + subs.Set(peerId, TOPIC, []string{"topic1", "topic2"}) + + subs.FlagAsFailure(peerId) + require.True(t, subs.IsFailedPeer(peerId)) + + subs.FlagAsSuccess(peerId) + require.False(t, subs.IsFailedPeer(peerId)) +} diff --git a/waku/v2/protocol/filterv2/subscriptions_map.go b/waku/v2/protocol/filterv2/subscriptions_map.go index 89005a29..501971d8 100644 --- a/waku/v2/protocol/filterv2/subscriptions_map.go +++ b/waku/v2/protocol/filterv2/subscriptions_map.go @@ -59,11 +59,12 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, topic string, conte } details := &SubscriptionDetails{ - id: uuid.NewString(), - mapRef: sub, - peerID: peerID, - pubsubTopic: topic, - C: make(chan *protocol.Envelope), + id: uuid.NewString(), + mapRef: sub, + peerID: peerID, + pubsubTopic: topic, + C: make(chan *protocol.Envelope), + contentTopics: make(map[string]struct{}), } for _, ct := range contentTopics { @@ -89,7 +90,7 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error { return nil } -func (s *SubscriptionDetails) Add(contentTopics []string) { +func (s *SubscriptionDetails) Add(contentTopics ...string) { s.Lock() defer s.Unlock() @@ -98,7 +99,7 @@ func (s *SubscriptionDetails) Add(contentTopics []string) { } } -func (s *SubscriptionDetails) Remove(contentTopics []string) { +func (s *SubscriptionDetails) Remove(contentTopics ...string) { s.Lock() defer s.Unlock() @@ -149,11 +150,6 @@ func (sub *SubscriptionsMap) Notify(peerID peer.ID, envelope *protocol.Envelope) if ok { iterateSubscriptionSet(subscriptions, envelope) } - - subscriptionsWithNoPeer, ok := sub.items[peerID].subscriptionsPerTopic[envelope.PubsubTopic()] - if ok { - iterateSubscriptionSet(subscriptionsWithNoPeer, envelope) - } } func iterateSubscriptionSet(subscriptions SubscriptionSet, envelope *protocol.Envelope) { diff --git a/waku/v2/protocol/filterv2/subscriptions_map_test.go b/waku/v2/protocol/filterv2/subscriptions_map_test.go new file mode 100644 index 00000000..a30460db --- /dev/null +++ b/waku/v2/protocol/filterv2/subscriptions_map_test.go @@ -0,0 +1,209 @@ +package filterv2 + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/protocol" +) + +func TestSubscriptionMapAppend(t *testing.T) { + fmap := NewSubscriptionMap() + peerId := createPeerId(t) + contentTopics := []string{"ct1", "ct2"} + + sub := fmap.NewSubscription(peerId, TOPIC, contentTopics) + _, found := sub.contentTopics["ct1"] + require.True(t, found) + _, found = sub.contentTopics["ct2"] + require.True(t, found) + require.False(t, sub.closed) + require.Equal(t, sub.peerID, peerId) + require.Equal(t, sub.pubsubTopic, TOPIC) + + sub.Add("ct3") + _, found = sub.contentTopics["ct3"] + require.True(t, found) + + sub.Remove("ct3") + _, found = sub.contentTopics["ct3"] + require.False(t, found) + + err := sub.Close() + require.NoError(t, err) + require.True(t, sub.closed) + + err = sub.Close() + require.NoError(t, err) +} + +func TestSubscriptionClear(t *testing.T) { + fmap := NewSubscriptionMap() + contentTopics := []string{"ct1", "ct2"} + + var subscriptions = []*SubscriptionDetails{ + fmap.NewSubscription(createPeerId(t), TOPIC+"1", contentTopics), + fmap.NewSubscription(createPeerId(t), TOPIC+"2", contentTopics), + fmap.NewSubscription(createPeerId(t), TOPIC+"3", contentTopics), + } + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + wg := sync.WaitGroup{} + wg.Add(len(subscriptions)) + for _, s := range subscriptions { + go func(s *SubscriptionDetails) { + defer wg.Done() + select { + case <-ctx.Done(): + t.Fail() + return + case <-s.C: + return + } + }(s) + } + + fmap.Clear() + + wg.Wait() + + require.True(t, subscriptions[0].closed) + require.True(t, subscriptions[1].closed) + require.True(t, subscriptions[2].closed) +} + +func TestSubscriptionsNotify(t *testing.T) { + fmap := NewSubscriptionMap() + p1 := createPeerId(t) + p2 := createPeerId(t) + var subscriptions = []*SubscriptionDetails{ + fmap.NewSubscription(p1, TOPIC+"1", []string{"ct1", "ct2"}), + fmap.NewSubscription(p2, TOPIC+"1", []string{"ct1"}), + fmap.NewSubscription(p1, TOPIC+"2", []string{"ct1", "ct2"}), + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + successChan := make(chan struct{}, 10) + wg := sync.WaitGroup{} + + successOnReceive := func(ctx context.Context, i int) { + defer wg.Done() + + if subscriptions[i].closed { + successChan <- struct{}{} + return + } + + select { + case <-ctx.Done(): + panic("should have failed1") + case c := <-subscriptions[i].C: + if c == nil { + panic("should have failed2") + } + successChan <- struct{}{} + return + } + } + + failOnReceive := func(ctx context.Context, i int) { + defer wg.Done() + + if subscriptions[i].closed { + successChan <- struct{}{} + return + } + + select { + case <-ctx.Done(): + successChan <- struct{}{} + return + case c := <-subscriptions[i].C: + if c != nil { + panic("should have failed") + } + successChan <- struct{}{} + return + } + } + + wg.Add(3) + go successOnReceive(ctx, 0) + go successOnReceive(ctx, 1) + go failOnReceive(ctx, 2) + time.Sleep(200 * time.Millisecond) + + envTopic1Ct1 := protocol.NewEnvelope(tests.CreateWakuMessage("ct1", 0), 0, TOPIC+"1") + wg.Add(1) + go func() { + defer wg.Done() + fmap.Notify(p1, envTopic1Ct1) + fmap.Notify(p2, envTopic1Ct1) + }() + + <-successChan + <-successChan + cancel() + wg.Wait() + <-successChan + + ////////////////////////////////////// + + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + + wg.Add(3) + go successOnReceive(ctx, 0) + go failOnReceive(ctx, 1) + go failOnReceive(ctx, 2) + time.Sleep(200 * time.Millisecond) + + envTopic1Ct2 := protocol.NewEnvelope(tests.CreateWakuMessage("ct2", 0), 0, TOPIC+"1") + wg.Add(1) + go func() { + defer wg.Done() + fmap.Notify(p1, envTopic1Ct2) + fmap.Notify(p2, envTopic1Ct2) + }() + + <-successChan + cancel() + wg.Wait() + <-successChan + <-successChan + + ////////////////////////////////////// + + // Testing after closing the subscription + + subscriptions[0].Close() + time.Sleep(200 * time.Millisecond) + + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + + wg.Add(3) + go failOnReceive(ctx, 0) + go successOnReceive(ctx, 1) + go failOnReceive(ctx, 2) + time.Sleep(200 * time.Millisecond) + + envTopic1Ct1_2 := protocol.NewEnvelope(tests.CreateWakuMessage("ct1", 1), 1, TOPIC+"1") + + wg.Add(1) + go func() { + defer wg.Done() + fmap.Notify(p1, envTopic1Ct1_2) + fmap.Notify(p2, envTopic1Ct1_2) + }() + + <-successChan // One of these successes is for closing the subscription + <-successChan + cancel() + wg.Wait() + <-successChan +}