use config instead of setting in filter_manager.go

This commit is contained in:
frank 2024-03-04 17:21:20 +08:00
parent 7eeeb07cab
commit f4013665b1
2 changed files with 9 additions and 9 deletions

View File

@ -60,11 +60,11 @@ type FilterManager struct {
getFilter func(string) *common.Filter getFilter func(string) *common.Filter
onNewEnvelopes func(env *protocol.Envelope) error onNewEnvelopes func(env *protocol.Envelope) error
logger *zap.Logger logger *zap.Logger
settings settings config *Config
node *node.WakuNode node *node.WakuNode
} }
func newFilterManager(ctx context.Context, logger *zap.Logger, getFilterFn func(string) *common.Filter, settings settings, onNewEnvelopes func(env *protocol.Envelope) error, node *node.WakuNode) *FilterManager { func newFilterManager(ctx context.Context, logger *zap.Logger, getFilterFn func(string) *common.Filter, config *Config, onNewEnvelopes func(env *protocol.Envelope) error, node *node.WakuNode) *FilterManager {
// This fn is being mocked in test // This fn is being mocked in test
mgr := new(FilterManager) mgr := new(FilterManager)
mgr.ctx = ctx mgr.ctx = ctx
@ -73,7 +73,7 @@ func newFilterManager(ctx context.Context, logger *zap.Logger, getFilterFn func(
mgr.onNewEnvelopes = onNewEnvelopes mgr.onNewEnvelopes = onNewEnvelopes
mgr.filterSubs = make(FilterSubs) mgr.filterSubs = make(FilterSubs)
mgr.eventChan = make(chan FilterEvent, 100) mgr.eventChan = make(chan FilterEvent, 100)
mgr.settings = settings mgr.config = config
mgr.node = node mgr.node = node
mgr.isFilterSubAlive = func(sub *subscription.SubscriptionDetails) error { mgr.isFilterSubAlive = func(sub *subscription.SubscriptionDetails) error {
ctx, cancel := context.WithTimeout(ctx, pingTimeout) ctx, cancel := context.WithTimeout(ctx, pingTimeout)
@ -242,7 +242,7 @@ func (mgr *FilterManager) pingPeers() {
} }
} }
logger.Debug("filter ping peers", zap.Int("len", len(subs)), zap.Int("len(nilSubs)", nilSubsCnt)) logger.Debug("filter ping peers", zap.Int("len", len(subs)), zap.Int("len(nilSubs)", nilSubsCnt))
if len(subs) < mgr.settings.MinPeersForFilter { if len(subs) < mgr.config.MinPeersForFilter {
// Trigger full resubscribe // Trigger full resubscribe
logger.Debug("filter ping peers not enough subs") logger.Debug("filter ping peers not enough subs")
go func(filterID string) { go func(filterID string) {
@ -290,15 +290,15 @@ func (mgr *FilterManager) resubscribe(filterID string) {
mgr.logger.Error("resubscribe filter not found", zap.String("filterId", filterID)) mgr.logger.Error("resubscribe filter not found", zap.String("filterId", filterID))
return return
} }
if len(subs) > mgr.settings.MinPeersForFilter { if len(subs) > mgr.config.MinPeersForFilter {
mgr.logger.Error("filter resubscribe too many subs", zap.String("filterId", filterID), zap.Int("len", len(subs))) mgr.logger.Error("filter resubscribe too many subs", zap.String("filterId", filterID), zap.Int("len", len(subs)))
} }
if len(subs) == mgr.settings.MinPeersForFilter { if len(subs) == mgr.config.MinPeersForFilter {
// do nothing // do nothing
return return
} }
mgr.logger.Debug("filter resubscribe subs count:", zap.String("filterId", filterID), zap.Int("len", len(subs))) mgr.logger.Debug("filter resubscribe subs count:", zap.String("filterId", filterID), zap.Int("len", len(subs)))
for i := len(subs); i < mgr.settings.MinPeersForFilter; i++ { for i := len(subs); i < mgr.config.MinPeersForFilter; i++ {
mgr.logger.Debug("filter check not passed, try subscribing to peers", zap.String("filterId", filterID)) mgr.logger.Debug("filter check not passed, try subscribing to peers", zap.String("filterId", filterID))
// Create sub placeholder in order to avoid potentially too many subs // Create sub placeholder in order to avoid potentially too many subs

View File

@ -352,7 +352,7 @@ func TestWakuV2Filter(t *testing.T) {
// Mock peers going down // Mock peers going down
isFilterSubAliveBak := w.filterManager.isFilterSubAlive isFilterSubAliveBak := w.filterManager.isFilterSubAlive
w.filterManager.settings.MinPeersForFilter = 0 w.filterManager.config.MinPeersForFilter = 0
w.filterManager.isFilterSubAlive = func(sub *subscription.SubscriptionDetails) error { w.filterManager.isFilterSubAlive = func(sub *subscription.SubscriptionDetails) error {
return errors.New("peer down") return errors.New("peer down")
} }
@ -365,7 +365,7 @@ func TestWakuV2Filter(t *testing.T) {
require.Len(t, stats[filterID], 0) require.Len(t, stats[filterID], 0)
// Reconnect // Reconnect
w.filterManager.settings.MinPeersForFilter = 2 w.filterManager.config.MinPeersForFilter = 2
w.filterManager.isFilterSubAlive = isFilterSubAliveBak w.filterManager.isFilterSubAlive = isFilterSubAliveBak
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)