feat: update store client Query API for autosharding (#885)

This commit is contained in:
Prem Chaitanya Prathi 2023-11-14 04:22:46 +05:30 committed by GitHub
parent 73bcb2e78a
commit a5ce5dfaa4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 258 additions and 135 deletions

View File

@ -12,10 +12,8 @@ import (
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
) )
type StoreService struct { type StoreService struct {
@ -57,24 +55,20 @@ func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService {
return s return s
} }
func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store.HistoryRequestOption, error) { func getStoreParams(r *http.Request) (*store.Query, []store.HistoryRequestOption, error) {
query := &store.Query{} query := &store.Query{}
var options []store.HistoryRequestOption var options []store.HistoryRequestOption
var err error
peerAddrStr := r.URL.Query().Get("peerAddr") peerAddrStr := r.URL.Query().Get("peerAddr")
m, err := multiaddr.NewMultiaddr(peerAddrStr) var m multiaddr.Multiaddr
if peerAddrStr != "" {
m, err = multiaddr.NewMultiaddr(peerAddrStr)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
options = append(options, store.WithPeerAddr(m))
peerID, err := utils.GetPeerID(m)
if err != nil {
return nil, nil, nil, err
} }
query.PubsubTopic = r.URL.Query().Get("pubsubTopic")
options = append(options, store.WithPeer(peerID))
query.Topic = r.URL.Query().Get("pubsubTopic")
contentTopics := r.URL.Query().Get("contentTopics") contentTopics := r.URL.Query().Get("contentTopics")
if contentTopics != "" { if contentTopics != "" {
@ -85,7 +79,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
if startTimeStr != "" { if startTimeStr != "" {
startTime, err := strconv.ParseInt(startTimeStr, 10, 64) startTime, err := strconv.ParseInt(startTimeStr, 10, 64)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
query.StartTime = &startTime query.StartTime = &startTime
} }
@ -94,7 +88,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
if endTimeStr != "" { if endTimeStr != "" {
endTime, err := strconv.ParseInt(endTimeStr, 10, 64) endTime, err := strconv.ParseInt(endTimeStr, 10, 64)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
query.EndTime = &endTime query.EndTime = &endTime
} }
@ -111,25 +105,25 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
if senderTimeStr != "" { if senderTimeStr != "" {
cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64) cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
} }
if storeTimeStr != "" { if storeTimeStr != "" {
cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64) cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
} }
if digestStr != "" { if digestStr != "" {
cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr) cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
} }
cursor.PubsubTopic = query.Topic cursor.PubsubTopic = query.PubsubTopic
options = append(options, store.WithCursor(cursor)) options = append(options, store.WithCursor(cursor))
} }
@ -142,21 +136,21 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
if ascendingStr != "" { if ascendingStr != "" {
ascending, err = strconv.ParseBool(ascendingStr) ascending, err = strconv.ParseBool(ascendingStr)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
} }
if pageSizeStr != "" { if pageSizeStr != "" {
pageSize, err = strconv.ParseUint(pageSizeStr, 10, 64) pageSize, err = strconv.ParseUint(pageSizeStr, 10, 64)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
} }
options = append(options, store.WithPaging(ascending, pageSize)) options = append(options, store.WithPaging(ascending, pageSize))
} }
return m, query, options, nil return query, options, nil
} }
func writeStoreError(w http.ResponseWriter, code int, err error) { func writeStoreError(w http.ResponseWriter, code int, err error) {
@ -190,7 +184,7 @@ func toStoreResponse(result *store.Result) StoreResponse {
} }
func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
peerAddr, query, options, err := getStoreParams(r) query, options, err := getStoreParams(r)
if err != nil { if err != nil {
writeStoreError(w, http.StatusBadRequest, err) writeStoreError(w, http.StatusBadRequest, err)
return return
@ -199,12 +193,6 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel() defer cancel()
_, err = d.node.AddPeer(peerAddr, peerstore.Static, d.node.Relay().Topics())
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return
}
result, err := d.node.Store().Query(ctx, *query, options...) result, err := d.node.Store().Query(ctx, *query, options...)
if err != nil { if err != nil {
writeStoreError(w, http.StatusInternalServerError, err) writeStoreError(w, http.StatusInternalServerError, err)

View File

@ -48,7 +48,7 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs,
res, err := s.node.Store().Query( res, err := s.node.Store().Query(
req.Context(), req.Context(),
store.Query{ store.Query{
Topic: args.Topic, PubsubTopic: args.Topic,
ContentTopics: args.ContentFilters, ContentTopics: args.ContentFilters,
StartTime: args.StartTime, StartTime: args.StartTime,
EndTime: args.EndTime, EndTime: args.EndTime,

View File

@ -39,7 +39,7 @@ func queryResponse(ctx context.Context, args storeMessagesArgs, options []store.
res, err := wakuState.node.Store().Query( res, err := wakuState.node.Store().Query(
ctx, ctx,
store.Query{ store.Query{
Topic: args.Topic, PubsubTopic: args.Topic,
ContentTopics: args.ContentTopics, ContentTopics: args.ContentTopics,
StartTime: args.StartTime, StartTime: args.StartTime,
EndTime: args.EndTime, EndTime: args.EndTime,

View File

@ -714,7 +714,11 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro
// AddPeer is used to add a peer and the protocols it support to the node peerstore // AddPeer is used to add a peer and the protocols it support to the node peerstore
// TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics. // TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics.
func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) { func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
return w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...) pData, err := w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...)
if err != nil {
return "", err
}
return pData.AddrInfo.ID, nil
} }
// AddDiscoveredPeer to add a discovered peer to the node peerStore // AddDiscoveredPeer to add a discovered peer to the node peerStore
@ -725,7 +729,7 @@ func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wp
ID: ID, ID: ID,
Addrs: addrs, Addrs: addrs,
}, },
PubSubTopics: pubsubTopics, PubsubTopics: pubsubTopics,
} }
w.peermanager.AddDiscoveredPeer(p, connectNow) w.peermanager.AddDiscoveredPeer(p, connectNow)
} }

View File

@ -100,18 +100,20 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16,
return peers, nil return peers, nil
} }
func (pm *PeerManager) discoverPeersByPubsubTopic(pubsubTopic string, proto protocol.ID, ctx context.Context, maxCount int) { func (pm *PeerManager) discoverPeersByPubsubTopics(pubsubTopics []string, proto protocol.ID, ctx context.Context, maxCount int) {
shardInfo, err := waku_proto.TopicsToRelayShards(pubsubTopic) shardsInfo, err := waku_proto.TopicsToRelayShards(pubsubTopics...)
if err != nil { if err != nil {
pm.logger.Error("failed to convert pubsub topic to shard", zap.String("topic", pubsubTopic), zap.Error(err)) pm.logger.Error("failed to convert pubsub topic to shard", zap.Strings("topics", pubsubTopics), zap.Error(err))
return return
} }
if len(shardInfo) > 0 { if len(shardsInfo) > 0 {
err = pm.DiscoverAndConnectToPeers(ctx, shardInfo[0].ClusterID, shardInfo[0].ShardIDs[0], proto, maxCount) for _, shardInfo := range shardsInfo {
err = pm.DiscoverAndConnectToPeers(ctx, shardInfo.ClusterID, shardInfo.ShardIDs[0], proto, maxCount)
if err != nil { if err != nil {
pm.logger.Error("failed to discover and conenct to peers", zap.Error(err)) pm.logger.Error("failed to discover and conenct to peers", zap.Error(err))
} }
}
} else { } else {
pm.logger.Debug("failed to convert pubsub topic to shard as topic is named pubsubTopic", zap.String("topic", pubsubTopic)) pm.logger.Debug("failed to convert pubsub topics to shards as one of the topics is named pubsubTopic", zap.Strings("topics", pubsubTopics))
} }
} }

View File

@ -217,7 +217,7 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
//Find not connected peers. //Find not connected peers.
notConnectedPeers := pm.getNotConnectedPers(topicStr) notConnectedPeers := pm.getNotConnectedPers(topicStr)
if notConnectedPeers.Len() == 0 { if notConnectedPeers.Len() == 0 {
pm.discoverPeersByPubsubTopic(topicStr, relay.WakuRelayID_v200, pm.ctx, 2) pm.discoverPeersByPubsubTopics([]string{topicStr}, relay.WakuRelayID_v200, pm.ctx, 2)
continue continue
} }
//Connect to eligible peers. //Connect to eligible peers.
@ -321,11 +321,11 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID {
logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
} else { } else {
if shards != nil { if shards != nil {
p.PubSubTopics = make([]string, 0) p.PubsubTopics = make([]string, 0)
topics := shards.Topics() topics := shards.Topics()
for _, topic := range topics { for _, topic := range topics {
topicStr := topic.String() topicStr := topic.String()
p.PubSubTopics = append(p.PubSubTopics, topicStr) p.PubsubTopics = append(p.PubsubTopics, topicStr)
} }
} else { } else {
pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID)) pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID))
@ -361,12 +361,12 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
return return
} }
supportedProtos := []protocol.ID{} supportedProtos := []protocol.ID{}
if len(p.PubSubTopics) == 0 && p.ENR != nil { if len(p.PubsubTopics) == 0 && p.ENR != nil {
// Try to fetch shard info and supported protocols from ENR to arrive at pubSub topics. // Try to fetch shard info and supported protocols from ENR to arrive at pubSub topics.
supportedProtos = pm.processPeerENR(&p) supportedProtos = pm.processPeerENR(&p)
} }
_ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubSubTopics, supportedProtos...) _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubsubTopics, supportedProtos...)
if p.ENR != nil { if p.ENR != nil {
err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR) err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR)
@ -419,12 +419,29 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig
return nil return nil
} }
func AddrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host, pubsubTopics ...string) *service.PeerData {
addrs := host.Peerstore().Addrs(peerID)
if len(addrs) == 0 {
//Addresses expired, remove peer from peerStore
host.Peerstore().RemovePeer(peerID)
return nil
}
return &service.PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
ID: peerID,
Addrs: addrs,
},
PubsubTopics: pubsubTopics,
}
}
// AddPeer adds peer to the peerStore and also to service slots // AddPeer adds peer to the peerStore and also to service slots
func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) { func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) {
//Assuming all addresses have peerId //Assuming all addresses have peerId
info, err := peer.AddrInfoFromP2pAddr(address) info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil { if err != nil {
return "", err return nil, err
} }
//Add Service peers to serviceSlots. //Add Service peers to serviceSlots.
@ -433,12 +450,26 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTo
} }
//Add to the peer-store //Add to the peer-store
err = pm.addPeer(info.ID, info.Addrs, origin, pubSubTopics, protocols...) err = pm.addPeer(info.ID, info.Addrs, origin, pubsubTopics, protocols...)
if err != nil { if err != nil {
return "", err return nil, err
} }
return info.ID, nil pData := &service.PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
ID: info.ID,
Addrs: info.Addrs,
},
PubsubTopics: pubsubTopics,
}
return pData, nil
}
// Connect establishes a connection to a peer.
func (pm *PeerManager) Connect(pData *service.PeerData) {
go pm.peerConnector.PushToChan(*pData)
} }
// RemovePeer deletes peer from the peerStore after disconnecting it. // RemovePeer deletes peer from the peerStore after disconnecting it.

View File

@ -116,7 +116,7 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, peerID, h4.ID()) require.Equal(t, peerID, h4.ID())
_, err = pm.SelectPeerByContentTopic(protocol1, "") _, err = pm.SelectPeerByContentTopics(protocol1, []string{""})
require.Error(t, wakuproto.ErrInvalidFormat, err) require.Error(t, wakuproto.ErrInvalidFormat, err)
} }
@ -143,18 +143,18 @@ func TestPeerSelection(t *testing.T) {
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err) require.NoError(t, err)
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/2"}) peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, h2.ID(), peerID) require.Equal(t, h2.ID(), peerID)
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/3"}) _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}})
require.Error(t, ErrNoPeersAvailable, err) require.Error(t, ErrNoPeersAvailable, err)
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"}) _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
require.NoError(t, err) require.NoError(t, err)
//Test for selectWithLowestRTT //Test for selectWithLowestRTT
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"}) _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
require.NoError(t, err) require.NoError(t, err)
} }
@ -287,7 +287,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
//Discovery should fail for non-waku protocol //Discovery should fail for non-waku protocol
_, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/test"}) _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/test"})
require.Error(t, err) require.Error(t, err)
_, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: "/test"}) _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: "/test"})
@ -299,7 +299,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) {
var enrField uint8 var enrField uint8
enrField |= (1 << 1) enrField |= (1 << 1)
pm3.RegisterWakuProtocol("/vac/waku/store/2.0.0-beta4", enrField) pm3.RegisterWakuProtocol("/vac/waku/store/2.0.0-beta4", enrField)
peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx}) peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, peerID, host2.ID()) require.Equal(t, peerID, host2.ID())
@ -307,7 +307,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) {
enrField1 |= (1 << 3) enrField1 |= (1 << 3)
pm3.RegisterWakuProtocol("/vac/waku/lightpush/2.0.0-beta1", enrField1) pm3.RegisterWakuProtocol("/vac/waku/lightpush/2.0.0-beta1", enrField1)
peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx}) peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, peerID, host1.ID()) require.Equal(t, peerID, host1.ID())

