From 469245a06da1ff16a78d7780f2221d62f33477de Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 23 Dec 2013 11:38:51 -0800 Subject: [PATCH] Working on Agent HTTP interface --- command/agent/command.go | 47 +++++++++++++----- command/agent/config.go | 7 +++ command/agent/http.go | 83 ++++++++++++++++++++++++++++++++ command/agent/http_api.md | 38 +++++++++++++++ command/agent/status_endpoint.go | 21 ++++++++ 5 files changed, 185 insertions(+), 11 deletions(-) create mode 100644 command/agent/http.go create mode 100644 command/agent/http_api.md create mode 100644 command/agent/status_endpoint.go diff --git a/command/agent/command.go b/command/agent/command.go index c7ae3688f8..46ef8bd6a6 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -25,6 +25,8 @@ type Command struct { ShutdownCh <-chan struct{} args []string logFilter *logutils.LevelFilter + agent *Agent + httpServer *HTTPServer } // readConfig is responsible for setup of our configuration using @@ -109,6 +111,28 @@ func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Wri return logGate, logWriter, logOutput } +// setupAgent is used to start the agent and various interfaces +func (c *Command) setupAgent(config *Config, logOutput io.Writer) error { + c.Ui.Output("Starting Consul agent...") + agent, err := Create(config, logOutput) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err)) + return err + } + c.agent = agent + + if config.HTTPAddr != "" { + server, err := NewServer(agent, logOutput, config.HTTPAddr) + if err != nil { + agent.Shutdown() + c.Ui.Error(fmt.Sprintf("Error starting http server: %s", err)) + return err + } + c.httpServer = server + } + return nil +} + func (c *Command) Run(args []string) int { c.Ui = &cli.PrefixedUi{ OutputPrefix: "==> ", @@ -132,17 +156,18 @@ func (c *Command) Run(args []string) int { } // Create the agent - c.Ui.Output("Starting Consul agent...") - agent, err := Create(config, logOutput) - if err != nil { - c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err)) + if err := c.setupAgent(config, logOutput); err != nil { return 1 } - defer agent.Shutdown() + defer c.agent.Shutdown() + if c.httpServer != nil { + defer c.httpServer.Shutdown() + } c.Ui.Output("Consul agent running!") c.Ui.Info(fmt.Sprintf("Node name: '%s'", config.NodeName)) c.Ui.Info(fmt.Sprintf(" RPC addr: '%s'", config.RPCAddr)) + c.Ui.Info(fmt.Sprintf("HTTP addr: '%s'", config.HTTPAddr)) c.Ui.Info(fmt.Sprintf("Encrypted: %#v", config.EncryptKey != "")) c.Ui.Info(fmt.Sprintf(" Server: %v", config.Server)) @@ -152,11 +177,11 @@ func (c *Command) Run(args []string) int { logGate.Flush() // Wait for exit - return c.handleSignals(config, agent) + return c.handleSignals(config) } // handleSignals blocks until we get an exit-causing signal -func (c *Command) handleSignals(config *Config, agent *Agent) int { +func (c *Command) handleSignals(config *Config) int { signalCh := make(chan os.Signal, 4) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) @@ -168,7 +193,7 @@ WAIT: sig = s case <-c.ShutdownCh: sig = os.Interrupt - case <-agent.ShutdownCh(): + case <-c.agent.ShutdownCh(): // Agent is already shutdown! return 0 } @@ -176,7 +201,7 @@ WAIT: // Check if this is a SIGHUP if sig == syscall.SIGHUP { - config = c.handleReload(config, agent) + config = c.handleReload(config) goto WAIT } @@ -197,7 +222,7 @@ WAIT: gracefulCh := make(chan struct{}) c.Ui.Output("Gracefully shutting down agent...") go func() { - if err := agent.Leave(); err != nil { + if err := c.agent.Leave(); err != nil { c.Ui.Error(fmt.Sprintf("Error: %s", err)) return } @@ -216,7 +241,7 @@ WAIT: } // handleReload is invoked when we should reload our configs, e.g. SIGHUP -func (c *Command) handleReload(config *Config, agent *Agent) *Config { +func (c *Command) handleReload(config *Config) *Config { c.Ui.Output("Reloading configuration...") // TODO return config diff --git a/command/agent/config.go b/command/agent/config.go index 4e044501f3..b41ea2507e 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -28,6 +28,9 @@ type Config struct { // Encryption key to use for the Serf communication EncryptKey string + // HTTP interface address + HTTPAddr string + // LogLevel is the level of the logs to putout LogLevel string @@ -74,6 +77,7 @@ type Config struct { // DefaultConfig is used to return a sane default configuration func DefaultConfig() *Config { return &Config{ + HTTPAddr: "127.0.0.1:8500", LogLevel: "INFO", RPCAddr: "127.0.0.1:8400", Server: false, @@ -127,6 +131,9 @@ func MergeConfig(a, b *Config) *Config { if b.EncryptKey != "" { result.EncryptKey = b.EncryptKey } + if b.HTTPAddr != "" { + result.HTTPAddr = b.HTTPAddr + } if b.LogLevel != "" { result.LogLevel = b.LogLevel } diff --git a/command/agent/http.go b/command/agent/http.go new file mode 100644 index 0000000000..aba0892fab --- /dev/null +++ b/command/agent/http.go @@ -0,0 +1,83 @@ +package agent + +import ( + "bytes" + "encoding/json" + "io" + "log" + "net" + "net/http" +) + +// HTTPServer is used to wrap an Agent and expose various API's +// in a RESTful manner +type HTTPServer struct { + agent *Agent + mux *http.ServeMux + listener net.Listener + logger *log.Logger +} + +// NewServer starts a new HTTP server to provide an interface to +// the agent. +func NewServer(agent *Agent, logOutput io.Writer, bind string) (*HTTPServer, error) { + // Create the mux + mux := http.NewServeMux() + + // Create listener + list, err := net.Listen("tcp", bind) + if err != nil { + return nil, err + } + + // Create the server + srv := &HTTPServer{ + agent: agent, + mux: mux, + listener: list, + logger: log.New(logOutput, "", log.LstdFlags), + } + srv.registerHandlers() + + // Start the server + go http.Serve(list, mux) + return srv, nil +} + +// Shutdown is used to shutdown the HTTP server +func (s *HTTPServer) Shutdown() { + s.listener.Close() +} + +// registerHandlers is used to attach our handlers to the mux +func (s *HTTPServer) registerHandlers() { + s.mux.HandleFunc("/v1/status/leader", s.wrap(s.StatusLeader)) + s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeers)) +} + +// wrap is used to wrap functions to make them more convenient +func (s *HTTPServer) wrap(handler func(req *http.Request) (interface{}, error)) func(resp http.ResponseWriter, req *http.Request) { + f := func(resp http.ResponseWriter, req *http.Request) { + // Invoke the handler + s.logger.Printf("[DEBUG] Request %v", req) + obj, err := handler(req) + + // Check for an error + HAS_ERR: + if err != nil { + s.logger.Printf("[ERR] Request %v, error: %v", req, err) + resp.WriteHeader(500) + resp.Write([]byte(err.Error())) + return + } + + // Write out the JSON object + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + if err = enc.Encode(obj); err != nil { + goto HAS_ERR + } + resp.Write(buf.Bytes()) + } + return f +} diff --git a/command/agent/http_api.md b/command/agent/http_api.md new file mode 100644 index 0000000000..cdb4809e31 --- /dev/null +++ b/command/agent/http_api.md @@ -0,0 +1,38 @@ +# Agent HTTP API + +The Consul agent is capable of running an HTTP server that +exposes various API's in a RESTful manner. These API's can +be used to both query the service catalog, as well as to +register new services. + +The URLs are also versioned to allow for changes in the API. +The current URLs supported are: + +* /v1/catalog/register : Registers a new service +* /v1/catalog/deregister : Deregisters a service or node +* /v1/catalog/datacenters : Lists known datacenters +* /v1/catalog/nodes : Lists nodes in a given DC +* /v1/catalog/services : Lists services in a given DC +* /v1/catalog/service// : Lists the nodes in a given service +* /v1/catalog/node// : Lists the services provided by a node + +* Health system (future): +* /v1/health/register : Registers a new health check +* /v1/health/deregister : Deregisters a health check +* /v1/health/pass: Pass a health check +* /v1/health/warn: Warn on a health check +* /v1/health/fail: Fail a health check +* /v1/health/node/: Returns the health info of a node +* /v1/health/service/: Returns the health info of a service +* /v1/health/query/: Returns the checks in a given state + +* /v1/status/leader : Returns the current Raft leader +* /v1/status/peers : Returns the current Raft peer set + +* /v1/agent/services : Returns the services local agent is attempting to register +* /v1/agent/health: Returns the health info from the local agent (future) +* /v1/agent/members : Returns the members as seen by the local serf agent +* /v1/agent/join : Instructs the local agent to join a node +* /v1/agent/members-wan: Returns the consul servers as seen by the wan serf (must be a server) +* /v1/agent/join-wan : Instructs the local agent to join a remote consul server (must be a server) + diff --git a/command/agent/status_endpoint.go b/command/agent/status_endpoint.go new file mode 100644 index 0000000000..6a98eeb869 --- /dev/null +++ b/command/agent/status_endpoint.go @@ -0,0 +1,21 @@ +package agent + +import ( + "net/http" +) + +func (s *HTTPServer) StatusLeader(req *http.Request) (interface{}, error) { + var out string + if err := s.agent.RPC("Status.Leader", struct{}{}, &out); err != nil { + return nil, err + } + return out, nil +} + +func (s *HTTPServer) StatusPeers(req *http.Request) (interface{}, error) { + var out []string + if err := s.agent.RPC("Status.Peers", struct{}{}, &out); err != nil { + return nil, err + } + return out, nil +}