From 419adcb6a86850e8d7750068c62ff9dceb525964 Mon Sep 17 00:00:00 2001 From: Vitaliy Vlasov Date: Mon, 14 Aug 2023 23:29:00 +0300 Subject: [PATCH] chore(filter2): test updates --- tests/utils.go | 10 +- waku/v2/node/wakunode2.go | 30 +- waku/v2/node/wakuoptions.go | 4 +- waku/v2/protocol/filter/client.go | 40 +-- waku/v2/protocol/filter/filter_test.go | 424 ++++++++++++++----------- waku/v2/protocol/filter/server.go | 4 +- 6 files changed, 287 insertions(+), 225 deletions(-) diff --git a/tests/utils.go b/tests/utils.go index 26350b54..dff08bec 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -122,8 +122,14 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e } // CreateWakuMessage creates a WakuMessage protobuffer with default values and a custom contenttopic and timestamp -func CreateWakuMessage(contentTopic string, timestamp int64) *pb.WakuMessage { - return &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: contentTopic, Version: 0, Timestamp: timestamp} +func CreateWakuMessage(contentTopic string, timestamp int64, optionalPayload ...string) *pb.WakuMessage { + var payload []byte + if len(optionalPayload) > 0 { + payload = []byte(optionalPayload[0]) + } else { + payload = []byte{1, 2, 3} + } + return &pb.WakuMessage{Payload: payload, ContentTopic: contentTopic, Version: 0, Timestamp: timestamp} } // RandomHex returns a random hex string of n bytes diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index db73c680..14790833 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -90,8 +90,8 @@ type WakuNode struct { peerExchange Service rendezvous Service legacyFilter ReceptorService - filterFullnode ReceptorService - filterLightnode Service + filterFullNode ReceptorService + filterLightNode Service store ReceptorService rlnRelay RLNRelay @@ -272,9 +272,9 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.rendezvous = rendezvous.NewRendezvous(w.opts.rendezvousDB, w.peerConnector, w.log) w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) - w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...) - w.filterFullnode = filter.NewWakuFilterFullnode(w.timesource, w.log, w.opts.filterOpts...) - w.filterLightnode = filter.NewWakuFilterLightnode(w.bcaster, w.peermanager, w.timesource, w.log) + w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullNode, w.timesource, w.log, w.opts.legacyFilterOpts...) + w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.log, w.opts.filterOpts...) + w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.log) w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.log) if params.storeFactory != nil { @@ -434,10 +434,10 @@ func (w *WakuNode) Start(ctx context.Context) error { w.log.Info("Subscribing filter to broadcaster") } - w.filterFullnode.SetHost(host) + w.filterFullNode.SetHost(host) if w.opts.enableFilterFullNode { sub := w.bcaster.RegisterForAll() - err := w.filterFullnode.Start(ctx, sub) + err := w.filterFullNode.Start(ctx, sub) if err != nil { return err } @@ -445,9 +445,9 @@ func (w *WakuNode) Start(ctx context.Context) error { } - w.filterLightnode.SetHost(host) + w.filterLightNode.SetHost(host) if w.opts.enableFilterLightNode { - err := w.filterLightnode.Start(ctx) + err := w.filterLightNode.Start(ctx) if err != nil { return err } @@ -501,7 +501,7 @@ func (w *WakuNode) Stop() { w.lightPush.Stop() w.store.Stop() w.legacyFilter.Stop() - w.filterFullnode.Stop() + w.filterFullNode.Stop() if w.opts.enableDiscV5 { w.discoveryV5.Stop() @@ -598,16 +598,16 @@ func (w *WakuNode) LegacyFilter() *legacy_filter.WakuFilter { } // FilterLightnode is used to access any operation related to Waku Filter protocol Full node feature -func (w *WakuNode) FilterFullnode() *filter.WakuFilterFullNode { - if result, ok := w.filterFullnode.(*filter.WakuFilterFullNode); ok { +func (w *WakuNode) FilterFullNode() *filter.WakuFilterFullNode { + if result, ok := w.filterFullNode.(*filter.WakuFilterFullNode); ok { return result } return nil } -// FilterFullnode is used to access any operation related to Waku Filter protocol Light node feature -func (w *WakuNode) FilterLightnode() *filter.WakuFilterLightnode { - if result, ok := w.filterLightnode.(*filter.WakuFilterLightnode); ok { +// FilterFullNode is used to access any operation related to Waku Filter protocol Light node feature +func (w *WakuNode) FilterLightnode() *filter.WakuFilterLightNode { + if result, ok := w.filterLightNode.(*filter.WakuFilterLightNode); ok { return result } return nil diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 2c90f702..52c6c964 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -68,7 +68,7 @@ type WakuNodeParameters struct { enableRelay bool enableLegacyFilter bool - isLegacyFilterFullnode bool + isLegacyFilterFullNode bool enableFilterLightNode bool enableFilterFullNode bool legacyFilterOpts []legacy_filter.Option @@ -363,7 +363,7 @@ func WithPeerExchange() WakuNodeOption { func WithLegacyWakuFilter(fullnode bool, filterOpts ...legacy_filter.Option) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableLegacyFilter = true - params.isLegacyFilterFullnode = fullnode + params.isLegacyFilterFullNode = fullnode params.legacyFilterOpts = filterOpts return nil } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index da0ab8b6..48003997 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -34,7 +34,7 @@ var ( ErrNoPeersAvailable = errors.New("no suitable remote peers") ) -type WakuFilterLightnode struct { +type WakuFilterLightNode struct { cancel context.CancelFunc ctx context.Context h host.Host @@ -59,9 +59,9 @@ type WakuFilterPushResult struct { // NewWakuFilterLightnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options // Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode. // If using libp2p host, then pass peermanager as nil -func NewWakuFilterLightnode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager, - timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode { - wf := new(WakuFilterLightnode) +func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager, + timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightNode { + wf := new(WakuFilterLightNode) wf.log = log.Named("filterv2-lightnode") wf.broadcaster = broadcaster wf.timesource = timesource @@ -72,11 +72,11 @@ func NewWakuFilterLightnode(broadcaster relay.Broadcaster, pm *peermanager.PeerM } // Sets the host to be able to mount or consume a protocol -func (wf *WakuFilterLightnode) SetHost(h host.Host) { +func (wf *WakuFilterLightNode) SetHost(h host.Host) { wf.h = h } -func (wf *WakuFilterLightnode) Start(ctx context.Context) error { +func (wf *WakuFilterLightNode) Start(ctx context.Context) error { wf.wg.Wait() // Wait for any goroutines to stop ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) @@ -98,7 +98,7 @@ func (wf *WakuFilterLightnode) Start(ctx context.Context) error { } // Stop unmounts the filter protocol -func (wf *WakuFilterLightnode) Stop() { +func (wf *WakuFilterLightNode) Stop() { if wf.cancel == nil { return } @@ -114,7 +114,7 @@ func (wf *WakuFilterLightnode) Stop() { wf.wg.Wait() } -func (wf *WakuFilterLightnode) onRequest(ctx context.Context) func(s network.Stream) { +func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Stream) { return func(s network.Stream) { defer s.Close() logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer())) @@ -149,7 +149,7 @@ func (wf *WakuFilterLightnode) onRequest(ctx context.Context) func(s network.Str } } -func (wf *WakuFilterLightnode) notify(remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) { +func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) { envelope := protocol.NewEnvelope(msg, wf.timesource.Now().UnixNano(), pubsubTopic) // Broadcasting message so it's stored @@ -159,7 +159,7 @@ func (wf *WakuFilterLightnode) notify(remotePeerID peer.ID, pubsubTopic string, wf.subscriptions.Notify(remotePeerID, envelope) } -func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error { +func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error { conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1) if err != nil { metrics.RecordFilterError(ctx, "dial_failure") @@ -210,7 +210,7 @@ func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscr } // Subscribe setups a subscription to receive messages that match a specific content filter -func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) { +func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) { if contentFilter.Topic == "" { return nil, errors.New("topic is required") } @@ -248,7 +248,7 @@ func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter Cont } // FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol -func (wf *WakuFilterLightnode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) { +func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) { if !wf.subscriptions.Has(peerID, contentFilter.Topic, contentFilter.ContentTopics...) { return nil, errors.New("subscription does not exist") } @@ -256,7 +256,7 @@ func (wf *WakuFilterLightnode) FilterSubscription(peerID peer.ID, contentFilter return wf.subscriptions.NewSubscription(peerID, contentFilter.Topic, contentFilter.ContentTopics), nil } -func (wf *WakuFilterLightnode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) { +func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) { params := new(FilterUnsubscribeParameters) params.log = wf.log for _, opt := range opts { @@ -266,7 +266,7 @@ func (wf *WakuFilterLightnode) getUnsubscribeParameters(opts ...FilterUnsubscrib return params, nil } -func (wf *WakuFilterLightnode) Ping(ctx context.Context, peerID peer.ID) error { +func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error { return wf.request( ctx, &FilterSubscribeParameters{selectedPeer: peerID}, @@ -274,11 +274,11 @@ func (wf *WakuFilterLightnode) Ping(ctx context.Context, peerID peer.ID) error { ContentFilter{}) } -func (wf *WakuFilterLightnode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error { +func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error { return wf.Ping(ctx, subscription.PeerID) } -func (wf *WakuFilterLightnode) Subscriptions() []*SubscriptionDetails { +func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails { wf.subscriptions.RLock() defer wf.subscriptions.RUnlock() @@ -295,7 +295,7 @@ func (wf *WakuFilterLightnode) Subscriptions() []*SubscriptionDetails { return output } -func (wf *WakuFilterLightnode) cleanupSubscriptions(peerID peer.ID, contentFilter ContentFilter) { +func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilter ContentFilter) { wf.subscriptions.Lock() defer wf.subscriptions.Unlock() @@ -327,7 +327,7 @@ func (wf *WakuFilterLightnode) cleanupSubscriptions(peerID peer.ID, contentFilte } // Unsubscribe is used to stop receiving messages from a peer that match a content filter -func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { if contentFilter.Topic == "" { return nil, errors.New("topic is required") } @@ -391,7 +391,7 @@ func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter Co } // Unsubscribe is used to stop receiving messages from a peer that match a content filter -func (wf *WakuFilterLightnode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { var contentTopics []string for k := range sub.ContentTopics { contentTopics = append(contentTopics, k) @@ -403,7 +403,7 @@ func (wf *WakuFilterLightnode) UnsubscribeWithSubscription(ctx context.Context, } // UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions -func (wf *WakuFilterLightnode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { params, err := wf.getUnsubscribeParameters(opts...) if err != nil { return nil, err diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index be9d1382..1daf9884 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -9,270 +9,326 @@ import ( "time" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" ) -func makeWakuRelay(t *testing.T, topic string, broadcaster relay.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) { - port, err := tests.FindFreePort(t, "", 5) - require.NoError(t, err) +func TestFilterSuite(t *testing.T) { + suite.Run(t, new(FilterTestSuite)) +} + +type FilterTestSuite struct { + suite.Suite + + testTopic string + testContentTopic string + ctx context.Context + ctxCancel context.CancelFunc + lightNode *WakuFilterLightNode + lightNodeHost host.Host + relayNode *relay.WakuRelay + relaySub *relay.Subscription + fullNode *WakuFilterFullNode + fullNodeHost host.Host + wg *sync.WaitGroup + contentFilter ContentFilter + subDetails *SubscriptionDetails + log *zap.Logger +} + +func (s *FilterTestSuite) makeWakuRelay(topic string) (*relay.WakuRelay, *relay.Subscription, host.Host, relay.Broadcaster) { + + broadcaster := relay.NewBroadcaster(10) + s.Require().NoError(broadcaster.Start(context.Background())) + + port, err := tests.FindFreePort(s.T(), "", 5) + s.Require().NoError(err) host, err := tests.MakeHost(context.Background(), port, rand.Reader) - require.NoError(t, err) + s.Require().NoError(err) - relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), utils.Logger()) + relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), s.log) relay.SetHost(host) + s.fullNodeHost = host err = relay.Start(context.Background()) - require.NoError(t, err) + s.Require().NoError(err) sub, err := relay.SubscribeToTopic(context.Background(), topic) - require.NoError(t, err) + s.Require().NoError(err) - return relay, sub, host + return relay, sub, host, broadcaster } -func makeWakuFilterLightNode(t *testing.T) (*WakuFilterLightnode, host.Host) { - port, err := tests.FindFreePort(t, "", 5) - require.NoError(t, err) +func (s *FilterTestSuite) makeWakuFilterLightNode() *WakuFilterLightNode { + port, err := tests.FindFreePort(s.T(), "", 5) + s.Require().NoError(err) host, err := tests.MakeHost(context.Background(), port, rand.Reader) - require.NoError(t, err) + s.Require().NoError(err) b := relay.NewBroadcaster(10) - require.NoError(t, b.Start(context.Background())) - filterPush := NewWakuFilterLightnode(b, nil, timesource.NewDefaultClock(), utils.Logger()) + s.Require().NoError(b.Start(context.Background())) + filterPush := NewWakuFilterLightNode(b, nil, timesource.NewDefaultClock(), s.log) filterPush.SetHost(host) + s.lightNodeHost = host err = filterPush.Start(context.Background()) - require.NoError(t, err) + s.Require().NoError(err) - return filterPush, host + return filterPush } -// Node1: Filter subscribed to content topic A -// Node2: Relay + Filter -// -// # Node1 and Node2 are peers -// -// Node2 send a successful message with topic A -// Node1 receive the message -// -// Node2 send a successful message with topic B -// Node1 doesn't receive the message -func TestWakuFilter(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds - defer cancel() +func (s *FilterTestSuite) makeWakuFilterFullNode(topic string) (*relay.WakuRelay, *WakuFilterFullNode) { + node, relaySub, host, broadcaster := s.makeWakuRelay(topic) + s.relaySub = relaySub - testTopic := "/waku/2/go/filter/test" - testContentTopic := "TopicA" + node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), s.log) + node2Filter.SetHost(host) + sub := broadcaster.Register(topic) + err := node2Filter.Start(s.ctx, sub) + s.Require().NoError(err) - node1, host1 := makeWakuFilterLightNode(t) - defer node1.Stop() + return node, node2Filter +} - broadcaster := relay.NewBroadcaster(10) - require.NoError(t, broadcaster.Start(context.Background())) - node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) - defer node2.Stop() - defer sub2.Unsubscribe() +func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) { + s.wg.Add(1) + go func() { + defer s.wg.Done() + select { + case env := <-ch: + s.Require().Equal(s.contentFilter.ContentTopics[0], env.Message().GetContentTopic()) + case <-time.After(5 * time.Second): + s.Require().Fail("Message timeout") + case <-s.ctx.Done(): + s.Require().Fail("test exceeded allocated time") + } + }() - node2Filter := NewWakuFilterFullnode(timesource.NewDefaultClock(), utils.Logger()) - node2Filter.SetHost(host2) - sub := broadcaster.Register(testTopic) - err := node2Filter.Start(ctx, sub) - require.NoError(t, err) + fn() - host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) - err = host1.Peerstore().AddProtocols(host2.ID(), FilterSubscribeID_v20beta1) - require.NoError(t, err) + s.wg.Wait() +} - contentFilter := ContentFilter{ - Topic: string(testTopic), - ContentTopics: []string{testContentTopic}, +func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope) { + s.wg.Add(1) + go func() { + defer s.wg.Done() + select { + case <-ch: + s.Require().Fail("should not receive another message") + case <-time.After(1 * time.Second): + // Timeout elapsed, all good + case <-s.ctx.Done(): + s.Require().Fail("test exceeded allocated time") + } + }() + + fn() + + s.wg.Wait() +} + +func (s *FilterTestSuite) subscribe(topic string, contentTopic string, peer peer.ID) *SubscriptionDetails { + s.contentFilter = ContentFilter{ + Topic: string(topic), + ContentTopics: []string{contentTopic}, } - subscriptionChannel, err := node1.Subscribe(ctx, contentFilter, WithPeer(node2Filter.h.ID())) - require.NoError(t, err) + subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer)) + s.Require().NoError(err) // Sleep to make sure the filter is subscribed time.Sleep(2 * time.Second) - var wg sync.WaitGroup + return subDetails +} - wg.Add(1) - go func() { - defer wg.Done() - env := <-subscriptionChannel.C - require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) - }() +func (s *FilterTestSuite) publishMsg(topic, contentTopic string, optionalPayload ...string) { + var payload string + if len(optionalPayload) > 0 { + payload = optionalPayload[0] + } else { + payload = "123" + } - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) - require.NoError(t, err) + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch(), payload), topic) + s.Require().NoError(err) +} - wg.Wait() +func (s *FilterTestSuite) SetupTest() { + log := utils.Logger() //.Named("filterv2-test") + s.log = log + // Use a pointer to WaitGroup so that to avoid copying + // https://pkg.go.dev/sync#WaitGroup + s.wg = &sync.WaitGroup{} - wg.Add(1) - go func() { - select { - case <-subscriptionChannel.C: - require.Fail(t, "should not receive another message") - case <-time.After(1 * time.Second): - defer wg.Done() - case <-ctx.Done(): - require.Fail(t, "test exceeded allocated time") - } - }() + // Create test context + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds + s.ctx = ctx + s.ctxCancel = cancel - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", utils.GetUnixEpoch()), testTopic) - require.NoError(t, err) + s.testTopic = "/waku/2/go/filter/test" + s.testContentTopic = "TopicA" - wg.Wait() + s.lightNode = s.makeWakuFilterLightNode() - wg.Add(1) - go func() { - select { - case <-subscriptionChannel.C: - require.Fail(t, "should not receive another message") - case <-time.After(1 * time.Second): - defer wg.Done() - case <-ctx.Done(): - require.Fail(t, "test exceeded allocated time") - } - }() + s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic) - _, err = node1.Unsubscribe(ctx, contentFilter, Peer(node2Filter.h.ID())) - require.NoError(t, err) + // Connect nodes + s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) + err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.Require().NoError(err) + +} + +func (s *FilterTestSuite) TearDownTest() { + s.fullNode.Stop() + s.relayNode.Stop() + s.relaySub.Unsubscribe() + s.lightNode.Stop() + s.ctxCancel() +} + +func (s *FilterTestSuite) TestWakuFilter() { + + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + + // Should be received + s.waitForMsg(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "first") + }, s.subDetails.C) + + // Wrong content topic + s.waitForTimeout(func() { + s.publishMsg(s.testTopic, "TopicB", "second") + }, s.subDetails.C) + + _, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, Peer(s.fullNodeHost.ID())) + s.Require().NoError(err) time.Sleep(1 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) - require.NoError(t, err) - wg.Wait() + // Should not receive after unsubscribe + s.waitForTimeout(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "third") + }, s.subDetails.C) } -func TestSubscriptionPing(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds - defer cancel() +func (s *FilterTestSuite) TestSubscriptionPing() { - testTopic := "/waku/2/go/filter/test" - - node1, host1 := makeWakuFilterLightNode(t) - defer node1.Stop() - - broadcaster := relay.NewBroadcaster(10) - require.NoError(t, broadcaster.Start(context.Background())) - node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) - defer node2.Stop() - defer sub2.Unsubscribe() - - node2Filter := NewWakuFilterFullnode(timesource.NewDefaultClock(), utils.Logger()) - node2Filter.SetHost(host2) - err := node2Filter.Start(ctx, relay.NoopSubscription()) - require.NoError(t, err) - - host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) - err = host1.Peerstore().AddProtocols(host2.ID(), FilterSubscribeID_v20beta1) - require.NoError(t, err) - - err = node1.Ping(context.Background(), host2.ID()) - require.Error(t, err) + err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) + s.Require().Error(err) filterErr, ok := err.(*FilterError) - require.True(t, ok) - require.Equal(t, filterErr.Code, http.StatusNotFound) + s.Require().True(ok) + s.Require().Equal(filterErr.Code, http.StatusNotFound) - contentFilter := ContentFilter{ - Topic: string(testTopic), - ContentTopics: []string{"abc"}, - } - _, err = node1.Subscribe(ctx, contentFilter, WithPeer(node2Filter.h.ID())) - require.NoError(t, err) + contentTopic := "abc" + s.subDetails = s.subscribe(s.testTopic, contentTopic, s.fullNodeHost.ID()) - err = node1.Ping(context.Background(), host2.ID()) - require.NoError(t, err) + err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) + s.Require().NoError(err) } -func TestWakuFilterPeerFailure(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds - defer cancel() - - testTopic := "/waku/2/go/filter/test" - testContentTopic := "TopicA" - - node1, host1 := makeWakuFilterLightNode(t) - - broadcaster := relay.NewBroadcaster(10) - require.NoError(t, broadcaster.Start(context.Background())) - node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) - defer node2.Stop() - defer sub2.Unsubscribe() +func (s *FilterTestSuite) TestPeerFailure() { broadcaster2 := relay.NewBroadcaster(10) - require.NoError(t, broadcaster2.Start(context.Background())) - node2Filter := NewWakuFilterFullnode(timesource.NewDefaultClock(), utils.Logger(), WithTimeout(5*time.Second)) - node2Filter.SetHost(host2) - sub := broadcaster.Register(testTopic) - err := node2Filter.Start(ctx, sub) - require.NoError(t, err) + s.Require().NoError(broadcaster2.Start(context.Background())) - host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) - err = host1.Peerstore().AddProtocols(host2.ID(), FilterPushID_v20beta1) - require.NoError(t, err) - - contentFilter := &ContentFilter{ - Topic: string(testTopic), - ContentTopics: []string{testContentTopic}, - } - - f, err := node1.Subscribe(ctx, *contentFilter, WithPeer(node2Filter.h.ID())) - require.NoError(t, err) + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) // Simulate there's been a failure before - node2Filter.subscriptions.FlagAsFailure(host1.ID()) + s.fullNode.subscriptions.FlagAsFailure(s.lightNodeHost.ID()) // Sleep to make sure the filter is subscribed time.Sleep(2 * time.Second) - require.True(t, node2Filter.subscriptions.IsFailedPeer(host1.ID())) + s.Require().True(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID())) - var wg sync.WaitGroup + s.waitForMsg(func() { + s.publishMsg(s.testTopic, s.testContentTopic) + }, s.subDetails.C) - wg.Add(1) - go func() { - defer wg.Done() - env := <-f.C - require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) - - // Failure is removed - require.False(t, node2Filter.subscriptions.IsFailedPeer(host1.ID())) - - }() - - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) - require.NoError(t, err) - - wg.Wait() + // Failure is removed + s.Require().False(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID())) // Kill the subscriber - host1.Close() + s.lightNodeHost.Close() time.Sleep(1 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) - require.NoError(t, err) + s.publishMsg(s.testTopic, s.testContentTopic) // TODO: find out how to eliminate this sleep time.Sleep(1 * time.Second) - require.True(t, node2Filter.subscriptions.IsFailedPeer(host1.ID())) + s.Require().True(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID())) time.Sleep(2 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) - require.NoError(t, err) + s.publishMsg(s.testTopic, s.testContentTopic) time.Sleep(2 * time.Second) - require.True(t, node2Filter.subscriptions.IsFailedPeer(host1.ID())) // Failed peer has been removed - require.False(t, node2Filter.subscriptions.Has(host1.ID())) // Failed peer has been removed + s.Require().True(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID())) // Failed peer has been removed + s.Require().False(s.fullNode.subscriptions.Has(s.lightNodeHost.ID())) // Failed peer has been removed +} + +func (s *FilterTestSuite) TestCreateSubscription() { + + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails.C) +} + +func (s *FilterTestSuite) TestModifySubscription() { + + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails.C) + + // Subscribe to another content_topic + newContentTopic := "Topic_modified" + s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID()) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails.C) +} + +func (s *FilterTestSuite) TestMultipleMessages() { + + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails.C) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails.C) } diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 1e8fd50f..371fa0e9 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -45,8 +45,8 @@ type ( } ) -// NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilterFullnode(timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFullNode { +// NewWakuFilterFullNode returns a new instance of Waku Filter struct setup according to the chosen parameter and options +func NewWakuFilterFullNode(timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFullNode { wf := new(WakuFilterFullNode) wf.log = log.Named("filterv2-fullnode")