From 251188d217b2409af1fafa59857a892574ddc123 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Wed, 29 Nov 2023 09:31:23 -0400 Subject: [PATCH] fix(rest-filter): requestID is a `string` and refactor routes (#932) --- cmd/waku/server/rest/filter.go | 103 +++++++++++----------------- cmd/waku/server/rest/filter_test.go | 85 +++++++++++------------ 2 files changed, 82 insertions(+), 106 deletions(-) diff --git a/cmd/waku/server/rest/filter.go b/cmd/waku/server/rest/filter.go index 7718171c..d497ce1b 100644 --- a/cmd/waku/server/rest/filter.go +++ b/cmd/waku/server/rest/filter.go @@ -2,11 +2,9 @@ package rest import ( "context" - "encoding/hex" "encoding/json" "fmt" "net/http" - "strings" "github.com/go-chi/chi/v5" "github.com/libp2p/go-libp2p/core/peer" @@ -18,30 +16,8 @@ import ( "go.uber.org/zap" ) -type filterRequestId []byte - -func (r *filterRequestId) UnmarshalJSON(bodyBytes []byte) error { - body := strings.Trim(string(bodyBytes), `"`) - reqId, err := hex.DecodeString(body) - if err != nil { - return err - } - *r = reqId - return nil -} - -func (r filterRequestId) String() string { - return hex.EncodeToString(r) -} -func (r filterRequestId) MarshalJSON() ([]byte, error) { - return []byte(fmt.Sprintf(`"%s"`, r.String())), nil -} - -const filterv2Ping = "/filter/v2/subscriptions/{requestId}" -const filterv2Subscribe = "/filter/v2/subscriptions" -const filterv2SubscribeAll = "/filter/v2/subscriptions/all" -const filterv2MessagesByContentTopic = "/filter/v2/messages/{contentTopic}" -const filterv2MessagesByPubsubTopic = "/filter/v2/messages/{pubsubTopic}/{contentTopic}" +const filterV2Subscriptions = "/filter/v2/subscriptions" +const filterv2Messages = "/filter/v2/messages" // FilterService represents the REST service for Filter client type FilterService struct { @@ -84,12 +60,18 @@ func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *z cache: newFilterCache(cacheCapacity, logger), } - m.Get(filterv2Ping, s.ping) - m.Post(filterv2Subscribe, s.subscribe) - m.Delete(filterv2Subscribe, s.unsubscribe) - m.Delete(filterv2SubscribeAll, s.unsubscribeAll) - m.Get(filterv2MessagesByContentTopic, s.getMessagesByContentTopic) - m.Get(filterv2MessagesByPubsubTopic, s.getMessagesByPubsubTopic) + m.Route(filterV2Subscriptions, func(r chi.Router) { + r.Get("/", s.ping) + r.Get("/{requestId}", s.ping) + r.Post("/", s.subscribe) + r.Delete("/", s.unsubscribe) + r.Delete("/all", s.unsubscribeAll) + }) + + m.Route(filterv2Messages, func(r chi.Router) { + r.Get("/{contentTopic}", s.getMessagesByContentTopic) + r.Get("/{pubsubTopic}/{contentTopic}", s.getMessagesByPubsubTopic) + }) s.runner = newRunnerService(node.Broadcaster(), s.cache.addMessage) @@ -100,26 +82,25 @@ func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *z // 404 when request failed or no suitable peers // 200 when ping successful func (s *FilterService) ping(w http.ResponseWriter, req *http.Request) { - var requestId filterRequestId - if err := requestId.UnmarshalJSON([]byte(chi.URLParam(req, "requestId"))); err != nil { - s.log.Error("bad request id", zap.Error(err)) + requestID := chi.URLParam(req, "requestId") + if requestID == "" { writeResponse(w, &filterSubscriptionResponse{ - RequestId: requestId, + RequestID: requestID, StatusDesc: "bad request id", }, http.StatusBadRequest) return } // selecting random peer that supports filter protocol - peerId := s.getRandomFilterPeer(req.Context(), requestId, w) + peerId := s.getRandomFilterPeer(req.Context(), requestID, w) if peerId == "" { return } - if err := s.node.FilterLightnode().Ping(req.Context(), peerId, filter.WithPingRequestId(requestId)); err != nil { + if err := s.node.FilterLightnode().Ping(req.Context(), peerId, filter.WithPingRequestId([]byte(requestID))); err != nil { s.log.Error("ping request failed", zap.Error(err)) writeResponse(w, &filterSubscriptionResponse{ - RequestId: requestId, + RequestID: requestID, StatusDesc: "ping request failed", }, http.StatusServiceUnavailable) return @@ -127,21 +108,21 @@ func (s *FilterService) ping(w http.ResponseWriter, req *http.Request) { // success writeResponse(w, &filterSubscriptionResponse{ - RequestId: requestId, + RequestID: requestID, StatusDesc: http.StatusText(http.StatusOK), }, http.StatusOK) } // same for FilterUnsubscribeRequest type filterSubscriptionRequest struct { - RequestId filterRequestId `json:"requestId"` - ContentFilters []string `json:"contentFilters"` - PubsubTopic string `json:"pubsubTopic"` + RequestID string `json:"requestId"` + ContentFilters []string `json:"contentFilters"` + PubsubTopic string `json:"pubsubTopic"` } type filterSubscriptionResponse struct { - RequestId filterRequestId `json:"requestId"` - StatusDesc string `json:"statusDesc"` + RequestID string `json:"requestId"` + StatusDesc string `json:"statusDesc"` } // 400 on invalid request @@ -158,14 +139,14 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) { // subscriptions, err := s.node.FilterLightnode().Subscribe(req.Context(), contentFilter, - filter.WithRequestID(message.RequestId)) + filter.WithRequestID([]byte(message.RequestID))) // on partial subscribe failure if len(subscriptions) > 0 && err != nil { s.log.Error("partial subscribe failed", zap.Error(err)) // on partial failure writeResponse(w, filterSubscriptionResponse{ - RequestId: message.RequestId, + RequestID: message.RequestID, StatusDesc: err.Error(), }, http.StatusOK) } @@ -173,7 +154,7 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) { if err != nil { s.log.Error("subscription failed", zap.Error(err)) writeResponse(w, filterSubscriptionResponse{ - RequestId: message.RequestId, + RequestID: message.RequestID, StatusDesc: "subscription failed", }, http.StatusServiceUnavailable) return @@ -182,7 +163,7 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) { // on success s.cache.subscribe(contentFilter) writeResponse(w, filterSubscriptionResponse{ - RequestId: message.RequestId, + RequestID: message.RequestID, StatusDesc: http.StatusText(http.StatusOK), }, http.StatusOK) } @@ -198,7 +179,7 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) { return } - peerId := s.getRandomFilterPeer(req.Context(), message.RequestId, w) + peerId := s.getRandomFilterPeer(req.Context(), message.RequestID, w) if peerId == "" { return } @@ -208,14 +189,14 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) { result, err := s.node.FilterLightnode().Unsubscribe( req.Context(), contentFilter, - filter.WithRequestID(message.RequestId), + filter.WithRequestID([]byte(message.RequestID)), filter.WithPeer(peerId), ) if err != nil { s.log.Error("unsubscribe failed", zap.Error(err)) writeResponse(w, filterSubscriptionResponse{ - RequestId: message.RequestId, + RequestID: message.RequestID, StatusDesc: err.Error(), }, http.StatusServiceUnavailable) return @@ -228,7 +209,7 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) { } } writeResponse(w, filterSubscriptionResponse{ - RequestId: message.RequestId, + RequestID: message.RequestID, StatusDesc: s.unsubscribeGetMessage(result), }, http.StatusOK) } @@ -255,10 +236,8 @@ func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResul return http.StatusText(http.StatusOK) } -// /////////////////////// -// /////////////////////// type filterUnsubscribeAllRequest struct { - RequestId filterRequestId `json:"requestId"` + RequestID string `json:"requestId"` } func (s *FilterService) readBody(w http.ResponseWriter, req *http.Request, message interface{}) bool { @@ -282,7 +261,7 @@ func (s *FilterService) unsubscribeAll(w http.ResponseWriter, req *http.Request) return } - peerId := s.getRandomFilterPeer(req.Context(), message.RequestId, w) + peerId := s.getRandomFilterPeer(req.Context(), message.RequestID, w) if peerId == "" { return } @@ -290,13 +269,13 @@ func (s *FilterService) unsubscribeAll(w http.ResponseWriter, req *http.Request) // unsubscribe all subscriptions for a given peer errCh, err := s.node.FilterLightnode().UnsubscribeAll( req.Context(), - filter.WithRequestID(message.RequestId), + filter.WithRequestID([]byte(message.RequestID)), filter.WithPeer(peerId), ) if err != nil { s.log.Error("unsubscribeAll failed", zap.Error(err)) writeResponse(w, filterSubscriptionResponse{ - RequestId: message.RequestId, + RequestID: message.RequestID, StatusDesc: err.Error(), }, http.StatusServiceUnavailable) return @@ -304,12 +283,12 @@ func (s *FilterService) unsubscribeAll(w http.ResponseWriter, req *http.Request) // on success writeResponse(w, filterSubscriptionResponse{ - RequestId: message.RequestId, + RequestID: message.RequestID, StatusDesc: s.unsubscribeGetMessage(errCh), }, http.StatusOK) } -func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId []byte, w http.ResponseWriter) peer.ID { +func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId string, w http.ResponseWriter) peer.ID { // selecting random peer that supports filter protocol peerId, err := s.node.PeerManager().SelectPeer(peermanager.PeerSelectionCriteria{ SelectionType: peermanager.Automatic, @@ -319,7 +298,7 @@ func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId []byte if err != nil { s.log.Error("selecting peer", zap.Error(err)) writeResponse(w, filterSubscriptionResponse{ - RequestId: requestId, + RequestID: requestId, StatusDesc: "No suitable peers", }, http.StatusServiceUnavailable) return "" diff --git a/cmd/waku/server/rest/filter_test.go b/cmd/waku/server/rest/filter_test.go index 7a8382bd..d8e62821 100644 --- a/cmd/waku/server/rest/filter_test.go +++ b/cmd/waku/server/rest/filter_test.go @@ -3,6 +3,7 @@ package rest import ( "bytes" "context" + "encoding/hex" "encoding/json" "fmt" "net/http" @@ -61,23 +62,23 @@ func TestFilterPingFailure(t *testing.T) { router := chi.NewRouter() _ = NewFilterService(node2, router, 0, utils.Logger()) - // with malformed requestId + // with empty requestID rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", "invalid_request_id"), nil) + req, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", ""), nil) router.ServeHTTP(rr, req) checkJSON(t, filterSubscriptionResponse{ - RequestId: []byte{}, + RequestID: "", StatusDesc: "bad request id", }, getFilterResponse(t, rr.Body)) require.Equal(t, http.StatusBadRequest, rr.Code) // no subscription with peer - var requestId filterRequestId = protocol.GenerateRequestID() + requestID := hex.EncodeToString(protocol.GenerateRequestID()) rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestId), nil) + req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestID), nil) router.ServeHTTP(rr, req) checkJSON(t, filterSubscriptionResponse{ - RequestId: requestId, + RequestID: requestID, StatusDesc: "ping request failed", }, getFilterResponse(t, rr.Body)) require.Equal(t, http.StatusServiceUnavailable, rr.Code) @@ -88,9 +89,8 @@ func TestFilterPingFailure(t *testing.T) { func TestFilterSubscribeAndPing(t *testing.T) { pubsubTopic := "/waku/2/test/proto" contentTopics := []string{"test"} - var requestId filterRequestId = protocol.GenerateRequestID() + requestID := hex.EncodeToString(protocol.GenerateRequestID()) - // node1, node2 := twoFilterConnectedNodes(t, pubsubTopic) defer func() { node1.Stop() @@ -103,24 +103,24 @@ func TestFilterSubscribeAndPing(t *testing.T) { // create subscription to peer rr := httptest.NewRecorder() reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{ - RequestId: requestId, + RequestID: requestID, PubsubTopic: pubsubTopic, ContentFilters: contentTopics, })) - req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader) + req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader) router.ServeHTTP(rr, req) checkJSON(t, filterSubscriptionResponse{ - RequestId: requestId, + RequestID: requestID, StatusDesc: "OK", }, getFilterResponse(t, rr.Body)) require.Equal(t, http.StatusOK, rr.Code) // trying pinging the peer once there is subscription to it rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestId), nil) + req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", filterV2Subscriptions, requestID), nil) router.ServeHTTP(rr, req) checkJSON(t, filterSubscriptionResponse{ - RequestId: requestId, + RequestID: requestID, StatusDesc: "OK", }, getFilterResponse(t, rr.Body)) require.Equal(t, http.StatusOK, rr.Code) @@ -131,9 +131,8 @@ func TestFilterSubscribeAndPing(t *testing.T) { func TestFilterSubscribeAndUnsubscribe(t *testing.T) { pubsubTopic := "/waku/2/test/proto" contentTopics := []string{"test"} - var requestId filterRequestId = protocol.GenerateRequestID() + requestID := hex.EncodeToString(protocol.GenerateRequestID()) - // node1, node2 := twoFilterConnectedNodes(t, pubsubTopic) defer func() { node1.Stop() @@ -146,30 +145,30 @@ func TestFilterSubscribeAndUnsubscribe(t *testing.T) { // create subscription to peer rr := httptest.NewRecorder() reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{ - RequestId: requestId, + RequestID: requestID, PubsubTopic: pubsubTopic, ContentFilters: contentTopics, })) - req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader) + req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader) router.ServeHTTP(rr, req) checkJSON(t, filterSubscriptionResponse{ - RequestId: requestId, + RequestID: requestID, StatusDesc: "OK", }, getFilterResponse(t, rr.Body)) require.Equal(t, http.StatusOK, rr.Code) // delete the subscription to the peer with matching pubSub and contentTopic - requestId = protocol.GenerateRequestID() + requestID = hex.EncodeToString(protocol.GenerateRequestID()) rr = httptest.NewRecorder() reqReader = strings.NewReader(toString(t, filterSubscriptionRequest{ - RequestId: requestId, + RequestID: requestID, PubsubTopic: pubsubTopic, ContentFilters: contentTopics, })) - req, _ = http.NewRequest(http.MethodDelete, filterv2Subscribe, reqReader) + req, _ = http.NewRequest(http.MethodDelete, filterV2Subscriptions, reqReader) router.ServeHTTP(rr, req) checkJSON(t, filterSubscriptionResponse{ - RequestId: requestId, + RequestID: requestID, StatusDesc: "OK", }, getFilterResponse(t, rr.Body)) require.Equal(t, http.StatusOK, rr.Code) @@ -182,9 +181,7 @@ func TestFilterAllUnsubscribe(t *testing.T) { pubsubTopic := "/waku/2/test/proto" contentTopics1 := "ct_1" contentTopics2 := "ct_2" - var requestId filterRequestId - // node1, node2 := twoFilterConnectedNodes(t, pubsubTopic) defer func() { node1.Stop() @@ -196,43 +193,43 @@ func TestFilterAllUnsubscribe(t *testing.T) { // create 2 different subscription to peer for _, ct := range []string{contentTopics1, contentTopics2} { - requestId = protocol.GenerateRequestID() + requestID := hex.EncodeToString(protocol.GenerateRequestID()) rr := httptest.NewRecorder() reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{ - RequestId: requestId, + RequestID: requestID, PubsubTopic: pubsubTopic, ContentFilters: []string{ct}, })) - req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader) + req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader) router.ServeHTTP(rr, req) checkJSON(t, filterSubscriptionResponse{ - RequestId: requestId, + RequestID: requestID, StatusDesc: "OK", }, getFilterResponse(t, rr.Body)) require.Equal(t, http.StatusOK, rr.Code) } // delete all subscription to the peer - requestId = protocol.GenerateRequestID() + requestID := hex.EncodeToString(protocol.GenerateRequestID()) rr := httptest.NewRecorder() reqReader := strings.NewReader(toString(t, filterUnsubscribeAllRequest{ - RequestId: requestId, + RequestID: requestID, })) - req, _ := http.NewRequest(http.MethodDelete, filterv2SubscribeAll, reqReader) + req, _ := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/all", filterV2Subscriptions), reqReader) router.ServeHTTP(rr, req) checkJSON(t, filterSubscriptionResponse{ - RequestId: requestId, + RequestID: requestID, StatusDesc: "OK", }, getFilterResponse(t, rr.Body)) require.Equal(t, http.StatusOK, rr.Code) // check if all subscriptions are deleted to the peer are deleted - requestId = protocol.GenerateRequestID() + requestID = hex.EncodeToString(protocol.GenerateRequestID()) rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestId), nil) + req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", filterV2Subscriptions, requestID), nil) router.ServeHTTP(rr, req) checkJSON(t, filterSubscriptionResponse{ - RequestId: requestId, + RequestID: requestID, StatusDesc: "ping request failed", }, getFilterResponse(t, rr.Body)) require.Equal(t, http.StatusServiceUnavailable, rr.Code) @@ -280,17 +277,17 @@ func TestFilterGetMessages(t *testing.T) { { // create subscription so that messages are cached for _, pubsubTopic := range []string{"", pubsubTopic} { - requestId := protocol.GenerateRequestID() + requestID := hex.EncodeToString(protocol.GenerateRequestID()) rr := httptest.NewRecorder() reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{ - RequestId: requestId, + RequestID: requestID, PubsubTopic: pubsubTopic, ContentFilters: []string{contentTopic}, })) - req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader) + req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader) router.ServeHTTP(rr, req) checkJSON(t, filterSubscriptionResponse{ - RequestId: requestId, + RequestID: requestID, StatusDesc: "OK", }, getFilterResponse(t, rr.Body)) require.Equal(t, http.StatusOK, rr.Code) @@ -314,7 +311,7 @@ func TestFilterGetMessages(t *testing.T) { { // with malformed contentTopic rr := httptest.NewRecorder() req, _ := http.NewRequest(http.MethodGet, - fmt.Sprintf("/filter/v2/messages/%s", url.QueryEscape("/waku/2/wrongtopic")), + fmt.Sprintf("%s/%s", filterv2Messages, url.QueryEscape("/waku/2/wrongtopic")), nil, ) router.ServeHTTP(rr, req) @@ -325,7 +322,7 @@ func TestFilterGetMessages(t *testing.T) { { // with check if the cache is working properly rr := httptest.NewRecorder() req, _ := http.NewRequest(http.MethodGet, - fmt.Sprintf("/filter/v2/messages/%s", url.QueryEscape(contentTopic)), + fmt.Sprintf("%s/%s", filterv2Messages, url.QueryEscape(contentTopic)), nil, ) router.ServeHTTP(rr, req) @@ -336,7 +333,7 @@ func TestFilterGetMessages(t *testing.T) { { // check if pubsubTopic is present in the url rr := httptest.NewRecorder() req, _ := http.NewRequest(http.MethodGet, - fmt.Sprintf("/filter/v2/messages//%s", url.QueryEscape(contentTopic)), + fmt.Sprintf("%s//%s", filterv2Messages, url.QueryEscape(contentTopic)), nil, ) router.ServeHTTP(rr, req) @@ -347,7 +344,7 @@ func TestFilterGetMessages(t *testing.T) { { // check messages by pubsub/contentTopic pair rr := httptest.NewRecorder() req, _ := http.NewRequest(http.MethodGet, - fmt.Sprintf("/filter/v2/messages/%s/%s", url.QueryEscape(pubsubTopic), url.QueryEscape(contentTopic)), + fmt.Sprintf("%s/%s/%s", filterv2Messages, url.QueryEscape(pubsubTopic), url.QueryEscape(contentTopic)), nil, ) router.ServeHTTP(rr, req) @@ -359,7 +356,7 @@ func TestFilterGetMessages(t *testing.T) { rr := httptest.NewRecorder() notSubscibredPubsubTopic := "/waku/2/test2/proto" req, _ := http.NewRequest(http.MethodGet, - fmt.Sprintf("/filter/v2/messages/%s/%s", url.QueryEscape(notSubscibredPubsubTopic), url.QueryEscape(contentTopic)), + fmt.Sprintf("%s/%s/%s", filterv2Messages, url.QueryEscape(notSubscibredPubsubTopic), url.QueryEscape(contentTopic)), nil, ) router.ServeHTTP(rr, req)