mirror of https://github.com/status-im/go-waku.git
fix: use bad peer removal logic only for lightpush and filter
This commit is contained in:
parent
d81465eb1d
commit
b646eff372
|
@ -197,6 +197,7 @@ func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus
|
|||
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
|
||||
zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
|
||||
if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online
|
||||
mgr.onlineChecker.SetOnline(newStatus)
|
||||
mgr.NetworkChange()
|
||||
mgr.logger.Debug("switching from offline to online")
|
||||
mgr.Lock()
|
||||
|
|
|
@ -102,7 +102,6 @@ const maxFailedAttempts = 5
|
|||
const prunePeerStoreInterval = 10 * time.Minute
|
||||
const peerConnectivityLoopSecs = 15
|
||||
const maxConnsToPeerRatio = 3
|
||||
const badPeersCleanupInterval = 1 * time.Minute
|
||||
const maxDialFailures = 2
|
||||
|
||||
// 80% relay peers 20% service peers
|
||||
|
@ -258,14 +257,13 @@ func (pm *PeerManager) Start(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
func (pm *PeerManager) removeBadPeers() {
|
||||
if !pm.RelayEnabled {
|
||||
for _, peerID := range pm.host.Peerstore().Peers() {
|
||||
if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures {
|
||||
//delete peer from peerStore
|
||||
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
|
||||
pm.RemovePeer(peerID)
|
||||
}
|
||||
func (pm *PeerManager) CheckAndRemoveBadPeer(peerID peer.ID) {
|
||||
if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures &&
|
||||
pm.peerConnector.onlineChecker.IsOnline() {
|
||||
if origin, _ := pm.host.Peerstore().(wps.WakuPeerstore).Origin(peerID); origin != wps.Static { // delete only if a peer is discovered and not configured statically.
|
||||
//delete peer from peerStore
|
||||
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
|
||||
pm.RemovePeer(peerID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -273,17 +271,13 @@ func (pm *PeerManager) removeBadPeers() {
|
|||
func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
|
||||
defer utils.LogOnPanic()
|
||||
t := time.NewTicker(prunePeerStoreInterval)
|
||||
t1 := time.NewTicker(badPeersCleanupInterval)
|
||||
defer t.Stop()
|
||||
defer t1.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
pm.prunePeerStore()
|
||||
case <-t1.C:
|
||||
pm.removeBadPeers()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -749,6 +743,7 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
|
|||
if err == nil || errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
|
||||
if pm.peerConnector != nil {
|
||||
pm.peerConnector.addConnectionBackoff(peerID)
|
||||
}
|
||||
|
@ -762,9 +757,4 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
|
|||
pm.logger.Error("failed to emit DialError", zap.Error(emitterErr))
|
||||
}
|
||||
}
|
||||
if !pm.RelayEnabled && pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) >= maxDialFailures {
|
||||
//delete peer from peerStore
|
||||
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
|
||||
pm.RemovePeer(peerID)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
|
||||
"github.com/libp2p/go-libp2p/p2p/net/swarm"
|
||||
"github.com/libp2p/go-msgio/pbio"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
|
@ -249,6 +250,10 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
|
|||
wf.metrics.RecordError(dialFailure)
|
||||
if wf.pm != nil {
|
||||
wf.pm.HandleDialError(err, peerID)
|
||||
if errors.Is(err, swarm.ErrAllDialsFailed) ||
|
||||
errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) {
|
||||
wf.pm.CheckAndRemoveBadPeer(peerID)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
|
|||
ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout)
|
||||
defer cancel()
|
||||
err := wf.Ping(ctxWithTimeout, peer)
|
||||
if err != nil {
|
||||
if err != nil && wf.onlineChecker.IsOnline() {
|
||||
wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err))
|
||||
//quickly retry ping again before marking subscription as failure
|
||||
//Note that PingTimeout is a fraction of PingInterval so this shouldn't cause parallel pings being sent.
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
|
||||
"github.com/libp2p/go-libp2p/p2p/net/swarm"
|
||||
"github.com/libp2p/go-msgio/pbio"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
|
@ -198,6 +199,10 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
|
|||
wakuLP.metrics.RecordError(dialFailure)
|
||||
if wakuLP.pm != nil {
|
||||
wakuLP.pm.HandleDialError(err, peerID)
|
||||
if errors.Is(err, swarm.ErrAllDialsFailed) ||
|
||||
errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) {
|
||||
wakuLP.pm.CheckAndRemoveBadPeer(peerID)
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue