added ent to ce downgrade changes (#19311)

* added ent to ce downgrade changes

* added changelog

* added busl headers
This commit is contained in:
aahel 2023-10-20 22:34:25 +05:30 committed by GitHub
parent b1871fd08c
commit 1280f45485
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1242 additions and 22 deletions

3
.changelog/19311.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
raft: Fix panic during downgrade from enterprise to oss.
```

View File

@ -4,6 +4,7 @@
package fsm
import (
"errors"
"fmt"
"time"
@ -152,7 +153,11 @@ func init() {
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"fsm", "register"}, time.Now())
var req structs.RegisterRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeRegistrationReq(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted register request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}
@ -167,7 +172,11 @@ func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
func (c *FSM) applyDeregister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"fsm", "deregister"}, time.Now())
var req structs.DeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeDeregistrationReq(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted deregister request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}
@ -195,7 +204,11 @@ func (c *FSM) applyDeregister(buf []byte, index uint64) interface{} {
func (c *FSM) applyKVSOperation(buf []byte, index uint64) interface{} {
var req structs.KVSRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeKVSRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted KV request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "kvs"}, time.Now(),
@ -240,7 +253,11 @@ func (c *FSM) applyKVSOperation(buf []byte, index uint64) interface{} {
func (c *FSM) applySessionOperation(buf []byte, index uint64) interface{} {
var req structs.SessionRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeSessionRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted session request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "session"}, time.Now(),
@ -299,7 +316,11 @@ func (c *FSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} {
// state store.
func (c *FSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{} {
var req structs.PreparedQueryRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodePreparedQueryRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted prepared query request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}
@ -318,7 +339,7 @@ func (c *FSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{}
func (c *FSM) applyTxn(buf []byte, index uint64) interface{} {
var req structs.TxnRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeTxnRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"fsm", "txn"}, time.Now())
@ -485,7 +506,7 @@ func (c *FSM) applyConnectCALeafOperation(buf []byte, index uint64) interface{}
func (c *FSM) applyACLTokenSetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLTokenBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLTokenBatchSetRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "token"}, time.Now(),
@ -523,7 +544,7 @@ func (c *FSM) applyACLTokenBootstrap(buf []byte, index uint64) interface{} {
func (c *FSM) applyACLPolicySetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLPolicyBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLPolicyBatchSetRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "policy"}, time.Now(),
@ -544,10 +565,12 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{
}
func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} {
req := structs.ConfigEntryRequest{
Entry: &structs.ProxyConfigEntry{},
}
if err := structs.Decode(buf, &req); err != nil {
req := structs.ConfigEntryRequest{}
if err := decodeConfigEntryOperationRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted config entry request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}
@ -594,7 +617,7 @@ func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} {
func (c *FSM) applyACLRoleSetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLRoleBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLRoleBatchSetRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "role"}, time.Now(),
@ -616,7 +639,7 @@ func (c *FSM) applyACLRoleDeleteOperation(buf []byte, index uint64) interface{}
func (c *FSM) applyACLBindingRuleSetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLBindingRuleBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLBindingRuleBatchSetRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "bindingrule"}, time.Now(),
@ -638,7 +661,7 @@ func (c *FSM) applyACLBindingRuleDeleteOperation(buf []byte, index uint64) inter
func (c *FSM) applyACLAuthMethodSetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLAuthMethodBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLAuthMethodBatchSetRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "authmethod"}, time.Now(),
@ -649,7 +672,11 @@ func (c *FSM) applyACLAuthMethodSetOperation(buf []byte, index uint64) interface
func (c *FSM) applyACLAuthMethodDeleteOperation(buf []byte, index uint64) interface{} {
var req structs.ACLAuthMethodBatchDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLAuthMethodBatchDeleteRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted acl auth method delete request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "authmethod"}, time.Now(),
@ -706,7 +733,11 @@ func (c *FSM) applySystemMetadataOperation(buf []byte, index uint64) interface{}
func (c *FSM) applyPeeringWrite(buf []byte, index uint64) interface{} {
var req pbpeering.PeeringWriteRequest
if err := structs.DecodeProto(buf, &req); err != nil {
if err := decodePeeringWriteRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted peering write request")
return nil
}
panic(fmt.Errorf("failed to decode peering write request: %v", err))
}
@ -718,7 +749,11 @@ func (c *FSM) applyPeeringWrite(buf []byte, index uint64) interface{} {
func (c *FSM) applyPeeringDelete(buf []byte, index uint64) interface{} {
var req pbpeering.PeeringDeleteRequest
if err := structs.DecodeProto(buf, &req); err != nil {
if err := decodePeeringDeleteRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted peering delete request")
return nil
}
panic(fmt.Errorf("failed to decode peering delete request: %v", err))
}
@ -758,7 +793,11 @@ func (c *FSM) applyPeeringTerminate(buf []byte, index uint64) interface{} {
func (c *FSM) applyPeeringTrustBundleWrite(buf []byte, index uint64) interface{} {
var req pbpeering.PeeringTrustBundleWriteRequest
if err := structs.DecodeProto(buf, &req); err != nil {
if err := decodePeeringTrustBundleWriteRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted peering trust bundle write request")
return nil
}
panic(fmt.Errorf("failed to decode peering trust bundle write request: %v", err))
}
@ -770,7 +809,11 @@ func (c *FSM) applyPeeringTrustBundleWrite(buf []byte, index uint64) interface{}
func (c *FSM) applyPeeringTrustBundleDelete(buf []byte, index uint64) interface{} {
var req pbpeering.PeeringTrustBundleDeleteRequest
if err := structs.DecodeProto(buf, &req); err != nil {
if err := decodePeeringTrustBundleDeleteRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted peering trust bundle delete request")
return nil
}
panic(fmt.Errorf("failed to decode peering trust bundle delete request: %v", err))
}
@ -790,7 +833,11 @@ func (f *FSM) applyResourceOperation(buf []byte, idx uint64) any {
func (c *FSM) applyManualVirtualIPs(buf []byte, index uint64) interface{} {
var req state.ServiceVirtualIP
if err := structs.Decode(buf, &req); err != nil {
if err := decodeServiceVirtualIPRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted virtual ip request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}

View File

@ -0,0 +1,145 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
//go:build !consulent
// +build !consulent
package fsm
import (
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbpeering"
)
func decodeRegistrationReq(buf []byte, req *structs.RegisterRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeRegistration(buf, req)
}
func decodeDeregistrationReq(buf []byte, req *structs.DeregisterRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeDeregistration(buf, req)
}
func decodeKVSRequest(buf []byte, req *structs.KVSRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeKVS(buf, req)
}
func decodeSessionRequest(buf []byte, req *structs.SessionRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeSession(buf, req)
}
func decodePreparedQueryRequest(buf []byte, req *structs.PreparedQueryRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodePreparedQuery(buf, req)
}
func decodeTxnRequest(buf []byte, req *structs.TxnRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeTxn(buf, req)
}
func decodeACLTokenBatchSetRequest(buf []byte, req *structs.ACLTokenBatchSetRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLTokenBatchSet(buf, req)
}
func decodeACLPolicyBatchSetRequest(buf []byte, req *structs.ACLPolicyBatchSetRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLPolicyBatchSet(buf, req)
}
func decodeACLRoleBatchSetRequest(buf []byte, req *structs.ACLRoleBatchSetRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLRoleBatchSet(buf, req)
}
func decodeACLBindingRuleBatchSetRequest(buf []byte, req *structs.ACLBindingRuleBatchSetRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLBindingRuleBatchSet(buf, req)
}
func decodeACLAuthMethodBatchSetRequest(buf []byte, req *structs.ACLAuthMethodBatchSetRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLAuthMethodBatchSet(buf, req)
}
func decodeACLAuthMethodBatchDeleteRequest(buf []byte, req *structs.ACLAuthMethodBatchDeleteRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLAuthMethodBatchDelete(buf, req)
}
func decodeServiceVirtualIPRequest(buf []byte, req *state.ServiceVirtualIP) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeServiceVirtualIP(buf, req)
}
func decodePeeringWriteRequest(buf []byte, req *pbpeering.PeeringWriteRequest) error {
if !structs.CEDowngrade {
return structs.DecodeProto(buf, req)
}
return decodePeeringWrite(buf, req)
}
func decodePeeringDeleteRequest(buf []byte, req *pbpeering.PeeringDeleteRequest) error {
if !structs.CEDowngrade {
return structs.DecodeProto(buf, req)
}
return decodePeeringDelete(buf, req)
}
func decodePeeringTrustBundleWriteRequest(buf []byte, req *pbpeering.PeeringTrustBundleWriteRequest) error {
if !structs.CEDowngrade {
return structs.DecodeProto(buf, req)
}
return decodePeeringTrustBundleWrite(buf, req)
}
func decodePeeringTrustBundleDeleteRequest(buf []byte, req *pbpeering.PeeringTrustBundleDeleteRequest) error {
if !structs.CEDowngrade {
return structs.DecodeProto(buf, req)
}
return decodePeeringTrustBundleDelete(buf, req)
}
func decodeConfigEntryOperationRequest(buf []byte, req *structs.ConfigEntryRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeConfigEntryOperation(buf, req)
}

File diff suppressed because it is too large Load Diff

View File

@ -195,6 +195,10 @@ func (c *FSM) Apply(log *raft.Log) interface{} {
c.logger.Warn("ignoring unknown message type, upgrade to newer version", "type", msgType)
return nil
}
if structs.CEDowngrade && msgType >= 64 {
c.logger.Warn("ignoring enterprise message, for downgrading to oss", "type", msgType)
return nil
}
panic(fmt.Errorf("failed to apply request: %#v", buf))
}
@ -263,7 +267,10 @@ func (c *FSM) Restore(old io.ReadCloser) error {
return err
}
default:
if msg >= 64 {
if structs.CEDowngrade && msg >= 64 {
c.logger.Warn("ignoring enterprise message , for downgrading to oss", "type", msg)
return nil
} else if msg >= 64 {
return fmt.Errorf("msg type <%d> is a Consul Enterprise log entry. Consul CE cannot restore it", msg)
} else {
return fmt.Errorf("Unrecognized msg type %d", msg)

View File

@ -202,6 +202,9 @@ func (s *Store) peeringSecretsWriteTxn(tx WriteTxn, req *pbpeering.SecretsWriteR
return fmt.Errorf("failed to read peering by id: %w", err)
}
if peering == nil {
if structs.CEDowngrade {
return nil
}
return fmt.Errorf("unknown peering %q for secret", req.PeerID)
}

View File

@ -10,6 +10,7 @@ import (
"encoding/json"
"fmt"
"math/rand"
"os"
"reflect"
"regexp"
"sort"
@ -227,6 +228,9 @@ const (
var allowedConsulMetaKeysForMeshGateway = map[string]struct{}{MetaWANFederationKey: {}}
// CEDowngrade indicates if we are in downgrading from ent to ce
var CEDowngrade = os.Getenv("CONSUL_ENTERPRISE_DOWNGRADE_TO_CE") == "true"
var (
NodeMaintCheckID = NewCheckID(NodeMaint, nil)
)