feat_: bad peer removal for lp and filter and fixes for filter

This commit is contained in:
Prem Chaitanya Prathi 2024-11-05 23:21:14 +05:30
parent b329b158c8
commit 801db12706
No known key found for this signature in database
9 changed files with 74 additions and 10 deletions

2
go.mod
View File

@ -95,7 +95,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0 github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2 github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20241004054019-0ed94ce0b1cb github.com/waku-org/go-waku v0.8.1-0.20241105174537-213d38225bd9
github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1

4
go.sum
View File

@ -2140,8 +2140,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27
github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE= github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20241004054019-0ed94ce0b1cb h1:E3J49PH9iXpjaOOI/VrEX/VhSk3obKjxVehGEDzZgXI= github.com/waku-org/go-waku v0.8.1-0.20241105174537-213d38225bd9 h1:c/h9xHPKPUz8tHj8BUuy261NbdS0IA8nDnhwz+qR5EI=
github.com/waku-org/go-waku v0.8.1-0.20241004054019-0ed94ce0b1cb/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74= github.com/waku-org/go-waku v0.8.1-0.20241105174537-213d38225bd9/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=

View File

@ -52,6 +52,7 @@ type Sub struct {
type subscribeParameters struct { type subscribeParameters struct {
batchInterval time.Duration batchInterval time.Duration
multiplexChannelBuffer int multiplexChannelBuffer int
preferredPeers peer.IDSlice
} }
type SubscribeOptions func(*subscribeParameters) type SubscribeOptions func(*subscribeParameters)
@ -75,6 +76,12 @@ func defaultOptions() []SubscribeOptions {
} }
} }
func WithPreferredServiceNodes(peers peer.IDSlice) SubscribeOptions {
return func(params *subscribeParameters) {
params.preferredPeers = peers
}
}
// Subscribe // Subscribe
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) { func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) {
sub := new(Sub) sub := new(Sub)
@ -197,7 +204,16 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
options := make([]filter.FilterSubscribeOption, 0) options := make([]filter.FilterSubscribeOption, 0)
options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount))) options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount)))
for _, p := range apiSub.Config.Peers { for _, p := range apiSub.Config.Peers {
options = append(options, filter.WithPeer(p)) isExcludedPeer := false
for _, px := range peersToExclude { // configured peer can be excluded if sub fails with it.
if p == px {
isExcludedPeer = true
break
}
}
if !isExcludedPeer {
options = append(options, filter.WithPeer(p))
}
} }
if len(peersToExclude) > 0 { if len(peersToExclude) > 0 {
apiSub.log.Debug("subscribing with peers to exclude", zap.Stringers("excluded-peers", peersToExclude)) apiSub.log.Debug("subscribing with peers to exclude", zap.Stringers("excluded-peers", peersToExclude))

View File

@ -2,10 +2,12 @@ package filter
import ( import (
"context" "context"
"math/rand"
"sync" "sync"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
@ -61,7 +63,8 @@ type EnevelopeProcessor interface {
OnNewEnvelope(env *protocol.Envelope) error OnNewEnvelope(env *protocol.Envelope) error
} }
func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager { func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int,
envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
// This fn is being mocked in test // This fn is being mocked in test
mgr := new(FilterManager) mgr := new(FilterManager)
mgr.ctx = ctx mgr.ctx = ctx
@ -162,6 +165,12 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
defer utils.LogOnPanic() defer utils.LogOnPanic()
ctx, cancel := context.WithCancel(mgr.ctx) ctx, cancel := context.WithCancel(mgr.ctx)
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter} config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
if len(mgr.params.preferredPeers) > 0 {
//use one peer which is from preferred peers.
randomIndex := rand.Intn(len(mgr.params.preferredPeers) - 1)
randomPreferredPeer := mgr.params.preferredPeers[randomIndex]
config.Peers = []peer.ID{randomPreferredPeer}
}
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params) sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
mgr.Lock() mgr.Lock()
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub} mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
@ -188,6 +197,7 @@ func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus), mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs))) zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online
mgr.onlineChecker.SetOnline(newStatus)
mgr.NetworkChange() mgr.NetworkChange()
mgr.logger.Debug("switching from offline to online") mgr.logger.Debug("switching from offline to online")
mgr.Lock() mgr.Lock()
@ -230,6 +240,7 @@ func (mgr *FilterManager) UnsubscribeFilter(filterID string) {
} }
if len(af.sub.ContentFilter.ContentTopics) == 0 { if len(af.sub.ContentFilter.ContentTopics) == 0 {
af.cancel() af.cancel()
delete(mgr.filterSubscriptions, filterConfig.ID)
} else { } else {
go af.sub.Unsubscribe(filterConfig.contentFilter) go af.sub.Unsubscribe(filterConfig.contentFilter)
} }

View File

