feat: Simplify FilterManager and move subscription management to waku (#4665)

* feat_: simplify filter management

fix_: try using shard as a default topic

fix_: filter test to work with shards.staging fleet

* fix_: handle connection status change and manage filter subscriptions better

* chore_: bump go-waku to latest and with some fixes

Co-authored-by: richΛrd <info@richardramos.me>

* chore_: disabling pxClient so that only fleet nodes are used for now

---------

Co-authored-by: Prem Chaitanya Prathi <chaitanyaprem@gmail.com>
Co-authored-by: richΛrd <info@richardramos.me>
This commit is contained in:
Vit∀ly Vlasov 2024-06-14 15:41:45 +03:00 committed by GitHub
parent 1844ab7c83
commit 68acef62d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 478 additions and 359 deletions

2
go.mod
View File

@ -93,7 +93,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20240605190333-d2d2f5672ebd
github.com/waku-org/go-waku v0.8.1-0.20240612113959-d9dc91a162d9
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1

4
go.sum
View File

@ -2137,8 +2137,8 @@ github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97Jry
github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20240605190333-d2d2f5672ebd h1:g5EneT88eHOakr8Zukx4RP1JICgrSl9ZtmgGS+wQbC8=
github.com/waku-org/go-waku v0.8.1-0.20240605190333-d2d2f5672ebd/go.mod h1:biffO55kWbvfO8jdu/aAPiWcmozrfFKPum4EMFDib+k=
github.com/waku-org/go-waku v0.8.1-0.20240612113959-d9dc91a162d9 h1:OvO9PxpYYh4cNuuHpwXguZBQWEppdhdHAT2jIDls02k=
github.com/waku-org/go-waku v0.8.1-0.20240612113959-d9dc91a162d9/go.mod h1:biffO55kWbvfO8jdu/aAPiWcmozrfFKPum4EMFDib+k=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=

View File

@ -0,0 +1,202 @@
package api
import (
"context"
"encoding/json"
"time"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"go.uber.org/zap"
)
const FilterPingTimeout = 5 * time.Second
const MultiplexChannelBuffer = 100
type FilterConfig struct {
MaxPeers int `json:"maxPeers"`
Peers []peer.ID `json:"peers"`
}
func (fc FilterConfig) String() string {
jsonStr, err := json.Marshal(fc)
if err != nil {
return ""
}
return string(jsonStr)
}
type Sub struct {
ContentFilter protocol.ContentFilter
DataCh chan *protocol.Envelope
Config FilterConfig
subs subscription.SubscriptionSet
wf *filter.WakuFilterLightNode
ctx context.Context
cancel context.CancelFunc
log *zap.Logger
closing chan string
isNodeOnline bool //indicates if node has connectivity, this helps subscribe loop takes decision as to resubscribe or not.
resubscribeInProgress bool
id string
}
// Subscribe
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, online bool) (*Sub, error) {
sub := new(Sub)
sub.id = uuid.NewString()
sub.wf = wf
sub.ctx, sub.cancel = context.WithCancel(ctx)
sub.subs = make(subscription.SubscriptionSet)
sub.DataCh = make(chan *protocol.Envelope, MultiplexChannelBuffer)
sub.ContentFilter = contentFilter
sub.Config = config
sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter))
sub.log.Debug("filter subscribe params", zap.Int("max-peers", config.MaxPeers))
sub.isNodeOnline = online
sub.closing = make(chan string, config.MaxPeers)
if online {
subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers)
if err == nil {
sub.multiplex(subs)
}
}
go sub.subscriptionLoop()
return sub, nil
}
func (apiSub *Sub) Unsubscribe() {
apiSub.cancel()
}
func (apiSub *Sub) SetNodeState(online bool) {
apiSub.isNodeOnline = online
}
func (apiSub *Sub) subscriptionLoop() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers &&
!apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers {
apiSub.closing <- ""
}
case <-apiSub.ctx.Done():
apiSub.log.Debug("apiSub context: done")
apiSub.cleanup()
return
case subId := <-apiSub.closing:
apiSub.resubscribeInProgress = true
//trigger resubscribe flow for subscription.
apiSub.checkAndResubscribe(subId)
}
}
}
func (apiSub *Sub) checkAndResubscribe(subId string) {
var failedPeer peer.ID
if subId != "" {
apiSub.log.Debug("subscription close and resubscribe", zap.String("sub-id", subId), zap.Stringer("content-filter", apiSub.ContentFilter))
apiSub.subs[subId].Close()
failedPeer = apiSub.subs[subId].PeerID
delete(apiSub.subs, subId)
}
apiSub.log.Debug("subscription status", zap.Int("sub-count", len(apiSub.subs)), zap.Stringer("content-filter", apiSub.ContentFilter))
if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers {
apiSub.resubscribe(failedPeer)
}
apiSub.resubscribeInProgress = false
}
func (apiSub *Sub) cleanup() {
apiSub.log.Debug("cleaning up subscription", zap.Stringer("config", apiSub.Config))
for _, s := range apiSub.subs {
_, err := apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s)
if err != nil {
//Logging with info as this is part of cleanup
apiSub.log.Info("failed to unsubscribe filter", zap.Error(err))
}
}
close(apiSub.DataCh)
}
// Attempts to resubscribe on topics that lack subscriptions
func (apiSub *Sub) resubscribe(failedPeer peer.ID) {
// Re-subscribe asynchronously
existingSubCount := len(apiSub.subs)
apiSub.log.Debug("subscribing again", zap.Int("num-peers", apiSub.Config.MaxPeers-existingSubCount))
var peersToExclude peer.IDSlice
if failedPeer != "" { //little hack, couldn't find a better way to do it
peersToExclude = append(peersToExclude, failedPeer)
}
for _, sub := range apiSub.subs {
peersToExclude = append(peersToExclude, sub.PeerID)
}
subs, err := apiSub.subscribe(apiSub.ContentFilter, apiSub.Config.MaxPeers-existingSubCount, peersToExclude...)
if err != nil {
apiSub.log.Debug("failed to resubscribe for filter", zap.Error(err))
return
} //Not handling scenario where all requested subs are not received as that should get handled from user of the API.
apiSub.multiplex(subs)
}
func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) {
// Low-level subscribe, returns a set of SubscriptionDetails
options := make([]filter.FilterSubscribeOption, 0)
options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount)))
for _, p := range apiSub.Config.Peers {
options = append(options, filter.WithPeer(p))
}
if len(peersToExclude) > 0 {
apiSub.log.Debug("subscribing with peers to exclude", zap.Stringers("excluded-peers", peersToExclude))
options = append(options, filter.WithPeersToExclude(peersToExclude...))
}
subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...)
if err != nil {
//Inform of error, so that resubscribe can be triggered if required
if len(apiSub.closing) < apiSub.Config.MaxPeers {
apiSub.closing <- ""
}
if len(subs) > 0 {
// Partial Failure, which means atleast 1 subscription is successful
apiSub.log.Debug("partial failure in filter subscribe", zap.Error(err), zap.Int("success-count", len(subs)))
return subs, nil
}
// TODO: Once filter error handling indicates specific error, this can be handled better.
return nil, err
}
return subs, nil
}
func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
// Multiplex onto single channel
// Goroutines will exit once sub channels are closed
for _, subDetails := range subs {
apiSub.subs[subDetails.ID] = subDetails
go func(subDetails *subscription.SubscriptionDetails) {
apiSub.log.Debug("new multiplex", zap.String("sub-id", subDetails.ID))
for env := range subDetails.C {
apiSub.DataCh <- env
}
}(subDetails)
go func(subDetails *subscription.SubscriptionDetails) {
select {
case <-apiSub.ctx.Done():
return
case <-subDetails.Closing:
apiSub.log.Debug("sub closing", zap.String("sub-id", subDetails.ID))
apiSub.closing <- subDetails.ID
}
}(subDetails)
}
}

