go-waku/waku/v2/rpc/relay.go

229 lines
5.7 KiB
Go
Raw Normal View History

2021-11-06 10:49:47 +00:00
package rpc
import (
2022-07-07 21:58:07 +00:00
"encoding/json"
"fmt"
2021-11-06 10:49:47 +00:00
"net/http"
2021-11-18 14:20:58 +00:00
"sync"
2021-11-06 10:49:47 +00:00
2022-07-07 21:58:07 +00:00
"github.com/gorilla/mux"
2021-11-06 10:49:47 +00:00
"github.com/status-im/go-waku/waku/v2/node"
2021-11-18 14:20:58 +00:00
"github.com/status-im/go-waku/waku/v2/protocol"
2021-11-06 10:49:47 +00:00
"github.com/status-im/go-waku/waku/v2/protocol/pb"
2022-06-13 18:30:35 +00:00
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
2021-11-06 10:49:47 +00:00
)
type RelayService struct {
node *node.WakuNode
2021-11-18 14:20:58 +00:00
log *zap.Logger
2021-11-18 14:20:58 +00:00
messages map[string][]*pb.WakuMessage
messagesMutex sync.RWMutex
2021-11-22 14:48:32 +00:00
runner *runnerService
2021-11-06 10:49:47 +00:00
}
type RelayMessageArgs struct {
2022-06-13 18:30:35 +00:00
Topic string `json:"topic,omitempty"`
Message RPCWakuRelayMessage `json:"message,omitempty"`
2021-11-06 10:49:47 +00:00
}
type TopicsArgs struct {
Topics []string `json:"topics,omitempty"`
}
2021-11-18 14:20:58 +00:00
type TopicArgs struct {
Topic string `json:"topic,omitempty"`
}
2022-07-07 21:58:07 +00:00
func NewRelayService(node *node.WakuNode, m *mux.Router, log *zap.Logger) *RelayService {
2021-11-22 14:48:32 +00:00
s := &RelayService{
2021-11-18 14:20:58 +00:00
node: node,
log: log.Named("relay"),
2021-11-18 14:20:58 +00:00
messages: make(map[string][]*pb.WakuMessage),
}
2021-11-22 14:48:32 +00:00
s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)
2022-06-13 18:30:35 +00:00
2022-07-07 21:58:07 +00:00
m.HandleFunc("/relay/v1/subscriptions", s.restPostV1Subscriptions).Methods(http.MethodPost)
m.HandleFunc("/relay/v1/subscriptions", s.restDeleteV1Subscriptions).Methods(http.MethodDelete)
m.HandleFunc("/relay/v1/messages/{topic}", s.restGetV1Messages).Methods(http.MethodGet)
m.HandleFunc("/relay/v1/messages/{topic}", s.restPostV1Message).Methods(http.MethodPost)
2021-11-22 14:48:32 +00:00
return s
2021-11-18 14:20:58 +00:00
}
func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()
if _, ok := r.messages[envelope.PubsubTopic()]; !ok {
return
}
r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message())
}
func (r *RelayService) Start() {
2022-06-13 18:30:35 +00:00
// Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
for _, topic := range r.node.Relay().Topics() {
r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic))
r.messages[topic] = make([]*pb.WakuMessage, 0)
}
2021-11-22 14:48:32 +00:00
r.runner.Start()
2021-11-18 14:20:58 +00:00
}
func (r *RelayService) Stop() {
2021-11-22 14:48:32 +00:00
r.runner.Stop()
2021-11-18 14:20:58 +00:00
}
2021-11-06 10:49:47 +00:00
func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error {
var err error
2022-06-13 18:30:35 +00:00
msg := args.Message.toProto()
if args.Topic == "" {
2022-06-13 18:30:35 +00:00
_, err = r.node.Relay().Publish(req.Context(), msg)
} else {
2022-06-13 18:30:35 +00:00
_, err = r.node.Relay().PublishToTopic(req.Context(), msg, args.Topic)
}
2021-11-06 10:49:47 +00:00
if err != nil {
r.log.Error("publishing message", zap.Error(err))
2022-06-14 15:36:34 +00:00
return err
2021-11-06 10:49:47 +00:00
}
2022-06-14 15:36:34 +00:00
*reply = true
2021-11-06 10:49:47 +00:00
return nil
}
func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()
for _, topic := range args.Topics {
var err error
if topic == "" {
2022-06-13 18:30:35 +00:00
var sub *relay.Subscription
sub, err = r.node.Relay().Subscribe(ctx)
r.node.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C)
} else {
2022-06-13 18:30:35 +00:00
var sub *relay.Subscription
sub, err = r.node.Relay().SubscribeToTopic(ctx, topic)
r.node.Broadcaster().Unregister(&topic, sub.C)
}
2021-11-06 10:49:47 +00:00
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
2022-06-14 15:36:34 +00:00
return err
2021-11-06 10:49:47 +00:00
}
2021-11-18 14:20:58 +00:00
r.messages[topic] = make([]*pb.WakuMessage, 0)
2021-11-06 10:49:47 +00:00
}
2022-06-14 15:36:34 +00:00
*reply = true
2021-11-06 10:49:47 +00:00
return nil
}
func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context()
for _, topic := range args.Topics {
2021-11-19 16:19:48 +00:00
err := r.node.Relay().Unsubscribe(ctx, topic)
2021-11-06 10:49:47 +00:00
if err != nil {
r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err))
2022-06-14 15:36:34 +00:00
return err
2021-11-06 10:49:47 +00:00
}
2021-11-18 14:20:58 +00:00
delete(r.messages, topic)
2021-11-06 10:49:47 +00:00
}
2022-06-14 15:36:34 +00:00
*reply = true
2021-11-06 10:49:47 +00:00
return nil
}
2022-06-13 18:30:35 +00:00
func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *RelayMessagesReply) error {
2021-11-18 14:20:58 +00:00
r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()
if _, ok := r.messages[args.Topic]; !ok {
return fmt.Errorf("topic %s not subscribed", args.Topic)
}
2022-06-13 18:30:35 +00:00
for i := range r.messages[args.Topic] {
*reply = append(*reply, ProtoWakuMessageToRPCWakuRelayMessage(r.messages[args.Topic][i]))
}
2021-11-18 14:20:58 +00:00
r.messages[args.Topic] = make([]*pb.WakuMessage, 0)
2022-06-13 18:30:35 +00:00
2021-11-18 14:20:58 +00:00
return nil
}
2022-07-07 21:58:07 +00:00
func (d *RelayService) restDeleteV1Subscriptions(w http.ResponseWriter, r *http.Request) {
request := new(TopicsArgs)
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&request.Topics); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
defer r.Body.Close()
response := new(SuccessReply)
err := d.DeleteV1Subscription(r, request, response)
writeErrOrResponse(w, err, response)
}
func (d *RelayService) restPostV1Subscriptions(w http.ResponseWriter, r *http.Request) {
request := new(TopicsArgs)
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&request.Topics); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
defer r.Body.Close()
response := new(SuccessReply)
err := d.PostV1Subscription(r, request, response)
writeErrOrResponse(w, err, response)
}
func (d *RelayService) restGetV1Messages(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
topic, ok := vars["topic"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}
request := new(TopicArgs)
request.Topic = topic
response := new([]*RPCWakuRelayMessage)
err := d.GetV1Messages(r, request, response)
writeErrOrResponse(w, err, response)
}
func (d *RelayService) restPostV1Message(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
topic, ok := vars["topic"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}
request := new(RelayMessageArgs)
request.Topic = topic
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&request.Message); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
defer r.Body.Close()
response := new(bool)
err := d.PostV1Message(r, request, response)
writeErrOrResponse(w, err, response)
}