Fill out the service manager functionality and fix tests

This commit is contained in:
Kyle Havlovitz 2019-04-22 23:39:02 -07:00
parent 7c25869e67
commit 88e1d8ce03
5 changed files with 267 additions and 101 deletions

View File

@ -289,6 +289,7 @@ func New(c *config.RuntimeConfig) (*Agent, error) {
endpoints: make(map[string]string), endpoints: make(map[string]string),
tokens: new(token.Store), tokens: new(token.Store),
} }
a.serviceManager = NewServiceManager(a)
if err := a.initializeACLs(); err != nil { if err := a.initializeACLs(); err != nil {
return nil, err return nil, err
@ -475,9 +476,6 @@ func (a *Agent) Start() error {
} }
}() }()
// Start the service registration manager.
a.serviceManager = NewServiceManager(a)
// Start watching for critical services to deregister, based on their // Start watching for critical services to deregister, based on their
// checks. // checks.
go a.reapServices() go a.reapServices()
@ -1897,53 +1895,22 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error {
func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
a.stateLock.Lock() a.stateLock.Lock()
defer a.stateLock.Unlock() defer a.stateLock.Unlock()
a.serviceManager.AddService(service, chkTypes, persist, token, source)
return a.addServiceLocked(service, chkTypes, persist, token, source) return a.addServiceLocked(service, chkTypes, persist, token, source)
} }
func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
if service.Service == "" { if err := a.validateService(service, chkTypes); err != nil {
return fmt.Errorf("Service name missing") return err
}
if service.ID == "" && service.Service != "" {
service.ID = service.Service
}
for _, check := range chkTypes {
if err := check.Validate(); err != nil {
return fmt.Errorf("Check is not valid: %v", err)
}
} }
// Set default weights if not specified. This is important as it ensures AE if err := a.serviceManager.AddService(service, chkTypes, persist, token, source); err != nil {
// doesn't consider the service different since it has nil weights. return err
if service.Weights == nil {
service.Weights = &structs.Weights{Passing: 1, Warning: 1}
} }
// Warn if the service name is incompatible with DNS return nil
if InvalidDnsRe.MatchString(service.Service) { }
a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+
"via DNS due to invalid characters. Valid characters include "+
"all alpha-numerics and dashes.", service.Service)
} else if len(service.Service) > MaxDNSLabelLength {
a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+
"via DNS due to it being too long. Valid lengths are between "+
"1 and 63 bytes.", service.Service)
}
// Warn if any tags are incompatible with DNS
for _, tag := range service.Tags {
if InvalidDnsRe.MatchString(tag) {
a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+
"via DNS due to invalid characters. Valid characters include "+
"all alpha-numerics and dashes.", tag)
} else if len(tag) > MaxDNSLabelLength {
a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+
"via DNS due to it being too long. Valid lengths are between "+
"1 and 63 bytes.", tag)
}
}
func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
// Pause the service syncs during modification // Pause the service syncs during modification
a.PauseSync() a.PauseSync()
defer a.ResumeSync() defer a.ResumeSync()
@ -2033,6 +2000,54 @@ func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*struc
return nil return nil
} }
// validateService validates an service and its checks, either returning an error or emitting a
// warning based on the nature of the error.
func (a *Agent) validateService(service *structs.NodeService, chkTypes []*structs.CheckType) error {
if service.Service == "" {
return fmt.Errorf("Service name missing")
}
if service.ID == "" && service.Service != "" {
service.ID = service.Service
}
for _, check := range chkTypes {
if err := check.Validate(); err != nil {
return fmt.Errorf("Check is not valid: %v", err)
}
}
// Set default weights if not specified. This is important as it ensures AE
// doesn't consider the service different since it has nil weights.
if service.Weights == nil {
service.Weights = &structs.Weights{Passing: 1, Warning: 1}
}
// Warn if the service name is incompatible with DNS
if InvalidDnsRe.MatchString(service.Service) {
a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+
"via DNS due to invalid characters. Valid characters include "+
"all alpha-numerics and dashes.", service.Service)
} else if len(service.Service) > MaxDNSLabelLength {
a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+
"via DNS due to it being too long. Valid lengths are between "+
"1 and 63 bytes.", service.Service)
}
// Warn if any tags are incompatible with DNS
for _, tag := range service.Tags {
if InvalidDnsRe.MatchString(tag) {
a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+
"via DNS due to invalid characters. Valid characters include "+
"all alpha-numerics and dashes.", tag)
} else if len(tag) > MaxDNSLabelLength {
a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+
"via DNS due to it being too long. Valid lengths are between "+
"1 and 63 bytes.", tag)
}
}
return nil
}
// cleanupRegistration is called on registration error to ensure no there are no // cleanupRegistration is called on registration error to ensure no there are no
// leftovers after a partial failure // leftovers after a partial failure
func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.CheckID) { func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.CheckID) {
@ -2061,7 +2076,6 @@ func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.Check
func (a *Agent) RemoveService(serviceID string, persist bool) error { func (a *Agent) RemoveService(serviceID string, persist bool) error {
a.stateLock.Lock() a.stateLock.Lock()
defer a.stateLock.Unlock() defer a.stateLock.Unlock()
a.serviceManager.RemoveService(serviceID)
return a.removeServiceLocked(serviceID, persist) return a.removeServiceLocked(serviceID, persist)
} }
@ -2073,6 +2087,9 @@ func (a *Agent) removeServiceLocked(serviceID string, persist bool) error {
return fmt.Errorf("ServiceID missing") return fmt.Errorf("ServiceID missing")
} }
// Shut down the config watch in the service manager.
a.serviceManager.RemoveService(serviceID)
checks := a.State.Checks() checks := a.State.Checks()
var checkIDs []types.CheckID var checkIDs []types.CheckID
for id, check := range checks { for id, check := range checks {
@ -3677,6 +3694,15 @@ func (a *Agent) registerCache() {
RefreshTimer: 0 * time.Second, RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute, RefreshTimeout: 10 * time.Minute,
}) })
a.cache.RegisterType(cachetype.ResolvedServiceConfigName, &cachetype.ResolvedServiceConfig{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
} }
// defaultProxyCommand returns the default Connect managed proxy command. // defaultProxyCommand returns the default Connect managed proxy command.

