From 49593fd61df357bee8418223b088f4cee8f2163b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Mon, 20 Nov 2023 09:27:22 -0400 Subject: [PATCH] fix: use subscription peerIds instead of separate peer slice (#906) --- waku/v2/protocol/filter/client.go | 17 +++++++++++------ .../protocol/subscription/subscriptions_map.go | 7 ++----- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 20815a65..08f06b71 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -482,15 +482,18 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr result := &WakuFilterPushResult{} for pTopic, cTopics := range pubSubTopicMap { cFilter := protocol.NewContentFilter(pTopic, cTopics...) - peers, subs := wf.subscriptions.GetSubscription(params.selectedPeer, cFilter) + + peers := make(map[peer.ID]struct{}) + subs := wf.subscriptions.GetSubscription(params.selectedPeer, cFilter) for _, sub := range subs { sub.Remove(cTopics...) + peers[sub.PeerID] = struct{}{} } if params.wg != nil { params.wg.Add(len(peers)) } // send unsubscribe request to all the peers - for _, peerID := range peers { + for peerID := range peers { go func(peerID peer.ID) { defer func() { if params.wg != nil { @@ -516,7 +519,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr } func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetails { - _, subs := wf.subscriptions.GetSubscription("", protocol.ContentFilter{}) + subs := wf.subscriptions.GetSubscription("", protocol.ContentFilter{}) return subs } @@ -581,15 +584,17 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte return nil, err } - peerIds, subs := wf.subscriptions.GetSubscription(params.selectedPeer, protocol.ContentFilter{}) + peers := make(map[peer.ID]struct{}) + subs := wf.subscriptions.GetSubscription(params.selectedPeer, protocol.ContentFilter{}) for _, sub := range subs { sub.Close() + peers[sub.PeerID] = struct{}{} } result := &WakuFilterPushResult{} if params.wg != nil { - params.wg.Add(len(peerIds)) + params.wg.Add(len(peers)) } - for _, peerId := range peerIds { + for peerId := range peers { go func(peerID peer.ID) { defer func() { if params.wg != nil { diff --git a/waku/v2/protocol/subscription/subscriptions_map.go b/waku/v2/protocol/subscription/subscriptions_map.go index d69c6d2f..c007f623 100644 --- a/waku/v2/protocol/subscription/subscriptions_map.go +++ b/waku/v2/protocol/subscription/subscriptions_map.go @@ -193,16 +193,13 @@ func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, e } } -func (m *SubscriptionsMap) GetSubscription(peerID peer.ID, contentFilter protocol.ContentFilter) ([]peer.ID, []*SubscriptionDetails) { +func (m *SubscriptionsMap) GetSubscription(peerID peer.ID, contentFilter protocol.ContentFilter) []*SubscriptionDetails { m.RLock() defer m.RUnlock() var output []*SubscriptionDetails - - var peerIDs []peer.ID for _, peerSubs := range m.items { if peerID == "" || peerSubs.PeerID == peerID { - peerIDs = append(peerIDs, peerID) for _, subs := range peerSubs.SubsPerPubsubTopic { for _, subscriptionDetail := range subs { if subscriptionDetail.isPartOf(contentFilter) { @@ -212,5 +209,5 @@ func (m *SubscriptionsMap) GetSubscription(peerID peer.ID, contentFilter protoco } } } - return peerIDs, output + return output }