From 9dfc5ccc4df2d0f7d6a346f3e6eab8954cc198ed Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 26 Jan 2024 14:15:15 +0530 Subject: [PATCH] feat: support multiple peer selection for filter client (#1005) --- cmd/waku/server/rest/filter.go | 4 +- cmd/waku/server/rest/lightpush_rest_test.go | 1 - waku/v2/peermanager/peer_manager_test.go | 73 ++++++--- waku/v2/peermanager/peer_selection.go | 127 ++++++++++----- waku/v2/peermanager/service_slot.go | 8 +- waku/v2/peermanager/service_slot_test.go | 17 +- waku/v2/protocol/filter/client.go | 146 +++++++++++------- .../filter/filter_proto_ident_test.go | 12 +- waku/v2/protocol/filter/options.go | 18 ++- waku/v2/protocol/filter/options_test.go | 6 +- waku/v2/protocol/legacy_filter/waku_filter.go | 5 +- waku/v2/protocol/lightpush/waku_lightpush.go | 9 +- waku/v2/protocol/peer_exchange/client.go | 4 +- waku/v2/protocol/store/waku_store_client.go | 4 +- .../subscription/subscriptions_map.go | 8 +- 15 files changed, 284 insertions(+), 158 deletions(-) diff --git a/cmd/waku/server/rest/filter.go b/cmd/waku/server/rest/filter.go index 76dd0f53..7973cc02 100644 --- a/cmd/waku/server/rest/filter.go +++ b/cmd/waku/server/rest/filter.go @@ -325,7 +325,7 @@ func (s *FilterService) unsubscribeAll(w http.ResponseWriter, req *http.Request) func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId string, w http.ResponseWriter) peer.ID { // selecting random peer that supports filter protocol - peerId, err := s.node.PeerManager().SelectPeer(peermanager.PeerSelectionCriteria{ + peerIds, err := s.node.PeerManager().SelectPeers(peermanager.PeerSelectionCriteria{ SelectionType: peermanager.Automatic, Proto: filter.FilterSubscribeID_v20beta1, Ctx: ctx, @@ -338,7 +338,7 @@ func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId string }, http.StatusServiceUnavailable) return "" } - return peerId + return peerIds[0] } func (s *FilterService) getMessagesByContentTopic(w http.ResponseWriter, req *http.Request) { diff --git a/cmd/waku/server/rest/lightpush_rest_test.go b/cmd/waku/server/rest/lightpush_rest_test.go index 95b89e66..54173909 100644 --- a/cmd/waku/server/rest/lightpush_rest_test.go +++ b/cmd/waku/server/rest/lightpush_rest_test.go @@ -27,7 +27,6 @@ func twoLightPushConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNod require.NoError(t, err) err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic}) require.NoError(t, err) - return node1, node2 } diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 5b289093..e4de0cc5 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/multiformats/go-multiaddr" @@ -78,17 +79,19 @@ func TestServiceSlots(t *testing.T) { // getting peer for protocol /////////////// + var peerID peer.ID // select peer from pm, currently only h2 is set in pm - peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) + peers, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) require.NoError(t, err) - require.Equal(t, peerID, h2.ID()) + require.Equal(t, h2.ID(), peers[0]) // add h3 peer to peer manager _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{""}, libp2pProtocol.ID(protocol)) require.NoError(t, err) // check that returned peer is h2 or h3 peer - peerID, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) + peers, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) + peerID = peers[0] require.NoError(t, err) if peerID == h2.ID() || peerID == h3.ID() { //Test success @@ -104,7 +107,7 @@ func TestServiceSlots(t *testing.T) { require.NoError(t, err) defer h4.Close() - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1}) + _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1}) require.Error(t, err, ErrNoPeersAvailable) // add h4 peer for protocol1 @@ -112,7 +115,9 @@ func TestServiceSlots(t *testing.T) { require.NoError(t, err) //Test peer selection for protocol1 - peerID, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1}) + peers, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1}) + peerID = peers[0] + require.NoError(t, err) require.Equal(t, peerID, h4.ID()) @@ -140,22 +145,42 @@ func TestPeerSelection(t *testing.T) { _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) + _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) require.NoError(t, err) - peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}}) + peerIDs, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}}) require.NoError(t, err) - require.Equal(t, h2.ID(), peerID) + require.Equal(t, h2.ID(), peerIDs[0]) - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}}) + _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}}) require.Error(t, ErrNoPeersAvailable, err) - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}}) + _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}}) require.NoError(t, err) //Test for selectWithLowestRTT - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}}) + _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}}) require.NoError(t, err) + + peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 2}) + require.Equal(t, 2, peerIDs.Len()) + require.NoError(t, err) + + peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3}) + fmt.Println("peerIDs", peerIDs) + require.Equal(t, 2, peerIDs.Len()) + require.NoError(t, err) + + h4, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h4.Close() + _, err = pm.AddPeer(getAddr(h4), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol)) + require.NoError(t, err) + + peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3}) + require.Equal(t, 3, peerIDs.Len()) + require.NoError(t, err) + } func TestDefaultProtocol(t *testing.T) { @@ -165,7 +190,7 @@ func TestDefaultProtocol(t *testing.T) { // check peer for default protocol /////////////// //Test empty peer selection for relay protocol - _, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200}) + _, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200}) require.Error(t, err, ErrNoPeersAvailable) /////////////// @@ -180,9 +205,9 @@ func TestDefaultProtocol(t *testing.T) { require.NoError(t, err) // since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol. - peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200}) + peerIDs, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200}) require.NoError(t, err) - require.Equal(t, peerID, h5.ID()) + require.Equal(t, h5.ID(), peerIDs[0]) } func TestAdditionAndRemovalOfPeer(t *testing.T) { @@ -200,12 +225,12 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) { _, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2) require.NoError(t, err) - peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2}) + peers, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2}) require.NoError(t, err) - require.Equal(t, peerID, h6.ID()) + require.Equal(t, peers[0], h6.ID()) - pm.RemovePeer(peerID) - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2}) + pm.RemovePeer(peers[0]) + _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2}) require.Error(t, err, ErrNoPeersAvailable) } @@ -287,10 +312,10 @@ func TestOnDemandPeerDiscovery(t *testing.T) { require.NoError(t, err) //Discovery should fail for non-waku protocol - _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/test"}) + _, err = pm3.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/test"}) require.Error(t, err) - _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: "/test"}) + _, err = pm3.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: "/test"}) require.Error(t, err) ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) @@ -299,16 +324,16 @@ func TestOnDemandPeerDiscovery(t *testing.T) { var enrField uint8 enrField |= (1 << 1) pm3.RegisterWakuProtocol("/vac/waku/store/2.0.0-beta4", enrField) - peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx}) + peerIDs, err := pm3.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx}) require.NoError(t, err) - require.Equal(t, peerID, host2.ID()) + require.Equal(t, host2.ID(), peerIDs[0]) var enrField1 uint8 enrField1 |= (1 << 3) pm3.RegisterWakuProtocol("/vac/waku/lightpush/2.0.0-beta1", enrField1) - peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx}) + peerIDs, err = pm3.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx}) require.NoError(t, err) - require.Equal(t, peerID, host1.ID()) + require.Equal(t, host1.ID(), peerIDs[0]) } diff --git a/waku/v2/peermanager/peer_selection.go b/waku/v2/peermanager/peer_selection.go index bfaab97a..2b4c3807 100644 --- a/waku/v2/peermanager/peer_selection.go +++ b/waku/v2/peermanager/peer_selection.go @@ -3,7 +3,6 @@ package peermanager import ( "context" "errors" - "math/rand" "sync" "time" @@ -14,8 +13,11 @@ import ( wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "go.uber.org/zap" + "golang.org/x/exp/maps" ) +type peerSet map[peer.ID]struct{} + // SelectPeerByContentTopic is used to return a random peer that supports a given protocol for given contentTopic. // If a list of specific peers is passed, the peer will be chosen from that list assuming // it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot. @@ -30,7 +32,11 @@ func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopic } pubsubTopics = append(pubsubTopics, pubsubTopic) } - return pm.SelectPeer(PeerSelectionCriteria{PubsubTopics: pubsubTopics, Proto: proto, SpecificPeers: specificPeers}) + peers, err := pm.SelectPeers(PeerSelectionCriteria{PubsubTopics: pubsubTopics, Proto: proto, SpecificPeers: specificPeers}) + if err != nil { + return "", err + } + return peers[0], nil } // SelectRandomPeer is used to return a random peer that supports a given protocol. @@ -38,65 +44,107 @@ func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopic // it supports the chosen protocol, otherwise it will chose a peer from the service slot. // If a peer cannot be found in the service slot, a peer will be selected from node peerstore // if pubSubTopic is specified, peer is selected from list that support the pubSubTopic -func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID, error) { +func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlice, error) { // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. // Ideally depending on the query and our set of peers we take a subset of ideal peers. // This will require us to check for various factors such as: // - which topics they track // - latency? - - peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopics, criteria.Ctx, criteria.SpecificPeers...) - if err == nil { - return peerID, nil + peerIDs, err := pm.selectServicePeer(criteria) + if err == nil && len(peerIDs) == criteria.MaxPeers { + return maps.Keys(peerIDs), nil } else if !errors.Is(err, ErrNoPeersAvailable) { pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)), zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err)) - return "", err + return nil, err + } else if len(peerIDs) == 0 { + peerIDs = make(peerSet) } - // if not found in serviceSlots or proto == WakuRelayIDv200 filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.Proto) if err != nil { - return "", err + return nil, err } if len(criteria.PubsubTopics) > 0 { filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...) } - return selectRandomPeer(filteredPeers, pm.logger) + randomPeers, err := selectRandomPeers(filteredPeers, criteria.MaxPeers-len(peerIDs)) + if err != nil && len(peerIDs) == 0 { + return nil, err + } + + for tmpPeer := range randomPeers { + peerIDs[tmpPeer] = struct{}{} + } + return maps.Keys(peerIDs), nil } -func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopics []string, ctx context.Context, specificPeers ...peer.ID) (peer.ID, error) { - var peerID peer.ID +func getRandom(filter peerSet, count int) (peerSet, error) { + i := 0 + selectedPeers := make(peerSet) + for pID := range filter { + //Map's iterator in golang works using randomness and hence not random function is being used. + selectedPeers[pID] = struct{}{} + i++ + if i == count { + break + } + } + if len(selectedPeers) == 0 { + return nil, ErrNoPeersAvailable + } + return selectedPeers, nil +} + +// selects count random peers from list of peers +func selectRandomPeers(peers peer.IDSlice, count int) (peerSet, error) { + filteredPeerMap := peerSliceToMap(peers) + return getRandom(filteredPeerMap, count) +} + +func peerSliceToMap(peers peer.IDSlice) peerSet { + peerSet := make(peerSet, peers.Len()) + for _, peer := range peers { + peerSet[peer] = struct{}{} + } + return peerSet +} + +func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (peerSet, error) { + peers := make(peerSet) var err error for retryCnt := 0; retryCnt < 1; retryCnt++ { //Try to fetch from serviceSlot - if slot := pm.serviceSlots.getPeers(proto); slot != nil { - if len(pubsubTopics) == 0 || (len(pubsubTopics) == 1 && pubsubTopics[0] == "") { - return slot.getRandom() + if slot := pm.serviceSlots.getPeers(criteria.Proto); slot != nil { + if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") { + return slot.getRandom(criteria.MaxPeers) } else { //PubsubTopic based selection keys := make([]peer.ID, 0, len(slot.m)) for i := range slot.m { keys = append(keys, i) } - selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(pubsubTopics, keys...) - peerID, err = selectRandomPeer(selectedPeers, pm.logger) - if err == nil { - return peerID, nil + selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...) + tmpPeers, err := selectRandomPeers(selectedPeers, criteria.MaxPeers) + for tmpPeer := range tmpPeers { + peers[tmpPeer] = struct{}{} + } + if err == nil && len(peers) == criteria.MaxPeers { + return peers, nil } else { - pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", pubsubTopics)) + pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics)) //Trigger on-demand discovery for this topic and connect to peer immediately. //For now discover atleast 1 peer for the criteria - pm.discoverPeersByPubsubTopics(pubsubTopics, proto, ctx, 1) + pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1) //Try to fetch peers again. continue } } } } - if peerID == "" { + if len(peers) == 0 { pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err)) } - return "", ErrNoPeersAvailable + return peers, ErrNoPeersAvailable } // PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers. @@ -105,20 +153,28 @@ type PeerSelectionCriteria struct { Proto protocol.ID PubsubTopics []string SpecificPeers peer.IDSlice + MaxPeers int Ctx context.Context } -// SelectPeer selects a peer based on selectionType specified. +// SelectPeers selects a peer based on selectionType specified. // Context is required only in case of selectionType set to LowestRTT -func (pm *PeerManager) SelectPeer(criteria PeerSelectionCriteria) (peer.ID, error) { - +func (pm *PeerManager) SelectPeers(criteria PeerSelectionCriteria) (peer.IDSlice, error) { + if criteria.MaxPeers == 0 { + criteria.MaxPeers = 1 + } switch criteria.SelectionType { case Automatic: - return pm.SelectRandomPeer(criteria) + return pm.SelectRandom(criteria) case LowestRTT: - return pm.SelectPeerWithLowestRTT(criteria) + peerID, err := pm.SelectPeerWithLowestRTT(criteria) + if err != nil { + return nil, err + } + //TODO: Update this once peer Ping cache PR is merged into this code. + return []peer.ID{peerID}, nil default: - return "", errors.New("unknown peer selection type specified") + return nil, errors.New("unknown peer selection type specified") } } @@ -198,17 +254,6 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( } } -// selectRandomPeer selects randomly a peer from the list of peers passed. -func selectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) { - if len(peers) >= 1 { - peerID := peers[rand.Intn(len(peers))] - // TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned - return peerID, nil // nolint: gosec - } - - return "", ErrNoPeersAvailable -} - // FilterPeersByProto filters list of peers that support specified protocols. // If specificPeers is nil, all peers in the host's peerStore are considered for filtering. func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) { diff --git a/waku/v2/peermanager/service_slot.go b/waku/v2/peermanager/service_slot.go index df63b7ac..a5673df3 100644 --- a/waku/v2/peermanager/service_slot.go +++ b/waku/v2/peermanager/service_slot.go @@ -19,14 +19,10 @@ func newPeerMap() *peerMap { } } -func (pm *peerMap) getRandom() (peer.ID, error) { +func (pm *peerMap) getRandom(count int) (peerSet, error) { pm.mu.RLock() defer pm.mu.RUnlock() - for pID := range pm.m { - return pID, nil - } - return "", ErrNoPeersAvailable - + return getRandom(pm.m, count) } func (pm *peerMap) remove(pID peer.ID) { diff --git a/waku/v2/peermanager/service_slot_test.go b/waku/v2/peermanager/service_slot_test.go index 493685a7..a9d944a7 100644 --- a/waku/v2/peermanager/service_slot_test.go +++ b/waku/v2/peermanager/service_slot_test.go @@ -6,6 +6,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" ) func TestServiceSlot(t *testing.T) { @@ -18,14 +19,14 @@ func TestServiceSlot(t *testing.T) { // slots.getPeers(protocol).add(peerID) // - fetchedPeer, err := slots.getPeers(protocol).getRandom() + fetchedPeers, err := slots.getPeers(protocol).getRandom(1) require.NoError(t, err) - require.Equal(t, peerID, fetchedPeer) - + require.Equal(t, peerID, maps.Keys(fetchedPeers)[0]) + //TODO: Add test to get more than 1 peers // slots.getPeers(protocol).remove(peerID) // - _, err = slots.getPeers(protocol).getRandom() + _, err = slots.getPeers(protocol).getRandom(1) require.Equal(t, err, ErrNoPeersAvailable) } @@ -41,15 +42,15 @@ func TestServiceSlotRemovePeerFromAll(t *testing.T) { slots.getPeers(protocol).add(peerID) slots.getPeers(protocol1).add(peerID) // - fetchedPeer, err := slots.getPeers(protocol1).getRandom() + fetchedPeers, err := slots.getPeers(protocol1).getRandom(1) require.NoError(t, err) - require.Equal(t, peerID, fetchedPeer) + require.Equal(t, peerID, maps.Keys(fetchedPeers)[0]) // slots.removePeer(peerID) // - _, err = slots.getPeers(protocol).getRandom() + _, err = slots.getPeers(protocol).getRandom(1) require.Equal(t, err, ErrNoPeersAvailable) - _, err = slots.getPeers(protocol1).getRandom() + _, err = slots.getPeers(protocol1).getRandom(1) require.Equal(t, err, ErrNoPeersAvailable) } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index b6fc3092..c03a2682 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -38,6 +38,7 @@ const FilterPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-push/2.0.0-bet var ( ErrNoPeersAvailable = errors.New("no suitable remote peers") ErrSubscriptionNotFound = errors.New("subscription not found") + ErrNoPeersSpecified = errors.New("no peers specified to unsubscribe") ) type WakuFilterLightNode struct { @@ -211,10 +212,10 @@ func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string, wf.subscriptions.Notify(remotePeerID, envelope) } -func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters, - reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error { +func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, + reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter, peer peer.ID) error { request := &pb.FilterSubscribeRequest{ - RequestId: hex.EncodeToString(params.requestID), + RequestId: hex.EncodeToString(requestID), FilterSubscribeType: reqType, PubsubTopic: &contentFilter.PubsubTopic, ContentTopics: contentFilter.ContentTopicsList(), @@ -225,9 +226,9 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr return err } - logger := wf.log.With(logging.HostID("peerID", params.selectedPeer)) + logger := wf.log.With(logging.HostID("peerID", peer)) - stream, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1) + stream, err := wf.h.NewStream(ctx, peer, FilterSubscribeID_v20beta1) if err != nil { wf.metrics.RecordError(dialFailure) return err @@ -314,15 +315,19 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, return nil, nil, err } wf.pm.Connect(pData) - params.selectedPeer = pData.AddrInfo.ID + params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID) } - if params.pm != nil && params.selectedPeer == "" { - params.selectedPeer, err = wf.pm.SelectPeer( + if params.pm != nil { + + peerCount := params.maxPeers - len(params.selectedPeers) + + params.selectedPeers, err = wf.pm.SelectPeers( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, PubsubTopics: maps.Keys(pubSubTopicMap), SpecificPeers: params.preferredPeers, + MaxPeers: peerCount, Ctx: ctx, }, ) @@ -352,21 +357,22 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot failedContentTopics := []string{} subscriptions := make([]*subscription.SubscriptionDetails, 0) for pubSubTopic, cTopics := range pubSubTopicMap { - var selectedPeer peer.ID - if params.pm != nil && params.selectedPeer == "" { - selectedPeer, err = wf.pm.SelectPeer( + var selectedPeers peer.IDSlice + if params.pm != nil && len(params.selectedPeers) < params.maxPeers { + selectedPeers, err = wf.pm.SelectPeers( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, PubsubTopics: []string{pubSubTopic}, SpecificPeers: params.preferredPeers, + MaxPeers: params.maxPeers - params.selectedPeers.Len(), Ctx: ctx, }, ) } else { - selectedPeer = params.selectedPeer + selectedPeers = params.selectedPeers } - if selectedPeer == "" { + if len(selectedPeers) == 0 { wf.metrics.RecordError(peerNotFoundFailure) wf.log.Error("selecting peer", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Error(err)) @@ -379,20 +385,22 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...) paramsCopy := params.Copy() - paramsCopy.selectedPeer = selectedPeer - err := wf.request( - ctx, - paramsCopy, - pb.FilterSubscribeRequest_SUBSCRIBE, - cFilter, - ) - if err != nil { - wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), - zap.Error(err)) - failedContentTopics = append(failedContentTopics, cTopics...) - continue + paramsCopy.selectedPeers = selectedPeers + for _, peer := range selectedPeers { + err := wf.request( + ctx, + params.requestID, + pb.FilterSubscribeRequest_SUBSCRIBE, + cFilter, + peer) + if err != nil { + wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), + zap.Error(err)) + failedContentTopics = append(failedContentTopics, cTopics...) + continue + } + subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(peer, cFilter)) } - subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(selectedPeer, cFilter)) } if len(failedContentTopics) > 0 { @@ -427,7 +435,6 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeO return nil, err } } - return params, nil } @@ -448,9 +455,10 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts .. return wf.request( ctx, - &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, + params.requestID, pb.FilterSubscribeRequest_SUBSCRIBER_PING, - protocol.ContentFilter{}) + protocol.ContentFilter{}, + peerID) } func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *subscription.SubscriptionDetails) error { @@ -463,7 +471,7 @@ func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscrip return wf.Ping(ctx, subscription.PeerID) } -// Unsubscribe is used to stop receiving messages from a peer that match a content filter +// Unsubscribe is used to stop receiving messages from specified peers for the content filter func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() @@ -487,7 +495,6 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr if err != nil { return nil, err } - pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter) if err != nil { return nil, err @@ -495,16 +502,33 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr result := &WakuFilterPushResult{} for pTopic, cTopics := range pubSubTopicMap { cFilter := protocol.NewContentFilter(pTopic, cTopics...) - - peers := make(map[peer.ID]struct{}) - subs := wf.subscriptions.GetSubscription(params.selectedPeer, cFilter) + var subs []*subscription.SubscriptionDetails + if params.selectedPeers.Len() == 0 { + subs = wf.subscriptions.GetAllSubscriptions() + if len(subs) == 0 { + result.Add(WakuFilterPushError{ + Err: ErrSubscriptionNotFound, + PeerID: "", + }) + continue + } + } + for _, peer := range params.selectedPeers { + subsForPeer := wf.subscriptions.GetSubscriptionsForPeer(peer, cFilter) + if len(subsForPeer) == 0 { + result.Add(WakuFilterPushError{ + Err: ErrSubscriptionNotFound, + PeerID: peer, + }) + continue + } + subs = append(subs, subsForPeer...) + } if len(subs) == 0 { - result.Add(WakuFilterPushError{ - Err: ErrSubscriptionNotFound, - PeerID: params.selectedPeer, - }) + //No subscriptions found for this filter continue } + peers := make(map[peer.ID]struct{}) for _, sub := range subs { sub.Remove(cTopics...) peers[sub.PeerID] = struct{}{} @@ -520,7 +544,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr params.wg.Done() } }() - err := wf.unsubscribeFromServer(ctx, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, cFilter) + err := wf.unsubscribeFromServer(ctx, params.requestID, peerID, cFilter) if params.wg != nil { result.Add(WakuFilterPushError{ @@ -539,7 +563,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr } func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetails { - subs := wf.subscriptions.GetSubscription("", protocol.ContentFilter{}) + subs := wf.subscriptions.GetAllSubscriptions() return subs } @@ -571,8 +595,7 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, if !wf.subscriptions.Has(sub.PeerID, sub.ContentFilter) { // Last sub for this [peer, contentFilter] pair - params.selectedPeer = sub.PeerID - err = wf.unsubscribeFromServer(ctx, params, sub.ContentFilter) + err = wf.unsubscribeFromServer(ctx, params.requestID, sub.PeerID, sub.ContentFilter) result.Add(WakuFilterPushError{ Err: err, PeerID: sub.PeerID, @@ -582,14 +605,14 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, } -func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params *FilterSubscribeParameters, cFilter protocol.ContentFilter) error { - err := wf.request(ctx, params, pb.FilterSubscribeRequest_UNSUBSCRIBE, cFilter) +func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, requestID []byte, peer peer.ID, cFilter protocol.ContentFilter) error { + err := wf.request(ctx, requestID, pb.FilterSubscribeRequest_UNSUBSCRIBE, cFilter, peer) if err != nil { ferr, ok := err.(*FilterError) if ok && ferr.Code == http.StatusNotFound { - wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", params.selectedPeer), zap.Error(err)) + wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", peer), zap.Error(err)) } else { - wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", params.selectedPeer), zap.Error(err)) + wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peer), zap.Error(err)) } } @@ -606,12 +629,25 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte result := &WakuFilterPushResult{} peers := make(map[peer.ID]struct{}) - subs := wf.subscriptions.GetSubscription(params.selectedPeer, protocol.ContentFilter{}) - if len(subs) == 0 && params.selectedPeer != "" { - result.Add(WakuFilterPushError{ - Err: err, - PeerID: params.selectedPeer, - }) + var subs []*subscription.SubscriptionDetails + if params.selectedPeers.Len() == 0 { + subs = wf.subscriptions.GetAllSubscriptions() + if len(subs) == 0 { + return result, nil + } + } + for _, peer := range params.selectedPeers { + pSubs := wf.subscriptions.GetSubscriptionsForPeer(peer, protocol.ContentFilter{}) + if len(pSubs) == 0 { + result.Add(WakuFilterPushError{ + Err: ErrSubscriptionNotFound, + PeerID: peer, + }) + continue + } + subs = append(subs, pSubs...) + } + if len(subs) == 0 { return result, ErrSubscriptionNotFound } for _, sub := range subs { @@ -630,13 +666,11 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte _ = recover() }() - paramsCopy := params.Copy() - paramsCopy.selectedPeer = peerID err := wf.request( ctx, - params, + params.requestID, pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL, - protocol.ContentFilter{}) + protocol.ContentFilter{}, peerID) if err != nil { wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) } diff --git a/waku/v2/protocol/filter/filter_proto_ident_test.go b/waku/v2/protocol/filter/filter_proto_ident_test.go index 233fa98e..a65da8a0 100644 --- a/waku/v2/protocol/filter/filter_proto_ident_test.go +++ b/waku/v2/protocol/filter/filter_proto_ident_test.go @@ -83,7 +83,7 @@ func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, pa const FilterSubscribeID_Incorrect1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/abcd") - conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_Incorrect1) + conn, err := wf.h.NewStream(ctx, params.selectedPeers[0], FilterSubscribeID_Incorrect1) if err != nil { wf.metrics.RecordError(dialFailure) return err @@ -173,8 +173,9 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi for pubSubTopic, cTopics := range pubSubTopicMap { var selectedPeer peer.ID //TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic - if params.pm != nil && params.selectedPeer == "" { - selectedPeer, err = wf.pm.SelectPeer( + if params.pm != nil && params.selectedPeers[0] == "" { + var selectedPeers peer.IDSlice + selectedPeers, err = wf.pm.SelectPeers( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, @@ -183,8 +184,11 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi Ctx: ctx, }, ) + if err != nil { + selectedPeer = selectedPeers[0] + } } else { - selectedPeer = params.selectedPeer + selectedPeer = params.selectedPeers[0] } if selectedPeer == "" { diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 93c5326b..f151eb71 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -15,8 +15,8 @@ import ( func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters { return &FilterSubscribeParameters{ - selectedPeer: old.selectedPeer, - requestID: old.requestID, + selectedPeers: old.selectedPeers, + requestID: old.requestID, } } @@ -35,10 +35,11 @@ func WithPingRequestId(requestId []byte) FilterPingOption { type ( FilterSubscribeParameters struct { - selectedPeer peer.ID + selectedPeers peer.IDSlice peerAddr multiaddr.Multiaddr peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice + maxPeers int requestID []byte log *zap.Logger @@ -72,7 +73,7 @@ func WithTimeout(timeout time.Duration) Option { // Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. func WithPeer(p peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) error { - params.selectedPeer = p + params.selectedPeers = append(params.selectedPeers, p) if params.peerAddr != nil { return errors.New("peerAddr and peerId options are mutually exclusive") } @@ -86,13 +87,20 @@ func WithPeer(p peer.ID) FilterSubscribeOption { func WithPeerAddr(pAddr multiaddr.Multiaddr) FilterSubscribeOption { return func(params *FilterSubscribeParameters) error { params.peerAddr = pAddr - if params.selectedPeer != "" { + if len(params.selectedPeers) != 0 { return errors.New("peerAddr and peerId options are mutually exclusive") } return nil } } +func WithMaxPeersPerContentFilter(numPeers int) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { + params.maxPeers = numPeers + return nil + } +} + // WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store. // If a list of specific peers is passed, the peer will be chosen from that list assuming it // supports the chosen protocol, otherwise it will chose a peer from the node peerstore diff --git a/waku/v2/protocol/filter/options_test.go b/waku/v2/protocol/filter/options_test.go index 71d19500..a8a36148 100644 --- a/waku/v2/protocol/filter/options_test.go +++ b/waku/v2/protocol/filter/options_test.go @@ -35,7 +35,7 @@ func TestFilterOption(t *testing.T) { } require.Equal(t, host, params.host) - require.NotNil(t, params.selectedPeer) + require.NotEqual(t, 0, params.selectedPeers) // Unsubscribe options options2 := []FilterSubscribeOption{ @@ -51,7 +51,7 @@ func TestFilterOption(t *testing.T) { require.NoError(t, err) } - require.NotNil(t, params2.selectedPeer) + require.NotEqual(t, 0, params2.selectedPeers) require.True(t, params2.unsubscribeAll) // Mutually Exclusive options @@ -73,7 +73,7 @@ func TestFilterOption(t *testing.T) { } } - require.NotNil(t, params2.selectedPeer) + require.NotEqual(t, 0, params2.selectedPeers) require.True(t, params2.unsubscribeAll) } diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index 0d87f718..116b1be6 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -252,7 +252,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil opt(params) } if wf.pm != nil && params.selectedPeer == "" { - params.selectedPeer, _ = wf.pm.SelectPeer( + selectedPeers, _ := wf.pm.SelectPeers( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterID_v20beta1, @@ -261,6 +261,9 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil Ctx: ctx, }, ) + if err != nil { + params.selectedPeer = selectedPeers[0] + } } if params.selectedPeer == "" { wf.metrics.RecordError(peerNotFoundFailure) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 322b4d9c..aef39113 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" @@ -264,7 +265,9 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe } if params.pm != nil && params.selectedPeer == "" { - params.selectedPeer, err = wakuLP.pm.SelectPeer( + var selectedPeers peer.IDSlice + //TODO: update this to work with multiple peer selection + selectedPeers, err = wakuLP.pm.SelectPeers( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: LightPushID_v20beta1, @@ -273,6 +276,10 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe Ctx: ctx, }, ) + if err == nil { + params.selectedPeer = selectedPeers[0] + } + } if params.selectedPeer == "" { if err != nil { diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index 2d8564fb..0f9037b9 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -43,8 +43,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts } if params.pm != nil && params.selectedPeer == "" { - var err error - params.selectedPeer, err = wakuPX.pm.SelectPeer( + selectedPeers, err := wakuPX.pm.SelectPeers( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: PeerExchangeID_v20alpha1, @@ -55,6 +54,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts if err != nil { return err } + params.selectedPeer = selectedPeers[0] } if params.selectedPeer == "" { wakuPX.metrics.RecordError(dialFailure) diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 9e568797..d66defbd 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -312,8 +312,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR params.selectedPeer = pData.AddrInfo.ID } if store.pm != nil && params.selectedPeer == "" { - var err error - params.selectedPeer, err = store.pm.SelectPeer( + selectedPeers, err := store.pm.SelectPeers( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: StoreID_v20beta4, @@ -325,6 +324,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR if err != nil { return nil, err } + params.selectedPeer = selectedPeers[0] } } diff --git a/waku/v2/protocol/subscription/subscriptions_map.go b/waku/v2/protocol/subscription/subscriptions_map.go index 540fe807..a538912e 100644 --- a/waku/v2/protocol/subscription/subscriptions_map.go +++ b/waku/v2/protocol/subscription/subscriptions_map.go @@ -29,7 +29,7 @@ func NewSubscriptionMap(logger *zap.Logger) *SubscriptionsMap { } func (m *SubscriptionsMap) Count() int { - m.RLock() + m.RLock() defer m.RUnlock() return len(m.items) } @@ -199,7 +199,7 @@ func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, e } } -func (m *SubscriptionsMap) GetSubscription(peerID peer.ID, contentFilter protocol.ContentFilter) []*SubscriptionDetails { +func (m *SubscriptionsMap) GetSubscriptionsForPeer(peerID peer.ID, contentFilter protocol.ContentFilter) []*SubscriptionDetails { m.RLock() defer m.RUnlock() @@ -217,3 +217,7 @@ func (m *SubscriptionsMap) GetSubscription(peerID peer.ID, contentFilter protoco } return output } + +func (m *SubscriptionsMap) GetAllSubscriptions() []*SubscriptionDetails { + return m.GetSubscriptionsForPeer("", protocol.ContentFilter{}) +}