fix: use subscription peerIds instead of separate peer slice (#906)

This commit is contained in:
richΛrd 2023-11-20 09:27:22 -04:00 committed by GitHub
parent f0fbe62b8d
commit 49593fd61d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 11 deletions

View File

@ -482,15 +482,18 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
result := &WakuFilterPushResult{} result := &WakuFilterPushResult{}
for pTopic, cTopics := range pubSubTopicMap { for pTopic, cTopics := range pubSubTopicMap {
cFilter := protocol.NewContentFilter(pTopic, cTopics...) cFilter := protocol.NewContentFilter(pTopic, cTopics...)
peers, subs := wf.subscriptions.GetSubscription(params.selectedPeer, cFilter)
peers := make(map[peer.ID]struct{})
subs := wf.subscriptions.GetSubscription(params.selectedPeer, cFilter)
for _, sub := range subs { for _, sub := range subs {
sub.Remove(cTopics...) sub.Remove(cTopics...)
peers[sub.PeerID] = struct{}{}
} }
if params.wg != nil { if params.wg != nil {
params.wg.Add(len(peers)) params.wg.Add(len(peers))
} }
// send unsubscribe request to all the peers // send unsubscribe request to all the peers
for _, peerID := range peers { for peerID := range peers {
go func(peerID peer.ID) { go func(peerID peer.ID) {
defer func() { defer func() {
if params.wg != nil { if params.wg != nil {
@ -516,7 +519,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
} }
func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetails { func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetails {
_, subs := wf.subscriptions.GetSubscription("", protocol.ContentFilter{}) subs := wf.subscriptions.GetSubscription("", protocol.ContentFilter{})
return subs return subs
} }
@ -581,15 +584,17 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
return nil, err return nil, err
} }
peerIds, subs := wf.subscriptions.GetSubscription(params.selectedPeer, protocol.ContentFilter{}) peers := make(map[peer.ID]struct{})
subs := wf.subscriptions.GetSubscription(params.selectedPeer, protocol.ContentFilter{})
for _, sub := range subs { for _, sub := range subs {
sub.Close() sub.Close()
peers[sub.PeerID] = struct{}{}
} }
result := &WakuFilterPushResult{} result := &WakuFilterPushResult{}
if params.wg != nil { if params.wg != nil {
params.wg.Add(len(peerIds)) params.wg.Add(len(peers))
} }
for _, peerId := range peerIds { for peerId := range peers {
go func(peerID peer.ID) { go func(peerID peer.ID) {
defer func() { defer func() {
if params.wg != nil { if params.wg != nil {

View File

@ -193,16 +193,13 @@ func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, e
} }
} }
func (m *SubscriptionsMap) GetSubscription(peerID peer.ID, contentFilter protocol.ContentFilter) ([]peer.ID, []*SubscriptionDetails) { func (m *SubscriptionsMap) GetSubscription(peerID peer.ID, contentFilter protocol.ContentFilter) []*SubscriptionDetails {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
var output []*SubscriptionDetails var output []*SubscriptionDetails
var peerIDs []peer.ID
for _, peerSubs := range m.items { for _, peerSubs := range m.items {
if peerID == "" || peerSubs.PeerID == peerID { if peerID == "" || peerSubs.PeerID == peerID {
peerIDs = append(peerIDs, peerID)
for _, subs := range peerSubs.SubsPerPubsubTopic { for _, subs := range peerSubs.SubsPerPubsubTopic {
for _, subscriptionDetail := range subs { for _, subscriptionDetail := range subs {
if subscriptionDetail.isPartOf(contentFilter) { if subscriptionDetail.isPartOf(contentFilter) {
@ -212,5 +209,5 @@ func (m *SubscriptionsMap) GetSubscription(peerID peer.ID, contentFilter protoco
} }
} }
} }
return peerIDs, output return output
} }