fix: filter sub issues (#1123)

This commit is contained in:
Prem Chaitanya Prathi 2024-06-12 18:33:57 +05:30 committed by GitHub
parent 389b359e43
commit 32da07cad9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 81 additions and 52 deletions

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"time" "time"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/filter"
@ -29,35 +30,41 @@ func (fc FilterConfig) String() string {
} }
type Sub struct { type Sub struct {
ContentFilter protocol.ContentFilter ContentFilter protocol.ContentFilter
DataCh chan *protocol.Envelope DataCh chan *protocol.Envelope
Config FilterConfig Config FilterConfig
subs subscription.SubscriptionSet subs subscription.SubscriptionSet
wf *filter.WakuFilterLightNode wf *filter.WakuFilterLightNode
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
log *zap.Logger log *zap.Logger
closing chan string closing chan string
isNodeOnline bool //indicates if node has connectivity, this helps subscribe loop takes decision as to resubscribe or not.
resubscribeInProgress bool
id string
} }
// Subscribe // Subscribe
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger) (*Sub, error) { func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, online bool) (*Sub, error) {
sub := new(Sub) sub := new(Sub)
sub.id = uuid.NewString()
sub.wf = wf sub.wf = wf
sub.ctx, sub.cancel = context.WithCancel(ctx) sub.ctx, sub.cancel = context.WithCancel(ctx)
sub.subs = make(subscription.SubscriptionSet) sub.subs = make(subscription.SubscriptionSet)
sub.DataCh = make(chan *protocol.Envelope, MultiplexChannelBuffer) sub.DataCh = make(chan *protocol.Envelope, MultiplexChannelBuffer)
sub.ContentFilter = contentFilter sub.ContentFilter = contentFilter
sub.Config = config sub.Config = config
sub.log = log.Named("filter-api") sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter))
sub.log.Debug("filter subscribe params", zap.Int("maxPeers", config.MaxPeers), zap.Stringer("contentFilter", contentFilter)) sub.log.Debug("filter subscribe params", zap.Int("max-peers", config.MaxPeers))
subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers) sub.isNodeOnline = online
sub.closing = make(chan string, config.MaxPeers) sub.closing = make(chan string, config.MaxPeers)
if err != nil { if online {
return nil, err subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers)
if err == nil {
sub.multiplex(subs)
}
} }
sub.multiplex(subs) go sub.subscriptionLoop()
go sub.waitOnSubClose()
return sub, nil return sub, nil
} }
@ -65,32 +72,51 @@ func (apiSub *Sub) Unsubscribe() {
apiSub.cancel() apiSub.cancel()
} }
func (apiSub *Sub) waitOnSubClose() { func (apiSub *Sub) SetNodeState(online bool) {
apiSub.isNodeOnline = online
}
func (apiSub *Sub) subscriptionLoop() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for { for {
select { select {
case <-ticker.C:
if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers &&
!apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers {
apiSub.closing <- ""
}
case <-apiSub.ctx.Done(): case <-apiSub.ctx.Done():
apiSub.log.Debug("apiSub context: Done()") apiSub.log.Debug("apiSub context: done")
apiSub.cleanup() apiSub.cleanup()
return return
case subId := <-apiSub.closing: case subId := <-apiSub.closing:
//trigger closing and resubscribe flow for subscription. apiSub.resubscribeInProgress = true
apiSub.closeAndResubscribe(subId) //trigger resubscribe flow for subscription.
apiSub.checkAndResubscribe(subId)
} }
} }
} }
func (apiSub *Sub) closeAndResubscribe(subId string) { func (apiSub *Sub) checkAndResubscribe(subId string) {
apiSub.log.Debug("sub closeAndResubscribe", zap.String("subID", subId))
apiSub.subs[subId].Close() var failedPeer peer.ID
failedPeer := apiSub.subs[subId].PeerID if subId != "" {
delete(apiSub.subs, subId) apiSub.log.Debug("subscription close and resubscribe", zap.String("sub-id", subId), zap.Stringer("content-filter", apiSub.ContentFilter))
apiSub.resubscribe(failedPeer)
apiSub.subs[subId].Close()
failedPeer = apiSub.subs[subId].PeerID
delete(apiSub.subs, subId)
}
apiSub.log.Debug("subscription status", zap.Int("sub-count", len(apiSub.subs)), zap.Stringer("content-filter", apiSub.ContentFilter))
if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers {
apiSub.resubscribe(failedPeer)
}
apiSub.resubscribeInProgress = false
} }
func (apiSub *Sub) cleanup() { func (apiSub *Sub) cleanup() {
apiSub.log.Debug("Cleaning up subscription", zap.Stringer("config", apiSub.Config)) apiSub.log.Debug("cleaning up subscription", zap.Stringer("config", apiSub.Config))
for _, s := range apiSub.subs { for _, s := range apiSub.subs {
_, err := apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s) _, err := apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s)
@ -100,25 +126,25 @@ func (apiSub *Sub) cleanup() {
} }
} }
close(apiSub.DataCh) close(apiSub.DataCh)
} }
// Attempts to resubscribe on topics that lack subscriptions // Attempts to resubscribe on topics that lack subscriptions
func (apiSub *Sub) resubscribe(failedPeer peer.ID) { func (apiSub *Sub) resubscribe(failedPeer peer.ID) {
// Re-subscribe asynchronously // Re-subscribe asynchronously
existingSubCount := len(apiSub.subs) existingSubCount := len(apiSub.subs)
apiSub.log.Debug("subscribing again", zap.Stringer("contentFilter", apiSub.ContentFilter), zap.Int("numPeers", apiSub.Config.MaxPeers-existingSubCount)) apiSub.log.Debug("subscribing again", zap.Int("num-peers", apiSub.Config.MaxPeers-existingSubCount))
var peersToExclude peer.IDSlice var peersToExclude peer.IDSlice
peersToExclude = append(peersToExclude, failedPeer) if failedPeer != "" { //little hack, couldn't find a better way to do it
peersToExclude = append(peersToExclude, failedPeer)
}
for _, sub := range apiSub.subs { for _, sub := range apiSub.subs {
peersToExclude = append(peersToExclude, sub.PeerID) peersToExclude = append(peersToExclude, sub.PeerID)
} }
subs, err := apiSub.subscribe(apiSub.ContentFilter, apiSub.Config.MaxPeers-existingSubCount, peersToExclude...) subs, err := apiSub.subscribe(apiSub.ContentFilter, apiSub.Config.MaxPeers-existingSubCount, peersToExclude...)
if err != nil { if err != nil {
apiSub.log.Debug("failed to resubscribe for filter", zap.Error(err))
return return
} //Not handling scenario where all requested subs are not received as that will get handled in next cycle. } //Not handling scenario where all requested subs are not received as that should get handled from user of the API.
apiSub.log.Debug("resubscribe(): before range newSubs")
apiSub.multiplex(subs) apiSub.multiplex(subs)
} }
@ -131,35 +157,34 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
options = append(options, filter.WithPeer(p)) options = append(options, filter.WithPeer(p))
} }
if len(peersToExclude) > 0 { if len(peersToExclude) > 0 {
apiSub.log.Debug("subscribing with peersToExclude", zap.Stringer("peersToExclude", peersToExclude[0])) apiSub.log.Debug("subscribing with peers to exclude", zap.Stringers("excluded-peers", peersToExclude))
options = append(options, filter.WithPeersToExclude(peersToExclude...)) options = append(options, filter.WithPeersToExclude(peersToExclude...))
} }
subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...) subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...)
if err != nil { if err != nil {
//Inform of error, so that resubscribe can be triggered if required
if len(apiSub.closing) < apiSub.Config.MaxPeers {
apiSub.closing <- ""
}
if len(subs) > 0 { if len(subs) > 0 {
// Partial Failure, for now proceed as we don't expect this to happen wrt specific topics. // Partial Failure, which means atleast 1 subscription is successful
// Rather it can happen in case subscription with one of the peer fails. apiSub.log.Debug("partial failure in filter subscribe", zap.Error(err), zap.Int("success-count", len(subs)))
// This can further get automatically handled at resubscribe,
apiSub.log.Error("partial failure in Filter subscribe", zap.Error(err), zap.Int("successCount", len(subs)))
return subs, nil return subs, nil
} }
// In case of complete subscription failure, application or user needs to handle and probably retry based on error // TODO: Once filter error handling indicates specific error, this can be handled better.
// TODO: Once filter error handling indicates specific error, this can be addressed based on the error at this layer.
return nil, err return nil, err
} }
return subs, nil return subs, nil
} }
func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) { func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
// Multiplex onto single channel // Multiplex onto single channel
// Goroutines will exit once sub channels are closed // Goroutines will exit once sub channels are closed
for _, subDetails := range subs { for _, subDetails := range subs {
apiSub.subs[subDetails.ID] = subDetails apiSub.subs[subDetails.ID] = subDetails
go func(subDetails *subscription.SubscriptionDetails) { go func(subDetails *subscription.SubscriptionDetails) {
apiSub.log.Debug("New multiplex", zap.String("subID", subDetails.ID)) apiSub.log.Debug("new multiplex", zap.String("sub-id", subDetails.ID))
for env := range subDetails.C { for env := range subDetails.C {
apiSub.DataCh <- env apiSub.DataCh <- env
} }
@ -169,8 +194,7 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
case <-apiSub.ctx.Done(): case <-apiSub.ctx.Done():
return return
case <-subDetails.Closing: case <-subDetails.Closing:
apiSub.log.Debug("sub closing", zap.String("subID", subDetails.ID)) apiSub.log.Debug("sub closing", zap.String("sub-id", subDetails.ID))
apiSub.closing <- subDetails.ID apiSub.closing <- subDetails.ID
} }
}(subDetails) }(subDetails)

View File

@ -48,7 +48,7 @@ func (s *FilterApiTestSuite) TestSubscribe() {
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic) s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
s.Log.Info("About to perform API Subscribe()") s.Log.Info("About to perform API Subscribe()")
apiSub, err := Subscribe(context.Background(), s.LightNode, contentFilter, apiConfig, s.Log) apiSub, err := Subscribe(context.Background(), s.LightNode, contentFilter, apiConfig, s.Log, true)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Equal(apiSub.ContentFilter, contentFilter) s.Require().Equal(apiSub.ContentFilter, contentFilter)
s.Log.Info("Subscribed") s.Log.Info("Subscribed")

View File

@ -591,7 +591,7 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context,
sub.Close() sub.Close()
result := &WakuFilterPushResult{} result := &WakuFilterPushResult{}
wf.log.Debug("unsubscribing subscription", zap.String("sub-id", sub.ID), zap.Stringer("content-filter", sub.ContentFilter))
if !wf.subscriptions.Has(sub.PeerID, sub.ContentFilter) { if !wf.subscriptions.Has(sub.PeerID, sub.ContentFilter) {
// Last sub for this [peer, contentFilter] pair // Last sub for this [peer, contentFilter] pair
err = wf.unsubscribeFromServer(ctx, params.requestID, sub.PeerID, sub.ContentFilter) err = wf.unsubscribeFromServer(ctx, params.requestID, sub.PeerID, sub.ContentFilter)
@ -599,6 +599,8 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context,
Err: err, Err: err,
PeerID: sub.PeerID, PeerID: sub.PeerID,
}) })
wf.log.Debug("unsubscribed subscription", zap.String("sub-id", sub.ID), zap.Stringer("content-filter", sub.ContentFilter), zap.Error(err))
} }
return result, err return result, err

