From cba47aa0ca76bf99c3625105616cbe0ffadeddfb Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 24 Apr 2019 06:46:30 -0700 Subject: [PATCH] Fix a race in the ready logic --- agent/agent.go | 2 ++ agent/service_manager.go | 35 ++++++++++++++++------------------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index a2fa2b8853..da059801e6 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -243,6 +243,8 @@ type Agent struct { // directly. proxyConfig *proxycfg.Manager + // serviceManager is the manager for combining local service registrations with + // the centrally configured proxy/service defaults. serviceManager *ServiceManager // xdsServer is the Server instance that serves xDS gRPC API. diff --git a/agent/service_manager.go b/agent/service_manager.go index 163aa78a19..a227ff1586 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -34,6 +34,9 @@ func NewServiceManager(agent *Agent) *ServiceManager { // to fetch the merged global defaults that apply to the service in order to compose the // initial registration. func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { + s.lock.Lock() + defer s.lock.Unlock() + reg := serviceRegistration{ service: service, chkTypes: chkTypes, @@ -44,9 +47,7 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st // If a service watch already exists, update the registration. Otherwise, // start a new config watcher. - s.lock.Lock() watch, ok := s.services[service.ID] - s.lock.Unlock() if ok { if err := watch.updateRegistration(®); err != nil { return err @@ -75,9 +76,7 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st return err } - s.lock.Lock() s.services[service.ID] = watch - s.lock.Unlock() s.agent.logger.Printf("[DEBUG] agent.manager: added local registration for service %q", service.ID) } @@ -117,10 +116,8 @@ type serviceConfigWatch struct { agent *Agent // readyCh is used for ReadyWait in order to block until the first update - // for the resolved service config is received from the cache. Both this - // and ready are protected the lock. + // for the resolved service config is received from the cache. readyCh chan error - ready bool updateCh chan cache.UpdateEvent ctx context.Context @@ -156,14 +153,16 @@ func (s *serviceConfigWatch) ReadyWait() error { // runWatch handles any update events from the cache.Notify until the // config watch is shut down. func (s *serviceConfigWatch) runWatch() { + firstRun := true for { select { case <-s.ctx.Done(): return case event := <-s.updateCh: - if err := s.handleUpdate(event, false); err != nil { + if err := s.handleUpdate(event, false, firstRun); err != nil { s.agent.logger.Printf("[ERR] agent.manager: error handling service update: %v", err) } + firstRun = false } } } @@ -172,14 +171,13 @@ func (s *serviceConfigWatch) runWatch() { // global config defaults, updates the local state and re-registers the service with // the newly merged config. This function takes the serviceConfigWatch lock to ensure // only one update can be happening at a time. -func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) error { +func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked, firstRun bool) error { // Take the agent state lock if needed. This is done before the local config watch // lock in order to prevent a race between this config watch and others - the config - // watch lock is the inner lock and the agent stateLock is the outer lock. In the case - // where s.ready == false we assume the lock is already held, since this update is being - // waited on for the initial registration by a call from the agent that already holds - // the lock. - if !locked && s.ready { + // watch lock is the inner lock and the agent stateLock is the outer lock. If this is the + // first run we also don't need to take the stateLock, as this is being waited on + // synchronously by a caller that already holds it. + if !locked && !firstRun { s.agent.stateLock.Lock() defer s.agent.stateLock.Unlock() } @@ -189,7 +187,7 @@ func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) // If we got an error, log a warning if this is the first update; otherwise return the error. // We want the initial update to cause a service registration no matter what. if event.Err != nil { - if !s.ready { + if firstRun { s.agent.logger.Printf("[WARN] could not retrieve initial service_defaults config for service %q: %v", s.registration.service.ID, event.Err) } else { @@ -212,16 +210,15 @@ func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) if err != nil { // If this is the initial registration, return the error through the readyCh // so it can be passed back to the original caller. - if !s.ready { + if firstRun { s.readyCh <- err } return fmt.Errorf("error updating service registration: %v", err) } // If this is the first registration, set the ready status by closing the channel. - if !s.ready { + if firstRun { close(s.readyCh) - s.ready = true } return nil @@ -250,7 +247,7 @@ func (s *serviceConfigWatch) startConfigWatch() error { func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error { return s.handleUpdate(cache.UpdateEvent{ Result: registration, - }, true) + }, true, false) } // mergeServiceConfig returns the final effective config for the watched service,