From 9b9fc634cb2e5f18258f5786d53799ad971ef210 Mon Sep 17 00:00:00 2001 From: harsh jain Date: Tue, 24 Oct 2023 06:23:33 +0700 Subject: [PATCH] Rest light push (#818) * feat: add lightPush Rest endpoints * test: lightPush Rest Service --- cmd/waku/server/rest/lightpush_api.yaml | 84 +++++++++++++++++++++ cmd/waku/server/rest/lightpush_rest.go | 76 +++++++++++++++++++ cmd/waku/server/rest/lightpush_rest_test.go | 75 ++++++++++++++++++ cmd/waku/server/rest/waku_rest.go | 1 + 4 files changed, 236 insertions(+) create mode 100644 cmd/waku/server/rest/lightpush_api.yaml create mode 100644 cmd/waku/server/rest/lightpush_rest.go create mode 100644 cmd/waku/server/rest/lightpush_rest_test.go diff --git a/cmd/waku/server/rest/lightpush_api.yaml b/cmd/waku/server/rest/lightpush_api.yaml new file mode 100644 index 00000000..b2c342f6 --- /dev/null +++ b/cmd/waku/server/rest/lightpush_api.yaml @@ -0,0 +1,84 @@ +openapi: 3.0.3 +info: + title: Waku V2 node REST API + version: 1.0.0 + contact: + name: VAC Team + url: https://forum.vac.dev/ + +tags: + - name: lightpush + description: Lightpush REST API for WakuV2 node + +paths: + /lightpush/v1/message: + post: + summary: Request a message relay from a LightPush service provider + description: Push a message to be relayed on a PubSub topic. + operationId: postMessagesToPubsubTopic + tags: + - lightpush + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/PushRequest' + responses: + '200': + description: OK + content: + text/plain: + schema: + type: string + '400': + description: Bad request. + content: + text/plain: + schema: + type: string + '500': + description: Internal server error + content: + text/plain: + schema: + type: string + '503': + description: Service not available + content: + text/plain: + schema: + type: string + +components: + schemas: + PubsubTopic: + type: string + + ContentTopic: + type: string + + WakuMessage: + type: object + properties: + payload: + type: string + format: byte + contentTopic: + $ref: '#/components/schemas/ContentTopic' + version: + type: number + timestamp: + type: number + required: + - payload + - contentTopic + + PushRequest: + type: object + properties: + pusbsubTopic: + $ref: '#/components/schemas/PubsubTopic' + message: + $ref: '#/components/schemas/WakuMessage' + required: + - message \ No newline at end of file diff --git a/cmd/waku/server/rest/lightpush_rest.go b/cmd/waku/server/rest/lightpush_rest.go new file mode 100644 index 00000000..8500df4a --- /dev/null +++ b/cmd/waku/server/rest/lightpush_rest.go @@ -0,0 +1,76 @@ +package rest + +import ( + "encoding/json" + "errors" + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "go.uber.org/zap" +) + +const routeLightPushV1Messages = "/lightpush/v1/message" + +type LightpushService struct { + node *node.WakuNode + log *zap.Logger +} + +func NewLightpushService(node *node.WakuNode, m *chi.Mux, log *zap.Logger) *LightpushService { + serv := &LightpushService{ + node: node, + log: log.Named("lightpush"), + } + + m.Post(routeLightPushV1Messages, serv.postMessagev1) + + return serv +} + +func (msg lightpushRequest) Check() error { + if msg.Message == nil { + return errors.New("waku message is required") + } + return nil +} + +type lightpushRequest struct { + PubSubTopic string `json:"pubsubTopic"` + Message *pb.WakuMessage `json:"message"` +} + +// handled error codes are 200, 400, 500, 503 +func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Request) { + msg := &lightpushRequest{} + decoder := json.NewDecoder(req.Body) + if err := decoder.Decode(msg); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + defer req.Body.Close() + + if err := msg.Check(); err != nil { + w.WriteHeader(http.StatusBadRequest) + _, err = w.Write([]byte(err.Error())) + serv.log.Error("writing response", zap.Error(err)) + return + } + // + + if serv.node.Lightpush() == nil { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + + _, err := serv.node.Lightpush().Publish(req.Context(), msg.Message, lightpush.WithPubSubTopic(msg.PubSubTopic)) + if err != nil { + w.WriteHeader(http.StatusServiceUnavailable) + _, err = w.Write([]byte(err.Error())) + serv.log.Error("writing response", zap.Error(err)) + } else { + w.WriteHeader(http.StatusOK) + } +} diff --git a/cmd/waku/server/rest/lightpush_rest_test.go b/cmd/waku/server/rest/lightpush_rest_test.go new file mode 100644 index 00000000..d4b8a760 --- /dev/null +++ b/cmd/waku/server/rest/lightpush_rest_test.go @@ -0,0 +1,75 @@ +package rest + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/go-chi/chi/v5" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/node" + wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +func createLightPushNode(t *testing.T) *node.WakuNode { + node, err := node.New(node.WithLightPush(), node.WithWakuRelay()) + require.NoError(t, err) + + err = node.Start(context.Background()) + require.NoError(t, err) + + return node +} + +// node2 connects to node1 +func twoLightPushConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNode, *node.WakuNode) { + node1 := createLightPushNode(t) + node2 := createLightPushNode(t) + + node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL) + err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), lightpush.LightPushID_v20beta1) + require.NoError(t, err) + err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic}) + require.NoError(t, err) + + return node1, node2 +} + +func TestLightpushMessagev1(t *testing.T) { + pubSubTopic := "/waku/2/default-waku/proto" + node1, node2 := twoLightPushConnectedNodes(t, pubSubTopic) + defer func() { + node1.Stop() + node2.Stop() + }() + + router := chi.NewRouter() + serv := NewLightpushService(node2, router, utils.Logger()) + _ = serv + + msg := lightpushRequest{ + PubSubTopic: pubSubTopic, + Message: &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: "abc", + Version: 0, + Timestamp: utils.GetUnixEpoch(), + }, + } + msgJSONBytes, err := json.Marshal(msg) + require.NoError(t, err) + + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodPost, routeLightPushV1Messages, bytes.NewReader(msgJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, "true", rr.Body.String()) +} diff --git a/cmd/waku/server/rest/waku_rest.go b/cmd/waku/server/rest/waku_rest.go index ec032590..0041ee08 100644 --- a/cmd/waku/server/rest/waku_rest.go +++ b/cmd/waku/server/rest/waku_rest.go @@ -36,6 +36,7 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool _ = NewDebugService(node, mux) _ = NewHealthService(node, mux) _ = NewStoreService(node, mux) + _ = NewLightpushService(node, mux, log) listenAddr := fmt.Sprintf("%s:%d", address, port)