mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-04 06:53:06 +00:00
feat: online checker (#1125)
This commit is contained in:
parent
8303c592d3
commit
ee33baa283
@ -7,6 +7,7 @@ import (
|
|||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
|
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
|
||||||
@ -39,13 +40,13 @@ type Sub struct {
|
|||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
closing chan string
|
closing chan string
|
||||||
isNodeOnline bool //indicates if node has connectivity, this helps subscribe loop takes decision as to resubscribe or not.
|
onlineChecker onlinechecker.OnlineChecker
|
||||||
resubscribeInProgress bool
|
resubscribeInProgress bool
|
||||||
id string
|
id string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe
|
// Subscribe
|
||||||
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, online bool) (*Sub, error) {
|
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger) (*Sub, error) {
|
||||||
sub := new(Sub)
|
sub := new(Sub)
|
||||||
sub.id = uuid.NewString()
|
sub.id = uuid.NewString()
|
||||||
sub.wf = wf
|
sub.wf = wf
|
||||||
@ -56,14 +57,16 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
|
|||||||
sub.Config = config
|
sub.Config = config
|
||||||
sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter))
|
sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter))
|
||||||
sub.log.Debug("filter subscribe params", zap.Int("max-peers", config.MaxPeers))
|
sub.log.Debug("filter subscribe params", zap.Int("max-peers", config.MaxPeers))
|
||||||
sub.isNodeOnline = online
|
|
||||||
sub.closing = make(chan string, config.MaxPeers)
|
sub.closing = make(chan string, config.MaxPeers)
|
||||||
if online {
|
|
||||||
|
sub.onlineChecker = wf.OnlineChecker()
|
||||||
|
if wf.OnlineChecker().IsOnline() {
|
||||||
subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers)
|
subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
sub.multiplex(subs)
|
sub.multiplex(subs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
go sub.subscriptionLoop()
|
go sub.subscriptionLoop()
|
||||||
return sub, nil
|
return sub, nil
|
||||||
}
|
}
|
||||||
@ -72,17 +75,13 @@ func (apiSub *Sub) Unsubscribe() {
|
|||||||
apiSub.cancel()
|
apiSub.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (apiSub *Sub) SetNodeState(online bool) {
|
|
||||||
apiSub.isNodeOnline = online
|
|
||||||
}
|
|
||||||
|
|
||||||
func (apiSub *Sub) subscriptionLoop() {
|
func (apiSub *Sub) subscriptionLoop() {
|
||||||
ticker := time.NewTicker(5 * time.Second)
|
ticker := time.NewTicker(5 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers &&
|
if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers &&
|
||||||
!apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers {
|
!apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers {
|
||||||
apiSub.closing <- ""
|
apiSub.closing <- ""
|
||||||
}
|
}
|
||||||
@ -109,7 +108,7 @@ func (apiSub *Sub) checkAndResubscribe(subId string) {
|
|||||||
delete(apiSub.subs, subId)
|
delete(apiSub.subs, subId)
|
||||||
}
|
}
|
||||||
apiSub.log.Debug("subscription status", zap.Int("sub-count", len(apiSub.subs)), zap.Stringer("content-filter", apiSub.ContentFilter))
|
apiSub.log.Debug("subscription status", zap.Int("sub-count", len(apiSub.subs)), zap.Stringer("content-filter", apiSub.ContentFilter))
|
||||||
if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers {
|
if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers {
|
||||||
apiSub.resubscribe(failedPeer)
|
apiSub.resubscribe(failedPeer)
|
||||||
}
|
}
|
||||||
apiSub.resubscribeInProgress = false
|
apiSub.resubscribeInProgress = false
|
||||||
|
|||||||
@ -48,7 +48,7 @@ func (s *FilterApiTestSuite) TestSubscribe() {
|
|||||||
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
|
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
|
||||||
|
|
||||||
s.Log.Info("About to perform API Subscribe()")
|
s.Log.Info("About to perform API Subscribe()")
|
||||||
apiSub, err := Subscribe(context.Background(), s.LightNode, contentFilter, apiConfig, s.Log, true)
|
apiSub, err := Subscribe(context.Background(), s.LightNode, contentFilter, apiConfig, s.Log)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
s.Require().Equal(apiSub.ContentFilter, contentFilter)
|
s.Require().Equal(apiSub.ContentFilter, contentFilter)
|
||||||
s.Log.Info("Subscribed")
|
s.Log.Info("Subscribed")
|
||||||
|
|||||||
@ -258,7 +258,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
|
|||||||
//Initialize peer manager.
|
//Initialize peer manager.
|
||||||
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, w.log)
|
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, w.log)
|
||||||
|
|
||||||
w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log)
|
w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, w.opts.onlineChecker, discoveryConnectTimeout, w.log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.log.Error("creating peer connection strategy", zap.Error(err))
|
w.log.Error("creating peer connection strategy", zap.Error(err))
|
||||||
}
|
}
|
||||||
@ -290,7 +290,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
|
|||||||
w.opts.filterOpts = append(w.opts.filterOpts, filter.WithPeerManager(w.peermanager))
|
w.opts.filterOpts = append(w.opts.filterOpts, filter.WithPeerManager(w.peermanager))
|
||||||
|
|
||||||
w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...)
|
w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...)
|
||||||
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log)
|
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log)
|
||||||
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...)
|
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...)
|
||||||
|
|
||||||
w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log)
|
w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log)
|
||||||
|
|||||||
@ -25,6 +25,7 @@ import (
|
|||||||
"github.com/multiformats/go-multiaddr"
|
"github.com/multiformats/go-multiaddr"
|
||||||
manet "github.com/multiformats/go-multiaddr/net"
|
manet "github.com/multiformats/go-multiaddr/net"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
|
||||||
@ -61,6 +62,8 @@ type WakuNodeParameters struct {
|
|||||||
circuitRelayMinInterval time.Duration
|
circuitRelayMinInterval time.Duration
|
||||||
circuitRelayBootDelay time.Duration
|
circuitRelayBootDelay time.Duration
|
||||||
|
|
||||||
|
onlineChecker onlinechecker.OnlineChecker
|
||||||
|
|
||||||
enableNTP bool
|
enableNTP bool
|
||||||
ntpURLs []string
|
ntpURLs []string
|
||||||
|
|
||||||
@ -127,6 +130,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{
|
|||||||
WithMaxPeerConnections(50),
|
WithMaxPeerConnections(50),
|
||||||
WithMaxConnectionsPerIP(DefaultMaxConnectionsPerIP),
|
WithMaxConnectionsPerIP(DefaultMaxConnectionsPerIP),
|
||||||
WithCircuitRelayParams(2*time.Second, 3*time.Minute),
|
WithCircuitRelayParams(2*time.Second, 3*time.Minute),
|
||||||
|
WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)),
|
||||||
}
|
}
|
||||||
|
|
||||||
// MultiAddresses return the list of multiaddresses configured in the node
|
// MultiAddresses return the list of multiaddresses configured in the node
|
||||||
@ -551,6 +555,16 @@ func WithTopicHealthStatusChannel(ch chan<- peermanager.TopicHealthStatus) WakuN
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithOnlineChecker sets up an OnlineChecker which will be used to determine whether the node
|
||||||
|
// is online or not. The OnlineChecker must be implemented by consumers of go-waku since they
|
||||||
|
// have additional context to determine what it means for a node to be online/offline
|
||||||
|
func WithOnlineChecker(onlineChecker onlinechecker.OnlineChecker) WakuNodeOption {
|
||||||
|
return func(params *WakuNodeParameters) error {
|
||||||
|
params.onlineChecker = onlineChecker
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Default options used in the libp2p node
|
// Default options used in the libp2p node
|
||||||
var DefaultLibP2POptions = []libp2p.Option{
|
var DefaultLibP2POptions = []libp2p.Option{
|
||||||
libp2p.ChainOptions(
|
libp2p.ChainOptions(
|
||||||
|
|||||||
24
waku/v2/onlinechecker/online.go
Normal file
24
waku/v2/onlinechecker/online.go
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
package onlinechecker
|
||||||
|
|
||||||
|
// OnlineChecker is used to determine if node has connectivity.
|
||||||
|
type OnlineChecker interface {
|
||||||
|
IsOnline() bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type DefaultOnlineChecker struct {
|
||||||
|
online bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDefaultOnlineChecker(online bool) OnlineChecker {
|
||||||
|
return &DefaultOnlineChecker{
|
||||||
|
online: online,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *DefaultOnlineChecker) SetOnline(online bool) {
|
||||||
|
o.online = online
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *DefaultOnlineChecker) IsOnline() bool {
|
||||||
|
return o.online
|
||||||
|
}
|
||||||
@ -16,6 +16,7 @@ import (
|
|||||||
|
|
||||||
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
|
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
|
||||||
"github.com/waku-org/go-waku/logging"
|
"github.com/waku-org/go-waku/logging"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||||
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
|
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/service"
|
"github.com/waku-org/go-waku/waku/v2/service"
|
||||||
@ -28,10 +29,11 @@ import (
|
|||||||
// PeerConnectionStrategy is a utility to connect to peers,
|
// PeerConnectionStrategy is a utility to connect to peers,
|
||||||
// but only if we have not recently tried connecting to them already
|
// but only if we have not recently tried connecting to them already
|
||||||
type PeerConnectionStrategy struct {
|
type PeerConnectionStrategy struct {
|
||||||
mux sync.Mutex
|
mux sync.Mutex
|
||||||
cache *lru.TwoQueueCache
|
cache *lru.TwoQueueCache
|
||||||
host host.Host
|
host host.Host
|
||||||
pm *PeerManager
|
pm *PeerManager
|
||||||
|
onlineChecker onlinechecker.OnlineChecker
|
||||||
|
|
||||||
paused atomic.Bool
|
paused atomic.Bool
|
||||||
dialTimeout time.Duration
|
dialTimeout time.Duration
|
||||||
@ -60,8 +62,12 @@ func getBackOff() backoff.BackoffFactory {
|
|||||||
//
|
//
|
||||||
// dialTimeout is how long we attempt to connect to a peer before giving up
|
// dialTimeout is how long we attempt to connect to a peer before giving up
|
||||||
// minPeers is the minimum number of peers that the node should have
|
// minPeers is the minimum number of peers that the node should have
|
||||||
func NewPeerConnectionStrategy(pm *PeerManager,
|
func NewPeerConnectionStrategy(
|
||||||
dialTimeout time.Duration, logger *zap.Logger) (*PeerConnectionStrategy, error) {
|
pm *PeerManager,
|
||||||
|
onlineChecker onlinechecker.OnlineChecker,
|
||||||
|
dialTimeout time.Duration,
|
||||||
|
logger *zap.Logger,
|
||||||
|
) (*PeerConnectionStrategy, error) {
|
||||||
// cacheSize is the size of a TwoQueueCache
|
// cacheSize is the size of a TwoQueueCache
|
||||||
cacheSize := 600
|
cacheSize := 600
|
||||||
cache, err := lru.New2Q(cacheSize)
|
cache, err := lru.New2Q(cacheSize)
|
||||||
@ -73,6 +79,7 @@ func NewPeerConnectionStrategy(pm *PeerManager,
|
|||||||
cache: cache,
|
cache: cache,
|
||||||
dialTimeout: dialTimeout,
|
dialTimeout: dialTimeout,
|
||||||
CommonDiscoveryService: service.NewCommonDiscoveryService(),
|
CommonDiscoveryService: service.NewCommonDiscoveryService(),
|
||||||
|
onlineChecker: onlineChecker,
|
||||||
pm: pm,
|
pm: pm,
|
||||||
backoff: getBackOff(),
|
backoff: getBackOff(),
|
||||||
logger: logger.Named("discovery-connector"),
|
logger: logger.Named("discovery-connector"),
|
||||||
@ -173,6 +180,10 @@ func (c *PeerConnectionStrategy) isPaused() bool {
|
|||||||
return c.paused.Load()
|
return c.paused.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *PeerConnectionStrategy) SetPaused(paused bool) {
|
||||||
|
c.paused.Store(paused)
|
||||||
|
}
|
||||||
|
|
||||||
// it might happen Subscribe is called before peerConnector has started so store these subscriptions in subscriptions array and custom after c.cancel is set.
|
// it might happen Subscribe is called before peerConnector has started so store these subscriptions in subscriptions array and custom after c.cancel is set.
|
||||||
func (c *PeerConnectionStrategy) consumeSubscriptions() {
|
func (c *PeerConnectionStrategy) consumeSubscriptions() {
|
||||||
for _, subs := range c.subscriptions {
|
for _, subs := range c.subscriptions {
|
||||||
@ -236,10 +247,17 @@ func (c *PeerConnectionStrategy) dialPeers() {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case <-c.Context().Done():
|
||||||
|
return
|
||||||
case pd, ok := <-c.GetListeningChan():
|
case pd, ok := <-c.GetListeningChan():
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !c.onlineChecker.IsOnline() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
addrInfo := pd.AddrInfo
|
addrInfo := pd.AddrInfo
|
||||||
|
|
||||||
if addrInfo.ID == c.host.ID() || addrInfo.ID == "" ||
|
if addrInfo.ID == c.host.ID() || addrInfo.ID == "" ||
|
||||||
@ -252,8 +270,6 @@ func (c *PeerConnectionStrategy) dialPeers() {
|
|||||||
c.WaitGroup().Add(1)
|
c.WaitGroup().Add(1)
|
||||||
go c.dialPeer(addrInfo, sem)
|
go c.dialPeer(addrInfo, sem)
|
||||||
}
|
}
|
||||||
case <-c.Context().Done():
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,6 +16,7 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/waku-org/go-waku/tests"
|
"github.com/waku-org/go-waku/tests"
|
||||||
"github.com/waku-org/go-waku/waku/v2/discv5"
|
"github.com/waku-org/go-waku/waku/v2/discv5"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||||
wakuproto "github.com/waku-org/go-waku/waku/v2/protocol"
|
wakuproto "github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
||||||
@ -220,7 +221,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
|
|||||||
func TestConnectToRelayPeers(t *testing.T) {
|
func TestConnectToRelayPeers(t *testing.T) {
|
||||||
|
|
||||||
ctx, pm, deferFn := initTest(t)
|
ctx, pm, deferFn := initTest(t)
|
||||||
pc, err := NewPeerConnectionStrategy(pm, 120*time.Second, pm.logger)
|
pc, err := NewPeerConnectionStrategy(pm, onlinechecker.NewDefaultOnlineChecker(true), 120*time.Second, pm.logger)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = pc.Start(ctx)
|
err = pc.Start(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -254,7 +255,7 @@ func createHostWithDiscv5AndPM(t *testing.T, hostName string, topic string, enrF
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
pm := NewPeerManager(10, 20, nil, logger)
|
pm := NewPeerManager(10, 20, nil, logger)
|
||||||
pm.SetHost(host)
|
pm.SetHost(host)
|
||||||
peerconn, err := NewPeerConnectionStrategy(pm, 30*time.Second, logger)
|
peerconn, err := NewPeerConnectionStrategy(pm, onlinechecker.NewDefaultOnlineChecker(true), 30*time.Second, logger)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
discv5, err := discv5.NewDiscoveryV5(prvKey1, localNode, peerconn, prometheus.DefaultRegisterer, logger, discv5.WithUDPPort(uint(udpPort)), discv5.WithBootnodes(bootnode))
|
discv5, err := discv5.NewDiscoveryV5(prvKey1, localNode, peerconn, prometheus.DefaultRegisterer, logger, discv5.WithUDPPort(uint(udpPort)), discv5.WithBootnodes(bootnode))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|||||||
@ -18,6 +18,7 @@ import (
|
|||||||
"github.com/libp2p/go-msgio/pbio"
|
"github.com/libp2p/go-msgio/pbio"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/waku-org/go-waku/logging"
|
"github.com/waku-org/go-waku/logging"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||||
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
@ -45,7 +46,8 @@ var (
|
|||||||
type WakuFilterLightNode struct {
|
type WakuFilterLightNode struct {
|
||||||
*service.CommonService
|
*service.CommonService
|
||||||
h host.Host
|
h host.Host
|
||||||
broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s
|
broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.
|
||||||
|
onlineChecker onlinechecker.OnlineChecker
|
||||||
timesource timesource.Timesource
|
timesource timesource.Timesource
|
||||||
metrics Metrics
|
metrics Metrics
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
@ -79,12 +81,19 @@ func (arr *WakuFilterPushResult) Errors() []WakuFilterPushError {
|
|||||||
// Note that broadcaster is optional.
|
// Note that broadcaster is optional.
|
||||||
// Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode.
|
// Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode.
|
||||||
// If using libp2p host, then pass peermanager as nil
|
// If using libp2p host, then pass peermanager as nil
|
||||||
func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager,
|
func NewWakuFilterLightNode(
|
||||||
timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger) *WakuFilterLightNode {
|
broadcaster relay.Broadcaster,
|
||||||
|
pm *peermanager.PeerManager,
|
||||||
|
timesource timesource.Timesource,
|
||||||
|
onlineChecker onlinechecker.OnlineChecker,
|
||||||
|
reg prometheus.Registerer,
|
||||||
|
log *zap.Logger,
|
||||||
|
) *WakuFilterLightNode {
|
||||||
wf := new(WakuFilterLightNode)
|
wf := new(WakuFilterLightNode)
|
||||||
wf.log = log.Named("filterv2-lightnode")
|
wf.log = log.Named("filterv2-lightnode")
|
||||||
wf.broadcaster = broadcaster
|
wf.broadcaster = broadcaster
|
||||||
wf.timesource = timesource
|
wf.timesource = timesource
|
||||||
|
wf.onlineChecker = onlineChecker
|
||||||
wf.pm = pm
|
wf.pm = pm
|
||||||
wf.CommonService = service.NewCommonService()
|
wf.CommonService = service.NewCommonService()
|
||||||
wf.metrics = newMetrics(reg)
|
wf.metrics = newMetrics(reg)
|
||||||
@ -701,3 +710,11 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte
|
|||||||
|
|
||||||
return wf.unsubscribeAll(ctx, opts...)
|
return wf.unsubscribeAll(ctx, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (wf *WakuFilterLightNode) OnlineChecker() onlinechecker.OnlineChecker {
|
||||||
|
return wf.onlineChecker
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wf *WakuFilterLightNode) SetOnlineChecker(onlineChecker onlinechecker.OnlineChecker) {
|
||||||
|
wf.onlineChecker = onlineChecker
|
||||||
|
}
|
||||||
|
|||||||
@ -13,6 +13,7 @@ import (
|
|||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
"github.com/waku-org/go-waku/tests"
|
"github.com/waku-org/go-waku/tests"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
@ -165,7 +166,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData {
|
|||||||
b := relay.NewBroadcaster(10)
|
b := relay.NewBroadcaster(10)
|
||||||
s.Require().NoError(b.Start(context.Background()))
|
s.Require().NoError(b.Start(context.Background()))
|
||||||
pm := peermanager.NewPeerManager(5, 5, nil, s.Log)
|
pm := peermanager.NewPeerManager(5, 5, nil, s.Log)
|
||||||
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log)
|
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log)
|
||||||
filterPush.SetHost(host)
|
filterPush.SetHost(host)
|
||||||
pm.SetHost(host)
|
pm.SetHost(host)
|
||||||
return LightNodeData{filterPush, host}
|
return LightNodeData{filterPush, host}
|
||||||
|
|||||||
@ -10,6 +10,7 @@ import (
|
|||||||
"github.com/libp2p/go-libp2p/core/host"
|
"github.com/libp2p/go-libp2p/core/host"
|
||||||
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
|
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
|
||||||
"github.com/multiformats/go-multiaddr"
|
"github.com/multiformats/go-multiaddr"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -293,7 +294,7 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) {
|
|||||||
// Prepare peer manager for host3
|
// Prepare peer manager for host3
|
||||||
pm3 := peermanager.NewPeerManager(10, 20, nil, log)
|
pm3 := peermanager.NewPeerManager(10, 20, nil, log)
|
||||||
pm3.SetHost(host3)
|
pm3.SetHost(host3)
|
||||||
pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger())
|
pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, onlinechecker.NewDefaultOnlineChecker(true), 30*time.Second, utils.Logger())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
pxPeerConn3.SetHost(host3)
|
pxPeerConn3.SetHost(host3)
|
||||||
err = pxPeerConn3.Start(context.Background())
|
err = pxPeerConn3.Start(context.Background())
|
||||||
@ -368,7 +369,7 @@ func TestRetrieveProvidePeerExchangeWithPMOnly(t *testing.T) {
|
|||||||
// Prepare peer manager for host3
|
// Prepare peer manager for host3
|
||||||
pm3 := peermanager.NewPeerManager(10, 20, nil, log)
|
pm3 := peermanager.NewPeerManager(10, 20, nil, log)
|
||||||
pm3.SetHost(host3)
|
pm3.SetHost(host3)
|
||||||
pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger())
|
pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, onlinechecker.NewDefaultOnlineChecker(true), 30*time.Second, utils.Logger())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
pxPeerConn3.SetHost(host3)
|
pxPeerConn3.SetHost(host3)
|
||||||
err = pxPeerConn3.Start(context.Background())
|
err = pxPeerConn3.Start(context.Background())
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user