fix: panic due to closed channel and other fixes(#1115)

This commit is contained in:
Prem Chaitanya Prathi 2024-06-07 15:35:04 +05:30 committed by GitHub
parent d2d2f5672e
commit 389b359e43
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 27 additions and 13 deletions

View File

@ -75,6 +75,7 @@ func (apiSub *Sub) waitOnSubClose() {
case subId := <-apiSub.closing:
//trigger closing and resubscribe flow for subscription.
apiSub.closeAndResubscribe(subId)
}
}
}
@ -89,13 +90,9 @@ func (apiSub *Sub) closeAndResubscribe(subId string) {
}
func (apiSub *Sub) cleanup() {
apiSub.log.Debug("ENTER cleanup()")
defer func() {
apiSub.log.Debug("EXIT cleanup()")
}()
apiSub.log.Debug("Cleaning up subscription", zap.Stringer("config", apiSub.Config))
for _, s := range apiSub.subs {
close(s.Closing)
_, err := apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s)
if err != nil {
//Logging with info as this is part of cleanup
@ -168,10 +165,14 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
}
}(subDetails)
go func(subDetails *subscription.SubscriptionDetails) {
<-subDetails.Closing
apiSub.log.Debug("sub closing", zap.String("subID", subDetails.ID))
select {
case <-apiSub.ctx.Done():
return
case <-subDetails.Closing:
apiSub.log.Debug("sub closing", zap.String("subID", subDetails.ID))
apiSub.closing <- subDetails.ID
apiSub.closing <- subDetails.ID
}
}(subDetails)
}
}

View File

@ -88,7 +88,7 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM
wf.pm = pm
wf.CommonService = service.NewCommonService()
wf.metrics = newMetrics(reg)
wf.peerPingInterval = 5 * time.Second
wf.peerPingInterval = 1 * time.Minute
return wf
}

View File

@ -26,9 +26,8 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer)
for _, subscription := range subscriptions {
wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID))
//Indicating that subscription is closing,
close(subscription.Closing)
subscription.SetClosing()
}
}
}

View File

@ -29,7 +29,7 @@ type SubscriptionDetails struct {
mapRef *SubscriptionsMap
Closed bool `json:"-"`
once sync.Once
Closing chan struct{}
Closing chan bool
PeerID peer.ID `json:"peerID"`
ContentFilter protocol.ContentFilter `json:"contentFilters"`
@ -99,6 +99,7 @@ func (s *SubscriptionDetails) CloseC() {
defer s.Unlock()
s.Closed = true
close(s.C)
close(s.Closing)
})
}
@ -107,6 +108,14 @@ func (s *SubscriptionDetails) Close() error {
return s.mapRef.Delete(s)
}
func (s *SubscriptionDetails) SetClosing() {
s.Lock()
defer s.Unlock()
if !s.Closed {
s.Closing <- true
}
}
func (s *SubscriptionDetails) MarshalJSON() ([]byte, error) {
result := struct {
PeerID peer.ID `json:"peerID"`

View File

@ -75,7 +75,7 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.Content
PeerID: peerID,
C: make(chan *protocol.Envelope, 1024),
ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)},
Closing: make(chan struct{}),
Closing: make(chan bool),
}
// Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair
@ -147,6 +147,11 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error {
sub.decreaseSubFor(contentFilter.PubsubTopic, contentTopic)
}
if len(peerSubscription.SubsPerPubsubTopic) == 0 {
sub.logger.Debug("no more subs for peer", zap.Stringer("id", subscription.PeerID))
delete(sub.items, subscription.PeerID)
}
return nil
}