Keeps the service and check tokens around for deregistration.

We fixed a few related issues while we were in here. We now only let
services register checks with a matching token, and we also close out
service and check delete operations if the catalog deregister claims
it doesn't know about the ID of the service or check being deleted.
This commit is contained in:
James Phillips 2017-03-24 17:15:20 -07:00
parent 4d3f3ea8d2
commit 8940d1c3fe
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
3 changed files with 382 additions and 56 deletions

View File

@ -1192,11 +1192,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
} }
// Remove service immediately // Remove service immediately
err := a.state.RemoveService(serviceID) if err := a.state.RemoveService(serviceID); err != nil {
// TODO: Return the error instead of just logging here in Consul 0.8
// For now, keep the current idempotent behavior on deleting a nonexistent service
if err != nil {
a.logger.Printf("[WARN] agent: Failed to deregister service %q: %s", serviceID, err) a.logger.Printf("[WARN] agent: Failed to deregister service %q: %s", serviceID, err)
return nil return nil
} }

View File

@ -177,7 +177,8 @@ func (l *localState) RemoveService(serviceID string) error {
if _, ok := l.services[serviceID]; ok { if _, ok := l.services[serviceID]; ok {
delete(l.services, serviceID) delete(l.services, serviceID)
delete(l.serviceTokens, serviceID) // Leave the service token around, if any, until we successfully
// delete the service.
l.serviceStatus[serviceID] = syncStatus{inSync: false} l.serviceStatus[serviceID] = syncStatus{inSync: false}
l.changeMade() l.changeMade()
} else { } else {
@ -241,7 +242,8 @@ func (l *localState) RemoveCheck(checkID types.CheckID) {
defer l.Unlock() defer l.Unlock()
delete(l.checks, checkID) delete(l.checks, checkID)
delete(l.checkTokens, checkID) // Leave the check token around, if any, until we successfully delete
// the check.
delete(l.checkCriticalTime, checkID) delete(l.checkCriticalTime, checkID)
l.checkStatus[checkID] = syncStatus{inSync: false} l.checkStatus[checkID] = syncStatus{inSync: false}
l.changeMade() l.changeMade()
@ -602,9 +604,15 @@ func (l *localState) deleteService(id string) error {
} }
var out struct{} var out struct{}
err := l.iface.RPC("Catalog.Deregister", &req, &out) err := l.iface.RPC("Catalog.Deregister", &req, &out)
if err == nil { if err == nil || strings.Contains(err.Error(), "Unknown service") {
delete(l.serviceStatus, id) delete(l.serviceStatus, id)
delete(l.serviceTokens, id)
l.logger.Printf("[INFO] agent: Deregistered service '%s'", id) l.logger.Printf("[INFO] agent: Deregistered service '%s'", id)
return nil
} else if strings.Contains(err.Error(), permissionDenied) {
l.serviceStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[WARN] agent: Service '%s' deregistration blocked by ACLs", id)
return nil
} }
return err return err
} }
@ -623,9 +631,15 @@ func (l *localState) deleteCheck(id types.CheckID) error {
} }
var out struct{} var out struct{}
err := l.iface.RPC("Catalog.Deregister", &req, &out) err := l.iface.RPC("Catalog.Deregister", &req, &out)
if err == nil { if err == nil || strings.Contains(err.Error(), "Unknown check") {
delete(l.checkStatus, id) delete(l.checkStatus, id)
delete(l.checkTokens, id)
l.logger.Printf("[INFO] agent: Deregistered check '%s'", id) l.logger.Printf("[INFO] agent: Deregistered check '%s'", id)
return nil
} else if strings.Contains(err.Error(), permissionDenied) {
l.checkStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[WARN] agent: Check '%s' deregistration blocked by ACLs", id)
return nil
} }
return err return err
} }
@ -645,10 +659,13 @@ func (l *localState) syncService(id string) error {
// If the service has associated checks that are out of sync, // If the service has associated checks that are out of sync,
// piggyback them on the service sync so they are part of the // piggyback them on the service sync so they are part of the
// same transaction and are registered atomically. // same transaction and are registered atomically. We only let
// checks ride on service registrations with the same token,
// otherwise we need to register them separately so they don't
// pick up privileges from the service token.
var checks structs.HealthChecks var checks structs.HealthChecks
for _, check := range l.checks { for _, check := range l.checks {
if check.ServiceID == id { if check.ServiceID == id && (l.serviceToken(id) == l.checkToken(check.CheckID)) {
if stat, ok := l.checkStatus[check.CheckID]; !ok || !stat.inSync { if stat, ok := l.checkStatus[check.CheckID]; !ok || !stat.inSync {
checks = append(checks, check) checks = append(checks, check)
} }
@ -711,7 +728,7 @@ func (l *localState) syncCheck(id types.CheckID) error {
if err == nil { if err == nil {
l.checkStatus[id] = syncStatus{inSync: true} l.checkStatus[id] = syncStatus{inSync: true}
// Given how the register API works, this info is also updated // Given how the register API works, this info is also updated
// every time we sync a service. // every time we sync a check.
l.nodeInfoInSync = true l.nodeInfoInSync = true
l.logger.Printf("[INFO] agent: Synced check '%s'", id) l.logger.Printf("[INFO] agent: Synced check '%s'", id)
} else if strings.Contains(err.Error(), permissionDenied) { } else if strings.Contains(err.Error(), permissionDenied) {

View File

@ -476,9 +476,17 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
} }
var testRegisterRules = ` var testRegisterRules = `
node "" {
policy = "write"
}
service "api" { service "api" {
policy = "write" policy = "write"
} }
service "consul" {
policy = "write"
}
` `
func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
@ -486,6 +494,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
conf.ACLDatacenter = "dc1" conf.ACLDatacenter = "dc1"
conf.ACLMasterToken = "root" conf.ACLMasterToken = "root"
conf.ACLDefaultPolicy = "deny" conf.ACLDefaultPolicy = "deny"
conf.ACLEnforceVersion8 = Bool(true)
dir, agent := makeAgent(t, conf) dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer agent.Shutdown() defer agent.Shutdown()
@ -501,43 +510,50 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
Type: structs.ACLTypeClient, Type: structs.ACLTypeClient,
Rules: testRegisterRules, Rules: testRegisterRules,
}, },
WriteRequest: structs.WriteRequest{Token: "root"}, WriteRequest: structs.WriteRequest{
Token: "root",
},
} }
var out string var token string
if err := agent.RPC("ACL.Apply", &arg, &out); err != nil { if err := agent.RPC("ACL.Apply", &arg, &token); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Update the agent ACL token, resume sync // Update the agent ACL token, resume sync
conf.ACLToken = out conf.ACLAgentToken = token
agent.StartSync()
time.Sleep(200 * time.Millisecond)
// Create service (Allowed) // Create service (disallowed)
srv1 := &structs.NodeService{ srv1 := &structs.NodeService{
ID: "mysql", ID: "mysql",
Service: "mysql", Service: "mysql",
Tags: []string{"master"}, Tags: []string{"master"},
Port: 5000, Port: 5000,
} }
agent.state.AddService(srv1, "") agent.state.AddService(srv1, token)
// Create service (Disallowed) // Create service (allowed)
srv2 := &structs.NodeService{ srv2 := &structs.NodeService{
ID: "api", ID: "api",
Service: "api", Service: "api",
Tags: []string{"foo"}, Tags: []string{"foo"},
Port: 5001, Port: 5001,
} }
agent.state.AddService(srv2, "") agent.state.AddService(srv2, token)
// Trigger anti-entropy run and wait // Trigger anti-entropy run and wait
agent.StartSync() agent.StartSync()
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
// Verify that we are in sync // Verify that we are in sync
{
req := structs.NodeSpecificRequest{ req := structs.NodeSpecificRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: agent.config.NodeName, Node: agent.config.NodeName,
QueryOptions: structs.QueryOptions{Token: out}, QueryOptions: structs.QueryOptions{
Token: "root",
},
} }
var services structs.IndexedNodeServices var services structs.IndexedNodeServices
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
@ -578,6 +594,60 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
t.Fatalf("should be in sync: %v %v", name, status) t.Fatalf("should be in sync: %v %v", name, status)
} }
} }
}
// Now remove the service and re-sync
agent.state.RemoveService("api")
agent.StartSync()
time.Sleep(200 * time.Millisecond)
// Verify that we are in sync
{
req := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: agent.config.NodeName,
QueryOptions: structs.QueryOptions{
Token: "root",
},
}
var services structs.IndexedNodeServices
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
t.Fatalf("err: %v", err)
}
// We should have 1 service (just consul)
if len(services.NodeServices.Services) != 1 {
t.Fatalf("bad: %v", services.NodeServices.Services)
}
// All the services should match
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
switch id {
case "mysql":
t.Fatalf("should not be permitted")
case "api":
t.Fatalf("should be deleted")
case "consul":
// ignore
default:
t.Fatalf("unexpected service: %v", id)
}
}
// Check the local state
if len(agent.state.services) != 2 {
t.Fatalf("bad: %v", agent.state.services)
}
if len(agent.state.serviceStatus) != 2 {
t.Fatalf("bad: %v", agent.state.serviceStatus)
}
for name, status := range agent.state.serviceStatus {
if !status.inSync {
t.Fatalf("should be in sync: %v %v", name, status)
}
}
}
} }
func TestAgentAntiEntropy_Checks(t *testing.T) { func TestAgentAntiEntropy_Checks(t *testing.T) {
@ -800,6 +870,249 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
} }
} }
func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
conf := nextConfig()
conf.ACLDatacenter = "dc1"
conf.ACLMasterToken = "root"
conf.ACLDefaultPolicy = "deny"
conf.ACLEnforceVersion8 = Bool(true)
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()
testutil.WaitForLeader(t, agent.RPC, "dc1")
// Create the ACL
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: testRegisterRules,
},
WriteRequest: structs.WriteRequest{
Token: "root",
},
}
var token string
if err := agent.RPC("ACL.Apply", &arg, &token); err != nil {
t.Fatalf("err: %v", err)
}
// Update the agent ACL token, resume sync
conf.ACLAgentToken = token
agent.StartSync()
time.Sleep(200 * time.Millisecond)
// Create services using the root token
srv1 := &structs.NodeService{
ID: "mysql",
Service: "mysql",
Tags: []string{"master"},
Port: 5000,
}
agent.state.AddService(srv1, "root")
srv2 := &structs.NodeService{
ID: "api",
Service: "api",
Tags: []string{"foo"},
Port: 5001,
}
agent.state.AddService(srv2, "root")
// Trigger anti-entropy run and wait
agent.StartSync()
time.Sleep(200 * time.Millisecond)
// Verify that we are in sync
{
req := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: agent.config.NodeName,
QueryOptions: structs.QueryOptions{
Token: "root",
},
}
var services structs.IndexedNodeServices
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
t.Fatalf("err: %v", err)
}
// We should have 3 services (consul included)
if len(services.NodeServices.Services) != 3 {
t.Fatalf("bad: %v", services.NodeServices.Services)
}
// All the services should match
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
switch id {
case "mysql":
if !reflect.DeepEqual(serv, srv1) {
t.Fatalf("bad: %#v %#v", serv, srv1)
}
case "api":
if !reflect.DeepEqual(serv, srv2) {
t.Fatalf("bad: %#v %#v", serv, srv2)
}
case "consul":
// ignore
default:
t.Fatalf("unexpected service: %v", id)
}
}
// Check the local state
if len(agent.state.services) != 3 {
t.Fatalf("bad: %v", agent.state.services)
}
if len(agent.state.serviceStatus) != 3 {
t.Fatalf("bad: %v", agent.state.serviceStatus)
}
for name, status := range agent.state.serviceStatus {
if !status.inSync {
t.Fatalf("should be in sync: %v %v", name, status)
}
}
}
// This check won't be allowed.
chk1 := &structs.HealthCheck{
Node: agent.config.NodeName,
ServiceID: "mysql",
ServiceName: "mysql",
CheckID: "mysql-check",
Name: "mysql",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk1, token)
// This one will be allowed.
chk2 := &structs.HealthCheck{
Node: agent.config.NodeName,
ServiceID: "api",
ServiceName: "api",
CheckID: "api-check",
Name: "api",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk2, token)
// Trigger anti-entropy run and wait.
agent.StartSync()
time.Sleep(200 * time.Millisecond)
// Verify that we are in sync
if err := testutil.WaitForResult(func() (bool, error) {
req := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: agent.config.NodeName,
QueryOptions: structs.QueryOptions{
Token: "root",
},
}
var checks structs.IndexedHealthChecks
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
return false, fmt.Errorf("err: %v", err)
}
// We should have 2 checks (serf included)
if len(checks.HealthChecks) != 2 {
return false, fmt.Errorf("bad: %v", checks)
}
// All the checks should match
for _, chk := range checks.HealthChecks {
chk.CreateIndex, chk.ModifyIndex = 0, 0
switch chk.CheckID {
case "mysql-check":
t.Fatalf("should not be permitted")
case "api-check":
if !reflect.DeepEqual(chk, chk2) {
return false, fmt.Errorf("bad: %v %v", chk, chk2)
}
case "serfHealth":
// ignore
default:
return false, fmt.Errorf("unexpected check: %v", chk)
}
}
return true, nil
}); err != nil {
t.Fatal(err)
}
// Check the local state.
if len(agent.state.checks) != 2 {
t.Fatalf("bad: %v", agent.state.checks)
}
if len(agent.state.checkStatus) != 2 {
t.Fatalf("bad: %v", agent.state.checkStatus)
}
for name, status := range agent.state.checkStatus {
if !status.inSync {
t.Fatalf("should be in sync: %v %v", name, status)
}
}
// Now delete the check and wait for sync.
agent.state.RemoveCheck("api-check")
agent.StartSync()
time.Sleep(200 * time.Millisecond)
// Verify that we are in sync
if err := testutil.WaitForResult(func() (bool, error) {
req := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: agent.config.NodeName,
QueryOptions: structs.QueryOptions{
Token: "root",
},
}
var checks structs.IndexedHealthChecks
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
return false, fmt.Errorf("err: %v", err)
}
// We should have 1 check (just serf)
if len(checks.HealthChecks) != 1 {
return false, fmt.Errorf("bad: %v", checks)
}
// All the checks should match
for _, chk := range checks.HealthChecks {
chk.CreateIndex, chk.ModifyIndex = 0, 0
switch chk.CheckID {
case "mysql-check":
t.Fatalf("should not be permitted")
case "api-check":
t.Fatalf("should be deleted")
case "serfHealth":
// ignore
default:
return false, fmt.Errorf("unexpected check: %v", chk)
}
}
return true, nil
}); err != nil {
t.Fatal(err)
}
// Check the local state.
if len(agent.state.checks) != 1 {
t.Fatalf("bad: %v", agent.state.checks)
}
if len(agent.state.checkStatus) != 1 {
t.Fatalf("bad: %v", agent.state.checkStatus)
}
for name, status := range agent.state.checkStatus {
if !status.inSync {
t.Fatalf("should be in sync: %v %v", name, status)
}
}
}
func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
conf := nextConfig() conf := nextConfig()
conf.CheckUpdateInterval = 500 * time.Millisecond conf.CheckUpdateInterval = 500 * time.Millisecond