mirror of
https://github.com/status-im/consul.git
synced 2025-01-09 13:26:07 +00:00
agent: Replace two fields on AddServiceRequest with a func field
The two previous fields were mutually exclusive. They can be represented with a single function which provides the value.
This commit is contained in:
parent
d0386ae025
commit
60c6a1c220
@ -1894,7 +1894,7 @@ func (a *Agent) readPersistedServiceConfigs() (map[structs.ServiceID]*structs.Se
|
||||
// This entry is persistent and the agent will make a best effort to
|
||||
// ensure it is registered
|
||||
func (a *Agent) AddService(req AddServiceRequest) error {
|
||||
req.waitForCentralConfig = true
|
||||
req.serviceDefaults = serviceDefaultsFromCache(a.baseDeps, req)
|
||||
req.persistServiceConfig = true
|
||||
a.stateLock.Lock()
|
||||
defer a.stateLock.Unlock()
|
||||
@ -1933,14 +1933,21 @@ func (a *Agent) addServiceLocked(req AddServiceRequest) error {
|
||||
type AddServiceRequest struct {
|
||||
Service *structs.NodeService
|
||||
chkTypes []*structs.CheckType
|
||||
previousDefaults *structs.ServiceConfigResponse // just for: addServiceLocked
|
||||
waitForCentralConfig bool // just for: addServiceLocked
|
||||
persist bool
|
||||
persistServiceConfig bool
|
||||
token string
|
||||
replaceExistingChecks bool
|
||||
Source configSource
|
||||
snap map[structs.CheckID]*structs.HealthCheck
|
||||
|
||||
// serviceDefaults is a function which will return centralized service
|
||||
// configuration.
|
||||
// When loading service definitions from disk this will return a copy
|
||||
// loaded from a persisted file. Otherwise it will query a Server for the
|
||||
// centralized config.
|
||||
// serviceDefaults is called when the Agent.stateLock is held, so it must
|
||||
// never attempt to acquire that lock.
|
||||
serviceDefaults func(context.Context) (*structs.ServiceConfigResponse, error)
|
||||
}
|
||||
|
||||
type addServiceInternalRequest struct {
|
||||
@ -3080,8 +3087,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
||||
err = a.addServiceLocked(AddServiceRequest{
|
||||
Service: ns,
|
||||
chkTypes: chkTypes,
|
||||
previousDefaults: persistedServiceConfigs[sid],
|
||||
waitForCentralConfig: false, // exclusively use cached values
|
||||
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[sid]),
|
||||
persist: false, // don't rewrite the file with the same data we just read
|
||||
persistServiceConfig: false, // don't rewrite the file with the same data we just read
|
||||
token: service.Token,
|
||||
@ -3099,8 +3105,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
||||
err = a.addServiceLocked(AddServiceRequest{
|
||||
Service: sidecar,
|
||||
chkTypes: sidecarChecks,
|
||||
previousDefaults: persistedServiceConfigs[sidecarServiceID],
|
||||
waitForCentralConfig: false, // exclusively use cached values
|
||||
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[sidecarServiceID]),
|
||||
persist: false, // don't rewrite the file with the same data we just read
|
||||
persistServiceConfig: false, // don't rewrite the file with the same data we just read
|
||||
token: sidecarToken,
|
||||
@ -3196,8 +3201,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
||||
err = a.addServiceLocked(AddServiceRequest{
|
||||
Service: p.Service,
|
||||
chkTypes: nil,
|
||||
previousDefaults: persistedServiceConfigs[serviceID],
|
||||
waitForCentralConfig: false, // exclusively use cached values
|
||||
serviceDefaults: serviceDefaultsFromStruct(persistedServiceConfigs[serviceID]),
|
||||
persist: false, // don't rewrite the file with the same data we just read
|
||||
persistServiceConfig: false, // don't rewrite the file with the same data we just read
|
||||
token: p.Token,
|
||||
|
@ -184,18 +184,10 @@ type serviceConfigWatch struct {
|
||||
|
||||
// NOTE: this is called while holding the Agent.stateLock
|
||||
func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.WaitGroup) error {
|
||||
serviceDefaults := w.registration.previousDefaults
|
||||
// Either we explicitly block waiting for defaults before registering,
|
||||
// or we feed it some seed data (or NO data) and bypass the blocking
|
||||
// operation. Either way the watcher will end up with something flagged
|
||||
// as defaults even if they don't actually reflect actual defaults.
|
||||
if w.registration.waitForCentralConfig {
|
||||
var err error
|
||||
serviceDefaults, err = w.fetchDefaults(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v",
|
||||
w.registration.Service.ID, err)
|
||||
}
|
||||
serviceDefaults, err := w.registration.serviceDefaults(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v",
|
||||
w.registration.Service.ID, err)
|
||||
}
|
||||
|
||||
// Merge the local registration with the central defaults and update this service
|
||||
@ -227,21 +219,29 @@ func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.Wait
|
||||
return w.start(ctx, wg)
|
||||
}
|
||||
|
||||
// NOTE: this is called while holding the Agent.stateLock
|
||||
func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) (*structs.ServiceConfigResponse, error) {
|
||||
req := makeConfigRequest(w.agent, w.registration)
|
||||
|
||||
raw, _, err := w.agent.cache.Get(ctx, cachetype.ResolvedServiceConfigName, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
func serviceDefaultsFromStruct(v *structs.ServiceConfigResponse) func(context.Context) (*structs.ServiceConfigResponse, error) {
|
||||
return func(_ context.Context) (*structs.ServiceConfigResponse, error) {
|
||||
return v, nil
|
||||
}
|
||||
}
|
||||
|
||||
serviceConfig, ok := raw.(*structs.ServiceConfigResponse)
|
||||
if !ok {
|
||||
// This should never happen, but we want to protect against panics
|
||||
return nil, fmt.Errorf("internal error: response type not correct")
|
||||
func serviceDefaultsFromCache(bd BaseDeps, req AddServiceRequest) func(context.Context) (*structs.ServiceConfigResponse, error) {
|
||||
// NOTE: this is called while holding the Agent.stateLock
|
||||
return func(ctx context.Context) (*structs.ServiceConfigResponse, error) {
|
||||
req := makeConfigRequest(bd, req)
|
||||
|
||||
raw, _, err := bd.Cache.Get(ctx, cachetype.ResolvedServiceConfigName, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
serviceConfig, ok := raw.(*structs.ServiceConfigResponse)
|
||||
if !ok {
|
||||
// This should never happen, but we want to protect against panics
|
||||
return nil, fmt.Errorf("internal error: response type not correct")
|
||||
}
|
||||
return serviceConfig, nil
|
||||
}
|
||||
return serviceConfig, nil
|
||||
}
|
||||
|
||||
// Start starts the config watch and a goroutine to handle updates over the
|
||||
@ -254,7 +254,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
|
||||
|
||||
// Configure and start a cache.Notify goroutine to run a continuous
|
||||
// blocking query on the resolved service config for this service.
|
||||
req := makeConfigRequest(w.agent, w.registration)
|
||||
req := makeConfigRequest(w.agent.baseDeps, w.registration)
|
||||
w.cacheKey = req.CacheInfo().Key
|
||||
|
||||
updateCh := make(chan cache.UpdateEvent, 1)
|
||||
@ -383,7 +383,7 @@ type asyncRegisterRequest struct {
|
||||
Reply chan error
|
||||
}
|
||||
|
||||
func makeConfigRequest(agent *Agent, addReq AddServiceRequest) *structs.ServiceConfigRequest {
|
||||
func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest {
|
||||
ns := addReq.Service
|
||||
name := ns.Service
|
||||
var upstreams []structs.ServiceID
|
||||
@ -408,13 +408,13 @@ func makeConfigRequest(agent *Agent, addReq AddServiceRequest) *structs.ServiceC
|
||||
|
||||
req := &structs.ServiceConfigRequest{
|
||||
Name: name,
|
||||
Datacenter: agent.config.Datacenter,
|
||||
Datacenter: bd.RuntimeConfig.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: addReq.token},
|
||||
UpstreamIDs: upstreams,
|
||||
EnterpriseMeta: ns.EnterpriseMeta,
|
||||
}
|
||||
if req.QueryOptions.Token == "" {
|
||||
req.QueryOptions.Token = agent.tokens.AgentToken()
|
||||
req.QueryOptions.Token = bd.Tokens.AgentToken()
|
||||
}
|
||||
return req
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user