fix:race in filter peer subscription map iteration and deletion

This commit is contained in:
Prem Chaitanya Prathi 2023-08-05 08:00:04 +05:30
parent 87a7501166
commit 22398b2868
No known key found for this signature in database
1 changed files with 7 additions and 7 deletions

View File

@ -317,10 +317,6 @@ func (wf *WakuFilterLightnode) cleanupSubscriptions(peerID peer.ID, contentFilte
wf.subscriptions.items[peerID].subscriptionsPerTopic[contentFilter.Topic] = subscriptionDetailList
}
if len(wf.subscriptions.items[peerID].subscriptionsPerTopic) == 0 {
delete(wf.subscriptions.items, peerID)
}
}
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
@ -344,12 +340,12 @@ func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter Co
localWg := sync.WaitGroup{}
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items))
var peersUnsubscribed []peer.ID
for peerID := range wf.subscriptions.items {
if params.selectedPeer != "" && peerID != params.selectedPeer {
continue
}
peersUnsubscribed = append(peersUnsubscribed, peerID)
localWg.Add(1)
go func(peerID peer.ID) {
defer localWg.Done()
@ -379,7 +375,11 @@ func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter Co
localWg.Wait()
close(resultChan)
for _, peerID := range peersUnsubscribed {
if len(wf.subscriptions.items[peerID].subscriptionsPerTopic) == 0 {
delete(wf.subscriptions.items, peerID)
}
}
return resultChan, nil
}