chore_: upgrade go-waku (#5763)

* chore_: bump go-waku

* chore_: fix go-waku compatibility
This commit is contained in:
Igor Sirotin 2024-08-23 16:44:53 +01:00 committed by GitHub
parent 8cf4feec1e
commit edead41fa6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 201 additions and 68 deletions

2
go.mod
View File

@ -95,7 +95,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0 github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2 github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20240819221706-d3b51130599d github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5
github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1

4
go.sum
View File

@ -2137,8 +2137,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20240819221706-d3b51130599d h1:YVTBJpd6vZZzu8X0515bK0D21fgGcrAlYZelgbIdBD4= github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5 h1:r5kgO4DWxeKyGF+wq5KhayW710XAqX5iWXhS/4ZqVkc=
github.com/waku-org/go-waku v0.8.1-0.20240819221706-d3b51130599d/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=

View File

@ -0,0 +1,5 @@
package common
import "time"
const DefaultStoreQueryTimeout = 30 * time.Second

View File

@ -37,7 +37,7 @@ type MissingMessageVerifier struct {
messageTracker MessageTracker messageTracker MessageTracker
criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
criteriaInterestMu sync.Mutex criteriaInterestMu sync.RWMutex
C <-chan *protocol.Envelope C <-chan *protocol.Envelope
@ -110,8 +110,13 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
select { select {
case <-t.C: case <-t.C:
m.logger.Debug("checking for missing messages...") m.logger.Debug("checking for missing messages...")
m.criteriaInterestMu.Lock() m.criteriaInterestMu.RLock()
for _, interest := range m.criteriaInterest { critIntList := make([]criteriaInterest, 0, len(m.criteriaInterest))
for _, value := range m.criteriaInterest {
critIntList = append(critIntList, value)
}
m.criteriaInterestMu.RUnlock()
for _, interest := range critIntList {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
@ -123,7 +128,6 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
}(interest) }(interest)
} }
} }
m.criteriaInterestMu.Unlock()
case <-ctx.Done(): case <-ctx.Done():
return return
@ -155,8 +159,8 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter
} }
m.criteriaInterestMu.Lock() m.criteriaInterestMu.Lock()
c := m.criteriaInterest[interest.contentFilter.PubsubTopic] c, ok := m.criteriaInterest[interest.contentFilter.PubsubTopic]
if c.equals(interest) { if ok && c.equals(interest) {
c.lastChecked = now c.lastChecked = now
m.criteriaInterest[interest.contentFilter.PubsubTopic] = c m.criteriaInterest[interest.contentFilter.PubsubTopic] = c
} }
@ -261,7 +265,9 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
defer wg.Wait() defer wg.Wait()
result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
return m.store.QueryByHash(ctx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest)) queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout)
defer cancel()
return m.store.QueryByHash(queryCtx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest))
}, logger, "retrieving missing messages") }, logger, "retrieving missing messages")
if err != nil { if err != nil {
if !errors.Is(err, context.Canceled) { if !errors.Is(err, context.Canceled) {

View File

@ -1,11 +1,16 @@
package missing package missing
import "time" import (
"time"
"github.com/waku-org/go-waku/waku/v2/api/common"
)
type missingMessageVerifierParams struct { type missingMessageVerifierParams struct {
delay time.Duration delay time.Duration
interval time.Duration interval time.Duration
maxAttemptsToRetrieveHistory int maxAttemptsToRetrieveHistory int
storeQueryTimeout time.Duration
} }
// MissingMessageVerifierOption is an option that can be used to customize the MissingMessageVerifier behavior // MissingMessageVerifierOption is an option that can be used to customize the MissingMessageVerifier behavior
@ -32,8 +37,16 @@ func WithMaxRetryAttempts(max int) MissingMessageVerifierOption {
} }
} }
// WithStoreQueryTimeout sets the timeout for store query
func WithStoreQueryTimeout(timeout time.Duration) MissingMessageVerifierOption {
return func(params *missingMessageVerifierParams) {
params.storeQueryTimeout = timeout
}
}
var defaultMissingMessagesVerifierOptions = []MissingMessageVerifierOption{ var defaultMissingMessagesVerifierOptions = []MissingMessageVerifierOption{
WithVerificationInterval(time.Minute), WithVerificationInterval(time.Minute),
WithDelay(20 * time.Second), WithDelay(20 * time.Second),
WithMaxRetryAttempts(3), WithMaxRetryAttempts(3),
WithStoreQueryTimeout(common.DefaultStoreQueryTimeout),
} }

View File

@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
apicommon "github.com/waku-org/go-waku/waku/v2/api/common"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/store"
@ -47,6 +48,7 @@ type MessageSentCheck struct {
hashQueryInterval time.Duration hashQueryInterval time.Duration
messageSentPeriod uint32 messageSentPeriod uint32
messageExpiredPerid uint32 messageExpiredPerid uint32
storeQueryTimeout time.Duration
} }
// NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters // NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters
@ -64,6 +66,7 @@ func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource
hashQueryInterval: DefaultHashQueryInterval, hashQueryInterval: DefaultHashQueryInterval,
messageSentPeriod: DefaultMessageSentPeriod, messageSentPeriod: DefaultMessageSentPeriod,
messageExpiredPerid: DefaultMessageExpiredPerid, messageExpiredPerid: DefaultMessageExpiredPerid,
storeQueryTimeout: apicommon.DefaultStoreQueryTimeout,
} }
} }
@ -99,6 +102,14 @@ func WithMessageExpiredPerid(period uint32) MessageSentCheckOption {
} }
} }
// WithStoreQueryTimeout sets the timeout for store query
func WithStoreQueryTimeout(timeout time.Duration) MessageSentCheckOption {
return func(params *MessageSentCheck) error {
params.storeQueryTimeout = timeout
return nil
}
}
// Add adds a message for message sent check // Add adds a message for message sent check
func (m *MessageSentCheck) Add(topic string, messageID common.Hash, sentTime uint32) { func (m *MessageSentCheck) Add(topic string, messageID common.Hash, sentTime uint32) {
m.messageIDsMu.Lock() m.messageIDsMu.Lock()
@ -218,7 +229,9 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes)) m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes))
result, err := m.store.QueryByHash(ctx, messageHashes, opts...) queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout)
defer cancel()
result, err := m.store.QueryByHash(queryCtx, messageHashes, opts...)
if err != nil { if err != nil {
m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err)) m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err))
return []common.Hash{} return []common.Hash{}

