mirror of
https://github.com/status-im/consul.git
synced 2025-01-12 23:05:28 +00:00
agent: remove serviceRegiration type
Replace with the existing AddServiceRequest struct. These structs are almost identical. Additionally, the only reason the serviceRegistration struct existed was to recreate an AddServiceRequest. By storing and re-using the AddServiceRequest we remove the need to translate into one type and back to the original type. We also remove the extra parameters to a function, because those values are already available from the AddServiceRequest field. Also a minor optimization to only call tokens.AgentToken() when necessary. Previous it was being called every time, but the value was being ignored if the AddServiceRequest had a token.
This commit is contained in:
parent
5f9930a9be
commit
d0386ae025
@ -13,11 +13,9 @@ import (
|
|||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
// The ServiceManager is a layer for service registration in between the agent
|
// ServiceManager watches changes to central service config for all services
|
||||||
// and the local state. Any services must be registered with the ServiceManager,
|
// registered with it. When a central config changes, the local service will
|
||||||
// which then maintains a long-running watch of any globally-set service or proxy
|
// be updated with the correct values from the central config.
|
||||||
// configuration that applies to the service in order to register the final, merged
|
|
||||||
// service configuration locally in the agent state.
|
|
||||||
type ServiceManager struct {
|
type ServiceManager struct {
|
||||||
agent *Agent
|
agent *Agent
|
||||||
|
|
||||||
@ -122,16 +120,6 @@ func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error {
|
|||||||
//
|
//
|
||||||
// NOTE: the caller must hold the Agent.stateLock!
|
// NOTE: the caller must hold the Agent.stateLock!
|
||||||
func (s *ServiceManager) AddService(req AddServiceRequest) error {
|
func (s *ServiceManager) AddService(req AddServiceRequest) error {
|
||||||
// TODO: replace serviceRegistration with AddServiceRequest
|
|
||||||
reg := &serviceRegistration{
|
|
||||||
service: req.Service,
|
|
||||||
chkTypes: req.chkTypes,
|
|
||||||
persist: req.persist,
|
|
||||||
token: req.token,
|
|
||||||
replaceExistingChecks: req.replaceExistingChecks,
|
|
||||||
source: req.Source,
|
|
||||||
}
|
|
||||||
|
|
||||||
s.servicesLock.Lock()
|
s.servicesLock.Lock()
|
||||||
defer s.servicesLock.Unlock()
|
defer s.servicesLock.Unlock()
|
||||||
|
|
||||||
@ -147,19 +135,11 @@ func (s *ServiceManager) AddService(req AddServiceRequest) error {
|
|||||||
// Get the existing global config and do the initial registration with the
|
// Get the existing global config and do the initial registration with the
|
||||||
// merged config.
|
// merged config.
|
||||||
watch := &serviceConfigWatch{
|
watch := &serviceConfigWatch{
|
||||||
registration: reg,
|
registration: req,
|
||||||
agent: s.agent,
|
agent: s.agent,
|
||||||
registerCh: s.registerCh,
|
registerCh: s.registerCh,
|
||||||
}
|
}
|
||||||
|
if err := watch.RegisterAndStart(s.ctx, &s.running); err != nil {
|
||||||
err := watch.RegisterAndStart(
|
|
||||||
s.ctx,
|
|
||||||
req.previousDefaults,
|
|
||||||
req.waitForCentralConfig,
|
|
||||||
req.persistServiceConfig,
|
|
||||||
&s.running,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -185,21 +165,11 @@ func (s *ServiceManager) RemoveService(serviceID structs.ServiceID) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// serviceRegistration represents a locally registered service.
|
|
||||||
type serviceRegistration struct {
|
|
||||||
service *structs.NodeService
|
|
||||||
chkTypes []*structs.CheckType
|
|
||||||
persist bool
|
|
||||||
token string
|
|
||||||
replaceExistingChecks bool
|
|
||||||
source configSource
|
|
||||||
}
|
|
||||||
|
|
||||||
// serviceConfigWatch is a long running helper for composing the end config
|
// serviceConfigWatch is a long running helper for composing the end config
|
||||||
// for a given service from both the local registration and the global
|
// for a given service from both the local registration and the global
|
||||||
// service/proxy defaults.
|
// service/proxy defaults.
|
||||||
type serviceConfigWatch struct {
|
type serviceConfigWatch struct {
|
||||||
registration *serviceRegistration
|
registration AddServiceRequest
|
||||||
|
|
||||||
agent *Agent
|
agent *Agent
|
||||||
registerCh chan<- *asyncRegisterRequest
|
registerCh chan<- *asyncRegisterRequest
|
||||||
@ -213,49 +183,40 @@ type serviceConfigWatch struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: this is called while holding the Agent.stateLock
|
// NOTE: this is called while holding the Agent.stateLock
|
||||||
func (w *serviceConfigWatch) RegisterAndStart(
|
func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.WaitGroup) error {
|
||||||
ctx context.Context,
|
serviceDefaults := w.registration.previousDefaults
|
||||||
serviceDefaults *structs.ServiceConfigResponse,
|
|
||||||
waitForCentralConfig bool,
|
|
||||||
persistServiceConfig bool,
|
|
||||||
wg *sync.WaitGroup,
|
|
||||||
) error {
|
|
||||||
// Either we explicitly block waiting for defaults before registering,
|
// Either we explicitly block waiting for defaults before registering,
|
||||||
// or we feed it some seed data (or NO data) and bypass the blocking
|
// or we feed it some seed data (or NO data) and bypass the blocking
|
||||||
// operation. Either way the watcher will end up with something flagged
|
// operation. Either way the watcher will end up with something flagged
|
||||||
// as defaults even if they don't actually reflect actual defaults.
|
// as defaults even if they don't actually reflect actual defaults.
|
||||||
if waitForCentralConfig {
|
if w.registration.waitForCentralConfig {
|
||||||
var err error
|
var err error
|
||||||
serviceDefaults, err = w.fetchDefaults(ctx)
|
serviceDefaults, err = w.fetchDefaults(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v",
|
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v",
|
||||||
w.registration.service.ID, err)
|
w.registration.Service.ID, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Merge the local registration with the central defaults and update this service
|
// Merge the local registration with the central defaults and update this service
|
||||||
// in the local state.
|
// in the local state.
|
||||||
merged, err := mergeServiceConfig(serviceDefaults, w.registration.service)
|
merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// The first time we do this interactively, we need to know if it
|
// make a copy of the AddServiceRequest
|
||||||
// failed for validation reasons which we only get back from the
|
req := w.registration
|
||||||
// initial underlying add service call.
|
req.Service = merged
|
||||||
|
|
||||||
|
// TODO: move this line? it seems out of place. Maybe this default should
|
||||||
|
// be set in addServiceInternal
|
||||||
|
req.snap = w.agent.snapshotCheckState() // requires Agent.stateLock
|
||||||
|
|
||||||
err = w.agent.addServiceInternal(addServiceInternalRequest{
|
err = w.agent.addServiceInternal(addServiceInternalRequest{
|
||||||
AddServiceRequest: AddServiceRequest{
|
AddServiceRequest: req,
|
||||||
Service: merged,
|
persistService: w.registration.Service,
|
||||||
chkTypes: w.registration.chkTypes,
|
persistDefaults: serviceDefaults,
|
||||||
persist: w.registration.persist,
|
|
||||||
persistServiceConfig: persistServiceConfig,
|
|
||||||
token: w.registration.token,
|
|
||||||
replaceExistingChecks: w.registration.replaceExistingChecks,
|
|
||||||
Source: w.registration.source,
|
|
||||||
snap: w.agent.snapshotCheckState(),
|
|
||||||
},
|
|
||||||
persistService: w.registration.service,
|
|
||||||
persistDefaults: serviceDefaults,
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error updating service registration: %v", err)
|
return fmt.Errorf("error updating service registration: %v", err)
|
||||||
@ -374,7 +335,7 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
|
|||||||
|
|
||||||
// Merge the local registration with the central defaults and update this service
|
// Merge the local registration with the central defaults and update this service
|
||||||
// in the local state.
|
// in the local state.
|
||||||
merged, err := mergeServiceConfig(serviceDefaults, w.registration.service)
|
merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -385,19 +346,16 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// make a copy of the AddServiceRequest
|
||||||
|
req := w.registration
|
||||||
|
req.Service = merged
|
||||||
|
req.persistServiceConfig = true
|
||||||
|
|
||||||
registerReq := &asyncRegisterRequest{
|
registerReq := &asyncRegisterRequest{
|
||||||
Args: addServiceInternalRequest{
|
Args: addServiceInternalRequest{
|
||||||
AddServiceRequest: AddServiceRequest{
|
AddServiceRequest: req,
|
||||||
Service: merged,
|
persistService: w.registration.Service,
|
||||||
chkTypes: w.registration.chkTypes,
|
persistDefaults: serviceDefaults,
|
||||||
persist: w.registration.persist,
|
|
||||||
persistServiceConfig: true,
|
|
||||||
token: w.registration.token,
|
|
||||||
replaceExistingChecks: w.registration.replaceExistingChecks,
|
|
||||||
Source: w.registration.source,
|
|
||||||
},
|
|
||||||
persistService: w.registration.service,
|
|
||||||
persistDefaults: serviceDefaults,
|
|
||||||
},
|
},
|
||||||
Reply: make(chan error, 1),
|
Reply: make(chan error, 1),
|
||||||
}
|
}
|
||||||
@ -425,8 +383,8 @@ type asyncRegisterRequest struct {
|
|||||||
Reply chan error
|
Reply chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeConfigRequest(agent *Agent, registration *serviceRegistration) *structs.ServiceConfigRequest {
|
func makeConfigRequest(agent *Agent, addReq AddServiceRequest) *structs.ServiceConfigRequest {
|
||||||
ns := registration.service
|
ns := addReq.Service
|
||||||
name := ns.Service
|
name := ns.Service
|
||||||
var upstreams []structs.ServiceID
|
var upstreams []structs.ServiceID
|
||||||
|
|
||||||
@ -451,12 +409,12 @@ func makeConfigRequest(agent *Agent, registration *serviceRegistration) *structs
|
|||||||
req := &structs.ServiceConfigRequest{
|
req := &structs.ServiceConfigRequest{
|
||||||
Name: name,
|
Name: name,
|
||||||
Datacenter: agent.config.Datacenter,
|
Datacenter: agent.config.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: agent.tokens.AgentToken()},
|
QueryOptions: structs.QueryOptions{Token: addReq.token},
|
||||||
UpstreamIDs: upstreams,
|
UpstreamIDs: upstreams,
|
||||||
EnterpriseMeta: ns.EnterpriseMeta,
|
EnterpriseMeta: ns.EnterpriseMeta,
|
||||||
}
|
}
|
||||||
if registration.token != "" {
|
if req.QueryOptions.Token == "" {
|
||||||
req.QueryOptions.Token = registration.token
|
req.QueryOptions.Token = agent.tokens.AgentToken()
|
||||||
}
|
}
|
||||||
return req
|
return req
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user