feat: update peer selection to be based on multiple pubsubTopics, update store API

This commit is contained in:
Prem Chaitanya Prathi 2023-11-10 14:34:05 +05:30
parent ab8cce642a
commit 5c37e53205
No known key found for this signature in database
12 changed files with 119 additions and 113 deletions

View File

@ -202,22 +202,11 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
if peerAddr != nil {
options = append(options, store.WithPeerAddr(peerAddr))
}
var results []*store.Result
var result *store.Result
if query.PubsubTopic == "" {
results, err = d.node.Store().QueryAutoSharding(ctx, *query, options...)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return
}
} else {
result, err := d.node.Store().Query(ctx, *query, options...)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return
}
results = append(results, result)
result, err := d.node.Store().Query(ctx, *query, options...)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return
}
//TODO: How to respond with multiple query results??
writeErrOrResponse(w, nil, toStoreResponse(result))
}

View File

@ -100,18 +100,20 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16,
return peers, nil
}
func (pm *PeerManager) discoverPeersByPubsubTopic(pubsubTopic string, proto protocol.ID, ctx context.Context, maxCount int) {
shardInfo, err := waku_proto.TopicsToRelayShards(pubsubTopic)
func (pm *PeerManager) discoverPeersByPubsubTopics(pubsubTopics []string, proto protocol.ID, ctx context.Context, maxCount int) {
shardsInfo, err := waku_proto.TopicsToRelayShards(pubsubTopics...)
if err != nil {
pm.logger.Error("failed to convert pubsub topic to shard", zap.String("topic", pubsubTopic), zap.Error(err))
pm.logger.Error("failed to convert pubsub topic to shard", zap.Strings("topics", pubsubTopics), zap.Error(err))
return
}
if len(shardInfo) > 0 {
err = pm.DiscoverAndConnectToPeers(ctx, shardInfo[0].ClusterID, shardInfo[0].ShardIDs[0], proto, maxCount)
if err != nil {
pm.logger.Error("failed to discover and conenct to peers", zap.Error(err))
if len(shardsInfo) > 0 {
for _, shardInfo := range shardsInfo {
err = pm.DiscoverAndConnectToPeers(ctx, shardInfo.ClusterID, shardInfo.ShardIDs[0], proto, maxCount)
if err != nil {
pm.logger.Error("failed to discover and conenct to peers", zap.Error(err))
}
}
} else {
pm.logger.Debug("failed to convert pubsub topic to shard as topic is named pubsubTopic", zap.String("topic", pubsubTopic))
pm.logger.Debug("failed to convert pubsub topics to shards as one of the topics is named pubsubTopic", zap.Strings("topics", pubsubTopics))
}
}

View File

@ -217,7 +217,7 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
//Find not connected peers.
notConnectedPeers := pm.getNotConnectedPers(topicStr)
if notConnectedPeers.Len() == 0 {
pm.discoverPeersByPubsubTopic(topicStr, relay.WakuRelayID_v200, pm.ctx, 2)
pm.discoverPeersByPubsubTopics([]string{topicStr}, relay.WakuRelayID_v200, pm.ctx, 2)
continue
}
//Connect to eligible peers.

View File

@ -116,7 +116,7 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err)
require.Equal(t, peerID, h4.ID())
_, err = pm.SelectPeerByContentTopic(protocol1, "")
_, err = pm.SelectPeerByContentTopics(protocol1, []string{""})
require.Error(t, wakuproto.ErrInvalidFormat, err)
}
@ -143,18 +143,18 @@ func TestPeerSelection(t *testing.T) {
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/2"})
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}})
require.NoError(t, err)
require.Equal(t, h2.ID(), peerID)
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/3"})
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}})
require.Error(t, ErrNoPeersAvailable, err)
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"})
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
require.NoError(t, err)
//Test for selectWithLowestRTT
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"})
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
require.NoError(t, err)
}
@ -287,7 +287,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) {
require.NoError(t, err)
//Discovery should fail for non-waku protocol
_, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/test"})
_, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/test"})
require.Error(t, err)
_, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: "/test"})
@ -299,7 +299,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) {
var enrField uint8
enrField |= (1 << 1)
pm3.RegisterWakuProtocol("/vac/waku/store/2.0.0-beta4", enrField)
peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx})
peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx})
require.NoError(t, err)
require.Equal(t, peerID, host2.ID())
@ -307,7 +307,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) {
enrField1 |= (1 << 3)
pm3.RegisterWakuProtocol("/vac/waku/lightpush/2.0.0-beta1", enrField1)
peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx})
peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx})
require.NoError(t, err)
require.Equal(t, peerID, host1.ID())

