Handle FSM.Apply errors in raftApply

Previously we were inconsistently checking the response for errors. This
PR moves the response-is-error check into raftApply, so that all callers
can look at only the error response, instead of having to know that
errors could come from two places.

This should expose a few more errors that were previously hidden because
in some calls to raftApply we were ignoring the response return value.

Also handle errors more consistently. In some cases we would log the
error before returning it. This can be very confusing because it can
result in the same error being logged multiple times. Instead return
a wrapped error.
This commit is contained in:
Daniel Nephin 2021-04-08 18:58:15 -04:00
parent ccb2511ff3
commit d2ab767fef
25 changed files with 142 additions and 350 deletions

View File

@ -13,16 +13,17 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/authmethod" "github.com/hashicorp/consul/agent/consul/authmethod"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/template" "github.com/hashicorp/consul/lib/template"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
uuid "github.com/hashicorp/go-uuid"
) )
const ( const (
@ -250,15 +251,11 @@ func (a *ACL) BootstrapTokens(args *structs.DCSpecificRequest, reply *structs.AC
req.Token.SetHash(true) req.Token.SetHash(true)
resp, err := a.srv.raftApply(structs.ACLBootstrapRequestType, &req) _, err = a.srv.raftApply(structs.ACLBootstrapRequestType, &req)
if err != nil { if err != nil {
return err return err
} }
if err, ok := resp.(error); ok {
return err
}
if _, token, err := state.ACLTokenGetByAccessor(nil, accessor, structs.DefaultEnterpriseMeta()); err == nil { if _, token, err := state.ACLTokenGetByAccessor(nil, accessor, structs.DefaultEnterpriseMeta()); err == nil {
*reply = *token *reply = *token
} }
@ -729,7 +726,7 @@ func (a *ACL) tokenSetInternal(args *structs.ACLTokenSetRequest, reply *structs.
req.ProhibitUnprivileged = true req.ProhibitUnprivileged = true
} }
resp, err := a.srv.raftApply(structs.ACLTokenSetRequestType, req) _, err = a.srv.raftApply(structs.ACLTokenSetRequestType, req)
if err != nil { if err != nil {
return fmt.Errorf("Failed to apply token write request: %v", err) return fmt.Errorf("Failed to apply token write request: %v", err)
} }
@ -737,10 +734,6 @@ func (a *ACL) tokenSetInternal(args *structs.ACLTokenSetRequest, reply *structs.
// Purge the identity from the cache to prevent using the previous definition of the identity // Purge the identity from the cache to prevent using the previous definition of the identity
a.srv.acls.cache.RemoveIdentity(tokenSecretCacheID(token.SecretID)) a.srv.acls.cache.RemoveIdentity(tokenSecretCacheID(token.SecretID))
if respErr, ok := resp.(error); ok {
return respErr
}
// Don't check expiration times here as it doesn't really matter. // Don't check expiration times here as it doesn't really matter.
if _, updatedToken, err := a.srv.fsm.State().ACLTokenGetByAccessor(nil, token.AccessorID, nil); err == nil && updatedToken != nil { if _, updatedToken, err := a.srv.fsm.State().ACLTokenGetByAccessor(nil, token.AccessorID, nil); err == nil && updatedToken != nil {
*reply = *updatedToken *reply = *updatedToken
@ -885,7 +878,7 @@ func (a *ACL) TokenDelete(args *structs.ACLTokenDeleteRequest, reply *string) er
TokenIDs: []string{args.TokenID}, TokenIDs: []string{args.TokenID},
} }
resp, err := a.srv.raftApply(structs.ACLTokenDeleteRequestType, req) _, err = a.srv.raftApply(structs.ACLTokenDeleteRequestType, req)
if err != nil { if err != nil {
return fmt.Errorf("Failed to apply token delete request: %v", err) return fmt.Errorf("Failed to apply token delete request: %v", err)
} }
@ -893,10 +886,6 @@ func (a *ACL) TokenDelete(args *structs.ACLTokenDeleteRequest, reply *string) er
// Purge the identity from the cache to prevent using the previous definition of the identity // Purge the identity from the cache to prevent using the previous definition of the identity
a.srv.acls.cache.RemoveIdentity(tokenSecretCacheID(token.SecretID)) a.srv.acls.cache.RemoveIdentity(tokenSecretCacheID(token.SecretID))
if respErr, ok := resp.(error); ok {
return respErr
}
if reply != nil { if reply != nil {
*reply = token.AccessorID *reply = token.AccessorID
} }
@ -1218,7 +1207,7 @@ func (a *ACL) PolicySet(args *structs.ACLPolicySetRequest, reply *structs.ACLPol
Policies: structs.ACLPolicies{policy}, Policies: structs.ACLPolicies{policy},
} }
resp, err := a.srv.raftApply(structs.ACLPolicySetRequestType, req) _, err = a.srv.raftApply(structs.ACLPolicySetRequestType, req)
if err != nil { if err != nil {
return fmt.Errorf("Failed to apply policy upsert request: %v", err) return fmt.Errorf("Failed to apply policy upsert request: %v", err)
} }
@ -1226,10 +1215,6 @@ func (a *ACL) PolicySet(args *structs.ACLPolicySetRequest, reply *structs.ACLPol
// Remove from the cache to prevent stale cache usage // Remove from the cache to prevent stale cache usage
a.srv.acls.cache.RemovePolicy(policy.ID) a.srv.acls.cache.RemovePolicy(policy.ID)
if respErr, ok := resp.(error); ok {
return respErr
}
if _, policy, err := a.srv.fsm.State().ACLPolicyGetByID(nil, policy.ID, &policy.EnterpriseMeta); err == nil && policy != nil { if _, policy, err := a.srv.fsm.State().ACLPolicyGetByID(nil, policy.ID, &policy.EnterpriseMeta); err == nil && policy != nil {
*reply = *policy *reply = *policy
} }
@ -1282,17 +1267,13 @@ func (a *ACL) PolicyDelete(args *structs.ACLPolicyDeleteRequest, reply *string)
PolicyIDs: []string{args.PolicyID}, PolicyIDs: []string{args.PolicyID},
} }
resp, err := a.srv.raftApply(structs.ACLPolicyDeleteRequestType, &req) _, err = a.srv.raftApply(structs.ACLPolicyDeleteRequestType, &req)
if err != nil { if err != nil {
return fmt.Errorf("Failed to apply policy delete request: %v", err) return fmt.Errorf("Failed to apply policy delete request: %v", err)
} }
a.srv.acls.cache.RemovePolicy(policy.ID) a.srv.acls.cache.RemovePolicy(policy.ID)
if respErr, ok := resp.(error); ok {
return respErr
}
*reply = policy.Name *reply = policy.Name
return nil return nil
@ -1687,7 +1668,7 @@ func (a *ACL) RoleSet(args *structs.ACLRoleSetRequest, reply *structs.ACLRole) e
Roles: structs.ACLRoles{role}, Roles: structs.ACLRoles{role},
} }
resp, err := a.srv.raftApply(structs.ACLRoleSetRequestType, req) _, err = a.srv.raftApply(structs.ACLRoleSetRequestType, req)
if err != nil { if err != nil {
return fmt.Errorf("Failed to apply role upsert request: %v", err) return fmt.Errorf("Failed to apply role upsert request: %v", err)
} }
@ -1695,10 +1676,6 @@ func (a *ACL) RoleSet(args *structs.ACLRoleSetRequest, reply *structs.ACLRole) e
// Remove from the cache to prevent stale cache usage // Remove from the cache to prevent stale cache usage
a.srv.acls.cache.RemoveRole(role.ID) a.srv.acls.cache.RemoveRole(role.ID)
if respErr, ok := resp.(error); ok {
return respErr
}
if _, role, err := a.srv.fsm.State().ACLRoleGetByID(nil, role.ID, &role.EnterpriseMeta); err == nil && role != nil { if _, role, err := a.srv.fsm.State().ACLRoleGetByID(nil, role.ID, &role.EnterpriseMeta); err == nil && role != nil {
*reply = *role *reply = *role
} }
@ -1747,17 +1724,13 @@ func (a *ACL) RoleDelete(args *structs.ACLRoleDeleteRequest, reply *string) erro
RoleIDs: []string{args.RoleID}, RoleIDs: []string{args.RoleID},
} }
resp, err := a.srv.raftApply(structs.ACLRoleDeleteRequestType, &req) _, err = a.srv.raftApply(structs.ACLRoleDeleteRequestType, &req)
if err != nil { if err != nil {
return fmt.Errorf("Failed to apply role delete request: %v", err) return fmt.Errorf("Failed to apply role delete request: %v", err)
} }
a.srv.acls.cache.RemoveRole(role.ID) a.srv.acls.cache.RemoveRole(role.ID)
if respErr, ok := resp.(error); ok {
return respErr
}
*reply = role.Name *reply = role.Name
return nil return nil
@ -2014,13 +1987,10 @@ func (a *ACL) BindingRuleSet(args *structs.ACLBindingRuleSetRequest, reply *stru
BindingRules: structs.ACLBindingRules{rule}, BindingRules: structs.ACLBindingRules{rule},
} }
resp, err := a.srv.raftApply(structs.ACLBindingRuleSetRequestType, req) _, err = a.srv.raftApply(structs.ACLBindingRuleSetRequestType, req)
if err != nil { if err != nil {
return fmt.Errorf("Failed to apply binding rule upsert request: %v", err) return fmt.Errorf("Failed to apply binding rule upsert request: %v", err)
} }
if respErr, ok := resp.(error); ok {
return fmt.Errorf("Failed to apply binding rule upsert request: %v", respErr)
}
if _, rule, err := a.srv.fsm.State().ACLBindingRuleGetByID(nil, rule.ID, &rule.EnterpriseMeta); err == nil && rule != nil { if _, rule, err := a.srv.fsm.State().ACLBindingRuleGetByID(nil, rule.ID, &rule.EnterpriseMeta); err == nil && rule != nil {
*reply = *rule *reply = *rule
@ -2070,15 +2040,11 @@ func (a *ACL) BindingRuleDelete(args *structs.ACLBindingRuleDeleteRequest, reply
BindingRuleIDs: []string{args.BindingRuleID}, BindingRuleIDs: []string{args.BindingRuleID},
} }
resp, err := a.srv.raftApply(structs.ACLBindingRuleDeleteRequestType, &req) _, err = a.srv.raftApply(structs.ACLBindingRuleDeleteRequestType, &req)
if err != nil { if err != nil {
return fmt.Errorf("Failed to apply binding rule delete request: %v", err) return fmt.Errorf("Failed to apply binding rule delete request: %v", err)
} }
if respErr, ok := resp.(error); ok {
return respErr
}
*reply = true *reply = true
return nil return nil
@ -2266,15 +2232,11 @@ func (a *ACL) AuthMethodSet(args *structs.ACLAuthMethodSetRequest, reply *struct
AuthMethods: structs.ACLAuthMethods{method}, AuthMethods: structs.ACLAuthMethods{method},
} }
resp, err := a.srv.raftApply(structs.ACLAuthMethodSetRequestType, req) _, err = a.srv.raftApply(structs.ACLAuthMethodSetRequestType, req)
if err != nil { if err != nil {
return fmt.Errorf("Failed to apply auth method upsert request: %v", err) return fmt.Errorf("Failed to apply auth method upsert request: %v", err)
} }
if respErr, ok := resp.(error); ok {
return respErr
}
if _, method, err := a.srv.fsm.State().ACLAuthMethodGetByName(nil, method.Name, &method.EnterpriseMeta); err == nil && method != nil { if _, method, err := a.srv.fsm.State().ACLAuthMethodGetByName(nil, method.Name, &method.EnterpriseMeta); err == nil && method != nil {
*reply = *method *reply = *method
} }
@ -2328,15 +2290,11 @@ func (a *ACL) AuthMethodDelete(args *structs.ACLAuthMethodDeleteRequest, reply *
EnterpriseMeta: args.EnterpriseMeta, EnterpriseMeta: args.EnterpriseMeta,
} }
resp, err := a.srv.raftApply(structs.ACLAuthMethodDeleteRequestType, &req) _, err = a.srv.raftApply(structs.ACLAuthMethodDeleteRequestType, &req)
if err != nil { if err != nil {
return fmt.Errorf("Failed to apply auth method delete request: %v", err) return fmt.Errorf("Failed to apply auth method delete request: %v", err)
} }
if respErr, ok := resp.(error); ok {
return respErr
}
*reply = true *reply = true
return nil return nil
@ -2583,7 +2541,7 @@ func (a *ACL) Logout(args *structs.ACLLogoutRequest, reply *bool) error {
TokenIDs: []string{token.AccessorID}, TokenIDs: []string{token.AccessorID},
} }
resp, err := a.srv.raftApply(structs.ACLTokenDeleteRequestType, req) _, err = a.srv.raftApply(structs.ACLTokenDeleteRequestType, req)
if err != nil { if err != nil {
return fmt.Errorf("Failed to apply token delete request: %v", err) return fmt.Errorf("Failed to apply token delete request: %v", err)
} }
@ -2591,10 +2549,6 @@ func (a *ACL) Logout(args *structs.ACLLogoutRequest, reply *bool) error {
// Purge the identity from the cache to prevent using the previous definition of the identity // Purge the identity from the cache to prevent using the previous definition of the identity
a.srv.acls.cache.RemoveIdentity(tokenSecretCacheID(token.SecretID)) a.srv.acls.cache.RemoveIdentity(tokenSecretCacheID(token.SecretID))
if respErr, ok := resp.(error); ok {
return respErr
}
*reply = true *reply = true
return nil return nil

View File

@ -6,11 +6,12 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb"
) )
var ACLEndpointLegacySummaries = []prometheus.SummaryDefinition{ var ACLEndpointLegacySummaries = []prometheus.SummaryDefinition{
@ -68,9 +69,6 @@ func (a *ACL) Bootstrap(args *structs.DCSpecificRequest, reply *structs.ACL) err
return err return err
} }
switch v := resp.(type) { switch v := resp.(type) {
case error:
return v
case *structs.ACL: case *structs.ACL:
*reply = *v *reply = *v
@ -143,11 +141,7 @@ func aclApplyInternal(srv *Server, args *structs.ACLRequest, reply *string) erro
// Apply the update // Apply the update
resp, err := srv.raftApply(structs.ACLRequestType, args) resp, err := srv.raftApply(structs.ACLRequestType, args)
if err != nil { if err != nil {
srv.logger.Error("Raft apply failed", "acl_op", args.Op, "error", err) return fmt.Errorf("raft apply failed: %w", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
} }
// Check if the return type is a string // Check if the return type is a string

View File

@ -86,14 +86,8 @@ func (r *aclTokenReplicator) DeleteLocalBatch(srv *Server, batch []string) error
TokenIDs: batch, TokenIDs: batch,
} }
resp, err := srv.raftApply(structs.ACLTokenDeleteRequestType, &req) _, err := srv.raftApply(structs.ACLTokenDeleteRequestType, &req)
if err != nil { return err
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
} }
func (r *aclTokenReplicator) LenPendingUpdates() int { func (r *aclTokenReplicator) LenPendingUpdates() int {
@ -116,15 +110,8 @@ func (r *aclTokenReplicator) UpdateLocalBatch(ctx context.Context, srv *Server,
FromReplication: true, FromReplication: true,
} }
resp, err := srv.raftApply(structs.ACLTokenSetRequestType, &req) _, err := srv.raftApply(structs.ACLTokenSetRequestType, &req)
if err != nil { return err
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
} }
/////////////////////// ///////////////////////
@ -199,14 +186,8 @@ func (r *aclPolicyReplicator) DeleteLocalBatch(srv *Server, batch []string) erro
PolicyIDs: batch, PolicyIDs: batch,
} }
resp, err := srv.raftApply(structs.ACLPolicyDeleteRequestType, &req) _, err := srv.raftApply(structs.ACLPolicyDeleteRequestType, &req)
if err != nil { return err
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
} }
func (r *aclPolicyReplicator) LenPendingUpdates() int { func (r *aclPolicyReplicator) LenPendingUpdates() int {
@ -226,16 +207,8 @@ func (r *aclPolicyReplicator) UpdateLocalBatch(ctx context.Context, srv *Server,
Policies: r.updated[start:end], Policies: r.updated[start:end],
} }
resp, err := srv.raftApply(structs.ACLPolicySetRequestType, &req) _, err := srv.raftApply(structs.ACLPolicySetRequestType, &req)
if err != nil { return err
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
} }
//////////////////////////////// ////////////////////////////////
@ -334,16 +307,8 @@ func (r *aclRoleReplicator) DeleteLocalBatch(srv *Server, batch []string) error
RoleIDs: batch, RoleIDs: batch,
} }
resp, err := srv.raftApply(structs.ACLRoleDeleteRequestType, &req) _, err := srv.raftApply(structs.ACLRoleDeleteRequestType, &req)
if err != nil { return err
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
} }
func (r *aclRoleReplicator) LenPendingUpdates() int { func (r *aclRoleReplicator) LenPendingUpdates() int {
@ -364,14 +329,6 @@ func (r *aclRoleReplicator) UpdateLocalBatch(ctx context.Context, srv *Server, s
AllowMissingLinks: true, AllowMissingLinks: true,
} }
resp, err := srv.raftApply(structs.ACLRoleSetRequestType, &req) _, err := srv.raftApply(structs.ACLRoleSetRequestType, &req)
if err != nil { return err
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
} }

