feat: add option to specify preferred peers for filter

This commit is contained in:
Prem Chaitanya Prathi 2024-10-24 15:34:07 +05:30
parent c78b09d4ca
commit d81465eb1d
4 changed files with 44 additions and 5 deletions

View File

@ -52,6 +52,7 @@ type Sub struct {
type subscribeParameters struct { type subscribeParameters struct {
batchInterval time.Duration batchInterval time.Duration
multiplexChannelBuffer int multiplexChannelBuffer int
preferredPeers peer.IDSlice
} }
type SubscribeOptions func(*subscribeParameters) type SubscribeOptions func(*subscribeParameters)
@ -75,6 +76,12 @@ func defaultOptions() []SubscribeOptions {
} }
} }
func WithPreferredServiceNodes(peers peer.IDSlice) SubscribeOptions {
return func(params *subscribeParameters) {
params.preferredPeers = peers
}
}
// Subscribe // Subscribe
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) { func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) {
sub := new(Sub) sub := new(Sub)
@ -197,7 +204,16 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
options := make([]filter.FilterSubscribeOption, 0) options := make([]filter.FilterSubscribeOption, 0)
options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount))) options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount)))
for _, p := range apiSub.Config.Peers { for _, p := range apiSub.Config.Peers {
options = append(options, filter.WithPeer(p)) isExcludedPeer := false
for _, px := range peersToExclude { // configured peer can be excluded if sub fails with it.
if p == px {
isExcludedPeer = true
break
}
}
if !isExcludedPeer {
options = append(options, filter.WithPeer(p))
}
} }
if len(peersToExclude) > 0 { if len(peersToExclude) > 0 {
apiSub.log.Debug("subscribing with peers to exclude", zap.Stringers("excluded-peers", peersToExclude)) apiSub.log.Debug("subscribing with peers to exclude", zap.Stringers("excluded-peers", peersToExclude))

View File

@ -2,10 +2,12 @@ package filter
import ( import (
"context" "context"
"math/rand"
"sync" "sync"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
@ -61,7 +63,8 @@ type EnevelopeProcessor interface {
OnNewEnvelope(env *protocol.Envelope) error OnNewEnvelope(env *protocol.Envelope) error
} }
func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager { func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int,
envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
// This fn is being mocked in test // This fn is being mocked in test
mgr := new(FilterManager) mgr := new(FilterManager)
mgr.ctx = ctx mgr.ctx = ctx
@ -162,6 +165,12 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
defer utils.LogOnPanic() defer utils.LogOnPanic()
ctx, cancel := context.WithCancel(mgr.ctx) ctx, cancel := context.WithCancel(mgr.ctx)
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter} config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
if len(mgr.params.preferredPeers) > 0 {
//use one peer which is from preferred peers.
randomIndex := rand.Intn(len(mgr.params.preferredPeers) - 1)
randomPreferredPeer := mgr.params.preferredPeers[randomIndex]
config.Peers = []peer.ID{randomPreferredPeer}
}
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params) sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
mgr.Lock() mgr.Lock()
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub} mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}

View File

@ -54,7 +54,7 @@ func (s *FilterApiTestSuite) TestSubscribe() {
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic) s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
s.Log.Info("About to perform API Subscribe()") s.Log.Info("About to perform API Subscribe()")
params := subscribeParameters{300 * time.Second, 1024} params := subscribeParameters{300 * time.Second, 1024, nil}
apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log, &params) apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log, &params)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Equal(apiSub.ContentFilter, contentFilter) s.Require().Equal(apiSub.ContentFilter, contentFilter)

View File

@ -333,11 +333,20 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID) params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID)
} }
reqPeerCount := params.maxPeers - len(params.selectedPeers) reqPeerCount := params.maxPeers - len(params.selectedPeers)
for _, p := range params.selectedPeers {
if params.peersToExclude == nil {
params.peersToExclude = make(peermanager.PeerSet)
}
//exclude peers that are preferredpeers so that they don't get selected again.
if _, ok := params.peersToExclude[p]; !ok {
params.peersToExclude[p] = struct{}{}
}
}
if params.pm != nil && reqPeerCount > 0 { if params.pm != nil && reqPeerCount > 0 {
wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude))) wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude)))
params.selectedPeers, err = wf.pm.SelectPeers( selectedPeers, err := wf.pm.SelectPeers(
peermanager.PeerSelectionCriteria{ peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType, SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1, Proto: FilterSubscribeID_v20beta1,
@ -350,7 +359,12 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
) )
if err != nil { if err != nil {
wf.log.Error("peer selection returned err", zap.Error(err)) wf.log.Error("peer selection returned err", zap.Error(err))
return nil, nil, err if len(params.selectedPeers) == 0 {
return nil, nil, err
}
}
if len(selectedPeers) > 0 {
params.selectedPeers = append(params.selectedPeers, selectedPeers...)
} }
} }
wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers))) wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers)))