View File

@ -112,6 +112,7 @@ func (s *SubscriptionDetails) SetClosing() {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
if !s.Closed { if !s.Closed {
s.Closed = true
s.Closing <- true s.Closing <- true
} }
} }

View File

@ -142,6 +142,11 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error {
contentFilter := subscription.ContentFilter contentFilter := subscription.ContentFilter
delete(peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic], subscription.ID) delete(peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic], subscription.ID)
if len(peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic]) == 0 {
sub.logger.Debug("no more subs for pubsubTopic for this peer", zap.Stringer("id", subscription.PeerID), zap.String("pubsubtopic", contentFilter.PubsubTopic))
delete(peerSubscription.SubsPerPubsubTopic, contentFilter.PubsubTopic)
}
// Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair // Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair
for contentTopic := range contentFilter.ContentTopics { for contentTopic := range contentFilter.ContentTopics {
sub.decreaseSubFor(contentFilter.PubsubTopic, contentTopic) sub.decreaseSubFor(contentFilter.PubsubTopic, contentTopic)

View File

@ -49,9 +49,6 @@ func TestSubscriptionMapAppend(t *testing.T) {
err := sub.Close() err := sub.Close()
require.NoError(t, err) require.NoError(t, err)
require.True(t, sub.Closed) require.True(t, sub.Closed)
err = sub.Close()
require.NoError(t, err)
} }
func TestSubscriptionClear(t *testing.T) { func TestSubscriptionClear(t *testing.T) {