From 06a86f45b0c898915edc4073959b9c502dcb84b3 Mon Sep 17 00:00:00 2001 From: Anthony Laibe Date: Tue, 2 Nov 2021 10:54:34 +0100 Subject: [PATCH] feat: Add first endpoint for rpc server --- go.mod | 1 + go.sum | 2 + waku/node.go | 12 ++++++ waku/options.go | 8 ++++ waku/v2/rpc/codec.go | 82 +++++++++++++++++++++++++++++++++++++++++ waku/v2/rpc/debug.go | 23 ++++++++++++ waku/v2/rpc/waku_rpc.go | 50 +++++++++++++++++++++++++ 7 files changed, 178 insertions(+) create mode 100644 waku/v2/rpc/codec.go create mode 100644 waku/v2/rpc/debug.go create mode 100644 waku/v2/rpc/waku_rpc.go diff --git a/go.mod b/go.mod index e110b3a5..3cdef61d 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/ethereum/go-ethereum v1.10.4 github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.2 + github.com/gorilla/rpc v1.2.0 github.com/ipfs/go-ds-sql v0.2.0 github.com/ipfs/go-log v1.0.5 github.com/jessevdk/go-flags v1.4.0 diff --git a/go.sum b/go.sum index f9757092..1f9bfa06 100644 --- a/go.sum +++ b/go.sum @@ -351,6 +351,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/rpc v1.2.0 h1:WvvdC2lNeT1SP32zrIce5l0ECBfbAlmrmSBsuc57wfk= +github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= diff --git a/waku/node.go b/waku/node.go index d73f4212..03e7ae41 100644 --- a/waku/node.go +++ b/waku/node.go @@ -39,6 +39,7 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/lightpush" "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/store" + "github.com/status-im/go-waku/waku/v2/rpc" ) var log = logging.Logger("wakunode") @@ -225,6 +226,12 @@ func Execute(options Options) { } } + var rpcServer *rpc.WakuRpc + if options.RPCServer.Enable { + rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port) + go rpcServer.Start() + } + // Wait for a SIGINT or SIGTERM signal ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) @@ -234,6 +241,11 @@ func Execute(options Options) { // shut the node down wakuNode.Stop() + if options.RPCServer.Enable { + err := rpcServer.Stop(ctx) + failOnErr(err, "RPCClose") + } + if options.Metrics.Enable { err = metricsServer.Stop(ctx) failOnErr(err, "MetricsClose") diff --git a/waku/options.go b/waku/options.go index 04f92a49..8abb1609 100644 --- a/waku/options.go +++ b/waku/options.go @@ -58,6 +58,13 @@ type MetricsOptions struct { Port int `long:"metrics-port" description:"Listening HTTP port of the metrics server" default:"8008"` } +// RPCServerOptions are settings used to start a json rpc server +type RPCServerOptions struct { + Enable bool `long:"rpc" description:"Enable the rpc server"` + Port int `long:"rpc-port" description:"Listening port of the rpc server" default:"8009"` + Address string `long:"rpc-address" description:"Listening address of the rpc server" default:"127.0.0.1"` +} + // Options contains all the available features and settings that can be // configured via flags when executing go-waku as a service. type Options struct { @@ -86,4 +93,5 @@ type Options struct { RendezvousServer RendezvousServerOptions `group:"Rendezvous Server Options"` DNSDiscovery DNSDiscoveryOptions `group:"DNS Discovery Options"` Metrics MetricsOptions `group:"Metrics Options"` + RPCServer RPCServerOptions `group:"RPC Server Options"` } diff --git a/waku/v2/rpc/codec.go b/waku/v2/rpc/codec.go new file mode 100644 index 00000000..87d9d3a3 --- /dev/null +++ b/waku/v2/rpc/codec.go @@ -0,0 +1,82 @@ +package rpc + +import ( + "net/http" + "strings" + + "github.com/gorilla/rpc/v2" + "github.com/gorilla/rpc/v2/json" +) + +// SnakeCaseCodec creates a CodecRequest to process each request. +type SnakeCaseCodec struct { +} + +// NewSnakeCaseCodec returns a new SnakeCaseCodec. +func NewSnakeCaseCodec() *SnakeCaseCodec { + return &SnakeCaseCodec{} +} + +// NewRequest returns a new CodecRequest of type SnakeCaseCodecRequest. +func (c *SnakeCaseCodec) NewRequest(r *http.Request) rpc.CodecRequest { + outerCR := &SnakeCaseCodecRequest{} // Our custom CR + jsonC := json.NewCodec() // json Codec to create json CR + innerCR := jsonC.NewRequest(r) // create the json CR, sort of. + + // NOTE - innerCR is of the interface type rpc.CodecRequest. + // Because innerCR is of the rpc.CR interface type, we need a + // type assertion in order to assign it to our struct field's type. + // We defined the source of the interface implementation here, so + // we can be confident that innerCR will be of the correct underlying type + outerCR.CodecRequest = innerCR.(*json.CodecRequest) + return outerCR +} + +// SnakeCaseCodecRequest decodes and encodes a single request. SnakeCaseCodecRequest +// implements gorilla/rpc.CodecRequest interface primarily by embedding +// the CodecRequest from gorilla/rpc/json. By selectively adding +// CodecRequest methods to SnakeCaseCodecRequest, we can modify that behaviour +// while maintaining all the other remaining CodecRequest methods from +// gorilla's rpc/json implementation +type SnakeCaseCodecRequest struct { + *json.CodecRequest +} + +// Method returns the decoded method as a string of the form "Service.Method" +// after checking for, and correcting a lowercase method name +// By being of lower depth in the struct , Method will replace the implementation +// of Method() on the embedded CodecRequest. Because the request data is part +// of the embedded json.CodecRequest, and unexported, we have to get the +// requested method name via the embedded CR's own method Method(). +// Essentially, this just intercepts the return value from the embedded +// gorilla/rpc/json.CodecRequest.Method(), checks/modifies it, and passes it +// on to the calling rpc server. +func (c *SnakeCaseCodecRequest) Method() (string, error) { + m, err := c.CodecRequest.Method() + return snakeCaseToCamelCase(m), err +} + +func snakeCaseToCamelCase(inputUnderScoreStr string) (camelCase string) { + isToUpper := false + for k, v := range inputUnderScoreStr { + if k == 0 { + camelCase = strings.ToUpper(string(inputUnderScoreStr[0])) + } else { + if isToUpper { + camelCase += strings.ToUpper(string(v)) + isToUpper = false + } else { + if v == '_' { + isToUpper = true + } else if v == '.' { + isToUpper = true + camelCase += string(v) + } else { + camelCase += string(v) + } + } + } + } + return + +} diff --git a/waku/v2/rpc/debug.go b/waku/v2/rpc/debug.go new file mode 100644 index 00000000..4b9beca5 --- /dev/null +++ b/waku/v2/rpc/debug.go @@ -0,0 +1,23 @@ +package rpc + +import ( + "net/http" + + "github.com/status-im/go-waku/waku/v2/node" +) + +type DebugService struct { + node *node.WakuNode +} + +type InfoArgs struct { +} + +type InfoReply struct { + Version string `json:"version,omitempty"` +} + +func (d *DebugService) GetV1Info(r *http.Request, args *InfoArgs, reply *InfoReply) error { + reply.Version = "2.0" + return nil +} diff --git a/waku/v2/rpc/waku_rpc.go b/waku/v2/rpc/waku_rpc.go new file mode 100644 index 00000000..318c677b --- /dev/null +++ b/waku/v2/rpc/waku_rpc.go @@ -0,0 +1,50 @@ +package rpc + +import ( + "context" + "fmt" + "net/http" + + "github.com/gorilla/rpc/v2" + logging "github.com/ipfs/go-log" + "github.com/status-im/go-waku/waku/v2/node" +) + +var log = logging.Logger("wakurpc") + +type WakuRpc struct { + node *node.WakuNode + server *http.Server +} + +func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc { + s := rpc.NewServer() + s.RegisterCodec(NewSnakeCaseCodec(), "application/json") + s.RegisterCodec(NewSnakeCaseCodec(), "application/json;charset=UTF-8") + + err := s.RegisterService(&DebugService{node}, "WakuV2Debug") + if err != nil { + log.Error(err) + } + + mux := http.NewServeMux() + mux.HandleFunc("/jsonrpc", s.ServeHTTP) + + listenAddr := fmt.Sprintf("%s:%d", address, port) + + server := &http.Server{ + Addr: listenAddr, + Handler: mux, + } + + return &WakuRpc{node: node, server: server} +} + +func (r *WakuRpc) Start() { + log.Info("Rpc server started at ", r.server.Addr) + log.Info("server stopped ", r.server.ListenAndServe()) +} + +func (r *WakuRpc) Stop(ctx context.Context) error { + return r.server.Shutdown(ctx) +}