From a8d08e759f88df08ea0f02bab9a913ee7249b1a8 Mon Sep 17 00:00:00 2001 From: Michael Zalimeni Date: Thu, 11 Apr 2024 17:08:44 -0400 Subject: [PATCH] fix: consume ignored entries in CE downgrade via Ent snapshot (#20977) This operation would previously fail due to unconsumed bytes in the decoder buffer when reading the Ent snapshot (the first byte of the record would be misinterpreted as a type indicator, and the remaining bytes would fail to be deserialized or read as invalid data). Ensure restore succeeds by decoding the ignored record as an interface{}, which will consume the record bytes without requiring a concrete target struct, then moving on to the next record. --- .changelog/20977.txt | 3 + agent/consul/fsm/fsm.go | 7 +- agent/consul/fsm/snapshot_ce_test.go | 99 ++++++++++++++++++++++++++++ 3 files changed, 106 insertions(+), 3 deletions(-) create mode 100644 .changelog/20977.txt diff --git a/.changelog/20977.txt b/.changelog/20977.txt new file mode 100644 index 0000000000..06cfb68c0f --- /dev/null +++ b/.changelog/20977.txt @@ -0,0 +1,3 @@ +```release-note:bug +server: fix Ent snapshot restore on CE when CE downgrade is enabled +``` diff --git a/agent/consul/fsm/fsm.go b/agent/consul/fsm/fsm.go index 7449858af2..f4e2e94fb6 100644 --- a/agent/consul/fsm/fsm.go +++ b/agent/consul/fsm/fsm.go @@ -196,7 +196,7 @@ func (c *FSM) Apply(log *raft.Log) interface{} { return nil } if structs.CEDowngrade && msgType >= 64 { - c.logger.Warn("ignoring enterprise message, for downgrading to oss", "type", msgType) + c.logger.Warn("ignoring enterprise message as part of downgrade to CE", "type", msgType) return nil } panic(fmt.Errorf("failed to apply request: %#v", buf)) @@ -268,8 +268,9 @@ func (c *FSM) Restore(old io.ReadCloser) error { } default: if structs.CEDowngrade && msg >= 64 { - c.logger.Warn("ignoring enterprise message , for downgrading to oss", "type", msg) - return nil + c.logger.Warn("ignoring enterprise message as part of downgrade to CE", "type", msg) + var ignore interface{} + return dec.Decode(&ignore) } else if msg >= 64 { return fmt.Errorf("msg type <%d> is a Consul Enterprise log entry. Consul CE cannot restore it", msg) } else { diff --git a/agent/consul/fsm/snapshot_ce_test.go b/agent/consul/fsm/snapshot_ce_test.go index 70ab2000fa..01d4e23079 100644 --- a/agent/consul/fsm/snapshot_ce_test.go +++ b/agent/consul/fsm/snapshot_ce_test.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib/stringslice" "github.com/hashicorp/consul/sdk/testutil" ) @@ -60,3 +61,101 @@ func TestRestoreFromEnterprise(t *testing.T) { require.EqualError(t, fsm.Restore(sink), "msg type <65> is a Consul Enterprise log entry. Consul CE cannot restore it") sink.Cancel() } + +func TestRestoreFromEnterprise_CEDowngrade(t *testing.T) { + logger := testutil.Logger(t) + + handle := &testRaftHandle{} + storageBackend := newStorageBackend(t, handle) + handle.apply = func(buf []byte) (any, error) { return storageBackend.Apply(buf, 123), nil } + + fsm := NewFromDeps(Deps{ + Logger: logger, + NewStateStore: func() *state.Store { + return state.NewStateStore(nil) + }, + StorageBackend: storageBackend, + }) + + // To verify if a proper message is displayed when Consul CE tries to + // unsuccessfully restore entries from a Consul Ent snapshot. + buf := bytes.NewBuffer(nil) + sink := &MockSink{buf, false} + + type EntMock struct { + ID int + Type string + } + + entMockEntry := EntMock{ + ID: 65, + Type: "A Consul Ent Log Type", + } + + // Create one entry to exercise the Go struct marshaller, and one to exercise the + // Binary Marshaller interface. This verifies that regardless of whether the struct gets + // encoded as a msgpack byte string (binary marshaller) or msgpack map (other struct), + // it will still be skipped over correctly. + registerEntry := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "db", + Service: "db", + Tags: []string{"primary"}, + Port: 8000, + }, + } + proxyDefaultsEntry := &structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Entry: &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + Config: map[string]interface{}{ + "foo": "bar", + }, + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + } + + // Write the header and records. + header := SnapshotHeader{ + LastIndex: 0, + } + encoder := codec.NewEncoder(sink, structs.MsgpackHandle) + encoder.Encode(&header) + sink.Write([]byte{byte(structs.MessageType(entMockEntry.ID))}) + encoder.Encode(entMockEntry) + sink.Write([]byte{byte(structs.RegisterRequestType)}) + encoder.Encode(registerEntry) + sink.Write([]byte{byte(structs.ConfigEntryRequestType)}) + encoder.Encode(proxyDefaultsEntry) + + defer func() { + structs.CEDowngrade = false + }() + structs.CEDowngrade = true + + require.NoError(t, fsm.Restore(sink), "failed to decode Ent snapshot to CE") + + // Verify the register request + _, nodes, err := fsm.state.Nodes(nil, nil, "") + require.NoError(t, err) + require.Len(t, nodes, 1, "incorrect number of nodes: %v", nodes) + require.Equal(t, "foo", nodes[0].Node) + require.Equal(t, "dc1", nodes[0].Datacenter) + require.Equal(t, "127.0.0.1", nodes[0].Address) + _, fooSrv, err := fsm.state.NodeServices(nil, "foo", nil, "") + require.NoError(t, err) + require.Len(t, fooSrv.Services, 1) + require.Contains(t, fooSrv.Services["db"].Tags, "primary") + require.True(t, stringslice.Contains(fooSrv.Services["db"].Tags, "primary")) + require.Equal(t, 8000, fooSrv.Services["db"].Port) + + // Verify the proxy defaults request + _, configEntry, err := fsm.state.ConfigEntry(nil, structs.ProxyDefaults, "global", structs.DefaultEnterpriseMetaInDefaultPartition()) + require.NoError(t, err) + configEntry.SetHash(proxyDefaultsEntry.Entry.GetHash()) + require.Equal(t, proxyDefaultsEntry.Entry, configEntry) +}