diff --git a/waku/v2/api/filter/filter_manager.go b/waku/v2/api/filter/filter_manager.go index 4dc92c3d..e4b6e524 100644 --- a/waku/v2/api/filter/filter_manager.go +++ b/waku/v2/api/filter/filter_manager.go @@ -31,6 +31,7 @@ type appFilterMap map[string]filterConfig type FilterManager struct { sync.Mutex ctx context.Context + opts []SubscribeOptions minPeersPerFilter int onlineChecker *onlinechecker.DefaultOnlineChecker filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details @@ -59,10 +60,11 @@ type EnevelopeProcessor interface { OnNewEnvelope(env *protocol.Envelope) error } -func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode) *FilterManager { +func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager { // This fn is being mocked in test mgr := new(FilterManager) mgr.ctx = ctx + mgr.opts = opts mgr.logger = logger mgr.minPeersPerFilter = minPeersPerFilter mgr.envProcessor = envProcessor @@ -151,7 +153,7 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { ctx, cancel := context.WithCancel(mgr.ctx) config := FilterConfig{MaxPeers: mgr.minPeersPerFilter} - sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger) + sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.opts...) mgr.Lock() mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub} mgr.Unlock()