diff --git a/default.nix b/default.nix index be1c1928..c2c37add 100644 --- a/default.nix +++ b/default.nix @@ -26,7 +26,7 @@ pkgs.buildGo121Module { '' else ""; # FIXME: This needs to be manually changed when updating modules. - vendorHash = "sha256-zwvZVTiwv7cc4vAM2Fil+qAG1v1J8q4BqX5lCgCStIc="; + vendorHash = "sha256-cOh9LNmcaBnBeMFM1HS2pdH5TTraHfo8PXL37t/A3gQ="; # Fix for 'nix run' trying to execute 'go-waku'. meta = { mainProgram = "waku"; }; diff --git a/examples/basic-light-client/go.mod b/examples/basic-light-client/go.mod index 87361937..a374667e 100644 --- a/examples/basic-light-client/go.mod +++ b/examples/basic-light-client/go.mod @@ -30,6 +30,7 @@ require ( github.com/btcsuite/btcd v0.20.1-beta // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect + github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/cenkalti/backoff/v4 v4.1.2 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/cgroups v1.1.0 // indirect diff --git a/examples/basic-light-client/go.sum b/examples/basic-light-client/go.sum index 2f3f8015..1a8f34b5 100644 --- a/examples/basic-light-client/go.sum +++ b/examples/basic-light-client/go.sum @@ -94,6 +94,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/examples/basic-relay/go.mod b/examples/basic-relay/go.mod index 9262ff91..b672c665 100644 --- a/examples/basic-relay/go.mod +++ b/examples/basic-relay/go.mod @@ -29,6 +29,7 @@ require ( github.com/btcsuite/btcd v0.20.1-beta // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect + github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/cenkalti/backoff/v4 v4.1.2 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/cgroups v1.1.0 // indirect diff --git a/examples/basic-relay/go.sum b/examples/basic-relay/go.sum index 7feb9818..cf29a44f 100644 --- a/examples/basic-relay/go.sum +++ b/examples/basic-relay/go.sum @@ -94,6 +94,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/examples/chat2/go.mod b/examples/chat2/go.mod index 2407fb63..d7588312 100644 --- a/examples/chat2/go.mod +++ b/examples/chat2/go.mod @@ -36,6 +36,7 @@ require ( github.com/btcsuite/btcd v0.20.1-beta // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect + github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/cenkalti/backoff/v4 v4.1.2 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/cgroups v1.1.0 // indirect diff --git a/examples/chat2/go.sum b/examples/chat2/go.sum index 810f4931..0b06be1d 100644 --- a/examples/chat2/go.sum +++ b/examples/chat2/go.sum @@ -96,6 +96,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/examples/filter2/go.mod b/examples/filter2/go.mod index 6dda8f4a..263a2e06 100644 --- a/examples/filter2/go.mod +++ b/examples/filter2/go.mod @@ -26,6 +26,7 @@ require ( github.com/btcsuite/btcd v0.20.1-beta // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect + github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/cenkalti/backoff/v4 v4.1.2 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/cgroups v1.1.0 // indirect diff --git a/examples/filter2/go.sum b/examples/filter2/go.sum index a5203d1b..343d42b1 100644 --- a/examples/filter2/go.sum +++ b/examples/filter2/go.sum @@ -92,6 +92,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/examples/noise/go.mod b/examples/noise/go.mod index 70bc5787..762ed67d 100644 --- a/examples/noise/go.mod +++ b/examples/noise/go.mod @@ -28,6 +28,7 @@ require ( github.com/btcsuite/btcd v0.20.1-beta // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect + github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/cenkalti/backoff/v4 v4.1.2 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/cgroups v1.1.0 // indirect diff --git a/examples/noise/go.sum b/examples/noise/go.sum index 4cc14671..a2ad5e27 100644 --- a/examples/noise/go.sum +++ b/examples/noise/go.sum @@ -92,6 +92,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/examples/rln/go.mod b/examples/rln/go.mod index 5c9995ef..2500810b 100644 --- a/examples/rln/go.mod +++ b/examples/rln/go.mod @@ -26,6 +26,7 @@ require ( github.com/btcsuite/btcd v0.20.1-beta // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect + github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/cenkalti/backoff/v4 v4.1.2 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/cgroups v1.1.0 // indirect diff --git a/examples/rln/go.sum b/examples/rln/go.sum index a5203d1b..343d42b1 100644 --- a/examples/rln/go.sum +++ b/examples/rln/go.sum @@ -92,6 +92,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/flake.nix b/flake.nix index e81473e2..37010eca 100644 --- a/flake.nix +++ b/flake.nix @@ -29,7 +29,7 @@ ]; doCheck = false; # FIXME: This needs to be manually changed when updating modules. - vendorHash = "sha256-zwvZVTiwv7cc4vAM2Fil+qAG1v1J8q4BqX5lCgCStIc="; + vendorHash = "sha256-cOh9LNmcaBnBeMFM1HS2pdH5TTraHfo8PXL37t/A3gQ="; # Fix for 'nix run' trying to execute 'go-waku'. meta = { mainProgram = "waku"; }; }; diff --git a/go.mod b/go.mod index 3a181efc..95089d26 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( require ( github.com/avast/retry-go/v4 v4.5.1 + github.com/cenkalti/backoff/v3 v3.2.2 github.com/cenkalti/backoff/v4 v4.1.2 github.com/dustin/go-humanize v1.0.1 github.com/go-chi/chi/v5 v5.0.0 diff --git a/go.sum b/go.sum index 21d942b5..08b33d97 100644 --- a/go.sum +++ b/go.sum @@ -245,6 +245,8 @@ github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8n github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50= github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= diff --git a/tests/utils.go b/tests/utils.go index d5a57912..82f086ae 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -21,6 +21,7 @@ import ( "time" "unicode/utf8" + "github.com/cenkalti/backoff/v3" "github.com/waku-org/go-waku/waku/v2/protocol" gcrypto "github.com/ethereum/go-ethereum/crypto" @@ -437,3 +438,21 @@ func WaitForTimeout(t *testing.T, ctx context.Context, timeout time.Duration, wg wg.Wait() } + +type BackOffOption func(*backoff.ExponentialBackOff) + +func RetryWithBackOff(o func() error, options ...BackOffOption) error { + b := backoff.ExponentialBackOff{ + InitialInterval: time.Millisecond * 100, + RandomizationFactor: 0.1, + Multiplier: 1, + MaxInterval: time.Second, + MaxElapsedTime: time.Second * 10, + Clock: backoff.SystemClock, + } + for _, option := range options { + option(&b) + } + b.Reset() + return backoff.Retry(o, &b) +} diff --git a/waku/v2/api/filter/filter_manager.go b/waku/v2/api/filter/filter_manager.go new file mode 100644 index 00000000..4dc92c3d --- /dev/null +++ b/waku/v2/api/filter/filter_manager.go @@ -0,0 +1,248 @@ +package filter + +import ( + "context" + "sync" + "time" + + "github.com/google/uuid" + + "go.uber.org/zap" + "golang.org/x/exp/maps" + + "github.com/waku-org/go-waku/waku/v2/onlinechecker" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/filter" +) + +// Methods on FilterManager just aggregate filters from application and subscribe to them +// +// startFilterSubLoop runs a loop where-in it waits for an interval to batch subscriptions +// +// runFilterSubscriptionLoop runs a loop for receiving messages from underlying subscriptions and invokes onNewEnvelopes +// +// filterConfigs is the map of filer IDs to filter configs +// filterSubscriptions is the map of filter subscription IDs to subscriptions + +const filterSubBatchSize = 90 + +type appFilterMap map[string]filterConfig + +type FilterManager struct { + sync.Mutex + ctx context.Context + minPeersPerFilter int + onlineChecker *onlinechecker.DefaultOnlineChecker + filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details + logger *zap.Logger + node *filter.WakuFilterLightNode + filterSubBatchDuration time.Duration + incompleteFilterBatch map[string]filterConfig + filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter} + waitingToSubQueue chan filterConfig + envProcessor EnevelopeProcessor +} + +type SubDetails struct { + cancel func() + sub *Sub +} + +type filterConfig struct { + ID string + contentFilter protocol.ContentFilter +} + +// EnevelopeProcessor is responsible for processing of received messages +// This is application specific +type EnevelopeProcessor interface { + OnNewEnvelope(env *protocol.Envelope) error +} + +func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode) *FilterManager { + // This fn is being mocked in test + mgr := new(FilterManager) + mgr.ctx = ctx + mgr.logger = logger + mgr.minPeersPerFilter = minPeersPerFilter + mgr.envProcessor = envProcessor + mgr.filterSubscriptions = make(map[string]SubDetails) + mgr.node = node + mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker) + mgr.node.SetOnlineChecker(mgr.onlineChecker) + mgr.filterSubBatchDuration = 5 * time.Second + mgr.incompleteFilterBatch = make(map[string]filterConfig) + mgr.filterConfigs = make(appFilterMap) + mgr.waitingToSubQueue = make(chan filterConfig, 100) + go mgr.startFilterSubLoop() + return mgr +} + +func (mgr *FilterManager) startFilterSubLoop() { + ticker := time.NewTicker(mgr.filterSubBatchDuration) + defer ticker.Stop() + for { + select { + case <-mgr.ctx.Done(): + return + case <-ticker.C: + // TODO: Optimization, handle case where 1st addFilter happens just before ticker expires. + if mgr.onlineChecker.IsOnline() { + mgr.Lock() + for _, af := range mgr.incompleteFilterBatch { + mgr.logger.Debug("ticker hit, hence subscribing", zap.String("agg-filter-id", af.ID), zap.Int("batch-size", len(af.contentFilter.ContentTopics)), + zap.Stringer("agg-content-filter", af.contentFilter)) + go mgr.subscribeAndRunLoop(af) + } + mgr.incompleteFilterBatch = make(map[string]filterConfig) + mgr.Unlock() + } + subs := mgr.node.Subscriptions() + mgr.logger.Debug("filter stats", zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs))) + } + } +} + +// addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch +// once batchlimit is hit, all filters are subscribed to and new batch is created. +// if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created + +func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) { + mgr.logger.Debug("adding filter", zap.String("filter-id", filterID)) + + mgr.Lock() + defer mgr.Unlock() + + afilter, ok := mgr.incompleteFilterBatch[cf.PubsubTopic] + if !ok { + // no existing batch for pubsubTopic + mgr.logger.Debug("new pubsubTopic batch", zap.String("topic", cf.PubsubTopic)) + afilter = filterConfig{uuid.NewString(), cf} + mgr.incompleteFilterBatch[cf.PubsubTopic] = afilter + mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf} + } else { + mgr.logger.Debug("existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic)) + if len(afilter.contentFilter.ContentTopics)+len(cf.ContentTopics) > filterSubBatchSize { + // filter batch limit is hit + if mgr.onlineChecker.IsOnline() { + // node is online, go ahead and subscribe the batch + mgr.logger.Debug("crossed pubsubTopic batchsize and online, subscribing to filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(cf.ContentTopics))) + go mgr.subscribeAndRunLoop(afilter) + } else { + mgr.logger.Debug("crossed pubsubTopic batchsize and offline, queuing filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(cf.ContentTopics))) + // queue existing batch as node is not online + mgr.waitingToSubQueue <- afilter + } + afilter = filterConfig{uuid.NewString(), cf} + mgr.logger.Debug("creating a new pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Stringer("content-filter", cf)) + mgr.incompleteFilterBatch[cf.PubsubTopic] = afilter + mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf} + } else { + // add to existing batch as batch limit not reached + for _, ct := range maps.Keys(cf.ContentTopics) { + afilter.contentFilter.ContentTopics[ct] = struct{}{} + } + mgr.logger.Debug("adding to existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.Stringer("content-filter", cf), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics))) + mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf} + } + } +} + +func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { + ctx, cancel := context.WithCancel(mgr.ctx) + config := FilterConfig{MaxPeers: mgr.minPeersPerFilter} + sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger) + mgr.Lock() + mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub} + mgr.Unlock() + if err == nil { + mgr.logger.Debug("subscription successful, running loop", zap.String("agg-filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter)) + mgr.runFilterSubscriptionLoop(sub) + } else { + mgr.logger.Error("subscription fail, need to debug issue", zap.String("agg-filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter), zap.Error(err)) + } +} + +// NetworkChange is to be invoked when there is a change in network detected by application +// This should retrigger a ping to verify if subscriptions are fine. +func (mgr *FilterManager) NetworkChange() { + mgr.node.PingPeers() // ping all peers to check if subscriptions are alive +} + +// OnConnectionStatusChange to be triggered when connection status change is detected either from offline to online or vice-versa +// Note that pubsubTopic specific change can be triggered by specifying pubsubTopic, +// if pubsubTopic is empty it indicates complete connection status change such as node went offline or came back online. +func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool) { + subs := mgr.node.Subscriptions() + mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus), + zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs))) + if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online + mgr.NetworkChange() + mgr.logger.Debug("switching from offline to online") + mgr.Lock() + if len(mgr.waitingToSubQueue) > 0 { + for af := range mgr.waitingToSubQueue { + // TODO: change the below logic once topic specific health is implemented for lightClients + if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { + // check if any filter subs are pending and subscribe them + mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) + go mgr.subscribeAndRunLoop(af) + } else { + mgr.waitingToSubQueue <- af + } + if len(mgr.waitingToSubQueue) == 0 { + mgr.logger.Debug("no pending subscriptions") + break + } + } + } + mgr.Unlock() + } + + mgr.onlineChecker.SetOnline(newStatus) +} + +func (mgr *FilterManager) UnsubscribeFilter(filterID string) { + mgr.Lock() + defer mgr.Unlock() + mgr.logger.Debug("removing filter", zap.String("filter-id", filterID)) + filterConfig, ok := mgr.filterConfigs[filterID] + if !ok { + mgr.logger.Debug("filter removal: filter not found", zap.String("filter-id", filterID)) + return + } + af, ok := mgr.filterSubscriptions[filterConfig.ID] + if ok { + delete(mgr.filterConfigs, filterID) + for ct := range filterConfig.contentFilter.ContentTopics { + delete(af.sub.ContentFilter.ContentTopics, ct) + } + if len(af.sub.ContentFilter.ContentTopics) == 0 { + af.cancel() + } else { + go af.sub.Unsubscribe(filterConfig.contentFilter) + } + } else { + mgr.logger.Debug("filter removal: aggregated filter not found", zap.String("filter-id", filterID), zap.String("agg-filter-id", filterConfig.ID)) + } +} + +func (mgr *FilterManager) runFilterSubscriptionLoop(sub *Sub) { + for { + select { + case <-mgr.ctx.Done(): + mgr.logger.Debug("subscription loop ended", zap.Stringer("content-filter", sub.ContentFilter)) + return + case env, ok := <-sub.DataCh: + if ok { + err := mgr.envProcessor.OnNewEnvelope(env) + if err != nil { + mgr.logger.Error("invoking onNewEnvelopes error", zap.Error(err)) + } + } else { + mgr.logger.Debug("filter sub is closed", zap.Any("content-filter", sub.ContentFilter)) + return + } + } + } +} diff --git a/waku/v2/api/filter/filter_test.go b/waku/v2/api/filter/filter_test.go index af976a69..140dedc6 100644 --- a/waku/v2/api/filter/filter_test.go +++ b/waku/v2/api/filter/filter_test.go @@ -1,10 +1,15 @@ +//go:build !race + package filter import ( "context" + "crypto/rand" + "encoding/hex" "testing" "time" + "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/suite" "github.com/waku-org/go-waku/waku/v2/protocol" @@ -19,6 +24,7 @@ func TestFilterApiSuite(t *testing.T) { type FilterApiTestSuite struct { filter.FilterTestSuite + msgRcvd chan bool } func (s *FilterApiTestSuite) SetupTest() { @@ -96,3 +102,92 @@ func (s *FilterApiTestSuite) TestSubscribe() { s.Log.Info("DataCh is closed") } + +func (s *FilterApiTestSuite) OnNewEnvelope(env *protocol.Envelope) error { + if env.Message().ContentTopic == s.ContentFilter.ContentTopicsList()[0] { + s.Log.Info("received message via filter") + s.msgRcvd <- true + } else { + s.Log.Info("received message via filter but doesn't match contentTopic") + } + return nil +} + +func (s *FilterApiTestSuite) TestFilterManager() { + ctx, cancel := context.WithCancel(context.Background()) + + testPubsubTopic := s.TestTopic + contentTopicBytes := make([]byte, 4) + _, err := rand.Read(contentTopicBytes) + + s.Require().NoError(err) + + s.ContentFilter = protocol.ContentFilter{ + PubsubTopic: testPubsubTopic, + ContentTopics: protocol.NewContentTopicSet("/test/filtermgr" + hex.EncodeToString(contentTopicBytes) + "/topic/proto"), + } + + s.msgRcvd = make(chan bool, 1) + + s.Log.Info("creating filterManager") + fm := NewFilterManager(ctx, s.Log, 2, s, s.LightNode) + fm.filterSubBatchDuration = 1 * time.Second + fm.onlineChecker.SetOnline(true) + fID := uuid.NewString() + fm.SubscribeFilter(fID, s.ContentFilter) + time.Sleep(2 * time.Second) + + // Ensure there is at least 1 active filter subscription + subscriptions := s.LightNode.Subscriptions() + s.Require().Greater(len(subscriptions), 0) + + s.Log.Info("publishing msg") + + s.PublishMsg(&filter.WakuMsg{ + Payload: "filtermgr testMsg", + ContentTopic: s.ContentFilter.ContentTopicsList()[0], + PubSubTopic: testPubsubTopic, + }) + t := time.NewTicker(2 * time.Second) + select { + case received := <-s.msgRcvd: + s.Require().True(received) + s.Log.Info("unsubscribe 1") + case <-t.C: + s.Log.Error("timed out waiting for message") + s.Fail("timed out waiting for message") + } + // Mock peers going down + s.LightNodeHost.Peerstore().RemovePeer(s.FullNodeHost.ID()) + + fm.OnConnectionStatusChange("", false) + time.Sleep(2 * time.Second) + fm.OnConnectionStatusChange("", true) + s.ConnectToFullNode(s.LightNode, s.FullNode) + time.Sleep(3 * time.Second) + + // Ensure there is at least 1 active filter subscription + subscriptions = s.LightNode.Subscriptions() + s.Require().Greater(len(subscriptions), 0) + s.Log.Info("publish message 2") + + // Ensure that messages are retrieved with a fresh sub + s.PublishMsg(&filter.WakuMsg{ + Payload: "filtermgr testMsg2", + ContentTopic: s.ContentFilter.ContentTopicsList()[0], + PubSubTopic: testPubsubTopic, + }) + t = time.NewTicker(2 * time.Second) + + select { + case received := <-s.msgRcvd: + s.Require().True(received) + s.Log.Info("received message 2") + case <-t.C: + s.Log.Error("timed out waiting for message 2") + s.Fail("timed out waiting for message 2") + } + + fm.UnsubscribeFilter(fID) + cancel() +} diff --git a/waku/v2/protocol/filter/filter_ping_test.go b/waku/v2/protocol/filter/filter_ping_test.go index cc6dfb2a..619b9e93 100644 --- a/waku/v2/protocol/filter/filter_ping_test.go +++ b/waku/v2/protocol/filter/filter_ping_test.go @@ -26,7 +26,7 @@ func (s *FilterTestSuite) TestUnSubscriptionPing() { err := s.LightNode.Ping(context.Background(), s.FullNodeHost.ID()) s.Require().NoError(err) - _, err = s.LightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) + _, err = s.LightNode.Unsubscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) err = s.LightNode.Ping(context.Background(), s.FullNodeHost.ID()) diff --git a/waku/v2/protocol/filter/filter_proto_ident_test.go b/waku/v2/protocol/filter/filter_proto_ident_test.go index 549071a1..6614bfdc 100644 --- a/waku/v2/protocol/filter/filter_proto_ident_test.go +++ b/waku/v2/protocol/filter/filter_proto_ident_test.go @@ -220,8 +220,8 @@ func (s *FilterTestSuite) TestIncorrectSubscribeIdentifier() { s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL) // Subscribe with incorrect SubscribeID - s.contentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)} - _, err := s.LightNode.IncorrectSubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) + s.ContentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)} + _, err := s.LightNode.IncorrectSubscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().Error(err) _, err = s.LightNode.UnsubscribeAll(s.ctx) @@ -266,8 +266,8 @@ func (s *FilterTestSuite) TestIncorrectPushIdentifier() { s.Require().NoError(err) // Subscribe - s.contentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)} - s.subDetails, err = s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) + s.ContentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)} + s.subDetails, err = s.LightNode.Subscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) time.Sleep(1 * time.Second) diff --git a/waku/v2/protocol/filter/filter_subscribe_test.go b/waku/v2/protocol/filter/filter_subscribe_test.go index 112718ee..c8ec33c9 100644 --- a/waku/v2/protocol/filter/filter_subscribe_test.go +++ b/waku/v2/protocol/filter/filter_subscribe_test.go @@ -26,7 +26,7 @@ func (s *FilterTestSuite) TestWakuFilter() { // Wrong content topic s.waitForTimeout(&WakuMsg{s.TestTopic, "TopicB", "second"}) - _, err := s.LightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) + _, err := s.LightNode.Unsubscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) // Should not receive after unsubscribe @@ -180,8 +180,8 @@ func (s *FilterTestSuite) TestContentTopicsLimit() { s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 10 seconds // Detect existing content topics from previous test - if len(s.contentFilter.PubsubTopic) > 0 { - existingTopics := len(s.contentFilter.ContentTopicsList()) + if len(s.ContentFilter.PubsubTopic) > 0 { + existingTopics := len(s.ContentFilter.ContentTopicsList()) if existingTopics > 0 { maxContentTopics = maxContentTopics - existingTopics } @@ -233,13 +233,13 @@ func (s *FilterTestSuite) TestSubscribeErrorHandling() { }) // Subscribe with empty pubsub - s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[0].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].ContentTopic)} - _, err := s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) + s.ContentFilter = protocol.ContentFilter{PubsubTopic: messages[0].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].ContentTopic)} + _, err := s.LightNode.Subscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().Error(err) // Subscribe with empty content topic - s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[1].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].ContentTopic)} - _, err = s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) + s.ContentFilter = protocol.ContentFilter{PubsubTopic: messages[1].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].ContentTopic)} + _, err = s.LightNode.Subscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().Error(err) } @@ -271,8 +271,8 @@ func (s *FilterTestSuite) TestMultipleFullNodeSubscriptions() { s.Log.Info("Subscribing to second", zap.String("fullNode", string(fullNodeIDHex))) // Subscribe to the second full node - s.contentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)} - _, err = s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) + s.ContentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)} + _, err = s.LightNode.Subscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) _, err = s.LightNode.UnsubscribeAll(s.ctx) diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index 5fa0c413..ad590901 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -117,7 +117,7 @@ func (s *FilterTestSuite) TestAutoShard() { // Wrong content topic s.waitForTimeout(&WakuMsg{s.TestTopic, "TopicB", "second"}) - _, err = s.LightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) + _, err = s.LightNode.Unsubscribe(s.ctx, s.ContentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) time.Sleep(1 * time.Second) diff --git a/waku/v2/protocol/filter/test_utils.go b/waku/v2/protocol/filter/test_utils.go index 9a2b651e..015cb352 100644 --- a/waku/v2/protocol/filter/test_utils.go +++ b/waku/v2/protocol/filter/test_utils.go @@ -47,7 +47,7 @@ type FilterTestSuite struct { ctx context.Context ctxCancel context.CancelFunc wg *sync.WaitGroup - contentFilter protocol.ContentFilter + ContentFilter protocol.ContentFilter subDetails []*subscription.SubscriptionDetails Log *zap.Logger @@ -63,7 +63,7 @@ type WakuMsg struct { } func (s *FilterTestSuite) SetupTest() { - log := utils.Logger() //.Named("filterv2-test") + log := utils.Logger() s.Log = log s.Log.Info("SetupTest()") @@ -192,7 +192,7 @@ func (s *FilterTestSuite) waitForMsgFromChan(msg *WakuMsg, ch chan *protocol.Env defer s.wg.Done() select { case env := <-ch: - for _, topic := range s.contentFilter.ContentTopicsList() { + for _, topic := range s.ContentFilter.ContentTopicsList() { if topic == env.Message().GetContentTopic() { msgFound = true } @@ -308,8 +308,8 @@ func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, pee for _, sub := range s.subDetails { if sub.ContentFilter.PubsubTopic == pubsubTopic { sub.Add(contentTopic) - s.contentFilter = sub.ContentFilter - subDetails, err := s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer)) + s.ContentFilter = sub.ContentFilter + subDetails, err := s.LightNode.Subscribe(s.ctx, s.ContentFilter, WithPeer(peer)) s.subDetails = subDetails s.Require().NoError(err) return @@ -317,7 +317,7 @@ func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, pee } s.subDetails = s.getSub(pubsubTopic, contentTopic, peer) - s.contentFilter = s.subDetails[0].ContentFilter + s.ContentFilter = s.subDetails[0].ContentFilter } func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails { @@ -331,7 +331,7 @@ func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, p } else { sub.Remove(contentTopic) } - s.contentFilter = sub.ContentFilter + s.ContentFilter = sub.ContentFilter } }