Integrates new state store into leader and catalog/health endpoints.

This commit is contained in:
James Phillips 2015-10-12 00:42:09 -07:00
parent 41338c9018
commit 5b6502c1ea
16 changed files with 882 additions and 169 deletions

View File

@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
state_store "github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
) )
@ -119,13 +120,19 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
return err return err
} }
// Get the local state // Get the list of nodes.
state := c.srv.fsm.State() state := c.srv.fsm.StateNew()
return c.srv.blockingRPC(&args.QueryOptions, return c.srv.blockingRPCNew(
&args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
state.QueryTables("Nodes"), state.GetTableWatch("nodes"),
func() error { func() error {
reply.Index, reply.Nodes = state.Nodes() index, nodes, err := state.Nodes()
if err != nil {
return err
}
reply.Index, reply.Nodes = index, nodes
return nil return nil
}) })
} }
@ -136,13 +143,19 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
return err return err
} }
// Get the current nodes // Get the list of services and their tags.
state := c.srv.fsm.State() state := c.srv.fsm.StateNew()
return c.srv.blockingRPC(&args.QueryOptions, return c.srv.blockingRPCNew(
&args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
state.QueryTables("Services"), state.GetTableWatch("services"),
func() error { func() error {
reply.Index, reply.Services = state.Services() index, services, err := state.Services()
if err != nil {
return err
}
reply.Index, reply.Services = index, services
return c.srv.filterACL(args.Token, reply) return c.srv.filterACL(args.Token, reply)
}) })
} }
@ -159,16 +172,26 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
} }
// Get the nodes // Get the nodes
state := c.srv.fsm.State() state := c.srv.fsm.StateNew()
err := c.srv.blockingRPC(&args.QueryOptions, err := c.srv.blockingRPCNew(
&args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
state.QueryTables("ServiceNodes"), state_store.NewMultiWatch(
state.GetTableWatch("nodes"),
state.GetTableWatch("services")),
func() error { func() error {
var index uint64
var services structs.ServiceNodes
var err error
if args.TagFilter { if args.TagFilter {
reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) index, services, err = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
} else { } else {
reply.Index, reply.ServiceNodes = state.ServiceNodes(args.ServiceName) index, services, err = state.ServiceNodes(args.ServiceName)
} }
if err != nil {
return err
}
reply.Index, reply.ServiceNodes = index, services
return c.srv.filterACL(args.Token, reply) return c.srv.filterACL(args.Token, reply)
}) })
@ -197,12 +220,19 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs
} }
// Get the node services // Get the node services
state := c.srv.fsm.State() state := c.srv.fsm.StateNew()
return c.srv.blockingRPC(&args.QueryOptions, return c.srv.blockingRPCNew(
&args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
state.QueryTables("NodeServices"), state_store.NewMultiWatch(
state.GetTableWatch("nodes"),
state.GetTableWatch("services")),
func() error { func() error {
reply.Index, reply.NodeServices = state.NodeServices(args.Node) index, services, err := state.NodeServices(args.Node)
if err != nil {
return err
}
reply.Index, reply.NodeServices = index, services
return c.srv.filterACL(args.Token, reply) return c.srv.filterACL(args.Token, reply)
}) })
} }

View File

@ -267,7 +267,9 @@ func TestCatalogListNodes(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1") testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node // Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) if err := s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out)
@ -317,12 +319,16 @@ func TestCatalogListNodes_StaleRaad(t *testing.T) {
codec = codec1 codec = codec1
// Inject fake data on the follower! // Inject fake data on the follower!
s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) if err := s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
} else { } else {
codec = codec2 codec = codec2
// Inject fake data on the follower! // Inject fake data on the follower!
s2.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) if err := s2.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
} }
args := structs.DCSpecificRequest{ args := structs.DCSpecificRequest{
@ -458,7 +464,9 @@ func BenchmarkCatalogListNodes(t *testing.B) {
defer codec.Close() defer codec.Close()
// Just add a node // Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) if err := s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
args := structs.DCSpecificRequest{ args := structs.DCSpecificRequest{
Datacenter: "dc1", Datacenter: "dc1",
@ -490,8 +498,12 @@ func TestCatalogListServices(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1") testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node // Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) if err := s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}) t.Fatalf("err: %v", err)
}
if err := s1.fsm.StateNew().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -541,11 +553,16 @@ func TestCatalogListServices_Blocking(t *testing.T) {
args.MaxQueryTime = time.Second args.MaxQueryTime = time.Second
// Async cause a change // Async cause a change
idx := out.Index
start := time.Now() start := time.Now()
go func() { go func() {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) if err := s1.fsm.StateNew().EnsureNode(idx+1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}) t.Fatalf("err: %v", err)
}
if err := s1.fsm.StateNew().EnsureService(idx+2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
}() }()
// Re-run the query // Re-run the query
@ -560,7 +577,7 @@ func TestCatalogListServices_Blocking(t *testing.T) {
} }
// Check the indexes // Check the indexes
if out.Index != 2 { if out.Index != idx+2 {
t.Fatalf("bad: %v", out) t.Fatalf("bad: %v", out)
} }
@ -625,8 +642,12 @@ func TestCatalogListServices_Stale(t *testing.T) {
var out structs.IndexedServices var out structs.IndexedServices
// Inject a fake service // Inject a fake service
s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) if err := s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}) t.Fatalf("err: %v", err)
}
if err := s1.fsm.StateNew().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
// Run the query, do not wait for leader! // Run the query, do not wait for leader!
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
@ -666,8 +687,12 @@ func TestCatalogListServiceNodes(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1") testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node // Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) if err := s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}) t.Fatalf("err: %v", err)
}
if err := s1.fsm.StateNew().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil { if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -709,9 +734,15 @@ func TestCatalogNodeServices(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1") testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node // Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{Node: "foo", Address: "127.0.0.1"}) if err := s1.fsm.StateNew().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}) t.Fatalf("err: %v", err)
s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80}) }
if err := s1.fsm.StateNew().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s1.fsm.StateNew().EnsureService(3, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80}); err != nil {
t.Fatalf("err: %v", err)
}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.NodeServices", &args, &out); err != nil { if err := msgpackrpc.CallWithCodec(codec, "Catalog.NodeServices", &args, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)

View File

