mirror of https://github.com/status-im/consul.git
Merge pull request #8253 from hashicorp/feature/auto-config/rpc-delegate
This commit is contained in:
commit
c707572621
|
@ -23,7 +23,7 @@ var _ = math.Inf
|
|||
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
|
||||
|
||||
// AutoConfigRequest is the data structure to be sent along with the
|
||||
// Cluster.AutoConfig RPC
|
||||
// AutoConfig.InitialConfiguration RPC
|
||||
type AutoConfigRequest struct {
|
||||
// Datacenter is the local datacenter name. This wont actually be set by clients
|
||||
// but rather will be set by the servers to allow for forwarding to
|
||||
|
@ -113,7 +113,7 @@ func (m *AutoConfigRequest) GetConsulToken() string {
|
|||
return ""
|
||||
}
|
||||
|
||||
// AutoConfigResponse is the data structure sent in response to a Cluster.AutoConfig request
|
||||
// AutoConfigResponse is the data structure sent in response to a AutoConfig.InitialConfiguration request
|
||||
type AutoConfigResponse struct {
|
||||
Config *config.Config `protobuf:"bytes,1,opt,name=Config,proto3" json:"Config,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
|
|
|
@ -7,7 +7,7 @@ option go_package = "github.com/hashicorp/consul/agent/agentpb";
|
|||
import "agent/agentpb/config/config.proto";
|
||||
|
||||
// AutoConfigRequest is the data structure to be sent along with the
|
||||
// Cluster.AutoConfig RPC
|
||||
// AutoConfig.InitialConfiguration RPC
|
||||
message AutoConfigRequest {
|
||||
// Datacenter is the local datacenter name. This wont actually be set by clients
|
||||
// but rather will be set by the servers to allow for forwarding to
|
||||
|
@ -30,7 +30,7 @@ message AutoConfigRequest {
|
|||
string ConsulToken = 6;
|
||||
}
|
||||
|
||||
// AutoConfigResponse is the data structure sent in response to a Cluster.AutoConfig request
|
||||
// AutoConfigResponse is the data structure sent in response to a AutoConfig.InitialConfiguration request
|
||||
message AutoConfigResponse {
|
||||
config.Config Config = 1;
|
||||
}
|
|
@ -279,7 +279,7 @@ func (ac *AutoConfig) InitialConfiguration(ctx context.Context) (*config.Runtime
|
|||
}
|
||||
|
||||
// introToken is responsible for determining the correct intro token to use
|
||||
// when making the initial Cluster.AutoConfig RPC request.
|
||||
// when making the initial AutoConfig.InitialConfiguration RPC request.
|
||||
func (ac *AutoConfig) introToken() (string, error) {
|
||||
conf := ac.config.AutoConfig
|
||||
// without an intro token or intro token file we cannot do anything
|
||||
|
@ -431,7 +431,7 @@ func (ac *AutoConfig) recordAutoConfigReply(reply *agentpb.AutoConfigResponse) e
|
|||
}
|
||||
|
||||
// getInitialConfigurationOnce will perform full server to TCPAddr resolution and
|
||||
// loop through each host trying to make the Cluster.AutoConfig RPC call. When
|
||||
// loop through each host trying to make the AutoConfig.InitialConfiguration RPC call. When
|
||||
// successful the bool return will be true and the err value will indicate whether we
|
||||
// successfully recorded the auto config settings (persisted to disk and stored internally
|
||||
// on the AutoConfig object)
|
||||
|
@ -462,9 +462,9 @@ func (ac *AutoConfig) getInitialConfigurationOnce(ctx context.Context) (bool, er
|
|||
return false, ctx.Err()
|
||||
}
|
||||
|
||||
ac.logger.Debug("making Cluster.AutoConfig RPC", "addr", addr.String())
|
||||
if err = ac.directRPC.RPC(ac.config.Datacenter, ac.config.NodeName, &addr, "Cluster.AutoConfig", &request, &reply); err != nil {
|
||||
ac.logger.Error("AutoConfig RPC failed", "addr", addr.String(), "error", err)
|
||||
ac.logger.Debug("making AutoConfig.InitialConfiguration RPC", "addr", addr.String())
|
||||
if err = ac.directRPC.RPC(ac.config.Datacenter, ac.config.NodeName, &addr, "AutoConfig.InitialConfiguration", &request, &reply); err != nil {
|
||||
ac.logger.Error("AutoConfig.InitialConfiguration RPC failed", "addr", addr.String(), "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
@ -207,7 +207,7 @@ func TestInitialConfiguration_cancelled(t *testing.T) {
|
|||
JWT: "blarg",
|
||||
}
|
||||
|
||||
directRPC.On("RPC", "dc1", "autoconf", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8300}, "Cluster.AutoConfig", &expectedRequest, mock.Anything).Return(fmt.Errorf("injected error")).Times(0)
|
||||
directRPC.On("RPC", "dc1", "autoconf", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8300}, "AutoConfig.InitialConfiguration", &expectedRequest, mock.Anything).Return(fmt.Errorf("injected error")).Times(0)
|
||||
ac, err := New(WithBuilderOpts(builderOpts), WithTLSConfigurator(&tlsutil.Configurator{}), WithDirectRPC(&directRPC))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, ac)
|
||||
|
@ -289,7 +289,7 @@ func TestInitialConfiguration_success(t *testing.T) {
|
|||
"dc1",
|
||||
"autoconf",
|
||||
&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8300},
|
||||
"Cluster.AutoConfig",
|
||||
"AutoConfig.InitialConfiguration",
|
||||
&expectedRequest,
|
||||
&agentpb.AutoConfigResponse{}).Return(populateResponse)
|
||||
|
||||
|
@ -344,7 +344,7 @@ func TestInitialConfiguration_retries(t *testing.T) {
|
|||
"dc1",
|
||||
"autoconf",
|
||||
&net.TCPAddr{IP: net.IPv4(198, 18, 0, 1), Port: 8300},
|
||||
"Cluster.AutoConfig",
|
||||
"AutoConfig.InitialConfiguration",
|
||||
&expectedRequest,
|
||||
&agentpb.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Times(0)
|
||||
directRPC.On(
|
||||
|
@ -352,7 +352,7 @@ func TestInitialConfiguration_retries(t *testing.T) {
|
|||
"dc1",
|
||||
"autoconf",
|
||||
&net.TCPAddr{IP: net.IPv4(198, 18, 0, 2), Port: 8398},
|
||||
"Cluster.AutoConfig",
|
||||
"AutoConfig.InitialConfiguration",
|
||||
&expectedRequest,
|
||||
&agentpb.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Times(0)
|
||||
directRPC.On(
|
||||
|
@ -360,7 +360,7 @@ func TestInitialConfiguration_retries(t *testing.T) {
|
|||
"dc1",
|
||||
"autoconf",
|
||||
&net.TCPAddr{IP: net.IPv4(198, 18, 0, 3), Port: 8399},
|
||||
"Cluster.AutoConfig",
|
||||
"AutoConfig.InitialConfiguration",
|
||||
&expectedRequest,
|
||||
&agentpb.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Times(0)
|
||||
directRPC.On(
|
||||
|
@ -368,7 +368,7 @@ func TestInitialConfiguration_retries(t *testing.T) {
|
|||
"dc1",
|
||||
"autoconf",
|
||||
&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1234},
|
||||
"Cluster.AutoConfig",
|
||||
"AutoConfig.InitialConfiguration",
|
||||
&expectedRequest,
|
||||
&agentpb.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Once()
|
||||
directRPC.On(
|
||||
|
@ -376,7 +376,7 @@ func TestInitialConfiguration_retries(t *testing.T) {
|
|||
"dc1",
|
||||
"autoconf",
|
||||
&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1234},
|
||||
"Cluster.AutoConfig",
|
||||
"AutoConfig.InitialConfiguration",
|
||||
&expectedRequest,
|
||||
&agentpb.AutoConfigResponse{}).Return(populateResponse)
|
||||
|
||||
|
|
|
@ -115,7 +115,7 @@ func (a *ACL) BootstrapTokens(args *structs.DCSpecificRequest, reply *structs.AC
|
|||
if err := a.aclPreCheck(); err != nil {
|
||||
return err
|
||||
}
|
||||
if done, err := a.srv.forward("ACL.BootstrapTokens", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.BootstrapTokens", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -214,7 +214,7 @@ func (a *ACL) TokenRead(args *structs.ACLTokenGetRequest, reply *structs.ACLToke
|
|||
args.Datacenter = a.srv.config.ACLDatacenter
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.TokenRead", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.TokenRead", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -283,7 +283,7 @@ func (a *ACL) TokenClone(args *structs.ACLTokenSetRequest, reply *structs.ACLTok
|
|||
args.Datacenter = a.srv.config.ACLDatacenter
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.TokenClone", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.TokenClone", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -354,7 +354,7 @@ func (a *ACL) TokenSet(args *structs.ACLTokenSetRequest, reply *structs.ACLToken
|
|||
return fmt.Errorf("Local tokens are disabled")
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.TokenSet", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.TokenSet", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -764,7 +764,7 @@ func (a *ACL) TokenDelete(args *structs.ACLTokenDeleteRequest, reply *string) er
|
|||
args.Datacenter = a.srv.config.ACLDatacenter
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.TokenDelete", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.TokenDelete", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -854,7 +854,7 @@ func (a *ACL) TokenList(args *structs.ACLTokenListRequest, reply *structs.ACLTok
|
|||
args.Datacenter = a.srv.config.ACLDatacenter
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.TokenList", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.TokenList", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -917,7 +917,7 @@ func (a *ACL) TokenBatchRead(args *structs.ACLTokenBatchGetRequest, reply *struc
|
|||
args.Datacenter = a.srv.config.ACLDatacenter
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.TokenBatchRead", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.TokenBatchRead", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -971,7 +971,7 @@ func (a *ACL) PolicyRead(args *structs.ACLPolicyGetRequest, reply *structs.ACLPo
|
|||
return err
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.PolicyRead", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.PolicyRead", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1009,7 +1009,7 @@ func (a *ACL) PolicyBatchRead(args *structs.ACLPolicyBatchGetRequest, reply *str
|
|||
return err
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.PolicyBatchRead", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.PolicyBatchRead", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1047,7 +1047,7 @@ func (a *ACL) PolicySet(args *structs.ACLPolicySetRequest, reply *structs.ACLPol
|
|||
args.Datacenter = a.srv.config.ACLDatacenter
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.PolicySet", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.PolicySet", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1182,7 +1182,7 @@ func (a *ACL) PolicyDelete(args *structs.ACLPolicyDeleteRequest, reply *string)
|
|||
args.Datacenter = a.srv.config.ACLDatacenter
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.PolicyDelete", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.PolicyDelete", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1241,7 +1241,7 @@ func (a *ACL) PolicyList(args *structs.ACLPolicyListRequest, reply *structs.ACLP
|
|||
return err
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.PolicyList", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.PolicyList", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1281,7 +1281,7 @@ func (a *ACL) PolicyResolve(args *structs.ACLPolicyBatchGetRequest, reply *struc
|
|||
return err
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.PolicyResolve", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.PolicyResolve", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1339,7 +1339,7 @@ func makeACLETag(parent string, policy *acl.Policy) string {
|
|||
// GetPolicy is used to retrieve a compiled policy object with a TTL. Does not
|
||||
// support a blocking query.
|
||||
func (a *ACL) GetPolicy(args *structs.ACLPolicyResolveLegacyRequest, reply *structs.ACLPolicyResolveLegacyResponse) error {
|
||||
if done, err := a.srv.forward("ACL.GetPolicy", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.GetPolicy", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1386,7 +1386,7 @@ func (a *ACL) ReplicationStatus(args *structs.DCSpecificRequest,
|
|||
// re-using a structure where we don't support all the options.
|
||||
args.RequireConsistent = true
|
||||
args.AllowStale = false
|
||||
if done, err := a.srv.forward("ACL.ReplicationStatus", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.ReplicationStatus", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1414,7 +1414,7 @@ func (a *ACL) RoleRead(args *structs.ACLRoleGetRequest, reply *structs.ACLRoleRe
|
|||
return err
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.RoleRead", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.RoleRead", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1453,7 +1453,7 @@ func (a *ACL) RoleBatchRead(args *structs.ACLRoleBatchGetRequest, reply *structs
|
|||
return err
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.RoleBatchRead", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.RoleBatchRead", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1491,7 +1491,7 @@ func (a *ACL) RoleSet(args *structs.ACLRoleSetRequest, reply *structs.ACLRole) e
|
|||
args.Datacenter = a.srv.config.ACLDatacenter
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.RoleSet", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.RoleSet", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1653,7 +1653,7 @@ func (a *ACL) RoleDelete(args *structs.ACLRoleDeleteRequest, reply *string) erro
|
|||
args.Datacenter = a.srv.config.ACLDatacenter
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.RoleDelete", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.RoleDelete", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1708,7 +1708,7 @@ func (a *ACL) RoleList(args *structs.ACLRoleListRequest, reply *structs.ACLRoleL
|
|||
return err
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.RoleList", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.RoleList", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1742,7 +1742,7 @@ func (a *ACL) RoleResolve(args *structs.ACLRoleBatchGetRequest, reply *structs.A
|
|||
return err
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.RoleResolve", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.RoleResolve", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1807,7 +1807,7 @@ func (a *ACL) BindingRuleRead(args *structs.ACLBindingRuleGetRequest, reply *str
|
|||
return errAuthMethodsRequireTokenReplication
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.BindingRuleRead", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.BindingRuleRead", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1846,7 +1846,7 @@ func (a *ACL) BindingRuleSet(args *structs.ACLBindingRuleSetRequest, reply *stru
|
|||
return errAuthMethodsRequireTokenReplication
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.BindingRuleSet", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.BindingRuleSet", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1978,7 +1978,7 @@ func (a *ACL) BindingRuleDelete(args *structs.ACLBindingRuleDeleteRequest, reply
|
|||
return errAuthMethodsRequireTokenReplication
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.BindingRuleDelete", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.BindingRuleDelete", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -2033,7 +2033,7 @@ func (a *ACL) BindingRuleList(args *structs.ACLBindingRuleListRequest, reply *st
|
|||
return errAuthMethodsRequireTokenReplication
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.BindingRuleList", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.BindingRuleList", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -2073,7 +2073,7 @@ func (a *ACL) AuthMethodRead(args *structs.ACLAuthMethodGetRequest, reply *struc
|
|||
return errAuthMethodsRequireTokenReplication
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.AuthMethodRead", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.AuthMethodRead", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -2115,7 +2115,7 @@ func (a *ACL) AuthMethodSet(args *structs.ACLAuthMethodSetRequest, reply *struct
|
|||
return errAuthMethodsRequireTokenReplication
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.AuthMethodSet", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.AuthMethodSet", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -2231,7 +2231,7 @@ func (a *ACL) AuthMethodDelete(args *structs.ACLAuthMethodDeleteRequest, reply *
|
|||
return errAuthMethodsRequireTokenReplication
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.AuthMethodDelete", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.AuthMethodDelete", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -2291,7 +2291,7 @@ func (a *ACL) AuthMethodList(args *structs.ACLAuthMethodListRequest, reply *stru
|
|||
return errAuthMethodsRequireTokenReplication
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.AuthMethodList", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.AuthMethodList", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -2345,7 +2345,7 @@ func (a *ACL) Login(args *structs.ACLLoginRequest, reply *structs.ACLToken) erro
|
|||
return errors.New("do not provide a token when logging in")
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.Login", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.Login", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -2490,7 +2490,7 @@ func (a *ACL) Logout(args *structs.ACLLogoutRequest, reply *bool) error {
|
|||
return acl.ErrNotFound
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.Logout", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.Logout", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -2543,7 +2543,7 @@ func (a *ACL) Authorize(args *structs.RemoteACLAuthorizationRequest, reply *[]st
|
|||
return err
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.Authorize", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.Authorize", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ import (
|
|||
// Bootstrap is used to perform a one-time ACL bootstrap operation on
|
||||
// a cluster to get the first management token.
|
||||
func (a *ACL) Bootstrap(args *structs.DCSpecificRequest, reply *structs.ACL) error {
|
||||
if done, err := a.srv.forward("ACL.Bootstrap", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.Bootstrap", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -153,7 +153,7 @@ func aclApplyInternal(srv *Server, args *structs.ACLRequest, reply *string) erro
|
|||
// Apply is used to apply a modifying request to the data store. This should
|
||||
// only be used for operations that modify the data
|
||||
func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
|
||||
if done, err := a.srv.forward("ACL.Apply", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.Apply", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"acl", "apply"}, time.Now())
|
||||
|
@ -199,7 +199,7 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
|
|||
// Get is used to retrieve a single ACL
|
||||
func (a *ACL) Get(args *structs.ACLSpecificRequest,
|
||||
reply *structs.IndexedACLs) error {
|
||||
if done, err := a.srv.forward("ACL.Get", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.Get", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -245,7 +245,7 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
|
|||
// List is used to list all the ACLs
|
||||
func (a *ACL) List(args *structs.DCSpecificRequest,
|
||||
reply *structs.IndexedACLs) error {
|
||||
if done, err := a.srv.forward("ACL.List", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("ACL.List", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -4,16 +4,12 @@ import (
|
|||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/agentpb"
|
||||
"github.com/hashicorp/consul/agent/agentpb/config"
|
||||
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/lib/template"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
bexpr "github.com/hashicorp/go-bexpr"
|
||||
|
@ -55,7 +51,6 @@ func (a *jwtAuthorizer) Authorize(req *agentpb.AutoConfigRequest) (AutoConfigOpt
|
|||
"segment": req.Segment,
|
||||
}
|
||||
|
||||
// TODO (autoconf) check for JWT reuse if configured to do so.
|
||||
for _, raw := range a.claimAssertions {
|
||||
// validate and fill any HIL
|
||||
filled, err := template.InterpolateHIL(raw, varMap, true)
|
||||
|
@ -84,87 +79,81 @@ func (a *jwtAuthorizer) Authorize(req *agentpb.AutoConfigRequest) (AutoConfigOpt
|
|||
}, nil
|
||||
}
|
||||
|
||||
// Cluster endpoint is used for cluster configuration operations
|
||||
type Cluster struct {
|
||||
srv *Server
|
||||
type AutoConfigBackend interface {
|
||||
CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error)
|
||||
DatacenterJoinAddresses(segment string) ([]string, error)
|
||||
ForwardRPC(method string, info structs.RPCInfo, args, reply interface{}) (bool, error)
|
||||
}
|
||||
|
||||
// AutoConfig endpoint is used for cluster auto configuration operations
|
||||
type AutoConfig struct {
|
||||
// currently AutoConfig does not support pushing down any configuration that would be reloadable on the servers
|
||||
// (outside of some TLS settings such as the configured CA certs which are retrieved via the TLS configurator)
|
||||
// If that changes then we will need to change this to use an atomic.Value and provide means of reloading it.
|
||||
config *Config
|
||||
tlsConfigurator *tlsutil.Configurator
|
||||
|
||||
backend AutoConfigBackend
|
||||
authorizer AutoConfigAuthorizer
|
||||
}
|
||||
|
||||
func NewAutoConfig(conf *Config, tlsConfigurator *tlsutil.Configurator, backend AutoConfigBackend, authz AutoConfigAuthorizer) *AutoConfig {
|
||||
if conf == nil {
|
||||
conf = DefaultConfig()
|
||||
}
|
||||
|
||||
return &AutoConfig{
|
||||
config: conf,
|
||||
tlsConfigurator: tlsConfigurator,
|
||||
backend: backend,
|
||||
authorizer: authz,
|
||||
}
|
||||
}
|
||||
|
||||
// updateTLSCertificatesInConfig will ensure that the TLS settings regarding how an agent is
|
||||
// made aware of its certificates are populated. This will only work if connect is enabled and
|
||||
// in some cases only if auto_encrypt is enabled on the servers. This endpoint has the option
|
||||
// to configure auto_encrypt or potentially in the future to generate the certificates inline.
|
||||
func (c *Cluster) updateTLSCertificatesInConfig(opts AutoConfigOptions, conf *config.Config) error {
|
||||
if c.srv.config.AutoEncryptAllowTLS {
|
||||
conf.AutoEncrypt = &config.AutoEncrypt{TLS: true}
|
||||
} else {
|
||||
conf.AutoEncrypt = &config.AutoEncrypt{TLS: false}
|
||||
}
|
||||
|
||||
func (ac *AutoConfig) updateTLSCertificatesInConfig(opts AutoConfigOptions, conf *config.Config) error {
|
||||
conf.AutoEncrypt = &config.AutoEncrypt{TLS: ac.config.AutoEncryptAllowTLS}
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateACLtokensInConfig will configure all of the agents ACL settings and will populate
|
||||
// the configuration with an agent token usable for all default agent operations.
|
||||
func (c *Cluster) updateACLsInConfig(opts AutoConfigOptions, conf *config.Config) error {
|
||||
func (ac *AutoConfig) updateACLsInConfig(opts AutoConfigOptions, conf *config.Config) error {
|
||||
acl := &config.ACL{
|
||||
Enabled: c.srv.config.ACLsEnabled,
|
||||
PolicyTTL: c.srv.config.ACLPolicyTTL.String(),
|
||||
RoleTTL: c.srv.config.ACLRoleTTL.String(),
|
||||
TokenTTL: c.srv.config.ACLTokenTTL.String(),
|
||||
DisabledTTL: c.srv.config.ACLDisabledTTL.String(),
|
||||
DownPolicy: c.srv.config.ACLDownPolicy,
|
||||
DefaultPolicy: c.srv.config.ACLDefaultPolicy,
|
||||
EnableKeyListPolicy: c.srv.config.ACLEnableKeyListPolicy,
|
||||
Enabled: ac.config.ACLsEnabled,
|
||||
PolicyTTL: ac.config.ACLPolicyTTL.String(),
|
||||
RoleTTL: ac.config.ACLRoleTTL.String(),
|
||||
TokenTTL: ac.config.ACLTokenTTL.String(),
|
||||
DisabledTTL: ac.config.ACLDisabledTTL.String(),
|
||||
DownPolicy: ac.config.ACLDownPolicy,
|
||||
DefaultPolicy: ac.config.ACLDefaultPolicy,
|
||||
EnableKeyListPolicy: ac.config.ACLEnableKeyListPolicy,
|
||||
}
|
||||
|
||||
// when ACLs are enabled we want to create a local token with a node identity
|
||||
if c.srv.config.ACLsEnabled {
|
||||
// we have to require local tokens or else it would require having these servers use a token with acl:write to make a
|
||||
// token create RPC to the servers in the primary DC.
|
||||
if !c.srv.LocalTokensEnabled() {
|
||||
return fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter: %s", c.srv.config.Datacenter)
|
||||
}
|
||||
|
||||
// generate the accessor id
|
||||
accessor, err := lib.GenerateUUID(c.srv.checkTokenUUID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// generate the secret id
|
||||
secret, err := lib.GenerateUUID(c.srv.checkTokenUUID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// set up the token
|
||||
token := structs.ACLToken{
|
||||
AccessorID: accessor,
|
||||
SecretID: secret,
|
||||
if ac.config.ACLsEnabled {
|
||||
// set up the token template - the ids and create
|
||||
template := structs.ACLToken{
|
||||
Description: fmt.Sprintf("Auto Config Token for Node %q", opts.NodeName),
|
||||
CreateTime: time.Now(),
|
||||
Local: true,
|
||||
NodeIdentities: []*structs.ACLNodeIdentity{
|
||||
{
|
||||
NodeName: opts.NodeName,
|
||||
Datacenter: c.srv.config.Datacenter,
|
||||
Datacenter: ac.config.Datacenter,
|
||||
},
|
||||
},
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||
}
|
||||
|
||||
req := structs.ACLTokenBatchSetRequest{
|
||||
Tokens: structs.ACLTokens{&token},
|
||||
CAS: false,
|
||||
token, err := ac.backend.CreateACLToken(&template)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to generate an ACL token for node %q - %w", opts.NodeName, err)
|
||||
}
|
||||
|
||||
// perform the request to mint the new token
|
||||
if _, err := c.srv.raftApplyMsgpack(structs.ACLTokenSetRequestType, &req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
acl.Tokens = &config.ACLTokens{Agent: secret}
|
||||
acl.Tokens = &config.ACLTokens{Agent: token.SecretID}
|
||||
}
|
||||
|
||||
conf.ACL = acl
|
||||
|
@ -173,20 +162,12 @@ func (c *Cluster) updateACLsInConfig(opts AutoConfigOptions, conf *config.Config
|
|||
|
||||
// updateJoinAddressesInConfig determines the correct gossip endpoints that clients should
|
||||
// be connecting to for joining the cluster based on the segment given in the opts parameter.
|
||||
func (c *Cluster) updateJoinAddressesInConfig(opts AutoConfigOptions, conf *config.Config) error {
|
||||
members, err := c.srv.LANSegmentMembers(opts.SegmentName)
|
||||
func (ac *AutoConfig) updateJoinAddressesInConfig(opts AutoConfigOptions, conf *config.Config) error {
|
||||
joinAddrs, err := ac.backend.DatacenterJoinAddresses(opts.SegmentName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var joinAddrs []string
|
||||
for _, m := range members {
|
||||
if ok, _ := metadata.IsConsulServer(m); ok {
|
||||
serfAddr := net.TCPAddr{IP: m.Addr, Port: int(m.Port)}
|
||||
joinAddrs = append(joinAddrs, serfAddr.String())
|
||||
}
|
||||
}
|
||||
|
||||
if conf.Gossip == nil {
|
||||
conf.Gossip = &config.Gossip{}
|
||||
}
|
||||
|
@ -196,9 +177,9 @@ func (c *Cluster) updateJoinAddressesInConfig(opts AutoConfigOptions, conf *conf
|
|||
}
|
||||
|
||||
// updateGossipEncryptionInConfig will populate the gossip encryption configuration settings
|
||||
func (c *Cluster) updateGossipEncryptionInConfig(_ AutoConfigOptions, conf *config.Config) error {
|
||||
func (ac *AutoConfig) updateGossipEncryptionInConfig(_ AutoConfigOptions, conf *config.Config) error {
|
||||
// Add gossip encryption settings if there is any key loaded
|
||||
memberlistConfig := c.srv.config.SerfLANConfig.MemberlistConfig
|
||||
memberlistConfig := ac.config.SerfLANConfig.MemberlistConfig
|
||||
if lanKeyring := memberlistConfig.Keyring; lanKeyring != nil {
|
||||
if conf.Gossip == nil {
|
||||
conf.Gossip = &config.Gossip{}
|
||||
|
@ -221,13 +202,19 @@ func (c *Cluster) updateGossipEncryptionInConfig(_ AutoConfigOptions, conf *conf
|
|||
|
||||
// updateTLSSettingsInConfig will populate the TLS configuration settings but will not
|
||||
// populate leaf or ca certficiates.
|
||||
func (c *Cluster) updateTLSSettingsInConfig(_ AutoConfigOptions, conf *config.Config) error {
|
||||
func (ac *AutoConfig) updateTLSSettingsInConfig(_ AutoConfigOptions, conf *config.Config) error {
|
||||
if ac.tlsConfigurator == nil {
|
||||
// TLS is not enabled?
|
||||
return nil
|
||||
}
|
||||
|
||||
// add in TLS configuration
|
||||
if conf.TLS == nil {
|
||||
conf.TLS = &config.TLS{}
|
||||
}
|
||||
conf.TLS.VerifyServerHostname = c.srv.tlsConfigurator.VerifyServerHostname()
|
||||
base := c.srv.tlsConfigurator.Base()
|
||||
|
||||
conf.TLS.VerifyServerHostname = ac.tlsConfigurator.VerifyServerHostname()
|
||||
base := ac.tlsConfigurator.Base()
|
||||
conf.TLS.VerifyOutgoing = base.VerifyOutgoing
|
||||
conf.TLS.MinVersion = base.TLSMinVersion
|
||||
conf.TLS.PreferServerCipherSuites = base.PreferServerCipherSuites
|
||||
|
@ -239,61 +226,66 @@ func (c *Cluster) updateTLSSettingsInConfig(_ AutoConfigOptions, conf *config.Co
|
|||
|
||||
// baseConfig will populate the configuration with some base settings such as the
|
||||
// datacenter names, node name etc.
|
||||
func (c *Cluster) baseConfig(opts AutoConfigOptions, conf *config.Config) error {
|
||||
func (ac *AutoConfig) baseConfig(opts AutoConfigOptions, conf *config.Config) error {
|
||||
if opts.NodeName == "" {
|
||||
return fmt.Errorf("Cannot generate auto config response without a node name")
|
||||
}
|
||||
|
||||
conf.Datacenter = c.srv.config.Datacenter
|
||||
conf.PrimaryDatacenter = c.srv.config.PrimaryDatacenter
|
||||
conf.Datacenter = ac.config.Datacenter
|
||||
conf.PrimaryDatacenter = ac.config.PrimaryDatacenter
|
||||
conf.NodeName = opts.NodeName
|
||||
conf.SegmentName = opts.SegmentName
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type autoConfigUpdater func(c *Cluster, opts AutoConfigOptions, conf *config.Config) error
|
||||
type autoConfigUpdater func(c *AutoConfig, opts AutoConfigOptions, conf *config.Config) error
|
||||
|
||||
var (
|
||||
// variable holding the list of config updating functions to execute when generating
|
||||
// the auto config response. This will allow for more easily adding extra self-contained
|
||||
// configurators here in the future.
|
||||
autoConfigUpdaters []autoConfigUpdater = []autoConfigUpdater{
|
||||
(*Cluster).baseConfig,
|
||||
(*Cluster).updateJoinAddressesInConfig,
|
||||
(*Cluster).updateGossipEncryptionInConfig,
|
||||
(*Cluster).updateTLSSettingsInConfig,
|
||||
(*Cluster).updateACLsInConfig,
|
||||
(*Cluster).updateTLSCertificatesInConfig,
|
||||
(*AutoConfig).baseConfig,
|
||||
(*AutoConfig).updateJoinAddressesInConfig,
|
||||
(*AutoConfig).updateGossipEncryptionInConfig,
|
||||
(*AutoConfig).updateTLSSettingsInConfig,
|
||||
(*AutoConfig).updateACLsInConfig,
|
||||
(*AutoConfig).updateTLSCertificatesInConfig,
|
||||
}
|
||||
)
|
||||
|
||||
// AgentAutoConfig will authorize the incoming request and then generate the configuration
|
||||
// to push down to the client
|
||||
func (c *Cluster) AutoConfig(req *agentpb.AutoConfigRequest, resp *agentpb.AutoConfigResponse) error {
|
||||
func (ac *AutoConfig) InitialConfiguration(req *agentpb.AutoConfigRequest, resp *agentpb.AutoConfigResponse) error {
|
||||
// default the datacenter to our datacenter - agents do not have to specify this as they may not
|
||||
// yet know the datacenter name they are going to be in.
|
||||
if req.Datacenter == "" {
|
||||
req.Datacenter = c.srv.config.Datacenter
|
||||
req.Datacenter = ac.config.Datacenter
|
||||
}
|
||||
|
||||
// TODO (autoconf) Is performing auto configuration over the WAN really a bad idea?
|
||||
if req.Datacenter != c.srv.config.Datacenter {
|
||||
if req.Datacenter != ac.config.Datacenter {
|
||||
return fmt.Errorf("invalid datacenter %q - agent auto configuration cannot target a remote datacenter", req.Datacenter)
|
||||
}
|
||||
|
||||
// TODO (autoconf) maybe panic instead?
|
||||
if ac.backend == nil {
|
||||
return fmt.Errorf("No Auto Config backend is configured")
|
||||
}
|
||||
|
||||
// forward to the leader
|
||||
if done, err := c.srv.forward("Cluster.AutoConfig", req, req, resp); done {
|
||||
if done, err := ac.backend.ForwardRPC("AutoConfig.InitialConfiguration", req, req, resp); done {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO (autoconf) maybe panic instead?
|
||||
if c.authorizer == nil {
|
||||
if ac.authorizer == nil {
|
||||
return fmt.Errorf("No Auto Config authorizer is configured")
|
||||
}
|
||||
|
||||
// authorize the request with the configured authorizer
|
||||
opts, err := c.authorizer.Authorize(req)
|
||||
opts, err := ac.authorizer.Authorize(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -302,7 +294,7 @@ func (c *Cluster) AutoConfig(req *agentpb.AutoConfigRequest, resp *agentpb.AutoC
|
|||
|
||||
// update all the configurations
|
||||
for _, configFn := range autoConfigUpdaters {
|
||||
if err := configFn(c, opts, conf); err != nil {
|
||||
if err := configFn(ac, opts, conf); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
|
@ -19,11 +19,35 @@ import (
|
|||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/memberlist"
|
||||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"gopkg.in/square/go-jose.v2/jwt"
|
||||
)
|
||||
|
||||
type mockAutoConfigBackend struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *mockAutoConfigBackend) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) {
|
||||
ret := m.Called(template)
|
||||
// this handles converting an untyped nil to a typed nil
|
||||
token, _ := ret.Get(0).(*structs.ACLToken)
|
||||
return token, ret.Error(1)
|
||||
}
|
||||
|
||||
func (m *mockAutoConfigBackend) DatacenterJoinAddresses(segment string) ([]string, error) {
|
||||
ret := m.Called(segment)
|
||||
// this handles converting an untyped nil to a typed nil
|
||||
addrs, _ := ret.Get(0).([]string)
|
||||
return addrs, ret.Error(1)
|
||||
}
|
||||
|
||||
func (m *mockAutoConfigBackend) ForwardRPC(method string, info structs.RPCInfo, args, reply interface{}) (bool, error) {
|
||||
ret := m.Called(method, info, args, reply)
|
||||
return ret.Bool(0), ret.Error(1)
|
||||
}
|
||||
|
||||
func testJWTStandardClaims() jwt.Claims {
|
||||
now := time.Now()
|
||||
|
||||
|
@ -48,13 +72,13 @@ func signJWTWithStandardClaims(t *testing.T, privKey string, claims interface{})
|
|||
return signJWT(t, privKey, testJWTStandardClaims(), claims)
|
||||
}
|
||||
|
||||
// TestClusterAutoConfig is really an integration test of all the moving parts of the Cluster.AutoConfig RPC.
|
||||
// TestAutoConfigInitialConfiguration is really an integration test of all the moving parts of the AutoConfig.InitialConfiguration RPC.
|
||||
// Full testing of the individual parts will not be done in this test:
|
||||
//
|
||||
// * Any implementations of the AutoConfigAuthorizer interface (although these test do use the jwtAuthorizer)
|
||||
// * Each of the individual config generation functions. These can be unit tested separately and many wont
|
||||
// require a running test server.
|
||||
func TestClusterAutoConfig(t *testing.T) {
|
||||
// * Each of the individual config generation functions. These can be unit tested separately and should NOT
|
||||
// require running test servers
|
||||
func TestAutoConfigInitialConfiguration(t *testing.T) {
|
||||
type testCase struct {
|
||||
request agentpb.AutoConfigRequest
|
||||
expected agentpb.AutoConfigResponse
|
||||
|
@ -227,7 +251,7 @@ func TestClusterAutoConfig(t *testing.T) {
|
|||
for testName, tcase := range cases {
|
||||
t.Run(testName, func(t *testing.T) {
|
||||
var reply agentpb.AutoConfigResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Cluster.AutoConfig", &tcase.request, &reply)
|
||||
err := msgpackrpc.CallWithCodec(codec, "AutoConfig.InitialConfiguration", &tcase.request, &reply)
|
||||
if tcase.err != "" {
|
||||
testutil.RequireErrorContains(t, err, tcase.err)
|
||||
} else {
|
||||
|
@ -241,7 +265,7 @@ func TestClusterAutoConfig(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestClusterAutoConfig_baseConfig(t *testing.T) {
|
||||
func TestAutoConfig_baseConfig(t *testing.T) {
|
||||
type testCase struct {
|
||||
serverConfig Config
|
||||
opts AutoConfigOptions
|
||||
|
@ -277,14 +301,12 @@ func TestClusterAutoConfig_baseConfig(t *testing.T) {
|
|||
|
||||
for name, tcase := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
cluster := Cluster{
|
||||
srv: &Server{
|
||||
config: &tcase.serverConfig,
|
||||
},
|
||||
ac := AutoConfig{
|
||||
config: &tcase.serverConfig,
|
||||
}
|
||||
|
||||
var actual config.Config
|
||||
err := cluster.baseConfig(tcase.opts, &actual)
|
||||
err := ac.baseConfig(tcase.opts, &actual)
|
||||
if tcase.err == "" {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tcase.expected, actual)
|
||||
|
@ -295,7 +317,7 @@ func TestClusterAutoConfig_baseConfig(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestClusterAutoConfig_updateTLSSettingsInConfig(t *testing.T) {
|
||||
func TestAutoConfig_updateTLSSettingsInConfig(t *testing.T) {
|
||||
_, _, cacert, err := testTLSCertificates("server.dc1.consul")
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -365,14 +387,12 @@ func TestClusterAutoConfig_updateTLSSettingsInConfig(t *testing.T) {
|
|||
configurator, err := tlsutil.NewConfigurator(tcase.tlsConfig, logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
cluster := &Cluster{
|
||||
srv: &Server{
|
||||
tlsConfigurator: configurator,
|
||||
},
|
||||
ac := &AutoConfig{
|
||||
tlsConfigurator: configurator,
|
||||
}
|
||||
|
||||
var actual config.Config
|
||||
err = cluster.updateTLSSettingsInConfig(AutoConfigOptions{}, &actual)
|
||||
err = ac.updateTLSSettingsInConfig(AutoConfigOptions{}, &actual)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tcase.expected, actual)
|
||||
})
|
||||
|
@ -436,16 +456,15 @@ func TestAutoConfig_updateGossipEncryptionInConfig(t *testing.T) {
|
|||
|
||||
for name, tcase := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
cluster := Cluster{
|
||||
srv: &Server{
|
||||
config: DefaultConfig(),
|
||||
},
|
||||
cfg := DefaultConfig()
|
||||
cfg.SerfLANConfig.MemberlistConfig = &tcase.conf
|
||||
|
||||
ac := AutoConfig{
|
||||
config: cfg,
|
||||
}
|
||||
|
||||
cluster.srv.config.SerfLANConfig.MemberlistConfig = &tcase.conf
|
||||
|
||||
var actual config.Config
|
||||
err := cluster.updateGossipEncryptionInConfig(AutoConfigOptions{}, &actual)
|
||||
err := ac.updateGossipEncryptionInConfig(AutoConfigOptions{}, &actual)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tcase.expected, actual)
|
||||
})
|
||||
|
@ -481,14 +500,12 @@ func TestAutoConfig_updateTLSCertificatesInConfig(t *testing.T) {
|
|||
|
||||
for name, tcase := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
cluster := Cluster{
|
||||
srv: &Server{
|
||||
config: &tcase.serverConfig,
|
||||
},
|
||||
ac := AutoConfig{
|
||||
config: &tcase.serverConfig,
|
||||
}
|
||||
|
||||
var actual config.Config
|
||||
err := cluster.updateTLSCertificatesInConfig(AutoConfigOptions{}, &actual)
|
||||
err := ac.updateTLSCertificatesInConfig(AutoConfigOptions{}, &actual)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tcase.expected, actual)
|
||||
})
|
||||
|
@ -497,24 +514,34 @@ func TestAutoConfig_updateTLSCertificatesInConfig(t *testing.T) {
|
|||
|
||||
func TestAutoConfig_updateACLsInConfig(t *testing.T) {
|
||||
type testCase struct {
|
||||
patch func(c *Config)
|
||||
expected config.Config
|
||||
verify func(t *testing.T, c *config.Config)
|
||||
err string
|
||||
config Config
|
||||
expected config.Config
|
||||
expectACLToken bool
|
||||
err error
|
||||
}
|
||||
|
||||
const (
|
||||
tokenAccessor = "b98761aa-c0ee-445b-9b0c-f54b56b47778"
|
||||
tokenSecret = "1c96448a-ab04-4caa-982a-e8b095a111e2"
|
||||
)
|
||||
|
||||
testDC := "dc1"
|
||||
|
||||
cases := map[string]testCase{
|
||||
"enabled": {
|
||||
patch: func(c *Config) {
|
||||
c.ACLsEnabled = true
|
||||
c.ACLPolicyTTL = 7 * time.Second
|
||||
c.ACLRoleTTL = 10 * time.Second
|
||||
c.ACLTokenTTL = 12 * time.Second
|
||||
c.ACLDisabledTTL = 31 * time.Second
|
||||
c.ACLDefaultPolicy = "allow"
|
||||
c.ACLDownPolicy = "deny"
|
||||
c.ACLEnableKeyListPolicy = true
|
||||
config: Config{
|
||||
Datacenter: testDC,
|
||||
PrimaryDatacenter: testDC,
|
||||
ACLsEnabled: true,
|
||||
ACLPolicyTTL: 7 * time.Second,
|
||||
ACLRoleTTL: 10 * time.Second,
|
||||
ACLTokenTTL: 12 * time.Second,
|
||||
ACLDisabledTTL: 31 * time.Second,
|
||||
ACLDefaultPolicy: "allow",
|
||||
ACLDownPolicy: "deny",
|
||||
ACLEnableKeyListPolicy: true,
|
||||
},
|
||||
expectACLToken: true,
|
||||
expected: config.Config{
|
||||
ACL: &config.ACL{
|
||||
Enabled: true,
|
||||
|
@ -525,32 +552,26 @@ func TestAutoConfig_updateACLsInConfig(t *testing.T) {
|
|||
DownPolicy: "deny",
|
||||
DefaultPolicy: "allow",
|
||||
EnableKeyListPolicy: true,
|
||||
Tokens: &config.ACLTokens{Agent: "verified"},
|
||||
Tokens: &config.ACLTokens{
|
||||
Agent: tokenSecret,
|
||||
},
|
||||
},
|
||||
},
|
||||
verify: func(t *testing.T, c *config.Config) {
|
||||
t.Helper()
|
||||
// the agent token secret is non-deterministically generated
|
||||
// So we want to validate that one was set and overwrite with
|
||||
// a value that the expected configurate wants.
|
||||
require.NotNil(t, c)
|
||||
require.NotNil(t, c.ACL)
|
||||
require.NotNil(t, c.ACL.Tokens)
|
||||
require.NotEmpty(t, c.ACL.Tokens.Agent)
|
||||
c.ACL.Tokens.Agent = "verified"
|
||||
},
|
||||
},
|
||||
"disabled": {
|
||||
patch: func(c *Config) {
|
||||
c.ACLsEnabled = false
|
||||
c.ACLPolicyTTL = 7 * time.Second
|
||||
c.ACLRoleTTL = 10 * time.Second
|
||||
c.ACLTokenTTL = 12 * time.Second
|
||||
c.ACLDisabledTTL = 31 * time.Second
|
||||
c.ACLDefaultPolicy = "allow"
|
||||
c.ACLDownPolicy = "deny"
|
||||
c.ACLEnableKeyListPolicy = true
|
||||
config: Config{
|
||||
Datacenter: testDC,
|
||||
PrimaryDatacenter: testDC,
|
||||
ACLsEnabled: false,
|
||||
ACLPolicyTTL: 7 * time.Second,
|
||||
ACLRoleTTL: 10 * time.Second,
|
||||
ACLTokenTTL: 12 * time.Second,
|
||||
ACLDisabledTTL: 31 * time.Second,
|
||||
ACLDefaultPolicy: "allow",
|
||||
ACLDownPolicy: "deny",
|
||||
ACLEnableKeyListPolicy: true,
|
||||
},
|
||||
expectACLToken: false,
|
||||
expected: config.Config{
|
||||
ACL: &config.ACL{
|
||||
Enabled: false,
|
||||
|
@ -565,53 +586,77 @@ func TestAutoConfig_updateACLsInConfig(t *testing.T) {
|
|||
},
|
||||
},
|
||||
"local-tokens-disabled": {
|
||||
patch: func(c *Config) {
|
||||
c.PrimaryDatacenter = "somewhere else"
|
||||
config: Config{
|
||||
Datacenter: testDC,
|
||||
PrimaryDatacenter: "somewhere-else",
|
||||
ACLsEnabled: true,
|
||||
},
|
||||
err: "Agent Auto Configuration requires local token usage to be enabled in this datacenter",
|
||||
expectACLToken: true,
|
||||
err: fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter"),
|
||||
},
|
||||
}
|
||||
for name, tcase := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
_, s, _ := testACLServerWithConfig(t, tcase.patch, false)
|
||||
backend := &mockAutoConfigBackend{}
|
||||
expectedTemplate := &structs.ACLToken{
|
||||
Description: `Auto Config Token for Node "something"`,
|
||||
Local: true,
|
||||
NodeIdentities: []*structs.ACLNodeIdentity{
|
||||
{
|
||||
NodeName: "something",
|
||||
Datacenter: testDC,
|
||||
},
|
||||
},
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||
}
|
||||
|
||||
waitForLeaderEstablishment(t, s)
|
||||
testToken := &structs.ACLToken{
|
||||
AccessorID: tokenAccessor,
|
||||
SecretID: tokenSecret,
|
||||
Description: `Auto Config Token for Node "something"`,
|
||||
Local: true,
|
||||
NodeIdentities: []*structs.ACLNodeIdentity{
|
||||
{
|
||||
NodeName: "something",
|
||||
Datacenter: testDC,
|
||||
},
|
||||
},
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||
}
|
||||
|
||||
cluster := Cluster{srv: s}
|
||||
if tcase.expectACLToken {
|
||||
backend.On("CreateACLToken", expectedTemplate).Return(testToken, tcase.err).Once()
|
||||
}
|
||||
|
||||
ac := AutoConfig{config: &tcase.config, backend: backend}
|
||||
|
||||
var actual config.Config
|
||||
err := cluster.updateACLsInConfig(AutoConfigOptions{NodeName: "something"}, &actual)
|
||||
if tcase.err != "" {
|
||||
testutil.RequireErrorContains(t, err, tcase.err)
|
||||
err := ac.updateACLsInConfig(AutoConfigOptions{NodeName: "something"}, &actual)
|
||||
if tcase.err != nil {
|
||||
testutil.RequireErrorContains(t, err, tcase.err.Error())
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
if tcase.verify != nil {
|
||||
tcase.verify(t, &actual)
|
||||
}
|
||||
require.Equal(t, tcase.expected, actual)
|
||||
}
|
||||
|
||||
backend.AssertExpectations(t)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAutoConfig_updateJoinAddressesInConfig(t *testing.T) {
|
||||
conf := testClusterConfig{
|
||||
Datacenter: "primary",
|
||||
Servers: 3,
|
||||
}
|
||||
addrs := []string{"198.18.0.7:8300", "198.18.0.1:8300"}
|
||||
backend := &mockAutoConfigBackend{}
|
||||
backend.On("DatacenterJoinAddresses", "").Return(addrs, nil).Once()
|
||||
|
||||
nodes := newTestCluster(t, &conf)
|
||||
|
||||
cluster := Cluster{srv: nodes.Servers[0]}
|
||||
ac := AutoConfig{backend: backend}
|
||||
|
||||
var actual config.Config
|
||||
err := cluster.updateJoinAddressesInConfig(AutoConfigOptions{}, &actual)
|
||||
err := ac.updateJoinAddressesInConfig(AutoConfigOptions{}, &actual)
|
||||
require.NoError(t, err)
|
||||
|
||||
var expected []string
|
||||
for _, srv := range nodes.Servers {
|
||||
expected = append(expected, fmt.Sprintf("127.0.0.1:%d", srv.config.SerfLANConfig.MemberlistConfig.BindPort))
|
||||
}
|
||||
require.NotNil(t, actual.Gossip)
|
||||
require.ElementsMatch(t, expected, actual.Gossip.RetryJoinLAN)
|
||||
require.ElementsMatch(t, addrs, actual.Gossip.RetryJoinLAN)
|
||||
|
||||
backend.AssertExpectations(t)
|
||||
}
|
|
@ -24,7 +24,7 @@ func (a *AutoEncrypt) Sign(
|
|||
if !a.srv.config.AutoEncryptAllowTLS {
|
||||
return ErrAutoEncryptAllowTLSNotEnabled
|
||||
}
|
||||
if done, err := a.srv.forward("AutoEncrypt.Sign", args, args, reply); done {
|
||||
if done, err := a.srv.ForwardRPC("AutoEncrypt.Sign", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -94,7 +94,7 @@ func checkPreApply(check *structs.HealthCheck) {
|
|||
|
||||
// Register is used register that a node is providing a given service.
|
||||
func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error {
|
||||
if done, err := c.srv.forward("Catalog.Register", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("Catalog.Register", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"catalog", "register"}, time.Now())
|
||||
|
@ -175,7 +175,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
|
|||
|
||||
// Deregister is used to remove a service registration for a given node.
|
||||
func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) error {
|
||||
if done, err := c.srv.forward("Catalog.Deregister", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("Catalog.Deregister", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"catalog", "deregister"}, time.Now())
|
||||
|
@ -244,7 +244,7 @@ func (c *Catalog) ListDatacenters(args *structs.DatacentersRequest, reply *[]str
|
|||
|
||||
// ListNodes is used to query the nodes in a DC
|
||||
func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedNodes) error {
|
||||
if done, err := c.srv.forward("Catalog.ListNodes", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("Catalog.ListNodes", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -292,7 +292,7 @@ func isUnmodified(opts structs.QueryOptions, index uint64) bool {
|
|||
|
||||
// ListServices is used to query the services in a DC
|
||||
func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.IndexedServices) error {
|
||||
if done, err := c.srv.forward("Catalog.ListServices", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("Catalog.ListServices", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -331,7 +331,7 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
|
|||
}
|
||||
|
||||
func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.IndexedServiceList) error {
|
||||
if done, err := c.srv.forward("Catalog.ServiceList", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("Catalog.ServiceList", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -360,7 +360,7 @@ func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.In
|
|||
|
||||
// ServiceNodes returns all the nodes registered as part of a service
|
||||
func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceNodes) error {
|
||||
if done, err := c.srv.forward("Catalog.ServiceNodes", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("Catalog.ServiceNodes", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -498,7 +498,7 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
|
|||
|
||||
// NodeServices returns all the services registered as part of a node
|
||||
func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServices) error {
|
||||
if done, err := c.srv.forward("Catalog.NodeServices", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("Catalog.NodeServices", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -549,7 +549,7 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs
|
|||
}
|
||||
|
||||
func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServiceList) error {
|
||||
if done, err := c.srv.forward("Catalog.NodeServiceList", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("Catalog.NodeServiceList", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -602,7 +602,7 @@ func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *stru
|
|||
}
|
||||
|
||||
func (c *Catalog) GatewayServices(args *structs.ServiceSpecificRequest, reply *structs.IndexedGatewayServices) error {
|
||||
if done, err := c.srv.forward("Catalog.GatewayServices", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("Catalog.GatewayServices", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error
|
|||
// be replicated to all the other datacenters.
|
||||
args.Datacenter = c.srv.config.PrimaryDatacenter
|
||||
|
||||
if done, err := c.srv.forward("ConfigEntry.Apply", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("ConfigEntry.Apply", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"config_entry", "apply"}, time.Now())
|
||||
|
@ -73,7 +73,7 @@ func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.ConfigE
|
|||
return err
|
||||
}
|
||||
|
||||
if done, err := c.srv.forward("ConfigEntry.Get", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("ConfigEntry.Get", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"config_entry", "get"}, time.Now())
|
||||
|
@ -120,7 +120,7 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe
|
|||
return err
|
||||
}
|
||||
|
||||
if done, err := c.srv.forward("ConfigEntry.List", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("ConfigEntry.List", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"config_entry", "list"}, time.Now())
|
||||
|
@ -165,7 +165,7 @@ func (c *ConfigEntry) ListAll(args *structs.DCSpecificRequest, reply *structs.In
|
|||
return err
|
||||
}
|
||||
|
||||
if done, err := c.srv.forward("ConfigEntry.ListAll", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("ConfigEntry.ListAll", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"config_entry", "listAll"}, time.Now())
|
||||
|
@ -209,7 +209,7 @@ func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{})
|
|||
// be replicated to all the other datacenters.
|
||||
args.Datacenter = c.srv.config.PrimaryDatacenter
|
||||
|
||||
if done, err := c.srv.forward("ConfigEntry.Delete", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("ConfigEntry.Delete", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"config_entry", "delete"}, time.Now())
|
||||
|
@ -245,7 +245,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
|||
return err
|
||||
}
|
||||
|
||||
if done, err := c.srv.forward("ConfigEntry.ResolveServiceConfig", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("ConfigEntry.ResolveServiceConfig", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"config_entry", "resolve_service_config"}, time.Now())
|
||||
|
|
|
@ -63,7 +63,7 @@ func (s *ConnectCA) ConfigurationGet(
|
|||
return ErrConnectNotEnabled
|
||||
}
|
||||
|
||||
if done, err := s.srv.forward("ConnectCA.ConfigurationGet", args, args, reply); done {
|
||||
if done, err := s.srv.ForwardRPC("ConnectCA.ConfigurationGet", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -95,7 +95,7 @@ func (s *ConnectCA) ConfigurationSet(
|
|||
return ErrConnectNotEnabled
|
||||
}
|
||||
|
||||
if done, err := s.srv.forward("ConnectCA.ConfigurationSet", args, args, reply); done {
|
||||
if done, err := s.srv.ForwardRPC("ConnectCA.ConfigurationSet", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -312,7 +312,7 @@ func (s *ConnectCA) Roots(
|
|||
args *structs.DCSpecificRequest,
|
||||
reply *structs.IndexedCARoots) error {
|
||||
// Forward if necessary
|
||||
if done, err := s.srv.forward("ConnectCA.Roots", args, args, reply); done {
|
||||
if done, err := s.srv.ForwardRPC("ConnectCA.Roots", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -387,7 +387,7 @@ func (s *ConnectCA) Sign(
|
|||
return ErrConnectNotEnabled
|
||||
}
|
||||
|
||||
if done, err := s.srv.forward("ConnectCA.Sign", args, args, reply); done {
|
||||
if done, err := s.srv.ForwardRPC("ConnectCA.Sign", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -593,7 +593,7 @@ func (s *ConnectCA) SignIntermediate(
|
|||
return ErrConnectNotEnabled
|
||||
}
|
||||
|
||||
if done, err := s.srv.forward("ConnectCA.SignIntermediate", args, args, reply); done {
|
||||
if done, err := s.srv.ForwardRPC("ConnectCA.SignIntermediate", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -117,7 +117,7 @@ func (c *Coordinate) batchApplyUpdates() error {
|
|||
|
||||
// Update inserts or updates the LAN coordinate of a node.
|
||||
func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) (err error) {
|
||||
if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("Coordinate.Update", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -184,7 +184,7 @@ func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.Datacenter
|
|||
// ListNodes returns the list of nodes with their raw network coordinates (if no
|
||||
// coordinates are available for a node it won't appear in this list).
|
||||
func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedCoordinates) error {
|
||||
if done, err := c.srv.forward("Coordinate.ListNodes", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("Coordinate.ListNodes", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -207,7 +207,7 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
|
|||
|
||||
// Node returns the raw coordinates for a single node.
|
||||
func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinates) error {
|
||||
if done, err := c.srv.forward("Coordinate.Node", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("Coordinate.Node", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs
|
|||
return ErrConnectNotEnabled
|
||||
}
|
||||
|
||||
if done, err := c.srv.forward("DiscoveryChain.Get", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("DiscoveryChain.Get", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"discovery_chain", "get"}, time.Now())
|
||||
|
|
|
@ -27,7 +27,7 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo
|
|||
// be replicated to all the other datacenters.
|
||||
args.Datacenter = c.srv.config.PrimaryDatacenter
|
||||
|
||||
if done, err := c.srv.forward("FederationState.Apply", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("FederationState.Apply", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -76,7 +76,7 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo
|
|||
}
|
||||
|
||||
func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs.FederationStateResponse) error {
|
||||
if done, err := c.srv.forward("FederationState.Get", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("FederationState.Get", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -117,7 +117,7 @@ func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs
|
|||
// List is the endpoint meant to be used by consul servers performing
|
||||
// replication.
|
||||
func (c *FederationState) List(args *structs.DCSpecificRequest, reply *structs.IndexedFederationStates) error {
|
||||
if done, err := c.srv.forward("FederationState.List", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("FederationState.List", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -160,7 +160,7 @@ func (c *FederationState) List(args *structs.DCSpecificRequest, reply *structs.I
|
|||
// in the discovery info for dialing mesh gateways. Analogous to catalog
|
||||
// endpoints.
|
||||
func (c *FederationState) ListMeshGateways(args *structs.DCSpecificRequest, reply *structs.DatacenterIndexedCheckServiceNodes) error {
|
||||
if done, err := c.srv.forward("FederationState.ListMeshGateways", args, args, reply); done {
|
||||
if done, err := c.srv.ForwardRPC("FederationState.ListMeshGateways", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -325,7 +325,7 @@ func (g *GatewayLocator) runOnce(lastFetchIndex uint64) (uint64, error) {
|
|||
return queryMeta.Index, nil
|
||||
}
|
||||
|
||||
// checkLocalStateIsReady is inlined a bit from (*Server).forward(). We need to
|
||||
// checkLocalStateIsReady is inlined a bit from (*Server).ForwardRPC(). We need to
|
||||
// wait until our own state machine is safe to read from.
|
||||
func (g *GatewayLocator) checkLocalStateIsReady() error {
|
||||
// Check if we can allow a stale read, ensure our local DB is initialized
|
||||
|
|
|
@ -20,7 +20,7 @@ type Health struct {
|
|||
// ChecksInState is used to get all the checks in a given state
|
||||
func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
|
||||
reply *structs.IndexedHealthChecks) error {
|
||||
if done, err := h.srv.forward("Health.ChecksInState", args, args, reply); done {
|
||||
if done, err := h.srv.ForwardRPC("Health.ChecksInState", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -71,7 +71,7 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
|
|||
// NodeChecks is used to get all the checks for a node
|
||||
func (h *Health) NodeChecks(args *structs.NodeSpecificRequest,
|
||||
reply *structs.IndexedHealthChecks) error {
|
||||
if done, err := h.srv.forward("Health.NodeChecks", args, args, reply); done {
|
||||
if done, err := h.srv.ForwardRPC("Health.NodeChecks", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -121,7 +121,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
|
|||
}
|
||||
|
||||
// Potentially forward
|
||||
if done, err := h.srv.forward("Health.ServiceChecks", args, args, reply); done {
|
||||
if done, err := h.srv.ForwardRPC("Health.ServiceChecks", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -171,7 +171,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
|
|||
|
||||
// ServiceNodes returns all the nodes registered as part of a service including health info
|
||||
func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedCheckServiceNodes) error {
|
||||
if done, err := h.srv.forward("Health.ServiceNodes", args, args, reply); done {
|
||||
if done, err := h.srv.ForwardRPC("Health.ServiceNodes", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -197,7 +197,7 @@ func (s *Intention) Apply(
|
|||
args.Datacenter = s.srv.config.PrimaryDatacenter
|
||||
}
|
||||
|
||||
if done, err := s.srv.forward("Intention.Apply", args, args, reply); done {
|
||||
if done, err := s.srv.ForwardRPC("Intention.Apply", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"consul", "intention", "apply"}, time.Now())
|
||||
|
@ -253,7 +253,7 @@ func (s *Intention) Get(
|
|||
args *structs.IntentionQueryRequest,
|
||||
reply *structs.IndexedIntentions) error {
|
||||
// Forward if necessary
|
||||
if done, err := s.srv.forward("Intention.Get", args, args, reply); done {
|
||||
if done, err := s.srv.ForwardRPC("Intention.Get", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -328,7 +328,7 @@ func (s *Intention) List(
|
|||
args *structs.DCSpecificRequest,
|
||||
reply *structs.IndexedIntentions) error {
|
||||
// Forward if necessary
|
||||
if done, err := s.srv.forward("Intention.List", args, args, reply); done {
|
||||
if done, err := s.srv.ForwardRPC("Intention.List", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -379,7 +379,7 @@ func (s *Intention) Match(
|
|||
args *structs.IntentionQueryRequest,
|
||||
reply *structs.IndexedIntentionMatches) error {
|
||||
// Forward if necessary
|
||||
if done, err := s.srv.forward("Intention.Match", args, args, reply); done {
|
||||
if done, err := s.srv.ForwardRPC("Intention.Match", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -448,7 +448,7 @@ func (s *Intention) Check(
|
|||
args *structs.IntentionQueryRequest,
|
||||
reply *structs.IntentionQueryCheckResponse) error {
|
||||
// Forward maybe
|
||||
if done, err := s.srv.forward("Intention.Check", args, args, reply); done {
|
||||
if done, err := s.srv.ForwardRPC("Intention.Check", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ type Internal struct {
|
|||
// NodeInfo is used to retrieve information about a specific node.
|
||||
func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest,
|
||||
reply *structs.IndexedNodeDump) error {
|
||||
if done, err := m.srv.forward("Internal.NodeInfo", args, args, reply); done {
|
||||
if done, err := m.srv.ForwardRPC("Internal.NodeInfo", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -50,7 +50,7 @@ func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest,
|
|||
// NodeDump is used to generate information about all of the nodes.
|
||||
func (m *Internal) NodeDump(args *structs.DCSpecificRequest,
|
||||
reply *structs.IndexedNodeDump) error {
|
||||
if done, err := m.srv.forward("Internal.NodeDump", args, args, reply); done {
|
||||
if done, err := m.srv.ForwardRPC("Internal.NodeDump", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -89,7 +89,7 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest,
|
|||
}
|
||||
|
||||
func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.IndexedCheckServiceNodes) error {
|
||||
if done, err := m.srv.forward("Internal.ServiceDump", args, args, reply); done {
|
||||
if done, err := m.srv.ForwardRPC("Internal.ServiceDump", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -129,7 +129,7 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.
|
|||
|
||||
// GatewayServiceNodes returns all the nodes for services associated with a gateway along with their gateway config
|
||||
func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceDump) error {
|
||||
if done, err := m.srv.forward("Internal.GatewayServiceDump", args, args, reply); done {
|
||||
if done, err := m.srv.ForwardRPC("Internal.GatewayServiceDump", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -210,7 +210,7 @@ func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, repl
|
|||
// triggered in a remote DC.
|
||||
func (m *Internal) EventFire(args *structs.EventFireRequest,
|
||||
reply *structs.EventFireResponse) error {
|
||||
if done, err := m.srv.forward("Internal.EventFire", args, args, reply); done {
|
||||
if done, err := m.srv.ForwardRPC("Internal.EventFire", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -87,7 +87,7 @@ func kvsPreApply(logger hclog.Logger, srv *Server, authz acl.Authorizer, op api.
|
|||
|
||||
// Apply is used to apply a KVS update request to the data store.
|
||||
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
|
||||
if done, err := k.srv.forward("KVS.Apply", args, args, reply); done {
|
||||
if done, err := k.srv.ForwardRPC("KVS.Apply", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"kvs", "apply"}, time.Now())
|
||||
|
@ -130,7 +130,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
|
|||
|
||||
// Get is used to lookup a single key.
|
||||
func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
|
||||
if done, err := k.srv.forward("KVS.Get", args, args, reply); done {
|
||||
if done, err := k.srv.ForwardRPC("KVS.Get", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -175,7 +175,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
|
|||
|
||||
// List is used to list all keys with a given prefix.
|
||||
func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
|
||||
if done, err := k.srv.forward("KVS.List", args, args, reply); done {
|
||||
if done, err := k.srv.ForwardRPC("KVS.List", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -227,7 +227,7 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
|
|||
// of the response so that only a subset of the prefix is returned. In this
|
||||
// mode, the keys which are omitted are still counted in the returned index.
|
||||
func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyList) error {
|
||||
if done, err := k.srv.forward("KVS.ListKeys", args, args, reply); done {
|
||||
if done, err := k.srv.ForwardRPC("KVS.ListKeys", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ import (
|
|||
|
||||
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
|
||||
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *autopilot.Config) error {
|
||||
if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
|
||||
if done, err := op.srv.ForwardRPC("Operator.AutopilotGetConfiguration", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -42,7 +42,7 @@ func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, r
|
|||
|
||||
// AutopilotSetConfiguration is used to set the current Autopilot configuration.
|
||||
func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error {
|
||||
if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done {
|
||||
if done, err := op.srv.ForwardRPC("Operator.AutopilotSetConfiguration", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -81,7 +81,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *autopil
|
|||
// re-using a structure where we don't support all the options.
|
||||
args.RequireConsistent = true
|
||||
args.AllowStale = false
|
||||
if done, err := op.srv.forward("Operator.ServerHealth", args, args, reply); done {
|
||||
if done, err := op.srv.ForwardRPC("Operator.ServerHealth", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ import (
|
|||
|
||||
// RaftGetConfiguration is used to retrieve the current Raft configuration.
|
||||
func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply *structs.RaftConfigurationResponse) error {
|
||||
if done, err := op.srv.forward("Operator.RaftGetConfiguration", args, args, reply); done {
|
||||
if done, err := op.srv.ForwardRPC("Operator.RaftGetConfiguration", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -74,7 +74,7 @@ func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply
|
|||
// "IP:port". The reply argument is not used, but it required to fulfill the RPC
|
||||
// interface.
|
||||
func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
|
||||
if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done {
|
||||
if done, err := op.srv.ForwardRPC("Operator.RaftRemovePeerByAddress", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -146,7 +146,7 @@ REMOVE:
|
|||
// "IP:port". The reply argument is not used, but is required to fulfill the RPC
|
||||
// interface.
|
||||
func (op *Operator) RaftRemovePeerByID(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
|
||||
if done, err := op.srv.forward("Operator.RaftRemovePeerByID", args, args, reply); done {
|
||||
if done, err := op.srv.ForwardRPC("Operator.RaftRemovePeerByID", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ type PreparedQuery struct {
|
|||
// only be used for operations that modify the data. The ID of the session is
|
||||
// returned in the reply.
|
||||
func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string) (err error) {
|
||||
if done, err := p.srv.forward("PreparedQuery.Apply", args, args, reply); done {
|
||||
if done, err := p.srv.ForwardRPC("PreparedQuery.Apply", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"prepared-query", "apply"}, time.Now())
|
||||
|
@ -209,7 +209,7 @@ func parseDNS(dns *structs.QueryDNSOptions) error {
|
|||
// Get returns a single prepared query by ID.
|
||||
func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest,
|
||||
reply *structs.IndexedPreparedQueries) error {
|
||||
if done, err := p.srv.forward("PreparedQuery.Get", args, args, reply); done {
|
||||
if done, err := p.srv.ForwardRPC("PreparedQuery.Get", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -253,7 +253,7 @@ func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest,
|
|||
|
||||
// List returns all the prepared queries.
|
||||
func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.IndexedPreparedQueries) error {
|
||||
if done, err := p.srv.forward("PreparedQuery.List", args, args, reply); done {
|
||||
if done, err := p.srv.ForwardRPC("PreparedQuery.List", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -277,7 +277,7 @@ func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.Ind
|
|||
// will be executed here.
|
||||
func (p *PreparedQuery) Explain(args *structs.PreparedQueryExecuteRequest,
|
||||
reply *structs.PreparedQueryExplainResponse) error {
|
||||
if done, err := p.srv.forward("PreparedQuery.Explain", args, args, reply); done {
|
||||
if done, err := p.srv.ForwardRPC("PreparedQuery.Explain", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"prepared-query", "explain"}, time.Now())
|
||||
|
@ -324,7 +324,7 @@ func (p *PreparedQuery) Explain(args *structs.PreparedQueryExecuteRequest,
|
|||
// part of a DNS lookup, or when executing prepared queries from the HTTP API.
|
||||
func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
|
||||
reply *structs.PreparedQueryExecuteResponse) error {
|
||||
if done, err := p.srv.forward("PreparedQuery.Execute", args, args, reply); done {
|
||||
if done, err := p.srv.ForwardRPC("PreparedQuery.Execute", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"prepared-query", "execute"}, time.Now())
|
||||
|
@ -459,7 +459,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
|
|||
// We don't want things to fan out further than one level.
|
||||
func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest,
|
||||
reply *structs.PreparedQueryExecuteResponse) error {
|
||||
if done, err := p.srv.forward("PreparedQuery.ExecuteRemote", args, args, reply); done {
|
||||
if done, err := p.srv.ForwardRPC("PreparedQuery.ExecuteRemote", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"prepared-query", "execute_remote"}, time.Now())
|
||||
|
|
|
@ -499,9 +499,9 @@ func canRetry(args interface{}, err error) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// forward is used to forward to a remote DC or to forward to the local leader
|
||||
// ForwardRPC is used to forward an RPC request to a remote DC or to the local leader
|
||||
// Returns a bool of if forwarding was performed, as well as any error
|
||||
func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
|
||||
func (s *Server) ForwardRPC(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
|
||||
var firstCheck time.Time
|
||||
|
||||
// Handle DC forwarding
|
||||
|
|
|
@ -876,7 +876,7 @@ func (s *Server) setupRPC() error {
|
|||
authz = &disabledAuthorizer{}
|
||||
}
|
||||
// now register with the insecure RPC server
|
||||
s.insecureRPCServer.Register(&Cluster{srv: s, authorizer: authz})
|
||||
s.insecureRPCServer.Register(NewAutoConfig(s.config, s.tlsConfigurator, s, authz))
|
||||
|
||||
ln, err := net.ListenTCP("tcp", s.config.RPCAddr)
|
||||
if err != nil {
|
||||
|
@ -1436,6 +1436,72 @@ func (s *Server) intentionReplicationEnabled() bool {
|
|||
return s.config.ConnectEnabled && s.config.Datacenter != s.config.PrimaryDatacenter
|
||||
}
|
||||
|
||||
// CreateACLToken will create an ACL token from the given template
|
||||
func (s *Server) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) {
|
||||
// we have to require local tokens or else it would require having these servers use a token with acl:write to make a
|
||||
// token create RPC to the servers in the primary DC.
|
||||
if !s.LocalTokensEnabled() {
|
||||
return nil, fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter: %s", s.config.Datacenter)
|
||||
}
|
||||
|
||||
newToken := *template
|
||||
|
||||
// generate the accessor id
|
||||
if newToken.AccessorID == "" {
|
||||
accessor, err := lib.GenerateUUID(s.checkTokenUUID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
newToken.AccessorID = accessor
|
||||
}
|
||||
|
||||
// generate the secret id
|
||||
if newToken.SecretID == "" {
|
||||
secret, err := lib.GenerateUUID(s.checkTokenUUID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
newToken.SecretID = secret
|
||||
}
|
||||
|
||||
newToken.CreateTime = time.Now()
|
||||
|
||||
req := structs.ACLTokenBatchSetRequest{
|
||||
Tokens: structs.ACLTokens{&newToken},
|
||||
CAS: false,
|
||||
}
|
||||
|
||||
// perform the request to mint the new token
|
||||
if _, err := s.raftApplyMsgpack(structs.ACLTokenSetRequestType, &req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// return the full token definition from the FSM
|
||||
_, token, err := s.fsm.State().ACLTokenGetByAccessor(nil, newToken.AccessorID, &newToken.EnterpriseMeta)
|
||||
return token, err
|
||||
}
|
||||
|
||||
// DatacenterJoinAddresses will return all the strings suitable for usage in
|
||||
// retry join operations to connect to the the LAN or LAN segment gossip pool.
|
||||
func (s *Server) DatacenterJoinAddresses(segment string) ([]string, error) {
|
||||
members, err := s.LANSegmentMembers(segment)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to retrieve members for segment %s - %w", segment, err)
|
||||
}
|
||||
|
||||
var joinAddrs []string
|
||||
for _, m := range members {
|
||||
if ok, _ := metadata.IsConsulServer(m); ok {
|
||||
serfAddr := net.TCPAddr{IP: m.Addr, Port: int(m.Port)}
|
||||
joinAddrs = append(joinAddrs, serfAddr.String())
|
||||
}
|
||||
}
|
||||
|
||||
return joinAddrs, nil
|
||||
}
|
||||
|
||||
// peersInfoContent is used to help operators understand what happened to the
|
||||
// peers.json file. This is written to a file called peers.info in the same
|
||||
// location.
|
||||
|
|
|
@ -1534,3 +1534,97 @@ func TestServer_CALogging(t *testing.T) {
|
|||
|
||||
require.Contains(t, buf.String(), "consul CA provider configured")
|
||||
}
|
||||
|
||||
func TestServer_DatacenterJoinAddresses(t *testing.T) {
|
||||
conf := testClusterConfig{
|
||||
Datacenter: "primary",
|
||||
Servers: 3,
|
||||
}
|
||||
|
||||
nodes := newTestCluster(t, &conf)
|
||||
|
||||
var expected []string
|
||||
for _, srv := range nodes.Servers {
|
||||
expected = append(expected, fmt.Sprintf("127.0.0.1:%d", srv.config.SerfLANConfig.MemberlistConfig.BindPort))
|
||||
}
|
||||
|
||||
actual, err := nodes.Servers[0].DatacenterJoinAddresses("")
|
||||
require.NoError(t, err)
|
||||
require.ElementsMatch(t, expected, actual)
|
||||
}
|
||||
|
||||
func TestServer_CreateACLToken(t *testing.T) {
|
||||
_, srv, codec := testACLServerWithConfig(t, nil, false)
|
||||
|
||||
waitForLeaderEstablishment(t, srv)
|
||||
|
||||
r1, err := upsertTestRole(codec, TestDefaultMasterToken, "dc1")
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("predefined-ids", func(t *testing.T) {
|
||||
accessor := "554cd3ab-5d4e-4d6e-952e-4e8b6c77bfb3"
|
||||
secret := "ef453f31-ad58-4ec8-8bf8-342e99763026"
|
||||
in := &structs.ACLToken{
|
||||
AccessorID: accessor,
|
||||
SecretID: secret,
|
||||
Description: "test",
|
||||
Policies: []structs.ACLTokenPolicyLink{
|
||||
{
|
||||
ID: structs.ACLPolicyGlobalManagementID,
|
||||
},
|
||||
},
|
||||
NodeIdentities: []*structs.ACLNodeIdentity{
|
||||
{
|
||||
NodeName: "foo",
|
||||
Datacenter: "bar",
|
||||
},
|
||||
},
|
||||
ServiceIdentities: []*structs.ACLServiceIdentity{
|
||||
{
|
||||
ServiceName: "web",
|
||||
},
|
||||
},
|
||||
Roles: []structs.ACLTokenRoleLink{
|
||||
{
|
||||
ID: r1.ID,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
out, err := srv.CreateACLToken(in)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, accessor, out.AccessorID)
|
||||
require.Equal(t, secret, out.SecretID)
|
||||
require.Equal(t, "test", out.Description)
|
||||
require.NotZero(t, out.CreateTime)
|
||||
require.Len(t, out.Policies, 1)
|
||||
require.Len(t, out.Roles, 1)
|
||||
require.Len(t, out.NodeIdentities, 1)
|
||||
require.Len(t, out.ServiceIdentities, 1)
|
||||
require.Equal(t, structs.ACLPolicyGlobalManagementID, out.Policies[0].ID)
|
||||
require.Equal(t, "foo", out.NodeIdentities[0].NodeName)
|
||||
require.Equal(t, "web", out.ServiceIdentities[0].ServiceName)
|
||||
require.Equal(t, r1.ID, out.Roles[0].ID)
|
||||
})
|
||||
|
||||
t.Run("autogen-ids", func(t *testing.T) {
|
||||
in := &structs.ACLToken{
|
||||
Description: "test",
|
||||
NodeIdentities: []*structs.ACLNodeIdentity{
|
||||
{
|
||||
NodeName: "foo",
|
||||
Datacenter: "bar",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
out, err := srv.CreateACLToken(in)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, out.AccessorID)
|
||||
require.NotEmpty(t, out.SecretID)
|
||||
require.Equal(t, "test", out.Description)
|
||||
require.NotZero(t, out.CreateTime)
|
||||
require.Len(t, out.NodeIdentities, 1)
|
||||
require.Equal(t, "foo", out.NodeIdentities[0].NodeName)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ func fixupSessionSpecificRequest(args *structs.SessionSpecificRequest) {
|
|||
// Apply is used to apply a modifying request to the data store. This should
|
||||
// only be used for operations that modify the data
|
||||
func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
|
||||
if done, err := s.srv.forward("Session.Apply", args, args, reply); done {
|
||||
if done, err := s.srv.ForwardRPC("Session.Apply", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"session", "apply"}, time.Now())
|
||||
|
@ -162,7 +162,7 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
|
|||
// Get is used to retrieve a single session
|
||||
func (s *Session) Get(args *structs.SessionSpecificRequest,
|
||||
reply *structs.IndexedSessions) error {
|
||||
if done, err := s.srv.forward("Session.Get", args, args, reply); done {
|
||||
if done, err := s.srv.ForwardRPC("Session.Get", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -203,7 +203,7 @@ func (s *Session) Get(args *structs.SessionSpecificRequest,
|
|||
// List is used to list all the active sessions
|
||||
func (s *Session) List(args *structs.SessionSpecificRequest,
|
||||
reply *structs.IndexedSessions) error {
|
||||
if done, err := s.srv.forward("Session.List", args, args, reply); done {
|
||||
if done, err := s.srv.ForwardRPC("Session.List", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -237,7 +237,7 @@ func (s *Session) List(args *structs.SessionSpecificRequest,
|
|||
// NodeSessions is used to get all the sessions for a particular node
|
||||
func (s *Session) NodeSessions(args *structs.NodeSpecificRequest,
|
||||
reply *structs.IndexedSessions) error {
|
||||
if done, err := s.srv.forward("Session.NodeSessions", args, args, reply); done {
|
||||
if done, err := s.srv.ForwardRPC("Session.NodeSessions", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -271,7 +271,7 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest,
|
|||
// Renew is used to renew the TTL on a single session
|
||||
func (s *Session) Renew(args *structs.SessionSpecificRequest,
|
||||
reply *structs.IndexedSessions) error {
|
||||
if done, err := s.srv.forward("Session.Renew", args, args, reply); done {
|
||||
if done, err := s.srv.ForwardRPC("Session.Renew", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -108,7 +108,7 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx
|
|||
|
||||
// Apply is used to apply multiple operations in a single, atomic transaction.
|
||||
func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error {
|
||||
if done, err := t.srv.forward("Txn.Apply", args, args, reply); done {
|
||||
if done, err := t.srv.ForwardRPC("Txn.Apply", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"txn", "apply"}, time.Now())
|
||||
|
@ -151,7 +151,7 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error
|
|||
// supports staleness, so this should be preferred if you're just performing
|
||||
// reads.
|
||||
func (t *Txn) Read(args *structs.TxnReadRequest, reply *structs.TxnReadResponse) error {
|
||||
if done, err := t.srv.forward("Txn.Read", args, args, reply); done {
|
||||
if done, err := t.srv.ForwardRPC("Txn.Read", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"txn", "read"}, time.Now())
|
||||
|
|
|
@ -553,7 +553,7 @@ func (p *ConnPool) RPC(
|
|||
// secure or insecure variant depending on whether its an ongoing
|
||||
// or first time config request. For now though this is fine until
|
||||
// those ongoing requests are implemented.
|
||||
if method == "AutoEncrypt.Sign" || method == "Cluster.AutoConfig" {
|
||||
if method == "AutoEncrypt.Sign" || method == "AutoConfig.InitialConfiguration" {
|
||||
return p.rpcInsecure(dc, addr, method, args, reply)
|
||||
} else {
|
||||
return p.rpc(dc, nodeName, addr, method, args, reply)
|
||||
|
|
Loading…
Reference in New Issue