View File

@ -191,18 +191,25 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
if err != nil { if err != nil {
return err return err
} }
serviceConf, ok := serviceEntry.(*structs.ServiceConfigEntry) var serviceConf *structs.ServiceConfigEntry
var ok bool
if serviceEntry != nil {
serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry)
if !ok { if !ok {
return fmt.Errorf("invalid service config type %T", serviceEntry) return fmt.Errorf("invalid service config type %T", serviceEntry)
} }
}
_, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal) _, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal)
if err != nil { if err != nil {
return err return err
} }
proxyConf, ok := proxyEntry.(*structs.ProxyConfigEntry) var proxyConf *structs.ProxyConfigEntry
if proxyEntry != nil {
proxyConf, ok = proxyEntry.(*structs.ProxyConfigEntry)
if !ok { if !ok {
return fmt.Errorf("invalid proxy config type %T", serviceEntry) return fmt.Errorf("invalid proxy config type %T", proxyEntry)
}
} }
// Resolve the service definition by overlaying the service config onto the global // Resolve the service definition by overlaying the service config onto the global

View File

@ -24,7 +24,7 @@ func NewServiceManager(agent *Agent) *ServiceManager {
} }
} }
func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) { func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -40,17 +40,45 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st
// start a new config watcher. // start a new config watcher.
watch, ok := s.services[service.ID] watch, ok := s.services[service.ID]
if ok { if ok {
watch.updateRegistration(&reg) s.agent.logger.Printf("[DEBUG] agent: updating local registration for service %q", service.ID)
if err := watch.updateRegistration(&reg); err != nil {
return err
}
} else { } else {
// This is a new entry, so get the existing global config and do the initial
// registration with the merged config.
args := structs.ServiceConfigRequest{
Name: service.Service,
Datacenter: s.agent.config.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken},
}
if token != "" {
args.QueryOptions.Token = token
}
var resp structs.ServiceConfigResponse
if err := s.agent.RPC("ConfigEntry.ResolveServiceConfig", &args, &resp); err != nil {
s.agent.logger.Printf("[WARN] agent: could not retrieve central configuration for service %q: %v",
service.Service, err)
}
watch := &serviceConfigWatch{ watch := &serviceConfigWatch{
registration: &reg,
updateCh: make(chan cache.UpdateEvent, 1), updateCh: make(chan cache.UpdateEvent, 1),
agent: s.agent, agent: s.agent,
config: &resp.Definition,
}
// Force an update/register immediately.
if err := watch.updateRegistration(&reg); err != nil {
return err
} }
s.services[service.ID] = watch s.services[service.ID] = watch
watch.Start() if err := watch.Start(); err != nil {
return err
} }
}
return nil
} }
func (s *ServiceManager) RemoveService(serviceID string) { func (s *ServiceManager) RemoveService(serviceID string) {
@ -84,7 +112,7 @@ type serviceConfigWatch struct {
ctx context.Context ctx context.Context
cancelFunc func() cancelFunc func()
sync.RWMutex sync.Mutex
} }
func (s *serviceConfigWatch) Start() error { func (s *serviceConfigWatch) Start() error {
@ -103,81 +131,80 @@ func (s *serviceConfigWatch) runWatch() {
case <-s.ctx.Done(): case <-s.ctx.Done():
return return
case event := <-s.updateCh: case event := <-s.updateCh:
s.handleUpdate(event) if err := s.handleUpdate(event, false); err != nil {
s.agent.logger.Printf("[ERR] agent: error handling service update: %v", err)
continue
}
} }
} }
} }
func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) { func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) error {
s.Lock()
defer s.Unlock()
if event.Err != nil {
return fmt.Errorf("error watching service config: %v", event.Err)
}
switch event.Result.(type) { switch event.Result.(type) {
case serviceRegistration: case *serviceRegistration:
s.Lock()
s.registration = event.Result.(*serviceRegistration) s.registration = event.Result.(*serviceRegistration)
s.Unlock() case *structs.ServiceConfigResponse:
case structs.ServiceConfigResponse: resp := event.Result.(*structs.ServiceConfigResponse)
s.Lock() s.config = &resp.Definition
s.config = &event.Result.(*structs.ServiceConfigResponse).Definition
s.Unlock()
default: default:
s.agent.logger.Printf("[ERR] unknown update event type: %T", event) return fmt.Errorf("unknown update event type: %T", event)
} }
service := s.mergeServiceConfig() service := s.mergeServiceConfig()
s.agent.logger.Printf("[INFO] updating service registration: %v, %v", service.ID, service.Meta)
/*err := s.agent.AddService(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source) if !locked {
s.agent.stateLock.Lock()
defer s.agent.stateLock.Unlock()
}
err := s.agent.addServiceInternal(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source)
if err != nil { if err != nil {
s.agent.logger.Printf("[ERR] error updating service registration: %v", err) return fmt.Errorf("error updating service registration: %v", err)
}*/ }
return nil
} }
func (s *serviceConfigWatch) startConfigWatch() error { func (s *serviceConfigWatch) startConfigWatch() error {
s.RLock()
name := s.registration.service.Service name := s.registration.service.Service
s.RUnlock()
req := &structs.ServiceConfigRequest{ req := &structs.ServiceConfigRequest{
Name: name, Name: name,
Datacenter: s.agent.config.Datacenter, Datacenter: s.agent.config.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken},
}
if s.registration.token != "" {
req.QueryOptions.Token = s.registration.token
} }
err := s.agent.cache.Notify(s.ctx, cachetype.ResolvedServiceConfigName, req, fmt.Sprintf("service-config:%s", name), s.updateCh) err := s.agent.cache.Notify(s.ctx, cachetype.ResolvedServiceConfigName, req, fmt.Sprintf("service-config:%s", name), s.updateCh)
return err return err
} }
func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) { func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error {
s.updateCh <- cache.UpdateEvent{ return s.handleUpdate(cache.UpdateEvent{
Result: registration, Result: registration,
} }, true)
} }
func (s *serviceConfigWatch) mergeServiceConfig() *structs.NodeService { func (s *serviceConfigWatch) mergeServiceConfig() *structs.NodeService {
return nil if s.config == nil {
return s.registration.service
}
svc := s.config.NodeService()
svc.Merge(s.registration.service)
return svc
} }
func (s *serviceConfigWatch) Stop() { func (s *serviceConfigWatch) Stop() {
s.cancelFunc() s.cancelFunc()
} }
/*
// Construct the service config request. This will be re-used with an updated
// index to watch for changes in the effective service config.
req := structs.ServiceConfigRequest{
Name: s.registration.service.Service,
Datacenter: s.agent.config.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.agent.tokens.AgentToken()},
}
consul.RetryLoopBackoff(s.shutdownCh, func() error {
var reply structs.ServiceConfigResponse
if err := s.agent.RPC("ConfigEntry.ResolveServiceConfig", &req, &reply); err != nil {
return err
}
s.updateConfig(&reply.Definition)
req.QueryOptions.MinQueryIndex = reply.QueryMeta.Index
return nil
}, func(err error) {
s.agent.logger.Printf("[ERR] Error getting service config: %v", err)
})
*/