View File

@ -88,7 +88,7 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM
wf.pm = pm
wf.CommonService = service.NewCommonService()
wf.metrics = newMetrics(reg)
wf.peerPingInterval = 5 * time.Second
wf.peerPingInterval = 1 * time.Minute
return wf
}
@ -591,7 +591,7 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context,
sub.Close()
result := &WakuFilterPushResult{}
wf.log.Debug("unsubscribing subscription", zap.String("sub-id", sub.ID), zap.Stringer("content-filter", sub.ContentFilter))
if !wf.subscriptions.Has(sub.PeerID, sub.ContentFilter) {
// Last sub for this [peer, contentFilter] pair
err = wf.unsubscribeFromServer(ctx, params.requestID, sub.PeerID, sub.ContentFilter)
@ -599,6 +599,8 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context,
Err: err,
PeerID: sub.PeerID,
})
wf.log.Debug("unsubscribed subscription", zap.String("sub-id", sub.ID), zap.Stringer("content-filter", sub.ContentFilter), zap.Error(err))
}
return result, err

View File

@ -26,9 +26,8 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer)
for _, subscription := range subscriptions {
wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID))
//Indicating that subscription is closing,
close(subscription.Closing)
subscription.SetClosing()
}
}
}

View File

@ -29,7 +29,7 @@ type SubscriptionDetails struct {
mapRef *SubscriptionsMap
Closed bool `json:"-"`
once sync.Once
Closing chan struct{}
Closing chan bool
PeerID peer.ID `json:"peerID"`
ContentFilter protocol.ContentFilter `json:"contentFilters"`
@ -99,6 +99,7 @@ func (s *SubscriptionDetails) CloseC() {
defer s.Unlock()
s.Closed = true
close(s.C)
close(s.Closing)
})
}
@ -107,6 +108,15 @@ func (s *SubscriptionDetails) Close() error {
return s.mapRef.Delete(s)
}
func (s *SubscriptionDetails) SetClosing() {
s.Lock()
defer s.Unlock()
if !s.Closed {
s.Closed = true
s.Closing <- true
}
}
func (s *SubscriptionDetails) MarshalJSON() ([]byte, error) {
result := struct {
PeerID peer.ID `json:"peerID"`

View File

@ -75,7 +75,7 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.Content
PeerID: peerID,
C: make(chan *protocol.Envelope, 1024),
ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)},
Closing: make(chan struct{}),
Closing: make(chan bool),
}
// Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair
@ -142,11 +142,21 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error {
contentFilter := subscription.ContentFilter
delete(peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic], subscription.ID)
if len(peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic]) == 0 {
sub.logger.Debug("no more subs for pubsubTopic for this peer", zap.Stringer("id", subscription.PeerID), zap.String("pubsubtopic", contentFilter.PubsubTopic))
delete(peerSubscription.SubsPerPubsubTopic, contentFilter.PubsubTopic)
}
// Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair
for contentTopic := range contentFilter.ContentTopics {
sub.decreaseSubFor(contentFilter.PubsubTopic, contentTopic)
}
if len(peerSubscription.SubsPerPubsubTopic) == 0 {
sub.logger.Debug("no more subs for peer", zap.Stringer("id", subscription.PeerID))
delete(sub.items, subscription.PeerID)
}
return nil
}

