Working on Agent HTTP interface

This commit is contained in:
Armon Dadgar 2013-12-23 11:38:51 -08:00
parent 8caf0034db
commit 469245a06d
5 changed files with 185 additions and 11 deletions

View File

@ -25,6 +25,8 @@ type Command struct {
ShutdownCh <-chan struct{} ShutdownCh <-chan struct{}
args []string args []string
logFilter *logutils.LevelFilter logFilter *logutils.LevelFilter
agent *Agent
httpServer *HTTPServer
} }
// readConfig is responsible for setup of our configuration using // readConfig is responsible for setup of our configuration using
@ -109,6 +111,28 @@ func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Wri
return logGate, logWriter, logOutput return logGate, logWriter, logOutput
} }
// setupAgent is used to start the agent and various interfaces
func (c *Command) setupAgent(config *Config, logOutput io.Writer) error {
c.Ui.Output("Starting Consul agent...")
agent, err := Create(config, logOutput)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err))
return err
}
c.agent = agent
if config.HTTPAddr != "" {
server, err := NewServer(agent, logOutput, config.HTTPAddr)
if err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting http server: %s", err))
return err
}
c.httpServer = server
}
return nil
}
func (c *Command) Run(args []string) int { func (c *Command) Run(args []string) int {
c.Ui = &cli.PrefixedUi{ c.Ui = &cli.PrefixedUi{
OutputPrefix: "==> ", OutputPrefix: "==> ",
@ -132,17 +156,18 @@ func (c *Command) Run(args []string) int {
} }
// Create the agent // Create the agent
c.Ui.Output("Starting Consul agent...") if err := c.setupAgent(config, logOutput); err != nil {
agent, err := Create(config, logOutput)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err))
return 1 return 1
} }
defer agent.Shutdown() defer c.agent.Shutdown()
if c.httpServer != nil {
defer c.httpServer.Shutdown()
}
c.Ui.Output("Consul agent running!") c.Ui.Output("Consul agent running!")
c.Ui.Info(fmt.Sprintf("Node name: '%s'", config.NodeName)) c.Ui.Info(fmt.Sprintf("Node name: '%s'", config.NodeName))
c.Ui.Info(fmt.Sprintf(" RPC addr: '%s'", config.RPCAddr)) c.Ui.Info(fmt.Sprintf(" RPC addr: '%s'", config.RPCAddr))
c.Ui.Info(fmt.Sprintf("HTTP addr: '%s'", config.HTTPAddr))
c.Ui.Info(fmt.Sprintf("Encrypted: %#v", config.EncryptKey != "")) c.Ui.Info(fmt.Sprintf("Encrypted: %#v", config.EncryptKey != ""))
c.Ui.Info(fmt.Sprintf(" Server: %v", config.Server)) c.Ui.Info(fmt.Sprintf(" Server: %v", config.Server))
@ -152,11 +177,11 @@ func (c *Command) Run(args []string) int {
logGate.Flush() logGate.Flush()
// Wait for exit // Wait for exit
return c.handleSignals(config, agent) return c.handleSignals(config)
} }
// handleSignals blocks until we get an exit-causing signal // handleSignals blocks until we get an exit-causing signal
func (c *Command) handleSignals(config *Config, agent *Agent) int { func (c *Command) handleSignals(config *Config) int {
signalCh := make(chan os.Signal, 4) signalCh := make(chan os.Signal, 4)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
@ -168,7 +193,7 @@ WAIT:
sig = s sig = s
case <-c.ShutdownCh: case <-c.ShutdownCh:
sig = os.Interrupt sig = os.Interrupt
case <-agent.ShutdownCh(): case <-c.agent.ShutdownCh():
// Agent is already shutdown! // Agent is already shutdown!
return 0 return 0
} }
@ -176,7 +201,7 @@ WAIT:
// Check if this is a SIGHUP // Check if this is a SIGHUP
if sig == syscall.SIGHUP { if sig == syscall.SIGHUP {
config = c.handleReload(config, agent) config = c.handleReload(config)
goto WAIT goto WAIT
} }
@ -197,7 +222,7 @@ WAIT:
gracefulCh := make(chan struct{}) gracefulCh := make(chan struct{})
c.Ui.Output("Gracefully shutting down agent...") c.Ui.Output("Gracefully shutting down agent...")
go func() { go func() {
if err := agent.Leave(); err != nil { if err := c.agent.Leave(); err != nil {
c.Ui.Error(fmt.Sprintf("Error: %s", err)) c.Ui.Error(fmt.Sprintf("Error: %s", err))
return return
} }
@ -216,7 +241,7 @@ WAIT:
} }
// handleReload is invoked when we should reload our configs, e.g. SIGHUP // handleReload is invoked when we should reload our configs, e.g. SIGHUP
func (c *Command) handleReload(config *Config, agent *Agent) *Config { func (c *Command) handleReload(config *Config) *Config {
c.Ui.Output("Reloading configuration...") c.Ui.Output("Reloading configuration...")
// TODO // TODO
return config return config

View File

@ -28,6 +28,9 @@ type Config struct {
// Encryption key to use for the Serf communication // Encryption key to use for the Serf communication
EncryptKey string EncryptKey string
// HTTP interface address
HTTPAddr string
// LogLevel is the level of the logs to putout // LogLevel is the level of the logs to putout
LogLevel string LogLevel string
@ -74,6 +77,7 @@ type Config struct {
// DefaultConfig is used to return a sane default configuration // DefaultConfig is used to return a sane default configuration
func DefaultConfig() *Config { func DefaultConfig() *Config {
return &Config{ return &Config{
HTTPAddr: "127.0.0.1:8500",
LogLevel: "INFO", LogLevel: "INFO",
RPCAddr: "127.0.0.1:8400", RPCAddr: "127.0.0.1:8400",
Server: false, Server: false,
@ -127,6 +131,9 @@ func MergeConfig(a, b *Config) *Config {
if b.EncryptKey != "" { if b.EncryptKey != "" {
result.EncryptKey = b.EncryptKey result.EncryptKey = b.EncryptKey
} }
if b.HTTPAddr != "" {
result.HTTPAddr = b.HTTPAddr
}
if b.LogLevel != "" { if b.LogLevel != "" {
result.LogLevel = b.LogLevel result.LogLevel = b.LogLevel
} }

83
command/agent/http.go Normal file
View File

@ -0,0 +1,83 @@
package agent
import (
"bytes"
"encoding/json"
"io"
"log"
"net"
"net/http"
)
// HTTPServer is used to wrap an Agent and expose various API's
// in a RESTful manner
type HTTPServer struct {
agent *Agent
mux *http.ServeMux
listener net.Listener
logger *log.Logger
}
// NewServer starts a new HTTP server to provide an interface to
// the agent.
func NewServer(agent *Agent, logOutput io.Writer, bind string) (*HTTPServer, error) {
// Create the mux
mux := http.NewServeMux()
// Create listener
list, err := net.Listen("tcp", bind)
if err != nil {
return nil, err
}
// Create the server
srv := &HTTPServer{
agent: agent,
mux: mux,
listener: list,
logger: log.New(logOutput, "", log.LstdFlags),
}
srv.registerHandlers()
// Start the server
go http.Serve(list, mux)
return srv, nil
}
// Shutdown is used to shutdown the HTTP server
func (s *HTTPServer) Shutdown() {
s.listener.Close()
}
// registerHandlers is used to attach our handlers to the mux
func (s *HTTPServer) registerHandlers() {
s.mux.HandleFunc("/v1/status/leader", s.wrap(s.StatusLeader))
s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeers))
}
// wrap is used to wrap functions to make them more convenient
func (s *HTTPServer) wrap(handler func(req *http.Request) (interface{}, error)) func(resp http.ResponseWriter, req *http.Request) {
f := func(resp http.ResponseWriter, req *http.Request) {
// Invoke the handler
s.logger.Printf("[DEBUG] Request %v", req)
obj, err := handler(req)
// Check for an error
HAS_ERR:
if err != nil {
s.logger.Printf("[ERR] Request %v, error: %v", req, err)
resp.WriteHeader(500)
resp.Write([]byte(err.Error()))
return
}
// Write out the JSON object
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
if err = enc.Encode(obj); err != nil {
goto HAS_ERR
}
resp.Write(buf.Bytes())
}
return f
}

38
command/agent/http_api.md Normal file
View File

@ -0,0 +1,38 @@
# Agent HTTP API
The Consul agent is capable of running an HTTP server that
exposes various API's in a RESTful manner. These API's can
be used to both query the service catalog, as well as to
register new services.
The URLs are also versioned to allow for changes in the API.
The current URLs supported are:
* /v1/catalog/register : Registers a new service
* /v1/catalog/deregister : Deregisters a service or node
* /v1/catalog/datacenters : Lists known datacenters
* /v1/catalog/nodes : Lists nodes in a given DC
* /v1/catalog/services : Lists services in a given DC
* /v1/catalog/service/<service>/ : Lists the nodes in a given service
* /v1/catalog/node/<node>/ : Lists the services provided by a node
* Health system (future):
* /v1/health/register : Registers a new health check
* /v1/health/deregister : Deregisters a health check
* /v1/health/pass: Pass a health check
* /v1/health/warn: Warn on a health check
* /v1/health/fail: Fail a health check
* /v1/health/node/<node>: Returns the health info of a node
* /v1/health/service/<service>: Returns the health info of a service
* /v1/health/query/<state>: Returns the checks in a given state
* /v1/status/leader : Returns the current Raft leader
* /v1/status/peers : Returns the current Raft peer set
* /v1/agent/services : Returns the services local agent is attempting to register
* /v1/agent/health: Returns the health info from the local agent (future)
* /v1/agent/members : Returns the members as seen by the local serf agent
* /v1/agent/join : Instructs the local agent to join a node
* /v1/agent/members-wan: Returns the consul servers as seen by the wan serf (must be a server)
* /v1/agent/join-wan : Instructs the local agent to join a remote consul server (must be a server)

View File

@ -0,0 +1,21 @@
package agent
import (
"net/http"
)
func (s *HTTPServer) StatusLeader(req *http.Request) (interface{}, error) {
var out string
if err := s.agent.RPC("Status.Leader", struct{}{}, &out); err != nil {
return nil, err
}
return out, nil
}
func (s *HTTPServer) StatusPeers(req *http.Request) (interface{}, error) {
var out []string
if err := s.agent.RPC("Status.Peers", struct{}{}, &out); err != nil {
return nil, err
}
return out, nil
}