View File

@ -5,8 +5,9 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/hashicorp/consul/agent/structs"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/structs"
) )
func (s *Server) reapExpiredTokens(ctx context.Context) error { func (s *Server) reapExpiredTokens(ctx context.Context) error {
@ -102,7 +103,7 @@ func (s *Server) reapExpiredACLTokens(local, global bool) (int, error) {
"amount", len(req.TokenIDs), "amount", len(req.TokenIDs),
"locality", locality, "locality", locality,
) )
resp, err := s.raftApply(structs.ACLTokenDeleteRequestType, &req) _, err = s.raftApply(structs.ACLTokenDeleteRequestType, &req)
if err != nil { if err != nil {
return 0, fmt.Errorf("Failed to apply token expiration deletions: %v", err) return 0, fmt.Errorf("Failed to apply token expiration deletions: %v", err)
} }
@ -112,10 +113,6 @@ func (s *Server) reapExpiredACLTokens(local, global bool) (int, error) {
s.acls.cache.RemoveIdentity(tokenSecretCacheID(secretID)) s.acls.cache.RemoveIdentity(tokenSecretCacheID(secretID))
} }
if respErr, ok := resp.(error); ok {
return 0, respErr
}
return len(req.TokenIDs), nil return len(req.TokenIDs), nil
} }

