mirror of https://github.com/status-im/go-waku.git
feat: Add relay rpc methods
This commit is contained in:
parent
a6ae4b7140
commit
7ff5fcf838
|
@ -229,9 +229,38 @@ func (w *WakuRelay) Subscribe(ctx context.Context, topic *Topic) (*Subscription,
|
||||||
return subscription, nil
|
return subscription, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *WakuRelay) Unsubscribe(ctx context.Context, topic Topic) error {
|
||||||
|
if _, ok := w.topics[topic]; !ok {
|
||||||
|
return fmt.Errorf("topics %s is not subscribed", (string)(topic))
|
||||||
|
}
|
||||||
|
log.Info("Unsubscribing from topic ", topic)
|
||||||
|
delete(w.topics, topic)
|
||||||
|
|
||||||
|
for _, sub := range w.subscriptions[topic] {
|
||||||
|
sub.Unsubscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
w.relaySubs[topic].Cancel()
|
||||||
|
delete(w.relaySubs, topic)
|
||||||
|
|
||||||
|
err := w.wakuRelayTopics[topic].Close()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
delete(w.wakuRelayTopics, topic)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message {
|
func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message {
|
||||||
msgChannel := make(chan *pubsub.Message, 1024)
|
msgChannel := make(chan *pubsub.Message, 1024)
|
||||||
go func(msgChannel chan *pubsub.Message) {
|
go func(msgChannel chan *pubsub.Message) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
log.Debug("recovered msgChannel")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
msg, err := sub.Next(ctx)
|
msg, err := sub.Next(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -266,6 +295,9 @@ func (w *WakuRelay) subscribeToTopic(t Topic, subscription *Subscription, sub *p
|
||||||
}
|
}
|
||||||
// TODO: if there are no more relay subscriptions, close the pubsub subscription
|
// TODO: if there are no more relay subscriptions, close the pubsub subscription
|
||||||
case msg := <-subChannel:
|
case msg := <-subChannel:
|
||||||
|
if msg == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
stats.Record(ctx, metrics.Messages.M(1))
|
stats.Record(ctx, metrics.Messages.M(1))
|
||||||
wakuMessage := &pb.WakuMessage{}
|
wakuMessage := &pb.WakuMessage{}
|
||||||
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
|
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
|
||||||
|
|
|
@ -0,0 +1,69 @@
|
||||||
|
package rpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/status-im/go-waku/waku/v2/node"
|
||||||
|
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||||
|
"github.com/status-im/go-waku/waku/v2/protocol/relay"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RelayService struct {
|
||||||
|
node *node.WakuNode
|
||||||
|
}
|
||||||
|
|
||||||
|
type RelayMessageArgs struct {
|
||||||
|
Topic string `json:"topic,omitempty"`
|
||||||
|
Message pb.WakuMessage `json:"message,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type TopicsArgs struct {
|
||||||
|
Topics []string `json:"topics,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type SuccessReply struct {
|
||||||
|
Success bool `json:"success,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error {
|
||||||
|
_, err := r.node.Relay().Publish(req.Context(), &args.Message, (*relay.Topic)(&args.Topic))
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error publishing message:", err)
|
||||||
|
reply.Success = false
|
||||||
|
reply.Error = err.Error()
|
||||||
|
} else {
|
||||||
|
reply.Success = true
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
|
||||||
|
ctx := req.Context()
|
||||||
|
for _, topic := range args.Topics {
|
||||||
|
_, err := r.node.Relay().Subscribe(ctx, (*relay.Topic)(&topic))
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error subscribing to topic:", topic, "err:", err)
|
||||||
|
reply.Success = false
|
||||||
|
reply.Error = err.Error()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
reply.Success = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
|
||||||
|
ctx := req.Context()
|
||||||
|
for _, topic := range args.Topics {
|
||||||
|
err := r.node.Relay().Unsubscribe(ctx, (relay.Topic)(topic))
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error unsubscribing from topic:", topic, "err:", err)
|
||||||
|
reply.Success = false
|
||||||
|
reply.Error = err.Error()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
reply.Success = true
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/rpc/v2"
|
"github.com/gorilla/rpc/v2"
|
||||||
logging "github.com/ipfs/go-log"
|
logging "github.com/ipfs/go-log"
|
||||||
|
@ -27,8 +28,17 @@ func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = s.RegisterService(&RelayService{node}, "Relay")
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
mux.HandleFunc("/jsonrpc", s.ServeHTTP)
|
mux.HandleFunc("/jsonrpc", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
t := time.Now()
|
||||||
|
s.ServeHTTP(w, r)
|
||||||
|
log.Infof("RPC request at %s took %s", r.URL.Path, time.Since(t))
|
||||||
|
})
|
||||||
|
|
||||||
listenAddr := fmt.Sprintf("%s:%d", address, port)
|
listenAddr := fmt.Sprintf("%s:%d", address, port)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue