diff --git a/agent/agent.go b/agent/agent.go index ae549c1618..6c552ccf6f 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1894,7 +1894,7 @@ func (a *Agent) readPersistedServiceConfigs() (map[structs.ServiceID]*structs.Se // This entry is persistent and the agent will make a best effort to // ensure it is registered func (a *Agent) AddService(req AddServiceRequest) error { - req.waitForCentralConfig = true + req.serviceDefaults = serviceDefaultsFromCache(a.baseDeps, req) req.persistServiceConfig = true a.stateLock.Lock() defer a.stateLock.Unlock() @@ -1933,14 +1933,21 @@ func (a *Agent) addServiceLocked(req AddServiceRequest) error { type AddServiceRequest struct { Service *structs.NodeService chkTypes []*structs.CheckType - previousDefaults *structs.ServiceConfigResponse // just for: addServiceLocked - waitForCentralConfig bool // just for: addServiceLocked persist bool persistServiceConfig bool token string replaceExistingChecks bool Source configSource snap map[structs.CheckID]*structs.HealthCheck + + // serviceDefaults is a function which will return centralized service + // configuration. + // When loading service definitions from disk this will return a copy + // loaded from a persisted file. Otherwise it will query a Server for the + // centralized config. + // serviceDefaults is called when the Agent.stateLock is held, so it must + // never attempt to acquire that lock. + serviceDefaults func(context.Context) (*structs.ServiceConfigResponse, error) } type addServiceInternalRequest struct { @@ -3080,8 +3087,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI err = a.addServiceLocked(AddServiceRequest{ Service: ns, chkTypes: chkTypes, - previousDefaults: persistedServiceConfigs[sid], - waitForCentralConfig: false, // exclusively use cached values + serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[sid]), persist: false, // don't rewrite the file with the same data we just read persistServiceConfig: false, // don't rewrite the file with the same data we just read token: service.Token, @@ -3099,8 +3105,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI err = a.addServiceLocked(AddServiceRequest{ Service: sidecar, chkTypes: sidecarChecks, - previousDefaults: persistedServiceConfigs[sidecarServiceID], - waitForCentralConfig: false, // exclusively use cached values + serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[sidecarServiceID]), persist: false, // don't rewrite the file with the same data we just read persistServiceConfig: false, // don't rewrite the file with the same data we just read token: sidecarToken, @@ -3196,8 +3201,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI err = a.addServiceLocked(AddServiceRequest{ Service: p.Service, chkTypes: nil, - previousDefaults: persistedServiceConfigs[serviceID], - waitForCentralConfig: false, // exclusively use cached values + serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[serviceID]), persist: false, // don't rewrite the file with the same data we just read persistServiceConfig: false, // don't rewrite the file with the same data we just read token: p.Token, diff --git a/agent/service_manager.go b/agent/service_manager.go index 0ad1f45b51..aef9aa1883 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -184,18 +184,10 @@ type serviceConfigWatch struct { // NOTE: this is called while holding the Agent.stateLock func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.WaitGroup) error { - serviceDefaults := w.registration.previousDefaults - // Either we explicitly block waiting for defaults before registering, - // or we feed it some seed data (or NO data) and bypass the blocking - // operation. Either way the watcher will end up with something flagged - // as defaults even if they don't actually reflect actual defaults. - if w.registration.waitForCentralConfig { - var err error - serviceDefaults, err = w.fetchDefaults(ctx) - if err != nil { - return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", - w.registration.Service.ID, err) - } + serviceDefaults, err := w.registration.serviceDefaults(ctx) + if err != nil { + return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", + w.registration.Service.ID, err) } // Merge the local registration with the central defaults and update this service @@ -227,21 +219,29 @@ func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.Wait return w.start(ctx, wg) } -// NOTE: this is called while holding the Agent.stateLock -func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) (*structs.ServiceConfigResponse, error) { - req := makeConfigRequest(w.agent, w.registration) - - raw, _, err := w.agent.cache.Get(ctx, cachetype.ResolvedServiceConfigName, req) - if err != nil { - return nil, err +func serviceDefaultsFromStruct(v *structs.ServiceConfigResponse) func(context.Context) (*structs.ServiceConfigResponse, error) { + return func(_ context.Context) (*structs.ServiceConfigResponse, error) { + return v, nil } +} - serviceConfig, ok := raw.(*structs.ServiceConfigResponse) - if !ok { - // This should never happen, but we want to protect against panics - return nil, fmt.Errorf("internal error: response type not correct") +func serviceDefaultsFromCache(bd BaseDeps, req AddServiceRequest) func(context.Context) (*structs.ServiceConfigResponse, error) { + // NOTE: this is called while holding the Agent.stateLock + return func(ctx context.Context) (*structs.ServiceConfigResponse, error) { + req := makeConfigRequest(bd, req) + + raw, _, err := bd.Cache.Get(ctx, cachetype.ResolvedServiceConfigName, req) + if err != nil { + return nil, err + } + + serviceConfig, ok := raw.(*structs.ServiceConfigResponse) + if !ok { + // This should never happen, but we want to protect against panics + return nil, fmt.Errorf("internal error: response type not correct") + } + return serviceConfig, nil } - return serviceConfig, nil } // Start starts the config watch and a goroutine to handle updates over the @@ -254,7 +254,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro // Configure and start a cache.Notify goroutine to run a continuous // blocking query on the resolved service config for this service. - req := makeConfigRequest(w.agent, w.registration) + req := makeConfigRequest(w.agent.baseDeps, w.registration) w.cacheKey = req.CacheInfo().Key updateCh := make(chan cache.UpdateEvent, 1) @@ -383,7 +383,7 @@ type asyncRegisterRequest struct { Reply chan error } -func makeConfigRequest(agent *Agent, addReq AddServiceRequest) *structs.ServiceConfigRequest { +func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest { ns := addReq.Service name := ns.Service var upstreams []structs.ServiceID @@ -408,13 +408,13 @@ func makeConfigRequest(agent *Agent, addReq AddServiceRequest) *structs.ServiceC req := &structs.ServiceConfigRequest{ Name: name, - Datacenter: agent.config.Datacenter, + Datacenter: bd.RuntimeConfig.Datacenter, QueryOptions: structs.QueryOptions{Token: addReq.token}, UpstreamIDs: upstreams, EnterpriseMeta: ns.EnterpriseMeta, } if req.QueryOptions.Token == "" { - req.QueryOptions.Token = agent.tokens.AgentToken() + req.QueryOptions.Token = bd.Tokens.AgentToken() } return req }