chore: lightmode ping peers when we come back online (#5559)

* chore_: lightmode ping peers when we come back online; take filter ping fixes from go-waku

* fix_: handle network change event

* chore_: set connection state as per the old state

* chore_: disable discovery for lightmode after a connection change

* chore_: take filter error-handling fixes from go-waku

* chore_: address review comments
Prem Chaitanya Prathi 2024-08-01 16:43:05 +05:30 committed by GitHub
parent b74d9e6b4e
commit f32312ff9b
20 changed files with 159 additions and 102 deletions
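
Before the per-file hunks, a minimal sketch of the behaviour being wired up (connState and lightNode are illustrative stand-ins, not status-go types): on a network change reported by the app, a light-mode node drops all its peers and pings its filter peers instead of re-running discovery, and pings are skipped entirely while offline.

package main

import "fmt"

type connState struct {
	Offline bool
	Type    int // e.g. wifi vs cellular, as reported by the mobile app
}

type lightNode struct{ prev connState }

// onConnectionChanged mirrors the new ConnectionChanged/
// handleNetworkChangeFromApp split below: disconnect everything when the app
// reports offline while peers are still connected, or when the transport
// switched while staying online, then ping filter peers to revive
// subscriptions.
func (n *lightNode) onConnectionChanged(s connState, connectedPeers int) {
	switched := n.prev.Type != s.Type && !n.prev.Offline && !s.Offline
	if (s.Offline && connectedPeers > 0) || switched {
		fmt.Println("disconnect all peers, then ping filter peers")
	}
	n.prev = s // remember the state for the next comparison
}

func main() {
	n := &lightNode{}
	n.onConnectionChanged(connState{Offline: false, Type: 0}, 0) // wifi comes up
	n.onConnectionChanged(connState{Offline: false, Type: 1}, 8) // switch to cellular
}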

go.mod

@@ -96,7 +96,7 @@ require (
 	github.com/schollz/peerdiscovery v1.7.0
 	github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
 	github.com/urfave/cli/v2 v2.27.2
-	github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d
+	github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26
 	github.com/wk8/go-ordered-map/v2 v2.1.7
 	github.com/yeqown/go-qrcode/v2 v2.2.1
 	github.com/yeqown/go-qrcode/writer/standard v1.2.1

go.sum

@@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
 github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
-github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d h1:thwE3nxBaINnQllutuZdMdyLMUZLKwX7TRufs1IrlQc=
-github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw=
+github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26 h1:F657EAwHvwTcVjMddQYGZjVC3mfYsdPD9AAHPvV9/hI=
+github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw=
 github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
 github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
 github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=


@@ -14,7 +14,6 @@ import (
 	"go.uber.org/zap"
 )

-const FilterPingTimeout = 5 * time.Second
 const MultiplexChannelBuffer = 100

 type FilterConfig struct {


@@ -382,7 +382,7 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error {
 				}

 				if len(rs) == 1 {
-					w.log.Info("updating advertised relay shards in ENR")
+					w.log.Info("updating advertised relay shards in ENR", zap.Any("newShardInfo", rs[0]))
 					if len(rs[0].ShardIDs) != len(topics) {
 						w.log.Warn("A mix of named and static shards found. ENR shard will contain only the following shards", zap.Any("shards", rs[0]))
 					}


@@ -799,6 +799,17 @@ func (w *WakuNode) ClosePeerByAddress(address string) error {
 	return w.ClosePeerById(info.ID)
 }

+func (w *WakuNode) DisconnectAllPeers() {
+	w.host.Network().StopNotify(w.connectionNotif)
+	for _, peerID := range w.host.Network().Peers() {
+		err := w.ClosePeerById(peerID)
+		if err != nil {
+			w.log.Info("failed to close peer", zap.Stringer("peer", peerID), zap.Error(err))
+		}
+	}
+	w.host.Network().Notify(w.connectionNotif)
+}
+
 // ClosePeerById is used to close a connection to a peer
 func (w *WakuNode) ClosePeerById(id peer.ID) error {
 	err := w.host.Network().ClosePeer(id)
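
The StopNotify/Notify bracket above mutes libp2p connection notifications while every peer is dropped, so the mass teardown does not flood the connection-status channel with per-peer disconnect events. A minimal sketch of the same pattern against the go-libp2p API (disconnectAll is a hypothetical helper, not part of the diff):

package wakutil

import (
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
)

// disconnectAll mirrors WakuNode.DisconnectAllPeers: suspend connection
// notifications, close every peer, then re-arm the notifiee.
func disconnectAll(h host.Host, notifiee network.Notifiee) {
	h.Network().StopNotify(notifiee) // mute disconnect events during teardown
	for _, p := range h.Network().Peers() {
		_ = h.Network().ClosePeer(p) // best-effort; the real code logs failures
	}
	h.Network().Notify(notifiee) // resume notifications
}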


@@ -112,7 +112,7 @@ func (pm *PeerManager) discoverPeersByPubsubTopics(pubsubTopics []string, proto
 		for _, shardInfo := range shardsInfo {
 			err = pm.DiscoverAndConnectToPeers(ctx, shardInfo.ClusterID, shardInfo.ShardIDs[0], proto, maxCount)
 			if err != nil {
-				pm.logger.Error("failed to discover and connect to peers", zap.Error(err))
+				pm.logger.Warn("failed to discover and connect to peers", zap.Error(err))
 			}
 		}
 	} else {


@@ -313,8 +313,10 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
 		// peers. This will ensure that the peers returned by this function
 		// match those peers that are currently connected
-		curPeerLen := pm.checkAndUpdateTopicHealth(topicInst)
-		if curPeerLen < pm.OutPeersTarget {
+		meshPeerLen := pm.checkAndUpdateTopicHealth(topicInst)
+		topicPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(topicStr)
+		curPeerLen := topicPeers.Len()
+		if meshPeerLen < waku_proto.GossipSubDMin || curPeerLen < pm.OutPeersTarget {
 			pm.logger.Debug("subscribed topic has not reached target peers, initiating more connections to maintain healthy mesh",
 				zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen),
 				zap.Int("targetPeers", pm.OutPeersTarget))


@@ -147,17 +147,6 @@ func (wf *WakuFilterLightNode) Stop() {
 	})
 }

-func (wf *WakuFilterLightNode) unsubscribeWithoutSubscription(cf protocol.ContentFilter, peerID peer.ID) {
-	err := wf.request(
-		wf.Context(),
-		protocol.GenerateRequestID(),
-		pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL,
-		cf, peerID)
-	if err != nil {
-		wf.log.Warn("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
-	}
-}
-
 func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Stream) {
 	return func(stream network.Stream) {
 		peerID := stream.Conn().RemotePeer()
@@ -168,8 +157,6 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
 			logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
 			wf.metrics.RecordError(unknownPeerMessagePush)
 			//Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us
-			//This could be happening due to https://github.com/waku-org/go-waku/issues/1124
-			go wf.unsubscribeWithoutSubscription(protocol.ContentFilter{}, peerID)
 			if err := stream.Reset(); err != nil {
 				wf.log.Error("resetting connection", zap.Error(err))
 			}
@@ -216,8 +203,6 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
 			cf := protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)
 			if !wf.subscriptions.Has(peerID, cf) {
 				logger.Warn("received messagepush with invalid subscription parameters")
-				//Unsubscribe from that peer for the contentTopic, possibly due to https://github.com/waku-org/go-waku/issues/1124
-				go wf.unsubscribeWithoutSubscription(cf, peerID)
 				wf.metrics.RecordError(invalidSubscriptionMessage)
 				return
 			}
@@ -242,7 +227,7 @@ func (wf *WakuFilterLightNode) notify(ctx context.Context, remotePeerID peer.ID,
 }

 func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
-	reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter, peer peer.ID) error {
+	reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter, peerID peer.ID) error {
 	request := &pb.FilterSubscribeRequest{
 		RequestId:           hex.EncodeToString(requestID),
 		FilterSubscribeType: reqType,
@@ -255,11 +240,14 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
 		return err
 	}

-	logger := wf.log.With(logging.HostID("peerID", peer))
-	stream, err := wf.h.NewStream(ctx, peer, FilterSubscribeID_v20beta1)
+	logger := wf.log.With(logging.HostID("peerID", peerID))
+	stream, err := wf.h.NewStream(ctx, peerID, FilterSubscribeID_v20beta1)
 	if err != nil {
 		wf.metrics.RecordError(dialFailure)
+		if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
+			ps.AddConnFailure(peer.AddrInfo{ID: peerID})
+		}
 		return err
 	}
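
This dial-failure bookkeeping recurs in the filter full node, both store clients, lightpush, metadata, and peer-exchange hunks below. The shared shape, extracted into a hypothetical helper (recordDialFailure is not in the diff; the type assertion and AddConnFailure call are):

package wakutil

import (
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"

	"github.com/waku-org/go-waku/waku/v2/peerstore"
)

// recordDialFailure notes a failed dial so the peer manager can deprioritise
// flaky peers. The assertion is needed because host.Peerstore() returns the
// generic libp2p interface, which may or may not be a WakuPeerstore.
func recordDialFailure(h host.Host, p peer.ID) {
	if ps, ok := h.Peerstore().(peerstore.WakuPeerstore); ok {
		ps.AddConnFailure(peer.AddrInfo{ID: p})
	}
}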
@@ -419,21 +407,35 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
 		paramsCopy := params.Copy()
 		paramsCopy.selectedPeers = selectedPeers

-		for _, peer := range selectedPeers {
-			err := wf.request(
-				ctx,
-				params.requestID,
-				pb.FilterSubscribeRequest_SUBSCRIBE,
-				cFilter,
-				peer)
-			if err != nil {
-				wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
-					zap.Error(err))
-				failedContentTopics = append(failedContentTopics, cTopics...)
-				continue
-			}
-			wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", peer))
-			subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(peer, cFilter))
+		var wg sync.WaitGroup
+		reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
+		defer cancel()
+		tmpSubs := make([]*subscription.SubscriptionDetails, len(selectedPeers))
+		for i, peerID := range selectedPeers {
+			wg.Add(1)
+			go func(index int, ID peer.ID) {
+				defer wg.Done()
+				err := wf.request(
+					reqCtx,
+					params.requestID,
+					pb.FilterSubscribeRequest_SUBSCRIBE,
+					cFilter,
+					ID)
+				if err != nil {
+					wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
+						zap.Error(err))
+					failedContentTopics = append(failedContentTopics, cTopics...)
+				} else {
+					wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", ID))
+					tmpSubs[index] = wf.subscriptions.NewSubscription(ID, cFilter)
+				}
+			}(i, peerID)
+		}
+		wg.Wait()
+
+		for _, sub := range tmpSubs {
+			if sub != nil {
+				subscriptions = append(subscriptions, sub)
+			}
 		}
 	}
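
The rewritten Subscribe fans requests out concurrently and collects results into a slice preallocated to len(selectedPeers), so each goroutine writes only its own index and no lock is needed. The same shape in miniature (subscribeOne is a stand-in for wf.request):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	peers := []string{"peerA", "peerB", "peerC"}
	// Stand-in for wf.request; here it just pretends peerB is unreachable.
	subscribeOne := func(ctx context.Context, p string) error {
		if p == "peerB" {
			return fmt.Errorf("dial %s: timeout", p)
		}
		return nil
	}

	// One shared deadline bounds the whole fan-out, as in the diff.
	reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	results := make([]string, len(peers)) // one slot per peer: race-free
	var wg sync.WaitGroup
	for i, p := range peers {
		wg.Add(1)
		go func(idx int, id string) {
			defer wg.Done()
			if err := subscribeOne(reqCtx, id); err == nil {
				results[idx] = id // only successful subscriptions land here
			}
		}(i, p)
	}
	wg.Wait()

	for _, r := range results {
		if r != "" {
			fmt.Println("subscribed via", r)
		}
	}
}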


@@ -8,6 +8,8 @@ import (
 	"go.uber.org/zap"
 )

+const PingTimeout = 5 * time.Second
+
 func (wf *WakuFilterLightNode) PingPeers() {
 	//Send a ping to all the peers and report their status to corresponding subscriptions
 	// Alive or not or set state of subcription??
@@ -17,17 +19,23 @@ func (wf *WakuFilterLightNode) PingPeers() {
 }

 func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
-	ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), wf.peerPingInterval)
+	ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout)
 	defer cancel()
 	err := wf.Ping(ctxWithTimeout, peer)
 	if err != nil {
 		wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err))
-		subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer)
-		for _, subscription := range subscriptions {
-			wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID))
-			//Indicating that subscription is closing,
-			subscription.SetClosing()
+		//quickly retry ping again before marking subscription as failure
+		//Note that PingTimeout is a fraction of PingInterval so this shouldn't cause parallel pings being sent.
+		ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout)
+		defer cancel()
+		err = wf.Ping(ctxWithTimeout, peer)
+		if err != nil {
+			subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer)
+			for _, subscription := range subscriptions {
+				wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID))
+				//Indicating that subscription is closing,
+				subscription.SetClosing()
+			}
 		}
 	}
 }
@@ -39,7 +47,9 @@ func (wf *WakuFilterLightNode) FilterHealthCheckLoop() {
 	for {
 		select {
 		case <-ticker.C:
-			wf.PingPeers()
+			if wf.onlineChecker.IsOnline() {
+				wf.PingPeers()
+			}
 		case <-wf.CommonService.Context().Done():
 			return
 		}
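
A condensed view of the retry logic above, with pingFn and onDead as stand-ins for wf.Ping and the subscription-closing loop:

package wakutil

import (
	"context"
	"time"
)

const pingTimeout = 5 * time.Second

// pingWithRetry pings once and, on failure, retries once more before giving
// up. pingTimeout is kept well below the health-check interval, so the retry
// cannot overlap the next scheduled ping.
func pingWithRetry(parent context.Context, pingFn func(context.Context) error, onDead func()) {
	try := func() error {
		ctx, cancel := context.WithTimeout(parent, pingTimeout)
		defer cancel()
		return pingFn(ctx)
	}
	if try() == nil {
		return
	}
	if try() != nil {
		onDead() // e.g. mark every subscription on this peer as closing
	}
}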


@@ -14,6 +14,7 @@ import (
 	"github.com/libp2p/go-msgio/pbio"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/waku-org/go-waku/logging"
+	"github.com/waku-org/go-waku/waku/v2/peerstore"
 	"github.com/waku-org/go-waku/waku/v2/protocol"
 	"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
 	"github.com/waku-org/go-waku/waku/v2/protocol/relay"
@@ -273,6 +274,9 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge
 			wf.metrics.RecordError(pushTimeoutFailure)
 		} else {
 			wf.metrics.RecordError(dialFailure)
+			if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
+				ps.AddConnFailure(peer.AddrInfo{ID: peerID})
+			}
 		}
 		logger.Error("opening peer stream", zap.Error(err))
 		return err


@@ -207,6 +207,9 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor
 	if err != nil {
 		logger.Error("creating stream to peer", zap.Error(err))
 		store.metrics.RecordError(dialFailure)
+		if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok {
+			ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer})
+		}
 		return nil, err
 	}


@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"math"
 	"sync"
+	"time"

 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/network"
@@ -188,14 +189,17 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu
 }

 // request sends a message via lightPush protocol to either a specified peer or peer that is selected.
-func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peer peer.ID) (*pb.PushResponse, error) {
-	logger := wakuLP.log.With(logging.HostID("peer", peer))
-	stream, err := wakuLP.h.NewStream(ctx, peer, LightPushID_v20beta1)
+func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peerID peer.ID) (*pb.PushResponse, error) {
+	logger := wakuLP.log.With(logging.HostID("peer", peerID))
+	stream, err := wakuLP.h.NewStream(ctx, peerID, LightPushID_v20beta1)
 	if err != nil {
 		logger.Error("creating stream to peer", zap.Error(err))
 		wakuLP.metrics.RecordError(dialFailure)
+		if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok {
+			ps.AddConnFailure(peer.AddrInfo{ID: peerID})
+		}
 		return nil, err
 	}

 	pushRequestRPC := &pb.PushRpc{RequestId: hex.EncodeToString(params.requestID), Request: req}
@@ -325,19 +329,21 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa
 	logger.Debug("publishing message", zap.Stringers("peers", params.selectedPeers))

 	var wg sync.WaitGroup
-	var responses []*pb.PushResponse
-	for _, peerID := range params.selectedPeers {
+	responses := make([]*pb.PushResponse, params.selectedPeers.Len())
+	reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+	for i, peerID := range params.selectedPeers {
 		wg.Add(1)
-		go func(id peer.ID) {
+		go func(index int, id peer.ID) {
 			paramsValue := *params
 			paramsValue.requestID = protocol.GenerateRequestID()
 			defer wg.Done()
-			response, err := wakuLP.request(ctx, req, &paramsValue, id)
+			response, err := wakuLP.request(reqCtx, req, &paramsValue, id)
 			if err != nil {
 				logger.Error("could not publish message", zap.Error(err), zap.Stringer("peer", id))
 			}
-			responses = append(responses, response)
-		}(peerID)
+			responses[index] = response
+		}(i, peerID)
 	}
 	wg.Wait()

 	var successCount int
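
This mirrors the Subscribe change above: concurrent append to a shared slice is a data race, so each goroutine now writes its response into its own preallocated slot, and the shared 10-second reqCtx bounds the whole publish fan-out rather than each request inheriting whatever deadline ctx carried.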


@@ -15,6 +15,7 @@ import (
 	"github.com/libp2p/go-msgio/pbio"
 	"github.com/multiformats/go-multiaddr"
 	"github.com/waku-org/go-waku/logging"
+	"github.com/waku-org/go-waku/waku/v2/peerstore"
 	"github.com/waku-org/go-waku/waku/v2/protocol"
 	"github.com/waku-org/go-waku/waku/v2/protocol/enr"
 	"github.com/waku-org/go-waku/waku/v2/protocol/metadata/pb"
@@ -103,6 +104,9 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*pb.Wak
 	stream, err := wakuM.h.NewStream(ctx, peerID, MetadataID_v1)
 	if err != nil {
 		logger.Error("creating stream to peer", zap.Error(err))
+		if ps, ok := wakuM.h.Peerstore().(peerstore.WakuPeerstore); ok {
+			ps.AddConnFailure(peer.AddrInfo{ID: peerID})
+		}
 		return nil, err
 	}
@@ -335,7 +339,7 @@ func (wakuM *WakuMetadata) DisconnectPeerOnShardMismatch(ctx context.Context, pe
 		return err
 	}

-	if !rs.ContainsAnyShard(rs.ClusterID, peerShards) {
+	if rs != nil && !rs.ContainsAnyShard(rs.ClusterID, peerShards) {
 		wakuM.log.Info("shard mismatch", logging.HostID("peerID", peerID), zap.Uint16("clusterID", rs.ClusterID), zap.Uint16s("ourShardIDs", rs.ShardIDs), zap.Uint16s("theirShardIDs", peerShards))
 		wakuM.disconnect(peerID)
 		return errors.New("shard mismatch")


@@ -76,6 +76,9 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
 	stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1)
 	if err != nil {
+		if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok {
+			ps.AddConnFailure(peer.AddrInfo{ID: params.selectedPeer})
+		}
 		return err
 	}


@@ -98,7 +98,7 @@ func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) {
 		if wakuPX.limiter != nil && !wakuPX.limiter.Allow() {
 			wakuPX.metrics.RecordError(rateLimitFailure)
-			wakuPX.log.Error("exceeds the rate limit")
+			wakuPX.log.Info("exceeds the rate limit")
 			// TODO: peer exchange protocol should contain an err field
 			if err := stream.Reset(); err != nil {
 				wakuPX.log.Error("resetting connection", zap.Error(err))


@@ -253,6 +253,9 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe
 	stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300)
 	if err != nil {
 		logger.Error("creating stream to peer", zap.Error(err))
+		if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
+			ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer})
+		}
 		return nil, err
 	}

vendor/modules.txt (vendored)

@@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire
 github.com/waku-org/go-libp2p-rendezvous
 github.com/waku-org/go-libp2p-rendezvous/db
 github.com/waku-org/go-libp2p-rendezvous/pb
-# github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d
+# github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26
 ## explicit; go 1.21
 github.com/waku-org/go-waku/logging
 github.com/waku-org/go-waku/tests


@@ -111,7 +111,7 @@ func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) {
 	afilter, ok := mgr.incompleteFilterBatch[f.PubsubTopic]
 	if !ok {
-		//no existing batch for pubsubTopic
+		// no existing batch for pubsubTopic
 		mgr.logger.Debug("new pubsubTopic batch", zap.String("topic", f.PubsubTopic))
 		cf := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics)
 		afilter = filterConfig{uuid.NewString(), cf}
@@ -120,9 +120,9 @@ func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) {
 	} else {
 		mgr.logger.Debug("existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic))
 		if len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics) > filterSubBatchSize {
-			//filter batch limit is hit
+			// filter batch limit is hit
 			if mgr.onlineChecker.IsOnline() {
-				//node is online, go ahead and subscribe the batch
+				// node is online, go ahead and subscribe the batch
 				mgr.logger.Debug("crossed pubsubTopic batchsize and online, subscribing to filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics)))
 				go mgr.subscribeAndRunLoop(afilter)
 			} else {
@@ -136,7 +136,7 @@ func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) {
 			mgr.incompleteFilterBatch[f.PubsubTopic] = afilter
 			mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf}
 		} else {
-			//add to existing batch as batch limit not reached
+			// add to existing batch as batch limit not reached
 			var contentTopics []string
 			for _, ct := range maps.Keys(f.ContentTopics) {
 				afilter.contentFilter.ContentTopics[ct.ContentTopic()] = struct{}{}
@@ -164,17 +164,23 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
 	}
 }

+func (mgr *FilterManager) networkChange() {
+	mgr.node.PingPeers() // ping all peers to check if subscriptions are alive
+}
+
 func (mgr *FilterManager) onConnectionStatusChange(pubsubTopic string, newStatus bool) {
+	subs := mgr.node.Subscriptions()
 	mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
-		zap.Int("agg filters count", len(mgr.filterSubscriptions)))
-	if newStatus && !mgr.onlineChecker.IsOnline() { //switched from offline to Online
+		zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
+	if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online
+		mgr.networkChange()
 		mgr.logger.Debug("switching from offline to online")
 		mgr.Lock()
 		if len(mgr.waitingToSubQueue) > 0 {
 			for af := range mgr.waitingToSubQueue {
 				// TODO: change the below logic once topic specific health is implemented for lightClients
 				if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
-					// Check if any filter subs are pending and subscribe them
+					// check if any filter subs are pending and subscribe them
 					mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
 					go mgr.subscribeAndRunLoop(af)
 				} else {
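
For context, addFilter's batching rule (visible in the comment-only hunks above) accumulates content topics per pubsub topic and flushes a batch only when adding more topics would cross filterSubBatchSize while the node is online; offline batches are parked in waitingToSubQueue, which onConnectionStatusChange drains. A toy version of just the flush decision, with names mirroring the diff and the queue simplified to a print:

package main

import "fmt"

const filterSubBatchSize = 90

type batch struct{ contentTopics []string }

// addToBatch sketches FilterManager.addFilter's flush rule: when adding the
// new topics would cross the batch limit, an online node subscribes the
// current batch immediately, while an offline node queues it instead.
func addToBatch(b batch, online bool, topics ...string) batch {
	if len(b.contentTopics)+len(topics) > filterSubBatchSize {
		if online {
			fmt.Printf("subscribing batch of %d topics\n", len(b.contentTopics))
		} else {
			fmt.Println("offline: parking batch in waiting queue")
		}
		b = batch{} // start a fresh batch for the new topics
	}
	b.contentTopics = append(b.contentTopics, topics...)
	return b
}

func main() {
	b := batch{}
	for i := 0; i < 200; i++ {
		b = addToBatch(b, true, fmt.Sprintf("/toy/1/ct-%d/proto", i))
	}
}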


@@ -161,6 +161,7 @@ type Waku struct {
 	connStatusSubscriptions map[string]*types.ConnStatusSubscription
 	connStatusMu            sync.Mutex
 	onlineChecker           *onlinechecker.DefaultOnlineChecker
+	state                   connection.State

 	logger *zap.Logger
@@ -228,7 +229,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
 		msgQueue:                make(chan *common.ReceivedMessage, messageQueueLimit),
 		sendQueue:               make(chan *protocol.Envelope, 1000),
 		topicHealthStatusChan:   make(chan peermanager.TopicHealthStatus, 100),
-		connectionNotifChan:     make(chan node.PeerConnection),
+		connectionNotifChan:     make(chan node.PeerConnection, 20),
 		connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription),
 		topicInterest:           make(map[string]TopicInterest),
 		ctx:                     ctx,
@@ -461,7 +462,7 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA
 	return nil
 }

-func (w *Waku) discoverAndConnectPeers() error {
+func (w *Waku) discoverAndConnectPeers() {
 	fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) {
 		defer wg.Done()
 		if len(d.PeerInfo.Addrs) != 0 {
@@ -495,8 +496,6 @@ func (w *Waku) discoverAndConnectPeers() {
 			go w.connect(*peerInfo, nil, wps.Static)
 		}
 	}
-
-	return nil
 }

 func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origin) {
@@ -1336,9 +1335,7 @@ func (w *Waku) Start() error {
 	w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.node.Host().ID()))

-	if err = w.discoverAndConnectPeers(); err != nil {
-		return fmt.Errorf("failed to add wakuv2 peers: %v", err)
-	}
+	w.discoverAndConnectPeers()

 	if w.cfg.EnableDiscV5 {
 		err := w.node.DiscV5().Start(w.ctx)
@@ -1364,20 +1361,6 @@ func (w *Waku) Start() error {
 				isOnline := len(w.node.Host().Network().Peers()) > 0

-				if w.cfg.LightClient {
-					// TODO: Temporary changes for lightNodes to have health check based on connected peers.
-					//This needs to be enhanced to be based on healthy Filter and lightPush peers available for each shard.
-					//This would get fixed as part of https://github.com/waku-org/go-waku/issues/1114
-					subs := w.node.FilterLightnode().Subscriptions()
-					w.logger.Debug("filter subs count", zap.Int("count", len(subs)))
-
-					//TODO: needs fixing, right now invoking everytime.
-					//Trigger FilterManager to take care of any pending filter subscriptions
-					//TODO: Pass pubsubTopic based on topicHealth notif received.
-					go w.filterManager.onConnectionStatusChange(w.cfg.DefaultShardPubsubTopic, isOnline)
-				}
-
 				w.connStatusMu.Lock()

 				latestConnStatus := types.ConnStatus{
@@ -1404,14 +1387,8 @@ func (w *Waku) Start() error {
 					w.statusTelemetryClient.PushPeerCount(w.PeerCount())
 				}

-				//TODO: analyze if we need to discover and connect to peers with peerExchange loop enabled.
-				if !w.onlineChecker.IsOnline() && isOnline {
-					if err := w.discoverAndConnectPeers(); err != nil {
-						w.logger.Error("failed to add wakuv2 peers", zap.Error(err))
-					}
-				}
-
 				w.ConnectionChanged(connection.State{
+					Type:    w.state.Type, //setting state type as previous one since there won't be a change here
 					Offline: !latestConnStatus.IsOnline,
 				})
 			}
@@ -1796,16 +1773,41 @@ func (w *Waku) StopDiscV5() error {
 	return nil
 }

-func (w *Waku) ConnectionChanged(state connection.State) {
-	if !state.Offline && !w.onlineChecker.IsOnline() {
-		select {
-		case w.goingOnline <- struct{}{}:
-		default:
-			w.logger.Warn("could not write on connection changed channel")
-		}
-	}
-
-	w.onlineChecker.SetOnline(!state.Offline)
+func (w *Waku) handleNetworkChangeFromApp(state connection.State) {
+	//If connection state is reported by something other than peerCount becoming 0 e.g from mobile app, disconnect all peers
+	if (state.Offline && len(w.node.Host().Network().Peers()) > 0) ||
+		(w.state.Type != state.Type && !w.state.Offline && !state.Offline) { // network switched between wifi and cellular
+		w.logger.Info("connection switched or offline detected via mobile, disconnecting all peers")
+		w.node.DisconnectAllPeers()
+		if w.cfg.LightClient {
+			w.filterManager.networkChange()
+		}
+	}
+}
+
+func (w *Waku) ConnectionChanged(state connection.State) {
+	isOnline := !state.Offline
+	if w.cfg.LightClient {
+		//TODO: Update this as per https://github.com/waku-org/go-waku/issues/1114
+		// trigger FilterManager to take care of any pending filter subscriptions
+		go w.filterManager.onConnectionStatusChange(w.cfg.DefaultShardPubsubTopic, isOnline)
+		w.handleNetworkChangeFromApp(state)
+	} else {
+		// for lightClient state update and onlineChange is handled in filterManager.
+		// going online
+		if isOnline && !w.onlineChecker.IsOnline() {
+			//TODO: analyze if we need to discover and connect to peers for relay.
+			w.discoverAndConnectPeers()
+			select {
+			case w.goingOnline <- struct{}{}:
+			default:
+				w.logger.Warn("could not write on connection changed channel")
+			}
+		}
+		// update state
+		w.onlineChecker.SetOnline(isOnline)
+	}
+	w.state = state
 }

 // seedBootnodesForDiscV5 tries to fetch bootnodes


@@ -657,6 +657,8 @@ func TestConfirmMessageDelivered(t *testing.T) {
 func TestOnlineChecker(t *testing.T) {
 	w, err := New(nil, "shards.staging", nil, nil, nil, nil, nil, nil)
+	require.NoError(t, w.Start())
+
 	require.NoError(t, err)

 	require.False(t, w.onlineChecker.IsOnline())