View File

@ -0,0 +1,55 @@
package agent
import (
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
"github.com/stretchr/testify/require"
)
func TestServiceManager_RegisterService(t *testing.T) {
require := require.New(t)
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register some global proxy config
args := &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ProxyConfigEntry{
Config: map[string]interface{}{
"foo": 1,
},
},
}
var out struct{}
require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
// Now register a service locally and make sure the resulting State entry
// has the global config in it.
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Port: 8000,
}
require.NoError(a.AddService(svc, nil, false, "", ConfigSourceLocal))
mergedService := a.State.Service("redis")
require.NotNil(mergedService)
require.Equal(&structs.NodeService{
ID: "redis",
Service: "redis",
Port: 8000,
Proxy: structs.ConnectProxyConfig{
Config: map[string]interface{}{
"foo": int64(1),
},
},
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}, mergedService)
}

View File

@ -765,6 +765,57 @@ type ServiceConnect struct {
SidecarService *ServiceDefinition `json:",omitempty" bexpr:"-"` SidecarService *ServiceDefinition `json:",omitempty" bexpr:"-"`
} }
// Merge overlays the given node's attributes onto the existing node.
func (s *NodeService) Merge(other *NodeService) {
if other.Kind != "" {
s.Kind = other.Kind
}
if other.ID != "" {
s.ID = other.ID
}
if other.Service != "" {
s.Service = other.Service
}
for _, tag := range other.Tags {
s.Tags = append(s.Tags, tag)
}
if other.Address != "" {
s.Address = other.Address
}
if s.Meta == nil {
s.Meta = other.Meta
} else {
for k, v := range other.Meta {
s.Meta[k] = v
}
}
if other.Port != 0 {
s.Port = other.Port
}
if other.Weights != nil {
s.Weights = other.Weights
}
s.EnableTagOverride = other.EnableTagOverride
if other.ProxyDestination != "" {
s.ProxyDestination = other.ProxyDestination
}
// Take the incoming service's proxy fields and merge the config map.
proxyConf := s.Proxy.Config
s.Proxy = other.Proxy
if proxyConf == nil {
proxyConf = other.Proxy.Config
} else {
for k, v := range other.Proxy.Config {
proxyConf[k] = v
}
}
s.Proxy.Config = proxyConf
s.Connect = other.Connect
s.LocallyRegisteredAsSidecar = other.LocallyRegisteredAsSidecar
}
// Validate validates the node service configuration. // Validate validates the node service configuration.
// //
// NOTE(mitchellh): This currently only validates fields for a ConnectProxy. // NOTE(mitchellh): This currently only validates fields for a ConnectProxy.