fix: filter batch duration opt was not propagated correctly (#1224)

This commit is contained in:
Prem Chaitanya Prathi 2024-09-21 06:47:19 +05:30 committed by GitHub
parent 2800391204
commit 821481fec4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 18 additions and 14 deletions

View File

@ -27,6 +27,8 @@ func (fc FilterConfig) String() string {
return string(jsonStr) return string(jsonStr)
} }
const filterSubLoopInterval = 5 * time.Second
type Sub struct { type Sub struct {
ContentFilter protocol.ContentFilter ContentFilter protocol.ContentFilter
DataCh chan *protocol.Envelope DataCh chan *protocol.Envelope
@ -69,13 +71,7 @@ func defaultOptions() []SubscribeOptions {
} }
// Subscribe // Subscribe
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, opts ...SubscribeOptions) (*Sub, error) { func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) {
optList := append(defaultOptions(), opts...)
params := new(subscribeParameters)
for _, opt := range optList {
opt(params)
}
sub := new(Sub) sub := new(Sub)
sub.id = uuid.NewString() sub.id = uuid.NewString()
sub.wf = wf sub.wf = wf
@ -95,8 +91,9 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
sub.multiplex(subs) sub.multiplex(subs)
} }
} }
// filter subscription loop is to check if target subscriptions for a filter are active and if not
go sub.subscriptionLoop(params.batchInterval) // trigger resubscribe.
go sub.subscriptionLoop(filterSubLoopInterval)
return sub, nil return sub, nil
} }

View File

@ -31,7 +31,7 @@ type appFilterMap map[string]filterConfig
type FilterManager struct { type FilterManager struct {
sync.Mutex sync.Mutex
ctx context.Context ctx context.Context
opts []SubscribeOptions params *subscribeParameters
minPeersPerFilter int minPeersPerFilter int
onlineChecker *onlinechecker.DefaultOnlineChecker onlineChecker *onlinechecker.DefaultOnlineChecker
filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details
@ -64,7 +64,6 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
// This fn is being mocked in test // This fn is being mocked in test
mgr := new(FilterManager) mgr := new(FilterManager)
mgr.ctx = ctx mgr.ctx = ctx
mgr.opts = opts
mgr.logger = logger mgr.logger = logger
mgr.minPeersPerFilter = minPeersPerFilter mgr.minPeersPerFilter = minPeersPerFilter
mgr.envProcessor = envProcessor mgr.envProcessor = envProcessor
@ -72,10 +71,17 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
mgr.node = node mgr.node = node
mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker) mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker)
mgr.node.SetOnlineChecker(mgr.onlineChecker) mgr.node.SetOnlineChecker(mgr.onlineChecker)
mgr.filterSubBatchDuration = 5 * time.Second
mgr.incompleteFilterBatch = make(map[string]filterConfig) mgr.incompleteFilterBatch = make(map[string]filterConfig)
mgr.filterConfigs = make(appFilterMap) mgr.filterConfigs = make(appFilterMap)
mgr.waitingToSubQueue = make(chan filterConfig, 100) mgr.waitingToSubQueue = make(chan filterConfig, 100)
//parsing the subscribe params only to read the batchInterval passed.
mgr.params = new(subscribeParameters)
opts = append(defaultOptions(), opts...)
for _, opt := range opts {
opt(mgr.params)
}
mgr.filterSubBatchDuration = mgr.params.batchInterval
go mgr.startFilterSubLoop() go mgr.startFilterSubLoop()
return mgr return mgr
} }
@ -153,7 +159,7 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi
func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
ctx, cancel := context.WithCancel(mgr.ctx) ctx, cancel := context.WithCancel(mgr.ctx)
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter} config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.opts...) sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
mgr.Lock() mgr.Lock()
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub} mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
mgr.Unlock() mgr.Unlock()

View File

@ -54,7 +54,8 @@ func (s *FilterApiTestSuite) TestSubscribe() {
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic) s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
s.Log.Info("About to perform API Subscribe()") s.Log.Info("About to perform API Subscribe()")
apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log) params := subscribeParameters{300 * time.Second, 1024}
apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log, &params)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Equal(apiSub.ContentFilter, contentFilter) s.Require().Equal(apiSub.ContentFilter, contentFilter)
s.Log.Info("Subscribed") s.Log.Info("Subscribed")