chore: move filter manager from status-go to go-waku (#1177)

Prem Chaitanya Prathi committed 2024-08-06 13:10:56 +05:30 (via GitHub)
parent d047df3859
commit f3560ced3b
24 changed files with 407 additions and 24 deletions


@@ -26,7 +26,7 @@ pkgs.buildGo121Module {
   '' else "";
   # FIXME: This needs to be manually changed when updating modules.
-  vendorHash = "sha256-zwvZVTiwv7cc4vAM2Fil+qAG1v1J8q4BqX5lCgCStIc=";
+  vendorHash = "sha256-cOh9LNmcaBnBeMFM1HS2pdH5TTraHfo8PXL37t/A3gQ=";
   # Fix for 'nix run' trying to execute 'go-waku'.
   meta = { mainProgram = "waku"; };


@@ -30,6 +30,7 @@ require (
 	github.com/btcsuite/btcd v0.20.1-beta // indirect
 	github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect
 	github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
+	github.com/cenkalti/backoff/v3 v3.2.2 // indirect
 	github.com/cenkalti/backoff/v4 v4.1.2 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/containerd/cgroups v1.1.0 // indirect


@@ -94,6 +94,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE
 github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
 github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
+github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
+github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
 github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
 github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=


@@ -29,6 +29,7 @@ require (
 	github.com/btcsuite/btcd v0.20.1-beta // indirect
 	github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect
 	github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
+	github.com/cenkalti/backoff/v3 v3.2.2 // indirect
 	github.com/cenkalti/backoff/v4 v4.1.2 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/containerd/cgroups v1.1.0 // indirect


@@ -94,6 +94,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE
 github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
 github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
+github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
+github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
 github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
 github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=


@@ -36,6 +36,7 @@ require (
 	github.com/btcsuite/btcd v0.20.1-beta // indirect
 	github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect
 	github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
+	github.com/cenkalti/backoff/v3 v3.2.2 // indirect
 	github.com/cenkalti/backoff/v4 v4.1.2 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/containerd/cgroups v1.1.0 // indirect


@@ -96,6 +96,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE
 github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
 github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
+github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
+github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
 github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
 github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=


@@ -26,6 +26,7 @@ require (
 	github.com/btcsuite/btcd v0.20.1-beta // indirect
 	github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect
 	github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
+	github.com/cenkalti/backoff/v3 v3.2.2 // indirect
 	github.com/cenkalti/backoff/v4 v4.1.2 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/containerd/cgroups v1.1.0 // indirect


@@ -92,6 +92,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE
 github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
 github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
+github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
+github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
 github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
 github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=


@@ -28,6 +28,7 @@ require (
 	github.com/btcsuite/btcd v0.20.1-beta // indirect
 	github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect
 	github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
+	github.com/cenkalti/backoff/v3 v3.2.2 // indirect
 	github.com/cenkalti/backoff/v4 v4.1.2 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/containerd/cgroups v1.1.0 // indirect


@@ -92,6 +92,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE
 github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
 github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
+github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
+github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
 github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
 github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=


@@ -26,6 +26,7 @@ require (
 	github.com/btcsuite/btcd v0.20.1-beta // indirect
 	github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect
 	github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
+	github.com/cenkalti/backoff/v3 v3.2.2 // indirect
 	github.com/cenkalti/backoff/v4 v4.1.2 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/containerd/cgroups v1.1.0 // indirect


@@ -92,6 +92,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE
 github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
 github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
+github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
+github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
 github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
 github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=


@@ -29,7 +29,7 @@
   ];
   doCheck = false;
   # FIXME: This needs to be manually changed when updating modules.
-  vendorHash = "sha256-zwvZVTiwv7cc4vAM2Fil+qAG1v1J8q4BqX5lCgCStIc=";
+  vendorHash = "sha256-cOh9LNmcaBnBeMFM1HS2pdH5TTraHfo8PXL37t/A3gQ=";
   # Fix for 'nix run' trying to execute 'go-waku'.
   meta = { mainProgram = "waku"; };
 };

go.mod

@@ -35,6 +35,7 @@ require (
 require (
 	github.com/avast/retry-go/v4 v4.5.1
+	github.com/cenkalti/backoff/v3 v3.2.2
 	github.com/cenkalti/backoff/v4 v4.1.2
 	github.com/dustin/go-humanize v1.0.1
 	github.com/go-chi/chi/v5 v5.0.0

go.sum

@@ -245,6 +245,8 @@ github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8n
 github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50=
 github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE=
 github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
+github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
+github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
 github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
 github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
 github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=


@@ -21,6 +21,7 @@ import (
 	"time"
 	"unicode/utf8"
+	"github.com/cenkalti/backoff/v3"
 	"github.com/waku-org/go-waku/waku/v2/protocol"
 	gcrypto "github.com/ethereum/go-ethereum/crypto"
@@ -437,3 +438,21 @@ func WaitForTimeout(t *testing.T, ctx context.Context, timeout time.Duration, wg
 	wg.Wait()
 }
+
+type BackOffOption func(*backoff.ExponentialBackOff)
+
+func RetryWithBackOff(o func() error, options ...BackOffOption) error {
+	b := backoff.ExponentialBackOff{
+		InitialInterval:     time.Millisecond * 100,
+		RandomizationFactor: 0.1,
+		Multiplier:          1,
+		MaxInterval:         time.Second,
+		MaxElapsedTime:      time.Second * 10,
+		Clock:               backoff.SystemClock,
+	}
+	for _, option := range options {
+		option(&b)
+	}
+	b.Reset()
+	return backoff.Retry(o, &b)
+}
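
Nothing in this hunk shows the new helper in use, so a minimal usage sketch follows (not part of the diff; the tests import path and the flaky operation are assumptions for illustration):

package main

import (
	"errors"
	"fmt"

	"github.com/waku-org/go-waku/tests" // import path assumed from context
)

func main() {
	attempts := 0
	// Retry a flaky operation with the helper's defaults: 100ms initial
	// interval, intervals capped at 1s, giving up after 10s elapsed.
	err := tests.RetryWithBackOff(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println(attempts, err) // expected: 3 <nil>
}

Note that with Multiplier set to 1 the interval stays near the 100ms initial value (plus ~10% jitter) until the 10s budget runs out, so this is a near-constant retry rather than a truly exponential one.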


@@ -0,0 +1,248 @@
+package filter
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"go.uber.org/zap"
+	"golang.org/x/exp/maps"
+
+	"github.com/waku-org/go-waku/waku/v2/onlinechecker"
+	"github.com/waku-org/go-waku/waku/v2/protocol"
+	"github.com/waku-org/go-waku/waku/v2/protocol/filter"
+)
+
+// FilterManager methods aggregate filters from the application and subscribe to them.
+//
+// startFilterSubLoop runs a loop that waits for an interval so that subscriptions can be batched.
+//
+// runFilterSubscriptionLoop runs a loop that receives messages from the underlying subscriptions and invokes onNewEnvelopes.
+//
+// filterConfigs is the map of filter IDs to filter configs.
+// filterSubscriptions is the map of filter subscription IDs to subscriptions.
+
+const filterSubBatchSize = 90
+
+type appFilterMap map[string]filterConfig
+
+type FilterManager struct {
+	sync.Mutex
+	ctx                    context.Context
+	minPeersPerFilter      int
+	onlineChecker          *onlinechecker.DefaultOnlineChecker
+	filterSubscriptions    map[string]SubDetails // map of aggregated filters to apiSub details
+	logger                 *zap.Logger
+	node                   *filter.WakuFilterLightNode
+	filterSubBatchDuration time.Duration
+	incompleteFilterBatch  map[string]filterConfig
+	filterConfigs          appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter}
+	waitingToSubQueue      chan filterConfig
+	envProcessor           EnevelopeProcessor
+}
+
+type SubDetails struct {
+	cancel func()
+	sub    *Sub
+}
+
+type filterConfig struct {
+	ID            string
+	contentFilter protocol.ContentFilter
+}
+
+// EnevelopeProcessor is responsible for processing received messages.
+// This is application-specific.
+type EnevelopeProcessor interface {
+	OnNewEnvelope(env *protocol.Envelope) error
+}
+
+func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode) *FilterManager {
+	// This fn is being mocked in test
+	mgr := new(FilterManager)
+	mgr.ctx = ctx
+	mgr.logger = logger
+	mgr.minPeersPerFilter = minPeersPerFilter
+	mgr.envProcessor = envProcessor
+	mgr.filterSubscriptions = make(map[string]SubDetails)
+	mgr.node = node
+	mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker)
+	mgr.node.SetOnlineChecker(mgr.onlineChecker)
+	mgr.filterSubBatchDuration = 5 * time.Second
+	mgr.incompleteFilterBatch = make(map[string]filterConfig)
+	mgr.filterConfigs = make(appFilterMap)
+	mgr.waitingToSubQueue = make(chan filterConfig, 100)
+	go mgr.startFilterSubLoop()
+	return mgr
+}
+
+func (mgr *FilterManager) startFilterSubLoop() {
+	ticker := time.NewTicker(mgr.filterSubBatchDuration)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-mgr.ctx.Done():
+			return
+		case <-ticker.C:
+			// TODO: Optimization, handle case where 1st addFilter happens just before ticker expires.
+			if mgr.onlineChecker.IsOnline() {
+				mgr.Lock()
+				for _, af := range mgr.incompleteFilterBatch {
+					mgr.logger.Debug("ticker hit, hence subscribing", zap.String("agg-filter-id", af.ID), zap.Int("batch-size", len(af.contentFilter.ContentTopics)),
+						zap.Stringer("agg-content-filter", af.contentFilter))
+					go mgr.subscribeAndRunLoop(af)
+				}
+				mgr.incompleteFilterBatch = make(map[string]filterConfig)
+				mgr.Unlock()
+			}
+			subs := mgr.node.Subscriptions()
+			mgr.logger.Debug("filter stats", zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
+		}
+	}
+}
+
+// SubscribeFilter checks whether there is an existing batch of waiting filters for the pubsubTopic and adds the new filter to that batch.
+// Once the batch limit is hit, all filters in the batch are subscribed to and a new batch is created.
+// If the node is not online, the batch is pushed to a queue to be picked up later for subscription and a new batch is created.
+func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) {
+	mgr.logger.Debug("adding filter", zap.String("filter-id", filterID))
+	mgr.Lock()
+	defer mgr.Unlock()
+	afilter, ok := mgr.incompleteFilterBatch[cf.PubsubTopic]
+	if !ok {
+		// no existing batch for pubsubTopic
+		mgr.logger.Debug("new pubsubTopic batch", zap.String("topic", cf.PubsubTopic))
+		afilter = filterConfig{uuid.NewString(), cf}
+		mgr.incompleteFilterBatch[cf.PubsubTopic] = afilter
+		mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf}
+	} else {
+		mgr.logger.Debug("existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic))
+		if len(afilter.contentFilter.ContentTopics)+len(cf.ContentTopics) > filterSubBatchSize {
+			// filter batch limit is hit
+			if mgr.onlineChecker.IsOnline() {
+				// node is online, go ahead and subscribe the batch
+				mgr.logger.Debug("crossed pubsubTopic batchsize and online, subscribing to filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(cf.ContentTopics)))
+				go mgr.subscribeAndRunLoop(afilter)
+			} else {
+				mgr.logger.Debug("crossed pubsubTopic batchsize and offline, queuing filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(cf.ContentTopics)))
+				// queue existing batch as node is not online
+				mgr.waitingToSubQueue <- afilter
+			}
+			afilter = filterConfig{uuid.NewString(), cf}
+			mgr.logger.Debug("creating a new pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Stringer("content-filter", cf))
+			mgr.incompleteFilterBatch[cf.PubsubTopic] = afilter
+			mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf}
+		} else {
+			// add to existing batch as batch limit not reached
+			for _, ct := range maps.Keys(cf.ContentTopics) {
+				afilter.contentFilter.ContentTopics[ct] = struct{}{}
+			}
+			mgr.logger.Debug("adding to existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.Stringer("content-filter", cf), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)))
+			mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf}
+		}
+	}
+}
+
+func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
+	ctx, cancel := context.WithCancel(mgr.ctx)
+	config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
+	sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger)
+	mgr.Lock()
+	mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
+	mgr.Unlock()
+	if err == nil {
+		mgr.logger.Debug("subscription successful, running loop", zap.String("agg-filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter))
+		mgr.runFilterSubscriptionLoop(sub)
+	} else {
+		mgr.logger.Error("subscription fail, need to debug issue", zap.String("agg-filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter), zap.Error(err))
+	}
+}
+
+// NetworkChange is to be invoked when the application detects a change in network connectivity.
+// It retriggers a ping to verify that existing subscriptions are still healthy.
+func (mgr *FilterManager) NetworkChange() {
+	mgr.node.PingPeers() // ping all peers to check if subscriptions are alive
+}
+
+// OnConnectionStatusChange is to be triggered when a connection status change is detected, either from offline to online or vice versa.
+// A pubsubTopic-specific change can be signalled by specifying pubsubTopic;
+// an empty pubsubTopic indicates a node-wide change, such as the node going offline or coming back online.
+func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool) {
+	subs := mgr.node.Subscriptions()
+	mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
+		zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
+	if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to online
+		mgr.NetworkChange()
+		mgr.logger.Debug("switching from offline to online")
+		mgr.Lock()
+		if len(mgr.waitingToSubQueue) > 0 {
+			for af := range mgr.waitingToSubQueue {
+				// TODO: change the below logic once topic specific health is implemented for lightClients
+				if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
+					// check if any filter subs are pending and subscribe them
+					mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
+					go mgr.subscribeAndRunLoop(af)
+				} else {
+					mgr.waitingToSubQueue <- af
+				}
+				if len(mgr.waitingToSubQueue) == 0 {
+					mgr.logger.Debug("no pending subscriptions")
+					break
+				}
+			}
+		}
+		mgr.Unlock()
+	}
+	mgr.onlineChecker.SetOnline(newStatus)
+}
+
+func (mgr *FilterManager) UnsubscribeFilter(filterID string) {
+	mgr.Lock()
+	defer mgr.Unlock()
+	mgr.logger.Debug("removing filter", zap.String("filter-id", filterID))
+	filterConfig, ok := mgr.filterConfigs[filterID]
+	if !ok {
+		mgr.logger.Debug("filter removal: filter not found", zap.String("filter-id", filterID))
+		return
+	}
+	af, ok := mgr.filterSubscriptions[filterConfig.ID]
+	if ok {
+		delete(mgr.filterConfigs, filterID)
+		for ct := range filterConfig.contentFilter.ContentTopics {
+			delete(af.sub.ContentFilter.ContentTopics, ct)
+		}
+		if len(af.sub.ContentFilter.ContentTopics) == 0 {
+			af.cancel()
+		} else {
+			go af.sub.Unsubscribe(filterConfig.contentFilter)
+		}
+	} else {
+		mgr.logger.Debug("filter removal: aggregated filter not found", zap.String("filter-id", filterID), zap.String("agg-filter-id", filterConfig.ID))
+	}
+}
+
+func (mgr *FilterManager) runFilterSubscriptionLoop(sub *Sub) {
+	for {
+		select {
+		case <-mgr.ctx.Done():
+			mgr.logger.Debug("subscription loop ended", zap.Stringer("content-filter", sub.ContentFilter))
+			return
+		case env, ok := <-sub.DataCh:
+			if ok {
+				err := mgr.envProcessor.OnNewEnvelope(env)
+				if err != nil {
+					mgr.logger.Error("invoking onNewEnvelopes error", zap.Error(err))
+				}
+			} else {
+				mgr.logger.Debug("filter sub is closed", zap.Any("content-filter", sub.ContentFilter))
+				return
+			}
+		}
+	}
+}
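
The new file defines the manager's whole surface (EnevelopeProcessor, NewFilterManager, SubscribeFilter, OnConnectionStatusChange, UnsubscribeFilter) but no consumer, so a minimal wiring sketch follows (not part of the diff; the manager's import path, the topic strings, and the processor type are assumptions, and light-node construction is elided):

package example

import (
	"context"
	"fmt"

	"github.com/google/uuid"
	"go.uber.org/zap"

	apifilter "github.com/waku-org/go-waku/waku/v2/api/filter" // import path assumed
	"github.com/waku-org/go-waku/waku/v2/protocol"
	filterproto "github.com/waku-org/go-waku/waku/v2/protocol/filter"
)

// printProcessor is an illustrative EnevelopeProcessor; a real application
// would decrypt/route the envelope here instead of printing it.
type printProcessor struct{}

func (p printProcessor) OnNewEnvelope(env *protocol.Envelope) error {
	fmt.Println("received on", env.Message().ContentTopic)
	return nil
}

// runFilters wires a FilterManager to an already-constructed light node.
func runFilters(ctx context.Context, log *zap.Logger, node *filterproto.WakuFilterLightNode) {
	// 2 = minimum peers per filter; envelopes are delivered to printProcessor.
	mgr := apifilter.NewFilterManager(ctx, log, 2, printProcessor{}, node)

	// Register an application filter. The manager batches content topics per
	// pubsub topic (up to filterSubBatchSize) and subscribes on its 5s ticker.
	id := uuid.NewString()
	mgr.SubscribeFilter(id, protocol.ContentFilter{
		PubsubTopic:   "/waku/2/rs/16/32", // illustrative shard
		ContentTopics: protocol.NewContentTopicSet("/app/1/chat/proto"),
	})

	// Report connectivity changes; on an offline-to-online flip the manager
	// pings peers and drains any batches queued while offline.
	mgr.OnConnectionStatusChange("", true)

	// Drop the filter; the shared subscription is only cancelled once its
	// last content topic is removed.
	mgr.UnsubscribeFilter(id)
}

The TestFilterManager added in the next file exercises this same flow end to end against a real light node.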


@@ -1,10 +1,15 @@
 //go:build !race

 package filter

 import (
+	"context"
+	"crypto/rand"
+	"encoding/hex"
 	"testing"
+	"time"

+	"github.com/google/uuid"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/stretchr/testify/suite"
 	"github.com/waku-org/go-waku/waku/v2/protocol"
@@ -19,6 +24,7 @@ func TestFilterApiSuite(t *testing.T) {
 type FilterApiTestSuite struct {
 	filter.FilterTestSuite
+	msgRcvd chan bool
 }

 func (s *FilterApiTestSuite) SetupTest() {
@@ -96,3 +102,92 @@ func (s *FilterApiTestSuite) TestSubscribe() {
 	s.Log.Info("DataCh is closed")
 }
+
+func (s *FilterApiTestSuite) OnNewEnvelope(env *protocol.Envelope) error {
+	if env.Message().ContentTopic == s.ContentFilter.ContentTopicsList()[0] {
+		s.Log.Info("received message via filter")
+		s.msgRcvd <- true
+	} else {
+		s.Log.Info("received message via filter but doesn't match contentTopic")
+	}
+	return nil
+}
+
+func (s *FilterApiTestSuite) TestFilterManager() {
+	ctx, cancel := context.WithCancel(context.Background())
+	testPubsubTopic := s.TestTopic
+	contentTopicBytes := make([]byte, 4)
+	_, err := rand.Read(contentTopicBytes)
+	s.Require().NoError(err)
+	s.ContentFilter = protocol.ContentFilter{
+		PubsubTopic:   testPubsubTopic,
+		ContentTopics: protocol.NewContentTopicSet("/test/filtermgr" + hex.EncodeToString(contentTopicBytes) + "/topic/proto"),
+	}
+	s.msgRcvd = make(chan bool, 1)
+	s.Log.Info("creating filterManager")
+	fm := NewFilterManager(ctx, s.Log, 2, s, s.LightNode)
+	fm.filterSubBatchDuration = 1 * time.Second
+	fm.onlineChecker.SetOnline(true)
+	fID := uuid.NewString()
+	fm.SubscribeFilter(fID, s.ContentFilter)
+	time.Sleep(2 * time.Second)
+	// Ensure there is at least 1 active filter subscription
+	subscriptions := s.LightNode.Subscriptions()
+	s.Require().Greater(len(subscriptions), 0)
+	s.Log.Info("publishing msg")
+	s.PublishMsg(&filter.WakuMsg{
+		Payload:      "filtermgr testMsg",
+		ContentTopic: s.ContentFilter.ContentTopicsList()[0],
+		PubSubTopic:  testPubsubTopic,
+	})
+	t := time.NewTicker(2 * time.Second)
+	select {
+	case received := <-s.msgRcvd:
+		s.Require().True(received)
+		s.Log.Info("unsubscribe 1")
+	case <-t.C:
+		s.Log.Error("timed out waiting for message")
+		s.Fail("timed out waiting for message")
+	}
+	// Mock peers going down
+	s.LightNodeHost.Peerstore().RemovePeer(s.FullNodeHost.ID())
+	fm.OnConnectionStatusChange("", false)
+	time.Sleep(2 * time.Second)
+	fm.OnConnectionStatusChange("", true)
+	s.ConnectToFullNode(s.LightNode, s.FullNode)
+	time.Sleep(3 * time.Second)
+	// Ensure there is at least 1 active filter subscription
+	subscriptions = s.LightNode.Subscriptions()
+	s.Require().Greater(len(subscriptions), 0)
+	s.Log.Info("publish message 2")
+	// Ensure that messages are retrieved with a fresh sub
+	s.PublishMsg(&filter.WakuMsg{
+		Payload:      "filtermgr testMsg2",
+		ContentTopic: s.ContentFilter.ContentTopicsList()[0],
+		PubSubTopic:  testPubsubTopic,
+	})
+	t = time.NewTicker(2 * time.Second)
+	select {
+	case received := <-s.msgRcvd:
+		s.Require().True(received)
+		s.Log.Info("received message 2")
+	case <-t.C:
+		s.Log.Error("timed out waiting for message 2")
+		s.Fail("timed out waiting for message 2")
+	}
+	fm.UnsubscribeFilter(fID)
+	cancel()
+}


@@ -26,7 +26,7 @@ func (s *FilterTestSuite) TestUnSubscriptionPing() {
 	err := s.LightNode.Ping(context.Background(), s.FullNodeHost.ID())
 	s.Require().NoError(err)
-	_, err = s.LightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
+	_, err = s.LightNode.Unsubscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID()))
 	s.Require().NoError(err)
 	err = s.LightNode.Ping(context.Background(), s.FullNodeHost.ID())


@@ -220,8 +220,8 @@ func (s *FilterTestSuite) TestIncorrectSubscribeIdentifier() {
 	s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL)
 	// Subscribe with incorrect SubscribeID
-	s.contentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)}
-	_, err := s.LightNode.IncorrectSubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
+	s.ContentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)}
+	_, err := s.LightNode.IncorrectSubscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID()))
 	s.Require().Error(err)
 	_, err = s.LightNode.UnsubscribeAll(s.ctx)
@@ -266,8 +266,8 @@ func (s *FilterTestSuite) TestIncorrectPushIdentifier() {
 	s.Require().NoError(err)
 	// Subscribe
-	s.contentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)}
-	s.subDetails, err = s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
+	s.ContentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)}
+	s.subDetails, err = s.LightNode.Subscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID()))
 	s.Require().NoError(err)
 	time.Sleep(1 * time.Second)


@@ -26,7 +26,7 @@ func (s *FilterTestSuite) TestWakuFilter() {
 	// Wrong content topic
 	s.waitForTimeout(&WakuMsg{s.TestTopic, "TopicB", "second"})
-	_, err := s.LightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
+	_, err := s.LightNode.Unsubscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID()))
 	s.Require().NoError(err)
 	// Should not receive after unsubscribe
@@ -180,8 +180,8 @@ func (s *FilterTestSuite) TestContentTopicsLimit() {
 	s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 10 seconds
 	// Detect existing content topics from previous test
-	if len(s.contentFilter.PubsubTopic) > 0 {
-		existingTopics := len(s.contentFilter.ContentTopicsList())
+	if len(s.ContentFilter.PubsubTopic) > 0 {
+		existingTopics := len(s.ContentFilter.ContentTopicsList())
 		if existingTopics > 0 {
 			maxContentTopics = maxContentTopics - existingTopics
 		}
@@ -233,13 +233,13 @@ func (s *FilterTestSuite) TestSubscribeErrorHandling() {
 	})
 	// Subscribe with empty pubsub
-	s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[0].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].ContentTopic)}
-	_, err := s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
+	s.ContentFilter = protocol.ContentFilter{PubsubTopic: messages[0].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].ContentTopic)}
+	_, err := s.LightNode.Subscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID()))
 	s.Require().Error(err)
 	// Subscribe with empty content topic
-	s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[1].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].ContentTopic)}
-	_, err = s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
+	s.ContentFilter = protocol.ContentFilter{PubsubTopic: messages[1].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].ContentTopic)}
+	_, err = s.LightNode.Subscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID()))
 	s.Require().Error(err)
 }
@@ -271,8 +271,8 @@ func (s *FilterTestSuite) TestMultipleFullNodeSubscriptions() {
 	s.Log.Info("Subscribing to second", zap.String("fullNode", string(fullNodeIDHex)))
 	// Subscribe to the second full node
-	s.contentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)}
-	_, err = s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
+	s.ContentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)}
+	_, err = s.LightNode.Subscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID()))
 	s.Require().NoError(err)
 	_, err = s.LightNode.UnsubscribeAll(s.ctx)

@@ -117,7 +117,7 @@ func (s *FilterTestSuite) TestAutoShard() {
 	// Wrong content topic
 	s.waitForTimeout(&WakuMsg{s.TestTopic, "TopicB", "second"})
-	_, err = s.LightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
+	_, err = s.LightNode.Unsubscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID()))
 	s.Require().NoError(err)
 	time.Sleep(1 * time.Second)


@@ -47,7 +47,7 @@ type FilterTestSuite struct {
 	ctx           context.Context
 	ctxCancel     context.CancelFunc
 	wg            *sync.WaitGroup
-	contentFilter protocol.ContentFilter
+	ContentFilter protocol.ContentFilter
 	subDetails    []*subscription.SubscriptionDetails
 	Log           *zap.Logger
@@ -63,7 +63,7 @@ type WakuMsg struct {
 }

 func (s *FilterTestSuite) SetupTest() {
-	log := utils.Logger() //.Named("filterv2-test")
+	log := utils.Logger()
 	s.Log = log
 	s.Log.Info("SetupTest()")
@@ -192,7 +192,7 @@ func (s *FilterTestSuite) waitForMsgFromChan(msg *WakuMsg, ch chan *protocol.Env
 		defer s.wg.Done()
 		select {
 		case env := <-ch:
-			for _, topic := range s.contentFilter.ContentTopicsList() {
+			for _, topic := range s.ContentFilter.ContentTopicsList() {
 				if topic == env.Message().GetContentTopic() {
 					msgFound = true
 				}
@@ -308,8 +308,8 @@ func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, pee
 	for _, sub := range s.subDetails {
 		if sub.ContentFilter.PubsubTopic == pubsubTopic {
 			sub.Add(contentTopic)
-			s.contentFilter = sub.ContentFilter
-			subDetails, err := s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer))
+			s.ContentFilter = sub.ContentFilter
+			subDetails, err := s.LightNode.Subscribe(s.ctx, s.ContentFilter, WithPeer(peer))
 			s.subDetails = subDetails
 			s.Require().NoError(err)
 			return
@@ -317,7 +317,7 @@ func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, pee
 	}
 	s.subDetails = s.getSub(pubsubTopic, contentTopic, peer)
-	s.contentFilter = s.subDetails[0].ContentFilter
+	s.ContentFilter = s.subDetails[0].ContentFilter
 }

 func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails {
@@ -331,7 +331,7 @@ func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, p
 		} else {
 			sub.Remove(contentTopic)
 		}
-		s.contentFilter = sub.ContentFilter
+		s.ContentFilter = sub.ContentFilter
 	}
 }