diff --git a/go.mod b/go.mod index dbf2947c0..33bcedf00 100644 --- a/go.mod +++ b/go.mod @@ -93,7 +93,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240625022701-1cf180ebc659 + github.com/waku-org/go-waku v0.8.1-0.20240626004844-19a47a1ac1f5 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 96d865d5d..25fb1e2ad 100644 --- a/go.sum +++ b/go.sum @@ -2137,8 +2137,8 @@ github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97Jry github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240625022701-1cf180ebc659 h1:bTz24QgoRBXoS/WIy8+7jrQ/7hpE63td0fMteq5qZQM= -github.com/waku-org/go-waku v0.8.1-0.20240625022701-1cf180ebc659/go.mod h1:biffO55kWbvfO8jdu/aAPiWcmozrfFKPum4EMFDib+k= +github.com/waku-org/go-waku v0.8.1-0.20240626004844-19a47a1ac1f5 h1:9UyIIy/IvlJB2nHIXydne6OfNfOWPPL08+XmCI3iEBo= +github.com/waku-org/go-waku v0.8.1-0.20240626004844-19a47a1ac1f5/go.mod h1:biffO55kWbvfO8jdu/aAPiWcmozrfFKPum4EMFDib+k= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go index f7bfa7393..2798d1373 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go @@ -7,6 +7,7 @@ import ( "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" @@ -39,13 +40,13 @@ type Sub struct { cancel context.CancelFunc log *zap.Logger closing chan string - isNodeOnline bool //indicates if node has connectivity, this helps subscribe loop takes decision as to resubscribe or not. + onlineChecker onlinechecker.OnlineChecker resubscribeInProgress bool id string } // Subscribe -func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, online bool) (*Sub, error) { +func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger) (*Sub, error) { sub := new(Sub) sub.id = uuid.NewString() sub.wf = wf @@ -56,14 +57,16 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte sub.Config = config sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter)) sub.log.Debug("filter subscribe params", zap.Int("max-peers", config.MaxPeers)) - sub.isNodeOnline = online sub.closing = make(chan string, config.MaxPeers) - if online { + + sub.onlineChecker = wf.OnlineChecker() + if wf.OnlineChecker().IsOnline() { subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers) if err == nil { sub.multiplex(subs) } } + go sub.subscriptionLoop() return sub, nil } @@ -72,17 +75,13 @@ func (apiSub *Sub) Unsubscribe() { apiSub.cancel() } -func (apiSub *Sub) SetNodeState(online bool) { - apiSub.isNodeOnline = online -} - func (apiSub *Sub) subscriptionLoop() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: - if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers && + if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers && !apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers { apiSub.closing <- "" } @@ -109,7 +108,7 @@ func (apiSub *Sub) checkAndResubscribe(subId string) { delete(apiSub.subs, subId) } apiSub.log.Debug("subscription status", zap.Int("sub-count", len(apiSub.subs)), zap.Stringer("content-filter", apiSub.ContentFilter)) - if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers { + if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers { apiSub.resubscribe(failedPeer) } apiSub.resubscribeInProgress = false diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index 4b3508a46..d8c280c17 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -258,7 +258,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { //Initialize peer manager. w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, params.enableRelay, w.log) - w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log) + w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, w.opts.onlineChecker, discoveryConnectTimeout, w.log) if err != nil { w.log.Error("creating peer connection strategy", zap.Error(err)) } @@ -290,7 +290,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.opts.filterOpts = append(w.opts.filterOpts, filter.WithPeerManager(w.peermanager)) w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...) - w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log) + w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log) w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...) w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go index a34376c18..26a82d0d0 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go @@ -25,6 +25,7 @@ import ( "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/prometheus/client_golang/prometheus" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" @@ -63,6 +64,8 @@ type WakuNodeParameters struct { circuitRelayMinInterval time.Duration circuitRelayBootDelay time.Duration + onlineChecker onlinechecker.OnlineChecker + enableNTP bool ntpURLs []string @@ -130,6 +133,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{ WithMaxConnectionsPerIP(DefaultMaxConnectionsPerIP), WithCircuitRelayParams(2*time.Second, 3*time.Minute), WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity), + WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)), } // MultiAddresses return the list of multiaddresses configured in the node @@ -554,6 +558,16 @@ func WithTopicHealthStatusChannel(ch chan<- peermanager.TopicHealthStatus) WakuN } } +// WithOnlineChecker sets up an OnlineChecker which will be used to determine whether the node +// is online or not. The OnlineChecker must be implemented by consumers of go-waku since they +// have additional context to determine what it means for a node to be online/offline +func WithOnlineChecker(onlineChecker onlinechecker.OnlineChecker) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.onlineChecker = onlineChecker + return nil + } +} + // Default options used in the libp2p node var DefaultLibP2POptions = []libp2p.Option{ libp2p.ChainOptions( diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/onlinechecker/online.go b/vendor/github.com/waku-org/go-waku/waku/v2/onlinechecker/online.go new file mode 100644 index 000000000..814d57e75 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/onlinechecker/online.go @@ -0,0 +1,24 @@ +package onlinechecker + +// OnlineChecker is used to determine if node has connectivity. +type OnlineChecker interface { + IsOnline() bool +} + +type DefaultOnlineChecker struct { + online bool +} + +func NewDefaultOnlineChecker(online bool) OnlineChecker { + return &DefaultOnlineChecker{ + online: online, + } +} + +func (o *DefaultOnlineChecker) SetOnline(online bool) { + o.online = online +} + +func (o *DefaultOnlineChecker) IsOnline() bool { + return o.online +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go index e53d9c09d..ebe808e85 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go @@ -16,6 +16,7 @@ import ( "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/service" @@ -27,10 +28,11 @@ import ( // PeerConnectionStrategy is a utility to connect to peers, // but only if we have not recently tried connecting to them already type PeerConnectionStrategy struct { - mux sync.Mutex - cache *lru.TwoQueueCache - host host.Host - pm *PeerManager + mux sync.Mutex + cache *lru.TwoQueueCache + host host.Host + pm *PeerManager + onlineChecker onlinechecker.OnlineChecker paused atomic.Bool dialTimeout time.Duration @@ -59,8 +61,12 @@ func getBackOff() backoff.BackoffFactory { // // dialTimeout is how long we attempt to connect to a peer before giving up // minPeers is the minimum number of peers that the node should have -func NewPeerConnectionStrategy(pm *PeerManager, - dialTimeout time.Duration, logger *zap.Logger) (*PeerConnectionStrategy, error) { +func NewPeerConnectionStrategy( + pm *PeerManager, + onlineChecker onlinechecker.OnlineChecker, + dialTimeout time.Duration, + logger *zap.Logger, +) (*PeerConnectionStrategy, error) { // cacheSize is the size of a TwoQueueCache cacheSize := 600 cache, err := lru.New2Q(cacheSize) @@ -72,6 +78,7 @@ func NewPeerConnectionStrategy(pm *PeerManager, cache: cache, dialTimeout: dialTimeout, CommonDiscoveryService: service.NewCommonDiscoveryService(), + onlineChecker: onlineChecker, pm: pm, backoff: getBackOff(), logger: logger.Named("discovery-connector"), @@ -171,6 +178,10 @@ func (c *PeerConnectionStrategy) isPaused() bool { return c.paused.Load() } +func (c *PeerConnectionStrategy) SetPaused(paused bool) { + c.paused.Store(paused) +} + // it might happen Subscribe is called before peerConnector has started so store these subscriptions in subscriptions array and custom after c.cancel is set. func (c *PeerConnectionStrategy) consumeSubscriptions() { for _, subs := range c.subscriptions { @@ -234,10 +245,17 @@ func (c *PeerConnectionStrategy) dialPeers() { for { select { + case <-c.Context().Done(): + return case pd, ok := <-c.GetListeningChan(): if !ok { return } + + if !c.onlineChecker.IsOnline() { + continue + } + addrInfo := pd.AddrInfo if addrInfo.ID == c.host.ID() || addrInfo.ID == "" || @@ -250,8 +268,6 @@ func (c *PeerConnectionStrategy) dialPeers() { c.WaitGroup().Add(1) go c.dialPeer(addrInfo, sem) } - case <-c.Context().Done(): - return } } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go index 6d05c69ba..ae18907c0 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go @@ -55,7 +55,7 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16, wakuProtoInfo, ok := pm.wakuprotoToENRFieldMap[wakuProtocol] if !ok { - pm.logger.Info("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol))) + pm.logger.Warn("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol))) return nil, errors.New("cannot do on demand discovery for non-waku protocol") } iterator, err := pm.discoveryService.PeerIterator( diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index 70edd644c..5909bbbd8 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -18,6 +18,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" @@ -45,7 +46,8 @@ var ( type WakuFilterLightNode struct { *service.CommonService h host.Host - broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s + broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer. + onlineChecker onlinechecker.OnlineChecker timesource timesource.Timesource metrics Metrics log *zap.Logger @@ -79,12 +81,19 @@ func (arr *WakuFilterPushResult) Errors() []WakuFilterPushError { // Note that broadcaster is optional. // Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode. // If using libp2p host, then pass peermanager as nil -func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager, - timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger) *WakuFilterLightNode { +func NewWakuFilterLightNode( + broadcaster relay.Broadcaster, + pm *peermanager.PeerManager, + timesource timesource.Timesource, + onlineChecker onlinechecker.OnlineChecker, + reg prometheus.Registerer, + log *zap.Logger, +) *WakuFilterLightNode { wf := new(WakuFilterLightNode) wf.log = log.Named("filterv2-lightnode") wf.broadcaster = broadcaster wf.timesource = timesource + wf.onlineChecker = onlineChecker wf.pm = pm wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) @@ -701,3 +710,11 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte return wf.unsubscribeAll(ctx, opts...) } + +func (wf *WakuFilterLightNode) OnlineChecker() onlinechecker.OnlineChecker { + return wf.onlineChecker +} + +func (wf *WakuFilterLightNode) SetOnlineChecker(onlineChecker onlinechecker.OnlineChecker) { + wf.onlineChecker = onlineChecker +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go index 8be1df358..361ab561b 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/suite" "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/peermanager" wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" @@ -165,7 +166,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData { b := relay.NewBroadcaster(10) s.Require().NoError(b.Start(context.Background())) pm := peermanager.NewPeerManager(5, 5, nil, true, s.Log) - filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log) + filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log) filterPush.SetHost(host) pm.SetHost(host) return LightNodeData{filterPush, host} diff --git a/vendor/modules.txt b/vendor/modules.txt index 9568b551a..e403123d7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1015,7 +1015,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240625022701-1cf180ebc659 +# github.com/waku-org/go-waku v0.8.1-0.20240626004844-19a47a1ac1f5 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests @@ -1025,6 +1025,7 @@ github.com/waku-org/go-waku/waku/v2/discv5 github.com/waku-org/go-waku/waku/v2/dnsdisc github.com/waku-org/go-waku/waku/v2/hash github.com/waku-org/go-waku/waku/v2/node +github.com/waku-org/go-waku/waku/v2/onlinechecker github.com/waku-org/go-waku/waku/v2/payload github.com/waku-org/go-waku/waku/v2/peermanager github.com/waku-org/go-waku/waku/v2/peerstore diff --git a/wakuv2/filter_manager.go b/wakuv2/filter_manager.go index a5522eaa4..f0dd90cbe 100644 --- a/wakuv2/filter_manager.go +++ b/wakuv2/filter_manager.go @@ -10,6 +10,7 @@ import ( "golang.org/x/exp/maps" "github.com/waku-org/go-waku/waku/v2/api" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" ) @@ -32,7 +33,7 @@ type FilterManager struct { onNewEnvelopes func(env *protocol.Envelope) error logger *zap.Logger node *filter.WakuFilterLightNode - peersAvailable bool + onlineChecker *onlinechecker.DefaultOnlineChecker filterQueue chan filterConfig } type SubDetails struct { @@ -56,8 +57,10 @@ func newFilterManager(ctx context.Context, logger *zap.Logger, cfg *Config, onNe mgr.onNewEnvelopes = onNewEnvelopes mgr.filters = make(map[string]SubDetails) mgr.node = node - mgr.peersAvailable = false mgr.filterQueue = make(chan filterConfig, filterQueueSize) + mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker) + + mgr.node.SetOnlineChecker(mgr.onlineChecker) return mgr } @@ -68,7 +71,7 @@ func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) { contentFilter := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics) mgr.logger.Debug("adding filter", zap.String("filter-id", filterID), zap.Stringer("content-filter", contentFilter)) - if mgr.peersAvailable { + if mgr.onlineChecker.IsOnline() { go mgr.subscribeAndRunLoop(filterConfig{filterID, contentFilter}) } else { mgr.logger.Debug("queuing filter as not online", zap.String("filter-id", filterID), zap.Stringer("content-filter", contentFilter)) @@ -80,7 +83,7 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { ctx, cancel := context.WithCancel(mgr.ctx) config := api.FilterConfig{MaxPeers: mgr.cfg.MinPeersForFilter} - sub, err := api.Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.peersAvailable) + sub, err := api.Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger) mgr.Lock() mgr.filters[f.ID] = SubDetails{cancel, sub} mgr.Unlock() @@ -109,12 +112,8 @@ func (mgr *FilterManager) onConnectionStatusChange(pubsubTopic string, newStatus } } } - mgr.Lock() - for _, subDetails := range mgr.filters { - subDetails.sub.SetNodeState(newStatus) - } - mgr.Unlock() - mgr.peersAvailable = newStatus + + mgr.onlineChecker.SetOnline(newStatus) } func (mgr *FilterManager) removeFilter(filterID string) { diff --git a/wakuv2/waku.go b/wakuv2/waku.go index cf2cd1dc1..573744731 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -56,6 +56,7 @@ import ( "github.com/libp2p/go-libp2p/core/metrics" "github.com/waku-org/go-waku/waku/v2/dnsdisc" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/peermanager" wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" @@ -75,7 +76,6 @@ import ( "github.com/status-im/status-go/wakuv2/persistence" node "github.com/waku-org/go-waku/waku/v2/node" - v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" ) @@ -91,7 +91,7 @@ const messageExpiredPerid = 10 // in seconds const maxRelayPeers = 300 type SentEnvelope struct { - Envelope *v2protocol.Envelope + Envelope *protocol.Envelope PublishMethod PublishMethod } @@ -101,7 +101,7 @@ type ErrorSendingEnvelope struct { } type ITelemetryClient interface { - PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope) + PushReceivedEnvelope(receivedEnvelope *protocol.Envelope) PushSentEnvelope(sentEnvelope SentEnvelope) PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope) } @@ -156,6 +156,7 @@ type Waku struct { topicHealthStatusChan chan peermanager.TopicHealthStatus connStatusSubscriptions map[string]*types.ConnStatusSubscription connStatusMu sync.Mutex + onlineChecker *onlinechecker.DefaultOnlineChecker logger *zap.Logger @@ -166,9 +167,6 @@ type Waku struct { // bootnodes successfully seededBootnodesForDiscV5 bool - // offline indicates whether we have detected connectivity - offline bool - // connectionChanged is channel that notifies when connectivity has changed connectionChanged chan struct{} @@ -242,6 +240,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge discV5BootstrapNodes: cfg.DiscV5BootstrapNodes, onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, onPeerStats: onPeerStats, + onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), } waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger) @@ -1327,13 +1326,13 @@ func (w *Waku) lightClientConnectionStatus() { } //TODO:Analyze if we need to discover and connect to peers with peerExchange loop enabled. - if w.offline && isOnline { + if !w.onlineChecker.IsOnline() && isOnline { if err := w.discoverAndConnectPeers(); err != nil { w.logger.Error("failed to add wakuv2 peers", zap.Error(err)) } } - w.offline = !isOnline + w.onlineChecker.SetOnline(isOnline) } // Start implements node.Service, starting the background data propagation thread @@ -1802,7 +1801,7 @@ func (w *Waku) StopDiscV5() error { } func (w *Waku) ConnectionChanged(state connection.State) { - if !state.Offline && w.offline { + if !state.Offline && !w.onlineChecker.IsOnline() { select { case w.connectionChanged <- struct{}{}: default: @@ -1810,7 +1809,7 @@ func (w *Waku) ConnectionChanged(state connection.State) { } } - w.offline = state.Offline + w.onlineChecker.SetOnline(!state.Offline) } // seedBootnodesForDiscV5 tries to fetch bootnodes diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index c85f8e6f5..6627ddb14 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -6,6 +6,7 @@ import ( "errors" "math/big" "os" + "sync" "testing" "time" @@ -30,6 +31,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/status-im/status-go/appdatabase" + "github.com/status-im/status-go/connection" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/t/helpers" @@ -612,3 +614,40 @@ func TestConfirmMessageDelivered(t *testing.T) { require.NoError(t, aliceNode.Stop()) require.NoError(t, bobNode.Stop()) } + +func TestOnlineChecker(t *testing.T) { + w, err := New(nil, "shards.staging", nil, nil, nil, nil, nil, nil) + require.NoError(t, err) + require.False(t, w.onlineChecker.IsOnline()) + + w.ConnectionChanged(connection.State{Offline: false}) + require.True(t, w.onlineChecker.IsOnline()) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + <-w.connectionChanged + require.True(t, true) + }() + + time.Sleep(100 * time.Millisecond) + + w.ConnectionChanged(connection.State{Offline: true}) + require.False(t, w.onlineChecker.IsOnline()) + + // Test lightnode online checker + config := &Config{} + config.ClusterID = 16 + config.LightClient = true + lightNode, err := New(nil, "shards.staging", config, nil, nil, nil, nil, nil) + require.NoError(t, err) + + err = lightNode.Start() + require.NoError(t, err) + + require.False(t, lightNode.onlineChecker.IsOnline()) + + lightNode.filterManager.addFilter("test", &common.Filter{}) + +}