diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 3d778682..47bc2640 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -40,8 +40,8 @@ import ( "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds" ws "github.com/libp2p/go-libp2p/p2p/transport/websocket" "github.com/multiformats/go-multiaddr" - "github.com/waku-org/go-waku/cmd/waku/rest" - "github.com/waku-org/go-waku/cmd/waku/rpc" + "github.com/waku-org/go-waku/cmd/waku/server/rest" + "github.com/waku-org/go-waku/cmd/waku/server/rpc" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/metrics" "github.com/waku-org/go-waku/waku/persistence" diff --git a/cmd/waku/server/no_rln.go b/cmd/waku/server/no_rln.go new file mode 100644 index 00000000..ab094e60 --- /dev/null +++ b/cmd/waku/server/no_rln.go @@ -0,0 +1,13 @@ +//go:build !gowaku_rln +// +build !gowaku_rln + +package server + +import ( + "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" +) + +func AppendRLNProof(node *node.WakuNode, msg *pb.WakuMessage) error { + return nil +} diff --git a/cmd/waku/rest/debug.go b/cmd/waku/server/rest/debug.go similarity index 100% rename from cmd/waku/rest/debug.go rename to cmd/waku/server/rest/debug.go diff --git a/cmd/waku/rest/debug_api.yaml b/cmd/waku/server/rest/debug_api.yaml similarity index 100% rename from cmd/waku/rest/debug_api.yaml rename to cmd/waku/server/rest/debug_api.yaml diff --git a/cmd/waku/rest/debug_test.go b/cmd/waku/server/rest/debug_test.go similarity index 100% rename from cmd/waku/rest/debug_test.go rename to cmd/waku/server/rest/debug_test.go diff --git a/cmd/waku/rest/relay.go b/cmd/waku/server/rest/relay.go similarity index 83% rename from cmd/waku/rest/relay.go rename to cmd/waku/server/rest/relay.go index 4cce1daa..9ed162fe 100644 --- a/cmd/waku/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -3,11 +3,13 @@ package rest import ( "context" "encoding/json" + "errors" "net/http" "strings" "sync" "github.com/go-chi/chi/v5" + "github.com/waku-org/go-waku/cmd/waku/server" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -15,9 +17,10 @@ import ( "go.uber.org/zap" ) -const ROUTE_RELAY_SUBSCRIPTIONSV1 = "/relay/v1/subscriptions" -const ROUTE_RELAY_MESSAGESV1 = "/relay/v1/messages/{topic}" +const routeRelayV1Subscriptions = "/relay/v1/subscriptions" +const routeRelayV1Messages = "/relay/v1/messages/{topic}" +// RelayService represents the REST service for WakuRelay type RelayService struct { node *node.WakuNode cancel context.CancelFunc @@ -31,6 +34,7 @@ type RelayService struct { runner *runnerService } +// NewRelayService returns an instance of RelayService func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *RelayService { s := &RelayService{ node: node, @@ -41,10 +45,10 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) - m.Post(ROUTE_RELAY_SUBSCRIPTIONSV1, s.postV1Subscriptions) - m.Delete(ROUTE_RELAY_SUBSCRIPTIONSV1, s.deleteV1Subscriptions) - m.Get(ROUTE_RELAY_MESSAGESV1, s.getV1Messages) - m.Post(ROUTE_RELAY_MESSAGESV1, s.postV1Message) + m.Post(routeRelayV1Subscriptions, s.postV1Subscriptions) + m.Delete(routeRelayV1Subscriptions, s.deleteV1Subscriptions) + m.Get(routeRelayV1Messages, s.getV1Messages) + m.Post(routeRelayV1Messages, s.postV1Message) return s } @@ -65,6 +69,7 @@ func (r *RelayService) addEnvelope(envelope *protocol.Envelope) { r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message()) } +// Start starts the RelayService func (r *RelayService) Start(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) r.cancel = cancel @@ -80,6 +85,7 @@ func (r *RelayService) Start(ctx context.Context) { r.runner.Start(ctx) } +// Stop stops the RelayService func (r *RelayService) Stop() { if r.cancel == nil { return @@ -187,11 +193,20 @@ func (d *RelayService) postV1Message(w http.ResponseWriter, r *http.Request) { var err error if topic == "" { - _, err = d.node.Relay().Publish(r.Context(), message) - } else { - _, err = d.node.Relay().PublishToTopic(r.Context(), message, strings.Replace(topic, "\n", "", -1)) + topic = relay.DefaultWakuTopic } + if !d.node.Relay().IsSubscribed(topic) { + writeErrOrResponse(w, errors.New("not subscribed to pubsubTopic"), nil) + return + } + + if err = server.AppendRLNProof(d.node, message); err != nil { + writeErrOrResponse(w, err, nil) + return + } + + _, err = d.node.Relay().PublishToTopic(r.Context(), message, strings.Replace(topic, "\n", "", -1)) if err != nil { d.log.Error("publishing message", zap.Error(err)) } diff --git a/cmd/waku/rest/relay_api.yaml b/cmd/waku/server/rest/relay_api.yaml similarity index 100% rename from cmd/waku/rest/relay_api.yaml rename to cmd/waku/server/rest/relay_api.yaml diff --git a/cmd/waku/rest/relay_test.go b/cmd/waku/server/rest/relay_test.go similarity index 100% rename from cmd/waku/rest/relay_test.go rename to cmd/waku/server/rest/relay_test.go diff --git a/cmd/waku/rest/runner.go b/cmd/waku/server/rest/runner.go similarity index 100% rename from cmd/waku/rest/runner.go rename to cmd/waku/server/rest/runner.go diff --git a/cmd/waku/rest/store.go b/cmd/waku/server/rest/store.go similarity index 100% rename from cmd/waku/rest/store.go rename to cmd/waku/server/rest/store.go diff --git a/cmd/waku/rest/store_api.yaml b/cmd/waku/server/rest/store_api.yaml similarity index 100% rename from cmd/waku/rest/store_api.yaml rename to cmd/waku/server/rest/store_api.yaml diff --git a/cmd/waku/rest/store_test.go b/cmd/waku/server/rest/store_test.go similarity index 100% rename from cmd/waku/rest/store_test.go rename to cmd/waku/server/rest/store_test.go diff --git a/cmd/waku/rest/utils.go b/cmd/waku/server/rest/utils.go similarity index 100% rename from cmd/waku/rest/utils.go rename to cmd/waku/server/rest/utils.go diff --git a/cmd/waku/rest/utils_test.go b/cmd/waku/server/rest/utils_test.go similarity index 100% rename from cmd/waku/rest/utils_test.go rename to cmd/waku/server/rest/utils_test.go diff --git a/cmd/waku/rest/waku_rest.go b/cmd/waku/server/rest/waku_rest.go similarity index 100% rename from cmd/waku/rest/waku_rest.go rename to cmd/waku/server/rest/waku_rest.go diff --git a/cmd/waku/rest/waku_rest_test.go b/cmd/waku/server/rest/waku_rest_test.go similarity index 100% rename from cmd/waku/rest/waku_rest_test.go rename to cmd/waku/server/rest/waku_rest_test.go diff --git a/cmd/waku/server/rln.go b/cmd/waku/server/rln.go new file mode 100644 index 00000000..d0c24770 --- /dev/null +++ b/cmd/waku/server/rln.go @@ -0,0 +1,23 @@ +//go:build gowaku_rln +// +build gowaku_rln + +package server + +import ( + "fmt" + + "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/rln" +) + +func AppendRLNProof(node *node.WakuNode, msg *pb.WakuMessage) error { + _, rlnEnabled := node.RLNRelay().(*rln.WakuRLNRelay) + if rlnEnabled { + err := node.RLNRelay().AppendRLNProof(msg, node.Timesource().Now()) + if err != nil { + return fmt.Errorf("could not append rln proof: %w", err) + } + } + return nil +} diff --git a/cmd/waku/rpc/admin.go b/cmd/waku/server/rpc/admin.go similarity index 100% rename from cmd/waku/rpc/admin.go rename to cmd/waku/server/rpc/admin.go diff --git a/cmd/waku/rpc/admin_test.go b/cmd/waku/server/rpc/admin_test.go similarity index 100% rename from cmd/waku/rpc/admin_test.go rename to cmd/waku/server/rpc/admin_test.go diff --git a/cmd/waku/rpc/codec.go b/cmd/waku/server/rpc/codec.go similarity index 100% rename from cmd/waku/rpc/codec.go rename to cmd/waku/server/rpc/codec.go diff --git a/cmd/waku/rpc/coded_test.go b/cmd/waku/server/rpc/coded_test.go similarity index 100% rename from cmd/waku/rpc/coded_test.go rename to cmd/waku/server/rpc/coded_test.go diff --git a/cmd/waku/rpc/debug.go b/cmd/waku/server/rpc/debug.go similarity index 100% rename from cmd/waku/rpc/debug.go rename to cmd/waku/server/rpc/debug.go diff --git a/cmd/waku/rpc/debug_test.go b/cmd/waku/server/rpc/debug_test.go similarity index 100% rename from cmd/waku/rpc/debug_test.go rename to cmd/waku/server/rpc/debug_test.go diff --git a/cmd/waku/rpc/filter.go b/cmd/waku/server/rpc/filter.go similarity index 100% rename from cmd/waku/rpc/filter.go rename to cmd/waku/server/rpc/filter.go diff --git a/cmd/waku/rpc/filter_test.go b/cmd/waku/server/rpc/filter_test.go similarity index 100% rename from cmd/waku/rpc/filter_test.go rename to cmd/waku/server/rpc/filter_test.go diff --git a/cmd/waku/rpc/relay.go b/cmd/waku/server/rpc/relay.go similarity index 76% rename from cmd/waku/rpc/relay.go rename to cmd/waku/server/rpc/relay.go index 8fce74ca..a5a6927f 100644 --- a/cmd/waku/rpc/relay.go +++ b/cmd/waku/server/rpc/relay.go @@ -1,10 +1,12 @@ package rpc import ( + "errors" "fmt" "net/http" "sync" + "github.com/waku-org/go-waku/cmd/waku/server" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -12,6 +14,7 @@ import ( "go.uber.org/zap" ) +// RelayService represents the JSON RPC service for WakuRelay type RelayService struct { node *node.WakuNode @@ -24,19 +27,23 @@ type RelayService struct { runner *runnerService } +// RelayMessageArgs represents the requests used for posting messages type RelayMessageArgs struct { Topic string `json:"topic,omitempty"` Message *RPCWakuMessage `json:"message,omitempty"` } +// TopicsArgs represents the lists of topics to use when subscribing / unsubscribing type TopicsArgs struct { Topics []string `json:"topics,omitempty"` } +// TopicArgs represents a request that contains a single topic type TopicArgs struct { Topic string `json:"topic,omitempty"` } +// NewRelayService returns an instance of RelayService func NewRelayService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *RelayService { s := &RelayService{ node: node, @@ -66,6 +73,7 @@ func (r *RelayService) addEnvelope(envelope *protocol.Envelope) { r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message()) } +// Start starts the RelayService func (r *RelayService) Start() { r.messagesMutex.Lock() // Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these @@ -78,18 +86,31 @@ func (r *RelayService) Start() { r.runner.Start() } +// Stop stops the RelayService func (r *RelayService) Stop() { r.runner.Stop() } +// PostV1Message is invoked when the json rpc request uses the post_waku_v2_relay_v1_message method func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error { var err error - if args.Topic == "" { - _, err = r.node.Relay().Publish(req.Context(), args.Message.toProto()) - } else { - _, err = r.node.Relay().PublishToTopic(req.Context(), args.Message.toProto(), args.Topic) + topic := relay.DefaultWakuTopic + if args.Topic != "" { + topic = args.Topic } + + if !r.node.Relay().IsSubscribed(topic) { + return errors.New("not subscribed to pubsubTopic") + } + + msg := args.Message.toProto() + + if err = server.AppendRLNProof(r.node, msg); err != nil { + return err + } + + _, err = r.node.Relay().PublishToTopic(req.Context(), msg, topic) if err != nil { r.log.Error("publishing message", zap.Error(err)) return err @@ -99,6 +120,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, return nil } +// PostV1Subscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_subscription method func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { ctx := req.Context() for _, topic := range args.Topics { @@ -129,6 +151,7 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r return nil } +// DeleteV1Subscription is invoked when the json rpc request uses the delete_waku_v2_relay_v1_subscription method func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { ctx := req.Context() for _, topic := range args.Topics { @@ -145,6 +168,7 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, return nil } +// GetV1Messages is invoked when the json rpc request uses the get_waku_v2_relay_v1_messages method func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error { r.messagesMutex.Lock() defer r.messagesMutex.Unlock() diff --git a/cmd/waku/rpc/relay_test.go b/cmd/waku/server/rpc/relay_test.go similarity index 100% rename from cmd/waku/rpc/relay_test.go rename to cmd/waku/server/rpc/relay_test.go diff --git a/cmd/waku/rpc/rpc_type.go b/cmd/waku/server/rpc/rpc_type.go similarity index 100% rename from cmd/waku/rpc/rpc_type.go rename to cmd/waku/server/rpc/rpc_type.go diff --git a/cmd/waku/rpc/runner.go b/cmd/waku/server/rpc/runner.go similarity index 100% rename from cmd/waku/rpc/runner.go rename to cmd/waku/server/rpc/runner.go diff --git a/cmd/waku/rpc/store.go b/cmd/waku/server/rpc/store.go similarity index 100% rename from cmd/waku/rpc/store.go rename to cmd/waku/server/rpc/store.go diff --git a/cmd/waku/rpc/store_test.go b/cmd/waku/server/rpc/store_test.go similarity index 100% rename from cmd/waku/rpc/store_test.go rename to cmd/waku/server/rpc/store_test.go diff --git a/cmd/waku/rpc/util_test.go b/cmd/waku/server/rpc/util_test.go similarity index 100% rename from cmd/waku/rpc/util_test.go rename to cmd/waku/server/rpc/util_test.go diff --git a/cmd/waku/rpc/utils.go b/cmd/waku/server/rpc/utils.go similarity index 100% rename from cmd/waku/rpc/utils.go rename to cmd/waku/server/rpc/utils.go diff --git a/cmd/waku/rpc/waku_rpc.go b/cmd/waku/server/rpc/waku_rpc.go similarity index 100% rename from cmd/waku/rpc/waku_rpc.go rename to cmd/waku/server/rpc/waku_rpc.go diff --git a/cmd/waku/rpc/waku_rpc_test.go b/cmd/waku/server/rpc/waku_rpc_test.go similarity index 100% rename from cmd/waku/rpc/waku_rpc_test.go rename to cmd/waku/server/rpc/waku_rpc_test.go diff --git a/waku/v2/node/wakunode2_no_rln.go b/waku/v2/node/wakunode2_no_rln.go index 53620cda..d6da3e98 100644 --- a/waku/v2/node/wakunode2_no_rln.go +++ b/waku/v2/node/wakunode2_no_rln.go @@ -5,6 +5,7 @@ package node import "context" +// RLNRelay is used to access any operation related to Waku RLN protocol func (w *WakuNode) RLNRelay() RLNRelay { return nil }