View File

@ -20,12 +20,16 @@ import (
// If a list of specific peers is passed, the peer will be chosen from that list assuming // If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot. // it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore // If a peer cannot be found in the service slot, a peer will be selected from node peerstore
func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic string, specificPeers ...peer.ID) (peer.ID, error) { func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopics []string, specificPeers ...peer.ID) (peer.ID, error) {
pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) pubsubTopics := []string{}
for _, cTopic := range contentTopics {
pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(cTopic)
if err != nil { if err != nil {
return "", err return "", err
} }
return pm.SelectPeer(PeerSelectionCriteria{PubsubTopic: pubsubTopic, Proto: proto, SpecificPeers: specificPeers}) pubsubTopics = append(pubsubTopics, pubsubTopic)
}
return pm.SelectPeer(PeerSelectionCriteria{PubsubTopics: pubsubTopics, Proto: proto, SpecificPeers: specificPeers})
} }
// SelectRandomPeer is used to return a random peer that supports a given protocol. // SelectRandomPeer is used to return a random peer that supports a given protocol.
@ -40,12 +44,12 @@ func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID
// - which topics they track // - which topics they track
// - latency? // - latency?
peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopic, criteria.Ctx, criteria.SpecificPeers...) peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopics, criteria.Ctx, criteria.SpecificPeers...)
if err == nil { if err == nil {
return peerID, nil return peerID, nil
} else if !errors.Is(err, ErrNoPeersAvailable) { } else if !errors.Is(err, ErrNoPeersAvailable) {
pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)), pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)),
zap.String("pubsubTopic", criteria.PubsubTopic), zap.Error(err)) zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err))
return "", err return "", err
} }
@ -54,34 +58,34 @@ func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID
if err != nil { if err != nil {
return "", err return "", err
} }
if criteria.PubsubTopic != "" { if len(criteria.PubsubTopics) > 0 {
filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, filteredPeers...) filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...)
} }
return selectRandomPeer(filteredPeers, pm.logger) return selectRandomPeer(filteredPeers, pm.logger)
} }
func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopic string, ctx context.Context, specificPeers ...peer.ID) (peer.ID, error) { func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopics []string, ctx context.Context, specificPeers ...peer.ID) (peer.ID, error) {
var peerID peer.ID var peerID peer.ID
var err error var err error
for retryCnt := 0; retryCnt < 1; retryCnt++ { for retryCnt := 0; retryCnt < 1; retryCnt++ {
//Try to fetch from serviceSlot //Try to fetch from serviceSlot
if slot := pm.serviceSlots.getPeers(proto); slot != nil { if slot := pm.serviceSlots.getPeers(proto); slot != nil {
if pubsubTopic == "" { if len(pubsubTopics) == 0 || (len(pubsubTopics) == 1 && pubsubTopics[0] == "") {
return slot.getRandom() return slot.getRandom()
} else { //PubsubTopic based selection } else { //PubsubTopic based selection
keys := make([]peer.ID, 0, len(slot.m)) keys := make([]peer.ID, 0, len(slot.m))
for i := range slot.m { for i := range slot.m {
keys = append(keys, i) keys = append(keys, i)
} }
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubsubTopic, keys...) selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(pubsubTopics, keys...)
peerID, err = selectRandomPeer(selectedPeers, pm.logger) peerID, err = selectRandomPeer(selectedPeers, pm.logger)
if err == nil { if err == nil {
return peerID, nil return peerID, nil
} else { } else {
pm.logger.Debug("Discovering peers by pubsubTopic", zap.String("pubsubTopic", pubsubTopic)) pm.logger.Debug("Discovering peers by pubsubTopic", zap.Strings("pubsubTopics", pubsubTopics))
//Trigger on-demand discovery for this topic and connect to peer immediately. //Trigger on-demand discovery for this topic and connect to peer immediately.
//For now discover atleast 1 peer for the criteria //For now discover atleast 1 peer for the criteria
pm.discoverPeersByPubsubTopic(pubsubTopic, proto, ctx, 1) pm.discoverPeersByPubsubTopics(pubsubTopics, proto, ctx, 1)
//Try to fetch peers again. //Try to fetch peers again.
continue continue
} }
@ -98,7 +102,7 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopic string,
type PeerSelectionCriteria struct { type PeerSelectionCriteria struct {
SelectionType PeerSelection SelectionType PeerSelection
Proto protocol.ID Proto protocol.ID
PubsubTopic string PubsubTopics []string
SpecificPeers peer.IDSlice SpecificPeers peer.IDSlice
Ctx context.Context Ctx context.Context
} }
@ -135,8 +139,8 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (
criteria.Ctx = context.Background() criteria.Ctx = context.Background()
} }
if criteria.PubsubTopic != "" { if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") {
peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, criteria.SpecificPeers...) peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, criteria.SpecificPeers...)
} }
peers, err = pm.FilterPeersByProto(peers, criteria.Proto) peers, err = pm.FilterPeersByProto(peers, criteria.Proto)

View File

@ -59,6 +59,7 @@ type WakuPeerstore interface {
RemovePubSubTopic(p peer.ID, topic string) error RemovePubSubTopic(p peer.ID, topic string) error
PubSubTopics(p peer.ID) ([]string, error) PubSubTopics(p peer.ID) ([]string, error)
SetPubSubTopics(p peer.ID, topics []string) error SetPubSubTopics(p peer.ID, topics []string) error
PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice
PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice
} }
@ -207,7 +208,38 @@ func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) ([]string, error) {
return result.([]string), nil return result.([]string), nil
} }
// PeersByPubSubTopic Returns list of peers by pubSubTopic // PeersByPubSubTopic Returns list of peers that support list of pubSubTopics
// If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore
func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice {
if specificPeers == nil {
specificPeers = ps.Peers()
}
var result peer.IDSlice
for _, p := range specificPeers {
topics, err := ps.PubSubTopics(p)
if err == nil {
//Convoluted and crazy logic to find subset of topics
// Could not find a better way to do it?
peerTopicMap := make(map[string]struct{})
match := true
for _, topic := range topics {
peerTopicMap[topic] = struct{}{}
}
for _, topic := range pubSubTopics {
if _, ok := peerTopicMap[topic]; !ok {
match = false
break
}
}
if match {
result = append(result, p)
}
} //Note: skipping a peer in case of an error as there would be others available.
}
return result
}
// PeersByPubSubTopic Returns list of peers that support a single pubSubTopic
// If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore // If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore
func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice { func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice {
if specificPeers == nil { if specificPeers == nil {

View File

@ -52,23 +52,5 @@ func (cf ContentFilter) Equals(cf1 ContentFilter) bool {
// This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics // This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics
func ContentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[PubsubTopicStr][]ContentTopicStr, error) { func ContentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[PubsubTopicStr][]ContentTopicStr, error) {
pubSubTopicMap := make(map[string][]string) return GeneratePubsubToContentTopicMap(contentFilter.PubsubTopic, contentFilter.ContentTopicsList())
if contentFilter.PubsubTopic != "" {
pubSubTopicMap[contentFilter.PubsubTopic] = contentFilter.ContentTopicsList()
} else {
//Parse the content-Topics to figure out shards.
for _, cTopicString := range contentFilter.ContentTopicsList() {
pTopicStr, err := GetPubSubTopicFromContentTopic(cTopicString)
if err != nil {
return nil, err
}
_, ok := pubSubTopicMap[pTopicStr]
if !ok {
pubSubTopicMap[pTopicStr] = []string{}
}
pubSubTopicMap[pTopicStr] = append(pubSubTopicMap[pTopicStr], cTopicString)
}
}
return pubSubTopicMap, nil
} }

View File

@ -312,7 +312,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
peermanager.PeerSelectionCriteria{ peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType, SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1, Proto: FilterSubscribeID_v20beta1,
PubsubTopic: pubSubTopic, PubsubTopics: []string{pubSubTopic},
SpecificPeers: params.preferredPeers, SpecificPeers: params.preferredPeers,
Ctx: ctx, Ctx: ctx,
}, },

View File

@ -178,7 +178,7 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi
peermanager.PeerSelectionCriteria{ peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType, SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1, Proto: FilterSubscribeID_v20beta1,
PubsubTopic: pubSubTopic, PubsubTopics: []string{pubSubTopic},
SpecificPeers: params.preferredPeers, SpecificPeers: params.preferredPeers,
Ctx: ctx, Ctx: ctx,
}, },

View File

@ -256,7 +256,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
peermanager.PeerSelectionCriteria{ peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType, SelectionType: params.peerSelectionType,
Proto: FilterID_v20beta1, Proto: FilterID_v20beta1,
PubsubTopic: filter.Topic, PubsubTopics: []string{filter.Topic},
SpecificPeers: params.preferredPeers, SpecificPeers: params.preferredPeers,
Ctx: ctx, Ctx: ctx,
}, },

