From 57cf95cd5cb1c8dac75e4795203caac15174b37d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Thu, 15 Feb 2024 22:09:56 -0400 Subject: [PATCH] chore: remove RPC server (#1008) Co-authored-by: Prem Chaitanya Prathi --- cmd/waku/flags.go | 34 ---- cmd/waku/main.go | 5 - cmd/waku/node.go | 13 -- cmd/waku/options.go | 10 -- cmd/waku/server/rpc/admin.go | 78 --------- cmd/waku/server/rpc/admin_test.go | 77 --------- cmd/waku/server/rpc/codec.go | 195 ----------------------- cmd/waku/server/rpc/coded_test.go | 18 --- cmd/waku/server/rpc/debug.go | 40 ----- cmd/waku/server/rpc/debug_test.go | 31 ---- cmd/waku/server/rpc/filter.go | 139 ---------------- cmd/waku/server/rpc/filter_test.go | 177 --------------------- cmd/waku/server/rpc/relay.go | 229 --------------------------- cmd/waku/server/rpc/relay_test.go | 159 ------------------- cmd/waku/server/rpc/rpc_type.go | 41 ----- cmd/waku/server/rpc/runner.go | 32 ---- cmd/waku/server/rpc/store.go | 79 --------- cmd/waku/server/rpc/store_test.go | 33 ---- cmd/waku/server/rpc/util_test.go | 41 ----- cmd/waku/server/rpc/utils.go | 103 ------------ cmd/waku/server/rpc/waku_rpc.go | 121 -------------- cmd/waku/server/rpc/waku_rpc_test.go | 19 --- flake.nix | 2 +- go.mod | 4 +- go.sum | 3 - 25 files changed, 2 insertions(+), 1681 deletions(-) delete mode 100644 cmd/waku/server/rpc/admin.go delete mode 100644 cmd/waku/server/rpc/admin_test.go delete mode 100644 cmd/waku/server/rpc/codec.go delete mode 100644 cmd/waku/server/rpc/coded_test.go delete mode 100644 cmd/waku/server/rpc/debug.go delete mode 100644 cmd/waku/server/rpc/debug_test.go delete mode 100644 cmd/waku/server/rpc/filter.go delete mode 100644 cmd/waku/server/rpc/filter_test.go delete mode 100644 cmd/waku/server/rpc/relay.go delete mode 100644 cmd/waku/server/rpc/relay_test.go delete mode 100644 cmd/waku/server/rpc/rpc_type.go delete mode 100644 cmd/waku/server/rpc/runner.go delete mode 100644 cmd/waku/server/rpc/store.go delete mode 100644 cmd/waku/server/rpc/store_test.go delete mode 100644 cmd/waku/server/rpc/util_test.go delete mode 100644 cmd/waku/server/rpc/utils.go delete mode 100644 cmd/waku/server/rpc/waku_rpc.go delete mode 100644 cmd/waku/server/rpc/waku_rpc_test.go diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index e5fd8588..2a2dfe00 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -499,40 +499,6 @@ var ( Destination: &options.Metrics.Port, EnvVars: []string{"WAKUNODE2_METRICS_SERVER_PORT"}, }) - RPCFlag = altsrc.NewBoolFlag(&cli.BoolFlag{ - Name: "rpc", - Usage: "Enable the rpc server", - Destination: &options.RPCServer.Enable, - EnvVars: []string{"WAKUNODE2_RPC"}, - }) - RPCPort = altsrc.NewIntFlag(&cli.IntFlag{ - Name: "rpc-port", - Value: 8545, - Usage: "Listening port of the rpc server", - Destination: &options.RPCServer.Port, - EnvVars: []string{"WAKUNODE2_RPC_PORT"}, - }) - RPCAddress = altsrc.NewStringFlag(&cli.StringFlag{ - Name: "rpc-address", - Value: "127.0.0.1", - Usage: "Listening address of the rpc server", - Destination: &options.RPCServer.Address, - EnvVars: []string{"WAKUNODE2_RPC_ADDRESS"}, - }) - RPCRelayCacheCapacity = altsrc.NewIntFlag(&cli.IntFlag{ - Name: "rpc-relay-cache-capacity", - Value: 30, - Usage: "Capacity of the Relay REST API message cache", - Destination: &options.RPCServer.RelayCacheCapacity, - EnvVars: []string{"WAKUNODE2_RPC_RELAY_CACHE_CAPACITY"}, - }) - RPCAdmin = altsrc.NewBoolFlag(&cli.BoolFlag{ - Name: "rpc-admin", - Value: false, - Usage: "Enable access to JSON-RPC Admin API", - Destination: &options.RPCServer.Admin, - EnvVars: []string{"WAKUNODE2_RPC_ADMIN"}, - }) RESTFlag = altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "rest", Usage: "Enable Waku REST HTTP server", diff --git a/cmd/waku/main.go b/cmd/waku/main.go index 7f4bbd23..4316aae5 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -90,11 +90,6 @@ func main() { MetricsServer, MetricsServerAddress, MetricsServerPort, - RPCFlag, - RPCPort, - RPCAddress, - RPCRelayCacheCapacity, - RPCAdmin, RESTFlag, RESTAddress, RESTPort, diff --git a/cmd/waku/node.go b/cmd/waku/node.go index f171c676..f59bf571 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -40,7 +40,6 @@ import ( ws "github.com/libp2p/go-libp2p/p2p/transport/websocket" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/cmd/waku/server/rest" - "github.com/waku-org/go-waku/cmd/waku/server/rpc" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/metrics" "github.com/waku-org/go-waku/waku/persistence" @@ -400,12 +399,6 @@ func Execute(options NodeOptions) error { } } - var rpcServer *rpc.WakuRPC - if options.RPCServer.Enable { - rpcServer = rpc.NewWakuRPC(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.PProf, options.RPCServer.RelayCacheCapacity, logger) - rpcServer.Start() - } - var restServer *rest.WakuRest if options.RESTServer.Enable { wg.Add(1) @@ -432,12 +425,6 @@ func Execute(options NodeOptions) error { // shut the node down wakuNode.Stop() - if options.RPCServer.Enable { - if err := rpcServer.Stop(ctx); err != nil { - return err - } - } - if options.RESTServer.Enable { if err := restServer.Stop(ctx); err != nil { return err diff --git a/cmd/waku/options.go b/cmd/waku/options.go index bda8dc3a..dc824e5b 100644 --- a/cmd/waku/options.go +++ b/cmd/waku/options.go @@ -101,15 +101,6 @@ type MetricsOptions struct { Port int } -// RPCServerOptions are settings used to start a json rpc server -type RPCServerOptions struct { - Enable bool - Port int - Address string - Admin bool - RelayCacheCapacity int -} - // RESTServerOptions are settings used to start a rest http server type RESTServerOptions struct { Enable bool @@ -185,6 +176,5 @@ type NodeOptions struct { DNSDiscovery DNSDiscoveryOptions Rendezvous RendezvousOptions Metrics MetricsOptions - RPCServer RPCServerOptions RESTServer RESTServerOptions } diff --git a/cmd/waku/server/rpc/admin.go b/cmd/waku/server/rpc/admin.go deleted file mode 100644 index be1b6a79..00000000 --- a/cmd/waku/server/rpc/admin.go +++ /dev/null @@ -1,78 +0,0 @@ -package rpc - -import ( - "net/http" - - "github.com/libp2p/go-libp2p/core/protocol" - ma "github.com/multiformats/go-multiaddr" - "go.uber.org/zap" - - "github.com/waku-org/go-waku/cmd/waku/server" - "github.com/waku-org/go-waku/waku/v2/node" -) - -type AdminService struct { - node *node.WakuNode - log *zap.Logger -} - -type GetPeersArgs struct { -} - -type PeersArgs struct { - Peers []string `json:"peers,omitempty"` -} - -type PeerReply struct { - Multiaddr string `json:"multiaddr,omitempty"` - Protocol protocol.ID `json:"protocol,omitempty"` - Connected bool `json:"connected,omitempty"` -} - -type PeersReply []PeerReply - -func (a *AdminService) PostV1Peers(req *http.Request, args *PeersArgs, reply *SuccessReply) error { - for _, peer := range args.Peers { - addr, err := ma.NewMultiaddr(peer) - if err != nil { - a.log.Error("building multiaddr", zap.Error(err)) - return err - } - - err = a.node.DialPeerWithMultiAddress(req.Context(), addr) - if err != nil { - a.log.Error("dialing peers", zap.Error(err)) - return err - } - } - - *reply = true - return nil -} - -func (a *AdminService) GetV1Peers(req *http.Request, args *GetPeersArgs, reply *PeersReply) error { - peers, err := a.node.Peers() - if err != nil { - a.log.Error("getting peers", zap.Error(err)) - return nil - } - for _, peer := range peers { - if peer.ID.String() == a.node.Host().ID().String() { - //Skip own node id - continue - } - for _, addr := range peer.Addrs { - for _, proto := range peer.Protocols { - if !server.IsWakuProtocol(proto) { - continue - } - *reply = append(*reply, PeerReply{ - Multiaddr: addr.String(), - Protocol: proto, - Connected: peer.Connected, - }) - } - } - } - return nil -} diff --git a/cmd/waku/server/rpc/admin_test.go b/cmd/waku/server/rpc/admin_test.go deleted file mode 100644 index c0716703..00000000 --- a/cmd/waku/server/rpc/admin_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package rpc - -import ( - "bytes" - "context" - "crypto/rand" - "fmt" - "net/http" - "testing" - "time" - - "github.com/multiformats/go-multiaddr" - "github.com/prometheus/client_golang/prometheus" - - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/tests" - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/timesource" - "github.com/waku-org/go-waku/waku/v2/utils" -) - -func makeAdminService(t *testing.T) *AdminService { - options := node.WithWakuRelay() - n, err := node.New(options) - require.NoError(t, err) - err = n.Start(context.Background()) - require.NoError(t, err) - return &AdminService{n, utils.Logger()} -} - -func TestV1Peers(t *testing.T) { - port, err := tests.FindFreePort(t, "", 5) - require.NoError(t, err) - - broadcaster := relay.NewBroadcaster(10) - require.NoError(t, broadcaster.Start(context.Background())) - - host, err := tests.MakeHost(context.Background(), port, rand.Reader) - require.NoError(t, err) - relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) - relay.SetHost(host) - err = relay.Start(context.Background()) - require.NoError(t, err) - defer relay.Stop() - - var reply PeersReply - - request, err := http.NewRequest(http.MethodPost, "url", bytes.NewReader([]byte(""))) - require.NoError(t, err) - - a := makeAdminService(t) - - err = a.GetV1Peers(request, &GetPeersArgs{}, &reply) - require.NoError(t, err) - require.Len(t, reply, 0) - - var reply2 SuccessReply - - hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().String())) - require.NoError(t, err) - - var addr multiaddr.Multiaddr - for _, a := range host.Addrs() { - addr = a.Encapsulate(hostInfo) - break - } - err = a.PostV1Peers(request, &PeersArgs{Peers: []string{addr.String()}}, &reply2) - require.NoError(t, err) - require.True(t, reply2) - - time.Sleep(2 * time.Second) - - err = a.GetV1Peers(request, &GetPeersArgs{}, &reply) - require.NoError(t, err) - require.Len(t, reply, 1) -} diff --git a/cmd/waku/server/rpc/codec.go b/cmd/waku/server/rpc/codec.go deleted file mode 100644 index 8a340aff..00000000 --- a/cmd/waku/server/rpc/codec.go +++ /dev/null @@ -1,195 +0,0 @@ -package rpc - -import ( - "encoding/json" - "errors" - "fmt" - "net/http" - "reflect" - "strings" - - "github.com/gorilla/rpc/v2" - "golang.org/x/text/cases" - "golang.org/x/text/language" -) - -// Based on github.com/gorilla/rpc/v2/json which is governed by a BSD-style license - -var null = json.RawMessage([]byte("null")) - -// An Error is a wrapper for a JSON interface value. It can be used by either -// a service's handler func to write more complex JSON data to an error field -// of a server's response, or by a client to read it. -type Error struct { - Data interface{} -} - -func (e *Error) Error() string { - return fmt.Sprintf("%v", e.Data) -} - -// ---------------------------------------------------------------------------- -// Request and Response -// ---------------------------------------------------------------------------- - -// serverRequest represents a JSON-RPC request received by the server. -type serverRequest struct { - // A String containing the name of the method to be invoked. - Method string `json:"method"` - // An Array of objects to pass as arguments to the method. - Params *json.RawMessage `json:"params"` - // The request id. This can be of any type. It is used to match the - // response with the request that it is replying to. - ID *json.RawMessage `json:"id"` -} - -// serverResponse represents a JSON-RPC response returned by the server. -type serverResponse struct { - // The Object that was returned by the invoked method. This must be null - // in case there was an error invoking the method. - Result interface{} `json:"result"` - // An Error object if there was an error invoking the method. It must be - // null if there was no error. - Error interface{} `json:"error"` - // This must be the same id as the request it is responding to. - ID *json.RawMessage `json:"id"` -} - -// ---------------------------------------------------------------------------- -// Codec -// ---------------------------------------------------------------------------- - -// NewCodec returns a new SnakeCaseCodec Codec. -func NewSnakeCaseCodec() *SnakeCaseCodec { - return &SnakeCaseCodec{} -} - -// SnakeCaseCodec creates a CodecRequest to process each request. -type SnakeCaseCodec struct { -} - -// NewRequest returns a CodecRequest. -func (c *SnakeCaseCodec) NewRequest(r *http.Request) rpc.CodecRequest { - return newCodecRequest(r) -} - -// ---------------------------------------------------------------------------- -// CodecRequest -// ---------------------------------------------------------------------------- - -// newCodecRequest returns a new CodecRequest. -func newCodecRequest(r *http.Request) rpc.CodecRequest { - // Decode the request body and check if RPC method is valid. - req := new(serverRequest) - err := json.NewDecoder(r.Body).Decode(req) - r.Body.Close() - return &CodecRequest{request: req, err: err} -} - -// CodecRequest decodes and encodes a single request. -type CodecRequest struct { - request *serverRequest - err error -} - -// Method returns the RPC method for the current request. -// -// The method uses a dotted notation as in "Service.Method". -func (c *CodecRequest) Method() (string, error) { - if c.err == nil { - return toWakuMethod(c.request.Method), nil - } - return "", c.err -} - -// toWakuMethod transform get_waku_v2_debug_v1_info to Debug.GetV1Info -func toWakuMethod(input string) string { - typ := "get" - if strings.HasPrefix(input, "post") { - typ = "post" - } else if strings.HasPrefix(input, "delete") { - typ = "delete" - } - - base := typ + "_waku_v2_" - cleanedInput := strings.Replace(input, base, "", 1) - splited := strings.Split(cleanedInput, "_") - - c := cases.Title(language.AmericanEnglish) - - method := c.String(typ) - for _, val := range splited[1:] { - method = method + c.String(val) - } - - return c.String(splited[0]) + "." + method -} - -// ReadRequest fills the request object for the RPC method. -func (c *CodecRequest) ReadRequest(args interface{}) error { - if c.err == nil { - if c.request.Params != nil { - // JSON params is array value. RPC params is struct. - // Attempt to unmarshal into array containing the request struct. - params := [1]interface{}{args} - err := json.Unmarshal(*c.request.Params, ¶ms) - if err != nil { - // This failed so we might have received an array of parameters - // instead of a object - argsValueOf := reflect.Indirect(reflect.ValueOf(args)) - if argsValueOf.Kind() == reflect.Struct { - var params []interface{} - for i := 0; i < argsValueOf.NumField(); i++ { - params = append(params, argsValueOf.Field(i).Addr().Interface()) - } - c.err = json.Unmarshal(*c.request.Params, ¶ms) - } else { - // Unknown field type... - c.err = err - } - } - - } else { - c.err = errors.New("rpc: method request ill-formed: missing params field") - } - } - return c.err -} - -// WriteResponse encodes the response and writes it to the ResponseWriter. -func (c *CodecRequest) WriteResponse(w http.ResponseWriter, reply interface{}) { - if c.request.ID != nil { - // Id is null for notifications and they don't have a response. - res := &serverResponse{ - Result: reply, - Error: &null, - ID: c.request.ID, - } - c.writeServerResponse(w, 200, res) - } -} - -func (c *CodecRequest) WriteError(w http.ResponseWriter, _ int, err error) { - res := &serverResponse{ - Result: &null, - ID: c.request.ID, - } - if jsonErr, ok := err.(*Error); ok { - res.Error = jsonErr.Data - } else { - res.Error = err.Error() - } - c.writeServerResponse(w, 400, res) -} - -func (c *CodecRequest) writeServerResponse(w http.ResponseWriter, status int, res *serverResponse) { - b, err := json.Marshal(res) - if err == nil { - w.Header().Set("Content-Type", "application/json; charset=utf-8") - w.WriteHeader(status) - _, _ = w.Write(b) - } else { - // Not sure in which case will this happen. But seems harmless. - rpc.WriteError(w, status, err.Error()) - } -} diff --git a/cmd/waku/server/rpc/coded_test.go b/cmd/waku/server/rpc/coded_test.go deleted file mode 100644 index 7f593c32..00000000 --- a/cmd/waku/server/rpc/coded_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package rpc - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestConvertWakuMethod(t *testing.T) { - res := toWakuMethod("get_waku_v2_debug_v1_info") - require.Equal(t, "Debug.GetV1Info", res) - - res = toWakuMethod("post_waku_v2_relay_v1_message") - require.Equal(t, "Relay.PostV1Message", res) - - res = toWakuMethod("delete_waku_v2_relay_v1_subscriptions") - require.Equal(t, "Relay.DeleteV1Subscriptions", res) -} diff --git a/cmd/waku/server/rpc/debug.go b/cmd/waku/server/rpc/debug.go deleted file mode 100644 index 63ba8efc..00000000 --- a/cmd/waku/server/rpc/debug.go +++ /dev/null @@ -1,40 +0,0 @@ -package rpc - -import ( - "net/http" - - "github.com/waku-org/go-waku/waku/v2/node" -) - -type DebugService struct { - node *node.WakuNode -} - -type InfoArgs struct { -} - -type InfoReply struct { - ENRUri string `json:"enrUri,omitempty"` - ListenAddresses []string `json:"listenAddresses,omitempty"` -} - -func NewDebugService(node *node.WakuNode) *DebugService { - return &DebugService{ - node: node, - } -} - -func (d *DebugService) GetV1Info(r *http.Request, args *InfoArgs, reply *InfoReply) error { - reply.ENRUri = d.node.ENR().String() - for _, addr := range d.node.ListenAddresses() { - reply.ListenAddresses = append(reply.ListenAddresses, addr.String()) - } - return nil -} - -type VersionResponse string - -func (d *DebugService) GetV1Version(r *http.Request, args *InfoArgs, reply *VersionResponse) error { - *reply = VersionResponse(node.GetVersionInfo().String()) - return nil -} diff --git a/cmd/waku/server/rpc/debug_test.go b/cmd/waku/server/rpc/debug_test.go deleted file mode 100644 index c7a00f05..00000000 --- a/cmd/waku/server/rpc/debug_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package rpc - -import ( - "bytes" - "context" - "net/http" - "testing" - - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/waku/v2/node" -) - -func TestGetV1Info(t *testing.T) { - var reply InfoReply - - request, err := http.NewRequest(http.MethodPost, "url", bytes.NewReader([]byte(""))) - require.NoError(t, err) - - wakuNode1, err := node.New() - require.NoError(t, err) - defer wakuNode1.Stop() - err = wakuNode1.Start(context.Background()) - require.NoError(t, err) - - d := &DebugService{ - node: wakuNode1, - } - - err = d.GetV1Info(request, &InfoArgs{}, &reply) - require.NoError(t, err) -} diff --git a/cmd/waku/server/rpc/filter.go b/cmd/waku/server/rpc/filter.go deleted file mode 100644 index 2d1002d9..00000000 --- a/cmd/waku/server/rpc/filter.go +++ /dev/null @@ -1,139 +0,0 @@ -package rpc - -import ( - "fmt" - "net/http" - "sync" - - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" - wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "go.uber.org/zap" -) - -type FilterService struct { - node *node.WakuNode - log *zap.Logger - - messages map[string][]*wpb.WakuMessage - cacheCapacity int - messagesMutex sync.RWMutex - - runner *runnerService -} - -type FilterContentArgs struct { - Topic string `json:"topic,omitempty"` - ContentFilters []*pb.FilterRequest_ContentFilter `json:"contentFilters,omitempty"` -} - -type ContentTopicArgs struct { - ContentTopic string `json:"contentTopic,omitempty"` -} - -func NewFilterService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *FilterService { - s := &FilterService{ - node: node, - log: log.Named("filter"), - cacheCapacity: cacheCapacity, - messages: make(map[string][]*wpb.WakuMessage), - } - s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) - return s -} - -func makeContentFilter(args *FilterContentArgs) legacy_filter.ContentFilter { - var contentTopics []string - for _, contentFilter := range args.ContentFilters { - contentTopics = append(contentTopics, contentFilter.ContentTopic) - } - - return legacy_filter.ContentFilter{ - Topic: args.Topic, - ContentTopics: contentTopics, - } -} - -func (f *FilterService) addEnvelope(envelope *protocol.Envelope) { - f.messagesMutex.Lock() - defer f.messagesMutex.Unlock() - - contentTopic := envelope.Message().ContentTopic - if _, ok := f.messages[contentTopic]; !ok { - return - } - - // Keep a specific max number of messages per topic - if len(f.messages[envelope.PubsubTopic()]) >= f.cacheCapacity { - f.messages[envelope.PubsubTopic()] = f.messages[envelope.PubsubTopic()][1:] - } - - f.messages[contentTopic] = append(f.messages[contentTopic], envelope.Message()) -} - -func (f *FilterService) Start() { - f.runner.Start() -} - -func (f *FilterService) Stop() { - f.runner.Stop() -} - -func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterContentArgs, reply *SuccessReply) error { - _, _, err := f.node.LegacyFilter().Subscribe( - req.Context(), - makeContentFilter(args), - legacy_filter.WithAutomaticPeerSelection(), - ) - if err != nil { - f.log.Error("subscribing to topic", zap.String("topic", args.Topic), zap.Error(err)) - return err - } - for _, contentFilter := range args.ContentFilters { - f.messages[contentFilter.ContentTopic] = make([]*wpb.WakuMessage, 0) - } - - *reply = true - return nil -} - -func (f *FilterService) DeleteV1Subscription(req *http.Request, args *FilterContentArgs, reply *SuccessReply) error { - err := f.node.LegacyFilter().UnsubscribeFilter( - req.Context(), - makeContentFilter(args), - ) - if err != nil { - f.log.Error("unsubscribing from topic", zap.String("topic", args.Topic), zap.Error(err)) - return err - } - for _, contentFilter := range args.ContentFilters { - delete(f.messages, contentFilter.ContentTopic) - } - - *reply = true - return nil -} - -func (f *FilterService) GetV1Messages(req *http.Request, args *ContentTopicArgs, reply *MessagesReply) error { - f.messagesMutex.Lock() - defer f.messagesMutex.Unlock() - - if _, ok := f.messages[args.ContentTopic]; !ok { - return fmt.Errorf("topic %s not subscribed", args.ContentTopic) - } - - for i := range f.messages[args.ContentTopic] { - msg := f.messages[args.ContentTopic][i] - rpcMsg, err := ProtoToRPC(msg) - if err != nil { - f.log.Warn("could not include message in response", zap.Error(err)) - } else { - *reply = append(*reply, rpcMsg) - } - } - - f.messages[args.ContentTopic] = make([]*wpb.WakuMessage, 0) - return nil -} diff --git a/cmd/waku/server/rpc/filter_test.go b/cmd/waku/server/rpc/filter_test.go deleted file mode 100644 index 97ef47ea..00000000 --- a/cmd/waku/server/rpc/filter_test.go +++ /dev/null @@ -1,177 +0,0 @@ -package rpc - -import ( - "context" - "crypto/rand" - "fmt" - "testing" - "time" - - "github.com/multiformats/go-multiaddr" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/tests" - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/peerstore" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" - wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/timesource" - "github.com/waku-org/go-waku/waku/v2/utils" -) - -var testTopic = "test" - -func makeFilterService(t *testing.T, isFullNode bool) *FilterService { - var nodeOpts []node.WakuNodeOption - - nodeOpts = append(nodeOpts, node.WithLegacyWakuFilter(isFullNode)) - if isFullNode { - nodeOpts = append(nodeOpts, node.WithWakuRelay()) - } - - n, err := node.New(nodeOpts...) - require.NoError(t, err) - err = n.Start(context.Background()) - require.NoError(t, err) - - if isFullNode { - sub, err := n.Relay().Subscribe(context.Background(), protocol.NewContentFilter(testTopic)) - go func() { - for range sub[0].Ch { - } - }() - require.NoError(t, err) - } - - return NewFilterService(n, 30, utils.Logger()) -} - -func TestFilterSubscription(t *testing.T) { - t.Skip("skipping since it is legacy filter") - port, err := tests.FindFreePort(t, "", 5) - require.NoError(t, err) - - host, err := tests.MakeHost(context.Background(), port, rand.Reader) - require.NoError(t, err) - - b := relay.NewBroadcaster(10) - require.NoError(t, b.Start(context.Background())) - node := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) - node.SetHost(host) - err = node.Start(context.Background()) - require.NoError(t, err) - - _, err = node.Subscribe(context.Background(), protocol.NewContentFilter(testTopic)) - require.NoError(t, err) - - b2 := relay.NewBroadcaster(10) - require.NoError(t, b2.Start(context.Background())) - f := legacy_filter.NewWakuFilter(b2, false, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) - f.SetHost(host) - sub := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic)) - err = f.Start(context.Background(), sub) - require.NoError(t, err) - - d := makeFilterService(t, true) - defer d.node.Stop() - - hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().String())) - require.NoError(t, err) - - var addr multiaddr.Multiaddr - for _, a := range host.Addrs() { - addr = a.Encapsulate(hostInfo) - break - } - - _, err = d.node.AddPeer(addr, peerstore.Static, []string{testTopic}, legacy_filter.FilterID_v20beta1) - require.NoError(t, err) - - args := &FilterContentArgs{Topic: testTopic, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: "ct"}}} - - var reply SuccessReply - err = d.PostV1Subscription( - makeRequest(t), - args, - &reply, - ) - require.NoError(t, err) - require.True(t, reply) - - err = d.DeleteV1Subscription( - makeRequest(t), - args, - &reply, - ) - require.NoError(t, err) - require.True(t, reply) -} - -func TestFilterGetV1Messages(t *testing.T) { - t.Skip("skipping since it is legacy filter") - - serviceA := makeFilterService(t, true) - var reply SuccessReply - - serviceB := makeFilterService(t, false) - go serviceB.Start() - defer serviceB.Stop() - - hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().String())) - require.NoError(t, err) - - var addr multiaddr.Multiaddr - for _, a := range serviceB.node.Host().Addrs() { - addr = a.Encapsulate(hostInfo) - break - } - err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr) - require.NoError(t, err) - - // Wait for the dial to complete - time.Sleep(1 * time.Second) - - args := &FilterContentArgs{Topic: testTopic, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: "ct"}}} - err = serviceB.PostV1Subscription( - makeRequest(t), - args, - &reply, - ) - require.NoError(t, err) - require.True(t, reply) - - // Wait for the subscription to be started - time.Sleep(1 * time.Second) - - _, err = serviceA.node.Relay().Publish( - context.Background(), - &wpb.WakuMessage{ContentTopic: "ct"}, - relay.WithPubSubTopic(testTopic), - ) - require.NoError(t, err) - require.True(t, reply) - - // Wait for the message to be received - time.Sleep(1 * time.Second) - - var messagesReply1 MessagesReply - err = serviceB.GetV1Messages( - makeRequest(t), - &ContentTopicArgs{"ct"}, - &messagesReply1, - ) - require.NoError(t, err) - require.Len(t, messagesReply1, 1) - - var messagesReply2 MessagesReply - err = serviceB.GetV1Messages( - makeRequest(t), - &ContentTopicArgs{"ct"}, - &messagesReply2, - ) - require.NoError(t, err) - require.Len(t, messagesReply2, 0) -} diff --git a/cmd/waku/server/rpc/relay.go b/cmd/waku/server/rpc/relay.go deleted file mode 100644 index ddc221bf..00000000 --- a/cmd/waku/server/rpc/relay.go +++ /dev/null @@ -1,229 +0,0 @@ -package rpc - -import ( - "errors" - "fmt" - "net/http" - - "github.com/waku-org/go-waku/cmd/waku/server" - "github.com/waku-org/go-waku/logging" - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "go.uber.org/zap" -) - -var errChannelClosed = errors.New("consume channel is closed for subscription") - -// RelayService represents the JSON RPC service for WakuRelay -type RelayService struct { - node *node.WakuNode - - log *zap.Logger - - cacheCapacity int -} - -// RelayMessageArgs represents the requests used for posting messages -type RelayMessageArgs struct { - Topic string `json:"topic,omitempty"` - Message *RPCWakuMessage `json:"message,omitempty"` -} - -// RelayAutoMessageArgs represents the requests used for posting messages -type RelayAutoMessageArgs struct { - Message *RPCWakuMessage `json:"message,omitempty"` -} - -// TopicsArgs represents the lists of topics to use when subscribing / unsubscribing -type TopicsArgs struct { - Topics []string `json:"topics,omitempty"` -} - -// TopicArgs represents a request that contains a single topic -type TopicArgs struct { - Topic string `json:"topic,omitempty"` -} - -// NewRelayService returns an instance of RelayService -func NewRelayService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *RelayService { - s := &RelayService{ - node: node, - cacheCapacity: cacheCapacity, - log: log.Named("relay"), - } - - return s -} - -// Start starts the RelayService -func (r *RelayService) Start() { -} - -// Stop stops the RelayService -func (r *RelayService) Stop() { -} - -// PostV1Message is invoked when the json rpc request uses the post_waku_v2_relay_v1_message method -func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error { - var err error - - topic := relay.DefaultWakuTopic - if args.Topic != "" { - topic = args.Topic - } - - msg, err := args.Message.toProto() - if err != nil { - return err - } - - if err = server.AppendRLNProof(r.node, msg); err != nil { - return err - } - - _, err = r.node.Relay().Publish(req.Context(), msg, relay.WithPubSubTopic(topic)) - if err != nil { - r.log.Error("publishing message", zap.Error(err)) - return err - } - - *reply = true - return nil -} - -// PostV1AutoSubscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_subscription -// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics. -func (r *RelayService) PostV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { - - _, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...), relay.WithCacheSize(uint(r.cacheCapacity))) - if err != nil { - r.log.Error("subscribing to topics", zap.Strings("topics", args.Topics), zap.Error(err)) - return err - } - //TODO: Handle partial errors. - - *reply = true - return nil -} - -// DeleteV1AutoSubscription is invoked when the json rpc request uses the delete_waku_v2_relay_v1_auto_subscription -// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics. -func (r *RelayService) DeleteV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { - ctx := req.Context() - - err := r.node.Relay().Unsubscribe(ctx, protocol.NewContentFilter("", args.Topics...)) - if err != nil { - r.log.Error("unsubscribing from topics", zap.Strings("topic", args.Topics), zap.Error(err)) - return err - } - //TODO: Handle partial errors. - *reply = true - return nil -} - -// PostV1AutoMessage is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_message -func (r *RelayService) PostV1AutoMessage(req *http.Request, args *RelayAutoMessageArgs, reply *SuccessReply) error { - msg, err := args.Message.toProto() - if err != nil { - err = fmt.Errorf("invalid message format received: %w", err) - r.log.Error("publishing message", zap.Error(err)) - return err - } - - if err = server.AppendRLNProof(r.node, msg); err != nil { - return err - } - - _, err = r.node.Relay().Publish(req.Context(), msg) - if err != nil { - r.log.Error("publishing message", zap.Error(err)) - return err - } - - *reply = true - return nil -} - -// GetV1AutoMessages is invoked when the json rpc request uses the get_waku_v2_relay_v1_auto_messages method -// Note that this method takes contentTopic as an argument instead of pubSubtopic and uses autosharding. -func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, reply *MessagesReply) error { - sub, err := r.node.Relay().GetSubscription(args.Topic) - if err != nil { - return err - } - select { - case msg, open := <-sub.Ch: - if !open { - r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", args.Topic)) - return errChannelClosed - } - rpcMsg, err := ProtoToRPC(msg.Message()) - if err != nil { - r.log.Warn("could not include message in response", logging.HexBytes("hash", msg.Hash()), zap.Error(err)) - } else { - *reply = append(*reply, rpcMsg) - } - default: - break - } - return nil -} - -// PostV1Subscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_subscription method -func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { - - for _, topic := range args.Topics { - var err error - if topic == "" { - topic = relay.DefaultWakuTopic - } - _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter(topic), relay.WithCacheSize(uint(r.cacheCapacity))) - if err != nil { - r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err)) - return err - } - - } - - *reply = true - return nil -} - -// DeleteV1Subscription is invoked when the json rpc request uses the delete_waku_v2_relay_v1_subscription method -func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { - ctx := req.Context() - for _, topic := range args.Topics { - err := r.node.Relay().Unsubscribe(ctx, protocol.NewContentFilter(topic)) - if err != nil { - r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err)) - return err - } - } - - *reply = true - return nil -} - -// GetV1Messages is invoked when the json rpc request uses the get_waku_v2_relay_v1_messages method -func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error { - - sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(args.Topic, "") - if err != nil { - return err - } - select { - case msg, open := <-sub.Ch: - if !open { - r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", args.Topic)) - return errChannelClosed - } - m, err := ProtoToRPC(msg.Message()) - if err == nil { - *reply = append(*reply, m) - } - default: - break - } - return nil -} diff --git a/cmd/waku/server/rpc/relay_test.go b/cmd/waku/server/rpc/relay_test.go deleted file mode 100644 index e95e6a43..00000000 --- a/cmd/waku/server/rpc/relay_test.go +++ /dev/null @@ -1,159 +0,0 @@ -package rpc - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/multiformats/go-multiaddr" - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/utils" -) - -func makeRelayService(t *testing.T) *RelayService { - options := node.WithWakuRelayAndMinPeers(0) - n, err := node.New(options) - require.NoError(t, err) - err = n.Start(context.Background()) - require.NoError(t, err) - - return NewRelayService(n, 30, utils.Logger()) -} - -func TestPostV1Message(t *testing.T) { - var reply SuccessReply - - d := makeRelayService(t) - - msg := &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: "abc", - Timestamp: utils.GetUnixEpoch(), - } - - rpcWakuMsg, err := ProtoToRPC(msg) - require.NoError(t, err) - - err = d.PostV1Message( - makeRequest(t), - &RelayMessageArgs{ - Message: rpcWakuMsg, - }, - &reply, - ) - require.NoError(t, err) - require.True(t, reply) -} - -func TestRelaySubscription(t *testing.T) { - var reply SuccessReply - - d := makeRelayService(t) - args := &TopicsArgs{Topics: []string{"test"}} - - err := d.PostV1Subscription( - makeRequest(t), - args, - &reply, - ) - require.NoError(t, err) - require.True(t, reply) - - err = d.DeleteV1Subscription( - makeRequest(t), - args, - &reply, - ) - require.NoError(t, err) - require.True(t, reply) -} - -func TestRelayGetV1Messages(t *testing.T) { - serviceA := makeRelayService(t) - go serviceA.Start() - defer serviceA.Stop() - - var reply SuccessReply - - serviceB := makeRelayService(t) - go serviceB.Start() - defer serviceB.Stop() - - hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().String())) - require.NoError(t, err) - - var addr multiaddr.Multiaddr - for _, a := range serviceB.node.Host().Addrs() { - addr = a.Encapsulate(hostInfo) - break - } - err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr) - require.NoError(t, err) - - // Wait for the dial to complete - time.Sleep(1 * time.Second) - - args := &TopicsArgs{Topics: []string{"test"}} - - // Subscribe A to topic - err = serviceA.PostV1Subscription( - makeRequest(t), - args, - &reply, - ) - require.NoError(t, err) - require.True(t, reply) - - // Subscribe B to topic - err = serviceB.PostV1Subscription( - makeRequest(t), - args, - &reply, - ) - require.NoError(t, err) - require.True(t, reply) - - // Wait for the subscription to be started - time.Sleep(1 * time.Second) - - rpcWakuMsg, err := ProtoToRPC(&pb.WakuMessage{ - Payload: []byte("test"), - ContentTopic: "test", - }) - require.NoError(t, err) - - err = serviceA.PostV1Message( - makeRequest(t), - &RelayMessageArgs{ - Topic: "test", - Message: rpcWakuMsg, - }, - &reply, - ) - require.NoError(t, err) - require.True(t, reply) - - // Wait for the message to be received - time.Sleep(1 * time.Second) - - var messagesReply1 MessagesReply - err = serviceB.GetV1Messages( - makeRequest(t), - &TopicArgs{"test"}, - &messagesReply1, - ) - require.NoError(t, err) - require.Len(t, messagesReply1, 1) - - var messagesReply2 MessagesReply - err = serviceB.GetV1Messages( - makeRequest(t), - &TopicArgs{"test"}, - &messagesReply2, - ) - require.NoError(t, err) - require.Len(t, messagesReply2, 0) -} diff --git a/cmd/waku/server/rpc/rpc_type.go b/cmd/waku/server/rpc/rpc_type.go deleted file mode 100644 index 3106f931..00000000 --- a/cmd/waku/server/rpc/rpc_type.go +++ /dev/null @@ -1,41 +0,0 @@ -package rpc - -import ( - "encoding/base64" - "strings" -) - -type SuccessReply = bool - -type Empty struct { -} - -type MessagesReply = []*RPCWakuMessage - -type Base64URLByte []byte - -// UnmarshalText is used by json.Unmarshal to decode both url-safe and standard -// base64 encoded strings with and without padding -func (h *Base64URLByte) UnmarshalText(b []byte) error { - inputValue := "" - if b != nil { - inputValue = string(b) - } - - enc := base64.StdEncoding - if strings.ContainsAny(inputValue, "-_") { - enc = base64.URLEncoding - } - if len(inputValue)%4 != 0 { - enc = enc.WithPadding(base64.NoPadding) - } - - decodedBytes, err := enc.DecodeString(inputValue) - if err != nil { - return err - } - - *h = decodedBytes - - return nil -} diff --git a/cmd/waku/server/rpc/runner.go b/cmd/waku/server/rpc/runner.go deleted file mode 100644 index e09e031b..00000000 --- a/cmd/waku/server/rpc/runner.go +++ /dev/null @@ -1,32 +0,0 @@ -package rpc - -import ( - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" -) - -type Adder func(msg *protocol.Envelope) - -type runnerService struct { - broadcaster relay.Broadcaster - sub *relay.Subscription - adder Adder -} - -func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService { - return &runnerService{ - broadcaster: broadcaster, - adder: adder, - } -} - -func (r *runnerService) Start() { - r.sub = r.broadcaster.RegisterForAll(relay.WithBufferSize(relay.DefaultRelaySubscriptionBufferSize)) - for envelope := range r.sub.Ch { - r.adder(envelope) - } -} - -func (r *runnerService) Stop() { - r.sub.Unsubscribe() -} diff --git a/cmd/waku/server/rpc/store.go b/cmd/waku/server/rpc/store.go deleted file mode 100644 index f39bb645..00000000 --- a/cmd/waku/server/rpc/store.go +++ /dev/null @@ -1,79 +0,0 @@ -package rpc - -import ( - "net/http" - - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol/store" - "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" - "go.uber.org/zap" -) - -type StoreService struct { - node *node.WakuNode - log *zap.Logger -} - -// cursor *pb.Index -// pageSize uint64 -// asc bool - -type StorePagingOptions struct { - PageSize uint64 `json:"pageSize,omitempty"` - Cursor *pb.Index `json:"cursor,omitempty"` - Forward bool `json:"forward,omitempty"` -} - -type StoreMessagesArgs struct { - Topic string `json:"pubsubTopic,omitempty"` - ContentFilters []string `json:"contentFilters,omitempty"` - StartTime *int64 `json:"startTime,omitempty"` - EndTime *int64 `json:"endTime,omitempty"` - PagingOptions StorePagingOptions `json:"pagingOptions,omitempty"` -} - -type StoreMessagesReply struct { - Messages []*RPCWakuMessage `json:"messages,omitempty"` - PagingInfo StorePagingOptions `json:"pagingInfo,omitempty"` - Error string `json:"error,omitempty"` -} - -func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs, reply *StoreMessagesReply) error { - options := []store.HistoryRequestOption{ - store.WithAutomaticRequestID(), - store.WithAutomaticPeerSelection(), - store.WithPaging(args.PagingOptions.Forward, args.PagingOptions.PageSize), - store.WithCursor(args.PagingOptions.Cursor), - } - res, err := s.node.Store().Query( - req.Context(), - store.Query{ - PubsubTopic: args.Topic, - ContentTopics: args.ContentFilters, - StartTime: args.StartTime, - EndTime: args.EndTime, - }, - options..., - ) - if err != nil { - s.log.Error("querying messages", zap.Error(err)) - reply.Error = err.Error() - return nil - } - - reply.Messages = make([]*RPCWakuMessage, len(res.Messages)) - for i := range res.Messages { - msg, err := ProtoToRPC(res.Messages[i]) - if err != nil { - return err - } - reply.Messages[i] = msg - } - - reply.PagingInfo = StorePagingOptions{ - PageSize: args.PagingOptions.PageSize, - Cursor: res.Cursor(), - Forward: args.PagingOptions.Forward, - } - return nil -} diff --git a/cmd/waku/server/rpc/store_test.go b/cmd/waku/server/rpc/store_test.go deleted file mode 100644 index b9358121..00000000 --- a/cmd/waku/server/rpc/store_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package rpc - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/utils" -) - -func makeStoreService(t *testing.T) *StoreService { - options := node.WithWakuStore() - n, err := node.New(options) - require.NoError(t, err) - err = n.Start(context.Background()) - require.NoError(t, err) - return &StoreService{n, utils.Logger()} -} - -func TestStoreGetV1Messages(t *testing.T) { - var reply StoreMessagesReply - - s := makeStoreService(t) - - err := s.GetV1Messages( - makeRequest(t), - &StoreMessagesArgs{}, - &reply, - ) - require.NoError(t, err) - require.NotEmpty(t, reply.Error) -} diff --git a/cmd/waku/server/rpc/util_test.go b/cmd/waku/server/rpc/util_test.go deleted file mode 100644 index ecb9b103..00000000 --- a/cmd/waku/server/rpc/util_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package rpc - -import ( - "bytes" - "encoding/base64" - "encoding/json" - "net/http" - "testing" - - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" -) - -func makeRequest(t *testing.T) *http.Request { - request, err := http.NewRequest(http.MethodPost, "url", bytes.NewReader([]byte(""))) - require.NoError(t, err) - return request -} - -func TestBase64Encoding(t *testing.T) { - input := "Hello World" - - rpcMsg, err := ProtoToRPC(&pb.WakuMessage{ - Payload: []byte(input), - ContentTopic: "test", - }) - require.NoError(t, err) - - jsonBytes, err := json.Marshal(rpcMsg) - require.NoError(t, err) - - m := make(map[string]interface{}) - err = json.Unmarshal(jsonBytes, &m) - require.NoError(t, err) - require.Equal(t, base64.StdEncoding.EncodeToString([]byte(input)), m["payload"]) - - decodedRPCMsg := new(RPCWakuMessage) - err = json.Unmarshal(jsonBytes, decodedRPCMsg) - require.NoError(t, err) - require.Equal(t, input, string(decodedRPCMsg.Payload)) -} diff --git a/cmd/waku/server/rpc/utils.go b/cmd/waku/server/rpc/utils.go deleted file mode 100644 index 91271897..00000000 --- a/cmd/waku/server/rpc/utils.go +++ /dev/null @@ -1,103 +0,0 @@ -package rpc - -import ( - "errors" - - "github.com/waku-org/go-waku/cmd/waku/server" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - rlnpb "github.com/waku-org/go-waku/waku/v2/protocol/rln/pb" - - "google.golang.org/protobuf/proto" -) - -type RateLimitProof struct { - Proof Base64URLByte `json:"proof,omitempty"` - MerkleRoot Base64URLByte `json:"merkle_root,omitempty"` - Epoch Base64URLByte `json:"epoch,omitempty"` - ShareX Base64URLByte `json:"share_x,omitempty"` - ShareY Base64URLByte `json:"share_y,omitempty"` - Nullifier Base64URLByte `json:"nullifier,omitempty"` - RlnIdentifier Base64URLByte `json:"rln_identifier,omitempty"` -} - -type RPCWakuMessage struct { - Payload server.Base64URLByte `json:"payload,omitempty"` - ContentTopic string `json:"contentTopic,omitempty"` - Version uint32 `json:"version"` - Timestamp int64 `json:"timestamp,omitempty"` - RateLimitProof *RateLimitProof `json:"rateLimitProof,omitempty"` - Ephemeral bool `json:"ephemeral,omitempty"` -} - -func ProtoToRPC(input *pb.WakuMessage) (*RPCWakuMessage, error) { - if input == nil { - return nil, nil - } - - if err := input.Validate(); err != nil { - return nil, err - } - - rpcWakuMsg := &RPCWakuMessage{ - Payload: input.Payload, - ContentTopic: input.ContentTopic, - Version: input.GetVersion(), - Timestamp: input.GetTimestamp(), - Ephemeral: input.GetEphemeral(), - } - - if input.RateLimitProof != nil { - rateLimitProof := &rlnpb.RateLimitProof{} - err := proto.Unmarshal(input.RateLimitProof, rateLimitProof) - if err != nil { - return nil, err - } - - rpcWakuMsg.RateLimitProof = &RateLimitProof{ - Proof: rateLimitProof.Proof, - MerkleRoot: rateLimitProof.MerkleRoot, - Epoch: rateLimitProof.Epoch, - ShareX: rateLimitProof.ShareX, - ShareY: rateLimitProof.ShareY, - Nullifier: rateLimitProof.Nullifier, - RlnIdentifier: rateLimitProof.RlnIdentifier, - } - } - - return rpcWakuMsg, nil -} - -func (r *RPCWakuMessage) toProto() (*pb.WakuMessage, error) { - if r == nil { - return nil, errors.New("wakumessage is missing") - } - - msg := &pb.WakuMessage{ - Payload: r.Payload, - ContentTopic: r.ContentTopic, - Version: proto.Uint32(r.Version), - Timestamp: proto.Int64(r.Timestamp), - Ephemeral: proto.Bool(r.Ephemeral), - } - - if r.RateLimitProof != nil { - rateLimitProof := &rlnpb.RateLimitProof{ - Proof: r.RateLimitProof.Proof, - MerkleRoot: r.RateLimitProof.MerkleRoot, - Epoch: r.RateLimitProof.Epoch, - ShareX: r.RateLimitProof.ShareX, - ShareY: r.RateLimitProof.ShareY, - Nullifier: r.RateLimitProof.Nullifier, - RlnIdentifier: r.RateLimitProof.RlnIdentifier, - } - - b, err := proto.Marshal(rateLimitProof) - if err != nil { - return nil, err - } - - msg.RateLimitProof = b - } - - return msg, nil -} diff --git a/cmd/waku/server/rpc/waku_rpc.go b/cmd/waku/server/rpc/waku_rpc.go deleted file mode 100644 index 920f1aae..00000000 --- a/cmd/waku/server/rpc/waku_rpc.go +++ /dev/null @@ -1,121 +0,0 @@ -package rpc - -import ( - "context" - "fmt" - "net/http" - "net/http/pprof" - "time" - - "github.com/gorilla/mux" - "github.com/gorilla/rpc/v2" - "github.com/waku-org/go-waku/waku/v2/node" - "go.uber.org/zap" -) - -type WakuRPC struct { - node *node.WakuNode - server *http.Server - - log *zap.Logger - - relayService *RelayService - filterService *FilterService - adminService *AdminService -} - -func NewWakuRPC(node *node.WakuNode, address string, port int, enableAdmin bool, enablePProf bool, cacheCapacity int, log *zap.Logger) *WakuRPC { - wrpc := new(WakuRPC) - wrpc.log = log.Named("rpc") - - s := rpc.NewServer() - s.RegisterCodec(NewSnakeCaseCodec(), "application/json") - s.RegisterCodec(NewSnakeCaseCodec(), "application/json;charset=UTF-8") - - mux := mux.NewRouter() - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - t := time.Now() - s.ServeHTTP(w, r) - wrpc.log.Info("served request", zap.String("path", r.URL.Path), zap.Duration("duration", time.Since(t))) - }) - - if enablePProf { - mux.PathPrefix("/debug/").Handler(http.DefaultServeMux) - mux.HandleFunc("/debug/pprof/", pprof.Index) - } - - debugService := NewDebugService(node) - err := s.RegisterService(debugService, "Debug") - if err != nil { - wrpc.log.Error("registering debug service", zap.Error(err)) - } - - var relayService *RelayService - if node.Relay() != nil { - relayService = NewRelayService(node, cacheCapacity, log) - err = s.RegisterService(relayService, "Relay") - if err != nil { - wrpc.log.Error("registering relay service", zap.Error(err)) - } - } - - err = s.RegisterService(&StoreService{node, log}, "Store") - if err != nil { - wrpc.log.Error("registering store service", zap.Error(err)) - } - - if enableAdmin { - adminService := &AdminService{node, log.Named("admin")} - err = s.RegisterService(adminService, "Admin") - if err != nil { - wrpc.log.Error("registering admin service", zap.Error(err)) - } - wrpc.adminService = adminService - } - - filterService := NewFilterService(node, cacheCapacity, log) - err = s.RegisterService(filterService, "Filter") - if err != nil { - wrpc.log.Error("registering filter service", zap.Error(err)) - } - - listenAddr := fmt.Sprintf("%s:%d", address, port) - - server := &http.Server{ - Addr: listenAddr, - Handler: mux, - } - - server.RegisterOnShutdown(func() { - filterService.Stop() - - if relayService != nil { - relayService.Stop() - } - }) - - wrpc.node = node - wrpc.server = server - wrpc.relayService = relayService - wrpc.filterService = filterService - - return wrpc -} - -func (r *WakuRPC) Start() { - if r.relayService != nil { - go r.relayService.Start() - } - - go r.filterService.Start() - - go func() { - _ = r.server.ListenAndServe() - }() - r.log.Info("server started", zap.String("addr", r.server.Addr)) -} - -func (r *WakuRPC) Stop(ctx context.Context) error { - r.log.Info("shutting down server") - return r.server.Shutdown(ctx) -} diff --git a/cmd/waku/server/rpc/waku_rpc_test.go b/cmd/waku/server/rpc/waku_rpc_test.go deleted file mode 100644 index db866f49..00000000 --- a/cmd/waku/server/rpc/waku_rpc_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package rpc - -import ( - "testing" - - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/utils" -) - -func TestWakuRpc(t *testing.T) { - options := node.WithWakuStore() - n, err := node.New(options) - require.NoError(t, err) - - rpc := NewWakuRPC(n, "127.0.0.1", 8080, true, false, 30, utils.Logger()) - require.NotNil(t, rpc.server) - require.Equal(t, rpc.server.Addr, "127.0.0.1:8080") -} diff --git a/flake.nix b/flake.nix index b944afb6..afc96639 100644 --- a/flake.nix +++ b/flake.nix @@ -29,7 +29,7 @@ ]; doCheck = false; # FIXME: This needs to be manually changed when updating modules. - vendorSha256 = "sha256-PnN+61S9F58A/VeO2M1DW7IJYYUP7xpkZrYYnWoO8lc="; + vendorSha256 = "sha256-D0IwlMmCW32T/bfmJjFu3Mlg7pgW4j8IJGZUQ6fnHJQ="; # Fix for 'nix run' trying to execute 'go-waku'. meta = { mainProgram = "waku"; }; }; diff --git a/go.mod b/go.mod index 699d4571..65a5e820 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,6 @@ require ( github.com/ethereum/go-ethereum v1.10.26 github.com/golang-migrate/migrate/v4 v4.15.2 github.com/golang/protobuf v1.5.3 // indirect - github.com/gorilla/rpc v1.2.0 github.com/ipfs/go-ds-sql v0.3.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/libp2p/go-libp2p v0.32.2 @@ -27,9 +26,8 @@ require ( ) require ( - github.com/gorilla/mux v1.8.0 github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 - golang.org/x/text v0.13.0 + golang.org/x/text v0.13.0 // indirect ) require ( diff --git a/go.sum b/go.sum index 7b802cc2..967d4991 100644 --- a/go.sum +++ b/go.sum @@ -772,10 +772,7 @@ github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/ github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/gorilla/rpc v1.2.0 h1:WvvdC2lNeT1SP32zrIce5l0ECBfbAlmrmSBsuc57wfk= -github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=