diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 8189d4ea..8e4c7932 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -229,9 +229,38 @@ func (w *WakuRelay) Subscribe(ctx context.Context, topic *Topic) (*Subscription, return subscription, nil } +func (w *WakuRelay) Unsubscribe(ctx context.Context, topic Topic) error { + if _, ok := w.topics[topic]; !ok { + return fmt.Errorf("topics %s is not subscribed", (string)(topic)) + } + log.Info("Unsubscribing from topic ", topic) + delete(w.topics, topic) + + for _, sub := range w.subscriptions[topic] { + sub.Unsubscribe() + } + + w.relaySubs[topic].Cancel() + delete(w.relaySubs, topic) + + err := w.wakuRelayTopics[topic].Close() + if err != nil { + return err + } + delete(w.wakuRelayTopics, topic) + + return nil +} + func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message { msgChannel := make(chan *pubsub.Message, 1024) go func(msgChannel chan *pubsub.Message) { + defer func() { + if r := recover(); r != nil { + log.Debug("recovered msgChannel") + } + }() + for { msg, err := sub.Next(ctx) if err != nil { @@ -266,6 +295,9 @@ func (w *WakuRelay) subscribeToTopic(t Topic, subscription *Subscription, sub *p } // TODO: if there are no more relay subscriptions, close the pubsub subscription case msg := <-subChannel: + if msg == nil { + return + } stats.Record(ctx, metrics.Messages.M(1)) wakuMessage := &pb.WakuMessage{} if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { diff --git a/waku/v2/rpc/relay.go b/waku/v2/rpc/relay.go new file mode 100644 index 00000000..145f8f83 --- /dev/null +++ b/waku/v2/rpc/relay.go @@ -0,0 +1,69 @@ +package rpc + +import ( + "net/http" + + "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/protocol/relay" +) + +type RelayService struct { + node *node.WakuNode +} + +type RelayMessageArgs struct { + Topic string `json:"topic,omitempty"` + Message pb.WakuMessage `json:"message,omitempty"` +} + +type TopicsArgs struct { + Topics []string `json:"topics,omitempty"` +} + +type SuccessReply struct { + Success bool `json:"success,omitempty"` + Error string `json:"error,omitempty"` +} + +func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error { + _, err := r.node.Relay().Publish(req.Context(), &args.Message, (*relay.Topic)(&args.Topic)) + if err != nil { + log.Error("Error publishing message:", err) + reply.Success = false + reply.Error = err.Error() + } else { + reply.Success = true + } + return nil +} + +func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { + ctx := req.Context() + for _, topic := range args.Topics { + _, err := r.node.Relay().Subscribe(ctx, (*relay.Topic)(&topic)) + if err != nil { + log.Error("Error subscribing to topic:", topic, "err:", err) + reply.Success = false + reply.Error = err.Error() + return nil + } + } + reply.Success = true + return nil +} + +func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { + ctx := req.Context() + for _, topic := range args.Topics { + err := r.node.Relay().Unsubscribe(ctx, (relay.Topic)(topic)) + if err != nil { + log.Error("Error unsubscribing from topic:", topic, "err:", err) + reply.Success = false + reply.Error = err.Error() + return nil + } + } + reply.Success = true + return nil +} diff --git a/waku/v2/rpc/waku_rpc.go b/waku/v2/rpc/waku_rpc.go index d018a6e3..21f9f2f2 100644 --- a/waku/v2/rpc/waku_rpc.go +++ b/waku/v2/rpc/waku_rpc.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "time" "github.com/gorilla/rpc/v2" logging "github.com/ipfs/go-log" @@ -27,8 +28,17 @@ func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc { log.Error(err) } + err = s.RegisterService(&RelayService{node}, "Relay") + if err != nil { + log.Error(err) + } + mux := http.NewServeMux() - mux.HandleFunc("/jsonrpc", s.ServeHTTP) + mux.HandleFunc("/jsonrpc", func(w http.ResponseWriter, r *http.Request) { + t := time.Now() + s.ServeHTTP(w, r) + log.Infof("RPC request at %s took %s", r.URL.Path, time.Since(t)) + }) listenAddr := fmt.Sprintf("%s:%d", address, port)