From 88e1d8ce031dee2c1155ec5d84f18b0490accefc Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Mon, 22 Apr 2019 23:39:02 -0700 Subject: [PATCH] Fill out the service manager functionality and fix tests --- agent/agent.go | 110 ++++++++++++++++---------- agent/consul/config_endpoint.go | 19 +++-- agent/service_manager.go | 133 +++++++++++++++++++------------- agent/service_manager_test.go | 55 +++++++++++++ agent/structs/structs.go | 51 ++++++++++++ 5 files changed, 267 insertions(+), 101 deletions(-) create mode 100644 agent/service_manager_test.go diff --git a/agent/agent.go b/agent/agent.go index 771f8a4119..69a6ffd82f 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -289,6 +289,7 @@ func New(c *config.RuntimeConfig) (*Agent, error) { endpoints: make(map[string]string), tokens: new(token.Store), } + a.serviceManager = NewServiceManager(a) if err := a.initializeACLs(); err != nil { return nil, err @@ -475,9 +476,6 @@ func (a *Agent) Start() error { } }() - // Start the service registration manager. - a.serviceManager = NewServiceManager(a) - // Start watching for critical services to deregister, based on their // checks. go a.reapServices() @@ -1897,53 +1895,22 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error { func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { a.stateLock.Lock() defer a.stateLock.Unlock() - a.serviceManager.AddService(service, chkTypes, persist, token, source) return a.addServiceLocked(service, chkTypes, persist, token, source) } func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { - if service.Service == "" { - return fmt.Errorf("Service name missing") - } - if service.ID == "" && service.Service != "" { - service.ID = service.Service - } - for _, check := range chkTypes { - if err := check.Validate(); err != nil { - return fmt.Errorf("Check is not valid: %v", err) - } + if err := a.validateService(service, chkTypes); err != nil { + return err } - // Set default weights if not specified. This is important as it ensures AE - // doesn't consider the service different since it has nil weights. - if service.Weights == nil { - service.Weights = &structs.Weights{Passing: 1, Warning: 1} + if err := a.serviceManager.AddService(service, chkTypes, persist, token, source); err != nil { + return err } - // Warn if the service name is incompatible with DNS - if InvalidDnsRe.MatchString(service.Service) { - a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+ - "via DNS due to invalid characters. Valid characters include "+ - "all alpha-numerics and dashes.", service.Service) - } else if len(service.Service) > MaxDNSLabelLength { - a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+ - "via DNS due to it being too long. Valid lengths are between "+ - "1 and 63 bytes.", service.Service) - } - - // Warn if any tags are incompatible with DNS - for _, tag := range service.Tags { - if InvalidDnsRe.MatchString(tag) { - a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+ - "via DNS due to invalid characters. Valid characters include "+ - "all alpha-numerics and dashes.", tag) - } else if len(tag) > MaxDNSLabelLength { - a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+ - "via DNS due to it being too long. Valid lengths are between "+ - "1 and 63 bytes.", tag) - } - } + return nil +} +func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { // Pause the service syncs during modification a.PauseSync() defer a.ResumeSync() @@ -2033,6 +2000,54 @@ func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*struc return nil } +// validateService validates an service and its checks, either returning an error or emitting a +// warning based on the nature of the error. +func (a *Agent) validateService(service *structs.NodeService, chkTypes []*structs.CheckType) error { + if service.Service == "" { + return fmt.Errorf("Service name missing") + } + if service.ID == "" && service.Service != "" { + service.ID = service.Service + } + for _, check := range chkTypes { + if err := check.Validate(); err != nil { + return fmt.Errorf("Check is not valid: %v", err) + } + } + + // Set default weights if not specified. This is important as it ensures AE + // doesn't consider the service different since it has nil weights. + if service.Weights == nil { + service.Weights = &structs.Weights{Passing: 1, Warning: 1} + } + + // Warn if the service name is incompatible with DNS + if InvalidDnsRe.MatchString(service.Service) { + a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+ + "via DNS due to invalid characters. Valid characters include "+ + "all alpha-numerics and dashes.", service.Service) + } else if len(service.Service) > MaxDNSLabelLength { + a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+ + "via DNS due to it being too long. Valid lengths are between "+ + "1 and 63 bytes.", service.Service) + } + + // Warn if any tags are incompatible with DNS + for _, tag := range service.Tags { + if InvalidDnsRe.MatchString(tag) { + a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+ + "via DNS due to invalid characters. Valid characters include "+ + "all alpha-numerics and dashes.", tag) + } else if len(tag) > MaxDNSLabelLength { + a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+ + "via DNS due to it being too long. Valid lengths are between "+ + "1 and 63 bytes.", tag) + } + } + + return nil +} + // cleanupRegistration is called on registration error to ensure no there are no // leftovers after a partial failure func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.CheckID) { @@ -2061,7 +2076,6 @@ func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.Check func (a *Agent) RemoveService(serviceID string, persist bool) error { a.stateLock.Lock() defer a.stateLock.Unlock() - a.serviceManager.RemoveService(serviceID) return a.removeServiceLocked(serviceID, persist) } @@ -2073,6 +2087,9 @@ func (a *Agent) removeServiceLocked(serviceID string, persist bool) error { return fmt.Errorf("ServiceID missing") } + // Shut down the config watch in the service manager. + a.serviceManager.RemoveService(serviceID) + checks := a.State.Checks() var checkIDs []types.CheckID for id, check := range checks { @@ -3677,6 +3694,15 @@ func (a *Agent) registerCache() { RefreshTimer: 0 * time.Second, RefreshTimeout: 10 * time.Minute, }) + + a.cache.RegisterType(cachetype.ResolvedServiceConfigName, &cachetype.ResolvedServiceConfig{ + RPC: a, + }, &cache.RegisterOptions{ + // Maintain a blocking query, retry dropped connections quickly + Refresh: true, + RefreshTimer: 0 * time.Second, + RefreshTimeout: 10 * time.Minute, + }) } // defaultProxyCommand returns the default Connect managed proxy command. diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index 566a932f2d..720e0df034 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -191,18 +191,25 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r if err != nil { return err } - serviceConf, ok := serviceEntry.(*structs.ServiceConfigEntry) - if !ok { - return fmt.Errorf("invalid service config type %T", serviceEntry) + var serviceConf *structs.ServiceConfigEntry + var ok bool + if serviceEntry != nil { + serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry) + if !ok { + return fmt.Errorf("invalid service config type %T", serviceEntry) + } } _, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal) if err != nil { return err } - proxyConf, ok := proxyEntry.(*structs.ProxyConfigEntry) - if !ok { - return fmt.Errorf("invalid proxy config type %T", serviceEntry) + var proxyConf *structs.ProxyConfigEntry + if proxyEntry != nil { + proxyConf, ok = proxyEntry.(*structs.ProxyConfigEntry) + if !ok { + return fmt.Errorf("invalid proxy config type %T", proxyEntry) + } } // Resolve the service definition by overlaying the service config onto the global diff --git a/agent/service_manager.go b/agent/service_manager.go index ea131dd105..f86adf66c1 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -24,7 +24,7 @@ func NewServiceManager(agent *Agent) *ServiceManager { } } -func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) { +func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { s.Lock() defer s.Unlock() @@ -40,17 +40,45 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st // start a new config watcher. watch, ok := s.services[service.ID] if ok { - watch.updateRegistration(®) + s.agent.logger.Printf("[DEBUG] agent: updating local registration for service %q", service.ID) + if err := watch.updateRegistration(®); err != nil { + return err + } } else { + // This is a new entry, so get the existing global config and do the initial + // registration with the merged config. + args := structs.ServiceConfigRequest{ + Name: service.Service, + Datacenter: s.agent.config.Datacenter, + QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken}, + } + if token != "" { + args.QueryOptions.Token = token + } + var resp structs.ServiceConfigResponse + if err := s.agent.RPC("ConfigEntry.ResolveServiceConfig", &args, &resp); err != nil { + s.agent.logger.Printf("[WARN] agent: could not retrieve central configuration for service %q: %v", + service.Service, err) + } + watch := &serviceConfigWatch{ - registration: ®, - updateCh: make(chan cache.UpdateEvent, 1), - agent: s.agent, + updateCh: make(chan cache.UpdateEvent, 1), + agent: s.agent, + config: &resp.Definition, + } + + // Force an update/register immediately. + if err := watch.updateRegistration(®); err != nil { + return err } s.services[service.ID] = watch - watch.Start() + if err := watch.Start(); err != nil { + return err + } } + + return nil } func (s *ServiceManager) RemoveService(serviceID string) { @@ -84,7 +112,7 @@ type serviceConfigWatch struct { ctx context.Context cancelFunc func() - sync.RWMutex + sync.Mutex } func (s *serviceConfigWatch) Start() error { @@ -103,81 +131,80 @@ func (s *serviceConfigWatch) runWatch() { case <-s.ctx.Done(): return case event := <-s.updateCh: - s.handleUpdate(event) + if err := s.handleUpdate(event, false); err != nil { + s.agent.logger.Printf("[ERR] agent: error handling service update: %v", err) + continue + } } } } -func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) { +func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) error { + s.Lock() + defer s.Unlock() + + if event.Err != nil { + return fmt.Errorf("error watching service config: %v", event.Err) + } + switch event.Result.(type) { - case serviceRegistration: - s.Lock() + case *serviceRegistration: s.registration = event.Result.(*serviceRegistration) - s.Unlock() - case structs.ServiceConfigResponse: - s.Lock() - s.config = &event.Result.(*structs.ServiceConfigResponse).Definition - s.Unlock() + case *structs.ServiceConfigResponse: + resp := event.Result.(*structs.ServiceConfigResponse) + s.config = &resp.Definition default: - s.agent.logger.Printf("[ERR] unknown update event type: %T", event) + return fmt.Errorf("unknown update event type: %T", event) } service := s.mergeServiceConfig() - s.agent.logger.Printf("[INFO] updating service registration: %v, %v", service.ID, service.Meta) - /*err := s.agent.AddService(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source) + + if !locked { + s.agent.stateLock.Lock() + defer s.agent.stateLock.Unlock() + } + + err := s.agent.addServiceInternal(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source) if err != nil { - s.agent.logger.Printf("[ERR] error updating service registration: %v", err) - }*/ + return fmt.Errorf("error updating service registration: %v", err) + } + + return nil } func (s *serviceConfigWatch) startConfigWatch() error { - s.RLock() name := s.registration.service.Service - s.RUnlock() req := &structs.ServiceConfigRequest{ - Name: name, - Datacenter: s.agent.config.Datacenter, + Name: name, + Datacenter: s.agent.config.Datacenter, + QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken}, + } + if s.registration.token != "" { + req.QueryOptions.Token = s.registration.token } err := s.agent.cache.Notify(s.ctx, cachetype.ResolvedServiceConfigName, req, fmt.Sprintf("service-config:%s", name), s.updateCh) return err } -func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) { - s.updateCh <- cache.UpdateEvent{ +func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error { + return s.handleUpdate(cache.UpdateEvent{ Result: registration, - } + }, true) } func (s *serviceConfigWatch) mergeServiceConfig() *structs.NodeService { - return nil + if s.config == nil { + return s.registration.service + } + + svc := s.config.NodeService() + svc.Merge(s.registration.service) + + return svc } func (s *serviceConfigWatch) Stop() { s.cancelFunc() } - -/* -// Construct the service config request. This will be re-used with an updated - // index to watch for changes in the effective service config. - req := structs.ServiceConfigRequest{ - Name: s.registration.service.Service, - Datacenter: s.agent.config.Datacenter, - QueryOptions: structs.QueryOptions{Token: s.agent.tokens.AgentToken()}, - } - - consul.RetryLoopBackoff(s.shutdownCh, func() error { - var reply structs.ServiceConfigResponse - if err := s.agent.RPC("ConfigEntry.ResolveServiceConfig", &req, &reply); err != nil { - return err - } - - s.updateConfig(&reply.Definition) - - req.QueryOptions.MinQueryIndex = reply.QueryMeta.Index - return nil - }, func(err error) { - s.agent.logger.Printf("[ERR] Error getting service config: %v", err) - }) -*/ diff --git a/agent/service_manager_test.go b/agent/service_manager_test.go new file mode 100644 index 0000000000..d7487defa8 --- /dev/null +++ b/agent/service_manager_test.go @@ -0,0 +1,55 @@ +package agent + +import ( + "testing" + + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/testrpc" + "github.com/stretchr/testify/require" +) + +func TestServiceManager_RegisterService(t *testing.T) { + require := require.New(t) + + a := NewTestAgent(t, t.Name(), "") + defer a.Shutdown() + + testrpc.WaitForLeader(t, a.RPC, "dc1") + + // Register some global proxy config + args := &structs.ConfigEntryRequest{ + Datacenter: "dc1", + Entry: &structs.ProxyConfigEntry{ + Config: map[string]interface{}{ + "foo": 1, + }, + }, + } + var out struct{} + require.NoError(a.RPC("ConfigEntry.Apply", args, &out)) + + // Now register a service locally and make sure the resulting State entry + // has the global config in it. + svc := &structs.NodeService{ + ID: "redis", + Service: "redis", + Port: 8000, + } + require.NoError(a.AddService(svc, nil, false, "", ConfigSourceLocal)) + mergedService := a.State.Service("redis") + require.NotNil(mergedService) + require.Equal(&structs.NodeService{ + ID: "redis", + Service: "redis", + Port: 8000, + Proxy: structs.ConnectProxyConfig{ + Config: map[string]interface{}{ + "foo": int64(1), + }, + }, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + }, mergedService) +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 9e7b6acf17..06988679b5 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -765,6 +765,57 @@ type ServiceConnect struct { SidecarService *ServiceDefinition `json:",omitempty" bexpr:"-"` } +// Merge overlays the given node's attributes onto the existing node. +func (s *NodeService) Merge(other *NodeService) { + if other.Kind != "" { + s.Kind = other.Kind + } + if other.ID != "" { + s.ID = other.ID + } + if other.Service != "" { + s.Service = other.Service + } + for _, tag := range other.Tags { + s.Tags = append(s.Tags, tag) + } + if other.Address != "" { + s.Address = other.Address + } + if s.Meta == nil { + s.Meta = other.Meta + } else { + for k, v := range other.Meta { + s.Meta[k] = v + } + } + if other.Port != 0 { + s.Port = other.Port + } + if other.Weights != nil { + s.Weights = other.Weights + } + s.EnableTagOverride = other.EnableTagOverride + if other.ProxyDestination != "" { + s.ProxyDestination = other.ProxyDestination + } + + // Take the incoming service's proxy fields and merge the config map. + proxyConf := s.Proxy.Config + s.Proxy = other.Proxy + if proxyConf == nil { + proxyConf = other.Proxy.Config + } else { + for k, v := range other.Proxy.Config { + proxyConf[k] = v + } + } + s.Proxy.Config = proxyConf + + s.Connect = other.Connect + s.LocallyRegisteredAsSidecar = other.LocallyRegisteredAsSidecar +} + // Validate validates the node service configuration. // // NOTE(mitchellh): This currently only validates fields for a ConnectProxy.