diff --git a/go.mod b/go.mod index d98470327..5d5c4f80d 100644 --- a/go.mod +++ b/go.mod @@ -96,7 +96,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d + github.com/waku-org/go-waku v0.8.1-0.20240806174951-6e1ff9818815 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index d95538e02..f1d6359fe 100644 --- a/go.sum +++ b/go.sum @@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d h1:thwE3nxBaINnQllutuZdMdyLMUZLKwX7TRufs1IrlQc= -github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw= +github.com/waku-org/go-waku v0.8.1-0.20240806174951-6e1ff9818815 h1:arQLN7mhSJNBJLhnxiV9WqMEC5rYZZS7oHnuYBPEct8= +github.com/waku-org/go-waku v0.8.1-0.20240806174951-6e1ff9818815/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go index 74ba35ced..9de6c59fd 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go @@ -382,7 +382,7 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error { } if len(rs) == 1 { - w.log.Info("updating advertised relay shards in ENR") + w.log.Info("updating advertised relay shards in ENR", zap.Any("newShardInfo", rs[0])) if len(rs[0].ShardIDs) != len(topics) { w.log.Warn("A mix of named and static shards found. ENR shard will contain only the following shards", zap.Any("shards", rs[0])) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go index ae18907c0..8ab1c8beb 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go @@ -112,7 +112,7 @@ func (pm *PeerManager) discoverPeersByPubsubTopics(pubsubTopics []string, proto for _, shardInfo := range shardsInfo { err = pm.DiscoverAndConnectToPeers(ctx, shardInfo.ClusterID, shardInfo.ShardIDs[0], proto, maxCount) if err != nil { - pm.logger.Error("failed to discover and connect to peers", zap.Error(err)) + pm.logger.Warn("failed to discover and connect to peers", zap.Error(err)) } } } else { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go index 3126f9142..1441f7f48 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go @@ -309,17 +309,15 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { defer pm.topicMutex.RUnlock() for topicStr, topicInst := range pm.subRelayTopics { - // @cammellos reported that ListPeers returned an invalid number of - // peers. This will ensure that the peers returned by this function - // match those peers that are currently connected + meshPeerLen := pm.checkAndUpdateTopicHealth(topicInst) + curConnectedPeerLen := pm.getPeersBasedOnconnectionStatus(topicStr, network.Connected).Len() - curPeerLen := pm.checkAndUpdateTopicHealth(topicInst) - if curPeerLen < pm.OutPeersTarget { + if meshPeerLen < waku_proto.GossipSubDMin || curConnectedPeerLen < pm.OutPeersTarget { pm.logger.Debug("subscribed topic has not reached target peers, initiating more connections to maintain healthy mesh", - zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen), + zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curConnectedPeerLen), zap.Int("targetPeers", pm.OutPeersTarget)) //Find not connected peers. - notConnectedPeers := pm.getNotConnectedPers(topicStr) + notConnectedPeers := pm.getPeersBasedOnconnectionStatus(topicStr, network.NotConnected) if notConnectedPeers.Len() == 0 { pm.logger.Debug("could not find any peers in peerstore to connect to, discovering more", zap.String("pubSubTopic", topicStr)) go pm.discoverPeersByPubsubTopics([]string{topicStr}, relay.WakuRelayID_v200, pm.ctx, 2) @@ -327,12 +325,13 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { } pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr)) //Connect to eligible peers. - numPeersToConnect := pm.OutPeersTarget - curPeerLen - - if numPeersToConnect > notConnectedPeers.Len() { - numPeersToConnect = notConnectedPeers.Len() + numPeersToConnect := pm.OutPeersTarget - curConnectedPeerLen + if numPeersToConnect > 0 { + if numPeersToConnect > notConnectedPeers.Len() { + numPeersToConnect = notConnectedPeers.Len() + } + pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect]) } - pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect]) } } } @@ -372,8 +371,8 @@ func (pm *PeerManager) connectToSpecifiedPeers(peers peer.IDSlice) { } } -// getNotConnectedPers returns peers for a pubSubTopic that are not connected. -func (pm *PeerManager) getNotConnectedPers(pubsubTopic string) (notConnectedPeers peer.IDSlice) { +// getPeersBasedOnconnectionStatus returns peers for a pubSubTopic that are either connected/not-connected based on status passed. +func (pm *PeerManager) getPeersBasedOnconnectionStatus(pubsubTopic string, connected network.Connectedness) (filteredPeers peer.IDSlice) { var peerList peer.IDSlice if pubsubTopic == "" { peerList = pm.host.Peerstore().Peers() @@ -381,8 +380,8 @@ func (pm *PeerManager) getNotConnectedPers(pubsubTopic string) (notConnectedPeer peerList = pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic) } for _, peerID := range peerList { - if pm.host.Network().Connectedness(peerID) != network.Connected { - notConnectedPeers = append(notConnectedPeers, peerID) + if pm.host.Network().Connectedness(peerID) == connected { + filteredPeers = append(filteredPeers, peerID) } } return diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index 3342dc299..5649a3c96 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -242,7 +242,7 @@ func (wf *WakuFilterLightNode) notify(ctx context.Context, remotePeerID peer.ID, } func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, - reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter, peer peer.ID) error { + reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter, peerID peer.ID) error { request := &pb.FilterSubscribeRequest{ RequestId: hex.EncodeToString(requestID), FilterSubscribeType: reqType, @@ -255,11 +255,14 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, return err } - logger := wf.log.With(logging.HostID("peerID", peer)) + logger := wf.log.With(logging.HostID("peerID", peerID)) - stream, err := wf.h.NewStream(ctx, peer, FilterSubscribeID_v20beta1) + stream, err := wf.h.NewStream(ctx, peerID, FilterSubscribeID_v20beta1) if err != nil { wf.metrics.RecordError(dialFailure) + if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + } return err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go index 2bf63bb50..3d898f89f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go @@ -14,6 +14,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -273,6 +274,9 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge wf.metrics.RecordError(pushTimeoutFailure) } else { wf.metrics.RecordError(dialFailure) + if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + } } logger.Error("opening peer stream", zap.Error(err)) return err diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go index b02cd92e3..456dada54 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go @@ -207,6 +207,9 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor if err != nil { logger.Error("creating stream to peer", zap.Error(err)) store.metrics.RecordError(dialFailure) + if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer}) + } return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index de7bce8b4..10028fdde 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -188,14 +188,17 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu } // request sends a message via lightPush protocol to either a specified peer or peer that is selected. -func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peer peer.ID) (*pb.PushResponse, error) { +func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peerID peer.ID) (*pb.PushResponse, error) { - logger := wakuLP.log.With(logging.HostID("peer", peer)) + logger := wakuLP.log.With(logging.HostID("peer", peerID)) - stream, err := wakuLP.h.NewStream(ctx, peer, LightPushID_v20beta1) + stream, err := wakuLP.h.NewStream(ctx, peerID, LightPushID_v20beta1) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) wakuLP.metrics.RecordError(dialFailure) + if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + } return nil, err } pushRequestRPC := &pb.PushRpc{RequestId: hex.EncodeToString(params.requestID), Request: req} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go index 23a7e455a..93a70a2bf 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go @@ -15,6 +15,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/metadata/pb" @@ -103,6 +104,9 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*pb.Wak stream, err := wakuM.h.NewStream(ctx, peerID, MetadataID_v1) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) + if ps, ok := wakuM.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + } return nil, err } @@ -335,7 +339,7 @@ func (wakuM *WakuMetadata) DisconnectPeerOnShardMismatch(ctx context.Context, pe return err } - if !rs.ContainsAnyShard(rs.ClusterID, peerShards) { + if rs != nil && !rs.ContainsAnyShard(rs.ClusterID, peerShards) { wakuM.log.Info("shard mismatch", logging.HostID("peerID", peerID), zap.Uint16("clusterID", rs.ClusterID), zap.Uint16s("ourShardIDs", rs.ShardIDs), zap.Uint16s("theirShardIDs", peerShards)) wakuM.disconnect(peerID) return errors.New("shard mismatch") diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go index 915ce75fd..6baf30950 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go @@ -76,6 +76,9 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1) if err != nil { + if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: params.selectedPeer}) + } return err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go index 5f103e12c..08e5051ca 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go @@ -98,7 +98,7 @@ func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) { if wakuPX.limiter != nil && !wakuPX.limiter.Allow() { wakuPX.metrics.RecordError(rateLimitFailure) - wakuPX.log.Error("exceeds the rate limit") + wakuPX.log.Info("exceeds the rate limit") // TODO: peer exchange protocol should contain an err field if err := stream.Reset(); err != nil { wakuPX.log.Error("resetting connection", zap.Error(err)) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go index 34a08bcf4..5cda4eef2 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go @@ -253,6 +253,9 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) + if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer}) + } return nil, err } diff --git a/vendor/modules.txt b/vendor/modules.txt index e857de8a1..c8c8be231 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d +# github.com/waku-org/go-waku v0.8.1-0.20240806174951-6e1ff9818815 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests