diff --git a/agent/agent.go b/agent/agent.go
index 600432e8b3..b7f51d8f5c 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -1386,29 +1386,31 @@ OUTER:
 // reapServicesInternal does a single pass, looking for services to reap.
 func (a *Agent) reapServicesInternal() {
-	reaped := make(map[string]struct{})
-	for checkID, check := range a.state.CriticalChecks() {
+	reaped := make(map[string]bool)
+	for checkID, cs := range a.state.CriticalCheckStates() {
+		serviceID := cs.Check.ServiceID
+
 		// There's nothing to do if there's no service.
-		if check.Check.ServiceID == "" {
+		if serviceID == "" {
 			continue
 		}
 		// There might be multiple checks for one service, so
 		// we don't need to reap multiple times.
-		serviceID := check.Check.ServiceID
-		if _, ok := reaped[serviceID]; ok {
+		if reaped[serviceID] {
 			continue
 		}
 		// See if there's a timeout.
+		// todo(fs): this looks fishy... why is there another data structure in the agent with its own lock?
 		a.checkLock.Lock()
-		timeout, ok := a.checkReapAfter[checkID]
+		timeout := a.checkReapAfter[checkID]
 		a.checkLock.Unlock()
 		// Reap, if necessary. We keep track of which service
 		// this is so that we won't try to remove it again.
-		if ok && check.CriticalFor > timeout {
-			reaped[serviceID] = struct{}{}
+		if timeout > 0 && cs.CriticalFor() > timeout {
+			reaped[serviceID] = true
 			a.RemoveService(serviceID, true)
 			a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service", checkID, serviceID)
diff --git a/agent/local/state.go b/agent/local/state.go
index 3e8379696b..b2fdb833f4 100644
--- a/agent/local/state.go
+++ b/agent/local/state.go
@@ -21,12 +21,6 @@ import (
 // permissionDenied is returned when an ACL based rejection happens.
 const permissionDenied = "Permission denied"
 
-// syncStatus is used to represent the difference between
-// the local and remote state, and if action needs to be taken
-type syncStatus struct {
-	inSync bool // Is this in sync with the server
-}
-
 // Config is the configuration for the State. It is
 // populated during NewLocalAgent from the agent configuration to avoid
 // race conditions with the agent configuration.
@@ -40,6 +34,62 @@ type Config struct {
 	TaggedAddresses map[string]string
 }
 
+// ServiceState describes the state of a service record.
+type ServiceState struct {
+	// Service is the local copy of the service record.
+	Service *structs.NodeService
+
+	// Token is the ACL to update the service record on the server.
+	Token string
+
+	// InSync contains whether the local state of the service record
+	// is in sync with the remote state on the server.
+	InSync bool
+
+	// Deleted is true when the service record has been marked as deleted
+	// but has not been removed on the server yet.
+	Deleted bool
+}
+
+// CheckState describes the state of a health check record.
+type CheckState struct {
+	// Check is the local copy of the health check record.
+	Check *structs.HealthCheck
+
+	// Token is the ACL record to update the health check record
+	// on the server.
+	Token string
+
+	// CriticalTime is the last time the health check status went
+	// from non-critical to critical. When the health check is not
+	// in critical state the value is the zero value.
+	CriticalTime time.Time
+
+	// DeferCheck is used to delay the sync of a health check when
+	// only the output has changed.
+	// todo(fs): ^^ this needs double checking...
+	DeferCheck *time.Timer
+
+	// InSync contains whether the local state of the health check
+	// record is in sync with the remote state on the server.
+	InSync bool
+
+	// Deleted is true when the health check record has been marked as
+	// deleted but has not been removed on the server yet.
+	Deleted bool
+}
+
+// Critical returns true when the health check is in critical state.
+func (c *CheckState) Critical() bool {
+	return !c.CriticalTime.IsZero()
+}
+
+// CriticalFor returns the amount of time the health check has been in
+// critical state. Its value is undefined when the check is not in critical state.
+func (c *CheckState) CriticalFor() time.Duration {
+	return time.Since(c.CriticalTime)
+}
+
 type delegate interface {
 	RPC(method string, args interface{}, reply interface{}) error
 }
@@ -62,18 +112,10 @@ type State struct {
 	nodeInfoInSync bool
 
 	// Services tracks the local services
-	services map[string]*structs.NodeService
-	serviceStatus map[string]syncStatus
-	serviceTokens map[string]string
+	services map[string]*ServiceState
 
 	// Checks tracks the local checks
-	checks map[types.CheckID]*structs.HealthCheck
-	checkStatus map[types.CheckID]syncStatus
-	checkTokens map[types.CheckID]string
-	checkCriticalTime map[types.CheckID]time.Time
-
-	// Used to track checks that are being deferred
-	deferCheck map[types.CheckID]*time.Timer
+	checks map[types.CheckID]*CheckState
 
 	// metadata tracks the local metadata fields
 	metadata map[string]string
@@ -86,25 +128,20 @@ type State struct {
 	// is stored in the raft log.
 	discardCheckOutput atomic.Value // bool
 
+	// tokens contains the ACL tokens
 	tokens *token.Store
 }
 
 // NewLocalState creates a is used to initialize the local state
 func NewState(c Config, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *State {
 	l := &State{
-		config: c,
-		logger: lg,
-		services: make(map[string]*structs.NodeService),
-		serviceStatus: make(map[string]syncStatus),
-		serviceTokens: make(map[string]string),
-		checks: make(map[types.CheckID]*structs.HealthCheck),
-		checkStatus: make(map[types.CheckID]syncStatus),
-		checkTokens: make(map[types.CheckID]string),
-		checkCriticalTime: make(map[types.CheckID]time.Time),
-		deferCheck: make(map[types.CheckID]*time.Timer),
-		metadata: make(map[string]string),
-		triggerCh: triggerCh,
-		tokens: tokens,
+		config: c,
+		logger: lg,
+		services: make(map[string]*ServiceState),
+		checks: make(map[types.CheckID]*CheckState),
+		metadata: make(map[string]string),
+		triggerCh: triggerCh,
+		tokens: tokens,
 	}
 	l.discardCheckOutput.Store(c.DiscardCheckOutput)
 	return l
@@ -137,7 +174,10 @@ func (l *State) ServiceToken(id string) string {
 
 // serviceToken returns an ACL token associated with a service.
 func (l *State) serviceToken(id string) string {
-	token := l.serviceTokens[id]
+	var token string
+	if s := l.services[id]; s != nil {
+		token = s.Token
+	}
 	if token == "" {
 		token = l.tokens.UserToken()
 	}
@@ -147,37 +187,48 @@
 // AddService is used to add a service entry to the local state.
 // This entry is persistent and the agent will make a best effort to
 // ensure it is registered
-func (l *State) AddService(service *structs.NodeService, token string) {
-	// Assign the ID if none given
-	if service.ID == "" && service.Service != "" {
+// todo(fs): where is the persistence happening?
+func (l *State) AddService(service *structs.NodeService, token string) error {
+	l.Lock()
+	defer l.Unlock()
+
+	if service == nil {
+		return fmt.Errorf("no service")
+	}
+
+	// use the service name as id if the id was omitted
+	// todo(fs): is this for backwards compatibility?
+	if service.ID == "" {
 		service.ID = service.Service
 	}
 
-	l.Lock()
-	defer l.Unlock()
-
-	l.services[service.ID] = service
-	l.serviceStatus[service.ID] = syncStatus{}
-	l.serviceTokens[service.ID] = token
+	l.services[service.ID] = &ServiceState{
+		Service: service,
+		Token: token,
+	}
 	l.changeMade()
+
+	return nil
 }
 
 // RemoveService is used to remove a service entry from the local state.
-// The agent will make a best effort to ensure it is deregistered
-func (l *State) RemoveService(serviceID string) error {
+// The agent will make a best effort to ensure it is deregistered.
+func (l *State) RemoveService(id string) error {
 	l.Lock()
 	defer l.Unlock()
 
-	if _, ok := l.services[serviceID]; ok {
-		delete(l.services, serviceID)
-		// Leave the service token around, if any, until we successfully
-		// delete the service.
-		l.serviceStatus[serviceID] = syncStatus{inSync: false}
-		l.changeMade()
-	} else {
-		return fmt.Errorf("Service does not exist")
+	s := l.services[id]
+	if s == nil || s.Deleted {
+		return fmt.Errorf("Service %q does not exist", id)
 	}
 
+	// To remove the service on the server we need the token.
+	// Therefore, we mark the service as deleted and keep the
+	// entry around until it is actually removed.
+	s.InSync = false
+	s.Deleted = true
+	l.changeMade()
+
 	return nil
 }
 
@@ -186,20 +237,28 @@ func (l *State) RemoveService(serviceID string) error {
 func (l *State) Service(id string) *structs.NodeService {
 	l.RLock()
 	defer l.RUnlock()
-	return l.services[id]
+
+	s := l.services[id]
+	if s == nil || s.Deleted {
+		return nil
+	}
+	return s.Service
 }
 
 // Services returns the locally registered services that the
 // agent is aware of and are being kept in sync with the server
 func (l *State) Services() map[string]*structs.NodeService {
-	services := make(map[string]*structs.NodeService)
 	l.RLock()
 	defer l.RUnlock()
 
-	for name, serv := range l.services {
-		services[name] = serv
+	m := make(map[string]*structs.NodeService)
+	for id, s := range l.services {
+		if s.Deleted {
+			continue
+		}
+		m[id] = s.Service
 	}
-	return services
+	return m
 }
 
 // CheckToken is used to return the configured health check token for a
@@ -211,8 +270,12 @@ func (l *State) CheckToken(checkID types.CheckID) string {
 }
 
 // checkToken returns an ACL token associated with a check.
-func (l *State) checkToken(checkID types.CheckID) string {
-	token := l.checkTokens[checkID]
+func (l *State) checkToken(id types.CheckID) string {
+	var token string
+	c := l.checks[id]
+	if c != nil {
+		token = c.Token
+	}
 	if token == "" {
 		token = l.tokens.UserToken()
 	}
@@ -226,8 +289,9 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
 	l.Lock()
 	defer l.Unlock()
 
-	// Set the node name
-	check.Node = l.config.NodeName
+	if check == nil {
+		return fmt.Errorf("no check")
+	}
 
 	if l.discardCheckOutput.Load().(bool) {
 		check.Output = ""
@@ -236,38 +300,51 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
 	// if there is a serviceID associated with the check, make sure it exists before adding it
 	// NOTE - This logic may be moved to be handled within the Agent's Addcheck method after a refactor
 	if check.ServiceID != "" && l.services[check.ServiceID] == nil {
-		return fmt.Errorf("ServiceID %q does not exist", check.ServiceID)
+		return fmt.Errorf("Check %q refers to non-existent service %q", check.CheckID, check.ServiceID)
 	}
 
-	l.checks[check.CheckID] = check
-	l.checkStatus[check.CheckID] = syncStatus{}
-	l.checkTokens[check.CheckID] = token
-	delete(l.checkCriticalTime, check.CheckID)
+	// hard-set the node name
+	check.Node = l.config.NodeName
+
+	l.checks[check.CheckID] = &CheckState{
+		Check: check,
+		Token: token,
+	}
 	l.changeMade()
+
 	return nil
 }
 
 // RemoveCheck is used to remove a health check from the local state.
 // The agent will make a best effort to ensure it is deregistered
-func (l *State) RemoveCheck(checkID types.CheckID) {
+// todo(fs): RemoveService returns an error for a non-existent service. RemoveCheck should as well.
+// todo(fs): Check code that calls this to handle the error.
+func (l *State) RemoveCheck(id types.CheckID) error {
 	l.Lock()
 	defer l.Unlock()
 
-	delete(l.checks, checkID)
-	// Leave the check token around, if any, until we successfully delete
-	// the check.
-	delete(l.checkCriticalTime, checkID)
-	l.checkStatus[checkID] = syncStatus{inSync: false}
+	c := l.checks[id]
+	if c == nil || c.Deleted {
+		return fmt.Errorf("Check %q does not exist", id)
+	}
+
+	// To remove the check on the server we need the token.
+	// Therefore, we mark the check as deleted and keep the
+	// entry around until it is actually removed.
+	c.InSync = false
+	c.Deleted = true
 	l.changeMade()
+
+	return nil
 }
 
 // UpdateCheck is used to update the status of a check
-func (l *State) UpdateCheck(checkID types.CheckID, status, output string) {
+func (l *State) UpdateCheck(id types.CheckID, status, output string) {
 	l.Lock()
 	defer l.Unlock()
 
-	check, ok := l.checks[checkID]
-	if !ok {
+	c := l.checks[id]
+	if c == nil || c.Deleted {
 		return
 	}
 
@@ -278,16 +355,15 @@ func (l *State) UpdateCheck(checkID types.CheckID, status, output string) {
 	// Update the critical time tracking (this doesn't cause a server updates
 	// so we can always keep this up to date).
 	if status == api.HealthCritical {
-		_, wasCritical := l.checkCriticalTime[checkID]
-		if !wasCritical {
-			l.checkCriticalTime[checkID] = time.Now()
+		if !c.Critical() {
+			c.CriticalTime = time.Now()
 		}
 	} else {
-		delete(l.checkCriticalTime, checkID)
+		c.CriticalTime = time.Time{}
 	}
 
 	// Do nothing if update is idempotent
-	if check.Status == status && check.Output == output {
+	if c.Check.Status == status && c.Check.Output == output {
 		return
 	}
 
@@ -295,28 +371,34 @@ func (l *State) UpdateCheck(checkID types.CheckID, status, output string) {
 	// frequent updates of output. Instead, we update the output internally,
 	// and periodically do a write-back to the servers. If there is a status
 	// change we do the write immediately.
-	if l.config.CheckUpdateInterval > 0 && check.Status == status {
-		check.Output = output
-		if _, ok := l.deferCheck[checkID]; !ok {
-			intv := time.Duration(uint64(l.config.CheckUpdateInterval)/2) + lib.RandomStagger(l.config.CheckUpdateInterval)
-			deferSync := time.AfterFunc(intv, func() {
+	if l.config.CheckUpdateInterval > 0 && c.Check.Status == status {
+		c.Check.Output = output
+		if c.DeferCheck == nil {
+			d := l.config.CheckUpdateInterval
+			intv := time.Duration(uint64(d)/2) + lib.RandomStagger(d)
+			c.DeferCheck = time.AfterFunc(intv, func() {
 				l.Lock()
-				if _, ok := l.checkStatus[checkID]; ok {
-					l.checkStatus[checkID] = syncStatus{inSync: false}
-					l.changeMade()
+				defer l.Unlock()
+
+				c := l.checks[id]
+				if c == nil {
+					return
 				}
-				delete(l.deferCheck, checkID)
-				l.Unlock()
+				c.DeferCheck = nil
+				if c.Deleted {
+					return
+				}
+				c.InSync = false
+				l.changeMade()
 			})
-			l.deferCheck[checkID] = deferSync
 		}
 		return
 	}
 
 	// Update status and mark out of sync
-	check.Status = status
-	check.Output = output
-	l.checkStatus[checkID] = syncStatus{inSync: false}
+	c.Check.Status = status
+	c.Check.Output = output
+	c.InSync = false
 	l.changeMade()
 }
 
@@ -325,7 +407,12 @@ func (l *State) UpdateCheck(checkID types.CheckID, status, output string) {
 func (l *State) Check(id types.CheckID) *structs.HealthCheck {
 	l.RLock()
 	defer l.RUnlock()
-	return l.checks[id]
+
+	c := l.checks[id]
+	if c == nil || c.Deleted {
+		return nil
+	}
+	return c.Check
 }
 
 // Checks returns the locally registered checks that the
@@ -334,78 +421,83 @@ func (l *State) Checks() map[types.CheckID]*structs.HealthCheck {
 	l.RLock()
 	defer l.RUnlock()
 
-	checks := make(map[types.CheckID]*structs.HealthCheck)
+	m := make(map[types.CheckID]*structs.HealthCheck)
 	for id, c := range l.checks {
+		if c.Deleted {
+			continue
+		}
 		c2 := new(structs.HealthCheck)
-		*c2 = *c
-		checks[id] = c2
+		*c2 = *c.Check
+		m[id] = c2
 	}
-	return checks
+	return m
 }
 
-// CriticalCheck is used to return the duration a check has been critical along
-// with its associated health check.
-type CriticalCheck struct {
-	CriticalFor time.Duration
-	Check *structs.HealthCheck
-}
-
-// CriticalChecks returns locally registered health checks that the agent is
-// aware of and are being kept in sync with the server, and that are in a
-// critical state. This also returns information about how long each check has
-// been critical.
-func (l *State) CriticalChecks() map[types.CheckID]CriticalCheck {
-	checks := make(map[types.CheckID]CriticalCheck)
-
+// CriticalCheckStates returns the locally registered checks that the
+// agent is aware of and which are currently in a critical state.
+func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState {
 	l.RLock()
 	defer l.RUnlock()
 
-	now := time.Now()
-	for checkID, criticalTime := range l.checkCriticalTime {
-		checks[checkID] = CriticalCheck{
-			CriticalFor: now.Sub(criticalTime),
-			Check: l.checks[checkID],
+	m := make(map[types.CheckID]*CheckState)
+	for id, c := range l.checks {
+		if c.Deleted || !c.Critical() {
+			continue
 		}
+		m[id] = c
 	}
-
-	return checks
+	return m
 }
 
 // Metadata returns the local node metadata fields that the
 // agent is aware of and are being kept in sync with the server
 func (l *State) Metadata() map[string]string {
-	metadata := make(map[string]string)
 	l.RLock()
 	defer l.RUnlock()
-
-	for key, value := range l.metadata {
-		metadata[key] = value
+	m := make(map[string]string)
+	for k, v := range l.metadata {
+		m[k] = v
 	}
-	return metadata
+	return m
 }
 
 // UpdateSyncState does a read of the server state, and updates
 // the local sync status as appropriate
 func (l *State) UpdateSyncState() error {
+	// 1. get all checks and services from the master
 	req := structs.NodeSpecificRequest{
 		Datacenter: l.config.Datacenter,
 		Node: l.config.NodeName,
 		QueryOptions: structs.QueryOptions{Token: l.tokens.AgentToken()},
 	}
+
 	var out1 structs.IndexedNodeServices
-	var out2 structs.IndexedHealthChecks
-	if e := l.delegate.RPC("Catalog.NodeServices", &req, &out1); e != nil {
-		return e
+	if err := l.delegate.RPC("Catalog.NodeServices", &req, &out1); err != nil {
+		return err
 	}
+
+	var out2 structs.IndexedHealthChecks
 	if err := l.delegate.RPC("Health.NodeChecks", &req, &out2); err != nil {
 		return err
 	}
-	checks := out2.HealthChecks
+
+	// 2. create useful data structures for traversal
+	remoteServices := make(map[string]*structs.NodeService)
+	if out1.NodeServices != nil {
+		remoteServices = out1.NodeServices.Services
+	}
+
+	remoteChecks := make(map[types.CheckID]*structs.HealthCheck, len(out2.HealthChecks))
+	for _, rc := range out2.HealthChecks {
+		remoteChecks[rc.CheckID] = rc
+	}
+
+	// 3. perform sync
 	l.Lock()
 	defer l.Unlock()
 
-	// Check the node info
+	// sync node info
 	if out1.NodeServices == nil || out1.NodeServices.Node == nil ||
 		out1.NodeServices.Node.ID != l.config.NodeID ||
 		!reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) ||
@@ -413,99 +505,103 @@ func (l *State) UpdateSyncState() error {
 		l.nodeInfoInSync = false
 	}
 
-	// Check all our services
-	services := make(map[string]*structs.NodeService)
-	if out1.NodeServices != nil {
-		services = out1.NodeServices.Services
-	}
+	// sync services
 
-	for id := range l.services {
-		// If the local service doesn't exist remotely, then sync it
-		if _, ok := services[id]; !ok {
-			l.serviceStatus[id] = syncStatus{inSync: false}
+	// sync local services that do not exist remotely
+	for id, s := range l.services {
+		if remoteServices[id] == nil {
+			s.InSync = false
 		}
 	}
 
-	for id, service := range services {
+	for id, rs := range remoteServices {
 		// If we don't have the service locally, deregister it
-		existing, ok := l.services[id]
-		if !ok {
-			// The consul service is created automatically, and does
+		ls := l.services[id]
+		if ls == nil {
+			// The consul service is created automatically and does
 			// not need to be deregistered.
 			if id == structs.ConsulServiceID {
 				continue
 			}
-			l.serviceStatus[id] = syncStatus{inSync: false}
+
+			l.services[id] = &ServiceState{Deleted: true}
+			continue
+		}
+
+		// If the service is scheduled for removal skip it.
+		// todo(fs): is this correct?
+		if ls.Deleted {
 			continue
 		}
 
 		// If our definition is different, we need to update it. Make a
 		// copy so that we don't retain a pointer to any actual state
 		// store info for in-memory RPCs.
-		if existing.EnableTagOverride {
-			existing.Tags = make([]string, len(service.Tags))
-			copy(existing.Tags, service.Tags)
+		if ls.Service.EnableTagOverride {
+			ls.Service.Tags = make([]string, len(rs.Tags))
+			copy(ls.Service.Tags, rs.Tags)
 		}
-		equal := existing.IsSame(service)
-		l.serviceStatus[id] = syncStatus{inSync: equal}
+		ls.InSync = ls.Service.IsSame(rs)
 	}
 
-	// Index the remote health checks to improve efficiency
-	checkIndex := make(map[types.CheckID]*structs.HealthCheck, len(checks))
-	for _, check := range checks {
-		checkIndex[check.CheckID] = check
-	}
+	// sync checks
 
-	// Sync any check which doesn't exist on the remote side
-	for id := range l.checks {
-		if _, ok := checkIndex[id]; !ok {
-			l.checkStatus[id] = syncStatus{inSync: false}
+	// sync local checks which do not exist remotely
+	for id, c := range l.checks {
+		if remoteChecks[id] == nil {
+			c.InSync = false
 		}
 	}
 
-	for _, check := range checks {
+	for id, rc := range remoteChecks {
+
+		lc := l.checks[id]
+
 		// If we don't have the check locally, deregister it
-		id := check.CheckID
-		existing, ok := l.checks[id]
-		if !ok {
-			// The Serf check is created automatically, and does not
+		if lc == nil {
+			// The Serf check is created automatically and does not
 			// need to be deregistered.
 			if id == structs.SerfCheckID {
+				l.logger.Printf("[DEBUG] agent: Skipping remote check %q since it is managed automatically", id)
 				continue
 			}
-			l.checkStatus[id] = syncStatus{inSync: false}
+
+			l.checks[id] = &CheckState{Deleted: true}
+			continue
+		}
+
+		// If the check is scheduled for removal skip it.
+		// todo(fs): is this correct?
+		if lc.Deleted {
 			continue
 		}
 
 		// If our definition is different, we need to update it
-		var equal bool
 		if l.config.CheckUpdateInterval == 0 {
-			equal = existing.IsSame(check)
-		} else {
-			// Copy the existing check before potentially modifying
-			// it before the compare operation.
-			eCopy := existing.Clone()
-
-			// Copy the server's check before modifying, otherwise
-			// in-memory RPCs will have side effects.
-			cCopy := check.Clone()
-
-			// If there's a defer timer active then we've got a
-			// potentially spammy check so we don't sync the output
-			// during this sweep since the timer will mark the check
-			// out of sync for us. Otherwise, it is safe to sync the
-			// output now. This is especially important for checks
-			// that don't change state after they are created, in
-			// which case we'd never see their output synced back ever.
-			if _, ok := l.deferCheck[id]; ok {
-				eCopy.Output = ""
-				cCopy.Output = ""
-			}
-			equal = eCopy.IsSame(cCopy)
+			lc.InSync = lc.Check.IsSame(rc)
+			continue
 		}
 
-		// Update the status
-		l.checkStatus[id] = syncStatus{inSync: equal}
+		// Copy the existing check before potentially modifying
+		// it before the compare operation.
+		lcCopy := lc.Check.Clone()
+
+		// Copy the server's check before modifying, otherwise
+		// in-memory RPCs will have side effects.
+		rcCopy := rc.Clone()
+
+		// If there's a defer timer active then we've got a
+		// potentially spammy check so we don't sync the output
+		// during this sweep since the timer will mark the check
+		// out of sync for us. Otherwise, it is safe to sync the
+		// output now. This is especially important for checks
+		// that don't change state after they are created, in
+		// which case we'd never see their output synced back ever.
+		if lc.DeferCheck != nil {
+			lcCopy.Output = ""
+			rcCopy.Output = ""
+		}
+		lc.InSync = lcCopy.IsSame(rcCopy)
 	}
 	return nil
 }
@@ -521,39 +617,38 @@ func (l *State) SyncChanges() error {
 	// API works.
 
 	// Sync the services
-	for id, status := range l.serviceStatus {
-		if _, ok := l.services[id]; !ok {
-			if err := l.deleteService(id); err != nil {
-				return err
-			}
-		} else if !status.inSync {
-			if err := l.syncService(id); err != nil {
-				return err
-			}
-		} else {
+	for id, s := range l.services {
+		var err error
+		switch {
+		case s.Deleted:
+			err = l.deleteService(id)
+		case !s.InSync:
+			err = l.syncService(id)
+		default:
 			l.logger.Printf("[DEBUG] agent: Service '%s' in sync", id)
 		}
+		if err != nil {
+			return err
+		}
 	}
 
-	// Sync the checks
-	for id, status := range l.checkStatus {
-		if _, ok := l.checks[id]; !ok {
-			if err := l.deleteCheck(id); err != nil {
-				return err
+	for id, c := range l.checks {
+		var err error
+		switch {
+		case c.Deleted:
+			err = l.deleteCheck(id)
+		case !c.InSync:
+			if c.DeferCheck != nil {
+				c.DeferCheck.Stop()
+				c.DeferCheck = nil
 			}
-		} else if !status.inSync {
-			// Cancel a deferred sync
-			if timer := l.deferCheck[id]; timer != nil {
-				timer.Stop()
-				delete(l.deferCheck, id)
-			}
-
-			if err := l.syncCheck(id); err != nil {
-				return err
-			}
-		} else {
+			err = l.syncCheck(id)
+		default:
 			l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id)
 		}
+		if err != nil {
+			return err
+		}
 	}
 
 	// Now sync the node level info if we need to, and didn't do any of
@@ -593,9 +688,26 @@ func (l *State) UnloadMetadata() {
 func (l *State) Stats() map[string]string {
 	l.RLock()
 	defer l.RUnlock()
+
+	services := 0
+	for _, s := range l.services {
+		if s.Deleted {
+			continue
+		}
+		services++
+	}
+
+	checks := 0
+	for _, c := range l.checks {
+		if c.Deleted {
+			continue
+		}
+		checks++
+	}
+
 	return map[string]string{
-		"services": strconv.Itoa(len(l.services)),
-		"checks": strconv.Itoa(len(l.checks)),
+		"services": strconv.Itoa(services),
+		"checks": strconv.Itoa(checks),
 	}
 }
 
@@ -614,12 +726,13 @@ func (l *State) deleteService(id string) error {
 	var out struct{}
 	err := l.delegate.RPC("Catalog.Deregister", &req, &out)
 	if err == nil || strings.Contains(err.Error(), "Unknown service") {
-		delete(l.serviceStatus, id)
-		delete(l.serviceTokens, id)
+		delete(l.services, id)
 		l.logger.Printf("[INFO] agent: Deregistered service '%s'", id)
 		return nil
-	} else if acl.IsErrPermissionDenied(err) {
-		l.serviceStatus[id] = syncStatus{inSync: true}
+	}
+	if acl.IsErrPermissionDenied(err) {
+		// todo(fs): why is the service in sync here?
+		l.services[id].InSync = true
 		l.logger.Printf("[WARN] agent: Service '%s' deregistration blocked by ACLs", id)
 		return nil
 	}
@@ -641,12 +754,14 @@ func (l *State) deleteCheck(id types.CheckID) error {
 	var out struct{}
 	err := l.delegate.RPC("Catalog.Deregister", &req, &out)
 	if err == nil || strings.Contains(err.Error(), "Unknown check") {
-		delete(l.checkStatus, id)
-		delete(l.checkTokens, id)
+		// todo(fs): do we need to stop the deferCheck timer here?
+		delete(l.checks, id)
 		l.logger.Printf("[INFO] agent: Deregistered check '%s'", id)
 		return nil
-	} else if acl.IsErrPermissionDenied(err) {
-		l.checkStatus[id] = syncStatus{inSync: true}
+	}
+	if acl.IsErrPermissionDenied(err) {
+		// todo(fs): why is the check in sync here?
+		l.checks[id].InSync = true
 		l.logger.Printf("[WARN] agent: Check '%s' deregistration blocked by ACLs", id)
 		return nil
 	}
@@ -655,17 +770,6 @@
 // syncService is used to sync a service to the server
 func (l *State) syncService(id string) error {
-	req := structs.RegisterRequest{
-		Datacenter: l.config.Datacenter,
-		ID: l.config.NodeID,
-		Node: l.config.NodeName,
-		Address: l.config.AdvertiseAddr,
-		TaggedAddresses: l.config.TaggedAddresses,
-		NodeMeta: l.metadata,
-		Service: l.services[id],
-		WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
-	}
-
 	// If the service has associated checks that are out of sync,
 	// piggyback them on the service sync so they are part of the
 	// same transaction and are registered atomically. We only let
 	// checks ride on service registrations with the same token,
 	// otherwise we need to register them separately so they don't
 	// pick up privileges from the service token.
 	var checks structs.HealthChecks
-	for _, check := range l.checks {
-		if check.ServiceID == id && (l.serviceToken(id) == l.checkToken(check.CheckID)) {
-			if stat, ok := l.checkStatus[check.CheckID]; !ok || !stat.inSync {
-				checks = append(checks, check)
-			}
+	for checkID, c := range l.checks {
+		if c.Deleted || c.InSync {
+			continue
 		}
+		if c.Check.ServiceID != id {
+			continue
+		}
+		if l.serviceToken(id) != l.checkToken(checkID) {
+			continue
+		}
+		checks = append(checks, c.Check)
+	}
+
+	req := structs.RegisterRequest{
+		Datacenter: l.config.Datacenter,
+		ID: l.config.NodeID,
+		Node: l.config.NodeName,
+		Address: l.config.AdvertiseAddr,
+		TaggedAddresses: l.config.TaggedAddresses,
+		NodeMeta: l.metadata,
+		Service: l.services[id].Service,
+		WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
 	}
 
 	// Backwards-compatibility for Consul < 0.5
@@ -691,20 +811,24 @@ func (l *State) syncService(id string) error {
 	var out struct{}
 	err := l.delegate.RPC("Catalog.Register", &req, &out)
 	if err == nil {
-		l.serviceStatus[id] = syncStatus{inSync: true}
+		l.services[id].InSync = true
 		// Given how the register API works, this info is also updated
 		// every time we sync a service.
 		l.nodeInfoInSync = true
+		for _, check := range checks {
+			l.checks[check.CheckID].InSync = true
+		}
 		l.logger.Printf("[INFO] agent: Synced service '%s'", id)
+		return nil
+	}
+	if acl.IsErrPermissionDenied(err) {
+		// todo(fs): why are the service and the checks in sync here?
+		// todo(fs): why is the node info not in sync here?
+		l.services[id].InSync = true
 		for _, check := range checks {
-			l.checkStatus[check.CheckID] = syncStatus{inSync: true}
+			l.checks[check.CheckID].InSync = true
 		}
-	} else if acl.IsErrPermissionDenied(err) {
-		l.serviceStatus[id] = syncStatus{inSync: true}
 		l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id)
-		for _, check := range checks {
-			l.checkStatus[check.CheckID] = syncStatus{inSync: true}
-		}
 		return nil
 	}
 	return err
@@ -712,14 +836,7 @@
 // syncCheck is used to sync a check to the server
 func (l *State) syncCheck(id types.CheckID) error {
-	// Pull in the associated service if any
-	check := l.checks[id]
-	var service *structs.NodeService
-	if check.ServiceID != "" {
-		if serv, ok := l.services[check.ServiceID]; ok {
-			service = serv
-		}
-	}
+	c := l.checks[id]
 
 	req := structs.RegisterRequest{
 		Datacenter: l.config.Datacenter,
@@ -728,20 +845,29 @@
 		Address: l.config.AdvertiseAddr,
 		TaggedAddresses: l.config.TaggedAddresses,
 		NodeMeta: l.metadata,
-		Service: service,
-		Check: l.checks[id],
+		Check: c.Check,
 		WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
 	}
+
+	// Pull in the associated service if any
+	s := l.services[c.Check.ServiceID]
+	if s != nil && !s.Deleted {
+		req.Service = s.Service
+	}
+
 	var out struct{}
 	err := l.delegate.RPC("Catalog.Register", &req, &out)
 	if err == nil {
-		l.checkStatus[id] = syncStatus{inSync: true}
+		l.checks[id].InSync = true
 		// Given how the register API works, this info is also updated
 		// every time we sync a check.
 		l.nodeInfoInSync = true
 		l.logger.Printf("[INFO] agent: Synced check '%s'", id)
-	} else if acl.IsErrPermissionDenied(err) {
-		l.checkStatus[id] = syncStatus{inSync: true}
+		return nil
+	}
+	if acl.IsErrPermissionDenied(err) {
+		// todo(fs): why is the check in sync here?
+		l.checks[id].InSync = true
 		l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id)
 		return nil
 	}
@@ -763,7 +889,10 @@ func (l *State) syncNodeInfo() error {
 	if err == nil {
 		l.nodeInfoInSync = true
 		l.logger.Printf("[INFO] agent: Synced node info")
-	} else if acl.IsErrPermissionDenied(err) {
+		return nil
+	}
+	if acl.IsErrPermissionDenied(err) {
+		// todo(fs): why is the node info in sync here?
 		l.nodeInfoInSync = true
 		l.logger.Printf("[WARN] agent: Node info update blocked by ACLs")
 		return nil