From 5a6f15713c8fa3eed15fe06449605d94f68148f0 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 20 Apr 2021 14:55:24 -0400 Subject: [PATCH 1/3] rpc: remove unnecessary arg to ForwardRPC --- agent/consul/acl_endpoint.go | 64 ++++++++++----------- agent/consul/acl_endpoint_legacy.go | 8 +-- agent/consul/auto_config_endpoint.go | 9 +-- agent/consul/auto_config_endpoint_test.go | 13 +++-- agent/consul/auto_encrypt_endpoint.go | 2 +- agent/consul/catalog_endpoint.go | 18 +++--- agent/consul/config_endpoint.go | 12 ++-- agent/consul/connect_ca_endpoint.go | 12 ++-- agent/consul/coordinate_endpoint.go | 6 +- agent/consul/discovery_chain_endpoint.go | 5 +- agent/consul/federation_state_endpoint.go | 8 +-- agent/consul/health_endpoint.go | 13 +++-- agent/consul/intention_endpoint.go | 10 ++-- agent/consul/internal_endpoint.go | 23 ++++---- agent/consul/kvs_endpoint.go | 8 +-- agent/consul/operator_autopilot_endpoint.go | 8 +-- agent/consul/operator_raft_endpoint.go | 11 ++-- agent/consul/prepared_query_endpoint.go | 12 ++-- agent/consul/rpc.go | 18 +++--- agent/consul/session_endpoint.go | 10 ++-- agent/consul/txn_endpoint.go | 4 +- 21 files changed, 140 insertions(+), 134 deletions(-) diff --git a/agent/consul/acl_endpoint.go b/agent/consul/acl_endpoint.go index 15143f7276..a49ba0501a 100644 --- a/agent/consul/acl_endpoint.go +++ b/agent/consul/acl_endpoint.go @@ -184,7 +184,7 @@ func (a *ACL) BootstrapTokens(args *structs.DCSpecificRequest, reply *structs.AC if err := a.aclPreCheck(); err != nil { return err } - if done, err := a.srv.ForwardRPC("ACL.BootstrapTokens", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BootstrapTokens", args, reply); done { return err } @@ -279,7 +279,7 @@ func (a *ACL) TokenRead(args *structs.ACLTokenGetRequest, reply *structs.ACLToke args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.TokenRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenRead", args, reply); done { return err } @@ -348,7 +348,7 @@ func (a *ACL) TokenClone(args *structs.ACLTokenSetRequest, reply *structs.ACLTok args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.TokenClone", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenClone", args, reply); done { return err } @@ -419,7 +419,7 @@ func (a *ACL) TokenSet(args *structs.ACLTokenSetRequest, reply *structs.ACLToken return fmt.Errorf("Local tokens are disabled") } - if done, err := a.srv.ForwardRPC("ACL.TokenSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenSet", args, reply); done { return err } @@ -825,7 +825,7 @@ func (a *ACL) TokenDelete(args *structs.ACLTokenDeleteRequest, reply *string) er args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.TokenDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenDelete", args, reply); done { return err } @@ -911,7 +911,7 @@ func (a *ACL) TokenList(args *structs.ACLTokenListRequest, reply *structs.ACLTok args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.TokenList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenList", args, reply); done { return err } @@ -974,7 +974,7 @@ func (a *ACL) TokenBatchRead(args *structs.ACLTokenBatchGetRequest, reply *struc args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.TokenBatchRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenBatchRead", args, reply); done { return err } @@ -1028,7 +1028,7 @@ func (a *ACL) PolicyRead(args *structs.ACLPolicyGetRequest, reply *structs.ACLPo return err } - if done, err := a.srv.ForwardRPC("ACL.PolicyRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyRead", args, reply); done { return err } @@ -1066,7 +1066,7 @@ func (a *ACL) PolicyBatchRead(args *structs.ACLPolicyBatchGetRequest, reply *str return err } - if done, err := a.srv.ForwardRPC("ACL.PolicyBatchRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyBatchRead", args, reply); done { return err } @@ -1104,7 +1104,7 @@ func (a *ACL) PolicySet(args *structs.ACLPolicySetRequest, reply *structs.ACLPol args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.PolicySet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicySet", args, reply); done { return err } @@ -1235,7 +1235,7 @@ func (a *ACL) PolicyDelete(args *structs.ACLPolicyDeleteRequest, reply *string) args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.PolicyDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyDelete", args, reply); done { return err } @@ -1288,7 +1288,7 @@ func (a *ACL) PolicyList(args *structs.ACLPolicyListRequest, reply *structs.ACLP return err } - if done, err := a.srv.ForwardRPC("ACL.PolicyList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyList", args, reply); done { return err } @@ -1328,7 +1328,7 @@ func (a *ACL) PolicyResolve(args *structs.ACLPolicyBatchGetRequest, reply *struc return err } - if done, err := a.srv.ForwardRPC("ACL.PolicyResolve", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyResolve", args, reply); done { return err } @@ -1386,7 +1386,7 @@ func makeACLETag(parent string, policy *acl.Policy) string { // GetPolicy is used to retrieve a compiled policy object with a TTL. Does not // support a blocking query. func (a *ACL) GetPolicy(args *structs.ACLPolicyResolveLegacyRequest, reply *structs.ACLPolicyResolveLegacyResponse) error { - if done, err := a.srv.ForwardRPC("ACL.GetPolicy", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.GetPolicy", args, reply); done { return err } @@ -1433,7 +1433,7 @@ func (a *ACL) ReplicationStatus(args *structs.DCSpecificRequest, // re-using a structure where we don't support all the options. args.RequireConsistent = true args.AllowStale = false - if done, err := a.srv.ForwardRPC("ACL.ReplicationStatus", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.ReplicationStatus", args, reply); done { return err } @@ -1461,7 +1461,7 @@ func (a *ACL) RoleRead(args *structs.ACLRoleGetRequest, reply *structs.ACLRoleRe return err } - if done, err := a.srv.ForwardRPC("ACL.RoleRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleRead", args, reply); done { return err } @@ -1500,7 +1500,7 @@ func (a *ACL) RoleBatchRead(args *structs.ACLRoleBatchGetRequest, reply *structs return err } - if done, err := a.srv.ForwardRPC("ACL.RoleBatchRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleBatchRead", args, reply); done { return err } @@ -1538,7 +1538,7 @@ func (a *ACL) RoleSet(args *structs.ACLRoleSetRequest, reply *structs.ACLRole) e args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.RoleSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleSet", args, reply); done { return err } @@ -1696,7 +1696,7 @@ func (a *ACL) RoleDelete(args *structs.ACLRoleDeleteRequest, reply *string) erro args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.RoleDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleDelete", args, reply); done { return err } @@ -1745,7 +1745,7 @@ func (a *ACL) RoleList(args *structs.ACLRoleListRequest, reply *structs.ACLRoleL return err } - if done, err := a.srv.ForwardRPC("ACL.RoleList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleList", args, reply); done { return err } @@ -1779,7 +1779,7 @@ func (a *ACL) RoleResolve(args *structs.ACLRoleBatchGetRequest, reply *structs.A return err } - if done, err := a.srv.ForwardRPC("ACL.RoleResolve", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleResolve", args, reply); done { return err } @@ -1844,7 +1844,7 @@ func (a *ACL) BindingRuleRead(args *structs.ACLBindingRuleGetRequest, reply *str return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.BindingRuleRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleRead", args, reply); done { return err } @@ -1883,7 +1883,7 @@ func (a *ACL) BindingRuleSet(args *structs.ACLBindingRuleSetRequest, reply *stru return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.BindingRuleSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleSet", args, reply); done { return err } @@ -2012,7 +2012,7 @@ func (a *ACL) BindingRuleDelete(args *structs.ACLBindingRuleDeleteRequest, reply return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.BindingRuleDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleDelete", args, reply); done { return err } @@ -2063,7 +2063,7 @@ func (a *ACL) BindingRuleList(args *structs.ACLBindingRuleListRequest, reply *st return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.BindingRuleList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleList", args, reply); done { return err } @@ -2103,7 +2103,7 @@ func (a *ACL) AuthMethodRead(args *structs.ACLAuthMethodGetRequest, reply *struc return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.AuthMethodRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodRead", args, reply); done { return err } @@ -2145,7 +2145,7 @@ func (a *ACL) AuthMethodSet(args *structs.ACLAuthMethodSetRequest, reply *struct return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.AuthMethodSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodSet", args, reply); done { return err } @@ -2257,7 +2257,7 @@ func (a *ACL) AuthMethodDelete(args *structs.ACLAuthMethodDeleteRequest, reply * return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.AuthMethodDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodDelete", args, reply); done { return err } @@ -2313,7 +2313,7 @@ func (a *ACL) AuthMethodList(args *structs.ACLAuthMethodListRequest, reply *stru return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.AuthMethodList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodList", args, reply); done { return err } @@ -2367,7 +2367,7 @@ func (a *ACL) Login(args *structs.ACLLoginRequest, reply *structs.ACLToken) erro return errors.New("do not provide a token when logging in") } - if done, err := a.srv.ForwardRPC("ACL.Login", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Login", args, reply); done { return err } @@ -2512,7 +2512,7 @@ func (a *ACL) Logout(args *structs.ACLLogoutRequest, reply *bool) error { return acl.ErrNotFound } - if done, err := a.srv.ForwardRPC("ACL.Logout", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Logout", args, reply); done { return err } @@ -2559,7 +2559,7 @@ func (a *ACL) Authorize(args *structs.RemoteACLAuthorizationRequest, reply *[]st return err } - if done, err := a.srv.ForwardRPC("ACL.Authorize", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Authorize", args, reply); done { return err } diff --git a/agent/consul/acl_endpoint_legacy.go b/agent/consul/acl_endpoint_legacy.go index df7dcf8e55..3653ba8f90 100644 --- a/agent/consul/acl_endpoint_legacy.go +++ b/agent/consul/acl_endpoint_legacy.go @@ -24,7 +24,7 @@ var ACLEndpointLegacySummaries = []prometheus.SummaryDefinition{ // Bootstrap is used to perform a one-time ACL bootstrap operation on // a cluster to get the first management token. func (a *ACL) Bootstrap(args *structs.DCSpecificRequest, reply *structs.ACL) error { - if done, err := a.srv.ForwardRPC("ACL.Bootstrap", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Bootstrap", args, reply); done { return err } @@ -155,7 +155,7 @@ func aclApplyInternal(srv *Server, args *structs.ACLRequest, reply *string) erro // Apply is used to apply a modifying request to the data store. This should // only be used for operations that modify the data func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { - if done, err := a.srv.ForwardRPC("ACL.Apply", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Apply", args, reply); done { return err } defer metrics.MeasureSince([]string{"acl", "apply"}, time.Now()) @@ -201,7 +201,7 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { // Get is used to retrieve a single ACL func (a *ACL) Get(args *structs.ACLSpecificRequest, reply *structs.IndexedACLs) error { - if done, err := a.srv.ForwardRPC("ACL.Get", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Get", args, reply); done { return err } @@ -247,7 +247,7 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest, // List is used to list all the ACLs func (a *ACL) List(args *structs.DCSpecificRequest, reply *structs.IndexedACLs) error { - if done, err := a.srv.ForwardRPC("ACL.List", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.List", args, reply); done { return err } diff --git a/agent/consul/auto_config_endpoint.go b/agent/consul/auto_config_endpoint.go index 6cbf0a5397..53007bde73 100644 --- a/agent/consul/auto_config_endpoint.go +++ b/agent/consul/auto_config_endpoint.go @@ -9,6 +9,9 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/proto" + bexpr "github.com/hashicorp/go-bexpr" + "github.com/mitchellh/mapstructure" + "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/authmethod/ssoauth" "github.com/hashicorp/consul/agent/structs" @@ -17,8 +20,6 @@ import ( "github.com/hashicorp/consul/proto/pbconfig" "github.com/hashicorp/consul/proto/pbconnect" "github.com/hashicorp/consul/tlsutil" - bexpr "github.com/hashicorp/go-bexpr" - "github.com/mitchellh/mapstructure" ) type AutoConfigOptions struct { @@ -107,7 +108,7 @@ func (a *jwtAuthorizer) Authorize(req *pbautoconf.AutoConfigRequest) (AutoConfig type AutoConfigBackend interface { CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) DatacenterJoinAddresses(segment string) ([]string, error) - ForwardRPC(method string, info structs.RPCInfo, args, reply interface{}) (bool, error) + ForwardRPC(method string, info structs.RPCInfo, reply interface{}) (bool, error) GetCARoots() (*structs.IndexedCARoots, error) SignCertificate(csr *x509.CertificateRequest, id connect.CertURI) (*structs.IssuedCert, error) @@ -339,7 +340,7 @@ func (ac *AutoConfig) InitialConfiguration(req *pbautoconf.AutoConfigRequest, re } // forward to the leader - if done, err := ac.backend.ForwardRPC("AutoConfig.InitialConfiguration", req, req, resp); done { + if done, err := ac.backend.ForwardRPC("AutoConfig.InitialConfiguration", req, resp); done { return err } diff --git a/agent/consul/auto_config_endpoint_test.go b/agent/consul/auto_config_endpoint_test.go index c1495a7886..563ed42c35 100644 --- a/agent/consul/auto_config_endpoint_test.go +++ b/agent/consul/auto_config_endpoint_test.go @@ -11,6 +11,11 @@ import ( "testing" "time" + "github.com/hashicorp/memberlist" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/internal/go-sso/oidcauth/oidcauthtest" @@ -18,10 +23,6 @@ import ( "github.com/hashicorp/consul/proto/pbconfig" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/tlsutil" - "github.com/hashicorp/memberlist" - msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" "gopkg.in/square/go-jose.v2/jwt" ) @@ -44,8 +45,8 @@ func (m *mockAutoConfigBackend) DatacenterJoinAddresses(segment string) ([]strin return addrs, ret.Error(1) } -func (m *mockAutoConfigBackend) ForwardRPC(method string, info structs.RPCInfo, args, reply interface{}) (bool, error) { - ret := m.Called(method, info, args, reply) +func (m *mockAutoConfigBackend) ForwardRPC(method string, req structs.RPCInfo, reply interface{}) (bool, error) { + ret := m.Called(method, req, reply) return ret.Bool(0), ret.Error(1) } diff --git a/agent/consul/auto_encrypt_endpoint.go b/agent/consul/auto_encrypt_endpoint.go index 78a100acc2..2f96efacfc 100644 --- a/agent/consul/auto_encrypt_endpoint.go +++ b/agent/consul/auto_encrypt_endpoint.go @@ -24,7 +24,7 @@ func (a *AutoEncrypt) Sign( if !a.srv.config.AutoEncryptAllowTLS { return ErrAutoEncryptAllowTLSNotEnabled } - if done, err := a.srv.ForwardRPC("AutoEncrypt.Sign", args, args, reply); done { + if done, err := a.srv.ForwardRPC("AutoEncrypt.Sign", args, reply); done { return err } diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index e5d430992d..f657608696 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -142,7 +142,7 @@ func checkPreApply(check *structs.HealthCheck) { // Register is used register that a node is providing a given service. func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error { - if done, err := c.srv.ForwardRPC("Catalog.Register", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.Register", args, reply); done { return err } defer metrics.MeasureSince([]string{"catalog", "register"}, time.Now()) @@ -217,7 +217,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error // Deregister is used to remove a service registration for a given node. func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) error { - if done, err := c.srv.ForwardRPC("Catalog.Deregister", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.Deregister", args, reply); done { return err } defer metrics.MeasureSince([]string{"catalog", "deregister"}, time.Now()) @@ -284,7 +284,7 @@ func (c *Catalog) ListDatacenters(args *structs.DatacentersRequest, reply *[]str // ListNodes is used to query the nodes in a DC func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedNodes) error { - if done, err := c.srv.ForwardRPC("Catalog.ListNodes", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ListNodes", args, reply); done { return err } @@ -332,7 +332,7 @@ func isUnmodified(opts structs.QueryOptions, index uint64) bool { // ListServices is used to query the services in a DC func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.IndexedServices) error { - if done, err := c.srv.ForwardRPC("Catalog.ListServices", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ListServices", args, reply); done { return err } @@ -373,7 +373,7 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I } func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.IndexedServiceList) error { - if done, err := c.srv.ForwardRPC("Catalog.ServiceList", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ServiceList", args, reply); done { return err } @@ -402,7 +402,7 @@ func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.In // ServiceNodes returns all the nodes registered as part of a service func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceNodes) error { - if done, err := c.srv.ForwardRPC("Catalog.ServiceNodes", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ServiceNodes", args, reply); done { return err } @@ -540,7 +540,7 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru // NodeServices returns all the services registered as part of a node func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServices) error { - if done, err := c.srv.ForwardRPC("Catalog.NodeServices", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.NodeServices", args, reply); done { return err } @@ -591,7 +591,7 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs } func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServiceList) error { - if done, err := c.srv.ForwardRPC("Catalog.NodeServiceList", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.NodeServiceList", args, reply); done { return err } @@ -644,7 +644,7 @@ func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *stru } func (c *Catalog) GatewayServices(args *structs.ServiceSpecificRequest, reply *structs.IndexedGatewayServices) error { - if done, err := c.srv.ForwardRPC("Catalog.GatewayServices", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.GatewayServices", args, reply); done { return err } diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index 51a919675d..c569799ecf 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -58,7 +58,7 @@ func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error // be replicated to all the other datacenters. args.Datacenter = c.srv.config.PrimaryDatacenter - if done, err := c.srv.ForwardRPC("ConfigEntry.Apply", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.Apply", args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "apply"}, time.Now()) @@ -105,7 +105,7 @@ func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.ConfigE return err } - if done, err := c.srv.ForwardRPC("ConfigEntry.Get", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.Get", args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "get"}, time.Now()) @@ -152,7 +152,7 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe return err } - if done, err := c.srv.ForwardRPC("ConfigEntry.List", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.List", args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "list"}, time.Now()) @@ -209,7 +209,7 @@ func (c *ConfigEntry) ListAll(args *structs.ConfigEntryListAllRequest, reply *st return err } - if done, err := c.srv.ForwardRPC("ConfigEntry.ListAll", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.ListAll", args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "listAll"}, time.Now()) @@ -271,7 +271,7 @@ func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{}) // be replicated to all the other datacenters. args.Datacenter = c.srv.config.PrimaryDatacenter - if done, err := c.srv.ForwardRPC("ConfigEntry.Delete", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.Delete", args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "delete"}, time.Now()) @@ -305,7 +305,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r return err } - if done, err := c.srv.ForwardRPC("ConfigEntry.ResolveServiceConfig", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.ResolveServiceConfig", args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "resolve_service_config"}, time.Now()) diff --git a/agent/consul/connect_ca_endpoint.go b/agent/consul/connect_ca_endpoint.go index ffecaae1c2..93f3b55fb3 100644 --- a/agent/consul/connect_ca_endpoint.go +++ b/agent/consul/connect_ca_endpoint.go @@ -6,12 +6,12 @@ import ( "time" "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/go-memdb" ) var ( @@ -56,7 +56,7 @@ func (s *ConnectCA) ConfigurationGet( return ErrConnectNotEnabled } - if done, err := s.srv.ForwardRPC("ConnectCA.ConfigurationGet", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.ConfigurationGet", args, reply); done { return err } @@ -88,7 +88,7 @@ func (s *ConnectCA) ConfigurationSet( return ErrConnectNotEnabled } - if done, err := s.srv.ForwardRPC("ConnectCA.ConfigurationSet", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.ConfigurationSet", args, reply); done { return err } @@ -109,7 +109,7 @@ func (s *ConnectCA) Roots( args *structs.DCSpecificRequest, reply *structs.IndexedCARoots) error { // Forward if necessary - if done, err := s.srv.ForwardRPC("ConnectCA.Roots", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.Roots", args, reply); done { return err } @@ -141,7 +141,7 @@ func (s *ConnectCA) Sign( return ErrConnectNotEnabled } - if done, err := s.srv.ForwardRPC("ConnectCA.Sign", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.Sign", args, reply); done { return err } @@ -209,7 +209,7 @@ func (s *ConnectCA) SignIntermediate( return ErrConnectNotEnabled } - if done, err := s.srv.ForwardRPC("ConnectCA.SignIntermediate", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.SignIntermediate", args, reply); done { return err } diff --git a/agent/consul/coordinate_endpoint.go b/agent/consul/coordinate_endpoint.go index 2fbc10c700..f276215dfe 100644 --- a/agent/consul/coordinate_endpoint.go +++ b/agent/consul/coordinate_endpoint.go @@ -116,7 +116,7 @@ func (c *Coordinate) batchApplyUpdates() error { // Update inserts or updates the LAN coordinate of a node. func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) (err error) { - if done, err := c.srv.ForwardRPC("Coordinate.Update", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Coordinate.Update", args, reply); done { return err } @@ -192,7 +192,7 @@ func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.Datacenter // ListNodes returns the list of nodes with their raw network coordinates (if no // coordinates are available for a node it won't appear in this list). func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedCoordinates) error { - if done, err := c.srv.ForwardRPC("Coordinate.ListNodes", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Coordinate.ListNodes", args, reply); done { return err } @@ -215,7 +215,7 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I // Node returns the raw coordinates for a single node. func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinates) error { - if done, err := c.srv.ForwardRPC("Coordinate.Node", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Coordinate.Node", args, reply); done { return err } diff --git a/agent/consul/discovery_chain_endpoint.go b/agent/consul/discovery_chain_endpoint.go index 23f545ef02..00327d1bb4 100644 --- a/agent/consul/discovery_chain_endpoint.go +++ b/agent/consul/discovery_chain_endpoint.go @@ -5,11 +5,12 @@ import ( "time" metrics "github.com/armon/go-metrics" + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - memdb "github.com/hashicorp/go-memdb" ) type DiscoveryChain struct { @@ -22,7 +23,7 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs return ErrConnectNotEnabled } - if done, err := c.srv.ForwardRPC("DiscoveryChain.Get", args, args, reply); done { + if done, err := c.srv.ForwardRPC("DiscoveryChain.Get", args, reply); done { return err } defer metrics.MeasureSince([]string{"discovery_chain", "get"}, time.Now()) diff --git a/agent/consul/federation_state_endpoint.go b/agent/consul/federation_state_endpoint.go index f5fa358bb4..d4eafe8350 100644 --- a/agent/consul/federation_state_endpoint.go +++ b/agent/consul/federation_state_endpoint.go @@ -48,7 +48,7 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo // be replicated to all the other datacenters. args.Datacenter = c.srv.config.PrimaryDatacenter - if done, err := c.srv.ForwardRPC("FederationState.Apply", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.Apply", args, reply); done { return err } @@ -94,7 +94,7 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo } func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs.FederationStateResponse) error { - if done, err := c.srv.ForwardRPC("FederationState.Get", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.Get", args, reply); done { return err } @@ -135,7 +135,7 @@ func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs // List is the endpoint meant to be used by consul servers performing // replication. func (c *FederationState) List(args *structs.DCSpecificRequest, reply *structs.IndexedFederationStates) error { - if done, err := c.srv.ForwardRPC("FederationState.List", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.List", args, reply); done { return err } @@ -178,7 +178,7 @@ func (c *FederationState) List(args *structs.DCSpecificRequest, reply *structs.I // in the discovery info for dialing mesh gateways. Analogous to catalog // endpoints. func (c *FederationState) ListMeshGateways(args *structs.DCSpecificRequest, reply *structs.DatacenterIndexedCheckServiceNodes) error { - if done, err := c.srv.ForwardRPC("FederationState.ListMeshGateways", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.ListMeshGateways", args, reply); done { return err } diff --git a/agent/consul/health_endpoint.go b/agent/consul/health_endpoint.go index 4706405a83..2dd7d575b8 100644 --- a/agent/consul/health_endpoint.go +++ b/agent/consul/health_endpoint.go @@ -5,11 +5,12 @@ import ( "sort" "github.com/armon/go-metrics" + bexpr "github.com/hashicorp/go-bexpr" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - bexpr "github.com/hashicorp/go-bexpr" - "github.com/hashicorp/go-memdb" ) // Health endpoint is used to query the health information @@ -20,7 +21,7 @@ type Health struct { // ChecksInState is used to get all the checks in a given state func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, reply *structs.IndexedHealthChecks) error { - if done, err := h.srv.ForwardRPC("Health.ChecksInState", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.ChecksInState", args, reply); done { return err } @@ -71,7 +72,7 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, // NodeChecks is used to get all the checks for a node func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, reply *structs.IndexedHealthChecks) error { - if done, err := h.srv.ForwardRPC("Health.NodeChecks", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.NodeChecks", args, reply); done { return err } @@ -121,7 +122,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, } // Potentially forward - if done, err := h.srv.ForwardRPC("Health.ServiceChecks", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.ServiceChecks", args, reply); done { return err } @@ -171,7 +172,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, // ServiceNodes returns all the nodes registered as part of a service including health info func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedCheckServiceNodes) error { - if done, err := h.srv.ForwardRPC("Health.ServiceNodes", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.ServiceNodes", args, reply); done { return err } diff --git a/agent/consul/intention_endpoint.go b/agent/consul/intention_endpoint.go index 83c7f4bcdb..a8b71b7248 100644 --- a/agent/consul/intention_endpoint.go +++ b/agent/consul/intention_endpoint.go @@ -80,7 +80,7 @@ func (s *Intention) Apply(args *structs.IntentionRequest, reply *string) error { // datacenter. These will then be replicated to all the other datacenters. args.Datacenter = s.srv.config.PrimaryDatacenter - if done, err := s.srv.ForwardRPC("Intention.Apply", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Apply", args, reply); done { return err } defer metrics.MeasureSince([]string{"consul", "intention", "apply"}, time.Now()) @@ -423,7 +423,7 @@ func (s *Intention) Get(args *structs.IntentionQueryRequest, reply *structs.Inde } // Forward if necessary - if done, err := s.srv.ForwardRPC("Intention.Get", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Get", args, reply); done { return err } @@ -501,7 +501,7 @@ func (s *Intention) List(args *structs.IntentionListRequest, reply *structs.Inde } // Forward if necessary - if done, err := s.srv.ForwardRPC("Intention.List", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.List", args, reply); done { return err } @@ -571,7 +571,7 @@ func (s *Intention) Match(args *structs.IntentionQueryRequest, reply *structs.In } // Forward if necessary - if done, err := s.srv.ForwardRPC("Intention.Match", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Match", args, reply); done { return err } @@ -645,7 +645,7 @@ func (s *Intention) Check(args *structs.IntentionQueryRequest, reply *structs.In } // Forward maybe - if done, err := s.srv.ForwardRPC("Intention.Check", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Check", args, reply); done { return err } diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 9a82f136b1..dad798dfc6 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -3,14 +3,15 @@ package consul import ( "fmt" - "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/consul/state" - "github.com/hashicorp/consul/agent/structs" bexpr "github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" "github.com/hashicorp/serf/serf" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" ) // Internal endpoint is used to query the miscellaneous info that @@ -24,7 +25,7 @@ type Internal struct { // NodeInfo is used to retrieve information about a specific node. func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeDump) error { - if done, err := m.srv.ForwardRPC("Internal.NodeInfo", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.NodeInfo", args, reply); done { return err } @@ -50,7 +51,7 @@ func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest, // NodeDump is used to generate information about all of the nodes. func (m *Internal) NodeDump(args *structs.DCSpecificRequest, reply *structs.IndexedNodeDump) error { - if done, err := m.srv.ForwardRPC("Internal.NodeDump", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.NodeDump", args, reply); done { return err } @@ -89,7 +90,7 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest, } func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.IndexedNodesWithGateways) error { - if done, err := m.srv.ForwardRPC("Internal.ServiceDump", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.ServiceDump", args, reply); done { return err } @@ -145,7 +146,7 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs. } func (m *Internal) ServiceTopology(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceTopology) error { - if done, err := m.srv.ForwardRPC("Internal.ServiceTopology", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.ServiceTopology", args, reply); done { return err } if args.ServiceName == "" { @@ -199,7 +200,7 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl if args.ServiceName == "" { return fmt.Errorf("Must provide a service name") } - if done, err := m.srv.ForwardRPC("Internal.IntentionUpstreams", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.IntentionUpstreams", args, reply); done { return err } @@ -233,7 +234,7 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl // GatewayServiceNodes returns all the nodes for services associated with a gateway along with their gateway config func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceDump) error { - if done, err := m.srv.ForwardRPC("Internal.GatewayServiceDump", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.GatewayServiceDump", args, reply); done { return err } @@ -312,7 +313,7 @@ func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, repl // Match returns the set of intentions that match the given source/destination. func (m *Internal) GatewayIntentions(args *structs.IntentionQueryRequest, reply *structs.IndexedIntentions) error { // Forward if necessary - if done, err := m.srv.ForwardRPC("Internal.GatewayIntentions", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.GatewayIntentions", args, reply); done { return err } @@ -398,7 +399,7 @@ func (m *Internal) GatewayIntentions(args *structs.IntentionQueryRequest, reply // triggered in a remote DC. func (m *Internal) EventFire(args *structs.EventFireRequest, reply *structs.EventFireResponse) error { - if done, err := m.srv.ForwardRPC("Internal.EventFire", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.EventFire", args, reply); done { return err } diff --git a/agent/consul/kvs_endpoint.go b/agent/consul/kvs_endpoint.go index d52ba169c4..df168c5c55 100644 --- a/agent/consul/kvs_endpoint.go +++ b/agent/consul/kvs_endpoint.go @@ -96,7 +96,7 @@ func kvsPreApply(logger hclog.Logger, srv *Server, authz acl.Authorizer, op api. // Apply is used to apply a KVS update request to the data store. func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { - if done, err := k.srv.ForwardRPC("KVS.Apply", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.Apply", args, reply); done { return err } defer metrics.MeasureSince([]string{"kvs", "apply"}, time.Now()) @@ -135,7 +135,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { // Get is used to lookup a single key. func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { - if done, err := k.srv.ForwardRPC("KVS.Get", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.Get", args, reply); done { return err } @@ -180,7 +180,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er // List is used to list all keys with a given prefix. func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { - if done, err := k.srv.ForwardRPC("KVS.List", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.List", args, reply); done { return err } @@ -232,7 +232,7 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e // of the response so that only a subset of the prefix is returned. In this // mode, the keys which are omitted are still counted in the returned index. func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyList) error { - if done, err := k.srv.ForwardRPC("KVS.ListKeys", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.ListKeys", args, reply); done { return err } diff --git a/agent/consul/operator_autopilot_endpoint.go b/agent/consul/operator_autopilot_endpoint.go index 29cdbe912c..b415a7cc29 100644 --- a/agent/consul/operator_autopilot_endpoint.go +++ b/agent/consul/operator_autopilot_endpoint.go @@ -12,7 +12,7 @@ import ( // AutopilotGetConfiguration is used to retrieve the current Autopilot configuration. func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error { - if done, err := op.srv.ForwardRPC("Operator.AutopilotGetConfiguration", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.AutopilotGetConfiguration", args, reply); done { return err } @@ -44,7 +44,7 @@ func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, r // AutopilotSetConfiguration is used to set the current Autopilot configuration. func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error { - if done, err := op.srv.ForwardRPC("Operator.AutopilotSetConfiguration", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.AutopilotSetConfiguration", args, reply); done { return err } @@ -79,7 +79,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs // re-using a structure where we don't support all the options. args.RequireConsistent = true args.AllowStale = false - if done, err := op.srv.ForwardRPC("Operator.ServerHealth", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.ServerHealth", args, reply); done { return err } @@ -146,7 +146,7 @@ func (op *Operator) AutopilotState(args *structs.DCSpecificRequest, reply *autop // re-using a structure where we don't support all the options. args.RequireConsistent = true args.AllowStale = false - if done, err := op.srv.ForwardRPC("Operator.AutopilotState", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.AutopilotState", args, reply); done { return err } diff --git a/agent/consul/operator_raft_endpoint.go b/agent/consul/operator_raft_endpoint.go index 71fa1bbde8..d7ceb7e9a2 100644 --- a/agent/consul/operator_raft_endpoint.go +++ b/agent/consul/operator_raft_endpoint.go @@ -4,16 +4,17 @@ import ( "fmt" "net" + "github.com/hashicorp/raft" + "github.com/hashicorp/serf/serf" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/raft" - "github.com/hashicorp/serf/serf" ) // RaftGetConfiguration is used to retrieve the current Raft configuration. func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply *structs.RaftConfigurationResponse) error { - if done, err := op.srv.ForwardRPC("Operator.RaftGetConfiguration", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.RaftGetConfiguration", args, reply); done { return err } @@ -74,7 +75,7 @@ func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply // "IP:port". The reply argument is not used, but it required to fulfill the RPC // interface. func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftRemovePeerRequest, reply *struct{}) error { - if done, err := op.srv.ForwardRPC("Operator.RaftRemovePeerByAddress", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.RaftRemovePeerByAddress", args, reply); done { return err } @@ -127,7 +128,7 @@ REMOVE: // "IP:port". The reply argument is not used, but is required to fulfill the RPC // interface. func (op *Operator) RaftRemovePeerByID(args *structs.RaftRemovePeerRequest, reply *struct{}) error { - if done, err := op.srv.ForwardRPC("Operator.RaftRemovePeerByID", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.RaftRemovePeerByID", args, reply); done { return err } diff --git a/agent/consul/prepared_query_endpoint.go b/agent/consul/prepared_query_endpoint.go index e4dc05e9a6..c05de898f7 100644 --- a/agent/consul/prepared_query_endpoint.go +++ b/agent/consul/prepared_query_endpoint.go @@ -46,7 +46,7 @@ type PreparedQuery struct { // only be used for operations that modify the data. The ID of the session is // returned in the reply. func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string) (err error) { - if done, err := p.srv.ForwardRPC("PreparedQuery.Apply", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Apply", args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "apply"}, time.Now()) @@ -225,7 +225,7 @@ func parseDNS(dns *structs.QueryDNSOptions) error { // Get returns a single prepared query by ID. func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest, reply *structs.IndexedPreparedQueries) error { - if done, err := p.srv.ForwardRPC("PreparedQuery.Get", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Get", args, reply); done { return err } @@ -269,7 +269,7 @@ func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest, // List returns all the prepared queries. func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.IndexedPreparedQueries) error { - if done, err := p.srv.ForwardRPC("PreparedQuery.List", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.List", args, reply); done { return err } @@ -293,7 +293,7 @@ func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.Ind // will be executed here. func (p *PreparedQuery) Explain(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExplainResponse) error { - if done, err := p.srv.ForwardRPC("PreparedQuery.Explain", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Explain", args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "explain"}, time.Now()) @@ -340,7 +340,7 @@ func (p *PreparedQuery) Explain(args *structs.PreparedQueryExecuteRequest, // part of a DNS lookup, or when executing prepared queries from the HTTP API. func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error { - if done, err := p.srv.ForwardRPC("PreparedQuery.Execute", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Execute", args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "execute"}, time.Now()) @@ -475,7 +475,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest, // We don't want things to fan out further than one level. func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { - if done, err := p.srv.ForwardRPC("PreparedQuery.ExecuteRemote", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.ExecuteRemote", args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "execute_remote"}, time.Now()) diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 80d5b95e6c..2e9cf15ee4 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -552,16 +552,16 @@ func canRetry(args interface{}, err error) bool { // ForwardRPC is used to forward an RPC request to a remote DC or to the local leader // Returns a bool of if forwarding was performed, as well as any error -func (s *Server) ForwardRPC(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) { +func (s *Server) ForwardRPC(method string, req structs.RPCInfo, reply interface{}) (bool, error) { var firstCheck time.Time // Handle DC forwarding - dc := info.RequestDatacenter() + dc := req.RequestDatacenter() if dc != s.config.Datacenter { // Local tokens only work within the current datacenter. Check to see // if we are attempting to forward one to a remote datacenter and strip // it, falling back on the anonymous token on the other end. - if token := info.TokenSecret(); token != "" { + if token := req.TokenSecret(); token != "" { done, ident, err := s.ResolveIdentityFromToken(token) if done { if err != nil && !acl.IsErrNotFound(err) { @@ -569,18 +569,18 @@ func (s *Server) ForwardRPC(method string, info structs.RPCInfo, args interface{ } if ident != nil && ident.IsLocal() { // Strip it from the request. - info.SetTokenSecret("") - defer info.SetTokenSecret(token) + req.SetTokenSecret("") + defer req.SetTokenSecret(token) } } } - err := s.forwardDC(method, dc, args, reply) + err := s.forwardDC(method, dc, req, reply) return true, err } // Check if we can allow a stale read, ensure our local DB is initialized - if info.IsRead() && info.AllowStaleRead() && !s.raft.LastContact().IsZero() { + if req.IsRead() && req.AllowStaleRead() && !s.raft.LastContact().IsZero() { return false, nil } @@ -603,8 +603,8 @@ CHECK_LEADER: // Handle the case of a known leader if leader != nil { rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr, - method, args, reply) - if rpcErr != nil && canRetry(info, rpcErr) { + method, req, reply) + if rpcErr != nil && canRetry(req, rpcErr) { goto RETRY } return true, rpcErr diff --git a/agent/consul/session_endpoint.go b/agent/consul/session_endpoint.go index 1287ca7960..ca81e5cdca 100644 --- a/agent/consul/session_endpoint.go +++ b/agent/consul/session_endpoint.go @@ -45,7 +45,7 @@ func fixupSessionSpecificRequest(args *structs.SessionSpecificRequest) { // Apply is used to apply a modifying request to the data store. This should // only be used for operations that modify the data func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { - if done, err := s.srv.ForwardRPC("Session.Apply", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.Apply", args, reply); done { return err } defer metrics.MeasureSince([]string{"session", "apply"}, time.Now()) @@ -170,7 +170,7 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { // Get is used to retrieve a single session func (s *Session) Get(args *structs.SessionSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.ForwardRPC("Session.Get", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.Get", args, reply); done { return err } @@ -211,7 +211,7 @@ func (s *Session) Get(args *structs.SessionSpecificRequest, // List is used to list all the active sessions func (s *Session) List(args *structs.SessionSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.ForwardRPC("Session.List", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.List", args, reply); done { return err } @@ -245,7 +245,7 @@ func (s *Session) List(args *structs.SessionSpecificRequest, // NodeSessions is used to get all the sessions for a particular node func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.ForwardRPC("Session.NodeSessions", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.NodeSessions", args, reply); done { return err } @@ -279,7 +279,7 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, // Renew is used to renew the TTL on a single session func (s *Session) Renew(args *structs.SessionSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.ForwardRPC("Session.Renew", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.Renew", args, reply); done { return err } diff --git a/agent/consul/txn_endpoint.go b/agent/consul/txn_endpoint.go index da1a224cb5..6f6db77375 100644 --- a/agent/consul/txn_endpoint.go +++ b/agent/consul/txn_endpoint.go @@ -121,7 +121,7 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx // Apply is used to apply multiple operations in a single, atomic transaction. func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error { - if done, err := t.srv.ForwardRPC("Txn.Apply", args, args, reply); done { + if done, err := t.srv.ForwardRPC("Txn.Apply", args, reply); done { return err } defer metrics.MeasureSince([]string{"txn", "apply"}, time.Now()) @@ -160,7 +160,7 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error // supports staleness, so this should be preferred if you're just performing // reads. func (t *Txn) Read(args *structs.TxnReadRequest, reply *structs.TxnReadResponse) error { - if done, err := t.srv.ForwardRPC("Txn.Read", args, args, reply); done { + if done, err := t.srv.ForwardRPC("Txn.Read", args, reply); done { return err } defer metrics.MeasureSince([]string{"txn", "read"}, time.Now()) From 6b513c1ba491bf3f4f30b9ca8ff89a0536439657 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 20 Apr 2021 14:23:50 -0400 Subject: [PATCH 2/3] rpc: add tests for canRetry Also accept an RPCInfo instead of interface{}. Accepting an interface lead to a bug where the caller was expecting the arg to be the response when in fact it was always passed the request. By accepting RPCInfo it should indicate that this is actually the request value. One caller of canRetry already passed an RPCInfo, the second handles the type assertion before calling canRetry. --- agent/consul/client.go | 12 ++++--- agent/consul/rpc.go | 20 +++--------- agent/consul/rpc_test.go | 67 +++++++++++++++++++++++++++++++++++++--- 3 files changed, 75 insertions(+), 24 deletions(-) diff --git a/agent/consul/client.go b/agent/consul/client.go index bff2565be3..f341f9abc3 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -10,6 +10,10 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/serf/serf" + "golang.org/x/time/rate" + "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" @@ -17,9 +21,6 @@ import ( "github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/serf/serf" - "golang.org/x/time/rate" ) var ClientCounters = []prometheus.CounterDefinition{ @@ -287,7 +288,10 @@ TRY: ) metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}}) manager.NotifyFailedServer(server) - if retry := canRetry(args, rpcErr); !retry { + + // Use the zero value for RPCInfo if the request doesn't implement RPCInfo + info, _ := args.(structs.RPCInfo) + if retry := canRetry(info, rpcErr); !retry { return rpcErr } diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 2e9cf15ee4..0766afe2e7 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -92,9 +92,7 @@ const ( enqueueLimit = 30 * time.Second ) -var ( - ErrChunkingResubmit = errors.New("please resubmit call for rechunking") -) +var ErrChunkingResubmit = errors.New("please resubmit call for rechunking") func (s *Server) rpcLogger() hclog.Logger { return s.loggers.Named(logging.RPC) @@ -527,8 +525,8 @@ func (c *limitedConn) Read(b []byte) (n int, err error) { return c.lr.Read(b) } -// canRetry returns true if the given situation is safe for a retry. -func canRetry(args interface{}, err error) bool { +// canRetry returns true if the request and error indicate that a retry is safe. +func canRetry(info structs.RPCInfo, err error) bool { // No leader errors are always safe to retry since no state could have // been changed. if structs.IsErrNoLeader(err) { @@ -542,12 +540,7 @@ func canRetry(args interface{}, err error) bool { // Reads are safe to retry for stream errors, such as if a server was // being shut down. - info, ok := args.(structs.RPCInfo) - if ok && info.IsRead() && lib.IsErrEOF(err) { - return true - } - - return false + return info != nil && info.IsRead() && lib.IsErrEOF(err) } // ForwardRPC is used to forward an RPC request to a remote DC or to the local leader @@ -790,11 +783,6 @@ func (s *Server) raftApplyWithEncoder( // In this case we didn't apply all chunks successfully, possibly due // to a term change; resubmit if resp == nil { - // This returns the error in the interface because the raft library - // returns errors from the FSM via the future, not via err from the - // apply function. Downstream client code expects to see any error - // from the FSM (as opposed to the apply itself) and decide whether - // it can retry in the future's response. return nil, ErrChunkingResubmit } // We expect that this conversion should always work diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 4c283867e9..48981234be 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -4,6 +4,8 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" + "io" "math" "net" "os" @@ -12,6 +14,11 @@ import ( "testing" "time" + "github.com/hashicorp/go-memdb" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/pool" @@ -20,10 +27,6 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" - "github.com/hashicorp/go-memdb" - msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestRPC_NoLeader_Fail(t *testing.T) { @@ -952,3 +955,59 @@ func TestRPC_LocalTokenStrippedOnForward(t *testing.T) { require.NoError(t, err) require.Equal(t, localToken2.SecretID, arg.WriteRequest.Token, "token should not be stripped") } + +func TestCanRetry(t *testing.T) { + type testCase struct { + name string + req structs.RPCInfo + err error + expected bool + } + + run := func(t *testing.T, tc testCase) { + require.Equal(t, tc.expected, canRetry(tc.req, tc.err)) + } + + var testCases = []testCase{ + { + name: "unexpected error", + err: fmt.Errorf("some arbitrary error"), + expected: false, + }, + { + name: "checking error", + err: fmt.Errorf("some wrapping :%w", ErrChunkingResubmit), + expected: true, + }, + { + name: "no leader error", + err: fmt.Errorf("some wrapping: %w", structs.ErrNoLeader), + expected: true, + }, + { + name: "EOF on read request", + req: isReadRequest{}, + err: io.EOF, + expected: true, + }, + { + name: "EOF on write request", + err: io.EOF, + expected: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +type isReadRequest struct { + structs.RPCInfo +} + +func (r isReadRequest) IsRead() bool { + return true +} From 426565b68cc37d83886089a8478dfd2760f25885 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 20 Apr 2021 13:04:46 -0400 Subject: [PATCH 3/3] fix failing integration tests The new IDs include a leading slash for the partition ID section --- test/integration/connect/envoy/case-basic/verify.bats | 2 +- test/integration/connect/envoy/case-http/verify.bats | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test/integration/connect/envoy/case-basic/verify.bats b/test/integration/connect/envoy/case-basic/verify.bats index a788a8e48d..47787f5560 100644 --- a/test/integration/connect/envoy/case-basic/verify.bats +++ b/test/integration/connect/envoy/case-basic/verify.bats @@ -39,7 +39,7 @@ load helpers @test "s1 proxy should have been configured with one rbac listener filter at L4" { LISTEN_FILTERS=$(get_envoy_listener_filters localhost:19000) PUB=$(echo "$LISTEN_FILTERS" | grep -E "^public_listener:" | cut -f 2 -d ' ' ) - UPS=$(echo "$LISTEN_FILTERS" | grep -E "^(default\/)?s2:" | cut -f 2 -d ' ' ) + UPS=$(echo "$LISTEN_FILTERS" | grep -E "^(\/default\/)?s2:" | cut -f 2 -d ' ' ) echo "LISTEN_FILTERS = $LISTEN_FILTERS" echo "PUB = $PUB" diff --git a/test/integration/connect/envoy/case-http/verify.bats b/test/integration/connect/envoy/case-http/verify.bats index 551d5a60fa..ce20b268af 100644 --- a/test/integration/connect/envoy/case-http/verify.bats +++ b/test/integration/connect/envoy/case-http/verify.bats @@ -36,7 +36,7 @@ load helpers @test "s1 proxy should have been configured with http connection managers" { LISTEN_FILTERS=$(get_envoy_listener_filters localhost:19000) PUB=$(echo "$LISTEN_FILTERS" | grep -E "^public_listener:" | cut -f 2 -d ' ' ) - UPS=$(echo "$LISTEN_FILTERS" | grep -E "^(default\/)?s2:" | cut -f 2 -d ' ' ) + UPS=$(echo "$LISTEN_FILTERS" | grep -E "^(\/default\/)?s2:" | cut -f 2 -d ' ' ) echo "LISTEN_FILTERS = $LISTEN_FILTERS" echo "PUB = $PUB" @@ -59,7 +59,7 @@ load helpers @test "s1 proxy should have been configured with http rbac filters" { HTTP_FILTERS=$(get_envoy_http_filters localhost:19000) PUB=$(echo "$HTTP_FILTERS" | grep -E "^public_listener:" | cut -f 2 -d ' ' ) - UPS=$(echo "$HTTP_FILTERS" | grep -E "^(default\/)?s2:" | cut -f 2 -d ' ' ) + UPS=$(echo "$HTTP_FILTERS" | grep -E "^(\/default\/)?s2:" | cut -f 2 -d ' ' ) echo "HTTP_FILTERS = $HTTP_FILTERS" echo "PUB = $PUB"