diff --git a/agent/agent.go b/agent/agent.go index b2a51dceb2..58e374e72a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -178,8 +178,8 @@ type Agent struct { // checkAliases maps the check ID to an associated Alias checks checkAliases map[types.CheckID]*checks.CheckAlias - // checkLock protects updates to the check* maps - checkLock sync.Mutex + // stateLock protects the agent state + stateLock sync.Mutex // dockerClient is the client for performing docker health checks. dockerClient *checks.DockerClient @@ -236,10 +236,6 @@ type Agent struct { // proxyManager is the proxy process manager for managed Connect proxies. proxyManager *proxyprocess.Manager - // proxyLock protects _managed_ proxy information in the local state from - // concurrent modification. It is not needed to work with proxyConfig state. - proxyLock sync.Mutex - // proxyConfig is the manager for proxy service (Kind = connect-proxy) // configuration state. This ensures all state needed by a proxy registration // is maintained in cache and handles pushing updates to that state into XDS @@ -342,6 +338,9 @@ func (a *Agent) setupProxyManager() error { } func (a *Agent) Start() error { + a.stateLock.Lock() + defer a.stateLock.Unlock() + c := a.config logOutput := a.LogOutput @@ -1439,8 +1438,8 @@ func (a *Agent) ShutdownAgent() error { a.logger.Println("[INFO] agent: Requesting shutdown") // Stop all the checks - a.checkLock.Lock() - defer a.checkLock.Unlock() + a.stateLock.Lock() + defer a.stateLock.Unlock() for _, chk := range a.checkMonitors { chk.Stop() } @@ -1754,17 +1753,21 @@ func (a *Agent) reapServicesInternal() { // See if there's a timeout. // todo(fs): this looks fishy... why is there another data structure in the agent with its own lock? - a.checkLock.Lock() + a.stateLock.Lock() timeout := a.checkReapAfter[checkID] - a.checkLock.Unlock() + a.stateLock.Unlock() // Reap, if necessary. We keep track of which service // this is so that we won't try to remove it again. if timeout > 0 && cs.CriticalFor() > timeout { reaped[serviceID] = true - a.RemoveService(serviceID, true) - a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service", - checkID, serviceID) + if err := a.RemoveService(serviceID, true); err != nil { + a.logger.Printf("[ERR] agent: unable to deregister service %q after check %q has been critical for too long: %s", + serviceID, checkID, err) + } else { + a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service", + checkID, serviceID) + } } } } @@ -1886,6 +1889,12 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error { // This entry is persistent and the agent will make a best effort to // ensure it is registered func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { + a.stateLock.Lock() + defer a.stateLock.Unlock() + return a.addServiceLocked(service, chkTypes, persist, token, source) +} + +func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { if service.Service == "" { return fmt.Errorf("Service name missing") } @@ -1932,15 +1941,12 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che a.PauseSync() defer a.ResumeSync() - // Add the service - a.State.AddService(service, token) + // Take a snapshot of the current state of checks (if any), and + // restore them before resuming anti-entropy. + snap := a.snapshotCheckState() + defer a.restoreCheckState(snap) - // Persist the service to a file - if persist && a.config.DataDir != "" { - if err := a.persistService(service); err != nil { - return err - } - } + var checks []*structs.HealthCheck // Create an associated health check for i, chkType := range chkTypes { @@ -1968,7 +1974,51 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che if chkType.Status != "" { check.Status = chkType.Status } - if err := a.AddCheck(check, chkType, persist, token, source); err != nil { + + checks = append(checks, check) + } + + // cleanup, store the ids of services and checks that weren't previously + // registered so we clean them up if somthing fails halfway through the + // process. + var cleanupServices []string + var cleanupChecks []types.CheckID + + if s := a.State.Service(service.ID); s == nil { + cleanupServices = append(cleanupServices, service.ID) + } + + for _, check := range checks { + if c := a.State.Check(check.CheckID); c == nil { + cleanupChecks = append(cleanupChecks, check.CheckID) + } + } + + err := a.State.AddServiceWithChecks(service, checks, token) + if err != nil { + a.cleanupRegistration(cleanupServices, cleanupChecks) + return err + } + + for i := range checks { + if err := a.addCheck(checks[i], chkTypes[i], service, persist, token, source); err != nil { + a.cleanupRegistration(cleanupServices, cleanupChecks) + return err + } + + if persist && a.config.DataDir != "" { + if err := a.persistCheck(checks[i], chkTypes[i]); err != nil { + a.cleanupRegistration(cleanupServices, cleanupChecks) + return err + + } + } + } + + // Persist the service to a file + if persist && a.config.DataDir != "" { + if err := a.persistService(service); err != nil { + a.cleanupRegistration(cleanupServices, cleanupChecks) return err } } @@ -1976,16 +2026,53 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che return nil } +// cleanupRegistration is called on registration error to ensure no there are no +// leftovers after a partial failure +func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.CheckID) { + for _, s := range serviceIDs { + if err := a.State.RemoveService(s); err != nil { + a.logger.Printf("[ERR] consul: service registration: cleanup: failed to remove service %s: %s", s, err) + } + if err := a.purgeService(s); err != nil { + a.logger.Printf("[ERR] consul: service registration: cleanup: failed to purge service %s file: %s", s, err) + } + } + + for _, c := range checksIDs { + a.cancelCheckMonitors(c) + if err := a.State.RemoveCheck(c); err != nil { + a.logger.Printf("[ERR] consul: service registration: cleanup: failed to remove check %s: %s", c, err) + } + if err := a.purgeCheck(c); err != nil { + a.logger.Printf("[ERR] consul: service registration: cleanup: failed to purge check %s file: %s", c, err) + } + } +} + // RemoveService is used to remove a service entry. // The agent will make a best effort to ensure it is deregistered func (a *Agent) RemoveService(serviceID string, persist bool) error { + a.stateLock.Lock() + defer a.stateLock.Unlock() + return a.removeServiceLocked(serviceID, persist) +} + +// removeServiceLocked is used to remove a service entry. +// The agent will make a best effort to ensure it is deregistered +func (a *Agent) removeServiceLocked(serviceID string, persist bool) error { // Validate ServiceID if serviceID == "" { return fmt.Errorf("ServiceID missing") } + checks := a.State.Checks() + var checkIDs []types.CheckID + for id := range checks { + checkIDs = append(checkIDs, id) + } + // Remove service immediately - if err := a.State.RemoveService(serviceID); err != nil { + if err := a.State.RemoveServiceWithChecks(serviceID, checkIDs); err != nil { a.logger.Printf("[WARN] agent: Failed to deregister service %q: %s", serviceID, err) return nil } @@ -1998,11 +2085,11 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error { } // Deregister any associated health checks - for checkID, check := range a.State.Checks() { + for checkID, check := range checks { if check.ServiceID != serviceID { continue } - if err := a.RemoveCheck(checkID, persist); err != nil { + if err := a.removeCheckLocked(checkID, persist); err != nil { return err } } @@ -2010,7 +2097,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error { // Remove the associated managed proxy if it exists for proxyID, p := range a.State.Proxies() { if p.Proxy.TargetServiceID == serviceID { - if err := a.RemoveProxy(proxyID, true); err != nil { + if err := a.removeProxyLocked(proxyID, true); err != nil { return err } } @@ -2024,7 +2111,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error { // this from a sidecar. if sidecar.LocallyRegisteredAsSidecar { // Remove it! - err := a.RemoveService(a.sidecarServiceID(serviceID), persist) + err := a.removeServiceLocked(a.sidecarServiceID(serviceID), persist) if err != nil { return err } @@ -2039,6 +2126,50 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error { // ensure it is registered. The Check may include a CheckType which // is used to automatically update the check status func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, persist bool, token string, source configSource) error { + a.stateLock.Lock() + defer a.stateLock.Unlock() + return a.addCheckLocked(check, chkType, persist, token, source) +} + +func (a *Agent) addCheckLocked(check *structs.HealthCheck, chkType *structs.CheckType, persist bool, token string, source configSource) error { + var service *structs.NodeService + + if check.ServiceID != "" { + service = a.State.Service(check.ServiceID) + if service == nil { + return fmt.Errorf("ServiceID %q does not exist", check.ServiceID) + } + } + + // snapshot the current state of the health check to avoid potential flapping + existing := a.State.Check(check.CheckID) + defer func() { + if existing != nil { + a.State.UpdateCheck(check.CheckID, existing.Status, existing.Output) + } + }() + + err := a.addCheck(check, chkType, service, persist, token, source) + if err != nil { + a.State.RemoveCheck(check.CheckID) + return err + } + + // Add to the local state for anti-entropy + err = a.State.AddCheck(check, token) + if err != nil { + return err + } + + // Persist the check + if persist && a.config.DataDir != "" { + return a.persistCheck(check, chkType) + } + + return nil +} + +func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType, service *structs.NodeService, persist bool, token string, source configSource) error { if check.CheckID == "" { return fmt.Errorf("CheckID missing") } @@ -2060,25 +2191,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } if check.ServiceID != "" { - s := a.State.Service(check.ServiceID) - if s == nil { - return fmt.Errorf("ServiceID %q does not exist", check.ServiceID) - } - check.ServiceName = s.Service - check.ServiceTags = s.Tags + check.ServiceName = service.Service + check.ServiceTags = service.Tags } - a.checkLock.Lock() - defer a.checkLock.Unlock() - - // snapshot the current state of the health check to avoid potential flapping - existing := a.State.Check(check.CheckID) - defer func() { - if existing != nil { - a.State.UpdateCheck(check.CheckID, existing.Status, existing.Output) - } - }() - // Check if already registered if chkType != nil { switch { @@ -2296,37 +2412,28 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } } - // Add to the local state for anti-entropy - err := a.State.AddCheck(check, token) - if err != nil { - a.cancelCheckMonitors(check.CheckID) - return err - } - - // Persist the check - if persist && a.config.DataDir != "" { - return a.persistCheck(check, chkType) - } - return nil } // RemoveCheck is used to remove a health check. // The agent will make a best effort to ensure it is deregistered func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error { + a.stateLock.Lock() + defer a.stateLock.Unlock() + return a.removeCheckLocked(checkID, persist) +} + +// removeCheckLocked is used to remove a health check. +// The agent will make a best effort to ensure it is deregistered +func (a *Agent) removeCheckLocked(checkID types.CheckID, persist bool) error { // Validate CheckID if checkID == "" { return fmt.Errorf("CheckID missing") } - // Add to the local state for anti-entropy - a.State.RemoveCheck(checkID) a.tlsConfigurator.RemoveCheck(string(checkID)) - - a.checkLock.Lock() - defer a.checkLock.Unlock() - a.cancelCheckMonitors(checkID) + a.State.RemoveCheck(checkID) if persist { if err := a.purgeCheck(checkID); err != nil { @@ -2400,7 +2507,7 @@ func (a *Agent) addProxyLocked(proxy *structs.ConnectManagedProxy, persist, From } } - err = a.AddService(proxyService, chkTypes, persist, token, source) + err = a.addServiceLocked(proxyService, chkTypes, persist, token, source) if err != nil { // Remove the state too a.State.RemoveProxy(proxyService.ID) @@ -2431,8 +2538,8 @@ func (a *Agent) addProxyLocked(proxy *structs.ConnectManagedProxy, persist, From // running proxies that already had that credential injected. func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist, FromFile bool, restoredProxyToken string, source configSource) error { - a.proxyLock.Lock() - defer a.proxyLock.Unlock() + a.stateLock.Lock() + defer a.stateLock.Unlock() return a.addProxyLocked(proxy, persist, FromFile, restoredProxyToken, source) } @@ -2595,7 +2702,7 @@ func (a *Agent) removeProxyLocked(proxyID string, persist bool) error { // Remove the proxy service as well. The proxy ID is also the ID // of the servie, but we might as well use the service pointer. - if err := a.RemoveService(p.Proxy.ProxyService.ID, persist); err != nil { + if err := a.removeServiceLocked(p.Proxy.ProxyService.ID, persist); err != nil { return err } @@ -2608,8 +2715,8 @@ func (a *Agent) removeProxyLocked(proxyID string, persist bool) error { // RemoveProxy stops and removes a local proxy instance. func (a *Agent) RemoveProxy(proxyID string, persist bool) error { - a.proxyLock.Lock() - defer a.proxyLock.Unlock() + a.stateLock.Lock() + defer a.stateLock.Unlock() return a.removeProxyLocked(proxyID, persist) } @@ -2717,8 +2824,8 @@ func (a *Agent) cancelCheckMonitors(checkID types.CheckID) { // updateTTLCheck is used to update the status of a TTL check via the Agent API. func (a *Agent) updateTTLCheck(checkID types.CheckID, status, output string) error { - a.checkLock.Lock() - defer a.checkLock.Unlock() + a.stateLock.Lock() + defer a.stateLock.Unlock() // Grab the TTL check. check, ok := a.checkTTLs[checkID] @@ -2921,13 +3028,13 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { // syntax sugar and shouldn't be persisted in local or server state. ns.Connect.SidecarService = nil - if err := a.AddService(ns, chkTypes, false, service.Token, ConfigSourceLocal); err != nil { + if err := a.addServiceLocked(ns, chkTypes, false, service.Token, ConfigSourceLocal); err != nil { return fmt.Errorf("Failed to register service %q: %v", service.Name, err) } // If there is a sidecar service, register that too. if sidecar != nil { - if err := a.AddService(sidecar, sidecarChecks, false, sidecarToken, ConfigSourceLocal); err != nil { + if err := a.addServiceLocked(sidecar, sidecarChecks, false, sidecarToken, ConfigSourceLocal); err != nil { return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err) } } @@ -2990,7 +3097,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { } else { a.logger.Printf("[DEBUG] agent: restored service definition %q from %q", serviceID, file) - if err := a.AddService(p.Service, nil, false, p.Token, ConfigSourceLocal); err != nil { + if err := a.addServiceLocked(p.Service, nil, false, p.Token, ConfigSourceLocal); err != nil { return fmt.Errorf("failed adding service %q: %s", serviceID, err) } } @@ -3002,7 +3109,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { // unloadServices will deregister all services. func (a *Agent) unloadServices() error { for id := range a.State.Services() { - if err := a.RemoveService(id, false); err != nil { + if err := a.removeServiceLocked(id, false); err != nil { return fmt.Errorf("Failed deregistering service '%s': %v", id, err) } } @@ -3016,7 +3123,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error { for _, check := range conf.Checks { health := check.HealthCheck(conf.NodeName) chkType := check.CheckType() - if err := a.AddCheck(health, chkType, false, check.Token, ConfigSourceLocal); err != nil { + if err := a.addCheckLocked(health, chkType, false, check.Token, ConfigSourceLocal); err != nil { return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check) } } @@ -3071,7 +3178,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error { // services into the active pool p.Check.Status = api.HealthCritical - if err := a.AddCheck(p.Check, p.ChkType, false, p.Token, ConfigSourceLocal); err != nil { + if err := a.addCheckLocked(p.Check, p.ChkType, false, p.Token, ConfigSourceLocal); err != nil { // Purge the check if it is unable to be restored. a.logger.Printf("[WARN] agent: Failed to restore check %q: %s", checkID, err) @@ -3090,7 +3197,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error { // unloadChecks will deregister all checks known to the local agent. func (a *Agent) unloadChecks() error { for id := range a.State.Checks() { - if err := a.RemoveCheck(id, false); err != nil { + if err := a.removeCheckLocked(id, false); err != nil { return fmt.Errorf("Failed deregistering check '%s': %s", id, err) } } @@ -3153,9 +3260,6 @@ func (a *Agent) loadPersistedProxies() (map[string]persistedProxy, error) { // loadProxies will load connect proxy definitions from configuration and // persisted definitions on disk, and load them into the local agent. func (a *Agent) loadProxies(conf *config.RuntimeConfig) error { - a.proxyLock.Lock() - defer a.proxyLock.Unlock() - persistedProxies, persistenceErr := a.loadPersistedProxies() for _, svc := range conf.Services { @@ -3287,8 +3391,6 @@ func (a *Agent) loadTokens(conf *config.RuntimeConfig) error { // unloadProxies will deregister all proxies known to the local agent. func (a *Agent) unloadProxies() error { - a.proxyLock.Lock() - defer a.proxyLock.Unlock() for id := range a.State.Proxies() { if err := a.removeProxyLocked(id, false); err != nil { return fmt.Errorf("Failed deregistering proxy '%s': %s", id, err) @@ -3432,6 +3534,9 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error { a.PauseSync() defer a.ResumeSync() + a.stateLock.Lock() + defer a.stateLock.Unlock() + // Snapshot the current state, and restore it afterwards snap := a.snapshotCheckState() defer a.restoreCheckState(snap) diff --git a/agent/agent_test.go b/agent/agent_test.go index 67542ccf04..1046dc10a0 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -1859,7 +1859,7 @@ func TestAgent_PersistCheck(t *testing.T) { t.Fatalf("err: %s", err) } if !bytes.Equal(expected, content) { - t.Fatalf("bad: %s", string(content)) + t.Fatalf("bad: %s != %s", string(content), expected) } // Updates the check definition on disk diff --git a/agent/local/state.go b/agent/local/state.go index 4b679820fc..0eb089f412 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -11,7 +11,7 @@ import ( "sync/atomic" "time" - "github.com/armon/go-metrics" + metrics "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" @@ -19,7 +19,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/types" - "github.com/hashicorp/go-uuid" + uuid "github.com/hashicorp/go-uuid" ) // Config is the configuration for the State. @@ -265,6 +265,12 @@ func (l *State) serviceToken(id string) string { // This entry is persistent and the agent will make a best effort to // ensure it is registered func (l *State) AddService(service *structs.NodeService, token string) error { + l.Lock() + defer l.Unlock() + return l.addServiceLocked(service, token) +} + +func (l *State) addServiceLocked(service *structs.NodeService, token string) error { if service == nil { return fmt.Errorf("no service") } @@ -274,18 +280,58 @@ func (l *State) AddService(service *structs.NodeService, token string) error { service.ID = service.Service } - l.SetServiceState(&ServiceState{ + l.setServiceStateLocked(&ServiceState{ Service: service, Token: token, }) return nil } +// AddServiceWithChecks adds a service and its check tp the local state atomically +func (l *State) AddServiceWithChecks(service *structs.NodeService, checks []*structs.HealthCheck, token string) error { + l.Lock() + defer l.Unlock() + + if err := l.addServiceLocked(service, token); err != nil { + return err + } + + for _, check := range checks { + if err := l.addCheckLocked(check, token); err != nil { + return err + } + } + + return nil +} + // RemoveService is used to remove a service entry from the local state. // The agent will make a best effort to ensure it is deregistered. func (l *State) RemoveService(id string) error { l.Lock() defer l.Unlock() + return l.removeServiceLocked(id) +} + +// RemoveServiceWithChecks removes a service and its check from the local state atomically +func (l *State) RemoveServiceWithChecks(serviceID string, checkIDs []types.CheckID) error { + l.Lock() + defer l.Unlock() + + if err := l.removeServiceLocked(serviceID); err != nil { + return err + } + + for _, id := range checkIDs { + if err := l.removeCheckLocked(id); err != nil { + return err + } + } + + return nil +} + +func (l *State) removeServiceLocked(id string) error { s := l.services[id] if s == nil || s.Deleted { @@ -358,6 +404,10 @@ func (l *State) SetServiceState(s *ServiceState) { l.Lock() defer l.Unlock() + l.setServiceStateLocked(s) +} + +func (l *State) setServiceStateLocked(s *ServiceState) { s.WatchCh = make(chan struct{}) old, hasOld := l.services[s.Service.ID] @@ -414,6 +464,13 @@ func (l *State) checkToken(id types.CheckID) string { // This entry is persistent and the agent will make a best effort to // ensure it is registered func (l *State) AddCheck(check *structs.HealthCheck, token string) error { + l.Lock() + defer l.Unlock() + + return l.addCheckLocked(check, token) +} + +func (l *State) addCheckLocked(check *structs.HealthCheck, token string) error { if check == nil { return fmt.Errorf("no check") } @@ -427,14 +484,14 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error { // if there is a serviceID associated with the check, make sure it exists before adding it // NOTE - This logic may be moved to be handled within the Agent's Addcheck method after a refactor - if check.ServiceID != "" && l.Service(check.ServiceID) == nil { + if _, ok := l.services[check.ServiceID]; check.ServiceID != "" && !ok { return fmt.Errorf("Check %q refers to non-existent service %q", check.CheckID, check.ServiceID) } // hard-set the node name check.Node = l.config.NodeName - l.SetCheckState(&CheckState{ + l.setCheckStateLocked(&CheckState{ Check: check, Token: token, }) @@ -482,7 +539,10 @@ func (l *State) RemoveAliasCheck(checkID types.CheckID, srcServiceID string) { func (l *State) RemoveCheck(id types.CheckID) error { l.Lock() defer l.Unlock() + return l.removeCheckLocked(id) +} +func (l *State) removeCheckLocked(id types.CheckID) error { c := l.checks[id] if c == nil || c.Deleted { return fmt.Errorf("Check %q does not exist", id) @@ -620,6 +680,10 @@ func (l *State) SetCheckState(c *CheckState) { l.Lock() defer l.Unlock() + l.setCheckStateLocked(c) +} + +func (l *State) setCheckStateLocked(c *CheckState) { l.checks[c.Check.CheckID] = c l.TriggerSyncChanges() }