Merge pull request #8485 from hashicorp/catalog-snapshot-fix-1.8.x

Backport catalog snapshot index fix to 1.8.x
This commit is contained in:
Kyle Havlovitz 2020-08-11 17:55:54 -07:00 committed by GitHub
commit 71efb10056
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 87 additions and 31 deletions

View File

@ -99,6 +99,7 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink,
Address: n.Address, Address: n.Address,
TaggedAddresses: n.TaggedAddresses, TaggedAddresses: n.TaggedAddresses,
NodeMeta: n.Meta, NodeMeta: n.Meta,
RaftIndex: n.RaftIndex,
} }
// Register the node itself // Register the node itself

View File

@ -383,6 +383,22 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.NoError(t, fsm.state.FederationStateSet(21, fedState1)) require.NoError(t, fsm.state.FederationStateSet(21, fedState1))
require.NoError(t, fsm.state.FederationStateSet(22, fedState2)) require.NoError(t, fsm.state.FederationStateSet(22, fedState2))
// Update a node, service and health check to make sure the ModifyIndexes are preserved correctly after restore.
require.NoError(t, fsm.state.EnsureNode(23, &structs.Node{
ID: "610918a6-464f-fa9b-1a95-03bd6e88ed92",
Node: "foo",
Datacenter: "dc1",
Address: "127.0.0.3",
}))
require.NoError(t, fsm.state.EnsureService(24, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5001}))
require.NoError(t, fsm.state.EnsureCheck(25, &structs.HealthCheck{
Node: "foo",
CheckID: "web",
Name: "web connectivity",
Status: api.HealthCritical,
ServiceID: "web",
}))
// Snapshot // Snapshot
snap, err := fsm.Snapshot() snap, err := fsm.Snapshot()
require.NoError(t, err) require.NoError(t, err)
@ -455,25 +471,37 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.Equal(t, "testing123", nodes[0].Meta["testMeta"]) require.Equal(t, "testing123", nodes[0].Meta["testMeta"])
require.Len(t, nodes[0].TaggedAddresses, 1) require.Len(t, nodes[0].TaggedAddresses, 1)
require.Equal(t, "1.2.3.4", nodes[0].TaggedAddresses["hello"]) require.Equal(t, "1.2.3.4", nodes[0].TaggedAddresses["hello"])
require.Equal(t, uint64(2), nodes[0].CreateIndex)
require.Equal(t, uint64(2), nodes[0].ModifyIndex)
require.Equal(t, node1.ID, nodes[1].ID) require.Equal(t, node1.ID, nodes[1].ID)
require.Equal(t, "foo", nodes[1].Node) require.Equal(t, "foo", nodes[1].Node)
require.Equal(t, "dc1", nodes[1].Datacenter) require.Equal(t, "dc1", nodes[1].Datacenter)
require.Equal(t, "127.0.0.1", nodes[1].Address) require.Equal(t, "127.0.0.3", nodes[1].Address)
require.Empty(t, nodes[1].TaggedAddresses) require.Empty(t, nodes[1].TaggedAddresses)
require.Equal(t, uint64(1), nodes[1].CreateIndex)
require.Equal(t, uint64(23), nodes[1].ModifyIndex)
_, fooSrv, err := fsm2.state.NodeServices(nil, "foo", nil) _, fooSrv, err := fsm2.state.NodeServices(nil, "foo", nil)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, fooSrv.Services, 2) require.Len(t, fooSrv.Services, 2)
require.Contains(t, fooSrv.Services["db"].Tags, "primary") require.Contains(t, fooSrv.Services["db"].Tags, "primary")
require.True(t, stringslice.Contains(fooSrv.Services["db"].Tags, "primary")) require.True(t, stringslice.Contains(fooSrv.Services["db"].Tags, "primary"))
require.Equal(t, 5000, fooSrv.Services["db"].Port) require.Equal(t, 5001, fooSrv.Services["db"].Port)
require.Equal(t, uint64(4), fooSrv.Services["db"].CreateIndex)
require.Equal(t, uint64(24), fooSrv.Services["db"].ModifyIndex)
connectSrv := fooSrv.Services["web"] connectSrv := fooSrv.Services["web"]
require.Equal(t, connectConf, connectSrv.Connect) require.Equal(t, connectConf, connectSrv.Connect)
require.Equal(t, uint64(3), fooSrv.Services["web"].CreateIndex)
require.Equal(t, uint64(3), fooSrv.Services["web"].ModifyIndex)
_, checks, err := fsm2.state.NodeChecks(nil, "foo", nil) _, checks, err := fsm2.state.NodeChecks(nil, "foo", nil)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, checks, 1) require.Len(t, checks, 1)
require.Equal(t, "foo", checks[0].Node)
require.Equal(t, "web", checks[0].ServiceName)
require.Equal(t, uint64(7), checks[0].CreateIndex)
require.Equal(t, uint64(25), checks[0].ModifyIndex)
// Verify key is set // Verify key is set
_, d, err := fsm2.state.KVSGet(nil, "/test", nil) _, d, err := fsm2.state.KVSGet(nil, "/test", nil)

