From 67a7d25e1c4438a05f13348b9d2c6ce0561f015b Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 4 Feb 2014 18:33:15 -0800 Subject: [PATCH] consul: updating state store to associate changes with raft index --- consul/catalog_endpoint.go | 10 +- consul/catalog_endpoint_test.go | 18 +-- consul/fsm.go | 55 ++++--- consul/fsm_test.go | 46 +++--- consul/health_endpoint.go | 10 +- consul/leader.go | 12 +- consul/leader_test.go | 20 +-- consul/mdb_table.go | 58 +++++--- consul/mdb_table_test.go | 41 ++++-- consul/state_store.go | 214 +++++++++++++++++++-------- consul/state_store_test.go | 252 ++++++++++++++++++++------------ 11 files changed, 478 insertions(+), 258 deletions(-) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 81f0694edb..006ca07986 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -93,7 +93,7 @@ func (c *Catalog) ListNodes(dc string, reply *structs.Nodes) error { // Get the current nodes state := c.srv.fsm.State() - nodes := state.Nodes() + _, nodes := state.Nodes() *reply = nodes return nil @@ -107,7 +107,7 @@ func (c *Catalog) ListServices(dc string, reply *structs.Services) error { // Get the current nodes state := c.srv.fsm.State() - services := state.Services() + _, services := state.Services() *reply = services return nil @@ -128,9 +128,9 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru state := c.srv.fsm.State() var nodes structs.ServiceNodes if args.TagFilter { - nodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) + _, nodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) } else { - nodes = state.ServiceNodes(args.ServiceName) + _, nodes = state.ServiceNodes(args.ServiceName) } *reply = nodes @@ -150,7 +150,7 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs // Get the node services state := c.srv.fsm.State() - services := state.NodeServices(args.Node) + _, services := state.NodeServices(args.Node) *reply = *services return nil diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index a7979a6a2d..9c8c03f218 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -207,7 +207,7 @@ func TestCatalogListNodes(t *testing.T) { time.Sleep(100 * time.Millisecond) // Just add a node - s1.fsm.State().EnsureNode(structs.Node{"foo", "127.0.0.1"}) + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) if err := client.Call("Catalog.ListNodes", "dc1", &out); err != nil { t.Fatalf("err: %v", err) @@ -240,7 +240,7 @@ func BenchmarkCatalogListNodes(t *testing.B) { time.Sleep(100 * time.Millisecond) // Just add a node - s1.fsm.State().EnsureNode(structs.Node{"foo", "127.0.0.1"}) + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) for i := 0; i < t.N; i++ { var out structs.Nodes @@ -267,8 +267,8 @@ func TestCatalogListServices(t *testing.T) { time.Sleep(100 * time.Millisecond) // Just add a node - s1.fsm.State().EnsureNode(structs.Node{"foo", "127.0.0.1"}) - s1.fsm.State().EnsureService("foo", "db", "db", "primary", 5000) + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", "primary", 5000}) if err := client.Call("Catalog.ListServices", "dc1", &out); err != nil { t.Fatalf("err: %v", err) @@ -312,8 +312,8 @@ func TestCatalogListServiceNodes(t *testing.T) { time.Sleep(100 * time.Millisecond) // Just add a node - s1.fsm.State().EnsureNode(structs.Node{"foo", "127.0.0.1"}) - s1.fsm.State().EnsureService("foo", "db", "db", "primary", 5000) + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", "primary", 5000}) if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil { t.Fatalf("err: %v", err) @@ -356,9 +356,9 @@ func TestCatalogNodeServices(t *testing.T) { time.Sleep(100 * time.Millisecond) // Just add a node - s1.fsm.State().EnsureNode(structs.Node{"foo", "127.0.0.1"}) - s1.fsm.State().EnsureService("foo", "db", "db", "primary", 5000) - s1.fsm.State().EnsureService("foo", "web", "web", "", 80) + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", "primary", 5000}) + s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{"web", "web", "", 80}) if err := client.Call("Catalog.NodeServices", &args, &out); err != nil { t.Fatalf("err: %v", err) diff --git a/consul/fsm.go b/consul/fsm.go index 01afe72d56..78d8226eea 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -25,6 +25,13 @@ type consulSnapshot struct { state *StateSnapshot } +// snapshotHeader is the first entry in our snapshot +type snapshotHeader struct { + // LastIndex is the last index that affects the data. + // This is used when we do the restore for watchers. + LastIndex uint64 +} + // NewFSM is used to construct a new FSM with a blank state func NewFSM(logOutput io.Writer) (*consulFSM, error) { state, err := NewStateStore() @@ -48,34 +55,33 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { buf := log.Data switch structs.MessageType(buf[0]) { case structs.RegisterRequestType: - return c.decodeRegister(buf[1:]) + return c.decodeRegister(buf[1:], log.Index) case structs.DeregisterRequestType: - return c.applyDeregister(buf[1:]) + return c.applyDeregister(buf[1:], log.Index) default: panic(fmt.Errorf("failed to apply request: %#v", buf)) } } -func (c *consulFSM) decodeRegister(buf []byte) interface{} { +func (c *consulFSM) decodeRegister(buf []byte, index uint64) interface{} { var req structs.RegisterRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - return c.applyRegister(&req) + return c.applyRegister(&req, index) } -func (c *consulFSM) applyRegister(req *structs.RegisterRequest) interface{} { +func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} { // Ensure the node node := structs.Node{req.Node, req.Address} - if err := c.state.EnsureNode(node); err != nil { + if err := c.state.EnsureNode(index, node); err != nil { c.logger.Printf("[INFO] consul.fsm: EnsureNode failed: %v", err) return err } // Ensure the service if provided if req.Service != nil { - if err := c.state.EnsureService(req.Node, req.Service.ID, req.Service.Service, - req.Service.Tag, req.Service.Port); err != nil { + if err := c.state.EnsureService(index, req.Node, req.Service); err != nil { c.logger.Printf("[INFO] consul.fsm: EnsureService failed: %v", err) return err } @@ -83,7 +89,7 @@ func (c *consulFSM) applyRegister(req *structs.RegisterRequest) interface{} { // Ensure the check if provided if req.Check != nil { - if err := c.state.EnsureCheck(req.Check); err != nil { + if err := c.state.EnsureCheck(index, req.Check); err != nil { c.logger.Printf("[INFO] consul.fsm: EnsureCheck failed: %v", err) return err } @@ -92,7 +98,7 @@ func (c *consulFSM) applyRegister(req *structs.RegisterRequest) interface{} { return nil } -func (c *consulFSM) applyDeregister(buf []byte) interface{} { +func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} { var req structs.DeregisterRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) @@ -100,17 +106,17 @@ func (c *consulFSM) applyDeregister(buf []byte) interface{} { // Either remove the service entry or the whole node if req.ServiceID != "" { - if err := c.state.DeleteNodeService(req.Node, req.ServiceID); err != nil { + if err := c.state.DeleteNodeService(index, req.Node, req.ServiceID); err != nil { c.logger.Printf("[INFO] consul.fsm: DeleteNodeService failed: %v", err) return err } } else if req.CheckID != "" { - if err := c.state.DeleteNodeCheck(req.Node, req.CheckID); err != nil { + if err := c.state.DeleteNodeCheck(index, req.Node, req.CheckID); err != nil { c.logger.Printf("[INFO] consul.fsm: DeleteNodeCheck failed: %v", err) return err } } else { - if err := c.state.DeleteNode(req.Node); err != nil { + if err := c.state.DeleteNode(index, req.Node); err != nil { c.logger.Printf("[INFO] consul.fsm: DeleteNode failed: %v", err) return err } @@ -145,6 +151,12 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { var handle codec.MsgpackHandle dec := codec.NewDecoder(old, &handle) + // Read in the header + var header snapshotHeader + if err := dec.Decode(&header); err != nil { + return err + } + // Populate the new state msgType := make([]byte, 1) for { @@ -163,7 +175,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(&req); err != nil { return err } - c.applyRegister(&req) + c.applyRegister(&req, header.LastIndex) default: return fmt.Errorf("Unrecognized msg type: %v", msgType) @@ -174,13 +186,22 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { } func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { - // Get all the nodes - nodes := s.state.Nodes() - // Register the nodes handle := codec.MsgpackHandle{} encoder := codec.NewEncoder(sink, &handle) + // Write the header + header := snapshotHeader{ + LastIndex: s.state.LastIndex(), + } + if err := encoder.Encode(&header); err != nil { + sink.Cancel() + return err + } + + // Get all the nodes + nodes := s.state.Nodes() + // Register each node var req structs.RegisterRequest for i := 0; i < len(nodes); i++ { diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 9d6feb057b..1c2745bb2f 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -57,12 +57,14 @@ func TestFSM_RegisterNode(t *testing.T) { } // Verify we are registered - if found, _ := fsm.state.GetNode("foo"); !found { + if idx, found, _ := fsm.state.GetNode("foo"); !found { t.Fatalf("not found!") + } else if idx != 1 { + t.Fatalf("bad index: %d", idx) } // Verify service registered - services := fsm.state.NodeServices("foo") + _, services := fsm.state.NodeServices("foo") if len(services.Services) != 0 { t.Fatalf("Services: %v", services) } @@ -103,18 +105,18 @@ func TestFSM_RegisterNode_Service(t *testing.T) { } // Verify we are registered - if found, _ := fsm.state.GetNode("foo"); !found { + if _, found, _ := fsm.state.GetNode("foo"); !found { t.Fatalf("not found!") } // Verify service registered - services := fsm.state.NodeServices("foo") + _, services := fsm.state.NodeServices("foo") if _, ok := services.Services["db"]; !ok { t.Fatalf("not registered!") } // Verify check - checks := fsm.state.NodeChecks("foo") + _, checks := fsm.state.NodeChecks("foo") if checks[0].CheckID != "db" { t.Fatalf("not registered!") } @@ -163,12 +165,12 @@ func TestFSM_DeregisterService(t *testing.T) { } // Verify we are registered - if found, _ := fsm.state.GetNode("foo"); !found { + if _, found, _ := fsm.state.GetNode("foo"); !found { t.Fatalf("not found!") } // Verify service not registered - services := fsm.state.NodeServices("foo") + _, services := fsm.state.NodeServices("foo") if _, ok := services.Services["db"]; ok { t.Fatalf("db registered!") } @@ -217,12 +219,12 @@ func TestFSM_DeregisterCheck(t *testing.T) { } // Verify we are registered - if found, _ := fsm.state.GetNode("foo"); !found { + if _, found, _ := fsm.state.GetNode("foo"); !found { t.Fatalf("not found!") } // Verify check not registered - checks := fsm.state.NodeChecks("foo") + _, checks := fsm.state.NodeChecks("foo") if len(checks) != 0 { t.Fatalf("check registered!") } @@ -277,18 +279,18 @@ func TestFSM_DeregisterNode(t *testing.T) { } // Verify we are registered - if found, _ := fsm.state.GetNode("foo"); found { + if _, found, _ := fsm.state.GetNode("foo"); found { t.Fatalf("found!") } // Verify service not registered - services := fsm.state.NodeServices("foo") + _, services := fsm.state.NodeServices("foo") if len(services.Services) != 0 { t.Fatalf("Services: %v", services) } // Verify checks not registered - checks := fsm.state.NodeChecks("foo") + _, checks := fsm.state.NodeChecks("foo") if len(checks) != 0 { t.Fatalf("Services: %v", services) } @@ -301,13 +303,13 @@ func TestFSM_SnapshotRestore(t *testing.T) { } // Add some state - fsm.state.EnsureNode(structs.Node{"foo", "127.0.0.1"}) - fsm.state.EnsureNode(structs.Node{"baz", "127.0.0.2"}) - fsm.state.EnsureService("foo", "web", "web", "", 80) - fsm.state.EnsureService("foo", "db", "db", "primary", 5000) - fsm.state.EnsureService("baz", "web", "web", "", 80) - fsm.state.EnsureService("baz", "db", "db", "secondary", 5000) - fsm.state.EnsureCheck(&structs.HealthCheck{ + fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + fsm.state.EnsureNode(2, structs.Node{"baz", "127.0.0.2"}) + fsm.state.EnsureService(3, "foo", &structs.NodeService{"web", "web", "", 80}) + fsm.state.EnsureService(4, "foo", &structs.NodeService{"db", "db", "primary", 5000}) + fsm.state.EnsureService(5, "baz", &structs.NodeService{"web", "web", "", 80}) + fsm.state.EnsureService(6, "baz", &structs.NodeService{"db", "db", "secondary", 5000}) + fsm.state.EnsureCheck(7, &structs.HealthCheck{ Node: "foo", CheckID: "web", Name: "web connectivity", @@ -341,12 +343,12 @@ func TestFSM_SnapshotRestore(t *testing.T) { } // Verify the contents - nodes := fsm2.state.Nodes() + _, nodes := fsm2.state.Nodes() if len(nodes) != 2 { t.Fatalf("Bad: %v", nodes) } - fooSrv := fsm2.state.NodeServices("foo") + _, fooSrv := fsm2.state.NodeServices("foo") if len(fooSrv.Services) != 2 { t.Fatalf("Bad: %v", fooSrv) } @@ -357,7 +359,7 @@ func TestFSM_SnapshotRestore(t *testing.T) { t.Fatalf("Bad: %v", fooSrv) } - checks := fsm2.state.NodeChecks("foo") + _, checks := fsm2.state.NodeChecks("foo") if len(checks) != 1 { t.Fatalf("Bad: %v", checks) } diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index 8d9f44a938..3ae115e558 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -19,7 +19,7 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, // Get the state specific checks state := h.srv.fsm.State() - checks := state.ChecksInState(args.State) + _, checks := state.ChecksInState(args.State) *reply = checks return nil } @@ -33,7 +33,7 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, // Get the node checks state := h.srv.fsm.State() - checks := state.NodeChecks(args.Node) + _, checks := state.NodeChecks(args.Node) *reply = checks return nil } @@ -53,7 +53,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, // Get the service checks state := h.srv.fsm.State() - checks := state.ServiceChecks(args.ServiceName) + _, checks := state.ServiceChecks(args.ServiceName) *reply = checks return nil } @@ -73,9 +73,9 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc state := h.srv.fsm.State() var nodes structs.CheckServiceNodes if args.TagFilter { - nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) + _, nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) } else { - nodes = state.CheckServiceNodes(args.ServiceName) + _, nodes = state.CheckServiceNodes(args.ServiceName) } *reply = nodes diff --git a/consul/leader.go b/consul/leader.go index 6174e40c71..6a28bfdff4 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -156,12 +156,12 @@ func (s *Server) handleAliveMember(member serf.Member) error { } // Check if the node exists - found, addr := state.GetNode(member.Name) + _, found, addr := state.GetNode(member.Name) if found && addr == member.Addr.String() { // Check if the associated service is available if service != nil { match := false - services := state.NodeServices(member.Name) + _, services := state.NodeServices(member.Name) for id, _ := range services.Services { if id == service.ID { match = true @@ -173,7 +173,7 @@ func (s *Server) handleAliveMember(member serf.Member) error { } // Check if the serfCheck is in the passing state - checks := state.NodeChecks(member.Name) + _, checks := state.NodeChecks(member.Name) for _, check := range checks { if check.CheckID == SerfCheckID && check.Status == structs.HealthPassing { return nil @@ -206,10 +206,10 @@ func (s *Server) handleFailedMember(member serf.Member) error { state := s.fsm.State() // Check if the node exists - found, addr := state.GetNode(member.Name) + _, found, addr := state.GetNode(member.Name) if found && addr == member.Addr.String() { // Check if the serfCheck is in the critical state - checks := state.NodeChecks(member.Name) + _, checks := state.NodeChecks(member.Name) for _, check := range checks { if check.CheckID == SerfCheckID && check.Status == structs.HealthCritical { return nil @@ -240,7 +240,7 @@ func (s *Server) handleLeftMember(member serf.Member) error { state := s.fsm.State() // Check if the node does not exists - found, _ := state.GetNode(member.Name) + _, found, _ := state.GetNode(member.Name) if !found { return nil } diff --git a/consul/leader_test.go b/consul/leader_test.go index 0fbd02f9dc..f34b424e1f 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -32,13 +32,13 @@ func TestLeader_RegisterMember(t *testing.T) { // Client should be registered state := s1.fsm.State() - found, _ := state.GetNode(c1.config.NodeName) + _, found, _ := state.GetNode(c1.config.NodeName) if !found { t.Fatalf("client not registered") } // Should have a check - checks := state.NodeChecks(c1.config.NodeName) + _, checks := state.NodeChecks(c1.config.NodeName) if len(checks) != 1 { t.Fatalf("client missing check") } @@ -53,13 +53,13 @@ func TestLeader_RegisterMember(t *testing.T) { } // Server should be registered - found, _ = state.GetNode(s1.config.NodeName) + _, found, _ = state.GetNode(s1.config.NodeName) if !found { t.Fatalf("server not registered") } // Service should be registered - services := state.NodeServices(s1.config.NodeName) + _, services := state.NodeServices(s1.config.NodeName) if _, ok := services.Services["consul"]; !ok { t.Fatalf("consul service not registered: %v", services) } @@ -92,13 +92,13 @@ func TestLeader_FailedMember(t *testing.T) { // Should be registered state := s1.fsm.State() - found, _ := state.GetNode(c1.config.NodeName) + _, found, _ := state.GetNode(c1.config.NodeName) if !found { t.Fatalf("client not registered") } // Should have a check - checks := state.NodeChecks(c1.config.NodeName) + _, checks := state.NodeChecks(c1.config.NodeName) if len(checks) != 1 { t.Fatalf("client missing check") } @@ -137,7 +137,7 @@ func TestLeader_LeftMember(t *testing.T) { // Should be registered state := s1.fsm.State() - found, _ := state.GetNode(c1.config.NodeName) + _, found, _ := state.GetNode(c1.config.NodeName) if !found { t.Fatalf("client not registered") } @@ -150,7 +150,7 @@ func TestLeader_LeftMember(t *testing.T) { time.Sleep(500 * time.Millisecond) // Should be deregistered - found, _ = state.GetNode(c1.config.NodeName) + _, found, _ = state.GetNode(c1.config.NodeName) if found { t.Fatalf("client registered") } @@ -174,7 +174,7 @@ func TestLeader_Reconcile(t *testing.T) { // Should not be registered state := s1.fsm.State() - found, _ := state.GetNode(c1.config.NodeName) + _, found, _ := state.GetNode(c1.config.NodeName) if found { t.Fatalf("client registered") } @@ -183,7 +183,7 @@ func TestLeader_Reconcile(t *testing.T) { time.Sleep(100 * time.Millisecond) // Should be registered - found, _ = state.GetNode(c1.config.NodeName) + _, found, _ = state.GetNode(c1.config.NodeName) if !found { t.Fatalf("client not registered") } diff --git a/consul/mdb_table.go b/consul/mdb_table.go index 4662f23fc1..acafb68fa2 100644 --- a/consul/mdb_table.go +++ b/consul/mdb_table.go @@ -311,14 +311,23 @@ AFTER_DELETE: // Get is used to lookup one or more rows. An index an appropriate // fields are specified. The fields can be a prefix of the index. -func (t *MDBTable) Get(index string, parts ...string) ([]interface{}, error) { +func (t *MDBTable) Get(index string, parts ...string) (uint64, []interface{}, error) { // Start a readonly txn tx, err := t.StartTxn(true, nil) if err != nil { - return nil, err + return 0, nil, err } defer tx.Abort() - return t.GetTxn(tx, index, parts...) + + // Get the last associated index + idx, err := t.LastIndexTxn(tx) + if err != nil { + return 0, nil, err + } + + // Get the actual results + res, err := t.GetTxn(tx, index, parts...) + return idx, res, err } // GetTxn is like Get but it operates within a specific transaction. @@ -572,20 +581,6 @@ func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte, return nil } -// StartTxn is used to create a transaction that spans a list of tables -func (t MDBTables) StartTxn(readonly bool) (*MDBTxn, error) { - var tx *MDBTxn - for _, table := range t { - newTx, err := table.StartTxn(readonly, tx) - if err != nil { - tx.Abort() - return nil, err - } - tx = newTx - } - return tx, nil -} - // LastIndex is get the last index that updated the table func (t *MDBTable) LastIndex() (uint64, error) { // Start a readonly txn @@ -631,3 +626,32 @@ func (t *MDBTable) SetLastIndexTxn(tx *MDBTxn, index uint64) error { encIndex := uint64ToBytes(index) return tx.tx.Put(tx.dbis[t.Name], encRowId, encIndex, 0) } + +// StartTxn is used to create a transaction that spans a list of tables +func (t MDBTables) StartTxn(readonly bool) (*MDBTxn, error) { + var tx *MDBTxn + for _, table := range t { + newTx, err := table.StartTxn(readonly, tx) + if err != nil { + tx.Abort() + return nil, err + } + tx = newTx + } + return tx, nil +} + +// LastIndexTxn is used to get the last transaction from all of the tables +func (t MDBTables) LastIndexTxn(tx *MDBTxn) (uint64, error) { + var index uint64 + for _, table := range t { + idx, err := table.LastIndexTxn(tx) + if err != nil { + return index, err + } + if idx > index { + index = idx + } + } + return index, nil +} diff --git a/consul/mdb_table_test.go b/consul/mdb_table_test.go index ab40c7d661..2ce6ca7912 100644 --- a/consul/mdb_table_test.go +++ b/consul/mdb_table_test.go @@ -119,14 +119,17 @@ func TestMDBTableInsert(t *testing.T) { } // Insert some mock objects - for _, obj := range objs { + for idx, obj := range objs { if err := table.Insert(obj); err != nil { t.Fatalf("err: %v", err) } + if err := table.SetLastIndex(uint64(idx + 1)); err != nil { + t.Fatalf("err: %v", err) + } } // Verify with some gets - res, err := table.Get("id", "1") + idx, res, err := table.Get("id", "1") if err != nil { t.Fatalf("err: %v", err) } @@ -136,8 +139,11 @@ func TestMDBTableInsert(t *testing.T) { if !reflect.DeepEqual(res[0], objs[0]) { t.Fatalf("bad: %#v", res[0]) } + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } - res, err = table.Get("name", "Kevin") + idx, res, err = table.Get("name", "Kevin") if err != nil { t.Fatalf("err: %v", err) } @@ -150,8 +156,11 @@ func TestMDBTableInsert(t *testing.T) { if !reflect.DeepEqual(res[1], objs[1]) { t.Fatalf("bad: %#v", res[1]) } + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } - res, err = table.Get("country", "Mexico") + idx, res, err = table.Get("country", "Mexico") if err != nil { t.Fatalf("err: %v", err) } @@ -161,8 +170,11 @@ func TestMDBTableInsert(t *testing.T) { if !reflect.DeepEqual(res[0], objs[2]) { t.Fatalf("bad: %#v", res[2]) } + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } - res, err = table.Get("id") + idx, res, err = table.Get("id") if err != nil { t.Fatalf("err: %v", err) } @@ -178,6 +190,9 @@ func TestMDBTableInsert(t *testing.T) { if !reflect.DeepEqual(res[2], objs[2]) { t.Fatalf("bad: %#v", res[2]) } + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } } func TestMDBTableInsert_MissingFields(t *testing.T) { @@ -338,7 +353,7 @@ func TestMDBTableDelete(t *testing.T) { } } - _, err := table.Get("id", "3") + _, _, err := table.Get("id", "3") if err != nil { t.Fatalf("err: %v", err) } @@ -351,7 +366,7 @@ func TestMDBTableDelete(t *testing.T) { if num != 1 { t.Fatalf("expect 1 delete: %#v", num) } - res, err := table.Get("id", "3") + _, res, err := table.Get("id", "3") if err != nil { t.Fatalf("err: %v", err) } @@ -366,7 +381,7 @@ func TestMDBTableDelete(t *testing.T) { if num != 2 { t.Fatalf("expect 2 deletes: %#v", num) } - res, err = table.Get("name", "Kevin") + _, res, err = table.Get("name", "Kevin") if err != nil { t.Fatalf("err: %v", err) } @@ -449,7 +464,7 @@ func TestMDBTableUpdate(t *testing.T) { } // Verify with some gets - res, err := table.Get("id", "1") + _, res, err := table.Get("id", "1") if err != nil { t.Fatalf("err: %v", err) } @@ -460,7 +475,7 @@ func TestMDBTableUpdate(t *testing.T) { t.Fatalf("bad: %#v", res[0]) } - res, err = table.Get("name", "Kevin") + _, res, err = table.Get("name", "Kevin") if err != nil { t.Fatalf("err: %v", err) } @@ -468,7 +483,7 @@ func TestMDBTableUpdate(t *testing.T) { t.Fatalf("expect 0 result: %#v", res) } - res, err = table.Get("name", "Ahmad") + _, res, err = table.Get("name", "Ahmad") if err != nil { t.Fatalf("err: %v", err) } @@ -479,7 +494,7 @@ func TestMDBTableUpdate(t *testing.T) { t.Fatalf("bad: %#v", res[0]) } - res, err = table.Get("country", "Mexico") + _, res, err = table.Get("country", "Mexico") if err != nil { t.Fatalf("err: %v", err) } @@ -490,7 +505,7 @@ func TestMDBTableUpdate(t *testing.T) { t.Fatalf("bad: %#v", res[0]) } - res, err = table.Get("id") + _, res, err = table.Get("id") if err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/state_store.go b/consul/state_store.go index 84ba662674..8aa0c6fad4 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -34,8 +34,9 @@ type StateStore struct { // StateSnapshot is used to provide a point-in-time snapshot // It works by starting a readonly transaction against all tables. type StateSnapshot struct { - store *StateStore - tx *MDBTxn + store *StateStore + tx *MDBTxn + lastIndex uint64 } // Close is used to abort the transaction and allow for cleanup @@ -188,26 +189,39 @@ func (s *StateStore) initialize() error { } // EnsureNode is used to ensure a given node exists, with the provided address -func (s *StateStore) EnsureNode(node structs.Node) error { - return s.nodeTable.Insert(node) +func (s *StateStore) EnsureNode(index uint64, node structs.Node) error { + // Start a new txn + tx, err := s.nodeTable.StartTxn(false, nil) + if err != nil { + return err + } + defer tx.Abort() + + if err := s.nodeTable.InsertTxn(tx, node); err != nil { + return err + } + if err := s.nodeTable.SetLastIndexTxn(tx, index); err != nil { + return err + } + return tx.Commit() } // GetNode returns all the address of the known and if it was found -func (s *StateStore) GetNode(name string) (bool, string) { - res, err := s.nodeTable.Get("id", name) +func (s *StateStore) GetNode(name string) (uint64, bool, string) { + idx, res, err := s.nodeTable.Get("id", name) if err != nil { panic(fmt.Errorf("Failed to get node: %v", err)) } if len(res) == 0 { - return false, "" + return idx, false, "" } - return true, res[0].(*structs.Node).Address + return idx, true, res[0].(*structs.Node).Address } // GetNodes returns all the known nodes, the slice alternates between // the node name and address -func (s *StateStore) Nodes() structs.Nodes { - res, err := s.nodeTable.Get("id") +func (s *StateStore) Nodes() (uint64, structs.Nodes) { + idx, res, err := s.nodeTable.Get("id") if err != nil { panic(fmt.Errorf("Failed to get nodes: %v", err)) } @@ -215,11 +229,11 @@ func (s *StateStore) Nodes() structs.Nodes { for i, r := range res { results[i] = *r.(*structs.Node) } - return results + return idx, results } // EnsureService is used to ensure a given node exposes a service -func (s *StateStore) EnsureService(name, id, service, tag string, port int) error { +func (s *StateStore) EnsureService(index uint64, node string, ns *structs.NodeService) error { tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) @@ -227,7 +241,7 @@ func (s *StateStore) EnsureService(name, id, service, tag string, port int) erro defer tx.Abort() // Ensure the node exists - res, err := s.nodeTable.GetTxn(tx, "id", name) + res, err := s.nodeTable.GetTxn(tx, "id", node) if err != nil { return err } @@ -237,22 +251,25 @@ func (s *StateStore) EnsureService(name, id, service, tag string, port int) erro // Create the entry entry := structs.ServiceNode{ - Node: name, - ServiceID: id, - ServiceName: service, - ServiceTag: tag, - ServicePort: port, + Node: node, + ServiceID: ns.ID, + ServiceName: ns.Service, + ServiceTag: ns.Tag, + ServicePort: ns.Port, } // Ensure the service entry is set if err := s.serviceTable.InsertTxn(tx, &entry); err != nil { return err } + if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil { + return err + } return tx.Commit() } // NodeServices is used to return all the services of a given node -func (s *StateStore) NodeServices(name string) *structs.NodeServices { +func (s *StateStore) NodeServices(name string) (uint64, *structs.NodeServices) { tx, err := s.tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) @@ -263,18 +280,24 @@ func (s *StateStore) NodeServices(name string) *structs.NodeServices { // parseNodeServices is used to get the services belonging to a // node, using a given txn -func (s *StateStore) parseNodeServices(tx *MDBTxn, name string) *structs.NodeServices { +func (s *StateStore) parseNodeServices(tx *MDBTxn, name string) (uint64, *structs.NodeServices) { ns := &structs.NodeServices{ Services: make(map[string]*structs.NodeService), } + // Get the maximum index + index, err := s.tables.LastIndexTxn(tx) + if err != nil { + panic(fmt.Errorf("Failed to get last index: %v", err)) + } + // Get the node first res, err := s.nodeTable.GetTxn(tx, "id", name) if err != nil { panic(fmt.Errorf("Failed to get node: %v", err)) } if len(res) == 0 { - return ns + return index, ns } // Set the address @@ -298,51 +321,71 @@ func (s *StateStore) parseNodeServices(tx *MDBTxn, name string) *structs.NodeSer } ns.Services[srv.ID] = srv } - return ns + return index, ns } // DeleteNodeService is used to delete a node service -func (s *StateStore) DeleteNodeService(node, id string) error { +func (s *StateStore) DeleteNodeService(index uint64, node, id string) error { tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() - if _, err := s.serviceTable.DeleteTxn(tx, "id", node, id); err != nil { + if n, err := s.serviceTable.DeleteTxn(tx, "id", node, id); err != nil { return err + } else if n > 0 { + if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil { + return err + } } - if _, err := s.checkTable.DeleteTxn(tx, "node", node, id); err != nil { + if n, err := s.checkTable.DeleteTxn(tx, "node", node, id); err != nil { return err + } else if n > 0 { + if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil { + return err + } } return tx.Commit() } // DeleteNode is used to delete a node and all it's services -func (s *StateStore) DeleteNode(node string) error { +func (s *StateStore) DeleteNode(index uint64, node string) error { tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() - if _, err := s.serviceTable.DeleteTxn(tx, "id", node); err != nil { + if n, err := s.serviceTable.DeleteTxn(tx, "id", node); err != nil { return err + } else if n > 0 { + if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil { + return err + } } - if _, err := s.checkTable.DeleteTxn(tx, "id", node); err != nil { + if n, err := s.checkTable.DeleteTxn(tx, "id", node); err != nil { return err + } else if n > 0 { + if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil { + return err + } } - if _, err := s.nodeTable.DeleteTxn(tx, "id", node); err != nil { + if n, err := s.nodeTable.DeleteTxn(tx, "id", node); err != nil { return err + } else if n > 0 { + if err := s.nodeTable.SetLastIndexTxn(tx, index); err != nil { + return err + } } return tx.Commit() } // Services is used to return all the services with a list of associated tags -func (s *StateStore) Services() map[string][]string { +func (s *StateStore) Services() (uint64, map[string][]string) { // TODO: Optimize to not table scan.. We can do a distinct // type of query to avoid this - res, err := s.serviceTable.Get("id") + idx, res, err := s.serviceTable.Get("id") if err != nil { panic(fmt.Errorf("Failed to get node servicess: %v", err)) } @@ -356,31 +399,41 @@ func (s *StateStore) Services() map[string][]string { services[srv.ServiceName] = tags } } - return services + return idx, services } // ServiceNodes returns the nodes associated with a given service -func (s *StateStore) ServiceNodes(service string) structs.ServiceNodes { +func (s *StateStore) ServiceNodes(service string) (uint64, structs.ServiceNodes) { tx, err := s.tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() - res, err := s.serviceTable.Get("service", service) - return parseServiceNodes(tx, s.nodeTable, res, err) + idx, err := s.tables.LastIndexTxn(tx) + if err != nil { + panic(fmt.Errorf("Failed to get last index: %v", err)) + } + + res, err := s.serviceTable.GetTxn(tx, "service", service) + return idx, parseServiceNodes(tx, s.nodeTable, res, err) } // ServiceTagNodes returns the nodes associated with a given service matching a tag -func (s *StateStore) ServiceTagNodes(service, tag string) structs.ServiceNodes { +func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.ServiceNodes) { tx, err := s.tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() - res, err := s.serviceTable.Get("service", service, tag) - return parseServiceNodes(tx, s.nodeTable, res, err) + idx, err := s.tables.LastIndexTxn(tx) + if err != nil { + panic(fmt.Errorf("Failed to get last index: %v", err)) + } + + res, err := s.serviceTable.GetTxn(tx, "service", service, tag) + return idx, parseServiceNodes(tx, s.nodeTable, res, err) } // parseServiceNodes parses results ServiceNodes and ServiceTagNodes @@ -407,7 +460,7 @@ func parseServiceNodes(tx *MDBTxn, table *MDBTable, res []interface{}, err error } // EnsureCheck is used to create a check or updates it's state -func (s *StateStore) EnsureCheck(check *structs.HealthCheck) error { +func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error { // Ensure we have a status if check.Status == "" { check.Status = structs.HealthUnknown @@ -447,33 +500,48 @@ func (s *StateStore) EnsureCheck(check *structs.HealthCheck) error { if err := s.checkTable.InsertTxn(tx, check); err != nil { return err } + if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil { + return err + } return tx.Commit() } // DeleteNodeCheck is used to delete a node health check -func (s *StateStore) DeleteNodeCheck(node, id string) error { - _, err := s.checkTable.Delete("id", node, id) - return err +func (s *StateStore) DeleteNodeCheck(index uint64, node, id string) error { + tx, err := s.checkTable.StartTxn(false, nil) + if err != nil { + return err + } + defer tx.Abort() + + if n, err := s.checkTable.DeleteTxn(tx, "id", node, id); err != nil { + return err + } else if n > 0 { + if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil { + return err + } + } + return tx.Commit() } // NodeChecks is used to get all the checks for a node -func (s *StateStore) NodeChecks(node string) structs.HealthChecks { +func (s *StateStore) NodeChecks(node string) (uint64, structs.HealthChecks) { return parseHealthChecks(s.checkTable.Get("id", node)) } // ServiceChecks is used to get all the checks for a service -func (s *StateStore) ServiceChecks(service string) structs.HealthChecks { +func (s *StateStore) ServiceChecks(service string) (uint64, structs.HealthChecks) { return parseHealthChecks(s.checkTable.Get("service", service)) } // CheckInState is used to get all the checks for a service in a given state -func (s *StateStore) ChecksInState(state string) structs.HealthChecks { +func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks) { return parseHealthChecks(s.checkTable.Get("status", state)) } // parseHealthChecks is used to handle the resutls of a Get against // the checkTable -func parseHealthChecks(res []interface{}, err error) structs.HealthChecks { +func parseHealthChecks(idx uint64, res []interface{}, err error) (uint64, structs.HealthChecks) { if err != nil { panic(fmt.Errorf("Failed to get checks: %v", err)) } @@ -481,33 +549,43 @@ func parseHealthChecks(res []interface{}, err error) structs.HealthChecks { for i, r := range res { results[i] = r.(*structs.HealthCheck) } - return results + return idx, results } // CheckServiceNodes returns the nodes associated with a given service, along // with any associated check -func (s *StateStore) CheckServiceNodes(service string) structs.CheckServiceNodes { +func (s *StateStore) CheckServiceNodes(service string) (uint64, structs.CheckServiceNodes) { tx, err := s.tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() - res, err := s.serviceTable.Get("service", service) - return s.parseCheckServiceNodes(tx, res, err) + idx, err := s.tables.LastIndexTxn(tx) + if err != nil { + panic(fmt.Errorf("Failed to get last index: %v", err)) + } + + res, err := s.serviceTable.GetTxn(tx, "service", service) + return idx, s.parseCheckServiceNodes(tx, res, err) } // CheckServiceNodes returns the nodes associated with a given service, along // with any associated checks -func (s *StateStore) CheckServiceTagNodes(service, tag string) structs.CheckServiceNodes { +func (s *StateStore) CheckServiceTagNodes(service, tag string) (uint64, structs.CheckServiceNodes) { tx, err := s.tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() - res, err := s.serviceTable.Get("service", service, tag) - return s.parseCheckServiceNodes(tx, res, err) + idx, err := s.tables.LastIndexTxn(tx) + if err != nil { + panic(fmt.Errorf("Failed to get last index: %v", err)) + } + + res, err := s.serviceTable.GetTxn(tx, "service", service, tag) + return idx, s.parseCheckServiceNodes(tx, res, err) } // parseCheckServiceNodes parses results CheckServiceNodes and CheckServiceTagNodes @@ -527,10 +605,12 @@ func (s *StateStore) parseCheckServiceNodes(tx *MDBTxn, res []interface{}, err e } // Get any associated checks of the service - checks := parseHealthChecks(s.checkTable.GetTxn(tx, "node", srv.Node, srv.ServiceID)) + res, err := s.checkTable.GetTxn(tx, "node", srv.Node, srv.ServiceID) + _, checks := parseHealthChecks(0, res, err) // Get any checks of the node, not assciated with any service - nodeChecks := parseHealthChecks(s.checkTable.GetTxn(tx, "node", srv.Node, "")) + res, err = s.checkTable.GetTxn(tx, "node", srv.Node, "") + _, nodeChecks := parseHealthChecks(0, res, err) checks = append(checks, nodeChecks...) // Setup the node @@ -555,14 +635,27 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { return nil, err } + // Determine the max index + index, err := s.tables.LastIndexTxn(tx) + if err != nil { + tx.Abort() + return nil, err + } + // Return the snapshot snap := &StateSnapshot{ - store: s, - tx: tx, + store: s, + tx: tx, + lastIndex: index, } return snap, nil } +// LastIndex returns the last index that affects the snapshotted data +func (s *StateSnapshot) LastIndex() uint64 { + return s.lastIndex +} + // Nodes returns all the known nodes, the slice alternates between // the node name and address func (s *StateSnapshot) Nodes() structs.Nodes { @@ -579,10 +672,13 @@ func (s *StateSnapshot) Nodes() structs.Nodes { // NodeServices is used to return all the services of a given node func (s *StateSnapshot) NodeServices(name string) *structs.NodeServices { - return s.store.parseNodeServices(s.tx, name) + _, res := s.store.parseNodeServices(s.tx, name) + return res } // NodeChecks is used to return all the checks of a given node func (s *StateSnapshot) NodeChecks(node string) structs.HealthChecks { - return parseHealthChecks(s.store.checkTable.GetTxn(s.tx, "id", node)) + res, err := s.store.checkTable.GetTxn(s.tx, "id", node) + _, checks := parseHealthChecks(s.lastIndex, res, err) + return checks } diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 66f394a9ef..355cb045b3 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -14,22 +14,22 @@ func TestEnsureNode(t *testing.T) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v") } - found, addr := store.GetNode("foo") - if !found || addr != "127.0.0.1" { - t.Fatalf("Bad: %v %v", found, addr) + idx, found, addr := store.GetNode("foo") + if idx != 3 || !found || addr != "127.0.0.1" { + t.Fatalf("Bad: %v %v %v", idx, found, addr) } - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.2"}); err != nil { + if err := store.EnsureNode(4, structs.Node{"foo", "127.0.0.2"}); err != nil { t.Fatalf("err: %v") } - found, addr = store.GetNode("foo") - if !found || addr != "127.0.0.2" { - t.Fatalf("Bad: %v %v", found, addr) + idx, found, addr = store.GetNode("foo") + if idx != 4 || !found || addr != "127.0.0.2" { + t.Fatalf("Bad: %v %v %v", idx, found, addr) } } @@ -40,15 +40,18 @@ func TestGetNodes(t *testing.T) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(40, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil { + if err := store.EnsureNode(41, structs.Node{"bar", "127.0.0.2"}); err != nil { t.Fatalf("err: %v") } - nodes := store.Nodes() + idx, nodes := store.Nodes() + if idx != 41 { + t.Fatalf("idx: %v", idx) + } if len(nodes) != 2 { t.Fatalf("Bad: %v", nodes) } @@ -64,11 +67,11 @@ func BenchmarkGetNodes(b *testing.B) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(100, structs.Node{"foo", "127.0.0.1"}); err != nil { b.Fatalf("err: %v") } - if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil { + if err := store.EnsureNode(101, structs.Node{"bar", "127.0.0.2"}); err != nil { b.Fatalf("err: %v") } @@ -84,23 +87,26 @@ func TestEnsureService(t *testing.T) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(10, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil { + if err := store.EnsureService(11, "foo", &structs.NodeService{"api", "api", "", 5000}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "api", "api", "", 5001); err != nil { + if err := store.EnsureService(12, "foo", &structs.NodeService{"api", "api", "", 5001}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil { + if err := store.EnsureService(13, "foo", &structs.NodeService{"db", "db", "master", 8000}); err != nil { t.Fatalf("err: %v", err) } - services := store.NodeServices("foo") + idx, services := store.NodeServices("foo") + if idx != 13 { + t.Fatalf("bad: %v", idx) + } entry, ok := services.Services["api"] if !ok { @@ -126,23 +132,26 @@ func TestEnsureService_DuplicateNode(t *testing.T) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(10, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "api1", "api", "", 5000); err != nil { + if err := store.EnsureService(11, "foo", &structs.NodeService{"api1", "api", "", 5000}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "api2", "api", "", 5001); err != nil { + if err := store.EnsureService(12, "foo", &structs.NodeService{"api2", "api", "", 5001}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "api3", "api", "", 5002); err != nil { + if err := store.EnsureService(13, "foo", &structs.NodeService{"api3", "api", "", 5002}); err != nil { t.Fatalf("err: %v", err) } - services := store.NodeServices("foo") + idx, services := store.NodeServices("foo") + if idx != 13 { + t.Fatalf("bad: %v", idx) + } entry, ok := services.Services["api1"] if !ok { @@ -176,11 +185,11 @@ func TestDeleteNodeService(t *testing.T) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(11, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil { + if err := store.EnsureService(12, "foo", &structs.NodeService{"api", "api", "", 5000}); err != nil { t.Fatalf("err: %v", err) } @@ -191,21 +200,27 @@ func TestDeleteNodeService(t *testing.T) { Status: structs.HealthPassing, ServiceID: "api", } - if err := store.EnsureCheck(check); err != nil { + if err := store.EnsureCheck(13, check); err != nil { t.Fatalf("err: %v") } - if err := store.DeleteNodeService("foo", "api"); err != nil { + if err := store.DeleteNodeService(14, "foo", "api"); err != nil { t.Fatalf("err: %v", err) } - services := store.NodeServices("foo") + idx, services := store.NodeServices("foo") + if idx != 14 { + t.Fatalf("bad: %v", idx) + } _, ok := services.Services["api"] if ok { t.Fatalf("has api: %#v", services) } - checks := store.NodeChecks("foo") + idx, checks := store.NodeChecks("foo") + if idx != 14 { + t.Fatalf("bad: %v", idx) + } if len(checks) != 0 { t.Fatalf("has check: %#v", checks) } @@ -218,23 +233,26 @@ func TestDeleteNodeService_One(t *testing.T) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(11, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil { + if err := store.EnsureService(12, "foo", &structs.NodeService{"api", "api", "", 5000}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "api2", "api", "", 5001); err != nil { + if err := store.EnsureService(13, "foo", &structs.NodeService{"api2", "api", "", 5001}); err != nil { t.Fatalf("err: %v", err) } - if err := store.DeleteNodeService("foo", "api"); err != nil { + if err := store.DeleteNodeService(14, "foo", "api"); err != nil { t.Fatalf("err: %v", err) } - services := store.NodeServices("foo") + idx, services := store.NodeServices("foo") + if idx != 14 { + t.Fatalf("bad: %v", idx) + } _, ok := services.Services["api"] if ok { t.Fatalf("has api: %#v", services) @@ -252,11 +270,11 @@ func TestDeleteNode(t *testing.T) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(20, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil { + if err := store.EnsureService(21, "foo", &structs.NodeService{"api", "api", "", 5000}); err != nil { t.Fatalf("err: %v") } @@ -267,26 +285,35 @@ func TestDeleteNode(t *testing.T) { Status: structs.HealthPassing, ServiceID: "api", } - if err := store.EnsureCheck(check); err != nil { + if err := store.EnsureCheck(22, check); err != nil { t.Fatalf("err: %v", err) } - if err := store.DeleteNode("foo"); err != nil { + if err := store.DeleteNode(23, "foo"); err != nil { t.Fatalf("err: %v", err) } - services := store.NodeServices("foo") + idx, services := store.NodeServices("foo") + if idx != 23 { + t.Fatalf("bad: %v", idx) + } _, ok := services.Services["api"] if ok { t.Fatalf("has api: %#v", services) } - checks := store.NodeChecks("foo") + idx, checks := store.NodeChecks("foo") + if idx != 23 { + t.Fatalf("bad: %v", idx) + } if len(checks) > 0 { t.Fatalf("has checks: %v", checks) } - found, _ := store.GetNode("foo") + idx, found, _ := store.GetNode("foo") + if idx != 23 { + t.Fatalf("bad: %v", idx) + } if found { t.Fatalf("found node") } @@ -299,27 +326,30 @@ func TestGetServices(t *testing.T) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(30, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil { + if err := store.EnsureNode(31, structs.Node{"bar", "127.0.0.2"}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil { + if err := store.EnsureService(32, "foo", &structs.NodeService{"api", "api", "", 5000}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil { + if err := store.EnsureService(33, "foo", &structs.NodeService{"db", "db", "master", 8000}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil { + if err := store.EnsureService(34, "bar", &structs.NodeService{"db", "db", "slave", 8000}); err != nil { t.Fatalf("err: %v") } - services := store.Services() + idx, services := store.Services() + if idx != 34 { + t.Fatalf("bad: %v", idx) + } tags, ok := services["api"] if !ok { @@ -346,35 +376,38 @@ func TestServiceNodes(t *testing.T) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(10, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil { + if err := store.EnsureNode(11, structs.Node{"bar", "127.0.0.2"}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil { + if err := store.EnsureService(12, "foo", &structs.NodeService{"api", "api", "", 5000}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("bar", "api", "api", "", 5000); err != nil { + if err := store.EnsureService(13, "bar", &structs.NodeService{"api", "api", "", 5000}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil { + if err := store.EnsureService(14, "foo", &structs.NodeService{"db", "db", "master", 8000}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil { + if err := store.EnsureService(15, "bar", &structs.NodeService{"db", "db", "slave", 8000}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("bar", "db2", "db", "slave", 8001); err != nil { + if err := store.EnsureService(16, "bar", &structs.NodeService{"db2", "db", "slave", 8001}); err != nil { t.Fatalf("err: %v") } - nodes := store.ServiceNodes("db") + idx, nodes := store.ServiceNodes("db") + if idx != 16 { + t.Fatalf("bad: %v", 16) + } if len(nodes) != 3 { t.Fatalf("bad: %v", nodes) } @@ -434,27 +467,30 @@ func TestServiceTagNodes(t *testing.T) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(15, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil { + if err := store.EnsureNode(16, structs.Node{"bar", "127.0.0.2"}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil { + if err := store.EnsureService(17, "foo", &structs.NodeService{"db", "db", "master", 8000}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "db2", "db", "slave", 8001); err != nil { + if err := store.EnsureService(18, "foo", &structs.NodeService{"db2", "db", "slave", 8001}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil { + if err := store.EnsureService(19, "bar", &structs.NodeService{"db", "db", "slave", 8000}); err != nil { t.Fatalf("err: %v") } - nodes := store.ServiceTagNodes("db", "master") + idx, nodes := store.ServiceTagNodes("db", "master") + if idx != 19 { + t.Fatalf("bad: %v", idx) + } if len(nodes) != 1 { t.Fatalf("bad: %v", nodes) } @@ -479,23 +515,23 @@ func TestStoreSnapshot(t *testing.T) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(8, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil { + if err := store.EnsureNode(9, structs.Node{"bar", "127.0.0.2"}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil { + if err := store.EnsureService(10, "foo", &structs.NodeService{"db", "db", "master", 8000}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "db2", "db", "slave", 8001); err != nil { + if err := store.EnsureService(11, "foo", &structs.NodeService{"db2", "db", "slave", 8001}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil { + if err := store.EnsureService(12, "bar", &structs.NodeService{"db", "db", "slave", 8000}); err != nil { t.Fatalf("err: %v") } @@ -506,7 +542,7 @@ func TestStoreSnapshot(t *testing.T) { Status: structs.HealthPassing, ServiceID: "db", } - if err := store.EnsureCheck(check); err != nil { + if err := store.EnsureCheck(13, check); err != nil { t.Fatalf("err: %v") } @@ -517,6 +553,11 @@ func TestStoreSnapshot(t *testing.T) { } defer snap.Close() + // Check the last nodes + if idx := snap.LastIndex(); idx != 13 { + t.Fatalf("bad: %v", idx) + } + // Check snapshot has old values nodes := snap.Nodes() if len(nodes) != 2 { @@ -547,13 +588,13 @@ func TestStoreSnapshot(t *testing.T) { } // Make some changes! - if err := store.EnsureService("foo", "db", "db", "slave", 8000); err != nil { + if err := store.EnsureService(14, "foo", &structs.NodeService{"db", "db", "slave", 8000}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("bar", "db", "db", "master", 8000); err != nil { + if err := store.EnsureService(15, "bar", &structs.NodeService{"db", "db", "master", 8000}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureNode(structs.Node{"baz", "127.0.0.3"}); err != nil { + if err := store.EnsureNode(16, structs.Node{"baz", "127.0.0.3"}); err != nil { t.Fatalf("err: %v", err) } checkAfter := &structs.HealthCheck{ @@ -563,7 +604,7 @@ func TestStoreSnapshot(t *testing.T) { Status: structs.HealthCritical, ServiceID: "db", } - if err := store.EnsureCheck(checkAfter); err != nil { + if err := store.EnsureCheck(17, checkAfter); err != nil { t.Fatalf("err: %v") } @@ -603,10 +644,10 @@ func TestEnsureCheck(t *testing.T) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "db1", "db", "master", 8000); err != nil { + if err := store.EnsureService(2, "foo", &structs.NodeService{"db1", "db", "master", 8000}); err != nil { t.Fatalf("err: %v") } check := &structs.HealthCheck{ @@ -616,7 +657,7 @@ func TestEnsureCheck(t *testing.T) { Status: structs.HealthPassing, ServiceID: "db1", } - if err := store.EnsureCheck(check); err != nil { + if err := store.EnsureCheck(3, check); err != nil { t.Fatalf("err: %v") } @@ -626,11 +667,14 @@ func TestEnsureCheck(t *testing.T) { Name: "memory utilization", Status: structs.HealthWarning, } - if err := store.EnsureCheck(check2); err != nil { + if err := store.EnsureCheck(4, check2); err != nil { t.Fatalf("err: %v") } - checks := store.NodeChecks("foo") + idx, checks := store.NodeChecks("foo") + if idx != 4 { + t.Fatalf("bad: %v", idx) + } if len(checks) != 2 { t.Fatalf("bad: %v", checks) } @@ -641,7 +685,10 @@ func TestEnsureCheck(t *testing.T) { t.Fatalf("bad: %v", checks[1]) } - checks = store.ServiceChecks("db") + idx, checks = store.ServiceChecks("db") + if idx != 4 { + t.Fatalf("bad: %v", idx) + } if len(checks) != 1 { t.Fatalf("bad: %v", checks) } @@ -649,7 +696,10 @@ func TestEnsureCheck(t *testing.T) { t.Fatalf("bad: %v", checks[0]) } - checks = store.ChecksInState(structs.HealthPassing) + idx, checks = store.ChecksInState(structs.HealthPassing) + if idx != 4 { + t.Fatalf("bad: %v", idx) + } if len(checks) != 1 { t.Fatalf("bad: %v", checks) } @@ -657,7 +707,10 @@ func TestEnsureCheck(t *testing.T) { t.Fatalf("bad: %v", checks[0]) } - checks = store.ChecksInState(structs.HealthWarning) + idx, checks = store.ChecksInState(structs.HealthWarning) + if idx != 4 { + t.Fatalf("bad: %v", idx) + } if len(checks) != 1 { t.Fatalf("bad: %v", checks) } @@ -673,10 +726,10 @@ func TestDeleteNodeCheck(t *testing.T) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "db1", "db", "master", 8000); err != nil { + if err := store.EnsureService(2, "foo", &structs.NodeService{"db1", "db", "master", 8000}); err != nil { t.Fatalf("err: %v") } check := &structs.HealthCheck{ @@ -686,7 +739,7 @@ func TestDeleteNodeCheck(t *testing.T) { Status: structs.HealthPassing, ServiceID: "db1", } - if err := store.EnsureCheck(check); err != nil { + if err := store.EnsureCheck(3, check); err != nil { t.Fatalf("err: %v") } @@ -696,15 +749,18 @@ func TestDeleteNodeCheck(t *testing.T) { Name: "memory utilization", Status: structs.HealthWarning, } - if err := store.EnsureCheck(check2); err != nil { + if err := store.EnsureCheck(4, check2); err != nil { t.Fatalf("err: %v") } - if err := store.DeleteNodeCheck("foo", "db"); err != nil { + if err := store.DeleteNodeCheck(5, "foo", "db"); err != nil { t.Fatalf("err: %v", err) } - checks := store.NodeChecks("foo") + idx, checks := store.NodeChecks("foo") + if idx != 5 { + t.Fatalf("bad: %v", idx) + } if len(checks) != 1 { t.Fatalf("bad: %v", checks) } @@ -720,10 +776,10 @@ func TestCheckServiceNodes(t *testing.T) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "db1", "db", "master", 8000); err != nil { + if err := store.EnsureService(2, "foo", &structs.NodeService{"db1", "db", "master", 8000}); err != nil { t.Fatalf("err: %v") } check := &structs.HealthCheck{ @@ -733,7 +789,7 @@ func TestCheckServiceNodes(t *testing.T) { Status: structs.HealthPassing, ServiceID: "db1", } - if err := store.EnsureCheck(check); err != nil { + if err := store.EnsureCheck(3, check); err != nil { t.Fatalf("err: %v") } check = &structs.HealthCheck{ @@ -742,11 +798,14 @@ func TestCheckServiceNodes(t *testing.T) { Name: SerfCheckName, Status: structs.HealthPassing, } - if err := store.EnsureCheck(check); err != nil { + if err := store.EnsureCheck(4, check); err != nil { t.Fatalf("err: %v") } - nodes := store.CheckServiceNodes("db") + idx, nodes := store.CheckServiceNodes("db") + if idx != 4 { + t.Fatalf("bad: %v", idx) + } if len(nodes) != 1 { t.Fatalf("Bad: %v", nodes) } @@ -767,7 +826,10 @@ func TestCheckServiceNodes(t *testing.T) { t.Fatalf("Bad: %v", nodes[0]) } - nodes = store.CheckServiceTagNodes("db", "master") + idx, nodes = store.CheckServiceTagNodes("db", "master") + if idx != 4 { + t.Fatalf("bad: %v", idx) + } if len(nodes) != 1 { t.Fatalf("Bad: %v", nodes) } @@ -795,10 +857,10 @@ func BenchmarkCheckServiceNodes(t *testing.B) { } defer store.Close() - if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { + if err := store.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "db1", "db", "master", 8000); err != nil { + if err := store.EnsureService(2, "foo", &structs.NodeService{"db1", "db", "master", 8000}); err != nil { t.Fatalf("err: %v") } check := &structs.HealthCheck{ @@ -808,7 +870,7 @@ func BenchmarkCheckServiceNodes(t *testing.B) { Status: structs.HealthPassing, ServiceID: "db1", } - if err := store.EnsureCheck(check); err != nil { + if err := store.EnsureCheck(3, check); err != nil { t.Fatalf("err: %v") } check = &structs.HealthCheck{ @@ -817,7 +879,7 @@ func BenchmarkCheckServiceNodes(t *testing.B) { Name: SerfCheckName, Status: structs.HealthPassing, } - if err := store.EnsureCheck(check); err != nil { + if err := store.EnsureCheck(4, check); err != nil { t.Fatalf("err: %v") }