@ -27,7 +27,7 @@ type consulFSM struct {
path string path string
stateNew *state.StateStore stateNew *state.StateStore
state *StateStore state *StateStore
gc *TombstoneGC gc *state.TombstoneGC
} }
// consulSnapshot is used to provide a snapshot of the current // consulSnapshot is used to provide a snapshot of the current
@ -46,9 +46,9 @@ type snapshotHeader struct {
} }
// NewFSMPath is used to construct a new FSM with a blank state // NewFSMPath is used to construct a new FSM with a blank state
func NewFSM(gc *TombstoneGC, path string, logOutput io.Writer) (*consulFSM, error) { func NewFSM(gc *state.TombstoneGC, path string, logOutput io.Writer) (*consulFSM, error) {
// Create the state store. // Create the state store.
stateNew, err := state.NewStateStore() stateNew, err := state.NewStateStore(gc)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -3,6 +3,7 @@ package consul
import ( import (
"fmt" "fmt"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
state_store "github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
) )
@ -19,12 +20,17 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
} }
// Get the state specific checks // Get the state specific checks
state := h.srv.fsm.State() state := h.srv.fsm.StateNew()
return h.srv.blockingRPC(&args.QueryOptions, return h.srv.blockingRPCNew(
&args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
state.QueryTables("ChecksInState"), state.GetTableWatch("checks"),
func() error { func() error {
reply.Index, reply.HealthChecks = state.ChecksInState(args.State) index, checks, err := state.ChecksInState(args.State)
if err != nil {
return err
}
reply.Index, reply.HealthChecks = index, checks
return h.srv.filterACL(args.Token, reply) return h.srv.filterACL(args.Token, reply)
}) })
} }
@ -37,12 +43,17 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest,
} }
// Get the node checks // Get the node checks
state := h.srv.fsm.State() state := h.srv.fsm.StateNew()
return h.srv.blockingRPC(&args.QueryOptions, return h.srv.blockingRPCNew(
&args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
state.QueryTables("NodeChecks"), state.GetTableWatch("checks"),
func() error { func() error {
reply.Index, reply.HealthChecks = state.NodeChecks(args.Node) index, checks, err := state.NodeChecks(args.Node)
if err != nil {
return err
}
reply.Index, reply.HealthChecks = index, checks
return h.srv.filterACL(args.Token, reply) return h.srv.filterACL(args.Token, reply)
}) })
} }
@ -61,12 +72,17 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
} }
// Get the service checks // Get the service checks
state := h.srv.fsm.State() state := h.srv.fsm.StateNew()
return h.srv.blockingRPC(&args.QueryOptions, return h.srv.blockingRPCNew(
&args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
state.QueryTables("ServiceChecks"), state.GetTableWatch("checks"),
func() error { func() error {
reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName) index, checks, err := state.ServiceChecks(args.ServiceName)
if err != nil {
return err
}
reply.Index, reply.HealthChecks = index, checks
return h.srv.filterACL(args.Token, reply) return h.srv.filterACL(args.Token, reply)
}) })
} }
@ -83,16 +99,27 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
} }
// Get the nodes // Get the nodes
state := h.srv.fsm.State() state := h.srv.fsm.StateNew()
err := h.srv.blockingRPC(&args.QueryOptions, err := h.srv.blockingRPCNew(
&args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
state.QueryTables("CheckServiceNodes"), state_store.NewMultiWatch(
state.GetTableWatch("nodes"),
state.GetTableWatch("services"),
state.GetTableWatch("checks")),
func() error { func() error {
var index uint64
var nodes structs.CheckServiceNodes
var err error
if args.TagFilter { if args.TagFilter {
reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) index, nodes, err = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag)
} else { } else {
reply.Index, reply.Nodes = state.CheckServiceNodes(args.ServiceName) index, nodes, err = state.CheckServiceNodes(args.ServiceName)
} }
if err != nil {
return err
}
reply.Index, reply.Nodes = index, nodes
return h.srv.filterACL(args.Token, reply) return h.srv.filterACL(args.Token, reply)
}) })

View File

@ -46,11 +46,11 @@ func TestHealth_ChecksInState(t *testing.T) {
t.Fatalf("Bad: %v", checks) t.Fatalf("Bad: %v", checks)
} }
// First check is automatically added for the server node // Serf check is automatically added
if checks[0].CheckID != SerfCheckID { if checks[0].Name != "memory utilization" {
t.Fatalf("Bad: %v", checks[0]) t.Fatalf("Bad: %v", checks[0])
} }
if checks[1].Name != "memory utilization" { if checks[1].CheckID != SerfCheckID {
t.Fatalf("Bad: %v", checks[1]) t.Fatalf("Bad: %v", checks[1])
} }
} }
@ -205,22 +205,22 @@ func TestHealth_ServiceNodes(t *testing.T) {
if len(nodes) != 2 { if len(nodes) != 2 {
t.Fatalf("Bad: %v", nodes) t.Fatalf("Bad: %v", nodes)
} }
if nodes[0].Node.Node != "foo" { if nodes[0].Node.Node != "bar" {
t.Fatalf("Bad: %v", nodes[0]) t.Fatalf("Bad: %v", nodes[0])
} }
if nodes[1].Node.Node != "bar" { if nodes[1].Node.Node != "foo" {
t.Fatalf("Bad: %v", nodes[1]) t.Fatalf("Bad: %v", nodes[1])
} }
if !strContains(nodes[0].Service.Tags, "master") { if !strContains(nodes[0].Service.Tags, "slave") {
t.Fatalf("Bad: %v", nodes[0]) t.Fatalf("Bad: %v", nodes[0])
} }
if !strContains(nodes[1].Service.Tags, "slave") { if !strContains(nodes[1].Service.Tags, "master") {
t.Fatalf("Bad: %v", nodes[1]) t.Fatalf("Bad: %v", nodes[1])
} }
if nodes[0].Checks[0].Status != structs.HealthPassing { if nodes[0].Checks[0].Status != structs.HealthWarning {
t.Fatalf("Bad: %v", nodes[0]) t.Fatalf("Bad: %v", nodes[0])
} }
if nodes[1].Checks[0].Status != structs.HealthWarning { if nodes[1].Checks[0].Status != structs.HealthPassing {
t.Fatalf("Bad: %v", nodes[1]) t.Fatalf("Bad: %v", nodes[1])
} }
} }

View File

