agent: tolerate more failure scenarios during service registration with central config enabled (#6472)

Also:

* Finished threading replaceExistingChecks setting (from GH-4905)
  through service manager.

* Respected the original configSource value that was used to register a
  service or a check when restoring persisted data.

* Run several existing tests with and without central config enabled
  (not exhaustive yet).

* Switch to ioutil.ReadFile for all types of agent persistence.
This commit is contained in:
R.B. Boyer 2019-09-24 10:04:48 -05:00 committed by GitHub
parent 100ebd63f9
commit 5882e21b2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1691 additions and 460 deletions

View File

@ -54,6 +54,7 @@ import (
const (
// Path to save agent service definitions
servicesDir = "services"
serviceConfigDir = "services/configs"
// Path to save agent proxy definitions
proxyDir = "proxies"
@ -85,6 +86,28 @@ const (
ConfigSourceRemote
)
var configSourceToName = map[configSource]string{
ConfigSourceLocal: "local",
ConfigSourceRemote: "remote",
}
var configSourceFromName = map[string]configSource{
"local": ConfigSourceLocal,
"remote": ConfigSourceRemote,
// If the value is not found in the persisted config file, then use the
// former default.
"": ConfigSourceLocal,
}
func (s configSource) String() string {
return configSourceToName[s]
}
// ConfigSourceFromName will unmarshal the string form of a configSource.
func ConfigSourceFromName(name string) (configSource, bool) {
s, ok := configSourceFromName[name]
return s, ok
}
// delegate defines the interface shared by both
// consul.Client and consul.Server.
type delegate interface {
@ -419,6 +442,8 @@ func (a *Agent) Start() error {
a.logger.Printf("[INFO] AutoEncrypt: upgraded to TLS")
}
a.serviceManager.Start()
// Load checks/services/metadata.
if err := a.loadServices(c); err != nil {
return err
@ -1582,6 +1607,11 @@ func (a *Agent) ShutdownAgent() error {
}
a.logger.Println("[INFO] agent: Requesting shutdown")
// Stop the service manager (must happen before we take the stateLock to avoid deadlock)
if a.serviceManager != nil {
a.serviceManager.Stop()
}
// Stop all the checks
a.stateLock.Lock()
defer a.stateLock.Unlock()
@ -1898,7 +1928,7 @@ func (a *Agent) reapServicesInternal() {
// this is so that we won't try to remove it again.
if timeout > 0 && cs.CriticalFor() > timeout {
reaped[serviceID] = true
if err := a.RemoveService(serviceID, true); err != nil {
if err := a.RemoveService(serviceID); err != nil {
a.logger.Printf("[ERR] agent: unable to deregister service %q after check %q has been critical for too long: %s",
serviceID, checkID, err)
} else {
@ -1929,15 +1959,17 @@ func (a *Agent) reapServices() {
type persistedService struct {
Token string
Service *structs.NodeService
Source string
}
// persistService saves a service definition to a JSON file in the data dir
func (a *Agent) persistService(service *structs.NodeService) error {
func (a *Agent) persistService(service *structs.NodeService, source configSource) error {
svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
wrapped := persistedService{
Token: a.State.ServiceToken(service.ID),
Service: service,
Source: source.String(),
}
encoded, err := json.Marshal(wrapped)
if err != nil {
@ -1957,7 +1989,7 @@ func (a *Agent) purgeService(serviceID string) error {
}
// persistCheck saves a check definition to the local agent's state directory
func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *structs.CheckType) error {
func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *structs.CheckType, source configSource) error {
checkPath := filepath.Join(a.config.DataDir, checksDir, checkIDHash(check.CheckID))
// Create the persisted check
@ -1965,6 +1997,7 @@ func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *structs.CheckT
Check: check,
ChkType: chkType,
Token: a.State.CheckToken(check.CheckID),
Source: source.String(),
}
encoded, err := json.Marshal(wrapped)
@ -1984,13 +2017,105 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error {
return nil
}
// persistedServiceConfig is used to serialize the resolved service config that
// feeds into the ServiceManager at registration time so that it may be
// restored later on.
type persistedServiceConfig struct {
ServiceID string
Defaults *structs.ServiceConfigResponse
}
func (a *Agent) persistServiceConfig(serviceID string, defaults *structs.ServiceConfigResponse) error {
// Create the persisted config.
wrapped := persistedServiceConfig{
ServiceID: serviceID,
Defaults: defaults,
}
encoded, err := json.Marshal(wrapped)
if err != nil {
return err
}
dir := filepath.Join(a.config.DataDir, serviceConfigDir)
configPath := filepath.Join(dir, stringHash(serviceID))
// Create the config dir if it doesn't exist
if err := os.MkdirAll(dir, 0700); err != nil {
return fmt.Errorf("failed creating service configs dir %q: %s", dir, err)
}
return file.WriteAtomic(configPath, encoded)
}
func (a *Agent) purgeServiceConfig(serviceID string) error {
configPath := filepath.Join(a.config.DataDir, serviceConfigDir, stringHash(serviceID))
if _, err := os.Stat(configPath); err == nil {
return os.Remove(configPath)
}
return nil
}
func (a *Agent) readPersistedServiceConfigs() (map[string]*structs.ServiceConfigResponse, error) {
out := make(map[string]*structs.ServiceConfigResponse)
configDir := filepath.Join(a.config.DataDir, serviceConfigDir)
files, err := ioutil.ReadDir(configDir)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, fmt.Errorf("Failed reading service configs dir %q: %s", configDir, err)
}
for _, fi := range files {
// Skip all dirs
if fi.IsDir() {
continue
}
// Skip all partially written temporary files
if strings.HasSuffix(fi.Name(), "tmp") {
a.logger.Printf("[WARN] agent: Ignoring temporary service config file %v", fi.Name())
continue
}
// Read the contents into a buffer
file := filepath.Join(configDir, fi.Name())
buf, err := ioutil.ReadFile(file)
if err != nil {
return nil, fmt.Errorf("failed reading service config file %q: %s", file, err)
}
// Try decoding the service config definition
var p persistedServiceConfig
if err := json.Unmarshal(buf, &p); err != nil {
a.logger.Printf("[ERR] agent: Failed decoding service config file %q: %s", file, err)
continue
}
out[p.ServiceID] = p.Defaults
}
return out, nil
}
// AddServiceAndReplaceChecks is used to add a service entry and its check. Any check for this service missing from chkTypes will be deleted.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (a *Agent) AddServiceAndReplaceChecks(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.addServiceLocked(service, chkTypes, persist, token, true, source)
return a.addServiceLocked(&addServiceRequest{
service: service,
chkTypes: chkTypes,
previousDefaults: nil,
waitForCentralConfig: true,
persist: persist,
persistServiceConfig: true,
token: token,
replaceExistingChecks: true,
source: source,
})
}
// AddService is used to add a service entry.
@ -1999,25 +2124,89 @@ func (a *Agent) AddServiceAndReplaceChecks(service *structs.NodeService, chkType
func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.addServiceLocked(service, chkTypes, persist, token, false, source)
return a.addServiceLocked(&addServiceRequest{
service: service,
chkTypes: chkTypes,
previousDefaults: nil,
waitForCentralConfig: true,
persist: persist,
persistServiceConfig: true,
token: token,
replaceExistingChecks: false,
source: source,
})
}
// addServiceLocked adds a service entry to the service manager if enabled, or directly
// to the local state if it is not. This function assumes the state lock is already held.
func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, replaceExistingChecks bool, source configSource) error {
if err := a.validateService(service, chkTypes); err != nil {
func (a *Agent) addServiceLocked(req *addServiceRequest) error {
req.fixupForAddServiceLocked()
if err := a.validateService(req.service, req.chkTypes); err != nil {
return err
}
if a.config.EnableCentralServiceConfig {
return a.serviceManager.AddService(service, chkTypes, persist, token, source)
return a.serviceManager.AddService(req)
}
return a.addServiceInternal(service, chkTypes, persist, token, replaceExistingChecks, source)
// previousDefaults are ignored here because they are only relevant for central config.
req.persistService = nil
req.persistDefaults = nil
req.persistServiceConfig = false
return a.addServiceInternal(req)
}
// addServiceRequest is the union of arguments for calling both
// addServiceLocked and addServiceInternal. The overlap was significant enough
// to warrant merging them and indicating which fields are meant to be set only
// in one of the two contexts.
//
// Before using the request struct one of the fixupFor*() methods should be
// invoked to clear irrelevant fields.
//
// The ServiceManager.AddService signature is largely just a passthrough for
// addServiceLocked and should be treated as such.
type addServiceRequest struct {
service *structs.NodeService
chkTypes []*structs.CheckType
previousDefaults *structs.ServiceConfigResponse // just for: addServiceLocked
waitForCentralConfig bool // just for: addServiceLocked
persistService *structs.NodeService // just for: addServiceInternal
persistDefaults *structs.ServiceConfigResponse // just for: addServiceInternal
persist bool
persistServiceConfig bool
token string
replaceExistingChecks bool
source configSource
}
func (r *addServiceRequest) fixupForAddServiceLocked() {
r.persistService = nil
r.persistDefaults = nil
}
func (r *addServiceRequest) fixupForAddServiceInternal() {
r.previousDefaults = nil
r.waitForCentralConfig = false
}
// addServiceInternal adds the given service and checks to the local state.
func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, replaceExistingChecks bool, source configSource) error {
func (a *Agent) addServiceInternal(req *addServiceRequest) error {
req.fixupForAddServiceInternal()
var (
service = req.service
chkTypes = req.chkTypes
persistService = req.persistService
persistDefaults = req.persistDefaults
persist = req.persist
persistServiceConfig = req.persistServiceConfig
token = req.token
replaceExistingChecks = req.replaceExistingChecks
source = req.source
)
// Pause the service syncs during modification
a.PauseSync()
defer a.ResumeSync()
@ -2104,7 +2293,7 @@ func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*str
}
if persist && a.config.DataDir != "" {
if err := a.persistCheck(checks[i], chkTypes[i]); err != nil {
if err := a.persistCheck(checks[i], chkTypes[i], source); err != nil {
a.cleanupRegistration(cleanupServices, cleanupChecks)
return err
@ -2112,9 +2301,27 @@ func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*str
}
}
if persistServiceConfig && a.config.DataDir != "" {
var err error
if persistDefaults != nil {
err = a.persistServiceConfig(service.ID, persistDefaults)
} else {
err = a.purgeServiceConfig(service.ID)
}
if err != nil {
a.cleanupRegistration(cleanupServices, cleanupChecks)
return err
}
}
// Persist the service to a file
if persist && a.config.DataDir != "" {
if err := a.persistService(service); err != nil {
if persistService == nil {
persistService = service
}
if err := a.persistService(persistService, source); err != nil {
a.cleanupRegistration(cleanupServices, cleanupChecks)
return err
}
@ -2189,6 +2396,9 @@ func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.Check
if err := a.purgeService(s); err != nil {
a.logger.Printf("[ERR] consul: service registration: cleanup: failed to purge service %s file: %s", s, err)
}
if err := a.purgeServiceConfig(s); err != nil {
a.logger.Printf("[ERR] consul: service registration: cleanup: failed to purge service config %s file: %s", s, err)
}
}
for _, c := range checksIDs {
@ -2204,7 +2414,11 @@ func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.Check
// RemoveService is used to remove a service entry.
// The agent will make a best effort to ensure it is deregistered
func (a *Agent) RemoveService(serviceID string, persist bool) error {
func (a *Agent) RemoveService(serviceID string) error {
return a.removeService(serviceID, true)
}
func (a *Agent) removeService(serviceID string, persist bool) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.removeServiceLocked(serviceID, persist)
@ -2243,6 +2457,9 @@ func (a *Agent) removeServiceLocked(serviceID string, persist bool) error {
if err := a.purgeService(serviceID); err != nil {
return err
}
if err := a.purgeServiceConfig(serviceID); err != nil {
return err
}
}
// Deregister any associated health checks
@ -2315,7 +2532,7 @@ func (a *Agent) addCheckLocked(check *structs.HealthCheck, chkType *structs.Chec
// Persist the check
if persist && a.config.DataDir != "" {
return a.persistCheck(check, chkType)
return a.persistCheck(check, chkType, source)
}
return nil
@ -2853,6 +3070,13 @@ func (a *Agent) deletePid() error {
// loadServices will load service definitions from configuration and persisted
// definitions on disk, and load them into the local agent.
func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
// Load any persisted service configs so we can feed those into the initial
// registrations below.
persistedServiceConfigs, err := a.readPersistedServiceConfigs()
if err != nil {
return err
}
// Register the services from config
for _, service := range conf.Services {
ns := service.NodeService()
@ -2871,13 +3095,37 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
// syntax sugar and shouldn't be persisted in local or server state.
ns.Connect.SidecarService = nil
if err := a.addServiceLocked(ns, chkTypes, false, service.Token, false, ConfigSourceLocal); err != nil {
serviceID := defaultIfEmpty(ns.ID, ns.Service)
err = a.addServiceLocked(&addServiceRequest{
service: ns,
chkTypes: chkTypes,
previousDefaults: persistedServiceConfigs[serviceID],
waitForCentralConfig: false, // exclusively use cached values
persist: false, // don't rewrite the file with the same data we just read
persistServiceConfig: false, // don't rewrite the file with the same data we just read
token: service.Token,
replaceExistingChecks: false, // do default behavior
source: ConfigSourceLocal,
})
if err != nil {
return fmt.Errorf("Failed to register service %q: %v", service.Name, err)
}
// If there is a sidecar service, register that too.
if sidecar != nil {
if err := a.addServiceLocked(sidecar, sidecarChecks, false, sidecarToken, false, ConfigSourceLocal); err != nil {
sidecarServiceID := defaultIfEmpty(sidecar.ID, sidecar.Service)
err = a.addServiceLocked(&addServiceRequest{
service: sidecar,
chkTypes: sidecarChecks,
previousDefaults: persistedServiceConfigs[sidecarServiceID],
waitForCentralConfig: false, // exclusively use cached values
persist: false, // don't rewrite the file with the same data we just read
persistServiceConfig: false, // don't rewrite the file with the same data we just read
token: sidecarToken,
replaceExistingChecks: false, // do default behavior
source: ConfigSourceLocal,
})
if err != nil {
return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err)
}
}
@ -2904,16 +3152,9 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
continue
}
// Open the file for reading
file := filepath.Join(svcDir, fi.Name())
fh, err := os.Open(file)
if err != nil {
return fmt.Errorf("failed opening service file %q: %s", file, err)
}
// Read the contents into a buffer
buf, err := ioutil.ReadAll(fh)
fh.Close()
file := filepath.Join(svcDir, fi.Name())
buf, err := ioutil.ReadFile(file)
if err != nil {
return fmt.Errorf("failed reading service file %q: %s", file, err)
}
@ -2929,6 +3170,18 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
}
serviceID := p.Service.ID
source, ok := ConfigSourceFromName(p.Source)
if !ok {
a.logger.Printf("[WARN] agent: service %q exists with invalid source %q, purging", serviceID, p.Source)
if err := a.purgeService(serviceID); err != nil {
return fmt.Errorf("failed purging service %q: %s", serviceID, err)
}
if err := a.purgeServiceConfig(serviceID); err != nil {
return fmt.Errorf("failed purging service config %q: %s", serviceID, err)
}
continue
}
if a.State.Service(serviceID) != nil {
// Purge previously persisted service. This allows config to be
// preferred over services persisted from the API.
@ -2937,15 +3190,39 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
if err := a.purgeService(serviceID); err != nil {
return fmt.Errorf("failed purging service %q: %s", serviceID, err)
}
if err := a.purgeServiceConfig(serviceID); err != nil {
return fmt.Errorf("failed purging service config %q: %s", serviceID, err)
}
} else {
a.logger.Printf("[DEBUG] agent: restored service definition %q from %q",
serviceID, file)
if err := a.addServiceLocked(p.Service, nil, false, p.Token, false, ConfigSourceLocal); err != nil {
err = a.addServiceLocked(&addServiceRequest{
service: p.Service,
chkTypes: nil,
previousDefaults: persistedServiceConfigs[serviceID],
waitForCentralConfig: false, // exclusively use cached values
persist: false, // don't rewrite the file with the same data we just read
persistServiceConfig: false, // don't rewrite the file with the same data we just read
token: p.Token,
replaceExistingChecks: false, // do default behavior
source: source,
},
)
if err != nil {
return fmt.Errorf("failed adding service %q: %s", serviceID, err)
}
}
}
for serviceID, _ := range persistedServiceConfigs {
if a.State.Service(serviceID) == nil {
// This can be cleaned up now.
if err := a.purgeServiceConfig(serviceID); err != nil {
return fmt.Errorf("failed purging service config %q: %s", serviceID, err)
}
}
}
return nil
}
@ -2993,16 +3270,9 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig, snap map[types.CheckID]*s
continue
}
// Open the file for reading
file := filepath.Join(checkDir, fi.Name())
fh, err := os.Open(file)
if err != nil {
return fmt.Errorf("Failed opening check file %q: %s", file, err)
}
// Read the contents into a buffer
buf, err := ioutil.ReadAll(fh)
fh.Close()
file := filepath.Join(checkDir, fi.Name())
buf, err := ioutil.ReadFile(file)
if err != nil {
return fmt.Errorf("failed reading check file %q: %s", file, err)
}
@ -3015,6 +3285,15 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig, snap map[types.CheckID]*s
}
checkID := p.Check.CheckID
source, ok := ConfigSourceFromName(p.Source)
if !ok {
a.logger.Printf("[WARN] agent: check %q exists with invalid source %q, purging", checkID, p.Source)
if err := a.purgeCheck(checkID); err != nil {
return fmt.Errorf("failed purging check %q: %s", checkID, err)
}
continue
}
if a.State.Check(checkID) != nil {
// Purge previously persisted check. This allows config to be
// preferred over persisted checks from the API.
@ -3034,7 +3313,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig, snap map[types.CheckID]*s
p.Check.Status = prev.Status
}
if err := a.addCheckLocked(p.Check, p.ChkType, false, p.Token, ConfigSourceLocal); err != nil {
if err := a.addCheckLocked(p.Check, p.ChkType, false, p.Token, source); err != nil {
// Purge the check if it is unable to be restored.
a.logger.Printf("[WARN] agent: Failed to restore check %q: %s",
checkID, err)
@ -3466,3 +3745,11 @@ func (a *Agent) registerCache() {
RefreshTimeout: 10 * time.Minute,
})
}
// defaultIfEmpty returns the value if not empty otherwise the default value.
func defaultIfEmpty(val, defaultVal string) string {
if val != "" {
return val
}
return defaultVal
}

View File

@ -933,10 +933,16 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
}
// Add sidecar.
if sidecar != nil {
if replaceExistingChecks {
if err := s.agent.AddServiceAndReplaceChecks(sidecar, sidecarChecks, true, sidecarToken, ConfigSourceRemote); err != nil {
return nil, err
}
} else {
if err := s.agent.AddService(sidecar, sidecarChecks, true, sidecarToken, ConfigSourceRemote); err != nil {
return nil, err
}
}
}
s.syncChanges()
return nil, nil
}
@ -951,7 +957,7 @@ func (s *HTTPServer) AgentDeregisterService(resp http.ResponseWriter, req *http.
return nil, err
}
if err := s.agent.RemoveService(serviceID, true); err != nil {
if err := s.agent.RemoveService(serviceID); err != nil {
return nil, err
}

View File

@ -2349,8 +2349,20 @@ func TestAgent_UpdateCheck_ACLDeny(t *testing.T) {
}
func TestAgent_RegisterService(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
testAgent_RegisterService(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService(t, "enable_central_service_config = true")
})
}
func testAgent_RegisterService(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@ -2416,8 +2428,20 @@ func TestAgent_RegisterService(t *testing.T) {
}
func TestAgent_RegisterService_ReRegister(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
testAgent_RegisterService_ReRegister(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService_ReRegister(t, "enable_central_service_config = true")
})
}
func testAgent_RegisterService_ReRegister(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@ -2481,8 +2505,19 @@ func TestAgent_RegisterService_ReRegister(t *testing.T) {
}
func TestAgent_RegisterService_ReRegister_ReplaceExistingChecks(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
testAgent_RegisterService_ReRegister_ReplaceExistingChecks(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService_ReRegister_ReplaceExistingChecks(t, "enable_central_service_config = true")
})
}
func testAgent_RegisterService_ReRegister_ReplaceExistingChecks(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@ -2546,7 +2581,19 @@ func TestAgent_RegisterService_ReRegister_ReplaceExistingChecks(t *testing.T) {
}
func TestAgent_RegisterService_TranslateKeys(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService_ACLDeny(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService_ACLDeny(t, "enable_central_service_config = true")
})
}
func testAgent_RegisterService_TranslateKeys(t *testing.T, extraHCL string) {
t.Helper()
tests := []struct {
ip string
expectedTCPCheckStart string
@ -2558,7 +2605,7 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) {
t.Run(tt.ip, func(t *testing.T) {
a := NewTestAgent(t, t.Name(), `
connect {}
`)
`+extraHCL)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@ -2750,8 +2797,20 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) {
}
func TestAgent_RegisterService_ACLDeny(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), TestACLConfig())
testAgent_RegisterService_ACLDeny(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService_ACLDeny(t, "enable_central_service_config = true")
})
}
func testAgent_RegisterService_ACLDeny(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), TestACLConfig()+" "+extraHCL)
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
@ -2788,8 +2847,20 @@ func TestAgent_RegisterService_ACLDeny(t *testing.T) {
}
func TestAgent_RegisterService_InvalidAddress(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
testAgent_RegisterService_UnmanagedConnectProxy(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService_InvalidAddress(t, "enable_central_service_config = true")
})
}
func testAgent_RegisterService_InvalidAddress(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@ -2820,10 +2891,21 @@ func TestAgent_RegisterService_InvalidAddress(t *testing.T) {
// This verifies that it is put in the local state store properly for syncing
// later.
func TestAgent_RegisterService_UnmanagedConnectProxy(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService_UnmanagedConnectProxy(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService_UnmanagedConnectProxy(t, "enable_central_service_config = true")
})
}
func testAgent_RegisterService_UnmanagedConnectProxy(t *testing.T, extraHCL string) {
t.Helper()
assert := assert.New(t)
a := NewTestAgent(t, t.Name(), "")
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@ -2941,7 +3023,18 @@ func testCreatePolicy(t *testing.T, a *TestAgent, name, rules string) string {
// TestAgent_sidecarServiceFromNodeService. Note it also tests Deregister
// explicitly too since setup is identical.
func TestAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_RegisterServiceDeregisterService_Sidecar(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RegisterServiceDeregisterService_Sidecar(t, "enable_central_service_config = true")
})
}
func testAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T, extraHCL string) {
t.Helper()
tests := []struct {
name string
@ -3324,7 +3417,7 @@ func TestAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T) {
hcl = hcl + TestACLConfig()
}
a := NewTestAgent(t, t.Name(), hcl)
a := NewTestAgent(t, t.Name(), hcl+" "+extraHCL)
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
@ -3422,10 +3515,21 @@ func TestAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T) {
// registration. This doesn't need to test validation exhaustively since
// that is done via a table test in the structs package.
func TestAgent_RegisterService_UnmanagedConnectProxyInvalid(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService_UnmanagedConnectProxyInvalid(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService_UnmanagedConnectProxyInvalid(t, "enable_central_service_config = true")
})
}
func testAgent_RegisterService_UnmanagedConnectProxyInvalid(t *testing.T, extraHCL string) {
t.Helper()
assert := assert.New(t)
a := NewTestAgent(t, t.Name(), "")
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@ -3455,10 +3559,21 @@ func TestAgent_RegisterService_UnmanagedConnectProxyInvalid(t *testing.T) {
// Tests agent registration of a service that is connect native.
func TestAgent_RegisterService_ConnectNative(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService_ConnectNative(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService_ConnectNative(t, "enable_central_service_config = true")
})
}
func testAgent_RegisterService_ConnectNative(t *testing.T, extraHCL string) {
t.Helper()
assert := assert.New(t)
a := NewTestAgent(t, t.Name(), "")
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@ -3489,8 +3604,20 @@ func TestAgent_RegisterService_ConnectNative(t *testing.T) {
}
func TestAgent_RegisterService_ScriptCheck_ExecDisable(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
testAgent_RegisterService_ScriptCheck_ExecDisable(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService_ScriptCheck_ExecDisable(t, "enable_central_service_config = true")
})
}
func testAgent_RegisterService_ScriptCheck_ExecDisable(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@ -3525,10 +3652,22 @@ func TestAgent_RegisterService_ScriptCheck_ExecDisable(t *testing.T) {
}
func TestAgent_RegisterService_ScriptCheck_ExecRemoteDisable(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService_ScriptCheck_ExecRemoteDisable(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RegisterService_ScriptCheck_ExecRemoteDisable(t, "enable_central_service_config = true")
})
}
func testAgent_RegisterService_ScriptCheck_ExecRemoteDisable(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
enable_local_script_checks = true
`)
`+extraHCL)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

View File

@ -332,10 +332,22 @@ func TestAgent_makeNodeID(t *testing.T) {
}
func TestAgent_AddService(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_AddService(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_AddService(t, "enable_central_service_config = true")
})
}
func testAgent_AddService(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
node_name = "node1"
`)
`+extraHCL)
defer a.Shutdown()
tests := []struct {
@ -504,10 +516,22 @@ func TestAgent_AddService(t *testing.T) {
}
func TestAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_AddServices_AliasUpdateCheckNotReverted(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_AddServices_AliasUpdateCheckNotReverted(t, "enable_central_service_config = true")
})
}
func testAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
node_name = "node1"
`)
`+extraHCL)
defer a.Shutdown()
// It's tricky to get an UpdateCheck call to be timed properly so it lands
@ -560,10 +584,22 @@ func TestAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T) {
}
func TestAgent_AddServiceNoExec(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_AddServiceNoExec(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_AddServiceNoExec(t, "enable_central_service_config = true")
})
}
func testAgent_AddServiceNoExec(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
node_name = "node1"
`)
`+extraHCL)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@ -590,11 +626,23 @@ func TestAgent_AddServiceNoExec(t *testing.T) {
}
func TestAgent_AddServiceNoRemoteExec(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_AddServiceNoRemoteExec(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_AddServiceNoRemoteExec(t, "enable_central_service_config = true")
})
}
func testAgent_AddServiceNoRemoteExec(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
node_name = "node1"
enable_local_script_checks = true
`)
`+extraHCL)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@ -616,17 +664,29 @@ func TestAgent_AddServiceNoRemoteExec(t *testing.T) {
}
func TestAgent_RemoveService(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
testAgent_RemoveService(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RemoveService(t, "enable_central_service_config = true")
})
}
func testAgent_RemoveService(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
// Remove a service that doesn't exist
if err := a.RemoveService("redis", false); err != nil {
if err := a.RemoveService("redis"); err != nil {
t.Fatalf("err: %v", err)
}
// Remove without an ID
if err := a.RemoveService("", false); err == nil {
if err := a.RemoveService(""); err == nil {
t.Fatalf("should have errored")
}
@ -655,7 +715,7 @@ func TestAgent_RemoveService(t *testing.T) {
t.Fatalf("err: %s", err)
}
if err := a.RemoveService("memcache", false); err != nil {
if err := a.RemoveService("memcache"); err != nil {
t.Fatalf("err: %s", err)
}
if _, ok := a.State.Checks()["service:memcache"]; ok {
@ -697,7 +757,7 @@ func TestAgent_RemoveService(t *testing.T) {
}
// Remove the service
if err := a.RemoveService("redis", false); err != nil {
if err := a.RemoveService("redis"); err != nil {
t.Fatalf("err: %v", err)
}
@ -745,10 +805,22 @@ func TestAgent_RemoveService(t *testing.T) {
}
func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_RemoveServiceRemovesAllChecks(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RemoveServiceRemovesAllChecks(t, "enable_central_service_config = true")
})
}
func testAgent_RemoveServiceRemovesAllChecks(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
node_name = "node1"
`)
`+extraHCL)
defer a.Shutdown()
svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000}
@ -781,7 +853,7 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) {
}
// Remove service
if err := a.RemoveService("redis", false); err != nil {
if err := a.RemoveService("redis"); err != nil {
t.Fatal("Failed to remove service", err)
}
@ -1688,15 +1760,28 @@ func TestAgent_updateTTLCheck(t *testing.T) {
}
func TestAgent_PersistService(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_PersistService(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_PersistService(t, "enable_central_service_config = true")
})
}
func testAgent_PersistService(t *testing.T, extraHCL string) {
t.Helper()
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
defer os.RemoveAll(dataDir)
cfg := `
server = false
bootstrap = false
data_dir = "` + dataDir + `"
`
` + extraHCL
a := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir})
defer os.RemoveAll(dataDir)
defer a.Shutdown()
svc := &structs.NodeService{
@ -1726,6 +1811,7 @@ func TestAgent_PersistService(t *testing.T) {
expected, err := json.Marshal(persistedService{
Token: "mytoken",
Service: svc,
Source: "local",
})
if err != nil {
t.Fatalf("err: %s", err)
@ -1746,6 +1832,7 @@ func TestAgent_PersistService(t *testing.T) {
expected, err = json.Marshal(persistedService{
Token: "mytoken",
Service: svc,
Source: "local",
})
if err != nil {
t.Fatalf("err: %s", err)
@ -1776,9 +1863,21 @@ func TestAgent_PersistService(t *testing.T) {
}
func TestAgent_persistedService_compat(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_persistedService_compat(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_persistedService_compat(t, "enable_central_service_config = true")
})
}
func testAgent_persistedService_compat(t *testing.T, extraHCL string) {
t.Helper()
// Tests backwards compatibility of persisted services from pre-0.5.1
a := NewTestAgent(t, t.Name(), "")
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
svc := &structs.NodeService{
@ -1820,8 +1919,20 @@ func TestAgent_persistedService_compat(t *testing.T) {
}
func TestAgent_PurgeService(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
testAgent_PurgeService(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_PurgeService(t, "enable_central_service_config = true")
})
}
func testAgent_PurgeService(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
svc := &structs.NodeService{
@ -1835,9 +1946,13 @@ func TestAgent_PurgeService(t *testing.T) {
if err := a.AddService(svc, nil, true, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
// Exists
if _, err := os.Stat(file); err != nil {
t.Fatalf("err: %s", err)
}
// Not removed
if err := a.RemoveService(svc.ID, false); err != nil {
if err := a.removeService(svc.ID, false); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(file); err != nil {
@ -1850,7 +1965,7 @@ func TestAgent_PurgeService(t *testing.T) {
}
// Removed
if err := a.RemoveService(svc.ID, true); err != nil {
if err := a.removeService(svc.ID, true); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(file); !os.IsNotExist(err) {
@ -1859,16 +1974,29 @@ func TestAgent_PurgeService(t *testing.T) {
}
func TestAgent_PurgeServiceOnDuplicate(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_PurgeServiceOnDuplicate(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_PurgeServiceOnDuplicate(t, "enable_central_service_config = true")
})
}
func testAgent_PurgeServiceOnDuplicate(t *testing.T, extraHCL string) {
t.Helper()
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
defer os.RemoveAll(dataDir)
cfg := `
data_dir = "` + dataDir + `"
server = false
bootstrap = false
`
` + extraHCL
a := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir})
defer a.Shutdown()
defer os.RemoveAll(dataDir)
svc1 := &structs.NodeService{
ID: "redis",
@ -1953,6 +2081,7 @@ func TestAgent_PersistCheck(t *testing.T) {
Check: check,
ChkType: chkType,
Token: "mytoken",
Source: "local",
})
if err != nil {
t.Fatalf("err: %s", err)
@ -1974,6 +2103,7 @@ func TestAgent_PersistCheck(t *testing.T) {
Check: check,
ChkType: chkType,
Token: "mytoken",
Source: "local",
})
if err != nil {
t.Fatalf("err: %s", err)
@ -2185,7 +2315,19 @@ func TestAgent_unloadChecks(t *testing.T) {
}
func TestAgent_loadServices_token(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_token(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_token(t, "enable_central_service_config = true")
})
}
func testAgent_loadServices_token(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
service = {
id = "rabbitmq"
@ -2193,7 +2335,7 @@ func TestAgent_loadServices_token(t *testing.T) {
port = 5672
token = "abc123"
}
`)
`+extraHCL)
defer a.Shutdown()
services := a.State.Services()
@ -2206,7 +2348,19 @@ func TestAgent_loadServices_token(t *testing.T) {
}
func TestAgent_loadServices_sidecar(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecar(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecar(t, "enable_central_service_config = true")
})
}
func testAgent_loadServices_sidecar(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
service = {
id = "rabbitmq"
@ -2217,7 +2371,7 @@ func TestAgent_loadServices_sidecar(t *testing.T) {
sidecar_service {}
}
}
`)
`+extraHCL)
defer a.Shutdown()
services := a.State.Services()
@ -2240,7 +2394,19 @@ func TestAgent_loadServices_sidecar(t *testing.T) {
}
func TestAgent_loadServices_sidecarSeparateToken(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecarSeparateToken(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecarSeparateToken(t, "enable_central_service_config = true")
})
}
func testAgent_loadServices_sidecarSeparateToken(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
service = {
id = "rabbitmq"
@ -2253,7 +2419,7 @@ func TestAgent_loadServices_sidecarSeparateToken(t *testing.T) {
}
}
}
`)
`+extraHCL)
defer a.Shutdown()
services := a.State.Services()
@ -2272,7 +2438,18 @@ func TestAgent_loadServices_sidecarSeparateToken(t *testing.T) {
}
func TestAgent_loadServices_sidecarInheritMeta(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecarInheritMeta(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecarInheritMeta(t, "enable_central_service_config = true")
})
}
func testAgent_loadServices_sidecarInheritMeta(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
service = {
@ -2289,7 +2466,7 @@ func TestAgent_loadServices_sidecarInheritMeta(t *testing.T) {
}
}
}
`)
`+extraHCL)
defer a.Shutdown()
services := a.State.Services()
@ -2309,7 +2486,18 @@ func TestAgent_loadServices_sidecarInheritMeta(t *testing.T) {
}
func TestAgent_loadServices_sidecarOverrideMeta(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecarOverrideMeta(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecarOverrideMeta(t, "enable_central_service_config = true")
})
}
func testAgent_loadServices_sidecarOverrideMeta(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
service = {
@ -2329,7 +2517,7 @@ func TestAgent_loadServices_sidecarOverrideMeta(t *testing.T) {
}
}
}
`)
`+extraHCL)
defer a.Shutdown()
services := a.State.Services()
@ -2350,8 +2538,20 @@ func TestAgent_loadServices_sidecarOverrideMeta(t *testing.T) {
}
func TestAgent_unloadServices(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
testAgent_unloadServices(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_unloadServices(t, "enable_central_service_config = true")
})
}
func testAgent_unloadServices(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
svc := &structs.NodeService{
@ -2578,8 +2778,20 @@ func TestAgent_Service_NoReap(t *testing.T) {
}
func TestAgent_AddService_restoresSnapshot(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
testAgent_AddService_restoresSnapshot(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_AddService_restoresSnapshot(t, "enable_central_service_config = true")
})
}
func testAgent_AddService_restoresSnapshot(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
// First register a service
@ -2775,7 +2987,7 @@ func TestAgent_loadChecks_checkFails(t *testing.T) {
Status: api.HealthPassing,
ServiceID: "nope",
}
if err := a.persistCheck(check, nil); err != nil {
if err := a.persistCheck(check, nil, ConfigSourceLocal); err != nil {
t.Fatalf("err: %s", err)
}
@ -3325,3 +3537,40 @@ func TestAgent_consulConfig_RaftTrailingLogs(t *testing.T) {
defer a.Shutdown()
require.Equal(t, uint64(812345), a.consulConfig().RaftConfig.TrailingLogs)
}
func TestDefaultIfEmpty(t *testing.T) {
require.Equal(t, "", defaultIfEmpty("", ""))
require.Equal(t, "foo", defaultIfEmpty("", "foo"))
require.Equal(t, "bar", defaultIfEmpty("bar", "foo"))
require.Equal(t, "bar", defaultIfEmpty("bar", ""))
}
func TestConfigSourceFromName(t *testing.T) {
cases := []struct {
in string
expect configSource
bad bool
}{
{in: "local", expect: ConfigSourceLocal},
{in: "remote", expect: ConfigSourceRemote},
{in: "", expect: ConfigSourceLocal},
{in: "LOCAL", bad: true},
{in: "REMOTE", bad: true},
{in: "garbage", bad: true},
{in: " ", bad: true},
}
for _, tc := range cases {
tc := tc
t.Run(tc.in, func(t *testing.T) {
got, ok := ConfigSourceFromName(tc.in)
if tc.bad {
require.False(t, ok)
require.Empty(t, got)
} else {
require.True(t, ok)
require.Equal(t, tc.expect, got)
}
})
}
}

View File

@ -11,6 +11,7 @@ type persistedCheck struct {
Check *structs.HealthCheck
ChkType *structs.CheckType
Token string
Source string
}
// persistedCheckState is used to persist the current state of a given

View File

@ -18,92 +18,188 @@ import (
// configuration that applies to the service in order to register the final, merged
// service configuration locally in the agent state.
type ServiceManager struct {
services map[string]*serviceConfigWatch
agent *Agent
lock sync.Mutex
// servicesLock guards the services map, but not the watches contained
// therein
servicesLock sync.Mutex
// services tracks all active watches for registered services
services map[string]*serviceConfigWatch
// registerCh is a channel for processing service registrations in the
// background when watches are notified of changes. All sends and receives
// must also obey the ctx.Done() channel to avoid a deadlock during
// shutdown.
registerCh chan *asyncRegisterRequest
// ctx is the shared context for all goroutines launched
ctx context.Context
// cancel can be used to stop all goroutines launched
cancel context.CancelFunc
// running keeps track of live goroutines (worker and watcher)
running sync.WaitGroup
}
func NewServiceManager(agent *Agent) *ServiceManager {
ctx, cancel := context.WithCancel(context.Background())
return &ServiceManager{
services: make(map[string]*serviceConfigWatch),
agent: agent,
services: make(map[string]*serviceConfigWatch),
registerCh: make(chan *asyncRegisterRequest), // must be unbuffered
ctx: ctx,
cancel: cancel,
}
}
// AddService starts a new serviceConfigWatch if the service has not been registered, and
// updates the existing registration if it has. For a new service, a call will also be made
// to fetch the merged global defaults that apply to the service in order to compose the
// initial registration.
func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
// Stop forces all background goroutines to terminate and blocks until they complete.
//
// NOTE: the caller must NOT hold the Agent.stateLock!
func (s *ServiceManager) Stop() {
s.cancel()
s.running.Wait()
}
// Start starts a background worker goroutine that writes back into the Agent
// state. This only exists to keep the need to lock the agent state lock out of
// the main AddService/RemoveService codepaths to avoid deadlocks.
func (s *ServiceManager) Start() {
s.running.Add(1)
go func() {
defer s.running.Done()
for {
select {
case <-s.ctx.Done():
return
case req := <-s.registerCh:
req.Reply <- s.registerOnce(req.Args)
}
}
}()
}
// runOnce will process a single registration request
func (s *ServiceManager) registerOnce(args *addServiceRequest) error {
s.agent.stateLock.Lock()
defer s.agent.stateLock.Unlock()
err := s.agent.addServiceInternal(args)
if err != nil {
return fmt.Errorf("error updating service registration: %v", err)
}
return nil
}
// AddService will (re)create a serviceConfigWatch on the given service. For
// each call of this function the first registration will happen inline and
// will read the merged global defaults for the service through the agent cache
// (regardless of whether or not the service was already registered). This
// lets validation or authorization related errors bubble back up to the
// caller's RPC inline with their request. Upon success a goroutine will keep
// this updated in the background.
//
// If waitForCentralConfig=true is used, the initial registration blocks on
// fetching the merged global config through the cache. If false, no such RPC
// occurs and only the previousDefaults are used.
//
// persistServiceConfig controls if the INITIAL registration will result in
// persisting the service config to disk again. All background updates will
// always persist.
//
// service, chkTypes, persist, token, replaceExistingChecks, and source are
// basically pass-through arguments to Agent.addServiceInternal that follow the
// semantics there. The one key difference is that the service provided will be
// merged with the global defaults before registration.
//
// NOTE: the caller must hold the Agent.stateLock!
func (s *ServiceManager) AddService(req *addServiceRequest) error {
req.fixupForAddServiceLocked()
// For now only sidecar proxies have anything that can be configured
// centrally. So bypass the whole manager for regular services.
if !service.IsSidecarProxy() && !service.IsMeshGateway() {
return s.agent.addServiceInternal(service, chkTypes, persist, token, false, source)
if !req.service.IsSidecarProxy() && !req.service.IsMeshGateway() {
// previousDefaults are ignored here because they are only relevant for central config.
req.persistService = nil
req.persistDefaults = nil
req.persistServiceConfig = false
return s.agent.addServiceInternal(req)
}
s.lock.Lock()
defer s.lock.Unlock()
var (
service = req.service
chkTypes = req.chkTypes
previousDefaults = req.previousDefaults
waitForCentralConfig = req.waitForCentralConfig
persist = req.persist
persistServiceConfig = req.persistServiceConfig
token = req.token
replaceExistingChecks = req.replaceExistingChecks
source = req.source
)
reg := serviceRegistration{
reg := &serviceRegistration{
service: service,
chkTypes: chkTypes,
persist: persist,
token: token,
replaceExistingChecks: replaceExistingChecks,
source: source,
}
// If a service watch already exists, update the registration. Otherwise,
// start a new config watcher.
watch, ok := s.services[service.ID]
if ok {
if err := watch.updateRegistration(&reg); err != nil {
return err
s.servicesLock.Lock()
defer s.servicesLock.Unlock()
// If a service watch already exists, shut it down and replace it.
oldWatch, updating := s.services[service.ID]
if updating {
oldWatch.Stop()
delete(s.services, service.ID)
}
s.agent.logger.Printf("[DEBUG] agent.manager: updated local registration for service %q", service.ID)
} else {
// This is a new entry, so get the existing global config and do the initial
// registration with the merged config.
// Get the existing global config and do the initial registration with the
// merged config.
watch := &serviceConfigWatch{
registration: &reg,
readyCh: make(chan error),
registration: reg,
updateCh: make(chan cache.UpdateEvent, 1),
agent: s.agent,
registerCh: s.registerCh,
}
// Start the config watch, which starts a blocking query for the resolved service config
// in the background.
if err := watch.Start(); err != nil {
return err
}
// Call ReadyWait to block until the cache has returned the initial config and the service
// has been registered.
if err := watch.ReadyWait(); err != nil {
watch.Stop()
err := watch.RegisterAndStart(
previousDefaults,
waitForCentralConfig,
persistServiceConfig,
s.ctx,
&s.running,
)
if err != nil {
return err
}
s.services[service.ID] = watch
if updating {
s.agent.logger.Printf("[DEBUG] agent.manager: updated local registration for service %q", service.ID)
} else {
s.agent.logger.Printf("[DEBUG] agent.manager: added local registration for service %q", service.ID)
}
return nil
}
// NOTE: the caller must hold the Agent.stateLock!
func (s *ServiceManager) RemoveService(serviceID string) {
s.lock.Lock()
defer s.lock.Unlock()
s.servicesLock.Lock()
defer s.servicesLock.Unlock()
serviceWatch, ok := s.services[serviceID]
if !ok {
return
}
serviceWatch.Stop()
if oldWatch, exists := s.services[serviceID]; exists {
oldWatch.Stop()
delete(s.services, serviceID)
}
}
// serviceRegistration represents a locally registered service.
type serviceRegistration struct {
@ -111,6 +207,7 @@ type serviceRegistration struct {
chkTypes []*structs.CheckType
persist bool
token string
replaceExistingChecks bool
source configSource
}
@ -122,159 +219,234 @@ type serviceConfigWatch struct {
defaults *structs.ServiceConfigResponse
agent *Agent
// readyCh is used for ReadyWait in order to block until the first update
// for the resolved service config is received from the cache.
readyCh chan error
// ctx and cancelFunc store the overall context that lives as long as the
// Watch instance is needed, possibly spanning multiple cache.Notify
// lifetimes.
ctx context.Context
cancelFunc func()
registerCh chan<- *asyncRegisterRequest
// cacheKey stores the key of the current request, when registration changes
// we check to see if a new cache watch is needed.
cacheKey string
// updateCh receives changes from cache watchers or registration changes.
// updateCh receives changes from cache watchers
updateCh chan cache.UpdateEvent
// notifyCancel, if non-nil it the cancel func that will stop the currently
// active Notify loop. It does not cancel ctx and is used when we need to
// switch to a new Notify call because cache key changed.
notifyCancel func()
lock sync.Mutex
ctx context.Context
cancelFunc func()
running sync.WaitGroup
}
// Start starts the config watch and a goroutine to handle updates over
// the updateCh. This is not safe to call more than once.
func (s *serviceConfigWatch) Start() error {
s.ctx, s.cancelFunc = context.WithCancel(context.Background())
if err := s.ensureConfigWatch(); err != nil {
return err
}
go s.runWatch()
// NOTE: this is called while holding the Agent.stateLock
func (w *serviceConfigWatch) RegisterAndStart(
previousDefaults *structs.ServiceConfigResponse,
waitForCentralConfig bool,
persistServiceConfig bool,
ctx context.Context,
wg *sync.WaitGroup,
) error {
service := w.registration.service
return nil
}
func (s *serviceConfigWatch) Stop() {
s.cancelFunc()
}
// ReadyWait blocks until the readyCh is closed, which means the initial
// registration of the service has been completed. If there was an error
// with the initial registration, it will be returned.
func (s *serviceConfigWatch) ReadyWait() error {
err := <-s.readyCh
return err
}
// runWatch handles any update events from the cache.Notify until the
// config watch is shut down.
func (s *serviceConfigWatch) runWatch() {
firstRun := true
for {
select {
case <-s.ctx.Done():
return
case event := <-s.updateCh:
if err := s.handleUpdate(event, false, firstRun); err != nil {
s.agent.logger.Printf("[ERR] agent.manager: error handling service update: %v", err)
}
firstRun = false
}
}
}
// handleUpdate receives an update event about either the service registration or the
// global config defaults, updates the local state and re-registers the service with
// the newly merged config. This function takes the serviceConfigWatch lock to ensure
// only one update can be happening at a time.
func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked, firstRun bool) error {
// Take the agent state lock if needed. This is done before the local config watch
// lock in order to prevent a race between this config watch and others - the config
// watch lock is the inner lock and the agent stateLock is the outer lock. If this is the
// first run we also don't need to take the stateLock, as this is being waited on
// synchronously by a caller that already holds it.
if !locked && !firstRun {
s.agent.stateLock.Lock()
defer s.agent.stateLock.Unlock()
}
s.lock.Lock()
defer s.lock.Unlock()
// If we got an error, log a warning if this is the first update; otherwise return the error.
// We want the initial update to cause a service registration no matter what.
if event.Err != nil {
if firstRun {
s.agent.logger.Printf("[WARN] could not retrieve initial service_defaults config for service %q: %v",
s.registration.service.ID, event.Err)
} else {
return fmt.Errorf("error watching service config: %v", event.Err)
// Either we explicitly block waiting for defaults before registering,
// or we feed it some seed data (or NO data) and bypass the blocking
// operation. Either way the watcher will end up with something flagged
// as defaults even if they don't actually reflect actual defaults.
if waitForCentralConfig {
if err := w.fetchDefaults(); err != nil {
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", service.ID, err)
}
} else {
switch res := event.Result.(type) {
case *serviceRegistration:
s.registration = res
// We may need to restart watch if upstreams changed
if err := s.ensureConfigWatch(); err != nil {
return err
}
case *structs.ServiceConfigResponse:
// Sanity check this even came from the currently active watch to ignore
// rare races when switching cache keys
if event.CorrelationID != s.cacheKey {
// It's a no-op. The new watcher will deliver (or may have already
// delivered) the correct config so just ignore this old message.
return nil
}
s.defaults = res
default:
return fmt.Errorf("unknown update event type: %T", event)
}
w.defaults = previousDefaults
}
// Merge the local registration with the central defaults and update this service
// in the local state.
service, err := s.mergeServiceConfig()
merged, err := w.mergeServiceConfig()
if err != nil {
return err
}
if err := s.updateAgentRegistration(service); err != nil {
// If this is the initial registration, return the error through the readyCh
// so it can be passed back to the original caller.
if firstRun {
s.readyCh <- err
}
// The first time we do this interactively, we need to know if it
// failed for validation reasons which we only get back from the
// initial underlying add service call.
err = w.agent.addServiceInternal(&addServiceRequest{
service: merged,
chkTypes: w.registration.chkTypes,
persistService: w.registration.service,
persistDefaults: w.defaults,
persist: w.registration.persist,
persistServiceConfig: persistServiceConfig,
token: w.registration.token,
replaceExistingChecks: w.registration.replaceExistingChecks,
source: w.registration.source,
})
if err != nil {
return fmt.Errorf("error updating service registration: %v", err)
}
// If this is the first registration, set the ready status by closing the channel.
if firstRun {
close(s.readyCh)
// Start the config watch, which starts a blocking query for the
// resolved service config in the background.
return w.start(ctx, wg)
}
// NOTE: this is called while holding the Agent.stateLock
func (w *serviceConfigWatch) fetchDefaults() error {
req := makeConfigRequest(w.agent, w.registration)
raw, _, err := w.agent.cache.Get(cachetype.ResolvedServiceConfigName, req)
if err != nil {
return err
}
reply, ok := raw.(*structs.ServiceConfigResponse)
if !ok {
// This should never happen, but we want to protect against panics
return fmt.Errorf("internal error: response type not correct")
}
w.defaults = reply
return nil
}
// Start starts the config watch and a goroutine to handle updates over the
// updateCh. This is safe to call more than once assuming you have called Stop
// after each Start.
//
// NOTE: this is called while holding the Agent.stateLock
func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) error {
w.ctx, w.cancelFunc = context.WithCancel(ctx)
// Configure and start a cache.Notify goroutine to run a continuous
// blocking query on the resolved service config for this service.
req := makeConfigRequest(w.agent, w.registration)
w.cacheKey = req.CacheInfo().Key
// We use the cache key as the correlationID here. Notify in general will not
// respond on the updateCh after the context is cancelled however there could
// possible be a race where it has only just got an update and checked the
// context before we cancel and so might still deliver the old event. Using
// the cacheKey allows us to ignore updates from the old cache watch and makes
// even this rare edge case safe.
err := w.agent.cache.Notify(
w.ctx,
cachetype.ResolvedServiceConfigName,
req,
w.cacheKey,
w.updateCh,
)
if err != nil {
w.cancelFunc()
return err
}
w.running.Add(1)
wg.Add(1)
go w.runWatch(wg)
return nil
}
// updateAgentRegistration updates the service (and its sidecar, if applicable) in the
// local state.
func (s *serviceConfigWatch) updateAgentRegistration(ns *structs.NodeService) error {
return s.agent.addServiceInternal(ns, s.registration.chkTypes, s.registration.persist, s.registration.token, false, s.registration.source)
func (w *serviceConfigWatch) Stop() {
w.cancelFunc()
w.running.Wait()
}
// ensureConfigWatch starts a cache.Notify goroutine to run a continuous
// blocking query on the resolved service config for this service. If the
// registration has changed in a way that requires a new blocking query, it will
// cancel any current watch and start a new one. It is a no-op if there is an
// existing watch that is sufficient for the current registration. It is not
// thread-safe and must only be called from the Start method (which is only safe
// to call once as documented) or from inside the run loop.
func (s *serviceConfigWatch) ensureConfigWatch() error {
ns := s.registration.service
// runWatch handles any update events from the cache.Notify until the
// config watch is shut down.
//
// NOTE: the caller must NOT hold the Agent.stateLock!
func (w *serviceConfigWatch) runWatch(wg *sync.WaitGroup) {
defer wg.Done()
defer w.running.Done()
for {
select {
case <-w.ctx.Done():
return
case event := <-w.updateCh:
if err := w.handleUpdate(event); err != nil {
w.agent.logger.Printf("[ERR] agent.manager: error handling service update: %v", err)
}
}
}
}
// handleUpdate receives an update event the global config defaults, updates
// the local state and re-registers the service with the newly merged config.
//
// NOTE: the caller must NOT hold the Agent.stateLock!
func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error {
// If we got an error, log a warning if this is the first update; otherwise return the error.
// We want the initial update to cause a service registration no matter what.
if event.Err != nil {
return fmt.Errorf("error watching service config: %v", event.Err)
}
res, ok := event.Result.(*structs.ServiceConfigResponse)
if !ok {
return fmt.Errorf("unknown update event type: %T", event)
}
// Sanity check this even came from the currently active watch to ignore
// rare races when switching cache keys
if event.CorrelationID != w.cacheKey {
// It's a no-op. The new watcher will deliver (or may have already
// delivered) the correct config so just ignore this old message.
return nil
}
w.defaults = res
// Merge the local registration with the central defaults and update this service
// in the local state.
merged, err := w.mergeServiceConfig()
if err != nil {
return err
}
// While we were waiting on the agent state lock we may have been shutdown.
// So avoid doing a registration in that case.
select {
case <-w.ctx.Done():
return nil
default:
}
registerReq := &asyncRegisterRequest{
Args: &addServiceRequest{
service: merged,
chkTypes: w.registration.chkTypes,
persistService: w.registration.service,
persistDefaults: w.defaults,
persist: w.registration.persist,
persistServiceConfig: true,
token: w.registration.token,
replaceExistingChecks: w.registration.replaceExistingChecks,
source: w.registration.source,
},
Reply: make(chan error, 1),
}
select {
case <-w.ctx.Done():
return nil
case w.registerCh <- registerReq:
}
select {
case <-w.ctx.Done():
return nil
case err := <-registerReq.Reply:
if err != nil {
return fmt.Errorf("error updating service registration: %v", err)
}
return nil
}
}
type asyncRegisterRequest struct {
Args *addServiceRequest
Reply chan error
}
func makeConfigRequest(agent *Agent, registration *serviceRegistration) *structs.ServiceConfigRequest {
ns := registration.service
name := ns.Service
var upstreams []string
@ -296,62 +468,29 @@ func (s *serviceConfigWatch) ensureConfigWatch() error {
req := &structs.ServiceConfigRequest{
Name: name,
Datacenter: s.agent.config.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken},
Datacenter: agent.config.Datacenter,
QueryOptions: structs.QueryOptions{Token: agent.tokens.AgentToken()},
Upstreams: upstreams,
}
if s.registration.token != "" {
req.QueryOptions.Token = s.registration.token
if registration.token != "" {
req.QueryOptions.Token = registration.token
}
// See if this request is different from the current one
cacheKey := req.CacheInfo().Key
if cacheKey == s.cacheKey {
return nil
}
// If there is an existing notify running, stop it first. This may leave a
// blocking query running in the background but the Notify loop will swallow
// the response and exit when it next unblocks so we can consider it stopped.
if s.notifyCancel != nil {
s.notifyCancel()
}
// Make a new context just for this Notify call
ctx, cancel := context.WithCancel(s.ctx)
s.notifyCancel = cancel
s.cacheKey = cacheKey
// We use the cache key as the correlationID here. Notify in general will not
// respond on the updateCh after the context is cancelled however there could
// possible be a race where it has only just got an update and checked the
// context before we cancel and so might still deliver the old event. Using
// the cacheKey allows us to ignore updates from the old cache watch and makes
// even this rare edge case safe.
err := s.agent.cache.Notify(ctx, cachetype.ResolvedServiceConfigName, req,
s.cacheKey, s.updateCh)
return err
}
// updateRegistration does a synchronous update of the local service registration and
// returns the result. The agent stateLock should be held when calling this function.
func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error {
return s.handleUpdate(cache.UpdateEvent{
Result: registration,
}, true, false)
return req
}
// mergeServiceConfig returns the final effective config for the watched service,
// including the latest known global defaults from the servers.
func (s *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error) {
if s.defaults == nil || (!s.registration.service.IsSidecarProxy() && !s.registration.service.IsMeshGateway()) {
return s.registration.service, nil
//
// NOTE: this is called while holding the Agent.stateLock
func (w *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error) {
if w.defaults == nil {
return w.registration.service, nil
}
// We don't want to change s.registration in place since it is our source of
// truth about what was actually registered before defaults applied. So copy
// it first.
nsRaw, err := copystructure.Copy(s.registration.service)
nsRaw, err := copystructure.Copy(w.registration.service)
if err != nil {
return nil, err
}
@ -359,12 +498,12 @@ func (s *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error)
// Merge proxy defaults
ns := nsRaw.(*structs.NodeService)
if err := mergo.Merge(&ns.Proxy.Config, s.defaults.ProxyConfig); err != nil {
if err := mergo.Merge(&ns.Proxy.Config, w.defaults.ProxyConfig); err != nil {
return nil, err
}
if ns.Proxy.MeshGateway.Mode == structs.MeshGatewayModeDefault {
ns.Proxy.MeshGateway.Mode = s.defaults.MeshGateway.Mode
ns.Proxy.MeshGateway.Mode = w.defaults.MeshGateway.Mode
}
// Merge upstream defaults if there were any returned
@ -380,7 +519,7 @@ func (s *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error)
us.MeshGateway.Mode = ns.Proxy.MeshGateway.Mode
}
usCfg, ok := s.defaults.UpstreamConfigs[us.DestinationName]
usCfg, ok := w.defaults.UpstreamConfigs[us.DestinationName]
if !ok {
// No config defaults to merge
continue

View File

@ -1,9 +1,17 @@
package agent
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/stretchr/testify/require"
)
@ -17,30 +25,18 @@ func TestServiceManager_RegisterService(t *testing.T) {
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register a global proxy and service config
{
args := &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ProxyConfigEntry{
testApplyConfigEntries(t, a,
&structs.ProxyConfigEntry{
Config: map[string]interface{}{
"foo": 1,
},
},
}
var out bool
require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
}
{
args := &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ServiceConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "redis",
Protocol: "tcp",
},
}
var out bool
require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
}
)
// Now register a service locally with no sidecar, it should be a no-op.
svc := &structs.NodeService{
@ -73,42 +69,23 @@ func TestServiceManager_RegisterSidecar(t *testing.T) {
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register a global proxy and service config
{
args := &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ProxyConfigEntry{
testApplyConfigEntries(t, a,
&structs.ProxyConfigEntry{
Config: map[string]interface{}{
"foo": 1,
},
},
}
var out bool
require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
}
{
args := &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ServiceConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "http",
},
}
var out bool
require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
}
{
args := &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ServiceConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "redis",
Protocol: "tcp",
},
}
var out bool
require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
}
)
// Now register a sidecar proxy. Note we don't use SidecarService here because
// that gets resolved earlier in config handling than the AddService call
@ -176,30 +153,18 @@ func TestServiceManager_RegisterMeshGateway(t *testing.T) {
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register a global proxy and service config
{
args := &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ProxyConfigEntry{
testApplyConfigEntries(t, a,
&structs.ProxyConfigEntry{
Config: map[string]interface{}{
"foo": 1,
},
},
}
var out bool
require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
}
{
args := &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ServiceConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "mesh-gateway",
Protocol: "http",
},
}
var out bool
require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
}
)
// Now register a mesh-gateway.
svc := &structs.NodeService{
@ -232,6 +197,382 @@ func TestServiceManager_RegisterMeshGateway(t *testing.T) {
}, gateway)
}
func TestServiceManager_PersistService_API(t *testing.T) {
// This is the ServiceManager version of TestAgent_PersistService and
// TestAgent_PurgeService.
t.Parallel()
require := require.New(t)
// Launch a server to manage the config entries.
serverAgent := NewTestAgent(t, t.Name(), `enable_central_service_config = true`)
defer serverAgent.Shutdown()
testrpc.WaitForLeader(t, serverAgent.RPC, "dc1")
// Register a global proxy and service config
testApplyConfigEntries(t, serverAgent,
&structs.ProxyConfigEntry{
Config: map[string]interface{}{
"foo": 1,
},
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "http",
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "redis",
Protocol: "tcp",
},
)
// Now launch a single client agent
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
defer os.RemoveAll(dataDir)
cfg := `
enable_central_service_config = true
server = false
bootstrap = false
data_dir = "` + dataDir + `"
`
a := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir})
defer a.Shutdown()
// Join first
_, err := a.JoinLAN([]string{
fmt.Sprintf("127.0.0.1:%d", serverAgent.Config.SerfPortLAN),
})
require.NoError(err)
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Now register a sidecar proxy via the API.
svc := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
Service: "web-sidecar-proxy",
Port: 21000,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web",
LocalServiceAddress: "127.0.0.1",
LocalServicePort: 8000,
Upstreams: structs.Upstreams{
{
DestinationName: "redis",
LocalBindPort: 5000,
},
},
},
}
expectState := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
Service: "web-sidecar-proxy",
Port: 21000,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web",
LocalServiceAddress: "127.0.0.1",
LocalServicePort: 8000,
Config: map[string]interface{}{
"foo": int64(1),
"protocol": "http",
},
Upstreams: structs.Upstreams{
{
DestinationName: "redis",
LocalBindPort: 5000,
Config: map[string]interface{}{
"protocol": "tcp",
},
},
},
},
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
svcFile := filepath.Join(a.Config.DataDir, servicesDir, stringHash(svc.ID))
configFile := filepath.Join(a.Config.DataDir, serviceConfigDir, stringHash(svc.ID))
// Service is not persisted unless requested, but we always persist service configs.
require.NoError(a.AddService(svc, nil, false, "", ConfigSourceRemote))
requireFileIsAbsent(t, svcFile)
requireFileIsPresent(t, configFile)
// Persists to file if requested
require.NoError(a.AddService(svc, nil, true, "mytoken", ConfigSourceRemote))
requireFileIsPresent(t, svcFile)
requireFileIsPresent(t, configFile)
// Service definition file is sane.
expectJSONFile(t, svcFile, persistedService{
Token: "mytoken",
Service: svc,
Source: "remote",
}, nil)
// Service config file is sane.
expectJSONFile(t, configFile, persistedServiceConfig{
ServiceID: "web-sidecar-proxy",
Defaults: &structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"foo": 1,
"protocol": "http",
},
UpstreamConfigs: map[string]map[string]interface{}{
"redis": map[string]interface{}{
"protocol": "tcp",
},
},
},
}, resetDefaultsQueryMeta)
// Verify in memory state.
{
sidecarService := a.State.Service("web-sidecar-proxy")
require.NotNil(sidecarService)
require.Equal(expectState, sidecarService)
}
// Updates service definition on disk
svc.Proxy.LocalServicePort = 8001
require.NoError(a.AddService(svc, nil, true, "mytoken", ConfigSourceRemote))
requireFileIsPresent(t, svcFile)
requireFileIsPresent(t, configFile)
// Service definition file is updated.
expectJSONFile(t, svcFile, persistedService{
Token: "mytoken",
Service: svc,
Source: "remote",
}, nil)
// Service config file is the same.
expectJSONFile(t, configFile, persistedServiceConfig{
ServiceID: "web-sidecar-proxy",
Defaults: &structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"foo": 1,
"protocol": "http",
},
UpstreamConfigs: map[string]map[string]interface{}{
"redis": map[string]interface{}{
"protocol": "tcp",
},
},
},
}, resetDefaultsQueryMeta)
// Verify in memory state.
expectState.Proxy.LocalServicePort = 8001
{
sidecarService := a.State.Service("web-sidecar-proxy")
require.NotNil(sidecarService)
require.Equal(expectState, sidecarService)
}
// Kill the agent to restart it.
a.Shutdown()
// Kill the server so that it can't phone home and must rely upon the persisted defaults.
serverAgent.Shutdown()
// Should load it back during later start.
a2 := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir})
defer a2.Shutdown()
{
restored := a.State.Service("web-sidecar-proxy")
require.NotNil(restored)
require.Equal(expectState, restored)
}
// Now remove it.
require.NoError(a2.RemoveService("web-sidecar-proxy"))
requireFileIsAbsent(t, svcFile)
requireFileIsAbsent(t, configFile)
}
func TestServiceManager_PersistService_ConfigFiles(t *testing.T) {
// This is the ServiceManager version of TestAgent_PersistService and
// TestAgent_PurgeService but for config files.
t.Parallel()
require := require.New(t)
// Launch a server to manage the config entries.
serverAgent := NewTestAgent(t, t.Name(), `enable_central_service_config = true`)
defer serverAgent.Shutdown()
testrpc.WaitForLeader(t, serverAgent.RPC, "dc1")
// Register a global proxy and service config
testApplyConfigEntries(t, serverAgent,
&structs.ProxyConfigEntry{
Config: map[string]interface{}{
"foo": 1,
},
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "http",
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "redis",
Protocol: "tcp",
},
)
// Now launch a single client agent
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
defer os.RemoveAll(dataDir)
serviceSnippet := `
service = {
kind = "connect-proxy"
id = "web-sidecar-proxy"
name = "web-sidecar-proxy"
port = 21000
token = "mytoken"
proxy {
destination_service_name = "web"
destination_service_id = "web"
local_service_address = "127.0.0.1"
local_service_port = 8000
upstreams = [{
destination_name = "redis"
local_bind_port = 5000
}]
}
}
`
cfg := `
enable_central_service_config = true
data_dir = "` + dataDir + `"
server = false
bootstrap = false
` + serviceSnippet
a := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir})
defer a.Shutdown()
// Join first
_, err := a.JoinLAN([]string{
fmt.Sprintf("127.0.0.1:%d", serverAgent.Config.SerfPortLAN),
})
require.NoError(err)
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Now register a sidecar proxy via the API.
svcID := "web-sidecar-proxy"
expectState := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
Service: "web-sidecar-proxy",
Port: 21000,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web",
LocalServiceAddress: "127.0.0.1",
LocalServicePort: 8000,
Config: map[string]interface{}{
"foo": int64(1),
"protocol": "http",
},
Upstreams: structs.Upstreams{
{
DestinationType: "service",
DestinationName: "redis",
LocalBindPort: 5000,
Config: map[string]interface{}{
"protocol": "tcp",
},
},
},
},
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
// Now wait until we've re-registered using central config updated data.
retry.Run(t, func(r *retry.R) {
a.stateLock.Lock()
defer a.stateLock.Unlock()
current := a.State.Service("web-sidecar-proxy")
if current == nil {
r.Fatalf("service is missing")
}
if !reflect.DeepEqual(expectState, current) {
r.Fatalf("expected: %#v\nactual :%#v", expectState, current)
}
})
svcFile := filepath.Join(a.Config.DataDir, servicesDir, stringHash(svcID))
configFile := filepath.Join(a.Config.DataDir, serviceConfigDir, stringHash(svcID))
// Service is never persisted, but we always persist service configs.
requireFileIsAbsent(t, svcFile)
requireFileIsPresent(t, configFile)
// Service config file is sane.
expectJSONFile(t, configFile, persistedServiceConfig{
ServiceID: "web-sidecar-proxy",
Defaults: &structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"foo": 1,
"protocol": "http",
},
UpstreamConfigs: map[string]map[string]interface{}{
"redis": map[string]interface{}{
"protocol": "tcp",
},
},
},
}, resetDefaultsQueryMeta)
// Verify in memory state.
{
sidecarService := a.State.Service("web-sidecar-proxy")
require.NotNil(sidecarService)
require.Equal(expectState, sidecarService)
}
// Kill the agent to restart it.
a.Shutdown()
// Kill the server so that it can't phone home and must rely upon the persisted defaults.
serverAgent.Shutdown()
// Should load it back during later start.
a2 := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir})
defer a2.Shutdown()
{
restored := a.State.Service("web-sidecar-proxy")
require.NotNil(restored)
require.Equal(expectState, restored)
}
// Now remove it.
require.NoError(a2.RemoveService("web-sidecar-proxy"))
requireFileIsAbsent(t, svcFile)
requireFileIsAbsent(t, configFile)
}
func TestServiceManager_Disabled(t *testing.T) {
require := require.New(t)
@ -241,42 +582,23 @@ func TestServiceManager_Disabled(t *testing.T) {
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register a global proxy and service config
{
args := &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ProxyConfigEntry{
testApplyConfigEntries(t, a,
&structs.ProxyConfigEntry{
Config: map[string]interface{}{
"foo": 1,
},
},
}
var out bool
require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
}
{
args := &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ServiceConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "http",
},
}
var out bool
require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
}
{
args := &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ServiceConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "redis",
Protocol: "tcp",
},
}
var out bool
require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
}
)
// Now register a sidecar proxy. Note we don't use SidecarService here because
// that gets resolved earlier in config handling than the AddService call
@ -329,3 +651,91 @@ func TestServiceManager_Disabled(t *testing.T) {
},
}, sidecarService)
}
func testApplyConfigEntries(t *testing.T, a *TestAgent, entries ...structs.ConfigEntry) {
t.Helper()
for _, entry := range entries {
args := &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: entry,
}
var out bool
require.NoError(t, a.RPC("ConfigEntry.Apply", args, &out))
}
}
func requireFileIsAbsent(t *testing.T, file string) {
t.Helper()
if _, err := os.Stat(file); !os.IsNotExist(err) {
t.Fatalf("should not persist")
}
}
func requireFileIsPresent(t *testing.T, file string) {
t.Helper()
if _, err := os.Stat(file); err != nil {
t.Fatalf("err: %v", err)
}
}
func expectJSONFile(t *testing.T, file string, expect interface{}, fixupContentBeforeCompareFn func([]byte) ([]byte, error)) {
t.Helper()
expected, err := json.Marshal(expect)
require.NoError(t, err)
content, err := ioutil.ReadFile(file)
require.NoError(t, err)
if fixupContentBeforeCompareFn != nil {
content, err = fixupContentBeforeCompareFn(content)
require.NoError(t, err)
}
require.JSONEq(t, string(expected), string(content))
}
// resetDefaultsQueryMeta will reset the embedded fields from structs.QueryMeta
// to their zero values in the json object keyed under 'Defaults'.
func resetDefaultsQueryMeta(content []byte) ([]byte, error) {
var raw map[string]interface{}
if err := json.Unmarshal(content, &raw); err != nil {
return nil, err
}
def, ok := raw["Defaults"]
if !ok {
return content, nil
}
rawDef, ok := def.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("unexpected structure found in 'Defaults' key")
}
qmZero, err := convertToMap(structs.QueryMeta{})
if err != nil {
return nil, err
}
for k, v := range qmZero {
rawDef[k] = v
}
raw["Defaults"] = rawDef
return json.Marshal(raw)
}
func convertToMap(v interface{}) (map[string]interface{}, error) {
b, err := json.Marshal(v)
if err != nil {
return nil, err
}
var raw map[string]interface{}
if err := json.Unmarshal(b, &raw); err != nil {
return nil, err
}
return raw, nil
}