View File

@ -216,7 +216,7 @@ func (s *Snapshot) Checks(node string) (memdb.ResultIterator, error) {
// performed within a single transaction to avoid race conditions on state // performed within a single transaction to avoid race conditions on state
// updates. // updates.
func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error { func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error {
if err := s.store.ensureRegistrationTxn(s.tx, idx, req); err != nil { if err := s.store.ensureRegistrationTxn(s.tx, idx, true, req); err != nil {
return err return err
} }
return nil return nil
@ -229,7 +229,7 @@ func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) err
tx := s.db.Txn(true) tx := s.db.Txn(true)
defer tx.Abort() defer tx.Abort()
if err := s.ensureRegistrationTxn(tx, idx, req); err != nil { if err := s.ensureRegistrationTxn(tx, idx, false, req); err != nil {
return err return err
} }
@ -237,12 +237,12 @@ func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) err
return nil return nil
} }
func (s *Store) ensureCheckIfNodeMatches(tx *memdb.Txn, idx uint64, node string, check *structs.HealthCheck) error { func (s *Store) ensureCheckIfNodeMatches(tx *memdb.Txn, idx uint64, preserveIndexes bool, node string, check *structs.HealthCheck) error {
if check.Node != node { if check.Node != node {
return fmt.Errorf("check node %q does not match node %q", return fmt.Errorf("check node %q does not match node %q",
check.Node, node) check.Node, node)
} }
if err := s.ensureCheckTxn(tx, idx, check); err != nil { if err := s.ensureCheckTxn(tx, idx, preserveIndexes, check); err != nil {
return fmt.Errorf("failed inserting check: %s on node %q", err, check.Node) return fmt.Errorf("failed inserting check: %s on node %q", err, check.Node)
} }
return nil return nil
@ -251,7 +251,7 @@ func (s *Store) ensureCheckIfNodeMatches(tx *memdb.Txn, idx uint64, node string,
// ensureRegistrationTxn is used to make sure a node, service, and check // ensureRegistrationTxn is used to make sure a node, service, and check
// registration is performed within a single transaction to avoid race // registration is performed within a single transaction to avoid race
// conditions on state updates. // conditions on state updates.
func (s *Store) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.RegisterRequest) error { func (s *Store) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, preserveIndexes bool, req *structs.RegisterRequest) error {
if _, err := s.validateRegisterRequestTxn(tx, req); err != nil { if _, err := s.validateRegisterRequestTxn(tx, req); err != nil {
return err return err
} }
@ -265,6 +265,10 @@ func (s *Store) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.Re
TaggedAddresses: req.TaggedAddresses, TaggedAddresses: req.TaggedAddresses,
Meta: req.NodeMeta, Meta: req.NodeMeta,
} }
if preserveIndexes {
node.CreateIndex = req.CreateIndex
node.ModifyIndex = req.ModifyIndex
}
// Since this gets called for all node operations (service and check // Since this gets called for all node operations (service and check
// updates) and churn on the node itself is basically none after the // updates) and churn on the node itself is basically none after the
@ -277,7 +281,7 @@ func (s *Store) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.Re
return fmt.Errorf("node lookup failed: %s", err) return fmt.Errorf("node lookup failed: %s", err)
} }
if existing == nil || req.ChangesNode(existing.(*structs.Node)) { if existing == nil || req.ChangesNode(existing.(*structs.Node)) {
if err := s.ensureNodeTxn(tx, idx, node); err != nil { if err := s.ensureNodeTxn(tx, idx, preserveIndexes, node); err != nil {
return fmt.Errorf("failed inserting node: %s", err) return fmt.Errorf("failed inserting node: %s", err)
} }
} }
@ -292,7 +296,7 @@ func (s *Store) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.Re
return fmt.Errorf("failed service lookup: %s", err) return fmt.Errorf("failed service lookup: %s", err)
} }
if existing == nil || !(existing.(*structs.ServiceNode).ToNodeService()).IsSame(req.Service) { if existing == nil || !(existing.(*structs.ServiceNode).ToNodeService()).IsSame(req.Service) {
if err := s.ensureServiceTxn(tx, idx, req.Node, req.Service); err != nil { if err := s.ensureServiceTxn(tx, idx, req.Node, preserveIndexes, req.Service); err != nil {
return fmt.Errorf("failed inserting service: %s", err) return fmt.Errorf("failed inserting service: %s", err)
} }
@ -301,12 +305,12 @@ func (s *Store) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.Re
// Add the checks, if any. // Add the checks, if any.
if req.Check != nil { if req.Check != nil {
if err := s.ensureCheckIfNodeMatches(tx, idx, req.Node, req.Check); err != nil { if err := s.ensureCheckIfNodeMatches(tx, idx, preserveIndexes, req.Node, req.Check); err != nil {
return err return err
} }
} }
for _, check := range req.Checks { for _, check := range req.Checks {
if err := s.ensureCheckIfNodeMatches(tx, idx, req.Node, check); err != nil { if err := s.ensureCheckIfNodeMatches(tx, idx, preserveIndexes, req.Node, check); err != nil {
return err return err
} }
} }
@ -320,7 +324,7 @@ func (s *Store) EnsureNode(idx uint64, node *structs.Node) error {
defer tx.Abort() defer tx.Abort()
// Call the node upsert // Call the node upsert
if err := s.ensureNodeTxn(tx, idx, node); err != nil { if err := s.ensureNodeTxn(tx, idx, false, node); err != nil {
return err return err
} }
@ -386,7 +390,7 @@ func (s *Store) ensureNodeCASTxn(tx *memdb.Txn, idx uint64, node *structs.Node)
} }
// Perform the update. // Perform the update.
if err := s.ensureNodeTxn(tx, idx, node); err != nil { if err := s.ensureNodeTxn(tx, idx, false, node); err != nil {
return false, err return false, err
} }
@ -396,7 +400,7 @@ func (s *Store) ensureNodeCASTxn(tx *memdb.Txn, idx uint64, node *structs.Node)
// ensureNodeTxn is the inner function called to actually create a node // ensureNodeTxn is the inner function called to actually create a node
// registration or modify an existing one in the state store. It allows // registration or modify an existing one in the state store. It allows
// passing in a memdb transaction so it may be part of a larger txn. // passing in a memdb transaction so it may be part of a larger txn.
func (s *Store) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error { func (s *Store) ensureNodeTxn(tx *memdb.Txn, idx uint64, preserveIndexes bool, node *structs.Node) error {
// See if there's an existing node with this UUID, and make sure the // See if there's an existing node with this UUID, and make sure the
// name is the same. // name is the same.
var n *structs.Node var n *structs.Node
@ -456,7 +460,11 @@ func (s *Store) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) err
return nil return nil
} }
node.ModifyIndex = idx node.ModifyIndex = idx
} else { } else if !preserveIndexes || node.CreateIndex == 0 {
// If this isn't a snapshot or there were no saved indexes, set CreateIndex
// and ModifyIndex from the given index. Prior to 1.9.0/1.8.3/1.7.7, nodes
// were not saved with an index, so this is to avoid ending up with a 0 index
// when loading a snapshot from an older version.
node.CreateIndex = idx node.CreateIndex = idx
node.ModifyIndex = idx node.ModifyIndex = idx
} }
@ -729,7 +737,7 @@ func (s *Store) EnsureService(idx uint64, node string, svc *structs.NodeService)
defer tx.Abort() defer tx.Abort()
// Call the service registration upsert // Call the service registration upsert
if err := s.ensureServiceTxn(tx, idx, node, svc); err != nil { if err := s.ensureServiceTxn(tx, idx, node, false, svc); err != nil {
return err return err
} }
@ -761,12 +769,12 @@ func (s *Store) ensureServiceCASTxn(tx *memdb.Txn, idx uint64, node string, svc
return errCASCompareFailed return errCASCompareFailed
} }
return s.ensureServiceTxn(tx, idx, node, svc) return s.ensureServiceTxn(tx, idx, node, false, svc)
} }
// ensureServiceTxn is used to upsert a service registration within an // ensureServiceTxn is used to upsert a service registration within an
// existing memdb transaction. // existing memdb transaction.
func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error { func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error {
// Check for existing service // Check for existing service
_, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID) _, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID)
if err != nil { if err != nil {
@ -806,10 +814,13 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
if entry.IsSameService(serviceNode) { if entry.IsSameService(serviceNode) {
return nil return nil
} }
} else { }
if !preserveIndexes {
entry.ModifyIndex = idx
if existing == nil {
entry.CreateIndex = idx entry.CreateIndex = idx
} }
entry.ModifyIndex = idx }
// Insert the service and update the index // Insert the service and update the index
return s.catalogInsertService(tx, entry) return s.catalogInsertService(tx, entry)
@ -1537,7 +1548,7 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
defer tx.Abort() defer tx.Abort()
// Call the check registration // Call the check registration
if err := s.ensureCheckTxn(tx, idx, hc); err != nil { if err := s.ensureCheckTxn(tx, idx, false, hc); err != nil {
return err return err
} }
@ -1585,7 +1596,7 @@ func (s *Store) ensureCheckCASTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthC
} }
// Perform the update. // Perform the update.
if err := s.ensureCheckTxn(tx, idx, hc); err != nil { if err := s.ensureCheckTxn(tx, idx, false, hc); err != nil {
return false, err return false, err
} }
@ -1595,7 +1606,7 @@ func (s *Store) ensureCheckCASTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthC
// ensureCheckTransaction is used as the inner method to handle inserting // ensureCheckTransaction is used as the inner method to handle inserting
// a health check into the state store. It ensures safety against inserting // a health check into the state store. It ensures safety against inserting
// checks with no matching node or service. // checks with no matching node or service.
func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) error { func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error {
// Check if we have an existing health check // Check if we have an existing health check
_, existing, err := firstWatchCompoundWithTxn(tx, "checks", "id", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID)) _, existing, err := firstWatchCompoundWithTxn(tx, "checks", "id", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID))
if err != nil { if err != nil {
@ -1607,9 +1618,8 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
existingCheck := existing.(*structs.HealthCheck) existingCheck := existing.(*structs.HealthCheck)
hc.CreateIndex = existingCheck.CreateIndex hc.CreateIndex = existingCheck.CreateIndex
hc.ModifyIndex = existingCheck.ModifyIndex hc.ModifyIndex = existingCheck.ModifyIndex
} else { } else if !preserveIndexes {
hc.CreateIndex = idx hc.CreateIndex = idx
hc.ModifyIndex = idx
} }
// Use the default check status if none was provided // Use the default check status if none was provided
@ -1680,7 +1690,7 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
} }
} }
} }
if modified { if modified && !preserveIndexes {
// We update the modify index, ONLY if something has changed, thus // We update the modify index, ONLY if something has changed, thus
// With constant output, no change is seen when watching a service // With constant output, no change is seen when watching a service
// With huge number of nodes where anti-entropy updates continuously // With huge number of nodes where anti-entropy updates continuously

View File

@ -138,7 +138,7 @@ func TestStateStore_ensureNoNodeWithSimilarNameTxn(t *testing.T) {
if err := s.ensureNoNodeWithSimilarNameTxn(tx, newNode, false); err == nil { if err := s.ensureNoNodeWithSimilarNameTxn(tx, newNode, false); err == nil {
t.Fatalf("Should return an error since the previous node is still healthy") t.Fatalf("Should return an error since the previous node is still healthy")
} }
s.ensureCheckTxn(tx, 5, &structs.HealthCheck{ s.ensureCheckTxn(tx, 5, false, &structs.HealthCheck{
Node: "node1", Node: "node1",
CheckID: structs.SerfCheckID, CheckID: structs.SerfCheckID,
Status: api.HealthCritical, Status: api.HealthCritical,
@ -391,6 +391,10 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
ID: makeRandomNodeID(t), ID: makeRandomNodeID(t),
Node: "node1", Node: "node1",
Address: "1.2.3.4", Address: "1.2.3.4",
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,
},
} }
nodeID := string(req.ID) nodeID := string(req.ID)
nodeName := req.Node nodeName := req.Node
@ -429,6 +433,10 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
Address: "1.1.1.1", Address: "1.1.1.1",
Port: 8080, Port: 8080,
Weights: &structs.Weights{Passing: 1, Warning: 1}, Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
} }
restore = s.Restore() restore = s.Restore()
if err := restore.Registration(2, req); err != nil { if err := restore.Registration(2, req); err != nil {
@ -461,6 +469,10 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
Node: nodeName, Node: nodeName,
CheckID: "check1", CheckID: "check1",
Name: "check", Name: "check",
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 3,
},
} }
restore = s.Restore() restore = s.Restore()
if err := restore.Registration(3, req); err != nil { if err := restore.Registration(3, req); err != nil {
@ -498,6 +510,10 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
Node: nodeName, Node: nodeName,
CheckID: "check2", CheckID: "check2",
Name: "check", Name: "check",
RaftIndex: structs.RaftIndex{
CreateIndex: 4,
ModifyIndex: 4,
},
}, },
} }
restore = s.Restore() restore = s.Restore()

