Add reload/leave http endpoints (#2516)

This commit is contained in:
Kyle Havlovitz 2016-11-30 13:29:42 -05:00 committed by GitHub
parent cbc180d3a4
commit bd69c6d871
11 changed files with 348 additions and 36 deletions

View File

@ -117,6 +117,17 @@ func (a *Agent) Self() (map[string]map[string]interface{}, error) {
return out, nil
}
// Reload triggers a configuration reload for the agent we are connected to.
func (a *Agent) Reload() error {
r := a.c.newRequest("PUT", "/v1/agent/reload")
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// NodeName is used to get the node name of the agent
func (a *Agent) NodeName() (string, error) {
if a.nodeName != "" {
@ -348,6 +359,17 @@ func (a *Agent) Join(addr string, wan bool) error {
return nil
}
// Leave is used to have the agent gracefully leave the cluster and shutdown
func (a *Agent) Leave() error {
r := a.c.newRequest("PUT", "/v1/agent/leave")
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// ForceLeave is used to have the agent eject a failed node
func (a *Agent) ForceLeave(node string) error {
r := a.c.newRequest("PUT", "/v1/agent/force-leave/"+node)

View File

@ -1,9 +1,14 @@
package api
import (
"io/ioutil"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/serf/serf"
)
func TestAgent_Self(t *testing.T) {
@ -24,6 +29,51 @@ func TestAgent_Self(t *testing.T) {
}
}
func TestAgent_Reload(t *testing.T) {
t.Parallel()
// Create our initial empty config file, to be overwritten later
configFile, err := ioutil.TempFile("", "reload")
if err != nil {
t.Fatalf("err: %s", err)
}
if _, err := configFile.Write([]byte("{}")); err != nil {
t.Fatalf("err: %s", err)
}
configFile.Close()
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.Args = []string{"-config-file", configFile.Name()}
})
defer s.Stop()
agent := c.Agent()
// Update the config file with a service definition
config := `{"service":{"name":"redis", "port":1234}}`
err = ioutil.WriteFile(configFile.Name(), []byte(config), 0644)
if err != nil {
t.Fatalf("err: %v", err)
}
if err = agent.Reload(); err != nil {
t.Fatalf("err: %v", err)
}
services, err := agent.Services()
if err != nil {
t.Fatalf("err: %v", err)
}
service, ok := services["redis"]
if !ok {
t.Fatalf("bad: %v", ok)
}
if service.Port != 1234 {
t.Fatalf("bad: %v", service.Port)
}
}
func TestAgent_Members(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
@ -545,6 +595,39 @@ func TestAgent_Join(t *testing.T) {
}
}
func TestAgent_Leave(t *testing.T) {
t.Parallel()
c1, s1 := makeClient(t)
defer s1.Stop()
c2, s2 := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.Server = false
conf.Bootstrap = false
})
defer s2.Stop()
if err := c2.Agent().Join(s1.LANAddr, false); err != nil {
t.Fatalf("err: %v", err)
}
if err := c2.Agent().Leave(); err != nil {
t.Fatalf("err: %v", err)
}
// Make sure the second agent's status is 'Left'
members, err := c1.Agent().Members(false)
if err != nil {
t.Fatalf("err: %v", err)
}
member := members[0]
if member.Name == s1.Config.NodeName {
member = members[1]
}
if member.Status != int(serf.StatusLeft) {
t.Fatalf("bad: %v", *member)
}
}
func TestAgent_ForceLeave(t *testing.T) {
t.Parallel()
c, s := makeClient(t)

View File

@ -109,6 +109,8 @@ type Agent struct {
eventLock sync.RWMutex
eventNotify state.NotifyGroup
reloadCh chan chan error
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
@ -121,7 +123,8 @@ type Agent struct {
// Create is used to create a new Agent. Returns
// the agent or potentially an error.
func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) (*Agent, error) {
func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter,
reloadCh chan chan error) (*Agent, error) {
// Ensure we have a log sink
if logOutput == nil {
logOutput = os.Stderr
@ -184,6 +187,7 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) (*
checkDockers: make(map[types.CheckID]*CheckDocker),
eventCh: make(chan serf.UserEvent, 1024),
eventBuf: make([]*UserEvent, 256),
reloadCh: reloadCh,
shutdownCh: make(chan struct{}),
endpoints: make(map[string]string),
}

View File

@ -39,6 +39,30 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
}, nil
}
func (s *HTTPServer) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
errCh := make(chan error, 0)
// Trigger the reload
select {
case <-s.agent.ShutdownCh():
return nil, fmt.Errorf("Agent was shutdown before reload could be completed")
case s.agent.reloadCh <- errCh:
}
// Wait for the result of the reload, or for the agent to shutdown
select {
case <-s.agent.ShutdownCh():
return nil, fmt.Errorf("Agent was shutdown before reload could be completed")
case err := <-errCh:
return nil, err
}
}
func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
services := s.agent.state.Services()
return services, nil
@ -80,6 +104,18 @@ func (s *HTTPServer) AgentJoin(resp http.ResponseWriter, req *http.Request) (int
}
}
func (s *HTTPServer) AgentLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
if err := s.agent.Leave(); err != nil {
return nil, err
}
return nil, s.agent.Shutdown()
}
func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/force-leave/")
return nil, s.agent.ForceLeave(addr)

