feat: use automatic peer selection for filter. (#4531)
* feat: use automatic peer selection for filter. * fix: remove sucess peers too. * chore: remove filter manager state of peer candidates
This commit is contained in:
parent
250b8eea87
commit
0c474bb42c
|
@ -2,9 +2,6 @@ package wakuv2
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/rand"
|
|
||||||
"errors"
|
|
||||||
"math/big"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -19,7 +16,6 @@ import (
|
||||||
node "github.com/waku-org/go-waku/waku/v2/node"
|
node "github.com/waku-org/go-waku/waku/v2/node"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
|
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -63,7 +59,6 @@ type FilterManager struct {
|
||||||
isFilterSubAlive func(sub *subscription.SubscriptionDetails) error
|
isFilterSubAlive func(sub *subscription.SubscriptionDetails) error
|
||||||
getFilter func(string) *common.Filter
|
getFilter func(string) *common.Filter
|
||||||
onNewEnvelopes func(env *protocol.Envelope) error
|
onNewEnvelopes func(env *protocol.Envelope) error
|
||||||
peers []peer.ID
|
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
settings settings
|
settings settings
|
||||||
node *node.WakuNode
|
node *node.WakuNode
|
||||||
|
@ -78,7 +73,6 @@ func newFilterManager(ctx context.Context, logger *zap.Logger, getFilterFn func(
|
||||||
mgr.onNewEnvelopes = onNewEnvelopes
|
mgr.onNewEnvelopes = onNewEnvelopes
|
||||||
mgr.filterSubs = make(FilterSubs)
|
mgr.filterSubs = make(FilterSubs)
|
||||||
mgr.eventChan = make(chan FilterEvent, 100)
|
mgr.eventChan = make(chan FilterEvent, 100)
|
||||||
mgr.peers = make([]peer.ID, 0)
|
|
||||||
mgr.settings = settings
|
mgr.settings = settings
|
||||||
mgr.node = node
|
mgr.node = node
|
||||||
mgr.isFilterSubAlive = func(sub *subscription.SubscriptionDetails) error {
|
mgr.isFilterSubAlive = func(sub *subscription.SubscriptionDetails) error {
|
||||||
|
@ -96,15 +90,11 @@ func (mgr *FilterManager) runFilterLoop(wg *sync.WaitGroup) {
|
||||||
ticker := time.NewTicker(5 * time.Second)
|
ticker := time.NewTicker(5 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
// Populate filter peers initially
|
|
||||||
mgr.peers = mgr.findFilterPeers() // ordered list of peers to select from
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-mgr.ctx.Done():
|
case <-mgr.ctx.Done():
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
mgr.peers = mgr.findFilterPeers()
|
|
||||||
mgr.pingPeers()
|
mgr.pingPeers()
|
||||||
case ev := <-mgr.eventChan:
|
case ev := <-mgr.eventChan:
|
||||||
mgr.processEvents(&ev)
|
mgr.processEvents(&ev)
|
||||||
|
@ -141,13 +131,7 @@ func (mgr *FilterManager) processEvents(ev *FilterEvent) {
|
||||||
mgr.resubscribe(ev.filterID)
|
mgr.resubscribe(ev.filterID)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// Remove peer from list
|
|
||||||
for i, p := range mgr.peers {
|
|
||||||
if ev.peerID == p {
|
|
||||||
mgr.peers = append(mgr.peers[:i], mgr.peers[i+1:]...)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Delete subs for removed peer
|
// Delete subs for removed peer
|
||||||
for filterID, subs := range mgr.filterSubs {
|
for filterID, subs := range mgr.filterSubs {
|
||||||
for _, sub := range subs {
|
for _, sub := range subs {
|
||||||
|
@ -200,7 +184,7 @@ func (mgr *FilterManager) processEvents(ev *FilterEvent) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mgr *FilterManager) subscribeToFilter(filterID string, peer peer.ID, tempID string) {
|
func (mgr *FilterManager) subscribeToFilter(filterID string, tempID string) {
|
||||||
|
|
||||||
logger := mgr.logger.With(zap.String("filterId", filterID))
|
logger := mgr.logger.With(zap.String("filterId", filterID))
|
||||||
f := mgr.getFilter(filterID)
|
f := mgr.getFilter(filterID)
|
||||||
|
@ -210,17 +194,17 @@ func (mgr *FilterManager) subscribeToFilter(filterID string, peer peer.ID, tempI
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
contentFilter := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics)
|
contentFilter := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics)
|
||||||
logger.Debug("filter subscribe to filter node", zap.Stringer("peer", peer), zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList()))
|
logger.Debug("filter subscribe to filter node", zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList()))
|
||||||
ctx, cancel := context.WithTimeout(mgr.ctx, requestTimeout)
|
ctx, cancel := context.WithTimeout(mgr.ctx, requestTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
subDetails, err := mgr.node.FilterLightnode().Subscribe(ctx, contentFilter, filter.WithPeer(peer))
|
subDetails, err := mgr.node.FilterLightnode().Subscribe(ctx, contentFilter, filter.WithAutomaticPeerSelection())
|
||||||
var sub *subscription.SubscriptionDetails
|
var sub *subscription.SubscriptionDetails
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warn("filter could not add wakuv2 filter for peer", zap.Stringer("peer", peer), zap.Error(err))
|
logger.Warn("filter could not add wakuv2 filter for peers", zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
logger.Debug("filter subscription success", zap.Stringer("peer", peer), zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList()))
|
|
||||||
sub = subDetails[0]
|
sub = subDetails[0]
|
||||||
|
logger.Debug("filter subscription success", zap.Stringer("peer", sub.PeerID), zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList()))
|
||||||
}
|
}
|
||||||
|
|
||||||
success := err == nil
|
success := err == nil
|
||||||
|
@ -299,35 +283,6 @@ func (mgr *FilterManager) buildContentFilter(pubsubTopic string, contentTopicSet
|
||||||
return protocol.NewContentFilter(pubsubTopic, contentTopics...)
|
return protocol.NewContentFilter(pubsubTopic, contentTopics...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Find suitable peer(s)
|
|
||||||
func (mgr *FilterManager) findFilterPeers() []peer.ID {
|
|
||||||
allPeers := mgr.node.Host().Peerstore().Peers()
|
|
||||||
|
|
||||||
peers := make([]peer.ID, 0)
|
|
||||||
for _, peer := range allPeers {
|
|
||||||
protocols, err := mgr.node.Host().Peerstore().SupportsProtocols(peer, filter.FilterSubscribeID_v20beta1, relay.WakuRelayID_v200)
|
|
||||||
if err != nil {
|
|
||||||
mgr.logger.Debug("SupportsProtocols error", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(protocols) == 2 {
|
|
||||||
peers = append(peers, peer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
mgr.logger.Debug("Filtered peers", zap.Int("cnt", len(peers)))
|
|
||||||
return peers
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mgr *FilterManager) findPeerCandidate() (peer.ID, error) {
|
|
||||||
if len(mgr.peers) == 0 {
|
|
||||||
return "", errors.New("filter could not select a suitable peer")
|
|
||||||
}
|
|
||||||
n, _ := rand.Int(rand.Reader, big.NewInt(int64(len(mgr.peers))))
|
|
||||||
return mgr.peers[n.Int64()], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mgr *FilterManager) resubscribe(filterID string) {
|
func (mgr *FilterManager) resubscribe(filterID string) {
|
||||||
subs, found := mgr.filterSubs[filterID]
|
subs, found := mgr.filterSubs[filterID]
|
||||||
if !found {
|
if !found {
|
||||||
|
@ -344,16 +299,11 @@ func (mgr *FilterManager) resubscribe(filterID string) {
|
||||||
mgr.logger.Debug("filter resubscribe subs count:", zap.String("filterId", filterID), zap.Int("len", len(subs)))
|
mgr.logger.Debug("filter resubscribe subs count:", zap.String("filterId", filterID), zap.Int("len", len(subs)))
|
||||||
for i := len(subs); i < mgr.settings.MinPeersForFilter; i++ {
|
for i := len(subs); i < mgr.settings.MinPeersForFilter; i++ {
|
||||||
mgr.logger.Debug("filter check not passed, try subscribing to peers", zap.String("filterId", filterID))
|
mgr.logger.Debug("filter check not passed, try subscribing to peers", zap.String("filterId", filterID))
|
||||||
peer, err := mgr.findPeerCandidate()
|
|
||||||
|
|
||||||
if err == nil {
|
// Create sub placeholder in order to avoid potentially too many subs
|
||||||
// Create sub placeholder in order to avoid potentially too many subs
|
tempID := uuid.NewString()
|
||||||
tempID := uuid.NewString()
|
subs[tempID] = nil
|
||||||
subs[tempID] = nil
|
go mgr.subscribeToFilter(filterID, tempID)
|
||||||
go mgr.subscribeToFilter(filterID, peer, tempID)
|
|
||||||
} else {
|
|
||||||
mgr.logger.Error("filter resubscribe findPeer error", zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue