From 5b6502c1ea581835dae2884f99e17e428ad2a5f8 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 12 Oct 2015 00:42:09 -0700 Subject: [PATCH] Integrates new state store into leader and catalog/health endpoints. --- consul/catalog_endpoint.go | 68 +++-- consul/catalog_endpoint_test.go | 63 +++-- consul/fsm.go | 6 +- consul/health_endpoint.go | 61 +++-- consul/health_endpoint_test.go | 18 +- consul/leader.go | 58 ++-- consul/leader_test.go | 143 +++++++--- consul/server.go | 5 +- consul/state/graveyard.go | 13 +- consul/state/state_store.go | 229 ++++++++++++++-- consul/state/state_store_test.go | 345 +++++++++++++++++++++++- consul/{ => state}/tombstone_gc.go | 2 +- consul/{ => state}/tombstone_gc_test.go | 2 +- consul/state/watch.go | 24 ++ consul/state_store.go | 7 +- consul/state_store_test.go | 7 +- 16 files changed, 882 insertions(+), 169 deletions(-) rename consul/{ => state}/tombstone_gc.go (99%) rename consul/{ => state}/tombstone_gc_test.go (99%) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 17d3f5d5c3..865372964d 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -6,6 +6,7 @@ import ( "time" "github.com/armon/go-metrics" + state_store "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" ) @@ -119,13 +120,19 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde return err } - // Get the local state - state := c.srv.fsm.State() - return c.srv.blockingRPC(&args.QueryOptions, + // Get the list of nodes. + state := c.srv.fsm.StateNew() + return c.srv.blockingRPCNew( + &args.QueryOptions, &reply.QueryMeta, - state.QueryTables("Nodes"), + state.GetTableWatch("nodes"), func() error { - reply.Index, reply.Nodes = state.Nodes() + index, nodes, err := state.Nodes() + if err != nil { + return err + } + + reply.Index, reply.Nodes = index, nodes return nil }) } @@ -136,13 +143,19 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I return err } - // Get the current nodes - state := c.srv.fsm.State() - return c.srv.blockingRPC(&args.QueryOptions, + // Get the list of services and their tags. 
+ state := c.srv.fsm.StateNew() + return c.srv.blockingRPCNew( + &args.QueryOptions, &reply.QueryMeta, - state.QueryTables("Services"), + state.GetTableWatch("services"), func() error { - reply.Index, reply.Services = state.Services() + index, services, err := state.Services() + if err != nil { + return err + } + + reply.Index, reply.Services = index, services return c.srv.filterACL(args.Token, reply) }) } @@ -159,16 +172,26 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru } // Get the nodes - state := c.srv.fsm.State() - err := c.srv.blockingRPC(&args.QueryOptions, + state := c.srv.fsm.StateNew() + err := c.srv.blockingRPCNew( + &args.QueryOptions, &reply.QueryMeta, - state.QueryTables("ServiceNodes"), + state_store.NewMultiWatch( + state.GetTableWatch("nodes"), + state.GetTableWatch("services")), func() error { + var index uint64 + var services structs.ServiceNodes + var err error if args.TagFilter { - reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) + index, services, err = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) } else { - reply.Index, reply.ServiceNodes = state.ServiceNodes(args.ServiceName) + index, services, err = state.ServiceNodes(args.ServiceName) } + if err != nil { + return err + } + reply.Index, reply.ServiceNodes = index, services return c.srv.filterACL(args.Token, reply) }) @@ -197,12 +220,19 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs } // Get the node services - state := c.srv.fsm.State() - return c.srv.blockingRPC(&args.QueryOptions, + state := c.srv.fsm.StateNew() + return c.srv.blockingRPCNew( + &args.QueryOptions, &reply.QueryMeta, - state.QueryTables("NodeServices"), + state_store.NewMultiWatch( + state.GetTableWatch("nodes"), + state.GetTableWatch("services")), func() error { - reply.Index, reply.NodeServices = state.NodeServices(args.Node) + index, services, err := state.NodeServices(args.Node) + if err != nil { + return err + } + reply.Index, reply.NodeServices = index, services return c.srv.filterACL(args.Token, reply) }) } diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 9665c2f33e..4bcad6f9b3 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -267,7 +267,9 @@ func TestCatalogListNodes(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") // Just add a node - s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) + if err := s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } testutil.WaitForResult(func() (bool, error) { msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) @@ -317,12 +319,16 @@ func TestCatalogListNodes_StaleRaad(t *testing.T) { codec = codec1 // Inject fake data on the follower! - s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) + if err := s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } } else { codec = codec2 // Inject fake data on the follower! 
- s2.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) + if err := s2.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } } args := structs.DCSpecificRequest{ @@ -458,7 +464,9 @@ func BenchmarkCatalogListNodes(t *testing.B) { defer codec.Close() // Just add a node - s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) + if err := s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } args := structs.DCSpecificRequest{ Datacenter: "dc1", @@ -490,8 +498,12 @@ func TestCatalogListServices(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") // Just add a node - s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) - s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}) + if err := s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + if err := s1.fsm.StateNew().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil { + t.Fatalf("err: %v", err) + } if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { t.Fatalf("err: %v", err) @@ -541,11 +553,16 @@ func TestCatalogListServices_Blocking(t *testing.T) { args.MaxQueryTime = time.Second // Async cause a change + idx := out.Index start := time.Now() go func() { time.Sleep(100 * time.Millisecond) - s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) - s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}) + if err := s1.fsm.StateNew().EnsureNode(idx+1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + if err := s1.fsm.StateNew().EnsureService(idx+2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil { + t.Fatalf("err: %v", err) + } }() // Re-run the query @@ -560,7 +577,7 @@ func TestCatalogListServices_Blocking(t *testing.T) { } // Check the indexes - if out.Index != 2 { + if out.Index != idx+2 { t.Fatalf("bad: %v", out) } @@ -625,8 +642,12 @@ func TestCatalogListServices_Stale(t *testing.T) { var out structs.IndexedServices // Inject a fake service - s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) - s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}) + if err := s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + if err := s1.fsm.StateNew().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil { + t.Fatalf("err: %v", err) + } // Run the query, do not wait for leader! 
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { @@ -666,8 +687,12 @@ func TestCatalogListServiceNodes(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") // Just add a node - s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) - s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}) + if err := s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + if err := s1.fsm.StateNew().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil { + t.Fatalf("err: %v", err) + } if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil { t.Fatalf("err: %v", err) @@ -709,9 +734,15 @@ func TestCatalogNodeServices(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") // Just add a node - s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) - s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}) - s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80}) + if err := s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + if err := s1.fsm.StateNew().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil { + t.Fatalf("err: %v", err) + } + if err := s1.fsm.StateNew().EnsureService(3, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80}); err != nil { + t.Fatalf("err: %v", err) + } if err := msgpackrpc.CallWithCodec(codec, "Catalog.NodeServices", &args, &out); err != nil { t.Fatalf("err: %v", err) diff --git a/consul/fsm.go b/consul/fsm.go index 6d9a924daa..ee594279ef 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -27,7 +27,7 @@ type consulFSM struct { path string stateNew *state.StateStore state *StateStore - gc *TombstoneGC + gc *state.TombstoneGC } // consulSnapshot is used to provide a snapshot of the current @@ -46,9 +46,9 @@ type snapshotHeader struct { } // NewFSMPath is used to construct a new FSM with a blank state -func NewFSM(gc *TombstoneGC, path string, logOutput io.Writer) (*consulFSM, error) { +func NewFSM(gc *state.TombstoneGC, path string, logOutput io.Writer) (*consulFSM, error) { // Create the state store. 
- stateNew, err := state.NewStateStore() + stateNew, err := state.NewStateStore(gc) if err != nil { return nil, err } diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index 6accd3aa64..8f76f7000a 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -3,6 +3,7 @@ package consul import ( "fmt" "github.com/armon/go-metrics" + state_store "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" ) @@ -19,12 +20,17 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, } // Get the state specific checks - state := h.srv.fsm.State() - return h.srv.blockingRPC(&args.QueryOptions, + state := h.srv.fsm.StateNew() + return h.srv.blockingRPCNew( + &args.QueryOptions, &reply.QueryMeta, - state.QueryTables("ChecksInState"), + state.GetTableWatch("checks"), func() error { - reply.Index, reply.HealthChecks = state.ChecksInState(args.State) + index, checks, err := state.ChecksInState(args.State) + if err != nil { + return err + } + reply.Index, reply.HealthChecks = index, checks return h.srv.filterACL(args.Token, reply) }) } @@ -37,12 +43,17 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, } // Get the node checks - state := h.srv.fsm.State() - return h.srv.blockingRPC(&args.QueryOptions, + state := h.srv.fsm.StateNew() + return h.srv.blockingRPCNew( + &args.QueryOptions, &reply.QueryMeta, - state.QueryTables("NodeChecks"), + state.GetTableWatch("checks"), func() error { - reply.Index, reply.HealthChecks = state.NodeChecks(args.Node) + index, checks, err := state.NodeChecks(args.Node) + if err != nil { + return err + } + reply.Index, reply.HealthChecks = index, checks return h.srv.filterACL(args.Token, reply) }) } @@ -61,12 +72,17 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, } // Get the service checks - state := h.srv.fsm.State() - return h.srv.blockingRPC(&args.QueryOptions, + state := h.srv.fsm.StateNew() + return h.srv.blockingRPCNew( + &args.QueryOptions, &reply.QueryMeta, - state.QueryTables("ServiceChecks"), + state.GetTableWatch("checks"), func() error { - reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName) + index, checks, err := state.ServiceChecks(args.ServiceName) + if err != nil { + return err + } + reply.Index, reply.HealthChecks = index, checks return h.srv.filterACL(args.Token, reply) }) } @@ -83,16 +99,27 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc } // Get the nodes - state := h.srv.fsm.State() - err := h.srv.blockingRPC(&args.QueryOptions, + state := h.srv.fsm.StateNew() + err := h.srv.blockingRPCNew( + &args.QueryOptions, &reply.QueryMeta, - state.QueryTables("CheckServiceNodes"), + state_store.NewMultiWatch( + state.GetTableWatch("nodes"), + state.GetTableWatch("services"), + state.GetTableWatch("checks")), func() error { + var index uint64 + var nodes structs.CheckServiceNodes + var err error if args.TagFilter { - reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) + index, nodes, err = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) } else { - reply.Index, reply.Nodes = state.CheckServiceNodes(args.ServiceName) + index, nodes, err = state.CheckServiceNodes(args.ServiceName) } + if err != nil { + return err + } + reply.Index, reply.Nodes = index, nodes return h.srv.filterACL(args.Token, reply) }) diff --git a/consul/health_endpoint_test.go b/consul/health_endpoint_test.go index 110b1d8b02..77356983e2 100644 --- a/consul/health_endpoint_test.go +++ 
b/consul/health_endpoint_test.go @@ -46,11 +46,11 @@ func TestHealth_ChecksInState(t *testing.T) { t.Fatalf("Bad: %v", checks) } - // First check is automatically added for the server node - if checks[0].CheckID != SerfCheckID { + // Serf check is automatically added + if checks[0].Name != "memory utilization" { t.Fatalf("Bad: %v", checks[0]) } - if checks[1].Name != "memory utilization" { + if checks[1].CheckID != SerfCheckID { t.Fatalf("Bad: %v", checks[1]) } } @@ -205,22 +205,22 @@ func TestHealth_ServiceNodes(t *testing.T) { if len(nodes) != 2 { t.Fatalf("Bad: %v", nodes) } - if nodes[0].Node.Node != "foo" { + if nodes[0].Node.Node != "bar" { t.Fatalf("Bad: %v", nodes[0]) } - if nodes[1].Node.Node != "bar" { + if nodes[1].Node.Node != "foo" { t.Fatalf("Bad: %v", nodes[1]) } - if !strContains(nodes[0].Service.Tags, "master") { + if !strContains(nodes[0].Service.Tags, "slave") { t.Fatalf("Bad: %v", nodes[0]) } - if !strContains(nodes[1].Service.Tags, "slave") { + if !strContains(nodes[1].Service.Tags, "master") { t.Fatalf("Bad: %v", nodes[1]) } - if nodes[0].Checks[0].Status != structs.HealthPassing { + if nodes[0].Checks[0].Status != structs.HealthWarning { t.Fatalf("Bad: %v", nodes[0]) } - if nodes[1].Checks[0].Status != structs.HealthWarning { + if nodes[1].Checks[0].Status != structs.HealthPassing { t.Fatalf("Bad: %v", nodes[1]) } } diff --git a/consul/leader.go b/consul/leader.go index f8bb07cd9e..7dc57f01c9 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -259,8 +259,11 @@ func (s *Server) reconcile() (err error) { // in a critical state that does not correspond to a known Serf member. We generate // a "reap" event to cause the node to be cleaned up. func (s *Server) reconcileReaped(known map[string]struct{}) error { - state := s.fsm.State() - _, checks := state.ChecksInState(structs.HealthAny) + state := s.fsm.StateNew() + _, checks, err := state.ChecksInState(structs.HealthAny) + if err != nil { + return err + } for _, check := range checks { // Ignore any non serf checks if check.CheckID != SerfCheckID { @@ -282,7 +285,10 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error { } // Get the node services, look for ConsulServiceID - _, services := state.NodeServices(check.Node) + _, services, err := state.NodeServices(check.Node) + if err != nil { + return err + } serverPort := 0 for _, service := range services.Services { if service.ID == ConsulServiceID { @@ -352,8 +358,6 @@ func (s *Server) shouldHandleMember(member serf.Member) bool { // handleAliveMember is used to ensure the node // is registered, with a passing health check. 
func (s *Server) handleAliveMember(member serf.Member) error { - state := s.fsm.State() - // Register consul service if a server var service *structs.NodeService if valid, parts := isConsulServer(member); valid { @@ -370,12 +374,19 @@ func (s *Server) handleAliveMember(member serf.Member) error { } // Check if the node exists - _, found, addr := state.GetNode(member.Name) - if found && addr == member.Addr.String() { + state := s.fsm.StateNew() + node, err := state.GetNode(member.Name) + if err != nil { + return fmt.Errorf("failed to lookup node %s: %s", member.Name, err) + } + if node != nil && node.Address == member.Addr.String() { // Check if the associated service is available if service != nil { match := false - _, services := state.NodeServices(member.Name) + _, services, err := state.NodeServices(member.Name) + if err != nil { + return err + } if services != nil { for id, _ := range services.Services { if id == service.ID { @@ -389,7 +400,10 @@ func (s *Server) handleAliveMember(member serf.Member) error { } // Check if the serfCheck is in the passing state - _, checks := state.NodeChecks(member.Name) + _, checks, err := state.NodeChecks(member.Name) + if err != nil { + return err + } for _, check := range checks { if check.CheckID == SerfCheckID && check.Status == structs.HealthPassing { return nil @@ -421,13 +435,18 @@ AFTER_CHECK: // handleFailedMember is used to mark the node's status // as being critical, along with all checks as unknown. func (s *Server) handleFailedMember(member serf.Member) error { - state := s.fsm.State() - // Check if the node exists - _, found, addr := state.GetNode(member.Name) - if found && addr == member.Addr.String() { + state := s.fsm.StateNew() + node, err := state.GetNode(member.Name) + if err != nil { + return fmt.Errorf("failed to lookup node %s: %s", member.Name, err) + } + if node != nil && node.Address == member.Addr.String() { // Check if the serfCheck is in the critical state - _, checks := state.NodeChecks(member.Name) + _, checks, err := state.NodeChecks(member.Name) + if err != nil { + return err + } for _, check := range checks { if check.CheckID == SerfCheckID && check.Status == structs.HealthCritical { return nil @@ -468,7 +487,6 @@ func (s *Server) handleReapMember(member serf.Member) error { // handleDeregisterMember is used to deregister a member of a given reason func (s *Server) handleDeregisterMember(reason string, member serf.Member) error { - state := s.fsm.State() // Do not deregister ourself. This can only happen if the current leader // is leaving. Instead, we should allow a follower to take-over and // deregister us later. 
@@ -484,9 +502,13 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error } } - // Check if the node does not exists - _, found, _ := state.GetNode(member.Name) - if !found { + // Check if the node does not exist + state := s.fsm.StateNew() + node, err := state.GetNode(member.Name) + if err != nil { + return fmt.Errorf("failed to lookup node %s: %s", member.Name, err) + } + if node == nil { return nil } diff --git a/consul/leader_test.go b/consul/leader_test.go index 4155caaf67..0b3001af93 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -32,16 +32,22 @@ func TestLeader_RegisterMember(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") // Client should be registered - state := s1.fsm.State() + state := s1.fsm.StateNew() testutil.WaitForResult(func() (bool, error) { - _, found, _ := state.GetNode(c1.config.NodeName) - return found == true, nil + node, err := state.GetNode(c1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } + return node != nil, nil }, func(err error) { t.Fatalf("client not registered") }) // Should have a check - _, checks := state.NodeChecks(c1.config.NodeName) + _, checks, err := state.NodeChecks(c1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } if len(checks) != 1 { t.Fatalf("client missing check") } @@ -56,13 +62,19 @@ func TestLeader_RegisterMember(t *testing.T) { } // Server should be registered - _, found, _ := state.GetNode(s1.config.NodeName) - if !found { + node, err := state.GetNode(s1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } + if node == nil { t.Fatalf("server not registered") } // Service should be registered - _, services := state.NodeServices(s1.config.NodeName) + _, services, err := state.NodeServices(s1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } if _, ok := services.Services["consul"]; !ok { t.Fatalf("consul service not registered: %v", services) } @@ -90,16 +102,22 @@ func TestLeader_FailedMember(t *testing.T) { c1.Shutdown() // Should be registered - state := s1.fsm.State() + state := s1.fsm.StateNew() testutil.WaitForResult(func() (bool, error) { - _, found, _ := state.GetNode(c1.config.NodeName) - return found == true, nil + node, err := state.GetNode(c1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } + return node != nil, nil }, func(err error) { t.Fatalf("client not registered") }) // Should have a check - _, checks := state.NodeChecks(c1.config.NodeName) + _, checks, err := state.NodeChecks(c1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } if len(checks) != 1 { t.Fatalf("client missing check") } @@ -111,7 +129,10 @@ func TestLeader_FailedMember(t *testing.T) { } testutil.WaitForResult(func() (bool, error) { - _, checks = state.NodeChecks(c1.config.NodeName) + _, checks, err = state.NodeChecks(c1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } return checks[0].Status == structs.HealthCritical, errors.New(checks[0].Status) }, func(err error) { t.Fatalf("check status is %v, should be critical", err) @@ -134,13 +155,15 @@ func TestLeader_LeftMember(t *testing.T) { t.Fatalf("err: %v", err) } - var found bool - state := s1.fsm.State() + state := s1.fsm.StateNew() // Should be registered testutil.WaitForResult(func() (bool, error) { - _, found, _ = state.GetNode(c1.config.NodeName) - return found == true, nil + node, err := state.GetNode(c1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } + return node != nil, nil }, func(err error) { t.Fatalf("client 
should be registered") }) @@ -151,8 +174,11 @@ func TestLeader_LeftMember(t *testing.T) { // Should be deregistered testutil.WaitForResult(func() (bool, error) { - _, found, _ = state.GetNode(c1.config.NodeName) - return found == false, nil + node, err := state.GetNode(c1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } + return node == nil, nil }, func(err error) { t.Fatalf("client should not be registered") }) @@ -174,13 +200,15 @@ func TestLeader_ReapMember(t *testing.T) { t.Fatalf("err: %v", err) } - var found bool - state := s1.fsm.State() + state := s1.fsm.StateNew() // Should be registered testutil.WaitForResult(func() (bool, error) { - _, found, _ = state.GetNode(c1.config.NodeName) - return found == true, nil + node, err := state.GetNode(c1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } + return node != nil, nil }, func(err error) { t.Fatalf("client should be registered") }) @@ -199,8 +227,11 @@ func TestLeader_ReapMember(t *testing.T) { // Should be deregistered testutil.WaitForResult(func() (bool, error) { - _, found, _ = state.GetNode(c1.config.NodeName) - return found == false, nil + node, err := state.GetNode(c1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } + return node == nil, nil }, func(err error) { t.Fatalf("client should not be registered") }) @@ -236,9 +267,12 @@ func TestLeader_Reconcile_ReapMember(t *testing.T) { } // Node should be gone - state := s1.fsm.State() - _, found, _ := state.GetNode("no-longer-around") - if found { + state := s1.fsm.StateNew() + node, err := state.GetNode("no-longer-around") + if err != nil { + t.Fatalf("err: %v", err) + } + if node != nil { t.Fatalf("client registered") } } @@ -260,16 +294,22 @@ func TestLeader_Reconcile(t *testing.T) { } // Should not be registered - state := s1.fsm.State() - _, found, _ := state.GetNode(c1.config.NodeName) - if found { + state := s1.fsm.StateNew() + node, err := state.GetNode(c1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } + if node != nil { t.Fatalf("client registered") } // Should be registered testutil.WaitForResult(func() (bool, error) { - _, found, _ = state.GetNode(c1.config.NodeName) - return found == true, nil + node, err = state.GetNode(c1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } + return node != nil, nil }, func(err error) { t.Fatalf("client should be registered") }) @@ -391,10 +431,13 @@ func TestLeader_LeftLeader(t *testing.T) { } // Verify the old leader is deregistered - state := remain.fsm.State() + state := remain.fsm.StateNew() testutil.WaitForResult(func() (bool, error) { - _, found, _ := state.GetNode(leader.config.NodeName) - return !found, nil + node, err := state.GetNode(leader.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } + return node == nil, nil }, func(err error) { t.Fatalf("leader should be deregistered") }) @@ -536,25 +579,39 @@ func TestLeader_ReapTombstones(t *testing.T) { t.Fatalf("err: %v", err) } - // Delete the KV entry (tombstoned) + // Snag the pre-delete index that the tombstone should + // preserve. + state := s1.fsm.StateNew() + keyIdx, _, err := state.KVSList("test") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Delete the KV entry (tombstoned). 
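	// [Editor's note, not part of the patch] The delete below writes a
	// tombstone, so the KVSList index for "test" stays at or above the
	// delete index until tombstones are reaped; the assertions that
	// follow key off exactly that behavior.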
arg.Op = structs.KVSDelete if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } - // Ensure we have a tombstone - _, res, err := s1.fsm.State().tombstoneTable.Get("id") + // Make sure the index advances to reflect the delete, instead of sliding + // backwards. + idx, _, err := state.KVSList("test") if err != nil { t.Fatalf("err: %v", err) } - if len(res) == 0 { - t.Fatalf("missing tombstones") + if idx <= keyIdx { + t.Fatalf("tombstone not working: %d <= %d", idx, keyIdx) } - // Check that the new leader has a pending GC expiration + // Check that the new leader has a pending GC expiration by + // watching for the index to slide back. testutil.WaitForResult(func() (bool, error) { - _, res, err := s1.fsm.State().tombstoneTable.Get("id") - return len(res) == 0, err + idx, _, err := state.KVSList("test") + if err != nil { + t.Fatalf("err: %v", err) + } + fmt.Printf("%d %d\n", idx, keyIdx) + return idx < keyIdx, err }, func(err error) { t.Fatalf("err: %v", err) }) diff --git a/consul/server.go b/consul/server.go index 34ec75224b..3ff10e31a7 100644 --- a/consul/server.go +++ b/consul/server.go @@ -15,6 +15,7 @@ import ( "time" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/raft" "github.com/hashicorp/raft-boltdb" @@ -135,7 +136,7 @@ type Server struct { // tombstoneGC is used to track the pending GC invocations // for the KV tombstones - tombstoneGC *TombstoneGC + tombstoneGC *state.TombstoneGC shutdown bool shutdownCh chan struct{} @@ -193,7 +194,7 @@ func NewServer(config *Config) (*Server, error) { logger := log.New(config.LogOutput, "", log.LstdFlags) // Create the tombstone GC - gc, err := NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity) + gc, err := state.NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity) if err != nil { return nil, err } diff --git a/consul/state/graveyard.go b/consul/state/graveyard.go index f5d12ba5e6..88181e83c7 100644 --- a/consul/state/graveyard.go +++ b/consul/state/graveyard.go @@ -14,15 +14,19 @@ type Tombstone struct { // Graveyard manages a set of tombstones. type Graveyard struct { + // GC is when we create tombstones to track their time-to-live. + // The GC is consumed upstream to manage clearing of tombstones. + gc *TombstoneGC } // NewGraveyard returns a new graveyard. -func NewGraveyard() *Graveyard { - return &Graveyard{} +func NewGraveyard(gc *TombstoneGC) *Graveyard { + return &Graveyard{gc: gc} } // InsertTxn adds a new tombstone. func (g *Graveyard) InsertTxn(tx *memdb.Txn, key string, idx uint64) error { + // Insert the tombstone. stone := &Tombstone{Key: key, Index: idx} if err := tx.Insert("tombstones", stone); err != nil { return fmt.Errorf("failed inserting tombstone: %s", err) @@ -31,6 +35,11 @@ func (g *Graveyard) InsertTxn(tx *memdb.Txn, key string, idx uint64) error { if err := tx.Insert("index", &IndexEntry{"tombstones", idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } + + // If GC is configured, then we hint that this index requires reaping. + if g.gc != nil { + tx.Defer(func() { g.gc.Hint(idx) }) + } return nil } diff --git a/consul/state/state_store.go b/consul/state/state_store.go index e4ead273f6..5f82173034 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -74,7 +74,7 @@ type sessionCheck struct { } // NewStateStore creates a new in-memory state storage layer. 
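// [Editor's note, not part of the patch] The signature change below
// threads the tombstone GC into the store so Graveyard.InsertTxn can
// hint expirations when a transaction commits (see the graveyard.go
// hunk above); a usage sketch follows the end of this file's diff.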
-func NewStateStore() (*StateStore, error) { +func NewStateStore(gc *TombstoneGC) (*StateStore, error) { // Create the in-memory DB. schema := stateStoreSchema() db, err := memdb.NewMemDB(schema) @@ -98,7 +98,7 @@ func NewStateStore() (*StateStore, error) { db: db, tableWatches: tableWatches, kvsWatch: NewPrefixWatch(), - kvsGraveyard: NewGraveyard(), + kvsGraveyard: NewGraveyard(gc), lockDelay: NewDelay(), } return s, nil @@ -561,6 +561,133 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, sv return nil } +// Services returns all services along with a list of associated tags. +func (s *StateStore) Services() (uint64, structs.Services, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // List all the services. + services, err := tx.Get("services", "id") + if err != nil { + return 0, nil, fmt.Errorf("failed querying services: %s", err) + } + + // Rip through the services and enumerate them and their unique set of + // tags. + var lindex uint64 + unique := make(map[string]map[string]struct{}) + for service := services.Next(); service != nil; service = services.Next() { + sn := service.(*structs.ServiceNode) + + // Track the highest index + if sn.ModifyIndex > lindex { + lindex = sn.ModifyIndex + } + + // Capture the unique set of tags. + tags, ok := unique[sn.ServiceName] + if !ok { + unique[sn.ServiceName] = make(map[string]struct{}) + tags = unique[sn.ServiceName] + } + for _, tag := range sn.ServiceTags { + tags[tag] = struct{}{} + } + } + + // Generate the output structure. + var results = make(structs.Services) + for service, tags := range unique { + results[service] = make([]string, 0) + for tag, _ := range tags { + results[service] = append(results[service], tag) + } + } + return lindex, results, nil +} + +// ServiceNodes returns the nodes associated with a given service. +func (s *StateStore) ServiceNodes(service string) (uint64, structs.ServiceNodes, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + services, err := tx.Get("services", "service", service) + if err != nil { + return 0, nil, fmt.Errorf("failed service lookup: %s", err) + } + + var results structs.ServiceNodes + for s := services.Next(); s != nil; s = services.Next() { + sn := s.(*structs.ServiceNode) + results = append(results, sn) + } + return s.parseServiceNodes(tx, results) +} + +// ServiceTagNodes returns the nodes associated with a given service, filtering +// out services that don't contain the given tag. +func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.ServiceNodes, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + services, err := tx.Get("services", "service", service) + if err != nil { + return 0, nil, fmt.Errorf("failed service lookup: %s", err) + } + + var results structs.ServiceNodes + for s := services.Next(); s != nil; s = services.Next() { + sn := s.(*structs.ServiceNode) + if !serviceTagFilter(sn, tag) { + results = append(results, sn) + } + } + return s.parseServiceNodes(tx, results) +} + +// serviceTagFilter returns true (should filter) if the given service node +// doesn't contain the given tag. +func serviceTagFilter(sn *structs.ServiceNode, tag string) bool { + tag = strings.ToLower(tag) + + // Look for the lower cased version of the tag. + for _, t := range sn.ServiceTags { + if strings.ToLower(t) == tag { + return false + } + } + + // If we didn't hit the tag above then we should filter. 
+ return true +} + +// parseServiceNodes iterates over a services query and fills in the node details, +// returning a ServiceNodes slice. +func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNodes) (uint64, structs.ServiceNodes, error) { + var results structs.ServiceNodes + var lindex uint64 + for _, sn := range services { + // Track the highest index. + if sn.ModifyIndex > lindex { + lindex = sn.ModifyIndex + } + + // TODO (slackpad) - This is sketchy because we are altering the + // structure from the database, but we are hitting a non-indexed + // field. Think about this a little and make sure it's really + // safe. + + // Fill in the address of the node. + n, err := tx.First("nodes", "id", sn.Node) + if err != nil { + return 0, nil, fmt.Errorf("failed node lookup: %s", err) + } + sn.Address = n.(*structs.Node).Address + results = append(results, sn) + } + return lindex, results, nil +} + // NodeServices is used to query service registrations by node ID. func (s *StateStore) NodeServices(nodeID string) (uint64, *structs.NodeServices, error) { tx := s.db.Txn(false) @@ -869,26 +996,56 @@ func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc } // CheckServiceNodes is used to query all nodes and checks for a given service -// ID. The results are compounded into a CheckServiceNodes, and the index -// returned is the maximum index observed over any node, check, or service -// in the result set. -func (s *StateStore) CheckServiceNodes(serviceID string) (uint64, structs.CheckServiceNodes, error) { +// The results are compounded into a CheckServiceNodes, and the index returned +// is the maximum index observed over any node, check, or service in the result +// set. +func (s *StateStore) CheckServiceNodes(service string) (uint64, structs.CheckServiceNodes, error) { tx := s.db.Txn(false) defer tx.Abort() // Query the state store for the service. - services, err := tx.Get("services", "service", serviceID) + services, err := tx.Get("services", "service", service) if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) } - return s.parseCheckServiceNodes(tx, services, err) + + var results structs.ServiceNodes + for s := services.Next(); s != nil; s = services.Next() { + sn := s.(*structs.ServiceNode) + results = append(results, sn) + } + return s.parseCheckServiceNodes(tx, results, err) +} + +// CheckServiceTagNodes is used to query all nodes and checks for a given +// service, filtering out services that don't contain the given tag. The results +// are compounded into a CheckServiceNodes, and the index returned is the maximum +// index observed over any node, check, or service in the result set. +func (s *StateStore) CheckServiceTagNodes(service, tag string) (uint64, structs.CheckServiceNodes, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Query the state store for the service. + services, err := tx.Get("services", "service", service) + if err != nil { + return 0, nil, fmt.Errorf("failed service lookup: %s", err) + } + + var results structs.ServiceNodes + for s := services.Next(); s != nil; s = services.Next() { + sn := s.(*structs.ServiceNode) + if !serviceTagFilter(sn, tag) { + results = append(results, sn) + } + } + return s.parseCheckServiceNodes(tx, results, err) } // parseCheckServiceNodes is used to parse through a given set of services, // and query for an associated node and a set of checks. This is the inner // method used to return a rich set of results from a more simple query. 
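// [Editor's note, not part of the patch] The signature change below
// swaps the memdb iterator for a materialized ServiceNodes slice, so
// CheckServiceNodes and CheckServiceTagNodes can tag-filter the
// services before the node and check lookups happen.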
func (s *StateStore) parseCheckServiceNodes( - tx *memdb.Txn, iter memdb.ResultIterator, + tx *memdb.Txn, services structs.ServiceNodes, err error) (uint64, structs.CheckServiceNodes, error) { if err != nil { return 0, nil, err @@ -896,15 +1053,14 @@ func (s *StateStore) parseCheckServiceNodes( var results structs.CheckServiceNodes var lindex uint64 - for service := iter.Next(); service != nil; service = iter.Next() { - // Compute the index - svc := service.(*structs.ServiceNode) - if svc.ModifyIndex > lindex { - lindex = svc.ModifyIndex + for _, sn := range services { + // Compute the index. + if sn.ModifyIndex > lindex { + lindex = sn.ModifyIndex } - // Retrieve the node - n, err := tx.First("nodes", "id", svc.Node) + // Retrieve the node. + n, err := tx.First("nodes", "id", sn.Node) if err != nil { return 0, nil, fmt.Errorf("failed node lookup: %s", err) } @@ -916,24 +1072,36 @@ func (s *StateStore) parseCheckServiceNodes( lindex = node.ModifyIndex } - // Get the checks - idx, checks, err := s.parseChecks(tx.Get("checks", "node_service", svc.Node, svc.ServiceID)) + // TODO (slackpad) Make this work as an better indexed operation. + + // We need to return the checks specific to the given service + // as well as the node itself. Unfortunately, memdb won't let + // us use the index to do the latter query so we have to pull + // them all and filter. + var checks structs.HealthChecks + iter, err := tx.Get("checks", "node", sn.Node) if err != nil { return 0, nil, err } - if idx > lindex { - lindex = idx + for check := iter.Next(); check != nil; check = iter.Next() { + hc := check.(*structs.HealthCheck) + if hc.ServiceID == "" || hc.ServiceID == sn.ServiceID { + if hc.ModifyIndex > lindex { + lindex = hc.ModifyIndex + } + checks = append(checks, hc) + } } - // Append to the results + // Append to the results. results = append(results, structs.CheckServiceNode{ Node: node, Service: &structs.NodeService{ - ID: svc.ServiceID, - Service: svc.ServiceName, - Address: svc.ServiceAddress, - Port: svc.ServicePort, - Tags: svc.ServiceTags, + ID: sn.ServiceID, + Service: sn.ServiceName, + Address: sn.ServiceAddress, + Port: sn.ServicePort, + Tags: sn.ServiceTags, }, Checks: checks, }) @@ -944,12 +1112,12 @@ func (s *StateStore) parseCheckServiceNodes( // NodeInfo is used to generate a dump of a single node. The dump includes // all services and checks which are registered against the node. -func (s *StateStore) NodeInfo(nodeID string) (uint64, structs.NodeDump, error) { +func (s *StateStore) NodeInfo(node string) (uint64, structs.NodeDump, error) { tx := s.db.Txn(false) defer tx.Abort() - // Query the node by the passed node ID - nodes, err := tx.Get("nodes", "id", nodeID) + // Query the node by the passed node + nodes, err := tx.Get("nodes", "id", node) if err != nil { return 0, nil, fmt.Errorf("failed node lookup: %s", err) } @@ -1315,6 +1483,9 @@ func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error return true, nil } +// TODO (slackpad) Double check the old KV triggering behavior and make +// sure we are covered here with tests. + // KVSDeleteTree is used to do a recursive delete on a key prefix // in the state store. If any keys are modified, the last index is // set, otherwise this is a no-op. 
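[Editor's note, not part of the patch] The graveyard.go hunk and the NewStateStore change above wire the tombstone GC through the new store. A minimal end-to-end sketch of that flow follows. SetEnabled and ExpireCh come from the relocated tombstone_gc.go; KVSSet and KVSDelete are assumed to exist on the new store with these signatures, as they are not shown in this diff:

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/consul/consul/state"
	"github.com/hashicorp/consul/consul/structs"
)

func main() {
	// Tight TTL/granularity so the example fires quickly; production
	// uses config.TombstoneTTL and config.TombstoneTTLGranularity.
	gc, err := state.NewTombstoneGC(10*time.Millisecond, 5*time.Millisecond)
	if err != nil {
		panic(err)
	}
	gc.SetEnabled(true) // the leader enables this when it takes over

	// NewStateStore now threads the GC down into the KV graveyard.
	store, err := state.NewStateStore(gc)
	if err != nil {
		panic(err)
	}

	// Write then delete a key. The delete inserts a tombstone and, via
	// tx.Defer in Graveyard.InsertTxn, hints the GC with the delete
	// index (2) once the transaction commits.
	if err := store.KVSSet(1, &structs.DirEntry{Key: "test", Value: []byte("v")}); err != nil {
		panic(err)
	}
	if err := store.KVSDelete(2, "test"); err != nil {
		panic(err)
	}

	// After the TTL elapses the GC emits the highest hinted index;
	// the leader turns this into a Raft-replicated tombstone reap.
	fmt.Println("reap tombstones up to index:", <-gc.ExpireCh())
}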
diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index ed10a40d5e..367d98dbd2 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -3,6 +3,7 @@ package state import ( "fmt" "reflect" + "sort" "strings" "testing" "time" @@ -10,8 +11,10 @@ import ( "github.com/hashicorp/consul/consul/structs" ) +// TODO (slackpad) Make sure the GC tests are complete. + func testStateStore(t *testing.T) *StateStore { - s, err := NewStateStore() + s, err := NewStateStore(nil) if err != nil { t.Fatalf("err: %s", err) } @@ -800,6 +803,286 @@ func TestStateStore_EnsureService(t *testing.T) { } } +func TestStateStore_Services(t *testing.T) { + s := testStateStore(t) + + // Register several nodes and services. + testRegisterNode(t, s, 1, "node1") + ns1 := &structs.NodeService{ + ID: "service1", + Service: "redis", + Tags: []string{"prod", "master"}, + Address: "1.1.1.1", + Port: 1111, + } + if err := s.EnsureService(2, "node1", ns1); err != nil { + t.Fatalf("err: %s", err) + } + testRegisterService(t, s, 3, "node1", "dogs") + testRegisterNode(t, s, 4, "node2") + ns2 := &structs.NodeService{ + ID: "service3", + Service: "redis", + Tags: []string{"prod", "slave"}, + Address: "1.1.1.1", + Port: 1111, + } + if err := s.EnsureService(5, "node2", ns2); err != nil { + t.Fatalf("err: %s", err) + } + + // Pull all the services. + idx, services, err := s.Services() + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 5 { + t.Fatalf("bad index: %d", idx) + } + + // Verify the result. We sort the lists since the order is + // non-deterministic (it's built using a map internally). + expected := structs.Services{ + "redis": []string{"prod", "master", "slave"}, + "dogs": []string{}, + } + sort.Strings(expected["redis"]) + for _, tags := range services { + sort.Strings(tags) + } + if !reflect.DeepEqual(expected, services) { + t.Fatalf("bad: %#v", services) + } +} + +// strContains checks if a list contains a string +func strContains(l []string, s string) bool { + for _, v := range l { + if v == s { + return true + } + } + return false +} + +func TestStateStore_ServiceNodes(t *testing.T) { + s := testStateStore(t) + + if err := s.EnsureNode(10, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := s.EnsureNode(11, &structs.Node{Node: "bar", Address: "127.0.0.2"}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := s.EnsureService(12, "foo", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := s.EnsureService(13, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := s.EnsureService(14, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"master"}, Address: "", Port: 8000}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := s.EnsureService(15, "bar", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8000}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := s.EnsureService(16, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001}); err != nil { + t.Fatalf("err: %v", err) + } + + idx, nodes, err := s.ServiceNodes("db") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 16 { + t.Fatalf("bad: %v", 16) + } + if len(nodes) != 3 { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].Node != "bar" { 
+ t.Fatalf("bad: %v", nodes) + } + if nodes[0].Address != "127.0.0.2" { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].ServiceID != "db" { + t.Fatalf("bad: %v", nodes) + } + if !strContains(nodes[0].ServiceTags, "slave") { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].ServicePort != 8000 { + t.Fatalf("bad: %v", nodes) + } + + if nodes[1].Node != "bar" { + t.Fatalf("bad: %v", nodes) + } + if nodes[1].Address != "127.0.0.2" { + t.Fatalf("bad: %v", nodes) + } + if nodes[1].ServiceID != "db2" { + t.Fatalf("bad: %v", nodes) + } + if !strContains(nodes[1].ServiceTags, "slave") { + t.Fatalf("bad: %v", nodes) + } + if nodes[1].ServicePort != 8001 { + t.Fatalf("bad: %v", nodes) + } + + if nodes[2].Node != "foo" { + t.Fatalf("bad: %v", nodes) + } + if nodes[2].Address != "127.0.0.1" { + t.Fatalf("bad: %v", nodes) + } + if nodes[2].ServiceID != "db" { + t.Fatalf("bad: %v", nodes) + } + if !strContains(nodes[2].ServiceTags, "master") { + t.Fatalf("bad: %v", nodes) + } + if nodes[2].ServicePort != 8000 { + t.Fatalf("bad: %v", nodes) + } +} + +func TestStateStore_ServiceTagNodes(t *testing.T) { + s := testStateStore(t) + + if err := s.EnsureNode(15, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := s.EnsureNode(16, &structs.Node{Node: "bar", Address: "127.0.0.2"}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := s.EnsureService(17, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"master"}, Address: "", Port: 8000}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := s.EnsureService(18, "foo", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := s.EnsureService(19, "bar", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8000}); err != nil { + t.Fatalf("err: %v", err) + } + + idx, nodes, err := s.ServiceTagNodes("db", "master") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 17 { + t.Fatalf("bad: %v", idx) + } + if len(nodes) != 1 { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].Node != "foo" { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].Address != "127.0.0.1" { + t.Fatalf("bad: %v", nodes) + } + if !strContains(nodes[0].ServiceTags, "master") { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].ServicePort != 8000 { + t.Fatalf("bad: %v", nodes) + } +} + +func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) { + s := testStateStore(t) + + if err := s.EnsureNode(15, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := s.EnsureNode(16, &structs.Node{Node: "bar", Address: "127.0.0.2"}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := s.EnsureService(17, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"master", "v2"}, Address: "", Port: 8000}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := s.EnsureService(18, "foo", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave", "v2", "dev"}, Address: "", Port: 8001}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := s.EnsureService(19, "bar", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"slave", "v2"}, Address: "", Port: 8000}); err != nil { + t.Fatalf("err: %v", err) + } + + idx, nodes, err := s.ServiceTagNodes("db", "master") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 17 { + t.Fatalf("bad: %v", idx) + } + if len(nodes) != 1 { + t.Fatalf("bad: %v", 
nodes) + } + if nodes[0].Node != "foo" { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].Address != "127.0.0.1" { + t.Fatalf("bad: %v", nodes) + } + if !strContains(nodes[0].ServiceTags, "master") { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].ServicePort != 8000 { + t.Fatalf("bad: %v", nodes) + } + + idx, nodes, err = s.ServiceTagNodes("db", "v2") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 19 { + t.Fatalf("bad: %v", idx) + } + if len(nodes) != 3 { + t.Fatalf("bad: %v", nodes) + } + + idx, nodes, err = s.ServiceTagNodes("db", "dev") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 18 { + t.Fatalf("bad: %v", idx) + } + if len(nodes) != 1 { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].Node != "foo" { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].Address != "127.0.0.1" { + t.Fatalf("bad: %v", nodes) + } + if !strContains(nodes[0].ServiceTags, "dev") { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].ServicePort != 8001 { + t.Fatalf("bad: %v", nodes) + } +} + func TestStateStore_DeleteService(t *testing.T) { s := testStateStore(t) @@ -1231,12 +1514,12 @@ func TestStateStore_CheckServiceNodes(t *testing.T) { t.Fatalf("bad index: %d", idx) } - // Make sure we get the expected result + // Make sure we get the expected result (service check + node check) if n := len(results); n != 1 { t.Fatalf("expected 1 result, got: %d", n) } csn := results[0] - if csn.Node == nil || csn.Service == nil || len(csn.Checks) != 1 { + if csn.Node == nil || csn.Service == nil || len(csn.Checks) != 2 { t.Fatalf("bad output: %#v", csn) } @@ -1271,6 +1554,62 @@ func TestStateStore_CheckServiceNodes(t *testing.T) { } } +func TestStateStore_CheckServiceTagNodes(t *testing.T) { + s := testStateStore(t) + + if err := s.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + if err := s.EnsureService(2, "foo", &structs.NodeService{ID: "db1", Service: "db", Tags: []string{"master"}, Address: "", Port: 8000}); err != nil { + t.Fatalf("err: %v", err) + } + check := &structs.HealthCheck{ + Node: "foo", + CheckID: "db", + Name: "can connect", + Status: structs.HealthPassing, + ServiceID: "db1", + } + if err := s.EnsureCheck(3, check); err != nil { + t.Fatalf("err: %v", err) + } + check = &structs.HealthCheck{ + Node: "foo", + CheckID: "check1", + Name: "another check", + Status: structs.HealthPassing, + } + if err := s.EnsureCheck(4, check); err != nil { + t.Fatalf("err: %v", err) + } + + idx, nodes, err := s.CheckServiceTagNodes("db", "master") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 4 { + t.Fatalf("bad: %v", idx) + } + if len(nodes) != 1 { + t.Fatalf("Bad: %v", nodes) + } + if nodes[0].Node.Node != "foo" { + t.Fatalf("Bad: %v", nodes[0]) + } + if nodes[0].Service.ID != "db1" { + t.Fatalf("Bad: %v", nodes[0]) + } + if len(nodes[0].Checks) != 2 { + t.Fatalf("Bad: %v", nodes[0]) + } + if nodes[0].Checks[0].CheckID != "check1" { + t.Fatalf("Bad: %v", nodes[0]) + } + if nodes[0].Checks[1].CheckID != "db" { + t.Fatalf("Bad: %v", nodes[0]) + } +} + func TestStateStore_Check_Snapshot(t *testing.T) { s := testStateStore(t) diff --git a/consul/tombstone_gc.go b/consul/state/tombstone_gc.go similarity index 99% rename from consul/tombstone_gc.go rename to consul/state/tombstone_gc.go index 8dd2e1a5aa..0d530eb696 100644 --- a/consul/tombstone_gc.go +++ b/consul/state/tombstone_gc.go @@ -1,4 +1,4 @@ -package consul +package state import ( "fmt" diff --git a/consul/tombstone_gc_test.go b/consul/state/tombstone_gc_test.go similarity 
index 99% rename from consul/tombstone_gc_test.go rename to consul/state/tombstone_gc_test.go index ac51e6418d..44ca19874a 100644 --- a/consul/tombstone_gc_test.go +++ b/consul/state/tombstone_gc_test.go @@ -1,4 +1,4 @@ -package consul +package state import ( "testing" diff --git a/consul/state/watch.go b/consul/state/watch.go index 04b071f266..cb45643564 100644 --- a/consul/state/watch.go +++ b/consul/state/watch.go @@ -130,3 +130,27 @@ func (w *PrefixWatch) Notify(prefix string, subtree bool) { w.watches.Delete(cleanup[i]) } } + +// MultiWatch wraps several watches and allows any of them to trigger the +// caller. +type MultiWatch struct { + watches []Watch +} + +func NewMultiWatch(watches ...Watch) *MultiWatch { + return &MultiWatch{watches: watches} +} + +// See Watch. +func (w *MultiWatch) Wait(notifyCh chan struct{}) { + for _, watch := range w.watches { + watch.Wait(notifyCh) + } +} + +// See Watch. +func (w *MultiWatch) Clear(notifyCh chan struct{}) { + for _, watch := range w.watches { + watch.Clear(notifyCh) + } +} diff --git a/consul/state_store.go b/consul/state_store.go index 8284885c5e..6073ffe18f 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -13,6 +13,7 @@ import ( "github.com/armon/go-radix" "github.com/armon/gomdb" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" ) @@ -88,7 +89,7 @@ type StateStore struct { // GC is when we create tombstones to track their time-to-live. // The GC is consumed upstream to manage clearing of tombstones. - gc *TombstoneGC + gc *state.TombstoneGC } // StateSnapshot is used to provide a point-in-time snapshot @@ -115,7 +116,7 @@ func (s *StateSnapshot) Close() error { } // NewStateStore is used to create a new state store -func NewStateStore(gc *TombstoneGC, logOutput io.Writer) (*StateStore, error) { +func NewStateStore(gc *state.TombstoneGC, logOutput io.Writer) (*StateStore, error) { // Create a new temp dir path, err := ioutil.TempDir("", "consul") if err != nil { @@ -126,7 +127,7 @@ func NewStateStore(gc *TombstoneGC, logOutput io.Writer) (*StateStore, error) { // NewStateStorePath is used to create a new state store at a given path // The path is cleared on closing. -func NewStateStorePath(gc *TombstoneGC, path string, logOutput io.Writer) (*StateStore, error) { +func NewStateStorePath(gc *state.TombstoneGC, path string, logOutput io.Writer) (*StateStore, error) { // Open the env env, err := mdb.NewEnv() if err != nil { diff --git a/consul/state_store_test.go b/consul/state_store_test.go index dcd2f3d19c..296779c985 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + state_store "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" ) @@ -1539,7 +1540,7 @@ func TestKVSDelete(t *testing.T) { ttl := 10 * time.Millisecond gran := 5 * time.Millisecond - gc, err := NewTombstoneGC(ttl, gran) + gc, err := state_store.NewTombstoneGC(ttl, gran) if err != nil { t.Fatalf("err: %v", err) } @@ -2062,7 +2063,7 @@ func TestKVSDeleteTree(t *testing.T) { ttl := 10 * time.Millisecond gran := 5 * time.Millisecond - gc, err := NewTombstoneGC(ttl, gran) + gc, err := state_store.NewTombstoneGC(ttl, gran) if err != nil { t.Fatalf("err: %v", err) } @@ -2169,7 +2170,7 @@ func TestReapTombstones(t *testing.T) { ttl := 10 * time.Millisecond gran := 5 * time.Millisecond - gc, err := NewTombstoneGC(ttl, gran) + gc, err := state_store.NewTombstoneGC(ttl, gran) if err != nil { t.Fatalf("err: %v", err) }
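[Editor's note, not part of the patch] The MultiWatch added in watch.go above fans a single notification channel out across several table watches. Below is a sketch of how a blocking query is expected to consume it; the waitForChange wrapper is illustrative only, but NewMultiWatch, GetTableWatch, Wait, and Clear are taken directly from this diff:

package sketch

import (
	"time"

	"github.com/hashicorp/consul/consul/state"
)

// waitForChange blocks until any of the watched tables changes or the
// query times out. This mirrors what blockingRPCNew is expected to do
// with the watches handed to it by the endpoints in this patch.
func waitForChange(s *state.StateStore, maxQueryTime time.Duration) bool {
	// Buffered so a notification fired before we reach the select
	// below is not lost.
	notifyCh := make(chan struct{}, 1)

	// Any write to the nodes or services tables notifies the channel.
	watch := state.NewMultiWatch(
		s.GetTableWatch("nodes"),
		s.GetTableWatch("services"))
	watch.Wait(notifyCh)
	defer watch.Clear(notifyCh) // always deregister, even on timeout

	select {
	case <-notifyCh:
		return true // something changed; re-run the query
	case <-time.After(maxQueryTime):
		return false // timed out; return the current results
	}
}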