chore(filter v2): test updates (#811)

---------

Co-authored-by: Richard Ramos <info@richardramos.me>
Co-authored-by: Prem Chaitanya Prathi <chaitanyaprem@gmail.com>
This commit is contained in:
Roman Zajic 2023-10-28 11:40:22 +08:00 committed by Richard Ramos
parent 0868f5d4dd
commit fc3b2f76d5
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
4 changed files with 873 additions and 137 deletions

View File

@ -0,0 +1,34 @@
package filter
import (
"context"
"net/http"
)
func (s *FilterTestSuite) TestSubscriptionPing() {
err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
s.Require().Error(err)
filterErr, ok := err.(*FilterError)
s.Require().True(ok)
s.Require().Equal(filterErr.Code, http.StatusNotFound)
contentTopic := "abc"
s.subDetails = s.subscribe(s.testTopic, contentTopic, s.fullNodeHost.ID())
err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestUnSubscriptionPing() {
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
s.Require().NoError(err)
_, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err)
err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
s.Require().Error(err)
}

View File

@ -0,0 +1,303 @@
package filter
import (
"context"
"encoding/hex"
"errors"
"fmt"
"github.com/libp2p/go-msgio/pbio"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
"golang.org/x/exp/slices"
"math"
"net/http"
"strings"
"sync"
"time"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
func (s *FilterTestSuite) TestCreateSubscription() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic)
s.Require().NoError(err)
}, s.subDetails[0].C)
}
func (s *FilterTestSuite) TestModifySubscription() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic)
s.Require().NoError(err)
}, s.subDetails[0].C)
// Subscribe to another content_topic
newContentTopic := "Topic_modified"
s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic)
s.Require().NoError(err)
}, s.subDetails[0].C)
}
func (s *FilterTestSuite) TestMultipleMessages() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), s.testTopic)
s.Require().NoError(err)
}, s.subDetails[0].C)
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic)
s.Require().NoError(err)
}, s.subDetails[0].C)
}
func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *FilterSubscribeParameters,
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error {
const FilterSubscribeID_Incorrect1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/abcd")
conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_Incorrect1)
if err != nil {
wf.metrics.RecordError(dialFailure)
return err
}
defer conn.Close()
writer := pbio.NewDelimitedWriter(conn)
reader := pbio.NewDelimitedReader(conn, math.MaxInt32)
request := &pb.FilterSubscribeRequest{
RequestId: hex.EncodeToString(params.requestID),
FilterSubscribeType: reqType,
PubsubTopic: &contentFilter.PubsubTopic,
ContentTopics: contentFilter.ContentTopicsList(),
}
wf.log.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request))
err = writer.WriteMsg(request)
if err != nil {
wf.metrics.RecordError(writeRequestFailure)
wf.log.Error("sending FilterSubscribeRequest", zap.Error(err))
return err
}
filterSubscribeResponse := &pb.FilterSubscribeResponse{}
err = reader.ReadMsg(filterSubscribeResponse)
if err != nil {
wf.log.Error("receiving FilterSubscribeResponse", zap.Error(err))
wf.metrics.RecordError(decodeRPCFailure)
return err
}
if filterSubscribeResponse.RequestId != request.RequestId {
wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId))
wf.metrics.RecordError(requestIDMismatch)
err := NewFilterError(300, "request_id_mismatch")
return &err
}
if filterSubscribeResponse.StatusCode != http.StatusOK {
wf.metrics.RecordError(errorResponse)
err := NewFilterError(int(filterSubscribeResponse.StatusCode), filterSubscribeResponse.StatusDesc)
return &err
}
return nil
}
func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
return nil, err
}
if len(contentFilter.ContentTopics) == 0 {
return nil, errors.New("at least one content topic is required")
}
if slices.Contains[string](contentFilter.ContentTopicsList(), "") {
return nil, errors.New("one or more content topics specified is empty")
}
if len(contentFilter.ContentTopics) > MaxContentTopicsPerRequest {
return nil, fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)
}
params := new(FilterSubscribeParameters)
params.log = wf.log
params.host = wf.h
params.pm = wf.pm
optList := DefaultSubscriptionOptions()
optList = append(optList, opts...)
for _, opt := range optList {
err := opt(params)
if err != nil {
return nil, err
}
}
pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter)
if err != nil {
return nil, err
}
failedContentTopics := []string{}
subscriptions := make([]*subscription.SubscriptionDetails, 0)
for pubSubTopic, cTopics := range pubSubTopicMap {
var selectedPeer peer.ID
//TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic
if params.pm != nil && params.selectedPeer == "" {
selectedPeer, err = wf.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
PubsubTopic: pubSubTopic,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
} else {
selectedPeer = params.selectedPeer
}
if selectedPeer == "" {
wf.metrics.RecordError(peerNotFoundFailure)
wf.log.Error("selecting peer", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
continue
}
var cFilter protocol.ContentFilter
cFilter.PubsubTopic = pubSubTopic
cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...)
err := wf.incorrectSubscribeRequest(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter)
if err != nil {
wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
continue
}
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(selectedPeer, cFilter))
}
if len(failedContentTopics) > 0 {
return subscriptions, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ","))
} else {
return subscriptions, nil
}
}
func (s *FilterTestSuite) TestIncorrectSubscribeIdentifier() {
log := utils.Logger()
s.log = log
s.wg = &sync.WaitGroup{}
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
s.testTopic = "/waku/2/go/filter/test"
s.testContentTopic = "TopicA"
s.lightNode = s.makeWakuFilterLightNode(true, true)
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)
//Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
// Subscribe with incorrect SubscribeID
s.contentFilter = protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)}
_, err := s.lightNode.IncorrectSubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().Error(err)
_, err = s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (wf *WakuFilterLightNode) startWithIncorrectPushProto() error {
const FilterPushID_Incorrect1 = libp2pProtocol.ID("/vac/waku/filter-push/abcd")
wf.subscriptions = subscription.NewSubscriptionMap(wf.log)
wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_Incorrect1)), wf.onRequest(wf.Context()))
wf.log.Info("filter-push incorrect protocol started")
return nil
}
func (s *FilterTestSuite) TestIncorrectPushIdentifier() {
log := utils.Logger()
s.log = log
s.wg = &sync.WaitGroup{}
// Create test context
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
s.ctx = ctx
s.ctxCancel = cancel
s.testTopic = "/waku/2/go/filter/test"
s.testContentTopic = "TopicA"
s.lightNode = s.makeWakuFilterLightNode(false, true)
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)
// Re-start light node with unsupported prefix for match func
s.lightNode.Stop()
err := s.lightNode.CommonService.Start(s.ctx, s.lightNode.startWithIncorrectPushProto)
s.Require().NoError(err)
// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
err = s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
// Subscribe
s.contentFilter = protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)}
s.subDetails, err = s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err)
time.Sleep(1 * time.Second)
// Send message
_, err = s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic)
s.Require().NoError(err)
// Message should never arrive -> exit after timeout
select {
case msg := <-s.subDetails[0].C:
s.log.Info("Light node received a msg")
s.Require().Nil(msg)
case <-time.After(1 * time.Second):
s.Require().True(true)
}
_, err = s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}

