chore_: move filter mgr to go-waku (#5653)

Prem Chaitanya Prathi 2024-08-06 19:05:56 +05:30 committed by GitHub
parent 2129a3f95c
commit 063756b4ed
11 changed files with 124 additions and 90 deletions

go.mod

@@ -96,7 +96,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20240801160005-d047df3859e2
github.com/waku-org/go-waku v0.8.1-0.20240806122111-5aa11311f833
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1

go.sum

@@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20240801160005-d047df3859e2 h1:S8KqFD9b1T+fJvKqDbb95e4X9TS0gf8XOJUR551+tRQ=
github.com/waku-org/go-waku v0.8.1-0.20240801160005-d047df3859e2/go.mod h1:OH0Z4ZMVXbgs6cNRap+ENDSNrfp1v2LA6K1qWWMT30M=
github.com/waku-org/go-waku v0.8.1-0.20240806122111-5aa11311f833 h1:ywaQQJ4WASimv8Y6ut7xhkBYMXyRZQCEw64CFPJJCbQ=
github.com/waku-org/go-waku v0.8.1-0.20240806122111-5aa11311f833/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=


@@ -21,6 +21,7 @@ import (
"time"
"unicode/utf8"
"github.com/cenkalti/backoff/v3"
"github.com/waku-org/go-waku/waku/v2/protocol"
gcrypto "github.com/ethereum/go-ethereum/crypto"
@@ -437,3 +438,21 @@ func WaitForTimeout(t *testing.T, ctx context.Context, timeout time.Duration, wg
wg.Wait()
}
type BackOffOption func(*backoff.ExponentialBackOff)
func RetryWithBackOff(o func() error, options ...BackOffOption) error {
b := backoff.ExponentialBackOff{
InitialInterval: time.Millisecond * 100,
RandomizationFactor: 0.1,
Multiplier: 1,
MaxInterval: time.Second,
MaxElapsedTime: time.Second * 10,
Clock: backoff.SystemClock,
}
for _, option := range options {
option(&b)
}
b.Reset()
return backoff.Retry(o, &b)
}
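For context, a minimal usage sketch of the new helper (assuming the caller sits in the same package as the helper, inside a test with t *testing.T, and imports errors, time, and github.com/cenkalti/backoff/v3 as in the hunk above; serviceReady is a hypothetical fallible check):

err := RetryWithBackOff(func() error {
    if !serviceReady() { // placeholder for any check that may fail transiently
        return errors.New("service not ready yet")
    }
    return nil
}, func(b *backoff.ExponentialBackOff) {
    b.MaxElapsedTime = 5 * time.Second // tighten the 10s default, mirroring the option pattern used in TestWakuV2Filter below
})
if err != nil {
    t.Fatalf("service never became ready: %v", err)
}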


@@ -1,4 +1,4 @@
package wakuv2
package filter
import (
"context"
@@ -7,12 +7,9 @@ import (
"github.com/google/uuid"
"github.com/status-im/status-go/wakuv2/common"
"go.uber.org/zap"
"golang.org/x/exp/maps"
api "github.com/waku-org/go-waku/waku/v2/api/filter"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
@@ -34,21 +31,21 @@ type appFilterMap map[string]filterConfig
type FilterManager struct {
sync.Mutex
ctx context.Context
cfg *Config
minPeersPerFilter int
onlineChecker *onlinechecker.DefaultOnlineChecker
filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details
onNewEnvelopes func(env *protocol.Envelope) error
logger *zap.Logger
node *filter.WakuFilterLightNode
filterSubBatchDuration time.Duration
incompleteFilterBatch map[string]filterConfig
filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter}
waitingToSubQueue chan filterConfig
envProcessor EnevelopeProcessor
}
type SubDetails struct {
cancel func()
sub *api.Sub
sub *Sub
}
type filterConfig struct {
@@ -56,13 +53,19 @@ type filterConfig struct {
contentFilter protocol.ContentFilter
}
func newFilterManager(ctx context.Context, logger *zap.Logger, cfg *Config, onNewEnvelopes func(env *protocol.Envelope) error, node *filter.WakuFilterLightNode) *FilterManager {
// EnevelopeProcessor is responsible for processing received messages.
// This is application-specific.
type EnevelopeProcessor interface {
OnNewEnvelope(env *protocol.Envelope) error
}
func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode) *FilterManager {
// This fn is being mocked in test
mgr := new(FilterManager)
mgr.ctx = ctx
mgr.logger = logger
mgr.cfg = cfg
mgr.onNewEnvelopes = onNewEnvelopes
mgr.minPeersPerFilter = minPeersPerFilter
mgr.envProcessor = envProcessor
mgr.filterSubscriptions = make(map[string]SubDetails)
mgr.node = node
mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker)
@@ -94,55 +97,51 @@ func (mgr *FilterManager) startFilterSubLoop() {
mgr.incompleteFilterBatch = make(map[string]filterConfig)
mgr.Unlock()
}
subs := mgr.node.Subscriptions()
mgr.logger.Debug("filter stats", zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
}
}
}
/*
addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch
once batchlimit is hit, all filters are subscribed to and new batch is created.
if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created
*/
func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) {
// SubscribeFilter checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch.
// Once the batch limit is hit, all filters in the batch are subscribed to and a new batch is created.
// If the node is not online, the batch is pushed to a queue to be picked up later for subscription and a new batch is created.
func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) {
mgr.logger.Debug("adding filter", zap.String("filter-id", filterID))
mgr.Lock()
defer mgr.Unlock()
afilter, ok := mgr.incompleteFilterBatch[f.PubsubTopic]
afilter, ok := mgr.incompleteFilterBatch[cf.PubsubTopic]
if !ok {
// no existing batch for pubsubTopic
mgr.logger.Debug("new pubsubTopic batch", zap.String("topic", f.PubsubTopic))
cf := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics)
mgr.logger.Debug("new pubsubTopic batch", zap.String("topic", cf.PubsubTopic))
afilter = filterConfig{uuid.NewString(), cf}
mgr.incompleteFilterBatch[f.PubsubTopic] = afilter
mgr.incompleteFilterBatch[cf.PubsubTopic] = afilter
mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf}
} else {
mgr.logger.Debug("existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic))
if len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics) > filterSubBatchSize {
mgr.logger.Debug("existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic))
if len(afilter.contentFilter.ContentTopics)+len(cf.ContentTopics) > filterSubBatchSize {
// filter batch limit is hit
if mgr.onlineChecker.IsOnline() {
// node is online, go ahead and subscribe the batch
mgr.logger.Debug("crossed pubsubTopic batchsize and online, subscribing to filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics)))
mgr.logger.Debug("crossed pubsubTopic batchsize and online, subscribing to filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(cf.ContentTopics)))
go mgr.subscribeAndRunLoop(afilter)
} else {
mgr.logger.Debug("crossed pubsubTopic batchsize and offline, queuing filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics)))
mgr.logger.Debug("crossed pubsubTopic batchsize and offline, queuing filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(cf.ContentTopics)))
// queue existing batch as node is not online
mgr.waitingToSubQueue <- afilter
}
cf := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics)
afilter = filterConfig{uuid.NewString(), cf}
mgr.logger.Debug("creating a new pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic), zap.Stringer("content-filter", cf))
mgr.incompleteFilterBatch[f.PubsubTopic] = afilter
mgr.logger.Debug("creating a new pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Stringer("content-filter", cf))
mgr.incompleteFilterBatch[cf.PubsubTopic] = afilter
mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf}
} else {
// add to existing batch as batch limit not reached
var contentTopics []string
for _, ct := range maps.Keys(f.ContentTopics) {
afilter.contentFilter.ContentTopics[ct.ContentTopic()] = struct{}{}
contentTopics = append(contentTopics, ct.ContentTopic())
for _, ct := range maps.Keys(cf.ContentTopics) {
afilter.contentFilter.ContentTopics[ct] = struct{}{}
}
cf := protocol.NewContentFilter(f.PubsubTopic, contentTopics...)
mgr.logger.Debug("adding to existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.Stringer("content-filter", cf), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)))
mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf}
}
@@ -151,8 +150,8 @@ func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) {
func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
ctx, cancel := context.WithCancel(mgr.ctx)
config := api.FilterConfig{MaxPeers: mgr.cfg.MinPeersForFilter}
sub, err := api.Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger)
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger)
mgr.Lock()
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
mgr.Unlock()
@@ -164,16 +163,21 @@
}
}
func (mgr *FilterManager) networkChange() {
// NetworkChange is to be invoked when the application detects a change in network.
// It retriggers a ping to verify that subscriptions are still alive.
func (mgr *FilterManager) NetworkChange() {
mgr.node.PingPeers() // ping all peers to check if subscriptions are alive
}
func (mgr *FilterManager) onConnectionStatusChange(pubsubTopic string, newStatus bool) {
// OnConnectionStatusChange is to be triggered when a connection status change is detected, either from offline to online or vice versa.
// Note that a pubsubTopic-specific change can be triggered by specifying the pubsubTopic;
// an empty pubsubTopic indicates a complete connection status change, such as the node going offline or coming back online.
func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool) {
subs := mgr.node.Subscriptions()
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online
mgr.networkChange()
mgr.NetworkChange()
mgr.logger.Debug("switching from offline to online")
mgr.Lock()
if len(mgr.waitingToSubQueue) > 0 {
@@ -184,7 +188,6 @@ func (mgr *FilterManager) onConnectionStatusChange(pubsubTopic string, newStatus
mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
} else {
// TODO: Can this cause issues?
mgr.waitingToSubQueue <- af
}
if len(mgr.waitingToSubQueue) == 0 {
@@ -199,7 +202,7 @@ func (mgr *FilterManager) onConnectionStatusChange(pubsubTopic string, newStatus
mgr.onlineChecker.SetOnline(newStatus)
}
func (mgr *FilterManager) removeFilter(filterID string) {
func (mgr *FilterManager) UnsubscribeFilter(filterID string) {
mgr.Lock()
defer mgr.Unlock()
mgr.logger.Debug("removing filter", zap.String("filter-id", filterID))
@@ -224,16 +227,7 @@ func (mgr *FilterManager) removeFilter(filterID string) {
}
}
func (mgr *FilterManager) buildContentFilter(pubsubTopic string, contentTopicSet common.TopicSet) protocol.ContentFilter {
contentTopics := make([]string, len(contentTopicSet))
for i, ct := range maps.Keys(contentTopicSet) {
contentTopics[i] = ct.ContentTopic()
}
return protocol.NewContentFilter(pubsubTopic, contentTopics...)
}
func (mgr *FilterManager) runFilterSubscriptionLoop(sub *api.Sub) {
func (mgr *FilterManager) runFilterSubscriptionLoop(sub *Sub) {
for {
select {
case <-mgr.ctx.Done():
@@ -241,7 +235,7 @@ func (mgr *FilterManager) runFilterSubscriptionLoop(sub *api.Sub) {
return
case env, ok := <-sub.DataCh:
if ok {
err := (mgr.onNewEnvelopes)(env)
err := mgr.envProcessor.OnNewEnvelope(env)
if err != nil {
mgr.logger.Error("invoking onNewEnvelopes error", zap.Error(err))
}
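With the manager now exported from go-waku, an embedding application wires it up roughly as below. This is a sketch based only on the signatures in this diff; chatApp, wireFilterManager, the pubsub and content topic strings, the filter ID, and the minPeersPerFilter value of 2 are all illustrative:

import (
    "context"

    "go.uber.org/zap"

    filterapi "github.com/waku-org/go-waku/waku/v2/api/filter"
    "github.com/waku-org/go-waku/waku/v2/protocol"
    "github.com/waku-org/go-waku/waku/v2/protocol/filter"
)

// chatApp is a hypothetical embedder implementing the EnevelopeProcessor interface.
type chatApp struct{ logger *zap.Logger }

func (a *chatApp) OnNewEnvelope(env *protocol.Envelope) error {
    a.logger.Info("envelope received", zap.String("contentTopic", env.Message().GetContentTopic()))
    return nil
}

func wireFilterManager(ctx context.Context, logger *zap.Logger, node *filter.WakuFilterLightNode) {
    app := &chatApp{logger: logger}
    mgr := filterapi.NewFilterManager(ctx, logger, 2 /* minPeersPerFilter */, app, node)

    // Application-chosen filter ID plus the content filter to install.
    cf := protocol.NewContentFilter("/waku/2/rs/16/32", "/chatApp/1/messages/proto")
    mgr.SubscribeFilter("chat-filter-1", cf)

    // On connectivity changes, nudge the manager (empty pubsubTopic = full online/offline switch).
    mgr.OnConnectionStatusChange("", true)

    // Tear down when the application no longer needs the filter.
    mgr.UnsubscribeFilter("chat-filter-1")
}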


@@ -309,19 +309,15 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
defer pm.topicMutex.RUnlock()
for topicStr, topicInst := range pm.subRelayTopics {
// @cammellos reported that ListPeers returned an invalid number of
// peers. This will ensure that the peers returned by this function
// match those peers that are currently connected
meshPeerLen := pm.checkAndUpdateTopicHealth(topicInst)
topicPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(topicStr)
curPeerLen := topicPeers.Len()
if meshPeerLen < waku_proto.GossipSubDMin || curPeerLen < pm.OutPeersTarget {
curConnectedPeerLen := pm.getPeersBasedOnconnectionStatus(topicStr, network.Connected).Len()
if meshPeerLen < waku_proto.GossipSubDMin || curConnectedPeerLen < pm.OutPeersTarget {
pm.logger.Debug("subscribed topic has not reached target peers, initiating more connections to maintain healthy mesh",
zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen),
zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curConnectedPeerLen),
zap.Int("targetPeers", pm.OutPeersTarget))
//Find not connected peers.
notConnectedPeers := pm.getNotConnectedPers(topicStr)
notConnectedPeers := pm.getPeersBasedOnconnectionStatus(topicStr, network.NotConnected)
if notConnectedPeers.Len() == 0 {
pm.logger.Debug("could not find any peers in peerstore to connect to, discovering more", zap.String("pubSubTopic", topicStr))
go pm.discoverPeersByPubsubTopics([]string{topicStr}, relay.WakuRelayID_v200, pm.ctx, 2)
@@ -329,12 +325,13 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
}
pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr))
//Connect to eligible peers.
numPeersToConnect := pm.OutPeersTarget - curPeerLen
if numPeersToConnect > notConnectedPeers.Len() {
numPeersToConnect = notConnectedPeers.Len()
numPeersToConnect := pm.OutPeersTarget - curConnectedPeerLen
if numPeersToConnect > 0 {
if numPeersToConnect > notConnectedPeers.Len() {
numPeersToConnect = notConnectedPeers.Len()
}
pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect])
}
pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect])
}
}
}
@@ -374,8 +371,8 @@ func (pm *PeerManager) connectToSpecifiedPeers(peers peer.IDSlice) {
}
}
// getNotConnectedPers returns peers for a pubSubTopic that are not connected.
func (pm *PeerManager) getNotConnectedPers(pubsubTopic string) (notConnectedPeers peer.IDSlice) {
// getPeersBasedOnconnectionStatus returns peers for a pubSubTopic that are either connected or not connected, based on the status passed in.
func (pm *PeerManager) getPeersBasedOnconnectionStatus(pubsubTopic string, connected network.Connectedness) (filteredPeers peer.IDSlice) {
var peerList peer.IDSlice
if pubsubTopic == "" {
peerList = pm.host.Peerstore().Peers()
@@ -383,8 +380,8 @@ func (pm *PeerManager) getNotConnectedPers(pubsubTopic string) (notConnectedPeer
peerList = pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic)
}
for _, peerID := range peerList {
if pm.host.Network().Connectedness(peerID) != network.Connected {
notConnectedPeers = append(notConnectedPeers, peerID)
if pm.host.Network().Connectedness(peerID) == connected {
filteredPeers = append(filteredPeers, peerID)
}
}
return
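Both call sites now funnel through the generalized helper. Schematically, the connection top-up logic above reduces to the following (a condensed restatement of the hunks, with topicStr as in the loop; not new API):

connectedCount := pm.getPeersBasedOnconnectionStatus(topicStr, network.Connected).Len()
dialCandidates := pm.getPeersBasedOnconnectionStatus(topicStr, network.NotConnected)
// Dial only when below target, and never more peers than the peerstore can offer.
if n := pm.OutPeersTarget - connectedCount; n > 0 {
    if n > dialCandidates.Len() {
        n = dialCandidates.Len()
    }
    pm.connectToSpecifiedPeers(dialCandidates[0:n])
}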


@@ -47,7 +47,7 @@ type FilterTestSuite struct {
ctx context.Context
ctxCancel context.CancelFunc
wg *sync.WaitGroup
contentFilter protocol.ContentFilter
ContentFilter protocol.ContentFilter
subDetails []*subscription.SubscriptionDetails
Log *zap.Logger
@@ -63,7 +63,7 @@ type WakuMsg struct {
}
func (s *FilterTestSuite) SetupTest() {
log := utils.Logger() //.Named("filterv2-test")
log := utils.Logger()
s.Log = log
s.Log.Info("SetupTest()")
@@ -192,7 +192,7 @@ func (s *FilterTestSuite) waitForMsgFromChan(msg *WakuMsg, ch chan *protocol.Env
defer s.wg.Done()
select {
case env := <-ch:
for _, topic := range s.contentFilter.ContentTopicsList() {
for _, topic := range s.ContentFilter.ContentTopicsList() {
if topic == env.Message().GetContentTopic() {
msgFound = true
}
@@ -308,8 +308,8 @@ func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, pee
for _, sub := range s.subDetails {
if sub.ContentFilter.PubsubTopic == pubsubTopic {
sub.Add(contentTopic)
s.contentFilter = sub.ContentFilter
subDetails, err := s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer))
s.ContentFilter = sub.ContentFilter
subDetails, err := s.LightNode.Subscribe(s.ctx, s.ContentFilter, WithPeer(peer))
s.subDetails = subDetails
s.Require().NoError(err)
return
@@ -317,7 +317,7 @@ func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, pee
}
s.subDetails = s.getSub(pubsubTopic, contentTopic, peer)
s.contentFilter = s.subDetails[0].ContentFilter
s.ContentFilter = s.subDetails[0].ContentFilter
}
func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails {
@@ -331,7 +331,7 @@ func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, p
} else {
sub.Remove(contentTopic)
}
s.contentFilter = sub.ContentFilter
s.ContentFilter = sub.ContentFilter
}
}

vendor/modules.txt

@@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20240801160005-d047df3859e2
# github.com/waku-org/go-waku v0.8.1-0.20240806122111-5aa11311f833
## explicit; go 1.21
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests


@@ -22,6 +22,8 @@ import (
"errors"
"strings"
"golang.org/x/exp/maps"
"github.com/ethereum/go-ethereum/common/hexutil"
)
@@ -102,3 +104,11 @@ func ExtractTopicFromContentTopic(s string) (TopicType, error) {
result := BytesToTopic(str)
return result, nil
}
func (t TopicSet) ContentTopics() []string {
contentTopics := make([]string, len(t))
for i, ct := range maps.Keys(t) {
contentTopics[i] = ct.ContentTopic()
}
return contentTopics
}
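The new helper converts a TopicSet into the plain content-topic strings that go-waku's protocol.NewContentFilter expects; a minimal sketch (the pubsub topic string is illustrative):

ts := common.NewTopicSet([]common.TopicType{{0xf2, 0x6e, 0x77, 0x79}})
cf := protocol.NewContentFilter("/waku/2/default-waku/proto", ts.ContentTopics()...)
// cf now carries the content-topic representation of topic 0xf26e7779.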


@@ -21,6 +21,8 @@ package common
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/require"
)
var topicStringTests = []struct {
@@ -33,6 +35,13 @@
{topic: TopicType{0xf2, 0x6e, 0x77, 0x79}, str: "0xf26e7779"},
}
func TestTopicSet(t *testing.T) {
tSet := NewTopicSet([]TopicType{{0x00, 0x00, 0x00, 0x00}, {0x00, 0x7f, 0x80, 0xff}})
topics := tSet.ContentTopics()
require.Equal(t, len(topics), 2)
}
func TestTopicString(t *testing.T) {
for i, tst := range topicStringTests {
s := tst.topic.String()


@@ -57,6 +57,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/metrics"
filterapi "github.com/waku-org/go-waku/waku/v2/api/filter"
"github.com/waku-org/go-waku/waku/v2/api/publish"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
@@ -126,7 +127,7 @@ type Waku struct {
// Filter-related
filters *common.Filters // Message filters installed with Subscribe function
filterManager *FilterManager
filterManager *filterapi.FilterManager
privateKeys map[string]*ecdsa.PrivateKey // Private key storage
symKeys map[string][]byte // Symmetric key storage
@@ -956,7 +957,8 @@ func (w *Waku) Subscribe(f *common.Filter) (string, error) {
}
if w.cfg.LightClient {
w.filterManager.addFilter(id, f)
cf := protocol.NewContentFilter(f.PubsubTopic, f.ContentTopics.ContentTopics()...)
w.filterManager.SubscribeFilter(id, cf)
}
return id, nil
@@ -970,7 +972,7 @@ func (w *Waku) Unsubscribe(ctx context.Context, id string) error {
}
if w.cfg.LightClient {
w.filterManager.removeFilter(id)
w.filterManager.UnsubscribeFilter(id)
}
return nil
@@ -1207,6 +1209,11 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, query store.FilterCrit
return result.Cursor(), envelopesCount, nil
}
// OnNewEnvelope implements the EnevelopeProcessor interface of the go-waku FilterManager API; it is invoked whenever a new message is received by the filter.
func (w *Waku) OnNewEnvelope(env *protocol.Envelope) error {
return w.OnNewEnvelopes(env, common.RelayedMessageType, false)
}
// Start implements node.Service, starting the background data propagation thread
// of the Waku protocol.
func (w *Waku) Start() error {
@@ -1302,8 +1309,8 @@ func (w *Waku) Start() error {
if w.cfg.LightClient {
// Create FilterManager that will maintain peer connectivity
// for installed filters
w.filterManager = newFilterManager(w.ctx, w.logger, w.cfg,
func(env *protocol.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType, false) },
w.filterManager = filterapi.NewFilterManager(w.ctx, w.logger, w.cfg.MinPeersForFilter,
w,
w.node.FilterLightnode())
}
@@ -1676,7 +1683,7 @@ func (w *Waku) handleNetworkChangeFromApp(state connection.State) {
w.logger.Info("connection switched or offline detected via mobile, disconnecting all peers")
w.node.DisconnectAllPeers()
if w.cfg.LightClient {
w.filterManager.networkChange()
w.filterManager.NetworkChange()
}
}
}
@@ -1685,8 +1692,7 @@ func (w *Waku) ConnectionChanged(state connection.State) {
isOnline := !state.Offline
if w.cfg.LightClient {
//TODO: Update this as per https://github.com/waku-org/go-waku/issues/1114
// trigger FilterManager to take care of any pending filter subscriptions
go w.filterManager.onConnectionStatusChange(w.cfg.DefaultShardPubsubTopic, isOnline)
go w.filterManager.OnConnectionStatusChange("", isOnline)
w.handleNetworkChangeFromApp(state)
} else {
// for light clients, state updates and online changes are handled in filterManager.
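Condensed, the new wiring on the status-go side is (a restatement of the hunks above, not new API; isOnline is derived from the connection state):

// Waku itself satisfies EnevelopeProcessor via OnNewEnvelope, so it is passed directly.
w.filterManager = filterapi.NewFilterManager(w.ctx, w.logger, w.cfg.MinPeersForFilter, w, w.node.FilterLightnode())

// Connectivity events fan into the manager:
go w.filterManager.OnConnectionStatusChange("", isOnline) // full offline/online transition
w.filterManager.NetworkChange()                           // network switch detected by the app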


@@ -376,7 +376,6 @@ func TestWakuV2Filter(t *testing.T) {
w, err := New(nil, "", config, nil, nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, w.Start())
w.filterManager.filterSubBatchDuration = 1 * time.Second
options := func(b *backoff.ExponentialBackOff) {
b.MaxElapsedTime = 10 * time.Second
@@ -689,8 +688,8 @@ func TestOnlineChecker(t *testing.T) {
require.NoError(t, err)
require.False(t, lightNode.onlineChecker.IsOnline())
lightNode.filterManager.addFilter("test", &common.Filter{})
f := &common.Filter{}
lightNode.filterManager.SubscribeFilter("test", protocol.NewContentFilter(f.PubsubTopic, f.ContentTopics.ContentTopics()...))
}