golint: No stutter

This commit is contained in:
Frank Schroeder 2017-04-20 17:46:29 -07:00 committed by Frank Schröder
parent cf3ec1cf5c
commit d7e23857ad
29 changed files with 203 additions and 203 deletions

View File

@ -16,7 +16,7 @@ import (
"github.com/hashicorp/serf/serf"
)
type AgentSelf struct {
type Self struct {
Config *Config
Coord *coordinate.Coordinate
Member serf.Member
@ -44,7 +44,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
return nil, errPermissionDenied
}
return AgentSelf{
return Self{
Config: s.agent.config,
Coord: c,
Member: s.agent.LocalMember(),

View File

@ -223,7 +223,7 @@ func TestAgent_Self(t *testing.T) {
t.Fatalf("err: %v", err)
}
val := obj.(AgentSelf)
val := obj.(Self)
if int(val.Member.Port) != srv.agent.config.Ports.SerfLan {
t.Fatalf("incorrect port: %v", obj)
}

View File

@ -1090,7 +1090,7 @@ func (c *Command) Run(args []string) int {
// Register the watches
for _, wp := range config.WatchPlans {
go func(wp *watch.WatchPlan) {
go func(wp *watch.Plan) {
wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"])
wp.LogOutput = c.logOutput
addr := httpAddr.String()
@ -1307,7 +1307,7 @@ func (c *Command) handleReload(config *Config) (*Config, error) {
// Register the new watches
for _, wp := range newConf.WatchPlans {
go func(wp *watch.WatchPlan) {
go func(wp *watch.Plan) {
wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"])
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr.String()); err != nil {

View File

@ -718,7 +718,7 @@ type Config struct {
VersionPrerelease string `mapstructure:"-"`
// WatchPlans contains the compiled watches
WatchPlans []*watch.WatchPlan `mapstructure:"-" json:"-"`
WatchPlans []*watch.Plan `mapstructure:"-" json:"-"`
// UnixSockets is a map of socket configuration data
UnixSockets UnixSocketConfig `mapstructure:"unix_sockets"`

View File

@ -148,7 +148,7 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
return a.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
index, acl, err := state.ACLGet(ws, args.ACL)
if err != nil {
return err
@ -225,7 +225,7 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
return a.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
index, acls, err := state.ACLList(ws)
if err != nil {
return err

View File

@ -169,7 +169,7 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
var index uint64
var nodes structs.Nodes
var err error
@ -199,7 +199,7 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
var index uint64
var services structs.Services
var err error
@ -231,7 +231,7 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
err := c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
var index uint64
var services structs.ServiceNodes
var err error
@ -286,7 +286,7 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
index, services, err := state.NodeServices(ws, args.Node)
if err != nil {
return err

View File

@ -169,7 +169,7 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
return c.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
index, coords, err := state.Coordinates(ws)
if err != nil {
return err

View File

@ -31,7 +31,7 @@ type consulFSM struct {
// new state store). Everything internal here is synchronized by the
// Raft side, so doesn't need to lock this.
stateLock sync.RWMutex
state *state.StateStore
state *state.Store
gc *state.TombstoneGC
}
@ -40,7 +40,7 @@ type consulFSM struct {
// state in a way that can be accessed concurrently with operations
// that may modify the live state.
type consulSnapshot struct {
state *state.StateSnapshot
state *state.Snapshot
}
// snapshotHeader is the first entry in our snapshot
@ -67,7 +67,7 @@ func NewFSM(gc *state.TombstoneGC, logOutput io.Writer) (*consulFSM, error) {
}
// State is used to return a handle to the current state
func (c *consulFSM) State() *state.StateStore {
func (c *consulFSM) State() *state.Store {
c.stateLock.RLock()
defer c.stateLock.RUnlock()
return c.state

View File

@ -23,7 +23,7 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
return h.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
var index uint64
var checks structs.HealthChecks
var err error
@ -53,7 +53,7 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest,
return h.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
index, checks, err := state.NodeChecks(ws, args.Node)
if err != nil {
return err
@ -79,7 +79,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
return h.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
var index uint64
var checks structs.HealthChecks
var err error
@ -113,7 +113,7 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
err := h.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
var index uint64
var nodes structs.CheckServiceNodes
var err error

View File

@ -26,7 +26,7 @@ func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest,
return m.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
index, dump, err := state.NodeInfo(ws, args.Node)
if err != nil {
return err
@ -47,7 +47,7 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest,
return m.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
index, dump, err := state.NodeDump(ws)
if err != nil {
return err

View File

@ -123,7 +123,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
return k.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
index, ent, err := state.KVSGet(ws, args.Key)
if err != nil {
return err
@ -162,7 +162,7 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
return k.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
index, ent, err := state.KVSList(ws, args.Key)
if err != nil {
return err
@ -202,7 +202,7 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
return k.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
index, keys, err := state.KVSListKeys(ws, args.Prefix, args.Seperator)
if err != nil {
return err

View File

@ -221,7 +221,7 @@ func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest,
return p.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
index, query, err := state.PreparedQueryGet(ws, args.QueryID)
if err != nil {
return err
@ -265,7 +265,7 @@ func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.Ind
return p.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
index, queries, err := state.PreparedQueryList(ws)
if err != nil {
return err

View File

@ -344,7 +344,7 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
// store should be used (vs. calling fsm.State()) since the given state store
// will be correctly watched for changes if the state store is restored from
// a snapshot.
type queryFn func(memdb.WatchSet, *state.StateStore) error
type queryFn func(memdb.WatchSet, *state.Store) error
// blockingQuery is used to process a potentially blocking query operation.
func (s *Server) blockingQuery(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,

View File

@ -84,7 +84,7 @@ func TestRPC_blockingQuery(t *testing.T) {
var opts structs.QueryOptions
var meta structs.QueryMeta
var calls int
fn := func(ws memdb.WatchSet, state *state.StateStore) error {
fn := func(ws memdb.WatchSet, state *state.Store) error {
calls++
return nil
}
@ -103,7 +103,7 @@ func TestRPC_blockingQuery(t *testing.T) {
}
var meta structs.QueryMeta
var calls int
fn := func(ws memdb.WatchSet, state *state.StateStore) error {
fn := func(ws memdb.WatchSet, state *state.Store) error {
if calls == 0 {
meta.Index = 3
@ -132,7 +132,7 @@ func TestRPC_blockingQuery(t *testing.T) {
}
var meta structs.QueryMeta
var calls int
fn := func(ws memdb.WatchSet, state *state.StateStore) error {
fn := func(ws memdb.WatchSet, state *state.Store) error {
if calls == 0 {
meta.Index = 3

View File

@ -147,7 +147,7 @@ func (s *Session) Get(args *structs.SessionSpecificRequest,
return s.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
index, session, err := state.SessionGet(ws, args.Session)
if err != nil {
return err
@ -176,7 +176,7 @@ func (s *Session) List(args *structs.DCSpecificRequest,
return s.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
index, sessions, err := state.SessionList(ws)
if err != nil {
return err
@ -200,7 +200,7 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest,
return s.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.StateStore) error {
func(ws memdb.WatchSet, state *state.Store) error {
index, sessions, err := state.NodeSessions(ws, args.Node)
if err != nil {
return err

View File

@ -8,7 +8,7 @@ import (
)
// ACLs is used to pull all the ACLs from the snapshot.
func (s *StateSnapshot) ACLs() (memdb.ResultIterator, error) {
func (s *Snapshot) ACLs() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("acls", "id")
if err != nil {
return nil, err
@ -17,7 +17,7 @@ func (s *StateSnapshot) ACLs() (memdb.ResultIterator, error) {
}
// ACL is used when restoring from a snapshot. For general inserts, use ACLSet.
func (s *StateRestore) ACL(acl *structs.ACL) error {
func (s *Restore) ACL(acl *structs.ACL) error {
if err := s.tx.Insert("acls", acl); err != nil {
return fmt.Errorf("failed restoring acl: %s", err)
}
@ -30,7 +30,7 @@ func (s *StateRestore) ACL(acl *structs.ACL) error {
}
// ACLSet is used to insert an ACL rule into the state store.
func (s *StateStore) ACLSet(idx uint64, acl *structs.ACL) error {
func (s *Store) ACLSet(idx uint64, acl *structs.ACL) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -45,7 +45,7 @@ func (s *StateStore) ACLSet(idx uint64, acl *structs.ACL) error {
// aclSetTxn is the inner method used to insert an ACL rule with the
// proper indexes into the state store.
func (s *StateStore) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) error {
func (s *Store) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) error {
// Check that the ID is set
if acl.ID == "" {
return ErrMissingACLID
@ -78,7 +78,7 @@ func (s *StateStore) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) erro
}
// ACLGet is used to look up an existing ACL by ID.
func (s *StateStore) ACLGet(ws memdb.WatchSet, aclID string) (uint64, *structs.ACL, error) {
func (s *Store) ACLGet(ws memdb.WatchSet, aclID string) (uint64, *structs.ACL, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -99,7 +99,7 @@ func (s *StateStore) ACLGet(ws memdb.WatchSet, aclID string) (uint64, *structs.A
}
// ACLList is used to list out all of the ACLs in the state store.
func (s *StateStore) ACLList(ws memdb.WatchSet) (uint64, structs.ACLs, error) {
func (s *Store) ACLList(ws memdb.WatchSet) (uint64, structs.ACLs, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -116,7 +116,7 @@ func (s *StateStore) ACLList(ws memdb.WatchSet) (uint64, structs.ACLs, error) {
// aclListTxn is used to list out all of the ACLs in the state store. This is a
// function vs. a method so it can be called from the snapshotter.
func (s *StateStore) aclListTxn(tx *memdb.Txn, ws memdb.WatchSet) (structs.ACLs, error) {
func (s *Store) aclListTxn(tx *memdb.Txn, ws memdb.WatchSet) (structs.ACLs, error) {
// Query all of the ACLs in the state store
iter, err := tx.Get("acls", "id")
if err != nil {
@ -135,7 +135,7 @@ func (s *StateStore) aclListTxn(tx *memdb.Txn, ws memdb.WatchSet) (structs.ACLs,
// ACLDelete is used to remove an existing ACL from the state store. If
// the ACL does not exist this is a no-op and no error is returned.
func (s *StateStore) ACLDelete(idx uint64, aclID string) error {
func (s *Store) ACLDelete(idx uint64, aclID string) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -150,7 +150,7 @@ func (s *StateStore) ACLDelete(idx uint64, aclID string) error {
// aclDeleteTxn is used to delete an ACL from the state store within
// an existing transaction.
func (s *StateStore) aclDeleteTxn(tx *memdb.Txn, idx uint64, aclID string) error {
func (s *Store) aclDeleteTxn(tx *memdb.Txn, idx uint64, aclID string) error {
// Look up the existing ACL
acl, err := tx.First("acls", "id", aclID)
if err != nil {

View File

@ -8,7 +8,7 @@ import (
)
// Autopilot is used to pull the autopilot config from the snapshot.
func (s *StateSnapshot) Autopilot() (*structs.AutopilotConfig, error) {
func (s *Snapshot) Autopilot() (*structs.AutopilotConfig, error) {
c, err := s.tx.First("autopilot-config", "id")
if err != nil {
return nil, err
@ -23,7 +23,7 @@ func (s *StateSnapshot) Autopilot() (*structs.AutopilotConfig, error) {
}
// Autopilot is used when restoring from a snapshot.
func (s *StateRestore) Autopilot(config *structs.AutopilotConfig) error {
func (s *Restore) Autopilot(config *structs.AutopilotConfig) error {
if err := s.tx.Insert("autopilot-config", config); err != nil {
return fmt.Errorf("failed restoring autopilot config: %s", err)
}
@ -32,7 +32,7 @@ func (s *StateRestore) Autopilot(config *structs.AutopilotConfig) error {
}
// AutopilotConfig is used to get the current Autopilot configuration.
func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
func (s *Store) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -51,7 +51,7 @@ func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error)
}
// AutopilotSetConfig is used to set the current Autopilot configuration.
func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error {
func (s *Store) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -64,7 +64,7 @@ func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotCon
// AutopilotCASConfig is used to try updating the Autopilot configuration with a
// given Raft index. If the CAS index specified is not equal to the last observed index
// for the config, then the call is a noop,
func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) {
func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
@ -88,7 +88,7 @@ func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.Autopi
return true, nil
}
func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error {
func (s *Store) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error {
// Check for an existing config
existing, err := tx.First("autopilot-config", "id")
if err != nil {

View File

@ -28,7 +28,7 @@ func resizeNodeLookupKey(s string) string {
}
// Nodes is used to pull the full list of nodes for use during snapshots.
func (s *StateSnapshot) Nodes() (memdb.ResultIterator, error) {
func (s *Snapshot) Nodes() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("nodes", "id")
if err != nil {
return nil, err
@ -38,7 +38,7 @@ func (s *StateSnapshot) Nodes() (memdb.ResultIterator, error) {
// Services is used to pull the full list of services for a given node for use
// during snapshots.
func (s *StateSnapshot) Services(node string) (memdb.ResultIterator, error) {
func (s *Snapshot) Services(node string) (memdb.ResultIterator, error) {
iter, err := s.tx.Get("services", "node", node)
if err != nil {
return nil, err
@ -48,7 +48,7 @@ func (s *StateSnapshot) Services(node string) (memdb.ResultIterator, error) {
// Checks is used to pull the full list of checks for a given node for use
// during snapshots.
func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) {
func (s *Snapshot) Checks(node string) (memdb.ResultIterator, error) {
iter, err := s.tx.Get("checks", "node", node)
if err != nil {
return nil, err
@ -59,7 +59,7 @@ func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) {
// Registration is used to make sure a node, service, and check registration is
// performed within a single transaction to avoid race conditions on state
// updates.
func (s *StateRestore) Registration(idx uint64, req *structs.RegisterRequest) error {
func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error {
if err := s.store.ensureRegistrationTxn(s.tx, idx, req); err != nil {
return err
}
@ -69,7 +69,7 @@ func (s *StateRestore) Registration(idx uint64, req *structs.RegisterRequest) er
// EnsureRegistration is used to make sure a node, service, and check
// registration is performed within a single transaction to avoid race
// conditions on state updates.
func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest) error {
func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -84,7 +84,7 @@ func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest
// ensureRegistrationTxn is used to make sure a node, service, and check
// registration is performed within a single transaction to avoid race
// conditions on state updates.
func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.RegisterRequest) error {
func (s *Store) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.RegisterRequest) error {
// Create a node structure.
node := &structs.Node{
ID: req.ID,
@ -152,7 +152,7 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *struc
}
// EnsureNode is used to upsert node registration or modification.
func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error {
func (s *Store) EnsureNode(idx uint64, node *structs.Node) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -168,7 +168,7 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error {
// ensureNodeTxn is the inner function called to actually create a node
// registration or modify an existing one in the state store. It allows
// passing in a memdb transaction so it may be part of a larger txn.
func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error {
func (s *Store) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error {
// See if there's an existing node with this UUID, and make sure the
// name is the same.
var n *structs.Node
@ -218,7 +218,7 @@ func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node
}
// GetNode is used to retrieve a node registration by node name ID.
func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) {
func (s *Store) GetNode(id string) (uint64, *structs.Node, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -237,7 +237,7 @@ func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) {
}
// GetNodeID is used to retrieve a node registration by node ID.
func (s *StateStore) GetNodeID(id types.NodeID) (uint64, *structs.Node, error) {
func (s *Store) GetNodeID(id types.NodeID) (uint64, *structs.Node, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -256,7 +256,7 @@ func (s *StateStore) GetNodeID(id types.NodeID) (uint64, *structs.Node, error) {
}
// Nodes is used to return all of the known nodes.
func (s *StateStore) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) {
func (s *Store) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -279,7 +279,7 @@ func (s *StateStore) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) {
}
// NodesByMeta is used to return all nodes with the given metadata key/value pairs.
func (s *StateStore) NodesByMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Nodes, error) {
func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Nodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -310,7 +310,7 @@ func (s *StateStore) NodesByMeta(ws memdb.WatchSet, filters map[string]string) (
}
// DeleteNode is used to delete a given node by its ID.
func (s *StateStore) DeleteNode(idx uint64, nodeName string) error {
func (s *Store) DeleteNode(idx uint64, nodeName string) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -325,7 +325,7 @@ func (s *StateStore) DeleteNode(idx uint64, nodeName string) error {
// deleteNodeTxn is the inner method used for removing a node from
// the store within a given transaction.
func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error {
func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error {
// Look up the node.
node, err := tx.First("nodes", "id", nodeName)
if err != nil {
@ -413,7 +413,7 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e
}
// EnsureService is called to upsert creation of a given NodeService.
func (s *StateStore) EnsureService(idx uint64, node string, svc *structs.NodeService) error {
func (s *Store) EnsureService(idx uint64, node string, svc *structs.NodeService) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -428,7 +428,7 @@ func (s *StateStore) EnsureService(idx uint64, node string, svc *structs.NodeSer
// ensureServiceTxn is used to upsert a service registration within an
// existing memdb transaction.
func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error {
func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error {
// Check for existing service
existing, err := tx.First("services", "id", node, svc.ID)
if err != nil {
@ -468,7 +468,7 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, sv
}
// Services returns all services along with a list of associated tags.
func (s *StateStore) Services(ws memdb.WatchSet) (uint64, structs.Services, error) {
func (s *Store) Services(ws memdb.WatchSet) (uint64, structs.Services, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -509,7 +509,7 @@ func (s *StateStore) Services(ws memdb.WatchSet) (uint64, structs.Services, erro
}
// ServicesByNodeMeta returns all services, filtered by the given node metadata.
func (s *StateStore) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Services, error) {
func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Services, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -578,7 +578,7 @@ func (s *StateStore) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]st
}
// ServiceNodes returns the nodes associated with a given service name.
func (s *StateStore) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -607,7 +607,7 @@ func (s *StateStore) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64
// ServiceTagNodes returns the nodes associated with a given service, filtering
// out services that don't contain the given tag.
func (s *StateStore) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (uint64, structs.ServiceNodes, error) {
func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (uint64, structs.ServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -656,7 +656,7 @@ func serviceTagFilter(sn *structs.ServiceNode, tag string) bool {
// parseServiceNodes iterates over a services query and fills in the node details,
// returning a ServiceNodes slice.
func (s *StateStore) parseServiceNodes(tx *memdb.Txn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) {
func (s *Store) parseServiceNodes(tx *memdb.Txn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) {
// We don't want to track an unlimited number of nodes, so we pull a
// top-level watch to use as a fallback.
allNodes, err := tx.Get("nodes", "id")
@ -697,7 +697,7 @@ func (s *StateStore) parseServiceNodes(tx *memdb.Txn, ws memdb.WatchSet, service
// NodeService is used to retrieve a specific service associated with the given
// node.
func (s *StateStore) NodeService(nodeName string, serviceID string) (uint64, *structs.NodeService, error) {
func (s *Store) NodeService(nodeName string, serviceID string) (uint64, *structs.NodeService, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -718,7 +718,7 @@ func (s *StateStore) NodeService(nodeName string, serviceID string) (uint64, *st
}
// NodeServices is used to query service registrations by node name or UUID.
func (s *StateStore) NodeServices(ws memdb.WatchSet, nodeNameOrID string) (uint64, *structs.NodeServices, error) {
func (s *Store) NodeServices(ws memdb.WatchSet, nodeNameOrID string) (uint64, *structs.NodeServices, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -792,7 +792,7 @@ func (s *StateStore) NodeServices(ws memdb.WatchSet, nodeNameOrID string) (uint6
}
// DeleteService is used to delete a given service associated with a node.
func (s *StateStore) DeleteService(idx uint64, nodeName, serviceID string) error {
func (s *Store) DeleteService(idx uint64, nodeName, serviceID string) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -807,7 +807,7 @@ func (s *StateStore) DeleteService(idx uint64, nodeName, serviceID string) error
// deleteServiceTxn is the inner method called to remove a service
// registration within an existing transaction.
func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error {
func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error {
// Look up the service.
service, err := tx.First("services", "id", nodeName, serviceID)
if err != nil {
@ -852,7 +852,7 @@ func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, servi
}
// EnsureCheck is used to store a check registration in the db.
func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -868,7 +868,7 @@ func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
// ensureCheckTransaction is used as the inner method to handle inserting
// a health check into the state store. It ensures safety against inserting
// checks with no matching node or service.
func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) error {
func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) error {
// Check if we have an existing health check
existing, err := tx.First("checks", "id", hc.Node, string(hc.CheckID))
if err != nil {
@ -947,7 +947,7 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.Healt
// NodeCheck is used to retrieve a specific check associated with the given
// node.
func (s *StateStore) NodeCheck(nodeName string, checkID types.CheckID) (uint64, *structs.HealthCheck, error) {
func (s *Store) NodeCheck(nodeName string, checkID types.CheckID) (uint64, *structs.HealthCheck, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -969,7 +969,7 @@ func (s *StateStore) NodeCheck(nodeName string, checkID types.CheckID) (uint64,
// NodeChecks is used to retrieve checks associated with the
// given node from the state store.
func (s *StateStore) NodeChecks(ws memdb.WatchSet, nodeName string) (uint64, structs.HealthChecks, error) {
func (s *Store) NodeChecks(ws memdb.WatchSet, nodeName string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -993,7 +993,7 @@ func (s *StateStore) NodeChecks(ws memdb.WatchSet, nodeName string) (uint64, str
// ServiceChecks is used to get all checks associated with a
// given service ID. The query is performed against a service
// _name_ instead of a service ID.
func (s *StateStore) ServiceChecks(ws memdb.WatchSet, serviceName string) (uint64, structs.HealthChecks, error) {
func (s *Store) ServiceChecks(ws memdb.WatchSet, serviceName string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -1017,7 +1017,7 @@ func (s *StateStore) ServiceChecks(ws memdb.WatchSet, serviceName string) (uint6
// ServiceChecksByNodeMeta is used to get all checks associated with a
// given service ID, filtered by the given node metadata values. The query
// is performed against a service _name_ instead of a service ID.
func (s *StateStore) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
filters map[string]string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
@ -1038,7 +1038,7 @@ func (s *StateStore) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName stri
// ChecksInState is used to query the state store for all checks
// which are in the provided state.
func (s *StateStore) ChecksInState(ws memdb.WatchSet, state string) (uint64, structs.HealthChecks, error) {
func (s *Store) ChecksInState(ws memdb.WatchSet, state string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -1070,7 +1070,7 @@ func (s *StateStore) ChecksInState(ws memdb.WatchSet, state string) (uint64, str
// ChecksInStateByNodeMeta is used to query the state store for all checks
// which are in the provided state, filtered by the given node metadata values.
func (s *StateStore) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, filters map[string]string) (uint64, structs.HealthChecks, error) {
func (s *Store) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, filters map[string]string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -1098,7 +1098,7 @@ func (s *StateStore) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, fi
// parseChecksByNodeMeta is a helper function used to deduplicate some
// repetitive code for returning health checks filtered by node metadata fields.
func (s *StateStore) parseChecksByNodeMeta(tx *memdb.Txn, ws memdb.WatchSet,
func (s *Store) parseChecksByNodeMeta(tx *memdb.Txn, ws memdb.WatchSet,
idx uint64, iter memdb.ResultIterator, filters map[string]string) (uint64, structs.HealthChecks, error) {
// We don't want to track an unlimited number of nodes, so we pull a
@ -1132,7 +1132,7 @@ func (s *StateStore) parseChecksByNodeMeta(tx *memdb.Txn, ws memdb.WatchSet,
}
// DeleteCheck is used to delete a health check registration.
func (s *StateStore) DeleteCheck(idx uint64, node string, checkID types.CheckID) error {
func (s *Store) DeleteCheck(idx uint64, node string, checkID types.CheckID) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -1147,7 +1147,7 @@ func (s *StateStore) DeleteCheck(idx uint64, node string, checkID types.CheckID)
// deleteCheckTxn is the inner method used to call a health
// check deletion within an existing transaction.
func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID types.CheckID) error {
func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID types.CheckID) error {
// Try to retrieve the existing health check.
hc, err := tx.First("checks", "id", node, string(checkID))
if err != nil {
@ -1186,7 +1186,7 @@ func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, chec
}
// CheckServiceNodes is used to query all nodes and checks for a given service.
func (s *StateStore) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.CheckServiceNodes, error) {
func (s *Store) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.CheckServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -1210,7 +1210,7 @@ func (s *StateStore) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (u
// CheckServiceTagNodes is used to query all nodes and checks for a given
// service, filtering out services that don't contain the given tag.
func (s *StateStore) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag string) (uint64, structs.CheckServiceNodes, error) {
func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag string) (uint64, structs.CheckServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -1238,7 +1238,7 @@ func (s *StateStore) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag st
// parseCheckServiceNodes is used to parse through a given set of services,
// and query for an associated node and a set of checks. This is the inner
// method used to return a rich set of results from a more simple query.
func (s *StateStore) parseCheckServiceNodes(
func (s *Store) parseCheckServiceNodes(
tx *memdb.Txn, ws memdb.WatchSet, idx uint64,
serviceName string, services structs.ServiceNodes,
err error) (uint64, structs.CheckServiceNodes, error) {
@ -1318,7 +1318,7 @@ func (s *StateStore) parseCheckServiceNodes(
// NodeInfo is used to generate a dump of a single node. The dump includes
// all services and checks which are registered against the node.
func (s *StateStore) NodeInfo(ws memdb.WatchSet, node string) (uint64, structs.NodeDump, error) {
func (s *Store) NodeInfo(ws memdb.WatchSet, node string) (uint64, structs.NodeDump, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -1337,7 +1337,7 @@ func (s *StateStore) NodeInfo(ws memdb.WatchSet, node string) (uint64, structs.N
// NodeDump is used to generate a dump of all nodes. This call is expensive
// as it has to query every node, service, and check. The response can also
// be quite large since there is currently no filtering applied.
func (s *StateStore) NodeDump(ws memdb.WatchSet) (uint64, structs.NodeDump, error) {
func (s *Store) NodeDump(ws memdb.WatchSet) (uint64, structs.NodeDump, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -1356,7 +1356,7 @@ func (s *StateStore) NodeDump(ws memdb.WatchSet) (uint64, structs.NodeDump, erro
// parseNodes takes an iterator over a set of nodes and returns a struct
// containing the nodes along with all of their associated services
// and/or health checks.
func (s *StateStore) parseNodes(tx *memdb.Txn, ws memdb.WatchSet, idx uint64,
func (s *Store) parseNodes(tx *memdb.Txn, ws memdb.WatchSet, idx uint64,
iter memdb.ResultIterator) (uint64, structs.NodeDump, error) {
// We don't want to track an unlimited number of services, so we pull a

View File

@ -9,7 +9,7 @@ import (
)
// Coordinates is used to pull all the coordinates from the snapshot.
func (s *StateSnapshot) Coordinates() (memdb.ResultIterator, error) {
func (s *Snapshot) Coordinates() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("coordinates", "id")
if err != nil {
return nil, err
@ -20,7 +20,7 @@ func (s *StateSnapshot) Coordinates() (memdb.ResultIterator, error) {
// Coordinates is used when restoring from a snapshot. For general inserts, use
// CoordinateBatchUpdate. We do less vetting of the updates here because they
// already got checked on the way in during a batch update.
func (s *StateRestore) Coordinates(idx uint64, updates structs.Coordinates) error {
func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
for _, update := range updates {
if err := s.tx.Insert("coordinates", update); err != nil {
return fmt.Errorf("failed restoring coordinate: %s", err)
@ -39,7 +39,7 @@ func (s *StateRestore) Coordinates(idx uint64, updates structs.Coordinates) erro
// nil, none of the Raft or node information is returned. This hits the 90%
// internal-to-Consul use case for this data, and this isn't exposed via an
// endpoint, so it doesn't matter that the Raft info isn't available.
func (s *StateStore) CoordinateGetRaw(node string) (*coordinate.Coordinate, error) {
func (s *Store) CoordinateGetRaw(node string) (*coordinate.Coordinate, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -57,7 +57,7 @@ func (s *StateStore) CoordinateGetRaw(node string) (*coordinate.Coordinate, erro
}
// Coordinates queries for all nodes with coordinates.
func (s *StateStore) Coordinates(ws memdb.WatchSet) (uint64, structs.Coordinates, error) {
func (s *Store) Coordinates(ws memdb.WatchSet) (uint64, structs.Coordinates, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -80,7 +80,7 @@ func (s *StateStore) Coordinates(ws memdb.WatchSet) (uint64, structs.Coordinates
// CoordinateBatchUpdate processes a batch of coordinate updates and applies
// them in a single transaction.
func (s *StateStore) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) error {
func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) error {
tx := s.db.Txn(true)
defer tx.Abort()

View File

@ -10,7 +10,7 @@ import (
)
// KVs is used to pull the full list of KVS entries for use during snapshots.
func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) {
func (s *Snapshot) KVs() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("kvs", "id_prefix")
if err != nil {
return nil, err
@ -19,12 +19,12 @@ func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) {
}
// Tombstones is used to pull all the tombstones from the graveyard.
func (s *StateSnapshot) Tombstones() (memdb.ResultIterator, error) {
func (s *Snapshot) Tombstones() (memdb.ResultIterator, error) {
return s.store.kvsGraveyard.DumpTxn(s.tx)
}
// KVS is used when restoring from a snapshot. Use KVSSet for general inserts.
func (s *StateRestore) KVS(entry *structs.DirEntry) error {
func (s *Restore) KVS(entry *structs.DirEntry) error {
if err := s.tx.Insert("kvs", entry); err != nil {
return fmt.Errorf("failed inserting kvs entry: %s", err)
}
@ -37,7 +37,7 @@ func (s *StateRestore) KVS(entry *structs.DirEntry) error {
// Tombstone is used when restoring from a snapshot. For general inserts, use
// Graveyard.InsertTxn.
func (s *StateRestore) Tombstone(stone *Tombstone) error {
func (s *Restore) Tombstone(stone *Tombstone) error {
if err := s.store.kvsGraveyard.RestoreTxn(s.tx, stone); err != nil {
return fmt.Errorf("failed restoring tombstone: %s", err)
}
@ -47,7 +47,7 @@ func (s *StateRestore) Tombstone(stone *Tombstone) error {
// ReapTombstones is used to delete all the tombstones with an index
// less than or equal to the given index. This is used to prevent
// unbounded storage growth of the tombstones.
func (s *StateStore) ReapTombstones(index uint64) error {
func (s *Store) ReapTombstones(index uint64) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -60,7 +60,7 @@ func (s *StateStore) ReapTombstones(index uint64) error {
}
// KVSSet is used to store a key/value pair.
func (s *StateStore) KVSSet(idx uint64, entry *structs.DirEntry) error {
func (s *Store) KVSSet(idx uint64, entry *structs.DirEntry) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -78,7 +78,7 @@ func (s *StateStore) KVSSet(idx uint64, entry *structs.DirEntry) error {
// If updateSession is true, then the incoming entry will set the new
// session (should be validated before calling this). Otherwise, we will keep
// whatever the existing session is.
func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error {
func (s *Store) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error {
// Retrieve an existing KV pair
existing, err := tx.First("kvs", "id", entry.Key)
if err != nil {
@ -115,7 +115,7 @@ func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntr
}
// KVSGet is used to retrieve a key/value pair from the state store.
func (s *StateStore) KVSGet(ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) {
func (s *Store) KVSGet(ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -124,7 +124,7 @@ func (s *StateStore) KVSGet(ws memdb.WatchSet, key string) (uint64, *structs.Dir
// kvsGetTxn is the inner method that gets a KVS entry inside an existing
// transaction.
func (s *StateStore) kvsGetTxn(tx *memdb.Txn, ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) {
func (s *Store) kvsGetTxn(tx *memdb.Txn, ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) {
// Get the table index.
idx := maxIndexTxn(tx, "kvs", "tombstones")
@ -144,7 +144,7 @@ func (s *StateStore) kvsGetTxn(tx *memdb.Txn, ws memdb.WatchSet, key string) (ui
// prefix is left empty, all keys in the KVS will be returned. The returned
// is the max index of the returned kvs entries or applicable tombstones, or
// else it's the full table indexes for kvs and tombstones.
func (s *StateStore) KVSList(ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) {
func (s *Store) KVSList(ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -153,7 +153,7 @@ func (s *StateStore) KVSList(ws memdb.WatchSet, prefix string) (uint64, structs.
// kvsListTxn is the inner method that gets a list of KVS entries matching a
// prefix.
func (s *StateStore) kvsListTxn(tx *memdb.Txn, ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) {
func (s *Store) kvsListTxn(tx *memdb.Txn, ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) {
// Get the table indexes.
idx := maxIndexTxn(tx, "kvs", "tombstones")
@ -201,7 +201,7 @@ func (s *StateStore) kvsListTxn(tx *memdb.Txn, ws memdb.WatchSet, prefix string)
// An optional separator may be specified, which can be used to slice off a part
// of the response so that only a subset of the prefix is returned. In this
// mode, the keys which are omitted are still counted in the returned index.
func (s *StateStore) KVSListKeys(ws memdb.WatchSet, prefix, sep string) (uint64, []string, error) {
func (s *Store) KVSListKeys(ws memdb.WatchSet, prefix, sep string) (uint64, []string, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -274,7 +274,7 @@ func (s *StateStore) KVSListKeys(ws memdb.WatchSet, prefix, sep string) (uint64,
// KVSDelete is used to perform a shallow delete on a single key in the
// the state store.
func (s *StateStore) KVSDelete(idx uint64, key string) error {
func (s *Store) KVSDelete(idx uint64, key string) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -289,7 +289,7 @@ func (s *StateStore) KVSDelete(idx uint64, key string) error {
// kvsDeleteTxn is the inner method used to perform the actual deletion
// of a key/value pair within an existing transaction.
func (s *StateStore) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error {
func (s *Store) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error {
// Look up the entry in the state store.
entry, err := tx.First("kvs", "id", key)
if err != nil {
@ -319,7 +319,7 @@ func (s *StateStore) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error {
// raft index. If the CAS index specified is not equal to the last
// observed index for the given key, then the call is a noop, otherwise
// a normal KV delete is invoked.
func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) {
func (s *Store) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
@ -334,7 +334,7 @@ func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) {
// kvsDeleteCASTxn is the inner method that does a CAS delete within an existing
// transaction.
func (s *StateStore) kvsDeleteCASTxn(tx *memdb.Txn, idx, cidx uint64, key string) (bool, error) {
func (s *Store) kvsDeleteCASTxn(tx *memdb.Txn, idx, cidx uint64, key string) (bool, error) {
// Retrieve the existing kvs entry, if any exists.
entry, err := tx.First("kvs", "id", key)
if err != nil {
@ -360,7 +360,7 @@ func (s *StateStore) kvsDeleteCASTxn(tx *memdb.Txn, idx, cidx uint64, key string
// ModifyIndex in the provided entry is used to determine if we should
// write the entry to the state store or bail. Returns a bool indicating
// if a write happened and any error.
func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) {
func (s *Store) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
@ -375,7 +375,7 @@ func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error
// kvsSetCASTxn is the inner method used to do a CAS inside an existing
// transaction.
func (s *StateStore) kvsSetCASTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
func (s *Store) kvsSetCASTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Retrieve the existing entry.
existing, err := tx.First("kvs", "id", entry.Key)
if err != nil {
@ -405,7 +405,7 @@ func (s *StateStore) kvsSetCASTxn(tx *memdb.Txn, idx uint64, entry *structs.DirE
// KVSDeleteTree is used to do a recursive delete on a key prefix
// in the state store. If any keys are modified, the last index is
// set, otherwise this is a no-op.
func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error {
func (s *Store) KVSDeleteTree(idx uint64, prefix string) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -419,7 +419,7 @@ func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error {
// kvsDeleteTreeTxn is the inner method that does a recursive delete inside an
// existing transaction.
func (s *StateStore) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error {
func (s *Store) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error {
// Get an iterator over all of the keys with the given prefix.
entries, err := tx.Get("kvs", "id_prefix", prefix)
if err != nil {
@ -459,13 +459,13 @@ func (s *StateStore) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string)
// KVSLockDelay returns the expiration time for any lock delay associated with
// the given key.
func (s *StateStore) KVSLockDelay(key string) time.Time {
func (s *Store) KVSLockDelay(key string) time.Time {
return s.lockDelay.GetExpiration(key)
}
// KVSLock is similar to KVSSet but only performs the set if the lock can be
// acquired.
func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) {
func (s *Store) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
@ -480,7 +480,7 @@ func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error)
// kvsLockTxn is the inner method that does a lock inside an existing
// transaction.
func (s *StateStore) kvsLockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
func (s *Store) kvsLockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Verify that a session is present.
if entry.Session == "" {
return false, fmt.Errorf("missing session")
@ -531,7 +531,7 @@ func (s *StateStore) kvsLockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEnt
// KVSUnlock is similar to KVSSet but only performs the set if the lock can be
// unlocked (the key must already exist and be locked).
func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) {
func (s *Store) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
@ -546,7 +546,7 @@ func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error
// kvsUnlockTxn is the inner method that does an unlock inside an existing
// transaction.
func (s *StateStore) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
func (s *Store) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Verify that a session is present.
if entry.Session == "" {
return false, fmt.Errorf("missing session")
@ -584,7 +584,7 @@ func (s *StateStore) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirE
// kvsCheckSessionTxn checks to see if the given session matches the current
// entry for a key.
func (s *StateStore) kvsCheckSessionTxn(tx *memdb.Txn, key string, session string) (*structs.DirEntry, error) {
func (s *Store) kvsCheckSessionTxn(tx *memdb.Txn, key string, session string) (*structs.DirEntry, error) {
entry, err := tx.First("kvs", "id", key)
if err != nil {
return nil, fmt.Errorf("failed kvs lookup: %s", err)
@ -603,7 +603,7 @@ func (s *StateStore) kvsCheckSessionTxn(tx *memdb.Txn, key string, session strin
// kvsCheckIndexTxn checks to see if the given modify index matches the current
// entry for a key.
func (s *StateStore) kvsCheckIndexTxn(tx *memdb.Txn, key string, cidx uint64) (*structs.DirEntry, error) {
func (s *Store) kvsCheckIndexTxn(tx *memdb.Txn, key string, cidx uint64) (*structs.DirEntry, error) {
entry, err := tx.First("kvs", "id", key)
if err != nil {
return nil, fmt.Errorf("failed kvs lookup: %s", err)

View File

@ -45,7 +45,7 @@ func toPreparedQuery(wrapped interface{}) *structs.PreparedQuery {
}
// PreparedQueries is used to pull all the prepared queries from the snapshot.
func (s *StateSnapshot) PreparedQueries() (structs.PreparedQueries, error) {
func (s *Snapshot) PreparedQueries() (structs.PreparedQueries, error) {
queries, err := s.tx.Get("prepared-queries", "id")
if err != nil {
return nil, err
@ -60,7 +60,7 @@ func (s *StateSnapshot) PreparedQueries() (structs.PreparedQueries, error) {
// PreparedQuery is used when restoring from a snapshot. For general inserts,
// use PreparedQuerySet.
func (s *StateRestore) PreparedQuery(query *structs.PreparedQuery) error {
func (s *Restore) PreparedQuery(query *structs.PreparedQuery) error {
// If this is a template, compile it, otherwise leave the compiled
// template field nil.
var ct *prepared_query.CompiledTemplate
@ -84,7 +84,7 @@ func (s *StateRestore) PreparedQuery(query *structs.PreparedQuery) error {
}
// PreparedQuerySet is used to create or update a prepared query.
func (s *StateStore) PreparedQuerySet(idx uint64, query *structs.PreparedQuery) error {
func (s *Store) PreparedQuerySet(idx uint64, query *structs.PreparedQuery) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -98,7 +98,7 @@ func (s *StateStore) PreparedQuerySet(idx uint64, query *structs.PreparedQuery)
// preparedQuerySetTxn is the inner method used to insert a prepared query with
// the proper indexes into the state store.
func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *structs.PreparedQuery) error {
func (s *Store) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *structs.PreparedQuery) error {
// Check that the ID is set.
if query.ID == "" {
return ErrMissingQueryID
@ -201,7 +201,7 @@ func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *struc
}
// PreparedQueryDelete deletes the given query by ID.
func (s *StateStore) PreparedQueryDelete(idx uint64, queryID string) error {
func (s *Store) PreparedQueryDelete(idx uint64, queryID string) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -215,7 +215,7 @@ func (s *StateStore) PreparedQueryDelete(idx uint64, queryID string) error {
// preparedQueryDeleteTxn is the inner method used to delete a prepared query
// with the proper indexes into the state store.
func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, queryID string) error {
func (s *Store) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, queryID string) error {
// Pull the query.
wrapped, err := tx.First("prepared-queries", "id", queryID)
if err != nil {
@ -237,7 +237,7 @@ func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, queryID s
}
// PreparedQueryGet returns the given prepared query by ID.
func (s *StateStore) PreparedQueryGet(ws memdb.WatchSet, queryID string) (uint64, *structs.PreparedQuery, error) {
func (s *Store) PreparedQueryGet(ws memdb.WatchSet, queryID string) (uint64, *structs.PreparedQuery, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -256,7 +256,7 @@ func (s *StateStore) PreparedQueryGet(ws memdb.WatchSet, queryID string) (uint64
// PreparedQueryResolve returns the given prepared query by looking up an ID or
// Name. If the query was looked up by name and it's a template, then the
// template will be rendered before it is returned.
func (s *StateStore) PreparedQueryResolve(queryIDOrName string) (uint64, *structs.PreparedQuery, error) {
func (s *Store) PreparedQueryResolve(queryIDOrName string) (uint64, *structs.PreparedQuery, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -331,7 +331,7 @@ func (s *StateStore) PreparedQueryResolve(queryIDOrName string) (uint64, *struct
}
// PreparedQueryList returns all the prepared queries.
func (s *StateStore) PreparedQueryList(ws memdb.WatchSet) (uint64, structs.PreparedQueries, error) {
func (s *Store) PreparedQueryList(ws memdb.WatchSet) (uint64, structs.PreparedQueries, error) {
tx := s.db.Txn(false)
defer tx.Abort()

View File

@ -10,7 +10,7 @@ import (
)
// Sessions is used to pull the full list of sessions for use during snapshots.
func (s *StateSnapshot) Sessions() (memdb.ResultIterator, error) {
func (s *Snapshot) Sessions() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("sessions", "id")
if err != nil {
return nil, err
@ -20,7 +20,7 @@ func (s *StateSnapshot) Sessions() (memdb.ResultIterator, error) {
// Session is used when restoring from a snapshot. For general inserts, use
// SessionCreate.
func (s *StateRestore) Session(sess *structs.Session) error {
func (s *Restore) Session(sess *structs.Session) error {
// Insert the session.
if err := s.tx.Insert("sessions", sess); err != nil {
return fmt.Errorf("failed inserting session: %s", err)
@ -47,7 +47,7 @@ func (s *StateRestore) Session(sess *structs.Session) error {
}
// SessionCreate is used to register a new session in the state store.
func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error {
func (s *Store) SessionCreate(idx uint64, sess *structs.Session) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -70,7 +70,7 @@ func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error {
// sessionCreateTxn is the inner method used for creating session entries in
// an open transaction. Any health checks registered with the session will be
// checked for failing status. Returns any error encountered.
func (s *StateStore) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.Session) error {
func (s *Store) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.Session) error {
// Check that we have a session ID
if sess.ID == "" {
return ErrMissingSessionID
@ -144,7 +144,7 @@ func (s *StateStore) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.S
}
// SessionGet is used to retrieve an active session from the state store.
func (s *StateStore) SessionGet(ws memdb.WatchSet, sessionID string) (uint64, *structs.Session, error) {
func (s *Store) SessionGet(ws memdb.WatchSet, sessionID string) (uint64, *structs.Session, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -164,7 +164,7 @@ func (s *StateStore) SessionGet(ws memdb.WatchSet, sessionID string) (uint64, *s
}
// SessionList returns a slice containing all of the active sessions.
func (s *StateStore) SessionList(ws memdb.WatchSet) (uint64, structs.Sessions, error) {
func (s *Store) SessionList(ws memdb.WatchSet) (uint64, structs.Sessions, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -189,7 +189,7 @@ func (s *StateStore) SessionList(ws memdb.WatchSet) (uint64, structs.Sessions, e
// NodeSessions returns a set of active sessions associated
// with the given node ID. The returned index is the highest
// index seen from the result set.
func (s *StateStore) NodeSessions(ws memdb.WatchSet, nodeID string) (uint64, structs.Sessions, error) {
func (s *Store) NodeSessions(ws memdb.WatchSet, nodeID string) (uint64, structs.Sessions, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -214,7 +214,7 @@ func (s *StateStore) NodeSessions(ws memdb.WatchSet, nodeID string) (uint64, str
// SessionDestroy is used to remove an active session. This will
// implicitly invalidate the session and invoke the specified
// session destroy behavior.
func (s *StateStore) SessionDestroy(idx uint64, sessionID string) error {
func (s *Store) SessionDestroy(idx uint64, sessionID string) error {
tx := s.db.Txn(true)
defer tx.Abort()
@ -229,7 +229,7 @@ func (s *StateStore) SessionDestroy(idx uint64, sessionID string) error {
// deleteSessionTxn is the inner method, which is used to do the actual
// session deletion and handle session invalidation, etc.
func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, sessionID string) error {
func (s *Store) deleteSessionTxn(tx *memdb.Txn, idx uint64, sessionID string) error {
// Look up the session.
sess, err := tx.First("sessions", "id", sessionID)
if err != nil {

View File

@ -42,11 +42,11 @@ const (
watchLimit = 2048
)
// StateStore is where we store all of Consul's state, including
// Store is where we store all of Consul's state, including
// records of node registrations, services, checks, key/value
// pairs and more. The DB is entirely in-memory and is constructed
// from the Raft log through the FSM.
type StateStore struct {
type Store struct {
schema *memdb.DBSchema
db *memdb.MemDB
@ -61,18 +61,18 @@ type StateStore struct {
lockDelay *Delay
}
// StateSnapshot is used to provide a point-in-time snapshot. It
// Snapshot is used to provide a point-in-time snapshot. It
// works by starting a read transaction against the whole state store.
type StateSnapshot struct {
store *StateStore
type Snapshot struct {
store *Store
tx *memdb.Txn
lastIndex uint64
}
// StateRestore is used to efficiently manage restoring a large amount of
// Restore is used to efficiently manage restoring a large amount of
// data to a state store.
type StateRestore struct {
store *StateStore
type Restore struct {
store *Store
tx *memdb.Txn
}
@ -93,7 +93,7 @@ type sessionCheck struct {
}
// NewStateStore creates a new in-memory state storage layer.
func NewStateStore(gc *TombstoneGC) (*StateStore, error) {
func NewStateStore(gc *TombstoneGC) (*Store, error) {
// Create the in-memory DB.
schema := stateStoreSchema()
db, err := memdb.NewMemDB(schema)
@ -102,7 +102,7 @@ func NewStateStore(gc *TombstoneGC) (*StateStore, error) {
}
// Create and return the state store.
s := &StateStore{
s := &Store{
schema: schema,
db: db,
abandonCh: make(chan struct{}),
@ -113,7 +113,7 @@ func NewStateStore(gc *TombstoneGC) (*StateStore, error) {
}
// Snapshot is used to create a point-in-time snapshot of the entire db.
func (s *StateStore) Snapshot() *StateSnapshot {
func (s *Store) Snapshot() *Snapshot {
tx := s.db.Txn(false)
var tables []string
@ -122,54 +122,54 @@ func (s *StateStore) Snapshot() *StateSnapshot {
}
idx := maxIndexTxn(tx, tables...)
return &StateSnapshot{s, tx, idx}
return &Snapshot{s, tx, idx}
}
// LastIndex returns that last index that affects the snapshotted data.
func (s *StateSnapshot) LastIndex() uint64 {
func (s *Snapshot) LastIndex() uint64 {
return s.lastIndex
}
// Close performs cleanup of a state snapshot.
func (s *StateSnapshot) Close() {
func (s *Snapshot) Close() {
s.tx.Abort()
}
// Restore is used to efficiently manage restoring a large amount of data into
// the state store. It works by doing all the restores inside of a single
// transaction.
func (s *StateStore) Restore() *StateRestore {
func (s *Store) Restore() *Restore {
tx := s.db.Txn(true)
return &StateRestore{s, tx}
return &Restore{s, tx}
}
// Abort abandons the changes made by a restore. This or Commit should always be
// called.
func (s *StateRestore) Abort() {
func (s *Restore) Abort() {
s.tx.Abort()
}
// Commit commits the changes made by a restore. This or Abort should always be
// called.
func (s *StateRestore) Commit() {
func (s *Restore) Commit() {
s.tx.Commit()
}
// AbandonCh returns a channel you can wait on to know if the state store was
// abandoned.
func (s *StateStore) AbandonCh() <-chan struct{} {
func (s *Store) AbandonCh() <-chan struct{} {
return s.abandonCh
}
// Abandon is used to signal that the given state store has been abandoned.
// Calling this more than one time will panic.
func (s *StateStore) Abandon() {
func (s *Store) Abandon() {
close(s.abandonCh)
}
// maxIndex is a helper used to retrieve the highest known index
// amongst a set of tables in the db.
func (s *StateStore) maxIndex(tables ...string) uint64 {
func (s *Store) maxIndex(tables ...string) uint64 {
tx := s.db.Txn(false)
defer tx.Abort()
return maxIndexTxn(tx, tables...)

View File

@ -25,7 +25,7 @@ func testUUID() string {
buf[10:16])
}
func testStateStore(t *testing.T) *StateStore {
func testStateStore(t *testing.T) *Store {
s, err := NewStateStore(nil)
if err != nil {
t.Fatalf("err: %s", err)
@ -36,11 +36,11 @@ func testStateStore(t *testing.T) *StateStore {
return s
}
func testRegisterNode(t *testing.T, s *StateStore, idx uint64, nodeID string) {
func testRegisterNode(t *testing.T, s *Store, idx uint64, nodeID string) {
testRegisterNodeWithMeta(t, s, idx, nodeID, nil)
}
func testRegisterNodeWithMeta(t *testing.T, s *StateStore, idx uint64, nodeID string, meta map[string]string) {
func testRegisterNodeWithMeta(t *testing.T, s *Store, idx uint64, nodeID string, meta map[string]string) {
node := &structs.Node{Node: nodeID, Meta: meta}
if err := s.EnsureNode(idx, node); err != nil {
t.Fatalf("err: %s", err)
@ -57,7 +57,7 @@ func testRegisterNodeWithMeta(t *testing.T, s *StateStore, idx uint64, nodeID st
}
}
func testRegisterService(t *testing.T, s *StateStore, idx uint64, nodeID, serviceID string) {
func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {
svc := &structs.NodeService{
ID: serviceID,
Service: serviceID,
@ -81,7 +81,7 @@ func testRegisterService(t *testing.T, s *StateStore, idx uint64, nodeID, servic
}
}
func testRegisterCheck(t *testing.T, s *StateStore, idx uint64,
func testRegisterCheck(t *testing.T, s *Store, idx uint64,
nodeID string, serviceID string, checkID types.CheckID, state string) {
chk := &structs.HealthCheck{
Node: nodeID,
@ -107,7 +107,7 @@ func testRegisterCheck(t *testing.T, s *StateStore, idx uint64,
}
}
func testSetKey(t *testing.T, s *StateStore, idx uint64, key, value string) {
func testSetKey(t *testing.T, s *Store, idx uint64, key, value string) {
entry := &structs.DirEntry{Key: key, Value: []byte(value)}
if err := s.KVSSet(idx, entry); err != nil {
t.Fatalf("err: %s", err)

View File

@ -9,7 +9,7 @@ import (
)
// txnKVS handles all KV-related operations.
func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) {
func (s *Store) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) {
var entry *structs.DirEntry
var err error
@ -111,7 +111,7 @@ func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (str
}
// txnDispatch runs the given operations inside the state store transaction.
func (s *StateStore) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
func (s *Store) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
results := make(structs.TxnResults, 0, len(ops))
errors := make(structs.TxnErrors, 0, len(ops))
for i, op := range ops {
@ -149,7 +149,7 @@ func (s *StateStore) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps)
// any of the operations fail, the entire transaction will be rolled back. This
// is done in a full write transaction on the state store, so reads and writes
// are possible
func (s *StateStore) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
func (s *Store) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
tx := s.db.Txn(true)
defer tx.Abort()
@ -165,7 +165,7 @@ func (s *StateStore) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults,
// TxnRO runs the given operations inside a single read transaction in the state
// store. You must verify outside this function that no write operations are
// present, otherwise you'll get an error from the state store.
func (s *StateStore) TxnRO(ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
func (s *Store) TxnRO(ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
tx := s.db.Txn(false)
defer tx.Abort()

View File

@ -8,7 +8,7 @@ import (
// watchFactory is a function that can create a new WatchFunc
// from a parameter configuration
type watchFactory func(params map[string]interface{}) (WatchFunc, error)
type watchFactory func(params map[string]interface{}) (WatcherFunc, error)
// watchFuncFactory maps each type to a factory function
var watchFuncFactory map[string]watchFactory
@ -26,7 +26,7 @@ func init() {
}
// keyWatch is used to return a key watching function
func keyWatch(params map[string]interface{}) (WatchFunc, error) {
func keyWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
@ -39,7 +39,7 @@ func keyWatch(params map[string]interface{}) (WatchFunc, error) {
if key == "" {
return nil, fmt.Errorf("Must specify a single key to watch")
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
fn := func(p *Plan) (uint64, interface{}, error) {
kv := p.client.KV()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
pair, meta, err := kv.Get(key, &opts)
@ -55,7 +55,7 @@ func keyWatch(params map[string]interface{}) (WatchFunc, error) {
}
// keyPrefixWatch is used to return a key prefix watching function
func keyPrefixWatch(params map[string]interface{}) (WatchFunc, error) {
func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
@ -68,7 +68,7 @@ func keyPrefixWatch(params map[string]interface{}) (WatchFunc, error) {
if prefix == "" {
return nil, fmt.Errorf("Must specify a single prefix to watch")
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
fn := func(p *Plan) (uint64, interface{}, error) {
kv := p.client.KV()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
pairs, meta, err := kv.List(prefix, &opts)
@ -81,13 +81,13 @@ func keyPrefixWatch(params map[string]interface{}) (WatchFunc, error) {
}
// servicesWatch is used to watch the list of available services
func servicesWatch(params map[string]interface{}) (WatchFunc, error) {
func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
fn := func(p *Plan) (uint64, interface{}, error) {
catalog := p.client.Catalog()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
services, meta, err := catalog.Services(&opts)
@ -100,13 +100,13 @@ func servicesWatch(params map[string]interface{}) (WatchFunc, error) {
}
// nodesWatch is used to watch the list of available nodes
func nodesWatch(params map[string]interface{}) (WatchFunc, error) {
func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
fn := func(p *Plan) (uint64, interface{}, error) {
catalog := p.client.Catalog()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
nodes, meta, err := catalog.Nodes(&opts)
@ -119,7 +119,7 @@ func nodesWatch(params map[string]interface{}) (WatchFunc, error) {
}
// serviceWatch is used to watch a specific service for changes
func serviceWatch(params map[string]interface{}) (WatchFunc, error) {
func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
@ -142,7 +142,7 @@ func serviceWatch(params map[string]interface{}) (WatchFunc, error) {
return nil, err
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
fn := func(p *Plan) (uint64, interface{}, error) {
health := p.client.Health()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
nodes, meta, err := health.Service(service, tag, passingOnly, &opts)
@ -155,7 +155,7 @@ func serviceWatch(params map[string]interface{}) (WatchFunc, error) {
}
// checksWatch is used to watch a specific checks in a given state
func checksWatch(params map[string]interface{}) (WatchFunc, error) {
func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
@ -175,7 +175,7 @@ func checksWatch(params map[string]interface{}) (WatchFunc, error) {
state = "any"
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
fn := func(p *Plan) (uint64, interface{}, error) {
health := p.client.Health()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
var checks []*consulapi.HealthCheck
@ -195,7 +195,7 @@ func checksWatch(params map[string]interface{}) (WatchFunc, error) {
}
// eventWatch is used to watch for events, optionally filtering on name
func eventWatch(params map[string]interface{}) (WatchFunc, error) {
func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
// The stale setting doesn't apply to events.
var name string
@ -203,7 +203,7 @@ func eventWatch(params map[string]interface{}) (WatchFunc, error) {
return nil, err
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
fn := func(p *Plan) (uint64, interface{}, error) {
event := p.client.Event()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
events, meta, err := event.List(name, &opts)

View File

@ -20,7 +20,7 @@ const (
)
// Run is used to run a watch plan
func (p *WatchPlan) Run(address string) error {
func (p *Plan) Run(address string) error {
// Setup the client
p.address = address
conf := consulapi.DefaultConfig()
@ -45,7 +45,7 @@ func (p *WatchPlan) Run(address string) error {
OUTER:
for !p.shouldStop() {
// Invoke the handler
index, result, err := p.Func(p)
index, result, err := p.Watcher(p)
// Check if we should terminate since the function
// could have blocked for a while
@ -96,7 +96,7 @@ OUTER:
}
// Stop is used to stop running the watch plan
func (p *WatchPlan) Stop() {
func (p *Plan) Stop() {
p.stopLock.Lock()
defer p.stopLock.Unlock()
if p.stop {
@ -106,7 +106,7 @@ func (p *WatchPlan) Stop() {
close(p.stopCh)
}
func (p *WatchPlan) shouldStop() bool {
func (p *Plan) shouldStop() bool {
select {
case <-p.stopCh:
return true

View File

@ -9,15 +9,15 @@ func init() {
watchFuncFactory["noop"] = noopWatch
}
func noopWatch(params map[string]interface{}) (WatchFunc, error) {
fn := func(p *WatchPlan) (uint64, interface{}, error) {
func noopWatch(params map[string]interface{}) (WatcherFunc, error) {
fn := func(p *Plan) (uint64, interface{}, error) {
idx := p.lastIndex + 1
return idx, idx, nil
}
return fn, nil
}
func mustParse(t *testing.T, q string) *WatchPlan {
func mustParse(t *testing.T, q string) *Plan {
params := makeParams(t, q)
plan, err := Parse(params)
if err != nil {

View File

@ -8,17 +8,17 @@ import (
consulapi "github.com/hashicorp/consul/api"
)
// WatchPlan is the parsed version of a watch specification. A watch provides
// Plan is the parsed version of a watch specification. A watch provides
// the details of a query, which generates a view into the Consul data store.
// This view is watched for changes and a handler is invoked to take any
// appropriate actions.
type WatchPlan struct {
type Plan struct {
Datacenter string
Token string
Type string
Exempt map[string]interface{}
Func WatchFunc
Watcher WatcherFunc
Handler HandlerFunc
LogOutput io.Writer
@ -32,21 +32,21 @@ type WatchPlan struct {
stopLock sync.Mutex
}
// WatchFunc is used to watch for a diff
type WatchFunc func(*WatchPlan) (uint64, interface{}, error)
// WatcherFunc is used to watch for a diff
type WatcherFunc func(*Plan) (uint64, interface{}, error)
// HandlerFunc is used to handle new data
type HandlerFunc func(uint64, interface{})
// Parse takes a watch query and compiles it into a WatchPlan or an error
func Parse(params map[string]interface{}) (*WatchPlan, error) {
func Parse(params map[string]interface{}) (*Plan, error) {
return ParseExempt(params, nil)
}
// ParseExempt takes a watch query and compiles it into a WatchPlan or an error
// Any exempt parameters are stored in the Exempt map
func ParseExempt(params map[string]interface{}, exempt []string) (*WatchPlan, error) {
plan := &WatchPlan{
func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) {
plan := &Plan{
stopCh: make(chan struct{}),
}
@ -77,7 +77,7 @@ func ParseExempt(params map[string]interface{}, exempt []string) (*WatchPlan, er
if err != nil {
return nil, err
}
plan.Func = fn
plan.Watcher = fn
// Remove the exempt parameters
if len(exempt) > 0 {