Feat : New Relay rest API for autosharding (#822)

* fix: REST API endpoint for version

* feat: add new autosharding relay REST API support
This commit is contained in:
Prem Chaitanya Prathi 2023-10-24 04:29:02 +05:30 committed by GitHub
parent 9161c4f7fe
commit ab65c4869c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 219 additions and 7 deletions

View File

@ -21,7 +21,7 @@ type InfoReply struct {
} }
const routeDebugInfoV1 = "/debug/v1/info" const routeDebugInfoV1 = "/debug/v1/info"
const routeDebugVersionV1 = "/debug/v1/info" const routeDebugVersionV1 = "/debug/v1/version"
func NewDebugService(node *node.WakuNode, m *chi.Mux) *DebugService { func NewDebugService(node *node.WakuNode, m *chi.Mux) *DebugService {
d := &DebugService{ d := &DebugService{

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"net/http" "net/http"
"net/url"
"strings" "strings"
"sync" "sync"
@ -20,6 +21,9 @@ import (
const routeRelayV1Subscriptions = "/relay/v1/subscriptions" const routeRelayV1Subscriptions = "/relay/v1/subscriptions"
const routeRelayV1Messages = "/relay/v1/messages/{topic}" const routeRelayV1Messages = "/relay/v1/messages/{topic}"
const routeRelayV1AutoSubscriptions = "/relay/v1/auto/subscriptions"
const routeRelayV1AutoMessages = "/relay/v1/auto/messages"
// RelayService represents the REST service for WakuRelay // RelayService represents the REST service for WakuRelay
type RelayService struct { type RelayService struct {
node *node.WakuNode node *node.WakuNode
@ -50,6 +54,14 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za
m.Get(routeRelayV1Messages, s.getV1Messages) m.Get(routeRelayV1Messages, s.getV1Messages)
m.Post(routeRelayV1Messages, s.postV1Message) m.Post(routeRelayV1Messages, s.postV1Message)
m.Post(routeRelayV1AutoSubscriptions, s.postV1AutoSubscriptions)
m.Delete(routeRelayV1AutoSubscriptions, s.deleteV1AutoSubscriptions)
m.Route(routeRelayV1AutoMessages, func(r chi.Router) {
r.Get("/{contentTopic}", s.getV1AutoMessages)
r.Post("/", s.postV1AutoMessage)
})
return s return s
} }
@ -215,3 +227,110 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) {
writeErrOrResponse(w, err, true) writeErrOrResponse(w, err, true)
} }
func (r *RelayService) deleteV1AutoSubscriptions(w http.ResponseWriter, req *http.Request) {
var cTopics []string
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&cTopics); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
defer req.Body.Close()
err := r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter("", cTopics...))
if err != nil {
r.log.Error("unsubscribing from topics", zap.Strings("contentTopics", cTopics), zap.Error(err))
}
writeErrOrResponse(w, err, true)
}
func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http.Request) {
var cTopics []string
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&cTopics); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
defer req.Body.Close()
var err error
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...))
if err != nil {
r.log.Error("subscribing to topics", zap.Strings("contentTopics", cTopics), zap.Error(err))
}
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, err := w.Write([]byte(err.Error()))
r.log.Error("writing response", zap.Error(err))
} else {
w.WriteHeader(http.StatusOK)
}
}
func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Request) {
cTopic := chi.URLParam(req, "contentTopic")
if cTopic == "" {
w.WriteHeader(http.StatusBadRequest)
_, err := w.Write([]byte("contentTopic is required"))
r.log.Error("writing response", zap.Error(err))
return
}
cTopic, err := url.QueryUnescape(cTopic)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, err = w.Write([]byte("invalid contentTopic format"))
r.log.Error("writing response", zap.Error(err))
return
}
sub, err := r.node.Relay().GetSubscription(cTopic)
if err != nil {
w.WriteHeader(http.StatusNotFound)
_, err = w.Write([]byte("not subscribed to topic"))
r.log.Error("writing response", zap.Error(err))
return
}
var response []*pb.WakuMessage
select {
case msg := <-sub.Ch:
response = append(response, msg.Message())
default:
break
}
writeErrOrResponse(w, nil, response)
}
func (r *RelayService) postV1AutoMessage(w http.ResponseWriter, req *http.Request) {
var message *pb.WakuMessage
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&message); err != nil {
r.log.Error("decoding message failure", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
defer req.Body.Close()
var err error
if err = server.AppendRLNProof(r.node, message); err != nil {
writeErrOrResponse(w, err, nil)
return
}
_, err = r.node.Relay().Publish(req.Context(), message)
if err != nil {
r.log.Error("publishing message", zap.Error(err))
}
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, err := w.Write([]byte(err.Error()))
r.log.Error("writing response", zap.Error(err))
} else {
w.WriteHeader(http.StatusOK)
}
}

