diff --git a/waku/v2/api/filter.go b/waku/v2/api/filter.go index 09451aef..f7bfa739 100644 --- a/waku/v2/api/filter.go +++ b/waku/v2/api/filter.go @@ -5,6 +5,7 @@ import ( "encoding/json" "time" + "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" @@ -29,35 +30,41 @@ func (fc FilterConfig) String() string { } type Sub struct { - ContentFilter protocol.ContentFilter - DataCh chan *protocol.Envelope - Config FilterConfig - subs subscription.SubscriptionSet - wf *filter.WakuFilterLightNode - ctx context.Context - cancel context.CancelFunc - log *zap.Logger - closing chan string + ContentFilter protocol.ContentFilter + DataCh chan *protocol.Envelope + Config FilterConfig + subs subscription.SubscriptionSet + wf *filter.WakuFilterLightNode + ctx context.Context + cancel context.CancelFunc + log *zap.Logger + closing chan string + isNodeOnline bool //indicates if node has connectivity; this helps the subscribe loop decide whether to resubscribe or not.
+ resubscribeInProgress bool + id string } // Subscribe -func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger) (*Sub, error) { +func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, online bool) (*Sub, error) { sub := new(Sub) + sub.id = uuid.NewString() sub.wf = wf sub.ctx, sub.cancel = context.WithCancel(ctx) sub.subs = make(subscription.SubscriptionSet) sub.DataCh = make(chan *protocol.Envelope, MultiplexChannelBuffer) sub.ContentFilter = contentFilter sub.Config = config - sub.log = log.Named("filter-api") - sub.log.Debug("filter subscribe params", zap.Int("maxPeers", config.MaxPeers), zap.Stringer("contentFilter", contentFilter)) - subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers) + sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter)) + sub.log.Debug("filter subscribe params", zap.Int("max-peers", config.MaxPeers)) + sub.isNodeOnline = online sub.closing = make(chan string, config.MaxPeers) - if err != nil { - return nil, err + if online { + subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers) + if err == nil { + sub.multiplex(subs) + } } - sub.multiplex(subs) - go sub.waitOnSubClose() + go sub.subscriptionLoop() return sub, nil } @@ -65,32 +72,51 @@ func (apiSub *Sub) Unsubscribe() { apiSub.cancel() } -func (apiSub *Sub) waitOnSubClose() { +func (apiSub *Sub) SetNodeState(online bool) { + apiSub.isNodeOnline = online +} + +func (apiSub *Sub) subscriptionLoop() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() for { select { + case <-ticker.C: + if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers && + !apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers { + apiSub.closing <- "" + } case <-apiSub.ctx.Done(): - apiSub.log.Debug("apiSub 
context: Done()") + apiSub.log.Debug("apiSub context: done") apiSub.cleanup() return case subId := <-apiSub.closing: - //trigger closing and resubscribe flow for subscription. - apiSub.closeAndResubscribe(subId) - + apiSub.resubscribeInProgress = true + //trigger resubscribe flow for subscription. + apiSub.checkAndResubscribe(subId) } } } -func (apiSub *Sub) closeAndResubscribe(subId string) { - apiSub.log.Debug("sub closeAndResubscribe", zap.String("subID", subId)) +func (apiSub *Sub) checkAndResubscribe(subId string) { - apiSub.subs[subId].Close() - failedPeer := apiSub.subs[subId].PeerID - delete(apiSub.subs, subId) - apiSub.resubscribe(failedPeer) + var failedPeer peer.ID + if subId != "" { + apiSub.log.Debug("subscription close and resubscribe", zap.String("sub-id", subId), zap.Stringer("content-filter", apiSub.ContentFilter)) + + apiSub.subs[subId].Close() + failedPeer = apiSub.subs[subId].PeerID + delete(apiSub.subs, subId) + } + apiSub.log.Debug("subscription status", zap.Int("sub-count", len(apiSub.subs)), zap.Stringer("content-filter", apiSub.ContentFilter)) + if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers { + apiSub.resubscribe(failedPeer) + } + apiSub.resubscribeInProgress = false } func (apiSub *Sub) cleanup() { - apiSub.log.Debug("Cleaning up subscription", zap.Stringer("config", apiSub.Config)) + apiSub.log.Debug("cleaning up subscription", zap.Stringer("config", apiSub.Config)) for _, s := range apiSub.subs { _, err := apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s) @@ -100,25 +126,25 @@ func (apiSub *Sub) cleanup() { } } close(apiSub.DataCh) - } // Attempts to resubscribe on topics that lack subscriptions func (apiSub *Sub) resubscribe(failedPeer peer.ID) { // Re-subscribe asynchronously existingSubCount := len(apiSub.subs) - apiSub.log.Debug("subscribing again", zap.Stringer("contentFilter", apiSub.ContentFilter), zap.Int("numPeers", apiSub.Config.MaxPeers-existingSubCount)) + apiSub.log.Debug("subscribing again", 
zap.Int("num-peers", apiSub.Config.MaxPeers-existingSubCount)) var peersToExclude peer.IDSlice - peersToExclude = append(peersToExclude, failedPeer) + if failedPeer != "" { //little hack, couldn't find a better way to do it + peersToExclude = append(peersToExclude, failedPeer) + } for _, sub := range apiSub.subs { peersToExclude = append(peersToExclude, sub.PeerID) } subs, err := apiSub.subscribe(apiSub.ContentFilter, apiSub.Config.MaxPeers-existingSubCount, peersToExclude...) if err != nil { + apiSub.log.Debug("failed to resubscribe for filter", zap.Error(err)) return - } //Not handling scenario where all requested subs are not received as that will get handled in next cycle. - - apiSub.log.Debug("resubscribe(): before range newSubs") + } //Not handling scenario where all requested subs are not received as that should get handled from user of the API. apiSub.multiplex(subs) } @@ -131,35 +157,34 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int options = append(options, filter.WithPeer(p)) } if len(peersToExclude) > 0 { - apiSub.log.Debug("subscribing with peersToExclude", zap.Stringer("peersToExclude", peersToExclude[0])) + apiSub.log.Debug("subscribing with peers to exclude", zap.Stringers("excluded-peers", peersToExclude)) options = append(options, filter.WithPeersToExclude(peersToExclude...)) } subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...) if err != nil { + //Inform of error, so that resubscribe can be triggered if required + if len(apiSub.closing) < apiSub.Config.MaxPeers { + apiSub.closing <- "" + } if len(subs) > 0 { - // Partial Failure, for now proceed as we don't expect this to happen wrt specific topics. - // Rather it can happen in case subscription with one of the peer fails. 
- // This can further get automatically handled at resubscribe, - apiSub.log.Error("partial failure in Filter subscribe", zap.Error(err), zap.Int("successCount", len(subs))) + // Partial Failure, which means at least 1 subscription is successful + apiSub.log.Debug("partial failure in filter subscribe", zap.Error(err), zap.Int("success-count", len(subs))) return subs, nil } - // In case of complete subscription failure, application or user needs to handle and probably retry based on error - // TODO: Once filter error handling indicates specific error, this can be addressed based on the error at this layer. + // TODO: Once filter error handling indicates specific error, this can be handled better. return nil, err } - return subs, nil } func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) { - // Multiplex onto single channel // Goroutines will exit once sub channels are closed for _, subDetails := range subs { apiSub.subs[subDetails.ID] = subDetails go func(subDetails *subscription.SubscriptionDetails) { - apiSub.log.Debug("New multiplex", zap.String("subID", subDetails.ID)) + apiSub.log.Debug("new multiplex", zap.String("sub-id", subDetails.ID)) for env := range subDetails.C { apiSub.DataCh <- env } @@ -169,8 +194,7 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) { case <-apiSub.ctx.Done(): return case <-subDetails.Closing: - apiSub.log.Debug("sub closing", zap.String("subID", subDetails.ID)) - + apiSub.log.Debug("sub closing", zap.String("sub-id", subDetails.ID)) apiSub.closing <- subDetails.ID } }(subDetails) diff --git a/waku/v2/api/filter_test.go b/waku/v2/api/filter_test.go index 83551019..52fa0657 100644 --- a/waku/v2/api/filter_test.go +++ b/waku/v2/api/filter_test.go @@ -48,7 +48,7 @@ func (s *FilterApiTestSuite) TestSubscribe() { s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic) s.Log.Info("About to perform API Subscribe()") - apiSub, err := Subscribe(context.Background(), s.LightNode, contentFilter,
apiConfig, s.Log) + apiSub, err := Subscribe(context.Background(), s.LightNode, contentFilter, apiConfig, s.Log, true) s.Require().NoError(err) s.Require().Equal(apiSub.ContentFilter, contentFilter) s.Log.Info("Subscribed") diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 3186ac49..70edd644 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -591,7 +591,7 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub.Close() result := &WakuFilterPushResult{} - + wf.log.Debug("unsubscribing subscription", zap.String("sub-id", sub.ID), zap.Stringer("content-filter", sub.ContentFilter)) if !wf.subscriptions.Has(sub.PeerID, sub.ContentFilter) { // Last sub for this [peer, contentFilter] pair err = wf.unsubscribeFromServer(ctx, params.requestID, sub.PeerID, sub.ContentFilter) @@ -599,6 +599,8 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, Err: err, PeerID: sub.PeerID, }) + wf.log.Debug("unsubscribed subscription", zap.String("sub-id", sub.ID), zap.Stringer("content-filter", sub.ContentFilter), zap.Error(err)) + } return result, err diff --git a/waku/v2/protocol/subscription/subscription_details.go b/waku/v2/protocol/subscription/subscription_details.go index 4ce928db..a36f54ad 100644 --- a/waku/v2/protocol/subscription/subscription_details.go +++ b/waku/v2/protocol/subscription/subscription_details.go @@ -112,6 +112,7 @@ func (s *SubscriptionDetails) SetClosing() { s.Lock() defer s.Unlock() if !s.Closed { + s.Closed = true s.Closing <- true } } diff --git a/waku/v2/protocol/subscription/subscriptions_map.go b/waku/v2/protocol/subscription/subscriptions_map.go index 14b3680c..35993064 100644 --- a/waku/v2/protocol/subscription/subscriptions_map.go +++ b/waku/v2/protocol/subscription/subscriptions_map.go @@ -142,6 +142,11 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error { contentFilter := 
subscription.ContentFilter delete(peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic], subscription.ID) + if len(peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic]) == 0 { + sub.logger.Debug("no more subs for pubsubTopic for this peer", zap.Stringer("id", subscription.PeerID), zap.String("pubsubtopic", contentFilter.PubsubTopic)) + delete(peerSubscription.SubsPerPubsubTopic, contentFilter.PubsubTopic) + } + // Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair for contentTopic := range contentFilter.ContentTopics { sub.decreaseSubFor(contentFilter.PubsubTopic, contentTopic) diff --git a/waku/v2/protocol/subscription/subscriptions_map_test.go b/waku/v2/protocol/subscription/subscriptions_map_test.go index 42bcd1bc..f5c6d21e 100644 --- a/waku/v2/protocol/subscription/subscriptions_map_test.go +++ b/waku/v2/protocol/subscription/subscriptions_map_test.go @@ -49,9 +49,6 @@ func TestSubscriptionMapAppend(t *testing.T) { err := sub.Close() require.NoError(t, err) require.True(t, sub.Closed) - - err = sub.Close() - require.NoError(t, err) } func TestSubscriptionClear(t *testing.T) {