@ -259,8 +259,11 @@ func (s *Server) reconcile() (err error) {
// in a critical state that does not correspond to a known Serf member. We generate // in a critical state that does not correspond to a known Serf member. We generate
// a "reap" event to cause the node to be cleaned up. // a "reap" event to cause the node to be cleaned up.
func (s *Server) reconcileReaped(known map[string]struct{}) error { func (s *Server) reconcileReaped(known map[string]struct{}) error {
state := s.fsm.State() state := s.fsm.StateNew()
_, checks := state.ChecksInState(structs.HealthAny) _, checks, err := state.ChecksInState(structs.HealthAny)
if err != nil {
return err
}
for _, check := range checks { for _, check := range checks {
// Ignore any non serf checks // Ignore any non serf checks
if check.CheckID != SerfCheckID { if check.CheckID != SerfCheckID {
@ -282,7 +285,10 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error {
} }
// Get the node services, look for ConsulServiceID // Get the node services, look for ConsulServiceID
_, services := state.NodeServices(check.Node) _, services, err := state.NodeServices(check.Node)
if err != nil {
return err
}
serverPort := 0 serverPort := 0
for _, service := range services.Services { for _, service := range services.Services {
if service.ID == ConsulServiceID { if service.ID == ConsulServiceID {
@ -352,8 +358,6 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
// handleAliveMember is used to ensure the node // handleAliveMember is used to ensure the node
// is registered, with a passing health check. // is registered, with a passing health check.
func (s *Server) handleAliveMember(member serf.Member) error { func (s *Server) handleAliveMember(member serf.Member) error {
state := s.fsm.State()
// Register consul service if a server // Register consul service if a server
var service *structs.NodeService var service *structs.NodeService
if valid, parts := isConsulServer(member); valid { if valid, parts := isConsulServer(member); valid {
@ -370,12 +374,19 @@ func (s *Server) handleAliveMember(member serf.Member) error {
} }
// Check if the node exists // Check if the node exists
_, found, addr := state.GetNode(member.Name) state := s.fsm.StateNew()
if found && addr == member.Addr.String() { node, err := state.GetNode(member.Name)
if err != nil {
return fmt.Errorf("failed to lookup node %s: %s", member.Name, err)
}
if node != nil && node.Address == member.Addr.String() {
// Check if the associated service is available // Check if the associated service is available
if service != nil { if service != nil {
match := false match := false
_, services := state.NodeServices(member.Name) _, services, err := state.NodeServices(member.Name)
if err != nil {
return err
}
if services != nil { if services != nil {
for id, _ := range services.Services { for id, _ := range services.Services {
if id == service.ID { if id == service.ID {
@ -389,7 +400,10 @@ func (s *Server) handleAliveMember(member serf.Member) error {
} }
// Check if the serfCheck is in the passing state // Check if the serfCheck is in the passing state
_, checks := state.NodeChecks(member.Name) _, checks, err := state.NodeChecks(member.Name)
if err != nil {
return err
}
for _, check := range checks { for _, check := range checks {
if check.CheckID == SerfCheckID && check.Status == structs.HealthPassing { if check.CheckID == SerfCheckID && check.Status == structs.HealthPassing {
return nil return nil
@ -421,13 +435,18 @@ AFTER_CHECK:
// handleFailedMember is used to mark the node's status // handleFailedMember is used to mark the node's status
// as being critical, along with all checks as unknown. // as being critical, along with all checks as unknown.
func (s *Server) handleFailedMember(member serf.Member) error { func (s *Server) handleFailedMember(member serf.Member) error {
state := s.fsm.State()
// Check if the node exists // Check if the node exists
_, found, addr := state.GetNode(member.Name) state := s.fsm.StateNew()
if found && addr == member.Addr.String() { node, err := state.GetNode(member.Name)
if err != nil {
return fmt.Errorf("failed to lookup node %s: %s", member.Name, err)
}
if node != nil && node.Address == member.Addr.String() {
// Check if the serfCheck is in the critical state // Check if the serfCheck is in the critical state
_, checks := state.NodeChecks(member.Name) _, checks, err := state.NodeChecks(member.Name)
if err != nil {
return err
}
for _, check := range checks { for _, check := range checks {
if check.CheckID == SerfCheckID && check.Status == structs.HealthCritical { if check.CheckID == SerfCheckID && check.Status == structs.HealthCritical {
return nil return nil
@ -468,7 +487,6 @@ func (s *Server) handleReapMember(member serf.Member) error {
// handleDeregisterMember is used to deregister a member of a given reason // handleDeregisterMember is used to deregister a member of a given reason
func (s *Server) handleDeregisterMember(reason string, member serf.Member) error { func (s *Server) handleDeregisterMember(reason string, member serf.Member) error {
state := s.fsm.State()
// Do not deregister ourself. This can only happen if the current leader // Do not deregister ourself. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and // is leaving. Instead, we should allow a follower to take-over and
// deregister us later. // deregister us later.
@ -484,9 +502,13 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
} }
} }
// Check if the node does not exists // Check if the node does not exist
_, found, _ := state.GetNode(member.Name) state := s.fsm.StateNew()
if !found { node, err := state.GetNode(member.Name)
if err != nil {
return fmt.Errorf("failed to lookup node %s: %s", member.Name, err)
}
if node == nil {
return nil return nil
} }

View File

@ -32,16 +32,22 @@ func TestLeader_RegisterMember(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1") testutil.WaitForLeader(t, s1.RPC, "dc1")
// Client should be registered // Client should be registered
state := s1.fsm.State() state := s1.fsm.StateNew()
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
_, found, _ := state.GetNode(c1.config.NodeName) node, err := state.GetNode(c1.config.NodeName)
return found == true, nil if err != nil {
t.Fatalf("err: %v", err)
}
return node != nil, nil
}, func(err error) { }, func(err error) {
t.Fatalf("client not registered") t.Fatalf("client not registered")
}) })
// Should have a check // Should have a check
_, checks := state.NodeChecks(c1.config.NodeName) _, checks, err := state.NodeChecks(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(checks) != 1 { if len(checks) != 1 {
t.Fatalf("client missing check") t.Fatalf("client missing check")
} }
@ -56,13 +62,19 @@ func TestLeader_RegisterMember(t *testing.T) {
} }
// Server should be registered // Server should be registered
_, found, _ := state.GetNode(s1.config.NodeName) node, err := state.GetNode(s1.config.NodeName)
if !found { if err != nil {
t.Fatalf("err: %v", err)
}
if node == nil {
t.Fatalf("server not registered") t.Fatalf("server not registered")
} }
// Service should be registered // Service should be registered
_, services := state.NodeServices(s1.config.NodeName) _, services, err := state.NodeServices(s1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := services.Services["consul"]; !ok { if _, ok := services.Services["consul"]; !ok {
t.Fatalf("consul service not registered: %v", services) t.Fatalf("consul service not registered: %v", services)
} }
@ -90,16 +102,22 @@ func TestLeader_FailedMember(t *testing.T) {
c1.Shutdown() c1.Shutdown()
// Should be registered // Should be registered
state := s1.fsm.State() state := s1.fsm.StateNew()
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
_, found, _ := state.GetNode(c1.config.NodeName) node, err := state.GetNode(c1.config.NodeName)
return found == true, nil if err != nil {
t.Fatalf("err: %v", err)
}
return node != nil, nil
}, func(err error) { }, func(err error) {
t.Fatalf("client not registered") t.Fatalf("client not registered")
}) })
// Should have a check // Should have a check
_, checks := state.NodeChecks(c1.config.NodeName) _, checks, err := state.NodeChecks(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(checks) != 1 { if len(checks) != 1 {
t.Fatalf("client missing check") t.Fatalf("client missing check")
} }
@ -111,7 +129,10 @@ func TestLeader_FailedMember(t *testing.T) {
} }
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
_, checks = state.NodeChecks(c1.config.NodeName) _, checks, err = state.NodeChecks(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
return checks[0].Status == structs.HealthCritical, errors.New(checks[0].Status) return checks[0].Status == structs.HealthCritical, errors.New(checks[0].Status)
}, func(err error) { }, func(err error) {
t.Fatalf("check status is %v, should be critical", err) t.Fatalf("check status is %v, should be critical", err)
@ -134,13 +155,15 @@ func TestLeader_LeftMember(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
var found bool state := s1.fsm.StateNew()
state := s1.fsm.State()
// Should be registered // Should be registered
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
_, found, _ = state.GetNode(c1.config.NodeName) node, err := state.GetNode(c1.config.NodeName)
return found == true, nil if err != nil {
t.Fatalf("err: %v", err)
}
return node != nil, nil
}, func(err error) { }, func(err error) {
t.Fatalf("client should be registered") t.Fatalf("client should be registered")
}) })
@ -151,8 +174,11 @@ func TestLeader_LeftMember(t *testing.T) {
// Should be deregistered // Should be deregistered
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
_, found, _ = state.GetNode(c1.config.NodeName) node, err := state.GetNode(c1.config.NodeName)
return found == false, nil if err != nil {
t.Fatalf("err: %v", err)
}
return node == nil, nil
}, func(err error) { }, func(err error) {
t.Fatalf("client should not be registered") t.Fatalf("client should not be registered")
}) })
@ -174,13 +200,15 @@ func TestLeader_ReapMember(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
var found bool state := s1.fsm.StateNew()
state := s1.fsm.State()
// Should be registered // Should be registered
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
_, found, _ = state.GetNode(c1.config.NodeName) node, err := state.GetNode(c1.config.NodeName)
return found == true, nil if err != nil {
t.Fatalf("err: %v", err)
}
return node != nil, nil
}, func(err error) { }, func(err error) {
t.Fatalf("client should be registered") t.Fatalf("client should be registered")
}) })
@ -199,8 +227,11 @@ func TestLeader_ReapMember(t *testing.T) {
// Should be deregistered // Should be deregistered
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
_, found, _ = state.GetNode(c1.config.NodeName) node, err := state.GetNode(c1.config.NodeName)
return found == false, nil if err != nil {
t.Fatalf("err: %v", err)
}
return node == nil, nil
}, func(err error) { }, func(err error) {
t.Fatalf("client should not be registered") t.Fatalf("client should not be registered")
}) })
@ -236,9 +267,12 @@ func TestLeader_Reconcile_ReapMember(t *testing.T) {
} }
// Node should be gone // Node should be gone
state := s1.fsm.State() state := s1.fsm.StateNew()
_, found, _ := state.GetNode("no-longer-around") node, err := state.GetNode("no-longer-around")
if found { if err != nil {
t.Fatalf("err: %v", err)
}
if node != nil {
t.Fatalf("client registered") t.Fatalf("client registered")
} }
} }
@ -260,16 +294,22 @@ func TestLeader_Reconcile(t *testing.T) {
} }
// Should not be registered // Should not be registered
state := s1.fsm.State() state := s1.fsm.StateNew()
_, found, _ := state.GetNode(c1.config.NodeName) node, err := state.GetNode(c1.config.NodeName)
if found { if err != nil {
t.Fatalf("err: %v", err)
}
if node != nil {
t.Fatalf("client registered") t.Fatalf("client registered")
} }
// Should be registered // Should be registered
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
_, found, _ = state.GetNode(c1.config.NodeName) node, err = state.GetNode(c1.config.NodeName)
return found == true, nil if err != nil {
t.Fatalf("err: %v", err)
}
return node != nil, nil
}, func(err error) { }, func(err error) {
t.Fatalf("client should be registered") t.Fatalf("client should be registered")
}) })
@ -391,10 +431,13 @@ func TestLeader_LeftLeader(t *testing.T) {
} }
// Verify the old leader is deregistered // Verify the old leader is deregistered
state := remain.fsm.State() state := remain.fsm.StateNew()
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
_, found, _ := state.GetNode(leader.config.NodeName) node, err := state.GetNode(leader.config.NodeName)
return !found, nil if err != nil {
t.Fatalf("err: %v", err)
}
return node == nil, nil
}, func(err error) { }, func(err error) {
t.Fatalf("leader should be deregistered") t.Fatalf("leader should be deregistered")
}) })
@ -536,25 +579,39 @@ func TestLeader_ReapTombstones(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Delete the KV entry (tombstoned) // Snag the pre-delete index that the tombstone should
// preserve.
state := s1.fsm.StateNew()
keyIdx, _, err := state.KVSList("test")
if err != nil {
t.Fatalf("err: %v", err)
}
// Delete the KV entry (tombstoned).
arg.Op = structs.KVSDelete arg.Op = structs.KVSDelete
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Ensure we have a tombstone // Make sure the index advances to reflect the delete, instead of sliding
_, res, err := s1.fsm.State().tombstoneTable.Get("id") // backwards.
idx, _, err := state.KVSList("test")
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if len(res) == 0 { if idx <= keyIdx {
t.Fatalf("missing tombstones") t.Fatalf("tombstone not working: %d <= %d", idx, keyIdx)
} }
// Check that the new leader has a pending GC expiration // Check that the new leader has a pending GC expiration by
// watching for the index to slide back.
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
_, res, err := s1.fsm.State().tombstoneTable.Get("id") idx, _, err := state.KVSList("test")
return len(res) == 0, err if err != nil {
t.Fatalf("err: %v", err)
}
fmt.Printf("%d %d\n", idx, keyIdx)
return idx < keyIdx, err
}, func(err error) { }, func(err error) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
}) })

