mirror of https://github.com/status-im/consul.git
parent
b3bfeee100
commit
9ed4b2d631
|
@ -259,7 +259,7 @@ func (a *Agent) vetServiceRegister(token string, service *structs.NodeService) e
|
|||
}
|
||||
|
||||
// Vet any service that might be getting overwritten.
|
||||
services := a.State.Services()
|
||||
services := a.state.Services()
|
||||
if existing, ok := services[service.ID]; ok {
|
||||
if !rule.ServiceWrite(existing.Service, nil) {
|
||||
return acl.ErrPermissionDenied
|
||||
|
@ -282,7 +282,7 @@ func (a *Agent) vetServiceUpdate(token string, serviceID string) error {
|
|||
}
|
||||
|
||||
// Vet any changes based on the existing services's info.
|
||||
services := a.State.Services()
|
||||
services := a.state.Services()
|
||||
if existing, ok := services[serviceID]; ok {
|
||||
if !rule.ServiceWrite(existing.Service, nil) {
|
||||
return acl.ErrPermissionDenied
|
||||
|
@ -318,7 +318,7 @@ func (a *Agent) vetCheckRegister(token string, check *structs.HealthCheck) error
|
|||
}
|
||||
|
||||
// Vet any check that might be getting overwritten.
|
||||
checks := a.State.Checks()
|
||||
checks := a.state.Checks()
|
||||
if existing, ok := checks[check.CheckID]; ok {
|
||||
if len(existing.ServiceName) > 0 {
|
||||
if !rule.ServiceWrite(existing.ServiceName, nil) {
|
||||
|
@ -346,7 +346,7 @@ func (a *Agent) vetCheckUpdate(token string, checkID types.CheckID) error {
|
|||
}
|
||||
|
||||
// Vet any changes based on the existing check's info.
|
||||
checks := a.State.Checks()
|
||||
checks := a.state.Checks()
|
||||
if existing, ok := checks[checkID]; ok {
|
||||
if len(existing.ServiceName) > 0 {
|
||||
if !rule.ServiceWrite(existing.ServiceName, nil) {
|
||||
|
|
|
@ -564,7 +564,7 @@ func TestACL_vetServiceRegister(t *testing.T) {
|
|||
|
||||
// Try to register over a service without write privs to the existing
|
||||
// service.
|
||||
a.State.AddService(&structs.NodeService{
|
||||
a.state.AddService(&structs.NodeService{
|
||||
ID: "my-service",
|
||||
Service: "other",
|
||||
}, "")
|
||||
|
@ -596,7 +596,7 @@ func TestACL_vetServiceUpdate(t *testing.T) {
|
|||
}
|
||||
|
||||
// Update with write privs.
|
||||
a.State.AddService(&structs.NodeService{
|
||||
a.state.AddService(&structs.NodeService{
|
||||
ID: "my-service",
|
||||
Service: "service",
|
||||
}, "")
|
||||
|
@ -662,11 +662,11 @@ func TestACL_vetCheckRegister(t *testing.T) {
|
|||
|
||||
// Try to register over a service check without write privs to the
|
||||
// existing service.
|
||||
a.State.AddService(&structs.NodeService{
|
||||
a.state.AddService(&structs.NodeService{
|
||||
ID: "my-service",
|
||||
Service: "service",
|
||||
}, "")
|
||||
a.State.AddCheck(&structs.HealthCheck{
|
||||
a.state.AddCheck(&structs.HealthCheck{
|
||||
CheckID: types.CheckID("my-check"),
|
||||
ServiceID: "my-service",
|
||||
ServiceName: "other",
|
||||
|
@ -681,7 +681,7 @@ func TestACL_vetCheckRegister(t *testing.T) {
|
|||
}
|
||||
|
||||
// Try to register over a node check without write privs to the node.
|
||||
a.State.AddCheck(&structs.HealthCheck{
|
||||
a.state.AddCheck(&structs.HealthCheck{
|
||||
CheckID: types.CheckID("my-node-check"),
|
||||
}, "")
|
||||
err = a.vetCheckRegister("service-rw", &structs.HealthCheck{
|
||||
|
@ -713,11 +713,11 @@ func TestACL_vetCheckUpdate(t *testing.T) {
|
|||
}
|
||||
|
||||
// Update service check with write privs.
|
||||
a.State.AddService(&structs.NodeService{
|
||||
a.state.AddService(&structs.NodeService{
|
||||
ID: "my-service",
|
||||
Service: "service",
|
||||
}, "")
|
||||
a.State.AddCheck(&structs.HealthCheck{
|
||||
a.state.AddCheck(&structs.HealthCheck{
|
||||
CheckID: types.CheckID("my-service-check"),
|
||||
ServiceID: "my-service",
|
||||
ServiceName: "service",
|
||||
|
@ -734,7 +734,7 @@ func TestACL_vetCheckUpdate(t *testing.T) {
|
|||
}
|
||||
|
||||
// Update node check with write privs.
|
||||
a.State.AddCheck(&structs.HealthCheck{
|
||||
a.state.AddCheck(&structs.HealthCheck{
|
||||
CheckID: types.CheckID("my-node-check"),
|
||||
}, "")
|
||||
err = a.vetCheckUpdate("node-rw", "my-node-check")
|
||||
|
|
|
@ -109,7 +109,7 @@ type Agent struct {
|
|||
|
||||
// state stores a local representation of the node,
|
||||
// services and checks. Used for anti-entropy.
|
||||
State *local.State
|
||||
state *local.State
|
||||
|
||||
// sync manages the synchronization of the local
|
||||
// and the remote state.
|
||||
|
@ -230,22 +230,6 @@ func New(c *config.RuntimeConfig) (*Agent, error) {
|
|||
return a, nil
|
||||
}
|
||||
|
||||
func LocalConfig(cfg *config.RuntimeConfig) local.Config {
|
||||
lc := local.Config{
|
||||
AdvertiseAddr: cfg.AdvertiseAddrLAN.String(),
|
||||
CheckUpdateInterval: cfg.CheckUpdateInterval,
|
||||
Datacenter: cfg.Datacenter,
|
||||
DiscardCheckOutput: cfg.DiscardCheckOutput,
|
||||
NodeID: cfg.NodeID,
|
||||
NodeName: cfg.NodeName,
|
||||
TaggedAddresses: map[string]string{},
|
||||
}
|
||||
for k, v := range cfg.TaggedAddresses {
|
||||
lc.TaggedAddresses[k] = v
|
||||
}
|
||||
return lc
|
||||
}
|
||||
|
||||
func (a *Agent) Start() error {
|
||||
c := a.config
|
||||
|
||||
|
@ -272,12 +256,24 @@ func (a *Agent) Start() error {
|
|||
triggerCh := make(chan struct{}, 1)
|
||||
|
||||
// create the local state
|
||||
a.State = local.NewState(LocalConfig(c), a.logger, a.tokens, triggerCh)
|
||||
lc := local.Config{
|
||||
AdvertiseAddr: c.AdvertiseAddrLAN.String(),
|
||||
CheckUpdateInterval: c.CheckUpdateInterval,
|
||||
Datacenter: c.Datacenter,
|
||||
DiscardCheckOutput: c.DiscardCheckOutput,
|
||||
NodeID: c.NodeID,
|
||||
NodeName: c.NodeName,
|
||||
TaggedAddresses: map[string]string{},
|
||||
}
|
||||
for k, v := range c.TaggedAddresses {
|
||||
lc.TaggedAddresses[k] = v
|
||||
}
|
||||
a.state = local.NewState(lc, a.logger, a.tokens, triggerCh)
|
||||
|
||||
// create the state synchronization manager which performs
|
||||
// regular and on-demand state synchronizations (anti-entropy).
|
||||
a.sync = &ae.StateSyncer{
|
||||
State: a.State,
|
||||
State: a.state,
|
||||
Interval: c.AEInterval,
|
||||
ShutdownCh: a.shutdownCh,
|
||||
ServerUpCh: serverUpCh,
|
||||
|
@ -310,7 +306,7 @@ func (a *Agent) Start() error {
|
|||
}
|
||||
|
||||
a.delegate = server
|
||||
a.State.SetDelegate(server)
|
||||
a.state.SetDelegate(server)
|
||||
a.sync.ClusterSize = func() int { return len(server.LANMembers()) }
|
||||
} else {
|
||||
client, err := consul.NewClientLogger(consulCfg, a.logger)
|
||||
|
@ -319,7 +315,7 @@ func (a *Agent) Start() error {
|
|||
}
|
||||
|
||||
a.delegate = client
|
||||
a.State.SetDelegate(client)
|
||||
a.state.SetDelegate(client)
|
||||
a.sync.ClusterSize = func() int { return len(client.LANMembers()) }
|
||||
}
|
||||
|
||||
|
@ -1391,7 +1387,7 @@ OUTER:
|
|||
// reapServicesInternal does a single pass, looking for services to reap.
|
||||
func (a *Agent) reapServicesInternal() {
|
||||
reaped := make(map[string]bool)
|
||||
for checkID, cs := range a.State.CriticalCheckStates() {
|
||||
for checkID, cs := range a.state.CriticalCheckStates() {
|
||||
serviceID := cs.Check.ServiceID
|
||||
|
||||
// There's nothing to do if there's no service.
|
||||
|
@ -1449,7 +1445,7 @@ func (a *Agent) persistService(service *structs.NodeService) error {
|
|||
svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
|
||||
|
||||
wrapped := persistedService{
|
||||
Token: a.State.ServiceToken(service.ID),
|
||||
Token: a.state.ServiceToken(service.ID),
|
||||
Service: service,
|
||||
}
|
||||
encoded, err := json.Marshal(wrapped)
|
||||
|
@ -1477,7 +1473,7 @@ func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *structs.CheckT
|
|||
wrapped := persistedCheck{
|
||||
Check: check,
|
||||
ChkType: chkType,
|
||||
Token: a.State.CheckToken(check.CheckID),
|
||||
Token: a.state.CheckToken(check.CheckID),
|
||||
}
|
||||
|
||||
encoded, err := json.Marshal(wrapped)
|
||||
|
@ -1576,7 +1572,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che
|
|||
defer a.restoreCheckState(snap)
|
||||
|
||||
// Add the service
|
||||
a.State.AddService(service, token)
|
||||
a.state.AddService(service, token)
|
||||
|
||||
// Persist the service to a file
|
||||
if persist && !a.config.DevMode {
|
||||
|
@ -1626,7 +1622,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
|
|||
}
|
||||
|
||||
// Remove service immediately
|
||||
if err := a.State.RemoveService(serviceID); err != nil {
|
||||
if err := a.state.RemoveService(serviceID); err != nil {
|
||||
a.logger.Printf("[WARN] agent: Failed to deregister service %q: %s", serviceID, err)
|
||||
return nil
|
||||
}
|
||||
|
@ -1639,7 +1635,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
|
|||
}
|
||||
|
||||
// Deregister any associated health checks
|
||||
for checkID, check := range a.State.Checks() {
|
||||
for checkID, check := range a.state.Checks() {
|
||||
if check.ServiceID != serviceID {
|
||||
continue
|
||||
}
|
||||
|
@ -1672,7 +1668,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
|||
}
|
||||
|
||||
if check.ServiceID != "" {
|
||||
s := a.State.Service(check.ServiceID)
|
||||
s := a.state.Services()[check.ServiceID]
|
||||
if s == nil {
|
||||
return fmt.Errorf("ServiceID %q does not exist", check.ServiceID)
|
||||
}
|
||||
|
@ -1693,7 +1689,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
|||
}
|
||||
|
||||
ttl := &CheckTTL{
|
||||
Notify: a.State,
|
||||
Notify: a.state,
|
||||
CheckID: check.CheckID,
|
||||
TTL: chkType.TTL,
|
||||
Logger: a.logger,
|
||||
|
@ -1720,7 +1716,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
|||
}
|
||||
|
||||
http := &CheckHTTP{
|
||||
Notify: a.State,
|
||||
Notify: a.state,
|
||||
CheckID: check.CheckID,
|
||||
HTTP: chkType.HTTP,
|
||||
Header: chkType.Header,
|
||||
|
@ -1745,7 +1741,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
|||
}
|
||||
|
||||
tcp := &CheckTCP{
|
||||
Notify: a.State,
|
||||
Notify: a.state,
|
||||
CheckID: check.CheckID,
|
||||
TCP: chkType.TCP,
|
||||
Interval: chkType.Interval,
|
||||
|
@ -1782,7 +1778,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
|||
}
|
||||
|
||||
dockerCheck := &CheckDocker{
|
||||
Notify: a.State,
|
||||
Notify: a.state,
|
||||
CheckID: check.CheckID,
|
||||
DockerContainerID: chkType.DockerContainerID,
|
||||
Shell: chkType.Shell,
|
||||
|
@ -1812,7 +1808,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
|||
}
|
||||
|
||||
monitor := &CheckMonitor{
|
||||
Notify: a.State,
|
||||
Notify: a.state,
|
||||
CheckID: check.CheckID,
|
||||
Script: chkType.Script,
|
||||
ScriptArgs: chkType.ScriptArgs,
|
||||
|
@ -1841,7 +1837,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
|
|||
}
|
||||
|
||||
// Add to the local state for anti-entropy
|
||||
err := a.State.AddCheck(check, token)
|
||||
err := a.state.AddCheck(check, token)
|
||||
if err != nil {
|
||||
a.cancelCheckMonitors(check.CheckID)
|
||||
return err
|
||||
|
@ -1864,7 +1860,7 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
|
|||
}
|
||||
|
||||
// Add to the local state for anti-entropy
|
||||
a.State.RemoveCheck(checkID)
|
||||
a.state.RemoveCheck(checkID)
|
||||
|
||||
a.checkLock.Lock()
|
||||
defer a.checkLock.Unlock()
|
||||
|
@ -2029,7 +2025,7 @@ func (a *Agent) Stats() map[string]map[string]string {
|
|||
"check_monitors": strconv.Itoa(len(a.checkMonitors)),
|
||||
"check_ttls": strconv.Itoa(len(a.checkTTLs)),
|
||||
}
|
||||
for k, v := range a.State.Stats() {
|
||||
for k, v := range a.state.Stats() {
|
||||
stats["agent"][k] = v
|
||||
}
|
||||
|
||||
|
@ -2153,7 +2149,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
|
|||
}
|
||||
serviceID := p.Service.ID
|
||||
|
||||
if a.State.Service(serviceID) != nil {
|
||||
if a.state.Service(serviceID) != nil {
|
||||
// Purge previously persisted service. This allows config to be
|
||||
// preferred over services persisted from the API.
|
||||
a.logger.Printf("[DEBUG] agent: service %q exists, not restoring from %q",
|
||||
|
@ -2176,7 +2172,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
|
|||
// unloadServices will deregister all services other than the 'consul' service
|
||||
// known to the local agent.
|
||||
func (a *Agent) unloadServices() error {
|
||||
for id := range a.State.Services() {
|
||||
for id := range a.state.Services() {
|
||||
if err := a.RemoveService(id, false); err != nil {
|
||||
return fmt.Errorf("Failed deregistering service '%s': %v", id, err)
|
||||
}
|
||||
|
@ -2232,7 +2228,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
|
|||
}
|
||||
checkID := p.Check.CheckID
|
||||
|
||||
if a.State.Check(checkID) != nil {
|
||||
if a.state.Check(checkID) != nil {
|
||||
// Purge previously persisted check. This allows config to be
|
||||
// preferred over persisted checks from the API.
|
||||
a.logger.Printf("[DEBUG] agent: check %q exists, not restoring from %q",
|
||||
|
@ -2263,7 +2259,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
|
|||
|
||||
// unloadChecks will deregister all checks known to the local agent.
|
||||
func (a *Agent) unloadChecks() error {
|
||||
for id := range a.State.Checks() {
|
||||
for id := range a.state.Checks() {
|
||||
if err := a.RemoveCheck(id, false); err != nil {
|
||||
return fmt.Errorf("Failed deregistering check '%s': %s", id, err)
|
||||
}
|
||||
|
@ -2275,7 +2271,7 @@ func (a *Agent) unloadChecks() error {
|
|||
// checks. This is done before we reload our checks, so that we can properly
|
||||
// restore into the same state.
|
||||
func (a *Agent) snapshotCheckState() map[types.CheckID]*structs.HealthCheck {
|
||||
return a.State.Checks()
|
||||
return a.state.Checks()
|
||||
}
|
||||
|
||||
// restoreCheckState is used to reset the health state based on a snapshot.
|
||||
|
@ -2283,7 +2279,7 @@ func (a *Agent) snapshotCheckState() map[types.CheckID]*structs.HealthCheck {
|
|||
// in health state and potential session invalidations.
|
||||
func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) {
|
||||
for id, check := range snap {
|
||||
a.State.UpdateCheck(id, check.Status, check.Output)
|
||||
a.state.UpdateCheck(id, check.Status, check.Output)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2295,12 +2291,12 @@ func (a *Agent) loadMetadata(conf *config.RuntimeConfig) error {
|
|||
meta[k] = v
|
||||
}
|
||||
meta[structs.MetaSegmentKey] = conf.SegmentName
|
||||
return a.State.LoadMetadata(meta)
|
||||
return a.state.LoadMetadata(meta)
|
||||
}
|
||||
|
||||
// unloadMetadata resets the local metadata state
|
||||
func (a *Agent) unloadMetadata() {
|
||||
a.State.UnloadMetadata()
|
||||
a.state.UnloadMetadata()
|
||||
}
|
||||
|
||||
// serviceMaintCheckID returns the ID of a given service's maintenance check
|
||||
|
@ -2311,14 +2307,14 @@ func serviceMaintCheckID(serviceID string) types.CheckID {
|
|||
// EnableServiceMaintenance will register a false health check against the given
|
||||
// service ID with critical status. This will exclude the service from queries.
|
||||
func (a *Agent) EnableServiceMaintenance(serviceID, reason, token string) error {
|
||||
service, ok := a.State.Services()[serviceID]
|
||||
service, ok := a.state.Services()[serviceID]
|
||||
if !ok {
|
||||
return fmt.Errorf("No service registered with ID %q", serviceID)
|
||||
}
|
||||
|
||||
// Check if maintenance mode is not already enabled
|
||||
checkID := serviceMaintCheckID(serviceID)
|
||||
if _, ok := a.State.Checks()[checkID]; ok {
|
||||
if _, ok := a.state.Checks()[checkID]; ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -2346,13 +2342,13 @@ func (a *Agent) EnableServiceMaintenance(serviceID, reason, token string) error
|
|||
// DisableServiceMaintenance will deregister the fake maintenance mode check
|
||||
// if the service has been marked as in maintenance.
|
||||
func (a *Agent) DisableServiceMaintenance(serviceID string) error {
|
||||
if _, ok := a.State.Services()[serviceID]; !ok {
|
||||
if _, ok := a.state.Services()[serviceID]; !ok {
|
||||
return fmt.Errorf("No service registered with ID %q", serviceID)
|
||||
}
|
||||
|
||||
// Check if maintenance mode is enabled
|
||||
checkID := serviceMaintCheckID(serviceID)
|
||||
if _, ok := a.State.Checks()[checkID]; !ok {
|
||||
if _, ok := a.state.Checks()[checkID]; !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -2366,7 +2362,7 @@ func (a *Agent) DisableServiceMaintenance(serviceID string) error {
|
|||
// EnableNodeMaintenance places a node into maintenance mode.
|
||||
func (a *Agent) EnableNodeMaintenance(reason, token string) {
|
||||
// Ensure node maintenance is not already enabled
|
||||
if _, ok := a.State.Checks()[structs.NodeMaint]; ok {
|
||||
if _, ok := a.state.Checks()[structs.NodeMaint]; ok {
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -2389,7 +2385,7 @@ func (a *Agent) EnableNodeMaintenance(reason, token string) {
|
|||
|
||||
// DisableNodeMaintenance removes a node from maintenance mode
|
||||
func (a *Agent) DisableNodeMaintenance() {
|
||||
if _, ok := a.State.Checks()[structs.NodeMaint]; !ok {
|
||||
if _, ok := a.state.Checks()[structs.NodeMaint]; !ok {
|
||||
return
|
||||
}
|
||||
a.RemoveCheck(structs.NodeMaint, true)
|
||||
|
@ -2433,7 +2429,7 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
|
|||
// Update filtered metrics
|
||||
metrics.UpdateFilter(newCfg.TelemetryAllowedPrefixes, newCfg.TelemetryBlockedPrefixes)
|
||||
|
||||
a.State.SetDiscardCheckOutput(newCfg.DiscardCheckOutput)
|
||||
a.state.SetDiscardCheckOutput(newCfg.DiscardCheckOutput)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
|
|||
Coord: cs[s.agent.config.SegmentName],
|
||||
Member: s.agent.LocalMember(),
|
||||
Stats: s.agent.Stats(),
|
||||
Meta: s.agent.State.Metadata(),
|
||||
Meta: s.agent.state.Metadata(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -137,7 +137,7 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request)
|
|||
var token string
|
||||
s.parseToken(req, &token)
|
||||
|
||||
services := s.agent.State.Services()
|
||||
services := s.agent.state.Services()
|
||||
if err := s.agent.filterServices(token, &services); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -161,7 +161,7 @@ func (s *HTTPServer) AgentChecks(resp http.ResponseWriter, req *http.Request) (i
|
|||
var token string
|
||||
s.parseToken(req, &token)
|
||||
|
||||
checks := s.agent.State.Checks()
|
||||
checks := s.agent.state.Checks()
|
||||
if err := s.agent.filterChecks(token, &checks); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -304,7 +304,7 @@ func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request
|
|||
// services and checks to the server. If the operation fails, we only
|
||||
// only warn because the write did succeed and anti-entropy will sync later.
|
||||
func (s *HTTPServer) syncChanges() {
|
||||
if err := s.agent.State.SyncChanges(); err != nil {
|
||||
if err := s.agent.state.SyncChanges(); err != nil {
|
||||
s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ func TestAgent_Services(t *testing.T) {
|
|||
Tags: []string{"master"},
|
||||
Port: 5000,
|
||||
}
|
||||
a.State.AddService(srv1, "")
|
||||
a.state.AddService(srv1, "")
|
||||
|
||||
req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
|
||||
obj, err := a.srv.AgentServices(nil, req)
|
||||
|
@ -78,7 +78,7 @@ func TestAgent_Services_ACLFilter(t *testing.T) {
|
|||
Tags: []string{"master"},
|
||||
Port: 5000,
|
||||
}
|
||||
a.State.AddService(srv1, "")
|
||||
a.state.AddService(srv1, "")
|
||||
|
||||
t.Run("no token", func(t *testing.T) {
|
||||
req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
|
||||
|
@ -116,7 +116,7 @@ func TestAgent_Checks(t *testing.T) {
|
|||
Name: "mysql",
|
||||
Status: api.HealthPassing,
|
||||
}
|
||||
a.State.AddCheck(chk1, "")
|
||||
a.state.AddCheck(chk1, "")
|
||||
|
||||
req, _ := http.NewRequest("GET", "/v1/agent/checks", nil)
|
||||
obj, err := a.srv.AgentChecks(nil, req)
|
||||
|
@ -143,7 +143,7 @@ func TestAgent_Checks_ACLFilter(t *testing.T) {
|
|||
Name: "mysql",
|
||||
Status: api.HealthPassing,
|
||||
}
|
||||
a.State.AddCheck(chk1, "")
|
||||
a.state.AddCheck(chk1, "")
|
||||
|
||||
t.Run("no token", func(t *testing.T) {
|
||||
req, _ := http.NewRequest("GET", "/v1/agent/checks", nil)
|
||||
|
@ -283,8 +283,8 @@ func TestAgent_Reload(t *testing.T) {
|
|||
`)
|
||||
defer a.Shutdown()
|
||||
|
||||
if a.State.Service("redis") == nil {
|
||||
t.Fatal("missing redis service")
|
||||
if _, ok := a.state.services["redis"]; !ok {
|
||||
t.Fatalf("missing redis service")
|
||||
}
|
||||
|
||||
cfg2 := TestConfig(config.Source{
|
||||
|
@ -307,8 +307,8 @@ func TestAgent_Reload(t *testing.T) {
|
|||
if err := a.ReloadConfig(cfg2); err != nil {
|
||||
t.Fatalf("got error %v want nil", err)
|
||||
}
|
||||
if a.State.Service("redis-reloaded") == nil {
|
||||
t.Fatal("missing redis-reloaded service")
|
||||
if _, ok := a.state.services["redis-reloaded"]; !ok {
|
||||
t.Fatalf("missing redis-reloaded service")
|
||||
}
|
||||
|
||||
for _, wp := range a.watchPlans {
|
||||
|
@ -682,7 +682,7 @@ func TestAgent_RegisterCheck(t *testing.T) {
|
|||
|
||||
// Ensure we have a check mapping
|
||||
checkID := types.CheckID("test")
|
||||
if _, ok := a.State.Checks()[checkID]; !ok {
|
||||
if _, ok := a.state.Checks()[checkID]; !ok {
|
||||
t.Fatalf("missing test check")
|
||||
}
|
||||
|
||||
|
@ -691,12 +691,12 @@ func TestAgent_RegisterCheck(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure the token was configured
|
||||
if token := a.State.CheckToken(checkID); token == "" {
|
||||
if token := a.state.CheckToken(checkID); token == "" {
|
||||
t.Fatalf("missing token")
|
||||
}
|
||||
|
||||
// By default, checks start in critical state.
|
||||
state := a.State.Checks()[checkID]
|
||||
state := a.state.Checks()[checkID]
|
||||
if state.Status != api.HealthCritical {
|
||||
t.Fatalf("bad: %v", state)
|
||||
}
|
||||
|
@ -817,7 +817,7 @@ func TestAgent_RegisterCheck_Passing(t *testing.T) {
|
|||
|
||||
// Ensure we have a check mapping
|
||||
checkID := types.CheckID("test")
|
||||
if _, ok := a.State.Checks()[checkID]; !ok {
|
||||
if _, ok := a.state.Checks()[checkID]; !ok {
|
||||
t.Fatalf("missing test check")
|
||||
}
|
||||
|
||||
|
@ -825,7 +825,7 @@ func TestAgent_RegisterCheck_Passing(t *testing.T) {
|
|||
t.Fatalf("missing test check ttl")
|
||||
}
|
||||
|
||||
state := a.State.Checks()[checkID]
|
||||
state := a.state.Checks()[checkID]
|
||||
if state.Status != api.HealthPassing {
|
||||
t.Fatalf("bad: %v", state)
|
||||
}
|
||||
|
@ -896,7 +896,7 @@ func TestAgent_DeregisterCheck(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure we have a check mapping
|
||||
if _, ok := a.State.Checks()["test"]; ok {
|
||||
if _, ok := a.state.Checks()["test"]; ok {
|
||||
t.Fatalf("have test check")
|
||||
}
|
||||
}
|
||||
|
@ -947,7 +947,7 @@ func TestAgent_PassCheck(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure we have a check mapping
|
||||
state := a.State.Checks()["test"]
|
||||
state := a.state.Checks()["test"]
|
||||
if state.Status != api.HealthPassing {
|
||||
t.Fatalf("bad: %v", state)
|
||||
}
|
||||
|
@ -1000,7 +1000,7 @@ func TestAgent_WarnCheck(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure we have a check mapping
|
||||
state := a.State.Checks()["test"]
|
||||
state := a.state.Checks()["test"]
|
||||
if state.Status != api.HealthWarning {
|
||||
t.Fatalf("bad: %v", state)
|
||||
}
|
||||
|
@ -1053,7 +1053,7 @@ func TestAgent_FailCheck(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure we have a check mapping
|
||||
state := a.State.Checks()["test"]
|
||||
state := a.state.Checks()["test"]
|
||||
if state.Status != api.HealthCritical {
|
||||
t.Fatalf("bad: %v", state)
|
||||
}
|
||||
|
@ -1117,7 +1117,7 @@ func TestAgent_UpdateCheck(t *testing.T) {
|
|||
t.Fatalf("expected 200, got %d", resp.Code)
|
||||
}
|
||||
|
||||
state := a.State.Checks()["test"]
|
||||
state := a.state.Checks()["test"]
|
||||
if state.Status != c.Status || state.Output != c.Output {
|
||||
t.Fatalf("bad: %v", state)
|
||||
}
|
||||
|
@ -1145,7 +1145,7 @@ func TestAgent_UpdateCheck(t *testing.T) {
|
|||
// Since we append some notes about truncating, we just do a
|
||||
// rough check that the output buffer was cut down so this test
|
||||
// isn't super brittle.
|
||||
state := a.State.Checks()["test"]
|
||||
state := a.state.Checks()["test"]
|
||||
if state.Status != api.HealthPassing || len(state.Output) > 2*CheckBufSize {
|
||||
t.Fatalf("bad: %v", state)
|
||||
}
|
||||
|
@ -1228,12 +1228,12 @@ func TestAgent_RegisterService(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure the servie
|
||||
if _, ok := a.State.Services()["test"]; !ok {
|
||||
if _, ok := a.state.Services()["test"]; !ok {
|
||||
t.Fatalf("missing test service")
|
||||
}
|
||||
|
||||
// Ensure we have a check mapping
|
||||
checks := a.State.Checks()
|
||||
checks := a.state.Checks()
|
||||
if len(checks) != 3 {
|
||||
t.Fatalf("bad: %v", checks)
|
||||
}
|
||||
|
@ -1243,7 +1243,7 @@ func TestAgent_RegisterService(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure the token was configured
|
||||
if token := a.State.ServiceToken("test"); token == "" {
|
||||
if token := a.state.ServiceToken("test"); token == "" {
|
||||
t.Fatalf("missing token")
|
||||
}
|
||||
}
|
||||
|
@ -1364,11 +1364,11 @@ func TestAgent_DeregisterService(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure we have a check mapping
|
||||
if _, ok := a.State.Services()["test"]; ok {
|
||||
if _, ok := a.state.Services()["test"]; ok {
|
||||
t.Fatalf("have test service")
|
||||
}
|
||||
|
||||
if _, ok := a.State.Checks()["test"]; ok {
|
||||
if _, ok := a.state.Checks()["test"]; ok {
|
||||
t.Fatalf("have test check")
|
||||
}
|
||||
}
|
||||
|
@ -1466,13 +1466,13 @@ func TestAgent_ServiceMaintenance_Enable(t *testing.T) {
|
|||
|
||||
// Ensure the maintenance check was registered
|
||||
checkID := serviceMaintCheckID("test")
|
||||
check, ok := a.State.Checks()[checkID]
|
||||
check, ok := a.state.Checks()[checkID]
|
||||
if !ok {
|
||||
t.Fatalf("should have registered maintenance check")
|
||||
}
|
||||
|
||||
// Ensure the token was added
|
||||
if token := a.State.CheckToken(checkID); token != "mytoken" {
|
||||
if token := a.state.CheckToken(checkID); token != "mytoken" {
|
||||
t.Fatalf("expected 'mytoken', got '%s'", token)
|
||||
}
|
||||
|
||||
|
@ -1513,7 +1513,7 @@ func TestAgent_ServiceMaintenance_Disable(t *testing.T) {
|
|||
|
||||
// Ensure the maintenance check was removed
|
||||
checkID := serviceMaintCheckID("test")
|
||||
if _, ok := a.State.Checks()[checkID]; ok {
|
||||
if _, ok := a.state.Checks()[checkID]; ok {
|
||||
t.Fatalf("should have removed maintenance check")
|
||||
}
|
||||
}
|
||||
|
@ -1579,13 +1579,13 @@ func TestAgent_NodeMaintenance_Enable(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure the maintenance check was registered
|
||||
check, ok := a.State.Checks()[structs.NodeMaint]
|
||||
check, ok := a.state.Checks()[structs.NodeMaint]
|
||||
if !ok {
|
||||
t.Fatalf("should have registered maintenance check")
|
||||
}
|
||||
|
||||
// Check that the token was used
|
||||
if token := a.State.CheckToken(structs.NodeMaint); token != "mytoken" {
|
||||
if token := a.state.CheckToken(structs.NodeMaint); token != "mytoken" {
|
||||
t.Fatalf("expected 'mytoken', got '%s'", token)
|
||||
}
|
||||
|
||||
|
@ -1614,7 +1614,7 @@ func TestAgent_NodeMaintenance_Disable(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure the maintenance check was removed
|
||||
if _, ok := a.State.Checks()[structs.NodeMaint]; ok {
|
||||
if _, ok := a.state.Checks()[structs.NodeMaint]; ok {
|
||||
t.Fatalf("should have removed maintenance check")
|
||||
}
|
||||
}
|
||||
|
@ -1670,7 +1670,7 @@ func TestAgent_RegisterCheck_Service(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure we have a check mapping
|
||||
result := a.State.Checks()
|
||||
result := a.state.Checks()
|
||||
if _, ok := result["service:memcache"]; !ok {
|
||||
t.Fatalf("missing memcached check")
|
||||
}
|
||||
|
|
|
@ -363,14 +363,14 @@ func TestAgent_AddService(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
got, want := a.State.Services()[tt.srv.ID], tt.srv
|
||||
got, want := a.state.Services()[tt.srv.ID], tt.srv
|
||||
verify.Values(t, "", got, want)
|
||||
})
|
||||
|
||||
// check the health checks
|
||||
for k, v := range tt.healthChks {
|
||||
t.Run(k, func(t *testing.T) {
|
||||
got, want := a.State.Checks()[types.CheckID(k)], v
|
||||
got, want := a.state.Checks()[types.CheckID(k)], v
|
||||
verify.Values(t, k, got, want)
|
||||
})
|
||||
}
|
||||
|
@ -437,10 +437,10 @@ func TestAgent_RemoveService(t *testing.T) {
|
|||
if err := a.RemoveService("memcache", false); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if _, ok := a.State.Checks()["service:memcache"]; ok {
|
||||
if _, ok := a.state.Checks()["service:memcache"]; ok {
|
||||
t.Fatalf("have memcache check")
|
||||
}
|
||||
if _, ok := a.State.Checks()["check2"]; ok {
|
||||
if _, ok := a.state.Checks()["check2"]; ok {
|
||||
t.Fatalf("have check2 check")
|
||||
}
|
||||
}
|
||||
|
@ -466,15 +466,15 @@ func TestAgent_RemoveService(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure we have a state mapping
|
||||
if _, ok := a.State.Services()["redis"]; ok {
|
||||
if _, ok := a.state.Services()["redis"]; ok {
|
||||
t.Fatalf("have redis service")
|
||||
}
|
||||
|
||||
// Ensure checks were removed
|
||||
if _, ok := a.State.Checks()["service:redis:1"]; ok {
|
||||
if _, ok := a.state.Checks()["service:redis:1"]; ok {
|
||||
t.Fatalf("check redis:1 should be removed")
|
||||
}
|
||||
if _, ok := a.State.Checks()["service:redis:2"]; ok {
|
||||
if _, ok := a.state.Checks()["service:redis:2"]; ok {
|
||||
t.Fatalf("check redis:2 should be removed")
|
||||
}
|
||||
|
||||
|
@ -507,7 +507,7 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) {
|
|||
}
|
||||
|
||||
// verify chk1 exists
|
||||
if a.State.Checks()["chk1"] == nil {
|
||||
if a.state.Checks()["chk1"] == nil {
|
||||
t.Fatal("Could not find health check chk1")
|
||||
}
|
||||
|
||||
|
@ -517,10 +517,10 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) {
|
|||
}
|
||||
|
||||
// check that both checks are there
|
||||
if got, want := a.State.Checks()["chk1"], hchk1; !verify.Values(t, "", got, want) {
|
||||
if got, want := a.state.Checks()["chk1"], hchk1; !verify.Values(t, "", got, want) {
|
||||
t.FailNow()
|
||||
}
|
||||
if got, want := a.State.Checks()["chk2"], hchk2; !verify.Values(t, "", got, want) {
|
||||
if got, want := a.state.Checks()["chk2"], hchk2; !verify.Values(t, "", got, want) {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
|
@ -530,10 +530,10 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check that both checks are gone
|
||||
if a.State.Checks()["chk1"] != nil {
|
||||
if a.state.Checks()["chk1"] != nil {
|
||||
t.Fatal("Found health check chk1 want nil")
|
||||
}
|
||||
if a.State.Checks()["chk2"] != nil {
|
||||
if a.state.Checks()["chk2"] != nil {
|
||||
t.Fatal("Found health check chk2 want nil")
|
||||
}
|
||||
}
|
||||
|
@ -561,7 +561,7 @@ func TestAgent_AddCheck(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure we have a check mapping
|
||||
sChk, ok := a.State.Checks()["mem"]
|
||||
sChk, ok := a.state.Checks()["mem"]
|
||||
if !ok {
|
||||
t.Fatalf("missing mem check")
|
||||
}
|
||||
|
@ -600,7 +600,7 @@ func TestAgent_AddCheck_StartPassing(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure we have a check mapping
|
||||
sChk, ok := a.State.Checks()["mem"]
|
||||
sChk, ok := a.state.Checks()["mem"]
|
||||
if !ok {
|
||||
t.Fatalf("missing mem check")
|
||||
}
|
||||
|
@ -639,7 +639,7 @@ func TestAgent_AddCheck_MinInterval(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure we have a check mapping
|
||||
if _, ok := a.State.Checks()["mem"]; !ok {
|
||||
if _, ok := a.state.Checks()["mem"]; !ok {
|
||||
t.Fatalf("missing mem check")
|
||||
}
|
||||
|
||||
|
@ -704,7 +704,7 @@ func TestAgent_AddCheck_RestoreState(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure the check status was restored during registration
|
||||
checks := a.State.Checks()
|
||||
checks := a.state.Checks()
|
||||
check, ok := checks["baz"]
|
||||
if !ok {
|
||||
t.Fatalf("missing check")
|
||||
|
@ -739,7 +739,7 @@ func TestAgent_AddCheck_ExecDisable(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure we don't have a check mapping
|
||||
if memChk := a.State.Checks()["mem"]; memChk != nil {
|
||||
if memChk := a.state.Checks()["mem"]; memChk != nil {
|
||||
t.Fatalf("should be missing mem check")
|
||||
}
|
||||
}
|
||||
|
@ -782,7 +782,7 @@ func TestAgent_RemoveCheck(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure we have a check mapping
|
||||
if _, ok := a.State.Checks()["mem"]; ok {
|
||||
if _, ok := a.state.Checks()["mem"]; ok {
|
||||
t.Fatalf("have mem check")
|
||||
}
|
||||
|
||||
|
@ -817,7 +817,7 @@ func TestAgent_updateTTLCheck(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure we have a check mapping.
|
||||
status := a.State.Checks()["mem"]
|
||||
status := a.state.Checks()["mem"]
|
||||
if status.Status != api.HealthPassing {
|
||||
t.Fatalf("bad: %v", status)
|
||||
}
|
||||
|
@ -904,15 +904,15 @@ func TestAgent_PersistService(t *testing.T) {
|
|||
a2.Start()
|
||||
defer a2.Shutdown()
|
||||
|
||||
restored := a2.State.ServiceState(svc.ID)
|
||||
if restored == nil {
|
||||
t.Fatalf("service %q missing", svc.ID)
|
||||
restored, ok := a2.state.services[svc.ID]
|
||||
if !ok {
|
||||
t.Fatalf("bad: %#v", a2.state.services)
|
||||
}
|
||||
if got, want := restored.Token, "mytoken"; got != want {
|
||||
t.Fatalf("got token %q want %q", got, want)
|
||||
if a2.state.serviceTokens[svc.ID] != "mytoken" {
|
||||
t.Fatalf("bad: %#v", a2.state.services[svc.ID])
|
||||
}
|
||||
if got, want := restored.Service.Port, 8081; got != want {
|
||||
t.Fatalf("got port %d want %d", got, want)
|
||||
if restored.Port != 8001 {
|
||||
t.Fatalf("bad: %#v", restored)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -951,7 +951,7 @@ func TestAgent_persistedService_compat(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure the service was restored
|
||||
services := a.State.Services()
|
||||
services := a.state.Services()
|
||||
result, ok := services["redis"]
|
||||
if !ok {
|
||||
t.Fatalf("missing service")
|
||||
|
@ -1043,8 +1043,8 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) {
|
|||
if _, err := os.Stat(file); err == nil {
|
||||
t.Fatalf("should have removed persisted service")
|
||||
}
|
||||
result := a2.State.Service("redis")
|
||||
if result == nil {
|
||||
result, ok := a2.state.services["redis"]
|
||||
if !ok {
|
||||
t.Fatalf("missing service registration")
|
||||
}
|
||||
if !reflect.DeepEqual(result.Tags, []string{"bar"}) || result.Port != 9000 {
|
||||
|
@ -1137,9 +1137,9 @@ func TestAgent_PersistCheck(t *testing.T) {
|
|||
a2.Start()
|
||||
defer a2.Shutdown()
|
||||
|
||||
result := a2.State.Check(check.CheckID)
|
||||
if result == nil {
|
||||
t.Fatalf("bad: %#v", a2.State.Checks())
|
||||
result, ok := a2.state.checks[check.CheckID]
|
||||
if !ok {
|
||||
t.Fatalf("bad: %#v", a2.state.checks)
|
||||
}
|
||||
if result.Status != api.HealthCritical {
|
||||
t.Fatalf("bad: %#v", result)
|
||||
|
@ -1152,8 +1152,8 @@ func TestAgent_PersistCheck(t *testing.T) {
|
|||
if _, ok := a2.checkMonitors[check.CheckID]; !ok {
|
||||
t.Fatalf("bad: %#v", a2.checkMonitors)
|
||||
}
|
||||
if a2.State.CheckState(check.CheckID).Token != "mytoken" {
|
||||
t.Fatalf("bad: %s", a2.State.CheckState(check.CheckID).Token)
|
||||
if a2.state.checkTokens[check.CheckID] != "mytoken" {
|
||||
t.Fatalf("bad: %s", a2.state.checkTokens[check.CheckID])
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1241,8 +1241,8 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) {
|
|||
if _, err := os.Stat(file); err == nil {
|
||||
t.Fatalf("should have removed persisted check")
|
||||
}
|
||||
result := a2.State.Check("mem")
|
||||
if result == nil {
|
||||
result, ok := a2.state.checks["mem"]
|
||||
if !ok {
|
||||
t.Fatalf("missing check registration")
|
||||
}
|
||||
expected := &structs.HealthCheck{
|
||||
|
@ -1269,11 +1269,11 @@ func TestAgent_loadChecks_token(t *testing.T) {
|
|||
`)
|
||||
defer a.Shutdown()
|
||||
|
||||
checks := a.State.Checks()
|
||||
checks := a.state.Checks()
|
||||
if _, ok := checks["rabbitmq"]; !ok {
|
||||
t.Fatalf("missing check")
|
||||
}
|
||||
if token := a.State.CheckToken("rabbitmq"); token != "abc123" {
|
||||
if token := a.state.CheckToken("rabbitmq"); token != "abc123" {
|
||||
t.Fatalf("bad: %s", token)
|
||||
}
|
||||
}
|
||||
|
@ -1307,7 +1307,7 @@ func TestAgent_unloadChecks(t *testing.T) {
|
|||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
found := false
|
||||
for check := range a.State.Checks() {
|
||||
for check := range a.state.Checks() {
|
||||
if check == check1.CheckID {
|
||||
found = true
|
||||
break
|
||||
|
@ -1323,7 +1323,7 @@ func TestAgent_unloadChecks(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure it was unloaded
|
||||
for check := range a.State.Checks() {
|
||||
for check := range a.state.Checks() {
|
||||
if check == check1.CheckID {
|
||||
t.Fatalf("should have unloaded checks")
|
||||
}
|
||||
|
@ -1342,11 +1342,11 @@ func TestAgent_loadServices_token(t *testing.T) {
|
|||
`)
|
||||
defer a.Shutdown()
|
||||
|
||||
services := a.State.Services()
|
||||
services := a.state.Services()
|
||||
if _, ok := services["rabbitmq"]; !ok {
|
||||
t.Fatalf("missing service")
|
||||
}
|
||||
if token := a.State.ServiceToken("rabbitmq"); token != "abc123" {
|
||||
if token := a.state.ServiceToken("rabbitmq"); token != "abc123" {
|
||||
t.Fatalf("bad: %s", token)
|
||||
}
|
||||
}
|
||||
|
@ -1368,7 +1368,7 @@ func TestAgent_unloadServices(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
found := false
|
||||
for id := range a.State.Services() {
|
||||
for id := range a.state.Services() {
|
||||
if id == svc.ID {
|
||||
found = true
|
||||
break
|
||||
|
@ -1382,7 +1382,7 @@ func TestAgent_unloadServices(t *testing.T) {
|
|||
if err := a.unloadServices(); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(a.State.Services()) != 0 {
|
||||
if len(a.state.Services()) != 0 {
|
||||
t.Fatalf("should have unloaded services")
|
||||
}
|
||||
}
|
||||
|
@ -1411,13 +1411,13 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) {
|
|||
|
||||
// Make sure the critical health check was added
|
||||
checkID := serviceMaintCheckID("redis")
|
||||
check, ok := a.State.Checks()[checkID]
|
||||
check, ok := a.state.Checks()[checkID]
|
||||
if !ok {
|
||||
t.Fatalf("should have registered critical maintenance check")
|
||||
}
|
||||
|
||||
// Check that the token was used to register the check
|
||||
if token := a.State.CheckToken(checkID); token != "mytoken" {
|
||||
if token := a.state.CheckToken(checkID); token != "mytoken" {
|
||||
t.Fatalf("expected 'mytoken', got: '%s'", token)
|
||||
}
|
||||
|
||||
|
@ -1432,7 +1432,7 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure the check was deregistered
|
||||
if _, ok := a.State.Checks()[checkID]; ok {
|
||||
if _, ok := a.state.Checks()[checkID]; ok {
|
||||
t.Fatalf("should have deregistered maintenance check")
|
||||
}
|
||||
|
||||
|
@ -1442,7 +1442,7 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure the check was registered with the default notes
|
||||
check, ok = a.State.Checks()[checkID]
|
||||
check, ok = a.state.Checks()[checkID]
|
||||
if !ok {
|
||||
t.Fatalf("should have registered critical check")
|
||||
}
|
||||
|
@ -1479,19 +1479,19 @@ func TestAgent_Service_Reap(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure it's there and there's no critical check yet.
|
||||
if _, ok := a.State.Services()["redis"]; !ok {
|
||||
if _, ok := a.state.Services()["redis"]; !ok {
|
||||
t.Fatalf("should have redis service")
|
||||
}
|
||||
if checks := a.State.CriticalCheckStates(); len(checks) > 0 {
|
||||
if checks := a.state.CriticalChecks(); len(checks) > 0 {
|
||||
t.Fatalf("should not have critical checks")
|
||||
}
|
||||
|
||||
// Wait for the check TTL to fail but before the check is reaped.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if _, ok := a.State.Services()["redis"]; !ok {
|
||||
if _, ok := a.state.Services()["redis"]; !ok {
|
||||
t.Fatalf("should have redis service")
|
||||
}
|
||||
if checks := a.State.CriticalCheckStates(); len(checks) != 1 {
|
||||
if checks := a.state.CriticalChecks(); len(checks) != 1 {
|
||||
t.Fatalf("should have a critical check")
|
||||
}
|
||||
|
||||
|
@ -1499,28 +1499,28 @@ func TestAgent_Service_Reap(t *testing.T) {
|
|||
if err := a.updateTTLCheck("service:redis", api.HealthPassing, "foo"); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if _, ok := a.State.Services()["redis"]; !ok {
|
||||
if _, ok := a.state.Services()["redis"]; !ok {
|
||||
t.Fatalf("should have redis service")
|
||||
}
|
||||
if checks := a.State.CriticalCheckStates(); len(checks) > 0 {
|
||||
if checks := a.state.CriticalChecks(); len(checks) > 0 {
|
||||
t.Fatalf("should not have critical checks")
|
||||
}
|
||||
|
||||
// Wait for the check TTL to fail again.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if _, ok := a.State.Services()["redis"]; !ok {
|
||||
if _, ok := a.state.Services()["redis"]; !ok {
|
||||
t.Fatalf("should have redis service")
|
||||
}
|
||||
if checks := a.State.CriticalCheckStates(); len(checks) != 1 {
|
||||
if checks := a.state.CriticalChecks(); len(checks) != 1 {
|
||||
t.Fatalf("should have a critical check")
|
||||
}
|
||||
|
||||
// Wait for the reap.
|
||||
time.Sleep(400 * time.Millisecond)
|
||||
if _, ok := a.State.Services()["redis"]; ok {
|
||||
if _, ok := a.state.Services()["redis"]; ok {
|
||||
t.Fatalf("redis service should have been reaped")
|
||||
}
|
||||
if checks := a.State.CriticalCheckStates(); len(checks) > 0 {
|
||||
if checks := a.state.CriticalChecks(); len(checks) > 0 {
|
||||
t.Fatalf("should not have critical checks")
|
||||
}
|
||||
}
|
||||
|
@ -1552,28 +1552,28 @@ func TestAgent_Service_NoReap(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure it's there and there's no critical check yet.
|
||||
if _, ok := a.State.Services()["redis"]; !ok {
|
||||
if _, ok := a.state.Services()["redis"]; !ok {
|
||||
t.Fatalf("should have redis service")
|
||||
}
|
||||
if checks := a.State.CriticalCheckStates(); len(checks) > 0 {
|
||||
if checks := a.state.CriticalChecks(); len(checks) > 0 {
|
||||
t.Fatalf("should not have critical checks")
|
||||
}
|
||||
|
||||
// Wait for the check TTL to fail.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
if _, ok := a.State.Services()["redis"]; !ok {
|
||||
if _, ok := a.state.Services()["redis"]; !ok {
|
||||
t.Fatalf("should have redis service")
|
||||
}
|
||||
if checks := a.State.CriticalCheckStates(); len(checks) != 1 {
|
||||
if checks := a.state.CriticalChecks(); len(checks) != 1 {
|
||||
t.Fatalf("should have a critical check")
|
||||
}
|
||||
|
||||
// Wait a while and make sure it doesn't reap.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
if _, ok := a.State.Services()["redis"]; !ok {
|
||||
if _, ok := a.state.Services()["redis"]; !ok {
|
||||
t.Fatalf("should have redis service")
|
||||
}
|
||||
if checks := a.State.CriticalCheckStates(); len(checks) != 1 {
|
||||
if checks := a.state.CriticalChecks(); len(checks) != 1 {
|
||||
t.Fatalf("should have a critical check")
|
||||
}
|
||||
}
|
||||
|
@ -1612,7 +1612,7 @@ func TestAgent_addCheck_restoresSnapshot(t *testing.T) {
|
|||
if err := a.AddService(svc, chkTypes, false, ""); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
check, ok := a.State.Checks()["service:redis"]
|
||||
check, ok := a.state.Checks()["service:redis"]
|
||||
if !ok {
|
||||
t.Fatalf("missing check")
|
||||
}
|
||||
|
@ -1630,13 +1630,13 @@ func TestAgent_NodeMaintenanceMode(t *testing.T) {
|
|||
a.EnableNodeMaintenance("broken", "mytoken")
|
||||
|
||||
// Make sure the critical health check was added
|
||||
check, ok := a.State.Checks()[structs.NodeMaint]
|
||||
check, ok := a.state.Checks()[structs.NodeMaint]
|
||||
if !ok {
|
||||
t.Fatalf("should have registered critical node check")
|
||||
}
|
||||
|
||||
// Check that the token was used to register the check
|
||||
if token := a.State.CheckToken(structs.NodeMaint); token != "mytoken" {
|
||||
if token := a.state.CheckToken(structs.NodeMaint); token != "mytoken" {
|
||||
t.Fatalf("expected 'mytoken', got: '%s'", token)
|
||||
}
|
||||
|
||||
|
@ -1649,7 +1649,7 @@ func TestAgent_NodeMaintenanceMode(t *testing.T) {
|
|||
a.DisableNodeMaintenance()
|
||||
|
||||
// Ensure the check was deregistered
|
||||
if _, ok := a.State.Checks()[structs.NodeMaint]; ok {
|
||||
if _, ok := a.state.Checks()[structs.NodeMaint]; ok {
|
||||
t.Fatalf("should have deregistered critical node check")
|
||||
}
|
||||
|
||||
|
@ -1657,7 +1657,7 @@ func TestAgent_NodeMaintenanceMode(t *testing.T) {
|
|||
a.EnableNodeMaintenance("", "")
|
||||
|
||||
// Make sure the check was registered with the default note
|
||||
check, ok = a.State.Checks()[structs.NodeMaint]
|
||||
check, ok = a.state.Checks()[structs.NodeMaint]
|
||||
if !ok {
|
||||
t.Fatalf("should have registered critical node check")
|
||||
}
|
||||
|
@ -1712,7 +1712,7 @@ func TestAgent_checkStateSnapshot(t *testing.T) {
|
|||
a.restoreCheckState(snap)
|
||||
|
||||
// Search for the check
|
||||
out, ok := a.State.Checks()[check1.CheckID]
|
||||
out, ok := a.state.Checks()[check1.CheckID]
|
||||
if !ok {
|
||||
t.Fatalf("check should have been registered")
|
||||
}
|
||||
|
|
|
@ -33,32 +33,22 @@ func TestCatalogRegister(t *testing.T) {
|
|||
t.Fatalf("bad: %v", res)
|
||||
}
|
||||
|
||||
// todo(fs): data race
|
||||
// func() {
|
||||
// a.State.Lock()
|
||||
// defer a.State.Unlock()
|
||||
// data race
|
||||
func() {
|
||||
a.state.Lock()
|
||||
defer a.state.Unlock()
|
||||
|
||||
// // Service should be in sync
|
||||
// if err := a.State.syncService("foo"); err != nil {
|
||||
// t.Fatalf("err: %s", err)
|
||||
// }
|
||||
// if _, ok := a.State.serviceStatus["foo"]; !ok {
|
||||
// t.Fatalf("bad: %#v", a.State.serviceStatus)
|
||||
// }
|
||||
// if !a.State.serviceStatus["foo"].inSync {
|
||||
// t.Fatalf("should be in sync")
|
||||
// }
|
||||
// }()
|
||||
if err := a.State.SyncChanges(); err != nil {
|
||||
t.Fatal("sync failed: ", err)
|
||||
}
|
||||
s := a.State.ServiceState("foo")
|
||||
if s == nil {
|
||||
t.Fatal("service 'foo' missing")
|
||||
}
|
||||
if !s.InSync {
|
||||
t.Fatalf("service 'foo' should be in sync")
|
||||
}
|
||||
// Service should be in sync
|
||||
if err := a.state.syncService("foo"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if _, ok := a.state.serviceStatus["foo"]; !ok {
|
||||
t.Fatalf("bad: %#v", a.state.serviceStatus)
|
||||
}
|
||||
if !a.state.serviceStatus["foo"].inSync {
|
||||
t.Fatalf("should be in sync")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func TestCatalogRegister_Service_InvalidAddress(t *testing.T) {
|
||||
|
|
|
@ -51,14 +51,6 @@ type ServiceState struct {
|
|||
Deleted bool
|
||||
}
|
||||
|
||||
// Clone returns a shallow copy of the object. The service record still
|
||||
// points to the original service record and must not be modified.
|
||||
func (s *ServiceState) Clone() *ServiceState {
|
||||
s2 := new(ServiceState)
|
||||
*s2 = *s
|
||||
return s2
|
||||
}
|
||||
|
||||
// CheckState describes the state of a health check record.
|
||||
type CheckState struct {
|
||||
// Check is the local copy of the health check record.
|
||||
|
@ -87,15 +79,6 @@ type CheckState struct {
|
|||
Deleted bool
|
||||
}
|
||||
|
||||
// Clone returns a shallow copy of the object. The check record and the
|
||||
// defer timer still point to the original values and must not be
|
||||
// modified.
|
||||
func (c *CheckState) Clone() *CheckState {
|
||||
c2 := new(CheckState)
|
||||
*c2 = *c
|
||||
return c2
|
||||
}
|
||||
|
||||
// Critical returns true when the health check is in critical state.
|
||||
func (c *CheckState) Critical() bool {
|
||||
return !c.CriticalTime.IsZero()
|
||||
|
@ -206,6 +189,9 @@ func (l *State) serviceToken(id string) string {
|
|||
// ensure it is registered
|
||||
// todo(fs): where is the persistence happening?
|
||||
func (l *State) AddService(service *structs.NodeService, token string) error {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
if service == nil {
|
||||
return fmt.Errorf("no service")
|
||||
}
|
||||
|
@ -216,19 +202,13 @@ func (l *State) AddService(service *structs.NodeService, token string) error {
|
|||
service.ID = service.Service
|
||||
}
|
||||
|
||||
l.AddServiceState(&ServiceState{
|
||||
l.services[service.ID] = &ServiceState{
|
||||
Service: service,
|
||||
Token: token,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *State) AddServiceState(s *ServiceState) {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
l.services[s.Service.ID] = s
|
||||
}
|
||||
l.changeMade()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveService is used to remove a service entry from the local state.
|
||||
|
@ -281,37 +261,6 @@ func (l *State) Services() map[string]*structs.NodeService {
|
|||
return m
|
||||
}
|
||||
|
||||
// ServiceState returns a shallow copy of the current service state
|
||||
// record. The service record still points to the original service
|
||||
// record and must not be modified.
|
||||
func (l *State) ServiceState(id string) *ServiceState {
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
||||
s := l.services[id]
|
||||
if s == nil || s.Deleted {
|
||||
return nil
|
||||
}
|
||||
return s.Clone()
|
||||
}
|
||||
|
||||
// ServiceStates returns a shallow copy of all service state records.
|
||||
// The service record still points to the original service record and
|
||||
// must not be modified.
|
||||
func (l *State) ServiceStates() map[string]*ServiceState {
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
||||
m := make(map[string]*ServiceState)
|
||||
for id, s := range l.services {
|
||||
if s.Deleted {
|
||||
continue
|
||||
}
|
||||
m[id] = s.Clone()
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// CheckToken is used to return the configured health check token for a
|
||||
// Check, or if none is configured, the default agent ACL token.
|
||||
func (l *State) CheckToken(checkID types.CheckID) string {
|
||||
|
@ -337,6 +286,9 @@ func (l *State) checkToken(id types.CheckID) string {
|
|||
// This entry is persistent and the agent will make a best effort to
|
||||
// ensure it is registered
|
||||
func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
if check == nil {
|
||||
return fmt.Errorf("no check")
|
||||
}
|
||||
|
@ -354,19 +306,13 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
|
|||
// hard-set the node name
|
||||
check.Node = l.config.NodeName
|
||||
|
||||
l.AddCheckState(&CheckState{
|
||||
l.checks[check.CheckID] = &CheckState{
|
||||
Check: check,
|
||||
Token: token,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *State) AddCheckState(c *CheckState) {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
l.checks[c.Check.CheckID] = c
|
||||
}
|
||||
l.changeMade()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveCheck is used to remove a health check from the local state.
|
||||
|
@ -472,40 +418,17 @@ func (l *State) Check(id types.CheckID) *structs.HealthCheck {
|
|||
// Checks returns the locally registered checks that the
|
||||
// agent is aware of and are being kept in sync with the server
|
||||
func (l *State) Checks() map[types.CheckID]*structs.HealthCheck {
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
||||
m := make(map[types.CheckID]*structs.HealthCheck)
|
||||
for id, c := range l.CheckStates() {
|
||||
m[id] = c.Check
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// CheckState returns a shallow copy of the current health check state
|
||||
// record. The health check record and the deferred check still point to
|
||||
// the original values and must not be modified.
|
||||
func (l *State) CheckState(id types.CheckID) *CheckState {
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
||||
c := l.checks[id]
|
||||
if c == nil || c.Deleted {
|
||||
return nil
|
||||
}
|
||||
return c.Clone()
|
||||
}
|
||||
|
||||
// CheckStates returns a shallow copy of all health check state records.
|
||||
// The health check records and the deferred checks still point to
|
||||
// the original values and must not be modified.
|
||||
func (l *State) CheckStates() map[types.CheckID]*CheckState {
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
||||
m := make(map[types.CheckID]*CheckState)
|
||||
for id, c := range l.checks {
|
||||
if c.Deleted {
|
||||
continue
|
||||
}
|
||||
m[id] = c.Clone()
|
||||
c2 := new(structs.HealthCheck)
|
||||
*c2 = *c.Check
|
||||
m[id] = c2
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
@ -521,7 +444,7 @@ func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState {
|
|||
if c.Deleted || !c.Critical() {
|
||||
continue
|
||||
}
|
||||
m[id] = c.Clone()
|
||||
m[id] = c
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
|
|
@ -1,14 +1,11 @@
|
|||
package local_test
|
||||
package local
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent"
|
||||
"github.com/hashicorp/consul/agent/local"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/api"
|
||||
|
@ -19,7 +16,7 @@ import (
|
|||
|
||||
func TestAgentAntiEntropy_Services(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
|
||||
a := &TestAgent{Name: t.Name(), NoInitialSync: true}
|
||||
a.Start()
|
||||
defer a.Shutdown()
|
||||
|
||||
|
@ -38,7 +35,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
Tags: []string{"master"},
|
||||
Port: 5000,
|
||||
}
|
||||
a.State.AddService(srv1, "")
|
||||
a.state.AddService(srv1, "")
|
||||
args.Service = srv1
|
||||
if err := a.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -51,7 +48,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
Tags: []string{},
|
||||
Port: 8000,
|
||||
}
|
||||
a.State.AddService(srv2, "")
|
||||
a.state.AddService(srv2, "")
|
||||
|
||||
srv2_mod := new(structs.NodeService)
|
||||
*srv2_mod = *srv2
|
||||
|
@ -68,7 +65,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
Tags: []string{},
|
||||
Port: 80,
|
||||
}
|
||||
a.State.AddService(srv3, "")
|
||||
a.state.AddService(srv3, "")
|
||||
|
||||
// Exists remote (delete)
|
||||
srv4 := &structs.NodeService{
|
||||
|
@ -90,7 +87,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
Address: "127.0.0.10",
|
||||
Port: 8000,
|
||||
}
|
||||
a.State.AddService(srv5, "")
|
||||
a.state.AddService(srv5, "")
|
||||
|
||||
srv5_mod := new(structs.NodeService)
|
||||
*srv5_mod = *srv5
|
||||
|
@ -107,10 +104,12 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
Tags: []string{},
|
||||
Port: 11211,
|
||||
}
|
||||
a.State.AddServiceState(&local.ServiceState{
|
||||
Service: srv6,
|
||||
InSync: true,
|
||||
})
|
||||
a.state.AddService(srv6, "")
|
||||
|
||||
// todo(fs): data race
|
||||
a.state.Lock()
|
||||
a.state.serviceStatus["cache"] = syncStatus{inSync: true}
|
||||
a.state.Unlock()
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
|
@ -171,13 +170,26 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
if err := servicesInSync(a.State, 5); err != nil {
|
||||
r.Fatal(err)
|
||||
// todo(fs): data race
|
||||
a.state.RLock()
|
||||
defer a.state.RUnlock()
|
||||
|
||||
// Check the local state
|
||||
if len(a.state.services) != 5 {
|
||||
r.Fatalf("bad: %v", a.state.services)
|
||||
}
|
||||
if len(a.state.serviceStatus) != 5 {
|
||||
r.Fatalf("bad: %v", a.state.serviceStatus)
|
||||
}
|
||||
for name, status := range a.state.serviceStatus {
|
||||
if !status.inSync {
|
||||
r.Fatalf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// Remove one of the services
|
||||
a.State.RemoveService("api")
|
||||
a.state.RemoveService("api")
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
|
@ -219,15 +231,28 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
if err := servicesInSync(a.State, 4); err != nil {
|
||||
r.Fatal(err)
|
||||
// todo(fs): data race
|
||||
a.state.RLock()
|
||||
defer a.state.RUnlock()
|
||||
|
||||
// Check the local state
|
||||
if len(a.state.services) != 4 {
|
||||
r.Fatalf("bad: %v", a.state.services)
|
||||
}
|
||||
if len(a.state.serviceStatus) != 4 {
|
||||
r.Fatalf("bad: %v", a.state.serviceStatus)
|
||||
}
|
||||
for name, status := range a.state.serviceStatus {
|
||||
if !status.inSync {
|
||||
r.Fatalf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
|
||||
a := &TestAgent{Name: t.Name(), NoInitialSync: true}
|
||||
a.Start()
|
||||
defer a.Shutdown()
|
||||
|
||||
|
@ -246,7 +271,7 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
|
|||
Port: 6100,
|
||||
EnableTagOverride: true,
|
||||
}
|
||||
a.State.AddService(srv1, "")
|
||||
a.state.AddService(srv1, "")
|
||||
srv1_mod := new(structs.NodeService)
|
||||
*srv1_mod = *srv1
|
||||
srv1_mod.Port = 7100
|
||||
|
@ -264,7 +289,7 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
|
|||
Port: 6200,
|
||||
EnableTagOverride: false,
|
||||
}
|
||||
a.State.AddService(srv2, "")
|
||||
a.state.AddService(srv2, "")
|
||||
srv2_mod := new(structs.NodeService)
|
||||
*srv2_mod = *srv2
|
||||
srv2_mod.Port = 7200
|
||||
|
@ -289,8 +314,8 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
|
|||
r.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
a.State.RLock()
|
||||
defer a.State.RUnlock()
|
||||
a.state.RLock()
|
||||
defer a.state.RUnlock()
|
||||
|
||||
// All the services should match
|
||||
for id, serv := range services.NodeServices.Services {
|
||||
|
@ -317,15 +342,21 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
if err := servicesInSync(a.State, 2); err != nil {
|
||||
r.Fatal(err)
|
||||
// todo(fs): data race
|
||||
a.state.RLock()
|
||||
defer a.state.RUnlock()
|
||||
|
||||
for name, status := range a.state.serviceStatus {
|
||||
if !status.inSync {
|
||||
r.Fatalf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := agent.NewTestAgent(t.Name(), "")
|
||||
a := NewTestAgent(t.Name(), "")
|
||||
defer a.Shutdown()
|
||||
|
||||
{
|
||||
|
@ -336,7 +367,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
|
|||
Tags: []string{"master"},
|
||||
Port: 5000,
|
||||
}
|
||||
a.State.AddService(srv, "")
|
||||
a.state.AddService(srv, "")
|
||||
|
||||
chk := &structs.HealthCheck{
|
||||
Node: a.Config.NodeName,
|
||||
|
@ -345,22 +376,18 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
|
|||
ServiceID: "mysql",
|
||||
Status: api.HealthPassing,
|
||||
}
|
||||
a.State.AddCheck(chk, "")
|
||||
a.state.AddCheck(chk, "")
|
||||
|
||||
// todo(fs): data race
|
||||
// func() {
|
||||
// a.State.RLock()
|
||||
// defer a.State.RUnlock()
|
||||
func() {
|
||||
a.state.RLock()
|
||||
defer a.state.RUnlock()
|
||||
|
||||
// // Sync the service once
|
||||
// if err := a.State.syncService("mysql"); err != nil {
|
||||
// t.Fatalf("err: %s", err)
|
||||
// }
|
||||
// }()
|
||||
// todo(fs): is this correct?
|
||||
if err := a.State.SyncChanges(); err != nil {
|
||||
t.Fatal("sync failed: ", err)
|
||||
}
|
||||
// Sync the service once
|
||||
if err := a.state.syncService("mysql"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// We should have 2 services (consul included)
|
||||
svcReq := structs.NodeSpecificRequest{
|
||||
|
@ -397,7 +424,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
|
|||
Tags: []string{"master"},
|
||||
Port: 5000,
|
||||
}
|
||||
a.State.AddService(srv, "")
|
||||
a.state.AddService(srv, "")
|
||||
|
||||
chk1 := &structs.HealthCheck{
|
||||
Node: a.Config.NodeName,
|
||||
|
@ -406,7 +433,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
|
|||
ServiceID: "redis",
|
||||
Status: api.HealthPassing,
|
||||
}
|
||||
a.State.AddCheck(chk1, "")
|
||||
a.state.AddCheck(chk1, "")
|
||||
|
||||
chk2 := &structs.HealthCheck{
|
||||
Node: a.Config.NodeName,
|
||||
|
@ -415,22 +442,18 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
|
|||
ServiceID: "redis",
|
||||
Status: api.HealthPassing,
|
||||
}
|
||||
a.State.AddCheck(chk2, "")
|
||||
a.state.AddCheck(chk2, "")
|
||||
|
||||
// todo(fs): data race
|
||||
// func() {
|
||||
// a.State.RLock()
|
||||
// defer a.State.RUnlock()
|
||||
func() {
|
||||
a.state.RLock()
|
||||
defer a.state.RUnlock()
|
||||
|
||||
// // Sync the service once
|
||||
// if err := a.State.syncService("redis"); err != nil {
|
||||
// t.Fatalf("err: %s", err)
|
||||
// }
|
||||
// }()
|
||||
// todo(fs): is this correct?
|
||||
if err := a.State.SyncChanges(); err != nil {
|
||||
t.Fatal("sync failed: ", err)
|
||||
}
|
||||
// Sync the service once
|
||||
if err := a.state.syncService("redis"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// We should have 3 services (consul included)
|
||||
svcReq := structs.NodeSpecificRequest{
|
||||
|
@ -476,7 +499,7 @@ var testRegisterRules = `
|
|||
|
||||
func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := &agent.TestAgent{Name: t.Name(), HCL: `
|
||||
a := &TestAgent{Name: t.Name(), HCL: `
|
||||
acl_datacenter = "dc1"
|
||||
acl_master_token = "root"
|
||||
acl_default_policy = "deny"
|
||||
|
@ -510,7 +533,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
|
|||
Tags: []string{"master"},
|
||||
Port: 5000,
|
||||
}
|
||||
a.State.AddService(srv1, token)
|
||||
a.state.AddService(srv1, token)
|
||||
|
||||
// Create service (allowed)
|
||||
srv2 := &structs.NodeService{
|
||||
|
@ -519,7 +542,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
|
|||
Tags: []string{"foo"},
|
||||
Port: 5001,
|
||||
}
|
||||
a.State.AddService(srv2, token)
|
||||
a.state.AddService(srv2, token)
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
|
@ -561,13 +584,28 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
if err := servicesInSync(a.State, 2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// todo(fs): data race
|
||||
func() {
|
||||
a.state.RLock()
|
||||
defer a.state.RUnlock()
|
||||
|
||||
// Check the local state
|
||||
if len(a.state.services) != 2 {
|
||||
t.Fatalf("bad: %v", a.state.services)
|
||||
}
|
||||
if len(a.state.serviceStatus) != 2 {
|
||||
t.Fatalf("bad: %v", a.state.serviceStatus)
|
||||
}
|
||||
for name, status := range a.state.serviceStatus {
|
||||
if !status.inSync {
|
||||
t.Fatalf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Now remove the service and re-sync
|
||||
a.State.RemoveService("api")
|
||||
a.state.RemoveService("api")
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
|
@ -605,20 +643,35 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
if err := servicesInSync(a.State, 1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// todo(fs): data race
|
||||
func() {
|
||||
a.state.RLock()
|
||||
defer a.state.RUnlock()
|
||||
|
||||
// Check the local state
|
||||
if len(a.state.services) != 1 {
|
||||
t.Fatalf("bad: %v", a.state.services)
|
||||
}
|
||||
if len(a.state.serviceStatus) != 1 {
|
||||
t.Fatalf("bad: %v", a.state.serviceStatus)
|
||||
}
|
||||
for name, status := range a.state.serviceStatus {
|
||||
if !status.inSync {
|
||||
t.Fatalf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Make sure the token got cleaned up.
|
||||
if token := a.State.ServiceToken("api"); token != "" {
|
||||
if token := a.state.ServiceToken("api"); token != "" {
|
||||
t.Fatalf("bad: %s", token)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgentAntiEntropy_Checks(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
|
||||
a := &TestAgent{Name: t.Name(), NoInitialSync: true}
|
||||
a.Start()
|
||||
defer a.Shutdown()
|
||||
|
||||
|
@ -637,7 +690,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
Name: "mysql",
|
||||
Status: api.HealthPassing,
|
||||
}
|
||||
a.State.AddCheck(chk1, "")
|
||||
a.state.AddCheck(chk1, "")
|
||||
args.Check = chk1
|
||||
if err := a.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -650,7 +703,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
Name: "redis",
|
||||
Status: api.HealthPassing,
|
||||
}
|
||||
a.State.AddCheck(chk2, "")
|
||||
a.state.AddCheck(chk2, "")
|
||||
|
||||
chk2_mod := new(structs.HealthCheck)
|
||||
*chk2_mod = *chk2
|
||||
|
@ -667,7 +720,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
Name: "web",
|
||||
Status: api.HealthPassing,
|
||||
}
|
||||
a.State.AddCheck(chk3, "")
|
||||
a.state.AddCheck(chk3, "")
|
||||
|
||||
// Exists remote (delete)
|
||||
chk4 := &structs.HealthCheck{
|
||||
|
@ -688,10 +741,12 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
Name: "cache",
|
||||
Status: api.HealthPassing,
|
||||
}
|
||||
a.State.AddCheckState(&local.CheckState{
|
||||
Check: chk5,
|
||||
InSync: true,
|
||||
})
|
||||
a.state.AddCheck(chk5, "")
|
||||
|
||||
// todo(fs): data race
|
||||
a.state.Lock()
|
||||
a.state.checkStatus["cache"] = syncStatus{inSync: true}
|
||||
a.state.Unlock()
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
|
@ -741,9 +796,24 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
if err := checksInSync(a.State, 4); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// todo(fs): data race
|
||||
func() {
|
||||
a.state.RLock()
|
||||
defer a.state.RUnlock()
|
||||
|
||||
// Check the local state
|
||||
if len(a.state.checks) != 4 {
|
||||
t.Fatalf("bad: %v", a.state.checks)
|
||||
}
|
||||
if len(a.state.checkStatus) != 4 {
|
||||
t.Fatalf("bad: %v", a.state.checkStatus)
|
||||
}
|
||||
for name, status := range a.state.checkStatus {
|
||||
if !status.inSync {
|
||||
t.Fatalf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Make sure we sent along our node info addresses when we synced.
|
||||
{
|
||||
|
@ -766,7 +836,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
}
|
||||
|
||||
// Remove one of the checks
|
||||
a.State.RemoveCheck("redis")
|
||||
a.state.RemoveCheck("redis")
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
|
@ -806,14 +876,29 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
if err := checksInSync(a.State, 3); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// todo(fs): data race
|
||||
func() {
|
||||
a.state.RLock()
|
||||
defer a.state.RUnlock()
|
||||
|
||||
// Check the local state
|
||||
if len(a.state.checks) != 3 {
|
||||
t.Fatalf("bad: %v", a.state.checks)
|
||||
}
|
||||
if len(a.state.checkStatus) != 3 {
|
||||
t.Fatalf("bad: %v", a.state.checkStatus)
|
||||
}
|
||||
for name, status := range a.state.checkStatus {
|
||||
if !status.inSync {
|
||||
t.Fatalf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := &agent.TestAgent{Name: t.Name(), HCL: `
|
||||
a := &TestAgent{Name: t.Name(), HCL: `
|
||||
acl_datacenter = "dc1"
|
||||
acl_master_token = "root"
|
||||
acl_default_policy = "deny"
|
||||
|
@ -847,14 +932,14 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
|
|||
Tags: []string{"master"},
|
||||
Port: 5000,
|
||||
}
|
||||
a.State.AddService(srv1, "root")
|
||||
a.state.AddService(srv1, "root")
|
||||
srv2 := &structs.NodeService{
|
||||
ID: "api",
|
||||
Service: "api",
|
||||
Tags: []string{"foo"},
|
||||
Port: 5001,
|
||||
}
|
||||
a.State.AddService(srv2, "root")
|
||||
a.state.AddService(srv2, "root")
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
|
@ -898,9 +983,24 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
if err := servicesInSync(a.State, 2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// todo(fs): data race
|
||||
func() {
|
||||
a.state.RLock()
|
||||
defer a.state.RUnlock()
|
||||
|
||||
// Check the local state
|
||||
if len(a.state.services) != 2 {
|
||||
t.Fatalf("bad: %v", a.state.services)
|
||||
}
|
||||
if len(a.state.serviceStatus) != 2 {
|
||||
t.Fatalf("bad: %v", a.state.serviceStatus)
|
||||
}
|
||||
for name, status := range a.state.serviceStatus {
|
||||
if !status.inSync {
|
||||
t.Fatalf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// This check won't be allowed.
|
||||
|
@ -913,7 +1013,7 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
|
|||
Name: "mysql",
|
||||
Status: api.HealthPassing,
|
||||
}
|
||||
a.State.AddCheck(chk1, token)
|
||||
a.state.AddCheck(chk1, token)
|
||||
|
||||
// This one will be allowed.
|
||||
chk2 := &structs.HealthCheck{
|
||||
|
@ -925,7 +1025,7 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
|
|||
Name: "api",
|
||||
Status: api.HealthPassing,
|
||||
}
|
||||
a.State.AddCheck(chk2, token)
|
||||
a.state.AddCheck(chk2, token)
|
||||
|
||||
// Trigger anti-entropy run and wait.
|
||||
a.StartSync()
|
||||
|
@ -968,12 +1068,27 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
if err := checksInSync(a.State, 2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// todo(fs): data race
|
||||
func() {
|
||||
a.state.RLock()
|
||||
defer a.state.RUnlock()
|
||||
|
||||
// Check the local state.
|
||||
if len(a.state.checks) != 2 {
|
||||
t.Fatalf("bad: %v", a.state.checks)
|
||||
}
|
||||
if len(a.state.checkStatus) != 2 {
|
||||
t.Fatalf("bad: %v", a.state.checkStatus)
|
||||
}
|
||||
for name, status := range a.state.checkStatus {
|
||||
if !status.inSync {
|
||||
t.Fatalf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Now delete the check and wait for sync.
|
||||
a.State.RemoveCheck("api-check")
|
||||
a.state.RemoveCheck("api-check")
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
// Verify that we are in sync
|
||||
|
@ -1011,12 +1126,27 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
if err := checksInSync(a.State, 1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// todo(fs): data race
|
||||
func() {
|
||||
a.state.RLock()
|
||||
defer a.state.RUnlock()
|
||||
|
||||
// Check the local state.
|
||||
if len(a.state.checks) != 1 {
|
||||
t.Fatalf("bad: %v", a.state.checks)
|
||||
}
|
||||
if len(a.state.checkStatus) != 1 {
|
||||
t.Fatalf("bad: %v", a.state.checkStatus)
|
||||
}
|
||||
for name, status := range a.state.checkStatus {
|
||||
if !status.inSync {
|
||||
t.Fatalf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Make sure the token got cleaned up.
|
||||
if token := a.State.CheckToken("api-check"); token != "" {
|
||||
if token := a.state.CheckToken("api-check"); token != "" {
|
||||
t.Fatalf("bad: %s", token)
|
||||
}
|
||||
}
|
||||
|
@ -1073,7 +1203,7 @@ func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) {
|
|||
|
||||
func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := &agent.TestAgent{Name: t.Name(), HCL: `
|
||||
a := &TestAgent{Name: t.Name(), HCL: `
|
||||
check_update_interval = "500ms"
|
||||
`, NoInitialSync: true}
|
||||
a.Start()
|
||||
|
@ -1087,7 +1217,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
|||
Status: api.HealthPassing,
|
||||
Output: "",
|
||||
}
|
||||
a.State.AddCheck(check, "")
|
||||
a.state.AddCheck(check, "")
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
|
@ -1108,7 +1238,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
|||
})
|
||||
|
||||
// Update the check output! Should be deferred
|
||||
a.State.UpdateCheck("web", api.HealthPassing, "output")
|
||||
a.state.UpdateCheck("web", api.HealthPassing, "output")
|
||||
|
||||
// Should not update for 500 milliseconds
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
|
@ -1207,7 +1337,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
|||
}
|
||||
|
||||
// Now make an update that should be deferred.
|
||||
a.State.UpdateCheck("web", api.HealthPassing, "deferred")
|
||||
a.state.UpdateCheck("web", api.HealthPassing, "deferred")
|
||||
|
||||
// Trigger anti-entropy run and wait.
|
||||
a.StartSync()
|
||||
|
@ -1251,7 +1381,7 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
|
|||
nodeMeta := map[string]string{
|
||||
"somekey": "somevalue",
|
||||
}
|
||||
a := &agent.TestAgent{Name: t.Name(), HCL: `
|
||||
a := &TestAgent{Name: t.Name(), HCL: `
|
||||
node_id = "40e4a748-2192-161a-0510-9bf59fe950b5"
|
||||
node_meta {
|
||||
somekey = "somevalue"
|
||||
|
@ -1323,15 +1453,40 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestAgentAntiEntropy_deleteService_fails(t *testing.T) {
|
||||
t.Parallel()
|
||||
l := new(localState)
|
||||
|
||||
// todo(fs): data race
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
if err := l.deleteService(""); err == nil {
|
||||
t.Fatalf("should have failed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgentAntiEntropy_deleteCheck_fails(t *testing.T) {
|
||||
t.Parallel()
|
||||
l := new(localState)
|
||||
|
||||
// todo(fs): data race
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
if err := l.deleteCheck(""); err == nil {
|
||||
t.Fatalf("should have errored")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_serviceTokens(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cfg := agent.TestConfig()
|
||||
tokens := new(token.Store)
|
||||
tokens.UpdateUserToken("default")
|
||||
l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1))
|
||||
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1))
|
||||
|
||||
l.AddService(&structs.NodeService{ID: "redis"}, "")
|
||||
l.AddService(&structs.NodeService{
|
||||
ID: "redis",
|
||||
}, "")
|
||||
|
||||
// Returns default when no token is set
|
||||
if token := l.ServiceToken("redis"); token != "default" {
|
||||
|
@ -1339,7 +1494,7 @@ func TestAgent_serviceTokens(t *testing.T) {
|
|||
}
|
||||
|
||||
// Returns configured token
|
||||
l.AddService(&structs.NodeService{ID: "redis"}, "abc123")
|
||||
l.serviceTokens["redis"] = "abc123"
|
||||
if token := l.ServiceToken("redis"); token != "abc123" {
|
||||
t.Fatalf("bad: %s", token)
|
||||
}
|
||||
|
@ -1354,19 +1509,17 @@ func TestAgent_serviceTokens(t *testing.T) {
|
|||
func TestAgent_checkTokens(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cfg := agent.TestConfig()
|
||||
tokens := new(token.Store)
|
||||
tokens.UpdateUserToken("default")
|
||||
l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1))
|
||||
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1))
|
||||
|
||||
// Returns default when no token is set
|
||||
l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("mem")}, "")
|
||||
if token := l.CheckToken("mem"); token != "default" {
|
||||
t.Fatalf("bad: %s", token)
|
||||
}
|
||||
|
||||
// Returns configured token
|
||||
l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("mem")}, "abc123")
|
||||
l.checkTokens["mem"] = "abc123"
|
||||
if token := l.CheckToken("mem"); token != "abc123" {
|
||||
t.Fatalf("bad: %s", token)
|
||||
}
|
||||
|
@ -1380,7 +1533,7 @@ func TestAgent_checkTokens(t *testing.T) {
|
|||
|
||||
func TestAgent_checkCriticalTime(t *testing.T) {
|
||||
t.Parallel()
|
||||
l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1))
|
||||
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1))
|
||||
|
||||
svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000}
|
||||
l.AddService(svc, "")
|
||||
|
@ -1395,54 +1548,54 @@ func TestAgent_checkCriticalTime(t *testing.T) {
|
|||
Status: api.HealthPassing,
|
||||
}
|
||||
l.AddCheck(chk, "")
|
||||
if checks := l.CriticalCheckStates(); len(checks) > 0 {
|
||||
if checks := l.CriticalChecks(); len(checks) > 0 {
|
||||
t.Fatalf("should not have any critical checks")
|
||||
}
|
||||
|
||||
// Set it to warning and make sure that doesn't show up as critical.
|
||||
l.UpdateCheck(checkID, api.HealthWarning, "")
|
||||
if checks := l.CriticalCheckStates(); len(checks) > 0 {
|
||||
if checks := l.CriticalChecks(); len(checks) > 0 {
|
||||
t.Fatalf("should not have any critical checks")
|
||||
}
|
||||
|
||||
// Fail the check and make sure the time looks reasonable.
|
||||
l.UpdateCheck(checkID, api.HealthCritical, "")
|
||||
if c, ok := l.CriticalCheckStates()[checkID]; !ok {
|
||||
if crit, ok := l.CriticalChecks()[checkID]; !ok {
|
||||
t.Fatalf("should have a critical check")
|
||||
} else if c.CriticalFor() > time.Millisecond {
|
||||
t.Fatalf("bad: %#v", c)
|
||||
} else if crit.CriticalFor > time.Millisecond {
|
||||
t.Fatalf("bad: %#v", crit)
|
||||
}
|
||||
|
||||
// Wait a while, then fail it again and make sure the time keeps track
|
||||
// of the initial failure, and doesn't reset here.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
l.UpdateCheck(chk.CheckID, api.HealthCritical, "")
|
||||
if c, ok := l.CriticalCheckStates()[checkID]; !ok {
|
||||
if crit, ok := l.CriticalChecks()[checkID]; !ok {
|
||||
t.Fatalf("should have a critical check")
|
||||
} else if c.CriticalFor() < 25*time.Millisecond ||
|
||||
c.CriticalFor() > 75*time.Millisecond {
|
||||
t.Fatalf("bad: %#v", c)
|
||||
} else if crit.CriticalFor < 25*time.Millisecond ||
|
||||
crit.CriticalFor > 75*time.Millisecond {
|
||||
t.Fatalf("bad: %#v", crit)
|
||||
}
|
||||
|
||||
// Set it passing again.
|
||||
l.UpdateCheck(checkID, api.HealthPassing, "")
|
||||
if checks := l.CriticalCheckStates(); len(checks) > 0 {
|
||||
if checks := l.CriticalChecks(); len(checks) > 0 {
|
||||
t.Fatalf("should not have any critical checks")
|
||||
}
|
||||
|
||||
// Fail the check and make sure the time looks like it started again
|
||||
// from the latest failure, not the original one.
|
||||
l.UpdateCheck(checkID, api.HealthCritical, "")
|
||||
if c, ok := l.CriticalCheckStates()[checkID]; !ok {
|
||||
if crit, ok := l.CriticalChecks()[checkID]; !ok {
|
||||
t.Fatalf("should have a critical check")
|
||||
} else if c.CriticalFor() > time.Millisecond {
|
||||
t.Fatalf("bad: %#v", c)
|
||||
} else if crit.CriticalFor > time.Millisecond {
|
||||
t.Fatalf("bad: %#v", crit)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_AddCheckFailure(t *testing.T) {
|
||||
t.Parallel()
|
||||
l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1))
|
||||
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1))
|
||||
|
||||
// Add a check for a service that does not exist and verify that it fails
|
||||
checkID := types.CheckID("redis:1")
|
||||
|
@ -1462,7 +1615,7 @@ func TestAgent_AddCheckFailure(t *testing.T) {
|
|||
|
||||
func TestAgent_sendCoordinate(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := agent.NewTestAgent(t.Name(), `
|
||||
a := NewTestAgent(t.Name(), `
|
||||
sync_coordinate_interval_min = "1ms"
|
||||
sync_coordinate_rate_target = 10.0
|
||||
consul = {
|
||||
|
@ -1496,29 +1649,3 @@ func TestAgent_sendCoordinate(t *testing.T) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func servicesInSync(state *local.State, wantServices int) error {
|
||||
services := state.ServiceStates()
|
||||
if got, want := len(services), wantServices; got != want {
|
||||
return fmt.Errorf("got %d services want %d", got, want)
|
||||
}
|
||||
for id, s := range services {
|
||||
if !s.InSync {
|
||||
return fmt.Errorf("service %q should be in sync", id)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func checksInSync(state *local.State, wantChecks int) error {
|
||||
checks := state.CheckStates()
|
||||
if got, want := len(checks), wantChecks; got != want {
|
||||
return fmt.Errorf("got %d checks want %d", got, want)
|
||||
}
|
||||
for id, c := range checks {
|
||||
if !c.InSync {
|
||||
return fmt.Errorf("check %q should be in sync", id)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -173,7 +173,7 @@ func (a *Agent) shouldProcessUserEvent(msg *UserEvent) bool {
|
|||
}
|
||||
|
||||
// Scan for a match
|
||||
services := a.State.Services()
|
||||
services := a.state.Services()
|
||||
found := false
|
||||
OUTER:
|
||||
for name, info := range services {
|
||||
|
|
|
@ -57,7 +57,7 @@ func TestShouldProcessUserEvent(t *testing.T) {
|
|||
Tags: []string{"test", "foo", "bar", "master"},
|
||||
Port: 5000,
|
||||
}
|
||||
a.State.AddService(srv1, "")
|
||||
a.state.AddService(srv1, "")
|
||||
|
||||
p := &UserEvent{}
|
||||
if !a.shouldProcessUserEvent(p) {
|
||||
|
@ -157,7 +157,7 @@ func TestFireReceiveEvent(t *testing.T) {
|
|||
Tags: []string{"test", "foo", "bar", "master"},
|
||||
Port: 5000,
|
||||
}
|
||||
a.State.AddService(srv1, "")
|
||||
a.state.AddService(srv1, "")
|
||||
|
||||
p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"}
|
||||
err := a.UserEvent("dc1", "root", p1)
|
||||
|
|
Loading…
Reference in New Issue