View File

@ -254,7 +254,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
peermanager.PeerSelectionCriteria{ peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType, SelectionType: params.peerSelectionType,
Proto: LightPushID_v20beta1, Proto: LightPushID_v20beta1,
PubsubTopic: params.pubsubTopic, PubsubTopics: []string{params.pubsubTopic},
SpecificPeers: params.preferredPeers, SpecificPeers: params.preferredPeers,
Ctx: ctx, Ctx: ctx,
}, },

View File

@ -245,3 +245,26 @@ func GetPubSubTopicFromContentTopic(cTopicString string) (string, error) {
return pTopic.String(), nil return pTopic.String(), nil
} }
func GeneratePubsubToContentTopicMap(pubsubTopic string, contentTopics []string) (map[string][]string, error) {
pubSubTopicMap := make(map[string][]string, 0)
if pubsubTopic == "" {
//Should we derive pubsub topic from contentTopic so that peer selection and discovery can be done accordingly?
for _, cTopic := range contentTopics {
pTopic, err := GetPubSubTopicFromContentTopic(cTopic)
if err != nil {
return nil, err
}
_, ok := pubSubTopicMap[pTopic]
if !ok {
pubSubTopicMap[pTopic] = []string{}
}
pubSubTopicMap[pTopic] = append(pubSubTopicMap[pTopic], cTopic)
}
} else {
pubSubTopicMap[pubsubTopic] = append(pubSubTopicMap[pubsubTopic], contentTopics...)
}
return pubSubTopicMap, nil
}

