mirror of
https://github.com/status-im/consul.git
synced 2025-01-09 21:35:52 +00:00
agent: move two fields off of AddServiceRequest
This commit is contained in:
parent
60c6a1c220
commit
1495291054
108
agent/agent.go
108
agent/agent.go
@ -1894,18 +1894,21 @@ func (a *Agent) readPersistedServiceConfigs() (map[structs.ServiceID]*structs.Se
|
|||||||
// This entry is persistent and the agent will make a best effort to
|
// This entry is persistent and the agent will make a best effort to
|
||||||
// ensure it is registered
|
// ensure it is registered
|
||||||
func (a *Agent) AddService(req AddServiceRequest) error {
|
func (a *Agent) AddService(req AddServiceRequest) error {
|
||||||
req.serviceDefaults = serviceDefaultsFromCache(a.baseDeps, req)
|
rl := addServiceLockedRequest{
|
||||||
req.persistServiceConfig = true
|
AddServiceRequest: req,
|
||||||
|
serviceDefaults: serviceDefaultsFromCache(a.baseDeps, req),
|
||||||
|
persistServiceConfig: true,
|
||||||
|
}
|
||||||
a.stateLock.Lock()
|
a.stateLock.Lock()
|
||||||
defer a.stateLock.Unlock()
|
defer a.stateLock.Unlock()
|
||||||
|
|
||||||
req.snap = a.State.Checks(structs.WildcardEnterpriseMeta())
|
rl.snap = a.State.Checks(structs.WildcardEnterpriseMeta())
|
||||||
return a.addServiceLocked(req)
|
return a.addServiceLocked(rl)
|
||||||
}
|
}
|
||||||
|
|
||||||
// addServiceLocked adds a service entry to the service manager if enabled, or directly
|
// addServiceLocked adds a service entry to the service manager if enabled, or directly
|
||||||
// to the local state if it is not. This function assumes the state lock is already held.
|
// to the local state if it is not. This function assumes the state lock is already held.
|
||||||
func (a *Agent) addServiceLocked(req AddServiceRequest) error {
|
func (a *Agent) addServiceLocked(req addServiceLockedRequest) error {
|
||||||
req.Service.EnterpriseMeta.Normalize()
|
req.Service.EnterpriseMeta.Normalize()
|
||||||
|
|
||||||
if err := a.validateService(req.Service, req.chkTypes); err != nil {
|
if err := a.validateService(req.Service, req.chkTypes); err != nil {
|
||||||
@ -1917,7 +1920,22 @@ func (a *Agent) addServiceLocked(req AddServiceRequest) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
req.persistServiceConfig = false
|
req.persistServiceConfig = false
|
||||||
return a.addServiceInternal(addServiceInternalRequest{AddServiceRequest: req})
|
return a.addServiceInternal(addServiceInternalRequest{addServiceLockedRequest: req})
|
||||||
|
}
|
||||||
|
|
||||||
|
type addServiceLockedRequest struct {
|
||||||
|
AddServiceRequest
|
||||||
|
|
||||||
|
persistServiceConfig bool
|
||||||
|
|
||||||
|
// serviceDefaults is a function which will return centralized service
|
||||||
|
// configuration.
|
||||||
|
// When loading service definitions from disk this will return a copy
|
||||||
|
// loaded from a persisted file. Otherwise it will query a Server for the
|
||||||
|
// centralized config.
|
||||||
|
// serviceDefaults is called when the Agent.stateLock is held, so it must
|
||||||
|
// never attempt to acquire that lock.
|
||||||
|
serviceDefaults func(context.Context) (*structs.ServiceConfigResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddServiceRequest is the union of arguments for calling both
|
// AddServiceRequest is the union of arguments for calling both
|
||||||
@ -1934,24 +1952,14 @@ type AddServiceRequest struct {
|
|||||||
Service *structs.NodeService
|
Service *structs.NodeService
|
||||||
chkTypes []*structs.CheckType
|
chkTypes []*structs.CheckType
|
||||||
persist bool
|
persist bool
|
||||||
persistServiceConfig bool
|
|
||||||
token string
|
token string
|
||||||
replaceExistingChecks bool
|
replaceExistingChecks bool
|
||||||
Source configSource
|
Source configSource
|
||||||
snap map[structs.CheckID]*structs.HealthCheck
|
snap map[structs.CheckID]*structs.HealthCheck
|
||||||
|
|
||||||
// serviceDefaults is a function which will return centralized service
|
|
||||||
// configuration.
|
|
||||||
// When loading service definitions from disk this will return a copy
|
|
||||||
// loaded from a persisted file. Otherwise it will query a Server for the
|
|
||||||
// centralized config.
|
|
||||||
// serviceDefaults is called when the Agent.stateLock is held, so it must
|
|
||||||
// never attempt to acquire that lock.
|
|
||||||
serviceDefaults func(context.Context) (*structs.ServiceConfigResponse, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type addServiceInternalRequest struct {
|
type addServiceInternalRequest struct {
|
||||||
AddServiceRequest
|
addServiceLockedRequest
|
||||||
persistService *structs.NodeService
|
persistService *structs.NodeService
|
||||||
persistDefaults *structs.ServiceConfigResponse
|
persistDefaults *structs.ServiceConfigResponse
|
||||||
}
|
}
|
||||||
@ -3084,16 +3092,18 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
|||||||
ns.Connect.SidecarService = nil
|
ns.Connect.SidecarService = nil
|
||||||
|
|
||||||
sid := ns.CompoundServiceID()
|
sid := ns.CompoundServiceID()
|
||||||
err = a.addServiceLocked(AddServiceRequest{
|
err = a.addServiceLocked(addServiceLockedRequest{
|
||||||
Service: ns,
|
AddServiceRequest: AddServiceRequest{
|
||||||
chkTypes: chkTypes,
|
Service: ns,
|
||||||
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[sid]),
|
chkTypes: chkTypes,
|
||||||
persist: false, // don't rewrite the file with the same data we just read
|
persist: false, // don't rewrite the file with the same data we just read
|
||||||
persistServiceConfig: false, // don't rewrite the file with the same data we just read
|
token: service.Token,
|
||||||
token: service.Token,
|
replaceExistingChecks: false, // do default behavior
|
||||||
replaceExistingChecks: false, // do default behavior
|
Source: ConfigSourceLocal,
|
||||||
Source: ConfigSourceLocal,
|
snap: snap,
|
||||||
snap: snap,
|
},
|
||||||
|
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[sid]),
|
||||||
|
persistServiceConfig: false, // don't rewrite the file with the same data we just read
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to register service %q: %v", service.Name, err)
|
return fmt.Errorf("Failed to register service %q: %v", service.Name, err)
|
||||||
@ -3102,16 +3112,18 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
|||||||
// If there is a sidecar service, register that too.
|
// If there is a sidecar service, register that too.
|
||||||
if sidecar != nil {
|
if sidecar != nil {
|
||||||
sidecarServiceID := sidecar.CompoundServiceID()
|
sidecarServiceID := sidecar.CompoundServiceID()
|
||||||
err = a.addServiceLocked(AddServiceRequest{
|
err = a.addServiceLocked(addServiceLockedRequest{
|
||||||
Service: sidecar,
|
AddServiceRequest: AddServiceRequest{
|
||||||
chkTypes: sidecarChecks,
|
Service: sidecar,
|
||||||
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[sidecarServiceID]),
|
chkTypes: sidecarChecks,
|
||||||
persist: false, // don't rewrite the file with the same data we just read
|
persist: false, // don't rewrite the file with the same data we just read
|
||||||
persistServiceConfig: false, // don't rewrite the file with the same data we just read
|
token: sidecarToken,
|
||||||
token: sidecarToken,
|
replaceExistingChecks: false, // do default behavior
|
||||||
replaceExistingChecks: false, // do default behavior
|
Source: ConfigSourceLocal,
|
||||||
Source: ConfigSourceLocal,
|
snap: snap,
|
||||||
snap: snap,
|
},
|
||||||
|
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[sidecarServiceID]),
|
||||||
|
persistServiceConfig: false, // don't rewrite the file with the same data we just read
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err)
|
return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err)
|
||||||
@ -3198,16 +3210,18 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
|||||||
"service", serviceID.String(),
|
"service", serviceID.String(),
|
||||||
"file", file,
|
"file", file,
|
||||||
)
|
)
|
||||||
err = a.addServiceLocked(AddServiceRequest{
|
err = a.addServiceLocked(addServiceLockedRequest{
|
||||||
Service: p.Service,
|
AddServiceRequest: AddServiceRequest{
|
||||||
chkTypes: nil,
|
Service: p.Service,
|
||||||
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[serviceID]),
|
chkTypes: nil,
|
||||||
persist: false, // don't rewrite the file with the same data we just read
|
persist: false, // don't rewrite the file with the same data we just read
|
||||||
persistServiceConfig: false, // don't rewrite the file with the same data we just read
|
token: p.Token,
|
||||||
token: p.Token,
|
replaceExistingChecks: false, // do default behavior
|
||||||
replaceExistingChecks: false, // do default behavior
|
Source: source,
|
||||||
Source: source,
|
snap: snap,
|
||||||
snap: snap,
|
},
|
||||||
|
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[serviceID]),
|
||||||
|
persistServiceConfig: false, // don't rewrite the file with the same data we just read
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed adding service %q: %s", serviceID, err)
|
return fmt.Errorf("failed adding service %q: %s", serviceID, err)
|
||||||
|
@ -3312,8 +3312,6 @@ func TestAgent_AddService_restoresSnapshot(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func testAgent_AddService_restoresSnapshot(t *testing.T, extraHCL string) {
|
func testAgent_AddService_restoresSnapshot(t *testing.T, extraHCL string) {
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
a := NewTestAgent(t, extraHCL)
|
a := NewTestAgent(t, extraHCL)
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
@ -119,7 +119,7 @@ func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error {
|
|||||||
// merged with the global defaults before registration.
|
// merged with the global defaults before registration.
|
||||||
//
|
//
|
||||||
// NOTE: the caller must hold the Agent.stateLock!
|
// NOTE: the caller must hold the Agent.stateLock!
|
||||||
func (s *ServiceManager) AddService(req AddServiceRequest) error {
|
func (s *ServiceManager) AddService(req addServiceLockedRequest) error {
|
||||||
s.servicesLock.Lock()
|
s.servicesLock.Lock()
|
||||||
defer s.servicesLock.Unlock()
|
defer s.servicesLock.Unlock()
|
||||||
|
|
||||||
@ -169,7 +169,7 @@ func (s *ServiceManager) RemoveService(serviceID structs.ServiceID) {
|
|||||||
// for a given service from both the local registration and the global
|
// for a given service from both the local registration and the global
|
||||||
// service/proxy defaults.
|
// service/proxy defaults.
|
||||||
type serviceConfigWatch struct {
|
type serviceConfigWatch struct {
|
||||||
registration AddServiceRequest
|
registration addServiceLockedRequest
|
||||||
|
|
||||||
agent *Agent
|
agent *Agent
|
||||||
registerCh chan<- *asyncRegisterRequest
|
registerCh chan<- *asyncRegisterRequest
|
||||||
@ -206,9 +206,9 @@ func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.Wait
|
|||||||
req.snap = w.agent.snapshotCheckState() // requires Agent.stateLock
|
req.snap = w.agent.snapshotCheckState() // requires Agent.stateLock
|
||||||
|
|
||||||
err = w.agent.addServiceInternal(addServiceInternalRequest{
|
err = w.agent.addServiceInternal(addServiceInternalRequest{
|
||||||
AddServiceRequest: req,
|
addServiceLockedRequest: req,
|
||||||
persistService: w.registration.Service,
|
persistService: w.registration.Service,
|
||||||
persistDefaults: serviceDefaults,
|
persistDefaults: serviceDefaults,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error updating service registration: %v", err)
|
return fmt.Errorf("error updating service registration: %v", err)
|
||||||
@ -254,7 +254,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
|
|||||||
|
|
||||||
// Configure and start a cache.Notify goroutine to run a continuous
|
// Configure and start a cache.Notify goroutine to run a continuous
|
||||||
// blocking query on the resolved service config for this service.
|
// blocking query on the resolved service config for this service.
|
||||||
req := makeConfigRequest(w.agent.baseDeps, w.registration)
|
req := makeConfigRequest(w.agent.baseDeps, w.registration.AddServiceRequest)
|
||||||
w.cacheKey = req.CacheInfo().Key
|
w.cacheKey = req.CacheInfo().Key
|
||||||
|
|
||||||
updateCh := make(chan cache.UpdateEvent, 1)
|
updateCh := make(chan cache.UpdateEvent, 1)
|
||||||
@ -353,9 +353,9 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
|
|||||||
|
|
||||||
registerReq := &asyncRegisterRequest{
|
registerReq := &asyncRegisterRequest{
|
||||||
Args: addServiceInternalRequest{
|
Args: addServiceInternalRequest{
|
||||||
AddServiceRequest: req,
|
addServiceLockedRequest: req,
|
||||||
persistService: w.registration.Service,
|
persistService: w.registration.Service,
|
||||||
persistDefaults: serviceDefaults,
|
persistDefaults: serviceDefaults,
|
||||||
},
|
},
|
||||||
Reply: make(chan error, 1),
|
Reply: make(chan error, 1),
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user