mirror of
synced 2025-02-11 14:26:53 +00:00
* feat_: aggregate filter subscriptions to do bulk subs with peer * chore_: take possible deadlock fix in go-waku * fix_: don't resubscribe filters unless there is a change in shard for community (#5467)
249 lines
9.5 KiB
249 lines
9.5 KiB
package wakuv2
import (
// Methods on FilterManager just aggregate filters from application and subscribe to them
// startFilterSubLoop runs a loop where-in it waits for an interval to batch subscriptions
// runFilterSubscriptionLoop runs a loop for receiving messages from underlying subscriptions and invokes onNewEnvelopes
// filterConfigs is the map of filer IDs to filter configs
// filterSubscriptions is the map of filter subscription IDs to subscriptions
const filterSubBatchSize = 90
type appFilterMap map[string]filterConfig
type FilterManager struct {
ctx context.Context
cfg *Config
onlineChecker *onlinechecker.DefaultOnlineChecker
filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details
onNewEnvelopes func(env *protocol.Envelope) error
logger *zap.Logger
node *filter.WakuFilterLightNode
filterSubBatchDuration time.Duration
incompleteFilterBatch map[string]filterConfig
filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter}
waitingToSubQueue chan filterConfig
type SubDetails struct {
cancel func()
sub *api.Sub
type filterConfig struct {
ID string
contentFilter protocol.ContentFilter
func newFilterManager(ctx context.Context, logger *zap.Logger, cfg *Config, onNewEnvelopes func(env *protocol.Envelope) error, node *filter.WakuFilterLightNode) *FilterManager {
// This fn is being mocked in test
mgr := new(FilterManager)
mgr.ctx = ctx
mgr.logger = logger
mgr.cfg = cfg
mgr.onNewEnvelopes = onNewEnvelopes
mgr.filterSubscriptions = make(map[string]SubDetails)
mgr.node = node
mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker)
mgr.filterSubBatchDuration = 5 * time.Second
mgr.incompleteFilterBatch = make(map[string]filterConfig)
mgr.filterConfigs = make(appFilterMap)
mgr.waitingToSubQueue = make(chan filterConfig, 100)
go mgr.startFilterSubLoop()
return mgr
func (mgr *FilterManager) startFilterSubLoop() {
ticker := time.NewTicker(mgr.filterSubBatchDuration)
defer ticker.Stop()
for {
select {
case <-mgr.ctx.Done():
case <-ticker.C:
// TODO: Optimization, handle case where 1st addFilter happens just before ticker expires.
if mgr.onlineChecker.IsOnline() {
for _, af := range mgr.incompleteFilterBatch {
mgr.logger.Debug("ticker hit, hence subscribing", zap.String("agg-filter-id", af.ID), zap.Int("batch-size", len(af.contentFilter.ContentTopics)),
zap.Stringer("agg-content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
mgr.incompleteFilterBatch = make(map[string]filterConfig)
addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch
once batchlimit is hit, all filters are subscribed to and new batch is created.
if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created
func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) {
mgr.logger.Debug("adding filter", zap.String("filter-id", filterID))
defer mgr.Unlock()
afilter, ok := mgr.incompleteFilterBatch[f.PubsubTopic]
if !ok {
//no existing batch for pubsubTopic
mgr.logger.Debug("new pubsubTopic batch", zap.String("topic", f.PubsubTopic))
cf := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics)
afilter = filterConfig{uuid.NewString(), cf}
mgr.incompleteFilterBatch[f.PubsubTopic] = afilter
mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf}
} else {
mgr.logger.Debug("existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic))
if len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics) > filterSubBatchSize {
//filter batch limit is hit
if mgr.onlineChecker.IsOnline() {
//node is online, go ahead and subscribe the batch
mgr.logger.Debug("crossed pubsubTopic batchsize and online, subscribing to filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics)))
go mgr.subscribeAndRunLoop(afilter)
} else {
mgr.logger.Debug("crossed pubsubTopic batchsize and offline, queuing filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics)))
// queue existing batch as node is not online
mgr.waitingToSubQueue <- afilter
cf := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics)
afilter = filterConfig{uuid.NewString(), cf}
mgr.logger.Debug("creating a new pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic), zap.Stringer("content-filter", cf))
mgr.incompleteFilterBatch[f.PubsubTopic] = afilter
mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf}
} else {
//add to existing batch as batch limit not reached
var contentTopics []string
for _, ct := range maps.Keys(f.ContentTopics) {
afilter.contentFilter.ContentTopics[ct.ContentTopic()] = struct{}{}
contentTopics = append(contentTopics, ct.ContentTopic())
cf := protocol.NewContentFilter(f.PubsubTopic, contentTopics...)
mgr.logger.Debug("adding to existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.Stringer("content-filter", cf), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)))
mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf}
func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
ctx, cancel := context.WithCancel(mgr.ctx)
config := api.FilterConfig{MaxPeers: mgr.cfg.MinPeersForFilter}
sub, err := api.Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger)
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
if err == nil {
mgr.logger.Debug("subscription successful, running loop", zap.String("agg-filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter))
} else {
mgr.logger.Error("subscription fail, need to debug issue", zap.String("agg-filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter), zap.Error(err))
func (mgr *FilterManager) onConnectionStatusChange(pubsubTopic string, newStatus bool) {
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
zap.Int("agg filters count", len(mgr.filterSubscriptions)))
if newStatus && !mgr.onlineChecker.IsOnline() { //switched from offline to Online
mgr.logger.Debug("switching from offline to online")
if len(mgr.waitingToSubQueue) > 0 {
for af := range mgr.waitingToSubQueue {
// TODO: change the below logic once topic specific health is implemented for lightClients
if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
// Check if any filter subs are pending and subscribe them
mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
} else {
// TODO: Can this cause issues?
mgr.waitingToSubQueue <- af
if len(mgr.waitingToSubQueue) == 0 {
mgr.logger.Debug("no pending subscriptions")
func (mgr *FilterManager) removeFilter(filterID string) {
defer mgr.Unlock()
mgr.logger.Debug("removing filter", zap.String("filter-id", filterID))
filterConfig, ok := mgr.filterConfigs[filterID]
if !ok {
mgr.logger.Debug("filter removal: filter not found", zap.String("filter-id", filterID))
af, ok := mgr.filterSubscriptions[filterConfig.ID]
if ok {
delete(mgr.filterConfigs, filterID)
for ct := range filterConfig.contentFilter.ContentTopics {
delete(af.sub.ContentFilter.ContentTopics, ct)
if len(af.sub.ContentFilter.ContentTopics) == 0 {
} else {
go af.sub.Unsubscribe(filterConfig.contentFilter)
} else {
mgr.logger.Debug("filter removal: aggregated filter not found", zap.String("filter-id", filterID), zap.String("agg-filter-id", filterConfig.ID))
func (mgr *FilterManager) buildContentFilter(pubsubTopic string, contentTopicSet common.TopicSet) protocol.ContentFilter {
contentTopics := make([]string, len(contentTopicSet))
for i, ct := range maps.Keys(contentTopicSet) {
contentTopics[i] = ct.ContentTopic()
return protocol.NewContentFilter(pubsubTopic, contentTopics...)
func (mgr *FilterManager) runFilterSubscriptionLoop(sub *api.Sub) {
for {
select {
case <-mgr.ctx.Done():
mgr.logger.Debug("subscription loop ended", zap.Stringer("content-filter", sub.ContentFilter))
case env, ok := <-sub.DataCh:
if ok {
err := (mgr.onNewEnvelopes)(env)
if err != nil {
mgr.logger.Error("invoking onNewEnvelopes error", zap.Error(err))
} else {
mgr.logger.Debug("filter sub is closed", zap.Any("content-filter", sub.ContentFilter))