fix: provide option to set onlinechecker

This commit is contained in:
Prem Chaitanya Prathi 2024-10-08 11:47:59 +05:30
parent 0ed94ce0b1
commit 644c68a66f
No known key found for this signature in database
3 changed files with 25 additions and 15 deletions

View File

@ -49,34 +49,41 @@ type Sub struct {
errcnt int errcnt int
} }
type subscribeParameters struct { type FilterParameters struct {
batchInterval time.Duration batchInterval time.Duration
multiplexChannelBuffer int multiplexChannelBuffer int
checker *onlinechecker.DefaultOnlineChecker
} }
type SubscribeOptions func(*subscribeParameters) type FilterOptions func(*FilterParameters)
func WithBatchInterval(t time.Duration) SubscribeOptions { func WithBatchInterval(t time.Duration) FilterOptions {
return func(params *subscribeParameters) { return func(params *FilterParameters) {
params.batchInterval = t params.batchInterval = t
} }
} }
func WithMultiplexChannelBuffer(value int) SubscribeOptions { func WithMultiplexChannelBuffer(value int) FilterOptions {
return func(params *subscribeParameters) { return func(params *FilterParameters) {
params.multiplexChannelBuffer = value params.multiplexChannelBuffer = value
} }
} }
func defaultOptions() []SubscribeOptions { func WithOnlineChecker(checker *onlinechecker.DefaultOnlineChecker) FilterOptions {
return []SubscribeOptions{ return func(params *FilterParameters) {
params.checker = checker
}
}
func defaultOptions() []FilterOptions {
return []FilterOptions{
WithBatchInterval(5 * time.Second), WithBatchInterval(5 * time.Second),
WithMultiplexChannelBuffer(100), WithMultiplexChannelBuffer(100),
} }
} }
// Subscribe // Subscribe
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) { func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *FilterParameters) (*Sub, error) {
sub := new(Sub) sub := new(Sub)
sub.id = uuid.NewString() sub.id = uuid.NewString()
sub.wf = wf sub.wf = wf

View File

@ -32,7 +32,7 @@ type appFilterMap map[string]filterConfig
type FilterManager struct { type FilterManager struct {
sync.Mutex sync.Mutex
ctx context.Context ctx context.Context
params *subscribeParameters params *FilterParameters
minPeersPerFilter int minPeersPerFilter int
onlineChecker *onlinechecker.DefaultOnlineChecker onlineChecker *onlinechecker.DefaultOnlineChecker
filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details
@ -61,7 +61,7 @@ type EnevelopeProcessor interface {
OnNewEnvelope(env *protocol.Envelope) error OnNewEnvelope(env *protocol.Envelope) error
} }
func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager { func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...FilterOptions) *FilterManager {
// This fn is being mocked in test // This fn is being mocked in test
mgr := new(FilterManager) mgr := new(FilterManager)
mgr.ctx = ctx mgr.ctx = ctx
@ -70,18 +70,21 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
mgr.envProcessor = envProcessor mgr.envProcessor = envProcessor
mgr.filterSubscriptions = make(map[string]SubDetails) mgr.filterSubscriptions = make(map[string]SubDetails)
mgr.node = node mgr.node = node
mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker)
mgr.node.SetOnlineChecker(mgr.onlineChecker)
mgr.incompleteFilterBatch = make(map[string]filterConfig) mgr.incompleteFilterBatch = make(map[string]filterConfig)
mgr.filterConfigs = make(appFilterMap) mgr.filterConfigs = make(appFilterMap)
mgr.waitingToSubQueue = make(chan filterConfig, 100) mgr.waitingToSubQueue = make(chan filterConfig, 100)
//parsing the subscribe params only to read the batchInterval passed. //parsing the subscribe params only to read the batchInterval passed.
mgr.params = new(subscribeParameters) mgr.params = new(FilterParameters)
opts = append(defaultOptions(), opts...) opts = append(defaultOptions(), opts...)
for _, opt := range opts { for _, opt := range opts {
opt(mgr.params) opt(mgr.params)
} }
if mgr.params.checker == nil {
mgr.params.checker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker)
}
mgr.node.SetOnlineChecker(mgr.params.checker)
mgr.filterSubBatchDuration = mgr.params.batchInterval mgr.filterSubBatchDuration = mgr.params.batchInterval
go mgr.startFilterSubLoop() go mgr.startFilterSubLoop()
return mgr return mgr

View File

@ -54,7 +54,7 @@ func (s *FilterApiTestSuite) TestSubscribe() {
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic) s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
s.Log.Info("About to perform API Subscribe()") s.Log.Info("About to perform API Subscribe()")
params := subscribeParameters{300 * time.Second, 1024} params := FilterParameters{300 * time.Second, 1024}
apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log, &params) apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log, &params)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Equal(apiSub.ContentFilter, contentFilter) s.Require().Equal(apiSub.ContentFilter, contentFilter)