View File

@ -416,9 +416,9 @@ func (w *WakuNode) Start(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
w.peermanager.Start(ctx)
w.registerAndMonitorReachability(ctx) w.registerAndMonitorReachability(ctx)
} }
w.peermanager.Start(ctx)
w.legacyStore = w.storeFactory(w) w.legacyStore = w.storeFactory(w)
w.legacyStore.SetHost(host) w.legacyStore.SetHost(host)
@ -752,7 +752,7 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo)
func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
err := w.host.Connect(ctx, info) err := w.host.Connect(ctx, info)
if err != nil { if err != nil {
w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info) w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info.ID)
return err return err
} }
@ -770,7 +770,7 @@ func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
} }
} }
w.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(info) w.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(info.ID)
w.metrics.RecordDial() w.metrics.RecordDial()

View File

@ -279,8 +279,9 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
err := c.host.Connect(ctx, pi) err := c.host.Connect(ctx, pi)
if err != nil && !errors.Is(err, context.Canceled) { if err != nil && !errors.Is(err, context.Canceled) {
c.addConnectionBackoff(pi.ID) c.addConnectionBackoff(pi.ID)
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi) c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi.ID)
c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err)) c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
} }
c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID)
<-sem <-sem
} }

View File

@ -101,6 +101,8 @@ const (
// some protocol // some protocol
var ErrNoPeersAvailable = errors.New("no suitable peers found") var ErrNoPeersAvailable = errors.New("no suitable peers found")
const maxFailedAttempts = 5
const prunePeerStoreInterval = 10 * time.Minute
const peerConnectivityLoopSecs = 15 const peerConnectivityLoopSecs = 15
const maxConnsToPeerRatio = 5 const maxConnsToPeerRatio = 5
@ -123,6 +125,10 @@ func inAndOutRelayPeers(relayPeers int) (int, int) {
// checkAndUpdateTopicHealth finds health of specified topic and updates and notifies of the same. // checkAndUpdateTopicHealth finds health of specified topic and updates and notifies of the same.
// Also returns the healthyPeerCount // Also returns the healthyPeerCount
func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int { func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int {
if topic == nil {
return 0
}
healthyPeerCount := 0 healthyPeerCount := 0
for _, p := range pm.relay.PubSub().MeshPeers(topic.topic.String()) { for _, p := range pm.relay.PubSub().MeshPeers(topic.topic.String()) {
@ -234,13 +240,115 @@ func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) {
// Start starts the processing to be done by peer manager. // Start starts the processing to be done by peer manager.
func (pm *PeerManager) Start(ctx context.Context) { func (pm *PeerManager) Start(ctx context.Context) {
pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField)
pm.ctx = ctx pm.ctx = ctx
if pm.sub != nil && pm.RelayEnabled { if pm.RelayEnabled {
go pm.peerEventLoop(ctx) pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField)
if pm.sub != nil {
go pm.peerEventLoop(ctx)
}
go pm.connectivityLoop(ctx)
} }
go pm.connectivityLoop(ctx) go pm.peerStoreLoop(ctx)
}
func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
t := time.NewTicker(prunePeerStoreInterval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
pm.prunePeerStore()
}
}
}
func (pm *PeerManager) prunePeerStore() {
peers := pm.host.Peerstore().Peers()
numPeers := len(peers)
if numPeers < pm.maxPeers {
pm.logger.Debug("peerstore size within capacity, not pruning", zap.Int("capacity", pm.maxPeers), zap.Int("numPeers", numPeers))
return
}
peerCntBeforePruning := numPeers
pm.logger.Debug("peerstore capacity exceeded, hence pruning", zap.Int("capacity", pm.maxPeers), zap.Int("numPeers", peerCntBeforePruning))
for _, peerID := range peers {
connFailues := pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID)
if connFailues > maxFailedAttempts {
// safety check so that we don't end up disconnecting connected peers.
if pm.host.Network().Connectedness(peerID) == network.Connected {
pm.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(peerID)
continue
}
pm.host.Peerstore().RemovePeer(peerID)
numPeers--
}
if numPeers < pm.maxPeers {
pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers))
return
}
}
notConnectedPeers := pm.getPeersBasedOnconnectionStatus("", network.NotConnected)
peersByTopic := make(map[string]peer.IDSlice)
var prunedPeers peer.IDSlice
//prune not connected peers without shard
for _, peerID := range notConnectedPeers {
topics, err := pm.host.Peerstore().(wps.WakuPeerstore).PubSubTopics(peerID)
//Prune peers without pubsubtopics.
if err != nil || len(topics) == 0 {
if err != nil {
pm.logger.Error("pruning:failed to fetch pubsub topics", zap.Error(err), zap.Stringer("peer", peerID))
}
prunedPeers = append(prunedPeers, peerID)
pm.host.Peerstore().RemovePeer(peerID)
numPeers--
} else {
prunedPeers = append(prunedPeers, peerID)
for topic := range topics {
peersByTopic[topic] = append(peersByTopic[topic], peerID)
}
}
if numPeers < pm.maxPeers {
pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers), zap.Stringers("prunedPeers", prunedPeers))
return
}
}
pm.logger.Debug("pruned notconnected peers", zap.Stringers("prunedPeers", prunedPeers))
// calculate the avg peers per shard
total, maxPeerCnt := 0, 0
for _, peersInTopic := range peersByTopic {
peerLen := len(peersInTopic)
total += peerLen
if peerLen > maxPeerCnt {
maxPeerCnt = peerLen
}
}
avgPerTopic := min(1, total/maxPeerCnt)
// prune peers from shard with higher than avg count
for topic, peers := range peersByTopic {
count := max(len(peers)-avgPerTopic, 0)
var prunedPeers peer.IDSlice
for i, pID := range peers {
if i > count {
break
}
prunedPeers = append(prunedPeers, pID)
pm.host.Peerstore().RemovePeer(pID)
numPeers--
if numPeers < pm.maxPeers {
pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers), zap.Stringers("prunedPeers", prunedPeers))
return
}
}
pm.logger.Debug("pruned peers higher than average", zap.Stringers("prunedPeers", prunedPeers), zap.String("topic", topic))
}
pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers))
} }
// This is a connectivity loop, which currently checks and prunes inbound connections. // This is a connectivity loop, which currently checks and prunes inbound connections.
@ -444,11 +552,6 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID {
// AddDiscoveredPeer to add dynamically discovered peers. // AddDiscoveredPeer to add dynamically discovered peers.
// Note that these peers will not be set in service-slots. // Note that these peers will not be set in service-slots.
func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
//Doing this check again inside addPeer, in order to avoid additional complexity of rollingBack other changes.
if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
return
}
//Check if the peer is already present, if so skip adding //Check if the peer is already present, if so skip adding
_, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID) _, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID)
if err == nil { if err == nil {
@ -503,10 +606,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
// addPeer adds peer to the peerStore. // addPeer adds peer to the peerStore.
// It also sets additional metadata such as origin and supported protocols // It also sets additional metadata such as origin and supported protocols
func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error { func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error {
if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
pm.logger.Error("could not add peer as peer store capacity is reached", zap.Stringer("peer", ID), zap.Int("capacity", pm.maxPeers))
return errors.New("peer store capacity reached")
}
pm.logger.Info("adding peer to peerstore", zap.Stringer("peer", ID)) pm.logger.Info("adding peer to peerstore", zap.Stringer("peer", ID))
if origin == wps.Static { if origin == wps.Static {
pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL) pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL)

View File

@ -51,9 +51,9 @@ type WakuPeerstore interface {
PeersByOrigin(origin Origin) peer.IDSlice PeersByOrigin(origin Origin) peer.IDSlice
SetENR(p peer.ID, enr *enode.Node) error SetENR(p peer.ID, enr *enode.Node) error
ENR(p peer.ID) (*enode.Node, error) ENR(p peer.ID) (*enode.Node, error)
AddConnFailure(p peer.AddrInfo) AddConnFailure(pID peer.ID)
ResetConnFailures(p peer.AddrInfo) ResetConnFailures(pID peer.ID)
ConnFailures(p peer.AddrInfo) int ConnFailures(pID peer.ID) int
SetDirection(p peer.ID, direction network.Direction) error SetDirection(p peer.ID, direction network.Direction) error
Direction(p peer.ID) (network.Direction, error) Direction(p peer.ID) (network.Direction, error)
@ -136,24 +136,24 @@ func (ps *WakuPeerstoreImpl) ENR(p peer.ID) (*enode.Node, error) {
} }
// AddConnFailure increments connectionFailures for a peer // AddConnFailure increments connectionFailures for a peer
func (ps *WakuPeerstoreImpl) AddConnFailure(p peer.AddrInfo) { func (ps *WakuPeerstoreImpl) AddConnFailure(pID peer.ID) {
ps.connFailures.Lock() ps.connFailures.Lock()
defer ps.connFailures.Unlock() defer ps.connFailures.Unlock()
ps.connFailures.failures[p.ID]++ ps.connFailures.failures[pID]++
} }
// ResetConnFailures resets connectionFailures for a peer to 0 // ResetConnFailures resets connectionFailures for a peer to 0
func (ps *WakuPeerstoreImpl) ResetConnFailures(p peer.AddrInfo) { func (ps *WakuPeerstoreImpl) ResetConnFailures(pID peer.ID) {
ps.connFailures.Lock() ps.connFailures.Lock()
defer ps.connFailures.Unlock() defer ps.connFailures.Unlock()
ps.connFailures.failures[p.ID] = 0 ps.connFailures.failures[pID] = 0
} }
// ConnFailures fetches connectionFailures for a peer // ConnFailures fetches connectionFailures for a peer
func (ps *WakuPeerstoreImpl) ConnFailures(p peer.AddrInfo) int { func (ps *WakuPeerstoreImpl) ConnFailures(pID peer.ID) int {
ps.connFailures.RLock() ps.connFailures.RLock()
defer ps.connFailures.RUnlock() defer ps.connFailures.RUnlock()
return ps.connFailures.failures[p.ID] return ps.connFailures.failures[pID]
} }
// SetDirection sets connection direction for a specific peer. // SetDirection sets connection direction for a specific peer.

View File

@ -246,7 +246,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
if err != nil { if err != nil {
wf.metrics.RecordError(dialFailure) wf.metrics.RecordError(dialFailure)
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peer.AddrInfo{ID: peerID}) ps.AddConnFailure(peerID)
} }
return err return err
} }

