diff --git a/consul/acl_endpoint.go b/consul/acl_endpoint.go index 990e23b88f..97607efa34 100644 --- a/consul/acl_endpoint.go +++ b/consul/acl_endpoint.go @@ -6,6 +6,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" @@ -145,11 +146,9 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest, return fmt.Errorf(aclDisabled) } - // Get the local state - state := a.srv.fsm.State() return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, acl, err := state.ACLGet(ws, args.ACL) if err != nil { return err @@ -224,11 +223,9 @@ func (a *ACL) List(args *structs.DCSpecificRequest, return permissionDeniedErr } - // Get the local state - state := a.srv.fsm.State() return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, acls, err := state.ACLList(ws) if err != nil { return err diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index dfa71f2608..1108f5dfee 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -5,6 +5,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/types" "github.com/hashicorp/go-memdb" @@ -163,12 +164,10 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde return err } - // Get the list of nodes. - state := c.srv.fsm.State() return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { var index uint64 var nodes structs.Nodes var err error @@ -195,12 +194,10 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I return err } - // Get the list of services and their tags. - state := c.srv.fsm.State() return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { var index uint64 var services structs.Services var err error @@ -229,12 +226,10 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru return fmt.Errorf("Must provide service name") } - // Get the nodes - state := c.srv.fsm.State() err := c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { var index uint64 var services structs.ServiceNodes var err error @@ -286,12 +281,10 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs return fmt.Errorf("Must provide node") } - // Get the node services - state := c.srv.fsm.State() return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, services, err := state.NodeServices(ws, args.Node) if err != nil { return err diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index 1b5980cf1d..b818f904cd 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-memdb" "github.com/hashicorp/serf/coordinate" @@ -174,10 +175,9 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I return err } - state := c.srv.fsm.State() return c.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, coords, err := state.Coordinates(ws) if err != nil { return err diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index 0e33151abf..aa225fb830 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -3,6 +3,7 @@ package consul import ( "fmt" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-memdb" ) @@ -19,12 +20,10 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, return err } - // Get the state specific checks - state := h.srv.fsm.State() return h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { var index uint64 var checks structs.HealthChecks var err error @@ -51,12 +50,10 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, return err } - // Get the node checks - state := h.srv.fsm.State() return h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, checks, err := state.NodeChecks(ws, args.Node) if err != nil { return err @@ -79,12 +76,10 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, return err } - // Get the service checks - state := h.srv.fsm.State() return h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { var index uint64 var checks structs.HealthChecks var err error @@ -115,12 +110,10 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc return fmt.Errorf("Must provide service name") } - // Get the nodes - state := h.srv.fsm.State() err := h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { var index uint64 var nodes structs.CheckServiceNodes var err error diff --git a/consul/internal_endpoint.go b/consul/internal_endpoint.go index ff23fb562e..2d0c059619 100644 --- a/consul/internal_endpoint.go +++ b/consul/internal_endpoint.go @@ -3,6 +3,7 @@ package consul import ( "fmt" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-memdb" "github.com/hashicorp/serf/serf" @@ -22,12 +23,10 @@ func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest, return err } - // Get the node info - state := m.srv.fsm.State() return m.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, dump, err := state.NodeInfo(ws, args.Node) if err != nil { return err @@ -45,12 +44,10 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest, return err } - // Get all the node info - state := m.srv.fsm.State() return m.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, dump, err := state.NodeDump(ws) if err != nil { return err diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 114a764d2d..9f0d4cd0c5 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -6,6 +6,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-memdb" ) @@ -118,12 +119,10 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er return err } - // Get the local state - state := k.srv.fsm.State() return k.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, ent, err := state.KVSGet(ws, args.Key) if err != nil { return err @@ -159,12 +158,10 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e return err } - // Get the local state - state := k.srv.fsm.State() return k.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, ent, err := state.KVSList(ws, args.Key) if err != nil { return err @@ -201,12 +198,10 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi return err } - // Get the local state - state := k.srv.fsm.State() return k.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, keys, err := state.KVSListKeys(ws, args.Prefix, args.Seperator) if err != nil { return err diff --git a/consul/prepared_query_endpoint.go b/consul/prepared_query_endpoint.go index baa14776cb..84ad808148 100644 --- a/consul/prepared_query_endpoint.go +++ b/consul/prepared_query_endpoint.go @@ -8,6 +8,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" @@ -217,12 +218,10 @@ func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest, return err } - // Get the requested query. - state := p.srv.fsm.State() return p.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, query, err := state.PreparedQueryGet(ws, args.QueryID) if err != nil { return err @@ -263,12 +262,10 @@ func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.Ind return err } - // Get the list of queries. - state := p.srv.fsm.State() return p.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, queries, err := state.PreparedQueryList(ws) if err != nil { return err diff --git a/consul/rpc.go b/consul/rpc.go index b71bcaa3d7..0c17ea5ec6 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -11,6 +11,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/consul/agent" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/lib" "github.com/hashicorp/go-memdb" @@ -354,7 +355,7 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, // queryFn is used to perform a query operation. If a re-query is needed, the // passed-in watch set will be used to block for changes. -type queryFn func(memdb.WatchSet) error +type queryFn func(memdb.WatchSet, *state.StateStore) error // blockingQuery is used to process a potentially blocking query operation. func (s *Server) blockingQuery(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta, @@ -394,6 +395,11 @@ RUN_QUERY: // Run the query. metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1) + // Operate on a consistent set of state. This makes sure that the + // abandon channel goes with the state that the caller is using to + // build watches. + state := s.fsm.State() + // We can skip all watch tracking if this isn't a blocking query. var ws memdb.WatchSet if queryOpts.MinQueryIndex > 0 { @@ -401,11 +407,11 @@ RUN_QUERY: // This channel will be closed if a snapshot is restored and the // whole state store is abandoned. - ws.Add(s.fsm.State().AbandonCh()) + ws.Add(state.AbandonCh()) } // Block up to the timeout if we didn't see anything fresh. - err := fn(ws) + err := fn(ws, state) if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex { if expired := ws.Watch(timeout.C); !expired { goto RUN_QUERY diff --git a/consul/session_endpoint.go b/consul/session_endpoint.go index cf3679c9cf..557535c56f 100644 --- a/consul/session_endpoint.go +++ b/consul/session_endpoint.go @@ -5,6 +5,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" @@ -140,12 +141,10 @@ func (s *Session) Get(args *structs.SessionSpecificRequest, return err } - // Get the local state - state := s.srv.fsm.State() return s.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, session, err := state.SessionGet(ws, args.Session) if err != nil { return err @@ -171,12 +170,10 @@ func (s *Session) List(args *structs.DCSpecificRequest, return err } - // Get the local state - state := s.srv.fsm.State() return s.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, sessions, err := state.SessionList(ws) if err != nil { return err @@ -197,12 +194,10 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, return err } - // Get the local state - state := s.srv.fsm.State() return s.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet) error { + func(ws memdb.WatchSet, state *state.StateStore) error { index, sessions, err := state.NodeSessions(ws, args.Node) if err != nil { return err