From ab65c4869c23ff68478dd444543bd18269fa7482 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 24 Oct 2023 04:29:02 +0530 Subject: [PATCH] Feat : New Relay rest API for autosharding (#822) * fix: REST API endpoint for version * feat: add new autosharding relay REST API support --- cmd/waku/server/rest/debug.go | 2 +- cmd/waku/server/rest/relay.go | 119 ++++++++++++++++++++++++++++ cmd/waku/server/rest/relay_api.yaml | 99 ++++++++++++++++++++++- cmd/waku/server/rpc/relay.go | 6 +- 4 files changed, 219 insertions(+), 7 deletions(-) diff --git a/cmd/waku/server/rest/debug.go b/cmd/waku/server/rest/debug.go index b0e18805..6606f5d0 100644 --- a/cmd/waku/server/rest/debug.go +++ b/cmd/waku/server/rest/debug.go @@ -21,7 +21,7 @@ type InfoReply struct { } const routeDebugInfoV1 = "/debug/v1/info" -const routeDebugVersionV1 = "/debug/v1/info" +const routeDebugVersionV1 = "/debug/v1/version" func NewDebugService(node *node.WakuNode, m *chi.Mux) *DebugService { d := &DebugService{ diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index af4f5d41..b788ed99 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "net/http" + "net/url" "strings" "sync" @@ -20,6 +21,9 @@ import ( const routeRelayV1Subscriptions = "/relay/v1/subscriptions" const routeRelayV1Messages = "/relay/v1/messages/{topic}" +const routeRelayV1AutoSubscriptions = "/relay/v1/auto/subscriptions" +const routeRelayV1AutoMessages = "/relay/v1/auto/messages" + // RelayService represents the REST service for WakuRelay type RelayService struct { node *node.WakuNode @@ -50,6 +54,14 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za m.Get(routeRelayV1Messages, s.getV1Messages) m.Post(routeRelayV1Messages, s.postV1Message) + m.Post(routeRelayV1AutoSubscriptions, s.postV1AutoSubscriptions) + m.Delete(routeRelayV1AutoSubscriptions, s.deleteV1AutoSubscriptions) + + m.Route(routeRelayV1AutoMessages, func(r chi.Router) { + r.Get("/{contentTopic}", s.getV1AutoMessages) + r.Post("/", s.postV1AutoMessage) + }) + return s } @@ -215,3 +227,110 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) { writeErrOrResponse(w, err, true) } + +func (r *RelayService) deleteV1AutoSubscriptions(w http.ResponseWriter, req *http.Request) { + var cTopics []string + decoder := json.NewDecoder(req.Body) + if err := decoder.Decode(&cTopics); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + defer req.Body.Close() + + err := r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter("", cTopics...)) + if err != nil { + r.log.Error("unsubscribing from topics", zap.Strings("contentTopics", cTopics), zap.Error(err)) + } + + writeErrOrResponse(w, err, true) +} + +func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http.Request) { + var cTopics []string + decoder := json.NewDecoder(req.Body) + if err := decoder.Decode(&cTopics); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + defer req.Body.Close() + + var err error + _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...)) + if err != nil { + r.log.Error("subscribing to topics", zap.Strings("contentTopics", cTopics), zap.Error(err)) + } + + if err != nil { + w.WriteHeader(http.StatusBadRequest) + _, err := w.Write([]byte(err.Error())) + r.log.Error("writing response", zap.Error(err)) + } else { + w.WriteHeader(http.StatusOK) + } + +} + +func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Request) { + cTopic := chi.URLParam(req, "contentTopic") + if cTopic == "" { + w.WriteHeader(http.StatusBadRequest) + _, err := w.Write([]byte("contentTopic is required")) + r.log.Error("writing response", zap.Error(err)) + return + } + cTopic, err := url.QueryUnescape(cTopic) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + _, err = w.Write([]byte("invalid contentTopic format")) + r.log.Error("writing response", zap.Error(err)) + return + } + + sub, err := r.node.Relay().GetSubscription(cTopic) + if err != nil { + w.WriteHeader(http.StatusNotFound) + _, err = w.Write([]byte("not subscribed to topic")) + r.log.Error("writing response", zap.Error(err)) + return + } + var response []*pb.WakuMessage + select { + case msg := <-sub.Ch: + response = append(response, msg.Message()) + default: + break + } + + writeErrOrResponse(w, nil, response) +} + +func (r *RelayService) postV1AutoMessage(w http.ResponseWriter, req *http.Request) { + + var message *pb.WakuMessage + decoder := json.NewDecoder(req.Body) + if err := decoder.Decode(&message); err != nil { + r.log.Error("decoding message failure", zap.Error(err)) + w.WriteHeader(http.StatusBadRequest) + return + } + defer req.Body.Close() + var err error + if err = server.AppendRLNProof(r.node, message); err != nil { + writeErrOrResponse(w, err, nil) + return + } + + _, err = r.node.Relay().Publish(req.Context(), message) + if err != nil { + r.log.Error("publishing message", zap.Error(err)) + } + + if err != nil { + w.WriteHeader(http.StatusBadRequest) + _, err := w.Write([]byte(err.Error())) + r.log.Error("writing response", zap.Error(err)) + } else { + w.WriteHeader(http.StatusOK) + } + +} diff --git a/cmd/waku/server/rest/relay_api.yaml b/cmd/waku/server/rest/relay_api.yaml index 0cf1edd0..e42432a1 100644 --- a/cmd/waku/server/rest/relay_api.yaml +++ b/cmd/waku/server/rest/relay_api.yaml @@ -106,6 +106,103 @@ paths: '5XX': description: Unexpected error. + /relay/v1/auto/messages/{contentTopic}: # Note the plural in messages + get: # get_waku_v2_relay_v1_auto_messages + summary: Get the latest messages on the polled topic + description: Get a list of messages that were received on a subscribed Content topic after the last time this method was called. + operationId: getMessagesByTopic + tags: + - relay + parameters: + - in: path + name: contentTopic # Note the name is the same as in the path + required: true + schema: + type: string + description: The user ID + responses: + '200': + description: The latest messages on the polled topic. + content: + application/json: + schema: + $ref: '#/components/schemas/RelayGetMessagesResponse' + '4XX': + description: Bad request. + '5XX': + description: Unexpected error. + + /relay/v1/auto/messages: # Note the plural in messages + post: # post_waku_v2_relay_v1_auto_message + summary: Publish a message to be relayed + description: Publishes a message to be relayed on a Content topic. + operationId: postMessagesToTopic + tags: + - relay + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/RelayPostMessagesRequest' + responses: + '200': + description: OK + '4XX': + description: Bad request. + '5XX': + description: Unexpected error. + + /relay/v1/auto/subscriptions: + post: # post_waku_v2_relay_v1_auto_subscriptions + summary: Subscribe a node to an array of topics + description: Subscribe a node to an array of Content topics. + operationId: postSubscriptions + tags: + - relay + requestBody: + content: + application/json: + schema: + type array: + items: + $ref: '#/components/schemas/ContentTopic' + responses: + '200': + description: OK + content: + text/plain: + schema: + type: string + '4XX': + description: Bad request. + '5XX': + description: Unexpected error. + + delete: # delete_waku_v2_relay_v1_auto_subscriptions + summary: Unsubscribe a node from an array of topics + description: Unsubscribe a node from an array of Content topics. + operationId: deleteSubscriptions + tags: + - relay + requestBody: + content: + application/json: + schema: + type array: + items: + $ref: '#/components/schemas/ContentTopic' + responses: + '200': + description: OK + content: + text/plain: + schema: + type: string + '4XX': + description: Bad request. + '5XX': + description: Unexpected error. + components: schemas: PubSubTopic: @@ -145,4 +242,4 @@ components: type: array items: $ref: '#/components/schemas/PubSubTopic' - \ No newline at end of file + diff --git a/cmd/waku/server/rpc/relay.go b/cmd/waku/server/rpc/relay.go index ee0679fd..8c7d10a4 100644 --- a/cmd/waku/server/rpc/relay.go +++ b/cmd/waku/server/rpc/relay.go @@ -164,11 +164,7 @@ func (r *RelayService) PostV1AutoMessage(req *http.Request, args *RelayAutoMessa r.log.Error("publishing message", zap.Error(err)) return err } - if msg.ContentTopic == "" { - err := fmt.Errorf("content-topic cannot be empty") - r.log.Error("publishing message", zap.Error(err)) - return err - } + if err = server.AppendRLNProof(r.node, msg); err != nil { return err }