mirror of
https://github.com/status-im/consul.git
synced 2025-01-22 11:40:06 +00:00
Add RPC endpoints for config entry operations
This commit is contained in:
parent
5f569fb2ac
commit
f2ed482680
266
agent/consul/config_endpoint.go
Normal file
266
agent/consul/config_endpoint.go
Normal file
@ -0,0 +1,266 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
// The ConfigEntry endpoint is used to query centralized config information
|
||||
type ConfigEntry struct {
|
||||
srv *Server
|
||||
}
|
||||
|
||||
// Apply does an upsert of the given config entry.
|
||||
func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *struct{}) error {
|
||||
if done, err := c.srv.forward("ConfigEntry.Apply", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"config_entry", "apply"}, time.Now())
|
||||
|
||||
// Normalize and validate the incoming config entry.
|
||||
if err := args.Entry.Normalize(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := args.Entry.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Fetch the ACL token, if any.
|
||||
rule, err := c.srv.ResolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := verifyConfigWriteACL(rule, args.Entry.GetKind(), args.Entry.GetName()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
args.Op = structs.ConfigEntryUpsert
|
||||
resp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get returns a single config entry by Kind/Name.
|
||||
func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.IndexedConfigEntries) error {
|
||||
if done, err := c.srv.forward("ConfigEntry.Get", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"config_entry", "get"}, time.Now())
|
||||
|
||||
// Fetch the ACL token, if any.
|
||||
rule, err := c.srv.ResolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := verifyConfigReadACL(rule, args.Kind, args.Name); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
index, entry, err := state.ConfigEntry(ws, args.Kind, args.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reply.Index = index
|
||||
reply.Entries = []structs.ConfigEntry{entry}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// List returns all the config entries of the given kind. If Kind is blank,
|
||||
// all existing config entries will be returned.
|
||||
func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.IndexedConfigEntries) error {
|
||||
if done, err := c.srv.forward("ConfigEntry.List", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"config_entry", "list"}, time.Now())
|
||||
|
||||
// Fetch the ACL token, if any.
|
||||
rule, err := c.srv.ResolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
index, entries, err := state.ConfigEntriesByKind(ws, args.Kind)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Filter the entries returned by ACL permissions.
|
||||
// TODO(kyhavlov): should we handle the proxy config differently here since
|
||||
// it's a singleton?
|
||||
filteredEntries := make([]structs.ConfigEntry, 0, len(entries))
|
||||
for _, entry := range entries {
|
||||
if err := verifyConfigReadACL(rule, entry.GetKind(), entry.GetName()); err != nil {
|
||||
if acl.IsErrPermissionDenied(err) {
|
||||
continue
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
filteredEntries = append(filteredEntries, entry)
|
||||
}
|
||||
|
||||
reply.Index = index
|
||||
reply.Entries = filteredEntries
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Delete deletes a config entry.
|
||||
func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{}) error {
|
||||
if done, err := c.srv.forward("ConfigEntry.Delete", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"config_entry", "delete"}, time.Now())
|
||||
|
||||
// Normalize the incoming entry.
|
||||
if err := args.Entry.Normalize(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Fetch the ACL token, if any.
|
||||
rule, err := c.srv.ResolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := verifyConfigWriteACL(rule, args.Entry.GetKind(), args.Entry.GetName()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
args.Op = structs.ConfigEntryDelete
|
||||
resp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResolveServiceConfig
|
||||
func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, reply *structs.ServiceConfigResponse) error {
|
||||
if done, err := c.srv.forward("ConfigEntry.ResolveServiceConfig", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"config_entry", "resolve_service_config"}, time.Now())
|
||||
|
||||
// Fetch the ACL token, if any.
|
||||
rule, err := c.srv.ResolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rule != nil && !rule.ServiceRead(args.Name) {
|
||||
return acl.ErrPermissionDenied
|
||||
}
|
||||
|
||||
return c.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
// Pass the WatchSet to both the service and proxy config lookups. If either is updated
|
||||
// during the blocking query, this function will be rerun and these state store lookups
|
||||
// will both be current.
|
||||
index, serviceEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, args.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
serviceConf, ok := serviceEntry.(*structs.ServiceConfigEntry)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid service config type %T", serviceEntry)
|
||||
}
|
||||
|
||||
_, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
proxyConf, ok := proxyEntry.(*structs.ProxyConfigEntry)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid proxy config type %T", serviceEntry)
|
||||
}
|
||||
|
||||
// Resolve the service definition by overlaying the service config onto the global
|
||||
// proxy config.
|
||||
definition := structs.ServiceDefinition{
|
||||
Name: args.Name,
|
||||
}
|
||||
if proxyConf != nil {
|
||||
definition.Proxy = &structs.ConnectProxyConfig{
|
||||
Config: proxyConf.Config,
|
||||
}
|
||||
}
|
||||
if serviceConf != nil {
|
||||
definition.Name = serviceConf.Name
|
||||
}
|
||||
|
||||
reply.Index = index
|
||||
reply.Definition = definition
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// verifyConfigReadACL checks whether the given ACL authorizer has permission
|
||||
// to read the config entry of the given kind/name.
|
||||
func verifyConfigReadACL(rule acl.Authorizer, kind, name string) error {
|
||||
if rule == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch kind {
|
||||
case structs.ServiceDefaults:
|
||||
if !rule.ServiceRead(name) {
|
||||
return acl.ErrPermissionDenied
|
||||
}
|
||||
case structs.ProxyDefaults:
|
||||
if !rule.OperatorRead() {
|
||||
return acl.ErrPermissionDenied
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("unknown config entry type %q", kind)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// verifyConfigWriteACL checks whether the given ACL authorizer has permission
|
||||
// to update the config entry of the given kind/name.
|
||||
func verifyConfigWriteACL(rule acl.Authorizer, kind, name string) error {
|
||||
if rule == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch kind {
|
||||
case structs.ServiceDefaults:
|
||||
if !rule.ServiceWrite(name, nil) {
|
||||
return acl.ErrPermissionDenied
|
||||
}
|
||||
case structs.ProxyDefaults:
|
||||
if !rule.OperatorWrite() {
|
||||
return acl.ErrPermissionDenied
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("unknown config entry type %q", kind)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
619
agent/consul/config_endpoint_test.go
Normal file
619
agent/consul/config_endpoint_test.go
Normal file
@ -0,0 +1,619 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestConfigEntry_Apply(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
args := structs.ConfigEntryRequest{
|
||||
Datacenter: "dc1",
|
||||
Entry: &structs.ServiceConfigEntry{
|
||||
Name: "foo",
|
||||
},
|
||||
}
|
||||
var out struct{}
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
|
||||
|
||||
state := s1.fsm.State()
|
||||
_, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
||||
require.NoError(err)
|
||||
|
||||
serviceConf, ok := entry.(*structs.ServiceConfigEntry)
|
||||
require.True(ok)
|
||||
require.Equal("foo", serviceConf.Name)
|
||||
require.Equal(structs.ServiceDefaults, serviceConf.Kind)
|
||||
}
|
||||
|
||||
func TestConfigEntry_Apply_ACLDeny(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
// Create the ACL.
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTokenTypeClient,
|
||||
Rules: `
|
||||
service "foo" {
|
||||
policy = "write"
|
||||
}
|
||||
operator = "write"
|
||||
`,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var id string
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// This should fail since we don't have write perms for the "db" service.
|
||||
args := structs.ConfigEntryRequest{
|
||||
Datacenter: "dc1",
|
||||
Entry: &structs.ServiceConfigEntry{
|
||||
Name: "db",
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: id},
|
||||
}
|
||||
var out struct{}
|
||||
err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)
|
||||
if !acl.IsErrPermissionDenied(err) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// The "foo" service should work.
|
||||
args.Entry = &structs.ServiceConfigEntry{
|
||||
Name: "foo",
|
||||
}
|
||||
err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)
|
||||
require.NoError(err)
|
||||
|
||||
state := s1.fsm.State()
|
||||
_, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
||||
require.NoError(err)
|
||||
|
||||
serviceConf, ok := entry.(*structs.ServiceConfigEntry)
|
||||
require.True(ok)
|
||||
require.Equal("foo", serviceConf.Name)
|
||||
require.Equal(structs.ServiceDefaults, serviceConf.Kind)
|
||||
|
||||
// Try to update the global proxy args with the anonymous token - this should fail.
|
||||
proxyArgs := structs.ConfigEntryRequest{
|
||||
Datacenter: "dc1",
|
||||
Entry: &structs.ProxyConfigEntry{
|
||||
Config: map[string]interface{}{
|
||||
"foo": 1,
|
||||
},
|
||||
},
|
||||
}
|
||||
err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &proxyArgs, &out)
|
||||
if !acl.IsErrPermissionDenied(err) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Now with the privileged token.
|
||||
proxyArgs.WriteRequest.Token = id
|
||||
err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &proxyArgs, &out)
|
||||
require.NoError(err)
|
||||
}
|
||||
|
||||
func TestConfigEntry_Get(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
// Create a dummy service in the state store to look up.
|
||||
entry := &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
}
|
||||
state := s1.fsm.State()
|
||||
require.NoError(state.EnsureConfigEntry(1, entry))
|
||||
|
||||
args := structs.ConfigEntryQuery{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
Datacenter: s1.config.Datacenter,
|
||||
}
|
||||
var out structs.IndexedConfigEntries
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out))
|
||||
|
||||
serviceConf, ok := out.Entries[0].(*structs.ServiceConfigEntry)
|
||||
require.True(ok)
|
||||
require.Equal("foo", serviceConf.Name)
|
||||
require.Equal(structs.ServiceDefaults, serviceConf.Kind)
|
||||
}
|
||||
|
||||
func TestConfigEntry_Get_ACLDeny(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
// Create the ACL.
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTokenTypeClient,
|
||||
Rules: `
|
||||
service "foo" {
|
||||
policy = "read"
|
||||
}
|
||||
operator = "read"
|
||||
`,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var id string
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Create a dummy service in the state store to look up.
|
||||
// Create some dummy service/proxy configs to be looked up.
|
||||
state := s1.fsm.State()
|
||||
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
}))
|
||||
require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
}))
|
||||
|
||||
// This should fail since we don't have write perms for the "db" service.
|
||||
args := structs.ConfigEntryQuery{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "db",
|
||||
Datacenter: s1.config.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: id},
|
||||
}
|
||||
var out structs.IndexedConfigEntries
|
||||
err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out)
|
||||
if !acl.IsErrPermissionDenied(err) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// The "foo" service should work.
|
||||
args.Name = "foo"
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out))
|
||||
|
||||
serviceConf, ok := out.Entries[0].(*structs.ServiceConfigEntry)
|
||||
require.True(ok)
|
||||
require.Equal("foo", serviceConf.Name)
|
||||
require.Equal(structs.ServiceDefaults, serviceConf.Kind)
|
||||
|
||||
// Try to look up the proxy config with no token.
|
||||
args.Kind = structs.ProxyDefaults
|
||||
args.Name = structs.ProxyConfigGlobal
|
||||
args.QueryOptions.Token = ""
|
||||
err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out)
|
||||
if !acl.IsErrPermissionDenied(err) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
args.QueryOptions.Token = id
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out))
|
||||
}
|
||||
|
||||
func TestConfigEntry_List(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
// Create some dummy services in the state store to look up.
|
||||
state := s1.fsm.State()
|
||||
expected := structs.IndexedConfigEntries{
|
||||
Entries: []structs.ConfigEntry{
|
||||
&structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "bar",
|
||||
},
|
||||
&structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
},
|
||||
},
|
||||
}
|
||||
require.NoError(state.EnsureConfigEntry(1, expected.Entries[0]))
|
||||
require.NoError(state.EnsureConfigEntry(2, expected.Entries[1]))
|
||||
|
||||
args := structs.ConfigEntryQuery{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var out structs.IndexedConfigEntries
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.List", &args, &out))
|
||||
|
||||
expected.QueryMeta = out.QueryMeta
|
||||
require.Equal(expected, out)
|
||||
}
|
||||
|
||||
func TestConfigEntry_List_ACLDeny(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
// Create the ACL.
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTokenTypeClient,
|
||||
Rules: `
|
||||
service "foo" {
|
||||
policy = "read"
|
||||
}
|
||||
operator = "read"
|
||||
`,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var id string
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Create some dummy service/proxy configs to be looked up.
|
||||
state := s1.fsm.State()
|
||||
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
}))
|
||||
require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
}))
|
||||
require.NoError(state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "db",
|
||||
}))
|
||||
|
||||
// This should filter out the "db" service since we don't have permissions for it.
|
||||
args := structs.ConfigEntryQuery{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Datacenter: s1.config.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: id},
|
||||
}
|
||||
var out structs.IndexedConfigEntries
|
||||
err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.List", &args, &out)
|
||||
require.NoError(err)
|
||||
|
||||
serviceConf, ok := out.Entries[0].(*structs.ServiceConfigEntry)
|
||||
require.Len(out.Entries, 1)
|
||||
require.True(ok)
|
||||
require.Equal("foo", serviceConf.Name)
|
||||
require.Equal(structs.ServiceDefaults, serviceConf.Kind)
|
||||
|
||||
// Get the global proxy config.
|
||||
args.Kind = structs.ProxyDefaults
|
||||
err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.List", &args, &out)
|
||||
require.NoError(err)
|
||||
|
||||
proxyConf, ok := out.Entries[0].(*structs.ProxyConfigEntry)
|
||||
require.Len(out.Entries, 1)
|
||||
require.True(ok)
|
||||
require.Equal(structs.ProxyConfigGlobal, proxyConf.Name)
|
||||
require.Equal(structs.ProxyDefaults, proxyConf.Kind)
|
||||
}
|
||||
|
||||
func TestConfigEntry_Delete(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
// Create a dummy service in the state store to look up.
|
||||
entry := &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
}
|
||||
state := s1.fsm.State()
|
||||
require.NoError(state.EnsureConfigEntry(1, entry))
|
||||
|
||||
// Verify it's there.
|
||||
_, existing, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
||||
require.NoError(err)
|
||||
|
||||
serviceConf, ok := existing.(*structs.ServiceConfigEntry)
|
||||
require.True(ok)
|
||||
require.Equal("foo", serviceConf.Name)
|
||||
require.Equal(structs.ServiceDefaults, serviceConf.Kind)
|
||||
|
||||
args := structs.ConfigEntryRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
args.Entry = entry
|
||||
var out struct{}
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out))
|
||||
|
||||
// Verify the entry was deleted.
|
||||
_, existing, err = state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
||||
require.NoError(err)
|
||||
require.Nil(existing)
|
||||
}
|
||||
|
||||
func TestConfigEntry_Delete_ACLDeny(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
// Create the ACL.
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTokenTypeClient,
|
||||
Rules: `
|
||||
service "foo" {
|
||||
policy = "write"
|
||||
}
|
||||
operator = "write"
|
||||
`,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var id string
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Create some dummy service/proxy configs to be looked up.
|
||||
state := s1.fsm.State()
|
||||
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
}))
|
||||
require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
}))
|
||||
|
||||
// This should fail since we don't have write perms for the "db" service.
|
||||
args := structs.ConfigEntryRequest{
|
||||
Datacenter: s1.config.Datacenter,
|
||||
Entry: &structs.ServiceConfigEntry{
|
||||
Name: "db",
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: id},
|
||||
}
|
||||
var out struct{}
|
||||
err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out)
|
||||
if !acl.IsErrPermissionDenied(err) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// The "foo" service should work.
|
||||
args.Entry = &structs.ServiceConfigEntry{
|
||||
Name: "foo",
|
||||
}
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out))
|
||||
|
||||
// Verify the entry was deleted.
|
||||
_, existing, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
||||
require.NoError(err)
|
||||
require.Nil(existing)
|
||||
|
||||
// Try to delete the global proxy config without a token.
|
||||
args = structs.ConfigEntryRequest{
|
||||
Datacenter: s1.config.Datacenter,
|
||||
Entry: &structs.ProxyConfigEntry{
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
},
|
||||
}
|
||||
err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out)
|
||||
if !acl.IsErrPermissionDenied(err) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Now delete with a valid token.
|
||||
args.WriteRequest.Token = id
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out))
|
||||
|
||||
_, existing, err = state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
||||
require.NoError(err)
|
||||
require.Nil(existing)
|
||||
}
|
||||
|
||||
func TestConfigEntry_ResolveServiceConfig(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
// Create a dummy proxy/service config in the state store to look up.
|
||||
state := s1.fsm.State()
|
||||
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
Config: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
},
|
||||
}))
|
||||
require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
}))
|
||||
|
||||
args := structs.ServiceConfigRequest{
|
||||
Name: "foo",
|
||||
Datacenter: s1.config.Datacenter,
|
||||
}
|
||||
var out structs.ServiceConfigResponse
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out))
|
||||
|
||||
expected := structs.ServiceDefinition{
|
||||
Name: "foo",
|
||||
Proxy: &structs.ConnectProxyConfig{
|
||||
Config: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
}
|
||||
out.Definition.Proxy.Config["foo"] = structs.Uint8ToString(out.Definition.Proxy.Config["foo"].([]uint8))
|
||||
require.Equal(expected, out.Definition)
|
||||
}
|
||||
|
||||
func TestConfigEntry_ResolveServiceConfig_ACLDeny(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
// Create the ACL.
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTokenTypeClient,
|
||||
Rules: `
|
||||
service "foo" {
|
||||
policy = "write"
|
||||
}
|
||||
operator = "write"
|
||||
`,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var id string
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Create some dummy service/proxy configs to be looked up.
|
||||
state := s1.fsm.State()
|
||||
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
}))
|
||||
require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
}))
|
||||
require.NoError(state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "db",
|
||||
}))
|
||||
|
||||
// This should fail since we don't have write perms for the "db" service.
|
||||
args := structs.ServiceConfigRequest{
|
||||
Name: "db",
|
||||
Datacenter: s1.config.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: id},
|
||||
}
|
||||
var out struct{}
|
||||
err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out)
|
||||
if !acl.IsErrPermissionDenied(err) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// The "foo" service should work.
|
||||
args.Name = "foo"
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out))
|
||||
|
||||
}
|
@ -1390,7 +1390,7 @@ func TestFSM_ConfigEntry(t *testing.T) {
|
||||
|
||||
// Verify it's in the state store.
|
||||
{
|
||||
_, config, err := fsm.state.ConfigEntry(structs.ProxyDefaults, "global")
|
||||
_, config, err := fsm.state.ConfigEntry(nil, structs.ProxyDefaults, "global")
|
||||
require.NoError(err)
|
||||
entry.RaftIndex.CreateIndex = 1
|
||||
entry.RaftIndex.ModifyIndex = 1
|
||||
|
@ -403,11 +403,11 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||
assert.Equal(caConfig, caConf)
|
||||
|
||||
// Verify config entries are restored
|
||||
_, serviceConfEntry, err := fsm2.state.ConfigEntry(structs.ServiceDefaults, "foo")
|
||||
_, serviceConfEntry, err := fsm2.state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
||||
require.NoError(err)
|
||||
assert.Equal(serviceConfig, serviceConfEntry)
|
||||
|
||||
_, proxyConfEntry, err := fsm2.state.ConfigEntry(structs.ProxyDefaults, "global")
|
||||
_, proxyConfEntry, err := fsm2.state.ConfigEntry(nil, structs.ProxyDefaults, "global")
|
||||
require.NoError(err)
|
||||
assert.Equal(proxyConfig, proxyConfEntry)
|
||||
|
||||
|
@ -4,6 +4,7 @@ func init() {
|
||||
registerEndpoint(func(s *Server) interface{} { return &ACL{s} })
|
||||
registerEndpoint(func(s *Server) interface{} { return &Catalog{s} })
|
||||
registerEndpoint(func(s *Server) interface{} { return NewCoordinate(s) })
|
||||
registerEndpoint(func(s *Server) interface{} { return &ConfigEntry{s} })
|
||||
registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s} })
|
||||
registerEndpoint(func(s *Server) interface{} { return &Health{s} })
|
||||
registerEndpoint(func(s *Server) interface{} { return &Intention{s} })
|
||||
|
@ -37,7 +37,7 @@ func configTableSchema() *memdb.TableSchema {
|
||||
"kind": &memdb.IndexSchema{
|
||||
Name: "kind",
|
||||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "Kind",
|
||||
Lowercase: true,
|
||||
@ -80,7 +80,7 @@ func (s *Restore) ConfigEntry(c structs.ConfigEntry) error {
|
||||
}
|
||||
|
||||
// ConfigEntry is called to get a given config entry.
|
||||
func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) {
|
||||
func (s *Store) ConfigEntry(ws memdb.WatchSet, kind, name string) (uint64, structs.ConfigEntry, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
@ -88,13 +88,14 @@ func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, err
|
||||
idx := maxIndexTxn(tx, configTableName)
|
||||
|
||||
// Get the existing config entry.
|
||||
existing, err := tx.First(configTableName, "id", kind, name)
|
||||
watchCh, existing, err := tx.FirstWatch(configTableName, "id", kind, name)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed config entry lookup: %s", err)
|
||||
}
|
||||
if existing == nil {
|
||||
return idx, nil, nil
|
||||
}
|
||||
ws.Add(watchCh)
|
||||
|
||||
conf, ok := existing.(structs.ConfigEntry)
|
||||
if !ok {
|
||||
@ -105,13 +106,13 @@ func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, err
|
||||
}
|
||||
|
||||
// ConfigEntries is called to get all config entry objects.
|
||||
func (s *Store) ConfigEntries() (uint64, []structs.ConfigEntry, error) {
|
||||
return s.ConfigEntriesByKind("")
|
||||
func (s *Store) ConfigEntries(ws memdb.WatchSet) (uint64, []structs.ConfigEntry, error) {
|
||||
return s.ConfigEntriesByKind(nil, "")
|
||||
}
|
||||
|
||||
// ConfigEntriesByKind is called to get all config entry objects with the given kind.
|
||||
// If kind is empty, all config entries will be returned.
|
||||
func (s *Store) ConfigEntriesByKind(kind string) (uint64, []structs.ConfigEntry, error) {
|
||||
func (s *Store) ConfigEntriesByKind(ws memdb.WatchSet, kind string) (uint64, []structs.ConfigEntry, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
@ -129,6 +130,7 @@ func (s *Store) ConfigEntriesByKind(kind string) (uint64, []structs.ConfigEntry,
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed config entry lookup: %s", err)
|
||||
}
|
||||
ws.Add(iter.WatchCh())
|
||||
|
||||
var results []structs.ConfigEntry
|
||||
for v := iter.Next(); v != nil; v = iter.Next() {
|
||||
@ -218,7 +220,7 @@ func (s *Store) DeleteConfigEntry(idx uint64, kind, name string) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
// Try to retrieve the existing health check.
|
||||
// Try to retrieve the existing config entry.
|
||||
existing, err := tx.First(configTableName, "id", kind, name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed config entry lookup: %s", err)
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@ -22,7 +23,7 @@ func TestStore_ConfigEntry(t *testing.T) {
|
||||
// Create
|
||||
require.NoError(s.EnsureConfigEntry(0, expected))
|
||||
|
||||
idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global")
|
||||
idx, config, err := s.ConfigEntry(nil, structs.ProxyDefaults, "global")
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(0), idx)
|
||||
require.Equal(expected, config)
|
||||
@ -37,7 +38,7 @@ func TestStore_ConfigEntry(t *testing.T) {
|
||||
}
|
||||
require.NoError(s.EnsureConfigEntry(1, updated))
|
||||
|
||||
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global")
|
||||
idx, config, err = s.ConfigEntry(nil, structs.ProxyDefaults, "global")
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(1), idx)
|
||||
require.Equal(updated, config)
|
||||
@ -45,10 +46,30 @@ func TestStore_ConfigEntry(t *testing.T) {
|
||||
// Delete
|
||||
require.NoError(s.DeleteConfigEntry(2, structs.ProxyDefaults, "global"))
|
||||
|
||||
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global")
|
||||
idx, config, err = s.ConfigEntry(nil, structs.ProxyDefaults, "global")
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(2), idx)
|
||||
require.Nil(config)
|
||||
|
||||
// Set up a watch.
|
||||
serviceConf := &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
}
|
||||
require.NoError(s.EnsureConfigEntry(3, serviceConf))
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
_, _, err = s.ConfigEntry(ws, structs.ServiceDefaults, "foo")
|
||||
require.NoError(err)
|
||||
|
||||
// Make an unrelated modification and make sure the watch doesn't fire.
|
||||
require.NoError(s.EnsureConfigEntry(4, updated))
|
||||
require.False(watchFired(ws))
|
||||
|
||||
// Update the watched config and make sure it fires.
|
||||
serviceConf.Protocol = "http"
|
||||
require.NoError(s.EnsureConfigEntry(5, serviceConf))
|
||||
require.True(watchFired(ws))
|
||||
}
|
||||
|
||||
func TestStore_ConfigEntryCAS(t *testing.T) {
|
||||
@ -66,7 +87,7 @@ func TestStore_ConfigEntryCAS(t *testing.T) {
|
||||
// Create
|
||||
require.NoError(s.EnsureConfigEntry(1, expected))
|
||||
|
||||
idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global")
|
||||
idx, config, err := s.ConfigEntry(nil, structs.ProxyDefaults, "global")
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(1), idx)
|
||||
require.Equal(expected, config)
|
||||
@ -84,7 +105,7 @@ func TestStore_ConfigEntryCAS(t *testing.T) {
|
||||
require.NoError(err)
|
||||
|
||||
// Entry should not be changed
|
||||
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global")
|
||||
idx, config, err = s.ConfigEntry(nil, structs.ProxyDefaults, "global")
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(1), idx)
|
||||
require.Equal(expected, config)
|
||||
@ -95,7 +116,7 @@ func TestStore_ConfigEntryCAS(t *testing.T) {
|
||||
require.NoError(err)
|
||||
|
||||
// Entry should be updated
|
||||
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global")
|
||||
idx, config, err = s.ConfigEntry(nil, structs.ProxyDefaults, "global")
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(2), idx)
|
||||
require.Equal(updated, config)
|
||||
@ -114,25 +135,42 @@ func TestStore_ConfigEntries(t *testing.T) {
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "test2",
|
||||
}
|
||||
entry3 := &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "test3",
|
||||
}
|
||||
|
||||
require.NoError(s.EnsureConfigEntry(0, entry1))
|
||||
require.NoError(s.EnsureConfigEntry(1, entry2))
|
||||
require.NoError(s.EnsureConfigEntry(2, entry3))
|
||||
|
||||
// Get all entries
|
||||
idx, entries, err := s.ConfigEntries()
|
||||
idx, entries, err := s.ConfigEntries(nil)
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(1), idx)
|
||||
require.Equal([]structs.ConfigEntry{entry1, entry2}, entries)
|
||||
require.Equal(uint64(2), idx)
|
||||
require.Equal([]structs.ConfigEntry{entry1, entry2, entry3}, entries)
|
||||
|
||||
// Get all proxy entries
|
||||
idx, entries, err = s.ConfigEntriesByKind(structs.ProxyDefaults)
|
||||
idx, entries, err = s.ConfigEntriesByKind(nil, structs.ProxyDefaults)
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(1), idx)
|
||||
require.Equal(uint64(2), idx)
|
||||
require.Equal([]structs.ConfigEntry{entry1}, entries)
|
||||
|
||||
// Get all service entries
|
||||
idx, entries, err = s.ConfigEntriesByKind(structs.ServiceDefaults)
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, entries, err = s.ConfigEntriesByKind(ws, structs.ServiceDefaults)
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(1), idx)
|
||||
require.Equal([]structs.ConfigEntry{entry2}, entries)
|
||||
require.Equal(uint64(2), idx)
|
||||
require.Equal([]structs.ConfigEntry{entry2, entry3}, entries)
|
||||
|
||||
// Watch should not have fired
|
||||
require.False(watchFired(ws))
|
||||
|
||||
// Now make an update and make sure the watch fires.
|
||||
require.NoError(s.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "test2",
|
||||
Protocol: "tcp",
|
||||
}))
|
||||
require.True(watchFired(ws))
|
||||
}
|
||||
|
@ -31,11 +31,10 @@ type ConfigEntry interface {
|
||||
// ServiceConfiguration is the top-level struct for the configuration of a service
|
||||
// across the entire cluster.
|
||||
type ServiceConfigEntry struct {
|
||||
Kind string
|
||||
Name string
|
||||
Protocol string
|
||||
Connect ConnectConfiguration
|
||||
ServiceDefinitionDefaults ServiceDefinitionDefaults
|
||||
Kind string
|
||||
Name string
|
||||
Protocol string
|
||||
Connect ConnectConfiguration
|
||||
|
||||
RaftIndex
|
||||
}
|
||||
@ -83,26 +82,6 @@ type ConnectConfiguration struct {
|
||||
SidecarProxy bool
|
||||
}
|
||||
|
||||
type ServiceDefinitionDefaults struct {
|
||||
EnableTagOverride bool
|
||||
|
||||
// Non script/docker checks only
|
||||
Check *HealthCheck
|
||||
Checks HealthChecks
|
||||
|
||||
// Kind is allowed to accommodate non-sidecar proxies but it will be an error
|
||||
// if they also set Connect.DestinationServiceID since sidecars are
|
||||
// configured via their associated service's config.
|
||||
Kind ServiceKind
|
||||
|
||||
// Only DestinationServiceName and Config are supported.
|
||||
Proxy ConnectProxyConfig
|
||||
|
||||
Connect ServiceConnect
|
||||
|
||||
Weights Weights
|
||||
}
|
||||
|
||||
// ProxyConfigEntry is the top-level struct for global proxy configuration defaults.
|
||||
type ProxyConfigEntry struct {
|
||||
Kind string
|
||||
@ -130,6 +109,7 @@ func (e *ProxyConfigEntry) Normalize() error {
|
||||
}
|
||||
|
||||
e.Kind = ProxyDefaults
|
||||
e.Name = ProxyConfigGlobal
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -161,18 +141,26 @@ const (
|
||||
ConfigEntryDelete ConfigEntryOp = "delete"
|
||||
)
|
||||
|
||||
// ConfigEntryRequest is used when creating/updating/deleting a ConfigEntry.
|
||||
type ConfigEntryRequest struct {
|
||||
Op ConfigEntryOp
|
||||
Entry ConfigEntry
|
||||
Op ConfigEntryOp
|
||||
Datacenter string
|
||||
Entry ConfigEntry
|
||||
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
func (r *ConfigEntryRequest) MarshalBinary() (data []byte, err error) {
|
||||
func (c *ConfigEntryRequest) RequestDatacenter() string {
|
||||
return c.Datacenter
|
||||
}
|
||||
|
||||
func (c *ConfigEntryRequest) MarshalBinary() (data []byte, err error) {
|
||||
// bs will grow if needed but allocate enough to avoid reallocation in common
|
||||
// case.
|
||||
bs := make([]byte, 128)
|
||||
enc := codec.NewEncoderBytes(&bs, msgpackHandle)
|
||||
// Encode kind first
|
||||
err = enc.Encode(r.Entry.GetKind())
|
||||
err = enc.Encode(c.Entry.GetKind())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -181,7 +169,7 @@ func (r *ConfigEntryRequest) MarshalBinary() (data []byte, err error) {
|
||||
err = enc.Encode(struct {
|
||||
*Alias
|
||||
}{
|
||||
Alias: (*Alias)(r),
|
||||
Alias: (*Alias)(c),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -189,7 +177,7 @@ func (r *ConfigEntryRequest) MarshalBinary() (data []byte, err error) {
|
||||
return bs, nil
|
||||
}
|
||||
|
||||
func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error {
|
||||
func (c *ConfigEntryRequest) UnmarshalBinary(data []byte) error {
|
||||
// First decode the kind prefix
|
||||
var kind string
|
||||
dec := codec.NewDecoderBytes(data, msgpackHandle)
|
||||
@ -202,7 +190,7 @@ func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Entry = entry
|
||||
c.Entry = entry
|
||||
|
||||
// Alias juggling to prevent infinite recursive calls back to this decode
|
||||
// method.
|
||||
@ -210,7 +198,7 @@ func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error {
|
||||
as := struct {
|
||||
*Alias
|
||||
}{
|
||||
Alias: (*Alias)(r),
|
||||
Alias: (*Alias)(c),
|
||||
}
|
||||
if err := dec.Decode(&as); err != nil {
|
||||
return err
|
||||
@ -228,3 +216,35 @@ func makeConfigEntry(kind string) (ConfigEntry, error) {
|
||||
return nil, fmt.Errorf("invalid config entry kind: %s", kind)
|
||||
}
|
||||
}
|
||||
|
||||
// ConfigEntryQuery is used when requesting info about a config entry.
|
||||
type ConfigEntryQuery struct {
|
||||
Kind string
|
||||
Name string
|
||||
Datacenter string
|
||||
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
func (c *ConfigEntryQuery) RequestDatacenter() string {
|
||||
return c.Datacenter
|
||||
}
|
||||
|
||||
// ServiceConfigRequest is used when requesting the resolved configuration
|
||||
// for a service.
|
||||
type ServiceConfigRequest struct {
|
||||
Name string
|
||||
Datacenter string
|
||||
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
func (s *ServiceConfigRequest) RequestDatacenter() string {
|
||||
return s.Datacenter
|
||||
}
|
||||
|
||||
type ServiceConfigResponse struct {
|
||||
Definition ServiceDefinition
|
||||
|
||||
QueryMeta
|
||||
}
|
||||
|
@ -1140,6 +1140,82 @@ type IndexedNodeDump struct {
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
// IndexedConfigEntries has its own encoding logic which differs from
|
||||
// ConfigEntryRequest as it has to send a slice of ConfigEntry.
|
||||
type IndexedConfigEntries struct {
|
||||
Entries []ConfigEntry
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
func (c *IndexedConfigEntries) MarshalBinary() (data []byte, err error) {
|
||||
// bs will grow if needed but allocate enough to avoid reallocation in common
|
||||
// case.
|
||||
bs := make([]byte, 128)
|
||||
enc := codec.NewEncoderBytes(&bs, msgpackHandle)
|
||||
|
||||
// Encode kinds of entries first
|
||||
err = enc.Encode(len(c.Entries))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, entry := range c.Entries {
|
||||
err = enc.Encode(entry.GetKind())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Then actual value using alias trick to avoid infinite recursion
|
||||
type Alias IndexedConfigEntries
|
||||
err = enc.Encode(struct {
|
||||
*Alias
|
||||
}{
|
||||
Alias: (*Alias)(c),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return bs, nil
|
||||
}
|
||||
|
||||
func (c *IndexedConfigEntries) UnmarshalBinary(data []byte) error {
|
||||
// First decode the number of entries
|
||||
var numEntries int
|
||||
dec := codec.NewDecoderBytes(data, msgpackHandle)
|
||||
if err := dec.Decode(&numEntries); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.Entries = make([]ConfigEntry, numEntries)
|
||||
for i := 0; i < numEntries; i++ {
|
||||
// First decode the kind prefix
|
||||
var kind string
|
||||
if err := dec.Decode(&kind); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Then decode the real thing with appropriate kind of ConfigEntry
|
||||
entry, err := makeConfigEntry(kind)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.Entries[i] = entry
|
||||
}
|
||||
|
||||
// Alias juggling to prevent infinite recursive calls back to this decode
|
||||
// method.
|
||||
type Alias IndexedConfigEntries
|
||||
as := struct {
|
||||
*Alias
|
||||
}{
|
||||
Alias: (*Alias)(c),
|
||||
}
|
||||
if err := dec.Decode(&as); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DirEntry is used to represent a directory entry. This is
|
||||
// used for values in our Key-Value store.
|
||||
type DirEntry struct {
|
||||
|
Loading…
x
Reference in New Issue
Block a user