diff --git a/waku/v2/api/filter.go b/waku/v2/api/filter.go index 7281b915..09451aef 100644 --- a/waku/v2/api/filter.go +++ b/waku/v2/api/filter.go @@ -75,6 +75,7 @@ func (apiSub *Sub) waitOnSubClose() { case subId := <-apiSub.closing: //trigger closing and resubscribe flow for subscription. apiSub.closeAndResubscribe(subId) + } } } @@ -89,13 +90,9 @@ func (apiSub *Sub) closeAndResubscribe(subId string) { } func (apiSub *Sub) cleanup() { - apiSub.log.Debug("ENTER cleanup()") - defer func() { - apiSub.log.Debug("EXIT cleanup()") - }() + apiSub.log.Debug("Cleaning up subscription", zap.Stringer("config", apiSub.Config)) for _, s := range apiSub.subs { - close(s.Closing) _, err := apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s) if err != nil { //Logging with info as this is part of cleanup @@ -168,10 +165,14 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) { } }(subDetails) go func(subDetails *subscription.SubscriptionDetails) { - <-subDetails.Closing - apiSub.log.Debug("sub closing", zap.String("subID", subDetails.ID)) + select { + case <-apiSub.ctx.Done(): + return + case <-subDetails.Closing: + apiSub.log.Debug("sub closing", zap.String("subID", subDetails.ID)) - apiSub.closing <- subDetails.ID + apiSub.closing <- subDetails.ID + } }(subDetails) } } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index edbba8d3..3186ac49 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -88,7 +88,7 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM wf.pm = pm wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) - wf.peerPingInterval = 5 * time.Second + wf.peerPingInterval = 1 * time.Minute return wf } diff --git a/waku/v2/protocol/filter/filter_health_check.go b/waku/v2/protocol/filter/filter_health_check.go index 11b9a720..836175b5 100644 --- a/waku/v2/protocol/filter/filter_health_check.go +++ b/waku/v2/protocol/filter/filter_health_check.go @@ -26,9 +26,8 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) { subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer) for _, subscription := range subscriptions { wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID)) - //Indicating that subscription is closing, - close(subscription.Closing) + subscription.SetClosing() } } } diff --git a/waku/v2/protocol/subscription/subscription_details.go b/waku/v2/protocol/subscription/subscription_details.go index f2ec8870..4ce928db 100644 --- a/waku/v2/protocol/subscription/subscription_details.go +++ b/waku/v2/protocol/subscription/subscription_details.go @@ -29,7 +29,7 @@ type SubscriptionDetails struct { mapRef *SubscriptionsMap Closed bool `json:"-"` once sync.Once - Closing chan struct{} + Closing chan bool PeerID peer.ID `json:"peerID"` ContentFilter protocol.ContentFilter `json:"contentFilters"` @@ -99,6 +99,7 @@ func (s *SubscriptionDetails) CloseC() { defer s.Unlock() s.Closed = true close(s.C) + close(s.Closing) }) } @@ -107,6 +108,14 @@ func (s *SubscriptionDetails) Close() error { return s.mapRef.Delete(s) } +func (s *SubscriptionDetails) SetClosing() { + s.Lock() + defer s.Unlock() + if !s.Closed { + s.Closing <- true + } +} + func (s *SubscriptionDetails) MarshalJSON() ([]byte, error) { result := struct { PeerID peer.ID `json:"peerID"` diff --git a/waku/v2/protocol/subscription/subscriptions_map.go b/waku/v2/protocol/subscription/subscriptions_map.go index c308d9bb..14b3680c 100644 --- a/waku/v2/protocol/subscription/subscriptions_map.go +++ b/waku/v2/protocol/subscription/subscriptions_map.go @@ -75,7 +75,7 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.Content PeerID: peerID, C: make(chan *protocol.Envelope, 1024), ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)}, - Closing: make(chan struct{}), + Closing: make(chan bool), } // Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair @@ -147,6 +147,11 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error { sub.decreaseSubFor(contentFilter.PubsubTopic, contentTopic) } + if len(peerSubscription.SubsPerPubsubTopic) == 0 { + sub.logger.Debug("no more subs for peer", zap.Stringer("id", subscription.PeerID)) + delete(sub.items, subscription.PeerID) + } + return nil }