View File

@ -0,0 +1,376 @@
package filter
import (
"context"
"encoding/hex"
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
func (s *FilterTestSuite) TestWakuFilter() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
// Should be received
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "first")
}, s.subDetails[0].C)
// Wrong content topic
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, "TopicB", "second")
}, s.subDetails[0].C)
_, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err)
// Should not receive after unsubscribe
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "third")
}, s.subDetails[0].C)
// Two new subscriptions with same [peer, contentFilter]
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
secondSub := s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
// Assert that we have 2 subscriptions now
s.Require().Equal(len(s.lightNode.Subscriptions()), 2)
// Should be received on both subscriptions
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "fourth")
}, s.subDetails[0].C)
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "fifth")
}, secondSub[0].C)
s.waitForMsg(nil, s.subDetails[0].C)
s.waitForMsg(nil, secondSub[0].C)
// Unsubscribe from second sub only
_, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, secondSub[0])
s.Require().NoError(err)
// Should still receive
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "sixth")
}, s.subDetails[0].C)
// Unsubscribe from first sub only
_, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, s.subDetails[0])
s.Require().NoError(err)
s.Require().Equal(len(s.lightNode.Subscriptions()), 0)
// Should not receive after unsubscribe
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "seventh")
}, s.subDetails[0].C)
}
func (s *FilterTestSuite) TestPubSubSingleContentTopic() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
// Message should be received
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "test_msg")
}, s.subDetails[0].C)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestPubSubMultiContentTopic() {
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 10 seconds
messages := prepareData(3, false, true, false)
// Subscribe
for _, m := range messages {
s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
}
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestMultiPubSubMultiContentTopic() {
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 20 seconds
s.lightNode = s.makeWakuFilterLightNode(true, true)
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, true, true)
// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
messages := prepareData(2, true, true, false)
// Subscribe
for _, m := range messages {
s.subDetails = append(s.subDetails, s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())...)
s.log.Info("Subscribing ", zap.String("PubSubTopic", m.pubSubTopic))
_, err := s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.pubSubTopic))
s.Require().NoError(err)
}
// Debug to see subscriptions in light node
for _, sub := range s.subDetails {
s.log.Info("Light Node subscription ", zap.String("PubSubTopic", sub.ContentFilter.PubsubTopic), zap.String("ContentTopic", sub.ContentFilter.ContentTopicsList()[0]))
}
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
_, err = s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestPubSubMultiOverlapContentTopic() {
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 20 seconds
messages := prepareData(10, false, true, true)
// Subscribe
for _, m := range messages {
s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
}
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestSubscriptionRefresh() {
messages := prepareData(2, false, false, true)
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
// Repeat the same subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
// Both messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestContentTopicsLimit() {
var maxContentTopics = 30
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 10 seconds
// Detect existing content topics from previous test
if len(s.contentFilter.PubsubTopic) > 0 {
existingTopics := len(s.contentFilter.ContentTopicsList())
if existingTopics > 0 {
maxContentTopics = maxContentTopics - existingTopics
}
}
messages := prepareData(maxContentTopics+1, false, true, true)
// Subscribe
for _, m := range messages[:len(messages)-1] {
s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
}
// All messages within limit should get received
s.waitForMessages(func() {
s.publishMessages(messages[:len(messages)-1])
}, s.subDetails, messages[:len(messages)-1])
// Adding over the limit contentTopic should fail
for _, sub := range s.subDetails {
if sub.ContentFilter.PubsubTopic == messages[len(messages)-1].pubSubTopic {
sub.Add(messages[len(messages)-1].contentTopic)
_, err := s.lightNode.Subscribe(s.ctx, sub.ContentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().Error(err)
}
}
// Unsubscribe for cleanup
for _, m := range messages {
_ = s.unsubscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
}
_, err := s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestSubscribeErrorHandling() {
var messages []WakuMsg
// Prepare data
messages = append(messages, WakuMsg{
pubSubTopic: "",
contentTopic: s.testContentTopic,
payload: "N/A",
})
messages = append(messages, WakuMsg{
pubSubTopic: s.testTopic,
contentTopic: "",
payload: "N/A",
})
// Subscribe with empty pubsub
s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[0].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].contentTopic)}
_, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().Error(err)
// Subscribe with empty content topic
s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[1].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].contentTopic)}
_, err = s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().Error(err)
}
func (s *FilterTestSuite) TestMultipleFullNodeSubscriptions() {
log := utils.Logger()
s.log = log
s.wg = &sync.WaitGroup{}
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
fullNodeIDHex := make([]byte, hex.EncodedLen(len([]byte(s.fullNodeHost.ID()))))
_ = hex.Encode(fullNodeIDHex, []byte(s.fullNodeHost.ID()))
s.log.Info("Already subscribed to", zap.String("fullNode", string(fullNodeIDHex)))
// This will overwrite values with the second node info
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)
// Connect to second full and relay node
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
fullNodeIDHex = make([]byte, hex.EncodedLen(len([]byte(s.fullNodeHost.ID()))))
_ = hex.Encode(fullNodeIDHex, []byte(s.fullNodeHost.ID()))
s.log.Info("Subscribing to second", zap.String("fullNode", string(fullNodeIDHex)))
// Subscribe to the second full node
s.contentFilter = protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)}
_, err = s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err)
_, err = s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestSubscribeMultipleLightNodes() {
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
lightNode2 := s.makeWakuFilterLightNode(true, true)
// Connect node2
lightNode2.h.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
messages := prepareData(2, true, true, true)
// Subscribe separately: light node 1 -> full node
contentFilter := protocol.ContentFilter{PubsubTopic: messages[0].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].contentTopic)}
_, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err)
// Subscribe separately: light node 2 -> full node
contentFilter2 := protocol.ContentFilter{PubsubTopic: messages[1].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].contentTopic)}
_, err = lightNode2.Subscribe(s.ctx, contentFilter2, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err)
// Unsubscribe
_, err = s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
_, err = lightNode2.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestSubscribeFullNode2FullNode() {
var (
testTopic = "/waku/2/go/filter/test2"
testContentTopic = "TopicB"
)
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second)
_, fullNode2 := s.makeWakuFilterFullNode(testTopic, false, false)
// Connect nodes
fullNode2.h.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
// Get stream
stream, err := fullNode2.h.NewStream(s.ctx, s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
// Prepare subscribe request
subscribeRequest := &pb.FilterSubscribeRequest{
FilterSubscribeType: pb.FilterSubscribeRequest_SUBSCRIBE,
PubsubTopic: &testTopic,
ContentTopics: []string{testContentTopic},
}
// Subscribe full node 2 -> full node 1
fullNode2.subscribe(s.ctx, stream, subscribeRequest)
// Check the pubsub topic related to the first node is stored within the second node
pubsubTopics, hasTopics := fullNode2.subscriptions.Get(s.fullNodeHost.ID())
s.Require().True(hasTopics)
// Check the pubsub topic is what we have set
contentTopics, hasTestPubsubTopic := pubsubTopics[testTopic]
s.Require().True(hasTestPubsubTopic)
// Check the content topic is what we have set
_, hasTestContentTopic := contentTopics[testContentTopic]
s.Require().True(hasTestContentTopic)
}

