fix(rest-filter): requestID is a `string` and refactor routes (#932)

This commit is contained in:
richΛrd 2023-11-29 09:31:23 -04:00 committed by GitHub
parent 28107bd307
commit 251188d217
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 82 additions and 106 deletions

View File

@ -2,11 +2,9 @@ package rest
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/go-chi/chi/v5"
"github.com/libp2p/go-libp2p/core/peer"
@ -18,30 +16,8 @@ import (
"go.uber.org/zap"
)
type filterRequestId []byte
func (r *filterRequestId) UnmarshalJSON(bodyBytes []byte) error {
body := strings.Trim(string(bodyBytes), `"`)
reqId, err := hex.DecodeString(body)
if err != nil {
return err
}
*r = reqId
return nil
}
func (r filterRequestId) String() string {
return hex.EncodeToString(r)
}
func (r filterRequestId) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%s"`, r.String())), nil
}
const filterv2Ping = "/filter/v2/subscriptions/{requestId}"
const filterv2Subscribe = "/filter/v2/subscriptions"
const filterv2SubscribeAll = "/filter/v2/subscriptions/all"
const filterv2MessagesByContentTopic = "/filter/v2/messages/{contentTopic}"
const filterv2MessagesByPubsubTopic = "/filter/v2/messages/{pubsubTopic}/{contentTopic}"
const filterV2Subscriptions = "/filter/v2/subscriptions"
const filterv2Messages = "/filter/v2/messages"
// FilterService represents the REST service for Filter client
type FilterService struct {
@ -84,12 +60,18 @@ func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *z
cache: newFilterCache(cacheCapacity, logger),
}
m.Get(filterv2Ping, s.ping)
m.Post(filterv2Subscribe, s.subscribe)
m.Delete(filterv2Subscribe, s.unsubscribe)
m.Delete(filterv2SubscribeAll, s.unsubscribeAll)
m.Get(filterv2MessagesByContentTopic, s.getMessagesByContentTopic)
m.Get(filterv2MessagesByPubsubTopic, s.getMessagesByPubsubTopic)
m.Route(filterV2Subscriptions, func(r chi.Router) {
r.Get("/", s.ping)
r.Get("/{requestId}", s.ping)
r.Post("/", s.subscribe)
r.Delete("/", s.unsubscribe)
r.Delete("/all", s.unsubscribeAll)
})
m.Route(filterv2Messages, func(r chi.Router) {
r.Get("/{contentTopic}", s.getMessagesByContentTopic)
r.Get("/{pubsubTopic}/{contentTopic}", s.getMessagesByPubsubTopic)
})
s.runner = newRunnerService(node.Broadcaster(), s.cache.addMessage)
@ -100,26 +82,25 @@ func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *z
// 404 when request failed or no suitable peers
// 200 when ping successful
func (s *FilterService) ping(w http.ResponseWriter, req *http.Request) {
var requestId filterRequestId
if err := requestId.UnmarshalJSON([]byte(chi.URLParam(req, "requestId"))); err != nil {
s.log.Error("bad request id", zap.Error(err))
requestID := chi.URLParam(req, "requestId")
if requestID == "" {
writeResponse(w, &filterSubscriptionResponse{
RequestId: requestId,
RequestID: requestID,
StatusDesc: "bad request id",
}, http.StatusBadRequest)
return
}
// selecting random peer that supports filter protocol
peerId := s.getRandomFilterPeer(req.Context(), requestId, w)
peerId := s.getRandomFilterPeer(req.Context(), requestID, w)
if peerId == "" {
return
}
if err := s.node.FilterLightnode().Ping(req.Context(), peerId, filter.WithPingRequestId(requestId)); err != nil {
if err := s.node.FilterLightnode().Ping(req.Context(), peerId, filter.WithPingRequestId([]byte(requestID))); err != nil {
s.log.Error("ping request failed", zap.Error(err))
writeResponse(w, &filterSubscriptionResponse{
RequestId: requestId,
RequestID: requestID,
StatusDesc: "ping request failed",
}, http.StatusServiceUnavailable)
return
@ -127,21 +108,21 @@ func (s *FilterService) ping(w http.ResponseWriter, req *http.Request) {
// success
writeResponse(w, &filterSubscriptionResponse{
RequestId: requestId,
RequestID: requestID,
StatusDesc: http.StatusText(http.StatusOK),
}, http.StatusOK)
}
// same for FilterUnsubscribeRequest
type filterSubscriptionRequest struct {
RequestId filterRequestId `json:"requestId"`
ContentFilters []string `json:"contentFilters"`
PubsubTopic string `json:"pubsubTopic"`
RequestID string `json:"requestId"`
ContentFilters []string `json:"contentFilters"`
PubsubTopic string `json:"pubsubTopic"`
}
type filterSubscriptionResponse struct {
RequestId filterRequestId `json:"requestId"`
StatusDesc string `json:"statusDesc"`
RequestID string `json:"requestId"`
StatusDesc string `json:"statusDesc"`
}
// 400 on invalid request
@ -158,14 +139,14 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) {
//
subscriptions, err := s.node.FilterLightnode().Subscribe(req.Context(),
contentFilter,
filter.WithRequestID(message.RequestId))
filter.WithRequestID([]byte(message.RequestID)))
// on partial subscribe failure
if len(subscriptions) > 0 && err != nil {
s.log.Error("partial subscribe failed", zap.Error(err))
// on partial failure
writeResponse(w, filterSubscriptionResponse{
RequestId: message.RequestId,
RequestID: message.RequestID,
StatusDesc: err.Error(),
}, http.StatusOK)
}
@ -173,7 +154,7 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) {
if err != nil {
s.log.Error("subscription failed", zap.Error(err))
writeResponse(w, filterSubscriptionResponse{
RequestId: message.RequestId,
RequestID: message.RequestID,
StatusDesc: "subscription failed",
}, http.StatusServiceUnavailable)
return
@ -182,7 +163,7 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) {
// on success
s.cache.subscribe(contentFilter)
writeResponse(w, filterSubscriptionResponse{
RequestId: message.RequestId,
RequestID: message.RequestID,
StatusDesc: http.StatusText(http.StatusOK),
}, http.StatusOK)
}
@ -198,7 +179,7 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) {
return
}
peerId := s.getRandomFilterPeer(req.Context(), message.RequestId, w)
peerId := s.getRandomFilterPeer(req.Context(), message.RequestID, w)
if peerId == "" {
return
}
@ -208,14 +189,14 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) {
result, err := s.node.FilterLightnode().Unsubscribe(
req.Context(),
contentFilter,
filter.WithRequestID(message.RequestId),
filter.WithRequestID([]byte(message.RequestID)),
filter.WithPeer(peerId),
)
if err != nil {
s.log.Error("unsubscribe failed", zap.Error(err))
writeResponse(w, filterSubscriptionResponse{
RequestId: message.RequestId,
RequestID: message.RequestID,
StatusDesc: err.Error(),
}, http.StatusServiceUnavailable)
return
@ -228,7 +209,7 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) {
}
}
writeResponse(w, filterSubscriptionResponse{
RequestId: message.RequestId,
RequestID: message.RequestID,
StatusDesc: s.unsubscribeGetMessage(result),
}, http.StatusOK)
}
@ -255,10 +236,8 @@ func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResul
return http.StatusText(http.StatusOK)
}
// ///////////////////////
// ///////////////////////
type filterUnsubscribeAllRequest struct {
RequestId filterRequestId `json:"requestId"`
RequestID string `json:"requestId"`
}
func (s *FilterService) readBody(w http.ResponseWriter, req *http.Request, message interface{}) bool {
@ -282,7 +261,7 @@ func (s *FilterService) unsubscribeAll(w http.ResponseWriter, req *http.Request)
return
}
peerId := s.getRandomFilterPeer(req.Context(), message.RequestId, w)
peerId := s.getRandomFilterPeer(req.Context(), message.RequestID, w)
if peerId == "" {
return
}
@ -290,13 +269,13 @@ func (s *FilterService) unsubscribeAll(w http.ResponseWriter, req *http.Request)
// unsubscribe all subscriptions for a given peer
errCh, err := s.node.FilterLightnode().UnsubscribeAll(
req.Context(),
filter.WithRequestID(message.RequestId),
filter.WithRequestID([]byte(message.RequestID)),
filter.WithPeer(peerId),
)
if err != nil {
s.log.Error("unsubscribeAll failed", zap.Error(err))
writeResponse(w, filterSubscriptionResponse{
RequestId: message.RequestId,
RequestID: message.RequestID,
StatusDesc: err.Error(),
}, http.StatusServiceUnavailable)
return
@ -304,12 +283,12 @@ func (s *FilterService) unsubscribeAll(w http.ResponseWriter, req *http.Request)
// on success
writeResponse(w, filterSubscriptionResponse{
RequestId: message.RequestId,
RequestID: message.RequestID,
StatusDesc: s.unsubscribeGetMessage(errCh),
}, http.StatusOK)
}
func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId []byte, w http.ResponseWriter) peer.ID {
func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId string, w http.ResponseWriter) peer.ID {
// selecting random peer that supports filter protocol
peerId, err := s.node.PeerManager().SelectPeer(peermanager.PeerSelectionCriteria{
SelectionType: peermanager.Automatic,
@ -319,7 +298,7 @@ func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId []byte
if err != nil {
s.log.Error("selecting peer", zap.Error(err))
writeResponse(w, filterSubscriptionResponse{
RequestId: requestId,
RequestID: requestId,
StatusDesc: "No suitable peers",
}, http.StatusServiceUnavailable)
return ""

View File

@ -3,6 +3,7 @@ package rest
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
@ -61,23 +62,23 @@ func TestFilterPingFailure(t *testing.T) {
router := chi.NewRouter()
_ = NewFilterService(node2, router, 0, utils.Logger())
// with malformed requestId
// with empty requestID
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", "invalid_request_id"), nil)
req, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", ""), nil)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestId: []byte{},
RequestID: "",
StatusDesc: "bad request id",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusBadRequest, rr.Code)
// no subscription with peer
var requestId filterRequestId = protocol.GenerateRequestID()
requestID := hex.EncodeToString(protocol.GenerateRequestID())
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestId), nil)
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestID), nil)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestId: requestId,
RequestID: requestID,
StatusDesc: "ping request failed",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusServiceUnavailable, rr.Code)
@ -88,9 +89,8 @@ func TestFilterPingFailure(t *testing.T) {
func TestFilterSubscribeAndPing(t *testing.T) {
pubsubTopic := "/waku/2/test/proto"
contentTopics := []string{"test"}
var requestId filterRequestId = protocol.GenerateRequestID()
requestID := hex.EncodeToString(protocol.GenerateRequestID())
//
node1, node2 := twoFilterConnectedNodes(t, pubsubTopic)
defer func() {
node1.Stop()
@ -103,24 +103,24 @@ func TestFilterSubscribeAndPing(t *testing.T) {
// create subscription to peer
rr := httptest.NewRecorder()
reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{
RequestId: requestId,
RequestID: requestID,
PubsubTopic: pubsubTopic,
ContentFilters: contentTopics,
}))
req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader)
req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestId: requestId,
RequestID: requestID,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
// trying pinging the peer once there is subscription to it
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestId), nil)
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", filterV2Subscriptions, requestID), nil)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestId: requestId,
RequestID: requestID,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
@ -131,9 +131,8 @@ func TestFilterSubscribeAndPing(t *testing.T) {
func TestFilterSubscribeAndUnsubscribe(t *testing.T) {
pubsubTopic := "/waku/2/test/proto"
contentTopics := []string{"test"}
var requestId filterRequestId = protocol.GenerateRequestID()
requestID := hex.EncodeToString(protocol.GenerateRequestID())
//
node1, node2 := twoFilterConnectedNodes(t, pubsubTopic)
defer func() {
node1.Stop()
@ -146,30 +145,30 @@ func TestFilterSubscribeAndUnsubscribe(t *testing.T) {
// create subscription to peer
rr := httptest.NewRecorder()
reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{
RequestId: requestId,
RequestID: requestID,
PubsubTopic: pubsubTopic,
ContentFilters: contentTopics,
}))
req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader)
req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestId: requestId,
RequestID: requestID,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
// delete the subscription to the peer with matching pubSub and contentTopic
requestId = protocol.GenerateRequestID()
requestID = hex.EncodeToString(protocol.GenerateRequestID())
rr = httptest.NewRecorder()
reqReader = strings.NewReader(toString(t, filterSubscriptionRequest{
RequestId: requestId,
RequestID: requestID,
PubsubTopic: pubsubTopic,
ContentFilters: contentTopics,
}))
req, _ = http.NewRequest(http.MethodDelete, filterv2Subscribe, reqReader)
req, _ = http.NewRequest(http.MethodDelete, filterV2Subscriptions, reqReader)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestId: requestId,
RequestID: requestID,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
@ -182,9 +181,7 @@ func TestFilterAllUnsubscribe(t *testing.T) {
pubsubTopic := "/waku/2/test/proto"
contentTopics1 := "ct_1"
contentTopics2 := "ct_2"
var requestId filterRequestId
//
node1, node2 := twoFilterConnectedNodes(t, pubsubTopic)
defer func() {
node1.Stop()
@ -196,43 +193,43 @@ func TestFilterAllUnsubscribe(t *testing.T) {
// create 2 different subscription to peer
for _, ct := range []string{contentTopics1, contentTopics2} {
requestId = protocol.GenerateRequestID()
requestID := hex.EncodeToString(protocol.GenerateRequestID())
rr := httptest.NewRecorder()
reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{
RequestId: requestId,
RequestID: requestID,
PubsubTopic: pubsubTopic,
ContentFilters: []string{ct},
}))
req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader)
req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestId: requestId,
RequestID: requestID,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
}
// delete all subscription to the peer
requestId = protocol.GenerateRequestID()
requestID := hex.EncodeToString(protocol.GenerateRequestID())
rr := httptest.NewRecorder()
reqReader := strings.NewReader(toString(t, filterUnsubscribeAllRequest{
RequestId: requestId,
RequestID: requestID,
}))
req, _ := http.NewRequest(http.MethodDelete, filterv2SubscribeAll, reqReader)
req, _ := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/all", filterV2Subscriptions), reqReader)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestId: requestId,
RequestID: requestID,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
// check if all subscriptions are deleted to the peer are deleted
requestId = protocol.GenerateRequestID()
requestID = hex.EncodeToString(protocol.GenerateRequestID())
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestId), nil)
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", filterV2Subscriptions, requestID), nil)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestId: requestId,
RequestID: requestID,
StatusDesc: "ping request failed",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusServiceUnavailable, rr.Code)
@ -280,17 +277,17 @@ func TestFilterGetMessages(t *testing.T) {
{ // create subscription so that messages are cached
for _, pubsubTopic := range []string{"", pubsubTopic} {
requestId := protocol.GenerateRequestID()
requestID := hex.EncodeToString(protocol.GenerateRequestID())
rr := httptest.NewRecorder()
reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{
RequestId: requestId,
RequestID: requestID,
PubsubTopic: pubsubTopic,
ContentFilters: []string{contentTopic},
}))
req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader)
req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestId: requestId,
RequestID: requestID,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
@ -314,7 +311,7 @@ func TestFilterGetMessages(t *testing.T) {
{ // with malformed contentTopic
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s", url.QueryEscape("/waku/2/wrongtopic")),
fmt.Sprintf("%s/%s", filterv2Messages, url.QueryEscape("/waku/2/wrongtopic")),
nil,
)
router.ServeHTTP(rr, req)
@ -325,7 +322,7 @@ func TestFilterGetMessages(t *testing.T) {
{ // with check if the cache is working properly
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s", url.QueryEscape(contentTopic)),
fmt.Sprintf("%s/%s", filterv2Messages, url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
@ -336,7 +333,7 @@ func TestFilterGetMessages(t *testing.T) {
{ // check if pubsubTopic is present in the url
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages//%s", url.QueryEscape(contentTopic)),
fmt.Sprintf("%s//%s", filterv2Messages, url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
@ -347,7 +344,7 @@ func TestFilterGetMessages(t *testing.T) {
{ // check messages by pubsub/contentTopic pair
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s/%s", url.QueryEscape(pubsubTopic), url.QueryEscape(contentTopic)),
fmt.Sprintf("%s/%s/%s", filterv2Messages, url.QueryEscape(pubsubTopic), url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
@ -359,7 +356,7 @@ func TestFilterGetMessages(t *testing.T) {
rr := httptest.NewRecorder()
notSubscibredPubsubTopic := "/waku/2/test2/proto"
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s/%s", url.QueryEscape(notSubscibredPubsubTopic), url.QueryEscape(contentTopic)),
fmt.Sprintf("%s/%s/%s", filterv2Messages, url.QueryEscape(notSubscibredPubsubTopic), url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)