agent: remove ServiceManager goroutine

The ServiceManager.Start goroutine was used to serialize calls to
agent.addServiceInternal.

All the goroutines which sent events to the channel would block waiting
for a response from that same goroutine, which is effectively the same
as a synchronous call without any channels.

This commit removes the goroutine and channels, and instead calls
addServiceInternal directly. Since all of these goroutines will need to
take the agent.stateLock, the mutex handles the serializing of calls.
This commit is contained in:
Daniel Nephin 2020-11-30 18:19:11 -05:00
parent 45b315060a
commit 4e42b8f1d4
2 changed files with 23 additions and 85 deletions

View File

@ -512,7 +512,6 @@ func (a *Agent) Start(ctx context.Context) error {
if err := a.baseDeps.AutoConfig.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
return fmt.Errorf("AutoConf failed to start certificate monitor: %w", err)
}
a.serviceManager.Start()
// Load checks/services/metadata.
emptyCheckSnapshot := map[structs.CheckID]*structs.HealthCheck{}

View File

@ -26,13 +26,6 @@ type ServiceManager struct {
// services tracks all active watches for registered services
services map[structs.ServiceID]*serviceConfigWatch
// registerCh is a channel for receiving service registration requests from
// from serviceConfigWatchers.
// The registrations are handled in the background when watches are notified of
// changes. All sends and receives must also obey the ctx.Done() channel to
// avoid a deadlock during shutdown.
registerCh chan *asyncRegisterRequest
// ctx is the shared context for all goroutines launched
ctx context.Context
@ -48,7 +41,6 @@ func NewServiceManager(agent *Agent) *ServiceManager {
return &ServiceManager{
agent: agent,
services: make(map[structs.ServiceID]*serviceConfigWatch),
registerCh: make(chan *asyncRegisterRequest), // must be unbuffered
ctx: ctx,
cancel: cancel,
}
@ -62,36 +54,6 @@ func (s *ServiceManager) Stop() {
s.running.Wait()
}
// Start starts a background worker goroutine that writes back into the Agent
// state. This only exists to keep the need to lock the agent state lock out of
// the main AddService/RemoveService codepaths to avoid deadlocks.
func (s *ServiceManager) Start() {
s.running.Add(1)
go func() {
defer s.running.Done()
for {
select {
case <-s.ctx.Done():
return
case req := <-s.registerCh:
req.Reply <- s.registerOnce(req.Args)
}
}
}()
}
// runOnce will process a single registration request
func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error {
s.agent.stateLock.Lock()
defer s.agent.stateLock.Unlock()
if err := s.agent.addServiceInternal(args); err != nil {
return fmt.Errorf("error updating service registration: %v", err)
}
return nil
}
// AddService will (re)create a serviceConfigWatch on the given service. For
// each call of this function the first registration will happen inline and
// will read the merged global defaults for the service through the agent cache
@ -129,11 +91,7 @@ func (s *ServiceManager) AddService(req addServiceLockedRequest) error {
// Get the existing global config and do the initial registration with the
// merged config.
watch := &serviceConfigWatch{
registration: req,
agent: s.agent,
registerCh: s.registerCh,
}
watch := &serviceConfigWatch{registration: req, agent: s.agent}
if err := watch.register(s.ctx); err != nil {
return err
}
@ -168,9 +126,7 @@ func (s *ServiceManager) RemoveService(serviceID structs.ServiceID) {
// service/proxy defaults.
type serviceConfigWatch struct {
registration addServiceLockedRequest
agent *Agent
registerCh chan<- *asyncRegisterRequest
// cacheKey stores the key of the current request, when registration changes
// we check to see if a new cache watch is needed.
@ -325,47 +281,30 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
return err
}
// make a copy of the AddServiceRequest
req := w.registration
req.Service = merged
req.persistServiceConfig = true
args := addServiceInternalRequest{
addServiceLockedRequest: req,
persistService: w.registration.Service,
persistServiceDefaults: serviceDefaults,
}
w.agent.stateLock.Lock()
defer w.agent.stateLock.Unlock()
// While we were waiting on the agent state lock we may have been shutdown.
// So avoid doing a registration in that case.
if err := ctx.Err(); err != nil {
return nil
}
// make a copy of the AddServiceRequest
req := w.registration
req.Service = merged
req.persistServiceConfig = true
registerReq := &asyncRegisterRequest{
Args: addServiceInternalRequest{
addServiceLockedRequest: req,
persistService: w.registration.Service,
persistServiceDefaults: serviceDefaults,
},
Reply: make(chan error, 1),
}
select {
case <-ctx.Done():
return nil
case w.registerCh <- registerReq:
}
select {
case <-ctx.Done():
return nil
case err := <-registerReq.Reply:
if err != nil {
if err := w.agent.addServiceInternal(args); err != nil {
return fmt.Errorf("error updating service registration: %v", err)
}
return nil
}
}
type asyncRegisterRequest struct {
Args addServiceInternalRequest
Reply chan error
}
func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest {