From 437f830b5101c086272968da459d4a732105395f Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 4 Jul 2024 10:34:53 +0530 Subject: [PATCH] feat_: aggregate filter subscriptions to do bulk subs with peer (#5440) * feat_: aggregate filter subscriptions to do bulk subs with peer * chore_: take possible deadlock fix in go-waku * fix_: don't resubscribe filters unless there is a change in shard for community (#5467) --- go.mod | 2 +- go.sum | 4 +- protocol/messenger_communities.go | 12 +- .../messenger_communities_sharding_test.go | 8 +- .../waku-org/go-waku/waku/v2/api/filter.go | 8 +- .../go-waku/waku/v2/node/localnode.go | 8 + .../go-waku/waku/v2/protocol/enr/enr.go | 3 +- .../subscription/subscription_details.go | 14 +- .../subscription/subscriptions_map.go | 6 +- vendor/modules.txt | 2 +- wakuv2/filter_manager.go | 182 +++++++++++++----- wakuv2/waku_test.go | 7 +- 12 files changed, 182 insertions(+), 74 deletions(-) diff --git a/go.mod b/go.mod index 1b9390ef6..2ab5451a5 100644 --- a/go.mod +++ b/go.mod @@ -93,7 +93,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240628140035-3604bf39ad28 + github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index b04fc834c..b31902829 100644 --- a/go.sum +++ b/go.sum @@ -2137,8 +2137,8 @@ github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97Jry github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240628140035-3604bf39ad28 h1:7BqEcKgJs9QNzrLlC4jn1opCjGKZxNX2B/AVqhsvwzw= -github.com/waku-org/go-waku v0.8.1-0.20240628140035-3604bf39ad28/go.mod h1:fHQ6WCSAlTollYHvAeZeO+d7lOYwcvQxHk+DyGLeoMI= +github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0 h1:3Idg7XvXc9iQpUyg8KNKgZnziHJRs3xm7EDJHFzC9to= +github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0/go.mod h1:hkW5zXyM/ZIMDPniVooTk4dOGwY+OzrB0Q6fx+1sLpo= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index f012fc082..decebf705 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -3554,7 +3554,7 @@ func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communiti } // Unsubscribing from existing shard - if community.Shard() != nil { + if community.Shard() != nil && community.Shard() != shard.FromProtobuff(message.GetShard()) { err := m.unsubscribeFromShard(community.Shard()) if err != nil { return err @@ -3567,12 +3567,14 @@ func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communiti if err != nil { return err } + // Update community filters in case of change of shard + if community.Shard() != shard.FromProtobuff(message.GetShard()) { + err = m.UpdateCommunityFilters(community) + if err != nil { + return err + } - err = m.UpdateCommunityFilters(community) - if err != nil { - return err } - return nil } diff --git a/protocol/messenger_communities_sharding_test.go b/protocol/messenger_communities_sharding_test.go index 3dc314074..c9e205a23 100644 --- a/protocol/messenger_communities_sharding_test.go +++ b/protocol/messenger_communities_sharding_test.go @@ -140,9 +140,13 @@ func (s *MessengerCommunitiesShardingSuite) TestPostToCommunityChat() { s.testPostToCommunityChat(shard, community, chat) } - // Members should continue to receive messages in a community if sharding is disabled after it was previously enabled. + // Members should continue to receive messages in a community if it is moved back to default shard. { - s.testPostToCommunityChat(nil, community, chat) + shard := &shard.Shard{ + Cluster: shard.MainStatusShardCluster, + Index: 32, + } + s.testPostToCommunityChat(shard, community, chat) } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go index 2798d1373..b9d64310a 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go @@ -71,8 +71,12 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte return sub, nil } -func (apiSub *Sub) Unsubscribe() { - apiSub.cancel() +func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) { + _, err := apiSub.wf.Unsubscribe(apiSub.ctx, contentFilter) + //Not reading result unless we want to do specific error handling? + if err != nil { + apiSub.log.Debug("failed to unsubscribe", zap.Error(err), zap.Stringer("content-filter", contentFilter)) + } } func (apiSub *Sub) subscriptionLoop() { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go index d9d30d71e..5f0f47d73 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go @@ -240,11 +240,19 @@ func selectWSListenAddresses(addresses []ma.Multiaddr) ([]ma.Multiaddr, error) { func selectCircuitRelayListenAddresses(ctx context.Context, addresses []ma.Multiaddr) ([]ma.Multiaddr, error) { var result []ma.Multiaddr + for _, addr := range addresses { addr, err := decapsulateCircuitRelayAddr(ctx, addr) if err != nil { continue } + + _, noWS := addr.ValueForProtocol(ma.P_WSS) + _, noWSS := addr.ValueForProtocol(ma.P_WS) + if noWS == nil || noWSS == nil { // WS or WSS found + continue + } + result = append(result, addr) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/enr.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/enr.go index 1ff1265cc..fe1e54625 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/enr.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/enr.go @@ -120,7 +120,8 @@ func Multiaddress(node *enode.Node) (peer.ID, []multiaddr.Multiaddr, error) { maRaw := multiaddrRaw[offset+2 : offset+2+int(maSize)] addr, err := multiaddr.NewMultiaddrBytes(maRaw) if err != nil { - return "", nil, fmt.Errorf("invalid multiaddress field length") + // The value is not a multiaddress. Ignoring... + continue } hostInfoStr := fmt.Sprintf("/p2p/%s", peerID.String()) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscription_details.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscription_details.go index a36f54ad4..b94f38ef9 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscription_details.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscription_details.go @@ -37,6 +37,8 @@ type SubscriptionDetails struct { } func (s *SubscriptionDetails) Add(contentTopics ...string) { + s.mapRef.Lock() + defer s.mapRef.Unlock() s.Lock() defer s.Unlock() @@ -44,14 +46,14 @@ func (s *SubscriptionDetails) Add(contentTopics ...string) { if _, ok := s.ContentFilter.ContentTopics[ct]; !ok { s.ContentFilter.ContentTopics[ct] = struct{}{} // Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair - s.mapRef.Lock() s.mapRef.increaseSubFor(s.ContentFilter.PubsubTopic, ct) - s.mapRef.Unlock() } } } func (s *SubscriptionDetails) Remove(contentTopics ...string) { + s.mapRef.Lock() + defer s.mapRef.Unlock() s.Lock() defer s.Unlock() @@ -59,15 +61,13 @@ func (s *SubscriptionDetails) Remove(contentTopics ...string) { if _, ok := s.ContentFilter.ContentTopics[ct]; ok { delete(s.ContentFilter.ContentTopics, ct) // Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair - s.mapRef.Lock() s.mapRef.decreaseSubFor(s.ContentFilter.PubsubTopic, ct) - s.mapRef.Unlock() } } if len(s.ContentFilter.ContentTopics) == 0 { // err doesn't matter - _ = s.mapRef.Delete(s) + _ = s.mapRef.DeleteNoLock(s) } } @@ -105,7 +105,9 @@ func (s *SubscriptionDetails) CloseC() { func (s *SubscriptionDetails) Close() error { s.CloseC() - return s.mapRef.Delete(s) + s.mapRef.Lock() + defer s.mapRef.Unlock() + return s.mapRef.DeleteNoLock(s) } func (s *SubscriptionDetails) SetClosing() { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscriptions_map.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscriptions_map.go index 359930643..4692d9538 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscriptions_map.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscriptions_map.go @@ -130,9 +130,9 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, cf protocol.ContentFilter) bool return true } -func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error { - sub.Lock() - defer sub.Unlock() + +// Caller has to acquire lock before invoking this method.This is done to avoid possible deadlock +func (sub *SubscriptionsMap) DeleteNoLock(subscription *SubscriptionDetails) error { peerSubscription, ok := sub.items[subscription.PeerID] if !ok { diff --git a/vendor/modules.txt b/vendor/modules.txt index 2cdb548ec..ec587762d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1015,7 +1015,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240628140035-3604bf39ad28 +# github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/filter_manager.go b/wakuv2/filter_manager.go index f0dd90cbe..ce1984a3a 100644 --- a/wakuv2/filter_manager.go +++ b/wakuv2/filter_manager.go @@ -3,6 +3,9 @@ package wakuv2 import ( "context" "sync" + "time" + + "github.com/google/uuid" "github.com/status-im/status-go/wakuv2/common" @@ -15,34 +18,39 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/filter" ) -// Methods on FilterManager maintain filter peer health +// Methods on FilterManager just aggregate filters from application and subscribe to them // -// runFilterLoop is the main event loop +// startFilterSubLoop runs a loop where-in it waits for an interval to batch subscriptions // -// Filter Install/Uninstall events are pushed onto eventChan -// Subscribe, UnsubscribeWithSubscription, IsSubscriptionAlive calls -// are invoked from goroutines and request results pushed onto eventChan +// runFilterSubscriptionLoop runs a loop for receiving messages from underlying subscriptions and invokes onNewEnvelopes // -// filterSubs is the map of filter IDs to subscriptions +// filterConfigs is the map of filer IDs to filter configs +// filterSubscriptions is the map of filter subscription IDs to subscriptions + +const filterSubBatchSize = 90 + +type appFilterMap map[string]filterConfig type FilterManager struct { sync.Mutex - ctx context.Context - cfg *Config - filters map[string]SubDetails // map of filters to apiSub details - onNewEnvelopes func(env *protocol.Envelope) error - logger *zap.Logger - node *filter.WakuFilterLightNode - onlineChecker *onlinechecker.DefaultOnlineChecker - filterQueue chan filterConfig + ctx context.Context + cfg *Config + onlineChecker *onlinechecker.DefaultOnlineChecker + filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details + onNewEnvelopes func(env *protocol.Envelope) error + logger *zap.Logger + node *filter.WakuFilterLightNode + filterSubBatchDuration time.Duration + incompleteFilterBatch map[string]filterConfig + filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter} + waitingToSubQueue chan filterConfig } + type SubDetails struct { cancel func() sub *api.Sub } -const filterQueueSize = 1000 - type filterConfig struct { ID string contentFilter protocol.ContentFilter @@ -55,62 +63,131 @@ func newFilterManager(ctx context.Context, logger *zap.Logger, cfg *Config, onNe mgr.logger = logger mgr.cfg = cfg mgr.onNewEnvelopes = onNewEnvelopes - mgr.filters = make(map[string]SubDetails) + mgr.filterSubscriptions = make(map[string]SubDetails) mgr.node = node - mgr.filterQueue = make(chan filterConfig, filterQueueSize) mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker) - mgr.node.SetOnlineChecker(mgr.onlineChecker) - + mgr.filterSubBatchDuration = 5 * time.Second + mgr.incompleteFilterBatch = make(map[string]filterConfig) + mgr.filterConfigs = make(appFilterMap) + mgr.waitingToSubQueue = make(chan filterConfig, 100) + go mgr.startFilterSubLoop() return mgr } +func (mgr *FilterManager) startFilterSubLoop() { + ticker := time.NewTicker(mgr.filterSubBatchDuration) + defer ticker.Stop() + for { + select { + case <-mgr.ctx.Done(): + return + case <-ticker.C: + // TODO: Optimization, handle case where 1st addFilter happens just before ticker expires. + if mgr.onlineChecker.IsOnline() { + mgr.Lock() + for _, af := range mgr.incompleteFilterBatch { + mgr.logger.Debug("ticker hit, hence subscribing", zap.String("agg-filter-id", af.ID), zap.Int("batch-size", len(af.contentFilter.ContentTopics)), + zap.Stringer("agg-content-filter", af.contentFilter)) + go mgr.subscribeAndRunLoop(af) + } + mgr.incompleteFilterBatch = make(map[string]filterConfig) + mgr.Unlock() + } + } + } +} + +/* +addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch +once batchlimit is hit, all filters are subscribed to and new batch is created. +if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created +*/ func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) { + mgr.logger.Debug("adding filter", zap.String("filter-id", filterID)) + mgr.Lock() defer mgr.Unlock() - contentFilter := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics) - mgr.logger.Debug("adding filter", zap.String("filter-id", filterID), zap.Stringer("content-filter", contentFilter)) - if mgr.onlineChecker.IsOnline() { - go mgr.subscribeAndRunLoop(filterConfig{filterID, contentFilter}) + afilter, ok := mgr.incompleteFilterBatch[f.PubsubTopic] + if !ok { + //no existing batch for pubsubTopic + mgr.logger.Debug("new pubsubTopic batch", zap.String("topic", f.PubsubTopic)) + cf := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics) + afilter = filterConfig{uuid.NewString(), cf} + mgr.incompleteFilterBatch[f.PubsubTopic] = afilter + mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf} } else { - mgr.logger.Debug("queuing filter as not online", zap.String("filter-id", filterID), zap.Stringer("content-filter", contentFilter)) - mgr.filterQueue <- filterConfig{filterID, contentFilter} + mgr.logger.Debug("existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic)) + if len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics) > filterSubBatchSize { + //filter batch limit is hit + if mgr.onlineChecker.IsOnline() { + //node is online, go ahead and subscribe the batch + mgr.logger.Debug("crossed pubsubTopic batchsize and online, subscribing to filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics))) + go mgr.subscribeAndRunLoop(afilter) + } else { + mgr.logger.Debug("crossed pubsubTopic batchsize and offline, queuing filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics))) + // queue existing batch as node is not online + mgr.waitingToSubQueue <- afilter + } + cf := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics) + afilter = filterConfig{uuid.NewString(), cf} + mgr.logger.Debug("creating a new pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic), zap.Stringer("content-filter", cf)) + mgr.incompleteFilterBatch[f.PubsubTopic] = afilter + mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf} + } else { + //add to existing batch as batch limit not reached + var contentTopics []string + for _, ct := range maps.Keys(f.ContentTopics) { + afilter.contentFilter.ContentTopics[ct.ContentTopic()] = struct{}{} + contentTopics = append(contentTopics, ct.ContentTopic()) + } + cf := protocol.NewContentFilter(f.PubsubTopic, contentTopics...) + mgr.logger.Debug("adding to existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.Stringer("content-filter", cf), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics))) + mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf} + } } } func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { ctx, cancel := context.WithCancel(mgr.ctx) config := api.FilterConfig{MaxPeers: mgr.cfg.MinPeersForFilter} - sub, err := api.Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger) mgr.Lock() - mgr.filters[f.ID] = SubDetails{cancel, sub} + mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub} mgr.Unlock() if err == nil { - mgr.logger.Debug("subscription successful, running loop", zap.String("filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter)) + mgr.logger.Debug("subscription successful, running loop", zap.String("agg-filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter)) mgr.runFilterSubscriptionLoop(sub) } else { - mgr.logger.Error("subscription fail, need to debug issue", zap.String("filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter), zap.Error(err)) + mgr.logger.Error("subscription fail, need to debug issue", zap.String("agg-filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter), zap.Error(err)) } } func (mgr *FilterManager) onConnectionStatusChange(pubsubTopic string, newStatus bool) { mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus), - zap.Int("filters count", len(mgr.filters)), zap.Int("filter-queue-len", len(mgr.filterQueue))) - //TODO: Needs optimization because only on transition from offline to online should trigger this logic. - if newStatus { //Online - if len(mgr.filterQueue) > 0 { - //Check if any filter subs are pending and subscribe them - for filter := range mgr.filterQueue { - mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", filter.ID), zap.Stringer("content-filter", filter.contentFilter)) - go mgr.subscribeAndRunLoop(filter) - if len(mgr.filterQueue) == 0 { - mgr.logger.Debug("filter queue empty") + zap.Int("agg filters count", len(mgr.filterSubscriptions))) + if newStatus && !mgr.onlineChecker.IsOnline() { //switched from offline to Online + mgr.logger.Debug("switching from offline to online") + mgr.Lock() + if len(mgr.waitingToSubQueue) > 0 { + for af := range mgr.waitingToSubQueue { + // TODO: change the below logic once topic specific health is implemented for lightClients + if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { + // Check if any filter subs are pending and subscribe them + mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) + go mgr.subscribeAndRunLoop(af) + } else { + // TODO: Can this cause issues? + mgr.waitingToSubQueue <- af + } + if len(mgr.waitingToSubQueue) == 0 { + mgr.logger.Debug("no pending subscriptions") break } } } + mgr.Unlock() } mgr.onlineChecker.SetOnline(newStatus) @@ -120,15 +197,24 @@ func (mgr *FilterManager) removeFilter(filterID string) { mgr.Lock() defer mgr.Unlock() mgr.logger.Debug("removing filter", zap.String("filter-id", filterID)) - - subDetails, ok := mgr.filters[filterID] - if ok { - delete(mgr.filters, filterID) - // close goroutine running runFilterSubscriptionLoop - // this will also close api.Sub - subDetails.cancel() - } else { + filterConfig, ok := mgr.filterConfigs[filterID] + if !ok { mgr.logger.Debug("filter removal: filter not found", zap.String("filter-id", filterID)) + return + } + af, ok := mgr.filterSubscriptions[filterConfig.ID] + if ok { + delete(mgr.filterConfigs, filterID) + for ct := range filterConfig.contentFilter.ContentTopics { + delete(af.sub.ContentFilter.ContentTopics, ct) + } + if len(af.sub.ContentFilter.ContentTopics) == 0 { + af.cancel() + } else { + go af.sub.Unsubscribe(filterConfig.contentFilter) + } + } else { + mgr.logger.Debug("filter removal: aggregated filter not found", zap.String("filter-id", filterID), zap.String("agg-filter-id", filterConfig.ID)) } } diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index 5cb1b2454..cd18b0fc8 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -343,7 +343,7 @@ func TestWakuV2Filter(t *testing.T) { w, err := New(nil, "", config, nil, nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, w.Start()) - + w.filterManager.filterSubBatchDuration = 1 * time.Second options := func(b *backoff.ExponentialBackOff) { b.MaxElapsedTime = 10 * time.Second } @@ -371,7 +371,7 @@ func TestWakuV2Filter(t *testing.T) { ContentTopics: common.NewTopicSetFromBytes([][]byte{contentTopicBytes}), } - _, err = w.Subscribe(filter) + fID, err := w.Subscribe(filter) require.NoError(t, err) msgTimestamp := w.timestamp() @@ -415,7 +415,8 @@ func TestWakuV2Filter(t *testing.T) { messages = filter.Retrieve() require.Len(t, messages, 1) - + err = w.Unsubscribe(context.Background(), fID) + require.NoError(t, err) require.NoError(t, w.Stop()) }