From 5887242db2ebaae6c6cb9dfe68bdf078f9c1eefd Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 1 Dec 2014 11:43:01 -0800 Subject: [PATCH] agent: Handle service ACLs when doing anti-entropy --- command/agent/local.go | 56 +++++++++++++------- command/agent/local_test.go | 103 ++++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+), 19 deletions(-) diff --git a/command/agent/local.go b/command/agent/local.go index 86b7e45ff2..aff1353487 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -1,18 +1,23 @@ package agent import ( - "github.com/hashicorp/consul/consul" - "github.com/hashicorp/consul/consul/structs" "log" "reflect" + "strings" "sync" "sync/atomic" "time" + + "github.com/hashicorp/consul/consul" + "github.com/hashicorp/consul/consul/structs" ) const ( syncStaggerIntv = 3 * time.Second syncRetryIntv = 15 * time.Second + + // permissionDenied is returned when an ACL based rejection happens + permissionDenied = "Permission denied" ) // syncStatus is used to represent the difference between @@ -292,8 +297,9 @@ SYNC: // the local syncStatus as appropriate func (l *localState) setSyncState() error { req := structs.NodeSpecificRequest{ - Datacenter: l.config.Datacenter, - Node: l.config.NodeName, + Datacenter: l.config.Datacenter, + Node: l.config.NodeName, + QueryOptions: structs.QueryOptions{Token: l.config.ACLToken}, } var out1 structs.IndexedNodeServices var out2 structs.IndexedHealthChecks @@ -403,9 +409,10 @@ func (l *localState) syncChanges() error { // deleteService is used to delete a service from the server func (l *localState) deleteService(id string) error { req := structs.DeregisterRequest{ - Datacenter: l.config.Datacenter, - Node: l.config.NodeName, - ServiceID: id, + Datacenter: l.config.Datacenter, + Node: l.config.NodeName, + ServiceID: id, + WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, } var out struct{} err := l.iface.RPC("Catalog.Deregister", &req, &out) @@ -419,9 +426,10 @@ func (l *localState) deleteService(id string) error { // deleteCheck is used to delete a service from the server func (l *localState) deleteCheck(id string) error { req := structs.DeregisterRequest{ - Datacenter: l.config.Datacenter, - Node: l.config.NodeName, - CheckID: id, + Datacenter: l.config.Datacenter, + Node: l.config.NodeName, + CheckID: id, + WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, } var out struct{} err := l.iface.RPC("Catalog.Deregister", &req, &out) @@ -435,16 +443,21 @@ func (l *localState) deleteCheck(id string) error { // syncService is used to sync a service to the server func (l *localState) syncService(id string) error { req := structs.RegisterRequest{ - Datacenter: l.config.Datacenter, - Node: l.config.NodeName, - Address: l.config.AdvertiseAddr, - Service: l.services[id], + Datacenter: l.config.Datacenter, + Node: l.config.NodeName, + Address: l.config.AdvertiseAddr, + Service: l.services[id], + WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, } var out struct{} err := l.iface.RPC("Catalog.Register", &req, &out) if err == nil { l.serviceStatus[id] = syncStatus{inSync: true} l.logger.Printf("[INFO] agent: Synced service '%s'", id) + } else if strings.Contains(err.Error(), permissionDenied) { + l.serviceStatus[id] = syncStatus{inSync: true} + l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id) + return nil } return err } @@ -460,17 +473,22 @@ func (l *localState) syncCheck(id string) error { } } req := structs.RegisterRequest{ - Datacenter: l.config.Datacenter, - Node: l.config.NodeName, - Address: l.config.AdvertiseAddr, - Service: service, - Check: l.checks[id], + Datacenter: l.config.Datacenter, + Node: l.config.NodeName, + Address: l.config.AdvertiseAddr, + Service: service, + Check: l.checks[id], + WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, } var out struct{} err := l.iface.RPC("Catalog.Register", &req, &out) if err == nil { l.checkStatus[id] = syncStatus{inSync: true} l.logger.Printf("[INFO] agent: Synced check '%s'", id) + } else if strings.Contains(err.Error(), permissionDenied) { + l.checkStatus[id] = syncStatus{inSync: true} + l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id) + return nil } return err } diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 7145d28a28..ded25ad3d5 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -132,6 +132,103 @@ func TestAgentAntiEntropy_Services(t *testing.T) { } } +func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { + conf := nextConfig() + conf.ACLDatacenter = "dc1" + conf.ACLMasterToken = "root" + conf.ACLDefaultPolicy = "deny" + dir, agent := makeAgent(t, conf) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + testutil.WaitForLeader(t, agent.RPC, "dc1") + + // Create the ACL + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: testRegisterRules, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var out string + if err := agent.RPC("ACL.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Update the agent ACL token, resume sync + conf.ACLToken = out + + // Create service (Allowed) + srv1 := &structs.NodeService{ + ID: "mysql", + Service: "mysql", + Tags: []string{"master"}, + Port: 5000, + } + agent.state.AddService(srv1) + + // Create service (Disallowed) + srv2 := &structs.NodeService{ + ID: "api", + Service: "api", + Tags: []string{"foo"}, + Port: 5001, + } + agent.state.AddService(srv2) + + // Trigger anti-entropy run and wait + agent.StartSync() + time.Sleep(200 * time.Millisecond) + + // Verify that we are in sync + req := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: agent.config.NodeName, + } + var services structs.IndexedNodeServices + if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { + t.Fatalf("err: %v", err) + } + + // We should have 2 services (consul included) + if len(services.NodeServices.Services) != 2 { + t.Fatalf("bad: %v", services.NodeServices.Services) + } + + // All the services should match + for id, serv := range services.NodeServices.Services { + switch id { + case "mysql": + t.Fatalf("should not be permitted") + case "api": + if !reflect.DeepEqual(serv, srv2) { + t.Fatalf("bad: %#v %#v", serv, srv2) + } + case "consul": + // ignore + default: + t.Fatalf("unexpected service: %v", id) + } + } + + // Check the local state + if len(agent.state.services) != 3 { + t.Fatalf("bad: %v", agent.state.services) + } + if len(agent.state.serviceStatus) != 3 { + t.Fatalf("bad: %v", agent.state.serviceStatus) + } + for name, status := range agent.state.serviceStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } +} + func TestAgentAntiEntropy_Checks(t *testing.T) { conf := nextConfig() dir, agent := makeAgent(t, conf) @@ -327,3 +424,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { } } } + +var testRegisterRules = ` +service "api" { + policy = "write" +} +`