chore: filter v2 tests unsubscribe (#855)

* Chore(filter v2) test updates (#811)

* test: Test incorrect protocol identifiers

* fix: return errors in FilterSubscribeOption

* test: Test incorrect push identifier added
- test incorrect subscribe identifier separated

* test: Test Ping failure after unsubscription

* test: Test PubSub with single content topic

* test: Simplify test PubSub with single content topic

* test: Test with single pubsub and multiple content topics

* test: Test with multiple PubSub and multiple contentTopic

* test: Test with multiple overlaping contentTopics
- test contentTopics limit

* test: refactor tests to fix concurrent run errors

* test: Test subscription refresh

* test: Test error handling for subscribe

* test: Test subscription to multiple full nodes

* update test to fix #804

* Update waku/v2/protocol/filter/filter_test.go

Combine log messages

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

Delete commented - temporary code

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

fmt.Sprintf instead of "+" suffix => more performance and beauty

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

Adjust comment with code

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

Combine multiple related log entries into one.

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

Align comment with the code

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

Use fmt.Sprintf() instead of "+" for more beauty and speed

Co-authored-by: richΛrd <info@richardramos.me>

* test: refactor tests with prepareData()

* test: Test incorrect protocol identifiers

* chore: rebase onto latest master

* test: Test incorrect push identifier added
- test incorrect subscribe identifier separated

* test: Test Ping failure after unsubscription

* test: Test PubSub with single content topic

* test: Simplify test PubSub with single content topic

* test: Test with single pubsub and multiple content topics

* test: Test with multiple PubSub and multiple contentTopic

* test: Test with multiple overlaping contentTopics
- test contentTopics limit

* test: refactor tests to fix concurrent run errors

* test: Test subscription refresh

* test: Test error handling for subscribe

* test: Test subscription to multiple full nodes

* update test to fix #804

* Update waku/v2/protocol/filter/filter_test.go

Combine log messages

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

Delete commented - temporary code

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

fmt.Sprintf instead of "+" suffix => more performance and beauty

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

Adjust comment with code

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

Combine multiple related log entries into one.

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

Align comment with the code

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

Use fmt.Sprintf() instead of "+" for more beauty and speed

Co-authored-by: richΛrd <info@richardramos.me>

* test: refactor tests with prepareData()

* test: Test incorrect protocol identifiers

* fix: return errors in FilterSubscribeOption

* test: Test incorrect push identifier added
- test incorrect subscribe identifier separated

* test: Test Ping failure after unsubscription

* test: Test PubSub with single content topic

* test: Simplify test PubSub with single content topic

* test: Test with single pubsub and multiple content topics

* test: Test with multiple PubSub and multiple contentTopic

* test: Test with multiple overlaping contentTopics
- test contentTopics limit

* test: refactor tests to fix concurrent run errors

* test: Test subscription refresh

* test: Test error handling for subscribe

* test: Test subscription to multiple full nodes

* update test to fix #804

* Update waku/v2/protocol/filter/filter_test.go

Combine log messages

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

Delete commented - temporary code

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

fmt.Sprintf instead of "+" suffix => more performance and beauty

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

Adjust comment with code

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

Combine multiple related log entries into one.

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

Align comment with the code

Co-authored-by: richΛrd <info@richardramos.me>

* Update waku/v2/protocol/filter/filter_test.go

Use fmt.Sprintf() instead of "+" for more beauty and speed

Co-authored-by: richΛrd <info@richardramos.me>

* test: refactor tests with prepareData()

* Fix error during rebase

* Sync filter tests with latest master

* Refactor context initialization for test

* test: Incorrect Subscribe Identifier refactored with custom subscribe

* test: refactor into multiple files

* test: Subscribe with multiple light nodes to one full node

* test: shared mode for full node creation
- test preview Subscribe fullNode to fullNode

* test: test Subscribe fullNode to fullNode

---------

Co-authored-by: Richard Ramos <info@richardramos.me>
Co-authored-by: Prem Chaitanya Prathi <chaitanyaprem@gmail.com>

* test: unsubscribe with single contentTopic

* test: extend test - unsubscribe with single contentTopic

* test: unsubscribe with multiple contentTopic

* test: unsubscribe with multiple pubSub/contentTopic

* test: refactor back to use waitForTimeout()

* test: unsubscribe error handling

---------

Co-authored-by: Richard Ramos <info@richardramos.me>
Co-authored-by: Prem Chaitanya Prathi <chaitanyaprem@gmail.com>
This commit is contained in:
Roman Zajic 2023-11-02 12:39:44 +08:00 committed by GitHub
parent 5dfbd98c74
commit 02f2800b04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 203 additions and 8 deletions

View File

@ -138,7 +138,7 @@ func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) {
}
}
s.Require().True(msgFound)
case <-time.After(5 * time.Second):
case <-time.After(1 * time.Second):
s.Require().Fail("Message timeout")
case <-s.ctx.Done():
s.Require().Fail("test exceeded allocated time")
@ -187,7 +187,7 @@ func (s *FilterTestSuite) waitForMessages(fn func(), subs []*subscription.Subscr
if matchOneOfManyMsg(received, expected) {
found++
}
case <-time.After(2 * time.Second):
case <-time.After(1 * time.Second):
case <-s.ctx.Done():
s.Require().Fail("test exceeded allocated time")
@ -248,11 +248,11 @@ func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, pee
return subDetails
}
func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, peer peer.ID) <-chan WakuFilterPushResult {
func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails {
for _, sub := range s.subDetails {
if sub.ContentFilter.PubsubTopic == pubsubTopic {
topicsCount := len(s.contentFilter.ContentTopicsList())
topicsCount := len(sub.ContentFilter.ContentTopicsList())
if topicsCount == 1 {
_, err := s.lightNode.Unsubscribe(s.ctx, sub.ContentFilter, WithPeer(peer))
s.Require().NoError(err)
@ -260,11 +260,10 @@ func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, p
sub.Remove(contentTopic)
}
s.contentFilter = sub.ContentFilter
return nil
}
}
return nil
return s.lightNode.Subscriptions()
}
func (s *FilterTestSuite) publishMsg(topic, contentTopic string, optionalPayload ...string) {
@ -288,8 +287,8 @@ func (s *FilterTestSuite) publishMessages(msgs []WakuMsg) {
func prepareData(quantity int, topics, contentTopics, payloads bool) []WakuMsg {
var (
pubsubTopic = "/waku/2/go/filter/test"
contentTopic = "TopicA"
pubsubTopic = "/waku/2/go/filter/test" // Has to be the same with initial s.testTopic
contentTopic = "TopicA" // Has to be the same with initial s.testContentTopic
payload = "test_msg"
messages []WakuMsg
)
@ -532,6 +531,7 @@ func (s *FilterTestSuite) TestAutoShard() {
s.Require().NoError(err)
}, s.subDetails[0].C)
_, err = s.lightNode.Unsubscribe(s.ctx, protocol.ContentFilter{
PubsubTopic: s.testTopic,
ContentTopics: protocol.NewContentTopicSet(newContentTopic),

View File

@ -0,0 +1,195 @@
package filter
import (
"context"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"time"
)
func (s *FilterTestSuite) TestUnsubscribeSingleContentTopic() {
var newContentTopic = "TopicB"
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID())
// Message is possible to receive for original contentTopic
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "test_msg")
}, s.subDetails[0].C)
// Message is possible to receive for new contentTopic
s.waitForMsg(func() {
s.publishMsg(s.testTopic, newContentTopic, "test_msg")
}, s.subDetails[0].C)
_ = s.unsubscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID())
// Message should not be received for new contentTopic as it was unsubscribed
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, newContentTopic, "test_msg")
}, s.subDetails[0].C)
// Message is still possible to receive for original contentTopic
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "test_msg2")
}, s.subDetails[0].C)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestUnsubscribeMultiContentTopic() {
var messages = prepareData(3, false, true, true)
// Subscribe with 3 content topics
for _, m := range messages {
s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
}
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
// Unsubscribe with the last 2 content topics
for _, m := range messages[1:] {
_ = s.unsubscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
}
// Messages should not be received for the last two contentTopics as it was unsubscribed
for _, m := range messages[1:] {
s.waitForTimeout(func() {
s.publishMsg(m.pubSubTopic, m.contentTopic, m.payload)
}, s.subDetails[0].C)
}
// Message is still possible to receive for the first contentTopic
s.waitForMsg(func() {
s.publishMsg(messages[0].pubSubTopic, messages[0].contentTopic, messages[0].payload)
}, s.subDetails[0].C)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestUnsubscribeMultiPubSubMultiContentTopic() {
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second)
s.lightNode = s.makeWakuFilterLightNode(true, true)
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, true, true)
// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNode.h), peerstore.PermanentAddrTTL)
err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
messages := prepareData(2, true, true, true)
// Subscribe
for _, m := range messages {
s.subDetails = append(s.subDetails, s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())...)
_, err = s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.pubSubTopic))
s.Require().NoError(err)
}
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
// Unsubscribe
for _, m := range messages {
_ = s.unsubscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
}
// No messages can be received with previous subscriptions
for i, m := range messages {
s.waitForTimeout(func() {
s.publishMsg(m.pubSubTopic, m.contentTopic, m.payload)
}, s.subDetails[i].C)
}
}
func (s *FilterTestSuite) TestUnsubscribeErrorHandling() {
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second)
s.lightNode = s.makeWakuFilterLightNode(true, true)
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, true, true)
// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
var messages, invalidMessages []WakuMsg
messages = prepareData(2, false, true, true)
// Prepare "invalid" data for unsubscribe
invalidMessages = append(invalidMessages,
WakuMsg{
pubSubTopic: "",
contentTopic: messages[0].contentTopic,
payload: "N/A",
},
WakuMsg{
pubSubTopic: messages[0].pubSubTopic,
contentTopic: "",
payload: "N/A",
},
WakuMsg{
pubSubTopic: "/waku/2/go/filter/not_subscribed",
contentTopic: "not_subscribed_topic",
payload: "N/A",
})
// Subscribe with valid topics
for _, m := range messages {
s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
_, err = s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.pubSubTopic))
s.Require().NoError(err)
}
// All messages should be possible to receive for subscribed topics
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
// Unsubscribe with empty pubsub
contentFilter := protocol.ContentFilter{PubsubTopic: invalidMessages[0].pubSubTopic,
ContentTopics: protocol.NewContentTopicSet(invalidMessages[0].contentTopic)}
_, err = s.lightNode.Unsubscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().Error(err)
// Unsubscribe with empty content topic
contentFilter = protocol.ContentFilter{PubsubTopic: invalidMessages[1].pubSubTopic,
ContentTopics: protocol.NewContentTopicSet(invalidMessages[1].contentTopic)}
_, err = s.lightNode.Unsubscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().Error(err)
// Unsubscribe with non-existent topics, expect no error to prevent attacker from topic guessing
contentFilter = protocol.ContentFilter{PubsubTopic: invalidMessages[2].pubSubTopic,
ContentTopics: protocol.NewContentTopicSet(invalidMessages[2].contentTopic)}
_, err = s.lightNode.Unsubscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err)
// All messages should be still possible to receive for subscribed topics
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
_, err = s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}