refactor_: use go-waku onlinechecker to manage connection state (#5340)
This commit is contained in:
parent
e6bf7e7df9
commit
e47e867b9b
2
go.mod
2
go.mod
|
@ -93,7 +93,7 @@ require (
|
|||
github.com/schollz/peerdiscovery v1.7.0
|
||||
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
|
||||
github.com/urfave/cli/v2 v2.27.2
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240625022701-1cf180ebc659
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240626004844-19a47a1ac1f5
|
||||
github.com/wk8/go-ordered-map/v2 v2.1.7
|
||||
github.com/yeqown/go-qrcode/v2 v2.2.1
|
||||
github.com/yeqown/go-qrcode/writer/standard v1.2.1
|
||||
|
|
4
go.sum
4
go.sum
|
@ -2137,8 +2137,8 @@ github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97Jry
|
|||
github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240625022701-1cf180ebc659 h1:bTz24QgoRBXoS/WIy8+7jrQ/7hpE63td0fMteq5qZQM=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240625022701-1cf180ebc659/go.mod h1:biffO55kWbvfO8jdu/aAPiWcmozrfFKPum4EMFDib+k=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240626004844-19a47a1ac1f5 h1:9UyIIy/IvlJB2nHIXydne6OfNfOWPPL08+XmCI3iEBo=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240626004844-19a47a1ac1f5/go.mod h1:biffO55kWbvfO8jdu/aAPiWcmozrfFKPum4EMFDib+k=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
|
||||
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
|
||||
"github.com/google/uuid"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
|
||||
|
@ -39,13 +40,13 @@ type Sub struct {
|
|||
cancel context.CancelFunc
|
||||
log *zap.Logger
|
||||
closing chan string
|
||||
isNodeOnline bool //indicates if node has connectivity, this helps subscribe loop takes decision as to resubscribe or not.
|
||||
onlineChecker onlinechecker.OnlineChecker
|
||||
resubscribeInProgress bool
|
||||
id string
|
||||
}
|
||||
|
||||
// Subscribe
|
||||
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, online bool) (*Sub, error) {
|
||||
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger) (*Sub, error) {
|
||||
sub := new(Sub)
|
||||
sub.id = uuid.NewString()
|
||||
sub.wf = wf
|
||||
|
@ -56,14 +57,16 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
|
|||
sub.Config = config
|
||||
sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter))
|
||||
sub.log.Debug("filter subscribe params", zap.Int("max-peers", config.MaxPeers))
|
||||
sub.isNodeOnline = online
|
||||
sub.closing = make(chan string, config.MaxPeers)
|
||||
if online {
|
||||
|
||||
sub.onlineChecker = wf.OnlineChecker()
|
||||
if wf.OnlineChecker().IsOnline() {
|
||||
subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers)
|
||||
if err == nil {
|
||||
sub.multiplex(subs)
|
||||
}
|
||||
}
|
||||
|
||||
go sub.subscriptionLoop()
|
||||
return sub, nil
|
||||
}
|
||||
|
@ -72,17 +75,13 @@ func (apiSub *Sub) Unsubscribe() {
|
|||
apiSub.cancel()
|
||||
}
|
||||
|
||||
func (apiSub *Sub) SetNodeState(online bool) {
|
||||
apiSub.isNodeOnline = online
|
||||
}
|
||||
|
||||
func (apiSub *Sub) subscriptionLoop() {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers &&
|
||||
if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers &&
|
||||
!apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers {
|
||||
apiSub.closing <- ""
|
||||
}
|
||||
|
@ -109,7 +108,7 @@ func (apiSub *Sub) checkAndResubscribe(subId string) {
|
|||
delete(apiSub.subs, subId)
|
||||
}
|
||||
apiSub.log.Debug("subscription status", zap.Int("sub-count", len(apiSub.subs)), zap.Stringer("content-filter", apiSub.ContentFilter))
|
||||
if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers {
|
||||
if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers {
|
||||
apiSub.resubscribe(failedPeer)
|
||||
}
|
||||
apiSub.resubscribeInProgress = false
|
||||
|
|
|
@ -258,7 +258,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
|
|||
//Initialize peer manager.
|
||||
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, params.enableRelay, w.log)
|
||||
|
||||
w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log)
|
||||
w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, w.opts.onlineChecker, discoveryConnectTimeout, w.log)
|
||||
if err != nil {
|
||||
w.log.Error("creating peer connection strategy", zap.Error(err))
|
||||
}
|
||||
|
@ -290,7 +290,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
|
|||
w.opts.filterOpts = append(w.opts.filterOpts, filter.WithPeerManager(w.peermanager))
|
||||
|
||||
w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...)
|
||||
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log)
|
||||
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log)
|
||||
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...)
|
||||
|
||||
w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log)
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/multiformats/go-multiaddr"
|
||||
manet "github.com/multiformats/go-multiaddr/net"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
|
||||
|
@ -63,6 +64,8 @@ type WakuNodeParameters struct {
|
|||
circuitRelayMinInterval time.Duration
|
||||
circuitRelayBootDelay time.Duration
|
||||
|
||||
onlineChecker onlinechecker.OnlineChecker
|
||||
|
||||
enableNTP bool
|
||||
ntpURLs []string
|
||||
|
||||
|
@ -130,6 +133,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{
|
|||
WithMaxConnectionsPerIP(DefaultMaxConnectionsPerIP),
|
||||
WithCircuitRelayParams(2*time.Second, 3*time.Minute),
|
||||
WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity),
|
||||
WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)),
|
||||
}
|
||||
|
||||
// MultiAddresses return the list of multiaddresses configured in the node
|
||||
|
@ -554,6 +558,16 @@ func WithTopicHealthStatusChannel(ch chan<- peermanager.TopicHealthStatus) WakuN
|
|||
}
|
||||
}
|
||||
|
||||
// WithOnlineChecker sets up an OnlineChecker which will be used to determine whether the node
|
||||
// is online or not. The OnlineChecker must be implemented by consumers of go-waku since they
|
||||
// have additional context to determine what it means for a node to be online/offline
|
||||
func WithOnlineChecker(onlineChecker onlinechecker.OnlineChecker) WakuNodeOption {
|
||||
return func(params *WakuNodeParameters) error {
|
||||
params.onlineChecker = onlineChecker
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Default options used in the libp2p node
|
||||
var DefaultLibP2POptions = []libp2p.Option{
|
||||
libp2p.ChainOptions(
|
||||
|
|
24
vendor/github.com/waku-org/go-waku/waku/v2/onlinechecker/online.go
generated
vendored
Normal file
24
vendor/github.com/waku-org/go-waku/waku/v2/onlinechecker/online.go
generated
vendored
Normal file
|
@ -0,0 +1,24 @@
|
|||
package onlinechecker
|
||||
|
||||
// OnlineChecker is used to determine if node has connectivity.
|
||||
type OnlineChecker interface {
|
||||
IsOnline() bool
|
||||
}
|
||||
|
||||
type DefaultOnlineChecker struct {
|
||||
online bool
|
||||
}
|
||||
|
||||
func NewDefaultOnlineChecker(online bool) OnlineChecker {
|
||||
return &DefaultOnlineChecker{
|
||||
online: online,
|
||||
}
|
||||
}
|
||||
|
||||
func (o *DefaultOnlineChecker) SetOnline(online bool) {
|
||||
o.online = online
|
||||
}
|
||||
|
||||
func (o *DefaultOnlineChecker) IsOnline() bool {
|
||||
return o.online
|
||||
}
|
|
@ -16,6 +16,7 @@ import (
|
|||
|
||||
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/service"
|
||||
|
||||
|
@ -31,6 +32,7 @@ type PeerConnectionStrategy struct {
|
|||
cache *lru.TwoQueueCache
|
||||
host host.Host
|
||||
pm *PeerManager
|
||||
onlineChecker onlinechecker.OnlineChecker
|
||||
|
||||
paused atomic.Bool
|
||||
dialTimeout time.Duration
|
||||
|
@ -59,8 +61,12 @@ func getBackOff() backoff.BackoffFactory {
|
|||
//
|
||||
// dialTimeout is how long we attempt to connect to a peer before giving up
|
||||
// minPeers is the minimum number of peers that the node should have
|
||||
func NewPeerConnectionStrategy(pm *PeerManager,
|
||||
dialTimeout time.Duration, logger *zap.Logger) (*PeerConnectionStrategy, error) {
|
||||
func NewPeerConnectionStrategy(
|
||||
pm *PeerManager,
|
||||
onlineChecker onlinechecker.OnlineChecker,
|
||||
dialTimeout time.Duration,
|
||||
logger *zap.Logger,
|
||||
) (*PeerConnectionStrategy, error) {
|
||||
// cacheSize is the size of a TwoQueueCache
|
||||
cacheSize := 600
|
||||
cache, err := lru.New2Q(cacheSize)
|
||||
|
@ -72,6 +78,7 @@ func NewPeerConnectionStrategy(pm *PeerManager,
|
|||
cache: cache,
|
||||
dialTimeout: dialTimeout,
|
||||
CommonDiscoveryService: service.NewCommonDiscoveryService(),
|
||||
onlineChecker: onlineChecker,
|
||||
pm: pm,
|
||||
backoff: getBackOff(),
|
||||
logger: logger.Named("discovery-connector"),
|
||||
|
@ -171,6 +178,10 @@ func (c *PeerConnectionStrategy) isPaused() bool {
|
|||
return c.paused.Load()
|
||||
}
|
||||
|
||||
func (c *PeerConnectionStrategy) SetPaused(paused bool) {
|
||||
c.paused.Store(paused)
|
||||
}
|
||||
|
||||
// it might happen Subscribe is called before peerConnector has started so store these subscriptions in subscriptions array and custom after c.cancel is set.
|
||||
func (c *PeerConnectionStrategy) consumeSubscriptions() {
|
||||
for _, subs := range c.subscriptions {
|
||||
|
@ -234,10 +245,17 @@ func (c *PeerConnectionStrategy) dialPeers() {
|
|||
|
||||
for {
|
||||
select {
|
||||
case <-c.Context().Done():
|
||||
return
|
||||
case pd, ok := <-c.GetListeningChan():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if !c.onlineChecker.IsOnline() {
|
||||
continue
|
||||
}
|
||||
|
||||
addrInfo := pd.AddrInfo
|
||||
|
||||
if addrInfo.ID == c.host.ID() || addrInfo.ID == "" ||
|
||||
|
@ -250,8 +268,6 @@ func (c *PeerConnectionStrategy) dialPeers() {
|
|||
c.WaitGroup().Add(1)
|
||||
go c.dialPeer(addrInfo, sem)
|
||||
}
|
||||
case <-c.Context().Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16,
|
|||
|
||||
wakuProtoInfo, ok := pm.wakuprotoToENRFieldMap[wakuProtocol]
|
||||
if !ok {
|
||||
pm.logger.Info("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol)))
|
||||
pm.logger.Warn("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol)))
|
||||
return nil, errors.New("cannot do on demand discovery for non-waku protocol")
|
||||
}
|
||||
iterator, err := pm.discoveryService.PeerIterator(
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/libp2p/go-msgio/pbio"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
|
@ -45,7 +46,8 @@ var (
|
|||
type WakuFilterLightNode struct {
|
||||
*service.CommonService
|
||||
h host.Host
|
||||
broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s
|
||||
broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.
|
||||
onlineChecker onlinechecker.OnlineChecker
|
||||
timesource timesource.Timesource
|
||||
metrics Metrics
|
||||
log *zap.Logger
|
||||
|
@ -79,12 +81,19 @@ func (arr *WakuFilterPushResult) Errors() []WakuFilterPushError {
|
|||
// Note that broadcaster is optional.
|
||||
// Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode.
|
||||
// If using libp2p host, then pass peermanager as nil
|
||||
func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager,
|
||||
timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger) *WakuFilterLightNode {
|
||||
func NewWakuFilterLightNode(
|
||||
broadcaster relay.Broadcaster,
|
||||
pm *peermanager.PeerManager,
|
||||
timesource timesource.Timesource,
|
||||
onlineChecker onlinechecker.OnlineChecker,
|
||||
reg prometheus.Registerer,
|
||||
log *zap.Logger,
|
||||
) *WakuFilterLightNode {
|
||||
wf := new(WakuFilterLightNode)
|
||||
wf.log = log.Named("filterv2-lightnode")
|
||||
wf.broadcaster = broadcaster
|
||||
wf.timesource = timesource
|
||||
wf.onlineChecker = onlineChecker
|
||||
wf.pm = pm
|
||||
wf.CommonService = service.NewCommonService()
|
||||
wf.metrics = newMetrics(reg)
|
||||
|
@ -701,3 +710,11 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte
|
|||
|
||||
return wf.unsubscribeAll(ctx, opts...)
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) OnlineChecker() onlinechecker.OnlineChecker {
|
||||
return wf.onlineChecker
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) SetOnlineChecker(onlineChecker onlinechecker.OnlineChecker) {
|
||||
wf.onlineChecker = onlineChecker
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/waku-org/go-waku/tests"
|
||||
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
|
@ -165,7 +166,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData {
|
|||
b := relay.NewBroadcaster(10)
|
||||
s.Require().NoError(b.Start(context.Background()))
|
||||
pm := peermanager.NewPeerManager(5, 5, nil, true, s.Log)
|
||||
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log)
|
||||
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log)
|
||||
filterPush.SetHost(host)
|
||||
pm.SetHost(host)
|
||||
return LightNodeData{filterPush, host}
|
||||
|
|
|
@ -1015,7 +1015,7 @@ github.com/waku-org/go-discover/discover/v5wire
|
|||
github.com/waku-org/go-libp2p-rendezvous
|
||||
github.com/waku-org/go-libp2p-rendezvous/db
|
||||
github.com/waku-org/go-libp2p-rendezvous/pb
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20240625022701-1cf180ebc659
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20240626004844-19a47a1ac1f5
|
||||
## explicit; go 1.21
|
||||
github.com/waku-org/go-waku/logging
|
||||
github.com/waku-org/go-waku/tests
|
||||
|
@ -1025,6 +1025,7 @@ github.com/waku-org/go-waku/waku/v2/discv5
|
|||
github.com/waku-org/go-waku/waku/v2/dnsdisc
|
||||
github.com/waku-org/go-waku/waku/v2/hash
|
||||
github.com/waku-org/go-waku/waku/v2/node
|
||||
github.com/waku-org/go-waku/waku/v2/onlinechecker
|
||||
github.com/waku-org/go-waku/waku/v2/payload
|
||||
github.com/waku-org/go-waku/waku/v2/peermanager
|
||||
github.com/waku-org/go-waku/waku/v2/peerstore
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"golang.org/x/exp/maps"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/api"
|
||||
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||
)
|
||||
|
@ -32,7 +33,7 @@ type FilterManager struct {
|
|||
onNewEnvelopes func(env *protocol.Envelope) error
|
||||
logger *zap.Logger
|
||||
node *filter.WakuFilterLightNode
|
||||
peersAvailable bool
|
||||
onlineChecker *onlinechecker.DefaultOnlineChecker
|
||||
filterQueue chan filterConfig
|
||||
}
|
||||
type SubDetails struct {
|
||||
|
@ -56,8 +57,10 @@ func newFilterManager(ctx context.Context, logger *zap.Logger, cfg *Config, onNe
|
|||
mgr.onNewEnvelopes = onNewEnvelopes
|
||||
mgr.filters = make(map[string]SubDetails)
|
||||
mgr.node = node
|
||||
mgr.peersAvailable = false
|
||||
mgr.filterQueue = make(chan filterConfig, filterQueueSize)
|
||||
mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker)
|
||||
|
||||
mgr.node.SetOnlineChecker(mgr.onlineChecker)
|
||||
|
||||
return mgr
|
||||
}
|
||||
|
@ -68,7 +71,7 @@ func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) {
|
|||
contentFilter := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics)
|
||||
mgr.logger.Debug("adding filter", zap.String("filter-id", filterID), zap.Stringer("content-filter", contentFilter))
|
||||
|
||||
if mgr.peersAvailable {
|
||||
if mgr.onlineChecker.IsOnline() {
|
||||
go mgr.subscribeAndRunLoop(filterConfig{filterID, contentFilter})
|
||||
} else {
|
||||
mgr.logger.Debug("queuing filter as not online", zap.String("filter-id", filterID), zap.Stringer("content-filter", contentFilter))
|
||||
|
@ -80,7 +83,7 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
|
|||
ctx, cancel := context.WithCancel(mgr.ctx)
|
||||
config := api.FilterConfig{MaxPeers: mgr.cfg.MinPeersForFilter}
|
||||
|
||||
sub, err := api.Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.peersAvailable)
|
||||
sub, err := api.Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger)
|
||||
mgr.Lock()
|
||||
mgr.filters[f.ID] = SubDetails{cancel, sub}
|
||||
mgr.Unlock()
|
||||
|
@ -109,12 +112,8 @@ func (mgr *FilterManager) onConnectionStatusChange(pubsubTopic string, newStatus
|
|||
}
|
||||
}
|
||||
}
|
||||
mgr.Lock()
|
||||
for _, subDetails := range mgr.filters {
|
||||
subDetails.sub.SetNodeState(newStatus)
|
||||
}
|
||||
mgr.Unlock()
|
||||
mgr.peersAvailable = newStatus
|
||||
|
||||
mgr.onlineChecker.SetOnline(newStatus)
|
||||
}
|
||||
|
||||
func (mgr *FilterManager) removeFilter(filterID string) {
|
||||
|
|
|
@ -56,6 +56,7 @@ import (
|
|||
"github.com/libp2p/go-libp2p/core/metrics"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
|
||||
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
|
@ -75,7 +76,6 @@ import (
|
|||
"github.com/status-im/status-go/wakuv2/persistence"
|
||||
|
||||
node "github.com/waku-org/go-waku/waku/v2/node"
|
||||
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
)
|
||||
|
||||
|
@ -91,7 +91,7 @@ const messageExpiredPerid = 10 // in seconds
|
|||
const maxRelayPeers = 300
|
||||
|
||||
type SentEnvelope struct {
|
||||
Envelope *v2protocol.Envelope
|
||||
Envelope *protocol.Envelope
|
||||
PublishMethod PublishMethod
|
||||
}
|
||||
|
||||
|
@ -101,7 +101,7 @@ type ErrorSendingEnvelope struct {
|
|||
}
|
||||
|
||||
type ITelemetryClient interface {
|
||||
PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope)
|
||||
PushReceivedEnvelope(receivedEnvelope *protocol.Envelope)
|
||||
PushSentEnvelope(sentEnvelope SentEnvelope)
|
||||
PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope)
|
||||
}
|
||||
|
@ -156,6 +156,7 @@ type Waku struct {
|
|||
topicHealthStatusChan chan peermanager.TopicHealthStatus
|
||||
connStatusSubscriptions map[string]*types.ConnStatusSubscription
|
||||
connStatusMu sync.Mutex
|
||||
onlineChecker *onlinechecker.DefaultOnlineChecker
|
||||
|
||||
logger *zap.Logger
|
||||
|
||||
|
@ -166,9 +167,6 @@ type Waku struct {
|
|||
// bootnodes successfully
|
||||
seededBootnodesForDiscV5 bool
|
||||
|
||||
// offline indicates whether we have detected connectivity
|
||||
offline bool
|
||||
|
||||
// connectionChanged is channel that notifies when connectivity has changed
|
||||
connectionChanged chan struct{}
|
||||
|
||||
|
@ -242,6 +240,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
|
|||
discV5BootstrapNodes: cfg.DiscV5BootstrapNodes,
|
||||
onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed,
|
||||
onPeerStats: onPeerStats,
|
||||
onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker),
|
||||
}
|
||||
|
||||
waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger)
|
||||
|
@ -1327,13 +1326,13 @@ func (w *Waku) lightClientConnectionStatus() {
|
|||
}
|
||||
|
||||
//TODO:Analyze if we need to discover and connect to peers with peerExchange loop enabled.
|
||||
if w.offline && isOnline {
|
||||
if !w.onlineChecker.IsOnline() && isOnline {
|
||||
if err := w.discoverAndConnectPeers(); err != nil {
|
||||
w.logger.Error("failed to add wakuv2 peers", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
w.offline = !isOnline
|
||||
w.onlineChecker.SetOnline(isOnline)
|
||||
}
|
||||
|
||||
// Start implements node.Service, starting the background data propagation thread
|
||||
|
@ -1802,7 +1801,7 @@ func (w *Waku) StopDiscV5() error {
|
|||
}
|
||||
|
||||
func (w *Waku) ConnectionChanged(state connection.State) {
|
||||
if !state.Offline && w.offline {
|
||||
if !state.Offline && !w.onlineChecker.IsOnline() {
|
||||
select {
|
||||
case w.connectionChanged <- struct{}{}:
|
||||
default:
|
||||
|
@ -1810,7 +1809,7 @@ func (w *Waku) ConnectionChanged(state connection.State) {
|
|||
}
|
||||
}
|
||||
|
||||
w.offline = state.Offline
|
||||
w.onlineChecker.SetOnline(!state.Offline)
|
||||
}
|
||||
|
||||
// seedBootnodesForDiscV5 tries to fetch bootnodes
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"errors"
|
||||
"math/big"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -30,6 +31,7 @@ import (
|
|||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
|
||||
"github.com/status-im/status-go/appdatabase"
|
||||
"github.com/status-im/status-go/connection"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/tt"
|
||||
"github.com/status-im/status-go/t/helpers"
|
||||
|
@ -612,3 +614,40 @@ func TestConfirmMessageDelivered(t *testing.T) {
|
|||
require.NoError(t, aliceNode.Stop())
|
||||
require.NoError(t, bobNode.Stop())
|
||||
}
|
||||
|
||||
func TestOnlineChecker(t *testing.T) {
|
||||
w, err := New(nil, "shards.staging", nil, nil, nil, nil, nil, nil)
|
||||
require.NoError(t, err)
|
||||
require.False(t, w.onlineChecker.IsOnline())
|
||||
|
||||
w.ConnectionChanged(connection.State{Offline: false})
|
||||
require.True(t, w.onlineChecker.IsOnline())
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
<-w.connectionChanged
|
||||
require.True(t, true)
|
||||
}()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
w.ConnectionChanged(connection.State{Offline: true})
|
||||
require.False(t, w.onlineChecker.IsOnline())
|
||||
|
||||
// Test lightnode online checker
|
||||
config := &Config{}
|
||||
config.ClusterID = 16
|
||||
config.LightClient = true
|
||||
lightNode, err := New(nil, "shards.staging", config, nil, nil, nil, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = lightNode.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.False(t, lightNode.onlineChecker.IsOnline())
|
||||
|
||||
lightNode.filterManager.addFilter("test", &common.Filter{})
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue