diff --git a/go.mod b/go.mod index 08da158c6..c24ac6d57 100644 --- a/go.mod +++ b/go.mod @@ -93,7 +93,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240605190333-d2d2f5672ebd + github.com/waku-org/go-waku v0.8.1-0.20240612113959-d9dc91a162d9 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 4254fbdec..0dcf35845 100644 --- a/go.sum +++ b/go.sum @@ -2137,8 +2137,8 @@ github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97Jry github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240605190333-d2d2f5672ebd h1:g5EneT88eHOakr8Zukx4RP1JICgrSl9ZtmgGS+wQbC8= -github.com/waku-org/go-waku v0.8.1-0.20240605190333-d2d2f5672ebd/go.mod h1:biffO55kWbvfO8jdu/aAPiWcmozrfFKPum4EMFDib+k= +github.com/waku-org/go-waku v0.8.1-0.20240612113959-d9dc91a162d9 h1:OvO9PxpYYh4cNuuHpwXguZBQWEppdhdHAT2jIDls02k= +github.com/waku-org/go-waku v0.8.1-0.20240612113959-d9dc91a162d9/go.mod h1:biffO55kWbvfO8jdu/aAPiWcmozrfFKPum4EMFDib+k= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go new file mode 100644 index 000000000..f7bfa7393 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go @@ -0,0 +1,202 @@ +package api + +import ( + "context" + "encoding/json" + "time" + + "github.com/google/uuid" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/subscription" + "go.uber.org/zap" +) + +const FilterPingTimeout = 5 * time.Second +const MultiplexChannelBuffer = 100 + +type FilterConfig struct { + MaxPeers int `json:"maxPeers"` + Peers []peer.ID `json:"peers"` +} + +func (fc FilterConfig) String() string { + jsonStr, err := json.Marshal(fc) + if err != nil { + return "" + } + return string(jsonStr) +} + +type Sub struct { + ContentFilter protocol.ContentFilter + DataCh chan *protocol.Envelope + Config FilterConfig + subs subscription.SubscriptionSet + wf *filter.WakuFilterLightNode + ctx context.Context + cancel context.CancelFunc + log *zap.Logger + closing chan string + isNodeOnline bool //indicates if node has connectivity, this helps subscribe loop takes decision as to resubscribe or not. + resubscribeInProgress bool + id string +} + +// Subscribe +func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, online bool) (*Sub, error) { + sub := new(Sub) + sub.id = uuid.NewString() + sub.wf = wf + sub.ctx, sub.cancel = context.WithCancel(ctx) + sub.subs = make(subscription.SubscriptionSet) + sub.DataCh = make(chan *protocol.Envelope, MultiplexChannelBuffer) + sub.ContentFilter = contentFilter + sub.Config = config + sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter)) + sub.log.Debug("filter subscribe params", zap.Int("max-peers", config.MaxPeers)) + sub.isNodeOnline = online + sub.closing = make(chan string, config.MaxPeers) + if online { + subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers) + if err == nil { + sub.multiplex(subs) + } + } + go sub.subscriptionLoop() + return sub, nil +} + +func (apiSub *Sub) Unsubscribe() { + apiSub.cancel() +} + +func (apiSub *Sub) SetNodeState(online bool) { + apiSub.isNodeOnline = online +} + +func (apiSub *Sub) subscriptionLoop() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers && + !apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers { + apiSub.closing <- "" + } + case <-apiSub.ctx.Done(): + apiSub.log.Debug("apiSub context: done") + apiSub.cleanup() + return + case subId := <-apiSub.closing: + apiSub.resubscribeInProgress = true + //trigger resubscribe flow for subscription. + apiSub.checkAndResubscribe(subId) + } + } +} + +func (apiSub *Sub) checkAndResubscribe(subId string) { + + var failedPeer peer.ID + if subId != "" { + apiSub.log.Debug("subscription close and resubscribe", zap.String("sub-id", subId), zap.Stringer("content-filter", apiSub.ContentFilter)) + + apiSub.subs[subId].Close() + failedPeer = apiSub.subs[subId].PeerID + delete(apiSub.subs, subId) + } + apiSub.log.Debug("subscription status", zap.Int("sub-count", len(apiSub.subs)), zap.Stringer("content-filter", apiSub.ContentFilter)) + if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers { + apiSub.resubscribe(failedPeer) + } + apiSub.resubscribeInProgress = false +} + +func (apiSub *Sub) cleanup() { + apiSub.log.Debug("cleaning up subscription", zap.Stringer("config", apiSub.Config)) + + for _, s := range apiSub.subs { + _, err := apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s) + if err != nil { + //Logging with info as this is part of cleanup + apiSub.log.Info("failed to unsubscribe filter", zap.Error(err)) + } + } + close(apiSub.DataCh) +} + +// Attempts to resubscribe on topics that lack subscriptions +func (apiSub *Sub) resubscribe(failedPeer peer.ID) { + // Re-subscribe asynchronously + existingSubCount := len(apiSub.subs) + apiSub.log.Debug("subscribing again", zap.Int("num-peers", apiSub.Config.MaxPeers-existingSubCount)) + var peersToExclude peer.IDSlice + if failedPeer != "" { //little hack, couldn't find a better way to do it + peersToExclude = append(peersToExclude, failedPeer) + } + for _, sub := range apiSub.subs { + peersToExclude = append(peersToExclude, sub.PeerID) + } + subs, err := apiSub.subscribe(apiSub.ContentFilter, apiSub.Config.MaxPeers-existingSubCount, peersToExclude...) + if err != nil { + apiSub.log.Debug("failed to resubscribe for filter", zap.Error(err)) + return + } //Not handling scenario where all requested subs are not received as that should get handled from user of the API. + + apiSub.multiplex(subs) +} + +func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) { + // Low-level subscribe, returns a set of SubscriptionDetails + options := make([]filter.FilterSubscribeOption, 0) + options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount))) + for _, p := range apiSub.Config.Peers { + options = append(options, filter.WithPeer(p)) + } + if len(peersToExclude) > 0 { + apiSub.log.Debug("subscribing with peers to exclude", zap.Stringers("excluded-peers", peersToExclude)) + options = append(options, filter.WithPeersToExclude(peersToExclude...)) + } + subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...) + + if err != nil { + //Inform of error, so that resubscribe can be triggered if required + if len(apiSub.closing) < apiSub.Config.MaxPeers { + apiSub.closing <- "" + } + if len(subs) > 0 { + // Partial Failure, which means atleast 1 subscription is successful + apiSub.log.Debug("partial failure in filter subscribe", zap.Error(err), zap.Int("success-count", len(subs))) + return subs, nil + } + // TODO: Once filter error handling indicates specific error, this can be handled better. + return nil, err + } + return subs, nil +} + +func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) { + // Multiplex onto single channel + // Goroutines will exit once sub channels are closed + for _, subDetails := range subs { + apiSub.subs[subDetails.ID] = subDetails + go func(subDetails *subscription.SubscriptionDetails) { + apiSub.log.Debug("new multiplex", zap.String("sub-id", subDetails.ID)) + for env := range subDetails.C { + apiSub.DataCh <- env + } + }(subDetails) + go func(subDetails *subscription.SubscriptionDetails) { + select { + case <-apiSub.ctx.Done(): + return + case <-subDetails.Closing: + apiSub.log.Debug("sub closing", zap.String("sub-id", subDetails.ID)) + apiSub.closing <- subDetails.ID + } + }(subDetails) + } +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index edbba8d3f..70edd644c 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -88,7 +88,7 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM wf.pm = pm wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) - wf.peerPingInterval = 5 * time.Second + wf.peerPingInterval = 1 * time.Minute return wf } @@ -591,7 +591,7 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub.Close() result := &WakuFilterPushResult{} - + wf.log.Debug("unsubscribing subscription", zap.String("sub-id", sub.ID), zap.Stringer("content-filter", sub.ContentFilter)) if !wf.subscriptions.Has(sub.PeerID, sub.ContentFilter) { // Last sub for this [peer, contentFilter] pair err = wf.unsubscribeFromServer(ctx, params.requestID, sub.PeerID, sub.ContentFilter) @@ -599,6 +599,8 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, Err: err, PeerID: sub.PeerID, }) + wf.log.Debug("unsubscribed subscription", zap.String("sub-id", sub.ID), zap.Stringer("content-filter", sub.ContentFilter), zap.Error(err)) + } return result, err diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go index 11b9a7200..836175b53 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go @@ -26,9 +26,8 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) { subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer) for _, subscription := range subscriptions { wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID)) - //Indicating that subscription is closing, - close(subscription.Closing) + subscription.SetClosing() } } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscription_details.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscription_details.go index f2ec88706..a36f54ad4 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscription_details.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscription_details.go @@ -29,7 +29,7 @@ type SubscriptionDetails struct { mapRef *SubscriptionsMap Closed bool `json:"-"` once sync.Once - Closing chan struct{} + Closing chan bool PeerID peer.ID `json:"peerID"` ContentFilter protocol.ContentFilter `json:"contentFilters"` @@ -99,6 +99,7 @@ func (s *SubscriptionDetails) CloseC() { defer s.Unlock() s.Closed = true close(s.C) + close(s.Closing) }) } @@ -107,6 +108,15 @@ func (s *SubscriptionDetails) Close() error { return s.mapRef.Delete(s) } +func (s *SubscriptionDetails) SetClosing() { + s.Lock() + defer s.Unlock() + if !s.Closed { + s.Closed = true + s.Closing <- true + } +} + func (s *SubscriptionDetails) MarshalJSON() ([]byte, error) { result := struct { PeerID peer.ID `json:"peerID"` diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscriptions_map.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscriptions_map.go index c308d9bba..359930643 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscriptions_map.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscriptions_map.go @@ -75,7 +75,7 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.Content PeerID: peerID, C: make(chan *protocol.Envelope, 1024), ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)}, - Closing: make(chan struct{}), + Closing: make(chan bool), } // Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair @@ -142,11 +142,21 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error { contentFilter := subscription.ContentFilter delete(peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic], subscription.ID) + if len(peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic]) == 0 { + sub.logger.Debug("no more subs for pubsubTopic for this peer", zap.Stringer("id", subscription.PeerID), zap.String("pubsubtopic", contentFilter.PubsubTopic)) + delete(peerSubscription.SubsPerPubsubTopic, contentFilter.PubsubTopic) + } + // Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair for contentTopic := range contentFilter.ContentTopics { sub.decreaseSubFor(contentFilter.PubsubTopic, contentTopic) } + if len(peerSubscription.SubsPerPubsubTopic) == 0 { + sub.logger.Debug("no more subs for peer", zap.Stringer("id", subscription.PeerID)) + delete(sub.items, subscription.PeerID) + } + return nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index 032901b62..b2adbc570 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1015,11 +1015,12 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240605190333-d2d2f5672ebd +# github.com/waku-org/go-waku v0.8.1-0.20240612113959-d9dc91a162d9 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests github.com/waku-org/go-waku/waku/persistence +github.com/waku-org/go-waku/waku/v2/api github.com/waku-org/go-waku/waku/v2/discv5 github.com/waku-org/go-waku/waku/v2/dnsdisc github.com/waku-org/go-waku/waku/v2/hash diff --git a/wakuv2/config.go b/wakuv2/config.go index dd838bdb9..e84adfaa7 100644 --- a/wakuv2/config.go +++ b/wakuv2/config.go @@ -39,33 +39,34 @@ var ( // Config represents the configuration state of a waku node. type Config struct { - MaxMessageSize uint32 `toml:",omitempty"` // Maximal message length allowed by the waku node - Host string `toml:",omitempty"` - Port int `toml:",omitempty"` - EnablePeerExchangeServer bool `toml:",omitempty"` // PeerExchange server makes sense only when discv5 is running locally as it will have a cache of peers that it can respond to in case a PeerExchange request comes from the PeerExchangeClient - EnablePeerExchangeClient bool `toml:",omitempty"` - KeepAliveInterval int `toml:",omitempty"` - MinPeersForRelay int `toml:",omitempty"` // Indicates the minimum number of peers required for using Relay Protocol - MinPeersForFilter int `toml:",omitempty"` // Indicates the minimum number of peers required for using Filter Protocol - LightClient bool `toml:",omitempty"` // Indicates if the node is a light client - WakuNodes []string `toml:",omitempty"` - Rendezvous bool `toml:",omitempty"` - DiscV5BootstrapNodes []string `toml:",omitempty"` - Nameserver string `toml:",omitempty"` // Optional nameserver to use for dns discovery - Resolver ethdisc.Resolver `toml:",omitempty"` // Optional resolver to use for dns discovery - EnableDiscV5 bool `toml:",omitempty"` // Indicates whether discv5 is enabled or not - DiscoveryLimit int `toml:",omitempty"` // Indicates the number of nodes to discover with peer exchange client - AutoUpdate bool `toml:",omitempty"` - UDPPort int `toml:",omitempty"` - EnableStore bool `toml:",omitempty"` - StoreCapacity int `toml:",omitempty"` - StoreSeconds int `toml:",omitempty"` - TelemetryServerURL string `toml:",omitempty"` - DefaultShardPubsubTopic string `toml:",omitempty"` // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not) - UseShardAsDefaultTopic bool `toml:",omitempty"` - ClusterID uint16 `toml:",omitempty"` - EnableConfirmations bool `toml:",omitempty"` // Enable sending message confirmations - SkipPublishToTopic bool `toml:",omitempty"` // Used in testing + MaxMessageSize uint32 `toml:",omitempty"` // Maximal message length allowed by the waku node + Host string `toml:",omitempty"` + Port int `toml:",omitempty"` + EnablePeerExchangeServer bool `toml:",omitempty"` // PeerExchange server makes sense only when discv5 is running locally as it will have a cache of peers that it can respond to in case a PeerExchange request comes from the PeerExchangeClient + EnablePeerExchangeClient bool `toml:",omitempty"` + KeepAliveInterval int `toml:",omitempty"` + MinPeersForRelay int `toml:",omitempty"` // Indicates the minimum number of peers required for using Relay Protocol + MinPeersForFilter int `toml:",omitempty"` // Indicates the minimum number of peers required for using Filter Protocol + LightClient bool `toml:",omitempty"` // Indicates if the node is a light client + WakuNodes []string `toml:",omitempty"` + Rendezvous bool `toml:",omitempty"` + DiscV5BootstrapNodes []string `toml:",omitempty"` + Nameserver string `toml:",omitempty"` // Optional nameserver to use for dns discovery + Resolver ethdisc.Resolver `toml:",omitempty"` // Optional resolver to use for dns discovery + EnableDiscV5 bool `toml:",omitempty"` // Indicates whether discv5 is enabled or not + DiscoveryLimit int `toml:",omitempty"` // Indicates the number of nodes to discover with peer exchange client + AutoUpdate bool `toml:",omitempty"` + UDPPort int `toml:",omitempty"` + EnableStore bool `toml:",omitempty"` + StoreCapacity int `toml:",omitempty"` + StoreSeconds int `toml:",omitempty"` + TelemetryServerURL string `toml:",omitempty"` + DefaultShardPubsubTopic string `toml:",omitempty"` // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not) + DefaultShardedPubsubTopics []string `toml:", omitempty"` + UseShardAsDefaultTopic bool `toml:",omitempty"` + ClusterID uint16 `toml:",omitempty"` + EnableConfirmations bool `toml:",omitempty"` // Enable sending message confirmations + SkipPublishToTopic bool `toml:",omitempty"` // Used in testing } func (c *Config) Validate(logger *zap.Logger) error { @@ -123,6 +124,9 @@ func setDefaults(cfg *Config) *Config { if cfg.DefaultShardPubsubTopic == "" { if cfg.UseShardAsDefaultTopic { cfg.DefaultShardPubsubTopic = shard.DefaultShardPubsubTopic() + //For now populating with both used shards, but this can be populated from user subscribed communities etc once community sharding is implemented + cfg.DefaultShardedPubsubTopics = append(cfg.DefaultShardedPubsubTopics, shard.DefaultShardPubsubTopic()) + cfg.DefaultShardedPubsubTopics = append(cfg.DefaultShardedPubsubTopics, shard.DefaultNonProtectedPubsubTopic()) } else { cfg.DefaultShardPubsubTopic = relay.DefaultWakuTopic } diff --git a/wakuv2/filter_manager.go b/wakuv2/filter_manager.go index 1c58d1f74..a5522eaa4 100644 --- a/wakuv2/filter_manager.go +++ b/wakuv2/filter_manager.go @@ -3,43 +3,17 @@ package wakuv2 import ( "context" "sync" - "time" - - "github.com/google/uuid" - "github.com/libp2p/go-libp2p/core/peer" "github.com/status-im/status-go/wakuv2/common" "go.uber.org/zap" "golang.org/x/exp/maps" - node "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/api" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" - "github.com/waku-org/go-waku/waku/v2/protocol/subscription" ) -const ( - FilterEventAdded = iota - FilterEventRemoved - FilterEventPingResult - FilterEventSubscribeResult - FilterEventUnsubscribeResult - FilterEventGetStats -) - -type FilterSubs map[string]subscription.SubscriptionSet - -type FilterEvent struct { - eventType int - filterID string - success bool - peerID peer.ID - tempID string - sub *subscription.SubscriptionDetails - ch chan FilterSubs -} - // Methods on FilterManager maintain filter peer health // // runFilterLoop is the main event loop @@ -51,224 +25,112 @@ type FilterEvent struct { // filterSubs is the map of filter IDs to subscriptions type FilterManager struct { - ctx context.Context - filterSubs FilterSubs - eventChan chan (FilterEvent) - isFilterSubAlive func(sub *subscription.SubscriptionDetails) error - getFilter func(string) *common.Filter - onNewEnvelopes func(env *protocol.Envelope) error - logger *zap.Logger - config *Config - node *node.WakuNode + sync.Mutex + ctx context.Context + cfg *Config + filters map[string]SubDetails // map of filters to apiSub details + onNewEnvelopes func(env *protocol.Envelope) error + logger *zap.Logger + node *filter.WakuFilterLightNode + peersAvailable bool + filterQueue chan filterConfig +} +type SubDetails struct { + cancel func() + sub *api.Sub } -func newFilterManager(ctx context.Context, logger *zap.Logger, getFilterFn func(string) *common.Filter, config *Config, onNewEnvelopes func(env *protocol.Envelope) error, node *node.WakuNode) *FilterManager { +const filterQueueSize = 1000 + +type filterConfig struct { + ID string + contentFilter protocol.ContentFilter +} + +func newFilterManager(ctx context.Context, logger *zap.Logger, cfg *Config, onNewEnvelopes func(env *protocol.Envelope) error, node *filter.WakuFilterLightNode) *FilterManager { // This fn is being mocked in test mgr := new(FilterManager) mgr.ctx = ctx mgr.logger = logger - mgr.getFilter = getFilterFn + mgr.cfg = cfg mgr.onNewEnvelopes = onNewEnvelopes - mgr.filterSubs = make(FilterSubs) - mgr.eventChan = make(chan FilterEvent, 100) - mgr.config = config + mgr.filters = make(map[string]SubDetails) mgr.node = node - mgr.isFilterSubAlive = func(sub *subscription.SubscriptionDetails) error { - return nil - } + mgr.peersAvailable = false + mgr.filterQueue = make(chan filterConfig, filterQueueSize) return mgr } -func (mgr *FilterManager) runFilterLoop(wg *sync.WaitGroup) { - defer wg.Done() - // Use it to ping filter peer(s) periodically - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-mgr.ctx.Done(): - mgr.logger.Debug("filter loop stopped") - return - case <-ticker.C: - mgr.pingPeers() - case ev := <-mgr.eventChan: - mgr.processEvents(&ev) - } - } -} - -func (mgr *FilterManager) processEvents(ev *FilterEvent) { - switch ev.eventType { - - case FilterEventAdded: - mgr.filterSubs[ev.filterID] = make(subscription.SubscriptionSet) - mgr.resubscribe(ev.filterID) - - case FilterEventRemoved: - for _, sub := range mgr.filterSubs[ev.filterID] { - if sub == nil { - // Skip temp subs - continue - } - go mgr.unsubscribeFromFilter(ev.filterID, sub) - } - delete(mgr.filterSubs, ev.filterID) - - case FilterEventPingResult: - if ev.success { - break - } - // filterID field is only set when there are no subs to check for this filter, - // therefore no particular peers that could be unreachable. - if ev.filterID != "" { - // Trigger full resubscribe, filter has too few peers - mgr.logger.Debug("filter has too few subs", zap.String("filterId", ev.filterID)) - mgr.resubscribe(ev.filterID) - break - } - - // Delete subs for removed peer - for filterID, subs := range mgr.filterSubs { - for _, sub := range subs { - if sub == nil { - // Skip temp subs - continue - } - if sub.PeerID == ev.peerID { - mgr.logger.Debug("filter sub is inactive", zap.String("filterId", filterID), zap.Stringer("peerId", sub.PeerID), zap.String("subID", sub.ID)) - delete(subs, sub.ID) - go mgr.unsubscribeFromFilter(filterID, sub) - } - } - mgr.resubscribe(filterID) - } - - case FilterEventSubscribeResult: - subs, found := mgr.filterSubs[ev.filterID] - if ev.success { - if found { - subs[ev.sub.ID] = ev.sub - go mgr.runFilterSubscriptionLoop(ev.sub) - } else { - // We subscribed to a filter that is already uninstalled; invoke unsubscribe - go mgr.unsubscribeFromFilter(ev.filterID, ev.sub) - } - } - if found { - // Delete temp subscription record - delete(subs, ev.tempID) - } - - case FilterEventUnsubscribeResult: - mgr.logger.Debug("filter event unsubscribe result", zap.String("filterId", ev.filterID), zap.Stringer("peerID", ev.sub.PeerID)) - - case FilterEventGetStats: - stats := make(FilterSubs) - for id, subs := range mgr.filterSubs { - stats[id] = make(subscription.SubscriptionSet) - for subID, sub := range subs { - if sub == nil { - // Skip temp subs - continue - } - - stats[id][subID] = sub - } - } - ev.ch <- stats - } -} - -func (mgr *FilterManager) subscribeToFilter(filterID string, tempID string) { - - logger := mgr.logger.With(zap.String("filterId", filterID)) - f := mgr.getFilter(filterID) - if f == nil { - logger.Error("filter subscribeToFilter: No filter found") - mgr.eventChan <- FilterEvent{eventType: FilterEventSubscribeResult, filterID: filterID, tempID: tempID, success: false} - return - } +func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) { + mgr.Lock() + defer mgr.Unlock() contentFilter := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics) - logger.Debug("filter subscribe to filter node", zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList())) - ctx, cancel := context.WithTimeout(mgr.ctx, requestTimeout) - defer cancel() + mgr.logger.Debug("adding filter", zap.String("filter-id", filterID), zap.Stringer("content-filter", contentFilter)) - subDetails, err := mgr.node.FilterLightnode().Subscribe(ctx, contentFilter, filter.WithAutomaticPeerSelection()) - var sub *subscription.SubscriptionDetails - if err != nil { - logger.Warn("filter could not add wakuv2 filter for peers", zap.Error(err)) + if mgr.peersAvailable { + go mgr.subscribeAndRunLoop(filterConfig{filterID, contentFilter}) } else { - sub = subDetails[0] - logger.Debug("filter subscription success", zap.Stringer("peer", sub.PeerID), zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList())) + mgr.logger.Debug("queuing filter as not online", zap.String("filter-id", filterID), zap.Stringer("content-filter", contentFilter)) + mgr.filterQueue <- filterConfig{filterID, contentFilter} } - - success := err == nil - mgr.eventChan <- FilterEvent{eventType: FilterEventSubscribeResult, filterID: filterID, tempID: tempID, sub: sub, success: success} } -func (mgr *FilterManager) unsubscribeFromFilter(filterID string, sub *subscription.SubscriptionDetails) { - mgr.logger.Debug("filter unsubscribe from filter node", zap.String("filterId", filterID), zap.String("subId", sub.ID), zap.Stringer("peer", sub.PeerID)) - // Unsubscribe on light node - ctx, cancel := context.WithTimeout(mgr.ctx, requestTimeout) - defer cancel() - _, err := mgr.node.FilterLightnode().UnsubscribeWithSubscription(ctx, sub) +func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { + ctx, cancel := context.WithCancel(mgr.ctx) + config := api.FilterConfig{MaxPeers: mgr.cfg.MinPeersForFilter} - if err != nil { - mgr.logger.Warn("could not unsubscribe wakuv2 filter for peer", zap.String("filterId", filterID), zap.String("subId", sub.ID), zap.Error(err)) + sub, err := api.Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.peersAvailable) + mgr.Lock() + mgr.filters[f.ID] = SubDetails{cancel, sub} + mgr.Unlock() + if err == nil { + mgr.logger.Debug("subscription successful, running loop", zap.String("filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter)) + mgr.runFilterSubscriptionLoop(sub) + } else { + mgr.logger.Error("subscription fail, need to debug issue", zap.String("filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter), zap.Error(err)) } - - success := err == nil - mgr.eventChan <- FilterEvent{eventType: FilterEventUnsubscribeResult, filterID: filterID, success: success, sub: sub} } -// Check whether each of the installed filters -// has enough alive subscriptions to peers -func (mgr *FilterManager) pingPeers() { - mgr.logger.Debug("filter pingPeers") - - distinctPeers := make(map[peer.ID]struct{}) - for filterID, subs := range mgr.filterSubs { - logger := mgr.logger.With(zap.String("filterId", filterID)) - nilSubsCnt := 0 - for _, s := range subs { - if s == nil { - nilSubsCnt++ - } - } - logger.Debug("filter ping peers", zap.Int("len", len(subs)), zap.Int("len(nilSubs)", nilSubsCnt)) - if len(subs) < mgr.config.MinPeersForFilter { - // Trigger full resubscribe - logger.Debug("filter ping peers not enough subs") - go func(filterID string) { - mgr.eventChan <- FilterEvent{eventType: FilterEventPingResult, filterID: filterID, success: false} - }(filterID) - } - for _, sub := range subs { - if sub == nil { - // Skip temp subs - continue - } - _, found := distinctPeers[sub.PeerID] - if found { - continue - } - distinctPeers[sub.PeerID] = struct{}{} - logger.Debug("filter ping peer", zap.Stringer("peerId", sub.PeerID)) - go func(sub *subscription.SubscriptionDetails) { - err := mgr.isFilterSubAlive(sub) - alive := err == nil - - if alive { - logger.Debug("filter aliveness check succeeded", zap.Stringer("peerId", sub.PeerID)) - } else { - logger.Debug("filter aliveness check failed", zap.Stringer("peerId", sub.PeerID), zap.Error(err)) +func (mgr *FilterManager) onConnectionStatusChange(pubsubTopic string, newStatus bool) { + mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus), + zap.Int("filters count", len(mgr.filters)), zap.Int("filter-queue-len", len(mgr.filterQueue))) + //TODO: Needs optimization because only on transition from offline to online should trigger this logic. + if newStatus { //Online + if len(mgr.filterQueue) > 0 { + //Check if any filter subs are pending and subscribe them + for filter := range mgr.filterQueue { + mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", filter.ID), zap.Stringer("content-filter", filter.contentFilter)) + go mgr.subscribeAndRunLoop(filter) + if len(mgr.filterQueue) == 0 { + mgr.logger.Debug("filter queue empty") + break } - mgr.eventChan <- FilterEvent{eventType: FilterEventPingResult, peerID: sub.PeerID, success: alive} - }(sub) + } } } + mgr.Lock() + for _, subDetails := range mgr.filters { + subDetails.sub.SetNodeState(newStatus) + } + mgr.Unlock() + mgr.peersAvailable = newStatus +} + +func (mgr *FilterManager) removeFilter(filterID string) { + mgr.Lock() + defer mgr.Unlock() + mgr.logger.Debug("removing filter", zap.String("filter-id", filterID)) + + subDetails, ok := mgr.filters[filterID] + if ok { + delete(mgr.filters, filterID) + // close goroutine running runFilterSubscriptionLoop + // this will also close api.Sub + subDetails.cancel() + } else { + mgr.logger.Debug("filter removal: filter not found", zap.String("filter-id", filterID)) + } } func (mgr *FilterManager) buildContentFilter(pubsubTopic string, contentTopicSet common.TopicSet) protocol.ContentFilter { @@ -280,43 +142,20 @@ func (mgr *FilterManager) buildContentFilter(pubsubTopic string, contentTopicSet return protocol.NewContentFilter(pubsubTopic, contentTopics...) } -func (mgr *FilterManager) resubscribe(filterID string) { - subs, found := mgr.filterSubs[filterID] - if !found { - mgr.logger.Error("resubscribe filter not found", zap.String("filterId", filterID)) - return - } - if len(subs) > mgr.config.MinPeersForFilter { - mgr.logger.Error("filter resubscribe too many subs", zap.String("filterId", filterID), zap.Int("len", len(subs))) - } - if len(subs) == mgr.config.MinPeersForFilter { - // do nothing - return - } - mgr.logger.Debug("filter resubscribe subs count:", zap.String("filterId", filterID), zap.Int("len", len(subs))) - for i := len(subs); i < mgr.config.MinPeersForFilter; i++ { - mgr.logger.Debug("filter check not passed, try subscribing to peers", zap.String("filterId", filterID)) - - // Create sub placeholder in order to avoid potentially too many subs - tempID := uuid.NewString() - subs[tempID] = nil - go mgr.subscribeToFilter(filterID, tempID) - } -} - -func (mgr *FilterManager) runFilterSubscriptionLoop(sub *subscription.SubscriptionDetails) { +func (mgr *FilterManager) runFilterSubscriptionLoop(sub *api.Sub) { for { select { case <-mgr.ctx.Done(): + mgr.logger.Debug("subscription loop ended", zap.Stringer("content-filter", sub.ContentFilter)) return - case env, ok := <-sub.C: + case env, ok := <-sub.DataCh: if ok { err := (mgr.onNewEnvelopes)(env) if err != nil { - mgr.logger.Error("OnNewEnvelopes error", zap.Error(err)) + mgr.logger.Error("invoking onNewEnvelopes error", zap.Error(err)) } } else { - mgr.logger.Debug("filter sub is closed", zap.String("id", sub.ID)) + mgr.logger.Debug("filter sub is closed", zap.Any("content-filter", sub.ContentFilter)) return } } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 3f0c27af4..95b2106da 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -278,6 +278,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge if cfg.LightClient { opts = append(opts, node.WithWakuFilterLightNode()) + cfg.EnablePeerExchangeClient = false //TODO: Need to fix: Disabling for now to test only with fleet nodes. } else { relayOpts := []pubsub.Option{ pubsub.WithMaxMessageSize(int(waku.cfg.MaxMessageSize)), @@ -430,7 +431,7 @@ func (w *Waku) discoverAndConnectPeers() error { fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) { defer wg.Done() if len(d.PeerInfo.Addrs) != 0 { - go w.connect(d.PeerInfo, wps.DNSDiscovery) + go w.connect(d.PeerInfo, d.ENR, wps.DNSDiscovery) } } @@ -457,17 +458,17 @@ func (w *Waku) discoverAndConnectPeers() error { continue } - go w.connect(*peerInfo, wps.Static) + go w.connect(*peerInfo, nil, wps.Static) } } return nil } -func (w *Waku) connect(peerInfo peer.AddrInfo, origin wps.Origin) { +func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origin) { // Connection will be prunned eventually by the connection manager if needed // The peer connector in go-waku uses Connect, so it will execute identify as part of its - w.node.AddDiscoveredPeer(peerInfo.ID, peerInfo.Addrs, origin, []string{w.cfg.DefaultShardPubsubTopic}, nil, true) + w.node.AddDiscoveredPeer(peerInfo.ID, peerInfo.Addrs, origin, w.cfg.DefaultShardedPubsubTopics, enr, true) } func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { @@ -538,7 +539,7 @@ func (w *Waku) runPeerExchangeLoop() { } // Attempt to connect to the peers. // Peers will be added to the libp2p peer store thanks to identify - go w.connect(discoveredNode.PeerInfo, wps.DNSDiscovery) + go w.connect(discoveredNode.PeerInfo, discoveredNode.ENR, wps.DNSDiscovery) peers = append(peers, discoveredNode.PeerID) } } @@ -904,7 +905,7 @@ func (w *Waku) Subscribe(f *common.Filter) (string, error) { } if w.cfg.LightClient { - w.filterManager.eventChan <- FilterEvent{eventType: FilterEventAdded, filterID: id} + w.filterManager.addFilter(id, f) } return id, nil @@ -918,21 +919,12 @@ func (w *Waku) Unsubscribe(ctx context.Context, id string) error { } if w.cfg.LightClient { - w.filterManager.eventChan <- FilterEvent{eventType: FilterEventRemoved, filterID: id} + w.filterManager.removeFilter(id) } return nil } -// Used for testing -func (w *Waku) getFilterStats() FilterSubs { - ch := make(chan FilterSubs) - w.filterManager.eventChan <- FilterEvent{eventType: FilterEventGetStats, ch: ch} - stats := <-ch - - return stats -} - // GetFilter returns the filter by id. func (w *Waku) GetFilter(id string) *common.Filter { return w.filters.Get(id) @@ -1271,6 +1263,48 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, query legacy_store.Que return result.Cursor(), len(result.Messages), nil } +func (w *Waku) lightClientConnectionStatus() { + + peers := w.node.Host().Network().Peers() + w.logger.Debug("peer stats", + zap.Int("peersCount", len(peers))) + subs := w.node.FilterLightnode().Subscriptions() + w.logger.Debug("filter subs count", zap.Int("count", len(subs))) + isOnline := false + if len(peers) > 0 { + isOnline = true + } + //TODOL needs fixing, right now invoking everytime. + //Trigger FilterManager to take care of any pending filter subscriptions + //TODO: Pass pubsubTopic based on topicHealth notif received. + go w.filterManager.onConnectionStatusChange(w.cfg.DefaultShardPubsubTopic, isOnline) + + w.connStatusMu.Lock() + + connStatus := types.ConnStatus{ + IsOnline: isOnline, + Peers: FormatPeerStats(w.node), + } + for k, subs := range w.connStatusSubscriptions { + if !subs.Send(connStatus) { + delete(w.connStatusSubscriptions, k) + } + } + w.connStatusMu.Unlock() + if w.onPeerStats != nil { + w.onPeerStats(connStatus) + } + + //TODO:Analyze if we need to discover and connect to peers with peerExchange loop enabled. + if w.offline && isOnline { + if err := w.discoverAndConnectPeers(); err != nil { + w.logger.Error("failed to add wakuv2 peers", zap.Error(err)) + } + } + + w.offline = !isOnline +} + // Start implements node.Service, starting the background data propagation thread // of the Waku protocol. func (w *Waku) Start() error { @@ -1313,10 +1347,20 @@ func (w *Waku) Start() error { go func() { defer w.wg.Done() + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() for { select { case <-w.ctx.Done(): return + case <-ticker.C: + //TODO: Need to fix. + // Temporary changes for lightNodes to have health check based on connected peers. + //This needs to be enhanced to be based on healthy Filter and lightPush peers available for each shard. + //This would get fixed as part of https://github.com/waku-org/go-waku/issues/1114 + if w.cfg.LightClient { + w.lightClientConnectionStatus() + } case c := <-w.topicHealthStatusChan: w.connStatusMu.Lock() @@ -1345,19 +1389,16 @@ func (w *Waku) Start() error { }() go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL) + //TODO: commenting for now so that only fleet nodes are used. + //Need to uncomment once filter peer scoring etc is implemented. go w.runPeerExchangeLoop() if w.cfg.LightClient { // Create FilterManager that will main peer connectivity // for installed filters - w.filterManager = newFilterManager(w.ctx, w.logger, - func(id string) *common.Filter { return w.GetFilter(id) }, - w.cfg, + w.filterManager = newFilterManager(w.ctx, w.logger, w.cfg, func(env *protocol.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType, false) }, - w.node) - - w.wg.Add(1) - go w.filterManager.runFilterLoop(&w.wg) + w.node.FilterLightnode()) } err = w.setupRelaySubscriptions() @@ -1777,6 +1818,7 @@ func (w *Waku) seedBootnodesForDiscV5() { retries = 0 lastTry = now() } + case <-w.ctx.Done(): w.wg.Done() w.logger.Debug("bootnode seeding stopped") diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index 2779abe07..27da1fcda 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -23,28 +23,42 @@ import ( "golang.org/x/exp/maps" "google.golang.org/protobuf/proto" - "github.com/waku-org/go-waku/waku/v2/dnsdisc" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/protocol/subscription" - "github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/t/helpers" "github.com/status-im/status-go/wakuv2/common" + "github.com/waku-org/go-waku/waku/v2/dnsdisc" + "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" ) var testENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.shards.nodes.status.im" +func setDefaultConfig(config *Config, lightMode bool) { + config.ClusterID = 16 + config.UseShardAsDefaultTopic = true + + if lightMode { + config.EnablePeerExchangeClient = true + config.LightClient = true + config.EnableDiscV5 = false + } else { + config.EnableDiscV5 = true + config.EnablePeerExchangeServer = true + config.LightClient = false + config.EnablePeerExchangeClient = false + } +} + func TestDiscoveryV5(t *testing.T) { config := &Config{} - config.EnableDiscV5 = true + setDefaultConfig(config, false) config.DiscV5BootstrapNodes = []string{testENRBootstrap} config.DiscoveryLimit = 20 - config.ClusterID = 16 - w, err := New(nil, "", config, nil, nil, nil, nil, nil) + w, err := New(nil, "shards.staging", config, nil, nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, w.Start()) @@ -64,7 +78,7 @@ func TestDiscoveryV5(t *testing.T) { func TestRestartDiscoveryV5(t *testing.T) { config := &Config{} - config.EnableDiscV5 = true + setDefaultConfig(config, false) // Use wrong discv5 bootstrap address, to simulate being offline config.DiscV5BootstrapNodes = []string{"enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@1.1.1.2"} config.DiscoveryLimit = 20 @@ -74,7 +88,6 @@ func TestRestartDiscoveryV5(t *testing.T) { require.NoError(t, err) require.NoError(t, w.Start()) - require.False(t, w.seededBootnodesForDiscV5) options := func(b *backoff.ExponentialBackOff) { @@ -111,17 +124,15 @@ func TestRestartDiscoveryV5(t *testing.T) { } func TestBasicWakuV2(t *testing.T) { - enrTreeAddress := testENRBootstrap + enrTreeAddress := testENRBootstrap //"enrtree://AL65EKLJAUXKKPG43HVTML5EFFWEZ7L4LOKTLZCLJASG4DSESQZEC@prod.status.nodes.status.im" envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS") if envEnrTreeAddress != "" { enrTreeAddress = envEnrTreeAddress } config := &Config{} + setDefaultConfig(config, false) config.Port = 0 - config.ClusterID = 16 - config.UseShardAsDefaultTopic = true - config.EnableDiscV5 = true config.DiscV5BootstrapNodes = []string{enrTreeAddress} config.DiscoveryLimit = 20 config.WakuNodes = []string{enrTreeAddress} @@ -148,7 +159,7 @@ func TestBasicWakuV2(t *testing.T) { // Sanity check, not great, but it's probably helpful err = tt.RetryWithBackOff(func() error { - if len(w.Peers()) > 2 { + if len(w.Peers()) < 2 { return errors.New("no peers discovered") } return nil @@ -176,6 +187,7 @@ func TestBasicWakuV2(t *testing.T) { Version: proto.Uint32(0), Timestamp: &msgTimestamp, }) + require.NoError(t, err) time.Sleep(1 * time.Second) @@ -313,47 +325,49 @@ func TestWakuV2Filter(t *testing.T) { if envEnrTreeAddress != "" { enrTreeAddress = envEnrTreeAddress } - config := &Config{} - config.ClusterID = 16 + setDefaultConfig(config, true) config.Port = 0 - config.LightClient = true - config.KeepAliveInterval = 1 + config.KeepAliveInterval = 0 config.MinPeersForFilter = 2 - config.EnableDiscV5 = true + config.DiscV5BootstrapNodes = []string{enrTreeAddress} config.DiscoveryLimit = 20 config.WakuNodes = []string{enrTreeAddress} - fleet := "status.test" // Need a name fleet so that LightClient is not set to false - w, err := New(nil, fleet, config, nil, nil, nil, nil, nil) + w, err := New(nil, "", config, nil, nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, w.Start()) options := func(b *backoff.ExponentialBackOff) { b.MaxElapsedTime = 10 * time.Second } - + time.Sleep(10 * time.Second) //TODO: Check if we can remove this sleep. // Sanity check, not great, but it's probably helpful err = tt.RetryWithBackOff(func() error { - if len(w.Peers()) > 2 { + peers, err := w.node.PeerManager().FilterPeersByProto(nil, nil, filter.FilterSubscribeID_v20beta1) + if err != nil { + return err + } + if len(peers) < 2 { return errors.New("no peers discovered") } return nil }, options) require.NoError(t, err) - + testPubsubTopic := "/waku/2/rs/16/32" filter := &common.Filter{ Messages: common.NewMemoryMessageStore(), + PubsubTopic: testPubsubTopic, ContentTopics: common.NewTopicSetFromBytes([][]byte{[]byte{1, 2, 3, 4}}), } - filterID, err := w.Subscribe(filter) + _, err = w.Subscribe(filter) require.NoError(t, err) msgTimestamp := w.timestamp() contentTopic := maps.Keys(filter.ContentTopics)[0] - _, err = w.Send("", &pb.WakuMessage{ + _, err = w.Send(testPubsubTopic, &pb.WakuMessage{ Payload: []byte{1, 2, 3, 4, 5}, ContentTopic: contentTopic.ContentTopic(), Version: proto.Uint32(0), @@ -361,41 +375,37 @@ func TestWakuV2Filter(t *testing.T) { }) require.NoError(t, err) - time.Sleep(15 * time.Second) + time.Sleep(5 * time.Second) // Ensure there is at least 1 active filter subscription subscriptions := w.node.FilterLightnode().Subscriptions() require.Greater(t, len(subscriptions), 0) - // Ensure there are some active peers for this filter subscription - stats := w.getFilterStats() - require.Greater(t, len(stats[filterID]), 0) - messages := filter.Retrieve() require.Len(t, messages, 1) // Mock peers going down - isFilterSubAliveBak := w.filterManager.isFilterSubAlive - w.filterManager.config.MinPeersForFilter = 0 - w.filterManager.isFilterSubAlive = func(sub *subscription.SubscriptionDetails) error { - return errors.New("peer down") - } + _, err = w.node.FilterLightnode().UnsubscribeWithSubscription(w.ctx, subscriptions[0]) + require.NoError(t, err) - time.Sleep(5 * time.Second) - - // Ensure there are 0 active peers now - - stats = w.getFilterStats() - require.Len(t, stats[filterID], 0) - - // Reconnect - w.filterManager.config.MinPeersForFilter = 2 - w.filterManager.isFilterSubAlive = isFilterSubAliveBak time.Sleep(10 * time.Second) - // Ensure there are some active peers now - stats = w.getFilterStats() - require.Greater(t, len(stats[filterID]), 0) + // Ensure there is at least 1 active filter subscription + subscriptions = w.node.FilterLightnode().Subscriptions() + require.Greater(t, len(subscriptions), 0) + + // Ensure that messages are retrieved with a fresh sub + _, err = w.Send(testPubsubTopic, &pb.WakuMessage{ + Payload: []byte{1, 2, 3, 4, 5, 6}, + ContentTopic: contentTopic.ContentTopic(), + Version: proto.Uint32(0), + Timestamp: &msgTimestamp, + }) + require.NoError(t, err) + time.Sleep(10 * time.Second) + + messages = filter.Retrieve() + require.Len(t, messages, 1) require.NoError(t, w.Stop()) }