From 93235da25322b35565d02d1023fce95ebc360c83 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 17 Apr 2020 15:25:54 -0400 Subject: [PATCH 1/3] agent/service_manager: Pass ctx around [The documentation for context](https://golang.org/pkg/context/) recommends not storing context in a struct field: > Do not store Contexts inside a struct type; instead, pass a Context > explicitly to each function that needs it. The Context should be the > first parameter, typically named ctx... Sometimes there are good reasons to not follow this recommendation, but in this case it seems easy enough to follow. Also moved the ctx argument to be the first in one of the function calls to follow the same recommendation. --- agent/service_manager.go | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/agent/service_manager.go b/agent/service_manager.go index 5a163f3596..915cf9dfd8 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -173,10 +173,10 @@ func (s *ServiceManager) AddService(req *addServiceRequest) error { } err := watch.RegisterAndStart( + s.ctx, previousDefaults, waitForCentralConfig, persistServiceConfig, - s.ctx, &s.running, ) if err != nil { @@ -232,17 +232,16 @@ type serviceConfigWatch struct { // updateCh receives changes from cache watchers updateCh chan cache.UpdateEvent - ctx context.Context cancelFunc func() running sync.WaitGroup } // NOTE: this is called while holding the Agent.stateLock func (w *serviceConfigWatch) RegisterAndStart( + ctx context.Context, previousDefaults *structs.ServiceConfigResponse, waitForCentralConfig bool, persistServiceConfig bool, - ctx context.Context, wg *sync.WaitGroup, ) error { service := w.registration.service @@ -314,7 +313,7 @@ func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) error { // // NOTE: this is called while holding the Agent.stateLock func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) error { - w.ctx, w.cancelFunc = context.WithCancel(ctx) + ctx, w.cancelFunc = context.WithCancel(ctx) // Configure and start a cache.Notify goroutine to run a continuous // blocking query on the resolved service config for this service. @@ -328,7 +327,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro // the cacheKey allows us to ignore updates from the old cache watch and makes // even this rare edge case safe. err := w.agent.cache.Notify( - w.ctx, + ctx, cachetype.ResolvedServiceConfigName, req, w.cacheKey, @@ -341,7 +340,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro w.running.Add(1) wg.Add(1) - go w.runWatch(wg) + go w.runWatch(ctx, wg) return nil } @@ -355,16 +354,16 @@ func (w *serviceConfigWatch) Stop() { // config watch is shut down. // // NOTE: the caller must NOT hold the Agent.stateLock! -func (w *serviceConfigWatch) runWatch(wg *sync.WaitGroup) { +func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() defer w.running.Done() for { select { - case <-w.ctx.Done(): + case <-ctx.Done(): return case event := <-w.updateCh: - if err := w.handleUpdate(event); err != nil { + if err := w.handleUpdate(ctx, event); err != nil { w.agent.logger.Error("error handling service update", "error", err) } } @@ -375,7 +374,7 @@ func (w *serviceConfigWatch) runWatch(wg *sync.WaitGroup) { // the local state and re-registers the service with the newly merged config. // // NOTE: the caller must NOT hold the Agent.stateLock! -func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error { +func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.UpdateEvent) error { // If we got an error, log a warning if this is the first update; otherwise return the error. // We want the initial update to cause a service registration no matter what. if event.Err != nil { @@ -406,7 +405,7 @@ func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error { // While we were waiting on the agent state lock we may have been shutdown. // So avoid doing a registration in that case. select { - case <-w.ctx.Done(): + case <-ctx.Done(): return nil default: } @@ -427,13 +426,13 @@ func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error { } select { - case <-w.ctx.Done(): + case <-ctx.Done(): return nil case w.registerCh <- registerReq: } select { - case <-w.ctx.Done(): + case <-ctx.Done(): return nil case err := <-registerReq.Reply: From 26291a84820a01cbbd95d2273138178797b73513 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 17 Apr 2020 16:50:25 -0400 Subject: [PATCH 2/3] agent/service_manager: remove 'defaults' field from serviceConfigWatch This field was always read by the same function that populated the field, so it does not need to be a field. Passing the value as an argument to functions makes it more obvious where the value comes from, and also reduces the scope of the variable significantly. --- agent/service_manager.go | 63 ++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 34 deletions(-) diff --git a/agent/service_manager.go b/agent/service_manager.go index 915cf9dfd8..16d15255be 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -27,10 +27,11 @@ type ServiceManager struct { // services tracks all active watches for registered services services map[structs.ServiceID]*serviceConfigWatch - // registerCh is a channel for processing service registrations in the - // background when watches are notified of changes. All sends and receives - // must also obey the ctx.Done() channel to avoid a deadlock during - // shutdown. + // registerCh is a channel for receiving service registration requests from + // from serviceConfigWatchers. + // The registrations are handled in the background when watches are notified of + // changes. All sends and receives must also obey the ctx.Done() channel to + // avoid a deadlock during shutdown. registerCh chan *asyncRegisterRequest // ctx is the shared context for all goroutines launched @@ -220,7 +221,6 @@ type serviceRegistration struct { // service/proxy defaults. type serviceConfigWatch struct { registration *serviceRegistration - defaults *structs.ServiceConfigResponse agent *Agent registerCh chan<- *asyncRegisterRequest @@ -239,7 +239,7 @@ type serviceConfigWatch struct { // NOTE: this is called while holding the Agent.stateLock func (w *serviceConfigWatch) RegisterAndStart( ctx context.Context, - previousDefaults *structs.ServiceConfigResponse, + serviceDefaults *structs.ServiceConfigResponse, waitForCentralConfig bool, persistServiceConfig bool, wg *sync.WaitGroup, @@ -251,16 +251,16 @@ func (w *serviceConfigWatch) RegisterAndStart( // operation. Either way the watcher will end up with something flagged // as defaults even if they don't actually reflect actual defaults. if waitForCentralConfig { - if err := w.fetchDefaults(ctx); err != nil { + var err error + serviceDefaults, err = w.fetchDefaults(ctx) + if err != nil { return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", service.ID, err) } - } else { - w.defaults = previousDefaults } // Merge the local registration with the central defaults and update this service // in the local state. - merged, err := w.mergeServiceConfig() + merged, err := mergeServiceConfig(serviceDefaults, w.registration.service) if err != nil { return err } @@ -272,7 +272,7 @@ func (w *serviceConfigWatch) RegisterAndStart( service: merged, chkTypes: w.registration.chkTypes, persistService: w.registration.service, - persistDefaults: w.defaults, + persistDefaults: serviceDefaults, persist: w.registration.persist, persistServiceConfig: persistServiceConfig, token: w.registration.token, @@ -289,22 +289,20 @@ func (w *serviceConfigWatch) RegisterAndStart( } // NOTE: this is called while holding the Agent.stateLock -func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) error { +func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) (*structs.ServiceConfigResponse, error) { req := makeConfigRequest(w.agent, w.registration) raw, _, err := w.agent.cache.Get(ctx, cachetype.ResolvedServiceConfigName, req) if err != nil { - return err + return nil, err } - reply, ok := raw.(*structs.ServiceConfigResponse) + serviceConfig, ok := raw.(*structs.ServiceConfigResponse) if !ok { // This should never happen, but we want to protect against panics - return fmt.Errorf("internal error: response type not correct") + return nil, fmt.Errorf("internal error: response type not correct") } - - w.defaults = reply - return nil + return serviceConfig, nil } // Start starts the config watch and a goroutine to handle updates over the @@ -381,7 +379,7 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat return fmt.Errorf("error watching service config: %v", event.Err) } - res, ok := event.Result.(*structs.ServiceConfigResponse) + serviceDefaults, ok := event.Result.(*structs.ServiceConfigResponse) if !ok { return fmt.Errorf("unknown update event type: %T", event) } @@ -393,11 +391,10 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat // delivered) the correct config so just ignore this old message. return nil } - w.defaults = res // Merge the local registration with the central defaults and update this service // in the local state. - merged, err := w.mergeServiceConfig() + merged, err := mergeServiceConfig(serviceDefaults, w.registration.service) if err != nil { return err } @@ -415,7 +412,7 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat service: merged, chkTypes: w.registration.chkTypes, persistService: w.registration.service, - persistDefaults: w.defaults, + persistDefaults: serviceDefaults, persist: w.registration.persist, persistServiceConfig: true, token: w.registration.token, @@ -484,19 +481,17 @@ func makeConfigRequest(agent *Agent, registration *serviceRegistration) *structs return req } -// mergeServiceConfig returns the final effective config for the watched service, -// including the latest known global defaults from the servers. -// -// NOTE: this is called while holding the Agent.stateLock -func (w *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error) { - if w.defaults == nil { - return w.registration.service, nil +// mergeServiceConfig from service into defaults to produce the final effective +// config for the watched service. +func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *structs.NodeService) (*structs.NodeService, error) { + if defaults == nil { + return service, nil } // We don't want to change s.registration in place since it is our source of // truth about what was actually registered before defaults applied. So copy // it first. - nsRaw, err := copystructure.Copy(w.registration.service) + nsRaw, err := copystructure.Copy(service) if err != nil { return nil, err } @@ -504,16 +499,16 @@ func (w *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error) // Merge proxy defaults ns := nsRaw.(*structs.NodeService) - if err := mergo.Merge(&ns.Proxy.Config, w.defaults.ProxyConfig); err != nil { + if err := mergo.Merge(&ns.Proxy.Config, defaults.ProxyConfig); err != nil { return nil, err } - if err := mergo.Merge(&ns.Proxy.Expose, w.defaults.Expose); err != nil { + if err := mergo.Merge(&ns.Proxy.Expose, defaults.Expose); err != nil { return nil, err } if ns.Proxy.MeshGateway.Mode == structs.MeshGatewayModeDefault { - ns.Proxy.MeshGateway.Mode = w.defaults.MeshGateway.Mode + ns.Proxy.MeshGateway.Mode = defaults.MeshGateway.Mode } // Merge upstream defaults if there were any returned @@ -529,7 +524,7 @@ func (w *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error) us.MeshGateway.Mode = ns.Proxy.MeshGateway.Mode } - usCfg, ok := w.defaults.UpstreamIDConfigs.GetUpstreamConfig(us.DestinationID()) + usCfg, ok := defaults.UpstreamIDConfigs.GetUpstreamConfig(us.DestinationID()) if !ok { // No config defaults to merge continue From 73cd0b6fac4bec7901fd6d35e21f817f9c4d57b3 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 17 Apr 2020 17:04:58 -0400 Subject: [PATCH 3/3] agent/service_manager: remove 'updateCh' field from serviceConfigWatch Passing the channel to the function which uses it significantly reduces the scope of the variable, and makes its usage more explicit. It also moves the initialization of the channel closer to where it is used. Also includes a couple very small cleanups to remove a local var and read the error from `ctx.Err()` directly instead of creating a channel to check for an error. --- agent/service_manager.go | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/agent/service_manager.go b/agent/service_manager.go index 16d15255be..a20a00d198 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -168,7 +168,6 @@ func (s *ServiceManager) AddService(req *addServiceRequest) error { // merged config. watch := &serviceConfigWatch{ registration: reg, - updateCh: make(chan cache.UpdateEvent, 1), agent: s.agent, registerCh: s.registerCh, } @@ -229,9 +228,6 @@ type serviceConfigWatch struct { // we check to see if a new cache watch is needed. cacheKey string - // updateCh receives changes from cache watchers - updateCh chan cache.UpdateEvent - cancelFunc func() running sync.WaitGroup } @@ -244,8 +240,6 @@ func (w *serviceConfigWatch) RegisterAndStart( persistServiceConfig bool, wg *sync.WaitGroup, ) error { - service := w.registration.service - // Either we explicitly block waiting for defaults before registering, // or we feed it some seed data (or NO data) and bypass the blocking // operation. Either way the watcher will end up with something flagged @@ -254,7 +248,8 @@ func (w *serviceConfigWatch) RegisterAndStart( var err error serviceDefaults, err = w.fetchDefaults(ctx) if err != nil { - return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", service.ID, err) + return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", + w.registration.service.ID, err) } } @@ -318,6 +313,8 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro req := makeConfigRequest(w.agent, w.registration) w.cacheKey = req.CacheInfo().Key + updateCh := make(chan cache.UpdateEvent, 1) + // We use the cache key as the correlationID here. Notify in general will not // respond on the updateCh after the context is cancelled however there could // possible be a race where it has only just got an update and checked the @@ -329,7 +326,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro cachetype.ResolvedServiceConfigName, req, w.cacheKey, - w.updateCh, + updateCh, ) if err != nil { w.cancelFunc() @@ -338,7 +335,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro w.running.Add(1) wg.Add(1) - go w.runWatch(ctx, wg) + go w.runWatch(ctx, wg, updateCh) return nil } @@ -352,7 +349,7 @@ func (w *serviceConfigWatch) Stop() { // config watch is shut down. // // NOTE: the caller must NOT hold the Agent.stateLock! -func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup) { +func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup, updateCh chan cache.UpdateEvent) { defer wg.Done() defer w.running.Done() @@ -360,7 +357,7 @@ func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup) { select { case <-ctx.Done(): return - case event := <-w.updateCh: + case event := <-updateCh: if err := w.handleUpdate(ctx, event); err != nil { w.agent.logger.Error("error handling service update", "error", err) } @@ -401,10 +398,8 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat // While we were waiting on the agent state lock we may have been shutdown. // So avoid doing a registration in that case. - select { - case <-ctx.Done(): + if err := ctx.Err(); err != nil { return nil - default: } registerReq := &asyncRegisterRequest{