feat: optimize filter ping and improve filter resubscription (#1102)

Prem Chaitanya Prathi authored on 2024-05-22 11:45:53 +05:30, committed by GitHub
parent 5ceb61766c
commit 8115ec7013
17 changed files with 300 additions and 236 deletions

View File

@ -9,7 +9,6 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/waku-org/go-waku/waku/v2/protocol"
"io"
"math"
"math/big"
@ -22,6 +21,8 @@ import (
"time"
"unicode/utf8"
"github.com/waku-org/go-waku/waku/v2/protocol"
gcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
@ -48,6 +49,21 @@ func GetHostAddress(ha host.Host) multiaddr.Multiaddr {
return ha.Addrs()[0]
}
// GetAddr returns the full multiaddr of the host with its peer ID appended
func GetAddr(h host.Host) multiaddr.Multiaddr {
id, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", h.ID().String()))
var selectedAddr multiaddr.Multiaddr
// For now, skip circuit relay addresses, as libp2p seems to return empty p2p-circuit addresses.
for _, addr := range h.Network().ListenAddresses() {
if strings.Contains(addr.String(), "p2p-circuit") {
continue
}
selectedAddr = addr
break
}
return selectedAddr.Encapsulate(id)
}
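The helper above mirrors the getAddr function removed from the peermanager tests further down, with one change: it stops at the first non-circuit listen address. A minimal, hedged sketch of what it produces; the throwaway libp2p host created here is purely for illustration and is not part of the commit:

package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p"

	"github.com/waku-org/go-waku/tests"
)

func main() {
	// Create a throwaway host with libp2p defaults so it has listen addresses.
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// GetAddr picks the first non-p2p-circuit listen address and appends /p2p/<peer-id>,
	// yielding a dialable multiaddr, e.g. /ip4/0.0.0.0/tcp/51234/p2p/12D3KooW...
	fmt.Println(tests.GetAddr(h))
}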
// FindFreePort returns an available port number
func FindFreePort(t *testing.T, host string, maxAttempts int) (int, error) {
t.Helper()

View File

@ -3,7 +3,6 @@ package api
import (
"context"
"encoding/json"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/peer"
@ -11,7 +10,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
const FilterPingTimeout = 5 * time.Second
@ -39,6 +37,7 @@ type Sub struct {
ctx context.Context
cancel context.CancelFunc
log *zap.Logger
closing chan string
}
// Subscribe
@ -53,37 +52,40 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
sub.log = log.Named("filter-api")
sub.log.Debug("filter subscribe params", zap.Int("maxPeers", config.MaxPeers), zap.Stringer("contentFilter", contentFilter))
subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers)
sub.closing = make(chan string, config.MaxPeers)
if err != nil {
return nil, err
}
sub.multiplex(subs)
go sub.healthCheckLoop()
go sub.waitOnSubClose()
return sub, nil
}
func (apiSub *Sub) Unsubscribe() {
apiSub.cancel()
}
func (apiSub *Sub) healthCheckLoop() {
// Health checks
ticker := time.NewTicker(FilterPingTimeout)
defer ticker.Stop()
func (apiSub *Sub) waitOnSubClose() {
for {
select {
case <-apiSub.ctx.Done():
apiSub.log.Debug("healthCheckLoop: Done()")
apiSub.log.Debug("apiSub context: Done()")
apiSub.cleanup()
return
case <-ticker.C:
apiSub.log.Debug("healthCheckLoop: checkAliveness()")
topicCounts := apiSub.getTopicCounts()
apiSub.resubscribe(topicCounts)
case subId := <-apiSub.closing:
// Trigger the closing-and-resubscribe flow for this subscription.
apiSub.closeAndResubscribe(subId)
}
}
}
func (apiSub *Sub) closeAndResubscribe(subId string) {
apiSub.log.Debug("sub closeAndResubscribe", zap.String("subID", subId))
apiSub.subs[subId].Close()
failedPeer := apiSub.subs[subId].PeerID
delete(apiSub.subs, subId)
apiSub.resubscribe(failedPeer)
}
func (apiSub *Sub) cleanup() {
@ -93,6 +95,7 @@ func (apiSub *Sub) cleanup() {
}()
for _, s := range apiSub.subs {
close(s.Closing)
_, err := apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s)
if err != nil {
//Logging with info as this is part of cleanup
@ -103,102 +106,37 @@ func (apiSub *Sub) cleanup() {
}
// Returns active sub counts for each pubsub topic
func (apiSub *Sub) getTopicCounts() map[string]int {
// Buffered chan for sub aliveness results
type CheckResult struct {
sub *subscription.SubscriptionDetails
alive bool
}
checkResults := make(chan CheckResult, len(apiSub.subs))
var wg sync.WaitGroup
// Run pings asynchronously
for _, s := range apiSub.subs {
wg.Add(1)
go func(sub *subscription.SubscriptionDetails) {
defer wg.Done()
ctx, cancelFunc := context.WithTimeout(apiSub.ctx, FilterPingTimeout)
defer cancelFunc()
err := apiSub.wf.IsSubscriptionAlive(ctx, sub)
apiSub.log.Debug("Check result:", zap.Any("subID", sub.ID), zap.Bool("result", err == nil))
checkResults <- CheckResult{sub, err == nil}
}(s)
}
// Collect healthy topic counts
topicCounts := make(map[string]int)
topicMap, _ := protocol.ContentFilterToPubSubTopicMap(apiSub.ContentFilter)
for _, t := range maps.Keys(topicMap) {
topicCounts[t] = 0
}
wg.Wait()
close(checkResults)
for s := range checkResults {
if !s.alive {
// Close inactive subs
s.sub.Close()
delete(apiSub.subs, s.sub.ID)
} else {
topicCounts[s.sub.ContentFilter.PubsubTopic]++
}
}
return topicCounts
}
// Attempts to resubscribe on topics that lack subscriptions
func (apiSub *Sub) resubscribe(topicCounts map[string]int) {
// Delete healthy topics
for t, cnt := range topicCounts {
if cnt == apiSub.Config.MaxPeers {
delete(topicCounts, t)
}
}
if len(topicCounts) == 0 {
// All topics healthy, return
return
}
var wg sync.WaitGroup
func (apiSub *Sub) resubscribe(failedPeer peer.ID) {
// Re-subscribe asynchronously
newSubs := make(chan []*subscription.SubscriptionDetails)
existingSubCount := len(apiSub.subs)
apiSub.log.Debug("subscribing again", zap.Stringer("contentFilter", apiSub.ContentFilter), zap.Int("numPeers", apiSub.Config.MaxPeers-existingSubCount))
var peersToExclude peer.IDSlice
peersToExclude = append(peersToExclude, failedPeer)
for _, sub := range apiSub.subs {
peersToExclude = append(peersToExclude, sub.PeerID)
}
subs, err := apiSub.subscribe(apiSub.ContentFilter, apiSub.Config.MaxPeers-existingSubCount, peersToExclude...)
if err != nil {
return
} // Not handling the scenario where fewer subscriptions than requested are received, as that will get handled in the next cycle.
for t, cnt := range topicCounts {
cFilter := protocol.ContentFilter{PubsubTopic: t, ContentTopics: apiSub.ContentFilter.ContentTopics}
wg.Add(1)
go func(count int) {
defer wg.Done()
subs, err := apiSub.subscribe(cFilter, apiSub.Config.MaxPeers-count)
if err != nil {
return
} //Not handling scenario where all requested subs are not received as that will get handled in next cycle.
newSubs <- subs
}(cnt)
}
wg.Wait()
close(newSubs)
apiSub.log.Debug("resubscribe(): before range newSubs")
for subs := range newSubs {
if subs != nil {
apiSub.multiplex(subs)
}
}
apiSub.log.Debug("checkAliveness(): close(newSubs)")
//close(newSubs)
apiSub.multiplex(subs)
}
func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int) ([]*subscription.SubscriptionDetails, error) {
func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) {
// Low-level subscribe, returns a set of SubscriptionDetails
options := make([]filter.FilterSubscribeOption, 0)
options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount)))
for _, p := range apiSub.Config.Peers {
options = append(options, filter.WithPeer(p))
}
if len(peersToExclude) > 0 {
apiSub.log.Debug("subscribing with peersToExclude", zap.Stringer("peersToExclude", peersToExclude[0]))
options = append(options, filter.WithPeersToExclude(peersToExclude...))
}
subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...)
if err != nil {
@ -206,7 +144,7 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
// Partial Failure, for now proceed as we don't expect this to happen wrt specific topics.
// Rather it can happen in case subscription with one of the peer fails.
// This can further get automatically handled at resubscribe,
apiSub.log.Error("partial failure in Filter subscribe", zap.Error(err))
apiSub.log.Error("partial failure in Filter subscribe", zap.Error(err), zap.Int("successCount", len(subs)))
return subs, nil
}
// In case of complete subscription failure, application or user needs to handle and probably retry based on error
@ -229,5 +167,11 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
apiSub.DataCh <- env
}
}(subDetails)
go func(subDetails *subscription.SubscriptionDetails) {
<-subDetails.Closing
apiSub.log.Debug("sub closing", zap.String("subID", subDetails.ID))
apiSub.closing <- subDetails.ID
}(subDetails)
}
}
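Taken together, the changes in this file replace the periodic aliveness sweep with a push-style flow: each SubscriptionDetails now carries a Closing channel, multiplex watches it, and waitOnSubClose resubscribes with the failed peer excluded. Below is a minimal usage sketch, not part of the commit, of how an application consumes this layer; the Subscribe signature and import paths are inferred from the hunks above, and the content topic string is a placeholder.

package filterapiexample

import (
	"context"

	"go.uber.org/zap"

	"github.com/waku-org/go-waku/waku/v2/api"
	"github.com/waku-org/go-waku/waku/v2/protocol"
	"github.com/waku-org/go-waku/waku/v2/protocol/filter"
)

// consumeFilter subscribes once and then simply drains DataCh; ping failures,
// closing notifications and resubscription are handled inside the api layer.
func consumeFilter(ctx context.Context, wf *filter.WakuFilterLightNode, log *zap.Logger) error {
	cf := protocol.ContentFilter{
		PubsubTopic:   "/waku/2/rs/2/1",
		ContentTopics: protocol.NewContentTopicSet("/app/1/example/proto"), // placeholder content topic
	}

	// MaxPeers is the number of per-peer subscriptions the api layer maintains;
	// a peer that fails its ping is replaced via resubscribe() with that peer excluded.
	sub, err := api.Subscribe(ctx, wf, cf, api.FilterConfig{MaxPeers: 2}, log)
	if err != nil {
		return err
	}
	defer sub.Unsubscribe()

	for env := range sub.DataCh {
		log.Info("message received", zap.String("contentTopic", env.Message().ContentTopic))
	}
	return nil
}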

View File

@ -36,12 +36,13 @@ func (s *FilterApiTestSuite) TestSubscribe() {
// We have one full node already created in SetupTest(),
// create another one
fullNodeData2 := s.GetWakuFilterFullNode(s.TestTopic, true)
s.ConnectHosts(s.LightNodeHost, fullNodeData2.FullNodeHost)
s.ConnectToFullNode(s.LightNode, fullNodeData2.FullNode)
//s.ConnectHosts(s.FullNodeHost, fullNodeData2.FullNodeHost)
peers := []peer.ID{s.FullNodeHost.ID(), fullNodeData2.FullNodeHost.ID()}
s.Log.Info("FullNodeHost IDs:", zap.Any("peers", peers))
// Make sure IDs are different
s.Require().True(peers[0] != peers[1])
apiConfig := FilterConfig{MaxPeers: 2, Peers: peers}
//s.Require().True(peers[0] != peers[1])
apiConfig := FilterConfig{MaxPeers: 2}
s.Require().Equal(apiConfig.MaxPeers, 2)
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
@ -68,7 +69,26 @@ func (s *FilterApiTestSuite) TestSubscribe() {
}
s.Require().Equal(cnt, 1)
//Verify HealthCheck
subs := s.LightNode.Subscriptions()
s.Require().Equal(2, len(subs))
s.Log.Info("stopping full node", zap.Stringer("id", fullNodeData2.FullNodeHost.ID()))
fullNodeData3 := s.GetWakuFilterFullNode(s.TestTopic, true)
s.ConnectToFullNode(s.LightNode, fullNodeData3.FullNode)
fullNodeData2.FullNode.Stop()
fullNodeData2.FullNodeHost.Close()
time.Sleep(2 * time.Second)
subs = s.LightNode.Subscriptions()
s.Require().Equal(2, len(subs))
for _, sub := range subs {
s.Require().NotEqual(fullNodeData2.FullNodeHost.ID(), sub.PeerID)
}
apiSub.Unsubscribe()
for range apiSub.DataCh {
}

View File

@ -277,10 +277,10 @@ func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers pee
//Need to filter peers to check if they support relay
if inPeers.Len() != 0 {
inRelayPeers, _ = pm.FilterPeersByProto(inPeers, relay.WakuRelayID_v200)
inRelayPeers, _ = pm.FilterPeersByProto(inPeers, nil, relay.WakuRelayID_v200)
}
if outPeers.Len() != 0 {
outRelayPeers, _ = pm.FilterPeersByProto(outPeers, relay.WakuRelayID_v200)
outRelayPeers, _ = pm.FilterPeersByProto(outPeers, nil, relay.WakuRelayID_v200)
}
return
}

View File

@ -3,8 +3,6 @@ package peermanager
import (
"context"
"crypto/rand"
"fmt"
"strings"
"testing"
"time"
@ -14,7 +12,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
@ -26,19 +23,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/utils"
)
func getAddr(h host.Host) multiaddr.Multiaddr {
id, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", h.ID().String()))
var selectedAddr multiaddr.Multiaddr
//For now skipping circuit relay addresses as libp2p seems to be returning empty p2p-circuit addresses.
for _, addr := range h.Network().ListenAddresses() {
if strings.Contains(addr.String(), "p2p-circuit") {
continue
}
selectedAddr = addr
}
return selectedAddr.Encapsulate(id)
}
func initTest(t *testing.T) (context.Context, *PeerManager, func()) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// hosts
@ -72,7 +56,7 @@ func TestServiceSlots(t *testing.T) {
// add h2 peer to peer manager
t.Log(h2.ID())
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, libp2pProtocol.ID(protocol))
_, err = pm.AddPeer(tests.GetAddr(h2), wps.Static, []string{""}, libp2pProtocol.ID(protocol))
require.NoError(t, err)
///////////////
@ -86,7 +70,7 @@ func TestServiceSlots(t *testing.T) {
require.Equal(t, h2.ID(), peers[0])
// add h3 peer to peer manager
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{""}, libp2pProtocol.ID(protocol))
_, err = pm.AddPeer(tests.GetAddr(h3), wps.Static, []string{""}, libp2pProtocol.ID(protocol))
require.NoError(t, err)
// check that returned peer is h2 or h3 peer
@ -111,7 +95,7 @@ func TestServiceSlots(t *testing.T) {
require.Error(t, err, ErrNoPeersAvailable)
// add h4 peer for protocol1
_, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1))
_, err = pm.AddPeer(tests.GetAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1))
require.NoError(t, err)
//Test peer selection for protocol1
@ -139,10 +123,10 @@ func TestPeerSelection(t *testing.T) {
defer h3.Close()
protocol := libp2pProtocol.ID("test/protocol")
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol))
_, err = pm.AddPeer(tests.GetAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
_, err = pm.AddPeer(tests.GetAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
@ -173,7 +157,7 @@ func TestPeerSelection(t *testing.T) {
h4, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h4.Close()
_, err = pm.AddPeer(getAddr(h4), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
_, err = pm.AddPeer(tests.GetAddr(h4), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3})
@ -200,7 +184,7 @@ func TestDefaultProtocol(t *testing.T) {
defer h5.Close()
//Test peer selection for relay protocol from peer store
_, err = pm.AddPeer(getAddr(h5), wps.Static, []string{""}, relay.WakuRelayID_v200)
_, err = pm.AddPeer(tests.GetAddr(h5), wps.Static, []string{""}, relay.WakuRelayID_v200)
require.NoError(t, err)
// since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol.
@ -221,7 +205,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
require.NoError(t, err)
defer h6.Close()
_, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2)
_, err = pm.AddPeer(tests.GetAddr(h6), wps.Static, []string{""}, protocol2)
require.NoError(t, err)
peers, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2})

View File

@ -2,6 +2,7 @@ package peermanager
import (
"context"
"encoding/json"
"errors"
"github.com/libp2p/go-libp2p/core/peer"
@ -12,7 +13,16 @@ import (
"golang.org/x/exp/maps"
)
type peerSet map[peer.ID]struct{}
type PeerSet map[peer.ID]struct{}
func PeerInSet(peers PeerSet, peer peer.ID) bool {
if len(peers) > 0 {
if _, ok := peers[peer]; ok {
return true
}
}
return false
}
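peerSet is promoted to the exported PeerSet so that other packages (notably the filter client) can hand an exclusion set to the peer manager. A tiny, hedged illustration of the helper semantics; the peer IDs below are placeholders, not valid libp2p IDs:

package peersetexample

import (
	"fmt"

	"github.com/libp2p/go-libp2p/core/peer"

	"github.com/waku-org/go-waku/waku/v2/peermanager"
)

func Example() {
	p1 := peer.ID("peer-1") // placeholder IDs; real ones come from libp2p hosts
	p2 := peer.ID("peer-2")

	// Build a PeerSet from a slice, as WithPeersToExclude does internally.
	exclude := peermanager.PeerSliceToMap(peer.IDSlice{p1})

	fmt.Println(peermanager.PeerInSet(exclude, p1)) // true  -> p1 is skipped during selection
	fmt.Println(peermanager.PeerInSet(exclude, p2)) // false -> p2 remains eligible
}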
// SelectPeerByContentTopic is used to return a random peer that supports a given protocol for given contentTopic.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
@ -54,17 +64,19 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic
zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err))
return nil, err
} else if len(peerIDs) == 0 {
peerIDs = make(peerSet)
peerIDs = make(PeerSet)
}
// if not found in serviceSlots or proto == WakuRelayIDv200
filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.Proto)
pm.logger.Debug("looking for peers in peerStore", zap.String("proto", string(criteria.Proto)))
filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.ExcludePeers, criteria.Proto)
if err != nil {
return nil, err
}
if len(criteria.PubsubTopics) > 0 {
filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...)
}
randomPeers, err := selectRandomPeers(filteredPeers, criteria.MaxPeers-len(peerIDs))
//Not passing excludePeers as filterPeers are already considering excluded ones.
randomPeers, err := selectRandomPeers(filteredPeers, nil, criteria.MaxPeers-len(peerIDs))
if err != nil && len(peerIDs) == 0 {
return nil, err
}
@ -75,10 +87,13 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic
return maps.Keys(peerIDs), nil
}
func getRandom(filter peerSet, count int) (peerSet, error) {
func getRandom(filter PeerSet, count int, excludePeers PeerSet) (PeerSet, error) {
i := 0
selectedPeers := make(peerSet)
selectedPeers := make(PeerSet)
for pID := range filter {
if PeerInSet(excludePeers, pID) {
continue
}
// Go's map iteration order is randomized, so no separate random selection is needed here.
selectedPeers[pID] = struct{}{}
i++
@ -93,34 +108,37 @@ func getRandom(filter peerSet, count int) (peerSet, error) {
}
// selects count random peers from list of peers
func selectRandomPeers(peers peer.IDSlice, count int) (peerSet, error) {
filteredPeerMap := peerSliceToMap(peers)
return getRandom(filteredPeerMap, count)
func selectRandomPeers(peers peer.IDSlice, excludePeers PeerSet, count int) (PeerSet, error) {
filteredPeerMap := PeerSliceToMap(peers)
return getRandom(filteredPeerMap, count, excludePeers)
}
func peerSliceToMap(peers peer.IDSlice) peerSet {
peerSet := make(peerSet, peers.Len())
func PeerSliceToMap(peers peer.IDSlice) PeerSet {
peerSet := make(PeerSet, peers.Len())
for _, peer := range peers {
peerSet[peer] = struct{}{}
}
return peerSet
}
func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (peerSet, error) {
peers := make(peerSet)
func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSet, error) {
peers := make(PeerSet)
var err error
for retryCnt := 0; retryCnt < 1; retryCnt++ {
//Try to fetch from serviceSlot
if slot := pm.serviceSlots.getPeers(criteria.Proto); slot != nil {
if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") {
return slot.getRandom(criteria.MaxPeers)
return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers)
} else { //PubsubTopic based selection
keys := make([]peer.ID, 0, len(slot.m))
for i := range slot.m {
if PeerInSet(criteria.ExcludePeers, i) {
continue
}
keys = append(keys, i)
}
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...)
tmpPeers, err := selectRandomPeers(selectedPeers, criteria.MaxPeers)
tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers)
for tmpPeer := range tmpPeers {
peers[tmpPeer] = struct{}{}
}
@ -145,12 +163,21 @@ func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (peerSe
// PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers.
type PeerSelectionCriteria struct {
SelectionType PeerSelection
Proto protocol.ID
PubsubTopics []string
SpecificPeers peer.IDSlice
MaxPeers int
Ctx context.Context
SelectionType PeerSelection `json:"selectionType"`
Proto protocol.ID `json:"protocolId"`
PubsubTopics []string `json:"pubsubTopics"`
SpecificPeers peer.IDSlice `json:"specificPeers"`
MaxPeers int `json:"maxPeerCount"`
Ctx context.Context `json:"-"`
ExcludePeers PeerSet `json:"excludePeers"`
}
func (psc PeerSelectionCriteria) String() string {
pscJson, err := json.Marshal(psc)
if err != nil {
return ""
}
return string(pscJson)
}
// SelectPeers selects a peer based on selectionType specified.
@ -159,6 +186,12 @@ func (pm *PeerManager) SelectPeers(criteria PeerSelectionCriteria) (peer.IDSlice
if criteria.MaxPeers == 0 {
criteria.MaxPeers = 1
}
excPeers := maps.Keys(criteria.ExcludePeers)
var excPeer peer.ID
if len(excPeers) > 0 {
excPeer = excPeers[0]
}
pm.logger.Debug("Select Peers", zap.Stringer("selectionCriteria", criteria), zap.Stringer("excludedPeers", excPeer))
switch criteria.SelectionType {
case Automatic:
return pm.SelectRandom(criteria)
@ -191,7 +224,7 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (
peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, criteria.SpecificPeers...)
}
peers, err = pm.FilterPeersByProto(peers, criteria.Proto)
peers, err = pm.FilterPeersByProto(peers, criteria.ExcludePeers, criteria.Proto)
if err != nil {
return "", err
}
@ -201,22 +234,25 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (
// FilterPeersByProto filters list of peers that support specified protocols.
// If specificPeers is nil, all peers in the host's peerStore are considered for filtering.
func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) {
func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, excludePeers PeerSet, proto ...protocol.ID) (peer.IDSlice, error) {
peerSet := specificPeers
if len(peerSet) == 0 {
peerSet = pm.host.Peerstore().Peers()
}
var peers peer.IDSlice
for _, peer := range peerSet {
protocols, err := pm.host.Peerstore().SupportsProtocols(peer, proto...)
if err != nil {
return nil, err
}
if len(protocols) > 0 {
// TODO: consider optimizing the checks below.
if PeerInSet(excludePeers, peer) {
continue
}
peers = append(peers, peer)
}
}
pm.logger.Debug("peers selected", zap.Int("peerCnt", len(peers)))
return peers, nil
}
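FilterPeersByProto, SelectRandom and the service-slot lookups now all honour ExcludePeers, so a failed peer can be kept out of every selection path. A hedged sketch of a caller asking the peer manager for replacement filter peers while excluding one; the function name, peer count and shard topic are illustrative, not from the commit:

package selectionexample

import (
	"context"

	"github.com/libp2p/go-libp2p/core/peer"

	"github.com/waku-org/go-waku/waku/v2/peermanager"
	"github.com/waku-org/go-waku/waku/v2/protocol/filter"
)

func selectReplacementPeers(ctx context.Context, pm *peermanager.PeerManager, failedPeer peer.ID) (peer.IDSlice, error) {
	return pm.SelectPeers(peermanager.PeerSelectionCriteria{
		SelectionType: peermanager.Automatic,
		Proto:         filter.FilterSubscribeID_v20beta1,
		PubsubTopics:  []string{"/waku/2/rs/2/1"},
		MaxPeers:      2,
		Ctx:           ctx,
		// Peers in this set are skipped both in the service slots and in the
		// peerstore filtering path.
		ExcludePeers: peermanager.PeerSliceToMap(peer.IDSlice{failedPeer}),
	})
}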

View File

@ -19,10 +19,10 @@ func newPeerMap() *peerMap {
}
}
func (pm *peerMap) getRandom(count int) (peerSet, error) {
func (pm *peerMap) getRandom(count int, excludePeers PeerSet) (PeerSet, error) {
pm.mu.RLock()
defer pm.mu.RUnlock()
return getRandom(pm.m, count)
return getRandom(pm.m, count, excludePeers)
}
func (pm *peerMap) remove(pID peer.ID) {

View File

@ -1,11 +1,12 @@
package peermanager
import (
"testing"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
"testing"
)
func TestServiceSlot(t *testing.T) {
@ -18,13 +19,13 @@ func TestServiceSlot(t *testing.T) {
//
slots.getPeers(protocol).add(peerID)
//
fetchedPeers, err := slots.getPeers(protocol).getRandom(1)
fetchedPeers, err := slots.getPeers(protocol).getRandom(1, nil)
require.NoError(t, err)
require.Equal(t, peerID, maps.Keys(fetchedPeers)[0])
//
slots.getPeers(protocol).remove(peerID)
//
_, err = slots.getPeers(protocol).getRandom(1)
_, err = slots.getPeers(protocol).getRandom(1, nil)
require.Equal(t, err, ErrNoPeersAvailable)
// Test with more peers
@ -36,7 +37,7 @@ func TestServiceSlot(t *testing.T) {
slots.getPeers(protocol).add(peerID3)
//
fetchedPeers, err = slots.getPeers(protocol).getRandom(2)
fetchedPeers, err = slots.getPeers(protocol).getRandom(2, nil)
require.NoError(t, err)
require.Equal(t, 2, len(maps.Keys(fetchedPeers)))
@ -47,7 +48,7 @@ func TestServiceSlot(t *testing.T) {
slots.getPeers(protocol).remove(peerID2)
fetchedPeers, err = slots.getPeers(protocol).getRandom(10)
fetchedPeers, err = slots.getPeers(protocol).getRandom(10, nil)
require.NoError(t, err)
require.Equal(t, peerID3, maps.Keys(fetchedPeers)[0])
@ -65,15 +66,15 @@ func TestServiceSlotRemovePeerFromAll(t *testing.T) {
slots.getPeers(protocol).add(peerID)
slots.getPeers(protocol1).add(peerID)
//
fetchedPeers, err := slots.getPeers(protocol1).getRandom(1)
fetchedPeers, err := slots.getPeers(protocol1).getRandom(1, nil)
require.NoError(t, err)
require.Equal(t, peerID, maps.Keys(fetchedPeers)[0])
//
slots.removePeer(peerID)
//
_, err = slots.getPeers(protocol).getRandom(1)
_, err = slots.getPeers(protocol).getRandom(1, nil)
require.Equal(t, err, ErrNoPeersAvailable)
_, err = slots.getPeers(protocol1).getRandom(1)
_, err = slots.getPeers(protocol1).getRandom(1, nil)
require.Equal(t, err, ErrNoPeersAvailable)
}

View File

@ -9,6 +9,7 @@ import (
"net/http"
"strings"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
@ -43,13 +44,14 @@ var (
type WakuFilterLightNode struct {
*service.CommonService
h host.Host
broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s
timesource timesource.Timesource
metrics Metrics
log *zap.Logger
subscriptions *subscription.SubscriptionsMap
pm *peermanager.PeerManager
h host.Host
broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.
timesource timesource.Timesource
metrics Metrics
log *zap.Logger
subscriptions *subscription.SubscriptionsMap
pm *peermanager.PeerManager
peerPingInterval time.Duration
}
type WakuFilterPushError struct {
@ -86,7 +88,7 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM
wf.pm = pm
wf.CommonService = service.NewCommonService()
wf.metrics = newMetrics(reg)
wf.peerPingInterval = 5 * time.Second
return wf
}
@ -97,13 +99,15 @@ func (wf *WakuFilterLightNode) SetHost(h host.Host) {
func (wf *WakuFilterLightNode) Start(ctx context.Context) error {
return wf.CommonService.Start(ctx, wf.start)
}
func (wf *WakuFilterLightNode) start() error {
wf.subscriptions = subscription.NewSubscriptionMap(wf.log)
wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(wf.Context()))
//Start Filter liveness check
wf.CommonService.WaitGroup().Add(1)
go wf.FilterHealthCheckLoop()
wf.log.Info("filter-push protocol started")
return nil
}
@ -313,24 +317,29 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
wf.pm.Connect(pData)
params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID)
}
if params.pm != nil {
reqPeerCount := params.maxPeers - len(params.selectedPeers)
peerCount := params.maxPeers - len(params.selectedPeers)
if params.pm != nil && reqPeerCount > 0 {
wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude)))
params.selectedPeers, err = wf.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
PubsubTopics: maps.Keys(pubSubTopicMap),
SpecificPeers: params.preferredPeers,
MaxPeers: peerCount,
MaxPeers: reqPeerCount,
Ctx: ctx,
ExcludePeers: params.peersToExclude,
},
)
if err != nil {
wf.log.Error("peer selection returned err", zap.Error(err))
return nil, nil, err
}
}
wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers)))
return params, pubSubTopicMap, nil
}
@ -354,7 +363,10 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
subscriptions := make([]*subscription.SubscriptionDetails, 0)
for pubSubTopic, cTopics := range pubSubTopicMap {
var selectedPeers peer.IDSlice
wf.log.Debug("peer selection", zap.Int("params.maxPeers", params.maxPeers))
if params.pm != nil && len(params.selectedPeers) < params.maxPeers {
wf.log.Debug("selected peers less than maxPeers", zap.Int("maxpPeers", params.maxPeers))
selectedPeers, err = wf.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
@ -363,6 +375,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
SpecificPeers: params.preferredPeers,
MaxPeers: params.maxPeers - params.selectedPeers.Len(),
Ctx: ctx,
ExcludePeers: params.peersToExclude,
},
)
} else {
@ -375,7 +388,6 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
failedContentTopics = append(failedContentTopics, cTopics...)
continue
}
var cFilter protocol.ContentFilter
cFilter.PubsubTopic = pubSubTopic
cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...)
@ -395,6 +407,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
failedContentTopics = append(failedContentTopics, cTopics...)
continue
}
wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", peer))
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(peer, cFilter))
}
}
@ -457,16 +470,6 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ..
peerID)
}
func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *subscription.SubscriptionDetails) error {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
return err
}
return wf.Ping(ctx, subscription.PeerID)
}
// Unsubscribe is used to stop receiving messages from specified peers for the content filter
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
wf.RLock()

View File

@ -0,0 +1,48 @@
package filter
import (
"context"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap"
)
func (wf *WakuFilterLightNode) PingPeers() {
// PingPeers sends a ping to every subscribed peer and notifies the corresponding
// subscriptions when a peer turns out to be unresponsive.
// TODO: consider tracking aliveness state on the subscription itself.
for _, peer := range wf.subscriptions.GetSubscribedPeers() {
go wf.PingPeer(peer)
}
}
func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), wf.peerPingInterval)
defer cancel()
err := wf.Ping(ctxWithTimeout, peer)
if err != nil {
wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err))
subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer)
for _, subscription := range subscriptions {
wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID))
// Signal that this subscription is closing.
close(subscription.Closing)
}
}
}
func (wf *WakuFilterLightNode) FilterHealthCheckLoop() {
defer wf.WaitGroup().Done()
ticker := time.NewTicker(wf.peerPingInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
wf.PingPeers()
case <-wf.CommonService.Context().Done():
return
}
}
}
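This new file replaces the per-API-subscription ping loop: a single ticker pings every subscribed peer and, on failure, closes the Closing channel of each of that peer's subscriptions. A minimal sketch, assuming a caller that manages SubscriptionDetails directly rather than through the api layer, of reacting to that signal:

package closingexample

import (
	"github.com/libp2p/go-libp2p/core/peer"

	"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
)

// watchClosing invokes onClosed once the health-check loop (or cleanup on
// shutdown) closes the subscription's Closing channel.
func watchClosing(sub *subscription.SubscriptionDetails, onClosed func(peerID peer.ID, subID string)) {
	go func() {
		<-sub.Closing
		onClosed(sub.PeerID, sub.ID)
	}()
}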

View File

@ -324,7 +324,7 @@ func (s *FilterTestSuite) TestSubscribeFullNode2FullNode() {
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second)
nodeData := s.GetWakuFilterFullNode(testTopic, false)
fullNode2 := nodeData.fullNode
fullNode2 := nodeData.FullNode
// Connect nodes
fullNode2.h.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL)
@ -357,29 +357,6 @@ func (s *FilterTestSuite) TestSubscribeFullNode2FullNode() {
}
func (s *FilterTestSuite) TestIsSubscriptionAlive() {
messages := s.prepareData(2, false, true, false, nil)
// Subscribe with the first message only
s.subscribe(messages[0].PubSubTopic, messages[0].ContentTopic, s.FullNodeHost.ID())
// IsSubscriptionAlive returns no error for the first message
err := s.LightNode.IsSubscriptionAlive(s.ctx, s.subDetails[0])
s.Require().NoError(err)
// Create new host/peer - not related to any node
host, err := tests.MakeHost(context.Background(), 54321, rand.Reader)
s.Require().NoError(err)
// Alter the existing peer ID in sub details
s.subDetails[0].PeerID = host.ID()
// IsSubscriptionAlive returns error for the second message, peer ID doesn't match
err = s.LightNode.IsSubscriptionAlive(s.ctx, s.subDetails[0])
s.Require().Error(err)
}
func (s *FilterTestSuite) TestFilterSubscription() {
contentFilter := protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)}

View File

@ -7,9 +7,7 @@ import (
"testing"
"time"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/stretchr/testify/suite"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/utils"
@ -46,6 +44,7 @@ func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() {
s.Require().NoError(err)
result, err := s.LightNode.Unsubscribe(s.ctx, contentFilter, DontWait())
s.Require().NoError(err)
s.Require().Equal(0, len(result.Errors()))
@ -92,7 +91,7 @@ func (s *FilterTestSuite) TestAutoShard() {
//Workaround as could not find a way to reuse setup test with params
// Stop what is run in setup
s.fullNode.Stop()
s.FullNode.Stop()
s.LightNode.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 20 seconds
s.ctx = ctx
@ -109,10 +108,7 @@ func (s *FilterTestSuite) TestAutoShard() {
s.MakeWakuFilterLightNode()
s.StartLightNode()
s.MakeWakuFilterFullNode(pubSubTopic.String(), false)
s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL)
err = s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
s.ConnectToFullNode(s.LightNode, s.FullNode)
s.Log.Info("Testing Autoshard:CreateSubscription")
s.subscribe("", s.TestContentTopic, s.FullNodeHost.ID())
@ -210,7 +206,7 @@ func (s *FilterTestSuite) TestStaticSharding() {
s.MakeWakuFilterFullNode(s.TestTopic, false)
// Connect nodes
s.ConnectHosts(s.LightNodeHost, s.FullNodeHost)
s.ConnectToFullNode(s.LightNode, s.FullNode)
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())

View File

@ -80,7 +80,7 @@ func (s *FilterTestSuite) TestUnsubscribeMultiPubSubMultiContentTopic() {
s.MakeWakuFilterFullNode(s.TestTopic, true)
// Connect nodes
s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.fullNode.h), peerstore.PermanentAddrTTL)
s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNode.h), peerstore.PermanentAddrTTL)
err := s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
@ -211,7 +211,7 @@ func (s *FilterTestSuite) TestUnsubscribeAllDiffPubSubContentTopics() {
s.MakeWakuFilterFullNode(s.TestTopic, true)
// Connect nodes
s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.fullNode.h), peerstore.PermanentAddrTTL)
s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNode.h), peerstore.PermanentAddrTTL)
err := s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)

View File

@ -39,6 +39,7 @@ type (
peerAddr multiaddr.Multiaddr
peerSelectionType peermanager.PeerSelection
preferredPeers peer.IDSlice
peersToExclude peermanager.PeerSet
maxPeers int
requestID []byte
log *zap.Logger
@ -101,6 +102,14 @@ func WithMaxPeersPerContentFilter(numPeers int) FilterSubscribeOption {
}
}
// WithPeersToExclude option excludes the peers that are specified from selection
func WithPeersToExclude(peers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
params.peersToExclude = peermanager.PeerSliceToMap(peers)
return nil
}
}
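WithPeersToExclude is the plumbing that lets the api layer's resubscribe() avoid the peer that just failed its ping. A hedged sketch of using it directly against the light node's client-level Subscribe; the function name and peer count are illustrative:

package excludeexample

import (
	"context"

	"github.com/libp2p/go-libp2p/core/peer"

	"github.com/waku-org/go-waku/waku/v2/protocol"
	"github.com/waku-org/go-waku/waku/v2/protocol/filter"
	"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
)

func resubscribeExcluding(ctx context.Context, wf *filter.WakuFilterLightNode,
	cf protocol.ContentFilter, failedPeer peer.ID) ([]*subscription.SubscriptionDetails, error) {
	return wf.Subscribe(ctx, cf,
		filter.WithMaxPeersPerContentFilter(2), // ask for up to two subscriptions
		filter.WithPeersToExclude(failedPeer),  // never pick the peer that just failed
	)
}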
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store.
// If a list of specific peers is passed, the peer will be chosen from that list assuming it
// supports the chosen protocol, otherwise it will chose a peer from the node peerstore
@ -145,6 +154,7 @@ func DefaultSubscriptionOptions() []FilterSubscribeOption {
return []FilterSubscribeOption{
WithAutomaticPeerSelection(),
WithAutomaticRequestID(),
WithMaxPeersPerContentFilter(1),
}
}

View File

@ -10,10 +10,11 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/suite"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/peermanager"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
@ -32,7 +33,7 @@ type FullNodeData struct {
RelaySub *relay.Subscription
FullNodeHost host.Host
Broadcaster relay.Broadcaster
fullNode *WakuFilterFullNode
FullNode *WakuFilterFullNode
}
type FilterTestSuite struct {
@ -78,27 +79,29 @@ func (s *FilterTestSuite) SetupTest() {
s.TestContentTopic = DefaultTestContentTopic
s.MakeWakuFilterLightNode()
s.LightNode.peerPingInterval = 1 * time.Second
s.StartLightNode()
//TODO: Add tests to verify broadcaster.
s.MakeWakuFilterFullNode(s.TestTopic, false)
s.ConnectHosts(s.LightNodeHost, s.FullNodeHost)
s.ConnectToFullNode(s.LightNode, s.FullNode)
}
func (s *FilterTestSuite) TearDownTest() {
s.fullNode.Stop()
s.FullNode.Stop()
s.LightNode.Stop()
s.RelaySub.Unsubscribe()
s.LightNode.Stop()
s.ctxCancel()
}
func (s *FilterTestSuite) ConnectHosts(h1, h2 host.Host) {
h1.Peerstore().AddAddr(h2.ID(), tests.GetHostAddress(h2), peerstore.PermanentAddrTTL)
err := h1.Peerstore().AddProtocols(h2.ID(), FilterSubscribeID_v20beta1)
func (s *FilterTestSuite) ConnectToFullNode(h1 *WakuFilterLightNode, h2 *WakuFilterFullNode) {
mAddr := tests.GetAddr(h2.h)
_, err := h1.pm.AddPeer(mAddr, wps.Static, []string{s.TestTopic}, FilterSubscribeID_v20beta1)
s.Log.Info("add peer", zap.Stringer("mAddr", mAddr))
s.Require().NoError(err)
}
@ -142,7 +145,7 @@ func (s *FilterTestSuite) GetWakuFilterFullNode(topic string, withRegisterAll bo
err := node2Filter.Start(s.ctx, sub)
s.Require().NoError(err)
nodeData.fullNode = node2Filter
nodeData.FullNode = node2Filter
return nodeData
}
@ -161,9 +164,10 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData {
s.Require().NoError(err)
b := relay.NewBroadcaster(10)
s.Require().NoError(b.Start(context.Background()))
filterPush := NewWakuFilterLightNode(b, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log)
pm := peermanager.NewPeerManager(5, 5, nil, s.Log)
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log)
filterPush.SetHost(host)
pm.SetHost(host)
return LightNodeData{filterPush, host}
}

View File

@ -25,10 +25,11 @@ type PeerContentFilter struct {
type SubscriptionDetails struct {
sync.RWMutex
ID string `json:"subscriptionID"`
mapRef *SubscriptionsMap
Closed bool `json:"-"`
once sync.Once
ID string `json:"subscriptionID"`
mapRef *SubscriptionsMap
Closed bool `json:"-"`
once sync.Once
Closing chan struct{}
PeerID peer.ID `json:"peerID"`
ContentFilter protocol.ContentFilter `json:"contentFilters"`
@ -96,7 +97,6 @@ func (s *SubscriptionDetails) CloseC() {
s.once.Do(func() {
s.Lock()
defer s.Unlock()
s.Closed = true
close(s.C)
})

View File

@ -75,6 +75,7 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.Content
PeerID: peerID,
C: make(chan *protocol.Envelope, 1024),
ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)},
Closing: make(chan struct{}),
}
// Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair
@ -218,6 +219,30 @@ func (m *SubscriptionsMap) GetSubscriptionsForPeer(peerID peer.ID, contentFilter
return output
}
func (m *SubscriptionsMap) GetAllSubscriptionsForPeer(peerID peer.ID) []*SubscriptionDetails {
m.RLock()
defer m.RUnlock()
var output []*SubscriptionDetails
for _, peerSubs := range m.items {
if peerSubs.PeerID == peerID {
for _, subs := range peerSubs.SubsPerPubsubTopic {
for _, subscriptionDetail := range subs {
output = append(output, subscriptionDetail)
}
}
break
}
}
return output
}
func (m *SubscriptionsMap) GetSubscribedPeers() peer.IDSlice {
m.RLock()
defer m.RUnlock()
return maps.Keys(m.items)
}
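These two accessors are what the health-check loop relies on: GetSubscribedPeers drives PingPeers, and GetAllSubscriptionsForPeer maps a failed peer back to its subscriptions. A small, hedged sketch of using them to enumerate active subscriptions; the function name is illustrative:

package subsexample

import (
	"go.uber.org/zap"

	"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
)

func dumpSubscriptions(m *subscription.SubscriptionsMap, log *zap.Logger) {
	for _, p := range m.GetSubscribedPeers() {
		for _, sd := range m.GetAllSubscriptionsForPeer(p) {
			log.Info("active subscription",
				zap.Stringer("peer", p),
				zap.String("subID", sd.ID),
				zap.String("pubsubTopic", sd.ContentFilter.PubsubTopic))
		}
	}
}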
func (m *SubscriptionsMap) GetAllSubscriptions() []*SubscriptionDetails {
return m.GetSubscriptionsForPeer("", protocol.ContentFilter{})
}