View File

@ -160,7 +160,7 @@ func (s *Store) txnNode(tx *memdb.Txn, idx uint64, op *structs.TxnNodeOp) (struc
} }
case api.NodeSet: case api.NodeSet:
err = s.ensureNodeTxn(tx, idx, &op.Node) err = s.ensureNodeTxn(tx, idx, false, &op.Node)
if err == nil { if err == nil {
entry, err = getNode() entry, err = getNode()
} }
@ -223,7 +223,7 @@ func (s *Store) txnService(tx *memdb.Txn, idx uint64, op *structs.TxnServiceOp)
} }
case api.ServiceSet: case api.ServiceSet:
if err := s.ensureServiceTxn(tx, idx, op.Node, &op.Service); err != nil { if err := s.ensureServiceTxn(tx, idx, op.Node, false, &op.Service); err != nil {
return nil, err return nil, err
} }
entry, err := s.getNodeServiceTxn(tx, op.Node, op.Service.ID, &op.Service.EnterpriseMeta) entry, err := s.getNodeServiceTxn(tx, op.Node, op.Service.ID, &op.Service.EnterpriseMeta)
@ -283,7 +283,7 @@ func (s *Store) txnCheck(tx *memdb.Txn, idx uint64, op *structs.TxnCheckOp) (str
} }
case api.CheckSet: case api.CheckSet:
err = s.ensureCheckTxn(tx, idx, &op.Check) err = s.ensureCheckTxn(tx, idx, false, &op.Check)
if err == nil { if err == nil {
_, entry, err = s.getNodeCheckTxn(tx, op.Check.Node, op.Check.CheckID, &op.Check.EnterpriseMeta) _, entry, err = s.getNodeCheckTxn(tx, op.Check.Node, op.Check.CheckID, &op.Check.EnterpriseMeta)
} }

View File

@ -310,6 +310,7 @@ type RegisterRequest struct {
EnterpriseMeta `hcl:",squash" mapstructure:",squash"` EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
WriteRequest WriteRequest
RaftIndex `bexpr:"-"`
} }
func (r *RegisterRequest) RequestDatacenter() string { func (r *RegisterRequest) RequestDatacenter() string {