View File

@ -20,12 +20,16 @@ import (
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore
func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic string, specificPeers ...peer.ID) (peer.ID, error) {
pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
return "", err
func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopics []string, specificPeers ...peer.ID) (peer.ID, error) {
pubsubTopics := []string{}
for _, cTopic := range contentTopics {
pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(cTopic)
if err != nil {
return "", err
}
pubsubTopics = append(pubsubTopics, pubsubTopic)
}
return pm.SelectPeer(PeerSelectionCriteria{PubsubTopic: pubsubTopic, Proto: proto, SpecificPeers: specificPeers})
return pm.SelectPeer(PeerSelectionCriteria{PubsubTopics: pubsubTopics, Proto: proto, SpecificPeers: specificPeers})
}
// SelectRandomPeer is used to return a random peer that supports a given protocol.
@ -40,12 +44,12 @@ func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID
// - which topics they track
// - latency?
peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopic, criteria.Ctx, criteria.SpecificPeers...)
peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopics, criteria.Ctx, criteria.SpecificPeers...)
if err == nil {
return peerID, nil
} else if !errors.Is(err, ErrNoPeersAvailable) {
pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)),
zap.String("pubsubTopic", criteria.PubsubTopic), zap.Error(err))
zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err))
return "", err
}
@ -54,34 +58,34 @@ func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID
if err != nil {
return "", err
}
if criteria.PubsubTopic != "" {
filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, filteredPeers...)
if len(criteria.PubsubTopics) > 0 {
filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...)
}
return selectRandomPeer(filteredPeers, pm.logger)
}
func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopic string, ctx context.Context, specificPeers ...peer.ID) (peer.ID, error) {
func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopics []string, ctx context.Context, specificPeers ...peer.ID) (peer.ID, error) {
var peerID peer.ID
var err error
for retryCnt := 0; retryCnt < 1; retryCnt++ {
//Try to fetch from serviceSlot
if slot := pm.serviceSlots.getPeers(proto); slot != nil {
if pubsubTopic == "" {
if len(pubsubTopics) == 0 || (len(pubsubTopics) == 1 && pubsubTopics[0] == "") {
return slot.getRandom()
} else { //PubsubTopic based selection
keys := make([]peer.ID, 0, len(slot.m))
for i := range slot.m {
keys = append(keys, i)
}
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubsubTopic, keys...)
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(pubsubTopics, keys...)
peerID, err = selectRandomPeer(selectedPeers, pm.logger)
if err == nil {
return peerID, nil
} else {
pm.logger.Debug("Discovering peers by pubsubTopic", zap.String("pubsubTopic", pubsubTopic))
pm.logger.Debug("Discovering peers by pubsubTopic", zap.Strings("pubsubTopics", pubsubTopics))
//Trigger on-demand discovery for this topic and connect to peer immediately.
//For now discover atleast 1 peer for the criteria
pm.discoverPeersByPubsubTopic(pubsubTopic, proto, ctx, 1)
pm.discoverPeersByPubsubTopics(pubsubTopics, proto, ctx, 1)
//Try to fetch peers again.
continue
}
@ -98,7 +102,7 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopic string,
type PeerSelectionCriteria struct {
SelectionType PeerSelection
Proto protocol.ID
PubsubTopic string
PubsubTopics []string
SpecificPeers peer.IDSlice
Ctx context.Context
}
@ -135,8 +139,8 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (
criteria.Ctx = context.Background()
}
if criteria.PubsubTopic != "" {
peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, criteria.SpecificPeers...)
if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") {
peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, criteria.SpecificPeers...)
}
peers, err = pm.FilterPeersByProto(peers, criteria.Proto)

View File

@ -59,6 +59,7 @@ type WakuPeerstore interface {
RemovePubSubTopic(p peer.ID, topic string) error
PubSubTopics(p peer.ID) ([]string, error)
SetPubSubTopics(p peer.ID, topics []string) error
PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice
PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice
}
@ -207,7 +208,38 @@ func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) ([]string, error) {
return result.([]string), nil
}
// PeersByPubSubTopic Returns list of peers by pubSubTopic
// PeersByPubSubTopic Returns list of peers that support list of pubSubTopics
// If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore
func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice {
if specificPeers == nil {
specificPeers = ps.Peers()
}
var result peer.IDSlice
for _, p := range specificPeers {
topics, err := ps.PubSubTopics(p)
if err == nil {
//Convoluted and crazy logic to find subset of topics
// Could not find a better way to do it?
peerTopicMap := make(map[string]struct{})
match := true
for _, topic := range topics {
peerTopicMap[topic] = struct{}{}
}
for _, topic := range pubSubTopics {
if _, ok := peerTopicMap[topic]; !ok {
match = false
break
}
}
if match {
result = append(result, p)
}
} //Note: skipping a peer in case of an error as there would be others available.
}
return result
}
// PeersByPubSubTopic Returns list of peers that support a single pubSubTopic
// If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore
func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice {
if specificPeers == nil {

View File

@ -312,7 +312,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
PubsubTopic: pubSubTopic,
PubsubTopics: []string{pubSubTopic},
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},

View File

@ -178,7 +178,7 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
PubsubTopic: pubSubTopic,
PubsubTopics: []string{pubSubTopic},
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},

View File

@ -256,7 +256,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterID_v20beta1,
PubsubTopic: filter.Topic,
PubsubTopics: []string{filter.Topic},
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},

