package agent import ( "fmt" "sync" "github.com/imdario/mergo" "github.com/mitchellh/copystructure" "golang.org/x/net/context" "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" ) // ServiceManager watches changes to central service config for all services // registered with it. When a central config changes, the local service will // be updated with the correct values from the central config. type ServiceManager struct { agent *Agent // servicesLock guards the services map, but not the watches contained // therein servicesLock sync.Mutex // services tracks all active watches for registered services services map[structs.ServiceID]*serviceConfigWatch // registerCh is a channel for receiving service registration requests from // from serviceConfigWatchers. // The registrations are handled in the background when watches are notified of // changes. All sends and receives must also obey the ctx.Done() channel to // avoid a deadlock during shutdown. registerCh chan *asyncRegisterRequest // ctx is the shared context for all goroutines launched ctx context.Context // cancel can be used to stop all goroutines launched cancel context.CancelFunc // running keeps track of live goroutines (worker and watcher) running sync.WaitGroup } func NewServiceManager(agent *Agent) *ServiceManager { ctx, cancel := context.WithCancel(context.Background()) return &ServiceManager{ agent: agent, services: make(map[structs.ServiceID]*serviceConfigWatch), registerCh: make(chan *asyncRegisterRequest), // must be unbuffered ctx: ctx, cancel: cancel, } } // Stop forces all background goroutines to terminate and blocks until they complete. // // NOTE: the caller must NOT hold the Agent.stateLock! func (s *ServiceManager) Stop() { s.cancel() s.running.Wait() } // Start starts a background worker goroutine that writes back into the Agent // state. This only exists to keep the need to lock the agent state lock out of // the main AddService/RemoveService codepaths to avoid deadlocks. func (s *ServiceManager) Start() { s.running.Add(1) go func() { defer s.running.Done() for { select { case <-s.ctx.Done(): return case req := <-s.registerCh: req.Reply <- s.registerOnce(req.Args) } } }() } // runOnce will process a single registration request func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error { s.agent.stateLock.Lock() defer s.agent.stateLock.Unlock() if err := s.agent.addServiceInternal(args); err != nil { return fmt.Errorf("error updating service registration: %v", err) } return nil } // AddService will (re)create a serviceConfigWatch on the given service. For // each call of this function the first registration will happen inline and // will read the merged global defaults for the service through the agent cache // (regardless of whether or not the service was already registered). This // lets validation or authorization related errors bubble back up to the // caller's RPC inline with their request. Upon success a goroutine will keep // this updated in the background. // // If waitForCentralConfig=true is used, the initial registration blocks on // fetching the merged global config through the cache. If false, no such RPC // occurs and only the previousDefaults are used. // // persistServiceConfig controls if the INITIAL registration will result in // persisting the service config to disk again. All background updates will // always persist. // // service, chkTypes, persist, token, replaceExistingChecks, and source are // basically pass-through arguments to Agent.addServiceInternal that follow the // semantics there. The one key difference is that the service provided will be // merged with the global defaults before registration. // // NOTE: the caller must hold the Agent.stateLock! func (s *ServiceManager) AddService(req addServiceLockedRequest) error { s.servicesLock.Lock() defer s.servicesLock.Unlock() sid := req.Service.CompoundServiceID() // If a service watch already exists, shut it down and replace it. oldWatch, updating := s.services[sid] if updating { oldWatch.Stop() delete(s.services, sid) } // Get the existing global config and do the initial registration with the // merged config. watch := &serviceConfigWatch{ registration: req, agent: s.agent, registerCh: s.registerCh, } if err := watch.RegisterAndStart(s.ctx, &s.running); err != nil { return err } s.services[sid] = watch if updating { s.agent.logger.Debug("updated local registration for service", "service", req.Service.ID) } else { s.agent.logger.Debug("added local registration for service", "service", req.Service.ID) } return nil } // NOTE: the caller must hold the Agent.stateLock! func (s *ServiceManager) RemoveService(serviceID structs.ServiceID) { s.servicesLock.Lock() defer s.servicesLock.Unlock() if oldWatch, exists := s.services[serviceID]; exists { oldWatch.Stop() delete(s.services, serviceID) } } // serviceConfigWatch is a long running helper for composing the end config // for a given service from both the local registration and the global // service/proxy defaults. type serviceConfigWatch struct { registration addServiceLockedRequest agent *Agent registerCh chan<- *asyncRegisterRequest // cacheKey stores the key of the current request, when registration changes // we check to see if a new cache watch is needed. cacheKey string cancelFunc func() running sync.WaitGroup } // NOTE: this is called while holding the Agent.stateLock func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.WaitGroup) error { serviceDefaults, err := w.registration.serviceDefaults(ctx) if err != nil { return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", w.registration.Service.ID, err) } // Merge the local registration with the central defaults and update this service // in the local state. merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service) if err != nil { return err } // make a copy of the AddServiceRequest req := w.registration req.Service = merged err = w.agent.addServiceInternal(addServiceInternalRequest{ addServiceLockedRequest: req, persistService: w.registration.Service, persistServiceDefaults: serviceDefaults, }) if err != nil { return fmt.Errorf("error updating service registration: %v", err) } // Start the config watch, which starts a blocking query for the // resolved service config in the background. return w.start(ctx, wg) } func serviceDefaultsFromStruct(v *structs.ServiceConfigResponse) func(context.Context) (*structs.ServiceConfigResponse, error) { return func(_ context.Context) (*structs.ServiceConfigResponse, error) { return v, nil } } func serviceDefaultsFromCache(bd BaseDeps, req AddServiceRequest) func(context.Context) (*structs.ServiceConfigResponse, error) { // NOTE: this is called while holding the Agent.stateLock return func(ctx context.Context) (*structs.ServiceConfigResponse, error) { req := makeConfigRequest(bd, req) raw, _, err := bd.Cache.Get(ctx, cachetype.ResolvedServiceConfigName, req) if err != nil { return nil, err } serviceConfig, ok := raw.(*structs.ServiceConfigResponse) if !ok { // This should never happen, but we want to protect against panics return nil, fmt.Errorf("internal error: response type not correct") } return serviceConfig, nil } } // Start starts the config watch and a goroutine to handle updates over the // updateCh. This is safe to call more than once assuming you have called Stop // after each Start. // // NOTE: this is called while holding the Agent.stateLock func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) error { ctx, w.cancelFunc = context.WithCancel(ctx) // Configure and start a cache.Notify goroutine to run a continuous // blocking query on the resolved service config for this service. req := makeConfigRequest(w.agent.baseDeps, w.registration.AddServiceRequest) w.cacheKey = req.CacheInfo().Key updateCh := make(chan cache.UpdateEvent, 1) // We use the cache key as the correlationID here. Notify in general will not // respond on the updateCh after the context is cancelled however there could // possible be a race where it has only just got an update and checked the // context before we cancel and so might still deliver the old event. Using // the cacheKey allows us to ignore updates from the old cache watch and makes // even this rare edge case safe. err := w.agent.cache.Notify( ctx, cachetype.ResolvedServiceConfigName, req, w.cacheKey, updateCh, ) if err != nil { w.cancelFunc() return err } w.running.Add(1) wg.Add(1) go w.runWatch(ctx, wg, updateCh) return nil } func (w *serviceConfigWatch) Stop() { w.cancelFunc() w.running.Wait() } // runWatch handles any update events from the cache.Notify until the // config watch is shut down. // // NOTE: the caller must NOT hold the Agent.stateLock! func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup, updateCh chan cache.UpdateEvent) { defer wg.Done() defer w.running.Done() for { select { case <-ctx.Done(): return case event := <-updateCh: if err := w.handleUpdate(ctx, event); err != nil { w.agent.logger.Error("error handling service update", "error", err) } } } } // handleUpdate receives an update event the global config defaults, updates // the local state and re-registers the service with the newly merged config. // // NOTE: the caller must NOT hold the Agent.stateLock! func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.UpdateEvent) error { // If we got an error, log a warning if this is the first update; otherwise return the error. // We want the initial update to cause a service registration no matter what. if event.Err != nil { return fmt.Errorf("error watching service config: %v", event.Err) } serviceDefaults, ok := event.Result.(*structs.ServiceConfigResponse) if !ok { return fmt.Errorf("unknown update event type: %T", event) } // Sanity check this even came from the currently active watch to ignore // rare races when switching cache keys if event.CorrelationID != w.cacheKey { // It's a no-op. The new watcher will deliver (or may have already // delivered) the correct config so just ignore this old message. return nil } // Merge the local registration with the central defaults and update this service // in the local state. merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service) if err != nil { return err } // While we were waiting on the agent state lock we may have been shutdown. // So avoid doing a registration in that case. if err := ctx.Err(); err != nil { return nil } // make a copy of the AddServiceRequest req := w.registration req.Service = merged req.persistServiceConfig = true registerReq := &asyncRegisterRequest{ Args: addServiceInternalRequest{ addServiceLockedRequest: req, persistService: w.registration.Service, persistServiceDefaults: serviceDefaults, }, Reply: make(chan error, 1), } select { case <-ctx.Done(): return nil case w.registerCh <- registerReq: } select { case <-ctx.Done(): return nil case err := <-registerReq.Reply: if err != nil { return fmt.Errorf("error updating service registration: %v", err) } return nil } } type asyncRegisterRequest struct { Args addServiceInternalRequest Reply chan error } func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest { ns := addReq.Service name := ns.Service var upstreams []structs.ServiceID // Note that only sidecar proxies should even make it here for now although // later that will change to add the condition. if ns.IsSidecarProxy() { // This is a sidecar proxy, ignore the proxy service's config since we are // managed by the target service config. name = ns.Proxy.DestinationServiceName // Also if we have any upstreams defined, add them to the request so we can // learn about their configs. for _, us := range ns.Proxy.Upstreams { if us.DestinationType == "" || us.DestinationType == structs.UpstreamDestTypeService { sid := us.DestinationID() sid.EnterpriseMeta.Merge(&ns.EnterpriseMeta) upstreams = append(upstreams, sid) } } } req := &structs.ServiceConfigRequest{ Name: name, Datacenter: bd.RuntimeConfig.Datacenter, QueryOptions: structs.QueryOptions{Token: addReq.token}, UpstreamIDs: upstreams, EnterpriseMeta: ns.EnterpriseMeta, } if req.QueryOptions.Token == "" { req.QueryOptions.Token = bd.Tokens.AgentToken() } return req } // mergeServiceConfig from service into defaults to produce the final effective // config for the watched service. func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *structs.NodeService) (*structs.NodeService, error) { if defaults == nil { return service, nil } // We don't want to change s.registration in place since it is our source of // truth about what was actually registered before defaults applied. So copy // it first. nsRaw, err := copystructure.Copy(service) if err != nil { return nil, err } // Merge proxy defaults ns := nsRaw.(*structs.NodeService) if err := mergo.Merge(&ns.Proxy.Config, defaults.ProxyConfig); err != nil { return nil, err } if err := mergo.Merge(&ns.Proxy.Expose, defaults.Expose); err != nil { return nil, err } if ns.Proxy.MeshGateway.Mode == structs.MeshGatewayModeDefault { ns.Proxy.MeshGateway.Mode = defaults.MeshGateway.Mode } // Merge upstream defaults if there were any returned for i := range ns.Proxy.Upstreams { // Get a pointer not a value copy of the upstream struct us := &ns.Proxy.Upstreams[i] if us.DestinationType != "" && us.DestinationType != structs.UpstreamDestTypeService { continue } // default the upstreams gateway mode if it didn't specify one if us.MeshGateway.Mode == structs.MeshGatewayModeDefault { us.MeshGateway.Mode = ns.Proxy.MeshGateway.Mode } usCfg, ok := defaults.UpstreamIDConfigs.GetUpstreamConfig(us.DestinationID()) if !ok { // No config defaults to merge continue } if err := mergo.Merge(&us.Config, usCfg); err != nil { return nil, err } } return ns, err }