3
vendor/modules.txt vendored
View File

@ -1015,11 +1015,12 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20240605190333-d2d2f5672ebd
# github.com/waku-org/go-waku v0.8.1-0.20240612113959-d9dc91a162d9
## explicit; go 1.21
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests
github.com/waku-org/go-waku/waku/persistence
github.com/waku-org/go-waku/waku/v2/api
github.com/waku-org/go-waku/waku/v2/discv5
github.com/waku-org/go-waku/waku/v2/dnsdisc
github.com/waku-org/go-waku/waku/v2/hash

View File

@ -39,33 +39,34 @@ var (
// Config represents the configuration state of a waku node.
type Config struct {
MaxMessageSize uint32 `toml:",omitempty"` // Maximal message length allowed by the waku node
Host string `toml:",omitempty"`
Port int `toml:",omitempty"`
EnablePeerExchangeServer bool `toml:",omitempty"` // PeerExchange server makes sense only when discv5 is running locally as it will have a cache of peers that it can respond to in case a PeerExchange request comes from the PeerExchangeClient
EnablePeerExchangeClient bool `toml:",omitempty"`
KeepAliveInterval int `toml:",omitempty"`
MinPeersForRelay int `toml:",omitempty"` // Indicates the minimum number of peers required for using Relay Protocol
MinPeersForFilter int `toml:",omitempty"` // Indicates the minimum number of peers required for using Filter Protocol
LightClient bool `toml:",omitempty"` // Indicates if the node is a light client
WakuNodes []string `toml:",omitempty"`
Rendezvous bool `toml:",omitempty"`
DiscV5BootstrapNodes []string `toml:",omitempty"`
Nameserver string `toml:",omitempty"` // Optional nameserver to use for dns discovery
Resolver ethdisc.Resolver `toml:",omitempty"` // Optional resolver to use for dns discovery
EnableDiscV5 bool `toml:",omitempty"` // Indicates whether discv5 is enabled or not
DiscoveryLimit int `toml:",omitempty"` // Indicates the number of nodes to discover with peer exchange client
AutoUpdate bool `toml:",omitempty"`
UDPPort int `toml:",omitempty"`
EnableStore bool `toml:",omitempty"`
StoreCapacity int `toml:",omitempty"`
StoreSeconds int `toml:",omitempty"`
TelemetryServerURL string `toml:",omitempty"`
DefaultShardPubsubTopic string `toml:",omitempty"` // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not)
UseShardAsDefaultTopic bool `toml:",omitempty"`
ClusterID uint16 `toml:",omitempty"`
EnableConfirmations bool `toml:",omitempty"` // Enable sending message confirmations
SkipPublishToTopic bool `toml:",omitempty"` // Used in testing
MaxMessageSize uint32 `toml:",omitempty"` // Maximal message length allowed by the waku node
Host string `toml:",omitempty"`
Port int `toml:",omitempty"`
EnablePeerExchangeServer bool `toml:",omitempty"` // PeerExchange server makes sense only when discv5 is running locally as it will have a cache of peers that it can respond to in case a PeerExchange request comes from the PeerExchangeClient
EnablePeerExchangeClient bool `toml:",omitempty"`
KeepAliveInterval int `toml:",omitempty"`
MinPeersForRelay int `toml:",omitempty"` // Indicates the minimum number of peers required for using Relay Protocol
MinPeersForFilter int `toml:",omitempty"` // Indicates the minimum number of peers required for using Filter Protocol
LightClient bool `toml:",omitempty"` // Indicates if the node is a light client
WakuNodes []string `toml:",omitempty"`
Rendezvous bool `toml:",omitempty"`
DiscV5BootstrapNodes []string `toml:",omitempty"`
Nameserver string `toml:",omitempty"` // Optional nameserver to use for dns discovery
Resolver ethdisc.Resolver `toml:",omitempty"` // Optional resolver to use for dns discovery
EnableDiscV5 bool `toml:",omitempty"` // Indicates whether discv5 is enabled or not
DiscoveryLimit int `toml:",omitempty"` // Indicates the number of nodes to discover with peer exchange client
AutoUpdate bool `toml:",omitempty"`
UDPPort int `toml:",omitempty"`
EnableStore bool `toml:",omitempty"`
StoreCapacity int `toml:",omitempty"`
StoreSeconds int `toml:",omitempty"`
TelemetryServerURL string `toml:",omitempty"`
DefaultShardPubsubTopic string `toml:",omitempty"` // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not)
DefaultShardedPubsubTopics []string `toml:", omitempty"`
UseShardAsDefaultTopic bool `toml:",omitempty"`
ClusterID uint16 `toml:",omitempty"`
EnableConfirmations bool `toml:",omitempty"` // Enable sending message confirmations
SkipPublishToTopic bool `toml:",omitempty"` // Used in testing
}
func (c *Config) Validate(logger *zap.Logger) error {
@ -123,6 +124,9 @@ func setDefaults(cfg *Config) *Config {
if cfg.DefaultShardPubsubTopic == "" {
if cfg.UseShardAsDefaultTopic {
cfg.DefaultShardPubsubTopic = shard.DefaultShardPubsubTopic()
//For now populating with both used shards, but this can be populated from user subscribed communities etc once community sharding is implemented
cfg.DefaultShardedPubsubTopics = append(cfg.DefaultShardedPubsubTopics, shard.DefaultShardPubsubTopic())
cfg.DefaultShardedPubsubTopics = append(cfg.DefaultShardedPubsubTopics, shard.DefaultNonProtectedPubsubTopic())
} else {
cfg.DefaultShardPubsubTopic = relay.DefaultWakuTopic
}

View File

@ -3,43 +3,17 @@ package wakuv2
import (
"context"
"sync"
"time"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/status-im/status-go/wakuv2/common"
"go.uber.org/zap"
"golang.org/x/exp/maps"
node "github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/api"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
)
const (
FilterEventAdded = iota
FilterEventRemoved
FilterEventPingResult
FilterEventSubscribeResult
FilterEventUnsubscribeResult
FilterEventGetStats
)
type FilterSubs map[string]subscription.SubscriptionSet
type FilterEvent struct {
eventType int
filterID string
success bool
peerID peer.ID
tempID string
sub *subscription.SubscriptionDetails
ch chan FilterSubs
}
// Methods on FilterManager maintain filter peer health
//
// runFilterLoop is the main event loop
@ -51,224 +25,112 @@ type FilterEvent struct {
// filterSubs is the map of filter IDs to subscriptions
type FilterManager struct {
ctx context.Context
filterSubs FilterSubs
eventChan chan (FilterEvent)
isFilterSubAlive func(sub *subscription.SubscriptionDetails) error
getFilter func(string) *common.Filter
onNewEnvelopes func(env *protocol.Envelope) error
logger *zap.Logger
config *Config
node *node.WakuNode
sync.Mutex
ctx context.Context
cfg *Config
filters map[string]SubDetails // map of filters to apiSub details
onNewEnvelopes func(env *protocol.Envelope) error
logger *zap.Logger
node *filter.WakuFilterLightNode
peersAvailable bool
filterQueue chan filterConfig
}
type SubDetails struct {
cancel func()
sub *api.Sub
}
func newFilterManager(ctx context.Context, logger *zap.Logger, getFilterFn func(string) *common.Filter, config *Config, onNewEnvelopes func(env *protocol.Envelope) error, node *node.WakuNode) *FilterManager {
const filterQueueSize = 1000
type filterConfig struct {
ID string
contentFilter protocol.ContentFilter
}
func newFilterManager(ctx context.Context, logger *zap.Logger, cfg *Config, onNewEnvelopes func(env *protocol.Envelope) error, node *filter.WakuFilterLightNode) *FilterManager {
// This fn is being mocked in test
mgr := new(FilterManager)
mgr.ctx = ctx
mgr.logger = logger
mgr.getFilter = getFilterFn
mgr.cfg = cfg
mgr.onNewEnvelopes = onNewEnvelopes
mgr.filterSubs = make(FilterSubs)
mgr.eventChan = make(chan FilterEvent, 100)
mgr.config = config
mgr.filters = make(map[string]SubDetails)
mgr.node = node
mgr.isFilterSubAlive = func(sub *subscription.SubscriptionDetails) error {
return nil
}
mgr.peersAvailable = false
mgr.filterQueue = make(chan filterConfig, filterQueueSize)
return mgr
}
func (mgr *FilterManager) runFilterLoop(wg *sync.WaitGroup) {
defer wg.Done()
// Use it to ping filter peer(s) periodically
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-mgr.ctx.Done():
mgr.logger.Debug("filter loop stopped")
return
case <-ticker.C:
mgr.pingPeers()
case ev := <-mgr.eventChan:
mgr.processEvents(&ev)
}
}
}
func (mgr *FilterManager) processEvents(ev *FilterEvent) {
switch ev.eventType {
case FilterEventAdded:
mgr.filterSubs[ev.filterID] = make(subscription.SubscriptionSet)
mgr.resubscribe(ev.filterID)
case FilterEventRemoved:
for _, sub := range mgr.filterSubs[ev.filterID] {
if sub == nil {
// Skip temp subs
continue
}
go mgr.unsubscribeFromFilter(ev.filterID, sub)
}
delete(mgr.filterSubs, ev.filterID)
case FilterEventPingResult:
if ev.success {
break
}
// filterID field is only set when there are no subs to check for this filter,
// therefore no particular peers that could be unreachable.
if ev.filterID != "" {
// Trigger full resubscribe, filter has too few peers
mgr.logger.Debug("filter has too few subs", zap.String("filterId", ev.filterID))
mgr.resubscribe(ev.filterID)
break
}
// Delete subs for removed peer
for filterID, subs := range mgr.filterSubs {
for _, sub := range subs {
if sub == nil {
// Skip temp subs
continue
}
if sub.PeerID == ev.peerID {
mgr.logger.Debug("filter sub is inactive", zap.String("filterId", filterID), zap.Stringer("peerId", sub.PeerID), zap.String("subID", sub.ID))
delete(subs, sub.ID)
go mgr.unsubscribeFromFilter(filterID, sub)
}
}
mgr.resubscribe(filterID)
}
case FilterEventSubscribeResult:
subs, found := mgr.filterSubs[ev.filterID]
if ev.success {
if found {
subs[ev.sub.ID] = ev.sub
go mgr.runFilterSubscriptionLoop(ev.sub)
} else {
// We subscribed to a filter that is already uninstalled; invoke unsubscribe
go mgr.unsubscribeFromFilter(ev.filterID, ev.sub)
}
}
if found {
// Delete temp subscription record
delete(subs, ev.tempID)
}
case FilterEventUnsubscribeResult:
mgr.logger.Debug("filter event unsubscribe result", zap.String("filterId", ev.filterID), zap.Stringer("peerID", ev.sub.PeerID))
case FilterEventGetStats:
stats := make(FilterSubs)
for id, subs := range mgr.filterSubs {
stats[id] = make(subscription.SubscriptionSet)
for subID, sub := range subs {
if sub == nil {
// Skip temp subs
continue
}
stats[id][subID] = sub
}
}
ev.ch <- stats
}
}
func (mgr *FilterManager) subscribeToFilter(filterID string, tempID string) {
logger := mgr.logger.With(zap.String("filterId", filterID))
f := mgr.getFilter(filterID)
if f == nil {
logger.Error("filter subscribeToFilter: No filter found")
mgr.eventChan <- FilterEvent{eventType: FilterEventSubscribeResult, filterID: filterID, tempID: tempID, success: false}
return
}
func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) {
mgr.Lock()
defer mgr.Unlock()
contentFilter := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics)
logger.Debug("filter subscribe to filter node", zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList()))
ctx, cancel := context.WithTimeout(mgr.ctx, requestTimeout)
defer cancel()
mgr.logger.Debug("adding filter", zap.String("filter-id", filterID), zap.Stringer("content-filter", contentFilter))
subDetails, err := mgr.node.FilterLightnode().Subscribe(ctx, contentFilter, filter.WithAutomaticPeerSelection())
var sub *subscription.SubscriptionDetails
if err != nil {
logger.Warn("filter could not add wakuv2 filter for peers", zap.Error(err))
if mgr.peersAvailable {
go mgr.subscribeAndRunLoop(filterConfig{filterID, contentFilter})
} else {
sub = subDetails[0]
logger.Debug("filter subscription success", zap.Stringer("peer", sub.PeerID), zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList()))
mgr.logger.Debug("queuing filter as not online", zap.String("filter-id", filterID), zap.Stringer("content-filter", contentFilter))
mgr.filterQueue <- filterConfig{filterID, contentFilter}
}
success := err == nil
mgr.eventChan <- FilterEvent{eventType: FilterEventSubscribeResult, filterID: filterID, tempID: tempID, sub: sub, success: success}
}
func (mgr *FilterManager) unsubscribeFromFilter(filterID string, sub *subscription.SubscriptionDetails) {
mgr.logger.Debug("filter unsubscribe from filter node", zap.String("filterId", filterID), zap.String("subId", sub.ID), zap.Stringer("peer", sub.PeerID))
// Unsubscribe on light node
ctx, cancel := context.WithTimeout(mgr.ctx, requestTimeout)
defer cancel()
_, err := mgr.node.FilterLightnode().UnsubscribeWithSubscription(ctx, sub)
func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
ctx, cancel := context.WithCancel(mgr.ctx)
config := api.FilterConfig{MaxPeers: mgr.cfg.MinPeersForFilter}
if err != nil {
mgr.logger.Warn("could not unsubscribe wakuv2 filter for peer", zap.String("filterId", filterID), zap.String("subId", sub.ID), zap.Error(err))
sub, err := api.Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.peersAvailable)
mgr.Lock()
mgr.filters[f.ID] = SubDetails{cancel, sub}
mgr.Unlock()
if err == nil {
mgr.logger.Debug("subscription successful, running loop", zap.String("filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter))
mgr.runFilterSubscriptionLoop(sub)
} else {
mgr.logger.Error("subscription fail, need to debug issue", zap.String("filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter), zap.Error(err))
}
success := err == nil
mgr.eventChan <- FilterEvent{eventType: FilterEventUnsubscribeResult, filterID: filterID, success: success, sub: sub}
}
// Check whether each of the installed filters
// has enough alive subscriptions to peers
func (mgr *FilterManager) pingPeers() {
mgr.logger.Debug("filter pingPeers")
distinctPeers := make(map[peer.ID]struct{})
for filterID, subs := range mgr.filterSubs {
logger := mgr.logger.With(zap.String("filterId", filterID))
nilSubsCnt := 0
for _, s := range subs {
if s == nil {
nilSubsCnt++
}
}
logger.Debug("filter ping peers", zap.Int("len", len(subs)), zap.Int("len(nilSubs)", nilSubsCnt))
if len(subs) < mgr.config.MinPeersForFilter {
// Trigger full resubscribe
logger.Debug("filter ping peers not enough subs")
go func(filterID string) {
mgr.eventChan <- FilterEvent{eventType: FilterEventPingResult, filterID: filterID, success: false}
}(filterID)
}
for _, sub := range subs {
if sub == nil {
// Skip temp subs
continue
}
_, found := distinctPeers[sub.PeerID]
if found {
continue
}
distinctPeers[sub.PeerID] = struct{}{}
logger.Debug("filter ping peer", zap.Stringer("peerId", sub.PeerID))
go func(sub *subscription.SubscriptionDetails) {
err := mgr.isFilterSubAlive(sub)
alive := err == nil
if alive {
logger.Debug("filter aliveness check succeeded", zap.Stringer("peerId", sub.PeerID))
} else {
logger.Debug("filter aliveness check failed", zap.Stringer("peerId", sub.PeerID), zap.Error(err))
func (mgr *FilterManager) onConnectionStatusChange(pubsubTopic string, newStatus bool) {
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
zap.Int("filters count", len(mgr.filters)), zap.Int("filter-queue-len", len(mgr.filterQueue)))
//TODO: Needs optimization because only on transition from offline to online should trigger this logic.
if newStatus { //Online
if len(mgr.filterQueue) > 0 {
//Check if any filter subs are pending and subscribe them
for filter := range mgr.filterQueue {
mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", filter.ID), zap.Stringer("content-filter", filter.contentFilter))
go mgr.subscribeAndRunLoop(filter)
if len(mgr.filterQueue) == 0 {
mgr.logger.Debug("filter queue empty")
break
}
mgr.eventChan <- FilterEvent{eventType: FilterEventPingResult, peerID: sub.PeerID, success: alive}
}(sub)
}
}
}
mgr.Lock()
for _, subDetails := range mgr.filters {
subDetails.sub.SetNodeState(newStatus)
}
mgr.Unlock()
mgr.peersAvailable = newStatus
}
func (mgr *FilterManager) removeFilter(filterID string) {
mgr.Lock()
defer mgr.Unlock()
mgr.logger.Debug("removing filter", zap.String("filter-id", filterID))
subDetails, ok := mgr.filters[filterID]
if ok {
delete(mgr.filters, filterID)
// close goroutine running runFilterSubscriptionLoop
// this will also close api.Sub
subDetails.cancel()
} else {
mgr.logger.Debug("filter removal: filter not found", zap.String("filter-id", filterID))
}
}
func (mgr *FilterManager) buildContentFilter(pubsubTopic string, contentTopicSet common.TopicSet) protocol.ContentFilter {
@ -280,43 +142,20 @@ func (mgr *FilterManager) buildContentFilter(pubsubTopic string, contentTopicSet
return protocol.NewContentFilter(pubsubTopic, contentTopics...)
}
func (mgr *FilterManager) resubscribe(filterID string) {
subs, found := mgr.filterSubs[filterID]
if !found {
mgr.logger.Error("resubscribe filter not found", zap.String("filterId", filterID))
return
}
if len(subs) > mgr.config.MinPeersForFilter {
mgr.logger.Error("filter resubscribe too many subs", zap.String("filterId", filterID), zap.Int("len", len(subs)))
}
if len(subs) == mgr.config.MinPeersForFilter {
// do nothing
return
}
mgr.logger.Debug("filter resubscribe subs count:", zap.String("filterId", filterID), zap.Int("len", len(subs)))
for i := len(subs); i < mgr.config.MinPeersForFilter; i++ {
mgr.logger.Debug("filter check not passed, try subscribing to peers", zap.String("filterId", filterID))
// Create sub placeholder in order to avoid potentially too many subs
tempID := uuid.NewString()
subs[tempID] = nil
go mgr.subscribeToFilter(filterID, tempID)
}
}
func (mgr *FilterManager) runFilterSubscriptionLoop(sub *subscription.SubscriptionDetails) {
func (mgr *FilterManager) runFilterSubscriptionLoop(sub *api.Sub) {
for {
select {
case <-mgr.ctx.Done():
mgr.logger.Debug("subscription loop ended", zap.Stringer("content-filter", sub.ContentFilter))
return
case env, ok := <-sub.C:
case env, ok := <-sub.DataCh:
if ok {
err := (mgr.onNewEnvelopes)(env)
if err != nil {
mgr.logger.Error("OnNewEnvelopes error", zap.Error(err))
mgr.logger.Error("invoking onNewEnvelopes error", zap.Error(err))
}
} else {
mgr.logger.Debug("filter sub is closed", zap.String("id", sub.ID))
mgr.logger.Debug("filter sub is closed", zap.Any("content-filter", sub.ContentFilter))
return
}
}

View File

@ -278,6 +278,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
if cfg.LightClient {
opts = append(opts, node.WithWakuFilterLightNode())
cfg.EnablePeerExchangeClient = false //TODO: Need to fix: Disabling for now to test only with fleet nodes.
} else {
relayOpts := []pubsub.Option{
pubsub.WithMaxMessageSize(int(waku.cfg.MaxMessageSize)),
@ -430,7 +431,7 @@ func (w *Waku) discoverAndConnectPeers() error {
fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) {
defer wg.Done()
if len(d.PeerInfo.Addrs) != 0 {
go w.connect(d.PeerInfo, wps.DNSDiscovery)
go w.connect(d.PeerInfo, d.ENR, wps.DNSDiscovery)
}
}
@ -457,17 +458,17 @@ func (w *Waku) discoverAndConnectPeers() error {
continue
}
go w.connect(*peerInfo, wps.Static)
go w.connect(*peerInfo, nil, wps.Static)
}
}
return nil
}
func (w *Waku) connect(peerInfo peer.AddrInfo, origin wps.Origin) {
func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origin) {
// Connection will be prunned eventually by the connection manager if needed
// The peer connector in go-waku uses Connect, so it will execute identify as part of its
w.node.AddDiscoveredPeer(peerInfo.ID, peerInfo.Addrs, origin, []string{w.cfg.DefaultShardPubsubTopic}, nil, true)
w.node.AddDiscoveredPeer(peerInfo.ID, peerInfo.Addrs, origin, w.cfg.DefaultShardedPubsubTopics, enr, true)
}
func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) {
@ -538,7 +539,7 @@ func (w *Waku) runPeerExchangeLoop() {
}
// Attempt to connect to the peers.
// Peers will be added to the libp2p peer store thanks to identify
go w.connect(discoveredNode.PeerInfo, wps.DNSDiscovery)
go w.connect(discoveredNode.PeerInfo, discoveredNode.ENR, wps.DNSDiscovery)
peers = append(peers, discoveredNode.PeerID)
}
}
@ -904,7 +905,7 @@ func (w *Waku) Subscribe(f *common.Filter) (string, error) {
}
if w.cfg.LightClient {
w.filterManager.eventChan <- FilterEvent{eventType: FilterEventAdded, filterID: id}
w.filterManager.addFilter(id, f)
}
return id, nil
@ -918,21 +919,12 @@ func (w *Waku) Unsubscribe(ctx context.Context, id string) error {
}
if w.cfg.LightClient {
w.filterManager.eventChan <- FilterEvent{eventType: FilterEventRemoved, filterID: id}
w.filterManager.removeFilter(id)
}
return nil
}
// Used for testing
func (w *Waku) getFilterStats() FilterSubs {
ch := make(chan FilterSubs)
w.filterManager.eventChan <- FilterEvent{eventType: FilterEventGetStats, ch: ch}
stats := <-ch
return stats
}
// GetFilter returns the filter by id.
func (w *Waku) GetFilter(id string) *common.Filter {
return w.filters.Get(id)
@ -1271,6 +1263,48 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, query legacy_store.Que
return result.Cursor(), len(result.Messages), nil
}
func (w *Waku) lightClientConnectionStatus() {
peers := w.node.Host().Network().Peers()
w.logger.Debug("peer stats",
zap.Int("peersCount", len(peers)))
subs := w.node.FilterLightnode().Subscriptions()
w.logger.Debug("filter subs count", zap.Int("count", len(subs)))
isOnline := false
if len(peers) > 0 {
isOnline = true
}
//TODOL needs fixing, right now invoking everytime.
//Trigger FilterManager to take care of any pending filter subscriptions
//TODO: Pass pubsubTopic based on topicHealth notif received.
go w.filterManager.onConnectionStatusChange(w.cfg.DefaultShardPubsubTopic, isOnline)
w.connStatusMu.Lock()
connStatus := types.ConnStatus{
IsOnline: isOnline,
Peers: FormatPeerStats(w.node),
}
for k, subs := range w.connStatusSubscriptions {
if !subs.Send(connStatus) {
delete(w.connStatusSubscriptions, k)
}
}
w.connStatusMu.Unlock()
if w.onPeerStats != nil {
w.onPeerStats(connStatus)
}
//TODO:Analyze if we need to discover and connect to peers with peerExchange loop enabled.
if w.offline && isOnline {
if err := w.discoverAndConnectPeers(); err != nil {
w.logger.Error("failed to add wakuv2 peers", zap.Error(err))
}
}
w.offline = !isOnline
}
// Start implements node.Service, starting the background data propagation thread
// of the Waku protocol.
func (w *Waku) Start() error {
@ -1313,10 +1347,20 @@ func (w *Waku) Start() error {
go func() {
defer w.wg.Done()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-w.ctx.Done():
return
case <-ticker.C:
//TODO: Need to fix.
// Temporary changes for lightNodes to have health check based on connected peers.
//This needs to be enhanced to be based on healthy Filter and lightPush peers available for each shard.
//This would get fixed as part of https://github.com/waku-org/go-waku/issues/1114
if w.cfg.LightClient {
w.lightClientConnectionStatus()
}
case c := <-w.topicHealthStatusChan:
w.connStatusMu.Lock()
@ -1345,19 +1389,16 @@ func (w *Waku) Start() error {
}()
go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL)
//TODO: commenting for now so that only fleet nodes are used.
//Need to uncomment once filter peer scoring etc is implemented.
go w.runPeerExchangeLoop()
if w.cfg.LightClient {
// Create FilterManager that will main peer connectivity
// for installed filters
w.filterManager = newFilterManager(w.ctx, w.logger,
func(id string) *common.Filter { return w.GetFilter(id) },
w.cfg,
w.filterManager = newFilterManager(w.ctx, w.logger, w.cfg,
func(env *protocol.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType, false) },
w.node)
w.wg.Add(1)
go w.filterManager.runFilterLoop(&w.wg)
w.node.FilterLightnode())
}
err = w.setupRelaySubscriptions()
@ -1777,6 +1818,7 @@ func (w *Waku) seedBootnodesForDiscV5() {
retries = 0
lastTry = now()
}
case <-w.ctx.Done():
w.wg.Done()
w.logger.Debug("bootnode seeding stopped")

View File

@ -23,28 +23,42 @@ import (
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/tt"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/wakuv2/common"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)
var testENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.shards.nodes.status.im"
func setDefaultConfig(config *Config, lightMode bool) {
config.ClusterID = 16
config.UseShardAsDefaultTopic = true
if lightMode {
config.EnablePeerExchangeClient = true
config.LightClient = true
config.EnableDiscV5 = false
} else {
config.EnableDiscV5 = true
config.EnablePeerExchangeServer = true
config.LightClient = false
config.EnablePeerExchangeClient = false
}
}
func TestDiscoveryV5(t *testing.T) {
config := &Config{}
config.EnableDiscV5 = true
setDefaultConfig(config, false)
config.DiscV5BootstrapNodes = []string{testENRBootstrap}
config.DiscoveryLimit = 20
config.ClusterID = 16
w, err := New(nil, "", config, nil, nil, nil, nil, nil)
w, err := New(nil, "shards.staging", config, nil, nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, w.Start())
@ -64,7 +78,7 @@ func TestDiscoveryV5(t *testing.T) {
func TestRestartDiscoveryV5(t *testing.T) {
config := &Config{}
config.EnableDiscV5 = true
setDefaultConfig(config, false)
// Use wrong discv5 bootstrap address, to simulate being offline
config.DiscV5BootstrapNodes = []string{"enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@1.1.1.2"}
config.DiscoveryLimit = 20
@ -74,7 +88,6 @@ func TestRestartDiscoveryV5(t *testing.T) {
require.NoError(t, err)
require.NoError(t, w.Start())
require.False(t, w.seededBootnodesForDiscV5)
options := func(b *backoff.ExponentialBackOff) {
@ -111,17 +124,15 @@ func TestRestartDiscoveryV5(t *testing.T) {
}
func TestBasicWakuV2(t *testing.T) {
enrTreeAddress := testENRBootstrap
enrTreeAddress := testENRBootstrap //"enrtree://AL65EKLJAUXKKPG43HVTML5EFFWEZ7L4LOKTLZCLJASG4DSESQZEC@prod.status.nodes.status.im"
envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS")
if envEnrTreeAddress != "" {
enrTreeAddress = envEnrTreeAddress
}
config := &Config{}
setDefaultConfig(config, false)
config.Port = 0
config.ClusterID = 16
config.UseShardAsDefaultTopic = true
config.EnableDiscV5 = true
config.DiscV5BootstrapNodes = []string{enrTreeAddress}
config.DiscoveryLimit = 20
config.WakuNodes = []string{enrTreeAddress}
@ -148,7 +159,7 @@ func TestBasicWakuV2(t *testing.T) {
// Sanity check, not great, but it's probably helpful
err = tt.RetryWithBackOff(func() error {
if len(w.Peers()) > 2 {
if len(w.Peers()) < 2 {
return errors.New("no peers discovered")
}
return nil
@ -176,6 +187,7 @@ func TestBasicWakuV2(t *testing.T) {
Version: proto.Uint32(0),
Timestamp: &msgTimestamp,
})
require.NoError(t, err)
time.Sleep(1 * time.Second)
@ -313,47 +325,49 @@ func TestWakuV2Filter(t *testing.T) {
if envEnrTreeAddress != "" {
enrTreeAddress = envEnrTreeAddress
}
config := &Config{}
config.ClusterID = 16
setDefaultConfig(config, true)
config.Port = 0
config.LightClient = true
config.KeepAliveInterval = 1
config.KeepAliveInterval = 0
config.MinPeersForFilter = 2
config.EnableDiscV5 = true
config.DiscV5BootstrapNodes = []string{enrTreeAddress}
config.DiscoveryLimit = 20
config.WakuNodes = []string{enrTreeAddress}
fleet := "status.test" // Need a name fleet so that LightClient is not set to false
w, err := New(nil, fleet, config, nil, nil, nil, nil, nil)
w, err := New(nil, "", config, nil, nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, w.Start())
options := func(b *backoff.ExponentialBackOff) {
b.MaxElapsedTime = 10 * time.Second
}
time.Sleep(10 * time.Second) //TODO: Check if we can remove this sleep.
// Sanity check, not great, but it's probably helpful
err = tt.RetryWithBackOff(func() error {
if len(w.Peers()) > 2 {
peers, err := w.node.PeerManager().FilterPeersByProto(nil, nil, filter.FilterSubscribeID_v20beta1)
if err != nil {
return err
}
if len(peers) < 2 {
return errors.New("no peers discovered")
}
return nil
}, options)
require.NoError(t, err)
testPubsubTopic := "/waku/2/rs/16/32"
filter := &common.Filter{
Messages: common.NewMemoryMessageStore(),
PubsubTopic: testPubsubTopic,
ContentTopics: common.NewTopicSetFromBytes([][]byte{[]byte{1, 2, 3, 4}}),
}
filterID, err := w.Subscribe(filter)
_, err = w.Subscribe(filter)
require.NoError(t, err)
msgTimestamp := w.timestamp()
contentTopic := maps.Keys(filter.ContentTopics)[0]
_, err = w.Send("", &pb.WakuMessage{
_, err = w.Send(testPubsubTopic, &pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: contentTopic.ContentTopic(),
Version: proto.Uint32(0),
@ -361,41 +375,37 @@ func TestWakuV2Filter(t *testing.T) {
})
require.NoError(t, err)
time.Sleep(15 * time.Second)
time.Sleep(5 * time.Second)
// Ensure there is at least 1 active filter subscription
subscriptions := w.node.FilterLightnode().Subscriptions()
require.Greater(t, len(subscriptions), 0)
// Ensure there are some active peers for this filter subscription
stats := w.getFilterStats()
require.Greater(t, len(stats[filterID]), 0)
messages := filter.Retrieve()
require.Len(t, messages, 1)
// Mock peers going down
isFilterSubAliveBak := w.filterManager.isFilterSubAlive
w.filterManager.config.MinPeersForFilter = 0
w.filterManager.isFilterSubAlive = func(sub *subscription.SubscriptionDetails) error {
return errors.New("peer down")
}
_, err = w.node.FilterLightnode().UnsubscribeWithSubscription(w.ctx, subscriptions[0])
require.NoError(t, err)
time.Sleep(5 * time.Second)
// Ensure there are 0 active peers now
stats = w.getFilterStats()
require.Len(t, stats[filterID], 0)
// Reconnect
w.filterManager.config.MinPeersForFilter = 2
w.filterManager.isFilterSubAlive = isFilterSubAliveBak
time.Sleep(10 * time.Second)
// Ensure there are some active peers now
stats = w.getFilterStats()
require.Greater(t, len(stats[filterID]), 0)
// Ensure there is at least 1 active filter subscription
subscriptions = w.node.FilterLightnode().Subscriptions()
require.Greater(t, len(subscriptions), 0)
// Ensure that messages are retrieved with a fresh sub
_, err = w.Send(testPubsubTopic, &pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5, 6},
ContentTopic: contentTopic.ContentTopic(),
Version: proto.Uint32(0),
Timestamp: &msgTimestamp,
})
require.NoError(t, err)
time.Sleep(10 * time.Second)
messages = filter.Retrieve()
require.Len(t, messages, 1)
require.NoError(t, w.Stop())
}