View File

@ -19,6 +19,7 @@ import (
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/cli"
)
func TestHTTPAgentServices(t *testing.T) {
@ -119,6 +120,81 @@ func TestHTTPAgentSelf(t *testing.T) {
}
}
func TestHTTPAgentReload(t *testing.T) {
conf := nextConfig()
tmpDir, err := ioutil.TempDir("", "consul")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.RemoveAll(tmpDir)
// Write initial config, to be reloaded later
tmpFile, err := ioutil.TempFile(tmpDir, "config")
if err != nil {
t.Fatalf("err: %s", err)
}
_, err = tmpFile.WriteString(`{"service":{"name":"redis"}}`)
if err != nil {
t.Fatalf("err: %s", err)
}
tmpFile.Close()
doneCh := make(chan struct{})
shutdownCh := make(chan struct{})
defer func() {
close(shutdownCh)
<-doneCh
}()
cmd := &Command{
ShutdownCh: shutdownCh,
Ui: new(cli.MockUi),
}
args := []string{
"-server",
"-data-dir", tmpDir,
"-http-port", fmt.Sprintf("%d", conf.Ports.HTTP),
"-config-file", tmpFile.Name(),
}
go func() {
cmd.Run(args)
close(doneCh)
}()
testutil.WaitForResult(func() (bool, error) {
return len(cmd.httpServers) == 1, nil
}, func(err error) {
t.Fatalf("should have an http server")
})
if _, ok := cmd.agent.state.services["redis"]; !ok {
t.Fatalf("missing redis service")
}
err = ioutil.WriteFile(tmpFile.Name(), []byte(`{"service":{"name":"redis-reloaded"}}`), 0644)
if err != nil {
t.Fatalf("err: %v", err)
}
srv := cmd.httpServers[0]
req, err := http.NewRequest("PUT", "/v1/agent/reload", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
_, err = srv.AgentReload(nil, req)
if err != nil {
t.Fatalf("Err: %v", err)
}
if _, ok := cmd.agent.state.services["redis-reloaded"]; !ok {
t.Fatalf("missing redis-reloaded service")
}
}
func TestHTTPAgentMembers(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
@ -239,6 +315,49 @@ func TestHTTPAgentJoin_WAN(t *testing.T) {
})
}
func TestHTTPAgentLeave(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
dir2, srv2 := makeHTTPServerWithConfig(t, func(c *Config) {
c.Server = false
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer srv2.Shutdown()
// Join first
addr := fmt.Sprintf("127.0.0.1:%d", srv2.agent.config.Ports.SerfLan)
_, err := srv.agent.JoinLAN([]string{addr})
if err != nil {
t.Fatalf("err: %v", err)
}
// Graceful leave now
req, err := http.NewRequest("PUT", "/v1/agent/leave", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
obj, err := srv2.AgentLeave(nil, req)
if err != nil {
t.Fatalf("Err: %v", err)
}
if obj != nil {
t.Fatalf("Err: %v", obj)
}
testutil.WaitForResult(func() (bool, error) {
m := srv.agent.LANMembers()
success := m[1].Status == serf.StatusLeft
return success, errors.New(m[1].Status.String())
}, func(err error) {
t.Fatalf("member status is %v, should be left", err)
})
}
func TestHTTPAgentForceLeave(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
@ -1030,7 +1149,6 @@ func TestHTTPAgent_Monitor(t *testing.T) {
logger := io.MultiWriter(os.Stdout, &expectedLogs, logWriter)
dir, srv := makeHTTPServerWithConfigLog(t, nil, logger, logWriter)
srv.agent.logWriter = logWriter
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()

View File

@ -87,7 +87,7 @@ func makeAgentLog(t *testing.T, conf *Config, l io.Writer, writer *logger.LogWri
}
conf.DataDir = dir
agent, err := Create(conf, l, writer)
agent, err := Create(conf, l, writer, nil)
if err != nil {
os.RemoveAll(dir)
t.Fatalf(fmt.Sprintf("err: %v", err))
@ -113,7 +113,7 @@ func makeAgentKeyring(t *testing.T, conf *Config, key string) (string, *Agent) {
t.Fatalf("err: %s", err)
}
agent, err := Create(conf, nil, nil)
agent, err := Create(conf, nil, nil, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -846,7 +846,7 @@ func TestAgent_PersistService(t *testing.T) {
agent.Shutdown()
// Should load it back during later start
agent2, err := Create(config, nil, nil)
agent2, err := Create(config, nil, nil, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -980,7 +980,7 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) {
}
config.Services = []*ServiceDefinition{svc2}
agent2, err := Create(config, nil, nil)
agent2, err := Create(config, nil, nil, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1073,7 +1073,7 @@ func TestAgent_PersistCheck(t *testing.T) {
agent.Shutdown()
// Should load it back during later start
agent2, err := Create(config, nil, nil)
agent2, err := Create(config, nil, nil, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1166,7 +1166,7 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) {
}
config.Checks = []*CheckDefinition{check2}
agent2, err := Create(config, nil, nil)
agent2, err := Create(config, nil, nil, nil)
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -28,6 +28,7 @@ import (
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-checkpoint"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/logutils"
scada "github.com/hashicorp/scada-client/scada"
"github.com/mitchellh/cli"
@ -50,6 +51,7 @@ type Command struct {
HumanVersion string
Ui cli.Ui
ShutdownCh <-chan struct{}
configReloadCh chan chan error
args []string
logFilter *logutils.LevelFilter
logOutput io.Writer
@ -466,7 +468,7 @@ func (c *Config) discoverEc2Hosts(logger *log.Logger) ([]string, error) {
// setupAgent is used to start the agent and various interfaces
func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) error {
c.Ui.Output("Starting Consul agent...")
agent, err := Create(config, logOutput, logWriter)
agent, err := Create(config, logOutput, logWriter, c.configReloadCh)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err))
return err
@ -747,6 +749,9 @@ func (c *Command) Run(args []string) int {
c.logFilter = logFilter
c.logOutput = logOutput
// Setup the channel for triggering config reloads
c.configReloadCh = make(chan chan error)
/* Setup telemetry
Aggregate on 10 second intervals for 1 minute. Expose the
metrics over stderr when there is a SIGUSR1 received.
@ -947,11 +952,15 @@ func (c *Command) handleSignals(config *Config, retryJoin <-chan struct{}, retry
// Wait for a signal
WAIT:
var sig os.Signal
var reloadErrCh chan error
select {
case s := <-signalCh:
sig = s
case <-c.rpcServer.ReloadCh():
sig = syscall.SIGHUP
case ch := <-c.configReloadCh:
sig = syscall.SIGHUP
reloadErrCh = ch
case <-c.ShutdownCh:
sig = os.Interrupt
case <-retryJoin:
@ -966,9 +975,17 @@ WAIT:
// Check if this is a SIGHUP
if sig == syscall.SIGHUP {
if conf := c.handleReload(config); conf != nil {
conf, err := c.handleReload(config)
if conf != nil {
config = conf
}
if err != nil {
c.Ui.Error(err.Error())
}
// Send result back if reload was called via HTTP
if reloadErrCh != nil {
reloadErrCh <- err
}
goto WAIT
}
@ -1008,12 +1025,13 @@ WAIT:
}
// handleReload is invoked when we should reload our configs, e.g. SIGHUP
func (c *Command) handleReload(config *Config) *Config {
func (c *Command) handleReload(config *Config) (*Config, error) {
c.Ui.Output("Reloading configuration...")
var errs error
newConf := c.readConfig()
if newConf == nil {
c.Ui.Error(fmt.Sprintf("Failed to reload configs"))
return config
errs = multierror.Append(errs, fmt.Errorf("Failed to reload configs"))
return config, errs
}
// Change the log level
@ -1021,7 +1039,7 @@ func (c *Command) handleReload(config *Config) *Config {
if logger.ValidateLevelFilter(minLevel, c.logFilter) {
c.logFilter.SetMinLevel(minLevel)
} else {
c.Ui.Error(fmt.Sprintf(
errs = multierror.Append(fmt.Errorf(
"Invalid log level: %s. Valid log levels are: %v",
minLevel, c.logFilter.Levels))
@ -1040,28 +1058,28 @@ func (c *Command) handleReload(config *Config) *Config {
// First unload all checks and services. This lets us begin the reload
// with a clean slate.
if err := c.agent.unloadServices(); err != nil {
c.Ui.Error(fmt.Sprintf("Failed unloading services: %s", err))
return nil
errs = multierror.Append(errs, fmt.Errorf("Failed unloading services: %s", err))
return nil, errs
}
if err := c.agent.unloadChecks(); err != nil {
c.Ui.Error(fmt.Sprintf("Failed unloading checks: %s", err))
return nil
errs = multierror.Append(errs, fmt.Errorf("Failed unloading checks: %s", err))
return nil, errs
}
// Reload services and check definitions.
if err := c.agent.loadServices(newConf); err != nil {
c.Ui.Error(fmt.Sprintf("Failed reloading services: %s", err))
return nil
errs = multierror.Append(errs, fmt.Errorf("Failed reloading services: %s", err))
return nil, errs
}
if err := c.agent.loadChecks(newConf); err != nil {
c.Ui.Error(fmt.Sprintf("Failed reloading checks: %s", err))
return nil
errs = multierror.Append(errs, fmt.Errorf("Failed reloading checks: %s", err))
return nil, errs
}
// Get the new client listener addr
httpAddr, err := newConf.ClientListener(config.Addresses.HTTP, config.Ports.HTTP)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err))
errs = multierror.Append(errs, fmt.Errorf("Failed to determine HTTP address: %v", err))
}
// Deregister the old watches
@ -1075,7 +1093,7 @@ func (c *Command) handleReload(config *Config) *Config {
wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"])
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr.String()); err != nil {
c.Ui.Error(fmt.Sprintf("Error running watch: %v", err))
errs = multierror.Append(errs, fmt.Errorf("Error running watch: %v", err))
}
}(wp)
}
@ -1085,12 +1103,12 @@ func (c *Command) handleReload(config *Config) *Config {
newConf.AtlasToken != config.AtlasToken ||
newConf.AtlasEndpoint != config.AtlasEndpoint {
if err := c.setupScadaConn(newConf); err != nil {
c.Ui.Error(fmt.Sprintf("Failed reloading SCADA client: %s", err))
return nil
errs = multierror.Append(errs, fmt.Errorf("Failed reloading SCADA client: %s", err))
return nil, errs
}
}
return newConf
return newConf, errs
}
// startScadaClient is used to start a new SCADA provider and listener,

View File

@ -251,11 +251,13 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
}
s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf))
s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance))
s.handleFuncMetrics("/v1/agent/reload", s.wrap(s.AgentReload))
s.handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor))
s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices))
s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks))
s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers))
s.handleFuncMetrics("/v1/agent/join/", s.wrap(s.AgentJoin))
s.handleFuncMetrics("/v1/agent/leave", s.wrap(s.AgentLeave))
s.handleFuncMetrics("/v1/agent/force-leave/", s.wrap(s.AgentForceLeave))
s.handleFuncMetrics("/v1/agent/check/register", s.wrap(s.AgentRegisterCheck))
s.handleFuncMetrics("/v1/agent/check/deregister/", s.wrap(s.AgentDeregisterCheck))

View File

@ -74,7 +74,7 @@ func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *agentWrapper {
}
conf.DataDir = dir
a, err := agent.Create(conf, lw, nil)
a, err := agent.Create(conf, lw, nil, nil)
if err != nil {
os.RemoveAll(dir)
t.Fatalf(fmt.Sprintf("err: %v", err))

View File

@ -72,6 +72,7 @@ type TestServerConfig struct {
ACLDefaultPolicy string `json:"acl_default_policy,omitempty"`
Encrypt string `json:"encrypt,omitempty"`
Stdout, Stderr io.Writer `json:"-"`
Args []string `json:"-"`
}
// ServerConfigCallback is a function interface which can be
@ -201,7 +202,9 @@ func NewTestServerConfig(t TestingT, cb ServerConfigCallback) *TestServer {
}
// Start the server
cmd := exec.Command("consul", "agent", "-config-file", configFile.Name())
args := []string{"agent", "-config-file", configFile.Name()}
args = append(args, consulConfig.Args...)
cmd := exec.Command("consul", args...)
cmd.Stdout = stdout
cmd.Stderr = stderr
if err := cmd.Start(); err != nil {

View File

@ -20,9 +20,11 @@ The following endpoints are supported:
* [`/v1/agent/services`](#agent_services) : Returns the services the local agent is managing
* [`/v1/agent/members`](#agent_members) : Returns the members as seen by the local serf agent
* [`/v1/agent/self`](#agent_self) : Returns the local node configuration
* [`/v1/agent/reload`](#agent_reload) : Causes the local agent to reload its configuration
* [`/v1/agent/maintenance`](#agent_maintenance) : Manages node maintenance mode
* [`/v1/agent/monitor`](#agent_monitor) : Streams logs from the local agent
* [`/v1/agent/join/<address>`](#agent_join) : Triggers the local agent to join a node
* [`/v1/agent/leave`](#agent_leave): Triggers the local agent to gracefully shutdown and leave the cluster
* [`/v1/agent/force-leave/<node>`](#agent_force_leave): Forces removal of a node
* [`/v1/agent/check/register`](#agent_check_register) : Registers a new local check
* [`/v1/agent/check/deregister/<checkID>`](#agent_check_deregister) : Deregisters a local check
@ -196,6 +198,18 @@ It returns a JSON body like this:
}
```
### <a name="agent_reload"></a> /v1/agent/reload
Added in Consul 0.7.2, this endpoint is hit with a `PUT` and is used to instruct
the agent to reload its configuration. Any errors encountered during this process
will be returned.
Not all configuration options are reloadable. See the
[Reloadable Configuration](/docs/agent/options.html#reloadable-configuration)
section on the agent options page for details on which options are supported.
The return code is 200 on success.
### <a name="agent_maintenance"></a> /v1/agent/maintenance
The node maintenance endpoint can place the agent into "maintenance mode".
@ -231,14 +245,26 @@ query parameter causes the agent to attempt to join using the WAN pool.
The return code is 200 on success.
### <a name="agent_leave"></a> /v1/agent/leave
Added in Consul 0.7.2, this endpoint is hit with a `PUT` and is used to trigger a
graceful leave and shutdown of the agent. It is used to ensure other nodes see the
agent as "left" instead of "failed". Nodes that leave will not attempt to re-join
the cluster on restarting with a snapshot.
For nodes in server mode, the node is removed from the Raft peer set in a graceful
manner. This is critical, as in certain situations a non-graceful leave can affect
cluster availability.
The return code is 200 on success.
### <a name="agent_force_leave"></a> /v1/agent/force-leave/\<node\>
This endpoint is hit with a `GET` and is used to instruct the agent to
force a node into the `left` state. If a node fails unexpectedly, then
it will be in a `failed` state. Once in the `failed` state, Consul will
attempt to reconnect, and the services and checks belonging to that node
will not be cleaned up. Forcing a node into the `left` state allows its
old entries to be removed.
This endpoint is hit with a `PUT` and is used to instruct the agent to force a node
into the `left` state. If a node fails unexpectedly, then it will be in a `failed`
state. Once in the `failed` state, Consul will attempt to reconnect, and the
services and checks belonging to that node will not be cleaned up. Forcing a node
into the `left` state allows its old entries to be removed.
The endpoint always returns 200.