From 22398b286898f2b616c46ae1b5c6b0518aaddcfb Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Sat, 5 Aug 2023 08:00:04 +0530 Subject: [PATCH] fix:race in filter peer subscription map iteration and deletion --- waku/v2/protocol/filter/client.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 81ac8733..39b8d65a 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -317,10 +317,6 @@ func (wf *WakuFilterLightnode) cleanupSubscriptions(peerID peer.ID, contentFilte wf.subscriptions.items[peerID].subscriptionsPerTopic[contentFilter.Topic] = subscriptionDetailList } - if len(wf.subscriptions.items[peerID].subscriptionsPerTopic) == 0 { - delete(wf.subscriptions.items, peerID) - } - } // Unsubscribe is used to stop receiving messages from a peer that match a content filter @@ -344,12 +340,12 @@ func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter Co localWg := sync.WaitGroup{} resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) - + var peersUnsubscribed []peer.ID for peerID := range wf.subscriptions.items { if params.selectedPeer != "" && peerID != params.selectedPeer { continue } - + peersUnsubscribed = append(peersUnsubscribed, peerID) localWg.Add(1) go func(peerID peer.ID) { defer localWg.Done() @@ -379,7 +375,11 @@ func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter Co localWg.Wait() close(resultChan) - + for _, peerID := range peersUnsubscribed { + if len(wf.subscriptions.items[peerID].subscriptionsPerTopic) == 0 { + delete(wf.subscriptions.items, peerID) + } + } return resultChan, nil }