View File

@ -275,7 +275,7 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge
} else { } else {
wf.metrics.RecordError(dialFailure) wf.metrics.RecordError(dialFailure)
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peer.AddrInfo{ID: peerID}) ps.AddConnFailure(peerID)
} }
} }
logger.Error("opening peer stream", zap.Error(err)) logger.Error("opening peer stream", zap.Error(err))

View File

@ -208,7 +208,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor
logger.Error("creating stream to peer", zap.Error(err)) logger.Error("creating stream to peer", zap.Error(err))
store.metrics.RecordError(dialFailure) store.metrics.RecordError(dialFailure)
if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok { if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer}) ps.AddConnFailure(selectedPeer)
} }
return nil, err return nil, err
} }

View File

@ -198,7 +198,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
logger.Error("creating stream to peer", zap.Error(err)) logger.Error("creating stream to peer", zap.Error(err))
wakuLP.metrics.RecordError(dialFailure) wakuLP.metrics.RecordError(dialFailure)
if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok { if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peer.AddrInfo{ID: peerID}) ps.AddConnFailure(peerID)
} }
return nil, err return nil, err
} }

View File

@ -105,7 +105,7 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*pb.Wak
if err != nil { if err != nil {
logger.Error("creating stream to peer", zap.Error(err)) logger.Error("creating stream to peer", zap.Error(err))
if ps, ok := wakuM.h.Peerstore().(peerstore.WakuPeerstore); ok { if ps, ok := wakuM.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peer.AddrInfo{ID: peerID}) ps.AddConnFailure(peerID)
} }
return nil, err return nil, err
} }

