mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 22:06:20 +00:00
fsm: Fix snapshot bug with restoring node/service/check indexes
This commit is contained in:
parent
8bea00d974
commit
c39a275666
@ -99,6 +99,7 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink,
|
||||
Address: n.Address,
|
||||
TaggedAddresses: n.TaggedAddresses,
|
||||
NodeMeta: n.Meta,
|
||||
RaftIndex: n.RaftIndex,
|
||||
}
|
||||
|
||||
// Register the node itself
|
||||
|
@ -383,6 +383,22 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||
require.NoError(t, fsm.state.FederationStateSet(21, fedState1))
|
||||
require.NoError(t, fsm.state.FederationStateSet(22, fedState2))
|
||||
|
||||
// Update a node, service and health check to make sure the ModifyIndexes are preserved correctly after restore.
|
||||
require.NoError(t, fsm.state.EnsureNode(23, &structs.Node{
|
||||
ID: "610918a6-464f-fa9b-1a95-03bd6e88ed92",
|
||||
Node: "foo",
|
||||
Datacenter: "dc1",
|
||||
Address: "127.0.0.3",
|
||||
}))
|
||||
require.NoError(t, fsm.state.EnsureService(24, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5001}))
|
||||
require.NoError(t, fsm.state.EnsureCheck(25, &structs.HealthCheck{
|
||||
Node: "foo",
|
||||
CheckID: "web",
|
||||
Name: "web connectivity",
|
||||
Status: api.HealthCritical,
|
||||
ServiceID: "web",
|
||||
}))
|
||||
|
||||
// Snapshot
|
||||
snap, err := fsm.Snapshot()
|
||||
require.NoError(t, err)
|
||||
@ -455,25 +471,37 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||
require.Equal(t, "testing123", nodes[0].Meta["testMeta"])
|
||||
require.Len(t, nodes[0].TaggedAddresses, 1)
|
||||
require.Equal(t, "1.2.3.4", nodes[0].TaggedAddresses["hello"])
|
||||
require.Equal(t, uint64(2), nodes[0].CreateIndex)
|
||||
require.Equal(t, uint64(2), nodes[0].ModifyIndex)
|
||||
|
||||
require.Equal(t, node1.ID, nodes[1].ID)
|
||||
require.Equal(t, "foo", nodes[1].Node)
|
||||
require.Equal(t, "dc1", nodes[1].Datacenter)
|
||||
require.Equal(t, "127.0.0.1", nodes[1].Address)
|
||||
require.Equal(t, "127.0.0.3", nodes[1].Address)
|
||||
require.Empty(t, nodes[1].TaggedAddresses)
|
||||
require.Equal(t, uint64(1), nodes[1].CreateIndex)
|
||||
require.Equal(t, uint64(23), nodes[1].ModifyIndex)
|
||||
|
||||
_, fooSrv, err := fsm2.state.NodeServices(nil, "foo", nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, fooSrv.Services, 2)
|
||||
require.Contains(t, fooSrv.Services["db"].Tags, "primary")
|
||||
require.True(t, stringslice.Contains(fooSrv.Services["db"].Tags, "primary"))
|
||||
require.Equal(t, 5000, fooSrv.Services["db"].Port)
|
||||
require.Equal(t, 5001, fooSrv.Services["db"].Port)
|
||||
require.Equal(t, uint64(4), fooSrv.Services["db"].CreateIndex)
|
||||
require.Equal(t, uint64(24), fooSrv.Services["db"].ModifyIndex)
|
||||
connectSrv := fooSrv.Services["web"]
|
||||
require.Equal(t, connectConf, connectSrv.Connect)
|
||||
require.Equal(t, uint64(3), fooSrv.Services["web"].CreateIndex)
|
||||
require.Equal(t, uint64(3), fooSrv.Services["web"].ModifyIndex)
|
||||
|
||||
_, checks, err := fsm2.state.NodeChecks(nil, "foo", nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, checks, 1)
|
||||
require.Equal(t, "foo", checks[0].Node)
|
||||
require.Equal(t, "web", checks[0].ServiceName)
|
||||
require.Equal(t, uint64(7), checks[0].CreateIndex)
|
||||
require.Equal(t, uint64(25), checks[0].ModifyIndex)
|
||||
|
||||
// Verify key is set
|
||||
_, d, err := fsm2.state.KVSGet(nil, "/test", nil)
|
||||
|
@ -216,7 +216,7 @@ func (s *Snapshot) Checks(node string) (memdb.ResultIterator, error) {
|
||||
// performed within a single transaction to avoid race conditions on state
|
||||
// updates.
|
||||
func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error {
|
||||
if err := s.store.ensureRegistrationTxn(s.tx, idx, req); err != nil {
|
||||
if err := s.store.ensureRegistrationTxn(s.tx, idx, true, req); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@ -229,19 +229,19 @@ func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) err
|
||||
tx := s.db.WriteTxn(idx)
|
||||
defer tx.Abort()
|
||||
|
||||
if err := s.ensureRegistrationTxn(tx, idx, req); err != nil {
|
||||
if err := s.ensureRegistrationTxn(tx, idx, false, req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (s *Store) ensureCheckIfNodeMatches(tx *txn, idx uint64, node string, check *structs.HealthCheck) error {
|
||||
func (s *Store) ensureCheckIfNodeMatches(tx *txn, idx uint64, preserveIndexes bool, node string, check *structs.HealthCheck) error {
|
||||
if check.Node != node {
|
||||
return fmt.Errorf("check node %q does not match node %q",
|
||||
check.Node, node)
|
||||
}
|
||||
if err := s.ensureCheckTxn(tx, idx, check); err != nil {
|
||||
if err := s.ensureCheckTxn(tx, idx, preserveIndexes, check); err != nil {
|
||||
return fmt.Errorf("failed inserting check: %s on node %q", err, check.Node)
|
||||
}
|
||||
return nil
|
||||
@ -250,7 +250,7 @@ func (s *Store) ensureCheckIfNodeMatches(tx *txn, idx uint64, node string, check
|
||||
// ensureRegistrationTxn is used to make sure a node, service, and check
|
||||
// registration is performed within a single transaction to avoid race
|
||||
// conditions on state updates.
|
||||
func (s *Store) ensureRegistrationTxn(tx *txn, idx uint64, req *structs.RegisterRequest) error {
|
||||
func (s *Store) ensureRegistrationTxn(tx *txn, idx uint64, preserveIndexes bool, req *structs.RegisterRequest) error {
|
||||
if _, err := validateRegisterRequestTxn(tx, req); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -264,6 +264,10 @@ func (s *Store) ensureRegistrationTxn(tx *txn, idx uint64, req *structs.Register
|
||||
TaggedAddresses: req.TaggedAddresses,
|
||||
Meta: req.NodeMeta,
|
||||
}
|
||||
if preserveIndexes {
|
||||
node.CreateIndex = req.CreateIndex
|
||||
node.ModifyIndex = req.ModifyIndex
|
||||
}
|
||||
|
||||
// Since this gets called for all node operations (service and check
|
||||
// updates) and churn on the node itself is basically none after the
|
||||
@ -276,7 +280,7 @@ func (s *Store) ensureRegistrationTxn(tx *txn, idx uint64, req *structs.Register
|
||||
return fmt.Errorf("node lookup failed: %s", err)
|
||||
}
|
||||
if existing == nil || req.ChangesNode(existing.(*structs.Node)) {
|
||||
if err := s.ensureNodeTxn(tx, idx, node); err != nil {
|
||||
if err := s.ensureNodeTxn(tx, idx, preserveIndexes, node); err != nil {
|
||||
return fmt.Errorf("failed inserting node: %s", err)
|
||||
}
|
||||
}
|
||||
@ -291,7 +295,7 @@ func (s *Store) ensureRegistrationTxn(tx *txn, idx uint64, req *structs.Register
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
if existing == nil || !(existing.(*structs.ServiceNode).ToNodeService()).IsSame(req.Service) {
|
||||
if err := ensureServiceTxn(tx, idx, req.Node, req.Service); err != nil {
|
||||
if err := ensureServiceTxn(tx, idx, req.Node, preserveIndexes, req.Service); err != nil {
|
||||
return fmt.Errorf("failed inserting service: %s", err)
|
||||
|
||||
}
|
||||
@ -300,12 +304,12 @@ func (s *Store) ensureRegistrationTxn(tx *txn, idx uint64, req *structs.Register
|
||||
|
||||
// Add the checks, if any.
|
||||
if req.Check != nil {
|
||||
if err := s.ensureCheckIfNodeMatches(tx, idx, req.Node, req.Check); err != nil {
|
||||
if err := s.ensureCheckIfNodeMatches(tx, idx, preserveIndexes, req.Node, req.Check); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, check := range req.Checks {
|
||||
if err := s.ensureCheckIfNodeMatches(tx, idx, req.Node, check); err != nil {
|
||||
if err := s.ensureCheckIfNodeMatches(tx, idx, preserveIndexes, req.Node, check); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -319,7 +323,7 @@ func (s *Store) EnsureNode(idx uint64, node *structs.Node) error {
|
||||
defer tx.Abort()
|
||||
|
||||
// Call the node upsert
|
||||
if err := s.ensureNodeTxn(tx, idx, node); err != nil {
|
||||
if err := s.ensureNodeTxn(tx, idx, false, node); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -384,7 +388,7 @@ func (s *Store) ensureNodeCASTxn(tx *txn, idx uint64, node *structs.Node) (bool,
|
||||
}
|
||||
|
||||
// Perform the update.
|
||||
if err := s.ensureNodeTxn(tx, idx, node); err != nil {
|
||||
if err := s.ensureNodeTxn(tx, idx, false, node); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
@ -394,7 +398,7 @@ func (s *Store) ensureNodeCASTxn(tx *txn, idx uint64, node *structs.Node) (bool,
|
||||
// ensureNodeTxn is the inner function called to actually create a node
|
||||
// registration or modify an existing one in the state store. It allows
|
||||
// passing in a memdb transaction so it may be part of a larger txn.
|
||||
func (s *Store) ensureNodeTxn(tx *txn, idx uint64, node *structs.Node) error {
|
||||
func (s *Store) ensureNodeTxn(tx *txn, idx uint64, preserveIndexes bool, node *structs.Node) error {
|
||||
// See if there's an existing node with this UUID, and make sure the
|
||||
// name is the same.
|
||||
var n *structs.Node
|
||||
@ -454,7 +458,11 @@ func (s *Store) ensureNodeTxn(tx *txn, idx uint64, node *structs.Node) error {
|
||||
return nil
|
||||
}
|
||||
node.ModifyIndex = idx
|
||||
} else {
|
||||
} else if !preserveIndexes || node.CreateIndex == 0 {
|
||||
// If this isn't a snapshot or there were no saved indexes, set CreateIndex
|
||||
// and ModifyIndex from the given index. Prior to 1.9.0/1.8.3/1.7.7/1.6.8 nodes
|
||||
// were not saved with an index, so this is to avoid ending up with a 0 index
|
||||
// when loading a snapshot from an older version.
|
||||
node.CreateIndex = idx
|
||||
node.ModifyIndex = idx
|
||||
}
|
||||
@ -726,7 +734,7 @@ func (s *Store) EnsureService(idx uint64, node string, svc *structs.NodeService)
|
||||
defer tx.Abort()
|
||||
|
||||
// Call the service registration upsert
|
||||
if err := ensureServiceTxn(tx, idx, node, svc); err != nil {
|
||||
if err := ensureServiceTxn(tx, idx, node, false, svc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -757,12 +765,12 @@ func ensureServiceCASTxn(tx *txn, idx uint64, node string, svc *structs.NodeServ
|
||||
return errCASCompareFailed
|
||||
}
|
||||
|
||||
return ensureServiceTxn(tx, idx, node, svc)
|
||||
return ensureServiceTxn(tx, idx, node, false, svc)
|
||||
}
|
||||
|
||||
// ensureServiceTxn is used to upsert a service registration within an
|
||||
// existing memdb transaction.
|
||||
func ensureServiceTxn(tx *txn, idx uint64, node string, svc *structs.NodeService) error {
|
||||
func ensureServiceTxn(tx *txn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error {
|
||||
// Check for existing service
|
||||
_, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID)
|
||||
if err != nil {
|
||||
@ -802,10 +810,13 @@ func ensureServiceTxn(tx *txn, idx uint64, node string, svc *structs.NodeService
|
||||
if entry.IsSameService(serviceNode) {
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
}
|
||||
if !preserveIndexes {
|
||||
entry.ModifyIndex = idx
|
||||
if existing == nil {
|
||||
entry.CreateIndex = idx
|
||||
}
|
||||
entry.ModifyIndex = idx
|
||||
}
|
||||
|
||||
// Insert the service and update the index
|
||||
return catalogInsertService(tx, entry)
|
||||
@ -1532,7 +1543,7 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
|
||||
defer tx.Abort()
|
||||
|
||||
// Call the check registration
|
||||
if err := s.ensureCheckTxn(tx, idx, hc); err != nil {
|
||||
if err := s.ensureCheckTxn(tx, idx, false, hc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -1579,7 +1590,7 @@ func (s *Store) ensureCheckCASTxn(tx *txn, idx uint64, hc *structs.HealthCheck)
|
||||
}
|
||||
|
||||
// Perform the update.
|
||||
if err := s.ensureCheckTxn(tx, idx, hc); err != nil {
|
||||
if err := s.ensureCheckTxn(tx, idx, false, hc); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
@ -1589,7 +1600,7 @@ func (s *Store) ensureCheckCASTxn(tx *txn, idx uint64, hc *structs.HealthCheck)
|
||||
// ensureCheckTxn is used as the inner method to handle inserting
|
||||
// a health check into the state store. It ensures safety against inserting
|
||||
// checks with no matching node or service.
|
||||
func (s *Store) ensureCheckTxn(tx *txn, idx uint64, hc *structs.HealthCheck) error {
|
||||
func (s *Store) ensureCheckTxn(tx *txn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error {
|
||||
// Check if we have an existing health check
|
||||
_, existing, err := firstWatchCompoundWithTxn(tx, "checks", "id", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID))
|
||||
if err != nil {
|
||||
@ -1601,9 +1612,8 @@ func (s *Store) ensureCheckTxn(tx *txn, idx uint64, hc *structs.HealthCheck) err
|
||||
existingCheck := existing.(*structs.HealthCheck)
|
||||
hc.CreateIndex = existingCheck.CreateIndex
|
||||
hc.ModifyIndex = existingCheck.ModifyIndex
|
||||
} else {
|
||||
} else if !preserveIndexes {
|
||||
hc.CreateIndex = idx
|
||||
hc.ModifyIndex = idx
|
||||
}
|
||||
|
||||
// Use the default check status if none was provided
|
||||
@ -1677,7 +1687,10 @@ func (s *Store) ensureCheckTxn(tx *txn, idx uint64, hc *structs.HealthCheck) err
|
||||
if !modified {
|
||||
return nil
|
||||
}
|
||||
if !preserveIndexes {
|
||||
hc.ModifyIndex = idx
|
||||
}
|
||||
|
||||
return catalogInsertCheck(tx, hc, idx)
|
||||
}
|
||||
|
||||
|
@ -137,7 +137,7 @@ func TestStateStore_ensureNoNodeWithSimilarNameTxn(t *testing.T) {
|
||||
if err := ensureNoNodeWithSimilarNameTxn(tx, newNode, false); err == nil {
|
||||
t.Fatalf("Should return an error since the previous node is still healthy")
|
||||
}
|
||||
s.ensureCheckTxn(tx, 5, &structs.HealthCheck{
|
||||
s.ensureCheckTxn(tx, 5, false, &structs.HealthCheck{
|
||||
Node: "node1",
|
||||
CheckID: structs.SerfCheckID,
|
||||
Status: api.HealthCritical,
|
||||
@ -376,6 +376,10 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
||||
ID: makeRandomNodeID(t),
|
||||
Node: "node1",
|
||||
Address: "1.2.3.4",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
},
|
||||
}
|
||||
nodeID := string(req.ID)
|
||||
nodeName := req.Node
|
||||
@ -414,6 +418,10 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
||||
Address: "1.1.1.1",
|
||||
Port: 8080,
|
||||
Weights: &structs.Weights{Passing: 1, Warning: 1},
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
},
|
||||
}
|
||||
restore = s.Restore()
|
||||
if err := restore.Registration(2, req); err != nil {
|
||||
@ -446,6 +454,10 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
||||
Node: nodeName,
|
||||
CheckID: "check1",
|
||||
Name: "check",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 3,
|
||||
ModifyIndex: 3,
|
||||
},
|
||||
}
|
||||
restore = s.Restore()
|
||||
if err := restore.Registration(3, req); err != nil {
|
||||
@ -483,6 +495,10 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
||||
Node: nodeName,
|
||||
CheckID: "check2",
|
||||
Name: "check",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 4,
|
||||
ModifyIndex: 4,
|
||||
},
|
||||
},
|
||||
}
|
||||
restore = s.Restore()
|
||||
|
@ -159,7 +159,7 @@ func (s *Store) txnNode(tx *txn, idx uint64, op *structs.TxnNodeOp) (structs.Txn
|
||||
}
|
||||
|
||||
case api.NodeSet:
|
||||
err = s.ensureNodeTxn(tx, idx, &op.Node)
|
||||
err = s.ensureNodeTxn(tx, idx, false, &op.Node)
|
||||
if err == nil {
|
||||
entry, err = getNode()
|
||||
}
|
||||
@ -222,7 +222,7 @@ func (s *Store) txnService(tx *txn, idx uint64, op *structs.TxnServiceOp) (struc
|
||||
}
|
||||
|
||||
case api.ServiceSet:
|
||||
if err := ensureServiceTxn(tx, idx, op.Node, &op.Service); err != nil {
|
||||
if err := ensureServiceTxn(tx, idx, op.Node, false, &op.Service); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
entry, err := getNodeServiceTxn(tx, op.Node, op.Service.ID, &op.Service.EnterpriseMeta)
|
||||
@ -282,7 +282,7 @@ func (s *Store) txnCheck(tx *txn, idx uint64, op *structs.TxnCheckOp) (structs.T
|
||||
}
|
||||
|
||||
case api.CheckSet:
|
||||
err = s.ensureCheckTxn(tx, idx, &op.Check)
|
||||
err = s.ensureCheckTxn(tx, idx, false, &op.Check)
|
||||
if err == nil {
|
||||
_, entry, err = getNodeCheckTxn(tx, op.Check.Node, op.Check.CheckID, &op.Check.EnterpriseMeta)
|
||||
}
|
||||
|
@ -321,6 +321,7 @@ type RegisterRequest struct {
|
||||
EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
|
||||
|
||||
WriteRequest
|
||||
RaftIndex `bexpr:"-"`
|
||||
}
|
||||
|
||||
func (r *RegisterRequest) RequestDatacenter() string {
|
||||
|
Loading…
x
Reference in New Issue
Block a user