View File

@ -253,7 +253,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: LightPushID_v20beta1,
PubsubTopic: params.pubsubTopic,
PubsubTopics: []string{params.pubsubTopic},
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},

View File

@ -4,15 +4,12 @@ import (
"context"
"encoding/hex"
"errors"
"fmt"
"math"
"strings"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-msgio/pbio"
"github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/peermanager"
@ -258,47 +255,6 @@ func (store *WakuStore) localQuery(historyQuery *pb.HistoryRPC) (*pb.HistoryResp
return historyResponseRPC.Response, nil
}
func (store *WakuStore) QueryAutoSharding(ctx context.Context, query Query, opts ...HistoryRequestOption) ([]*Result, error) {
var results []*Result
var failedContentTopics []string
pubSubTopicMap, err := protocol.GeneratePubsubToContentTopicMap(query.PubsubTopic, query.ContentTopics)
if err != nil {
return nil, err
}
//Duplicate processing of opts, which is done again in Query function below
// Not sure how to handle this, hence leaving it as of now
params := new(HistoryRequestParameters)
var optList []HistoryRequestOption
for _, opt := range optList {
opt(params)
}
//Add Peer to peerstore.
if store.pm != nil && params.peerAddr != nil {
_, err = store.pm.AddPeer(params.peerAddr, peerstore.Static, maps.Keys(pubSubTopicMap), StoreID_v20beta4)
if err != nil {
return nil, err
}
}
for pubSubTopic, cTopics := range pubSubTopicMap {
query.ContentTopics = cTopics
query.PubsubTopic = pubSubTopic
//Invoke Query separately
result, err := store.Query(ctx, query, opts...)
if err != nil {
failedContentTopics = append(failedContentTopics, cTopics...)
}
results = append(results, result)
}
if len(failedContentTopics) > 0 {
return results, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ","))
} else {
return results, nil
}
}
func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) {
params := new(HistoryRequestParameters)
params.s = store
@ -309,19 +265,43 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
opt(params)
}
if store.pm != nil && params.selectedPeer == "" {
var err error
params.selectedPeer, err = store.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: StoreID_v20beta4,
PubsubTopic: query.PubsubTopic,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
if err != nil {
return nil, err
if !params.localQuery {
pubsubTopics := []string{}
if query.PubsubTopic == "" {
for _, cTopic := range query.ContentTopics {
pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(cTopic)
if err != nil {
return nil, err
}
pubsubTopics = append(pubsubTopics, pubsubTopic)
}
} else {
pubsubTopics = append(pubsubTopics, query.PubsubTopic)
}
//Add Peer to peerstore.
if store.pm != nil && params.peerAddr != nil {
peerId, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreID_v20beta4)
if err != nil {
return nil, err
}
params.selectedPeer = peerId
}
if store.pm != nil && params.selectedPeer == "" {
var err error
params.selectedPeer, err = store.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: StoreID_v20beta4,
PubsubTopics: pubsubTopics,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
if err != nil {
return nil, err
}
}
}

View File

@ -84,7 +84,6 @@ type Store interface {
SetHost(h host.Host)
Start(context.Context, *relay.Subscription) error
Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error)
QueryAutoSharding(ctx context.Context, query Query, opts ...HistoryRequestOption) ([]*Result, error)
Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*wpb.WakuMessage, error)
Next(ctx context.Context, r *Result) (*Result, error)
Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error)