Fixes watch tracking during reloads and fixes address issue. (#3189)

This patch fixes watch registration through the config file and a broken log line when the watch registration fails. It also plumbs all the watch loading through a common function and tweaks the
unit test to create the watch before the reload.
This commit is contained in:
James Phillips 2017-06-24 12:52:41 -07:00 committed by GitHub
parent 75cf566311
commit 6977e40077
5 changed files with 63 additions and 52 deletions

View File

@ -29,7 +29,6 @@ import (
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/consul/watch"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-sockaddr/template"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/raft"
@ -174,6 +173,10 @@ type Agent struct {
// wgServers is the wait group for all HTTP and DNS servers
wgServers sync.WaitGroup
// watchPlans tracks all the currently-running watch plans for the
// agent.
watchPlans []*watch.Plan
}
func New(c *Config) (*Agent, error) {
@ -317,7 +320,7 @@ func (a *Agent) Start() error {
}
// register watches
if err := a.registerWatches(); err != nil {
if err := a.reloadWatches(a.config); err != nil {
return err
}
@ -496,11 +499,11 @@ func (a *Agent) serveHTTP(l net.Listener, srv *HTTPServer) error {
}
}
func (a *Agent) registerWatches() error {
if len(a.config.WatchPlans) == 0 {
return nil
}
addrs, err := a.config.HTTPAddrs()
// reloadWatches stops any existing watch plans and attempts to load the given
// set of watches.
func (a *Agent) reloadWatches(cfg *Config) error {
// Watches use the API to talk to this agent, so that must be enabled.
addrs, err := cfg.HTTPAddrs()
if err != nil {
return err
}
@ -508,7 +511,15 @@ func (a *Agent) registerWatches() error {
return fmt.Errorf("watch plans require an HTTP or HTTPS endpoint")
}
for _, wp := range a.config.WatchPlans {
// Stop the current watches.
for _, wp := range a.watchPlans {
wp.Stop()
}
a.watchPlans = nil
// Fire off a goroutine for each new watch plan.
for _, wp := range cfg.WatchPlans {
a.watchPlans = append(a.watchPlans, wp)
go func(wp *watch.Plan) {
wp.Handler = makeWatchHandler(a.LogOutput, wp.Exempt["handler"])
wp.LogOutput = a.LogOutput
@ -517,7 +528,7 @@ func (a *Agent) registerWatches() error {
addr = "unix://" + addr
}
if err := wp.Run(addr); err != nil {
a.logger.Println("[ERR] Failed to run watch: %v", err)
a.logger.Printf("[ERR] Failed to run watch: %v", err)
}
}(wp)
}
@ -2302,9 +2313,7 @@ func (a *Agent) DisableNodeMaintenance() {
a.logger.Printf("[INFO] agent: Node left maintenance mode")
}
func (a *Agent) ReloadConfig(newCfg *Config) (bool, error) {
var errs error
func (a *Agent) ReloadConfig(newCfg *Config) error {
// Bulk update the services and checks
a.PauseSync()
defer a.ResumeSync()
@ -2316,50 +2325,28 @@ func (a *Agent) ReloadConfig(newCfg *Config) (bool, error) {
// First unload all checks, services, and metadata. This lets us begin the reload
// with a clean slate.
if err := a.unloadServices(); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed unloading services: %s", err))
return false, errs
return fmt.Errorf("Failed unloading services: %s", err)
}
if err := a.unloadChecks(); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed unloading checks: %s", err))
return false, errs
return fmt.Errorf("Failed unloading checks: %s", err)
}
a.unloadMetadata()
// Reload service/check definitions and metadata.
if err := a.loadServices(newCfg); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed reloading services: %s", err))
return false, errs
return fmt.Errorf("Failed reloading services: %s", err)
}
if err := a.loadChecks(newCfg); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed reloading checks: %s", err))
return false, errs
return fmt.Errorf("Failed reloading checks: %s", err)
}
if err := a.loadMetadata(newCfg); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed reloading metadata: %s", err))
return false, errs
return fmt.Errorf("Failed reloading metadata: %s", err)
}
// Get the new client listener addr
httpAddr, err := newCfg.ClientListener(a.config.Addresses.HTTP, a.config.Ports.HTTP)
if err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to determine HTTP address: %v", err))
// Reload the watches.
if err := a.reloadWatches(newCfg); err != nil {
return fmt.Errorf("Failed reloading watches: %v", err)
}
// Deregister the old watches
for _, wp := range a.config.WatchPlans {
wp.Stop()
}
// Register the new watches
for _, wp := range newCfg.WatchPlans {
go func(wp *watch.Plan) {
wp.Handler = makeWatchHandler(a.LogOutput, wp.Exempt["handler"])
wp.LogOutput = a.LogOutput
if err := wp.Run(httpAddr.String()); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Error running watch: %v", err))
}
}(wp)
}
return true, errs
return nil
}

View File

@ -18,6 +18,7 @@ import (
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/serf/serf"
)
@ -237,6 +238,19 @@ func TestAgent_Reload(t *testing.T) {
cfg.Services = []*structs.ServiceDefinition{
&structs.ServiceDefinition{Name: "redis"},
}
params := map[string]interface{}{
"datacenter": "dc1",
"type": "key",
"key": "test",
"handler": "true",
}
wp, err := watch.ParseExempt(params, []string{"handler"})
if err != nil {
t.Fatalf("Expected watch.Parse to succeed %v", err)
}
cfg.WatchPlans = append(cfg.WatchPlans, wp)
a := NewTestAgent(t.Name(), cfg)
defer a.Shutdown()
@ -250,16 +264,19 @@ func TestAgent_Reload(t *testing.T) {
&structs.ServiceDefinition{Name: "redis-reloaded"},
}
ok, err := a.ReloadConfig(cfg2)
if err != nil {
if err := a.ReloadConfig(cfg2); err != nil {
t.Fatalf("got error %v want nil", err)
}
if !ok {
t.Fatalf("got ok %v want true")
}
if _, ok := a.state.services["redis-reloaded"]; !ok {
t.Fatalf("missing redis-reloaded service")
}
// Verify that previous config's watch plans were stopped.
for _, wp := range cfg.WatchPlans {
if !wp.IsStopped() {
t.Fatalf("Reloading configs should stop watch plans of the previous configuration")
}
}
}
func TestAgent_Reload_ACLDeny(t *testing.T) {

View File

@ -803,7 +803,7 @@ type ProtoAddr struct {
}
func (p ProtoAddr) String() string {
return p.Proto + "+" + p.Net + "://" + p.Addr
return p.Proto + "://" + p.Addr
}
func (c *Config) DNSAddrs() ([]ProtoAddr, error) {

View File

@ -845,10 +845,11 @@ func (cmd *AgentCommand) handleReload(agent *agent.Agent, cfg *agent.Config) (*a
newCfg.LogLevel = cfg.LogLevel
}
ok, errs := agent.ReloadConfig(newCfg)
if ok {
return newCfg, errs
if err := agent.ReloadConfig(newCfg); err != nil {
errs = multierror.Append(fmt.Errorf(
"Failed to reload configs: %v", err))
}
return cfg, errs
}

View File

@ -118,3 +118,9 @@ func (p *Plan) shouldStop() bool {
return false
}
}
func (p *Plan) IsStopped() bool {
p.stopLock.Lock()
defer p.stopLock.Unlock()
return p.stop
}