diff --git a/waku/node.go b/waku/node.go index e1eeecde..7b1bcc76 100644 --- a/waku/node.go +++ b/waku/node.go @@ -162,9 +162,10 @@ func Execute(options Options) { if options.Filter.Enable { if options.Filter.UseV2 { if !options.Filter.DisableFullNode { + nodeOpts = append(nodeOpts, node.WithWakuFilterV2LightNode()) + } else { nodeOpts = append(nodeOpts, node.WithWakuFilterV2FullNode(filter.WithTimeout(options.Filter.Timeout))) } - nodeOpts = append(nodeOpts, node.WithWakuFilterV2LightNode()) } else { nodeOpts = append(nodeOpts, node.WithWakuFilter(!options.Filter.DisableFullNode, filter.WithTimeout(options.Filter.Timeout))) } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 9acc8300..e03e807b 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -211,8 +211,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...) - w.filterV2Full = filterv2.NewWakuFilter(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...) - w.filterV2Light = filterv2.NewWakuFilterPush(w.host, w.bcaster, w.timesource, w.log) + w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...) + w.filterV2Light = filterv2.NewWakuFilterLightnode(w.host, w.bcaster, w.timesource, w.log) w.lightPush = lightpush.NewWakuLightPush(w.host, w.Relay(), w.log) if w.opts.enableSwap { @@ -240,8 +240,6 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { return nil, err } - w.enrChangeCh = make(chan struct{}, 10) - if params.connStatusC != nil { w.connStatusChan = params.connStatusC } @@ -293,6 +291,8 @@ func (w *WakuNode) Start(ctx context.Context) error { w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.log) w.host.Network().Notify(w.connectionNotif) + w.enrChangeCh = make(chan struct{}, 10) + w.wg.Add(3) go w.connectednessListener(ctx) go w.watchMultiaddressChanges(ctx) @@ -513,8 +513,8 @@ func (w *WakuNode) Filter() *filter.WakuFilter { } // FilterV2 is used to access any operation related to Waku Filter protocol -func (w *WakuNode) FilterV2() *filterv2.WakuFilterPush { - if result, ok := w.filterV2Light.(*filterv2.WakuFilterPush); ok { +func (w *WakuNode) FilterV2() *filterv2.WakuFilterLightnode { + if result, ok := w.filterV2Light.(*filterv2.WakuFilterLightnode); ok { return result } return nil diff --git a/waku/v2/protocol/filterv2/client.go b/waku/v2/protocol/filterv2/client.go index 51467b90..de384692 100644 --- a/waku/v2/protocol/filterv2/client.go +++ b/waku/v2/protocol/filterv2/client.go @@ -32,7 +32,7 @@ var ( ErrNoPeersAvailable = errors.New("no suitable remote peers") ) -type WakuFilterPush struct { +type WakuFilterLightnode struct { cancel context.CancelFunc ctx context.Context h host.Host @@ -54,9 +54,9 @@ type WakuFilterPushResult struct { } // NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilterPush(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterPush { - wf := new(WakuFilterPush) - wf.log = log.Named("filter") +func NewWakuFilterLightnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode { + wf := new(WakuFilterLightnode) + wf.log = log.Named("filterv2-lightnode") wf.broadcaster = broadcaster wf.timesource = timesource wf.wg = &sync.WaitGroup{} @@ -65,7 +65,7 @@ func NewWakuFilterPush(host host.Host, broadcaster v2.Broadcaster, timesource ti return wf } -func (wf *WakuFilterPush) Start(ctx context.Context) error { +func (wf *WakuFilterLightnode) Start(ctx context.Context) error { wf.wg.Wait() // Wait for any goroutines to stop ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) @@ -81,8 +81,7 @@ func (wf *WakuFilterPush) Start(ctx context.Context) error { wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(ctx)) - wf.wg.Add(1) - + // wf.wg.Add(1) // TODO: go wf.keepAliveSubscriptions(ctx) wf.log.Info("filter protocol (light) started") @@ -91,7 +90,7 @@ func (wf *WakuFilterPush) Start(ctx context.Context) error { } // Stop unmounts the filter protocol -func (wf *WakuFilterPush) Stop() { +func (wf *WakuFilterLightnode) Stop() { if wf.cancel == nil { return } @@ -100,14 +99,14 @@ func (wf *WakuFilterPush) Stop() { wf.h.RemoveStreamHandler(FilterPushID_v20beta1) - wf.UnsubscribeAll(wf.ctx) + _, _ = wf.UnsubscribeAll(wf.ctx) wf.subscriptions.Clear() wf.wg.Wait() } -func (wf *WakuFilterPush) onRequest(ctx context.Context) func(s network.Stream) { +func (wf *WakuFilterLightnode) onRequest(ctx context.Context) func(s network.Stream) { return func(s network.Stream) { defer s.Close() logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer())) @@ -127,7 +126,7 @@ func (wf *WakuFilterPush) onRequest(ctx context.Context) func(s network.Stream) } } -func (wf *WakuFilterPush) notify(remotePeerID peer.ID, pubsubTopic string, msg *pb.WakuMessage) { +func (wf *WakuFilterLightnode) notify(remotePeerID peer.ID, pubsubTopic string, msg *pb.WakuMessage) { envelope := protocol.NewEnvelope(msg, wf.timesource.Now().UnixNano(), pubsubTopic) // Broadcasting message so it's stored @@ -137,7 +136,7 @@ func (wf *WakuFilterPush) notify(remotePeerID peer.ID, pubsubTopic string, msg * wf.subscriptions.Notify(remotePeerID, envelope) } -func (wf *WakuFilterPush) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error { +func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error { err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(params.selectedPeer)) if err != nil { return err @@ -182,13 +181,13 @@ func (wf *WakuFilterPush) request(ctx context.Context, params *FilterSubscribePa } // Subscribe setups a subscription to receive messages that match a specific content filter -func (wf *WakuFilterPush) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) error { +func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) { if contentFilter.Topic == "" { - return errors.New("topic is required") + return nil, errors.New("topic is required") } if len(contentFilter.ContentTopics) == 0 { - return errors.New("at least one content topic is required") + return nil, errors.New("at least one content topic is required") } params := new(FilterSubscribeParameters) @@ -202,23 +201,23 @@ func (wf *WakuFilterPush) Subscribe(ctx context.Context, contentFilter ContentFi } if params.selectedPeer == "" { - return ErrNoPeersAvailable + return nil, ErrNoPeersAvailable } err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, contentFilter) if err != nil { - return err + return nil, err } - return nil + return wf.FilterSubscription(params.selectedPeer, contentFilter), nil } // FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol -func (wf *WakuFilterPush) FilterSubscription(peerID peer.ID, topic string, contentTopics []string) *SubscriptionDetails { - return wf.subscriptions.NewSubscription(peerID, topic, contentTopics) +func (wf *WakuFilterLightnode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) *SubscriptionDetails { + return wf.subscriptions.NewSubscription(peerID, contentFilter.Topic, contentFilter.ContentTopics) } -func (wf *WakuFilterPush) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) { +func (wf *WakuFilterLightnode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) { params := new(FilterUnsubscribeParameters) params.log = wf.log for _, opt := range opts { @@ -233,18 +232,18 @@ func (wf *WakuFilterPush) getUnsubscribeParameters(opts ...FilterUnsubscribeOpti } // Unsubscribe is used to stop receiving messages from a peer that match a content filter -func (wf *WakuFilterPush) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (error, <-chan WakuFilterPushResult) { +func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { if contentFilter.Topic == "" { - return errors.New("topic is required"), nil + return nil, errors.New("topic is required") } if len(contentFilter.ContentTopics) == 0 { - return errors.New("at least one content topic is required"), nil + return nil, errors.New("at least one content topic is required") } params, err := wf.getUnsubscribeParameters(opts...) if err != nil { - return err, nil + return nil, err } localWg := sync.WaitGroup{} @@ -262,7 +261,7 @@ func (wf *WakuFilterPush) Unsubscribe(ctx context.Context, contentFilter Content ctx, &FilterSubscribeParameters{selectedPeer: peerID}, pb.FilterSubscribeRequest_UNSUBSCRIBE, - ContentFilter{}) + contentFilter) if err != nil { wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) } @@ -277,11 +276,23 @@ func (wf *WakuFilterPush) Unsubscribe(ctx context.Context, contentFilter Content localWg.Wait() close(resultChan) - return nil, resultChan + return resultChan, nil +} + +// Unsubscribe is used to stop receiving messages from a peer that match a content filter +func (wf *WakuFilterLightnode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { + var contentTopics []string + for k := range sub.contentTopics { + contentTopics = append(contentTopics, k) + } + + opts = append(opts, Peer(sub.peerID)) + + return wf.Unsubscribe(ctx, ContentFilter{Topic: sub.pubsubTopic, ContentTopics: contentTopics}, opts...) } // UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions -func (wf *WakuFilterPush) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightnode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { params, err := wf.getUnsubscribeParameters(opts...) if err != nil { return nil, err diff --git a/waku/v2/protocol/filterv2/filter_test.go b/waku/v2/protocol/filterv2/filter_test.go new file mode 100644 index 00000000..f248a8c3 --- /dev/null +++ b/waku/v2/protocol/filterv2/filter_test.go @@ -0,0 +1,230 @@ +package filterv2 + +import ( + "context" + "crypto/rand" + "sync" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + v2 "github.com/waku-org/go-waku/waku/v2" + "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) { + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + host, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + + relay := relay.NewWakuRelay(host, broadcaster, 0, timesource.NewDefaultClock(), utils.Logger()) + err = relay.Start(context.Background()) + require.NoError(t, err) + + sub, err := relay.SubscribeToTopic(context.Background(), topic) + require.NoError(t, err) + + return relay, sub, host +} + +func makeWakuFilterLightNode(t *testing.T) (*WakuFilterLightnode, host.Host) { + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + host, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + + filterPush := NewWakuFilterLightnode(host, v2.NewBroadcaster(10), timesource.NewDefaultClock(), utils.Logger()) + err = filterPush.Start(context.Background()) + require.NoError(t, err) + + return filterPush, host +} + +// Node1: Filter subscribed to content topic A +// Node2: Relay + Filter +// +// # Node1 and Node2 are peers +// +// Node2 send a successful message with topic A +// Node1 receive the message +// +// Node2 send a successful message with topic B +// Node1 doesn't receive the message +func TestWakuFilter(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds + defer cancel() + + testTopic := "/waku/2/go/filter/test" + testContentTopic := "TopicA" + + node1, host1 := makeWakuFilterLightNode(t) + defer node1.Stop() + + broadcaster := v2.NewBroadcaster(10) + node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) + defer node2.Stop() + defer sub2.Unsubscribe() + + node2Filter := NewWakuFilterFullnode(host2, broadcaster, timesource.NewDefaultClock(), utils.Logger()) + err := node2Filter.Start(ctx) + require.NoError(t, err) + + broadcaster.Register(&testTopic, node2Filter.MessageChannel()) + + host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) + err = host1.Peerstore().AddProtocols(host2.ID(), string(FilterSubscribeID_v20beta1)) + require.NoError(t, err) + + contentFilter := ContentFilter{ + Topic: string(testTopic), + ContentTopics: []string{testContentTopic}, + } + + subscriptionChannel, err := node1.Subscribe(ctx, contentFilter, WithPeer(node2Filter.h.ID())) + require.NoError(t, err) + + // Sleep to make sure the filter is subscribed + time.Sleep(2 * time.Second) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + env := <-subscriptionChannel.C + require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) + }() + + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic) + require.NoError(t, err) + + wg.Wait() + + wg.Add(1) + go func() { + select { + case <-subscriptionChannel.C: + require.Fail(t, "should not receive another message") + case <-time.After(1 * time.Second): + defer wg.Done() + case <-ctx.Done(): + require.Fail(t, "test exceeded allocated time") + } + }() + + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", 1), testTopic) + require.NoError(t, err) + + wg.Wait() + + wg.Add(1) + go func() { + select { + case <-subscriptionChannel.C: + require.Fail(t, "should not receive another message") + case <-time.After(1 * time.Second): + defer wg.Done() + case <-ctx.Done(): + require.Fail(t, "test exceeded allocated time") + } + }() + + _, err = node1.Unsubscribe(ctx, contentFilter, Peer(node2Filter.h.ID())) + require.NoError(t, err) + + time.Sleep(1 * time.Second) + + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic) + require.NoError(t, err) + wg.Wait() +} + +func TestWakuFilterPeerFailure(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds + defer cancel() + + testTopic := "/waku/2/go/filter/test" + testContentTopic := "TopicA" + + node1, host1 := makeWakuFilterLightNode(t) + + broadcaster := v2.NewBroadcaster(10) + node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) + defer node2.Stop() + defer sub2.Unsubscribe() + + node2Filter := NewWakuFilterFullnode(host2, v2.NewBroadcaster(10), timesource.NewDefaultClock(), utils.Logger(), filter.WithTimeout(5*time.Second)) + err := node2Filter.Start(ctx) + require.NoError(t, err) + + broadcaster.Register(&testTopic, node2Filter.MessageChannel()) + + host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) + err = host1.Peerstore().AddProtocols(host2.ID(), string(FilterPushID_v20beta1)) + require.NoError(t, err) + + contentFilter := &ContentFilter{ + Topic: string(testTopic), + ContentTopics: []string{testContentTopic}, + } + + f, err := node1.Subscribe(ctx, *contentFilter, WithPeer(node2Filter.h.ID())) + require.NoError(t, err) + + // Simulate there's been a failure before + node2Filter.subscriptions.FlagAsFailure(host1.ID()) + + // Sleep to make sure the filter is subscribed + time.Sleep(2 * time.Second) + + require.True(t, node2Filter.subscriptions.IsFailedPeer(host1.ID())) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + env := <-f.C + require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) + + // Failure is removed + require.False(t, node2Filter.subscriptions.IsFailedPeer(host1.ID())) + + }() + + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic) + require.NoError(t, err) + + wg.Wait() + + // Kill the subscriber + host1.Close() + + time.Sleep(1 * time.Second) + + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 1), testTopic) + require.NoError(t, err) + + // TODO: find out how to eliminate this sleep + time.Sleep(1 * time.Second) + require.True(t, node2Filter.subscriptions.IsFailedPeer(host1.ID())) + + time.Sleep(2 * time.Second) + + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic) + require.NoError(t, err) + + time.Sleep(2 * time.Second) + + require.True(t, node2Filter.subscriptions.IsFailedPeer(host1.ID())) // Failed peer has been removed + require.False(t, node2Filter.subscriptions.Has(host1.ID())) // Failed peer has been removed +} diff --git a/waku/v2/protocol/filterv2/server.go b/waku/v2/protocol/filterv2/server.go index eb53b86c..c5f85d07 100644 --- a/waku/v2/protocol/filterv2/server.go +++ b/waku/v2/protocol/filterv2/server.go @@ -28,7 +28,7 @@ import ( const FilterSubscribeID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/2.0.0-beta1") type ( - WakuFilter struct { + WakuFilterFull struct { cancel context.CancelFunc h host.Host msgC chan *protocol.Envelope @@ -39,9 +39,9 @@ type ( } ) -// NewWakuFilter returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilter(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...filter.Option) *WakuFilter { - wf := new(WakuFilter) +// NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options +func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...filter.Option) *WakuFilterFull { + wf := new(WakuFilterFull) wf.log = log.Named("filterv2-fullnode") params := new(filter.FilterParameters) @@ -58,7 +58,7 @@ func NewWakuFilter(host host.Host, broadcaster v2.Broadcaster, timesource timeso return wf } -func (wf *WakuFilter) Start(ctx context.Context) error { +func (wf *WakuFilterFull) Start(ctx context.Context) error { wf.wg.Wait() // Wait for any goroutines to stop ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) @@ -82,7 +82,7 @@ func (wf *WakuFilter) Start(ctx context.Context) error { return nil } -func (wf *WakuFilter) onRequest(ctx context.Context) func(s network.Stream) { +func (wf *WakuFilterFull) onRequest(ctx context.Context) func(s network.Stream) { return func(s network.Stream) { defer s.Close() logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer())) @@ -132,7 +132,7 @@ func reply(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequ } } -func (wf *WakuFilter) ping(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { +func (wf *WakuFilterFull) ping(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { exists := wf.subscriptions.Has(s.Conn().RemotePeer()) if exists { @@ -142,7 +142,7 @@ func (wf *WakuFilter) ping(s network.Stream, logger *zap.Logger, request *pb.Fil } } -func (wf *WakuFilter) subscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { +func (wf *WakuFilterFull) subscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { if request.PubsubTopic == "" { reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty") return @@ -160,7 +160,7 @@ func (wf *WakuFilter) subscribe(s network.Stream, logger *zap.Logger, request *p reply(s, logger, request, http.StatusOK) } -func (wf *WakuFilter) unsubscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { +func (wf *WakuFilterFull) unsubscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { if request.PubsubTopic == "" { reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty") return @@ -179,7 +179,7 @@ func (wf *WakuFilter) unsubscribe(s network.Stream, logger *zap.Logger, request } } -func (wf *WakuFilter) unsubscribeAll(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { +func (wf *WakuFilterFull) unsubscribeAll(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer()) if err != nil { reply(s, logger, request, http.StatusNotFound) @@ -188,7 +188,7 @@ func (wf *WakuFilter) unsubscribeAll(s network.Stream, logger *zap.Logger, reque } } -func (wf *WakuFilter) filterListener(ctx context.Context) { +func (wf *WakuFilterFull) filterListener(ctx context.Context) { defer wf.wg.Done() // This function is invoked for each message received @@ -225,7 +225,7 @@ func (wf *WakuFilter) filterListener(ctx context.Context) { } } -func (wf *WakuFilter) pushMessage(ctx context.Context, peerID peer.ID, env *protocol.Envelope) error { +func (wf *WakuFilterFull) pushMessage(ctx context.Context, peerID peer.ID, env *protocol.Envelope) error { logger := wf.log.With(logging.HostID("peer", peerID)) messagePush := &pb.MessagePushV2{ @@ -264,7 +264,7 @@ func (wf *WakuFilter) pushMessage(ctx context.Context, peerID peer.ID, env *prot } // Stop unmounts the filter protocol -func (wf *WakuFilter) Stop() { +func (wf *WakuFilterFull) Stop() { if wf.cancel == nil { return } @@ -278,6 +278,6 @@ func (wf *WakuFilter) Stop() { wf.wg.Wait() } -func (wf *WakuFilter) MessageChannel() chan *protocol.Envelope { +func (wf *WakuFilterFull) MessageChannel() chan *protocol.Envelope { return wf.msgC } diff --git a/waku/v2/protocol/filterv2/subscribers_map.go b/waku/v2/protocol/filterv2/subscribers_map.go index eee9679d..767056f5 100644 --- a/waku/v2/protocol/filterv2/subscribers_map.go +++ b/waku/v2/protocol/filterv2/subscribers_map.go @@ -37,6 +37,15 @@ func NewSubscribersMap(timeout time.Duration) *SubscribersMap { } } +func (sub *SubscribersMap) Clear() { + sub.Lock() + defer sub.Unlock() + + sub.items = make(map[peer.ID]PubsubTopics) + sub.interestMap = make(map[string]PeerSet) + sub.failedPeers = make(map[peer.ID]time.Time) +} + func (sub *SubscribersMap) Set(peerID peer.ID, pubsubTopic string, contentTopics []string) { sub.Lock() defer sub.Unlock() @@ -134,7 +143,6 @@ func (sub *SubscribersMap) deleteAll(peerID peer.ID) error { } delete(sub.items, peerID) - delete(sub.failedPeers, peerID) return nil } @@ -223,8 +231,8 @@ func (sub *SubscribersMap) FlagAsFailure(peerID peer.ID) { lastFailure, ok := sub.failedPeers[peerID] if ok { elapsedTime := time.Since(lastFailure) - if elapsedTime > sub.timeout { - sub.deleteAll(peerID) + if elapsedTime < sub.timeout { + _ = sub.deleteAll(peerID) } } else { sub.failedPeers[peerID] = time.Now() diff --git a/waku/v2/protocol/filterv2/subscribers_map_test.go b/waku/v2/protocol/filterv2/subscribers_map_test.go index 4fbd0851..54737d96 100644 --- a/waku/v2/protocol/filterv2/subscribers_map_test.go +++ b/waku/v2/protocol/filterv2/subscribers_map_test.go @@ -32,21 +32,21 @@ func TestAppend(t *testing.T) { subs.Set(peerId, TOPIC, []string{"topic1"}) sub := firstSubscriber(subs, TOPIC, "topic1") - assert.NotNil(t, sub) + assert.NotEmpty(t, sub) // Adding to existing peer subs.Set(peerId, TOPIC, []string{"topic2"}) sub = firstSubscriber(subs, TOPIC, "topic2") - assert.NotNil(t, sub) + assert.NotEmpty(t, sub) subs.Set(peerId, TOPIC+"2", []string{"topic1"}) sub = firstSubscriber(subs, TOPIC+"2", "topic1") - assert.NotNil(t, sub) + assert.NotEmpty(t, sub) - sub = firstSubscriber(subs, TOPIC, "topic2") - assert.Nil(t, sub) + sub = firstSubscriber(subs, TOPIC+"2", "topic2") + assert.Empty(t, sub) } func TestRemove(t *testing.T) { @@ -56,16 +56,17 @@ func TestRemove(t *testing.T) { subs.Set(peerId, TOPIC+"1", []string{"topic1", "topic2"}) subs.Set(peerId, TOPIC+"2", []string{"topic1"}) - subs.DeleteAll(peerId) + err := subs.DeleteAll(peerId) + assert.Empty(t, err) sub := firstSubscriber(subs, TOPIC+"1", "topic1") - assert.Nil(t, sub) + assert.Empty(t, sub) sub = firstSubscriber(subs, TOPIC+"1", "topic2") - assert.Nil(t, sub) + assert.Empty(t, sub) sub = firstSubscriber(subs, TOPIC+"2", "topic1") - assert.Nil(t, sub) + assert.Empty(t, sub) assert.False(t, subs.Has(peerId)) @@ -85,7 +86,7 @@ func TestRemovePartial(t *testing.T) { require.NoError(t, err) sub := firstSubscriber(subs, TOPIC, "topic2") - assert.NotNil(t, sub) + assert.NotEmpty(t, sub) } func TestRemoveBogus(t *testing.T) { @@ -97,9 +98,9 @@ func TestRemoveBogus(t *testing.T) { require.NoError(t, err) sub := firstSubscriber(subs, TOPIC, "topic1") - assert.Nil(t, sub) + assert.Empty(t, sub) sub = firstSubscriber(subs, TOPIC, "does not exist") - assert.Nil(t, sub) + assert.Empty(t, sub) err = subs.Delete(peerId, "DOES_NOT_EXIST", []string{"topic1"}) require.Error(t, err) diff --git a/waku/v2/protocol/filterv2/subscriptions_map.go b/waku/v2/protocol/filterv2/subscriptions_map.go index 501971d8..b7b10ab1 100644 --- a/waku/v2/protocol/filterv2/subscriptions_map.go +++ b/waku/v2/protocol/filterv2/subscriptions_map.go @@ -124,6 +124,27 @@ func (s *SubscriptionDetails) Close() error { return s.mapRef.Delete(s) } +func (s *SubscriptionDetails) Clone() *SubscriptionDetails { + s.RLock() + defer s.RUnlock() + + result := &SubscriptionDetails{ + id: uuid.NewString(), + mapRef: s.mapRef, + closed: false, + peerID: s.peerID, + pubsubTopic: s.pubsubTopic, + contentTopics: make(map[string]struct{}), + C: make(chan *protocol.Envelope), + } + + for k := range s.contentTopics { + result.contentTopics[k] = struct{}{} + } + + return result +} + func (sub *SubscriptionsMap) clear() { for _, peerSubscription := range sub.items { for _, subscriptionSet := range peerSubscription.subscriptionsPerTopic {