View File

@ -77,7 +77,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1) stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1)
if err != nil { if err != nil {
if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok { if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peer.AddrInfo{ID: params.selectedPeer}) ps.AddConnFailure(params.selectedPeer)
} }
return err return err
} }

View File

@ -459,37 +459,28 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co
defer w.topicsMutex.Unlock() defer w.topicsMutex.Unlock()
for pubSubTopic, cTopics := range pubSubTopicMap { for pubSubTopic, cTopics := range pubSubTopicMap {
cfTemp := waku_proto.NewContentFilter(pubSubTopic, cTopics...)
pubsubUnsubscribe := false pubsubUnsubscribe := false
sub, ok := w.topics[pubSubTopic] topicData, ok := w.topics[pubSubTopic]
if !ok { if !ok {
w.log.Error("not subscribed to topic", zap.String("topic", pubSubTopic)) w.log.Error("not subscribed to topic", zap.String("topic", pubSubTopic))
return errors.New("not subscribed to topic") return errors.New("not subscribed to topic")
} }
topicData, ok := w.topics[pubSubTopic] cfTemp := waku_proto.NewContentFilter(pubSubTopic, cTopics...)
if ok { //Remove relevant subscription
//Remove relevant subscription for subID, sub := range topicData.contentSubs {
for subID, sub := range topicData.contentSubs { if sub.contentFilter.Equals(cfTemp) {
if sub.contentFilter.Equals(cfTemp) { sub.Unsubscribe()
sub.Unsubscribe() delete(topicData.contentSubs, subID)
delete(topicData.contentSubs, subID)
}
} }
}
if len(topicData.contentSubs) == 0 { if len(topicData.contentSubs) == 0 {
pubsubUnsubscribe = true pubsubUnsubscribe = true
}
} else {
//Should not land here ideally
w.log.Error("pubsub subscriptions exists, but contentSubscription doesn't for contentFilter",
zap.String("pubsubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics))
return errors.New("unexpected error in unsubscribe")
} }
if pubsubUnsubscribe { if pubsubUnsubscribe {
err = w.unsubscribeFromPubsubTopic(sub) err = w.unsubscribeFromPubsubTopic(topicData)
if err != nil { if err != nil {
return err return err
} }
@ -502,6 +493,9 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co
// unsubscribeFromPubsubTopic unsubscribes subscription from underlying pubsub. // unsubscribeFromPubsubTopic unsubscribes subscription from underlying pubsub.
// Note: caller has to acquire topicsMutex in order to avoid race conditions // Note: caller has to acquire topicsMutex in order to avoid race conditions
func (w *WakuRelay) unsubscribeFromPubsubTopic(topicData *pubsubTopicSubscriptionDetails) error { func (w *WakuRelay) unsubscribeFromPubsubTopic(topicData *pubsubTopicSubscriptionDetails) error {
if topicData.subscription == nil {
return nil
}
pubSubTopic := topicData.subscription.Topic() pubSubTopic := topicData.subscription.Topic()
w.log.Info("unsubscribing from pubsubTopic", zap.String("topic", pubSubTopic)) w.log.Info("unsubscribing from pubsubTopic", zap.String("topic", pubSubTopic))

View File

@ -254,7 +254,7 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe
if err != nil { if err != nil {
logger.Error("creating stream to peer", zap.Error(err)) logger.Error("creating stream to peer", zap.Error(err))
if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok { if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer}) ps.AddConnFailure(selectedPeer)
} }
return nil, err return nil, err
} }

3
vendor/modules.txt vendored
View File

@ -1010,11 +1010,12 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20240819221706-d3b51130599d # github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5
## explicit; go 1.21 ## explicit; go 1.21
github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests github.com/waku-org/go-waku/tests
github.com/waku-org/go-waku/waku/persistence github.com/waku-org/go-waku/waku/persistence
github.com/waku-org/go-waku/waku/v2/api/common
github.com/waku-org/go-waku/waku/v2/api/filter github.com/waku-org/go-waku/waku/v2/api/filter
github.com/waku-org/go-waku/waku/v2/api/missing github.com/waku-org/go-waku/waku/v2/api/missing
github.com/waku-org/go-waku/waku/v2/api/publish github.com/waku-org/go-waku/waku/v2/api/publish

View File

@ -1850,7 +1850,7 @@ func FormatPeerConnFailures(wakuNode *node.WakuNode) map[string]int {
p := make(map[string]int) p := make(map[string]int)
for _, peerID := range wakuNode.Host().Network().Peers() { for _, peerID := range wakuNode.Host().Network().Peers() {
peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID) peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID)
connFailures := wakuNode.Host().Peerstore().(wps.WakuPeerstore).ConnFailures(peerInfo) connFailures := wakuNode.Host().Peerstore().(wps.WakuPeerstore).ConnFailures(peerInfo.ID)
if connFailures > 0 { if connFailures > 0 {
p[peerID.String()] = connFailures p[peerID.String()] = connFailures
} }