feat: support multiple peer selection for filter client (#1005)

This commit is contained in:
Prem Chaitanya Prathi 2024-01-26 14:15:15 +05:30 committed by kaichaosun
parent dd81e1d469
commit 9dfc5ccc4d
15 changed files with 284 additions and 158 deletions

View File

@ -325,7 +325,7 @@ func (s *FilterService) unsubscribeAll(w http.ResponseWriter, req *http.Request)
func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId string, w http.ResponseWriter) peer.ID {
// selecting random peer that supports filter protocol
peerId, err := s.node.PeerManager().SelectPeer(peermanager.PeerSelectionCriteria{
peerIds, err := s.node.PeerManager().SelectPeers(peermanager.PeerSelectionCriteria{
SelectionType: peermanager.Automatic,
Proto: filter.FilterSubscribeID_v20beta1,
Ctx: ctx,
@ -338,7 +338,7 @@ func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId string
}, http.StatusServiceUnavailable)
return ""
}
return peerId
return peerIds[0]
}
func (s *FilterService) getMessagesByContentTopic(w http.ResponseWriter, req *http.Request) {

View File

@ -27,7 +27,6 @@ func twoLightPushConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNod
require.NoError(t, err)
err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic})
require.NoError(t, err)
return node1, node2
}

View File

@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/multiformats/go-multiaddr"
@ -78,17 +79,19 @@ func TestServiceSlots(t *testing.T) {
// getting peer for protocol
///////////////
var peerID peer.ID
// select peer from pm, currently only h2 is set in pm
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
peers, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)
require.Equal(t, peerID, h2.ID())
require.Equal(t, h2.ID(), peers[0])
// add h3 peer to peer manager
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{""}, libp2pProtocol.ID(protocol))
require.NoError(t, err)
// check that returned peer is h2 or h3 peer
peerID, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
peers, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
peerID = peers[0]
require.NoError(t, err)
if peerID == h2.ID() || peerID == h3.ID() {
//Test success
@ -104,7 +107,7 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err)
defer h4.Close()
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1})
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1})
require.Error(t, err, ErrNoPeersAvailable)
// add h4 peer for protocol1
@ -112,7 +115,9 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err)
//Test peer selection for protocol1
peerID, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1})
peers, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1})
peerID = peers[0]
require.NoError(t, err)
require.Equal(t, peerID, h4.ID())
@ -140,22 +145,42 @@ func TestPeerSelection(t *testing.T) {
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}})
peerIDs, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}})
require.NoError(t, err)
require.Equal(t, h2.ID(), peerID)
require.Equal(t, h2.ID(), peerIDs[0])
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}})
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}})
require.Error(t, ErrNoPeersAvailable, err)
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
require.NoError(t, err)
//Test for selectWithLowestRTT
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
require.NoError(t, err)
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 2})
require.Equal(t, 2, peerIDs.Len())
require.NoError(t, err)
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3})
fmt.Println("peerIDs", peerIDs)
require.Equal(t, 2, peerIDs.Len())
require.NoError(t, err)
h4, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h4.Close()
_, err = pm.AddPeer(getAddr(h4), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3})
require.Equal(t, 3, peerIDs.Len())
require.NoError(t, err)
}
func TestDefaultProtocol(t *testing.T) {
@ -165,7 +190,7 @@ func TestDefaultProtocol(t *testing.T) {
// check peer for default protocol
///////////////
//Test empty peer selection for relay protocol
_, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200})
_, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200})
require.Error(t, err, ErrNoPeersAvailable)
///////////////
@ -180,9 +205,9 @@ func TestDefaultProtocol(t *testing.T) {
require.NoError(t, err)
// since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol.
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200})
peerIDs, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200})
require.NoError(t, err)
require.Equal(t, peerID, h5.ID())
require.Equal(t, h5.ID(), peerIDs[0])
}
func TestAdditionAndRemovalOfPeer(t *testing.T) {
@ -200,12 +225,12 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
_, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2)
require.NoError(t, err)
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2})
peers, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2})
require.NoError(t, err)
require.Equal(t, peerID, h6.ID())
require.Equal(t, peers[0], h6.ID())
pm.RemovePeer(peerID)
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2})
pm.RemovePeer(peers[0])
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2})
require.Error(t, err, ErrNoPeersAvailable)
}
@ -287,10 +312,10 @@ func TestOnDemandPeerDiscovery(t *testing.T) {
require.NoError(t, err)
//Discovery should fail for non-waku protocol
_, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/test"})
_, err = pm3.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/test"})
require.Error(t, err)
_, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: "/test"})
_, err = pm3.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: "/test"})
require.Error(t, err)
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
@ -299,16 +324,16 @@ func TestOnDemandPeerDiscovery(t *testing.T) {
var enrField uint8
enrField |= (1 << 1)
pm3.RegisterWakuProtocol("/vac/waku/store/2.0.0-beta4", enrField)
peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx})
peerIDs, err := pm3.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx})
require.NoError(t, err)
require.Equal(t, peerID, host2.ID())
require.Equal(t, host2.ID(), peerIDs[0])
var enrField1 uint8
enrField1 |= (1 << 3)
pm3.RegisterWakuProtocol("/vac/waku/lightpush/2.0.0-beta1", enrField1)
peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx})
peerIDs, err = pm3.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx})
require.NoError(t, err)
require.Equal(t, peerID, host1.ID())
require.Equal(t, host1.ID(), peerIDs[0])
}

