mirror of https://github.com/status-im/consul.git
agent: Handle service ACLs when doing anti-entropy
This commit is contained in:
parent
53de386a08
commit
5887242db2
|
@ -1,18 +1,23 @@
|
||||||
package agent
|
package agent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/hashicorp/consul/consul"
|
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
|
||||||
"log"
|
"log"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul"
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
syncStaggerIntv = 3 * time.Second
|
syncStaggerIntv = 3 * time.Second
|
||||||
syncRetryIntv = 15 * time.Second
|
syncRetryIntv = 15 * time.Second
|
||||||
|
|
||||||
|
// permissionDenied is returned when an ACL based rejection happens
|
||||||
|
permissionDenied = "Permission denied"
|
||||||
)
|
)
|
||||||
|
|
||||||
// syncStatus is used to represent the difference between
|
// syncStatus is used to represent the difference between
|
||||||
|
@ -294,6 +299,7 @@ func (l *localState) setSyncState() error {
|
||||||
req := structs.NodeSpecificRequest{
|
req := structs.NodeSpecificRequest{
|
||||||
Datacenter: l.config.Datacenter,
|
Datacenter: l.config.Datacenter,
|
||||||
Node: l.config.NodeName,
|
Node: l.config.NodeName,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: l.config.ACLToken},
|
||||||
}
|
}
|
||||||
var out1 structs.IndexedNodeServices
|
var out1 structs.IndexedNodeServices
|
||||||
var out2 structs.IndexedHealthChecks
|
var out2 structs.IndexedHealthChecks
|
||||||
|
@ -406,6 +412,7 @@ func (l *localState) deleteService(id string) error {
|
||||||
Datacenter: l.config.Datacenter,
|
Datacenter: l.config.Datacenter,
|
||||||
Node: l.config.NodeName,
|
Node: l.config.NodeName,
|
||||||
ServiceID: id,
|
ServiceID: id,
|
||||||
|
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
|
||||||
}
|
}
|
||||||
var out struct{}
|
var out struct{}
|
||||||
err := l.iface.RPC("Catalog.Deregister", &req, &out)
|
err := l.iface.RPC("Catalog.Deregister", &req, &out)
|
||||||
|
@ -422,6 +429,7 @@ func (l *localState) deleteCheck(id string) error {
|
||||||
Datacenter: l.config.Datacenter,
|
Datacenter: l.config.Datacenter,
|
||||||
Node: l.config.NodeName,
|
Node: l.config.NodeName,
|
||||||
CheckID: id,
|
CheckID: id,
|
||||||
|
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
|
||||||
}
|
}
|
||||||
var out struct{}
|
var out struct{}
|
||||||
err := l.iface.RPC("Catalog.Deregister", &req, &out)
|
err := l.iface.RPC("Catalog.Deregister", &req, &out)
|
||||||
|
@ -439,12 +447,17 @@ func (l *localState) syncService(id string) error {
|
||||||
Node: l.config.NodeName,
|
Node: l.config.NodeName,
|
||||||
Address: l.config.AdvertiseAddr,
|
Address: l.config.AdvertiseAddr,
|
||||||
Service: l.services[id],
|
Service: l.services[id],
|
||||||
|
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
|
||||||
}
|
}
|
||||||
var out struct{}
|
var out struct{}
|
||||||
err := l.iface.RPC("Catalog.Register", &req, &out)
|
err := l.iface.RPC("Catalog.Register", &req, &out)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
l.serviceStatus[id] = syncStatus{inSync: true}
|
l.serviceStatus[id] = syncStatus{inSync: true}
|
||||||
l.logger.Printf("[INFO] agent: Synced service '%s'", id)
|
l.logger.Printf("[INFO] agent: Synced service '%s'", id)
|
||||||
|
} else if strings.Contains(err.Error(), permissionDenied) {
|
||||||
|
l.serviceStatus[id] = syncStatus{inSync: true}
|
||||||
|
l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -465,12 +478,17 @@ func (l *localState) syncCheck(id string) error {
|
||||||
Address: l.config.AdvertiseAddr,
|
Address: l.config.AdvertiseAddr,
|
||||||
Service: service,
|
Service: service,
|
||||||
Check: l.checks[id],
|
Check: l.checks[id],
|
||||||
|
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
|
||||||
}
|
}
|
||||||
var out struct{}
|
var out struct{}
|
||||||
err := l.iface.RPC("Catalog.Register", &req, &out)
|
err := l.iface.RPC("Catalog.Register", &req, &out)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
l.checkStatus[id] = syncStatus{inSync: true}
|
l.checkStatus[id] = syncStatus{inSync: true}
|
||||||
l.logger.Printf("[INFO] agent: Synced check '%s'", id)
|
l.logger.Printf("[INFO] agent: Synced check '%s'", id)
|
||||||
|
} else if strings.Contains(err.Error(), permissionDenied) {
|
||||||
|
l.checkStatus[id] = syncStatus{inSync: true}
|
||||||
|
l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,6 +132,103 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
|
||||||
|
conf := nextConfig()
|
||||||
|
conf.ACLDatacenter = "dc1"
|
||||||
|
conf.ACLMasterToken = "root"
|
||||||
|
conf.ACLDefaultPolicy = "deny"
|
||||||
|
dir, agent := makeAgent(t, conf)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
defer agent.Shutdown()
|
||||||
|
|
||||||
|
testutil.WaitForLeader(t, agent.RPC, "dc1")
|
||||||
|
|
||||||
|
// Create the ACL
|
||||||
|
arg := structs.ACLRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.ACLSet,
|
||||||
|
ACL: structs.ACL{
|
||||||
|
Name: "User token",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: testRegisterRules,
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
|
}
|
||||||
|
var out string
|
||||||
|
if err := agent.RPC("ACL.Apply", &arg, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the agent ACL token, resume sync
|
||||||
|
conf.ACLToken = out
|
||||||
|
|
||||||
|
// Create service (Allowed)
|
||||||
|
srv1 := &structs.NodeService{
|
||||||
|
ID: "mysql",
|
||||||
|
Service: "mysql",
|
||||||
|
Tags: []string{"master"},
|
||||||
|
Port: 5000,
|
||||||
|
}
|
||||||
|
agent.state.AddService(srv1)
|
||||||
|
|
||||||
|
// Create service (Disallowed)
|
||||||
|
srv2 := &structs.NodeService{
|
||||||
|
ID: "api",
|
||||||
|
Service: "api",
|
||||||
|
Tags: []string{"foo"},
|
||||||
|
Port: 5001,
|
||||||
|
}
|
||||||
|
agent.state.AddService(srv2)
|
||||||
|
|
||||||
|
// Trigger anti-entropy run and wait
|
||||||
|
agent.StartSync()
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
|
// Verify that we are in sync
|
||||||
|
req := structs.NodeSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: agent.config.NodeName,
|
||||||
|
}
|
||||||
|
var services structs.IndexedNodeServices
|
||||||
|
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We should have 2 services (consul included)
|
||||||
|
if len(services.NodeServices.Services) != 2 {
|
||||||
|
t.Fatalf("bad: %v", services.NodeServices.Services)
|
||||||
|
}
|
||||||
|
|
||||||
|
// All the services should match
|
||||||
|
for id, serv := range services.NodeServices.Services {
|
||||||
|
switch id {
|
||||||
|
case "mysql":
|
||||||
|
t.Fatalf("should not be permitted")
|
||||||
|
case "api":
|
||||||
|
if !reflect.DeepEqual(serv, srv2) {
|
||||||
|
t.Fatalf("bad: %#v %#v", serv, srv2)
|
||||||
|
}
|
||||||
|
case "consul":
|
||||||
|
// ignore
|
||||||
|
default:
|
||||||
|
t.Fatalf("unexpected service: %v", id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the local state
|
||||||
|
if len(agent.state.services) != 3 {
|
||||||
|
t.Fatalf("bad: %v", agent.state.services)
|
||||||
|
}
|
||||||
|
if len(agent.state.serviceStatus) != 3 {
|
||||||
|
t.Fatalf("bad: %v", agent.state.serviceStatus)
|
||||||
|
}
|
||||||
|
for name, status := range agent.state.serviceStatus {
|
||||||
|
if !status.inSync {
|
||||||
|
t.Fatalf("should be in sync: %v %v", name, status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestAgentAntiEntropy_Checks(t *testing.T) {
|
func TestAgentAntiEntropy_Checks(t *testing.T) {
|
||||||
conf := nextConfig()
|
conf := nextConfig()
|
||||||
dir, agent := makeAgent(t, conf)
|
dir, agent := makeAgent(t, conf)
|
||||||
|
@ -327,3 +424,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var testRegisterRules = `
|
||||||
|
service "api" {
|
||||||
|
policy = "write"
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
Loading…
Reference in New Issue