diff --git a/command/agent/agent.go b/command/agent/agent.go index c741240ac7..f24e90f585 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -155,7 +155,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { Port: agent.config.Ports.Server, Tags: []string{}, } - agent.state.AddService(&consulService) + agent.state.AddService(&consulService, "") } else { err = agent.setupClient() agent.state.SetIface(agent.client) @@ -591,7 +591,8 @@ func (a *Agent) purgeCheck(checkID string) error { // AddService is used to add a service entry. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, persist bool) error { +func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, + persist bool, token string) error { if service.Service == "" { return fmt.Errorf("Service name missing") } @@ -621,7 +622,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, pe } // Add the service - a.state.AddService(service) + a.state.AddService(service, token) // Persist the service to a file if persist { @@ -645,7 +646,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, pe ServiceID: service.ID, ServiceName: service.Service, } - if err := a.AddCheck(check, chkType, persist); err != nil { + if err := a.AddCheck(check, chkType, persist, token); err != nil { return err } } @@ -696,7 +697,8 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error { // This entry is persistent and the agent will make a best effort to // ensure it is registered. The Check may include a CheckType which // is used to automatically update the check status -func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist bool) error { +func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, + persist bool, token string) error { if check.CheckID == "" { return fmt.Errorf("CheckID missing") } @@ -775,7 +777,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist } // Add to the local state for anti-entropy - a.state.AddCheck(check) + a.state.AddCheck(check, token) // Persist the check if persist { @@ -920,7 +922,7 @@ func (a *Agent) loadServices(conf *Config) error { for _, service := range conf.Services { ns := service.NodeService() chkTypes := service.CheckTypes() - if err := a.AddService(ns, chkTypes, false); err != nil { + if err := a.AddService(ns, chkTypes, false, service.Token); err != nil { return fmt.Errorf("Failed to register service '%s': %v", service.ID, err) } } @@ -962,7 +964,7 @@ func (a *Agent) loadServices(conf *Config) error { } else { a.logger.Printf("[DEBUG] agent: restored service definition %q from %q", svc.ID, filePath) - return a.AddService(svc, nil, false) + return a.AddService(svc, nil, false, "") } }) @@ -991,7 +993,7 @@ func (a *Agent) loadChecks(conf *Config) error { for _, check := range conf.Checks { health := check.HealthCheck(conf.NodeName) chkType := &check.CheckType - if err := a.AddCheck(health, chkType, false); err != nil { + if err := a.AddCheck(health, chkType, false, ""); err != nil { return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check) } } @@ -1035,7 +1037,7 @@ func (a *Agent) loadChecks(conf *Config) error { // services into the active pool p.Check.Status = structs.HealthCritical - if err := a.AddCheck(p.Check, p.ChkType, false); err != nil { + if err := a.AddCheck(p.Check, p.ChkType, false, ""); err != nil { // Purge the check if it is unable to be restored. a.logger.Printf("[WARN] agent: Failed to restore check %q: %s", p.Check.CheckID, err) @@ -1112,7 +1114,7 @@ func (a *Agent) EnableServiceMaintenance(serviceID, reason string) error { ServiceName: service.Service, Status: structs.HealthCritical, } - a.AddCheck(check, nil, true) + a.AddCheck(check, nil, true, "") a.logger.Printf("[INFO] agent: Service %q entered maintenance mode", serviceID) return nil @@ -1158,7 +1160,7 @@ func (a *Agent) EnableNodeMaintenance(reason string) { Notes: reason, Status: structs.HealthCritical, } - a.AddCheck(check, nil, true) + a.AddCheck(check, nil, true, "") a.logger.Printf("[INFO] agent: Node entered maintenance mode") } diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 78299c78a6..18a8556205 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -97,8 +97,12 @@ func (s *HTTPServer) AgentRegisterCheck(resp http.ResponseWriter, req *http.Requ return nil, nil } + // Get the provided token, if any + var token string + s.parseToken(req, &token) + // Add the check - if err := s.agent.AddCheck(health, chkType, true); err != nil { + if err := s.agent.AddCheck(health, chkType, true, token); err != nil { return nil, err } s.syncChanges() @@ -199,8 +203,12 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re } } + // Get the provided token, if any + var token string + s.parseToken(req, &token) + // Add the check - if err := s.agent.AddService(ns, chkTypes, true); err != nil { + if err := s.agent.AddService(ns, chkTypes, true, token); err != nil { return nil, err } s.syncChanges() diff --git a/command/agent/local.go b/command/agent/local.go index f85d6671ad..d8790b30b2 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -48,10 +48,12 @@ type localState struct { // Services tracks the local services services map[string]*structs.NodeService serviceStatus map[string]syncStatus + serviceTokens map[string]string // Checks tracks the local checks checks map[string]*structs.HealthCheck checkStatus map[string]syncStatus + checkTokens map[string]string // Used to track checks that are being deferred deferCheck map[string]*time.Timer @@ -71,8 +73,10 @@ func (l *localState) Init(config *Config, logger *log.Logger) { l.logger = logger l.services = make(map[string]*structs.NodeService) l.serviceStatus = make(map[string]syncStatus) + l.serviceTokens = make(map[string]string) l.checks = make(map[string]*structs.HealthCheck) l.checkStatus = make(map[string]syncStatus) + l.checkTokens = make(map[string]string) l.deferCheck = make(map[string]*time.Timer) l.consulCh = make(chan struct{}, 1) l.triggerCh = make(chan struct{}, 1) @@ -122,7 +126,7 @@ func (l *localState) isPaused() bool { // AddService is used to add a service entry to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (l *localState) AddService(service *structs.NodeService) { +func (l *localState) AddService(service *structs.NodeService, token string) { // Assign the ID if none given if service.ID == "" && service.Service != "" { service.ID = service.Service @@ -133,6 +137,7 @@ func (l *localState) AddService(service *structs.NodeService) { l.services[service.ID] = service l.serviceStatus[service.ID] = syncStatus{} + l.serviceTokens[service.ID] = token l.changeMade() } @@ -163,7 +168,7 @@ func (l *localState) Services() map[string]*structs.NodeService { // AddCheck is used to add a health check to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (l *localState) AddCheck(check *structs.HealthCheck) { +func (l *localState) AddCheck(check *structs.HealthCheck, token string) { // Set the node name check.Node = l.config.NodeName @@ -172,6 +177,7 @@ func (l *localState) AddCheck(check *structs.HealthCheck) { l.checks[check.CheckID] = check l.checkStatus[check.CheckID] = syncStatus{} + l.checkTokens[check.CheckID] = token l.changeMade() } @@ -436,11 +442,16 @@ func (l *localState) deleteService(id string) error { return fmt.Errorf("ServiceID missing") } + token := l.serviceTokens[id] + if token == "" { + token = l.config.ACLToken + } + req := structs.DeregisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, ServiceID: id, - WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, + WriteRequest: structs.WriteRequest{Token: token}, } var out struct{} err := l.iface.RPC("Catalog.Deregister", &req, &out) @@ -457,11 +468,16 @@ func (l *localState) deleteCheck(id string) error { return fmt.Errorf("CheckID missing") } + token := l.checkTokens[id] + if token == "" { + token = l.config.ACLToken + } + req := structs.DeregisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, CheckID: id, - WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, + WriteRequest: structs.WriteRequest{Token: token}, } var out struct{} err := l.iface.RPC("Catalog.Deregister", &req, &out) @@ -474,12 +490,17 @@ func (l *localState) deleteCheck(id string) error { // syncService is used to sync a service to the server func (l *localState) syncService(id string) error { + token := l.serviceTokens[id] + if token == "" { + token = l.config.ACLToken + } + req := structs.RegisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, Address: l.config.AdvertiseAddr, Service: l.services[id], - WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, + WriteRequest: structs.WriteRequest{Token: token}, } // If the service has associated checks that are out of sync, @@ -530,13 +551,19 @@ func (l *localState) syncCheck(id string) error { service = serv } } + + token := l.checkTokens[id] + if token == "" { + token = l.config.ACLToken + } + req := structs.RegisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, Address: l.config.AdvertiseAddr, Service: service, Check: l.checks[id], - WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, + WriteRequest: structs.WriteRequest{Token: token}, } var out struct{} err := l.iface.RPC("Catalog.Register", &req, &out) diff --git a/command/agent/structs.go b/command/agent/structs.go index 84f606e46c..a309965041 100644 --- a/command/agent/structs.go +++ b/command/agent/structs.go @@ -13,6 +13,7 @@ type ServiceDefinition struct { Port int Check CheckType Checks CheckTypes + Token string } func (s *ServiceDefinition) NodeService() *structs.NodeService {