diff --git a/tests/utils.go b/tests/utils.go index 8b9dec22..d5a57912 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -9,7 +9,6 @@ import ( "encoding/hex" "encoding/json" "fmt" - "github.com/waku-org/go-waku/waku/v2/protocol" "io" "math" "math/big" @@ -22,6 +21,8 @@ import ( "time" "unicode/utf8" + "github.com/waku-org/go-waku/waku/v2/protocol" + gcrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" @@ -48,6 +49,21 @@ func GetHostAddress(ha host.Host) multiaddr.Multiaddr { return ha.Addrs()[0] } +// Returns a full multiaddr of host appended by peerID +func GetAddr(h host.Host) multiaddr.Multiaddr { + id, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", h.ID().String())) + var selectedAddr multiaddr.Multiaddr + //For now skipping circuit relay addresses as libp2p seems to be returning empty p2p-circuit addresses. + for _, addr := range h.Network().ListenAddresses() { + if strings.Contains(addr.String(), "p2p-circuit") { + continue + } + selectedAddr = addr + break + } + return selectedAddr.Encapsulate(id) +} + // FindFreePort returns an available port number func FindFreePort(t *testing.T, host string, maxAttempts int) (int, error) { t.Helper() diff --git a/waku/v2/api/filter.go b/waku/v2/api/filter.go index f6f83f14..7281b915 100644 --- a/waku/v2/api/filter.go +++ b/waku/v2/api/filter.go @@ -3,7 +3,6 @@ package api import ( "context" "encoding/json" - "sync" "time" "github.com/libp2p/go-libp2p/core/peer" @@ -11,7 +10,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" "go.uber.org/zap" - "golang.org/x/exp/maps" ) const FilterPingTimeout = 5 * time.Second @@ -39,6 +37,7 @@ type Sub struct { ctx context.Context cancel context.CancelFunc log *zap.Logger + closing chan string } // Subscribe @@ -53,37 +52,40 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte sub.log = log.Named("filter-api") sub.log.Debug("filter subscribe params", zap.Int("maxPeers", config.MaxPeers), zap.Stringer("contentFilter", contentFilter)) subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers) - + sub.closing = make(chan string, config.MaxPeers) if err != nil { return nil, err } sub.multiplex(subs) - go sub.healthCheckLoop() + go sub.waitOnSubClose() return sub, nil } func (apiSub *Sub) Unsubscribe() { apiSub.cancel() - } -func (apiSub *Sub) healthCheckLoop() { - // Health checks - ticker := time.NewTicker(FilterPingTimeout) - defer ticker.Stop() +func (apiSub *Sub) waitOnSubClose() { for { select { case <-apiSub.ctx.Done(): - apiSub.log.Debug("healthCheckLoop: Done()") + apiSub.log.Debug("apiSub context: Done()") apiSub.cleanup() return - case <-ticker.C: - apiSub.log.Debug("healthCheckLoop: checkAliveness()") - topicCounts := apiSub.getTopicCounts() - apiSub.resubscribe(topicCounts) + case subId := <-apiSub.closing: + //trigger closing and resubscribe flow for subscription. + apiSub.closeAndResubscribe(subId) } } +} +func (apiSub *Sub) closeAndResubscribe(subId string) { + apiSub.log.Debug("sub closeAndResubscribe", zap.String("subID", subId)) + + apiSub.subs[subId].Close() + failedPeer := apiSub.subs[subId].PeerID + delete(apiSub.subs, subId) + apiSub.resubscribe(failedPeer) } func (apiSub *Sub) cleanup() { @@ -93,6 +95,7 @@ func (apiSub *Sub) cleanup() { }() for _, s := range apiSub.subs { + close(s.Closing) _, err := apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s) if err != nil { //Logging with info as this is part of cleanup @@ -103,102 +106,37 @@ func (apiSub *Sub) cleanup() { } -// Returns active sub counts for each pubsub topic -func (apiSub *Sub) getTopicCounts() map[string]int { - // Buffered chan for sub aliveness results - type CheckResult struct { - sub *subscription.SubscriptionDetails - alive bool - } - checkResults := make(chan CheckResult, len(apiSub.subs)) - var wg sync.WaitGroup - - // Run pings asynchronously - for _, s := range apiSub.subs { - wg.Add(1) - go func(sub *subscription.SubscriptionDetails) { - defer wg.Done() - ctx, cancelFunc := context.WithTimeout(apiSub.ctx, FilterPingTimeout) - defer cancelFunc() - err := apiSub.wf.IsSubscriptionAlive(ctx, sub) - - apiSub.log.Debug("Check result:", zap.Any("subID", sub.ID), zap.Bool("result", err == nil)) - checkResults <- CheckResult{sub, err == nil} - }(s) - } - - // Collect healthy topic counts - topicCounts := make(map[string]int) - - topicMap, _ := protocol.ContentFilterToPubSubTopicMap(apiSub.ContentFilter) - for _, t := range maps.Keys(topicMap) { - topicCounts[t] = 0 - } - wg.Wait() - close(checkResults) - for s := range checkResults { - if !s.alive { - // Close inactive subs - s.sub.Close() - delete(apiSub.subs, s.sub.ID) - } else { - topicCounts[s.sub.ContentFilter.PubsubTopic]++ - } - } - - return topicCounts -} - // Attempts to resubscribe on topics that lack subscriptions -func (apiSub *Sub) resubscribe(topicCounts map[string]int) { - - // Delete healthy topics - for t, cnt := range topicCounts { - if cnt == apiSub.Config.MaxPeers { - delete(topicCounts, t) - } - } - - if len(topicCounts) == 0 { - // All topics healthy, return - return - } - var wg sync.WaitGroup - +func (apiSub *Sub) resubscribe(failedPeer peer.ID) { // Re-subscribe asynchronously - newSubs := make(chan []*subscription.SubscriptionDetails) + existingSubCount := len(apiSub.subs) + apiSub.log.Debug("subscribing again", zap.Stringer("contentFilter", apiSub.ContentFilter), zap.Int("numPeers", apiSub.Config.MaxPeers-existingSubCount)) + var peersToExclude peer.IDSlice + peersToExclude = append(peersToExclude, failedPeer) + for _, sub := range apiSub.subs { + peersToExclude = append(peersToExclude, sub.PeerID) + } + subs, err := apiSub.subscribe(apiSub.ContentFilter, apiSub.Config.MaxPeers-existingSubCount, peersToExclude...) + if err != nil { + return + } //Not handling scenario where all requested subs are not received as that will get handled in next cycle. - for t, cnt := range topicCounts { - cFilter := protocol.ContentFilter{PubsubTopic: t, ContentTopics: apiSub.ContentFilter.ContentTopics} - wg.Add(1) - go func(count int) { - defer wg.Done() - subs, err := apiSub.subscribe(cFilter, apiSub.Config.MaxPeers-count) - if err != nil { - return - } //Not handling scenario where all requested subs are not received as that will get handled in next cycle. - newSubs <- subs - }(cnt) - } - wg.Wait() - close(newSubs) apiSub.log.Debug("resubscribe(): before range newSubs") - for subs := range newSubs { - if subs != nil { - apiSub.multiplex(subs) - } - } - apiSub.log.Debug("checkAliveness(): close(newSubs)") - //close(newSubs) + + apiSub.multiplex(subs) } -func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int) ([]*subscription.SubscriptionDetails, error) { +func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) { // Low-level subscribe, returns a set of SubscriptionDetails options := make([]filter.FilterSubscribeOption, 0) options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount))) for _, p := range apiSub.Config.Peers { options = append(options, filter.WithPeer(p)) } + if len(peersToExclude) > 0 { + apiSub.log.Debug("subscribing with peersToExclude", zap.Stringer("peersToExclude", peersToExclude[0])) + options = append(options, filter.WithPeersToExclude(peersToExclude...)) + } subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...) if err != nil { @@ -206,7 +144,7 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int // Partial Failure, for now proceed as we don't expect this to happen wrt specific topics. // Rather it can happen in case subscription with one of the peer fails. // This can further get automatically handled at resubscribe, - apiSub.log.Error("partial failure in Filter subscribe", zap.Error(err)) + apiSub.log.Error("partial failure in Filter subscribe", zap.Error(err), zap.Int("successCount", len(subs))) return subs, nil } // In case of complete subscription failure, application or user needs to handle and probably retry based on error @@ -229,5 +167,11 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) { apiSub.DataCh <- env } }(subDetails) + go func(subDetails *subscription.SubscriptionDetails) { + <-subDetails.Closing + apiSub.log.Debug("sub closing", zap.String("subID", subDetails.ID)) + + apiSub.closing <- subDetails.ID + }(subDetails) } } diff --git a/waku/v2/api/filter_test.go b/waku/v2/api/filter_test.go index 22432e7e..83551019 100644 --- a/waku/v2/api/filter_test.go +++ b/waku/v2/api/filter_test.go @@ -36,12 +36,13 @@ func (s *FilterApiTestSuite) TestSubscribe() { // We have one full node already created in SetupTest(), // create another one fullNodeData2 := s.GetWakuFilterFullNode(s.TestTopic, true) - s.ConnectHosts(s.LightNodeHost, fullNodeData2.FullNodeHost) + s.ConnectToFullNode(s.LightNode, fullNodeData2.FullNode) + //s.ConnectHosts(s.FullNodeHost, fullNodeData2.FullNodeHost) peers := []peer.ID{s.FullNodeHost.ID(), fullNodeData2.FullNodeHost.ID()} s.Log.Info("FullNodeHost IDs:", zap.Any("peers", peers)) // Make sure IDs are different - s.Require().True(peers[0] != peers[1]) - apiConfig := FilterConfig{MaxPeers: 2, Peers: peers} + //s.Require().True(peers[0] != peers[1]) + apiConfig := FilterConfig{MaxPeers: 2} s.Require().Equal(apiConfig.MaxPeers, 2) s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic) @@ -68,7 +69,26 @@ func (s *FilterApiTestSuite) TestSubscribe() { } s.Require().Equal(cnt, 1) + //Verify HealthCheck + subs := s.LightNode.Subscriptions() + s.Require().Equal(2, len(subs)) + + s.Log.Info("stopping full node", zap.Stringer("id", fullNodeData2.FullNodeHost.ID())) + fullNodeData3 := s.GetWakuFilterFullNode(s.TestTopic, true) + + s.ConnectToFullNode(s.LightNode, fullNodeData3.FullNode) + + fullNodeData2.FullNode.Stop() + fullNodeData2.FullNodeHost.Close() time.Sleep(2 * time.Second) + subs = s.LightNode.Subscriptions() + + s.Require().Equal(2, len(subs)) + + for _, sub := range subs { + s.Require().NotEqual(fullNodeData2.FullNodeHost.ID(), sub.PeerID) + } + apiSub.Unsubscribe() for range apiSub.DataCh { } diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index b4ec439d..3e995792 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -277,10 +277,10 @@ func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers pee //Need to filter peers to check if they support relay if inPeers.Len() != 0 { - inRelayPeers, _ = pm.FilterPeersByProto(inPeers, relay.WakuRelayID_v200) + inRelayPeers, _ = pm.FilterPeersByProto(inPeers, nil, relay.WakuRelayID_v200) } if outPeers.Len() != 0 { - outRelayPeers, _ = pm.FilterPeersByProto(outPeers, relay.WakuRelayID_v200) + outRelayPeers, _ = pm.FilterPeersByProto(outPeers, nil, relay.WakuRelayID_v200) } return } diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index a49a7b9b..60fa2d0f 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -3,8 +3,6 @@ package peermanager import ( "context" "crypto/rand" - "fmt" - "strings" "testing" "time" @@ -14,7 +12,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" - "github.com/multiformats/go-multiaddr" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" @@ -26,19 +23,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" ) -func getAddr(h host.Host) multiaddr.Multiaddr { - id, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", h.ID().String())) - var selectedAddr multiaddr.Multiaddr - //For now skipping circuit relay addresses as libp2p seems to be returning empty p2p-circuit addresses. - for _, addr := range h.Network().ListenAddresses() { - if strings.Contains(addr.String(), "p2p-circuit") { - continue - } - selectedAddr = addr - } - return selectedAddr.Encapsulate(id) -} - func initTest(t *testing.T) (context.Context, *PeerManager, func()) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // hosts @@ -72,7 +56,7 @@ func TestServiceSlots(t *testing.T) { // add h2 peer to peer manager t.Log(h2.ID()) - _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(tests.GetAddr(h2), wps.Static, []string{""}, libp2pProtocol.ID(protocol)) require.NoError(t, err) /////////////// @@ -86,7 +70,7 @@ func TestServiceSlots(t *testing.T) { require.Equal(t, h2.ID(), peers[0]) // add h3 peer to peer manager - _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{""}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(tests.GetAddr(h3), wps.Static, []string{""}, libp2pProtocol.ID(protocol)) require.NoError(t, err) // check that returned peer is h2 or h3 peer @@ -111,7 +95,7 @@ func TestServiceSlots(t *testing.T) { require.Error(t, err, ErrNoPeersAvailable) // add h4 peer for protocol1 - _, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1)) + _, err = pm.AddPeer(tests.GetAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1)) require.NoError(t, err) //Test peer selection for protocol1 @@ -139,10 +123,10 @@ func TestPeerSelection(t *testing.T) { defer h3.Close() protocol := libp2pProtocol.ID("test/protocol") - _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(tests.GetAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) - _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(tests.GetAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) @@ -173,7 +157,7 @@ func TestPeerSelection(t *testing.T) { h4, err := tests.MakeHost(ctx, 0, rand.Reader) require.NoError(t, err) defer h4.Close() - _, err = pm.AddPeer(getAddr(h4), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(tests.GetAddr(h4), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3}) @@ -200,7 +184,7 @@ func TestDefaultProtocol(t *testing.T) { defer h5.Close() //Test peer selection for relay protocol from peer store - _, err = pm.AddPeer(getAddr(h5), wps.Static, []string{""}, relay.WakuRelayID_v200) + _, err = pm.AddPeer(tests.GetAddr(h5), wps.Static, []string{""}, relay.WakuRelayID_v200) require.NoError(t, err) // since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol. @@ -221,7 +205,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) { require.NoError(t, err) defer h6.Close() - _, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2) + _, err = pm.AddPeer(tests.GetAddr(h6), wps.Static, []string{""}, protocol2) require.NoError(t, err) peers, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2}) diff --git a/waku/v2/peermanager/peer_selection.go b/waku/v2/peermanager/peer_selection.go index 4c1268bb..6726ebee 100644 --- a/waku/v2/peermanager/peer_selection.go +++ b/waku/v2/peermanager/peer_selection.go @@ -2,6 +2,7 @@ package peermanager import ( "context" + "encoding/json" "errors" "github.com/libp2p/go-libp2p/core/peer" @@ -12,7 +13,16 @@ import ( "golang.org/x/exp/maps" ) -type peerSet map[peer.ID]struct{} +type PeerSet map[peer.ID]struct{} + +func PeerInSet(peers PeerSet, peer peer.ID) bool { + if len(peers) > 0 { + if _, ok := peers[peer]; ok { + return true + } + } + return false +} // SelectPeerByContentTopic is used to return a random peer that supports a given protocol for given contentTopic. // If a list of specific peers is passed, the peer will be chosen from that list assuming @@ -54,17 +64,19 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err)) return nil, err } else if len(peerIDs) == 0 { - peerIDs = make(peerSet) + peerIDs = make(PeerSet) } // if not found in serviceSlots or proto == WakuRelayIDv200 - filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.Proto) + pm.logger.Debug("looking for peers in peerStore", zap.String("proto", string(criteria.Proto))) + filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.ExcludePeers, criteria.Proto) if err != nil { return nil, err } if len(criteria.PubsubTopics) > 0 { filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...) } - randomPeers, err := selectRandomPeers(filteredPeers, criteria.MaxPeers-len(peerIDs)) + //Not passing excludePeers as filterPeers are already considering excluded ones. + randomPeers, err := selectRandomPeers(filteredPeers, nil, criteria.MaxPeers-len(peerIDs)) if err != nil && len(peerIDs) == 0 { return nil, err } @@ -75,10 +87,13 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic return maps.Keys(peerIDs), nil } -func getRandom(filter peerSet, count int) (peerSet, error) { +func getRandom(filter PeerSet, count int, excludePeers PeerSet) (PeerSet, error) { i := 0 - selectedPeers := make(peerSet) + selectedPeers := make(PeerSet) for pID := range filter { + if PeerInSet(excludePeers, pID) { + continue + } //Map's iterator in golang works using randomness and hence not random function is being used. selectedPeers[pID] = struct{}{} i++ @@ -93,34 +108,37 @@ func getRandom(filter peerSet, count int) (peerSet, error) { } // selects count random peers from list of peers -func selectRandomPeers(peers peer.IDSlice, count int) (peerSet, error) { - filteredPeerMap := peerSliceToMap(peers) - return getRandom(filteredPeerMap, count) +func selectRandomPeers(peers peer.IDSlice, excludePeers PeerSet, count int) (PeerSet, error) { + filteredPeerMap := PeerSliceToMap(peers) + return getRandom(filteredPeerMap, count, excludePeers) } -func peerSliceToMap(peers peer.IDSlice) peerSet { - peerSet := make(peerSet, peers.Len()) +func PeerSliceToMap(peers peer.IDSlice) PeerSet { + peerSet := make(PeerSet, peers.Len()) for _, peer := range peers { peerSet[peer] = struct{}{} } return peerSet } -func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (peerSet, error) { - peers := make(peerSet) +func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSet, error) { + peers := make(PeerSet) var err error for retryCnt := 0; retryCnt < 1; retryCnt++ { //Try to fetch from serviceSlot if slot := pm.serviceSlots.getPeers(criteria.Proto); slot != nil { if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") { - return slot.getRandom(criteria.MaxPeers) + return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers) } else { //PubsubTopic based selection keys := make([]peer.ID, 0, len(slot.m)) for i := range slot.m { + if PeerInSet(criteria.ExcludePeers, i) { + continue + } keys = append(keys, i) } selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...) - tmpPeers, err := selectRandomPeers(selectedPeers, criteria.MaxPeers) + tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers) for tmpPeer := range tmpPeers { peers[tmpPeer] = struct{}{} } @@ -145,12 +163,21 @@ func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (peerSe // PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers. type PeerSelectionCriteria struct { - SelectionType PeerSelection - Proto protocol.ID - PubsubTopics []string - SpecificPeers peer.IDSlice - MaxPeers int - Ctx context.Context + SelectionType PeerSelection `json:"selectionType"` + Proto protocol.ID `json:"protocolId"` + PubsubTopics []string `json:"pubsubTopics"` + SpecificPeers peer.IDSlice `json:"specificPeers"` + MaxPeers int `json:"maxPeerCount"` + Ctx context.Context `json:"-"` + ExcludePeers PeerSet `json:"excludePeers"` +} + +func (psc PeerSelectionCriteria) String() string { + pscJson, err := json.Marshal(psc) + if err != nil { + return "" + } + return string(pscJson) } // SelectPeers selects a peer based on selectionType specified. @@ -159,6 +186,12 @@ func (pm *PeerManager) SelectPeers(criteria PeerSelectionCriteria) (peer.IDSlice if criteria.MaxPeers == 0 { criteria.MaxPeers = 1 } + excPeers := maps.Keys(criteria.ExcludePeers) + var excPeer peer.ID + if len(excPeers) > 0 { + excPeer = excPeers[0] + } + pm.logger.Debug("Select Peers", zap.Stringer("selectionCriteria", criteria), zap.Stringer("excludedPeers", excPeer)) switch criteria.SelectionType { case Automatic: return pm.SelectRandom(criteria) @@ -191,7 +224,7 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, criteria.SpecificPeers...) } - peers, err = pm.FilterPeersByProto(peers, criteria.Proto) + peers, err = pm.FilterPeersByProto(peers, criteria.ExcludePeers, criteria.Proto) if err != nil { return "", err } @@ -201,22 +234,25 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( // FilterPeersByProto filters list of peers that support specified protocols. // If specificPeers is nil, all peers in the host's peerStore are considered for filtering. -func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) { +func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, excludePeers PeerSet, proto ...protocol.ID) (peer.IDSlice, error) { peerSet := specificPeers if len(peerSet) == 0 { peerSet = pm.host.Peerstore().Peers() } - var peers peer.IDSlice for _, peer := range peerSet { protocols, err := pm.host.Peerstore().SupportsProtocols(peer, proto...) if err != nil { return nil, err } - if len(protocols) > 0 { + //Maybe we can optimize below set of statements a better way?? + if PeerInSet(excludePeers, peer) { + continue + } peers = append(peers, peer) } } + pm.logger.Debug("peers selected", zap.Int("peerCnt", len(peers))) return peers, nil } diff --git a/waku/v2/peermanager/service_slot.go b/waku/v2/peermanager/service_slot.go index a5673df3..9fadca6d 100644 --- a/waku/v2/peermanager/service_slot.go +++ b/waku/v2/peermanager/service_slot.go @@ -19,10 +19,10 @@ func newPeerMap() *peerMap { } } -func (pm *peerMap) getRandom(count int) (peerSet, error) { +func (pm *peerMap) getRandom(count int, excludePeers PeerSet) (PeerSet, error) { pm.mu.RLock() defer pm.mu.RUnlock() - return getRandom(pm.m, count) + return getRandom(pm.m, count, excludePeers) } func (pm *peerMap) remove(pID peer.ID) { diff --git a/waku/v2/peermanager/service_slot_test.go b/waku/v2/peermanager/service_slot_test.go index 5b7ac986..b85a2c6f 100644 --- a/waku/v2/peermanager/service_slot_test.go +++ b/waku/v2/peermanager/service_slot_test.go @@ -1,11 +1,12 @@ package peermanager import ( + "testing" + "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/stretchr/testify/require" "golang.org/x/exp/maps" - "testing" ) func TestServiceSlot(t *testing.T) { @@ -18,13 +19,13 @@ func TestServiceSlot(t *testing.T) { // slots.getPeers(protocol).add(peerID) // - fetchedPeers, err := slots.getPeers(protocol).getRandom(1) + fetchedPeers, err := slots.getPeers(protocol).getRandom(1, nil) require.NoError(t, err) require.Equal(t, peerID, maps.Keys(fetchedPeers)[0]) // slots.getPeers(protocol).remove(peerID) // - _, err = slots.getPeers(protocol).getRandom(1) + _, err = slots.getPeers(protocol).getRandom(1, nil) require.Equal(t, err, ErrNoPeersAvailable) // Test with more peers @@ -36,7 +37,7 @@ func TestServiceSlot(t *testing.T) { slots.getPeers(protocol).add(peerID3) // - fetchedPeers, err = slots.getPeers(protocol).getRandom(2) + fetchedPeers, err = slots.getPeers(protocol).getRandom(2, nil) require.NoError(t, err) require.Equal(t, 2, len(maps.Keys(fetchedPeers))) @@ -47,7 +48,7 @@ func TestServiceSlot(t *testing.T) { slots.getPeers(protocol).remove(peerID2) - fetchedPeers, err = slots.getPeers(protocol).getRandom(10) + fetchedPeers, err = slots.getPeers(protocol).getRandom(10, nil) require.NoError(t, err) require.Equal(t, peerID3, maps.Keys(fetchedPeers)[0]) @@ -65,15 +66,15 @@ func TestServiceSlotRemovePeerFromAll(t *testing.T) { slots.getPeers(protocol).add(peerID) slots.getPeers(protocol1).add(peerID) // - fetchedPeers, err := slots.getPeers(protocol1).getRandom(1) + fetchedPeers, err := slots.getPeers(protocol1).getRandom(1, nil) require.NoError(t, err) require.Equal(t, peerID, maps.Keys(fetchedPeers)[0]) // slots.removePeer(peerID) // - _, err = slots.getPeers(protocol).getRandom(1) + _, err = slots.getPeers(protocol).getRandom(1, nil) require.Equal(t, err, ErrNoPeersAvailable) - _, err = slots.getPeers(protocol1).getRandom(1) + _, err = slots.getPeers(protocol1).getRandom(1, nil) require.Equal(t, err, ErrNoPeersAvailable) } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index cad12dc6..edbba8d3 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -9,6 +9,7 @@ import ( "net/http" "strings" "sync" + "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -43,13 +44,14 @@ var ( type WakuFilterLightNode struct { *service.CommonService - h host.Host - broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s - timesource timesource.Timesource - metrics Metrics - log *zap.Logger - subscriptions *subscription.SubscriptionsMap - pm *peermanager.PeerManager + h host.Host + broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s + timesource timesource.Timesource + metrics Metrics + log *zap.Logger + subscriptions *subscription.SubscriptionsMap + pm *peermanager.PeerManager + peerPingInterval time.Duration } type WakuFilterPushError struct { @@ -86,7 +88,7 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM wf.pm = pm wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) - + wf.peerPingInterval = 5 * time.Second return wf } @@ -97,13 +99,15 @@ func (wf *WakuFilterLightNode) SetHost(h host.Host) { func (wf *WakuFilterLightNode) Start(ctx context.Context) error { return wf.CommonService.Start(ctx, wf.start) - } func (wf *WakuFilterLightNode) start() error { wf.subscriptions = subscription.NewSubscriptionMap(wf.log) wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(wf.Context())) + //Start Filter liveness check + wf.CommonService.WaitGroup().Add(1) + go wf.FilterHealthCheckLoop() wf.log.Info("filter-push protocol started") return nil } @@ -313,24 +317,29 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, wf.pm.Connect(pData) params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID) } - if params.pm != nil { + reqPeerCount := params.maxPeers - len(params.selectedPeers) - peerCount := params.maxPeers - len(params.selectedPeers) + if params.pm != nil && reqPeerCount > 0 { + wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude))) params.selectedPeers, err = wf.pm.SelectPeers( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, PubsubTopics: maps.Keys(pubSubTopicMap), SpecificPeers: params.preferredPeers, - MaxPeers: peerCount, + MaxPeers: reqPeerCount, Ctx: ctx, + ExcludePeers: params.peersToExclude, }, ) if err != nil { + wf.log.Error("peer selection returned err", zap.Error(err)) return nil, nil, err } } + wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers))) + return params, pubSubTopicMap, nil } @@ -354,7 +363,10 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot subscriptions := make([]*subscription.SubscriptionDetails, 0) for pubSubTopic, cTopics := range pubSubTopicMap { var selectedPeers peer.IDSlice + wf.log.Debug("peer selection", zap.Int("params.maxPeers", params.maxPeers)) + if params.pm != nil && len(params.selectedPeers) < params.maxPeers { + wf.log.Debug("selected peers less than maxPeers", zap.Int("maxpPeers", params.maxPeers)) selectedPeers, err = wf.pm.SelectPeers( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, @@ -363,6 +375,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot SpecificPeers: params.preferredPeers, MaxPeers: params.maxPeers - params.selectedPeers.Len(), Ctx: ctx, + ExcludePeers: params.peersToExclude, }, ) } else { @@ -375,7 +388,6 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot failedContentTopics = append(failedContentTopics, cTopics...) continue } - var cFilter protocol.ContentFilter cFilter.PubsubTopic = pubSubTopic cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...) @@ -395,6 +407,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot failedContentTopics = append(failedContentTopics, cTopics...) continue } + wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", peer)) subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(peer, cFilter)) } } @@ -457,16 +470,6 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts .. peerID) } -func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *subscription.SubscriptionDetails) error { - wf.RLock() - defer wf.RUnlock() - if err := wf.ErrOnNotRunning(); err != nil { - return err - } - - return wf.Ping(ctx, subscription.PeerID) -} - // Unsubscribe is used to stop receiving messages from specified peers for the content filter func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) { wf.RLock() diff --git a/waku/v2/protocol/filter/filter_health_check.go b/waku/v2/protocol/filter/filter_health_check.go new file mode 100644 index 00000000..11b9a720 --- /dev/null +++ b/waku/v2/protocol/filter/filter_health_check.go @@ -0,0 +1,48 @@ +package filter + +import ( + "context" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "go.uber.org/zap" +) + +func (wf *WakuFilterLightNode) PingPeers() { + //Send a ping to all the peers and report their status to corresponding subscriptions + // Alive or not or set state of subcription?? + for _, peer := range wf.subscriptions.GetSubscribedPeers() { + go wf.PingPeer(peer) + } +} + +func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) { + ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), wf.peerPingInterval) + defer cancel() + err := wf.Ping(ctxWithTimeout, peer) + if err != nil { + wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err)) + + subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer) + for _, subscription := range subscriptions { + wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID)) + + //Indicating that subscription is closing, + close(subscription.Closing) + } + } +} + +func (wf *WakuFilterLightNode) FilterHealthCheckLoop() { + defer wf.WaitGroup().Done() + ticker := time.NewTicker(wf.peerPingInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + wf.PingPeers() + case <-wf.CommonService.Context().Done(): + return + } + } +} diff --git a/waku/v2/protocol/filter/filter_subscribe_test.go b/waku/v2/protocol/filter/filter_subscribe_test.go index 17e23a27..112718ee 100644 --- a/waku/v2/protocol/filter/filter_subscribe_test.go +++ b/waku/v2/protocol/filter/filter_subscribe_test.go @@ -324,7 +324,7 @@ func (s *FilterTestSuite) TestSubscribeFullNode2FullNode() { s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) nodeData := s.GetWakuFilterFullNode(testTopic, false) - fullNode2 := nodeData.fullNode + fullNode2 := nodeData.FullNode // Connect nodes fullNode2.h.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL) @@ -357,29 +357,6 @@ func (s *FilterTestSuite) TestSubscribeFullNode2FullNode() { } -func (s *FilterTestSuite) TestIsSubscriptionAlive() { - messages := s.prepareData(2, false, true, false, nil) - - // Subscribe with the first message only - s.subscribe(messages[0].PubSubTopic, messages[0].ContentTopic, s.FullNodeHost.ID()) - - // IsSubscriptionAlive returns no error for the first message - err := s.LightNode.IsSubscriptionAlive(s.ctx, s.subDetails[0]) - s.Require().NoError(err) - - // Create new host/peer - not related to any node - host, err := tests.MakeHost(context.Background(), 54321, rand.Reader) - s.Require().NoError(err) - - // Alter the existing peer ID in sub details - s.subDetails[0].PeerID = host.ID() - - // IsSubscriptionAlive returns error for the second message, peer ID doesn't match - err = s.LightNode.IsSubscriptionAlive(s.ctx, s.subDetails[0]) - s.Require().Error(err) - -} - func (s *FilterTestSuite) TestFilterSubscription() { contentFilter := protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)} diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index d3117118..5fa0c413 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -7,9 +7,7 @@ import ( "testing" "time" - "github.com/libp2p/go-libp2p/core/peerstore" "github.com/stretchr/testify/suite" - "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/utils" @@ -46,6 +44,7 @@ func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { s.Require().NoError(err) result, err := s.LightNode.Unsubscribe(s.ctx, contentFilter, DontWait()) + s.Require().NoError(err) s.Require().Equal(0, len(result.Errors())) @@ -92,7 +91,7 @@ func (s *FilterTestSuite) TestAutoShard() { //Workaround as could not find a way to reuse setup test with params // Stop what is run in setup - s.fullNode.Stop() + s.FullNode.Stop() s.LightNode.Stop() ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 10 seconds s.ctx = ctx @@ -109,10 +108,7 @@ func (s *FilterTestSuite) TestAutoShard() { s.MakeWakuFilterLightNode() s.StartLightNode() s.MakeWakuFilterFullNode(pubSubTopic.String(), false) - - s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL) - err = s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1) - s.Require().NoError(err) + s.ConnectToFullNode(s.LightNode, s.FullNode) s.Log.Info("Testing Autoshard:CreateSubscription") s.subscribe("", s.TestContentTopic, s.FullNodeHost.ID()) @@ -210,7 +206,7 @@ func (s *FilterTestSuite) TestStaticSharding() { s.MakeWakuFilterFullNode(s.TestTopic, false) // Connect nodes - s.ConnectHosts(s.LightNodeHost, s.FullNodeHost) + s.ConnectToFullNode(s.LightNode, s.FullNode) s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) diff --git a/waku/v2/protocol/filter/filter_unsubscribe_test.go b/waku/v2/protocol/filter/filter_unsubscribe_test.go index cac86615..45970ca5 100644 --- a/waku/v2/protocol/filter/filter_unsubscribe_test.go +++ b/waku/v2/protocol/filter/filter_unsubscribe_test.go @@ -80,7 +80,7 @@ func (s *FilterTestSuite) TestUnsubscribeMultiPubSubMultiContentTopic() { s.MakeWakuFilterFullNode(s.TestTopic, true) // Connect nodes - s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.fullNode.h), peerstore.PermanentAddrTTL) + s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNode.h), peerstore.PermanentAddrTTL) err := s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1) s.Require().NoError(err) @@ -211,7 +211,7 @@ func (s *FilterTestSuite) TestUnsubscribeAllDiffPubSubContentTopics() { s.MakeWakuFilterFullNode(s.TestTopic, true) // Connect nodes - s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.fullNode.h), peerstore.PermanentAddrTTL) + s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNode.h), peerstore.PermanentAddrTTL) err := s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1) s.Require().NoError(err) diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index afb9f2f1..8743218b 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -39,6 +39,7 @@ type ( peerAddr multiaddr.Multiaddr peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice + peersToExclude peermanager.PeerSet maxPeers int requestID []byte log *zap.Logger @@ -101,6 +102,14 @@ func WithMaxPeersPerContentFilter(numPeers int) FilterSubscribeOption { } } +// WithPeersToExclude option excludes the peers that are specified from selection +func WithPeersToExclude(peers ...peer.ID) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { + params.peersToExclude = peermanager.PeerSliceToMap(peers) + return nil + } +} + // WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store. // If a list of specific peers is passed, the peer will be chosen from that list assuming it // supports the chosen protocol, otherwise it will chose a peer from the node peerstore @@ -145,6 +154,7 @@ func DefaultSubscriptionOptions() []FilterSubscribeOption { return []FilterSubscribeOption{ WithAutomaticPeerSelection(), WithAutomaticRequestID(), + WithMaxPeersPerContentFilter(1), } } diff --git a/waku/v2/protocol/filter/test_utils.go b/waku/v2/protocol/filter/test_utils.go index 2784c16b..2beabc2d 100644 --- a/waku/v2/protocol/filter/test_utils.go +++ b/waku/v2/protocol/filter/test_utils.go @@ -10,10 +10,11 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/peerstore" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/suite" "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/peermanager" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" @@ -32,7 +33,7 @@ type FullNodeData struct { RelaySub *relay.Subscription FullNodeHost host.Host Broadcaster relay.Broadcaster - fullNode *WakuFilterFullNode + FullNode *WakuFilterFullNode } type FilterTestSuite struct { @@ -78,27 +79,29 @@ func (s *FilterTestSuite) SetupTest() { s.TestContentTopic = DefaultTestContentTopic s.MakeWakuFilterLightNode() + s.LightNode.peerPingInterval = 1 * time.Second s.StartLightNode() //TODO: Add tests to verify broadcaster. s.MakeWakuFilterFullNode(s.TestTopic, false) - s.ConnectHosts(s.LightNodeHost, s.FullNodeHost) + s.ConnectToFullNode(s.LightNode, s.FullNode) } func (s *FilterTestSuite) TearDownTest() { - s.fullNode.Stop() + s.FullNode.Stop() s.LightNode.Stop() s.RelaySub.Unsubscribe() s.LightNode.Stop() s.ctxCancel() } -func (s *FilterTestSuite) ConnectHosts(h1, h2 host.Host) { - h1.Peerstore().AddAddr(h2.ID(), tests.GetHostAddress(h2), peerstore.PermanentAddrTTL) - err := h1.Peerstore().AddProtocols(h2.ID(), FilterSubscribeID_v20beta1) +func (s *FilterTestSuite) ConnectToFullNode(h1 *WakuFilterLightNode, h2 *WakuFilterFullNode) { + mAddr := tests.GetAddr(h2.h) + _, err := h1.pm.AddPeer(mAddr, wps.Static, []string{s.TestTopic}, FilterSubscribeID_v20beta1) + s.Log.Info("add peer", zap.Stringer("mAddr", mAddr)) s.Require().NoError(err) } @@ -142,7 +145,7 @@ func (s *FilterTestSuite) GetWakuFilterFullNode(topic string, withRegisterAll bo err := node2Filter.Start(s.ctx, sub) s.Require().NoError(err) - nodeData.fullNode = node2Filter + nodeData.FullNode = node2Filter return nodeData } @@ -161,9 +164,10 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData { s.Require().NoError(err) b := relay.NewBroadcaster(10) s.Require().NoError(b.Start(context.Background())) - filterPush := NewWakuFilterLightNode(b, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log) + pm := peermanager.NewPeerManager(5, 5, nil, s.Log) + filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log) filterPush.SetHost(host) - + pm.SetHost(host) return LightNodeData{filterPush, host} } diff --git a/waku/v2/protocol/subscription/subscription_details.go b/waku/v2/protocol/subscription/subscription_details.go index 936aeecd..f2ec8870 100644 --- a/waku/v2/protocol/subscription/subscription_details.go +++ b/waku/v2/protocol/subscription/subscription_details.go @@ -25,10 +25,11 @@ type PeerContentFilter struct { type SubscriptionDetails struct { sync.RWMutex - ID string `json:"subscriptionID"` - mapRef *SubscriptionsMap - Closed bool `json:"-"` - once sync.Once + ID string `json:"subscriptionID"` + mapRef *SubscriptionsMap + Closed bool `json:"-"` + once sync.Once + Closing chan struct{} PeerID peer.ID `json:"peerID"` ContentFilter protocol.ContentFilter `json:"contentFilters"` @@ -96,7 +97,6 @@ func (s *SubscriptionDetails) CloseC() { s.once.Do(func() { s.Lock() defer s.Unlock() - s.Closed = true close(s.C) }) diff --git a/waku/v2/protocol/subscription/subscriptions_map.go b/waku/v2/protocol/subscription/subscriptions_map.go index a538912e..c308d9bb 100644 --- a/waku/v2/protocol/subscription/subscriptions_map.go +++ b/waku/v2/protocol/subscription/subscriptions_map.go @@ -75,6 +75,7 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.Content PeerID: peerID, C: make(chan *protocol.Envelope, 1024), ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)}, + Closing: make(chan struct{}), } // Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair @@ -218,6 +219,30 @@ func (m *SubscriptionsMap) GetSubscriptionsForPeer(peerID peer.ID, contentFilter return output } +func (m *SubscriptionsMap) GetAllSubscriptionsForPeer(peerID peer.ID) []*SubscriptionDetails { + m.RLock() + defer m.RUnlock() + + var output []*SubscriptionDetails + for _, peerSubs := range m.items { + if peerSubs.PeerID == peerID { + for _, subs := range peerSubs.SubsPerPubsubTopic { + for _, subscriptionDetail := range subs { + output = append(output, subscriptionDetail) + } + } + break + } + } + return output +} + +func (m *SubscriptionsMap) GetSubscribedPeers() peer.IDSlice { + m.RLock() + defer m.RUnlock() + return maps.Keys(m.items) +} + func (m *SubscriptionsMap) GetAllSubscriptions() []*SubscriptionDetails { return m.GetSubscriptionsForPeer("", protocol.ContentFilter{}) }