View File

@ -8,17 +8,19 @@ import (
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-msgio/pbio" "github.com/libp2p/go-msgio/pbio"
"github.com/multiformats/go-multiaddr"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
) )
type Query struct { type Query struct {
Topic string PubsubTopic string
ContentTopics []string ContentTopics []string
StartTime *int64 StartTime *int64
EndTime *int64 EndTime *int64
@ -82,6 +84,7 @@ type criteriaFN = func(msg *wpb.WakuMessage) (bool, error)
type HistoryRequestParameters struct { type HistoryRequestParameters struct {
selectedPeer peer.ID selectedPeer peer.ID
peerAddr multiaddr.Multiaddr
peerSelectionType peermanager.PeerSelection peerSelectionType peermanager.PeerSelection
preferredPeers peer.IDSlice preferredPeers peer.IDSlice
localQuery bool localQuery bool
@ -93,12 +96,31 @@ type HistoryRequestParameters struct {
s *WakuStore s *WakuStore
} }
type HistoryRequestOption func(*HistoryRequestParameters) type HistoryRequestOption func(*HistoryRequestParameters) error
// WithPeer is an option used to specify the peerID to request the message history // WithPeer is an option used to specify the peerID to request the message history.
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
func WithPeer(p peer.ID) HistoryRequestOption { func WithPeer(p peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) { return func(params *HistoryRequestParameters) error {
params.selectedPeer = p params.selectedPeer = p
if params.peerAddr != nil {
return errors.New("peerId and peerAddr options are mutually exclusive")
}
return nil
}
}
//WithPeerAddr is an option used to specify a peerAddress to request the message history.
// This new peer will be added to peerStore.
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
func WithPeerAddr(pAddr multiaddr.Multiaddr) HistoryRequestOption {
return func(params *HistoryRequestParameters) error {
params.peerAddr = pAddr
if params.selectedPeer != "" {
return errors.New("peerAddr and peerId options are mutually exclusive")
}
return nil
} }
} }
@ -108,9 +130,10 @@ func WithPeer(p peer.ID) HistoryRequestOption {
// from the node peerstore // from the node peerstore
// Note: This option is avaiable only with peerManager // Note: This option is avaiable only with peerManager
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption { func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) { return func(params *HistoryRequestParameters) error {
params.peerSelectionType = peermanager.Automatic params.peerSelectionType = peermanager.Automatic
params.preferredPeers = fromThesePeers params.preferredPeers = fromThesePeers
return nil
} }
} }
@ -120,44 +143,50 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption
// from the node peerstore // from the node peerstore
// Note: This option is avaiable only with peerManager // Note: This option is avaiable only with peerManager
func WithFastestPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption { func WithFastestPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) { return func(params *HistoryRequestParameters) error {
params.peerSelectionType = peermanager.LowestRTT params.peerSelectionType = peermanager.LowestRTT
return nil
} }
} }
// WithRequestID is an option to set a specific request ID to be used when // WithRequestID is an option to set a specific request ID to be used when
// creating a store request // creating a store request
func WithRequestID(requestID []byte) HistoryRequestOption { func WithRequestID(requestID []byte) HistoryRequestOption {
return func(params *HistoryRequestParameters) { return func(params *HistoryRequestParameters) error {
params.requestID = requestID params.requestID = requestID
return nil
} }
} }
// WithAutomaticRequestID is an option to automatically generate a request ID // WithAutomaticRequestID is an option to automatically generate a request ID
// when creating a store request // when creating a store request
func WithAutomaticRequestID() HistoryRequestOption { func WithAutomaticRequestID() HistoryRequestOption {
return func(params *HistoryRequestParameters) { return func(params *HistoryRequestParameters) error {
params.requestID = protocol.GenerateRequestID() params.requestID = protocol.GenerateRequestID()
return nil
} }
} }
func WithCursor(c *pb.Index) HistoryRequestOption { func WithCursor(c *pb.Index) HistoryRequestOption {
return func(params *HistoryRequestParameters) { return func(params *HistoryRequestParameters) error {
params.cursor = c params.cursor = c
return nil
} }
} }
// WithPaging is an option used to specify the order and maximum number of records to return // WithPaging is an option used to specify the order and maximum number of records to return
func WithPaging(asc bool, pageSize uint64) HistoryRequestOption { func WithPaging(asc bool, pageSize uint64) HistoryRequestOption {
return func(params *HistoryRequestParameters) { return func(params *HistoryRequestParameters) error {
params.asc = asc params.asc = asc
params.pageSize = pageSize params.pageSize = pageSize
return nil
} }
} }
func WithLocalQuery() HistoryRequestOption { func WithLocalQuery() HistoryRequestOption {
return func(params *HistoryRequestParameters) { return func(params *HistoryRequestParameters) error {
params.localQuery = true params.localQuery = true
return nil
} }
} }
@ -253,7 +282,34 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
optList := DefaultOptions() optList := DefaultOptions()
optList = append(optList, opts...) optList = append(optList, opts...)
for _, opt := range optList { for _, opt := range optList {
opt(params) err := opt(params)
if err != nil {
return nil, err
}
}
if !params.localQuery {
pubsubTopics := []string{}
if query.PubsubTopic == "" {
for _, cTopic := range query.ContentTopics {
pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(cTopic)
if err != nil {
return nil, err
}
pubsubTopics = append(pubsubTopics, pubsubTopic)
}
} else {
pubsubTopics = append(pubsubTopics, query.PubsubTopic)
}
//Add Peer to peerstore.
if store.pm != nil && params.peerAddr != nil {
pData, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreID_v20beta4)
if err != nil {
return nil, err
}
store.pm.Connect(pData)
params.selectedPeer = pData.AddrInfo.ID
} }
if store.pm != nil && params.selectedPeer == "" { if store.pm != nil && params.selectedPeer == "" {
var err error var err error
@ -261,7 +317,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
peermanager.PeerSelectionCriteria{ peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType, SelectionType: params.peerSelectionType,
Proto: StoreID_v20beta4, Proto: StoreID_v20beta4,
PubsubTopic: query.Topic, PubsubTopics: pubsubTopics,
SpecificPeers: params.preferredPeers, SpecificPeers: params.preferredPeers,
Ctx: ctx, Ctx: ctx,
}, },
@ -270,11 +326,12 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
return nil, err return nil, err
} }
} }
}
historyRequest := &pb.HistoryRPC{ historyRequest := &pb.HistoryRPC{
RequestId: hex.EncodeToString(params.requestID), RequestId: hex.EncodeToString(params.requestID),
Query: &pb.HistoryQuery{ Query: &pb.HistoryQuery{
PubsubTopic: query.Topic, PubsubTopic: query.PubsubTopic,
ContentFilters: []*pb.ContentFilter{}, ContentFilters: []*pb.ContentFilter{},
StartTime: query.StartTime, StartTime: query.StartTime,
EndTime: query.EndTime, EndTime: query.EndTime,

View File

@ -71,7 +71,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
q := Query{ q := Query{
Topic: pubsubTopic1, PubsubTopic: pubsubTopic1,
ContentTopics: []string{topic1}, ContentTopics: []string{topic1},
} }
@ -110,7 +110,7 @@ func TestWakuStoreProtocolLocalQuery(t *testing.T) {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
q := Query{ q := Query{
Topic: pubsubTopic1, PubsubTopic: pubsubTopic1,
ContentTopics: []string{topic1}, ContentTopics: []string{topic1},
} }
response, err := s1.Query(ctx, q, WithLocalQuery()) response, err := s1.Query(ctx, q, WithLocalQuery())
@ -167,7 +167,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
defer s2.Stop() defer s2.Stop()
q := Query{ q := Query{
Topic: pubsubTopic1, PubsubTopic: pubsubTopic1,
ContentTopics: []string{topic1}, ContentTopics: []string{topic1},
} }
@ -244,7 +244,7 @@ func TestWakuStoreResult(t *testing.T) {
defer s2.Stop() defer s2.Stop()
q := Query{ q := Query{
Topic: pubsubTopic1, PubsubTopic: pubsubTopic1,
ContentTopics: []string{topic1}, ContentTopics: []string{topic1},
} }
@ -346,7 +346,7 @@ func TestWakuStoreProtocolFind(t *testing.T) {
defer s2.Stop() defer s2.Stop()
q := Query{ q := Query{
Topic: pubsubTopic1, PubsubTopic: pubsubTopic1,
} }
fn := func(msg *pb.WakuMessage) (bool, error) { fn := func(msg *pb.WakuMessage) (bool, error) {

View File

@ -107,7 +107,7 @@ func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string
peer := service.PeerData{ peer := service.PeerData{
Origin: peerstore.Rendezvous, Origin: peerstore.Rendezvous,
AddrInfo: p, AddrInfo: p,
PubSubTopics: []string{namespace}, PubsubTopics: []string{namespace},
} }
if !r.PushToChan(peer) { if !r.PushToChan(peer) {
r.log.Error("could push to closed channel/context completed") r.log.Error("could push to closed channel/context completed")

View File

@ -14,7 +14,7 @@ type PeerData struct {
Origin wps.Origin Origin wps.Origin
AddrInfo peer.AddrInfo AddrInfo peer.AddrInfo
ENR *enode.Node ENR *enode.Node
PubSubTopics []string PubsubTopics []string
} }
type CommonDiscoveryService struct { type CommonDiscoveryService struct {