View File

@ -3,7 +3,6 @@ package peermanager
import (
"context"
"errors"
"math/rand"
"sync"
"time"
@ -14,8 +13,11 @@ import (
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
type peerSet map[peer.ID]struct{}
// SelectPeerByContentTopic is used to return a random peer that supports a given protocol for given contentTopic.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot.
@ -30,7 +32,11 @@ func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopic
}
pubsubTopics = append(pubsubTopics, pubsubTopic)
}
return pm.SelectPeer(PeerSelectionCriteria{PubsubTopics: pubsubTopics, Proto: proto, SpecificPeers: specificPeers})
peers, err := pm.SelectPeers(PeerSelectionCriteria{PubsubTopics: pubsubTopics, Proto: proto, SpecificPeers: specificPeers})
if err != nil {
return "", err
}
return peers[0], nil
}
// SelectRandomPeer is used to return a random peer that supports a given protocol.
@ -38,65 +44,107 @@ func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopic
// it supports the chosen protocol, otherwise it will chose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore
// if pubSubTopic is specified, peer is selected from list that support the pubSubTopic
func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID, error) {
func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlice, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
// This will require us to check for various factors such as:
// - which topics they track
// - latency?
peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopics, criteria.Ctx, criteria.SpecificPeers...)
if err == nil {
return peerID, nil
peerIDs, err := pm.selectServicePeer(criteria)
if err == nil && len(peerIDs) == criteria.MaxPeers {
return maps.Keys(peerIDs), nil
} else if !errors.Is(err, ErrNoPeersAvailable) {
pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)),
zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err))
return "", err
return nil, err
} else if len(peerIDs) == 0 {
peerIDs = make(peerSet)
}
// if not found in serviceSlots or proto == WakuRelayIDv200
filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.Proto)
if err != nil {
return "", err
return nil, err
}
if len(criteria.PubsubTopics) > 0 {
filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...)
}
return selectRandomPeer(filteredPeers, pm.logger)
randomPeers, err := selectRandomPeers(filteredPeers, criteria.MaxPeers-len(peerIDs))
if err != nil && len(peerIDs) == 0 {
return nil, err
}
for tmpPeer := range randomPeers {
peerIDs[tmpPeer] = struct{}{}
}
return maps.Keys(peerIDs), nil
}
func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopics []string, ctx context.Context, specificPeers ...peer.ID) (peer.ID, error) {
var peerID peer.ID
func getRandom(filter peerSet, count int) (peerSet, error) {
i := 0
selectedPeers := make(peerSet)
for pID := range filter {
//Map's iterator in golang works using randomness and hence not random function is being used.
selectedPeers[pID] = struct{}{}
i++
if i == count {
break
}
}
if len(selectedPeers) == 0 {
return nil, ErrNoPeersAvailable
}
return selectedPeers, nil
}
// selects count random peers from list of peers
func selectRandomPeers(peers peer.IDSlice, count int) (peerSet, error) {
filteredPeerMap := peerSliceToMap(peers)
return getRandom(filteredPeerMap, count)
}
func peerSliceToMap(peers peer.IDSlice) peerSet {
peerSet := make(peerSet, peers.Len())
for _, peer := range peers {
peerSet[peer] = struct{}{}
}
return peerSet
}
func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (peerSet, error) {
peers := make(peerSet)
var err error
for retryCnt := 0; retryCnt < 1; retryCnt++ {
//Try to fetch from serviceSlot
if slot := pm.serviceSlots.getPeers(proto); slot != nil {
if len(pubsubTopics) == 0 || (len(pubsubTopics) == 1 && pubsubTopics[0] == "") {
return slot.getRandom()
if slot := pm.serviceSlots.getPeers(criteria.Proto); slot != nil {
if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") {
return slot.getRandom(criteria.MaxPeers)
} else { //PubsubTopic based selection
keys := make([]peer.ID, 0, len(slot.m))
for i := range slot.m {
keys = append(keys, i)
}
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(pubsubTopics, keys...)
peerID, err = selectRandomPeer(selectedPeers, pm.logger)
if err == nil {
return peerID, nil
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...)
tmpPeers, err := selectRandomPeers(selectedPeers, criteria.MaxPeers)
for tmpPeer := range tmpPeers {
peers[tmpPeer] = struct{}{}
}
if err == nil && len(peers) == criteria.MaxPeers {
return peers, nil
} else {
pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", pubsubTopics))
pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics))
//Trigger on-demand discovery for this topic and connect to peer immediately.
//For now discover atleast 1 peer for the criteria
pm.discoverPeersByPubsubTopics(pubsubTopics, proto, ctx, 1)
pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1)
//Try to fetch peers again.
continue
}
}
}
}
if peerID == "" {
if len(peers) == 0 {
pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err))
}
return "", ErrNoPeersAvailable
return peers, ErrNoPeersAvailable
}
// PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers.
@ -105,20 +153,28 @@ type PeerSelectionCriteria struct {
Proto protocol.ID
PubsubTopics []string
SpecificPeers peer.IDSlice
MaxPeers int
Ctx context.Context
}
// SelectPeer selects a peer based on selectionType specified.
// SelectPeers selects a peer based on selectionType specified.
// Context is required only in case of selectionType set to LowestRTT
func (pm *PeerManager) SelectPeer(criteria PeerSelectionCriteria) (peer.ID, error) {
func (pm *PeerManager) SelectPeers(criteria PeerSelectionCriteria) (peer.IDSlice, error) {
if criteria.MaxPeers == 0 {
criteria.MaxPeers = 1
}
switch criteria.SelectionType {
case Automatic:
return pm.SelectRandomPeer(criteria)
return pm.SelectRandom(criteria)
case LowestRTT:
return pm.SelectPeerWithLowestRTT(criteria)
peerID, err := pm.SelectPeerWithLowestRTT(criteria)
if err != nil {
return nil, err
}
//TODO: Update this once peer Ping cache PR is merged into this code.
return []peer.ID{peerID}, nil
default:
return "", errors.New("unknown peer selection type specified")
return nil, errors.New("unknown peer selection type specified")
}
}
@ -198,17 +254,6 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (
}
}
// selectRandomPeer selects randomly a peer from the list of peers passed.
func selectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) {
if len(peers) >= 1 {
peerID := peers[rand.Intn(len(peers))]
// TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned
return peerID, nil // nolint: gosec
}
return "", ErrNoPeersAvailable
}
// FilterPeersByProto filters list of peers that support specified protocols.
// If specificPeers is nil, all peers in the host's peerStore are considered for filtering.
func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) {

View File

@ -19,14 +19,10 @@ func newPeerMap() *peerMap {
}
}
func (pm *peerMap) getRandom() (peer.ID, error) {
func (pm *peerMap) getRandom(count int) (peerSet, error) {
pm.mu.RLock()
defer pm.mu.RUnlock()
for pID := range pm.m {
return pID, nil
}
return "", ErrNoPeersAvailable
return getRandom(pm.m, count)
}
func (pm *peerMap) remove(pID peer.ID) {

View File

@ -6,6 +6,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
)
func TestServiceSlot(t *testing.T) {
@ -18,14 +19,14 @@ func TestServiceSlot(t *testing.T) {
//
slots.getPeers(protocol).add(peerID)
//
fetchedPeer, err := slots.getPeers(protocol).getRandom()
fetchedPeers, err := slots.getPeers(protocol).getRandom(1)
require.NoError(t, err)
require.Equal(t, peerID, fetchedPeer)
require.Equal(t, peerID, maps.Keys(fetchedPeers)[0])
//TODO: Add test to get more than 1 peers
//
slots.getPeers(protocol).remove(peerID)
//
_, err = slots.getPeers(protocol).getRandom()
_, err = slots.getPeers(protocol).getRandom(1)
require.Equal(t, err, ErrNoPeersAvailable)
}
@ -41,15 +42,15 @@ func TestServiceSlotRemovePeerFromAll(t *testing.T) {
slots.getPeers(protocol).add(peerID)
slots.getPeers(protocol1).add(peerID)
//
fetchedPeer, err := slots.getPeers(protocol1).getRandom()
fetchedPeers, err := slots.getPeers(protocol1).getRandom(1)
require.NoError(t, err)
require.Equal(t, peerID, fetchedPeer)
require.Equal(t, peerID, maps.Keys(fetchedPeers)[0])
//
slots.removePeer(peerID)
//
_, err = slots.getPeers(protocol).getRandom()
_, err = slots.getPeers(protocol).getRandom(1)
require.Equal(t, err, ErrNoPeersAvailable)
_, err = slots.getPeers(protocol1).getRandom()
_, err = slots.getPeers(protocol1).getRandom(1)
require.Equal(t, err, ErrNoPeersAvailable)
}

View File

@ -38,6 +38,7 @@ const FilterPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-push/2.0.0-bet
var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
ErrSubscriptionNotFound = errors.New("subscription not found")
ErrNoPeersSpecified = errors.New("no peers specified to unsubscribe")
)
type WakuFilterLightNode struct {
@ -211,10 +212,10 @@ func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string,
wf.subscriptions.Notify(remotePeerID, envelope)
}
func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters,
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error {
func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter, peer peer.ID) error {
request := &pb.FilterSubscribeRequest{
RequestId: hex.EncodeToString(params.requestID),
RequestId: hex.EncodeToString(requestID),
FilterSubscribeType: reqType,
PubsubTopic: &contentFilter.PubsubTopic,
ContentTopics: contentFilter.ContentTopicsList(),
@ -225,9 +226,9 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
return err
}
logger := wf.log.With(logging.HostID("peerID", params.selectedPeer))
logger := wf.log.With(logging.HostID("peerID", peer))
stream, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
stream, err := wf.h.NewStream(ctx, peer, FilterSubscribeID_v20beta1)
if err != nil {
wf.metrics.RecordError(dialFailure)
return err
@ -314,15 +315,19 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
return nil, nil, err
}
wf.pm.Connect(pData)
params.selectedPeer = pData.AddrInfo.ID
params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID)
}
if params.pm != nil && params.selectedPeer == "" {
params.selectedPeer, err = wf.pm.SelectPeer(
if params.pm != nil {
peerCount := params.maxPeers - len(params.selectedPeers)
params.selectedPeers, err = wf.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
PubsubTopics: maps.Keys(pubSubTopicMap),
SpecificPeers: params.preferredPeers,
MaxPeers: peerCount,
Ctx: ctx,
},
)
@ -352,21 +357,22 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
failedContentTopics := []string{}
subscriptions := make([]*subscription.SubscriptionDetails, 0)
for pubSubTopic, cTopics := range pubSubTopicMap {
var selectedPeer peer.ID
if params.pm != nil && params.selectedPeer == "" {
selectedPeer, err = wf.pm.SelectPeer(
var selectedPeers peer.IDSlice
if params.pm != nil && len(params.selectedPeers) < params.maxPeers {
selectedPeers, err = wf.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
PubsubTopics: []string{pubSubTopic},
SpecificPeers: params.preferredPeers,
MaxPeers: params.maxPeers - params.selectedPeers.Len(),
Ctx: ctx,
},
)
} else {
selectedPeer = params.selectedPeer
selectedPeers = params.selectedPeers
}
if selectedPeer == "" {
if len(selectedPeers) == 0 {
wf.metrics.RecordError(peerNotFoundFailure)
wf.log.Error("selecting peer", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
@ -379,20 +385,22 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...)
paramsCopy := params.Copy()
paramsCopy.selectedPeer = selectedPeer
err := wf.request(
ctx,
paramsCopy,
pb.FilterSubscribeRequest_SUBSCRIBE,
cFilter,
)
if err != nil {
wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
continue
paramsCopy.selectedPeers = selectedPeers
for _, peer := range selectedPeers {
err := wf.request(
ctx,
params.requestID,
pb.FilterSubscribeRequest_SUBSCRIBE,
cFilter,
peer)
if err != nil {
wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
continue
}
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(peer, cFilter))
}
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(selectedPeer, cFilter))
}
if len(failedContentTopics) > 0 {
@ -427,7 +435,6 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeO
return nil, err
}
}
return params, nil
}
@ -448,9 +455,10 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ..
return wf.request(
ctx,
&FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID},
params.requestID,
pb.FilterSubscribeRequest_SUBSCRIBER_PING,
protocol.ContentFilter{})
protocol.ContentFilter{},
peerID)
}
func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *subscription.SubscriptionDetails) error {
@ -463,7 +471,7 @@ func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscrip
return wf.Ping(ctx, subscription.PeerID)
}
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
// Unsubscribe is used to stop receiving messages from specified peers for the content filter
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
@ -487,7 +495,6 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
if err != nil {
return nil, err
}
pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter)
if err != nil {
return nil, err
@ -495,16 +502,33 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
result := &WakuFilterPushResult{}
for pTopic, cTopics := range pubSubTopicMap {
cFilter := protocol.NewContentFilter(pTopic, cTopics...)
peers := make(map[peer.ID]struct{})
subs := wf.subscriptions.GetSubscription(params.selectedPeer, cFilter)
var subs []*subscription.SubscriptionDetails
if params.selectedPeers.Len() == 0 {
subs = wf.subscriptions.GetAllSubscriptions()
if len(subs) == 0 {
result.Add(WakuFilterPushError{
Err: ErrSubscriptionNotFound,
PeerID: "",
})
continue
}
}
for _, peer := range params.selectedPeers {
subsForPeer := wf.subscriptions.GetSubscriptionsForPeer(peer, cFilter)
if len(subsForPeer) == 0 {
result.Add(WakuFilterPushError{
Err: ErrSubscriptionNotFound,
PeerID: peer,
})
continue
}
subs = append(subs, subsForPeer...)
}
if len(subs) == 0 {
result.Add(WakuFilterPushError{
Err: ErrSubscriptionNotFound,
PeerID: params.selectedPeer,
})
//No subscriptions found for this filter
continue
}
peers := make(map[peer.ID]struct{})
for _, sub := range subs {
sub.Remove(cTopics...)
peers[sub.PeerID] = struct{}{}
@ -520,7 +544,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
params.wg.Done()
}
}()
err := wf.unsubscribeFromServer(ctx, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, cFilter)
err := wf.unsubscribeFromServer(ctx, params.requestID, peerID, cFilter)
if params.wg != nil {
result.Add(WakuFilterPushError{
@ -539,7 +563,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
}
func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetails {
subs := wf.subscriptions.GetSubscription("", protocol.ContentFilter{})
subs := wf.subscriptions.GetAllSubscriptions()
return subs
}
@ -571,8 +595,7 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context,
if !wf.subscriptions.Has(sub.PeerID, sub.ContentFilter) {
// Last sub for this [peer, contentFilter] pair
params.selectedPeer = sub.PeerID
err = wf.unsubscribeFromServer(ctx, params, sub.ContentFilter)
err = wf.unsubscribeFromServer(ctx, params.requestID, sub.PeerID, sub.ContentFilter)
result.Add(WakuFilterPushError{
Err: err,
PeerID: sub.PeerID,
@ -582,14 +605,14 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context,
}
func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params *FilterSubscribeParameters, cFilter protocol.ContentFilter) error {
err := wf.request(ctx, params, pb.FilterSubscribeRequest_UNSUBSCRIBE, cFilter)
func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, requestID []byte, peer peer.ID, cFilter protocol.ContentFilter) error {
err := wf.request(ctx, requestID, pb.FilterSubscribeRequest_UNSUBSCRIBE, cFilter, peer)
if err != nil {
ferr, ok := err.(*FilterError)
if ok && ferr.Code == http.StatusNotFound {
wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", params.selectedPeer), zap.Error(err))
wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", peer), zap.Error(err))
} else {
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", params.selectedPeer), zap.Error(err))
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peer), zap.Error(err))
}
}
@ -606,12 +629,25 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
result := &WakuFilterPushResult{}
peers := make(map[peer.ID]struct{})
subs := wf.subscriptions.GetSubscription(params.selectedPeer, protocol.ContentFilter{})
if len(subs) == 0 && params.selectedPeer != "" {
result.Add(WakuFilterPushError{
Err: err,
PeerID: params.selectedPeer,
})
var subs []*subscription.SubscriptionDetails
if params.selectedPeers.Len() == 0 {
subs = wf.subscriptions.GetAllSubscriptions()
if len(subs) == 0 {
return result, nil
}
}
for _, peer := range params.selectedPeers {
pSubs := wf.subscriptions.GetSubscriptionsForPeer(peer, protocol.ContentFilter{})
if len(pSubs) == 0 {
result.Add(WakuFilterPushError{
Err: ErrSubscriptionNotFound,
PeerID: peer,
})
continue
}
subs = append(subs, pSubs...)
}
if len(subs) == 0 {
return result, ErrSubscriptionNotFound
}
for _, sub := range subs {
@ -630,13 +666,11 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
_ = recover()
}()
paramsCopy := params.Copy()
paramsCopy.selectedPeer = peerID
err := wf.request(
ctx,
params,
params.requestID,
pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL,
protocol.ContentFilter{})
protocol.ContentFilter{}, peerID)
if err != nil {
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
}

View File

@ -83,7 +83,7 @@ func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, pa
const FilterSubscribeID_Incorrect1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/abcd")
conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_Incorrect1)
conn, err := wf.h.NewStream(ctx, params.selectedPeers[0], FilterSubscribeID_Incorrect1)
if err != nil {
wf.metrics.RecordError(dialFailure)
return err
@ -173,8 +173,9 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi
for pubSubTopic, cTopics := range pubSubTopicMap {
var selectedPeer peer.ID
//TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic
if params.pm != nil && params.selectedPeer == "" {
selectedPeer, err = wf.pm.SelectPeer(
if params.pm != nil && params.selectedPeers[0] == "" {
var selectedPeers peer.IDSlice
selectedPeers, err = wf.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
@ -183,8 +184,11 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi
Ctx: ctx,
},
)
if err != nil {
selectedPeer = selectedPeers[0]
}
} else {
selectedPeer = params.selectedPeer
selectedPeer = params.selectedPeers[0]
}
if selectedPeer == "" {

View File

@ -15,8 +15,8 @@ import (
func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters {
return &FilterSubscribeParameters{
selectedPeer: old.selectedPeer,
requestID: old.requestID,
selectedPeers: old.selectedPeers,
requestID: old.requestID,
}
}
@ -35,10 +35,11 @@ func WithPingRequestId(requestId []byte) FilterPingOption {
type (
FilterSubscribeParameters struct {
selectedPeer peer.ID
selectedPeers peer.IDSlice
peerAddr multiaddr.Multiaddr
peerSelectionType peermanager.PeerSelection
preferredPeers peer.IDSlice
maxPeers int
requestID []byte
log *zap.Logger
@ -72,7 +73,7 @@ func WithTimeout(timeout time.Duration) Option {
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
func WithPeer(p peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
params.selectedPeer = p
params.selectedPeers = append(params.selectedPeers, p)
if params.peerAddr != nil {
return errors.New("peerAddr and peerId options are mutually exclusive")
}
@ -86,13 +87,20 @@ func WithPeer(p peer.ID) FilterSubscribeOption {
func WithPeerAddr(pAddr multiaddr.Multiaddr) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
params.peerAddr = pAddr
if params.selectedPeer != "" {
if len(params.selectedPeers) != 0 {
return errors.New("peerAddr and peerId options are mutually exclusive")
}
return nil
}
}
func WithMaxPeersPerContentFilter(numPeers int) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
params.maxPeers = numPeers
return nil
}
}
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store.
// If a list of specific peers is passed, the peer will be chosen from that list assuming it
// supports the chosen protocol, otherwise it will chose a peer from the node peerstore

View File

@ -35,7 +35,7 @@ func TestFilterOption(t *testing.T) {
}
require.Equal(t, host, params.host)
require.NotNil(t, params.selectedPeer)
require.NotEqual(t, 0, params.selectedPeers)
// Unsubscribe options
options2 := []FilterSubscribeOption{
@ -51,7 +51,7 @@ func TestFilterOption(t *testing.T) {
require.NoError(t, err)
}
require.NotNil(t, params2.selectedPeer)
require.NotEqual(t, 0, params2.selectedPeers)
require.True(t, params2.unsubscribeAll)
// Mutually Exclusive options
@ -73,7 +73,7 @@ func TestFilterOption(t *testing.T) {
}
}
require.NotNil(t, params2.selectedPeer)
require.NotEqual(t, 0, params2.selectedPeers)
require.True(t, params2.unsubscribeAll)
}

View File

@ -252,7 +252,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
opt(params)
}
if wf.pm != nil && params.selectedPeer == "" {
params.selectedPeer, _ = wf.pm.SelectPeer(
selectedPeers, _ := wf.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterID_v20beta1,
@ -261,6 +261,9 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
Ctx: ctx,
},
)
if err != nil {
params.selectedPeer = selectedPeers[0]
}
}
if params.selectedPeer == "" {
wf.metrics.RecordError(peerNotFoundFailure)

View File

@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
@ -264,7 +265,9 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
}
if params.pm != nil && params.selectedPeer == "" {
params.selectedPeer, err = wakuLP.pm.SelectPeer(
var selectedPeers peer.IDSlice
//TODO: update this to work with multiple peer selection
selectedPeers, err = wakuLP.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: LightPushID_v20beta1,
@ -273,6 +276,10 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
Ctx: ctx,
},
)
if err == nil {
params.selectedPeer = selectedPeers[0]
}
}
if params.selectedPeer == "" {
if err != nil {

View File

@ -43,8 +43,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
}
if params.pm != nil && params.selectedPeer == "" {
var err error
params.selectedPeer, err = wakuPX.pm.SelectPeer(
selectedPeers, err := wakuPX.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: PeerExchangeID_v20alpha1,
@ -55,6 +54,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
if err != nil {
return err
}
params.selectedPeer = selectedPeers[0]
}
if params.selectedPeer == "" {
wakuPX.metrics.RecordError(dialFailure)

View File

@ -312,8 +312,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
params.selectedPeer = pData.AddrInfo.ID
}
if store.pm != nil && params.selectedPeer == "" {
var err error
params.selectedPeer, err = store.pm.SelectPeer(
selectedPeers, err := store.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: StoreID_v20beta4,
@ -325,6 +324,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
if err != nil {
return nil, err
}
params.selectedPeer = selectedPeers[0]
}
}

View File

@ -29,7 +29,7 @@ func NewSubscriptionMap(logger *zap.Logger) *SubscriptionsMap {
}
func (m *SubscriptionsMap) Count() int {
m.RLock()
m.RLock()
defer m.RUnlock()
return len(m.items)
}
@ -199,7 +199,7 @@ func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, e
}
}
func (m *SubscriptionsMap) GetSubscription(peerID peer.ID, contentFilter protocol.ContentFilter) []*SubscriptionDetails {
func (m *SubscriptionsMap) GetSubscriptionsForPeer(peerID peer.ID, contentFilter protocol.ContentFilter) []*SubscriptionDetails {
m.RLock()
defer m.RUnlock()
@ -217,3 +217,7 @@ func (m *SubscriptionsMap) GetSubscription(peerID peer.ID, contentFilter protoco
}
return output
}
func (m *SubscriptionsMap) GetAllSubscriptions() []*SubscriptionDetails {
return m.GetSubscriptionsForPeer("", protocol.ContentFilter{})
}