diff --git a/agent/agentpb/auto_config.pb.go b/agent/agentpb/auto_config.pb.go index 9508d8a4ce..1cd7ad9356 100644 --- a/agent/agentpb/auto_config.pb.go +++ b/agent/agentpb/auto_config.pb.go @@ -23,7 +23,7 @@ var _ = math.Inf const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package // AutoConfigRequest is the data structure to be sent along with the -// Cluster.AutoConfig RPC +// AutoConfig.InitialConfiguration RPC type AutoConfigRequest struct { // Datacenter is the local datacenter name. This wont actually be set by clients // but rather will be set by the servers to allow for forwarding to @@ -113,7 +113,7 @@ func (m *AutoConfigRequest) GetConsulToken() string { return "" } -// AutoConfigResponse is the data structure sent in response to a Cluster.AutoConfig request +// AutoConfigResponse is the data structure sent in response to a AutoConfig.InitialConfiguration request type AutoConfigResponse struct { Config *config.Config `protobuf:"bytes,1,opt,name=Config,proto3" json:"Config,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` diff --git a/agent/agentpb/auto_config.proto b/agent/agentpb/auto_config.proto index 91aa7e2c52..deb24218b4 100644 --- a/agent/agentpb/auto_config.proto +++ b/agent/agentpb/auto_config.proto @@ -7,7 +7,7 @@ option go_package = "github.com/hashicorp/consul/agent/agentpb"; import "agent/agentpb/config/config.proto"; // AutoConfigRequest is the data structure to be sent along with the -// Cluster.AutoConfig RPC +// AutoConfig.InitialConfiguration RPC message AutoConfigRequest { // Datacenter is the local datacenter name. This wont actually be set by clients // but rather will be set by the servers to allow for forwarding to @@ -30,7 +30,7 @@ message AutoConfigRequest { string ConsulToken = 6; } -// AutoConfigResponse is the data structure sent in response to a Cluster.AutoConfig request +// AutoConfigResponse is the data structure sent in response to a AutoConfig.InitialConfiguration request message AutoConfigResponse { config.Config Config = 1; } \ No newline at end of file diff --git a/agent/auto-config/auto_config.go b/agent/auto-config/auto_config.go index 8ee879123a..1e2b7110ff 100644 --- a/agent/auto-config/auto_config.go +++ b/agent/auto-config/auto_config.go @@ -279,7 +279,7 @@ func (ac *AutoConfig) InitialConfiguration(ctx context.Context) (*config.Runtime } // introToken is responsible for determining the correct intro token to use -// when making the initial Cluster.AutoConfig RPC request. +// when making the initial AutoConfig.InitialConfiguration RPC request. func (ac *AutoConfig) introToken() (string, error) { conf := ac.config.AutoConfig // without an intro token or intro token file we cannot do anything @@ -431,7 +431,7 @@ func (ac *AutoConfig) recordAutoConfigReply(reply *agentpb.AutoConfigResponse) e } // getInitialConfigurationOnce will perform full server to TCPAddr resolution and -// loop through each host trying to make the Cluster.AutoConfig RPC call. When +// loop through each host trying to make the AutoConfig.InitialConfiguration RPC call. When // successful the bool return will be true and the err value will indicate whether we // successfully recorded the auto config settings (persisted to disk and stored internally // on the AutoConfig object) @@ -462,9 +462,9 @@ func (ac *AutoConfig) getInitialConfigurationOnce(ctx context.Context) (bool, er return false, ctx.Err() } - ac.logger.Debug("making Cluster.AutoConfig RPC", "addr", addr.String()) - if err = ac.directRPC.RPC(ac.config.Datacenter, ac.config.NodeName, &addr, "Cluster.AutoConfig", &request, &reply); err != nil { - ac.logger.Error("AutoConfig RPC failed", "addr", addr.String(), "error", err) + ac.logger.Debug("making AutoConfig.InitialConfiguration RPC", "addr", addr.String()) + if err = ac.directRPC.RPC(ac.config.Datacenter, ac.config.NodeName, &addr, "AutoConfig.InitialConfiguration", &request, &reply); err != nil { + ac.logger.Error("AutoConfig.InitialConfiguration RPC failed", "addr", addr.String(), "error", err) continue } diff --git a/agent/auto-config/auto_config_test.go b/agent/auto-config/auto_config_test.go index fa15169432..7151409092 100644 --- a/agent/auto-config/auto_config_test.go +++ b/agent/auto-config/auto_config_test.go @@ -207,7 +207,7 @@ func TestInitialConfiguration_cancelled(t *testing.T) { JWT: "blarg", } - directRPC.On("RPC", "dc1", "autoconf", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8300}, "Cluster.AutoConfig", &expectedRequest, mock.Anything).Return(fmt.Errorf("injected error")).Times(0) + directRPC.On("RPC", "dc1", "autoconf", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8300}, "AutoConfig.InitialConfiguration", &expectedRequest, mock.Anything).Return(fmt.Errorf("injected error")).Times(0) ac, err := New(WithBuilderOpts(builderOpts), WithTLSConfigurator(&tlsutil.Configurator{}), WithDirectRPC(&directRPC)) require.NoError(t, err) require.NotNil(t, ac) @@ -289,7 +289,7 @@ func TestInitialConfiguration_success(t *testing.T) { "dc1", "autoconf", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8300}, - "Cluster.AutoConfig", + "AutoConfig.InitialConfiguration", &expectedRequest, &agentpb.AutoConfigResponse{}).Return(populateResponse) @@ -344,7 +344,7 @@ func TestInitialConfiguration_retries(t *testing.T) { "dc1", "autoconf", &net.TCPAddr{IP: net.IPv4(198, 18, 0, 1), Port: 8300}, - "Cluster.AutoConfig", + "AutoConfig.InitialConfiguration", &expectedRequest, &agentpb.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Times(0) directRPC.On( @@ -352,7 +352,7 @@ func TestInitialConfiguration_retries(t *testing.T) { "dc1", "autoconf", &net.TCPAddr{IP: net.IPv4(198, 18, 0, 2), Port: 8398}, - "Cluster.AutoConfig", + "AutoConfig.InitialConfiguration", &expectedRequest, &agentpb.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Times(0) directRPC.On( @@ -360,7 +360,7 @@ func TestInitialConfiguration_retries(t *testing.T) { "dc1", "autoconf", &net.TCPAddr{IP: net.IPv4(198, 18, 0, 3), Port: 8399}, - "Cluster.AutoConfig", + "AutoConfig.InitialConfiguration", &expectedRequest, &agentpb.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Times(0) directRPC.On( @@ -368,7 +368,7 @@ func TestInitialConfiguration_retries(t *testing.T) { "dc1", "autoconf", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1234}, - "Cluster.AutoConfig", + "AutoConfig.InitialConfiguration", &expectedRequest, &agentpb.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Once() directRPC.On( @@ -376,7 +376,7 @@ func TestInitialConfiguration_retries(t *testing.T) { "dc1", "autoconf", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1234}, - "Cluster.AutoConfig", + "AutoConfig.InitialConfiguration", &expectedRequest, &agentpb.AutoConfigResponse{}).Return(populateResponse) diff --git a/agent/consul/acl_endpoint.go b/agent/consul/acl_endpoint.go index f4d29ab9f7..6d61d4f857 100644 --- a/agent/consul/acl_endpoint.go +++ b/agent/consul/acl_endpoint.go @@ -115,7 +115,7 @@ func (a *ACL) BootstrapTokens(args *structs.DCSpecificRequest, reply *structs.AC if err := a.aclPreCheck(); err != nil { return err } - if done, err := a.srv.forward("ACL.BootstrapTokens", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BootstrapTokens", args, args, reply); done { return err } @@ -214,7 +214,7 @@ func (a *ACL) TokenRead(args *structs.ACLTokenGetRequest, reply *structs.ACLToke args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.TokenRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenRead", args, args, reply); done { return err } @@ -283,7 +283,7 @@ func (a *ACL) TokenClone(args *structs.ACLTokenSetRequest, reply *structs.ACLTok args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.TokenClone", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenClone", args, args, reply); done { return err } @@ -354,7 +354,7 @@ func (a *ACL) TokenSet(args *structs.ACLTokenSetRequest, reply *structs.ACLToken return fmt.Errorf("Local tokens are disabled") } - if done, err := a.srv.forward("ACL.TokenSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenSet", args, args, reply); done { return err } @@ -764,7 +764,7 @@ func (a *ACL) TokenDelete(args *structs.ACLTokenDeleteRequest, reply *string) er args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.TokenDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenDelete", args, args, reply); done { return err } @@ -854,7 +854,7 @@ func (a *ACL) TokenList(args *structs.ACLTokenListRequest, reply *structs.ACLTok args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.TokenList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenList", args, args, reply); done { return err } @@ -917,7 +917,7 @@ func (a *ACL) TokenBatchRead(args *structs.ACLTokenBatchGetRequest, reply *struc args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.TokenBatchRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenBatchRead", args, args, reply); done { return err } @@ -971,7 +971,7 @@ func (a *ACL) PolicyRead(args *structs.ACLPolicyGetRequest, reply *structs.ACLPo return err } - if done, err := a.srv.forward("ACL.PolicyRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyRead", args, args, reply); done { return err } @@ -1009,7 +1009,7 @@ func (a *ACL) PolicyBatchRead(args *structs.ACLPolicyBatchGetRequest, reply *str return err } - if done, err := a.srv.forward("ACL.PolicyBatchRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyBatchRead", args, args, reply); done { return err } @@ -1047,7 +1047,7 @@ func (a *ACL) PolicySet(args *structs.ACLPolicySetRequest, reply *structs.ACLPol args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.PolicySet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicySet", args, args, reply); done { return err } @@ -1182,7 +1182,7 @@ func (a *ACL) PolicyDelete(args *structs.ACLPolicyDeleteRequest, reply *string) args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.PolicyDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyDelete", args, args, reply); done { return err } @@ -1241,7 +1241,7 @@ func (a *ACL) PolicyList(args *structs.ACLPolicyListRequest, reply *structs.ACLP return err } - if done, err := a.srv.forward("ACL.PolicyList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyList", args, args, reply); done { return err } @@ -1281,7 +1281,7 @@ func (a *ACL) PolicyResolve(args *structs.ACLPolicyBatchGetRequest, reply *struc return err } - if done, err := a.srv.forward("ACL.PolicyResolve", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyResolve", args, args, reply); done { return err } @@ -1339,7 +1339,7 @@ func makeACLETag(parent string, policy *acl.Policy) string { // GetPolicy is used to retrieve a compiled policy object with a TTL. Does not // support a blocking query. func (a *ACL) GetPolicy(args *structs.ACLPolicyResolveLegacyRequest, reply *structs.ACLPolicyResolveLegacyResponse) error { - if done, err := a.srv.forward("ACL.GetPolicy", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.GetPolicy", args, args, reply); done { return err } @@ -1386,7 +1386,7 @@ func (a *ACL) ReplicationStatus(args *structs.DCSpecificRequest, // re-using a structure where we don't support all the options. args.RequireConsistent = true args.AllowStale = false - if done, err := a.srv.forward("ACL.ReplicationStatus", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.ReplicationStatus", args, args, reply); done { return err } @@ -1414,7 +1414,7 @@ func (a *ACL) RoleRead(args *structs.ACLRoleGetRequest, reply *structs.ACLRoleRe return err } - if done, err := a.srv.forward("ACL.RoleRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleRead", args, args, reply); done { return err } @@ -1453,7 +1453,7 @@ func (a *ACL) RoleBatchRead(args *structs.ACLRoleBatchGetRequest, reply *structs return err } - if done, err := a.srv.forward("ACL.RoleBatchRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleBatchRead", args, args, reply); done { return err } @@ -1491,7 +1491,7 @@ func (a *ACL) RoleSet(args *structs.ACLRoleSetRequest, reply *structs.ACLRole) e args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.RoleSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleSet", args, args, reply); done { return err } @@ -1653,7 +1653,7 @@ func (a *ACL) RoleDelete(args *structs.ACLRoleDeleteRequest, reply *string) erro args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.RoleDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleDelete", args, args, reply); done { return err } @@ -1708,7 +1708,7 @@ func (a *ACL) RoleList(args *structs.ACLRoleListRequest, reply *structs.ACLRoleL return err } - if done, err := a.srv.forward("ACL.RoleList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleList", args, args, reply); done { return err } @@ -1742,7 +1742,7 @@ func (a *ACL) RoleResolve(args *structs.ACLRoleBatchGetRequest, reply *structs.A return err } - if done, err := a.srv.forward("ACL.RoleResolve", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleResolve", args, args, reply); done { return err } @@ -1807,7 +1807,7 @@ func (a *ACL) BindingRuleRead(args *structs.ACLBindingRuleGetRequest, reply *str return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.BindingRuleRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleRead", args, args, reply); done { return err } @@ -1846,7 +1846,7 @@ func (a *ACL) BindingRuleSet(args *structs.ACLBindingRuleSetRequest, reply *stru return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.BindingRuleSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleSet", args, args, reply); done { return err } @@ -1978,7 +1978,7 @@ func (a *ACL) BindingRuleDelete(args *structs.ACLBindingRuleDeleteRequest, reply return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.BindingRuleDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleDelete", args, args, reply); done { return err } @@ -2033,7 +2033,7 @@ func (a *ACL) BindingRuleList(args *structs.ACLBindingRuleListRequest, reply *st return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.BindingRuleList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleList", args, args, reply); done { return err } @@ -2073,7 +2073,7 @@ func (a *ACL) AuthMethodRead(args *structs.ACLAuthMethodGetRequest, reply *struc return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.AuthMethodRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodRead", args, args, reply); done { return err } @@ -2115,7 +2115,7 @@ func (a *ACL) AuthMethodSet(args *structs.ACLAuthMethodSetRequest, reply *struct return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.AuthMethodSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodSet", args, args, reply); done { return err } @@ -2231,7 +2231,7 @@ func (a *ACL) AuthMethodDelete(args *structs.ACLAuthMethodDeleteRequest, reply * return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.AuthMethodDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodDelete", args, args, reply); done { return err } @@ -2291,7 +2291,7 @@ func (a *ACL) AuthMethodList(args *structs.ACLAuthMethodListRequest, reply *stru return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.AuthMethodList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodList", args, args, reply); done { return err } @@ -2345,7 +2345,7 @@ func (a *ACL) Login(args *structs.ACLLoginRequest, reply *structs.ACLToken) erro return errors.New("do not provide a token when logging in") } - if done, err := a.srv.forward("ACL.Login", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Login", args, args, reply); done { return err } @@ -2490,7 +2490,7 @@ func (a *ACL) Logout(args *structs.ACLLogoutRequest, reply *bool) error { return acl.ErrNotFound } - if done, err := a.srv.forward("ACL.Logout", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Logout", args, args, reply); done { return err } @@ -2543,7 +2543,7 @@ func (a *ACL) Authorize(args *structs.RemoteACLAuthorizationRequest, reply *[]st return err } - if done, err := a.srv.forward("ACL.Authorize", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Authorize", args, args, reply); done { return err } diff --git a/agent/consul/acl_endpoint_legacy.go b/agent/consul/acl_endpoint_legacy.go index 890699b639..2d771cf731 100644 --- a/agent/consul/acl_endpoint_legacy.go +++ b/agent/consul/acl_endpoint_legacy.go @@ -15,7 +15,7 @@ import ( // Bootstrap is used to perform a one-time ACL bootstrap operation on // a cluster to get the first management token. func (a *ACL) Bootstrap(args *structs.DCSpecificRequest, reply *structs.ACL) error { - if done, err := a.srv.forward("ACL.Bootstrap", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Bootstrap", args, args, reply); done { return err } @@ -153,7 +153,7 @@ func aclApplyInternal(srv *Server, args *structs.ACLRequest, reply *string) erro // Apply is used to apply a modifying request to the data store. This should // only be used for operations that modify the data func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { - if done, err := a.srv.forward("ACL.Apply", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"acl", "apply"}, time.Now()) @@ -199,7 +199,7 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { // Get is used to retrieve a single ACL func (a *ACL) Get(args *structs.ACLSpecificRequest, reply *structs.IndexedACLs) error { - if done, err := a.srv.forward("ACL.Get", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Get", args, args, reply); done { return err } @@ -245,7 +245,7 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest, // List is used to list all the ACLs func (a *ACL) List(args *structs.DCSpecificRequest, reply *structs.IndexedACLs) error { - if done, err := a.srv.forward("ACL.List", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.List", args, args, reply); done { return err } diff --git a/agent/consul/cluster_endpoint.go b/agent/consul/auto_config_endpoint.go similarity index 62% rename from agent/consul/cluster_endpoint.go rename to agent/consul/auto_config_endpoint.go index d24bf8c2c1..f55adbcf41 100644 --- a/agent/consul/cluster_endpoint.go +++ b/agent/consul/auto_config_endpoint.go @@ -4,16 +4,12 @@ import ( "context" "encoding/base64" "fmt" - "net" - "time" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/agentpb" "github.com/hashicorp/consul/agent/agentpb/config" "github.com/hashicorp/consul/agent/consul/authmethod/ssoauth" - "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib/template" "github.com/hashicorp/consul/tlsutil" bexpr "github.com/hashicorp/go-bexpr" @@ -55,7 +51,6 @@ func (a *jwtAuthorizer) Authorize(req *agentpb.AutoConfigRequest) (AutoConfigOpt "segment": req.Segment, } - // TODO (autoconf) check for JWT reuse if configured to do so. for _, raw := range a.claimAssertions { // validate and fill any HIL filled, err := template.InterpolateHIL(raw, varMap, true) @@ -84,87 +79,81 @@ func (a *jwtAuthorizer) Authorize(req *agentpb.AutoConfigRequest) (AutoConfigOpt }, nil } -// Cluster endpoint is used for cluster configuration operations -type Cluster struct { - srv *Server +type AutoConfigBackend interface { + CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) + DatacenterJoinAddresses(segment string) ([]string, error) + ForwardRPC(method string, info structs.RPCInfo, args, reply interface{}) (bool, error) +} +// AutoConfig endpoint is used for cluster auto configuration operations +type AutoConfig struct { + // currently AutoConfig does not support pushing down any configuration that would be reloadable on the servers + // (outside of some TLS settings such as the configured CA certs which are retrieved via the TLS configurator) + // If that changes then we will need to change this to use an atomic.Value and provide means of reloading it. + config *Config + tlsConfigurator *tlsutil.Configurator + + backend AutoConfigBackend authorizer AutoConfigAuthorizer } +func NewAutoConfig(conf *Config, tlsConfigurator *tlsutil.Configurator, backend AutoConfigBackend, authz AutoConfigAuthorizer) *AutoConfig { + if conf == nil { + conf = DefaultConfig() + } + + return &AutoConfig{ + config: conf, + tlsConfigurator: tlsConfigurator, + backend: backend, + authorizer: authz, + } +} + // updateTLSCertificatesInConfig will ensure that the TLS settings regarding how an agent is // made aware of its certificates are populated. This will only work if connect is enabled and // in some cases only if auto_encrypt is enabled on the servers. This endpoint has the option // to configure auto_encrypt or potentially in the future to generate the certificates inline. -func (c *Cluster) updateTLSCertificatesInConfig(opts AutoConfigOptions, conf *config.Config) error { - if c.srv.config.AutoEncryptAllowTLS { - conf.AutoEncrypt = &config.AutoEncrypt{TLS: true} - } else { - conf.AutoEncrypt = &config.AutoEncrypt{TLS: false} - } - +func (ac *AutoConfig) updateTLSCertificatesInConfig(opts AutoConfigOptions, conf *config.Config) error { + conf.AutoEncrypt = &config.AutoEncrypt{TLS: ac.config.AutoEncryptAllowTLS} return nil } // updateACLtokensInConfig will configure all of the agents ACL settings and will populate // the configuration with an agent token usable for all default agent operations. -func (c *Cluster) updateACLsInConfig(opts AutoConfigOptions, conf *config.Config) error { +func (ac *AutoConfig) updateACLsInConfig(opts AutoConfigOptions, conf *config.Config) error { acl := &config.ACL{ - Enabled: c.srv.config.ACLsEnabled, - PolicyTTL: c.srv.config.ACLPolicyTTL.String(), - RoleTTL: c.srv.config.ACLRoleTTL.String(), - TokenTTL: c.srv.config.ACLTokenTTL.String(), - DisabledTTL: c.srv.config.ACLDisabledTTL.String(), - DownPolicy: c.srv.config.ACLDownPolicy, - DefaultPolicy: c.srv.config.ACLDefaultPolicy, - EnableKeyListPolicy: c.srv.config.ACLEnableKeyListPolicy, + Enabled: ac.config.ACLsEnabled, + PolicyTTL: ac.config.ACLPolicyTTL.String(), + RoleTTL: ac.config.ACLRoleTTL.String(), + TokenTTL: ac.config.ACLTokenTTL.String(), + DisabledTTL: ac.config.ACLDisabledTTL.String(), + DownPolicy: ac.config.ACLDownPolicy, + DefaultPolicy: ac.config.ACLDefaultPolicy, + EnableKeyListPolicy: ac.config.ACLEnableKeyListPolicy, } // when ACLs are enabled we want to create a local token with a node identity - if c.srv.config.ACLsEnabled { - // we have to require local tokens or else it would require having these servers use a token with acl:write to make a - // token create RPC to the servers in the primary DC. - if !c.srv.LocalTokensEnabled() { - return fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter: %s", c.srv.config.Datacenter) - } - - // generate the accessor id - accessor, err := lib.GenerateUUID(c.srv.checkTokenUUID) - if err != nil { - return err - } - // generate the secret id - secret, err := lib.GenerateUUID(c.srv.checkTokenUUID) - if err != nil { - return err - } - - // set up the token - token := structs.ACLToken{ - AccessorID: accessor, - SecretID: secret, + if ac.config.ACLsEnabled { + // set up the token template - the ids and create + template := structs.ACLToken{ Description: fmt.Sprintf("Auto Config Token for Node %q", opts.NodeName), - CreateTime: time.Now(), Local: true, NodeIdentities: []*structs.ACLNodeIdentity{ { NodeName: opts.NodeName, - Datacenter: c.srv.config.Datacenter, + Datacenter: ac.config.Datacenter, }, }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - req := structs.ACLTokenBatchSetRequest{ - Tokens: structs.ACLTokens{&token}, - CAS: false, + token, err := ac.backend.CreateACLToken(&template) + if err != nil { + return fmt.Errorf("Failed to generate an ACL token for node %q - %w", opts.NodeName, err) } - // perform the request to mint the new token - if _, err := c.srv.raftApplyMsgpack(structs.ACLTokenSetRequestType, &req); err != nil { - return err - } - - acl.Tokens = &config.ACLTokens{Agent: secret} + acl.Tokens = &config.ACLTokens{Agent: token.SecretID} } conf.ACL = acl @@ -173,20 +162,12 @@ func (c *Cluster) updateACLsInConfig(opts AutoConfigOptions, conf *config.Config // updateJoinAddressesInConfig determines the correct gossip endpoints that clients should // be connecting to for joining the cluster based on the segment given in the opts parameter. -func (c *Cluster) updateJoinAddressesInConfig(opts AutoConfigOptions, conf *config.Config) error { - members, err := c.srv.LANSegmentMembers(opts.SegmentName) +func (ac *AutoConfig) updateJoinAddressesInConfig(opts AutoConfigOptions, conf *config.Config) error { + joinAddrs, err := ac.backend.DatacenterJoinAddresses(opts.SegmentName) if err != nil { return err } - var joinAddrs []string - for _, m := range members { - if ok, _ := metadata.IsConsulServer(m); ok { - serfAddr := net.TCPAddr{IP: m.Addr, Port: int(m.Port)} - joinAddrs = append(joinAddrs, serfAddr.String()) - } - } - if conf.Gossip == nil { conf.Gossip = &config.Gossip{} } @@ -196,9 +177,9 @@ func (c *Cluster) updateJoinAddressesInConfig(opts AutoConfigOptions, conf *conf } // updateGossipEncryptionInConfig will populate the gossip encryption configuration settings -func (c *Cluster) updateGossipEncryptionInConfig(_ AutoConfigOptions, conf *config.Config) error { +func (ac *AutoConfig) updateGossipEncryptionInConfig(_ AutoConfigOptions, conf *config.Config) error { // Add gossip encryption settings if there is any key loaded - memberlistConfig := c.srv.config.SerfLANConfig.MemberlistConfig + memberlistConfig := ac.config.SerfLANConfig.MemberlistConfig if lanKeyring := memberlistConfig.Keyring; lanKeyring != nil { if conf.Gossip == nil { conf.Gossip = &config.Gossip{} @@ -221,13 +202,19 @@ func (c *Cluster) updateGossipEncryptionInConfig(_ AutoConfigOptions, conf *conf // updateTLSSettingsInConfig will populate the TLS configuration settings but will not // populate leaf or ca certficiates. -func (c *Cluster) updateTLSSettingsInConfig(_ AutoConfigOptions, conf *config.Config) error { +func (ac *AutoConfig) updateTLSSettingsInConfig(_ AutoConfigOptions, conf *config.Config) error { + if ac.tlsConfigurator == nil { + // TLS is not enabled? + return nil + } + // add in TLS configuration if conf.TLS == nil { conf.TLS = &config.TLS{} } - conf.TLS.VerifyServerHostname = c.srv.tlsConfigurator.VerifyServerHostname() - base := c.srv.tlsConfigurator.Base() + + conf.TLS.VerifyServerHostname = ac.tlsConfigurator.VerifyServerHostname() + base := ac.tlsConfigurator.Base() conf.TLS.VerifyOutgoing = base.VerifyOutgoing conf.TLS.MinVersion = base.TLSMinVersion conf.TLS.PreferServerCipherSuites = base.PreferServerCipherSuites @@ -239,61 +226,66 @@ func (c *Cluster) updateTLSSettingsInConfig(_ AutoConfigOptions, conf *config.Co // baseConfig will populate the configuration with some base settings such as the // datacenter names, node name etc. -func (c *Cluster) baseConfig(opts AutoConfigOptions, conf *config.Config) error { +func (ac *AutoConfig) baseConfig(opts AutoConfigOptions, conf *config.Config) error { if opts.NodeName == "" { return fmt.Errorf("Cannot generate auto config response without a node name") } - conf.Datacenter = c.srv.config.Datacenter - conf.PrimaryDatacenter = c.srv.config.PrimaryDatacenter + conf.Datacenter = ac.config.Datacenter + conf.PrimaryDatacenter = ac.config.PrimaryDatacenter conf.NodeName = opts.NodeName conf.SegmentName = opts.SegmentName return nil } -type autoConfigUpdater func(c *Cluster, opts AutoConfigOptions, conf *config.Config) error +type autoConfigUpdater func(c *AutoConfig, opts AutoConfigOptions, conf *config.Config) error var ( // variable holding the list of config updating functions to execute when generating // the auto config response. This will allow for more easily adding extra self-contained // configurators here in the future. autoConfigUpdaters []autoConfigUpdater = []autoConfigUpdater{ - (*Cluster).baseConfig, - (*Cluster).updateJoinAddressesInConfig, - (*Cluster).updateGossipEncryptionInConfig, - (*Cluster).updateTLSSettingsInConfig, - (*Cluster).updateACLsInConfig, - (*Cluster).updateTLSCertificatesInConfig, + (*AutoConfig).baseConfig, + (*AutoConfig).updateJoinAddressesInConfig, + (*AutoConfig).updateGossipEncryptionInConfig, + (*AutoConfig).updateTLSSettingsInConfig, + (*AutoConfig).updateACLsInConfig, + (*AutoConfig).updateTLSCertificatesInConfig, } ) // AgentAutoConfig will authorize the incoming request and then generate the configuration // to push down to the client -func (c *Cluster) AutoConfig(req *agentpb.AutoConfigRequest, resp *agentpb.AutoConfigResponse) error { +func (ac *AutoConfig) InitialConfiguration(req *agentpb.AutoConfigRequest, resp *agentpb.AutoConfigResponse) error { // default the datacenter to our datacenter - agents do not have to specify this as they may not // yet know the datacenter name they are going to be in. if req.Datacenter == "" { - req.Datacenter = c.srv.config.Datacenter + req.Datacenter = ac.config.Datacenter } // TODO (autoconf) Is performing auto configuration over the WAN really a bad idea? - if req.Datacenter != c.srv.config.Datacenter { + if req.Datacenter != ac.config.Datacenter { return fmt.Errorf("invalid datacenter %q - agent auto configuration cannot target a remote datacenter", req.Datacenter) } + // TODO (autoconf) maybe panic instead? + if ac.backend == nil { + return fmt.Errorf("No Auto Config backend is configured") + } + // forward to the leader - if done, err := c.srv.forward("Cluster.AutoConfig", req, req, resp); done { + if done, err := ac.backend.ForwardRPC("AutoConfig.InitialConfiguration", req, req, resp); done { return err } // TODO (autoconf) maybe panic instead? - if c.authorizer == nil { + if ac.authorizer == nil { return fmt.Errorf("No Auto Config authorizer is configured") } // authorize the request with the configured authorizer - opts, err := c.authorizer.Authorize(req) + opts, err := ac.authorizer.Authorize(req) if err != nil { return err } @@ -302,7 +294,7 @@ func (c *Cluster) AutoConfig(req *agentpb.AutoConfigRequest, resp *agentpb.AutoC // update all the configurations for _, configFn := range autoConfigUpdaters { - if err := configFn(c, opts, conf); err != nil { + if err := configFn(ac, opts, conf); err != nil { return err } } diff --git a/agent/consul/cluster_endpoint_test.go b/agent/consul/auto_config_endpoint_test.go similarity index 75% rename from agent/consul/cluster_endpoint_test.go rename to agent/consul/auto_config_endpoint_test.go index 0e21a9b8ef..2bdd4b5bdd 100644 --- a/agent/consul/cluster_endpoint_test.go +++ b/agent/consul/auto_config_endpoint_test.go @@ -19,11 +19,35 @@ import ( "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/memberlist" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "gopkg.in/square/go-jose.v2/jwt" ) +type mockAutoConfigBackend struct { + mock.Mock +} + +func (m *mockAutoConfigBackend) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) { + ret := m.Called(template) + // this handles converting an untyped nil to a typed nil + token, _ := ret.Get(0).(*structs.ACLToken) + return token, ret.Error(1) +} + +func (m *mockAutoConfigBackend) DatacenterJoinAddresses(segment string) ([]string, error) { + ret := m.Called(segment) + // this handles converting an untyped nil to a typed nil + addrs, _ := ret.Get(0).([]string) + return addrs, ret.Error(1) +} + +func (m *mockAutoConfigBackend) ForwardRPC(method string, info structs.RPCInfo, args, reply interface{}) (bool, error) { + ret := m.Called(method, info, args, reply) + return ret.Bool(0), ret.Error(1) +} + func testJWTStandardClaims() jwt.Claims { now := time.Now() @@ -48,13 +72,13 @@ func signJWTWithStandardClaims(t *testing.T, privKey string, claims interface{}) return signJWT(t, privKey, testJWTStandardClaims(), claims) } -// TestClusterAutoConfig is really an integration test of all the moving parts of the Cluster.AutoConfig RPC. +// TestAutoConfigInitialConfiguration is really an integration test of all the moving parts of the AutoConfig.InitialConfiguration RPC. // Full testing of the individual parts will not be done in this test: // // * Any implementations of the AutoConfigAuthorizer interface (although these test do use the jwtAuthorizer) -// * Each of the individual config generation functions. These can be unit tested separately and many wont -// require a running test server. -func TestClusterAutoConfig(t *testing.T) { +// * Each of the individual config generation functions. These can be unit tested separately and should NOT +// require running test servers +func TestAutoConfigInitialConfiguration(t *testing.T) { type testCase struct { request agentpb.AutoConfigRequest expected agentpb.AutoConfigResponse @@ -227,7 +251,7 @@ func TestClusterAutoConfig(t *testing.T) { for testName, tcase := range cases { t.Run(testName, func(t *testing.T) { var reply agentpb.AutoConfigResponse - err := msgpackrpc.CallWithCodec(codec, "Cluster.AutoConfig", &tcase.request, &reply) + err := msgpackrpc.CallWithCodec(codec, "AutoConfig.InitialConfiguration", &tcase.request, &reply) if tcase.err != "" { testutil.RequireErrorContains(t, err, tcase.err) } else { @@ -241,7 +265,7 @@ func TestClusterAutoConfig(t *testing.T) { } } -func TestClusterAutoConfig_baseConfig(t *testing.T) { +func TestAutoConfig_baseConfig(t *testing.T) { type testCase struct { serverConfig Config opts AutoConfigOptions @@ -277,14 +301,12 @@ func TestClusterAutoConfig_baseConfig(t *testing.T) { for name, tcase := range cases { t.Run(name, func(t *testing.T) { - cluster := Cluster{ - srv: &Server{ - config: &tcase.serverConfig, - }, + ac := AutoConfig{ + config: &tcase.serverConfig, } var actual config.Config - err := cluster.baseConfig(tcase.opts, &actual) + err := ac.baseConfig(tcase.opts, &actual) if tcase.err == "" { require.NoError(t, err) require.Equal(t, tcase.expected, actual) @@ -295,7 +317,7 @@ func TestClusterAutoConfig_baseConfig(t *testing.T) { } } -func TestClusterAutoConfig_updateTLSSettingsInConfig(t *testing.T) { +func TestAutoConfig_updateTLSSettingsInConfig(t *testing.T) { _, _, cacert, err := testTLSCertificates("server.dc1.consul") require.NoError(t, err) @@ -365,14 +387,12 @@ func TestClusterAutoConfig_updateTLSSettingsInConfig(t *testing.T) { configurator, err := tlsutil.NewConfigurator(tcase.tlsConfig, logger) require.NoError(t, err) - cluster := &Cluster{ - srv: &Server{ - tlsConfigurator: configurator, - }, + ac := &AutoConfig{ + tlsConfigurator: configurator, } var actual config.Config - err = cluster.updateTLSSettingsInConfig(AutoConfigOptions{}, &actual) + err = ac.updateTLSSettingsInConfig(AutoConfigOptions{}, &actual) require.NoError(t, err) require.Equal(t, tcase.expected, actual) }) @@ -436,16 +456,15 @@ func TestAutoConfig_updateGossipEncryptionInConfig(t *testing.T) { for name, tcase := range cases { t.Run(name, func(t *testing.T) { - cluster := Cluster{ - srv: &Server{ - config: DefaultConfig(), - }, + cfg := DefaultConfig() + cfg.SerfLANConfig.MemberlistConfig = &tcase.conf + + ac := AutoConfig{ + config: cfg, } - cluster.srv.config.SerfLANConfig.MemberlistConfig = &tcase.conf - var actual config.Config - err := cluster.updateGossipEncryptionInConfig(AutoConfigOptions{}, &actual) + err := ac.updateGossipEncryptionInConfig(AutoConfigOptions{}, &actual) require.NoError(t, err) require.Equal(t, tcase.expected, actual) }) @@ -481,14 +500,12 @@ func TestAutoConfig_updateTLSCertificatesInConfig(t *testing.T) { for name, tcase := range cases { t.Run(name, func(t *testing.T) { - cluster := Cluster{ - srv: &Server{ - config: &tcase.serverConfig, - }, + ac := AutoConfig{ + config: &tcase.serverConfig, } var actual config.Config - err := cluster.updateTLSCertificatesInConfig(AutoConfigOptions{}, &actual) + err := ac.updateTLSCertificatesInConfig(AutoConfigOptions{}, &actual) require.NoError(t, err) require.Equal(t, tcase.expected, actual) }) @@ -497,24 +514,34 @@ func TestAutoConfig_updateTLSCertificatesInConfig(t *testing.T) { func TestAutoConfig_updateACLsInConfig(t *testing.T) { type testCase struct { - patch func(c *Config) - expected config.Config - verify func(t *testing.T, c *config.Config) - err string + config Config + expected config.Config + expectACLToken bool + err error } + const ( + tokenAccessor = "b98761aa-c0ee-445b-9b0c-f54b56b47778" + tokenSecret = "1c96448a-ab04-4caa-982a-e8b095a111e2" + ) + + testDC := "dc1" + cases := map[string]testCase{ "enabled": { - patch: func(c *Config) { - c.ACLsEnabled = true - c.ACLPolicyTTL = 7 * time.Second - c.ACLRoleTTL = 10 * time.Second - c.ACLTokenTTL = 12 * time.Second - c.ACLDisabledTTL = 31 * time.Second - c.ACLDefaultPolicy = "allow" - c.ACLDownPolicy = "deny" - c.ACLEnableKeyListPolicy = true + config: Config{ + Datacenter: testDC, + PrimaryDatacenter: testDC, + ACLsEnabled: true, + ACLPolicyTTL: 7 * time.Second, + ACLRoleTTL: 10 * time.Second, + ACLTokenTTL: 12 * time.Second, + ACLDisabledTTL: 31 * time.Second, + ACLDefaultPolicy: "allow", + ACLDownPolicy: "deny", + ACLEnableKeyListPolicy: true, }, + expectACLToken: true, expected: config.Config{ ACL: &config.ACL{ Enabled: true, @@ -525,32 +552,26 @@ func TestAutoConfig_updateACLsInConfig(t *testing.T) { DownPolicy: "deny", DefaultPolicy: "allow", EnableKeyListPolicy: true, - Tokens: &config.ACLTokens{Agent: "verified"}, + Tokens: &config.ACLTokens{ + Agent: tokenSecret, + }, }, }, - verify: func(t *testing.T, c *config.Config) { - t.Helper() - // the agent token secret is non-deterministically generated - // So we want to validate that one was set and overwrite with - // a value that the expected configurate wants. - require.NotNil(t, c) - require.NotNil(t, c.ACL) - require.NotNil(t, c.ACL.Tokens) - require.NotEmpty(t, c.ACL.Tokens.Agent) - c.ACL.Tokens.Agent = "verified" - }, }, "disabled": { - patch: func(c *Config) { - c.ACLsEnabled = false - c.ACLPolicyTTL = 7 * time.Second - c.ACLRoleTTL = 10 * time.Second - c.ACLTokenTTL = 12 * time.Second - c.ACLDisabledTTL = 31 * time.Second - c.ACLDefaultPolicy = "allow" - c.ACLDownPolicy = "deny" - c.ACLEnableKeyListPolicy = true + config: Config{ + Datacenter: testDC, + PrimaryDatacenter: testDC, + ACLsEnabled: false, + ACLPolicyTTL: 7 * time.Second, + ACLRoleTTL: 10 * time.Second, + ACLTokenTTL: 12 * time.Second, + ACLDisabledTTL: 31 * time.Second, + ACLDefaultPolicy: "allow", + ACLDownPolicy: "deny", + ACLEnableKeyListPolicy: true, }, + expectACLToken: false, expected: config.Config{ ACL: &config.ACL{ Enabled: false, @@ -565,53 +586,77 @@ func TestAutoConfig_updateACLsInConfig(t *testing.T) { }, }, "local-tokens-disabled": { - patch: func(c *Config) { - c.PrimaryDatacenter = "somewhere else" + config: Config{ + Datacenter: testDC, + PrimaryDatacenter: "somewhere-else", + ACLsEnabled: true, }, - err: "Agent Auto Configuration requires local token usage to be enabled in this datacenter", + expectACLToken: true, + err: fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter"), }, } for name, tcase := range cases { t.Run(name, func(t *testing.T) { - _, s, _ := testACLServerWithConfig(t, tcase.patch, false) + backend := &mockAutoConfigBackend{} + expectedTemplate := &structs.ACLToken{ + Description: `Auto Config Token for Node "something"`, + Local: true, + NodeIdentities: []*structs.ACLNodeIdentity{ + { + NodeName: "something", + Datacenter: testDC, + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } - waitForLeaderEstablishment(t, s) + testToken := &structs.ACLToken{ + AccessorID: tokenAccessor, + SecretID: tokenSecret, + Description: `Auto Config Token for Node "something"`, + Local: true, + NodeIdentities: []*structs.ACLNodeIdentity{ + { + NodeName: "something", + Datacenter: testDC, + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } - cluster := Cluster{srv: s} + if tcase.expectACLToken { + backend.On("CreateACLToken", expectedTemplate).Return(testToken, tcase.err).Once() + } + + ac := AutoConfig{config: &tcase.config, backend: backend} var actual config.Config - err := cluster.updateACLsInConfig(AutoConfigOptions{NodeName: "something"}, &actual) - if tcase.err != "" { - testutil.RequireErrorContains(t, err, tcase.err) + err := ac.updateACLsInConfig(AutoConfigOptions{NodeName: "something"}, &actual) + if tcase.err != nil { + testutil.RequireErrorContains(t, err, tcase.err.Error()) } else { require.NoError(t, err) - if tcase.verify != nil { - tcase.verify(t, &actual) - } require.Equal(t, tcase.expected, actual) } + + backend.AssertExpectations(t) }) } } func TestAutoConfig_updateJoinAddressesInConfig(t *testing.T) { - conf := testClusterConfig{ - Datacenter: "primary", - Servers: 3, - } + addrs := []string{"198.18.0.7:8300", "198.18.0.1:8300"} + backend := &mockAutoConfigBackend{} + backend.On("DatacenterJoinAddresses", "").Return(addrs, nil).Once() - nodes := newTestCluster(t, &conf) - - cluster := Cluster{srv: nodes.Servers[0]} + ac := AutoConfig{backend: backend} var actual config.Config - err := cluster.updateJoinAddressesInConfig(AutoConfigOptions{}, &actual) + err := ac.updateJoinAddressesInConfig(AutoConfigOptions{}, &actual) require.NoError(t, err) - var expected []string - for _, srv := range nodes.Servers { - expected = append(expected, fmt.Sprintf("127.0.0.1:%d", srv.config.SerfLANConfig.MemberlistConfig.BindPort)) - } require.NotNil(t, actual.Gossip) - require.ElementsMatch(t, expected, actual.Gossip.RetryJoinLAN) + require.ElementsMatch(t, addrs, actual.Gossip.RetryJoinLAN) + + backend.AssertExpectations(t) } diff --git a/agent/consul/auto_encrypt_endpoint.go b/agent/consul/auto_encrypt_endpoint.go index a8267da076..78a100acc2 100644 --- a/agent/consul/auto_encrypt_endpoint.go +++ b/agent/consul/auto_encrypt_endpoint.go @@ -24,7 +24,7 @@ func (a *AutoEncrypt) Sign( if !a.srv.config.AutoEncryptAllowTLS { return ErrAutoEncryptAllowTLSNotEnabled } - if done, err := a.srv.forward("AutoEncrypt.Sign", args, args, reply); done { + if done, err := a.srv.ForwardRPC("AutoEncrypt.Sign", args, args, reply); done { return err } diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index b4f7ce2790..301e00974c 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -94,7 +94,7 @@ func checkPreApply(check *structs.HealthCheck) { // Register is used register that a node is providing a given service. func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error { - if done, err := c.srv.forward("Catalog.Register", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.Register", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"catalog", "register"}, time.Now()) @@ -175,7 +175,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error // Deregister is used to remove a service registration for a given node. func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) error { - if done, err := c.srv.forward("Catalog.Deregister", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.Deregister", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"catalog", "deregister"}, time.Now()) @@ -244,7 +244,7 @@ func (c *Catalog) ListDatacenters(args *structs.DatacentersRequest, reply *[]str // ListNodes is used to query the nodes in a DC func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedNodes) error { - if done, err := c.srv.forward("Catalog.ListNodes", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ListNodes", args, args, reply); done { return err } @@ -292,7 +292,7 @@ func isUnmodified(opts structs.QueryOptions, index uint64) bool { // ListServices is used to query the services in a DC func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.IndexedServices) error { - if done, err := c.srv.forward("Catalog.ListServices", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ListServices", args, args, reply); done { return err } @@ -331,7 +331,7 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I } func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.IndexedServiceList) error { - if done, err := c.srv.forward("Catalog.ServiceList", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ServiceList", args, args, reply); done { return err } @@ -360,7 +360,7 @@ func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.In // ServiceNodes returns all the nodes registered as part of a service func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceNodes) error { - if done, err := c.srv.forward("Catalog.ServiceNodes", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ServiceNodes", args, args, reply); done { return err } @@ -498,7 +498,7 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru // NodeServices returns all the services registered as part of a node func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServices) error { - if done, err := c.srv.forward("Catalog.NodeServices", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.NodeServices", args, args, reply); done { return err } @@ -549,7 +549,7 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs } func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServiceList) error { - if done, err := c.srv.forward("Catalog.NodeServiceList", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.NodeServiceList", args, args, reply); done { return err } @@ -602,7 +602,7 @@ func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *stru } func (c *Catalog) GatewayServices(args *structs.ServiceSpecificRequest, reply *structs.IndexedGatewayServices) error { - if done, err := c.srv.forward("Catalog.GatewayServices", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.GatewayServices", args, args, reply); done { return err } diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index f3c3177ba3..996f3d23b2 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -27,7 +27,7 @@ func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error // be replicated to all the other datacenters. args.Datacenter = c.srv.config.PrimaryDatacenter - if done, err := c.srv.forward("ConfigEntry.Apply", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "apply"}, time.Now()) @@ -73,7 +73,7 @@ func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.ConfigE return err } - if done, err := c.srv.forward("ConfigEntry.Get", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.Get", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "get"}, time.Now()) @@ -120,7 +120,7 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe return err } - if done, err := c.srv.forward("ConfigEntry.List", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.List", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "list"}, time.Now()) @@ -165,7 +165,7 @@ func (c *ConfigEntry) ListAll(args *structs.DCSpecificRequest, reply *structs.In return err } - if done, err := c.srv.forward("ConfigEntry.ListAll", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.ListAll", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "listAll"}, time.Now()) @@ -209,7 +209,7 @@ func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{}) // be replicated to all the other datacenters. args.Datacenter = c.srv.config.PrimaryDatacenter - if done, err := c.srv.forward("ConfigEntry.Delete", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.Delete", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "delete"}, time.Now()) @@ -245,7 +245,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r return err } - if done, err := c.srv.forward("ConfigEntry.ResolveServiceConfig", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.ResolveServiceConfig", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "resolve_service_config"}, time.Now()) diff --git a/agent/consul/connect_ca_endpoint.go b/agent/consul/connect_ca_endpoint.go index 995cdd3f3d..9691d203a2 100644 --- a/agent/consul/connect_ca_endpoint.go +++ b/agent/consul/connect_ca_endpoint.go @@ -63,7 +63,7 @@ func (s *ConnectCA) ConfigurationGet( return ErrConnectNotEnabled } - if done, err := s.srv.forward("ConnectCA.ConfigurationGet", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.ConfigurationGet", args, args, reply); done { return err } @@ -95,7 +95,7 @@ func (s *ConnectCA) ConfigurationSet( return ErrConnectNotEnabled } - if done, err := s.srv.forward("ConnectCA.ConfigurationSet", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.ConfigurationSet", args, args, reply); done { return err } @@ -312,7 +312,7 @@ func (s *ConnectCA) Roots( args *structs.DCSpecificRequest, reply *structs.IndexedCARoots) error { // Forward if necessary - if done, err := s.srv.forward("ConnectCA.Roots", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.Roots", args, args, reply); done { return err } @@ -387,7 +387,7 @@ func (s *ConnectCA) Sign( return ErrConnectNotEnabled } - if done, err := s.srv.forward("ConnectCA.Sign", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.Sign", args, args, reply); done { return err } @@ -593,7 +593,7 @@ func (s *ConnectCA) SignIntermediate( return ErrConnectNotEnabled } - if done, err := s.srv.forward("ConnectCA.SignIntermediate", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.SignIntermediate", args, args, reply); done { return err } diff --git a/agent/consul/coordinate_endpoint.go b/agent/consul/coordinate_endpoint.go index d31136ebc8..e4a3ac4f79 100644 --- a/agent/consul/coordinate_endpoint.go +++ b/agent/consul/coordinate_endpoint.go @@ -117,7 +117,7 @@ func (c *Coordinate) batchApplyUpdates() error { // Update inserts or updates the LAN coordinate of a node. func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) (err error) { - if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Coordinate.Update", args, args, reply); done { return err } @@ -184,7 +184,7 @@ func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.Datacenter // ListNodes returns the list of nodes with their raw network coordinates (if no // coordinates are available for a node it won't appear in this list). func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedCoordinates) error { - if done, err := c.srv.forward("Coordinate.ListNodes", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Coordinate.ListNodes", args, args, reply); done { return err } @@ -207,7 +207,7 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I // Node returns the raw coordinates for a single node. func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinates) error { - if done, err := c.srv.forward("Coordinate.Node", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Coordinate.Node", args, args, reply); done { return err } diff --git a/agent/consul/discovery_chain_endpoint.go b/agent/consul/discovery_chain_endpoint.go index a4a21ec053..a9933fa453 100644 --- a/agent/consul/discovery_chain_endpoint.go +++ b/agent/consul/discovery_chain_endpoint.go @@ -24,7 +24,7 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs return ErrConnectNotEnabled } - if done, err := c.srv.forward("DiscoveryChain.Get", args, args, reply); done { + if done, err := c.srv.ForwardRPC("DiscoveryChain.Get", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"discovery_chain", "get"}, time.Now()) diff --git a/agent/consul/federation_state_endpoint.go b/agent/consul/federation_state_endpoint.go index cd914e9ba8..a98ab83e8f 100644 --- a/agent/consul/federation_state_endpoint.go +++ b/agent/consul/federation_state_endpoint.go @@ -27,7 +27,7 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo // be replicated to all the other datacenters. args.Datacenter = c.srv.config.PrimaryDatacenter - if done, err := c.srv.forward("FederationState.Apply", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.Apply", args, args, reply); done { return err } @@ -76,7 +76,7 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo } func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs.FederationStateResponse) error { - if done, err := c.srv.forward("FederationState.Get", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.Get", args, args, reply); done { return err } @@ -117,7 +117,7 @@ func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs // List is the endpoint meant to be used by consul servers performing // replication. func (c *FederationState) List(args *structs.DCSpecificRequest, reply *structs.IndexedFederationStates) error { - if done, err := c.srv.forward("FederationState.List", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.List", args, args, reply); done { return err } @@ -160,7 +160,7 @@ func (c *FederationState) List(args *structs.DCSpecificRequest, reply *structs.I // in the discovery info for dialing mesh gateways. Analogous to catalog // endpoints. func (c *FederationState) ListMeshGateways(args *structs.DCSpecificRequest, reply *structs.DatacenterIndexedCheckServiceNodes) error { - if done, err := c.srv.forward("FederationState.ListMeshGateways", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.ListMeshGateways", args, args, reply); done { return err } diff --git a/agent/consul/gateway_locator.go b/agent/consul/gateway_locator.go index 638c9efdf4..7233266462 100644 --- a/agent/consul/gateway_locator.go +++ b/agent/consul/gateway_locator.go @@ -325,7 +325,7 @@ func (g *GatewayLocator) runOnce(lastFetchIndex uint64) (uint64, error) { return queryMeta.Index, nil } -// checkLocalStateIsReady is inlined a bit from (*Server).forward(). We need to +// checkLocalStateIsReady is inlined a bit from (*Server).ForwardRPC(). We need to // wait until our own state machine is safe to read from. func (g *GatewayLocator) checkLocalStateIsReady() error { // Check if we can allow a stale read, ensure our local DB is initialized diff --git a/agent/consul/health_endpoint.go b/agent/consul/health_endpoint.go index 0b8353840f..4706405a83 100644 --- a/agent/consul/health_endpoint.go +++ b/agent/consul/health_endpoint.go @@ -20,7 +20,7 @@ type Health struct { // ChecksInState is used to get all the checks in a given state func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, reply *structs.IndexedHealthChecks) error { - if done, err := h.srv.forward("Health.ChecksInState", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.ChecksInState", args, args, reply); done { return err } @@ -71,7 +71,7 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, // NodeChecks is used to get all the checks for a node func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, reply *structs.IndexedHealthChecks) error { - if done, err := h.srv.forward("Health.NodeChecks", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.NodeChecks", args, args, reply); done { return err } @@ -121,7 +121,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, } // Potentially forward - if done, err := h.srv.forward("Health.ServiceChecks", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.ServiceChecks", args, args, reply); done { return err } @@ -171,7 +171,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, // ServiceNodes returns all the nodes registered as part of a service including health info func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedCheckServiceNodes) error { - if done, err := h.srv.forward("Health.ServiceNodes", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.ServiceNodes", args, args, reply); done { return err } diff --git a/agent/consul/intention_endpoint.go b/agent/consul/intention_endpoint.go index a030b4e9e9..4204d1c72f 100644 --- a/agent/consul/intention_endpoint.go +++ b/agent/consul/intention_endpoint.go @@ -197,7 +197,7 @@ func (s *Intention) Apply( args.Datacenter = s.srv.config.PrimaryDatacenter } - if done, err := s.srv.forward("Intention.Apply", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"consul", "intention", "apply"}, time.Now()) @@ -253,7 +253,7 @@ func (s *Intention) Get( args *structs.IntentionQueryRequest, reply *structs.IndexedIntentions) error { // Forward if necessary - if done, err := s.srv.forward("Intention.Get", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Get", args, args, reply); done { return err } @@ -328,7 +328,7 @@ func (s *Intention) List( args *structs.DCSpecificRequest, reply *structs.IndexedIntentions) error { // Forward if necessary - if done, err := s.srv.forward("Intention.List", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.List", args, args, reply); done { return err } @@ -379,7 +379,7 @@ func (s *Intention) Match( args *structs.IntentionQueryRequest, reply *structs.IndexedIntentionMatches) error { // Forward if necessary - if done, err := s.srv.forward("Intention.Match", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Match", args, args, reply); done { return err } @@ -448,7 +448,7 @@ func (s *Intention) Check( args *structs.IntentionQueryRequest, reply *structs.IntentionQueryCheckResponse) error { // Forward maybe - if done, err := s.srv.forward("Intention.Check", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Check", args, args, reply); done { return err } diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index e02c21d9d7..720810f0d8 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -24,7 +24,7 @@ type Internal struct { // NodeInfo is used to retrieve information about a specific node. func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeDump) error { - if done, err := m.srv.forward("Internal.NodeInfo", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.NodeInfo", args, args, reply); done { return err } @@ -50,7 +50,7 @@ func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest, // NodeDump is used to generate information about all of the nodes. func (m *Internal) NodeDump(args *structs.DCSpecificRequest, reply *structs.IndexedNodeDump) error { - if done, err := m.srv.forward("Internal.NodeDump", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.NodeDump", args, args, reply); done { return err } @@ -89,7 +89,7 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest, } func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.IndexedCheckServiceNodes) error { - if done, err := m.srv.forward("Internal.ServiceDump", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.ServiceDump", args, args, reply); done { return err } @@ -129,7 +129,7 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs. // GatewayServiceNodes returns all the nodes for services associated with a gateway along with their gateway config func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceDump) error { - if done, err := m.srv.forward("Internal.GatewayServiceDump", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.GatewayServiceDump", args, args, reply); done { return err } @@ -210,7 +210,7 @@ func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, repl // triggered in a remote DC. func (m *Internal) EventFire(args *structs.EventFireRequest, reply *structs.EventFireResponse) error { - if done, err := m.srv.forward("Internal.EventFire", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.EventFire", args, args, reply); done { return err } diff --git a/agent/consul/kvs_endpoint.go b/agent/consul/kvs_endpoint.go index 2e93cdbf69..04dee57b62 100644 --- a/agent/consul/kvs_endpoint.go +++ b/agent/consul/kvs_endpoint.go @@ -87,7 +87,7 @@ func kvsPreApply(logger hclog.Logger, srv *Server, authz acl.Authorizer, op api. // Apply is used to apply a KVS update request to the data store. func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { - if done, err := k.srv.forward("KVS.Apply", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"kvs", "apply"}, time.Now()) @@ -130,7 +130,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { // Get is used to lookup a single key. func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { - if done, err := k.srv.forward("KVS.Get", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.Get", args, args, reply); done { return err } @@ -175,7 +175,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er // List is used to list all keys with a given prefix. func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { - if done, err := k.srv.forward("KVS.List", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.List", args, args, reply); done { return err } @@ -227,7 +227,7 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e // of the response so that only a subset of the prefix is returned. In this // mode, the keys which are omitted are still counted in the returned index. func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyList) error { - if done, err := k.srv.forward("KVS.ListKeys", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.ListKeys", args, args, reply); done { return err } diff --git a/agent/consul/operator_autopilot_endpoint.go b/agent/consul/operator_autopilot_endpoint.go index 47cc8f49a8..c286fd1e95 100644 --- a/agent/consul/operator_autopilot_endpoint.go +++ b/agent/consul/operator_autopilot_endpoint.go @@ -10,7 +10,7 @@ import ( // AutopilotGetConfiguration is used to retrieve the current Autopilot configuration. func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *autopilot.Config) error { - if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.AutopilotGetConfiguration", args, args, reply); done { return err } @@ -42,7 +42,7 @@ func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, r // AutopilotSetConfiguration is used to set the current Autopilot configuration. func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error { - if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.AutopilotSetConfiguration", args, args, reply); done { return err } @@ -81,7 +81,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *autopil // re-using a structure where we don't support all the options. args.RequireConsistent = true args.AllowStale = false - if done, err := op.srv.forward("Operator.ServerHealth", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.ServerHealth", args, args, reply); done { return err } diff --git a/agent/consul/operator_raft_endpoint.go b/agent/consul/operator_raft_endpoint.go index 3fa4a6691a..b3ea4da28e 100644 --- a/agent/consul/operator_raft_endpoint.go +++ b/agent/consul/operator_raft_endpoint.go @@ -13,7 +13,7 @@ import ( // RaftGetConfiguration is used to retrieve the current Raft configuration. func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply *structs.RaftConfigurationResponse) error { - if done, err := op.srv.forward("Operator.RaftGetConfiguration", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.RaftGetConfiguration", args, args, reply); done { return err } @@ -74,7 +74,7 @@ func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply // "IP:port". The reply argument is not used, but it required to fulfill the RPC // interface. func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftRemovePeerRequest, reply *struct{}) error { - if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.RaftRemovePeerByAddress", args, args, reply); done { return err } @@ -146,7 +146,7 @@ REMOVE: // "IP:port". The reply argument is not used, but is required to fulfill the RPC // interface. func (op *Operator) RaftRemovePeerByID(args *structs.RaftRemovePeerRequest, reply *struct{}) error { - if done, err := op.srv.forward("Operator.RaftRemovePeerByID", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.RaftRemovePeerByID", args, args, reply); done { return err } diff --git a/agent/consul/prepared_query_endpoint.go b/agent/consul/prepared_query_endpoint.go index 01957a7c61..bb13ff3cb3 100644 --- a/agent/consul/prepared_query_endpoint.go +++ b/agent/consul/prepared_query_endpoint.go @@ -25,7 +25,7 @@ type PreparedQuery struct { // only be used for operations that modify the data. The ID of the session is // returned in the reply. func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string) (err error) { - if done, err := p.srv.forward("PreparedQuery.Apply", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "apply"}, time.Now()) @@ -209,7 +209,7 @@ func parseDNS(dns *structs.QueryDNSOptions) error { // Get returns a single prepared query by ID. func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest, reply *structs.IndexedPreparedQueries) error { - if done, err := p.srv.forward("PreparedQuery.Get", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Get", args, args, reply); done { return err } @@ -253,7 +253,7 @@ func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest, // List returns all the prepared queries. func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.IndexedPreparedQueries) error { - if done, err := p.srv.forward("PreparedQuery.List", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.List", args, args, reply); done { return err } @@ -277,7 +277,7 @@ func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.Ind // will be executed here. func (p *PreparedQuery) Explain(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExplainResponse) error { - if done, err := p.srv.forward("PreparedQuery.Explain", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Explain", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "explain"}, time.Now()) @@ -324,7 +324,7 @@ func (p *PreparedQuery) Explain(args *structs.PreparedQueryExecuteRequest, // part of a DNS lookup, or when executing prepared queries from the HTTP API. func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error { - if done, err := p.srv.forward("PreparedQuery.Execute", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Execute", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "execute"}, time.Now()) @@ -459,7 +459,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest, // We don't want things to fan out further than one level. func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { - if done, err := p.srv.forward("PreparedQuery.ExecuteRemote", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.ExecuteRemote", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "execute_remote"}, time.Now()) diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 938f1ee21e..a73f613be8 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -499,9 +499,9 @@ func canRetry(args interface{}, err error) bool { return false } -// forward is used to forward to a remote DC or to forward to the local leader +// ForwardRPC is used to forward an RPC request to a remote DC or to the local leader // Returns a bool of if forwarding was performed, as well as any error -func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) { +func (s *Server) ForwardRPC(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) { var firstCheck time.Time // Handle DC forwarding diff --git a/agent/consul/server.go b/agent/consul/server.go index a79783a723..67f46c0bd7 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -876,7 +876,7 @@ func (s *Server) setupRPC() error { authz = &disabledAuthorizer{} } // now register with the insecure RPC server - s.insecureRPCServer.Register(&Cluster{srv: s, authorizer: authz}) + s.insecureRPCServer.Register(NewAutoConfig(s.config, s.tlsConfigurator, s, authz)) ln, err := net.ListenTCP("tcp", s.config.RPCAddr) if err != nil { @@ -1436,6 +1436,72 @@ func (s *Server) intentionReplicationEnabled() bool { return s.config.ConnectEnabled && s.config.Datacenter != s.config.PrimaryDatacenter } +// CreateACLToken will create an ACL token from the given template +func (s *Server) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) { + // we have to require local tokens or else it would require having these servers use a token with acl:write to make a + // token create RPC to the servers in the primary DC. + if !s.LocalTokensEnabled() { + return nil, fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter: %s", s.config.Datacenter) + } + + newToken := *template + + // generate the accessor id + if newToken.AccessorID == "" { + accessor, err := lib.GenerateUUID(s.checkTokenUUID) + if err != nil { + return nil, err + } + + newToken.AccessorID = accessor + } + + // generate the secret id + if newToken.SecretID == "" { + secret, err := lib.GenerateUUID(s.checkTokenUUID) + if err != nil { + return nil, err + } + + newToken.SecretID = secret + } + + newToken.CreateTime = time.Now() + + req := structs.ACLTokenBatchSetRequest{ + Tokens: structs.ACLTokens{&newToken}, + CAS: false, + } + + // perform the request to mint the new token + if _, err := s.raftApplyMsgpack(structs.ACLTokenSetRequestType, &req); err != nil { + return nil, err + } + + // return the full token definition from the FSM + _, token, err := s.fsm.State().ACLTokenGetByAccessor(nil, newToken.AccessorID, &newToken.EnterpriseMeta) + return token, err +} + +// DatacenterJoinAddresses will return all the strings suitable for usage in +// retry join operations to connect to the the LAN or LAN segment gossip pool. +func (s *Server) DatacenterJoinAddresses(segment string) ([]string, error) { + members, err := s.LANSegmentMembers(segment) + if err != nil { + return nil, fmt.Errorf("Failed to retrieve members for segment %s - %w", segment, err) + } + + var joinAddrs []string + for _, m := range members { + if ok, _ := metadata.IsConsulServer(m); ok { + serfAddr := net.TCPAddr{IP: m.Addr, Port: int(m.Port)} + joinAddrs = append(joinAddrs, serfAddr.String()) + } + } + + return joinAddrs, nil +} + // peersInfoContent is used to help operators understand what happened to the // peers.json file. This is written to a file called peers.info in the same // location. diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 8822a98c9b..9033a0bf39 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -1534,3 +1534,97 @@ func TestServer_CALogging(t *testing.T) { require.Contains(t, buf.String(), "consul CA provider configured") } + +func TestServer_DatacenterJoinAddresses(t *testing.T) { + conf := testClusterConfig{ + Datacenter: "primary", + Servers: 3, + } + + nodes := newTestCluster(t, &conf) + + var expected []string + for _, srv := range nodes.Servers { + expected = append(expected, fmt.Sprintf("127.0.0.1:%d", srv.config.SerfLANConfig.MemberlistConfig.BindPort)) + } + + actual, err := nodes.Servers[0].DatacenterJoinAddresses("") + require.NoError(t, err) + require.ElementsMatch(t, expected, actual) +} + +func TestServer_CreateACLToken(t *testing.T) { + _, srv, codec := testACLServerWithConfig(t, nil, false) + + waitForLeaderEstablishment(t, srv) + + r1, err := upsertTestRole(codec, TestDefaultMasterToken, "dc1") + require.NoError(t, err) + + t.Run("predefined-ids", func(t *testing.T) { + accessor := "554cd3ab-5d4e-4d6e-952e-4e8b6c77bfb3" + secret := "ef453f31-ad58-4ec8-8bf8-342e99763026" + in := &structs.ACLToken{ + AccessorID: accessor, + SecretID: secret, + Description: "test", + Policies: []structs.ACLTokenPolicyLink{ + { + ID: structs.ACLPolicyGlobalManagementID, + }, + }, + NodeIdentities: []*structs.ACLNodeIdentity{ + { + NodeName: "foo", + Datacenter: "bar", + }, + }, + ServiceIdentities: []*structs.ACLServiceIdentity{ + { + ServiceName: "web", + }, + }, + Roles: []structs.ACLTokenRoleLink{ + { + ID: r1.ID, + }, + }, + } + + out, err := srv.CreateACLToken(in) + require.NoError(t, err) + require.Equal(t, accessor, out.AccessorID) + require.Equal(t, secret, out.SecretID) + require.Equal(t, "test", out.Description) + require.NotZero(t, out.CreateTime) + require.Len(t, out.Policies, 1) + require.Len(t, out.Roles, 1) + require.Len(t, out.NodeIdentities, 1) + require.Len(t, out.ServiceIdentities, 1) + require.Equal(t, structs.ACLPolicyGlobalManagementID, out.Policies[0].ID) + require.Equal(t, "foo", out.NodeIdentities[0].NodeName) + require.Equal(t, "web", out.ServiceIdentities[0].ServiceName) + require.Equal(t, r1.ID, out.Roles[0].ID) + }) + + t.Run("autogen-ids", func(t *testing.T) { + in := &structs.ACLToken{ + Description: "test", + NodeIdentities: []*structs.ACLNodeIdentity{ + { + NodeName: "foo", + Datacenter: "bar", + }, + }, + } + + out, err := srv.CreateACLToken(in) + require.NoError(t, err) + require.NotEmpty(t, out.AccessorID) + require.NotEmpty(t, out.SecretID) + require.Equal(t, "test", out.Description) + require.NotZero(t, out.CreateTime) + require.Len(t, out.NodeIdentities, 1) + require.Equal(t, "foo", out.NodeIdentities[0].NodeName) + }) +} diff --git a/agent/consul/session_endpoint.go b/agent/consul/session_endpoint.go index 2dd0527c29..3ac8b41dc0 100644 --- a/agent/consul/session_endpoint.go +++ b/agent/consul/session_endpoint.go @@ -32,7 +32,7 @@ func fixupSessionSpecificRequest(args *structs.SessionSpecificRequest) { // Apply is used to apply a modifying request to the data store. This should // only be used for operations that modify the data func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { - if done, err := s.srv.forward("Session.Apply", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"session", "apply"}, time.Now()) @@ -162,7 +162,7 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { // Get is used to retrieve a single session func (s *Session) Get(args *structs.SessionSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.forward("Session.Get", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.Get", args, args, reply); done { return err } @@ -203,7 +203,7 @@ func (s *Session) Get(args *structs.SessionSpecificRequest, // List is used to list all the active sessions func (s *Session) List(args *structs.SessionSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.forward("Session.List", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.List", args, args, reply); done { return err } @@ -237,7 +237,7 @@ func (s *Session) List(args *structs.SessionSpecificRequest, // NodeSessions is used to get all the sessions for a particular node func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.forward("Session.NodeSessions", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.NodeSessions", args, args, reply); done { return err } @@ -271,7 +271,7 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, // Renew is used to renew the TTL on a single session func (s *Session) Renew(args *structs.SessionSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.forward("Session.Renew", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.Renew", args, args, reply); done { return err } diff --git a/agent/consul/txn_endpoint.go b/agent/consul/txn_endpoint.go index 83d46be896..9819d63704 100644 --- a/agent/consul/txn_endpoint.go +++ b/agent/consul/txn_endpoint.go @@ -108,7 +108,7 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx // Apply is used to apply multiple operations in a single, atomic transaction. func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error { - if done, err := t.srv.forward("Txn.Apply", args, args, reply); done { + if done, err := t.srv.ForwardRPC("Txn.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"txn", "apply"}, time.Now()) @@ -151,7 +151,7 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error // supports staleness, so this should be preferred if you're just performing // reads. func (t *Txn) Read(args *structs.TxnReadRequest, reply *structs.TxnReadResponse) error { - if done, err := t.srv.forward("Txn.Read", args, args, reply); done { + if done, err := t.srv.ForwardRPC("Txn.Read", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"txn", "read"}, time.Now()) diff --git a/agent/pool/pool.go b/agent/pool/pool.go index e7dbb5ea6a..1aca440345 100644 --- a/agent/pool/pool.go +++ b/agent/pool/pool.go @@ -553,7 +553,7 @@ func (p *ConnPool) RPC( // secure or insecure variant depending on whether its an ongoing // or first time config request. For now though this is fine until // those ongoing requests are implemented. - if method == "AutoEncrypt.Sign" || method == "Cluster.AutoConfig" { + if method == "AutoEncrypt.Sign" || method == "AutoConfig.InitialConfiguration" { return p.rpcInsecure(dc, addr, method, args, reply) } else { return p.rpc(dc, nodeName, addr, method, args, reply)