From d92577c16b2e0d5df3d18779508dbde87c38e35a Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 20 Mar 2019 16:13:13 -0700 Subject: [PATCH] Fix fsm serialization and add snapshot/restore --- agent/consul/fsm/commands_oss.go | 30 +++++++++++++--- agent/consul/fsm/commands_oss_test.go | 4 ++- agent/consul/fsm/snapshot_oss.go | 49 +++++++++++++++++++++++++++ agent/consul/fsm/snapshot_oss_test.go | 22 ++++++++++++ agent/consul/state/config_entry.go | 25 ++++++++++++-- agent/structs/config_entry.go | 32 ++++++++++++----- agent/structs/structs.go | 48 +++++++++++++------------- 7 files changed, 171 insertions(+), 39 deletions(-) diff --git a/agent/consul/fsm/commands_oss.go b/agent/consul/fsm/commands_oss.go index ac503fd5b1..9061cc6960 100644 --- a/agent/consul/fsm/commands_oss.go +++ b/agent/consul/fsm/commands_oss.go @@ -29,7 +29,8 @@ func init() { registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation) registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation) registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation) - registerCommand(structs.ConfigEntryRequestType, (*FSM).applyConfigEntryOperation) + registerCommand(structs.ServiceConfigEntryRequestType, (*FSM).applyServiceConfigEntryOperation) + registerCommand(structs.ProxyConfigEntryRequestType, (*FSM).applyProxyConfigEntryOperation) } func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { @@ -430,18 +431,37 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{ return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs) } -func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} { - var req structs.ConfigEntryRequest +func (c *FSM) applyServiceConfigEntryOperation(buf []byte, index uint64) interface{} { + req := structs.ConfigEntryRequest{ + Entry: &structs.ServiceConfigEntry{}, + } if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } + return c.applyConfigEntryOperation(index, req) +} + +func (c *FSM) applyProxyConfigEntryOperation(buf []byte, index uint64) interface{} { + req := structs.ConfigEntryRequest{ + Entry: &structs.ProxyConfigEntry{}, + } + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + if err := c.applyConfigEntryOperation(index, req); err != nil { + panic(err) + } + return true +} + +func (c *FSM) applyConfigEntryOperation(index uint64, req structs.ConfigEntryRequest) error { switch req.Op { case structs.ConfigEntryUpsert: - defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry"}, time.Now(), + defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(), []metrics.Label{{Name: "op", Value: "upsert"}}) return c.state.EnsureConfigEntry(index, req.Entry) case structs.ConfigEntryDelete: - defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry"}, time.Now(), + defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(), []metrics.Label{{Name: "op", Value: "delete"}}) return c.state.DeleteConfigEntry(req.Entry.GetKind(), req.Entry.GetName()) default: diff --git a/agent/consul/fsm/commands_oss_test.go b/agent/consul/fsm/commands_oss_test.go index d8a98644e0..67b70aa053 100644 --- a/agent/consul/fsm/commands_oss_test.go +++ b/agent/consul/fsm/commands_oss_test.go @@ -1379,7 +1379,7 @@ func TestFSM_ConfigEntry(t *testing.T) { } { - buf, err := structs.Encode(structs.ConfigEntryRequestType, req) + buf, err := structs.Encode(structs.ProxyConfigEntryRequestType, req) assert.Nil(err) assert.True(fsm.Apply(makeLog(buf)).(bool)) } @@ -1388,6 +1388,8 @@ func TestFSM_ConfigEntry(t *testing.T) { { _, config, err := fsm.state.ConfigEntry(structs.ProxyDefaults, "global") assert.Nil(err) + entry.RaftIndex.CreateIndex = 1 + entry.RaftIndex.ModifyIndex = 1 assert.Equal(entry, config) } } diff --git a/agent/consul/fsm/snapshot_oss.go b/agent/consul/fsm/snapshot_oss.go index 00adfa9869..02be4c17e3 100644 --- a/agent/consul/fsm/snapshot_oss.go +++ b/agent/consul/fsm/snapshot_oss.go @@ -1,6 +1,8 @@ package fsm import ( + "fmt" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" @@ -27,6 +29,7 @@ func init() { registerRestorer(structs.IndexRequestType, restoreIndex) registerRestorer(structs.ACLTokenSetRequestType, restoreToken) registerRestorer(structs.ACLPolicySetRequestType, restorePolicy) + registerRestorer(structs.ConfigEntryRequestType, restoreConfigEntry) } func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error { @@ -63,6 +66,9 @@ func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) err if err := s.persistConnectCAConfig(sink, encoder); err != nil { return err } + if err := s.persistConfigEntries(sink, encoder); err != nil { + return err + } if err := s.persistIndex(sink, encoder); err != nil { return err } @@ -360,6 +366,27 @@ func (s *snapshot) persistIntentions(sink raft.SnapshotSink, return nil } +func (s *snapshot) persistConfigEntries(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + entries, err := s.state.ConfigEntries() + if err != nil { + return err + } + + for _, entry := range entries { + if _, err := sink.Write([]byte{byte(structs.ConfigEntryRequestType)}); err != nil { + return err + } + if err := encoder.Encode(entry.GetKind()); err != nil { + return err + } + if err := encoder.Encode(entry); err != nil { + return err + } + } + return nil +} + func (s *snapshot) persistIndex(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get all the indexes iter, err := s.state.Indexes() @@ -565,3 +592,25 @@ func restorePolicy(header *snapshotHeader, restore *state.Restore, decoder *code } return restore.ACLPolicy(&req) } + +func restoreConfigEntry(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { + var req structs.ConfigEntry + var kind string + if err := decoder.Decode(&kind); err != nil { + return err + } + + switch kind { + case structs.ServiceDefaults: + req = &structs.ServiceConfigEntry{} + case structs.ProxyDefaults: + req = &structs.ProxyConfigEntry{} + default: + return fmt.Errorf("invalid config type: %s", kind) + } + + if err := decoder.Decode(&req); err != nil { + return err + } + return restore.ConfigEntry(req) +} diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index 1d80e3965d..685c103749 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -202,6 +202,19 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { err = fsm.state.CASetConfig(17, caConfig) assert.Nil(err) + // Config entries + serviceConfig := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + Protocol: "http", + } + proxyConfig := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + } + assert.Nil(fsm.state.EnsureConfigEntry(18, serviceConfig)) + assert.Nil(fsm.state.EnsureConfigEntry(19, proxyConfig)) + // Snapshot snap, err := fsm.Snapshot() if err != nil { @@ -388,6 +401,15 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { assert.Nil(err) assert.Equal(caConfig, caConf) + // Verify config entries are restored + _, serviceConfEntry, err := fsm2.state.ConfigEntry(structs.ServiceDefaults, "foo") + assert.Nil(err) + assert.Equal(serviceConfig, serviceConfEntry) + + _, proxyConfEntry, err := fsm2.state.ConfigEntry(structs.ProxyDefaults, "global") + assert.Nil(err) + assert.Equal(proxyConfig, proxyConfEntry) + // Snapshot snap, err = fsm2.Snapshot() if err != nil { diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index 0a1e2a70aa..c3cc96adbf 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -57,7 +57,7 @@ func (s *Snapshot) ConfigEntries() ([]structs.ConfigEntry, error) { return ret, nil } -// Configuration is used when restoring from a snapshot. +// ConfigEntry is used when restoring from a snapshot. func (s *Restore) ConfigEntry(c structs.ConfigEntry) error { // Insert if err := s.tx.Insert(configTableName, c); err != nil { @@ -70,7 +70,7 @@ func (s *Restore) ConfigEntry(c structs.ConfigEntry) error { return nil } -// Configuration is called to get a given config entry. +// ConfigEntry is called to get a given config entry. func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) { tx := s.db.Txn(true) defer tx.Abort() @@ -95,6 +95,27 @@ func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, err return idx, conf, nil } +// ConfigEntries is called to get all config entry objects. +func (s *Store) ConfigEntries() (uint64, []structs.ConfigEntry, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Get the index + idx := maxIndexTxn(tx, configTableName) + + // Get all + iter, err := tx.Get(configTableName, "id") + if err != nil { + return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) + } + + var results []structs.ConfigEntry + for v := iter.Next(); v != nil; v = iter.Next() { + results = append(results, v.(structs.ConfigEntry)) + } + return idx, results, nil +} + // EnsureConfigEntry is called to upsert creation of a given config entry. func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry) error { tx := s.db.Txn(true) diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 0d070e1195..21b12ce788 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -29,10 +29,26 @@ type ServiceConfigEntry struct { RaftIndex } -func (s *ServiceConfigEntry) GetKind() string { +func (e *ServiceConfigEntry) GetKind() string { return ServiceDefaults } +func (e *ServiceConfigEntry) GetName() string { + return e.Name +} + +func (e *ServiceConfigEntry) Normalize() error { + return nil +} + +func (e *ServiceConfigEntry) Validate() error { + return nil +} + +func (e *ServiceConfigEntry) GetRaftIndex() *RaftIndex { + return &e.RaftIndex +} + type ConnectConfiguration struct { SidecarProxy bool } @@ -75,24 +91,24 @@ type ProxyConfigEntry struct { RaftIndex } -func (p *ProxyConfigEntry) GetKind() string { +func (e *ProxyConfigEntry) GetKind() string { return ProxyDefaults } -func (p *ProxyConfigEntry) GetName() string { - return p.Name +func (e *ProxyConfigEntry) GetName() string { + return e.Name } -func (p *ProxyConfigEntry) Normalize() error { +func (e *ProxyConfigEntry) Normalize() error { return nil } -func (p *ProxyConfigEntry) Validate() error { +func (e *ProxyConfigEntry) Validate() error { return nil } -func (p *ProxyConfigEntry) GetRaftIndex() *RaftIndex { - return &p.RaftIndex +func (e *ProxyConfigEntry) GetRaftIndex() *RaftIndex { + return &e.RaftIndex } type ConfigEntryOp string diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 56d86ab529..fd6532633f 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -33,29 +33,31 @@ type RaftIndex struct { // These are serialized between Consul servers and stored in Consul snapshots, // so entries must only ever be added. const ( - RegisterRequestType MessageType = 0 - DeregisterRequestType = 1 - KVSRequestType = 2 - SessionRequestType = 3 - ACLRequestType = 4 // DEPRECATED (ACL-Legacy-Compat) - TombstoneRequestType = 5 - CoordinateBatchUpdateType = 6 - PreparedQueryRequestType = 7 - TxnRequestType = 8 - AutopilotRequestType = 9 - AreaRequestType = 10 - ACLBootstrapRequestType = 11 - IntentionRequestType = 12 - ConnectCARequestType = 13 - ConnectCAProviderStateType = 14 - ConnectCAConfigType = 15 // FSM snapshots only. - IndexRequestType = 16 // FSM snapshots only. - ACLTokenSetRequestType = 17 - ACLTokenDeleteRequestType = 18 - ACLPolicySetRequestType = 19 - ACLPolicyDeleteRequestType = 20 - ConnectCALeafRequestType = 21 - ConfigEntryRequestType = 22 + RegisterRequestType MessageType = 0 + DeregisterRequestType = 1 + KVSRequestType = 2 + SessionRequestType = 3 + ACLRequestType = 4 // DEPRECATED (ACL-Legacy-Compat) + TombstoneRequestType = 5 + CoordinateBatchUpdateType = 6 + PreparedQueryRequestType = 7 + TxnRequestType = 8 + AutopilotRequestType = 9 + AreaRequestType = 10 + ACLBootstrapRequestType = 11 + IntentionRequestType = 12 + ConnectCARequestType = 13 + ConnectCAProviderStateType = 14 + ConnectCAConfigType = 15 // FSM snapshots only. + IndexRequestType = 16 // FSM snapshots only. + ACLTokenSetRequestType = 17 + ACLTokenDeleteRequestType = 18 + ACLPolicySetRequestType = 19 + ACLPolicyDeleteRequestType = 20 + ConnectCALeafRequestType = 21 + ConfigEntryRequestType = 22 // FSM snapshots only. + ServiceConfigEntryRequestType = 23 + ProxyConfigEntryRequestType = 24 ) const (