fix(rest/store): allow local queries (#1088)

This commit is contained in:
richΛrd 2024-04-17 08:54:17 -04:00 committed by GitHub
parent 67bbbaf60d
commit ea3f9d8d9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 21 additions and 10 deletions

View File

@ -67,7 +67,13 @@ func getStoreParams(r *http.Request) (*store.Query, []store.HistoryRequestOption
return nil, nil, err return nil, nil, err
} }
options = append(options, store.WithPeerAddr(m)) options = append(options, store.WithPeerAddr(m))
} else {
// The user didn't specify a peer address and self-node is configured as a store node.
// In this case we assume that the user is willing to retrieve the messages stored by
// the local/self store node.
options = append(options, store.WithLocalQuery())
} }
query.PubsubTopic = r.URL.Query().Get("pubsubTopic") query.PubsubTopic = r.URL.Query().Get("pubsubTopic")
contentTopics := r.URL.Query().Get("contentTopics") contentTopics := r.URL.Query().Get("contentTopics")

View File

@ -63,10 +63,6 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p
wakuLP.limiter = params.limiter wakuLP.limiter = params.limiter
if pm != nil {
wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField)
}
return wakuLP return wakuLP
} }
@ -81,6 +77,10 @@ func (wakuLP *WakuLightPush) Start(ctx context.Context) error {
return errors.New("relay is required, without it, it is only a client and cannot be started") return errors.New("relay is required, without it, it is only a client and cannot be started")
} }
if wakuLP.pm != nil {
wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField)
}
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
wakuLP.cancel = cancel wakuLP.cancel = cancel

View File

@ -275,6 +275,10 @@ func (store *WakuStore) localQuery(historyQuery *pb.HistoryRPC) (*pb.HistoryResp
return historyResponseRPC.Response, nil return historyResponseRPC.Response, nil
} }
func (store *WakuStore) isLocalQuery(p *HistoryRequestParameters) bool {
return p.localQuery && store.started
}
func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) { func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) {
params := new(HistoryRequestParameters) params := new(HistoryRequestParameters)
params.s = store params.s = store
@ -288,7 +292,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
} }
} }
if !params.localQuery { if !store.isLocalQuery(params) {
pubsubTopics := []string{} pubsubTopics := []string{}
if query.PubsubTopic == "" { if query.PubsubTopic == "" {
for _, cTopic := range query.ContentTopics { for _, cTopic := range query.ContentTopics {
@ -343,7 +347,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
historyRequest.Query.ContentFilters = append(historyRequest.Query.ContentFilters, &pb.ContentFilter{ContentTopic: cf}) historyRequest.Query.ContentFilters = append(historyRequest.Query.ContentFilters, &pb.ContentFilter{ContentTopic: cf})
} }
if !params.localQuery && params.selectedPeer == "" { if !store.isLocalQuery(params) && params.selectedPeer == "" {
store.metrics.RecordError(peerNotFoundFailure) store.metrics.RecordError(peerNotFoundFailure)
return nil, ErrNoPeersAvailable return nil, ErrNoPeersAvailable
} }
@ -373,7 +377,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
var response *pb.HistoryResponse var response *pb.HistoryResponse
if params.localQuery { if store.isLocalQuery(params) {
response, err = store.localQuery(historyRequest) response, err = store.localQuery(historyRequest)
} else { } else {
response, err = store.queryFrom(ctx, historyRequest, params.selectedPeer) response, err = store.queryFrom(ctx, historyRequest, params.selectedPeer)

View File

@ -67,8 +67,5 @@ func NewWakuStore(p MessageProvider, pm *peermanager.PeerManager, timesource tim
wakuStore.pm = pm wakuStore.pm = pm
wakuStore.metrics = newMetrics(reg) wakuStore.metrics = newMetrics(reg)
if pm != nil {
pm.RegisterWakuProtocol(StoreID_v20beta4, StoreENRField)
}
return wakuStore return wakuStore
} }

View File

@ -119,6 +119,10 @@ func (store *WakuStore) Start(ctx context.Context, sub *relay.Subscription) erro
return err return err
} }
if store.pm != nil {
store.pm.RegisterWakuProtocol(StoreID_v20beta4, StoreENRField)
}
store.started = true store.started = true
store.ctx, store.cancel = context.WithCancel(ctx) store.ctx, store.cancel = context.WithCancel(ctx)
store.MsgC = sub store.MsgC = sub