View File

@ -7,15 +7,16 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
bexpr "github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
bexpr "github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
) )
var CatalogCounters = []prometheus.CounterDefinition{ var CatalogCounters = []prometheus.CounterDefinition{
@ -210,14 +211,8 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
} }
} }
resp, err := c.srv.raftApply(structs.RegisterRequestType, args) _, err = c.srv.raftApply(structs.RegisterRequestType, args)
if err != nil { return err
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
} }
// Deregister is used to remove a service registration for a given node. // Deregister is used to remove a service registration for a given node.
@ -268,10 +263,8 @@ func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) e
} }
if _, err := c.srv.raftApply(structs.DeregisterRequestType, args); err != nil { _, err = c.srv.raftApply(structs.DeregisterRequestType, args)
return err return err
}
return nil
} }
// ListDatacenters is used to query for the list of known datacenters // ListDatacenters is used to query for the list of known datacenters

View File

@ -92,9 +92,6 @@ func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error
if err != nil { if err != nil {
return err return err
} }
if respErr, ok := resp.(error); ok {
return respErr
}
if respBool, ok := resp.(bool); ok { if respBool, ok := resp.(bool); ok {
*reply = respBool *reply = respBool
} }
@ -296,14 +293,8 @@ func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{})
} }
args.Op = structs.ConfigEntryDelete args.Op = structs.ConfigEntryDelete
resp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args) _, err = c.srv.raftApply(structs.ConfigEntryRequestType, args)
if err != nil { return err
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
} }
// ResolveServiceConfig // ResolveServiceConfig