View File

@ -15,6 +15,7 @@ import (
"time" "time"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb" "github.com/hashicorp/raft-boltdb"
@ -135,7 +136,7 @@ type Server struct {
// tombstoneGC is used to track the pending GC invocations // tombstoneGC is used to track the pending GC invocations
// for the KV tombstones // for the KV tombstones
tombstoneGC *TombstoneGC tombstoneGC *state.TombstoneGC
shutdown bool shutdown bool
shutdownCh chan struct{} shutdownCh chan struct{}
@ -193,7 +194,7 @@ func NewServer(config *Config) (*Server, error) {
logger := log.New(config.LogOutput, "", log.LstdFlags) logger := log.New(config.LogOutput, "", log.LstdFlags)
// Create the tombstone GC // Create the tombstone GC
gc, err := NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity) gc, err := state.NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -14,15 +14,19 @@ type Tombstone struct {
// Graveyard manages a set of tombstones. // Graveyard manages a set of tombstones.
type Graveyard struct { type Graveyard struct {
// GC is when we create tombstones to track their time-to-live.
// The GC is consumed upstream to manage clearing of tombstones.
gc *TombstoneGC
} }
// NewGraveyard returns a new graveyard. // NewGraveyard returns a new graveyard.
func NewGraveyard() *Graveyard { func NewGraveyard(gc *TombstoneGC) *Graveyard {
return &Graveyard{} return &Graveyard{gc: gc}
} }
// InsertTxn adds a new tombstone. // InsertTxn adds a new tombstone.
func (g *Graveyard) InsertTxn(tx *memdb.Txn, key string, idx uint64) error { func (g *Graveyard) InsertTxn(tx *memdb.Txn, key string, idx uint64) error {
// Insert the tombstone.
stone := &Tombstone{Key: key, Index: idx} stone := &Tombstone{Key: key, Index: idx}
if err := tx.Insert("tombstones", stone); err != nil { if err := tx.Insert("tombstones", stone); err != nil {
return fmt.Errorf("failed inserting tombstone: %s", err) return fmt.Errorf("failed inserting tombstone: %s", err)
@ -31,6 +35,11 @@ func (g *Graveyard) InsertTxn(tx *memdb.Txn, key string, idx uint64) error {
if err := tx.Insert("index", &IndexEntry{"tombstones", idx}); err != nil { if err := tx.Insert("index", &IndexEntry{"tombstones", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
// If GC is configured, then we hint that this index requires reaping.
if g.gc != nil {
tx.Defer(func() { g.gc.Hint(idx) })
}
return nil return nil
} }

View File

@ -74,7 +74,7 @@ type sessionCheck struct {
} }
// NewStateStore creates a new in-memory state storage layer. // NewStateStore creates a new in-memory state storage layer.
func NewStateStore() (*StateStore, error) { func NewStateStore(gc *TombstoneGC) (*StateStore, error) {
// Create the in-memory DB. // Create the in-memory DB.
schema := stateStoreSchema() schema := stateStoreSchema()
db, err := memdb.NewMemDB(schema) db, err := memdb.NewMemDB(schema)
@ -98,7 +98,7 @@ func NewStateStore() (*StateStore, error) {
db: db, db: db,
tableWatches: tableWatches, tableWatches: tableWatches,
kvsWatch: NewPrefixWatch(), kvsWatch: NewPrefixWatch(),
kvsGraveyard: NewGraveyard(), kvsGraveyard: NewGraveyard(gc),
lockDelay: NewDelay(), lockDelay: NewDelay(),
} }
return s, nil return s, nil
@ -561,6 +561,133 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, sv
return nil return nil
} }
// Services returns all services along with a list of associated tags.
func (s *StateStore) Services() (uint64, structs.Services, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// List all the services.
services, err := tx.Get("services", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed querying services: %s", err)
}
// Rip through the services and enumerate them and their unique set of
// tags.
var lindex uint64
unique := make(map[string]map[string]struct{})
for service := services.Next(); service != nil; service = services.Next() {
sn := service.(*structs.ServiceNode)
// Track the highest index
if sn.ModifyIndex > lindex {
lindex = sn.ModifyIndex
}
// Capture the unique set of tags.
tags, ok := unique[sn.ServiceName]
if !ok {
unique[sn.ServiceName] = make(map[string]struct{})
tags = unique[sn.ServiceName]
}
for _, tag := range sn.ServiceTags {
tags[tag] = struct{}{}
}
}
// Generate the output structure.
var results = make(structs.Services)
for service, tags := range unique {
results[service] = make([]string, 0)
for tag, _ := range tags {
results[service] = append(results[service], tag)
}
}
return lindex, results, nil
}
// ServiceNodes returns the nodes associated with a given service.
func (s *StateStore) ServiceNodes(service string) (uint64, structs.ServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
services, err := tx.Get("services", "service", service)
if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
}
var results structs.ServiceNodes
for s := services.Next(); s != nil; s = services.Next() {
sn := s.(*structs.ServiceNode)
results = append(results, sn)
}
return s.parseServiceNodes(tx, results)
}
// ServiceTagNodes returns the nodes associated with a given service, filtering
// out services that don't contain the given tag.
func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.ServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
services, err := tx.Get("services", "service", service)
if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
}
var results structs.ServiceNodes
for s := services.Next(); s != nil; s = services.Next() {
sn := s.(*structs.ServiceNode)
if !serviceTagFilter(sn, tag) {
results = append(results, sn)
}
}
return s.parseServiceNodes(tx, results)
}
// serviceTagFilter returns true (should filter) if the given service node
// doesn't contain the given tag.
func serviceTagFilter(sn *structs.ServiceNode, tag string) bool {
tag = strings.ToLower(tag)
// Look for the lower cased version of the tag.
for _, t := range sn.ServiceTags {
if strings.ToLower(t) == tag {
return false
}
}
// If we didn't hit the tag above then we should filter.
return true
}
// parseServiceNodes iterates over a services query and fills in the node details,
// returning a ServiceNodes slice.
func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNodes) (uint64, structs.ServiceNodes, error) {
var results structs.ServiceNodes
var lindex uint64
for _, sn := range services {
// Track the highest index.
if sn.ModifyIndex > lindex {
lindex = sn.ModifyIndex
}
// TODO (slackpad) - This is sketchy because we are altering the
// structure from the database, but we are hitting a non-indexed
// field. Think about this a little and make sure it's really
// safe.
// Fill in the address of the node.
n, err := tx.First("nodes", "id", sn.Node)
if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
}
sn.Address = n.(*structs.Node).Address
results = append(results, sn)
}
return lindex, results, nil
}
// NodeServices is used to query service registrations by node ID. // NodeServices is used to query service registrations by node ID.
func (s *StateStore) NodeServices(nodeID string) (uint64, *structs.NodeServices, error) { func (s *StateStore) NodeServices(nodeID string) (uint64, *structs.NodeServices, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
@ -869,26 +996,56 @@ func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc
} }
// CheckServiceNodes is used to query all nodes and checks for a given service // CheckServiceNodes is used to query all nodes and checks for a given service
// ID. The results are compounded into a CheckServiceNodes, and the index // The results are compounded into a CheckServiceNodes, and the index returned
// returned is the maximum index observed over any node, check, or service // is the maximum index observed over any node, check, or service in the result
// in the result set. // set.
func (s *StateStore) CheckServiceNodes(serviceID string) (uint64, structs.CheckServiceNodes, error) { func (s *StateStore) CheckServiceNodes(service string) (uint64, structs.CheckServiceNodes, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
// Query the state store for the service. // Query the state store for the service.
services, err := tx.Get("services", "service", serviceID) services, err := tx.Get("services", "service", service)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err) return 0, nil, fmt.Errorf("failed service lookup: %s", err)
} }
return s.parseCheckServiceNodes(tx, services, err)
var results structs.ServiceNodes
for s := services.Next(); s != nil; s = services.Next() {
sn := s.(*structs.ServiceNode)
results = append(results, sn)
}
return s.parseCheckServiceNodes(tx, results, err)
}
// CheckServiceTagNodes is used to query all nodes and checks for a given
// service, filtering out services that don't contain the given tag. The results
// are compounded into a CheckServiceNodes, and the index returned is the maximum
// index observed over any node, check, or service in the result set.
func (s *StateStore) CheckServiceTagNodes(service, tag string) (uint64, structs.CheckServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Query the state store for the service.
services, err := tx.Get("services", "service", service)
if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
}
var results structs.ServiceNodes
for s := services.Next(); s != nil; s = services.Next() {
sn := s.(*structs.ServiceNode)
if !serviceTagFilter(sn, tag) {
results = append(results, sn)
}
}
return s.parseCheckServiceNodes(tx, results, err)
} }
// parseCheckServiceNodes is used to parse through a given set of services, // parseCheckServiceNodes is used to parse through a given set of services,
// and query for an associated node and a set of checks. This is the inner // and query for an associated node and a set of checks. This is the inner
// method used to return a rich set of results from a more simple query. // method used to return a rich set of results from a more simple query.
func (s *StateStore) parseCheckServiceNodes( func (s *StateStore) parseCheckServiceNodes(
tx *memdb.Txn, iter memdb.ResultIterator, tx *memdb.Txn, services structs.ServiceNodes,
err error) (uint64, structs.CheckServiceNodes, error) { err error) (uint64, structs.CheckServiceNodes, error) {
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -896,15 +1053,14 @@ func (s *StateStore) parseCheckServiceNodes(
var results structs.CheckServiceNodes var results structs.CheckServiceNodes
var lindex uint64 var lindex uint64
for service := iter.Next(); service != nil; service = iter.Next() { for _, sn := range services {
// Compute the index // Compute the index.
svc := service.(*structs.ServiceNode) if sn.ModifyIndex > lindex {
if svc.ModifyIndex > lindex { lindex = sn.ModifyIndex
lindex = svc.ModifyIndex
} }
// Retrieve the node // Retrieve the node.
n, err := tx.First("nodes", "id", svc.Node) n, err := tx.First("nodes", "id", sn.Node)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err) return 0, nil, fmt.Errorf("failed node lookup: %s", err)
} }
@ -916,24 +1072,36 @@ func (s *StateStore) parseCheckServiceNodes(
lindex = node.ModifyIndex lindex = node.ModifyIndex
} }
// Get the checks // TODO (slackpad) Make this work as an better indexed operation.
idx, checks, err := s.parseChecks(tx.Get("checks", "node_service", svc.Node, svc.ServiceID))
// We need to return the checks specific to the given service
// as well as the node itself. Unfortunately, memdb won't let
// us use the index to do the latter query so we have to pull
// them all and filter.
var checks structs.HealthChecks
iter, err := tx.Get("checks", "node", sn.Node)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
if idx > lindex { for check := iter.Next(); check != nil; check = iter.Next() {
lindex = idx hc := check.(*structs.HealthCheck)
if hc.ServiceID == "" || hc.ServiceID == sn.ServiceID {
if hc.ModifyIndex > lindex {
lindex = hc.ModifyIndex
}
checks = append(checks, hc)
}
} }
// Append to the results // Append to the results.
results = append(results, structs.CheckServiceNode{ results = append(results, structs.CheckServiceNode{
Node: node, Node: node,
Service: &structs.NodeService{ Service: &structs.NodeService{
ID: svc.ServiceID, ID: sn.ServiceID,
Service: svc.ServiceName, Service: sn.ServiceName,
Address: svc.ServiceAddress, Address: sn.ServiceAddress,
Port: svc.ServicePort, Port: sn.ServicePort,
Tags: svc.ServiceTags, Tags: sn.ServiceTags,
}, },
Checks: checks, Checks: checks,
}) })
@ -944,12 +1112,12 @@ func (s *StateStore) parseCheckServiceNodes(
// NodeInfo is used to generate a dump of a single node. The dump includes // NodeInfo is used to generate a dump of a single node. The dump includes
// all services and checks which are registered against the node. // all services and checks which are registered against the node.
func (s *StateStore) NodeInfo(nodeID string) (uint64, structs.NodeDump, error) { func (s *StateStore) NodeInfo(node string) (uint64, structs.NodeDump, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
// Query the node by the passed node ID // Query the node by the passed node
nodes, err := tx.Get("nodes", "id", nodeID) nodes, err := tx.Get("nodes", "id", node)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err) return 0, nil, fmt.Errorf("failed node lookup: %s", err)
} }
@ -1315,6 +1483,9 @@ func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error
return true, nil return true, nil
} }
// TODO (slackpad) Double check the old KV triggering behavior and make
// sure we are covered here with tests.
// KVSDeleteTree is used to do a recursive delete on a key prefix // KVSDeleteTree is used to do a recursive delete on a key prefix
// in the state store. If any keys are modified, the last index is // in the state store. If any keys are modified, the last index is
// set, otherwise this is a no-op. // set, otherwise this is a no-op.

