refactor(filter): unsubscribe waitgroup and async

This commit is contained in:
Richard Ramos 2023-08-28 15:19:02 -04:00
parent 6463dbeb70
commit 1c4c3d02e8
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
2 changed files with 70 additions and 28 deletions

View File

@ -341,17 +341,33 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
return nil, err
}
localWg := sync.WaitGroup{}
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items))
var peersUnsubscribed []peer.ID
for peerID := range wf.subscriptions.items {
if params.selectedPeer != "" && peerID != params.selectedPeer {
continue
}
peersUnsubscribed = append(peersUnsubscribed, peerID)
localWg.Add(1)
subscriptions, ok := wf.subscriptions.items[peerID]
if !ok || subscriptions == nil {
continue
}
wf.cleanupSubscriptions(peerID, contentFilter)
if len(subscriptions.subscriptionsPerTopic) == 0 {
delete(wf.subscriptions.items, peerID)
}
if params.wg != nil {
params.wg.Add(1)
}
go func(peerID peer.ID) {
defer localWg.Done()
defer func() {
if params.wg != nil {
params.wg.Done()
}
}()
err := wf.request(
ctx,
&FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID},
@ -367,22 +383,19 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
}
}
wf.cleanupSubscriptions(peerID, contentFilter)
resultChan <- WakuFilterPushResult{
Err: err,
PeerID: peerID,
if params.wg != nil {
resultChan <- WakuFilterPushResult{
Err: err,
PeerID: peerID,
}
}
}(peerID)
}
localWg.Wait()
close(resultChan)
for _, peerID := range peersUnsubscribed {
if len(wf.subscriptions.items[peerID].subscriptionsPerTopic) == 0 {
delete(wf.subscriptions.items, peerID)
}
if params.wg != nil {
params.wg.Wait()
}
return resultChan, nil
}
@ -408,19 +421,26 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte
wf.subscriptions.Lock()
defer wf.subscriptions.Unlock()
localWg := sync.WaitGroup{}
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items))
var peersUnsubscribed []peer.ID
for peerID := range wf.subscriptions.items {
if params.selectedPeer != "" && peerID != params.selectedPeer {
continue
}
peersUnsubscribed = append(peersUnsubscribed, peerID)
localWg.Add(1)
delete(wf.subscriptions.items, peerID)
if params.wg != nil {
params.wg.Add(1)
}
go func(peerID peer.ID) {
defer localWg.Done()
defer func() {
if params.wg != nil {
params.wg.Done()
}
}()
err := wf.request(
ctx,
&FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID},
@ -429,17 +449,20 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte
if err != nil {
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
}
resultChan <- WakuFilterPushResult{
Err: err,
PeerID: peerID,
if params.wg != nil {
resultChan <- WakuFilterPushResult{
Err: err,
PeerID: peerID,
}
}
}(peerID)
}
localWg.Wait()
close(resultChan)
for _, peerID := range peersUnsubscribed {
delete(wf.subscriptions.items, peerID)
if params.wg != nil {
params.wg.Wait()
}
close(resultChan)
return resultChan, nil
}

View File

@ -2,6 +2,7 @@ package filter
import (
"context"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
@ -26,6 +27,7 @@ type (
selectedPeer peer.ID
requestID []byte
log *zap.Logger
wg *sync.WaitGroup
}
FilterParameters struct {
@ -135,9 +137,26 @@ func AutomaticRequestId() FilterUnsubscribeOption {
}
}
// WithWaitGroup allos specigying a waitgroup to wait until all
// unsubscribe requests are complete before the function is complete
func WithWaitGroup(wg *sync.WaitGroup) FilterUnsubscribeOption {
return func(params *FilterUnsubscribeParameters) {
params.wg = wg
}
}
// Async is used to fire and forget an unsubscription, and don't
// care about the results of it
func Async() FilterUnsubscribeOption {
return func(params *FilterUnsubscribeParameters) {
params.wg = nil
}
}
func DefaultUnsubscribeOptions() []FilterUnsubscribeOption {
return []FilterUnsubscribeOption{
AutomaticRequestId(),
WithWaitGroup(&sync.WaitGroup{}),
}
}