View File

@ -7,8 +7,9 @@ import (
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/structs"
) )
func configSort(configs []structs.ConfigEntry) { func configSort(configs []structs.ConfigEntry) {
@ -97,15 +98,11 @@ func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.Con
Entry: entry, Entry: entry,
} }
resp, err := s.raftApply(structs.ConfigEntryRequestType, &req) _, err := s.raftApply(structs.ConfigEntryRequestType, &req)
if err != nil { if err != nil {
return false, fmt.Errorf("Failed to apply config %s: %v", op, err) return false, fmt.Errorf("Failed to apply config %s: %v", op, err)
} }
if respErr, ok := resp.(error); ok {
return false, fmt.Errorf("Failed to apply config %s: %v", op, respErr)
}
if i < len(configs)-1 { if i < len(configs)-1 {
select { select {
case <-ctx.Done(): case <-ctx.Done():

View File

@ -16,13 +16,5 @@ func (c *consulCADelegate) State() *state.Store {
} }
func (c *consulCADelegate) ApplyCARequest(req *structs.CARequest) (interface{}, error) { func (c *consulCADelegate) ApplyCARequest(req *structs.CARequest) (interface{}, error) {
resp, err := c.srv.raftApply(structs.ConnectCARequestType, req) return c.srv.raftApply(structs.ConnectCARequestType, req)
if err != nil {
return nil, err
}
if respErr, ok := resp.(error); ok {
return nil, respErr
}
return resp, nil
} }

View File

@ -6,13 +6,14 @@ import (
"sync" "sync"
"time" "time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
) )
// Coordinate manages queries and updates for network coordinates. // Coordinate manages queries and updates for network coordinates.
@ -105,13 +106,10 @@ func (c *Coordinate) batchApplyUpdates() error {
t := structs.CoordinateBatchUpdateType | structs.IgnoreUnknownTypeFlag t := structs.CoordinateBatchUpdateType | structs.IgnoreUnknownTypeFlag
slice := updates[start:end] slice := updates[start:end]
resp, err := c.srv.raftApply(t, slice) _, err := c.srv.raftApply(t, slice)
if err != nil { if err != nil {
return err return err
} }
if respErr, ok := resp.(error); ok {
return respErr
}
} }
return nil return nil
} }

