diff --git a/go.mod b/go.mod index e76591004..e7c5283a3 100644 --- a/go.mod +++ b/go.mod @@ -95,7 +95,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240819221706-d3b51130599d + github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index c7d0c32c4..0f7945aac 100644 --- a/go.sum +++ b/go.sum @@ -2137,8 +2137,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240819221706-d3b51130599d h1:YVTBJpd6vZZzu8X0515bK0D21fgGcrAlYZelgbIdBD4= -github.com/waku-org/go-waku v0.8.1-0.20240819221706-d3b51130599d/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= +github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5 h1:r5kgO4DWxeKyGF+wq5KhayW710XAqX5iWXhS/4ZqVkc= +github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git 
a/vendor/github.com/waku-org/go-waku/waku/v2/api/common/utils.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/common/utils.go new file mode 100644 index 000000000..506d68140 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/common/utils.go @@ -0,0 +1,5 @@ +package common + +import "time" + +const DefaultStoreQueryTimeout = 30 * time.Second diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go index 6de6e7600..bafde783b 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go @@ -37,7 +37,7 @@ type MissingMessageVerifier struct { messageTracker MessageTracker criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages - criteriaInterestMu sync.Mutex + criteriaInterestMu sync.RWMutex C <-chan *protocol.Envelope @@ -110,8 +110,13 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { select { case <-t.C: m.logger.Debug("checking for missing messages...") - m.criteriaInterestMu.Lock() - for _, interest := range m.criteriaInterest { + m.criteriaInterestMu.RLock() + critIntList := make([]criteriaInterest, 0, len(m.criteriaInterest)) + for _, value := range m.criteriaInterest { + critIntList = append(critIntList, value) + } + m.criteriaInterestMu.RUnlock() + for _, interest := range critIntList { select { case <-ctx.Done(): return @@ -123,7 +128,6 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { }(interest) } } - m.criteriaInterestMu.Unlock() case <-ctx.Done(): return @@ -155,8 +159,8 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter } m.criteriaInterestMu.Lock() - c := m.criteriaInterest[interest.contentFilter.PubsubTopic] - if c.equals(interest) { + c, ok := 
m.criteriaInterest[interest.contentFilter.PubsubTopic] + if ok && c.equals(interest) { c.lastChecked = now m.criteriaInterest[interest.contentFilter.PubsubTopic] = c } @@ -261,7 +265,9 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, defer wg.Wait() result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { - return m.store.QueryByHash(ctx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest)) + queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout) + defer cancel() + return m.store.QueryByHash(queryCtx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest)) }, logger, "retrieving missing messages") if err != nil { if !errors.Is(err, context.Canceled) { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/options.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/options.go index b16abbc70..ebd53e718 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/options.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/options.go @@ -1,11 +1,16 @@ package missing -import "time" +import ( + "time" + + "github.com/waku-org/go-waku/waku/v2/api/common" +) type missingMessageVerifierParams struct { delay time.Duration interval time.Duration maxAttemptsToRetrieveHistory int + storeQueryTimeout time.Duration } // MissingMessageVerifierOption is an option that can be used to customize the MissingMessageVerifier behavior @@ -32,8 +37,16 @@ func WithMaxRetryAttempts(max int) MissingMessageVerifierOption { } } +// WithStoreQueryTimeout sets the timeout for store query +func WithStoreQueryTimeout(timeout time.Duration) MissingMessageVerifierOption { + return func(params *missingMessageVerifierParams) { + params.storeQueryTimeout = timeout + } +} + var defaultMissingMessagesVerifierOptions = []MissingMessageVerifierOption{ 
WithVerificationInterval(time.Minute), WithDelay(20 * time.Second), WithMaxRetryAttempts(3), + WithStoreQueryTimeout(common.DefaultStoreQueryTimeout), } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go index be22abaac..67a67c913 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/libp2p/go-libp2p/core/peer" + apicommon "github.com/waku-org/go-waku/waku/v2/api/common" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" @@ -47,6 +48,7 @@ type MessageSentCheck struct { hashQueryInterval time.Duration messageSentPeriod uint32 messageExpiredPerid uint32 + storeQueryTimeout time.Duration } // NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters @@ -64,6 +66,7 @@ func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource hashQueryInterval: DefaultHashQueryInterval, messageSentPeriod: DefaultMessageSentPeriod, messageExpiredPerid: DefaultMessageExpiredPerid, + storeQueryTimeout: apicommon.DefaultStoreQueryTimeout, } } @@ -99,6 +102,14 @@ func WithMessageExpiredPerid(period uint32) MessageSentCheckOption { } } +// WithStoreQueryTimeout sets the timeout for store query +func WithStoreQueryTimeout(timeout time.Duration) MessageSentCheckOption { + return func(params *MessageSentCheck) error { + params.storeQueryTimeout = timeout + return nil + } +} + // Add adds a message for message sent check func (m *MessageSentCheck) Add(topic string, messageID common.Hash, sentTime uint32) { m.messageIDsMu.Lock() @@ -218,7 +229,9 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx 
context.Context, hashes []c m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes)) - result, err := m.store.QueryByHash(ctx, messageHashes, opts...) + queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout) + defer cancel() + result, err := m.store.QueryByHash(queryCtx, messageHashes, opts...) if err != nil { m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err)) return []common.Hash{} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index 747109ffb..f9dc443fc 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -416,9 +416,9 @@ func (w *WakuNode) Start(ctx context.Context) error { if err != nil { return err } - w.peermanager.Start(ctx) w.registerAndMonitorReachability(ctx) } + w.peermanager.Start(ctx) w.legacyStore = w.storeFactory(w) w.legacyStore.SetHost(host) @@ -752,7 +752,7 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo) func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { err := w.host.Connect(ctx, info) if err != nil { - w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info) + w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info.ID) return err } @@ -770,7 +770,7 @@ func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { } } - w.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(info) + w.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(info.ID) w.metrics.RecordDial() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go index ca58c4b5c..bd844b20c 100644 
--- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go @@ -279,8 +279,9 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) { err := c.host.Connect(ctx, pi) if err != nil && !errors.Is(err, context.Canceled) { c.addConnectionBackoff(pi.ID) - c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi) + c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi.ID) c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err)) } + if err == nil { c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID) } <-sem } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go index 1441f7f48..2ac489a04 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go @@ -101,6 +101,8 @@ const ( // some protocol var ErrNoPeersAvailable = errors.New("no suitable peers found") +const maxFailedAttempts = 5 +const prunePeerStoreInterval = 10 * time.Minute const peerConnectivityLoopSecs = 15 const maxConnsToPeerRatio = 5 @@ -123,6 +125,10 @@ func inAndOutRelayPeers(relayPeers int) (int, int) { // checkAndUpdateTopicHealth finds health of specified topic and updates and notifies of the same. // Also returns the healthyPeerCount func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int { + if topic == nil { + return 0 + } + healthyPeerCount := 0 for _, p := range pm.relay.PubSub().MeshPeers(topic.topic.String()) { @@ -234,13 +240,115 @@ func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) { // Start starts the processing to be done by peer manager.
func (pm *PeerManager) Start(ctx context.Context) { - pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) - pm.ctx = ctx - if pm.sub != nil && pm.RelayEnabled { - go pm.peerEventLoop(ctx) + if pm.RelayEnabled { + pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) + if pm.sub != nil { + go pm.peerEventLoop(ctx) + } + go pm.connectivityLoop(ctx) } - go pm.connectivityLoop(ctx) + go pm.peerStoreLoop(ctx) +} + +func (pm *PeerManager) peerStoreLoop(ctx context.Context) { + t := time.NewTicker(prunePeerStoreInterval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + pm.prunePeerStore() + } + } +} + +func (pm *PeerManager) prunePeerStore() { + peers := pm.host.Peerstore().Peers() + numPeers := len(peers) + if numPeers < pm.maxPeers { + pm.logger.Debug("peerstore size within capacity, not pruning", zap.Int("capacity", pm.maxPeers), zap.Int("numPeers", numPeers)) + return + } + peerCntBeforePruning := numPeers + pm.logger.Debug("peerstore capacity exceeded, hence pruning", zap.Int("capacity", pm.maxPeers), zap.Int("numPeers", peerCntBeforePruning)) + + for _, peerID := range peers { + connFailues := pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) + if connFailues > maxFailedAttempts { + // safety check so that we don't end up disconnecting connected peers. 
+ if pm.host.Network().Connectedness(peerID) == network.Connected { + pm.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(peerID) + continue + } + pm.host.Peerstore().RemovePeer(peerID) + numPeers-- + } + if numPeers < pm.maxPeers { + pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers)) + return + } + } + + notConnectedPeers := pm.getPeersBasedOnconnectionStatus("", network.NotConnected) + peersByTopic := make(map[string]peer.IDSlice) + var prunedPeers peer.IDSlice + + //prune not connected peers without shard + for _, peerID := range notConnectedPeers { + topics, err := pm.host.Peerstore().(wps.WakuPeerstore).PubSubTopics(peerID) + //Prune peers without pubsubtopics. + if err != nil || len(topics) == 0 { + if err != nil { + pm.logger.Error("pruning:failed to fetch pubsub topics", zap.Error(err), zap.Stringer("peer", peerID)) + } + prunedPeers = append(prunedPeers, peerID) + pm.host.Peerstore().RemovePeer(peerID) + numPeers-- + } else { + prunedPeers = append(prunedPeers, peerID) + for topic := range topics { + peersByTopic[topic] = append(peersByTopic[topic], peerID) + } + } + if numPeers < pm.maxPeers { + pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers), zap.Stringers("prunedPeers", prunedPeers)) + return + } + } + pm.logger.Debug("pruned notconnected peers", zap.Stringers("prunedPeers", prunedPeers)) + + // calculate the avg peers per shard + total, maxPeerCnt := 0, 0 + for _, peersInTopic := range peersByTopic { + peerLen := len(peersInTopic) + total += peerLen + if peerLen > maxPeerCnt { + maxPeerCnt = peerLen + } + } + avgPerTopic := 1; if maxPeerCnt > 0 { avgPerTopic = max(1, total/maxPeerCnt) } // guard div-by-zero; max (not min) keeps at least the average per shard + // prune peers from shard with higher than avg count + + for topic, peers := range peersByTopic { + count := max(len(peers)-avgPerTopic, 0) + var prunedPeers peer.IDSlice
+ for i, pID := range peers { + if i >= count { + break + } + prunedPeers = append(prunedPeers, pID) + pm.host.Peerstore().RemovePeer(pID) + numPeers-- + if numPeers < pm.maxPeers { + pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers), zap.Stringers("prunedPeers", prunedPeers)) + return + } + } + pm.logger.Debug("pruned peers higher than average", zap.Stringers("prunedPeers", prunedPeers), zap.String("topic", topic)) + } + pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers)) } // This is a connectivity loop, which currently checks and prunes inbound connections. @@ -444,11 +552,6 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID { // AddDiscoveredPeer to add dynamically discovered peers. // Note that these peers will not be set in service-slots. func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { - //Doing this check again inside addPeer, in order to avoid additional complexity of rollingBack other changes. - if pm.maxPeers <= pm.host.Peerstore().Peers().Len() { - return - } - //Check if the peer is already present, if so skip adding _, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID) if err == nil { @@ -503,10 +606,7 @@ // addPeer adds peer to the peerStore.
// It also sets additional metadata such as origin and supported protocols func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error { - if pm.maxPeers <= pm.host.Peerstore().Peers().Len() { - pm.logger.Error("could not add peer as peer store capacity is reached", zap.Stringer("peer", ID), zap.Int("capacity", pm.maxPeers)) - return errors.New("peer store capacity reached") - } + pm.logger.Info("adding peer to peerstore", zap.Stringer("peer", ID)) if origin == wps.Static { pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peerstore/waku_peer_store.go b/vendor/github.com/waku-org/go-waku/waku/v2/peerstore/waku_peer_store.go index 203b23103..af42a28ad 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peerstore/waku_peer_store.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peerstore/waku_peer_store.go @@ -51,9 +51,9 @@ type WakuPeerstore interface { PeersByOrigin(origin Origin) peer.IDSlice SetENR(p peer.ID, enr *enode.Node) error ENR(p peer.ID) (*enode.Node, error) - AddConnFailure(p peer.AddrInfo) - ResetConnFailures(p peer.AddrInfo) - ConnFailures(p peer.AddrInfo) int + AddConnFailure(pID peer.ID) + ResetConnFailures(pID peer.ID) + ConnFailures(pID peer.ID) int SetDirection(p peer.ID, direction network.Direction) error Direction(p peer.ID) (network.Direction, error) @@ -136,24 +136,24 @@ func (ps *WakuPeerstoreImpl) ENR(p peer.ID) (*enode.Node, error) { } // AddConnFailure increments connectionFailures for a peer -func (ps *WakuPeerstoreImpl) AddConnFailure(p peer.AddrInfo) { +func (ps *WakuPeerstoreImpl) AddConnFailure(pID peer.ID) { ps.connFailures.Lock() defer ps.connFailures.Unlock() - ps.connFailures.failures[p.ID]++ + ps.connFailures.failures[pID]++ } // ResetConnFailures resets connectionFailures for a peer to 0 -func (ps *WakuPeerstoreImpl) ResetConnFailures(p peer.AddrInfo) { +func (ps 
*WakuPeerstoreImpl) ResetConnFailures(pID peer.ID) { ps.connFailures.Lock() defer ps.connFailures.Unlock() - ps.connFailures.failures[p.ID] = 0 + ps.connFailures.failures[pID] = 0 } // ConnFailures fetches connectionFailures for a peer -func (ps *WakuPeerstoreImpl) ConnFailures(p peer.AddrInfo) int { +func (ps *WakuPeerstoreImpl) ConnFailures(pID peer.ID) int { ps.connFailures.RLock() defer ps.connFailures.RUnlock() - return ps.connFailures.failures[p.ID] + return ps.connFailures.failures[pID] } // SetDirection sets connection direction for a specific peer. diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index c52c90981..a9d2b496d 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -246,7 +246,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, if err != nil { wf.metrics.RecordError(dialFailure) if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + ps.AddConnFailure(peerID) } return err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go index 3d898f89f..9a2e25d6b 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go @@ -275,7 +275,7 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge } else { wf.metrics.RecordError(dialFailure) if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + ps.AddConnFailure(peerID) } } logger.Error("opening peer stream", zap.Error(err)) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go 
b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go index 456dada54..03f7c9b21 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go @@ -208,7 +208,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor logger.Error("creating stream to peer", zap.Error(err)) store.metrics.RecordError(dialFailure) if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer}) + ps.AddConnFailure(selectedPeer) } return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index c0a72c2ed..8200fddfc 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -198,7 +198,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p logger.Error("creating stream to peer", zap.Error(err)) wakuLP.metrics.RecordError(dialFailure) if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + ps.AddConnFailure(peerID) } return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go index 93a70a2bf..47cf088dc 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go @@ -105,7 +105,7 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*pb.Wak if err != nil { logger.Error("creating stream to peer", zap.Error(err)) if ps, ok := 
wakuM.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + ps.AddConnFailure(peerID) } return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go index 6baf30950..8075e2de8 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go @@ -77,7 +77,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1) if err != nil { if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peer.AddrInfo{ID: params.selectedPeer}) + ps.AddConnFailure(params.selectedPeer) } return err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go index f2fd823ba..936b37c58 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go @@ -459,37 +459,28 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co defer w.topicsMutex.Unlock() for pubSubTopic, cTopics := range pubSubTopicMap { - cfTemp := waku_proto.NewContentFilter(pubSubTopic, cTopics...) 
pubsubUnsubscribe := false - sub, ok := w.topics[pubSubTopic] + topicData, ok := w.topics[pubSubTopic] if !ok { w.log.Error("not subscribed to topic", zap.String("topic", pubSubTopic)) return errors.New("not subscribed to topic") } - topicData, ok := w.topics[pubSubTopic] - if ok { - //Remove relevant subscription - for subID, sub := range topicData.contentSubs { - if sub.contentFilter.Equals(cfTemp) { - sub.Unsubscribe() - delete(topicData.contentSubs, subID) - } + cfTemp := waku_proto.NewContentFilter(pubSubTopic, cTopics...) + //Remove relevant subscription + for subID, sub := range topicData.contentSubs { + if sub.contentFilter.Equals(cfTemp) { + sub.Unsubscribe() + delete(topicData.contentSubs, subID) } + } - if len(topicData.contentSubs) == 0 { - pubsubUnsubscribe = true - } - } else { - //Should not land here ideally - w.log.Error("pubsub subscriptions exists, but contentSubscription doesn't for contentFilter", - zap.String("pubsubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics)) - - return errors.New("unexpected error in unsubscribe") + if len(topicData.contentSubs) == 0 { + pubsubUnsubscribe = true } if pubsubUnsubscribe { - err = w.unsubscribeFromPubsubTopic(sub) + err = w.unsubscribeFromPubsubTopic(topicData) if err != nil { return err } @@ -502,6 +493,9 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co // unsubscribeFromPubsubTopic unsubscribes subscription from underlying pubsub. 
// Note: caller has to acquire topicsMutex in order to avoid race conditions func (w *WakuRelay) unsubscribeFromPubsubTopic(topicData *pubsubTopicSubscriptionDetails) error { + if topicData.subscription == nil { + return nil + } pubSubTopic := topicData.subscription.Topic() w.log.Info("unsubscribing from pubsubTopic", zap.String("topic", pubSubTopic)) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go index 5cda4eef2..92c47ff4c 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go @@ -254,7 +254,7 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe if err != nil { logger.Error("creating stream to peer", zap.Error(err)) if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer}) + ps.AddConnFailure(selectedPeer) } return nil, err } diff --git a/vendor/modules.txt b/vendor/modules.txt index 002e7de00..31d2a18cd 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1010,11 +1010,12 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240819221706-d3b51130599d +# github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests github.com/waku-org/go-waku/waku/persistence +github.com/waku-org/go-waku/waku/v2/api/common github.com/waku-org/go-waku/waku/v2/api/filter github.com/waku-org/go-waku/waku/v2/api/missing github.com/waku-org/go-waku/waku/v2/api/publish diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 6d5e66936..cb82706b6 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1850,7 +1850,7 @@ func 
FormatPeerConnFailures(wakuNode *node.WakuNode) map[string]int { p := make(map[string]int) for _, peerID := range wakuNode.Host().Network().Peers() { peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID) - connFailures := wakuNode.Host().Peerstore().(wps.WakuPeerstore).ConnFailures(peerInfo) + connFailures := wakuNode.Host().Peerstore().(wps.WakuPeerstore).ConnFailures(peerInfo.ID) if connFailures > 0 { p[peerID.String()] = connFailures }