From 1c4c3d02e8ee17ece828371776c7f087a9bbd677 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 28 Aug 2023 15:19:02 -0400 Subject: [PATCH] refactor(filter): unsubscribe waitgroup and async --- waku/v2/protocol/filter/client.go | 79 +++++++++++++++++++----------- waku/v2/protocol/filter/options.go | 19 +++++++ 2 files changed, 70 insertions(+), 28 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 6f59c655..28721b18 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -341,17 +341,33 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co return nil, err } - localWg := sync.WaitGroup{} resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) - var peersUnsubscribed []peer.ID for peerID := range wf.subscriptions.items { if params.selectedPeer != "" && peerID != params.selectedPeer { continue } - peersUnsubscribed = append(peersUnsubscribed, peerID) - localWg.Add(1) + + subscriptions, ok := wf.subscriptions.items[peerID] + if !ok || subscriptions == nil { + continue + } + + wf.cleanupSubscriptions(peerID, contentFilter) + if len(subscriptions.subscriptionsPerTopic) == 0 { + delete(wf.subscriptions.items, peerID) + } + + if params.wg != nil { + params.wg.Add(1) + } + go func(peerID peer.ID) { - defer localWg.Done() + defer func() { + if params.wg != nil { + params.wg.Done() + } + }() + err := wf.request( ctx, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, @@ -367,22 +383,19 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co } } - wf.cleanupSubscriptions(peerID, contentFilter) - - resultChan <- WakuFilterPushResult{ - Err: err, - PeerID: peerID, + if params.wg != nil { + resultChan <- WakuFilterPushResult{ + Err: err, + PeerID: peerID, + } } }(peerID) } - localWg.Wait() - close(resultChan) - for _, peerID := range peersUnsubscribed { - if len(wf.subscriptions.items[peerID].subscriptionsPerTopic) == 0 { - delete(wf.subscriptions.items, peerID) - } + if params.wg != nil { + params.wg.Wait() } + return resultChan, nil } @@ -408,19 +421,26 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte wf.subscriptions.Lock() defer wf.subscriptions.Unlock() - localWg := sync.WaitGroup{} resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) - var peersUnsubscribed []peer.ID for peerID := range wf.subscriptions.items { if params.selectedPeer != "" && peerID != params.selectedPeer { continue } - peersUnsubscribed = append(peersUnsubscribed, peerID) - localWg.Add(1) + delete(wf.subscriptions.items, peerID) + + if params.wg != nil { + params.wg.Add(1) + } + go func(peerID peer.ID) { - defer localWg.Done() + defer func() { + if params.wg != nil { + params.wg.Done() + } + }() + err := wf.request( ctx, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, @@ -429,17 +449,20 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte if err != nil { wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) } - resultChan <- WakuFilterPushResult{ - Err: err, - PeerID: peerID, + if params.wg != nil { + resultChan <- WakuFilterPushResult{ + Err: err, + PeerID: peerID, + } } }(peerID) } - localWg.Wait() - close(resultChan) - for _, peerID := range peersUnsubscribed { - delete(wf.subscriptions.items, peerID) + if params.wg != nil { + params.wg.Wait() } + + close(resultChan) + return resultChan, nil } diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 188638b9..b58043c7 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -2,6 +2,7 @@ package filter import ( "context" + "sync" "time" "github.com/libp2p/go-libp2p/core/host" @@ -26,6 +27,7 @@ type ( selectedPeer peer.ID requestID []byte log *zap.Logger + wg *sync.WaitGroup } FilterParameters struct { @@ -135,9 +137,26 @@ func AutomaticRequestId() FilterUnsubscribeOption { } } +// WithWaitGroup allos specigying a waitgroup to wait until all +// unsubscribe requests are complete before the function is complete +func WithWaitGroup(wg *sync.WaitGroup) FilterUnsubscribeOption { + return func(params *FilterUnsubscribeParameters) { + params.wg = wg + } +} + +// Async is used to fire and forget an unsubscription, and don't +// care about the results of it +func Async() FilterUnsubscribeOption { + return func(params *FilterUnsubscribeParameters) { + params.wg = nil + } +} + func DefaultUnsubscribeOptions() []FilterUnsubscribeOption { return []FilterUnsubscribeOption{ AutomaticRequestId(), + WithWaitGroup(&sync.WaitGroup{}), } }