From a0bc53c67956d86a66d1f38b643535dc63adff42 Mon Sep 17 00:00:00 2001 From: harsh jain Date: Sat, 4 Nov 2023 14:16:24 +0700 Subject: [PATCH] fix(subscription-map): uniform operations and encapsulation (#853) * fix(subscription-map): uniform operations and encapsulation * nit: fixes based on comments --- cmd/waku/server/rest/filter.go | 11 +- library/filter.go | 20 +- waku/v2/protocol/content_filter.go | 6 +- waku/v2/protocol/filter/client.go | 172 +++++++----------- waku/v2/protocol/filter/filter_test.go | 10 +- .../subscription/subscription_details.go | 122 +++++++++++++ .../subscription/subscriptions_map.go | 160 +++++++--------- 7 files changed, 273 insertions(+), 228 deletions(-) create mode 100644 waku/v2/protocol/subscription/subscription_details.go diff --git a/cmd/waku/server/rest/filter.go b/cmd/waku/server/rest/filter.go index 61cea4c9..2b2c9d31 100644 --- a/cmd/waku/server/rest/filter.go +++ b/cmd/waku/server/rest/filter.go @@ -171,7 +171,7 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) { } // unsubscribe on filter - errCh, err := s.node.FilterLightnode().Unsubscribe( + result, err := s.node.FilterLightnode().Unsubscribe( req.Context(), protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...), filter.WithRequestID(message.RequestId), @@ -190,14 +190,17 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) { // on success writeResponse(w, filterSubscriptionResponse{ RequestId: message.RequestId, - StatusDesc: s.unsubscribeGetMessage(errCh), + StatusDesc: s.unsubscribeGetMessage(result), }, http.StatusOK) } -func (s *FilterService) unsubscribeGetMessage(ch <-chan filter.WakuFilterPushResult) string { +func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResult) string { + if result == nil { + return http.StatusText(http.StatusOK) + } var peerIds string ind := 0 - for entry := range ch { + for _, entry := range result.Errors() { s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err)) if ind != 0 { peerIds += ", " diff --git a/library/filter.go b/library/filter.go index 68e4d61c..f66d2d98 100644 --- a/library/filter.go +++ b/library/filter.go @@ -150,14 +150,16 @@ func FilterUnsubscribe(filterJSON string, peerID string, ms int) error { return errors.New("peerID is required") } - pushResult, err := wakuState.node.FilterLightnode().Unsubscribe(ctx, cf, fOptions...) + result, err := wakuState.node.FilterLightnode().Unsubscribe(ctx, cf, fOptions...) if err != nil { return err } - result := <-pushResult - - return result.Err + errs := result.Errors() + if len(errs) == 0 { + return nil + } + return errs[0].Err } type unsubscribeAllResult struct { @@ -192,19 +194,19 @@ func FilterUnsubscribeAll(peerID string, ms int) (string, error) { fOptions = append(fOptions, filter.UnsubscribeAll()) } - pushResult, err := wakuState.node.FilterLightnode().UnsubscribeAll(ctx, fOptions...) + result, err := wakuState.node.FilterLightnode().UnsubscribeAll(ctx, fOptions...) if err != nil { return "", err } var unsubscribeResult []unsubscribeAllResult - for result := range pushResult { + for _, err := range result.Errors() { ur := unsubscribeAllResult{ - PeerID: result.PeerID.Pretty(), + PeerID: err.PeerID.Pretty(), } - if result.Err != nil { - ur.Error = result.Err.Error() + if err.Err != nil { + ur.Error = err.Err.Error() } unsubscribeResult = append(unsubscribeResult, ur) } diff --git a/waku/v2/protocol/content_filter.go b/waku/v2/protocol/content_filter.go index 795e3ff6..7ea7f2a5 100644 --- a/waku/v2/protocol/content_filter.go +++ b/waku/v2/protocol/content_filter.go @@ -15,6 +15,10 @@ func NewContentTopicSet(contentTopics ...string) ContentTopicSet { return s } +func (cf ContentTopicSet) ToList() []string { + return maps.Keys(cf) +} + // ContentFilter is used to specify the filter to be applied for a FilterNode. // Topic means pubSubTopic - optional in case of using contentTopics that following Auto sharding, mandatory in case of named or static sharding. // ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding) @@ -25,7 +29,7 @@ type ContentFilter struct { } func (cf ContentFilter) ContentTopicsList() []string { - return maps.Keys(cf.ContentTopics) + return cf.ContentTopics.ToList() } func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter { diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index ca4fcf51..a014d095 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -8,6 +8,7 @@ import ( "math" "net/http" "strings" + "sync" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -46,11 +47,27 @@ type WakuFilterLightNode struct { pm *peermanager.PeerManager } -type WakuFilterPushResult struct { +type WakuFilterPushError struct { Err error PeerID peer.ID } +type WakuFilterPushResult struct { + errs []WakuFilterPushError + sync.RWMutex +} + +func (arr *WakuFilterPushResult) Add(err WakuFilterPushError) { + arr.Lock() + defer arr.Unlock() + arr.errs = append(arr.errs, err) +} +func (arr *WakuFilterPushResult) Errors() []WakuFilterPushError { + arr.RLock() + defer arr.RUnlock() + return arr.errs +} + // NewWakuFilterLightnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options // Note that broadcaster is optional. // Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode. @@ -95,7 +112,7 @@ func (wf *WakuFilterLightNode) Stop() { wf.log.Warn("unsubscribing from full nodes", zap.Error(err)) } - for r := range res { + for _, r := range res.Errors() { if r.Err != nil { wf.log.Warn("unsubscribing from full nodes", zap.Error(r.Err), logging.HostID("peerID", r.PeerID)) } @@ -395,59 +412,8 @@ func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscrip return wf.Ping(ctx, subscription.PeerID) } -func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetails { - wf.RLock() - defer wf.RUnlock() - if err := wf.ErrOnNotRunning(); err != nil { - return nil - } - - wf.subscriptions.RLock() - defer wf.subscriptions.RUnlock() - - var output []*subscription.SubscriptionDetails - - for _, peerSubscription := range wf.subscriptions.Items { - for _, subscriptions := range peerSubscription.SubsPerPubsubTopic { - for _, subscriptionDetail := range subscriptions { - output = append(output, subscriptionDetail) - } - } - } - - return output -} - -func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilter protocol.ContentFilter) { - wf.subscriptions.Lock() - defer wf.subscriptions.Unlock() - - peerSubscription, ok := wf.subscriptions.Items[peerID] - if !ok { - return - } - - subscriptionDetailList, ok := peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic] - if !ok { - return - } - - for subscriptionDetailID, subscriptionDetail := range subscriptionDetailList { - subscriptionDetail.Remove(contentFilter.ContentTopicsList()...) - if len(subscriptionDetail.ContentFilter.ContentTopics) == 0 { - delete(subscriptionDetailList, subscriptionDetailID) - subscriptionDetail.CloseC() - } - } - - if len(subscriptionDetailList) == 0 { - delete(wf.subscriptions.Items[peerID].SubsPerPubsubTopic, contentFilter.PubsubTopic) - } - -} - // Unsubscribe is used to stop receiving messages from a peer that match a content filter -func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -475,28 +441,18 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr if err != nil { return nil, err } - resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.Items)) + result := &WakuFilterPushResult{} for pTopic, cTopics := range pubSubTopicMap { cFilter := protocol.NewContentFilter(pTopic, cTopics...) - for peerID := range wf.subscriptions.Items { - if params.selectedPeer != "" && peerID != params.selectedPeer { - continue - } - - subscriptions, ok := wf.subscriptions.Items[peerID] - if !ok || subscriptions == nil { - continue - } - - wf.cleanupSubscriptions(peerID, cFilter) - if len(subscriptions.SubsPerPubsubTopic) == 0 { - delete(wf.subscriptions.Items, peerID) - } - - if params.wg != nil { - params.wg.Add(1) - } - + peers, subs := wf.subscriptions.GetSubscription(params.selectedPeer, cFilter) + for _, sub := range subs { + sub.Remove(cTopics...) + } + if params.wg != nil { + params.wg.Add(len(peers)) + } + // send unsubscribe request to all the peers + for _, peerID := range peers { go func(peerID peer.ID) { defer func() { if params.wg != nil { @@ -506,10 +462,10 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr err := wf.unsubscribeFromServer(ctx, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, cFilter) if params.wg != nil { - resultChan <- WakuFilterPushResult{ + result.Add(WakuFilterPushError{ Err: err, PeerID: peerID, - } + }) } }(peerID) } @@ -518,16 +474,19 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr params.wg.Wait() } - close(resultChan) + return result, nil +} - return resultChan, nil +func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetails { + _, subs := wf.subscriptions.GetSubscription("", protocol.ContentFilter{}) + return subs } // UnsubscribeWithSubscription is used to close a particular subscription // If there are no more subscriptions matching the passed [peer, contentFilter] pair, // server unsubscribe is also performed func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *subscription.SubscriptionDetails, - opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) { + opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -542,20 +501,18 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, // Close this sub sub.Close() - resultChan := make(chan WakuFilterPushResult, 1) + result := &WakuFilterPushResult{} if !wf.subscriptions.Has(sub.PeerID, sub.ContentFilter) { // Last sub for this [peer, contentFilter] pair - paramsCopy := params.Copy() - paramsCopy.selectedPeer = sub.PeerID - err = wf.unsubscribeFromServer(ctx, paramsCopy, sub.ContentFilter) - resultChan <- WakuFilterPushResult{ + params.selectedPeer = sub.PeerID + err = wf.unsubscribeFromServer(ctx, params, sub.ContentFilter) + result.Add(WakuFilterPushError{ Err: err, PeerID: sub.PeerID, - } + }) } - close(resultChan) - return resultChan, err + return result, err } @@ -573,28 +530,23 @@ func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params return err } -func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) { +// close all subscribe for selectedPeer or if selectedPeer == "", then all peers +// send the unsubscribeAll request to the peers +func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) { params, err := wf.getUnsubscribeParameters(opts...) if err != nil { return nil, err } - wf.subscriptions.Lock() - defer wf.subscriptions.Unlock() - - resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.Items)) - - for peerID := range wf.subscriptions.Items { - if params.selectedPeer != "" && peerID != params.selectedPeer { - continue - } - - delete(wf.subscriptions.Items, peerID) - - if params.wg != nil { - params.wg.Add(1) - } - + peerIds, subs := wf.subscriptions.GetSubscription(params.selectedPeer, protocol.ContentFilter{}) + for _, sub := range subs { + sub.Close() + } + result := &WakuFilterPushResult{} + if params.wg != nil { + params.wg.Add(len(peerIds)) + } + for _, peerId := range peerIds { go func(peerID peer.ID) { defer func() { if params.wg != nil { @@ -613,25 +565,23 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) } if params.wg != nil { - resultChan <- WakuFilterPushResult{ + result.Add(WakuFilterPushError{ Err: err, PeerID: peerID, - } + }) } - }(peerID) + }(peerId) } if params.wg != nil { params.wg.Wait() } - close(resultChan) - - return resultChan, nil + return result, nil } // UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions -func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index c93db55b..334e9f8b 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -177,7 +177,10 @@ func (s *FilterTestSuite) waitForMessages(fn func(), subs []*subscription.Subscr s.log.Info("Looking at ", zap.String("pubSubTopic", sub.ContentFilter.PubsubTopic)) for i := 0; i < msgCount; i++ { select { - case env := <-sub.C: + case env, ok := <-sub.C: + if !ok { + continue + } received := WakuMsg{ pubSubTopic: env.PubsubTopic(), contentTopic: env.Message().GetContentTopic(), @@ -422,10 +425,9 @@ func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) s.Require().NoError(err) - ch, err := s.lightNode.Unsubscribe(s.ctx, contentFilter, DontWait()) - _, open := <-ch + result, err := s.lightNode.Unsubscribe(s.ctx, contentFilter, DontWait()) s.Require().NoError(err) - s.Require().False(open) + s.Require().Equal(0, len(result.Errors())) _, err = s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) s.Require().NoError(err) diff --git a/waku/v2/protocol/subscription/subscription_details.go b/waku/v2/protocol/subscription/subscription_details.go new file mode 100644 index 00000000..936aeecd --- /dev/null +++ b/waku/v2/protocol/subscription/subscription_details.go @@ -0,0 +1,122 @@ +package subscription + +import ( + "encoding/json" + "sync" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol" +) + +// Map of SubscriptionDetails.ID to subscriptions +type SubscriptionSet map[string]*SubscriptionDetails + +type PeerSubscription struct { + PeerID peer.ID + SubsPerPubsubTopic map[string]SubscriptionSet +} + +type PeerContentFilter struct { + PeerID peer.ID `json:"peerID"` + PubsubTopic string `json:"pubsubTopics"` + ContentTopics []string `json:"contentTopics"` +} + +type SubscriptionDetails struct { + sync.RWMutex + + ID string `json:"subscriptionID"` + mapRef *SubscriptionsMap + Closed bool `json:"-"` + once sync.Once + + PeerID peer.ID `json:"peerID"` + ContentFilter protocol.ContentFilter `json:"contentFilters"` + C chan *protocol.Envelope `json:"-"` +} + +func (s *SubscriptionDetails) Add(contentTopics ...string) { + s.Lock() + defer s.Unlock() + + for _, ct := range contentTopics { + if _, ok := s.ContentFilter.ContentTopics[ct]; !ok { + s.ContentFilter.ContentTopics[ct] = struct{}{} + // Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair + s.mapRef.Lock() + s.mapRef.increaseSubFor(s.ContentFilter.PubsubTopic, ct) + s.mapRef.Unlock() + } + } +} + +func (s *SubscriptionDetails) Remove(contentTopics ...string) { + s.Lock() + defer s.Unlock() + + for _, ct := range contentTopics { + if _, ok := s.ContentFilter.ContentTopics[ct]; ok { + delete(s.ContentFilter.ContentTopics, ct) + // Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair + s.mapRef.Lock() + s.mapRef.decreaseSubFor(s.ContentFilter.PubsubTopic, ct) + s.mapRef.Unlock() + } + } + + if len(s.ContentFilter.ContentTopics) == 0 { + // err doesn't matter + _ = s.mapRef.Delete(s) + } +} + +// C1 if contentFilter is empty, it means that given subscription is part of contentFilter +// C2 if not empty, check matching pubsubsTopic and atleast 1 contentTopic +func (s *SubscriptionDetails) isPartOf(contentFilter protocol.ContentFilter) bool { + s.RLock() + defer s.RUnlock() + if contentFilter.PubsubTopic != "" && // C1 + s.ContentFilter.PubsubTopic != contentFilter.PubsubTopic { // C2 + return false + } + // C1 + if len(contentFilter.ContentTopics) == 0 { + return true + } + // C2 + for cTopic := range contentFilter.ContentTopics { + if _, ok := s.ContentFilter.ContentTopics[cTopic]; ok { + return true + } + } + return false +} + +func (s *SubscriptionDetails) CloseC() { + s.once.Do(func() { + s.Lock() + defer s.Unlock() + + s.Closed = true + close(s.C) + }) +} + +func (s *SubscriptionDetails) Close() error { + s.CloseC() + return s.mapRef.Delete(s) +} + +func (s *SubscriptionDetails) MarshalJSON() ([]byte, error) { + result := struct { + PeerID peer.ID `json:"peerID"` + PubsubTopic string `json:"pubsubTopics"` + ContentTopics []string `json:"contentTopics"` + }{ + PeerID: s.PeerID, + PubsubTopic: s.ContentFilter.PubsubTopic, + ContentTopics: s.ContentFilter.ContentTopics.ToList(), + } + + return json.Marshal(result) +} diff --git a/waku/v2/protocol/subscription/subscriptions_map.go b/waku/v2/protocol/subscription/subscriptions_map.go index 92f57e2d..d69c6d2f 100644 --- a/waku/v2/protocol/subscription/subscriptions_map.go +++ b/waku/v2/protocol/subscription/subscriptions_map.go @@ -1,7 +1,6 @@ package subscription import ( - "encoding/json" "errors" "sync" @@ -12,53 +11,51 @@ import ( "golang.org/x/exp/maps" ) -type SubscriptionDetails struct { - sync.RWMutex - - ID string `json:"subscriptionID"` - mapRef *SubscriptionsMap - Closed bool `json:"-"` - once sync.Once - - PeerID peer.ID `json:"peerID"` - ContentFilter protocol.ContentFilter `json:"contentFilters"` - C chan *protocol.Envelope `json:"-"` -} - -// Map of SubscriptionDetails.ID to subscriptions -type SubscriptionSet map[string]*SubscriptionDetails - -type PeerSubscription struct { - PeerID peer.ID - SubsPerPubsubTopic map[string]SubscriptionSet -} - type SubscriptionsMap struct { sync.RWMutex - logger *zap.Logger - Items map[peer.ID]*PeerSubscription + logger *zap.Logger + items map[peer.ID]*PeerSubscription + noOfSubs map[string]map[string]int } var ErrNotFound = errors.New("not found") func NewSubscriptionMap(logger *zap.Logger) *SubscriptionsMap { return &SubscriptionsMap{ - logger: logger.Named("subscription-map"), - Items: make(map[peer.ID]*PeerSubscription), + logger: logger.Named("subscription-map"), + items: make(map[peer.ID]*PeerSubscription), + noOfSubs: map[string]map[string]int{}, } } +func (m *SubscriptionsMap) IsListening(pubsubTopic, contentTopic string) bool { + m.RLock() + defer m.RUnlock() + return m.noOfSubs[pubsubTopic] != nil && m.noOfSubs[pubsubTopic][contentTopic] > 0 +} + +func (m *SubscriptionsMap) increaseSubFor(pubsubTopic, contentTopic string) { + if m.noOfSubs[pubsubTopic] == nil { + m.noOfSubs[pubsubTopic] = map[string]int{} + } + m.noOfSubs[pubsubTopic][contentTopic] = m.noOfSubs[pubsubTopic][contentTopic] + 1 +} + +func (m *SubscriptionsMap) decreaseSubFor(pubsubTopic, contentTopic string) { + m.noOfSubs[pubsubTopic][contentTopic] = m.noOfSubs[pubsubTopic][contentTopic] - 1 +} + func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.ContentFilter) *SubscriptionDetails { sub.Lock() defer sub.Unlock() - peerSubscription, ok := sub.Items[peerID] + peerSubscription, ok := sub.items[peerID] if !ok { peerSubscription = &PeerSubscription{ PeerID: peerID, SubsPerPubsubTopic: make(map[string]SubscriptionSet), } - sub.Items[peerID] = peerSubscription + sub.items[peerID] = peerSubscription } _, ok = peerSubscription.SubsPerPubsubTopic[cf.PubsubTopic] @@ -74,7 +71,12 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.Content ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)}, } - sub.Items[peerID].SubsPerPubsubTopic[cf.PubsubTopic][details.ID] = details + // Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair + for contentTopic := range cf.ContentTopics { + sub.increaseSubFor(cf.PubsubTopic, contentTopic) + } + + sub.items[peerID].SubsPerPubsubTopic[cf.PubsubTopic][details.ID] = details return details } @@ -83,7 +85,7 @@ func (sub *SubscriptionsMap) IsSubscribedTo(peerID peer.ID) bool { sub.RLock() defer sub.RUnlock() - _, ok := sub.Items[peerID] + _, ok := sub.items[peerID] return ok } @@ -93,7 +95,7 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, cf protocol.ContentFilter) bool defer sub.RUnlock() // Check if peer exits - peerSubscription, ok := sub.Items[peerID] + peerSubscription, ok := sub.items[peerID] if !ok { return false } @@ -125,67 +127,24 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error { sub.Lock() defer sub.Unlock() - peerSubscription, ok := sub.Items[subscription.PeerID] + peerSubscription, ok := sub.items[subscription.PeerID] if !ok { return ErrNotFound } - delete(peerSubscription.SubsPerPubsubTopic[subscription.ContentFilter.PubsubTopic], subscription.ID) + contentFilter := subscription.ContentFilter + delete(peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic], subscription.ID) + + // Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair + for contentTopic := range contentFilter.ContentTopics { + sub.decreaseSubFor(contentFilter.PubsubTopic, contentTopic) + } return nil } -func (s *SubscriptionDetails) Add(contentTopics ...string) { - s.Lock() - defer s.Unlock() - - for _, ct := range contentTopics { - s.ContentFilter.ContentTopics[ct] = struct{}{} - } -} - -func (s *SubscriptionDetails) Remove(contentTopics ...string) { - s.Lock() - defer s.Unlock() - - for _, ct := range contentTopics { - delete(s.ContentFilter.ContentTopics, ct) - } -} - -func (s *SubscriptionDetails) CloseC() { - s.once.Do(func() { - s.Lock() - defer s.Unlock() - - s.Closed = true - close(s.C) - }) -} - -func (s *SubscriptionDetails) Close() error { - s.CloseC() - return s.mapRef.Delete(s) -} - -func (s *SubscriptionDetails) Clone() *SubscriptionDetails { - s.RLock() - defer s.RUnlock() - - result := &SubscriptionDetails{ - ID: uuid.NewString(), - mapRef: s.mapRef, - Closed: false, - PeerID: s.PeerID, - ContentFilter: protocol.ContentFilter{PubsubTopic: s.ContentFilter.PubsubTopic, ContentTopics: maps.Clone(s.ContentFilter.ContentTopics)}, - C: make(chan *protocol.Envelope), - } - - return result -} - func (sub *SubscriptionsMap) clear() { - for _, peerSubscription := range sub.Items { + for _, peerSubscription := range sub.items { for _, subscriptionSet := range peerSubscription.SubsPerPubsubTopic { for _, subscription := range subscriptionSet { subscription.CloseC() @@ -193,7 +152,7 @@ func (sub *SubscriptionsMap) clear() { } } - sub.Items = make(map[peer.ID]*PeerSubscription) + sub.items = make(map[peer.ID]*PeerSubscription) } func (sub *SubscriptionsMap) Clear() { @@ -206,7 +165,7 @@ func (sub *SubscriptionsMap) Notify(peerID peer.ID, envelope *protocol.Envelope) sub.RLock() defer sub.RUnlock() - subscriptions, ok := sub.Items[peerID].SubsPerPubsubTopic[envelope.PubsubTopic()] + subscriptions, ok := sub.items[peerID].SubsPerPubsubTopic[envelope.PubsubTopic()] if ok { iterateSubscriptionSet(sub.logger, subscriptions, envelope) } @@ -234,21 +193,24 @@ func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, e } } -func (s *SubscriptionDetails) MarshalJSON() ([]byte, error) { - type resultType struct { - PeerID string `json:"peerID"` - PubsubTopic string `json:"pubsubTopic"` - ContentTopics []string `json:"contentTopics"` - } +func (m *SubscriptionsMap) GetSubscription(peerID peer.ID, contentFilter protocol.ContentFilter) ([]peer.ID, []*SubscriptionDetails) { + m.RLock() + defer m.RUnlock() - result := resultType{ - PeerID: s.PeerID.Pretty(), - PubsubTopic: s.ContentFilter.PubsubTopic, - } + var output []*SubscriptionDetails - for c := range s.ContentFilter.ContentTopics { - result.ContentTopics = append(result.ContentTopics, c) + var peerIDs []peer.ID + for _, peerSubs := range m.items { + if peerID == "" || peerSubs.PeerID == peerID { + peerIDs = append(peerIDs, peerID) + for _, subs := range peerSubs.SubsPerPubsubTopic { + for _, subscriptionDetail := range subs { + if subscriptionDetail.isPartOf(contentFilter) { + output = append(output, subscriptionDetail) + } + } + } + } } - - return json.Marshal(result) + return peerIDs, output }