diff --git a/waku/v2/api/filter.go b/waku/v2/api/filter.go index f7bfa739..2798d137 100644 --- a/waku/v2/api/filter.go +++ b/waku/v2/api/filter.go @@ -7,6 +7,7 @@ import ( "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" @@ -39,13 +40,13 @@ type Sub struct { cancel context.CancelFunc log *zap.Logger closing chan string - isNodeOnline bool //indicates if node has connectivity, this helps subscribe loop takes decision as to resubscribe or not. + onlineChecker onlinechecker.OnlineChecker resubscribeInProgress bool id string } // Subscribe -func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, online bool) (*Sub, error) { +func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger) (*Sub, error) { sub := new(Sub) sub.id = uuid.NewString() sub.wf = wf @@ -56,14 +57,16 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte sub.Config = config sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter)) sub.log.Debug("filter subscribe params", zap.Int("max-peers", config.MaxPeers)) - sub.isNodeOnline = online sub.closing = make(chan string, config.MaxPeers) - if online { + + sub.onlineChecker = wf.OnlineChecker() + if wf.OnlineChecker().IsOnline() { subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers) if err == nil { sub.multiplex(subs) } } + go sub.subscriptionLoop() return sub, nil } @@ -72,17 +75,13 @@ func (apiSub *Sub) Unsubscribe() { apiSub.cancel() } -func (apiSub *Sub) SetNodeState(online bool) { - apiSub.isNodeOnline = online -} - func (apiSub *Sub) subscriptionLoop() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: - if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers && + if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers && !apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers { apiSub.closing <- "" } @@ -109,7 +108,7 @@ func (apiSub *Sub) checkAndResubscribe(subId string) { delete(apiSub.subs, subId) } apiSub.log.Debug("subscription status", zap.Int("sub-count", len(apiSub.subs)), zap.Stringer("content-filter", apiSub.ContentFilter)) - if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers { + if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers { apiSub.resubscribe(failedPeer) } apiSub.resubscribeInProgress = false diff --git a/waku/v2/api/filter_test.go b/waku/v2/api/filter_test.go index 52fa0657..83551019 100644 --- a/waku/v2/api/filter_test.go +++ b/waku/v2/api/filter_test.go @@ -48,7 +48,7 @@ func (s *FilterApiTestSuite) TestSubscribe() { s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic) s.Log.Info("About to perform API Subscribe()") - apiSub, err := Subscribe(context.Background(), s.LightNode, contentFilter, apiConfig, s.Log, true) + apiSub, err := Subscribe(context.Background(), s.LightNode, contentFilter, apiConfig, s.Log) s.Require().NoError(err) s.Require().Equal(apiSub.ContentFilter, contentFilter) s.Log.Info("Subscribed") diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 7c0e70c8..e39c2f06 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -258,7 +258,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { //Initialize peer manager. w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, w.log) - w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log) + w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, w.opts.onlineChecker, discoveryConnectTimeout, w.log) if err != nil { w.log.Error("creating peer connection strategy", zap.Error(err)) } @@ -290,7 +290,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.opts.filterOpts = append(w.opts.filterOpts, filter.WithPeerManager(w.peermanager)) w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...) - w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log) + w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log) w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...) w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log) diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 0c175992..7c1c2e45 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -25,6 +25,7 @@ import ( "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/prometheus/client_golang/prometheus" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" @@ -61,6 +62,8 @@ type WakuNodeParameters struct { circuitRelayMinInterval time.Duration circuitRelayBootDelay time.Duration + onlineChecker onlinechecker.OnlineChecker + enableNTP bool ntpURLs []string @@ -127,6 +130,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{ WithMaxPeerConnections(50), WithMaxConnectionsPerIP(DefaultMaxConnectionsPerIP), WithCircuitRelayParams(2*time.Second, 3*time.Minute), + WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)), } // MultiAddresses return the list of multiaddresses configured in the node @@ -551,6 +555,16 @@ func WithTopicHealthStatusChannel(ch chan<- peermanager.TopicHealthStatus) WakuN } } +// WithOnlineChecker sets up an OnlineChecker which will be used to determine whether the node +// is online or not. The OnlineChecker must be implemented by consumers of go-waku since they +// have additional context to determine what it means for a node to be online/offline +func WithOnlineChecker(onlineChecker onlinechecker.OnlineChecker) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.onlineChecker = onlineChecker + return nil + } +} + // Default options used in the libp2p node var DefaultLibP2POptions = []libp2p.Option{ libp2p.ChainOptions( diff --git a/waku/v2/onlinechecker/online.go b/waku/v2/onlinechecker/online.go new file mode 100644 index 00000000..814d57e7 --- /dev/null +++ b/waku/v2/onlinechecker/online.go @@ -0,0 +1,24 @@ +package onlinechecker + +// OnlineChecker is used to determine if node has connectivity. +type OnlineChecker interface { + IsOnline() bool +} + +type DefaultOnlineChecker struct { + online bool +} + +func NewDefaultOnlineChecker(online bool) OnlineChecker { + return &DefaultOnlineChecker{ + online: online, + } +} + +func (o *DefaultOnlineChecker) SetOnline(online bool) { + o.online = online +} + +func (o *DefaultOnlineChecker) IsOnline() bool { + return o.online +} diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index 6f40a171..b5eb45aa 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -16,6 +16,7 @@ import ( "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/service" @@ -28,10 +29,11 @@ import ( // PeerConnectionStrategy is a utility to connect to peers, // but only if we have not recently tried connecting to them already type PeerConnectionStrategy struct { - mux sync.Mutex - cache *lru.TwoQueueCache - host host.Host - pm *PeerManager + mux sync.Mutex + cache *lru.TwoQueueCache + host host.Host + pm *PeerManager + onlineChecker onlinechecker.OnlineChecker paused atomic.Bool dialTimeout time.Duration @@ -60,8 +62,12 @@ func getBackOff() backoff.BackoffFactory { // // dialTimeout is how long we attempt to connect to a peer before giving up // minPeers is the minimum number of peers that the node should have -func NewPeerConnectionStrategy(pm *PeerManager, - dialTimeout time.Duration, logger *zap.Logger) (*PeerConnectionStrategy, error) { +func NewPeerConnectionStrategy( + pm *PeerManager, + onlineChecker onlinechecker.OnlineChecker, + dialTimeout time.Duration, + logger *zap.Logger, +) (*PeerConnectionStrategy, error) { // cacheSize is the size of a TwoQueueCache cacheSize := 600 cache, err := lru.New2Q(cacheSize) @@ -73,6 +79,7 @@ func NewPeerConnectionStrategy(pm *PeerManager, cache: cache, dialTimeout: dialTimeout, CommonDiscoveryService: service.NewCommonDiscoveryService(), + onlineChecker: onlineChecker, pm: pm, backoff: getBackOff(), logger: logger.Named("discovery-connector"), @@ -173,6 +180,10 @@ func (c *PeerConnectionStrategy) isPaused() bool { return c.paused.Load() } +func (c *PeerConnectionStrategy) SetPaused(paused bool) { + c.paused.Store(paused) +} + // it might happen Subscribe is called before peerConnector has started so store these subscriptions in subscriptions array and custom after c.cancel is set. func (c *PeerConnectionStrategy) consumeSubscriptions() { for _, subs := range c.subscriptions { @@ -236,10 +247,17 @@ func (c *PeerConnectionStrategy) dialPeers() { for { select { + case <-c.Context().Done(): + return case pd, ok := <-c.GetListeningChan(): if !ok { return } + + if !c.onlineChecker.IsOnline() { + continue + } + addrInfo := pd.AddrInfo if addrInfo.ID == c.host.ID() || addrInfo.ID == "" || @@ -252,8 +270,6 @@ func (c *PeerConnectionStrategy) dialPeers() { c.WaitGroup().Add(1) go c.dialPeer(addrInfo, sem) } - case <-c.Context().Done(): - return } } } diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index c63efc0d..5e73ef49 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/discv5" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" wps "github.com/waku-org/go-waku/waku/v2/peerstore" wakuproto "github.com/waku-org/go-waku/waku/v2/protocol" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" @@ -220,7 +221,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) { func TestConnectToRelayPeers(t *testing.T) { ctx, pm, deferFn := initTest(t) - pc, err := NewPeerConnectionStrategy(pm, 120*time.Second, pm.logger) + pc, err := NewPeerConnectionStrategy(pm, onlinechecker.NewDefaultOnlineChecker(true), 120*time.Second, pm.logger) require.NoError(t, err) err = pc.Start(ctx) require.NoError(t, err) @@ -254,7 +255,7 @@ func createHostWithDiscv5AndPM(t *testing.T, hostName string, topic string, enrF require.NoError(t, err) pm := NewPeerManager(10, 20, nil, logger) pm.SetHost(host) - peerconn, err := NewPeerConnectionStrategy(pm, 30*time.Second, logger) + peerconn, err := NewPeerConnectionStrategy(pm, onlinechecker.NewDefaultOnlineChecker(true), 30*time.Second, logger) require.NoError(t, err) discv5, err := discv5.NewDiscoveryV5(prvKey1, localNode, peerconn, prometheus.DefaultRegisterer, logger, discv5.WithUDPPort(uint(udpPort)), discv5.WithBootnodes(bootnode)) require.NoError(t, err) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 70edd644..5909bbbd 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -18,6 +18,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" @@ -45,7 +46,8 @@ var ( type WakuFilterLightNode struct { *service.CommonService h host.Host - broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s + broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer. + onlineChecker onlinechecker.OnlineChecker timesource timesource.Timesource metrics Metrics log *zap.Logger @@ -79,12 +81,19 @@ func (arr *WakuFilterPushResult) Errors() []WakuFilterPushError { // Note that broadcaster is optional. // Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode. // If using libp2p host, then pass peermanager as nil -func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager, - timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger) *WakuFilterLightNode { +func NewWakuFilterLightNode( + broadcaster relay.Broadcaster, + pm *peermanager.PeerManager, + timesource timesource.Timesource, + onlineChecker onlinechecker.OnlineChecker, + reg prometheus.Registerer, + log *zap.Logger, +) *WakuFilterLightNode { wf := new(WakuFilterLightNode) wf.log = log.Named("filterv2-lightnode") wf.broadcaster = broadcaster wf.timesource = timesource + wf.onlineChecker = onlineChecker wf.pm = pm wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) @@ -701,3 +710,11 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte return wf.unsubscribeAll(ctx, opts...) } + +func (wf *WakuFilterLightNode) OnlineChecker() onlinechecker.OnlineChecker { + return wf.onlineChecker +} + +func (wf *WakuFilterLightNode) SetOnlineChecker(onlineChecker onlinechecker.OnlineChecker) { + wf.onlineChecker = onlineChecker +} diff --git a/waku/v2/protocol/filter/test_utils.go b/waku/v2/protocol/filter/test_utils.go index 2beabc2d..898eacf7 100644 --- a/waku/v2/protocol/filter/test_utils.go +++ b/waku/v2/protocol/filter/test_utils.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/suite" "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/peermanager" wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" @@ -165,7 +166,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData { b := relay.NewBroadcaster(10) s.Require().NoError(b.Start(context.Background())) pm := peermanager.NewPeerManager(5, 5, nil, s.Log) - filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log) + filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log) filterPush.SetHost(host) pm.SetHost(host) return LightNodeData{filterPush, host} diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 37139c36..6b53bb8d 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -10,6 +10,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/multiformats/go-multiaddr" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/peermanager" wps "github.com/waku-org/go-waku/waku/v2/peerstore" "go.uber.org/zap" @@ -293,7 +294,7 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) { // Prepare peer manager for host3 pm3 := peermanager.NewPeerManager(10, 20, nil, log) pm3.SetHost(host3) - pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger()) + pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, onlinechecker.NewDefaultOnlineChecker(true), 30*time.Second, utils.Logger()) require.NoError(t, err) pxPeerConn3.SetHost(host3) err = pxPeerConn3.Start(context.Background()) @@ -368,7 +369,7 @@ func TestRetrieveProvidePeerExchangeWithPMOnly(t *testing.T) { // Prepare peer manager for host3 pm3 := peermanager.NewPeerManager(10, 20, nil, log) pm3.SetHost(host3) - pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger()) + pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, onlinechecker.NewDefaultOnlineChecker(true), 30*time.Second, utils.Logger()) require.NoError(t, err) pxPeerConn3.SetHost(host3) err = pxPeerConn3.Start(context.Background())