diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index c4d06a6719..b2de582065 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -738,10 +738,10 @@ func (s *Store) EnsureService(idx uint64, node string, svc *structs.NodeService) // ensureServiceCASTxn updates a service only if the existing index matches the given index. // Returns a bool indicating if a write happened and any error. func (s *Store) ensureServiceCASTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) (bool, error) { - // Retrieve the existing entry. - existing, err := tx.First("nodes", "id", node) + // Retrieve the existing service. + existing, err := tx.First("services", "id", node, svc.ID) if err != nil { - return false, fmt.Errorf("node lookup failed: %s", err) + return false, fmt.Errorf("failed service lookup: %s", err) } // Check if the we should do the set. A ModifyIndex of 0 means that diff --git a/agent/consul/state/txn.go b/agent/consul/state/txn.go index 578e9721ac..264e04d150 100644 --- a/agent/consul/state/txn.go +++ b/agent/consul/state/txn.go @@ -132,10 +132,12 @@ func (s *Store) txnNode(tx *memdb.Txn, idx uint64, op *structs.TxnNodeOp) (struc entry, err = getNodeIDTxn(tx, op.Node.ID) case api.NodeSet: + entry = &op.Node err = s.ensureNodeTxn(tx, idx, &op.Node) case api.NodeCAS: var ok bool + entry = &op.Node ok, err = s.ensureNodeCASTxn(tx, idx, &op.Node) if !ok && err == nil { err = fmt.Errorf("failed to set node %q, index is stale", op.Node.Node) @@ -185,6 +187,7 @@ func (s *Store) txnService(tx *memdb.Txn, idx uint64, op *structs.TxnServiceOp) entry, err = s.nodeServiceTxn(tx, op.Node, op.Service.ID) case api.ServiceSet: + entry = &op.Service err = s.ensureServiceTxn(tx, idx, op.Node, &op.Service) case api.ServiceCAS: @@ -192,7 +195,9 @@ func (s *Store) txnService(tx *memdb.Txn, idx uint64, op *structs.TxnServiceOp) ok, err = s.ensureServiceCASTxn(tx, idx, op.Node, &op.Service) if !ok && err == nil { err = fmt.Errorf("failed to set service %q on node %q, index is stale", op.Service.ID, op.Node) + break } + entry, err = s.nodeServiceTxn(tx, op.Node, op.Service.ID) case api.ServiceDelete: err = s.deleteServiceTxn(tx, idx, op.Node, op.Service.ID) diff --git a/agent/consul/state/txn_test.go b/agent/consul/state/txn_test.go index 5e89e8bad0..64af556da2 100644 --- a/agent/consul/state/txn_test.go +++ b/agent/consul/state/txn_test.go @@ -1,12 +1,14 @@ package state import ( + "fmt" "reflect" "strings" "testing" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/types" "github.com/pascaldekloe/goe/verify" "github.com/stretchr/testify/require" ) @@ -116,6 +118,379 @@ func TestStateStore_Txn_Intention(t *testing.T) { verify.Values(t, "", actual, intentions) } +func TestStateStore_Txn_Node(t *testing.T) { + require := require.New(t) + s := testStateStore(t) + + // Create some nodes. + var nodes [5]structs.Node + for i := 0; i < len(nodes); i++ { + nodes[i] = structs.Node{ + Node: fmt.Sprintf("node%d", i+1), + ID: types.NodeID(testUUID()), + } + + // Leave node5 to be created by an operation. + if i < 5 { + s.EnsureNode(uint64(i+1), &nodes[i]) + } + } + + // Set up a transaction that hits every operation. + ops := structs.TxnOps{ + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeGet, + Node: nodes[0], + }, + }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeSet, + Node: nodes[4], + }, + }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeCAS, + Node: structs.Node{ + Node: "node2", + ID: nodes[1].ID, + Datacenter: "dc2", + RaftIndex: structs.RaftIndex{ModifyIndex: 2}, + }, + }, + }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeDelete, + Node: structs.Node{Node: "node3"}, + }, + }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeDeleteCAS, + Node: structs.Node{ + Node: "node4", + RaftIndex: structs.RaftIndex{ModifyIndex: 4}, + }, + }, + }, + } + results, errors := s.TxnRW(8, ops) + if len(errors) > 0 { + t.Fatalf("err: %v", errors) + } + + // Make sure the response looks as expected. + nodes[1].Datacenter = "dc2" + nodes[1].ModifyIndex = 8 + expected := structs.TxnResults{ + &structs.TxnResult{ + Node: &nodes[0], + }, + &structs.TxnResult{ + Node: &nodes[4], + }, + &structs.TxnResult{ + Node: &nodes[1], + }, + } + verify.Values(t, "", results, expected) + + // Pull the resulting state store contents. + idx, actual, err := s.Nodes(nil) + require.NoError(err) + if idx != 8 { + t.Fatalf("bad index: %d", idx) + } + + // Make sure it looks as expected. + expectedNodes := structs.Nodes{&nodes[0], &nodes[1], &nodes[4]} + verify.Values(t, "", actual, expectedNodes) +} + +func TestStateStore_Txn_Service(t *testing.T) { + require := require.New(t) + s := testStateStore(t) + + testRegisterNode(t, s, 1, "node1") + + // Create some services. + for i := 1; i <= 4; i++ { + testRegisterService(t, s, uint64(i+1), "node1", fmt.Sprintf("svc%d", i)) + } + + // Set up a transaction that hits every operation. + ops := structs.TxnOps{ + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceGet, + Node: "node1", + Service: structs.NodeService{ID: "svc1"}, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceSet, + Node: "node1", + Service: structs.NodeService{ID: "svc5"}, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceCAS, + Node: "node1", + Service: structs.NodeService{ + ID: "svc2", + Tags: []string{"modified"}, + RaftIndex: structs.RaftIndex{ModifyIndex: 3}, + }, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceDelete, + Node: "node1", + Service: structs.NodeService{ID: "svc3"}, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceDeleteCAS, + Node: "node1", + Service: structs.NodeService{ + ID: "svc4", + RaftIndex: structs.RaftIndex{ModifyIndex: 5}, + }, + }, + }, + } + results, errors := s.TxnRW(6, ops) + if len(errors) > 0 { + t.Fatalf("err: %v", errors) + } + + // Make sure the response looks as expected. + expected := structs.TxnResults{ + &structs.TxnResult{ + Service: &structs.NodeService{ + ID: "svc1", + Service: "svc1", + Address: "1.1.1.1", + Port: 1111, + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 2, + }, + Weights: &structs.Weights{Passing: 1, Warning: 1}, + }, + }, + &structs.TxnResult{ + Service: &structs.NodeService{ + ID: "svc5", + }, + }, + &structs.TxnResult{ + Service: &structs.NodeService{ + ID: "svc2", + Tags: []string{"modified"}, + RaftIndex: structs.RaftIndex{ + CreateIndex: 3, + ModifyIndex: 6, + }, + Weights: &structs.Weights{Passing: 1, Warning: 1}, + }, + }, + } + verify.Values(t, "", results, expected) + + // Pull the resulting state store contents. + idx, actual, err := s.NodeServices(nil, "node1") + require.NoError(err) + if idx != 6 { + t.Fatalf("bad index: %d", idx) + } + + // Make sure it looks as expected. + expectedServices := &structs.NodeServices{ + Node: &structs.Node{ + Node: "node1", + RaftIndex: structs.RaftIndex{ + CreateIndex: 1, + ModifyIndex: 1, + }, + }, + Services: map[string]*structs.NodeService{ + "svc1": &structs.NodeService{ + ID: "svc1", + Service: "svc1", + Address: "1.1.1.1", + Port: 1111, + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 2, + }, + Weights: &structs.Weights{Passing: 1, Warning: 1}, + }, + "svc5": &structs.NodeService{ + ID: "svc5", + RaftIndex: structs.RaftIndex{ + CreateIndex: 6, + ModifyIndex: 6, + }, + Weights: &structs.Weights{Passing: 1, Warning: 1}, + }, + "svc2": &structs.NodeService{ + ID: "svc2", + Tags: []string{"modified"}, + RaftIndex: structs.RaftIndex{ + CreateIndex: 3, + ModifyIndex: 6, + }, + Weights: &structs.Weights{Passing: 1, Warning: 1}, + }, + }, + } + verify.Values(t, "", actual, expectedServices) +} + +func TestStateStore_Txn_Checks(t *testing.T) { + require := require.New(t) + s := testStateStore(t) + + testRegisterNode(t, s, 1, "node1") + + // Create some checks. + for i := 1; i <= 4; i++ { + testRegisterCheck(t, s, uint64(i+1), "node1", "", types.CheckID(fmt.Sprintf("check%d", i)), "failing") + } + + // Set up a transaction that hits every operation. + ops := structs.TxnOps{ + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckGet, + Check: structs.HealthCheck{Node: "node1", CheckID: "check1"}, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckSet, + Check: structs.HealthCheck{Node: "node1", CheckID: "check5", Status: "passing"}, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckCAS, + Check: structs.HealthCheck{ + Node: "node1", + CheckID: "check2", + Status: "warning", + RaftIndex: structs.RaftIndex{ModifyIndex: 3}, + }, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckDelete, + Check: structs.HealthCheck{Node: "node1", CheckID: "check3"}, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckDeleteCAS, + Check: structs.HealthCheck{ + Node: "node1", + CheckID: "check4", + RaftIndex: structs.RaftIndex{ModifyIndex: 5}, + }, + }, + }, + } + results, errors := s.TxnRW(6, ops) + if len(errors) > 0 { + t.Fatalf("err: %v", errors) + } + + // Make sure the response looks as expected. + expected := structs.TxnResults{ + &structs.TxnResult{ + Check: &structs.HealthCheck{ + Node: "node1", + CheckID: "check1", + Status: "failing", + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 2, + }, + }, + }, + &structs.TxnResult{ + Check: &structs.HealthCheck{ + Node: "node1", + CheckID: "check5", + Status: "passing", + RaftIndex: structs.RaftIndex{ + CreateIndex: 6, + ModifyIndex: 6, + }, + }, + }, + &structs.TxnResult{ + Check: &structs.HealthCheck{ + Node: "node1", + CheckID: "check2", + Status: "warning", + RaftIndex: structs.RaftIndex{ + CreateIndex: 3, + ModifyIndex: 6, + }, + }, + }, + } + verify.Values(t, "", results, expected) + + // Pull the resulting state store contents. + idx, actual, err := s.NodeChecks(nil, "node1") + require.NoError(err) + if idx != 6 { + t.Fatalf("bad index: %d", idx) + } + + // Make sure it looks as expected. + expectedChecks := structs.HealthChecks{ + &structs.HealthCheck{ + Node: "node1", + CheckID: "check1", + Status: "failing", + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 2, + }, + }, + &structs.HealthCheck{ + Node: "node1", + CheckID: "check2", + Status: "warning", + RaftIndex: structs.RaftIndex{ + CreateIndex: 3, + ModifyIndex: 6, + }, + }, + &structs.HealthCheck{ + Node: "node1", + CheckID: "check5", + Status: "passing", + RaftIndex: structs.RaftIndex{ + CreateIndex: 6, + ModifyIndex: 6, + }, + }, + } + verify.Values(t, "", actual, expectedChecks) +} + func TestStateStore_Txn_KVS(t *testing.T) { s := testStateStore(t)