View File

@ -7,10 +7,11 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
) )
var FederationStateSummaries = []prometheus.SummaryDefinition{ var FederationStateSummaries = []prometheus.SummaryDefinition{
@ -85,9 +86,6 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo
if err != nil { if err != nil {
return err return err
} }
if respErr, ok := resp.(error); ok {
return respErr
}
if respBool, ok := resp.(bool); ok { if respBool, ok := resp.(bool); ok {
*reply = respBool *reply = respBool
} }

View File

@ -154,15 +154,11 @@ func (r *FederationStateReplicator) PerformDeletions(ctx context.Context, deleti
State: state, State: state,
} }
resp, err := r.srv.raftApply(structs.FederationStateRequestType, &req) _, err := r.srv.raftApply(structs.FederationStateRequestType, &req)
if err != nil { if err != nil {
return false, err return false, err
} }
if respErr, ok := resp.(error); ok {
return false, respErr
}
if i < len(deletions)-1 { if i < len(deletions)-1 {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -199,15 +195,11 @@ func (r *FederationStateReplicator) PerformUpdates(ctx context.Context, updatesR
State: state2, State: state2,
} }
resp, err := r.srv.raftApply(structs.FederationStateRequestType, &req) _, err := r.srv.raftApply(structs.FederationStateRequestType, &req)
if err != nil { if err != nil {
return false, err return false, err
} }
if respErr, ok := resp.(error); ok {
return false, respErr
}
if i < len(updates)-1 { if i < len(updates)-1 {
select { select {
case <-ctx.Done(): case <-ctx.Done():

View File

@ -7,13 +7,14 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
) )
var IntentionSummaries = []prometheus.SummaryDefinition{ var IntentionSummaries = []prometheus.SummaryDefinition{
@ -157,15 +158,8 @@ func (s *Intention) Apply(args *structs.IntentionRequest, reply *string) error {
args.Mutation = mut args.Mutation = mut
args.Intention = nil args.Intention = nil
resp, err := s.srv.raftApply(structs.IntentionRequestType, args) _, err = s.srv.raftApply(structs.IntentionRequestType, args)
if err != nil { return err
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
} }
func (s *Intention) computeApplyChangesLegacyCreate( func (s *Intention) computeApplyChangesLegacyCreate(

View File

@ -7,12 +7,13 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
) )
var KVSummaries = []prometheus.SummaryDefinition{ var KVSummaries = []prometheus.SummaryDefinition{
@ -122,11 +123,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
// Apply the update. // Apply the update.
resp, err := k.srv.raftApply(structs.KVSRequestType, args) resp, err := k.srv.raftApply(structs.KVSRequestType, args)
if err != nil { if err != nil {
k.logger.Error("Raft apply failed", "error", err) return fmt.Errorf("raft apply failed: %w", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
} }
// Check if the return type is a bool. // Check if the return type is a bool.

View File

@ -12,13 +12,6 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
@ -26,6 +19,14 @@ import (
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/types"
) )
var LeaderSummaries = []prometheus.SummaryDefinition{ var LeaderSummaries = []prometheus.SummaryDefinition{
@ -478,9 +479,6 @@ func (s *Server) initializeLegacyACL() error {
return fmt.Errorf("failed to initialize ACL bootstrap: %v", err) return fmt.Errorf("failed to initialize ACL bootstrap: %v", err)
} }
switch v := resp.(type) { switch v := resp.(type) {
case error:
return fmt.Errorf("failed to initialize ACL bootstrap: %v", v)
case bool: case bool:
if v { if v {
s.logger.Info("ACL bootstrap enabled") s.logger.Info("ACL bootstrap enabled")
@ -766,14 +764,10 @@ func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error {
req := &structs.ACLTokenBatchSetRequest{Tokens: newTokens, CAS: true} req := &structs.ACLTokenBatchSetRequest{Tokens: newTokens, CAS: true}
resp, err := s.raftApply(structs.ACLTokenSetRequestType, req) _, err = s.raftApply(structs.ACLTokenSetRequestType, req)
if err != nil { if err != nil {
s.logger.Error("failed to apply acl token upgrade batch", "error", err) s.logger.Error("failed to apply acl token upgrade batch", "error", err)
} }
if err, ok := resp.(error); ok {
s.logger.Error("failed to apply acl token upgrade batch", "error", err)
}
} }
} }
@ -1088,12 +1082,7 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error {
Entry: entry, Entry: entry,
} }
resp, err := s.raftApply(structs.ConfigEntryRequestType, &req) _, err := s.raftApply(structs.ConfigEntryRequestType, &req)
if err == nil {
if respErr, ok := resp.(error); ok {
err = respErr
}
}
if err != nil { if err != nil {
return fmt.Errorf("Failed to apply configuration entry %q / %q: %v", entry.GetKind(), entry.GetName(), err) return fmt.Errorf("Failed to apply configuration entry %q / %q: %v", entry.GetKind(), entry.GetName(), err)
} }

View File

@ -135,15 +135,8 @@ func (s *Server) pruneCARoots() error {
args.Op = structs.CAOpSetRoots args.Op = structs.CAOpSetRoots
args.Index = idx args.Index = idx
args.Roots = newRoots args.Roots = newRoots
resp, err := s.raftApply(structs.ConnectCARequestType, args) _, err = s.raftApply(structs.ConnectCARequestType, args)
if err != nil { return err
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
} }
// retryLoopBackoff loops a given function indefinitely, backing off exponentially // retryLoopBackoff loops a given function indefinitely, backing off exponentially

View File

@ -628,15 +628,12 @@ func TestLeader_Vault_PrimaryCA_FixSigningKeyID_OnRestart(t *testing.T) {
activePrimaryRoot.SigningKeyID = primaryRootSigningKeyID activePrimaryRoot.SigningKeyID = primaryRootSigningKeyID
// Store the root cert in raft // Store the root cert in raft
resp, err := s1pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{ _, err = s1pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{
Op: structs.CAOpSetRoots, Op: structs.CAOpSetRoots,
Index: idx, Index: idx,
Roots: []*structs.CARoot{activePrimaryRoot}, Roots: []*structs.CARoot{activePrimaryRoot},
}) })
require.NoError(t, err) require.NoError(t, err)
if respErr, ok := resp.(error); ok {
t.Fatalf("respErr: %v", respErr)
}
} }
// Shutdown s1pre and restart it to trigger the secondary CA init to correct // Shutdown s1pre and restart it to trigger the secondary CA init to correct
@ -731,15 +728,12 @@ func TestLeader_SecondaryCA_FixSigningKeyID_via_IntermediateRefresh(t *testing.T
activeSecondaryRoot.SigningKeyID = secondaryRootSigningKeyID activeSecondaryRoot.SigningKeyID = secondaryRootSigningKeyID
// Store the root cert in raft // Store the root cert in raft
resp, err := s2pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{ _, err = s2pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{
Op: structs.CAOpSetRoots, Op: structs.CAOpSetRoots,
Index: idx, Index: idx,
Roots: []*structs.CARoot{activeSecondaryRoot}, Roots: []*structs.CARoot{activeSecondaryRoot},
}) })
require.NoError(t, err) require.NoError(t, err)
if respErr, ok := resp.(error); ok {
t.Fatalf("respErr: %v", respErr)
}
} }
// Shutdown s2pre and restart it to trigger the secondary CA init to correct // Shutdown s2pre and restart it to trigger the secondary CA init to correct

View File

@ -5,9 +5,10 @@ import (
"fmt" "fmt"
"time" "time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
) )
const ( const (
@ -117,13 +118,10 @@ func (s *Server) updateOurFederationState(curr *structs.FederationState) error {
if s.config.Datacenter == s.config.PrimaryDatacenter { if s.config.Datacenter == s.config.PrimaryDatacenter {
// We are the primary, so we can't do an RPC as we don't have a replication token. // We are the primary, so we can't do an RPC as we don't have a replication token.
resp, err := s.raftApply(structs.FederationStateRequestType, args) _, err := s.raftApply(structs.FederationStateRequestType, args)
if err != nil { if err != nil {
return err return err
} }
if respErr, ok := resp.(error); ok {
return respErr
}
} else { } else {
args.WriteRequest = structs.WriteRequest{ args.WriteRequest = structs.WriteRequest{
Token: s.tokens.ReplicationToken(), Token: s.tokens.ReplicationToken(),
@ -225,13 +223,10 @@ func (s *Server) pruneStaleFederationStates() error {
Datacenter: dc, Datacenter: dc,
}, },
} }
resp, err := s.raftApply(structs.FederationStateRequestType, &req) _, err := s.raftApply(structs.FederationStateRequestType, &req)
if err != nil { if err != nil {
return fmt.Errorf("Failed to delete federation state %s: %v", dc, err) return fmt.Errorf("Failed to delete federation state %s: %v", dc, err)
} }
if respErr, ok := resp.(error); ok {
return fmt.Errorf("Failed to delete federation state %s: %v", dc, respErr)
}
} }
return nil return nil

View File

@ -164,10 +164,8 @@ func (s *Server) legacyIntentionsMigrationCleanupPhase(quiet bool) error {
req := structs.IntentionRequest{ req := structs.IntentionRequest{
Op: structs.IntentionOpDeleteAll, Op: structs.IntentionOpDeleteAll,
} }
if resp, err := s.raftApply(structs.IntentionRequestType, req); err != nil { if _, err := s.raftApply(structs.IntentionRequestType, req); err != nil {
return err return err
} else if respErr, ok := resp.(error); ok {
return respErr
} }
// Bypass the serf component and jump right to the final state. // Bypass the serf component and jump right to the final state.
@ -410,9 +408,6 @@ func (s *Server) replicateLegacyIntentionsOnce(ctx context.Context, lastFetchInd
if err != nil { if err != nil {
return 0, false, err return 0, false, err
} }
if respErr, ok := resp.(error); ok {
return 0, false, respErr
}
if txnResp, ok := resp.(structs.TxnResponse); ok { if txnResp, ok := resp.(structs.TxnResponse); ok {
if len(txnResp.Errors) > 0 { if len(txnResp.Errors) > 0 {

View File

@ -108,14 +108,8 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
if req.Op != structs.IntentionOpDelete { if req.Op != structs.IntentionOpDelete {
req2.Intention.Hash = req.Intention.Hash // not part of Clone req2.Intention.Hash = req.Intention.Hash // not part of Clone
} }
resp, err := s.raftApply(structs.IntentionRequestType, req2) _, err := s.raftApply(structs.IntentionRequestType, req2)
if err != nil { return err
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
} }
// Directly insert legacy intentions into raft in dc1. // Directly insert legacy intentions into raft in dc1.
@ -442,14 +436,11 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
var retained []*structs.Intention var retained []*structs.Intention
for _, ixn := range ixns { for _, ixn := range ixns {
ixn2 := *ixn ixn2 := *ixn
resp, err := s1pre.raftApply(structs.IntentionRequestType, &structs.IntentionRequest{ _, err := s1pre.raftApply(structs.IntentionRequestType, &structs.IntentionRequest{
Op: structs.IntentionOpCreate, Op: structs.IntentionOpCreate,
Intention: &ixn2, Intention: &ixn2,
}) })
require.NoError(t, err) require.NoError(t, err)
if respErr, ok := resp.(error); ok {
t.Fatalf("respErr: %v", respErr)
}
if _, present := ixn.Meta["unit-test-discarded"]; !present { if _, present := ixn.Meta["unit-test-discarded"]; !present {
retained = append(retained, ixn) retained = append(retained, ixn)

View File

@ -3,10 +3,11 @@ package consul
import ( import (
"fmt" "fmt"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
autopilot "github.com/hashicorp/raft-autopilot" autopilot "github.com/hashicorp/raft-autopilot"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
) )
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration. // AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
@ -62,11 +63,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
// Apply the update // Apply the update
resp, err := op.srv.raftApply(structs.AutopilotRequestType, args) resp, err := op.srv.raftApply(structs.AutopilotRequestType, args)
if err != nil { if err != nil {
op.logger.Error("Raft apply failed", "error", err) return fmt.Errorf("raft apply failed: %w", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
} }
// Check if the return type is a bool. // Check if the return type is a bool.

View File

@ -7,13 +7,14 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/logging"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
) )
var PreparedQuerySummaries = []prometheus.SummaryDefinition{ var PreparedQuerySummaries = []prometheus.SummaryDefinition{
@ -128,15 +129,10 @@ func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string)
} }
// Commit the query to the state store. // Commit the query to the state store.
resp, err := p.srv.raftApply(structs.PreparedQueryRequestType, args) _, err = p.srv.raftApply(structs.PreparedQueryRequestType, args)
if err != nil { if err != nil {
p.logger.Error("Raft apply failed", "error", err) return fmt.Errorf("raft apply failed: %w", err)
return err
} }
if respErr, ok := resp.(error); ok {
return respErr
}
return nil return nil
} }

View File

@ -14,14 +14,6 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/wanfed"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
connlimit "github.com/hashicorp/go-connlimit" connlimit "github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
@ -30,6 +22,15 @@ import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/hashicorp/yamux" "github.com/hashicorp/yamux"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/wanfed"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
) )
var RPCCounters = []prometheus.CounterDefinition{ var RPCCounters = []prometheus.CounterDefinition{
@ -729,28 +730,34 @@ func (s *Server) keyringRPCs(method string, args interface{}, dcs []string) (*st
type raftEncoder func(structs.MessageType, interface{}) ([]byte, error) type raftEncoder func(structs.MessageType, interface{}) ([]byte, error)
// raftApply is used to encode a message, run it through raft, and return // raftApplyMsgpack encodes the msg using msgpack and calls raft.Apply. See
// the FSM response along with any errors // raftApplyWithEncoder.
// Deprecated: use raftApplyMsgpack
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) { func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {
return s.raftApplyMsgpack(t, msg) return s.raftApplyMsgpack(t, msg)
} }
// raftApplyMsgpack will msgpack encode the request and then run it through raft, // raftApplyMsgpack encodes the msg using msgpack and calls raft.Apply. See
// then return the FSM response along with any errors. // raftApplyWithEncoder.
func (s *Server) raftApplyMsgpack(t structs.MessageType, msg interface{}) (interface{}, error) { func (s *Server) raftApplyMsgpack(t structs.MessageType, msg interface{}) (interface{}, error) {
return s.raftApplyWithEncoder(t, msg, structs.Encode) return s.raftApplyWithEncoder(t, msg, structs.Encode)
} }
// raftApplyProtobuf will protobuf encode the request and then run it through raft, // raftApplyProtobuf encodes the msg using protobuf and calls raft.Apply. See
// then return the FSM response along with any errors. // raftApplyWithEncoder.
func (s *Server) raftApplyProtobuf(t structs.MessageType, msg interface{}) (interface{}, error) { func (s *Server) raftApplyProtobuf(t structs.MessageType, msg interface{}) (interface{}, error) {
return s.raftApplyWithEncoder(t, msg, structs.EncodeProtoInterface) return s.raftApplyWithEncoder(t, msg, structs.EncodeProtoInterface)
} }
// raftApplyWithEncoder is used to encode a message, run it through raft, // raftApplyWithEncoder encodes a message, and then calls raft.Apply with the
// and return the FSM response along with any errors. Unlike raftApply this // encoded message. Returns the FSM response along with any errors. If the
// takes the encoder to use as an argument. // FSM.Apply response is an error it will be returned as the error return
func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, encoder raftEncoder) (interface{}, error) { // value with a nil response.
func (s *Server) raftApplyWithEncoder(
t structs.MessageType,
msg interface{},
encoder raftEncoder,
) (response interface{}, err error) {
if encoder == nil { if encoder == nil {
return nil, fmt.Errorf("Failed to encode request: nil encoder") return nil, fmt.Errorf("Failed to encode request: nil encoder")
} }
@ -789,17 +796,19 @@ func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, en
// apply function. Downstream client code expects to see any error // apply function. Downstream client code expects to see any error
// from the FSM (as opposed to the apply itself) and decide whether // from the FSM (as opposed to the apply itself) and decide whether
// it can retry in the future's response. // it can retry in the future's response.
return ErrChunkingResubmit, nil return nil, ErrChunkingResubmit
} }
// We expect that this conversion should always work // We expect that this conversion should always work
chunkedSuccess, ok := resp.(raftchunking.ChunkingSuccess) chunkedSuccess, ok := resp.(raftchunking.ChunkingSuccess)
if !ok { if !ok {
return nil, errors.New("unknown type of response back from chunking FSM") return nil, errors.New("unknown type of response back from chunking FSM")
} }
// Return the inner wrapped response resp = chunkedSuccess.Response
return chunkedSuccess.Response, nil
} }
if err, ok := resp.(error); ok {
return nil, err
}
return resp, nil return resp, nil
} }

View File

@ -6,12 +6,13 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
) )
var SessionEndpointSummaries = []prometheus.SummaryDefinition{ var SessionEndpointSummaries = []prometheus.SummaryDefinition{
@ -147,8 +148,7 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
// Apply the update // Apply the update
resp, err := s.srv.raftApply(structs.SessionRequestType, args) resp, err := s.srv.raftApply(structs.SessionRequestType, args)
if err != nil { if err != nil {
s.logger.Error("Apply failed", "error", err) return fmt.Errorf("apply failed: %w", err)
return err
} }
if args.Op == structs.SessionCreate && args.Session.TTL != "" { if args.Op == structs.SessionCreate && args.Session.TTL != "" {
@ -160,10 +160,6 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
s.srv.clearSessionTimer(args.Session.ID) s.srv.clearSessionTimer(args.Session.ID)
} }
if respErr, ok := resp.(error); ok {
return respErr
}
// Check if the return type is a string // Check if the return type is a string
if respString, ok := resp.(string); ok { if respString, ok := resp.(string); ok {
*reply = respString *reply = respString

View File

@ -22,15 +22,8 @@ func (s *Server) setSystemMetadataKey(key, val string) error {
Entry: &structs.SystemMetadataEntry{Key: key, Value: val}, Entry: &structs.SystemMetadataEntry{Key: key, Value: val},
} }
resp, err := s.raftApply(structs.SystemMetadataRequestType, args) _, err := s.raftApply(structs.SystemMetadataRequestType, args)
if err != nil { return err
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
} }
func (s *Server) deleteSystemMetadataKey(key string) error { func (s *Server) deleteSystemMetadataKey(key string) error {
@ -39,13 +32,6 @@ func (s *Server) deleteSystemMetadataKey(key string) error {
Entry: &structs.SystemMetadataEntry{Key: key}, Entry: &structs.SystemMetadataEntry{Key: key},
} }
resp, err := s.raftApply(structs.SystemMetadataRequestType, args) _, err := s.raftApply(structs.SystemMetadataRequestType, args)
if err != nil { return err
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
} }

View File

@ -6,10 +6,11 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
) )
var TxnSummaries = []prometheus.SummaryDefinition{ var TxnSummaries = []prometheus.SummaryDefinition{
@ -138,11 +139,7 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error
// Apply the update. // Apply the update.
resp, err := t.srv.raftApply(structs.TxnRequestType, args) resp, err := t.srv.raftApply(structs.TxnRequestType, args)
if err != nil { if err != nil {
t.logger.Error("Raft apply failed", "error", err) return fmt.Errorf("raft apply failed: %w", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
} }
// Convert the return type. This should be a cheap copy since we are // Convert the return type. This should be a cheap copy since we are