View File

@ -4,7 +4,8 @@ import (
"context"
"crypto/rand"
"errors"
"net/http"
"fmt"
"strconv"
"sync"
"testing"
"time"
@ -46,7 +47,13 @@ type FilterTestSuite struct {
log *zap.Logger
}
func (s *FilterTestSuite) makeWakuRelay(topic string) (*relay.WakuRelay, *relay.Subscription, host.Host, relay.Broadcaster) {
type WakuMsg struct {
pubSubTopic string
contentTopic string
payload string
}
func (s *FilterTestSuite) makeWakuRelay(topic string, shared bool) (*relay.WakuRelay, *relay.Subscription, host.Host, relay.Broadcaster) {
broadcaster := relay.NewBroadcaster(10)
s.Require().NoError(broadcaster.Start(context.Background()))
@ -59,7 +66,11 @@ func (s *FilterTestSuite) makeWakuRelay(topic string) (*relay.WakuRelay, *relay.
relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.log)
relay.SetHost(host)
s.fullNodeHost = host
if shared {
s.fullNodeHost = host
}
err = relay.Start(context.Background())
s.Require().NoError(err)
@ -91,13 +102,23 @@ func (s *FilterTestSuite) makeWakuFilterLightNode(start bool, withBroadcaster bo
return filterPush
}
func (s *FilterTestSuite) makeWakuFilterFullNode(topic string) (*relay.WakuRelay, *WakuFilterFullNode) {
node, relaySub, host, broadcaster := s.makeWakuRelay(topic)
s.relaySub = relaySub
func (s *FilterTestSuite) makeWakuFilterFullNode(topic string, withRegisterAll bool, shared bool) (*relay.WakuRelay, *WakuFilterFullNode) {
var sub *relay.Subscription
node, relaySub, host, broadcaster := s.makeWakuRelay(topic, shared)
if shared {
s.relaySub = relaySub
}
node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.log)
node2Filter.SetHost(host)
sub := broadcaster.Register(protocol.NewContentFilter(topic))
if withRegisterAll {
sub = broadcaster.RegisterForAll()
} else {
sub = broadcaster.Register(protocol.NewContentFilter(topic))
}
err := node2Filter.Start(s.ctx, sub)
s.Require().NoError(err)
@ -106,11 +127,17 @@ func (s *FilterTestSuite) makeWakuFilterFullNode(topic string) (*relay.WakuRelay
func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) {
s.wg.Add(1)
var msgFound = false
go func() {
defer s.wg.Done()
select {
case env := <-ch:
s.Require().Equal(s.contentFilter.ContentTopicsList()[0], env.Message().GetContentTopic())
for _, topic := range s.contentFilter.ContentTopicsList() {
if topic == env.Message().GetContentTopic() {
msgFound = true
}
}
s.Require().True(msgFound)
case <-time.After(5 * time.Second):
s.Require().Fail("Message timeout")
case <-s.ctx.Done():
@ -125,6 +152,58 @@ func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) {
s.wg.Wait()
}
func matchOneOfManyMsg(one WakuMsg, many []WakuMsg) bool {
for _, m := range many {
if m.pubSubTopic == one.pubSubTopic &&
m.contentTopic == one.contentTopic &&
m.payload == one.payload {
return true
}
}
return false
}
func (s *FilterTestSuite) waitForMessages(fn func(), subs []*subscription.SubscriptionDetails, expected []WakuMsg) {
s.wg.Add(1)
msgCount := len(expected)
found := 0
s.log.Info("Expected messages ", zap.String("count", strconv.Itoa(msgCount)))
s.log.Info("Existing subscriptions ", zap.String("count", strconv.Itoa(len(subs))))
go func() {
defer s.wg.Done()
for _, sub := range subs {
s.log.Info("Looking at ", zap.String("pubSubTopic", sub.ContentFilter.PubsubTopic))
for i := 0; i < msgCount; i++ {
select {
case env := <-sub.C:
received := WakuMsg{
pubSubTopic: env.PubsubTopic(),
contentTopic: env.Message().GetContentTopic(),
payload: string(env.Message().GetPayload()),
}
s.log.Info("received message ", zap.String("pubSubTopic", received.pubSubTopic), zap.String("contentTopic", received.contentTopic), zap.String("payload", received.payload))
if matchOneOfManyMsg(received, expected) {
found++
}
case <-time.After(2 * time.Second):
case <-s.ctx.Done():
s.Require().Fail("test exceeded allocated time")
}
}
}
}()
if fn != nil {
fn()
}
s.wg.Wait()
s.Require().True(msgCount == found)
}
func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope) {
s.wg.Add(1)
go func() {
@ -147,17 +226,47 @@ func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope)
}
func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails {
for _, sub := range s.subDetails {
if sub.ContentFilter.PubsubTopic == pubsubTopic {
sub.Add(contentTopic)
s.contentFilter = sub.ContentFilter
subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer))
s.Require().NoError(err)
return subDetails
}
}
s.contentFilter = protocol.ContentFilter{PubsubTopic: pubsubTopic, ContentTopics: protocol.NewContentTopicSet(contentTopic)}
subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer))
s.Require().NoError(err)
// Sleep to make sure the filter is subscribed
time.Sleep(2 * time.Second)
time.Sleep(1 * time.Second)
return subDetails
}
func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, peer peer.ID) <-chan WakuFilterPushResult {
for _, sub := range s.subDetails {
if sub.ContentFilter.PubsubTopic == pubsubTopic {
topicsCount := len(s.contentFilter.ContentTopicsList())
if topicsCount == 1 {
_, err := s.lightNode.Unsubscribe(s.ctx, sub.ContentFilter, WithPeer(peer))
s.Require().NoError(err)
} else {
sub.Remove(contentTopic)
}
s.contentFilter = sub.ContentFilter
return nil
}
}
return nil
}
func (s *FilterTestSuite) publishMsg(topic, contentTopic string, optionalPayload ...string) {
var payload string
if len(optionalPayload) > 0 {
@ -170,6 +279,46 @@ func (s *FilterTestSuite) publishMsg(topic, contentTopic string, optionalPayload
s.Require().NoError(err)
}
func (s *FilterTestSuite) publishMessages(msgs []WakuMsg) {
for _, m := range msgs {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(m.contentTopic, utils.GetUnixEpoch(), m.payload), m.pubSubTopic)
s.Require().NoError(err)
}
}
func prepareData(quantity int, topics, contentTopics, payloads bool) []WakuMsg {
var (
pubsubTopic = "/waku/2/go/filter/test"
contentTopic = "TopicA"
payload = "test_msg"
messages []WakuMsg
)
for i := 0; i < quantity; i++ {
msg := WakuMsg{
pubSubTopic: pubsubTopic,
contentTopic: contentTopic,
payload: payload,
}
if topics {
msg.pubSubTopic = fmt.Sprintf("%s%02d", pubsubTopic, i)
}
if contentTopics {
msg.contentTopic = fmt.Sprintf("%s%02d", contentTopic, i)
}
if payloads {
msg.payload = fmt.Sprintf("%s%02d", payload, i)
}
messages = append(messages, msg)
}
return messages
}
func (s *FilterTestSuite) SetupTest() {
log := utils.Logger() //.Named("filterv2-test")
s.log = log
@ -189,7 +338,7 @@ func (s *FilterTestSuite) SetupTest() {
//TODO: Add tests to verify broadcaster.
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic)
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)
// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
@ -206,82 +355,6 @@ func (s *FilterTestSuite) TearDownTest() {
s.ctxCancel()
}
func (s *FilterTestSuite) TestWakuFilter() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
// Should be received
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "first")
}, s.subDetails[0].C)
// Wrong content topic
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, "TopicB", "second")
}, s.subDetails[0].C)
_, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err)
// Should not receive after unsubscribe
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "third")
}, s.subDetails[0].C)
// Two new subscriptions with same [peer, contentFilter]
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
secondSub := s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
// Assert that we have 2 subscriptions now
s.Require().Equal(len(s.lightNode.Subscriptions()), 2)
// Should be received on both subscriptions
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "fourth")
}, s.subDetails[0].C)
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "fifth")
}, secondSub[0].C)
s.waitForMsg(nil, s.subDetails[0].C)
s.waitForMsg(nil, secondSub[0].C)
// Unsubscribe from second sub only
_, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, secondSub[0])
s.Require().NoError(err)
// Should still receive
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "sixth")
}, s.subDetails[0].C)
// Unsubscribe from first sub only
_, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, s.subDetails[0])
s.Require().NoError(err)
s.Require().Equal(len(s.lightNode.Subscriptions()), 0)
// Should not receive after unsubscribe
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "seventh")
}, s.subDetails[0].C)
}
func (s *FilterTestSuite) TestSubscriptionPing() {
err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
s.Require().Error(err)
filterErr, ok := err.(*FilterError)
s.Require().True(ok)
s.Require().Equal(filterErr.Code, http.StatusNotFound)
contentTopic := "abc"
s.subDetails = s.subscribe(s.testTopic, contentTopic, s.fullNodeHost.ID())
err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestPeerFailure() {
broadcaster2 := relay.NewBroadcaster(10)
s.Require().NoError(broadcaster2.Start(context.Background()))
@ -325,56 +398,6 @@ func (s *FilterTestSuite) TestPeerFailure() {
s.Require().False(s.fullNode.subscriptions.Has(s.lightNodeHost.ID())) // Failed peer has been removed
}
func (s *FilterTestSuite) TestCreateSubscription() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic)
s.Require().NoError(err)
}, s.subDetails[0].C)
}
func (s *FilterTestSuite) TestModifySubscription() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic)
s.Require().NoError(err)
}, s.subDetails[0].C)
// Subscribe to another content_topic
newContentTopic := "Topic_modified"
s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic)
s.Require().NoError(err)
}, s.subDetails[0].C)
}
func (s *FilterTestSuite) TestMultipleMessages() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), s.testTopic)
s.Require().NoError(err)
}, s.subDetails[0].C)
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic)
s.Require().NoError(err)
}, s.subDetails[0].C)
}
func (s *FilterTestSuite) TestRunningGuard() {
s.lightNode.Stop()
@ -463,7 +486,7 @@ func (s *FilterTestSuite) TestAutoShard() {
s.testTopic = pubSubTopic.String()
s.lightNode = s.makeWakuFilterLightNode(true, false)
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(pubSubTopic.String())
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(pubSubTopic.String(), false, true)
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
err = s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)