View File

@ -3,6 +3,7 @@ package state
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"sort"
"strings" "strings"
"testing" "testing"
"time" "time"
@ -10,8 +11,10 @@ import (
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
) )
// TODO (slackpad) Make sure the GC tests are complete.
func testStateStore(t *testing.T) *StateStore { func testStateStore(t *testing.T) *StateStore {
s, err := NewStateStore() s, err := NewStateStore(nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -800,6 +803,286 @@ func TestStateStore_EnsureService(t *testing.T) {
} }
} }
func TestStateStore_Services(t *testing.T) {
s := testStateStore(t)
// Register several nodes and services.
testRegisterNode(t, s, 1, "node1")
ns1 := &structs.NodeService{
ID: "service1",
Service: "redis",
Tags: []string{"prod", "master"},
Address: "1.1.1.1",
Port: 1111,
}
if err := s.EnsureService(2, "node1", ns1); err != nil {
t.Fatalf("err: %s", err)
}
testRegisterService(t, s, 3, "node1", "dogs")
testRegisterNode(t, s, 4, "node2")
ns2 := &structs.NodeService{
ID: "service3",
Service: "redis",
Tags: []string{"prod", "slave"},
Address: "1.1.1.1",
Port: 1111,
}
if err := s.EnsureService(5, "node2", ns2); err != nil {
t.Fatalf("err: %s", err)
}
// Pull all the services.
idx, services, err := s.Services()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
// Verify the result. We sort the lists since the order is
// non-deterministic (it's built using a map internally).
expected := structs.Services{
"redis": []string{"prod", "master", "slave"},
"dogs": []string{},
}
sort.Strings(expected["redis"])
for _, tags := range services {
sort.Strings(tags)
}
if !reflect.DeepEqual(expected, services) {
t.Fatalf("bad: %#v", services)
}
}
// strContains checks if a list contains a string
func strContains(l []string, s string) bool {
for _, v := range l {
if v == s {
return true
}
}
return false
}
func TestStateStore_ServiceNodes(t *testing.T) {
s := testStateStore(t)
if err := s.EnsureNode(10, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureNode(11, &structs.Node{Node: "bar", Address: "127.0.0.2"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(12, "foo", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(13, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(14, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"master"}, Address: "", Port: 8000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(15, "bar", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(16, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001}); err != nil {
t.Fatalf("err: %v", err)
}
idx, nodes, err := s.ServiceNodes("db")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 16 {
t.Fatalf("bad: %v", 16)
}
if len(nodes) != 3 {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].Node != "bar" {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].Address != "127.0.0.2" {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].ServiceID != "db" {
t.Fatalf("bad: %v", nodes)
}
if !strContains(nodes[0].ServiceTags, "slave") {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].ServicePort != 8000 {
t.Fatalf("bad: %v", nodes)
}
if nodes[1].Node != "bar" {
t.Fatalf("bad: %v", nodes)
}
if nodes[1].Address != "127.0.0.2" {
t.Fatalf("bad: %v", nodes)
}
if nodes[1].ServiceID != "db2" {
t.Fatalf("bad: %v", nodes)
}
if !strContains(nodes[1].ServiceTags, "slave") {
t.Fatalf("bad: %v", nodes)
}
if nodes[1].ServicePort != 8001 {
t.Fatalf("bad: %v", nodes)
}
if nodes[2].Node != "foo" {
t.Fatalf("bad: %v", nodes)
}
if nodes[2].Address != "127.0.0.1" {
t.Fatalf("bad: %v", nodes)
}
if nodes[2].ServiceID != "db" {
t.Fatalf("bad: %v", nodes)
}
if !strContains(nodes[2].ServiceTags, "master") {
t.Fatalf("bad: %v", nodes)
}
if nodes[2].ServicePort != 8000 {
t.Fatalf("bad: %v", nodes)
}
}
func TestStateStore_ServiceTagNodes(t *testing.T) {
s := testStateStore(t)
if err := s.EnsureNode(15, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureNode(16, &structs.Node{Node: "bar", Address: "127.0.0.2"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(17, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"master"}, Address: "", Port: 8000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(18, "foo", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(19, "bar", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8000}); err != nil {
t.Fatalf("err: %v", err)
}
idx, nodes, err := s.ServiceTagNodes("db", "master")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 17 {
t.Fatalf("bad: %v", idx)
}
if len(nodes) != 1 {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].Node != "foo" {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].Address != "127.0.0.1" {
t.Fatalf("bad: %v", nodes)
}
if !strContains(nodes[0].ServiceTags, "master") {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].ServicePort != 8000 {
t.Fatalf("bad: %v", nodes)
}
}
func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) {
s := testStateStore(t)
if err := s.EnsureNode(15, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureNode(16, &structs.Node{Node: "bar", Address: "127.0.0.2"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(17, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"master", "v2"}, Address: "", Port: 8000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(18, "foo", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave", "v2", "dev"}, Address: "", Port: 8001}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(19, "bar", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"slave", "v2"}, Address: "", Port: 8000}); err != nil {
t.Fatalf("err: %v", err)
}
idx, nodes, err := s.ServiceTagNodes("db", "master")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 17 {
t.Fatalf("bad: %v", idx)
}
if len(nodes) != 1 {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].Node != "foo" {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].Address != "127.0.0.1" {
t.Fatalf("bad: %v", nodes)
}
if !strContains(nodes[0].ServiceTags, "master") {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].ServicePort != 8000 {
t.Fatalf("bad: %v", nodes)
}
idx, nodes, err = s.ServiceTagNodes("db", "v2")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 19 {
t.Fatalf("bad: %v", idx)
}
if len(nodes) != 3 {
t.Fatalf("bad: %v", nodes)
}
idx, nodes, err = s.ServiceTagNodes("db", "dev")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 18 {
t.Fatalf("bad: %v", idx)
}
if len(nodes) != 1 {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].Node != "foo" {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].Address != "127.0.0.1" {
t.Fatalf("bad: %v", nodes)
}
if !strContains(nodes[0].ServiceTags, "dev") {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].ServicePort != 8001 {
t.Fatalf("bad: %v", nodes)
}
}
func TestStateStore_DeleteService(t *testing.T) { func TestStateStore_DeleteService(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
@ -1231,12 +1514,12 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
// Make sure we get the expected result // Make sure we get the expected result (service check + node check)
if n := len(results); n != 1 { if n := len(results); n != 1 {
t.Fatalf("expected 1 result, got: %d", n) t.Fatalf("expected 1 result, got: %d", n)
} }
csn := results[0] csn := results[0]
if csn.Node == nil || csn.Service == nil || len(csn.Checks) != 1 { if csn.Node == nil || csn.Service == nil || len(csn.Checks) != 2 {
t.Fatalf("bad output: %#v", csn) t.Fatalf("bad output: %#v", csn)
} }
@ -1271,6 +1554,62 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
} }
} }
func TestStateStore_CheckServiceTagNodes(t *testing.T) {
s := testStateStore(t)
if err := s.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(2, "foo", &structs.NodeService{ID: "db1", Service: "db", Tags: []string{"master"}, Address: "", Port: 8000}); err != nil {
t.Fatalf("err: %v", err)
}
check := &structs.HealthCheck{
Node: "foo",
CheckID: "db",
Name: "can connect",
Status: structs.HealthPassing,
ServiceID: "db1",
}
if err := s.EnsureCheck(3, check); err != nil {
t.Fatalf("err: %v", err)
}
check = &structs.HealthCheck{
Node: "foo",
CheckID: "check1",
Name: "another check",
Status: structs.HealthPassing,
}
if err := s.EnsureCheck(4, check); err != nil {
t.Fatalf("err: %v", err)
}
idx, nodes, err := s.CheckServiceTagNodes("db", "master")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 4 {
t.Fatalf("bad: %v", idx)
}
if len(nodes) != 1 {
t.Fatalf("Bad: %v", nodes)
}
if nodes[0].Node.Node != "foo" {
t.Fatalf("Bad: %v", nodes[0])
}
if nodes[0].Service.ID != "db1" {
t.Fatalf("Bad: %v", nodes[0])
}
if len(nodes[0].Checks) != 2 {
t.Fatalf("Bad: %v", nodes[0])
}
if nodes[0].Checks[0].CheckID != "check1" {
t.Fatalf("Bad: %v", nodes[0])
}
if nodes[0].Checks[1].CheckID != "db" {
t.Fatalf("Bad: %v", nodes[0])
}
}
func TestStateStore_Check_Snapshot(t *testing.T) { func TestStateStore_Check_Snapshot(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)

View File

@ -1,4 +1,4 @@
package consul package state
import ( import (
"fmt" "fmt"

View File

@ -1,4 +1,4 @@
package consul package state
import ( import (
"testing" "testing"

View File

@ -130,3 +130,27 @@ func (w *PrefixWatch) Notify(prefix string, subtree bool) {
w.watches.Delete(cleanup[i]) w.watches.Delete(cleanup[i])
} }
} }
// MultiWatch wraps several watches and allows any of them to trigger the
// caller.
type MultiWatch struct {
watches []Watch
}
func NewMultiWatch(watches ...Watch) *MultiWatch {
return &MultiWatch{watches: watches}
}
// See Watch.
func (w *MultiWatch) Wait(notifyCh chan struct{}) {
for _, watch := range w.watches {
watch.Wait(notifyCh)
}
}
// See Watch.
func (w *MultiWatch) Clear(notifyCh chan struct{}) {
for _, watch := range w.watches {
watch.Clear(notifyCh)
}
}

View File

@ -13,6 +13,7 @@ import (
"github.com/armon/go-radix" "github.com/armon/go-radix"
"github.com/armon/gomdb" "github.com/armon/gomdb"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
) )
@ -88,7 +89,7 @@ type StateStore struct {
// GC is when we create tombstones to track their time-to-live. // GC is when we create tombstones to track their time-to-live.
// The GC is consumed upstream to manage clearing of tombstones. // The GC is consumed upstream to manage clearing of tombstones.
gc *TombstoneGC gc *state.TombstoneGC
} }
// StateSnapshot is used to provide a point-in-time snapshot // StateSnapshot is used to provide a point-in-time snapshot
@ -115,7 +116,7 @@ func (s *StateSnapshot) Close() error {
} }
// NewStateStore is used to create a new state store // NewStateStore is used to create a new state store
func NewStateStore(gc *TombstoneGC, logOutput io.Writer) (*StateStore, error) { func NewStateStore(gc *state.TombstoneGC, logOutput io.Writer) (*StateStore, error) {
// Create a new temp dir // Create a new temp dir
path, err := ioutil.TempDir("", "consul") path, err := ioutil.TempDir("", "consul")
if err != nil { if err != nil {
@ -126,7 +127,7 @@ func NewStateStore(gc *TombstoneGC, logOutput io.Writer) (*StateStore, error) {
// NewStateStorePath is used to create a new state store at a given path // NewStateStorePath is used to create a new state store at a given path
// The path is cleared on closing. // The path is cleared on closing.
func NewStateStorePath(gc *TombstoneGC, path string, logOutput io.Writer) (*StateStore, error) { func NewStateStorePath(gc *state.TombstoneGC, path string, logOutput io.Writer) (*StateStore, error) {
// Open the env // Open the env
env, err := mdb.NewEnv() env, err := mdb.NewEnv()
if err != nil { if err != nil {

View File

@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
state_store "github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
) )
@ -1539,7 +1540,7 @@ func TestKVSDelete(t *testing.T) {
ttl := 10 * time.Millisecond ttl := 10 * time.Millisecond
gran := 5 * time.Millisecond gran := 5 * time.Millisecond
gc, err := NewTombstoneGC(ttl, gran) gc, err := state_store.NewTombstoneGC(ttl, gran)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -2062,7 +2063,7 @@ func TestKVSDeleteTree(t *testing.T) {
ttl := 10 * time.Millisecond ttl := 10 * time.Millisecond
gran := 5 * time.Millisecond gran := 5 * time.Millisecond
gc, err := NewTombstoneGC(ttl, gran) gc, err := state_store.NewTombstoneGC(ttl, gran)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -2169,7 +2170,7 @@ func TestReapTombstones(t *testing.T) {
ttl := 10 * time.Millisecond ttl := 10 * time.Millisecond
gran := 5 * time.Millisecond gran := 5 * time.Millisecond
gc, err := NewTombstoneGC(ttl, gran) gc, err := state_store.NewTombstoneGC(ttl, gran)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }