From 0868f5d4dd38101419f6bf04a5b78d9194fbb276 Mon Sep 17 00:00:00 2001 From: harsh jain Date: Fri, 27 Oct 2023 06:21:50 +0700 Subject: [PATCH] feat: add filter v2 rpc (#798) * feat: add filter v2 rpc ping, subscribe/unsubscribe and unsubscribeAll. * test(filterRest): pingFailure, subscribe-ping, unsubscribe and unsubscribeAll --- cmd/waku/server/rest/filter.go | 284 ++++++++++++++++++++ cmd/waku/server/rest/filter_api.yaml | 231 ++++++++++++++++ cmd/waku/server/rest/filter_test.go | 253 +++++++++++++++++ cmd/waku/server/rest/lightpush_rest_test.go | 15 +- cmd/waku/server/rest/utils.go | 10 +- cmd/waku/server/rest/waku_rest.go | 4 + waku/v2/node/wakunode2.go | 5 + waku/v2/protocol/filter/client.go | 13 +- waku/v2/protocol/filter/options.go | 13 + 9 files changed, 806 insertions(+), 22 deletions(-) create mode 100644 cmd/waku/server/rest/filter.go create mode 100644 cmd/waku/server/rest/filter_api.yaml create mode 100644 cmd/waku/server/rest/filter_test.go diff --git a/cmd/waku/server/rest/filter.go b/cmd/waku/server/rest/filter.go new file mode 100644 index 00000000..61cea4c9 --- /dev/null +++ b/cmd/waku/server/rest/filter.go @@ -0,0 +1,284 @@ +package rest + +import ( + "context" + "encoding/hex" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/go-chi/chi/v5" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "go.uber.org/zap" +) + +type filterRequestId []byte + +func (r *filterRequestId) UnmarshalJSON(bodyBytes []byte) error { + body := strings.Trim(string(bodyBytes), `"`) + reqId, err := hex.DecodeString(body) + if err != nil { + return err + } + *r = reqId + return nil +} + +func (r filterRequestId) String() string { + return hex.EncodeToString(r) +} +func (r filterRequestId) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%s"`, r.String())), nil +} + +const filterv2Ping = "/filter/v2/subscriptions/{requestId}" +const filterv2Subscribe = "/filter/v2/subscriptions" +const filterv2SubscribeAll = "/filter/v2/subscriptions/all" + +// FilterService represents the REST service for Filter client +type FilterService struct { + node *node.WakuNode + + log *zap.Logger +} + +// NewFilterService returns an instance of FilterService +func NewFilterService(node *node.WakuNode, m *chi.Mux, log *zap.Logger) *FilterService { + s := &FilterService{ + node: node, + log: log.Named("filter"), + } + + m.Get(filterv2Ping, s.ping) + m.Post(filterv2Subscribe, s.subscribe) + m.Delete(filterv2Subscribe, s.unsubscribe) + m.Delete(filterv2SubscribeAll, s.unsubscribeAll) + + return s +} + +// 400 for bad requestId +// 404 when request failed or no suitable peers +// 200 when ping successful +func (s *FilterService) ping(w http.ResponseWriter, req *http.Request) { + var requestId filterRequestId + if err := requestId.UnmarshalJSON([]byte(chi.URLParam(req, "requestId"))); err != nil { + s.log.Error("bad request id", zap.Error(err)) + writeResponse(w, &filterSubscriptionResponse{ + RequestId: requestId, + StatusDesc: "bad request id", + }, http.StatusBadRequest) + return + } + + // selecting random peer that supports filter protocol + peerId := s.getRandomFilterPeer(req.Context(), requestId, w) + if peerId == "" { + return + } + + if err := s.node.FilterLightnode().Ping(req.Context(), peerId, filter.WithPingRequestId(requestId)); err != nil { + s.log.Error("ping request failed", zap.Error(err)) + writeResponse(w, &filterSubscriptionResponse{ + RequestId: requestId, + StatusDesc: "ping request failed", + }, http.StatusServiceUnavailable) + return + } + + // success + writeResponse(w, &filterSubscriptionResponse{ + RequestId: requestId, + StatusDesc: http.StatusText(http.StatusOK), + }, http.StatusOK) +} + +/////////////////////// +/////////////////////// + +// same for FilterUnsubscribeRequest +type filterSubscriptionRequest struct { + RequestId filterRequestId `json:"requestId"` + ContentFilters []string `json:"contentFilters"` + PubsubTopic string `json:"pubsubTopic"` +} + +type filterSubscriptionResponse struct { + RequestId filterRequestId `json:"requestId"` + StatusDesc string `json:"statusDesc"` +} + +// 400 on invalid request +// 404 on failed subscription +// 200 on single returned successful subscription +// NOTE: subscribe on filter client randomly selects a peer if missing for given pubSubTopic +func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) { + message := filterSubscriptionRequest{} + if !s.readBody(w, req, &message) { + return + } + + // + subscriptions, err := s.node.FilterLightnode().Subscribe(req.Context(), + protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...), + filter.WithRequestID(message.RequestId)) + + // on partial subscribe failure + if len(subscriptions) > 0 && err != nil { + s.log.Error("partial subscribe failed", zap.Error(err)) + // on partial failure + writeResponse(w, filterSubscriptionResponse{ + RequestId: message.RequestId, + StatusDesc: err.Error(), + }, http.StatusOK) + } + + if err != nil { + s.log.Error("subscription failed", zap.Error(err)) + writeResponse(w, filterSubscriptionResponse{ + RequestId: message.RequestId, + StatusDesc: "subscription failed", + }, http.StatusServiceUnavailable) + return + } + + // on success + writeResponse(w, filterSubscriptionResponse{ + RequestId: message.RequestId, + StatusDesc: http.StatusText(http.StatusOK), + }, http.StatusOK) +} + +// 400 on invalid request +// 500 on failed subscription +// 200 on successful unsubscribe +// NOTE: unsubscribe on filter client will remove subscription from all peers with matching pubSubTopic, if peerId is not provided +// to match functionality in nwaku, we will randomly select a peer that supports filter protocol. +func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) { + message := filterSubscriptionRequest{} // as pubSubTopics can also be present + if !s.readBody(w, req, &message) { + return + } + + peerId := s.getRandomFilterPeer(req.Context(), message.RequestId, w) + if peerId == "" { + return + } + + // unsubscribe on filter + errCh, err := s.node.FilterLightnode().Unsubscribe( + req.Context(), + protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...), + filter.WithRequestID(message.RequestId), + filter.WithPeer(peerId), + ) + + if err != nil { + s.log.Error("unsubscribe failed", zap.Error(err)) + writeResponse(w, filterSubscriptionResponse{ + RequestId: message.RequestId, + StatusDesc: err.Error(), + }, http.StatusServiceUnavailable) + return + } + + // on success + writeResponse(w, filterSubscriptionResponse{ + RequestId: message.RequestId, + StatusDesc: s.unsubscribeGetMessage(errCh), + }, http.StatusOK) +} + +func (s *FilterService) unsubscribeGetMessage(ch <-chan filter.WakuFilterPushResult) string { + var peerIds string + ind := 0 + for entry := range ch { + s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err)) + if ind != 0 { + peerIds += ", " + } + peerIds += entry.PeerID.String() + ind++ + } + if peerIds != "" { + return "can't unsubscribe from " + peerIds + } + return http.StatusText(http.StatusOK) +} + +// /////////////////////// +// /////////////////////// +type filterUnsubscribeAllRequest struct { + RequestId filterRequestId `json:"requestId"` +} + +func (s *FilterService) readBody(w http.ResponseWriter, req *http.Request, message interface{}) bool { + decoder := json.NewDecoder(req.Body) + if err := decoder.Decode(message); err != nil { + s.log.Error("bad request", zap.Error(err)) + w.WriteHeader(http.StatusBadRequest) + return false + } + defer req.Body.Close() + return true +} + +// 400 on invalid request +// 500 on failed subscription +// 200 on all successful unsubscribe +// unsubscribe all subscriptions for a given peer +func (s *FilterService) unsubscribeAll(w http.ResponseWriter, req *http.Request) { + message := filterUnsubscribeAllRequest{} + if !s.readBody(w, req, &message) { + return + } + + peerId := s.getRandomFilterPeer(req.Context(), message.RequestId, w) + if peerId == "" { + return + } + + // unsubscribe all subscriptions for a given peer + errCh, err := s.node.FilterLightnode().UnsubscribeAll( + req.Context(), + filter.WithRequestID(message.RequestId), + filter.WithPeer(peerId), + ) + if err != nil { + s.log.Error("unsubscribeAll failed", zap.Error(err)) + writeResponse(w, filterSubscriptionResponse{ + RequestId: message.RequestId, + StatusDesc: err.Error(), + }, http.StatusServiceUnavailable) + return + } + + // on success + writeResponse(w, filterSubscriptionResponse{ + RequestId: message.RequestId, + StatusDesc: s.unsubscribeGetMessage(errCh), + }, http.StatusOK) +} + +func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId []byte, w http.ResponseWriter) peer.ID { + // selecting random peer that supports filter protocol + peerId, err := s.node.PeerManager().SelectPeer(peermanager.PeerSelectionCriteria{ + SelectionType: peermanager.Automatic, + Proto: filter.FilterSubscribeID_v20beta1, + Ctx: ctx, + }) + if err != nil { + s.log.Error("selecting peer", zap.Error(err)) + writeResponse(w, filterSubscriptionResponse{ + RequestId: requestId, + StatusDesc: "No suitable peers", + }, http.StatusServiceUnavailable) + return "" + } + return peerId +} diff --git a/cmd/waku/server/rest/filter_api.yaml b/cmd/waku/server/rest/filter_api.yaml new file mode 100644 index 00000000..bdeef767 --- /dev/null +++ b/cmd/waku/server/rest/filter_api.yaml @@ -0,0 +1,231 @@ +openapi: 3.0.3 +info: + title: Waku V2 node REST API + version: 1.0.0 + contact: + name: VAC Team + url: https://forum.vac.dev/ +tags: + - name: filter + description: Filter REST API for WakuV2 node + +paths: + /filter/v2/subscriptions/{requestId}: + get: # get_waku_v2_filter_v2_subscription - ping + summary: Subscriber-ping - a peer can query if there is a registered subscription for it + description: | + Subscriber peer can query its subscription existence on service node. + Returns HTTP200 if exists and HTTP404 if not. + Client must not fill anything but requestId in the request body. + operationId: subscriberPing + tags: + - filter + parameters: + - in: path + name: requestId + required: true + schema: + type: string + description: Id of ping request + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + + + /filter/v2/subscriptions: + post: # post_waku_v2_filter_v2_subscription + summary: Subscribe a peer to an array of content topics under a pubsubTopic + description: | + Subscribe a peer to an array of content topics under a pubsubTopic. + + It is allowed to refresh or add new content topic to an existing subscription. + + Fields pubsubTopic and contentFilters must be filled. + operationId: postSubscriptions + tags: + - filter + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscribeRequest' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + # TODO: Review the possible errors of this endpoint + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + delete: # delete_waku_v2_filter_v2_subscription + summary: Unsubscribe a peer from content topics + description: | + Unsubscribe a peer from content topics + Only that subscription will be removed which matches existing. + operationId: deleteSubscriptions + tags: + - filter + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/FilterUnsubscribeRequest' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + + /filter/v2/subscriptions/all: + delete: # delete_waku_v2_filter_v2_subscription + summary: Unsubscribe a peer from all content topics + description: | + Unsubscribe a peer from all content topics + operationId: deleteAllSubscriptions + tags: + - filter + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/FilterUnsubscribeAllRequest' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + +components: + PubSubTopic: + type: string + ContentTopic: + type: string + + FilterSubscriptionResponse: + type: object + properties: + requestId: + type: string + statusDesc: + type: string + required: + - requestId + + FilterSubscribeRequest: + type: object + properties: + requestId: + type: string + contentFilters: + type: array + items: + $ref: '#/components/schemas/ContentTopic' + pubsubTopic: + $ref: "#/components/schemas/PubSubTopic" + required: + - requestId + - contentFilters + - pubsubTopic + + FilterUnsubscribeRequest: + type: object + properties: + requestId: + type: string + contentFilters: + type: array + items: + $ref: '#/components/schemas/ContentTopic' + pubsubTopic: + $ref: "#/components/schemas/PubSubTopic" + required: + - requestId + - contentFilters + + FilterUnsubscribeAllRequest: + type: object + properties: + requestId: + type: string + required: + - requestId \ No newline at end of file diff --git a/cmd/waku/server/rest/filter_test.go b/cmd/waku/server/rest/filter_test.go new file mode 100644 index 00000000..fa42406b --- /dev/null +++ b/cmd/waku/server/rest/filter_test.go @@ -0,0 +1,253 @@ +package rest + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/go-chi/chi/v5" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/node" + wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode { + node, err := node.New(opts...) + require.NoError(t, err) + + err = node.Start(context.Background()) + require.NoError(t, err) + + return node +} + +// node2 connects to node1 +func twoFilterConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNode, *node.WakuNode) { + node1 := createNode(t, node.WithWakuFilterFullNode()) // full node filter + node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter + + node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL) + err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), filter.FilterSubscribeID_v20beta1) + require.NoError(t, err) + + if pubSubTopic != "" { + err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic}) + require.NoError(t, err) + } + + return node1, node2 +} + +// test 400, 404 status code for ping rest endpoint +// both requests are not successful +func TestFilterPingFailure(t *testing.T) { + node1, node2 := twoFilterConnectedNodes(t, "") + defer func() { + node1.Stop() + node2.Stop() + }() + + router := chi.NewRouter() + _ = NewFilterService(node2, router, utils.Logger()) + + // with malformed requestId + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", "invalid_request_id"), nil) + router.ServeHTTP(rr, req) + checkJSON(t, filterSubscriptionResponse{ + RequestId: []byte{}, + StatusDesc: "bad request id", + }, getFilterResponse(t, rr.Body)) + require.Equal(t, http.StatusBadRequest, rr.Code) + + // no subscription with peer + var requestId filterRequestId = protocol.GenerateRequestID() + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestId), nil) + router.ServeHTTP(rr, req) + checkJSON(t, filterSubscriptionResponse{ + RequestId: requestId, + StatusDesc: "ping request failed", + }, getFilterResponse(t, rr.Body)) + require.Equal(t, http.StatusServiceUnavailable, rr.Code) +} + +// create a filter subscription to the peer and try peer that peer +// both steps should be successful +func TestFilterSubscribeAndPing(t *testing.T) { + pubsubTopic := "/waku/2/test/proto" + contentTopics := []string{"test"} + var requestId filterRequestId = protocol.GenerateRequestID() + + // + node1, node2 := twoFilterConnectedNodes(t, pubsubTopic) + defer func() { + node1.Stop() + node2.Stop() + }() + + router := chi.NewRouter() + _ = NewFilterService(node2, router, utils.Logger()) + + // create subscription to peer + rr := httptest.NewRecorder() + reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{ + RequestId: requestId, + PubsubTopic: pubsubTopic, + ContentFilters: contentTopics, + })) + req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader) + router.ServeHTTP(rr, req) + checkJSON(t, filterSubscriptionResponse{ + RequestId: requestId, + StatusDesc: "OK", + }, getFilterResponse(t, rr.Body)) + require.Equal(t, http.StatusOK, rr.Code) + + // trying pinging the peer once there is subscription to it + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestId), nil) + router.ServeHTTP(rr, req) + checkJSON(t, filterSubscriptionResponse{ + RequestId: requestId, + StatusDesc: "OK", + }, getFilterResponse(t, rr.Body)) + require.Equal(t, http.StatusOK, rr.Code) +} + +// create subscription to peer +// delete the subscription to the peer with matching pubSub and contentTopic +func TestFilterSubscribeAndUnsubscribe(t *testing.T) { + pubsubTopic := "/waku/2/test/proto" + contentTopics := []string{"test"} + var requestId filterRequestId = protocol.GenerateRequestID() + + // + node1, node2 := twoFilterConnectedNodes(t, pubsubTopic) + defer func() { + node1.Stop() + node2.Stop() + }() + + router := chi.NewRouter() + _ = NewFilterService(node2, router, utils.Logger()) + + // create subscription to peer + rr := httptest.NewRecorder() + reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{ + RequestId: requestId, + PubsubTopic: pubsubTopic, + ContentFilters: contentTopics, + })) + req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader) + router.ServeHTTP(rr, req) + checkJSON(t, filterSubscriptionResponse{ + RequestId: requestId, + StatusDesc: "OK", + }, getFilterResponse(t, rr.Body)) + require.Equal(t, http.StatusOK, rr.Code) + + // delete the subscription to the peer with matching pubSub and contentTopic + requestId = protocol.GenerateRequestID() + rr = httptest.NewRecorder() + reqReader = strings.NewReader(toString(t, filterSubscriptionRequest{ + RequestId: requestId, + PubsubTopic: pubsubTopic, + ContentFilters: contentTopics, + })) + req, _ = http.NewRequest(http.MethodDelete, filterv2Subscribe, reqReader) + router.ServeHTTP(rr, req) + checkJSON(t, filterSubscriptionResponse{ + RequestId: requestId, + StatusDesc: "OK", + }, getFilterResponse(t, rr.Body)) + require.Equal(t, http.StatusOK, rr.Code) +} + +// create 2 subscription from filter client to server +// make a unsubscribeAll request +// try pinging the peer, if 404 is returned then unsubscribeAll was successful +func TestFilterAllUnsubscribe(t *testing.T) { + pubsubTopic := "/waku/2/test/proto" + contentTopics1 := "ct_1" + contentTopics2 := "ct_2" + var requestId filterRequestId + + // + node1, node2 := twoFilterConnectedNodes(t, pubsubTopic) + defer func() { + node1.Stop() + node2.Stop() + }() + + router := chi.NewRouter() + _ = NewFilterService(node2, router, utils.Logger()) + + // create 2 different subscription to peer + for _, ct := range []string{contentTopics1, contentTopics2} { + requestId = protocol.GenerateRequestID() + rr := httptest.NewRecorder() + reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{ + RequestId: requestId, + PubsubTopic: pubsubTopic, + ContentFilters: []string{ct}, + })) + req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader) + router.ServeHTTP(rr, req) + checkJSON(t, filterSubscriptionResponse{ + RequestId: requestId, + StatusDesc: "OK", + }, getFilterResponse(t, rr.Body)) + require.Equal(t, http.StatusOK, rr.Code) + } + + // delete all subscription to the peer + requestId = protocol.GenerateRequestID() + rr := httptest.NewRecorder() + reqReader := strings.NewReader(toString(t, filterUnsubscribeAllRequest{ + RequestId: requestId, + })) + req, _ := http.NewRequest(http.MethodDelete, filterv2SubscribeAll, reqReader) + router.ServeHTTP(rr, req) + checkJSON(t, filterSubscriptionResponse{ + RequestId: requestId, + StatusDesc: "OK", + }, getFilterResponse(t, rr.Body)) + require.Equal(t, http.StatusOK, rr.Code) + + // check if all subscriptions are deleted to the peer are deleted + requestId = protocol.GenerateRequestID() + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestId), nil) + router.ServeHTTP(rr, req) + checkJSON(t, filterSubscriptionResponse{ + RequestId: requestId, + StatusDesc: "ping request failed", + }, getFilterResponse(t, rr.Body)) + require.Equal(t, http.StatusServiceUnavailable, rr.Code) +} + +func checkJSON(t *testing.T, expected, actual interface{}) { + require.JSONEq(t, toString(t, expected), toString(t, actual)) +} +func getFilterResponse(t *testing.T, body *bytes.Buffer) filterSubscriptionResponse { + resp := filterSubscriptionResponse{} + err := json.Unmarshal(body.Bytes(), &resp) + require.NoError(t, err) + return resp +} +func toString(t *testing.T, data interface{}) string { + bytes, err := json.Marshal(data) + require.NoError(t, err) + return string(bytes) +} diff --git a/cmd/waku/server/rest/lightpush_rest_test.go b/cmd/waku/server/rest/lightpush_rest_test.go index d4b8a760..a0bb45bb 100644 --- a/cmd/waku/server/rest/lightpush_rest_test.go +++ b/cmd/waku/server/rest/lightpush_rest_test.go @@ -2,7 +2,6 @@ package rest import ( "bytes" - "context" "encoding/json" "net/http" "net/http/httptest" @@ -19,20 +18,10 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" ) -func createLightPushNode(t *testing.T) *node.WakuNode { - node, err := node.New(node.WithLightPush(), node.WithWakuRelay()) - require.NoError(t, err) - - err = node.Start(context.Background()) - require.NoError(t, err) - - return node -} - // node2 connects to node1 func twoLightPushConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNode, *node.WakuNode) { - node1 := createLightPushNode(t) - node2 := createLightPushNode(t) + node1 := createNode(t, node.WithLightPush(), node.WithWakuRelay()) + node2 := createNode(t, node.WithLightPush(), node.WithWakuRelay()) node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL) err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), lightpush.LightPushID_v20beta1) diff --git a/cmd/waku/server/rest/utils.go b/cmd/waku/server/rest/utils.go index 7d549d71..0178bc0c 100644 --- a/cmd/waku/server/rest/utils.go +++ b/cmd/waku/server/rest/utils.go @@ -34,11 +34,9 @@ func writeResponse(w http.ResponseWriter, value interface{}, code int) { return } - _, err = w.Write(jsonResponse) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - + // w.Write implicitly writes a 200 status code + // and only once we can write 2xx-5xx status code + // so any statusCode apart from 1xx being written to the header, will be ignored. w.WriteHeader(code) + _, _ = w.Write(jsonResponse) } diff --git a/cmd/waku/server/rest/waku_rest.go b/cmd/waku/server/rest/waku_rest.go index ab39e7c2..c7e8275b 100644 --- a/cmd/waku/server/rest/waku_rest.go +++ b/cmd/waku/server/rest/waku_rest.go @@ -60,6 +60,10 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool _ = NewAdminService(node, mux, wrpc.log) } + if node.FilterLightnode() != nil { + _ = NewFilterService(node, mux, log) + } + return wrpc } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 25228398..8fc659c2 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -636,6 +636,11 @@ func (w *WakuNode) FilterLightnode() *filter.WakuFilterLightNode { return nil } +// PeerManager for getting peer filterv2 protocol +func (w *WakuNode) PeerManager() *peermanager.PeerManager { + return w.peermanager +} + // Lightpush is used to access any operation related to Waku Lightpush protocol func (w *WakuNode) Lightpush() *lightpush.WakuLightPush { if result, ok := w.lightPush.(*lightpush.WakuLightPush); ok { diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 5e49b1a6..ca4fcf51 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -298,7 +298,6 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot } else { selectedPeer = params.selectedPeer } - if selectedPeer == "" { wf.metrics.RecordError(peerNotFoundFailure) wf.log.Error("selecting peer", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), @@ -364,16 +363,24 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeO return params, nil } -func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error { +func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ...FilterPingOption) error { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { return err } + params := &FilterPingParameters{} + for _, opt := range opts { + opt(params) + } + if len(params.requestID) == 0 { + params.requestID = protocol.GenerateRequestID() + } + return wf.request( ctx, - &FilterSubscribeParameters{selectedPeer: peerID, requestID: protocol.GenerateRequestID()}, + &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, pb.FilterSubscribeRequest_SUBSCRIBER_PING, protocol.ContentFilter{}) } diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 038d2b57..f7f46e92 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -18,6 +18,19 @@ func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters { } } +type ( + FilterPingParameters struct { + requestID []byte + } + FilterPingOption func(*FilterPingParameters) +) + +func WithPingRequestId(requestId []byte) FilterPingOption { + return func(params *FilterPingParameters) { + params.requestID = requestId + } +} + type ( FilterSubscribeParameters struct { selectedPeer peer.ID