draft store api changes for autosharding

This commit is contained in:
Prem Chaitanya Prathi 2023-11-09 15:14:45 +05:30
parent 1dab4ff76e
commit cf6519518a
No known key found for this signature in database
5 changed files with 110 additions and 47 deletions

View File

@ -12,7 +12,6 @@ import (
"github.com/go-chi/chi/v5"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
@ -60,20 +59,22 @@ func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService {
func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store.HistoryRequestOption, error) {
query := &store.Query{}
var options []store.HistoryRequestOption
var err error
peerAddrStr := r.URL.Query().Get("peerAddr")
m, err := multiaddr.NewMultiaddr(peerAddrStr)
if err != nil {
return nil, nil, nil, err
var m multiaddr.Multiaddr
if peerAddrStr != "" {
m, err = multiaddr.NewMultiaddr(peerAddrStr)
if err != nil {
return nil, nil, nil, err
}
peerID, err := utils.GetPeerID(m)
if err != nil {
return nil, nil, nil, err
}
options = append(options, store.WithPeer(peerID))
}
peerID, err := utils.GetPeerID(m)
if err != nil {
return nil, nil, nil, err
}
options = append(options, store.WithPeer(peerID))
query.PubsubTopic = r.URL.Query().Get("pubsubTopic")
contentTopics := r.URL.Query().Get("contentTopics")
@ -198,18 +199,25 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
_, err = d.node.AddPeer(peerAddr, peerstore.Static, d.node.Relay().Topics())
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return
if peerAddr != nil {
options = append(options, store.WithPeerAddr(peerAddr))
}
result, err := d.node.Store().Query(ctx, *query, options...)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return
var results []*store.Result
var result *store.Result
if query.PubsubTopic == "" {
results, err = d.node.Store().QueryAutoSharding(ctx, *query, options...)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return
}
} else {
result, err := d.node.Store().Query(ctx, *query, options...)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return
}
results = append(results, result)
}
//TODO: How to respond with multiple query results??
writeErrOrResponse(w, nil, toStoreResponse(result))
}

View File

@ -52,23 +52,5 @@ func (cf ContentFilter) Equals(cf1 ContentFilter) bool {
// This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics
func ContentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[PubsubTopicStr][]ContentTopicStr, error) {
pubSubTopicMap := make(map[string][]string)
if contentFilter.PubsubTopic != "" {
pubSubTopicMap[contentFilter.PubsubTopic] = contentFilter.ContentTopicsList()
} else {
//Parse the content-Topics to figure out shards.
for _, cTopicString := range contentFilter.ContentTopicsList() {
pTopicStr, err := GetPubSubTopicFromContentTopic(cTopicString)
if err != nil {
return nil, err
}
_, ok := pubSubTopicMap[pTopicStr]
if !ok {
pubSubTopicMap[pTopicStr] = []string{}
}
pubSubTopicMap[pTopicStr] = append(pubSubTopicMap[pTopicStr], cTopicString)
}
}
return pubSubTopicMap, nil
return GeneratePubsubToContentTopicMap(contentFilter.PubsubTopic, contentFilter.ContentTopicsList())
}

View File

@ -245,3 +245,26 @@ func GetPubSubTopicFromContentTopic(cTopicString string) (string, error) {
return pTopic.String(), nil
}
func GeneratePubsubToContentTopicMap(pubsubTopic string, contentTopics []string) (map[string][]string, error) {
pubSubTopicMap := make(map[string][]string, 0)
if pubsubTopic == "" {
//Should we derive pubsub topic from contentTopic so that peer selection and discovery can be done accordingly?
for _, cTopic := range contentTopics {
pTopic, err := GetPubSubTopicFromContentTopic(cTopic)
if err != nil {
return nil, err
}
_, ok := pubSubTopicMap[pTopic]
if !ok {
pubSubTopicMap[pTopic] = []string{}
}
pubSubTopicMap[pTopic] = append(pubSubTopicMap[pTopic], cTopic)
}
} else {
pubSubTopicMap[pubsubTopic] = append(pubSubTopicMap[pubsubTopic], contentTopics...)
}
return pubSubTopicMap, nil
}

View File

@ -4,14 +4,19 @@ import (
"context"
"encoding/hex"
"errors"
"fmt"
"math"
"strings"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-msgio/pbio"
"github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
@ -82,6 +87,7 @@ type criteriaFN = func(msg *wpb.WakuMessage) (bool, error)
type HistoryRequestParameters struct {
selectedPeer peer.ID
peerAddr multiaddr.Multiaddr
peerSelectionType peermanager.PeerSelection
preferredPeers peer.IDSlice
localQuery bool
@ -102,6 +108,12 @@ func WithPeer(p peer.ID) HistoryRequestOption {
}
}
func WithPeerAddr(pAddr multiaddr.Multiaddr) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.peerAddr = pAddr
}
}
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store
// to request the message history. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
@ -246,6 +258,47 @@ func (store *WakuStore) localQuery(historyQuery *pb.HistoryRPC) (*pb.HistoryResp
return historyResponseRPC.Response, nil
}
func (store *WakuStore) QueryAutoSharding(ctx context.Context, query Query, opts ...HistoryRequestOption) ([]*Result, error) {
var results []*Result
var failedContentTopics []string
pubSubTopicMap, err := protocol.GeneratePubsubToContentTopicMap(query.PubsubTopic, query.ContentTopics)
if err != nil {
return nil, err
}
//Duplicate processing of opts, which is done again in Query function below
// Not sure how to handle this, hence leaving it as of now
params := new(HistoryRequestParameters)
var optList []HistoryRequestOption
for _, opt := range optList {
opt(params)
}
//Add Peer to peerstore.
if store.pm != nil && params.peerAddr != nil {
_, err = store.pm.AddPeer(params.peerAddr, peerstore.Static, maps.Keys(pubSubTopicMap), StoreID_v20beta4)
if err != nil {
return nil, err
}
}
for pubSubTopic, cTopics := range pubSubTopicMap {
query.ContentTopics = cTopics
query.PubsubTopic = pubSubTopic
//Invoke Query separately
result, err := store.Query(ctx, query, opts...)
if err != nil {
failedContentTopics = append(failedContentTopics, cTopics...)
}
results = append(results, result)
}
if len(failedContentTopics) > 0 {
return results, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ","))
} else {
return results, nil
}
}
func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) {
params := new(HistoryRequestParameters)
params.s = store
@ -256,10 +309,6 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
opt(params)
}
//if query.PubsubTopic == "" {
//Should we derive pubsub topic from contentTopic so that peer selection and discovery can be done accordingly?
//}
if store.pm != nil && params.selectedPeer == "" {
var err error
params.selectedPeer, err = store.pm.SelectPeer(

View File

@ -84,6 +84,7 @@ type Store interface {
SetHost(h host.Host)
Start(context.Context, *relay.Subscription) error
Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error)
QueryAutoSharding(ctx context.Context, query Query, opts ...HistoryRequestOption) ([]*Result, error)
Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*wpb.WakuMessage, error)
Next(ctx context.Context, r *Result) (*Result, error)
Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error)