diff --git a/waku/v2/api/filter.go b/waku/v2/api/filter.go index 2798d137..b9d64310 100644 --- a/waku/v2/api/filter.go +++ b/waku/v2/api/filter.go @@ -71,8 +71,12 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte return sub, nil } -func (apiSub *Sub) Unsubscribe() { - apiSub.cancel() +func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) { + _, err := apiSub.wf.Unsubscribe(apiSub.ctx, contentFilter) + //Not reading result unless we want to do specific error handling? + if err != nil { + apiSub.log.Debug("failed to unsubscribe", zap.Error(err), zap.Stringer("content-filter", contentFilter)) + } } func (apiSub *Sub) subscriptionLoop() { diff --git a/waku/v2/api/filter_test.go b/waku/v2/api/filter_test.go index 83551019..ff22fb75 100644 --- a/waku/v2/api/filter_test.go +++ b/waku/v2/api/filter_test.go @@ -46,9 +46,9 @@ func (s *FilterApiTestSuite) TestSubscribe() { s.Require().Equal(apiConfig.MaxPeers, 2) s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic) - + ctx, cancel := context.WithCancel(context.Background()) s.Log.Info("About to perform API Subscribe()") - apiSub, err := Subscribe(context.Background(), s.LightNode, contentFilter, apiConfig, s.Log) + apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log) s.Require().NoError(err) s.Require().Equal(apiSub.ContentFilter, contentFilter) s.Log.Info("Subscribed") @@ -89,7 +89,8 @@ func (s *FilterApiTestSuite) TestSubscribe() { s.Require().NotEqual(fullNodeData2.FullNodeHost.ID(), sub.PeerID) } - apiSub.Unsubscribe() + apiSub.Unsubscribe(contentFilter) + cancel() for range apiSub.DataCh { } s.Log.Info("DataCh is closed") diff --git a/waku/v2/protocol/subscription/subscription_details.go b/waku/v2/protocol/subscription/subscription_details.go index a36f54ad..b94f38ef 100644 --- a/waku/v2/protocol/subscription/subscription_details.go +++ b/waku/v2/protocol/subscription/subscription_details.go @@ -37,6 +37,8 @@ type SubscriptionDetails struct { } func (s *SubscriptionDetails) Add(contentTopics ...string) { + s.mapRef.Lock() + defer s.mapRef.Unlock() s.Lock() defer s.Unlock() @@ -44,14 +46,14 @@ func (s *SubscriptionDetails) Add(contentTopics ...string) { if _, ok := s.ContentFilter.ContentTopics[ct]; !ok { s.ContentFilter.ContentTopics[ct] = struct{}{} // Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair - s.mapRef.Lock() s.mapRef.increaseSubFor(s.ContentFilter.PubsubTopic, ct) - s.mapRef.Unlock() } } } func (s *SubscriptionDetails) Remove(contentTopics ...string) { + s.mapRef.Lock() + defer s.mapRef.Unlock() s.Lock() defer s.Unlock() @@ -59,15 +61,13 @@ func (s *SubscriptionDetails) Remove(contentTopics ...string) { if _, ok := s.ContentFilter.ContentTopics[ct]; ok { delete(s.ContentFilter.ContentTopics, ct) // Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair - s.mapRef.Lock() s.mapRef.decreaseSubFor(s.ContentFilter.PubsubTopic, ct) - s.mapRef.Unlock() } } if len(s.ContentFilter.ContentTopics) == 0 { // err doesn't matter - _ = s.mapRef.Delete(s) + _ = s.mapRef.DeleteNoLock(s) } } @@ -105,7 +105,9 @@ func (s *SubscriptionDetails) CloseC() { func (s *SubscriptionDetails) Close() error { s.CloseC() - return s.mapRef.Delete(s) + s.mapRef.Lock() + defer s.mapRef.Unlock() + return s.mapRef.DeleteNoLock(s) } func (s *SubscriptionDetails) SetClosing() { diff --git a/waku/v2/protocol/subscription/subscriptions_map.go b/waku/v2/protocol/subscription/subscriptions_map.go index 35993064..4692d953 100644 --- a/waku/v2/protocol/subscription/subscriptions_map.go +++ b/waku/v2/protocol/subscription/subscriptions_map.go @@ -130,9 +130,9 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, cf protocol.ContentFilter) bool return true } -func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error { - sub.Lock() - defer sub.Unlock() + +// Caller has to acquire lock before invoking this method.This is done to avoid possible deadlock +func (sub *SubscriptionsMap) DeleteNoLock(subscription *SubscriptionDetails) error { peerSubscription, ok := sub.items[subscription.PeerID] if !ok {