From fc3b2f76d5755cf4088f3b3f4a9ffc657551958a Mon Sep 17 00:00:00 2001 From: Roman Zajic Date: Sat, 28 Oct 2023 11:40:22 +0800 Subject: [PATCH] chore(filter v2): test updates (#811) --------- Co-authored-by: Richard Ramos Co-authored-by: Prem Chaitanya Prathi --- waku/v2/protocol/filter/filter_ping_test.go | 34 ++ .../filter/filter_proto_ident_test.go | 303 ++++++++++++++ .../protocol/filter/filter_subscribe_test.go | 376 ++++++++++++++++++ waku/v2/protocol/filter/filter_test.go | 297 +++++++------- 4 files changed, 873 insertions(+), 137 deletions(-) create mode 100644 waku/v2/protocol/filter/filter_ping_test.go create mode 100644 waku/v2/protocol/filter/filter_proto_ident_test.go create mode 100644 waku/v2/protocol/filter/filter_subscribe_test.go diff --git a/waku/v2/protocol/filter/filter_ping_test.go b/waku/v2/protocol/filter/filter_ping_test.go new file mode 100644 index 00000000..07fb8d83 --- /dev/null +++ b/waku/v2/protocol/filter/filter_ping_test.go @@ -0,0 +1,34 @@ +package filter + +import ( + "context" + "net/http" +) + +func (s *FilterTestSuite) TestSubscriptionPing() { + err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) + s.Require().Error(err) + filterErr, ok := err.(*FilterError) + s.Require().True(ok) + s.Require().Equal(filterErr.Code, http.StatusNotFound) + + contentTopic := "abc" + s.subDetails = s.subscribe(s.testTopic, contentTopic, s.fullNodeHost.ID()) + + err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) + s.Require().NoError(err) +} + +func (s *FilterTestSuite) TestUnSubscriptionPing() { + + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + + err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) + s.Require().NoError(err) + + _, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) + s.Require().NoError(err) + + err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) + s.Require().Error(err) +} diff --git a/waku/v2/protocol/filter/filter_proto_ident_test.go b/waku/v2/protocol/filter/filter_proto_ident_test.go new file mode 100644 index 00000000..d41c70f8 --- /dev/null +++ b/waku/v2/protocol/filter/filter_proto_ident_test.go @@ -0,0 +1,303 @@ +package filter + +import ( + "context" + "encoding/hex" + "errors" + "fmt" + "github.com/libp2p/go-msgio/pbio" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" + "golang.org/x/exp/slices" + "math" + "net/http" + "strings" + "sync" + "time" + + libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/subscription" + "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" +) + +func (s *FilterTestSuite) TestCreateSubscription() { + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails[0].C) +} + +func (s *FilterTestSuite) TestModifySubscription() { + + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails[0].C) + + // Subscribe to another content_topic + newContentTopic := "Topic_modified" + s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID()) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails[0].C) +} + +func (s *FilterTestSuite) TestMultipleMessages() { + + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails[0].C) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails[0].C) +} + +func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *FilterSubscribeParameters, + reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error { + + const FilterSubscribeID_Incorrect1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/abcd") + + conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_Incorrect1) + if err != nil { + wf.metrics.RecordError(dialFailure) + return err + } + defer conn.Close() + + writer := pbio.NewDelimitedWriter(conn) + reader := pbio.NewDelimitedReader(conn, math.MaxInt32) + + request := &pb.FilterSubscribeRequest{ + RequestId: hex.EncodeToString(params.requestID), + FilterSubscribeType: reqType, + PubsubTopic: &contentFilter.PubsubTopic, + ContentTopics: contentFilter.ContentTopicsList(), + } + + wf.log.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request)) + err = writer.WriteMsg(request) + if err != nil { + wf.metrics.RecordError(writeRequestFailure) + wf.log.Error("sending FilterSubscribeRequest", zap.Error(err)) + return err + } + + filterSubscribeResponse := &pb.FilterSubscribeResponse{} + err = reader.ReadMsg(filterSubscribeResponse) + if err != nil { + wf.log.Error("receiving FilterSubscribeResponse", zap.Error(err)) + wf.metrics.RecordError(decodeRPCFailure) + return err + } + if filterSubscribeResponse.RequestId != request.RequestId { + wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId)) + wf.metrics.RecordError(requestIDMismatch) + err := NewFilterError(300, "request_id_mismatch") + return &err + } + + if filterSubscribeResponse.StatusCode != http.StatusOK { + wf.metrics.RecordError(errorResponse) + err := NewFilterError(int(filterSubscribeResponse.StatusCode), filterSubscribeResponse.StatusDesc) + return &err + } + + return nil +} + +func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) { + wf.RLock() + defer wf.RUnlock() + if err := wf.ErrOnNotRunning(); err != nil { + return nil, err + } + + if len(contentFilter.ContentTopics) == 0 { + return nil, errors.New("at least one content topic is required") + } + if slices.Contains[string](contentFilter.ContentTopicsList(), "") { + return nil, errors.New("one or more content topics specified is empty") + } + + if len(contentFilter.ContentTopics) > MaxContentTopicsPerRequest { + return nil, fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest) + } + + params := new(FilterSubscribeParameters) + params.log = wf.log + params.host = wf.h + params.pm = wf.pm + + optList := DefaultSubscriptionOptions() + optList = append(optList, opts...) + for _, opt := range optList { + err := opt(params) + if err != nil { + return nil, err + } + } + + pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter) + + if err != nil { + return nil, err + } + failedContentTopics := []string{} + subscriptions := make([]*subscription.SubscriptionDetails, 0) + for pubSubTopic, cTopics := range pubSubTopicMap { + var selectedPeer peer.ID + //TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic + if params.pm != nil && params.selectedPeer == "" { + selectedPeer, err = wf.pm.SelectPeer( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: FilterSubscribeID_v20beta1, + PubsubTopic: pubSubTopic, + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) + } else { + selectedPeer = params.selectedPeer + } + + if selectedPeer == "" { + wf.metrics.RecordError(peerNotFoundFailure) + wf.log.Error("selecting peer", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), + zap.Error(err)) + failedContentTopics = append(failedContentTopics, cTopics...) + continue + } + + var cFilter protocol.ContentFilter + cFilter.PubsubTopic = pubSubTopic + cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...) + + err := wf.incorrectSubscribeRequest(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter) + if err != nil { + wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), + zap.Error(err)) + failedContentTopics = append(failedContentTopics, cTopics...) + continue + } + subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(selectedPeer, cFilter)) + } + + if len(failedContentTopics) > 0 { + return subscriptions, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ",")) + } else { + return subscriptions, nil + } +} + +func (s *FilterTestSuite) TestIncorrectSubscribeIdentifier() { + log := utils.Logger() + s.log = log + s.wg = &sync.WaitGroup{} + + // Create test context + s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds + + s.testTopic = "/waku/2/go/filter/test" + s.testContentTopic = "TopicA" + + s.lightNode = s.makeWakuFilterLightNode(true, true) + + s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true) + + //Connect nodes + s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) + + // Subscribe with incorrect SubscribeID + s.contentFilter = protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)} + _, err := s.lightNode.IncorrectSubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) + s.Require().Error(err) + + _, err = s.lightNode.UnsubscribeAll(s.ctx) + s.Require().NoError(err) +} + +func (wf *WakuFilterLightNode) startWithIncorrectPushProto() error { + const FilterPushID_Incorrect1 = libp2pProtocol.ID("/vac/waku/filter-push/abcd") + + wf.subscriptions = subscription.NewSubscriptionMap(wf.log) + wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_Incorrect1)), wf.onRequest(wf.Context())) + + wf.log.Info("filter-push incorrect protocol started") + return nil +} + +func (s *FilterTestSuite) TestIncorrectPushIdentifier() { + log := utils.Logger() + s.log = log + s.wg = &sync.WaitGroup{} + + // Create test context + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds + s.ctx = ctx + s.ctxCancel = cancel + + s.testTopic = "/waku/2/go/filter/test" + s.testContentTopic = "TopicA" + + s.lightNode = s.makeWakuFilterLightNode(false, true) + + s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true) + + // Re-start light node with unsupported prefix for match func + s.lightNode.Stop() + err := s.lightNode.CommonService.Start(s.ctx, s.lightNode.startWithIncorrectPushProto) + s.Require().NoError(err) + + // Connect nodes + s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) + err = s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.Require().NoError(err) + + // Subscribe + s.contentFilter = protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)} + s.subDetails, err = s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) + s.Require().NoError(err) + + time.Sleep(1 * time.Second) + + // Send message + _, err = s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic) + s.Require().NoError(err) + + // Message should never arrive -> exit after timeout + select { + case msg := <-s.subDetails[0].C: + s.log.Info("Light node received a msg") + s.Require().Nil(msg) + case <-time.After(1 * time.Second): + s.Require().True(true) + } + + _, err = s.lightNode.UnsubscribeAll(s.ctx) + s.Require().NoError(err) +} diff --git a/waku/v2/protocol/filter/filter_subscribe_test.go b/waku/v2/protocol/filter/filter_subscribe_test.go new file mode 100644 index 00000000..6f932251 --- /dev/null +++ b/waku/v2/protocol/filter/filter_subscribe_test.go @@ -0,0 +1,376 @@ +package filter + +import ( + "context" + "encoding/hex" + "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" +) + +func (s *FilterTestSuite) TestWakuFilter() { + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + + // Should be received + s.waitForMsg(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "first") + }, s.subDetails[0].C) + + // Wrong content topic + s.waitForTimeout(func() { + s.publishMsg(s.testTopic, "TopicB", "second") + }, s.subDetails[0].C) + + _, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) + s.Require().NoError(err) + + // Should not receive after unsubscribe + s.waitForTimeout(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "third") + }, s.subDetails[0].C) + + // Two new subscriptions with same [peer, contentFilter] + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + secondSub := s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + + // Assert that we have 2 subscriptions now + s.Require().Equal(len(s.lightNode.Subscriptions()), 2) + + // Should be received on both subscriptions + s.waitForMsg(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "fourth") + }, s.subDetails[0].C) + + s.waitForMsg(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "fifth") + }, secondSub[0].C) + + s.waitForMsg(nil, s.subDetails[0].C) + s.waitForMsg(nil, secondSub[0].C) + + // Unsubscribe from second sub only + _, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, secondSub[0]) + s.Require().NoError(err) + + // Should still receive + s.waitForMsg(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "sixth") + }, s.subDetails[0].C) + + // Unsubscribe from first sub only + _, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, s.subDetails[0]) + s.Require().NoError(err) + + s.Require().Equal(len(s.lightNode.Subscriptions()), 0) + + // Should not receive after unsubscribe + s.waitForTimeout(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "seventh") + }, s.subDetails[0].C) +} + +func (s *FilterTestSuite) TestPubSubSingleContentTopic() { + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + + // Message should be received + s.waitForMsg(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "test_msg") + }, s.subDetails[0].C) + + _, err := s.lightNode.UnsubscribeAll(s.ctx) + s.Require().NoError(err) + +} + +func (s *FilterTestSuite) TestPubSubMultiContentTopic() { + + // Create test context + s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 10 seconds + + messages := prepareData(3, false, true, false) + + // Subscribe + for _, m := range messages { + s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID()) + } + + // All messages should be received + s.waitForMessages(func() { + s.publishMessages(messages) + }, s.subDetails, messages) + + _, err := s.lightNode.UnsubscribeAll(s.ctx) + s.Require().NoError(err) + +} + +func (s *FilterTestSuite) TestMultiPubSubMultiContentTopic() { + + // Create test context + s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 20 seconds + + s.lightNode = s.makeWakuFilterLightNode(true, true) + + s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, true, true) + + // Connect nodes + s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) + err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.Require().NoError(err) + + messages := prepareData(2, true, true, false) + + // Subscribe + for _, m := range messages { + s.subDetails = append(s.subDetails, s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())...) + s.log.Info("Subscribing ", zap.String("PubSubTopic", m.pubSubTopic)) + _, err := s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.pubSubTopic)) + s.Require().NoError(err) + } + + // Debug to see subscriptions in light node + for _, sub := range s.subDetails { + s.log.Info("Light Node subscription ", zap.String("PubSubTopic", sub.ContentFilter.PubsubTopic), zap.String("ContentTopic", sub.ContentFilter.ContentTopicsList()[0])) + } + + // All messages should be received + s.waitForMessages(func() { + s.publishMessages(messages) + }, s.subDetails, messages) + + _, err = s.lightNode.UnsubscribeAll(s.ctx) + s.Require().NoError(err) + +} + +func (s *FilterTestSuite) TestPubSubMultiOverlapContentTopic() { + + // Create test context + s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 20 seconds + + messages := prepareData(10, false, true, true) + + // Subscribe + for _, m := range messages { + s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID()) + } + + // All messages should be received + s.waitForMessages(func() { + s.publishMessages(messages) + }, s.subDetails, messages) + + _, err := s.lightNode.UnsubscribeAll(s.ctx) + s.Require().NoError(err) + +} + +func (s *FilterTestSuite) TestSubscriptionRefresh() { + + messages := prepareData(2, false, false, true) + + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + + // Repeat the same subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + + // Both messages should be received + s.waitForMessages(func() { + s.publishMessages(messages) + }, s.subDetails, messages) + + _, err := s.lightNode.UnsubscribeAll(s.ctx) + s.Require().NoError(err) + +} + +func (s *FilterTestSuite) TestContentTopicsLimit() { + var maxContentTopics = 30 + + // Create test context + s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 10 seconds + + // Detect existing content topics from previous test + if len(s.contentFilter.PubsubTopic) > 0 { + existingTopics := len(s.contentFilter.ContentTopicsList()) + if existingTopics > 0 { + maxContentTopics = maxContentTopics - existingTopics + } + } + + messages := prepareData(maxContentTopics+1, false, true, true) + + // Subscribe + for _, m := range messages[:len(messages)-1] { + s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID()) + } + + // All messages within limit should get received + s.waitForMessages(func() { + s.publishMessages(messages[:len(messages)-1]) + }, s.subDetails, messages[:len(messages)-1]) + + // Adding over the limit contentTopic should fail + for _, sub := range s.subDetails { + if sub.ContentFilter.PubsubTopic == messages[len(messages)-1].pubSubTopic { + sub.Add(messages[len(messages)-1].contentTopic) + _, err := s.lightNode.Subscribe(s.ctx, sub.ContentFilter, WithPeer(s.fullNodeHost.ID())) + s.Require().Error(err) + } + } + + // Unsubscribe for cleanup + for _, m := range messages { + _ = s.unsubscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID()) + } + + _, err := s.lightNode.UnsubscribeAll(s.ctx) + s.Require().NoError(err) + +} + +func (s *FilterTestSuite) TestSubscribeErrorHandling() { + var messages []WakuMsg + + // Prepare data + messages = append(messages, WakuMsg{ + pubSubTopic: "", + contentTopic: s.testContentTopic, + payload: "N/A", + }) + + messages = append(messages, WakuMsg{ + pubSubTopic: s.testTopic, + contentTopic: "", + payload: "N/A", + }) + + // Subscribe with empty pubsub + s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[0].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].contentTopic)} + _, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) + s.Require().Error(err) + + // Subscribe with empty content topic + s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[1].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].contentTopic)} + _, err = s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) + s.Require().Error(err) + +} + +func (s *FilterTestSuite) TestMultipleFullNodeSubscriptions() { + log := utils.Logger() + s.log = log + s.wg = &sync.WaitGroup{} + + // Create test context + s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds + + fullNodeIDHex := make([]byte, hex.EncodedLen(len([]byte(s.fullNodeHost.ID())))) + _ = hex.Encode(fullNodeIDHex, []byte(s.fullNodeHost.ID())) + + s.log.Info("Already subscribed to", zap.String("fullNode", string(fullNodeIDHex))) + + // This will overwrite values with the second node info + s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true) + + // Connect to second full and relay node + s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) + err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.Require().NoError(err) + + fullNodeIDHex = make([]byte, hex.EncodedLen(len([]byte(s.fullNodeHost.ID())))) + _ = hex.Encode(fullNodeIDHex, []byte(s.fullNodeHost.ID())) + + s.log.Info("Subscribing to second", zap.String("fullNode", string(fullNodeIDHex))) + + // Subscribe to the second full node + s.contentFilter = protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)} + _, err = s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) + s.Require().NoError(err) + + _, err = s.lightNode.UnsubscribeAll(s.ctx) + s.Require().NoError(err) +} + +func (s *FilterTestSuite) TestSubscribeMultipleLightNodes() { + + // Create test context + s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds + + lightNode2 := s.makeWakuFilterLightNode(true, true) + + // Connect node2 + lightNode2.h.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) + + messages := prepareData(2, true, true, true) + + // Subscribe separately: light node 1 -> full node + contentFilter := protocol.ContentFilter{PubsubTopic: messages[0].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].contentTopic)} + _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) + s.Require().NoError(err) + + // Subscribe separately: light node 2 -> full node + contentFilter2 := protocol.ContentFilter{PubsubTopic: messages[1].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].contentTopic)} + _, err = lightNode2.Subscribe(s.ctx, contentFilter2, WithPeer(s.fullNodeHost.ID())) + s.Require().NoError(err) + + // Unsubscribe + _, err = s.lightNode.UnsubscribeAll(s.ctx) + s.Require().NoError(err) + + _, err = lightNode2.UnsubscribeAll(s.ctx) + s.Require().NoError(err) + +} + +func (s *FilterTestSuite) TestSubscribeFullNode2FullNode() { + + var ( + testTopic = "/waku/2/go/filter/test2" + testContentTopic = "TopicB" + ) + + // Create test context + s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) + + _, fullNode2 := s.makeWakuFilterFullNode(testTopic, false, false) + + // Connect nodes + fullNode2.h.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) + + // Get stream + stream, err := fullNode2.h.NewStream(s.ctx, s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.Require().NoError(err) + + // Prepare subscribe request + subscribeRequest := &pb.FilterSubscribeRequest{ + FilterSubscribeType: pb.FilterSubscribeRequest_SUBSCRIBE, + PubsubTopic: &testTopic, + ContentTopics: []string{testContentTopic}, + } + + // Subscribe full node 2 -> full node 1 + fullNode2.subscribe(s.ctx, stream, subscribeRequest) + + // Check the pubsub topic related to the first node is stored within the second node + pubsubTopics, hasTopics := fullNode2.subscriptions.Get(s.fullNodeHost.ID()) + s.Require().True(hasTopics) + + // Check the pubsub topic is what we have set + contentTopics, hasTestPubsubTopic := pubsubTopics[testTopic] + s.Require().True(hasTestPubsubTopic) + + // Check the content topic is what we have set + _, hasTestContentTopic := contentTopics[testContentTopic] + s.Require().True(hasTestContentTopic) + +} diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index f1b9b8d4..0bdada3e 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -4,7 +4,8 @@ import ( "context" "crypto/rand" "errors" - "net/http" + "fmt" + "strconv" "sync" "testing" "time" @@ -46,7 +47,13 @@ type FilterTestSuite struct { log *zap.Logger } -func (s *FilterTestSuite) makeWakuRelay(topic string) (*relay.WakuRelay, *relay.Subscription, host.Host, relay.Broadcaster) { +type WakuMsg struct { + pubSubTopic string + contentTopic string + payload string +} + +func (s *FilterTestSuite) makeWakuRelay(topic string, shared bool) (*relay.WakuRelay, *relay.Subscription, host.Host, relay.Broadcaster) { broadcaster := relay.NewBroadcaster(10) s.Require().NoError(broadcaster.Start(context.Background())) @@ -59,7 +66,11 @@ func (s *FilterTestSuite) makeWakuRelay(topic string) (*relay.WakuRelay, *relay. relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.log) relay.SetHost(host) - s.fullNodeHost = host + + if shared { + s.fullNodeHost = host + } + err = relay.Start(context.Background()) s.Require().NoError(err) @@ -91,13 +102,23 @@ func (s *FilterTestSuite) makeWakuFilterLightNode(start bool, withBroadcaster bo return filterPush } -func (s *FilterTestSuite) makeWakuFilterFullNode(topic string) (*relay.WakuRelay, *WakuFilterFullNode) { - node, relaySub, host, broadcaster := s.makeWakuRelay(topic) - s.relaySub = relaySub +func (s *FilterTestSuite) makeWakuFilterFullNode(topic string, withRegisterAll bool, shared bool) (*relay.WakuRelay, *WakuFilterFullNode) { + var sub *relay.Subscription + node, relaySub, host, broadcaster := s.makeWakuRelay(topic, shared) + + if shared { + s.relaySub = relaySub + } node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.log) node2Filter.SetHost(host) - sub := broadcaster.Register(protocol.NewContentFilter(topic)) + + if withRegisterAll { + sub = broadcaster.RegisterForAll() + } else { + sub = broadcaster.Register(protocol.NewContentFilter(topic)) + } + err := node2Filter.Start(s.ctx, sub) s.Require().NoError(err) @@ -106,11 +127,17 @@ func (s *FilterTestSuite) makeWakuFilterFullNode(topic string) (*relay.WakuRelay func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) { s.wg.Add(1) + var msgFound = false go func() { defer s.wg.Done() select { case env := <-ch: - s.Require().Equal(s.contentFilter.ContentTopicsList()[0], env.Message().GetContentTopic()) + for _, topic := range s.contentFilter.ContentTopicsList() { + if topic == env.Message().GetContentTopic() { + msgFound = true + } + } + s.Require().True(msgFound) case <-time.After(5 * time.Second): s.Require().Fail("Message timeout") case <-s.ctx.Done(): @@ -125,6 +152,58 @@ func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) { s.wg.Wait() } +func matchOneOfManyMsg(one WakuMsg, many []WakuMsg) bool { + for _, m := range many { + if m.pubSubTopic == one.pubSubTopic && + m.contentTopic == one.contentTopic && + m.payload == one.payload { + return true + } + } + + return false +} + +func (s *FilterTestSuite) waitForMessages(fn func(), subs []*subscription.SubscriptionDetails, expected []WakuMsg) { + s.wg.Add(1) + msgCount := len(expected) + found := 0 + s.log.Info("Expected messages ", zap.String("count", strconv.Itoa(msgCount))) + s.log.Info("Existing subscriptions ", zap.String("count", strconv.Itoa(len(subs)))) + + go func() { + defer s.wg.Done() + for _, sub := range subs { + s.log.Info("Looking at ", zap.String("pubSubTopic", sub.ContentFilter.PubsubTopic)) + for i := 0; i < msgCount; i++ { + select { + case env := <-sub.C: + received := WakuMsg{ + pubSubTopic: env.PubsubTopic(), + contentTopic: env.Message().GetContentTopic(), + payload: string(env.Message().GetPayload()), + } + s.log.Info("received message ", zap.String("pubSubTopic", received.pubSubTopic), zap.String("contentTopic", received.contentTopic), zap.String("payload", received.payload)) + if matchOneOfManyMsg(received, expected) { + found++ + } + case <-time.After(2 * time.Second): + + case <-s.ctx.Done(): + s.Require().Fail("test exceeded allocated time") + } + } + } + }() + + if fn != nil { + fn() + } + + s.wg.Wait() + s.Require().True(msgCount == found) +} + func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope) { s.wg.Add(1) go func() { @@ -147,17 +226,47 @@ func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope) } func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails { + + for _, sub := range s.subDetails { + if sub.ContentFilter.PubsubTopic == pubsubTopic { + sub.Add(contentTopic) + s.contentFilter = sub.ContentFilter + subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer)) + s.Require().NoError(err) + return subDetails + } + } + s.contentFilter = protocol.ContentFilter{PubsubTopic: pubsubTopic, ContentTopics: protocol.NewContentTopicSet(contentTopic)} subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer)) s.Require().NoError(err) // Sleep to make sure the filter is subscribed - time.Sleep(2 * time.Second) + time.Sleep(1 * time.Second) return subDetails } +func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, peer peer.ID) <-chan WakuFilterPushResult { + + for _, sub := range s.subDetails { + if sub.ContentFilter.PubsubTopic == pubsubTopic { + topicsCount := len(s.contentFilter.ContentTopicsList()) + if topicsCount == 1 { + _, err := s.lightNode.Unsubscribe(s.ctx, sub.ContentFilter, WithPeer(peer)) + s.Require().NoError(err) + } else { + sub.Remove(contentTopic) + } + s.contentFilter = sub.ContentFilter + return nil + } + } + + return nil +} + func (s *FilterTestSuite) publishMsg(topic, contentTopic string, optionalPayload ...string) { var payload string if len(optionalPayload) > 0 { @@ -170,6 +279,46 @@ func (s *FilterTestSuite) publishMsg(topic, contentTopic string, optionalPayload s.Require().NoError(err) } +func (s *FilterTestSuite) publishMessages(msgs []WakuMsg) { + for _, m := range msgs { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(m.contentTopic, utils.GetUnixEpoch(), m.payload), m.pubSubTopic) + s.Require().NoError(err) + } +} + +func prepareData(quantity int, topics, contentTopics, payloads bool) []WakuMsg { + var ( + pubsubTopic = "/waku/2/go/filter/test" + contentTopic = "TopicA" + payload = "test_msg" + messages []WakuMsg + ) + + for i := 0; i < quantity; i++ { + msg := WakuMsg{ + pubSubTopic: pubsubTopic, + contentTopic: contentTopic, + payload: payload, + } + + if topics { + msg.pubSubTopic = fmt.Sprintf("%s%02d", pubsubTopic, i) + } + + if contentTopics { + msg.contentTopic = fmt.Sprintf("%s%02d", contentTopic, i) + } + + if payloads { + msg.payload = fmt.Sprintf("%s%02d", payload, i) + } + + messages = append(messages, msg) + } + + return messages +} + func (s *FilterTestSuite) SetupTest() { log := utils.Logger() //.Named("filterv2-test") s.log = log @@ -189,7 +338,7 @@ func (s *FilterTestSuite) SetupTest() { //TODO: Add tests to verify broadcaster. - s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic) + s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true) // Connect nodes s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) @@ -206,82 +355,6 @@ func (s *FilterTestSuite) TearDownTest() { s.ctxCancel() } -func (s *FilterTestSuite) TestWakuFilter() { - // Initial subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) - - // Should be received - s.waitForMsg(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "first") - }, s.subDetails[0].C) - - // Wrong content topic - s.waitForTimeout(func() { - s.publishMsg(s.testTopic, "TopicB", "second") - }, s.subDetails[0].C) - - _, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) - s.Require().NoError(err) - - // Should not receive after unsubscribe - s.waitForTimeout(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "third") - }, s.subDetails[0].C) - - // Two new subscriptions with same [peer, contentFilter] - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) - secondSub := s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) - - // Assert that we have 2 subscriptions now - s.Require().Equal(len(s.lightNode.Subscriptions()), 2) - - // Should be received on both subscriptions - s.waitForMsg(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "fourth") - }, s.subDetails[0].C) - - s.waitForMsg(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "fifth") - }, secondSub[0].C) - - s.waitForMsg(nil, s.subDetails[0].C) - s.waitForMsg(nil, secondSub[0].C) - - // Unsubscribe from second sub only - _, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, secondSub[0]) - s.Require().NoError(err) - - // Should still receive - s.waitForMsg(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "sixth") - }, s.subDetails[0].C) - - // Unsubscribe from first sub only - _, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, s.subDetails[0]) - s.Require().NoError(err) - - s.Require().Equal(len(s.lightNode.Subscriptions()), 0) - - // Should not receive after unsubscribe - s.waitForTimeout(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "seventh") - }, s.subDetails[0].C) -} - -func (s *FilterTestSuite) TestSubscriptionPing() { - err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) - s.Require().Error(err) - filterErr, ok := err.(*FilterError) - s.Require().True(ok) - s.Require().Equal(filterErr.Code, http.StatusNotFound) - - contentTopic := "abc" - s.subDetails = s.subscribe(s.testTopic, contentTopic, s.fullNodeHost.ID()) - - err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) - s.Require().NoError(err) -} - func (s *FilterTestSuite) TestPeerFailure() { broadcaster2 := relay.NewBroadcaster(10) s.Require().NoError(broadcaster2.Start(context.Background())) @@ -325,56 +398,6 @@ func (s *FilterTestSuite) TestPeerFailure() { s.Require().False(s.fullNode.subscriptions.Has(s.lightNodeHost.ID())) // Failed peer has been removed } -func (s *FilterTestSuite) TestCreateSubscription() { - // Initial subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) - s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) - s.Require().NoError(err) - - }, s.subDetails[0].C) -} - -func (s *FilterTestSuite) TestModifySubscription() { - - // Initial subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) - - s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) - s.Require().NoError(err) - - }, s.subDetails[0].C) - - // Subscribe to another content_topic - newContentTopic := "Topic_modified" - s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID()) - - s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic) - s.Require().NoError(err) - - }, s.subDetails[0].C) -} - -func (s *FilterTestSuite) TestMultipleMessages() { - - // Initial subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) - - s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), s.testTopic) - s.Require().NoError(err) - - }, s.subDetails[0].C) - - s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic) - s.Require().NoError(err) - - }, s.subDetails[0].C) -} - func (s *FilterTestSuite) TestRunningGuard() { s.lightNode.Stop() @@ -463,7 +486,7 @@ func (s *FilterTestSuite) TestAutoShard() { s.testTopic = pubSubTopic.String() s.lightNode = s.makeWakuFilterLightNode(true, false) - s.relayNode, s.fullNode = s.makeWakuFilterFullNode(pubSubTopic.String()) + s.relayNode, s.fullNode = s.makeWakuFilterFullNode(pubSubTopic.String(), false, true) s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) err = s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)