From 0b715f5f2713462718583fef7bbc1e2bc12e0c20 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Mon, 10 Aug 2020 22:52:36 -0700 Subject: [PATCH] fsm: Fix snapshot bug with restoring node/service/check indexes --- agent/consul/fsm/snapshot_oss.go | 1 + agent/consul/fsm/snapshot_oss_test.go | 32 +++++++++++++- agent/consul/state/catalog.go | 61 ++++++++++++++++----------- agent/consul/state/catalog_test.go | 18 +++++++- agent/consul/state/txn.go | 6 +-- agent/structs/structs.go | 1 + 6 files changed, 89 insertions(+), 30 deletions(-) diff --git a/agent/consul/fsm/snapshot_oss.go b/agent/consul/fsm/snapshot_oss.go index 16d146f3f2..e1ca30b08e 100644 --- a/agent/consul/fsm/snapshot_oss.go +++ b/agent/consul/fsm/snapshot_oss.go @@ -99,6 +99,7 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink, Address: n.Address, TaggedAddresses: n.TaggedAddresses, NodeMeta: n.Meta, + RaftIndex: n.RaftIndex, } // Register the node itself diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index acfd90cb76..d721b7a4a0 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -383,6 +383,22 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { require.NoError(t, fsm.state.FederationStateSet(21, fedState1)) require.NoError(t, fsm.state.FederationStateSet(22, fedState2)) + // Update a node, service and health check to make sure the ModifyIndexes are preserved correctly after restore. + require.NoError(t, fsm.state.EnsureNode(23, &structs.Node{ + ID: "610918a6-464f-fa9b-1a95-03bd6e88ed92", + Node: "foo", + Datacenter: "dc1", + Address: "127.0.0.3", + })) + require.NoError(t, fsm.state.EnsureService(24, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5001})) + require.NoError(t, fsm.state.EnsureCheck(25, &structs.HealthCheck{ + Node: "foo", + CheckID: "web", + Name: "web connectivity", + Status: api.HealthCritical, + ServiceID: "web", + })) + // Snapshot snap, err := fsm.Snapshot() require.NoError(t, err) @@ -455,25 +471,37 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { require.Equal(t, "testing123", nodes[0].Meta["testMeta"]) require.Len(t, nodes[0].TaggedAddresses, 1) require.Equal(t, "1.2.3.4", nodes[0].TaggedAddresses["hello"]) + require.Equal(t, uint64(2), nodes[0].CreateIndex) + require.Equal(t, uint64(2), nodes[0].ModifyIndex) require.Equal(t, node1.ID, nodes[1].ID) require.Equal(t, "foo", nodes[1].Node) require.Equal(t, "dc1", nodes[1].Datacenter) - require.Equal(t, "127.0.0.1", nodes[1].Address) + require.Equal(t, "127.0.0.3", nodes[1].Address) require.Empty(t, nodes[1].TaggedAddresses) + require.Equal(t, uint64(1), nodes[1].CreateIndex) + require.Equal(t, uint64(23), nodes[1].ModifyIndex) _, fooSrv, err := fsm2.state.NodeServices(nil, "foo", nil) require.NoError(t, err) require.Len(t, fooSrv.Services, 2) require.Contains(t, fooSrv.Services["db"].Tags, "primary") require.True(t, stringslice.Contains(fooSrv.Services["db"].Tags, "primary")) - require.Equal(t, 5000, fooSrv.Services["db"].Port) + require.Equal(t, 5001, fooSrv.Services["db"].Port) + require.Equal(t, uint64(4), fooSrv.Services["db"].CreateIndex) + require.Equal(t, uint64(24), fooSrv.Services["db"].ModifyIndex) connectSrv := fooSrv.Services["web"] require.Equal(t, connectConf, connectSrv.Connect) + require.Equal(t, uint64(3), fooSrv.Services["web"].CreateIndex) + require.Equal(t, uint64(3), fooSrv.Services["web"].ModifyIndex) _, checks, err := fsm2.state.NodeChecks(nil, "foo", nil) require.NoError(t, err) require.Len(t, checks, 1) + require.Equal(t, "foo", checks[0].Node) + require.Equal(t, "web", checks[0].ServiceName) + require.Equal(t, uint64(7), checks[0].CreateIndex) + require.Equal(t, uint64(25), checks[0].ModifyIndex) // Verify key is set _, d, err := fsm2.state.KVSGet(nil, "/test", nil) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 575213d177..48780a89bf 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -216,7 +216,7 @@ func (s *Snapshot) Checks(node string) (memdb.ResultIterator, error) { // performed within a single transaction to avoid race conditions on state // updates. func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error { - if err := s.store.ensureRegistrationTxn(s.tx, idx, req); err != nil { + if err := s.store.ensureRegistrationTxn(s.tx, idx, true, req); err != nil { return err } return nil @@ -229,7 +229,7 @@ func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) err tx := s.db.Txn(true) defer tx.Abort() - if err := s.ensureRegistrationTxn(tx, idx, req); err != nil { + if err := s.ensureRegistrationTxn(tx, idx, false, req); err != nil { return err } @@ -237,12 +237,12 @@ func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) err return nil } -func (s *Store) ensureCheckIfNodeMatches(tx *memdb.Txn, idx uint64, node string, check *structs.HealthCheck) error { +func (s *Store) ensureCheckIfNodeMatches(tx *memdb.Txn, idx uint64, preserveIndexes bool, node string, check *structs.HealthCheck) error { if check.Node != node { return fmt.Errorf("check node %q does not match node %q", check.Node, node) } - if err := s.ensureCheckTxn(tx, idx, check); err != nil { + if err := s.ensureCheckTxn(tx, idx, preserveIndexes, check); err != nil { return fmt.Errorf("failed inserting check: %s on node %q", err, check.Node) } return nil @@ -251,7 +251,7 @@ func (s *Store) ensureCheckIfNodeMatches(tx *memdb.Txn, idx uint64, node string, // ensureRegistrationTxn is used to make sure a node, service, and check // registration is performed within a single transaction to avoid race // conditions on state updates. -func (s *Store) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.RegisterRequest) error { +func (s *Store) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, preserveIndexes bool, req *structs.RegisterRequest) error { if _, err := s.validateRegisterRequestTxn(tx, req); err != nil { return err } @@ -265,6 +265,10 @@ func (s *Store) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.Re TaggedAddresses: req.TaggedAddresses, Meta: req.NodeMeta, } + if preserveIndexes { + node.CreateIndex = req.CreateIndex + node.ModifyIndex = req.ModifyIndex + } // Since this gets called for all node operations (service and check // updates) and churn on the node itself is basically none after the @@ -277,7 +281,7 @@ func (s *Store) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.Re return fmt.Errorf("node lookup failed: %s", err) } if existing == nil || req.ChangesNode(existing.(*structs.Node)) { - if err := s.ensureNodeTxn(tx, idx, node); err != nil { + if err := s.ensureNodeTxn(tx, idx, preserveIndexes, node); err != nil { return fmt.Errorf("failed inserting node: %s", err) } } @@ -292,7 +296,7 @@ func (s *Store) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.Re return fmt.Errorf("failed service lookup: %s", err) } if existing == nil || !(existing.(*structs.ServiceNode).ToNodeService()).IsSame(req.Service) { - if err := s.ensureServiceTxn(tx, idx, req.Node, req.Service); err != nil { + if err := s.ensureServiceTxn(tx, idx, req.Node, preserveIndexes, req.Service); err != nil { return fmt.Errorf("failed inserting service: %s", err) } @@ -301,12 +305,12 @@ func (s *Store) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.Re // Add the checks, if any. if req.Check != nil { - if err := s.ensureCheckIfNodeMatches(tx, idx, req.Node, req.Check); err != nil { + if err := s.ensureCheckIfNodeMatches(tx, idx, preserveIndexes, req.Node, req.Check); err != nil { return err } } for _, check := range req.Checks { - if err := s.ensureCheckIfNodeMatches(tx, idx, req.Node, check); err != nil { + if err := s.ensureCheckIfNodeMatches(tx, idx, preserveIndexes, req.Node, check); err != nil { return err } } @@ -320,7 +324,7 @@ func (s *Store) EnsureNode(idx uint64, node *structs.Node) error { defer tx.Abort() // Call the node upsert - if err := s.ensureNodeTxn(tx, idx, node); err != nil { + if err := s.ensureNodeTxn(tx, idx, false, node); err != nil { return err } @@ -386,7 +390,7 @@ func (s *Store) ensureNodeCASTxn(tx *memdb.Txn, idx uint64, node *structs.Node) } // Perform the update. - if err := s.ensureNodeTxn(tx, idx, node); err != nil { + if err := s.ensureNodeTxn(tx, idx, false, node); err != nil { return false, err } @@ -396,7 +400,7 @@ func (s *Store) ensureNodeCASTxn(tx *memdb.Txn, idx uint64, node *structs.Node) // ensureNodeTxn is the inner function called to actually create a node // registration or modify an existing one in the state store. It allows // passing in a memdb transaction so it may be part of a larger txn. -func (s *Store) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error { +func (s *Store) ensureNodeTxn(tx *memdb.Txn, idx uint64, preserveIndexes bool, node *structs.Node) error { // See if there's an existing node with this UUID, and make sure the // name is the same. var n *structs.Node @@ -456,7 +460,11 @@ func (s *Store) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) err return nil } node.ModifyIndex = idx - } else { + } else if !preserveIndexes || node.CreateIndex == 0 { + // If this isn't a snapshot or there were no saved indexes, set CreateIndex + // and ModifyIndex from the given index. Prior to 1.9.0/1.8.3/1.7.7/1.6.8 nodes + // were not saved with an index, so this is to avoid ending up with a 0 index + // when loading a snapshot from an older version. node.CreateIndex = idx node.ModifyIndex = idx } @@ -729,7 +737,7 @@ func (s *Store) EnsureService(idx uint64, node string, svc *structs.NodeService) defer tx.Abort() // Call the service registration upsert - if err := s.ensureServiceTxn(tx, idx, node, svc); err != nil { + if err := s.ensureServiceTxn(tx, idx, node, false, svc); err != nil { return err } @@ -761,12 +769,12 @@ func (s *Store) ensureServiceCASTxn(tx *memdb.Txn, idx uint64, node string, svc return errCASCompareFailed } - return s.ensureServiceTxn(tx, idx, node, svc) + return s.ensureServiceTxn(tx, idx, node, false, svc) } // ensureServiceTxn is used to upsert a service registration within an // existing memdb transaction. -func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error { +func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error { // Check for existing service _, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID) if err != nil { @@ -806,10 +814,13 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st if entry.IsSameService(serviceNode) { return nil } - } else { - entry.CreateIndex = idx } - entry.ModifyIndex = idx + if !preserveIndexes { + entry.ModifyIndex = idx + if existing == nil { + entry.CreateIndex = idx + } + } // Insert the service and update the index return s.catalogInsertService(tx, entry) @@ -1537,7 +1548,7 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { defer tx.Abort() // Call the check registration - if err := s.ensureCheckTxn(tx, idx, hc); err != nil { + if err := s.ensureCheckTxn(tx, idx, false, hc); err != nil { return err } @@ -1585,7 +1596,7 @@ func (s *Store) ensureCheckCASTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthC } // Perform the update. - if err := s.ensureCheckTxn(tx, idx, hc); err != nil { + if err := s.ensureCheckTxn(tx, idx, false, hc); err != nil { return false, err } @@ -1595,7 +1606,7 @@ func (s *Store) ensureCheckCASTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthC // ensureCheckTransaction is used as the inner method to handle inserting // a health check into the state store. It ensures safety against inserting // checks with no matching node or service. -func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) error { +func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error { // Check if we have an existing health check _, existing, err := firstWatchCompoundWithTxn(tx, "checks", "id", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID)) if err != nil { @@ -1607,9 +1618,8 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec existingCheck := existing.(*structs.HealthCheck) hc.CreateIndex = existingCheck.CreateIndex hc.ModifyIndex = existingCheck.ModifyIndex - } else { + } else if !preserveIndexes { hc.CreateIndex = idx - hc.ModifyIndex = idx } // Use the default check status if none was provided @@ -1687,6 +1697,9 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec // the checks, but not the values within the check hc.ModifyIndex = idx } + if !preserveIndexes { + hc.ModifyIndex = idx + } // TODO (state store) TODO (catalog) - should we be reinserting at all. Similar // code in ensureServiceTxn simply returns nil when the service being inserted diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 0fa9a7d385..e36a0a673d 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -138,7 +138,7 @@ func TestStateStore_ensureNoNodeWithSimilarNameTxn(t *testing.T) { if err := s.ensureNoNodeWithSimilarNameTxn(tx, newNode, false); err == nil { t.Fatalf("Should return an error since the previous node is still healthy") } - s.ensureCheckTxn(tx, 5, &structs.HealthCheck{ + s.ensureCheckTxn(tx, 5, false, &structs.HealthCheck{ Node: "node1", CheckID: structs.SerfCheckID, Status: api.HealthCritical, @@ -391,6 +391,10 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) { ID: makeRandomNodeID(t), Node: "node1", Address: "1.2.3.4", + RaftIndex: structs.RaftIndex{ + CreateIndex: 1, + ModifyIndex: 1, + }, } nodeID := string(req.ID) nodeName := req.Node @@ -429,6 +433,10 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) { Address: "1.1.1.1", Port: 8080, Weights: &structs.Weights{Passing: 1, Warning: 1}, + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 2, + }, } restore = s.Restore() if err := restore.Registration(2, req); err != nil { @@ -461,6 +469,10 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) { Node: nodeName, CheckID: "check1", Name: "check", + RaftIndex: structs.RaftIndex{ + CreateIndex: 3, + ModifyIndex: 3, + }, } restore = s.Restore() if err := restore.Registration(3, req); err != nil { @@ -498,6 +510,10 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) { Node: nodeName, CheckID: "check2", Name: "check", + RaftIndex: structs.RaftIndex{ + CreateIndex: 4, + ModifyIndex: 4, + }, }, } restore = s.Restore() diff --git a/agent/consul/state/txn.go b/agent/consul/state/txn.go index f8c02e25a9..e8b24bf0a2 100644 --- a/agent/consul/state/txn.go +++ b/agent/consul/state/txn.go @@ -160,7 +160,7 @@ func (s *Store) txnNode(tx *memdb.Txn, idx uint64, op *structs.TxnNodeOp) (struc } case api.NodeSet: - err = s.ensureNodeTxn(tx, idx, &op.Node) + err = s.ensureNodeTxn(tx, idx, false, &op.Node) if err == nil { entry, err = getNode() } @@ -223,7 +223,7 @@ func (s *Store) txnService(tx *memdb.Txn, idx uint64, op *structs.TxnServiceOp) } case api.ServiceSet: - if err := s.ensureServiceTxn(tx, idx, op.Node, &op.Service); err != nil { + if err := s.ensureServiceTxn(tx, idx, op.Node, false, &op.Service); err != nil { return nil, err } entry, err := s.getNodeServiceTxn(tx, op.Node, op.Service.ID, &op.Service.EnterpriseMeta) @@ -283,7 +283,7 @@ func (s *Store) txnCheck(tx *memdb.Txn, idx uint64, op *structs.TxnCheckOp) (str } case api.CheckSet: - err = s.ensureCheckTxn(tx, idx, &op.Check) + err = s.ensureCheckTxn(tx, idx, false, &op.Check) if err == nil { _, entry, err = s.getNodeCheckTxn(tx, op.Check.Node, op.Check.CheckID, &op.Check.EnterpriseMeta) } diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 484f011cfa..ab2d643dae 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -310,6 +310,7 @@ type RegisterRequest struct { EnterpriseMeta `hcl:",squash" mapstructure:",squash"` WriteRequest + RaftIndex `bexpr:"-"` } func (r *RegisterRequest) RequestDatacenter() string {