diff --git a/go.mod b/go.mod index ae43f8617..6d7e8a8d5 100644 --- a/go.mod +++ b/go.mod @@ -96,7 +96,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d + github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index f9b192bd0..c5ab705a5 100644 --- a/go.sum +++ b/go.sum @@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d h1:thwE3nxBaINnQllutuZdMdyLMUZLKwX7TRufs1IrlQc= -github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw= +github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26 h1:F657EAwHvwTcVjMddQYGZjVC3mfYsdPD9AAHPvV9/hI= +github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go index b9d64310a..1f9ea6be4 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go @@ -14,7 +14,6 @@ import ( "go.uber.org/zap" ) -const FilterPingTimeout = 5 * time.Second const MultiplexChannelBuffer = 100 type FilterConfig struct { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go index 74ba35ced..9de6c59fd 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go @@ -382,7 +382,7 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error { } if len(rs) == 1 { - w.log.Info("updating advertised relay shards in ENR") + w.log.Info("updating advertised relay shards in ENR", zap.Any("newShardInfo", rs[0])) if len(rs[0].ShardIDs) != len(topics) { w.log.Warn("A mix of named and static shards found. ENR shard will contain only the following shards", zap.Any("shards", rs[0])) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index c29a2b93a..dd7fbae9b 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -799,6 +799,17 @@ func (w *WakuNode) ClosePeerByAddress(address string) error { return w.ClosePeerById(info.ID) } +func (w *WakuNode) DisconnectAllPeers() { + w.host.Network().StopNotify(w.connectionNotif) + for _, peerID := range w.host.Network().Peers() { + err := w.ClosePeerById(peerID) + if err != nil { + w.log.Info("failed to close peer", zap.Stringer("peer", peerID), zap.Error(err)) + } + } + w.host.Network().Notify(w.connectionNotif) +} + // ClosePeerById is used to close a connection to a peer func (w *WakuNode) ClosePeerById(id peer.ID) error { err := w.host.Network().ClosePeer(id) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go index ae18907c0..8ab1c8beb 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go @@ -112,7 +112,7 @@ func (pm *PeerManager) discoverPeersByPubsubTopics(pubsubTopics []string, proto for _, shardInfo := range shardsInfo { err = pm.DiscoverAndConnectToPeers(ctx, shardInfo.ClusterID, shardInfo.ShardIDs[0], proto, maxCount) if err != nil { - pm.logger.Error("failed to discover and connect to peers", zap.Error(err)) + pm.logger.Warn("failed to discover and connect to peers", zap.Error(err)) } } } else { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go index 3126f9142..1cfc5484f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go @@ -313,8 +313,10 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { // peers. This will ensure that the peers returned by this function // match those peers that are currently connected - curPeerLen := pm.checkAndUpdateTopicHealth(topicInst) - if curPeerLen < pm.OutPeersTarget { + meshPeerLen := pm.checkAndUpdateTopicHealth(topicInst) + topicPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(topicStr) + curPeerLen := topicPeers.Len() + if meshPeerLen < waku_proto.GossipSubDMin || curPeerLen < pm.OutPeersTarget { pm.logger.Debug("subscribed topic has not reached target peers, initiating more connections to maintain healthy mesh", zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen), zap.Int("targetPeers", pm.OutPeersTarget)) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index 3342dc299..c52c90981 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -147,17 +147,6 @@ func (wf *WakuFilterLightNode) Stop() { }) } -func (wf *WakuFilterLightNode) unsubscribeWithoutSubscription(cf protocol.ContentFilter, peerID peer.ID) { - err := wf.request( - wf.Context(), - protocol.GenerateRequestID(), - pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL, - cf, peerID) - if err != nil { - wf.log.Warn("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) - } -} - func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Stream) { return func(stream network.Stream) { peerID := stream.Conn().RemotePeer() @@ -168,8 +157,6 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID)) wf.metrics.RecordError(unknownPeerMessagePush) //Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us - //This could be happening due to https://github.com/waku-org/go-waku/issues/1124 - go wf.unsubscribeWithoutSubscription(protocol.ContentFilter{}, peerID) if err := stream.Reset(); err != nil { wf.log.Error("resetting connection", zap.Error(err)) } @@ -216,8 +203,6 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea cf := protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic) if !wf.subscriptions.Has(peerID, cf) { logger.Warn("received messagepush with invalid subscription parameters") - //Unsubscribe from that peer for the contentTopic, possibly due to https://github.com/waku-org/go-waku/issues/1124 - go wf.unsubscribeWithoutSubscription(cf, peerID) wf.metrics.RecordError(invalidSubscriptionMessage) return } @@ -242,7 +227,7 @@ func (wf *WakuFilterLightNode) notify(ctx context.Context, remotePeerID peer.ID, } func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, - reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter, peer peer.ID) error { + reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter, peerID peer.ID) error { request := &pb.FilterSubscribeRequest{ RequestId: hex.EncodeToString(requestID), FilterSubscribeType: reqType, @@ -255,11 +240,14 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, return err } - logger := wf.log.With(logging.HostID("peerID", peer)) + logger := wf.log.With(logging.HostID("peerID", peerID)) - stream, err := wf.h.NewStream(ctx, peer, FilterSubscribeID_v20beta1) + stream, err := wf.h.NewStream(ctx, peerID, FilterSubscribeID_v20beta1) if err != nil { wf.metrics.RecordError(dialFailure) + if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + } return err } @@ -419,21 +407,35 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot paramsCopy := params.Copy() paramsCopy.selectedPeers = selectedPeers - for _, peer := range selectedPeers { - err := wf.request( - ctx, - params.requestID, - pb.FilterSubscribeRequest_SUBSCRIBE, - cFilter, - peer) - if err != nil { - wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), - zap.Error(err)) - failedContentTopics = append(failedContentTopics, cTopics...) - continue + var wg sync.WaitGroup + reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + tmpSubs := make([]*subscription.SubscriptionDetails, len(selectedPeers)) + for i, peerID := range selectedPeers { + wg.Add(1) + go func(index int, ID peer.ID) { + defer wg.Done() + err := wf.request( + reqCtx, + params.requestID, + pb.FilterSubscribeRequest_SUBSCRIBE, + cFilter, + ID) + if err != nil { + wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), + zap.Error(err)) + failedContentTopics = append(failedContentTopics, cTopics...) + } else { + wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", ID)) + tmpSubs[index] = wf.subscriptions.NewSubscription(ID, cFilter) + } + }(i, peerID) + } + wg.Wait() + for _, sub := range tmpSubs { + if sub != nil { + subscriptions = append(subscriptions, sub) } - wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", peer)) - subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(peer, cFilter)) } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go index 836175b53..a6b76a340 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go @@ -8,6 +8,8 @@ import ( "go.uber.org/zap" ) +const PingTimeout = 5 * time.Second + func (wf *WakuFilterLightNode) PingPeers() { //Send a ping to all the peers and report their status to corresponding subscriptions // Alive or not or set state of subcription?? @@ -17,17 +19,23 @@ func (wf *WakuFilterLightNode) PingPeers() { } func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) { - ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), wf.peerPingInterval) + ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout) defer cancel() err := wf.Ping(ctxWithTimeout, peer) if err != nil { wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err)) - - subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer) - for _, subscription := range subscriptions { - wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID)) - //Indicating that subscription is closing, - subscription.SetClosing() + //quickly retry ping again before marking subscription as failure + //Note that PingTimeout is a fraction of PingInterval so this shouldn't cause parallel pings being sent. + ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout) + defer cancel() + err = wf.Ping(ctxWithTimeout, peer) + if err != nil { + subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer) + for _, subscription := range subscriptions { + wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID)) + //Indicating that subscription is closing, + subscription.SetClosing() + } } } } @@ -39,7 +47,9 @@ func (wf *WakuFilterLightNode) FilterHealthCheckLoop() { for { select { case <-ticker.C: - wf.PingPeers() + if wf.onlineChecker.IsOnline() { + wf.PingPeers() + } case <-wf.CommonService.Context().Done(): return } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go index 2bf63bb50..3d898f89f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go @@ -14,6 +14,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -273,6 +274,9 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge wf.metrics.RecordError(pushTimeoutFailure) } else { wf.metrics.RecordError(dialFailure) + if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + } } logger.Error("opening peer stream", zap.Error(err)) return err diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go index b02cd92e3..456dada54 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go @@ -207,6 +207,9 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor if err != nil { logger.Error("creating stream to peer", zap.Error(err)) store.metrics.RecordError(dialFailure) + if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer}) + } return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index de7bce8b4..c0a72c2ed 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -7,6 +7,7 @@ import ( "fmt" "math" "sync" + "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -188,14 +189,17 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu } // request sends a message via lightPush protocol to either a specified peer or peer that is selected. -func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peer peer.ID) (*pb.PushResponse, error) { +func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peerID peer.ID) (*pb.PushResponse, error) { - logger := wakuLP.log.With(logging.HostID("peer", peer)) + logger := wakuLP.log.With(logging.HostID("peer", peerID)) - stream, err := wakuLP.h.NewStream(ctx, peer, LightPushID_v20beta1) + stream, err := wakuLP.h.NewStream(ctx, peerID, LightPushID_v20beta1) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) wakuLP.metrics.RecordError(dialFailure) + if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + } return nil, err } pushRequestRPC := &pb.PushRpc{RequestId: hex.EncodeToString(params.requestID), Request: req} @@ -325,19 +329,21 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa logger.Debug("publishing message", zap.Stringers("peers", params.selectedPeers)) var wg sync.WaitGroup - var responses []*pb.PushResponse - for _, peerID := range params.selectedPeers { + responses := make([]*pb.PushResponse, params.selectedPeers.Len()) + reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + for i, peerID := range params.selectedPeers { wg.Add(1) - go func(id peer.ID) { + go func(index int, id peer.ID) { paramsValue := *params paramsValue.requestID = protocol.GenerateRequestID() defer wg.Done() - response, err := wakuLP.request(ctx, req, ¶msValue, id) + response, err := wakuLP.request(reqCtx, req, ¶msValue, id) if err != nil { logger.Error("could not publish message", zap.Error(err), zap.Stringer("peer", id)) } - responses = append(responses, response) - }(peerID) + responses[index] = response + }(i, peerID) } wg.Wait() var successCount int diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go index 23a7e455a..93a70a2bf 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go @@ -15,6 +15,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/metadata/pb" @@ -103,6 +104,9 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*pb.Wak stream, err := wakuM.h.NewStream(ctx, peerID, MetadataID_v1) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) + if ps, ok := wakuM.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + } return nil, err } @@ -335,7 +339,7 @@ func (wakuM *WakuMetadata) DisconnectPeerOnShardMismatch(ctx context.Context, pe return err } - if !rs.ContainsAnyShard(rs.ClusterID, peerShards) { + if rs != nil && !rs.ContainsAnyShard(rs.ClusterID, peerShards) { wakuM.log.Info("shard mismatch", logging.HostID("peerID", peerID), zap.Uint16("clusterID", rs.ClusterID), zap.Uint16s("ourShardIDs", rs.ShardIDs), zap.Uint16s("theirShardIDs", peerShards)) wakuM.disconnect(peerID) return errors.New("shard mismatch") diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go index 915ce75fd..6baf30950 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go @@ -76,6 +76,9 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1) if err != nil { + if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: params.selectedPeer}) + } return err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go index 5f103e12c..08e5051ca 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go @@ -98,7 +98,7 @@ func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) { if wakuPX.limiter != nil && !wakuPX.limiter.Allow() { wakuPX.metrics.RecordError(rateLimitFailure) - wakuPX.log.Error("exceeds the rate limit") + wakuPX.log.Info("exceeds the rate limit") // TODO: peer exchange protocol should contain an err field if err := stream.Reset(); err != nil { wakuPX.log.Error("resetting connection", zap.Error(err)) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go index 34a08bcf4..5cda4eef2 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go @@ -253,6 +253,9 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) + if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer}) + } return nil, err } diff --git a/vendor/modules.txt b/vendor/modules.txt index 53b8785c2..231ba1e21 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d +# github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/filter_manager.go b/wakuv2/filter_manager.go index ce1984a3a..892f20391 100644 --- a/wakuv2/filter_manager.go +++ b/wakuv2/filter_manager.go @@ -111,7 +111,7 @@ func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) { afilter, ok := mgr.incompleteFilterBatch[f.PubsubTopic] if !ok { - //no existing batch for pubsubTopic + // no existing batch for pubsubTopic mgr.logger.Debug("new pubsubTopic batch", zap.String("topic", f.PubsubTopic)) cf := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics) afilter = filterConfig{uuid.NewString(), cf} @@ -120,9 +120,9 @@ func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) { } else { mgr.logger.Debug("existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic)) if len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics) > filterSubBatchSize { - //filter batch limit is hit + // filter batch limit is hit if mgr.onlineChecker.IsOnline() { - //node is online, go ahead and subscribe the batch + // node is online, go ahead and subscribe the batch mgr.logger.Debug("crossed pubsubTopic batchsize and online, subscribing to filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics))) go mgr.subscribeAndRunLoop(afilter) } else { @@ -136,7 +136,7 @@ func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) { mgr.incompleteFilterBatch[f.PubsubTopic] = afilter mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf} } else { - //add to existing batch as batch limit not reached + // add to existing batch as batch limit not reached var contentTopics []string for _, ct := range maps.Keys(f.ContentTopics) { afilter.contentFilter.ContentTopics[ct.ContentTopic()] = struct{}{} @@ -164,17 +164,23 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { } } +func (mgr *FilterManager) networkChange() { + mgr.node.PingPeers() // ping all peers to check if subscriptions are alive +} + func (mgr *FilterManager) onConnectionStatusChange(pubsubTopic string, newStatus bool) { + subs := mgr.node.Subscriptions() mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus), - zap.Int("agg filters count", len(mgr.filterSubscriptions))) - if newStatus && !mgr.onlineChecker.IsOnline() { //switched from offline to Online + zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs))) + if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online + mgr.networkChange() mgr.logger.Debug("switching from offline to online") mgr.Lock() if len(mgr.waitingToSubQueue) > 0 { for af := range mgr.waitingToSubQueue { // TODO: change the below logic once topic specific health is implemented for lightClients if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { - // Check if any filter subs are pending and subscribe them + // check if any filter subs are pending and subscribe them mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) go mgr.subscribeAndRunLoop(af) } else { diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 5a08bbe54..9f6a83d19 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -161,6 +161,7 @@ type Waku struct { connStatusSubscriptions map[string]*types.ConnStatusSubscription connStatusMu sync.Mutex onlineChecker *onlinechecker.DefaultOnlineChecker + state connection.State logger *zap.Logger @@ -228,7 +229,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit), sendQueue: make(chan *protocol.Envelope, 1000), topicHealthStatusChan: make(chan peermanager.TopicHealthStatus, 100), - connectionNotifChan: make(chan node.PeerConnection), + connectionNotifChan: make(chan node.PeerConnection, 20), connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription), topicInterest: make(map[string]TopicInterest), ctx: ctx, @@ -461,7 +462,7 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA return nil } -func (w *Waku) discoverAndConnectPeers() error { +func (w *Waku) discoverAndConnectPeers() { fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) { defer wg.Done() if len(d.PeerInfo.Addrs) != 0 { @@ -495,8 +496,6 @@ func (w *Waku) discoverAndConnectPeers() error { go w.connect(*peerInfo, nil, wps.Static) } } - - return nil } func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origin) { @@ -1336,9 +1335,7 @@ func (w *Waku) Start() error { w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.node.Host().ID())) - if err = w.discoverAndConnectPeers(); err != nil { - return fmt.Errorf("failed to add wakuv2 peers: %v", err) - } + w.discoverAndConnectPeers() if w.cfg.EnableDiscV5 { err := w.node.DiscV5().Start(w.ctx) @@ -1364,20 +1361,6 @@ func (w *Waku) Start() error { isOnline := len(w.node.Host().Network().Peers()) > 0 - if w.cfg.LightClient { - // TODO: Temporary changes for lightNodes to have health check based on connected peers. - //This needs to be enhanced to be based on healthy Filter and lightPush peers available for each shard. - //This would get fixed as part of https://github.com/waku-org/go-waku/issues/1114 - - subs := w.node.FilterLightnode().Subscriptions() - w.logger.Debug("filter subs count", zap.Int("count", len(subs))) - - //TODO: needs fixing, right now invoking everytime. - //Trigger FilterManager to take care of any pending filter subscriptions - //TODO: Pass pubsubTopic based on topicHealth notif received. - go w.filterManager.onConnectionStatusChange(w.cfg.DefaultShardPubsubTopic, isOnline) - - } w.connStatusMu.Lock() latestConnStatus := types.ConnStatus{ @@ -1404,14 +1387,8 @@ func (w *Waku) Start() error { w.statusTelemetryClient.PushPeerCount(w.PeerCount()) } - //TODO: analyze if we need to discover and connect to peers with peerExchange loop enabled. - if !w.onlineChecker.IsOnline() && isOnline { - if err := w.discoverAndConnectPeers(); err != nil { - w.logger.Error("failed to add wakuv2 peers", zap.Error(err)) - } - } - w.ConnectionChanged(connection.State{ + Type: w.state.Type, //setting state type as previous one since there won't be a change here Offline: !latestConnStatus.IsOnline, }) } @@ -1796,16 +1773,41 @@ func (w *Waku) StopDiscV5() error { return nil } -func (w *Waku) ConnectionChanged(state connection.State) { - if !state.Offline && !w.onlineChecker.IsOnline() { - select { - case w.goingOnline <- struct{}{}: - default: - w.logger.Warn("could not write on connection changed channel") +func (w *Waku) handleNetworkChangeFromApp(state connection.State) { + //If connection state is reported by something other than peerCount becoming 0 e.g from mobile app, disconnect all peers + if (state.Offline && len(w.node.Host().Network().Peers()) > 0) || + (w.state.Type != state.Type && !w.state.Offline && !state.Offline) { // network switched between wifi and cellular + w.logger.Info("connection switched or offline detected via mobile, disconnecting all peers") + w.node.DisconnectAllPeers() + if w.cfg.LightClient { + w.filterManager.networkChange() } } +} - w.onlineChecker.SetOnline(!state.Offline) +func (w *Waku) ConnectionChanged(state connection.State) { + isOnline := !state.Offline + if w.cfg.LightClient { + //TODO: Update this as per https://github.com/waku-org/go-waku/issues/1114 + // trigger FilterManager to take care of any pending filter subscriptions + go w.filterManager.onConnectionStatusChange(w.cfg.DefaultShardPubsubTopic, isOnline) + w.handleNetworkChangeFromApp(state) + } else { + // for lightClient state update and onlineChange is handled in filterManager. + // going online + if isOnline && !w.onlineChecker.IsOnline() { + //TODO: analyze if we need to discover and connect to peers for relay. + w.discoverAndConnectPeers() + select { + case w.goingOnline <- struct{}{}: + default: + w.logger.Warn("could not write on connection changed channel") + } + } + // update state + w.onlineChecker.SetOnline(isOnline) + } + w.state = state } // seedBootnodesForDiscV5 tries to fetch bootnodes diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index ebf9b2577..8c30487ac 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -657,6 +657,8 @@ func TestConfirmMessageDelivered(t *testing.T) { func TestOnlineChecker(t *testing.T) { w, err := New(nil, "shards.staging", nil, nil, nil, nil, nil, nil) + require.NoError(t, w.Start()) + require.NoError(t, err) require.False(t, w.onlineChecker.IsOnline())