View File

@ -106,6 +106,103 @@ paths:
'5XX': '5XX':
description: Unexpected error. description: Unexpected error.
/relay/v1/auto/messages/{contentTopic}: # Note the plural in messages
get: # get_waku_v2_relay_v1_auto_messages
summary: Get the latest messages on the polled topic
description: Get a list of messages that were received on a subscribed Content topic after the last time this method was called.
operationId: getMessagesByTopic
tags:
- relay
parameters:
- in: path
name: contentTopic # Note the name is the same as in the path
required: true
schema:
type: string
description: The user ID
responses:
'200':
description: The latest messages on the polled topic.
content:
application/json:
schema:
$ref: '#/components/schemas/RelayGetMessagesResponse'
'4XX':
description: Bad request.
'5XX':
description: Unexpected error.
/relay/v1/auto/messages: # Note the plural in messages
post: # post_waku_v2_relay_v1_auto_message
summary: Publish a message to be relayed
description: Publishes a message to be relayed on a Content topic.
operationId: postMessagesToTopic
tags:
- relay
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/RelayPostMessagesRequest'
responses:
'200':
description: OK
'4XX':
description: Bad request.
'5XX':
description: Unexpected error.
/relay/v1/auto/subscriptions:
post: # post_waku_v2_relay_v1_auto_subscriptions
summary: Subscribe a node to an array of topics
description: Subscribe a node to an array of Content topics.
operationId: postSubscriptions
tags:
- relay
requestBody:
content:
application/json:
schema:
type array:
items:
$ref: '#/components/schemas/ContentTopic'
responses:
'200':
description: OK
content:
text/plain:
schema:
type: string
'4XX':
description: Bad request.
'5XX':
description: Unexpected error.
delete: # delete_waku_v2_relay_v1_auto_subscriptions
summary: Unsubscribe a node from an array of topics
description: Unsubscribe a node from an array of Content topics.
operationId: deleteSubscriptions
tags:
- relay
requestBody:
content:
application/json:
schema:
type array:
items:
$ref: '#/components/schemas/ContentTopic'
responses:
'200':
description: OK
content:
text/plain:
schema:
type: string
'4XX':
description: Bad request.
'5XX':
description: Unexpected error.
components: components:
schemas: schemas:
PubSubTopic: PubSubTopic:
@ -145,4 +242,4 @@ components:
type: array type: array
items: items:
$ref: '#/components/schemas/PubSubTopic' $ref: '#/components/schemas/PubSubTopic'

View File

@ -164,11 +164,7 @@ func (r *RelayService) PostV1AutoMessage(req *http.Request, args *RelayAutoMessa
r.log.Error("publishing message", zap.Error(err)) r.log.Error("publishing message", zap.Error(err))
return err return err
} }
if msg.ContentTopic == "" {
err := fmt.Errorf("content-topic cannot be empty")
r.log.Error("publishing message", zap.Error(err))
return err
}
if err = server.AppendRLNProof(r.node, msg); err != nil { if err = server.AppendRLNProof(r.node, msg); err != nil {
return err return err
} }