@ -101,7 +101,8 @@ const (
const maxFailedAttempts = 5 const maxFailedAttempts = 5
const prunePeerStoreInterval = 10 * time.Minute const prunePeerStoreInterval = 10 * time.Minute
const peerConnectivityLoopSecs = 15 const peerConnectivityLoopSecs = 15
const maxConnsToPeerRatio = 5 const maxConnsToPeerRatio = 3
const maxDialFailures = 2
// 80% relay peers 20% service peers // 80% relay peers 20% service peers
func relayAndServicePeers(maxConnections int) (int, int) { func relayAndServicePeers(maxConnections int) (int, int) {
@ -256,6 +257,17 @@ func (pm *PeerManager) Start(ctx context.Context) {
} }
} }
func (pm *PeerManager) CheckAndRemoveBadPeer(peerID peer.ID) {
if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures &&
pm.peerConnector.onlineChecker.IsOnline() {
if origin, _ := pm.host.Peerstore().(wps.WakuPeerstore).Origin(peerID); origin != wps.Static { // delete only if a peer is discovered and not configured statically.
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
}
}
func (pm *PeerManager) peerStoreLoop(ctx context.Context) { func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
defer utils.LogOnPanic() defer utils.LogOnPanic()
t := time.NewTicker(prunePeerStoreInterval) t := time.NewTicker(prunePeerStoreInterval)
@ -731,6 +743,7 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
if err == nil || errors.Is(err, context.Canceled) { if err == nil || errors.Is(err, context.Canceled) {
return return
} }
if pm.peerConnector != nil { if pm.peerConnector != nil {
pm.peerConnector.addConnectionBackoff(peerID) pm.peerConnector.addConnectionBackoff(peerID)
} }

View File

@ -15,6 +15,7 @@ import (
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-msgio/pbio" "github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/logging"
@ -249,6 +250,10 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
wf.metrics.RecordError(dialFailure) wf.metrics.RecordError(dialFailure)
if wf.pm != nil { if wf.pm != nil {
wf.pm.HandleDialError(err, peerID) wf.pm.HandleDialError(err, peerID)
if errors.Is(err, swarm.ErrAllDialsFailed) ||
errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) {
wf.pm.CheckAndRemoveBadPeer(peerID)
}
} }
return err return err
} }
@ -333,11 +338,20 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID) params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID)
} }
reqPeerCount := params.maxPeers - len(params.selectedPeers) reqPeerCount := params.maxPeers - len(params.selectedPeers)
for _, p := range params.selectedPeers {
if params.peersToExclude == nil {
params.peersToExclude = make(peermanager.PeerSet)
}
//exclude peers that are preferredpeers so that they don't get selected again.
if _, ok := params.peersToExclude[p]; !ok {
params.peersToExclude[p] = struct{}{}
}
}
if params.pm != nil && reqPeerCount > 0 { if params.pm != nil && reqPeerCount > 0 {
wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude))) wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude)))
params.selectedPeers, err = wf.pm.SelectPeers( selectedPeers, err := wf.pm.SelectPeers(
peermanager.PeerSelectionCriteria{ peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType, SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1, Proto: FilterSubscribeID_v20beta1,
@ -350,7 +364,12 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
) )
if err != nil { if err != nil {
wf.log.Error("peer selection returned err", zap.Error(err)) wf.log.Error("peer selection returned err", zap.Error(err))
return nil, nil, err if len(params.selectedPeers) == 0 {
return nil, nil, err
}
}
if len(selectedPeers) > 0 {
params.selectedPeers = append(params.selectedPeers, selectedPeers...)
} }
} }
wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers))) wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers)))

View File

@ -24,7 +24,7 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout) ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout)
defer cancel() defer cancel()
err := wf.Ping(ctxWithTimeout, peer) err := wf.Ping(ctxWithTimeout, peer)
if err != nil { if err != nil && wf.onlineChecker.IsOnline() {
wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err)) wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err))
//quickly retry ping again before marking subscription as failure //quickly retry ping again before marking subscription as failure
//Note that PingTimeout is a fraction of PingInterval so this shouldn't cause parallel pings being sent. //Note that PingTimeout is a fraction of PingInterval so this shouldn't cause parallel pings being sent.

View File

@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-msgio/pbio" "github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/logging"
@ -198,6 +199,10 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
wakuLP.metrics.RecordError(dialFailure) wakuLP.metrics.RecordError(dialFailure)
if wakuLP.pm != nil { if wakuLP.pm != nil {
wakuLP.pm.HandleDialError(err, peerID) wakuLP.pm.HandleDialError(err, peerID)
if errors.Is(err, swarm.ErrAllDialsFailed) ||
errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) {
wakuLP.pm.CheckAndRemoveBadPeer(peerID)
}
} }
return nil, err return nil, err
} }

2
vendor/modules.txt vendored
View File

@ -1031,7 +1031,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20241004054019-0ed94ce0b1cb # github.com/waku-org/go-waku v0.8.1-0.20241105174537-213d38225bd9
## explicit; go 1.21 ## explicit; go 1.21
github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests github.com/waku-org/go-waku/tests